/*
 * Stream Implementation
 */
7 #include <linux/errno.h>
8 #include <linux/percpu.h>
9 #include <linux/sched.h>
10 #include <linux/uaccess.h>
12 #include <linux/poll.h>
13 #include <linux/anon_inodes.h>
14 #include <linux/file.h>
17 #include <interfaces/vmm_stream.h>
18 #include "linux-exts.h"
19 #include "util-ringbuffer.h"
/*
 * Sizing constants for the stream interface.
 *
 * STREAM_RING_LEN is the length handed to create_ringbuf() for every
 * stream's output ring (probably larger than it needs to be).
 * STREAM_NAME_LEN is the size of the fixed buffers holding a stream name.
 */
#define STREAM_RING_LEN (4 * 1024)
#define STREAM_NAME_LEN (128)
30 static struct list_head global_streams;
35 char name[STREAM_NAME_LEN];
37 struct ringbuf * out_ring;
41 wait_queue_head_t intr_queue;
44 struct v3_guest * guest;
45 struct list_head stream_node;
47 struct v3_stream * v3_stream;
51 // Currently just the list of open streams
52 struct vm_global_streams {
53 struct list_head open_streams;
60 static struct stream_state * find_stream_by_name(struct v3_guest * guest, const char * name) {
61 struct stream_state * stream = NULL;
62 struct list_head * stream_list = NULL;
63 struct vm_global_streams * vm_state = NULL;
66 stream_list = &global_streams;
68 vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
70 if (vm_state == NULL) {
71 ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
75 stream_list = &(vm_state->open_streams);
78 list_for_each_entry(stream, stream_list, stream_node) {
79 if (strncmp(stream->name, name, STREAM_NAME_LEN) == 0) {
89 #define TMP_BUF_LEN 128
91 static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) {
92 struct stream_state * stream = filp->private_data;
93 ssize_t bytes_read = 0;
94 ssize_t bytes_left = size;
96 char tmp_buf[TMP_BUF_LEN];
97 ssize_t total_bytes_left = 0;
99 // memset(tmp_buf, 0, TMP_BUF_LEN);
101 while (bytes_left > 0) {
102 int tmp_len = (TMP_BUF_LEN > bytes_left) ? bytes_left : TMP_BUF_LEN;
105 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
106 tmp_read = ringbuf_read(stream->out_ring, tmp_buf, tmp_len);
107 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
110 // If userspace reads more than we have
114 if (copy_to_user(buf + bytes_read, tmp_buf, tmp_read)) {
115 ERROR("Read Fault\n");
119 bytes_left -= tmp_read;
120 bytes_read += tmp_read;
124 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
125 total_bytes_left = ringbuf_data_len(stream->out_ring);
126 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
128 if (total_bytes_left > 0) {
129 wake_up_interruptible(&(stream->intr_queue));
136 stream_poll(struct file * filp, struct poll_table_struct * poll_tb) {
137 struct stream_state * stream = filp->private_data;
138 unsigned int mask = POLLIN | POLLRDNORM;
142 poll_wait(filp, &(stream->intr_queue), poll_tb);
144 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
145 data_avail = ringbuf_data_len(stream->out_ring);
146 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
148 if (data_avail > 0) {
156 static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) {
157 struct stream_state * stream = filp->private_data;
158 char * kern_buf = NULL;
159 ssize_t bytes_written = 0;
161 kern_buf = palacios_alloc(size);
164 ERROR("Cannot allocate buffer in stream interface\n");
168 if (copy_from_user(kern_buf, buf, size)) {
169 ERROR("Stream Write Failed\n");
170 palacios_free(kern_buf);
174 bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size);
176 palacios_free(kern_buf);
178 return bytes_written;
182 static int stream_release(struct inode * i, struct file * filp) {
183 struct stream_state * stream = filp->private_data;
186 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
187 stream->connected = 0;
188 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
195 static struct file_operations stream_fops = {
197 .write = stream_write,
198 .release = stream_release,
204 static void * palacios_stream_open(struct v3_stream * v3_stream, const char * name, void * private_data) {
205 struct v3_guest * guest = (struct v3_guest *)private_data;
206 struct stream_state * stream = NULL;
207 struct vm_global_streams * vm_state = NULL;
210 vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
212 if (vm_state == NULL) {
213 ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
218 if (find_stream_by_name(guest, name) != NULL) {
219 ERROR("Stream already exists\n");
223 stream = palacios_alloc(sizeof(struct stream_state));
225 ERROR("Unable to allocate stream\n");
228 memset(stream, 0, sizeof(struct stream_state));
230 stream->out_ring = create_ringbuf(STREAM_RING_LEN);
231 stream->v3_stream = v3_stream;
232 stream->guest = guest;
233 stream->connected = 0;
235 strncpy(stream->name, name, STREAM_NAME_LEN - 1);
237 init_waitqueue_head(&(stream->intr_queue));
238 palacios_spinlock_init(&(stream->lock));
241 list_add(&(stream->stream_node), &(global_streams));
243 list_add(&(stream->stream_node), &(vm_state->open_streams));
250 static uint64_t palacios_stream_output(struct v3_stream * v3_stream, char * buf, int len) {
251 struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
252 int bytes_written = 0;
256 if (stream->connected == 0) {
260 while (bytes_written < len) {
261 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
262 bytes_written += ringbuf_write(stream->out_ring, buf + bytes_written, len - bytes_written);
263 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
265 wake_up_interruptible(&(stream->intr_queue));
267 if (bytes_written < len) {
268 // not enough space in ringbuffer, activate user space to drain it
274 return bytes_written;
278 static void palacios_stream_close(struct v3_stream * v3_stream) {
279 struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
281 free_ringbuf(stream->out_ring);
282 list_del(&(stream->stream_node));
283 palacios_free(stream);
287 static struct v3_stream_hooks palacios_stream_hooks = {
288 .open = palacios_stream_open,
289 .output = palacios_stream_output,
290 .close = palacios_stream_close,
294 static int stream_init( void ) {
295 INIT_LIST_HEAD(&(global_streams));
296 V3_Init_Stream(&palacios_stream_hooks);
302 static int stream_deinit( void ) {
303 struct stream_state * stream = NULL;
304 struct stream_state * tmp = NULL;
306 list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) {
307 free_ringbuf(stream->out_ring);
308 list_del(&(stream->stream_node));
309 palacios_free(stream);
319 static int stream_connect(struct v3_guest * guest, unsigned int cmd, unsigned long arg, void * priv_data) {
320 void __user * argp = (void __user *)arg;
321 struct stream_state * stream = NULL;
323 char name[STREAM_NAME_LEN];
324 unsigned long flags = 0;
328 if (copy_from_user(name, argp, STREAM_NAME_LEN)) {
329 ERROR("%s(%d): copy from user error...\n", __FILE__, __LINE__);
333 stream = find_stream_by_name(guest, name);
335 if (stream == NULL) {
336 ERROR("Could not find stream (%s)\n", name);
340 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
341 if (stream->connected == 0) {
342 stream->connected = 1;
345 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
349 ERROR("Stream (%s) already connected\n", name);
354 stream_fd = anon_inode_getfd("v3-stream", &stream_fops, stream, O_RDWR);
357 ERROR("Error creating stream inode for (%s)\n", name);
361 INFO("Stream (%s) connected\n", name);
367 static int guest_stream_init(struct v3_guest * guest, void ** vm_data) {
368 struct vm_global_streams * state = palacios_alloc(sizeof(struct vm_global_streams));
371 ERROR("Unable to allocate state in stream init\n");
375 INIT_LIST_HEAD(&(state->open_streams));
378 add_guest_ctrl(guest, V3_VM_STREAM_CONNECT, stream_connect, state);
384 static int guest_stream_deinit(struct v3_guest * guest, void * vm_data) {
385 struct vm_global_streams * state = vm_data;
387 struct stream_state * stream = NULL;
388 struct stream_state * tmp = NULL;
390 list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) {
391 free_ringbuf(stream->out_ring);
392 list_del(&(stream->stream_node));
393 palacios_free(stream);
396 palacios_free(state);
403 static struct linux_ext stream_ext = {
404 .name = "STREAM_INTERFACE",
406 .deinit = stream_deinit,
407 .guest_init = guest_stream_init,
408 .guest_deinit = guest_stream_deinit
412 register_extension(&stream_ext);