X-Git-Url: http://v3vee.org/palacios/gitweb/gitweb.cgi?a=blobdiff_plain;f=linux_module%2Fiface-stream.c;h=207f5f02c8dec12cb984fe52395f79ff7fded7ca;hb=d22c11cec4e8c3390bfe6bf16ed07f5d073f0d4a;hp=1a590525dce46c97ecaa94cf2c7268003c1c3fbb;hpb=44169db748b8d205300eb2752412bd33f6f82ca4;p=palacios.git diff --git a/linux_module/iface-stream.c b/linux_module/iface-stream.c index 1a59052..207f5f0 100644 --- a/linux_module/iface-stream.c +++ b/linux_module/iface-stream.c @@ -1,22 +1,9 @@ /* - * This file is part of the Palacios Virtual Machine Monitor developed - * by the V3VEE Project with funding from the United States National - * Science Foundation and the Department of Energy. - * - * The V3VEE Project is a joint project between Northwestern University - * and the University of New Mexico. You can find out more at - * http://www.v3vee.org - * - * Copyright (c) 2010, Lei Xia - * Copyright (c) 2010, The V3VEE Project - * All rights reserved. - * - * This is free software. You are permitted to use, redistribute, - * and modify it under the terms of the GNU General Public License - * Version 2 (GPLv2). The accompanying COPYING file contains the - * full text of the license. + * Stream Implementation + * (c) Lei Xia 2010 */ + #include #include #include @@ -27,45 +14,55 @@ #include +#define sint64_t int64_t #include #include "linux-exts.h" #include "util-ringbuffer.h" #include "vm.h" -#include "iface-stream.h" -// This is going to need to be a lot bigger... -#define STREAM_BUF_SIZE 1024 +// This is probably overkill +#define STREAM_RING_LEN 4096 +#define STREAM_NAME_LEN 128 static struct list_head global_streams; -struct stream_buffer { + + +struct stream_state { char name[STREAM_NAME_LEN]; - struct ringbuf * buf; + + struct ringbuf * out_ring; int connected; - wait_queue_head_t intr_queue; + wait_queue_head_t user_poll_queue; + spinlock_t lock; struct v3_guest * guest; struct list_head stream_node; + + struct v3_stream * v3_stream; }; // Currently just the list of open streams -struct vm_stream_state { +struct vm_global_streams { struct list_head open_streams; }; -static struct stream_buffer * find_stream_by_name(struct v3_guest * guest, const char * name) { - struct stream_buffer * stream = NULL; + + + +static struct stream_state * find_stream_by_name(struct v3_guest * guest, const char * name) { + struct stream_state * stream = NULL; struct list_head * stream_list = NULL; - struct vm_stream_state * vm_state = NULL; + struct vm_global_streams * vm_state = NULL; if (guest == NULL) { stream_list = &global_streams; @@ -73,7 +70,7 @@ static struct stream_buffer * find_stream_by_name(struct v3_guest * guest, const vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE"); if (vm_state == NULL) { - printk("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n"); + ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n"); return NULL; } @@ -90,53 +87,170 @@ static struct stream_buffer * find_stream_by_name(struct v3_guest * guest, const } - +// host->user nonblocking data flow static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) { - struct stream_buffer * stream = filp->private_data; + struct stream_state * stream = filp->private_data; + ssize_t bytes_read = 0; + unsigned long flags; + char *kern_buf; + + + kern_buf = palacios_alloc(size); + + if (!kern_buf) { + ERROR("Cannot allocate space for buffer\n"); + return -1; + } + + palacios_spinlock_lock_irqsave(&(stream->lock), flags); + bytes_read = ringbuf_read(stream->out_ring,kern_buf,size); + palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); + + if (bytes_read>0) { + if (copy_to_user(buf, kern_buf, bytes_read)) { + ERROR("Read Fault\n"); + palacios_free(kern_buf); + return -EFAULT; + } else { + palacios_free(kern_buf); + return bytes_read; + } + } else if (bytes_read==0) { + // out of space + palacios_free(kern_buf); + return -EWOULDBLOCK; + } else { // bytes_read<0 + ERROR("Read failed\n"); + palacios_free(kern_buf); + return -EFAULT; + } +} + + +static unsigned int +stream_poll(struct file * filp, struct poll_table_struct * poll_tb) { + struct stream_state * stream = filp->private_data; + unsigned long flags; + int data_avail = 0; + + if (!stream) { + return POLLERR; + } + + poll_wait(filp, &(stream->user_poll_queue), poll_tb); + + palacios_spinlock_lock_irqsave(&(stream->lock), flags); + data_avail = ringbuf_data_len(stream->out_ring); + palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); + + if (data_avail > 0) { + return POLLIN | POLLRDNORM; + } + + return 0; + +} + +// +// Non-blocking user->Host->VM data flow +// +static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) { + struct stream_state * stream = filp->private_data; + char * kern_buf = NULL; + ssize_t bytes_written = 0; + + kern_buf = palacios_alloc(size); + + if (!kern_buf) { + ERROR("Cannot allocate buffer in stream interface\n"); + return -EFAULT; + } + + if (copy_from_user(kern_buf, buf, size)) { + ERROR("Stream Write Failed\n"); + palacios_free(kern_buf); + return -EFAULT; + }; - wait_event_interruptible(stream->intr_queue, (ringbuf_data_len(stream->buf) != 0)); + bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size); + + // could end up with zero here + if (bytes_written<0) { + ERROR("Error on writing to stream\n"); + palacios_free(kern_buf); + return -EFAULT; + } - return ringbuf_read(stream->buf, buf, size); + if (bytes_written==0) { + // This is somewhat bogus, since + // the FD is treated as non-blocking regardless + palacios_free(kern_buf); + return -EWOULDBLOCK; + } + + palacios_free(kern_buf); + return bytes_written; } +static int stream_release(struct inode * i, struct file * filp) { + struct stream_state * stream = filp->private_data; + unsigned long flags; + + palacios_spinlock_lock_irqsave(&(stream->lock), flags); + stream->connected = 0; + palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); + + + return 0; + +} static struct file_operations stream_fops = { .read = stream_read, - // .release = stream_close, - // .poll = stream_poll, + .write = stream_write, + .release = stream_release, + .poll = stream_poll, }; -static void * palacios_stream_open(const char * name, void * private_data) { +static void * palacios_stream_open(struct v3_stream * v3_stream, const char * name, void * private_data) { struct v3_guest * guest = (struct v3_guest *)private_data; - struct stream_buffer * stream = NULL; - struct vm_stream_state * vm_state = NULL; + struct stream_state * stream = NULL; + struct vm_global_streams * vm_state = NULL; if (guest != NULL) { vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE"); if (vm_state == NULL) { - printk("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n"); + ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n"); return NULL; } } if (find_stream_by_name(guest, name) != NULL) { - printk("Stream already exists\n"); + ERROR("Stream already exists\n"); + return NULL; + } + + stream = palacios_alloc(sizeof(struct stream_state)); + if (!stream) { + ERROR("Unable to allocate stream\n"); return NULL; } + memset(stream, 0, sizeof(struct stream_state)); - stream = kmalloc(sizeof(struct stream_buffer), GFP_KERNEL); - - stream->buf = create_ringbuf(STREAM_BUF_SIZE); + stream->out_ring = create_ringbuf(STREAM_RING_LEN); + stream->v3_stream = v3_stream; stream->guest = guest; + stream->connected = 0; - strncpy(stream->name, name, STREAM_NAME_LEN - 1); + strncpy(stream->name, name, STREAM_NAME_LEN); + stream->name[STREAM_NAME_LEN-1] = 0; - init_waitqueue_head(&(stream->intr_queue)); - spin_lock_init(&(stream->lock)); + init_waitqueue_head(&(stream->user_poll_queue)); + palacios_spinlock_init(&(stream->lock)); if (guest == NULL) { list_add(&(stream->stream_node), &(global_streams)); @@ -148,32 +262,45 @@ static void * palacios_stream_open(const char * name, void * private_data) { } -static int palacios_stream_write(void * stream_ptr, char * buf, int len) { - struct stream_buffer * stream = (struct stream_buffer *)stream_ptr; - int ret = 0; +// +// Non-blocking VM->host data flow +// +static sint64_t palacios_stream_output(struct v3_stream * v3_stream, uint8_t * buf, sint64_t len) { + struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data; + sint64_t bytes_written = 0; + unsigned long flags; + - ret = ringbuf_write(stream->buf, buf, len); - if (ret > 0) { - wake_up_interruptible(&(stream->intr_queue)); - } + palacios_spinlock_lock_irqsave(&(stream->lock), flags); + bytes_written=ringbuf_write(stream->out_ring, buf, len); + palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); + + if (bytes_written<0) { + // we ended in an error, so push back to VM + return bytes_written; + } else { + // normal situation, tell it how much we handled + wake_up_interruptible(&(stream->user_poll_queue)); + return bytes_written; + } - return ret; } -static void palacios_stream_close(void * stream_ptr) { - struct stream_buffer * stream = (struct stream_buffer *)stream_ptr; +static void palacios_stream_close(struct v3_stream * v3_stream) { + struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data; - free_ringbuf(stream->buf); + free_ringbuf(stream->out_ring); list_del(&(stream->stream_node)); - kfree(stream); + palacios_spinlock_deinit(&(stream->lock)); + palacios_free(stream); } static struct v3_stream_hooks palacios_stream_hooks = { .open = palacios_stream_open, - .write = palacios_stream_write, + .output = palacios_stream_output, .close = palacios_stream_close, }; @@ -187,9 +314,13 @@ static int stream_init( void ) { static int stream_deinit( void ) { - if (!list_empty(&(global_streams))) { - printk("Error removing module with open streams\n"); - printk("TODO: free old streams... \n"); + struct stream_state * stream = NULL; + struct stream_state * tmp = NULL; + + list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) { + free_ringbuf(stream->out_ring); + list_del(&(stream->stream_node)); + palacios_free(stream); } return 0; @@ -201,7 +332,7 @@ static int stream_deinit( void ) { static int stream_connect(struct v3_guest * guest, unsigned int cmd, unsigned long arg, void * priv_data) { void __user * argp = (void __user *)arg; - struct stream_buffer * stream = NULL; + struct stream_state * stream = NULL; int stream_fd = 0; char name[STREAM_NAME_LEN]; unsigned long flags = 0; @@ -209,51 +340,55 @@ static int stream_connect(struct v3_guest * guest, unsigned int cmd, unsigned lo if (copy_from_user(name, argp, STREAM_NAME_LEN)) { - printk("%s(%d): copy from user error...\n", __FILE__, __LINE__); + ERROR("%s(%d): copy from user error...\n", __FILE__, __LINE__); return -EFAULT; } stream = find_stream_by_name(guest, name); if (stream == NULL) { - printk("Could not find stream (%s)\n", name); + ERROR("Could not find stream (%s)\n", name); return -EFAULT; } - spin_lock_irqsave(&(stream->lock), flags); + palacios_spinlock_lock_irqsave(&(stream->lock), flags); if (stream->connected == 0) { stream->connected = 1; ret = 1; } - spin_unlock_irqrestore(&(stream->lock), flags); + palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); if (ret == -1) { - printk("Stream (%s) already connected\n", name); + ERROR("Stream (%s) already connected\n", name); return -EFAULT; } - stream_fd = anon_inode_getfd("v3-stream", &stream_fops, stream, 0); + stream_fd = anon_inode_getfd("v3-stream", &stream_fops, stream, O_RDWR); if (stream_fd < 0) { - printk("Error creating stream inode for (%s)\n", name); + ERROR("Error creating stream inode for (%s)\n", name); return stream_fd; } - printk("Stream (%s) connected\n", name); + INFO("Stream (%s) connected\n", name); return stream_fd; } static int guest_stream_init(struct v3_guest * guest, void ** vm_data) { - struct vm_stream_state * state = kmalloc(sizeof(struct vm_stream_state), GFP_KERNEL); + struct vm_global_streams * state = palacios_alloc(sizeof(struct vm_global_streams)); + + if (!state) { + ERROR("Unable to allocate state in stream init\n"); + return -1; + } INIT_LIST_HEAD(&(state->open_streams)); *vm_data = state; - add_guest_ctrl(guest, V3_VM_STREAM_CONNECT, stream_connect, state); return 0; @@ -261,11 +396,23 @@ static int guest_stream_init(struct v3_guest * guest, void ** vm_data) { static int guest_stream_deinit(struct v3_guest * guest, void * vm_data) { - struct vm_stream_state * state = vm_data; - if (!list_empty(&(state->open_streams))) { - printk("Error shutting down VM with open streams\n"); + struct vm_global_streams * state = vm_data; + + struct stream_state * stream = NULL; + struct stream_state * tmp = NULL; + + + remove_guest_ctrl(guest, V3_VM_STREAM_CONNECT); + + list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) { + free_ringbuf(stream->out_ring); + list_del(&(stream->stream_node)); + palacios_free(stream); } + + palacios_free(state); + return 0; }