2 * Stream Implementation
7 #include <linux/errno.h>
8 #include <linux/percpu.h>
9 #include <linux/sched.h>
10 #include <linux/uaccess.h>
12 #include <linux/poll.h>
13 #include <linux/anon_inodes.h>
14 #include <linux/file.h>
17 #define sint64_t int64_t
18 #include <interfaces/vmm_stream.h>
19 #include "linux-exts.h"
20 #include "util-ringbuffer.h"
// Sizing constants for the stream interface.
// STREAM_RING_LEN is the length handed to create_ringbuf() for each stream's
// out_ring; STREAM_NAME_LEN is the size of the fixed stream-name buffers.
25 // This is probably overkill
26 #define STREAM_RING_LEN 4096
27 #define STREAM_NAME_LEN 128
// Module-wide stream list: initialised in stream_init(), drained in
// stream_deinit(), and one of the two lists find_stream_by_name() can search.
// NOTE(review): presumably holds streams opened without a guest; the
// condition selecting this list is not visible in this view -- confirm.
31 static struct list_head global_streams;
// Per-stream state record.
// NOTE(review): the enclosing struct header is not visible in this view, and
// the code below also uses ->connected and ->lock, whose declarations are
// likewise not shown here.

// Stream name; matched with strncmp() in find_stream_by_name().
36 char name[STREAM_NAME_LEN];
// Ring buffer filled by palacios_stream_output() and drained by stream_read().
38 struct ringbuf * out_ring;
// Wait queue registered in stream_poll() and woken by palacios_stream_output().
42 wait_queue_head_t user_poll_queue;
// Guest pointer supplied to palacios_stream_open().
46 struct v3_guest * guest;
// Linkage into global_streams or a VM's open_streams list.
47 struct list_head stream_node;
// VMM-side stream handle; stream_write() forwards data through its ->input().
49 struct v3_stream * v3_stream;
// Per-VM extension data for "STREAM_INTERFACE": allocated in
// guest_stream_init() and looked up with get_vm_ext_data().
53 // Currently just the list of open streams
54 struct vm_global_streams {
// Head of this VM's stream list (entries are linked via stream_node).
55 struct list_head open_streams;
// Look up a stream by name.
//
// Searches either global_streams or the open_streams list held in the guest's
// "STREAM_INTERFACE" extension data, comparing at most STREAM_NAME_LEN bytes
// of each stream's name against `name`.
//
// Callers in this file treat a NULL result as "no such stream".
// NOTE(review): the branch that chooses between the two lists and the return
// statements are not visible in this view -- presumably guest == NULL selects
// global_streams; confirm against the full file.
62 static struct stream_state * find_stream_by_name(struct v3_guest * guest, const char * name) {
63 struct stream_state * stream = NULL;
64 struct list_head * stream_list = NULL;
65 struct vm_global_streams * vm_state = NULL;
68 stream_list = &global_streams;
70 vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
// Without per-VM state there is no list to search.
72 if (vm_state == NULL) {
73 ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
77 stream_list = &(vm_state->open_streams);
// Linear scan of the selected list.
80 list_for_each_entry(stream, stream_list, stream_node) {
81 if (strncmp(stream->name, name, STREAM_NAME_LEN) == 0) {
// File read handler for a connected stream.
//
// Drains up to `size` bytes from the stream's out_ring into a temporary
// kernel buffer while holding the stream lock, then copies them out to `buf`.
// The temporary buffer is freed on every visible path.
//
// NOTE(review): the declarations of kern_buf/flags, the first branch
// condition and all return statements are not visible in this view --
// confirm the value returned on each path against the full file.
90 // host->user nonblocking data flow
91 static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) {
92 struct stream_state * stream = filp->private_data;
93 ssize_t bytes_read = 0;
// NOTE(review): the allocation is sized directly by the caller-supplied
// `size`; no upper bound is visible here. Consider clamping it (for example
// to STREAM_RING_LEN) so a huge read cannot request a huge kernel buffer.
98 kern_buf = palacios_alloc(size);
101 ERROR("Cannot allocate space for buffer\n");
// Only the ring-buffer access happens under the lock; the copy to user space
// is done after the lock has been dropped.
105 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
106 bytes_read = ringbuf_read(stream->out_ring,kern_buf,size);
107 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
110 if (copy_to_user(buf, kern_buf, bytes_read)) {
111 ERROR("Read Fault\n");
112 palacios_free(kern_buf);
115 palacios_free(kern_buf);
// Nothing was available in the ring.
118 } else if (bytes_read==0) {
120 palacios_free(kern_buf);
// ringbuf_read reported a negative result.
122 } else { // bytes_read<0
123 ERROR("Read failed\n");
124 palacios_free(kern_buf);
// Poll handler: registers the caller on the stream's user_poll_queue and
// reports POLLIN | POLLRDNORM when out_ring currently holds data.
// NOTE(review): the return-type line, the local declarations (flags,
// data_avail) and the return for the no-data case are not visible in this
// view.
131 stream_poll(struct file * filp, struct poll_table_struct * poll_tb) {
132 struct stream_state * stream = filp->private_data;
// Register on the queue that palacios_stream_output() wakes.
140 poll_wait(filp, &(stream->user_poll_queue), poll_tb);
// Sample the ring occupancy under the stream lock.
142 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
143 data_avail = ringbuf_data_len(stream->out_ring);
144 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
146 if (data_avail > 0) {
147 return POLLIN | POLLRDNORM;
// File write handler for a connected stream.
//
// Copies `size` bytes from user space into a temporary kernel buffer and
// hands them to the VMM through the stream's v3_stream->input() callback.
// On the final path the count reported by input() is returned. The
// temporary buffer is freed on every visible path.
//
// NOTE(review): the allocation-failure check and the return statements of
// the error/zero paths are not visible in this view -- confirm them against
// the full file.
155 // Non-blocking user->Host->VM data flow
157 static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) {
158 struct stream_state * stream = filp->private_data;
159 char * kern_buf = NULL;
160 ssize_t bytes_written = 0;
// NOTE(review): sized directly by the caller-supplied `size`; no upper bound
// is visible here.
162 kern_buf = palacios_alloc(size);
165 ERROR("Cannot allocate buffer in stream interface\n");
169 if (copy_from_user(kern_buf, buf, size)) {
170 ERROR("Stream Write Failed\n");
171 palacios_free(kern_buf);
// Forward the data to the VMM side of the stream.
175 bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size);
177 // could end up with zero here
178 if (bytes_written<0) {
179 ERROR("Error on writing to stream\n");
180 palacios_free(kern_buf);
184 if (bytes_written==0) {
185 // This is somewhat bogus, since
186 // the FD is treated as non-blocking regardless
187 palacios_free(kern_buf);
191 palacios_free(kern_buf);
192 return bytes_written;
// Release handler for the stream file: clears the stream's connected flag
// under the stream lock. stream_connect() only accepts a stream whose flag
// is 0, so this makes the stream connectable again.
// NOTE(review): the declaration of `flags` and the return statement are not
// visible in this view.
196 static int stream_release(struct inode * i, struct file * filp) {
197 struct stream_state * stream = filp->private_data;
200 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
201 stream->connected = 0;
202 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
// File operations installed on the anonymous inode created by
// stream_connect().
// NOTE(review): only the .write and .release entries are visible in this
// view; other entries (for example for stream_read / stream_poll) may exist
// on lines not shown.
209 static struct file_operations stream_fops = {
211 .write = stream_write,
212 .release = stream_release,
// VMM "open" hook: create the host-side state for a new named stream.
//
// v3_stream:    VMM stream handle stored in the new state.
// name:         stream name; rejected if find_stream_by_name() already
//               finds it for this guest.
// private_data: treated as the owning struct v3_guest pointer.
//
// Allocates and zeroes a stream_state, creates its STREAM_RING_LEN ring
// buffer, initialises the wait queue and lock, and links the stream onto a
// stream list.
//
// NOTE(review): the conditions guarding the vm_state lookup and the two
// list_add calls, and every return statement, are not visible in this view.
// Presumably the list_add calls are alternatives (global list when there is
// no guest, per-VM list otherwise) and the new state is returned -- confirm.
218 static void * palacios_stream_open(struct v3_stream * v3_stream, const char * name, void * private_data) {
219 struct v3_guest * guest = (struct v3_guest *)private_data;
220 struct stream_state * stream = NULL;
221 struct vm_global_streams * vm_state = NULL;
224 vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
226 if (vm_state == NULL) {
227 ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
// Stream names must be unique within the searched list.
232 if (find_stream_by_name(guest, name) != NULL) {
233 ERROR("Stream already exists\n");
237 stream = palacios_alloc(sizeof(struct stream_state));
239 ERROR("Unable to allocate stream\n");
242 memset(stream, 0, sizeof(struct stream_state));
// NOTE(review): no check of create_ringbuf()'s result is visible here.
244 stream->out_ring = create_ringbuf(STREAM_RING_LEN);
245 stream->v3_stream = v3_stream;
246 stream->guest = guest;
247 stream->connected = 0;
// Copies at most STREAM_NAME_LEN - 1 bytes into the zeroed buffer, so the
// stored name always ends in a NUL byte.
249 strncpy(stream->name, name, STREAM_NAME_LEN - 1);
251 init_waitqueue_head(&(stream->user_poll_queue));
252 palacios_spinlock_init(&(stream->lock));
255 list_add(&(stream->stream_node), &(global_streams));
257 list_add(&(stream->stream_node), &(vm_state->open_streams));
// VMM "output" hook: queue guest output for the user-space reader.
//
// Writes `len` bytes from `buf` into the stream's out_ring under the stream
// lock. A negative result from ringbuf_write() is returned unchanged;
// otherwise pollers waiting on user_poll_queue are woken and the count
// reported by ringbuf_write() is returned.
// NOTE(review): the declaration of `flags` is not visible in this view.
265 // Non-blocking VM->host data flow
267 static sint64_t palacios_stream_output(struct v3_stream * v3_stream, uint8_t * buf, sint64_t len) {
268 struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
269 sint64_t bytes_written = 0;
274 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
275 bytes_written=ringbuf_write(stream->out_ring, buf, len);
276 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
278 if (bytes_written<0) {
279 // we ended in an error, so push back to VM
280 return bytes_written;
282 // normal situation, tell it how much we handled
283 wake_up_interruptible(&(stream->user_poll_queue));
284 return bytes_written;
// VMM "close" hook: tear down the host-side state of one stream.
// Frees the ring buffer, unlinks the stream from whichever list holds it,
// deinitialises its lock and frees the state itself.
// NOTE(review): nothing visible here checks whether a user-space file
// descriptor from stream_connect() still references this state -- confirm
// the lifetime rules against the callers.
290 static void palacios_stream_close(struct v3_stream * v3_stream) {
291 struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
293 free_ringbuf(stream->out_ring);
294 list_del(&(stream->stream_node));
295 palacios_spinlock_deinit(&(stream->lock));
296 palacios_free(stream);
// Hook table handed to V3_Init_Stream() in stream_init(): routes the VMM's
// stream open/output/close operations to the handlers in this file.
300 static struct v3_stream_hooks palacios_stream_hooks = {
301 .open = palacios_stream_open,
302 .output = palacios_stream_output,
303 .close = palacios_stream_close,
// Extension init: prepares the empty global stream list and registers the
// host stream hooks with the VMM.
// NOTE(review): the return statement is not visible in this view.
307 static int stream_init( void ) {
308 INIT_LIST_HEAD(&(global_streams));
309 V3_Init_Stream(&palacios_stream_hooks);
// Extension teardown: destroys every stream still linked on global_streams,
// freeing its ring buffer, unlinking it and freeing its state.
// NOTE(review): unlike palacios_stream_close(), this path does not call
// palacios_spinlock_deinit() on the stream lock -- confirm whether that is
// intentional. The return statement is not visible in this view.
315 static int stream_deinit( void ) {
316 struct stream_state * stream = NULL;
317 struct stream_state * tmp = NULL;
// The _safe iterator is needed because entries are removed while walking.
319 list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) {
320 free_ringbuf(stream->out_ring);
321 list_del(&(stream->stream_node));
322 palacios_free(stream);
// Guest control handler for V3_VM_STREAM_CONNECT: attach user space to a
// named stream.
//
// Reads a STREAM_NAME_LEN-byte name from the user pointer in `arg`, looks the
// stream up for `guest`, marks it connected under the stream lock (only if
// its connected flag was 0), and creates an anonymous-inode file descriptor
// backed by stream_fops with the stream as private data.
//
// NOTE(review): the declaration of stream_fd, the conditions after the lock
// is released, and all return statements are not visible in this view --
// presumably the new descriptor is returned on success; confirm.
332 static int stream_connect(struct v3_guest * guest, unsigned int cmd, unsigned long arg, void * priv_data) {
333 void __user * argp = (void __user *)arg;
334 struct stream_state * stream = NULL;
336 char name[STREAM_NAME_LEN];
337 unsigned long flags = 0;
// NOTE(review): exactly STREAM_NAME_LEN bytes are copied and no terminator is
// added in the visible lines, yet `name` is later printed with %s. Consider
// forcing name[STREAM_NAME_LEN - 1] = '\0' after the copy.
341 if (copy_from_user(name, argp, STREAM_NAME_LEN)) {
342 ERROR("%s(%d): copy from user error...\n", __FILE__, __LINE__);
346 stream = find_stream_by_name(guest, name);
348 if (stream == NULL) {
349 ERROR("Could not find stream (%s)\n", name);
// Claim the stream; the test and the set happen under the same lock hold so
// only one caller can win.
353 palacios_spinlock_lock_irqsave(&(stream->lock), flags);
354 if (stream->connected == 0) {
355 stream->connected = 1;
358 palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
362 ERROR("Stream (%s) already connected\n", name);
367 stream_fd = anon_inode_getfd("v3-stream", &stream_fops, stream, O_RDWR);
// NOTE(review): no visible line resets stream->connected when descriptor
// creation fails; if that is really the case the stream would stay marked
// connected with no descriptor to release it -- confirm.
370 ERROR("Error creating stream inode for (%s)\n", name);
374 INFO("Stream (%s) connected\n", name);
// Per-guest init: allocates the VM's stream bookkeeping, initialises its
// empty open_streams list and registers stream_connect as the handler for
// the V3_VM_STREAM_CONNECT guest control.
// NOTE(review): the allocation-failure check, the store into *vm_data and
// the return statements are not visible in this view.
380 static int guest_stream_init(struct v3_guest * guest, void ** vm_data) {
381 struct vm_global_streams * state = palacios_alloc(sizeof(struct vm_global_streams));
384 ERROR("Unable to allocate state in stream init\n");
388 INIT_LIST_HEAD(&(state->open_streams));
391 add_guest_ctrl(guest, V3_VM_STREAM_CONNECT, stream_connect, state);
397 static int guest_stream_deinit(struct v3_guest * guest, void * vm_data) {
398 struct vm_global_streams * state = vm_data;
400 struct stream_state * stream = NULL;
401 struct stream_state * tmp = NULL;
403 list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) {
404 free_ringbuf(stream->out_ring);
405 list_del(&(stream->stream_node));
406 palacios_free(stream);
409 palacios_free(state);
// Extension descriptor for the stream interface. The name string is the key
// used with get_vm_ext_data() elsewhere in this file.
// NOTE(review): only the entries below are visible in this view; an .init
// entry (presumably stream_init) may sit on a line not shown.
416 static struct linux_ext stream_ext = {
417 .name = "STREAM_INTERFACE",
419 .deinit = stream_deinit,
420 .guest_init = guest_stream_init,
421 .guest_deinit = guest_stream_deinit
// Registers the stream extension descriptor with the extension framework.
425 register_extension(&stream_ext);