From: Peter Dinda Date: Fri, 2 Aug 2013 21:06:28 +0000 (-0500) Subject: Changes to stream interface to allow for flow control and error signaling X-Git-Url: http://v3vee.org/palacios/gitweb/gitweb.cgi?p=palacios.git;a=commitdiff_plain;h=b5380c0aecad02964d5aff23ab51979a063379ce Changes to stream interface to allow for flow control and error signaling --- diff --git a/linux_module/iface-stream.c b/linux_module/iface-stream.c index d6c6331..27bfe42 100644 --- a/linux_module/iface-stream.c +++ b/linux_module/iface-stream.c @@ -14,6 +14,7 @@ #include +#define sint64_t int64_t #include #include "linux-exts.h" #include "util-ringbuffer.h" @@ -38,7 +39,8 @@ struct stream_state { int connected; - wait_queue_head_t intr_queue; + wait_queue_head_t user_poll_queue; + spinlock_t lock; struct v3_guest * guest; @@ -85,74 +87,73 @@ static struct stream_state * find_stream_by_name(struct v3_guest * guest, const } - -#define TMP_BUF_LEN 128 - +// host->user nonblocking data flow static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) { - struct stream_state * stream = filp->private_data; - ssize_t bytes_read = 0; - ssize_t bytes_left = size; - unsigned long flags; - char tmp_buf[TMP_BUF_LEN]; - ssize_t total_bytes_left = 0; - - // memset(tmp_buf, 0, TMP_BUF_LEN); - - while (bytes_left > 0) { - int tmp_len = (TMP_BUF_LEN > bytes_left) ? bytes_left : TMP_BUF_LEN; - int tmp_read = 0; - - palacios_spinlock_lock_irqsave(&(stream->lock), flags); - tmp_read = ringbuf_read(stream->out_ring, tmp_buf, tmp_len); - palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); - - if (tmp_read == 0) { - // If userspace reads more than we have - break; - } - - if (copy_to_user(buf + bytes_read, tmp_buf, tmp_read)) { - ERROR("Read Fault\n"); - return -EFAULT; - } - - bytes_left -= tmp_read; - bytes_read += tmp_read; - } + struct stream_state * stream = filp->private_data; + ssize_t bytes_read = 0; + unsigned long flags; + char *kern_buf; + + + kern_buf = palacios_alloc(size); + + if (!kern_buf) { + ERROR("Cannot allocate space for buffer\n"); + return -1; + } + + palacios_spinlock_lock_irqsave(&(stream->lock), flags); + bytes_read = ringbuf_read(stream->out_ring,kern_buf,size); + palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); - - palacios_spinlock_lock_irqsave(&(stream->lock), flags); - total_bytes_left = ringbuf_data_len(stream->out_ring); - palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); - - if (total_bytes_left > 0) { - wake_up_interruptible(&(stream->intr_queue)); + if (bytes_read>0) { + if (copy_to_user(buf, kern_buf, bytes_read)) { + ERROR("Read Fault\n"); + palacios_free(kern_buf); + return -EFAULT; + } else { + palacios_free(kern_buf); + return bytes_read; } - - return bytes_read; + } else if (bytes_read==0) { + // out of space + palacios_free(kern_buf); + return -EWOULDBLOCK; + } else { // bytes_read<0 + ERROR("Read failed\n"); + palacios_free(kern_buf); + return -EFAULT; + } } + static unsigned int stream_poll(struct file * filp, struct poll_table_struct * poll_tb) { struct stream_state * stream = filp->private_data; - unsigned int mask = POLLIN | POLLRDNORM; unsigned long flags; int data_avail = 0; - poll_wait(filp, &(stream->intr_queue), poll_tb); + if (!stream) { + return POLLERR; + } + + poll_wait(filp, &(stream->user_poll_queue), poll_tb); palacios_spinlock_lock_irqsave(&(stream->lock), flags); data_avail = ringbuf_data_len(stream->out_ring); palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); if (data_avail > 0) { - return mask; + return POLLIN | POLLRDNORM; } return 0; } +// +// Non-blocking user->Host->VM data flow +// static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) { struct stream_state * stream = filp->private_data; char * kern_buf = NULL; @@ -173,8 +174,21 @@ static ssize_t stream_write(struct file * filp, const char __user * buf, size_t bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size); - palacios_free(kern_buf); + // could end up with zero here + if (bytes_written<0) { + ERROR("Error on writing to stream\n"); + palacios_free(kern_buf); + return -EFAULT; + } + if (bytes_written==0) { + // This is somewhat bogus, since + // the FD is treated as non-blocking regardless + palacios_free(kern_buf); + return -EWOULDBLOCK; + } + + palacios_free(kern_buf); return bytes_written; } @@ -234,7 +248,7 @@ static void * palacios_stream_open(struct v3_stream * v3_stream, const char * na strncpy(stream->name, name, STREAM_NAME_LEN - 1); - init_waitqueue_head(&(stream->intr_queue)); + init_waitqueue_head(&(stream->user_poll_queue)); palacios_spinlock_init(&(stream->lock)); if (guest == NULL) { @@ -247,31 +261,29 @@ static void * palacios_stream_open(struct v3_stream * v3_stream, const char * na } -static uint64_t palacios_stream_output(struct v3_stream * v3_stream, char * buf, int len) { +// +// Non-blocking VM->host data flow +// +static sint64_t palacios_stream_output(struct v3_stream * v3_stream, uint8_t * buf, sint64_t len) { struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data; - int bytes_written = 0; + sint64_t bytes_written = 0; unsigned long flags; - if (stream->connected == 0) { - return 0; - } - while (bytes_written < len) { - palacios_spinlock_lock_irqsave(&(stream->lock), flags); - bytes_written += ringbuf_write(stream->out_ring, buf + bytes_written, len - bytes_written); - palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); - - wake_up_interruptible(&(stream->intr_queue)); + palacios_spinlock_lock_irqsave(&(stream->lock), flags); + bytes_written=ringbuf_write(stream->out_ring, buf, len); + palacios_spinlock_unlock_irqrestore(&(stream->lock), flags); - if (bytes_written < len) { - // not enough space in ringbuffer, activate user space to drain it - schedule(); - } - } + if (bytes_written<0) { + // we ended in an error, so push back to VM + return bytes_written; + } else { + // normal situation, tell it how much we handled + wake_up_interruptible(&(stream->user_poll_queue)); + return bytes_written; + } - - return bytes_written; } diff --git a/palacios/include/interfaces/vmm_stream.h b/palacios/include/interfaces/vmm_stream.h index e9c49c7..4a36e1b 100644 --- a/palacios/include/interfaces/vmm_stream.h +++ b/palacios/include/interfaces/vmm_stream.h @@ -26,7 +26,11 @@ struct v3_stream { void * host_stream_data; void * guest_stream_data; - uint64_t (*input)(struct v3_stream * stream, uint8_t * buf, uint64_t len); + // the semantics are: + // negative return = error + // zero return = would block + // otherwise = amount of data transfered + sint64_t (*input)(struct v3_stream * stream, uint8_t * buf, sint64_t len); }; @@ -37,10 +41,14 @@ struct v3_stream { /* VM Can be NULL */ struct v3_stream * v3_stream_open(struct v3_vm_info * vm, const char * name, - uint64_t (*input)(struct v3_stream * stream, uint8_t * buf, uint64_t len), + sint64_t (*input)(struct v3_stream * stream, uint8_t * buf, sint64_t len), void * guest_stream_data); -uint64_t v3_stream_output(struct v3_stream * stream, uint8_t * buf, uint32_t len); +// the semantics are: +// negative return = error +// zero return = would block +// otherwise = amount of data transfered +sint64_t v3_stream_output(struct v3_stream * stream, uint8_t * buf, sint64_t len); void v3_stream_close(struct v3_stream * stream); @@ -49,7 +57,7 @@ void v3_stream_close(struct v3_stream * stream); struct v3_stream_hooks { void *(*open)(struct v3_stream * stream, const char * name, void * host_vm_data); - uint64_t (*output)(struct v3_stream * stream, char * buf, int len); + sint64_t (*output)(struct v3_stream * stream, uint8_t * buf, sint64_t len); void (*close)(struct v3_stream * stream); }; diff --git a/palacios/include/palacios/vmm_dev_mgr.h b/palacios/include/palacios/vmm_dev_mgr.h index e5e13d9..f90232a 100644 --- a/palacios/include/palacios/vmm_dev_mgr.h +++ b/palacios/include/palacios/vmm_dev_mgr.h @@ -187,6 +187,9 @@ void v3_print_dev_mgr(struct v3_vm_info * vm); struct v3_dev_blk_ops { uint64_t (*get_capacity)(void * private_data); // Reads always operate on 2048 byte blocks + + // how does this signal bytes_read < requested but not error? + int (*read)(uint8_t * buf, uint64_t lba, uint64_t num_bytes, void * private_data); int (*write)(uint8_t * buf, uint64_t lba, uint64_t num_bytes, void * private_data); }; @@ -224,11 +227,11 @@ struct v3_dev_console_ops { struct v3_dev_char_ops { /* Backend implemented functions */ - uint64_t (*output)(uint8_t * buf, uint64_t len, void * private_data); + sint64_t (*output)(uint8_t * buf, sint64_t len, void * private_data); // int (*read)(uint8_t * buf, uint64_t len, void * private_data); /* Frontend Implemented functions */ - uint64_t (*input)(struct v3_vm_info * vm, uint8_t * buf, uint64_t len, void * private_data); + sint64_t (*input)(struct v3_vm_info * vm, uint8_t * buf, sint64_t len, void * private_data); }; diff --git a/palacios/src/interfaces/vmm_stream.c b/palacios/src/interfaces/vmm_stream.c index df235aa..3a3ccf7 100644 --- a/palacios/src/interfaces/vmm_stream.c +++ b/palacios/src/interfaces/vmm_stream.c @@ -29,7 +29,7 @@ static struct v3_stream_hooks * stream_hooks = NULL; // VM can be NULL struct v3_stream * v3_stream_open(struct v3_vm_info * vm, const char * name, - uint64_t (*input)(struct v3_stream * stream, uint8_t * buf, uint64_t len), + sint64_t (*input)(struct v3_stream * stream, uint8_t * buf, sint64_t len), void * guest_stream_data) { struct v3_stream * stream = NULL; @@ -50,7 +50,7 @@ struct v3_stream * v3_stream_open(struct v3_vm_info * vm, const char * name, return stream; } -uint64_t v3_stream_output(struct v3_stream * stream, uint8_t * buf, uint32_t len) { +sint64_t v3_stream_output(struct v3_stream * stream, uint8_t * buf, sint64_t len) { V3_ASSERT(VM_NONE, VCORE_NONE, stream_hooks != NULL); V3_ASSERT(VM_NONE, VCORE_NONE, stream_hooks->output != NULL);