#include <linux/file.h>
+#define sint64_t int64_t
#include <interfaces/vmm_stream.h>
#include "linux-exts.h"
#include "util-ringbuffer.h"
#include "vm.h"
-#include "iface-stream.h"
+
// This is probably overkill
#define STREAM_RING_LEN 4096
-
+#define STREAM_NAME_LEN 128
int connected;
- wait_queue_head_t intr_queue;
+ wait_queue_head_t user_poll_queue;
+
spinlock_t lock;
struct v3_guest * guest;
vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
if (vm_state == NULL) {
- printk("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
+ ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
return NULL;
}
}
-
-#define TMP_BUF_LEN 128
-
+// host->user nonblocking data flow
static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) {
- struct stream_state * stream = filp->private_data;
- ssize_t bytes_read = 0;
- ssize_t bytes_left = size;
- unsigned long flags;
- char tmp_buf[TMP_BUF_LEN];
- ssize_t total_bytes_left = 0;
-
- // memset(tmp_buf, 0, TMP_BUF_LEN);
-
- while (bytes_left > 0) {
- int tmp_len = (TMP_BUF_LEN > bytes_left) ? bytes_left : TMP_BUF_LEN;
- int tmp_read = 0;
-
- spin_lock_irqsave(&(stream->lock), flags);
- tmp_read = ringbuf_read(stream->out_ring, tmp_buf, tmp_len);
- spin_unlock_irqrestore(&(stream->lock), flags);
-
- if (tmp_read == 0) {
- // If userspace reads more than we have
- break;
- }
-
- if (copy_to_user(buf + bytes_read, tmp_buf, tmp_read)) {
- printk("Read Fault\n");
- return -EFAULT;
- }
-
- bytes_left -= tmp_read;
- bytes_read += tmp_read;
- }
+ struct stream_state * stream = filp->private_data;
+ ssize_t bytes_read = 0;
+ unsigned long flags;
+ char *kern_buf;
+
+
+ kern_buf = palacios_alloc(size);
+
+ if (!kern_buf) {
+ ERROR("Cannot allocate space for buffer\n");
+ return -1;
+ }
+
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ bytes_read = ringbuf_read(stream->out_ring,kern_buf,size);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
-
- spin_lock_irqsave(&(stream->lock), flags);
- total_bytes_left = ringbuf_data_len(stream->out_ring);
- spin_unlock_irqrestore(&(stream->lock), flags);
-
- if (total_bytes_left > 0) {
- wake_up_interruptible(&(stream->intr_queue));
+ if (bytes_read>0) {
+ if (copy_to_user(buf, kern_buf, bytes_read)) {
+ ERROR("Read Fault\n");
+ palacios_free(kern_buf);
+ return -EFAULT;
+ } else {
+ palacios_free(kern_buf);
+ return bytes_read;
}
-
- return bytes_read;
+ } else if (bytes_read==0) {
+ // out of space
+ palacios_free(kern_buf);
+ return -EWOULDBLOCK;
+ } else { // bytes_read<0
+ ERROR("Read failed\n");
+ palacios_free(kern_buf);
+ return -EFAULT;
+ }
}
+
static unsigned int
stream_poll(struct file * filp, struct poll_table_struct * poll_tb) {
struct stream_state * stream = filp->private_data;
- unsigned int mask = POLLIN | POLLRDNORM;
unsigned long flags;
int data_avail = 0;
- poll_wait(filp, &(stream->intr_queue), poll_tb);
+ if (!stream) {
+ return POLLERR;
+ }
+
+ poll_wait(filp, &(stream->user_poll_queue), poll_tb);
- spin_lock_irqsave(&(stream->lock), flags);
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
data_avail = ringbuf_data_len(stream->out_ring);
- spin_unlock_irqrestore(&(stream->lock), flags);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
if (data_avail > 0) {
- return mask;
+ return POLLIN | POLLRDNORM;
}
return 0;
}
+//
+// Non-blocking user->Host->VM data flow
+//
static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) {
struct stream_state * stream = filp->private_data;
char * kern_buf = NULL;
ssize_t bytes_written = 0;
- kern_buf = kmalloc(size, GFP_KERNEL);
+ kern_buf = palacios_alloc(size);
+
+ if (!kern_buf) {
+ ERROR("Cannot allocate buffer in stream interface\n");
+ return -EFAULT;
+ }
if (copy_from_user(kern_buf, buf, size)) {
- printk("Stream Write Failed\n");
+ ERROR("Stream Write Failed\n");
+ palacios_free(kern_buf);
return -EFAULT;
};
bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size);
- kfree(kern_buf);
+ // could end up with zero here
+ if (bytes_written<0) {
+ ERROR("Error on writing to stream\n");
+ palacios_free(kern_buf);
+ return -EFAULT;
+ }
+ if (bytes_written==0) {
+ // This is somewhat bogus, since
+ // the FD is treated as non-blocking regardless
+ palacios_free(kern_buf);
+ return -EWOULDBLOCK;
+ }
+
+ palacios_free(kern_buf);
return bytes_written;
}
struct stream_state * stream = filp->private_data;
unsigned long flags;
- spin_lock_irqsave(&(stream->lock), flags);
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
stream->connected = 0;
- spin_unlock_irqrestore(&(stream->lock), flags);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
return 0;
vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
if (vm_state == NULL) {
- printk("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
+ ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
return NULL;
}
}
if (find_stream_by_name(guest, name) != NULL) {
- printk("Stream already exists\n");
+ ERROR("Stream already exists\n");
return NULL;
}
- stream = kmalloc(sizeof(struct stream_state), GFP_KERNEL);
+ stream = palacios_alloc(sizeof(struct stream_state));
+ if (!stream) {
+ ERROR("Unable to allocate stream\n");
+ return NULL;
+ }
memset(stream, 0, sizeof(struct stream_state));
stream->out_ring = create_ringbuf(STREAM_RING_LEN);
stream->guest = guest;
stream->connected = 0;
- strncpy(stream->name, name, STREAM_NAME_LEN - 1);
+ strncpy(stream->name, name, STREAM_NAME_LEN);
+ stream->name[STREAM_NAME_LEN-1] = 0;
- init_waitqueue_head(&(stream->intr_queue));
- spin_lock_init(&(stream->lock));
+ init_waitqueue_head(&(stream->user_poll_queue));
+ palacios_spinlock_init(&(stream->lock));
if (guest == NULL) {
list_add(&(stream->stream_node), &(global_streams));
}
-static uint64_t palacios_stream_output(struct v3_stream * v3_stream, char * buf, int len) {
+//
+// Non-blocking VM->host data flow
+//
+static sint64_t palacios_stream_output(struct v3_stream * v3_stream, uint8_t * buf, sint64_t len) {
struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
- int bytes_written = 0;
+ sint64_t bytes_written = 0;
unsigned long flags;
- if (stream->connected == 0) {
- return 0;
- }
-
- while (bytes_written < len) {
- spin_lock_irqsave(&(stream->lock), flags);
- bytes_written += ringbuf_write(stream->out_ring, buf + bytes_written, len - bytes_written);
- spin_unlock_irqrestore(&(stream->lock), flags);
- wake_up_interruptible(&(stream->intr_queue));
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ bytes_written=ringbuf_write(stream->out_ring, buf, len);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
- if (bytes_written < len) {
- // not enough space in ringbuffer, activate user space to drain it
- schedule();
- }
- }
+ if (bytes_written<0) {
+ // we ended in an error, so push back to VM
+ return bytes_written;
+ } else {
+ // normal situation, tell it how much we handled
+ wake_up_interruptible(&(stream->user_poll_queue));
+ return bytes_written;
+ }
-
- return bytes_written;
}
free_ringbuf(stream->out_ring);
list_del(&(stream->stream_node));
- kfree(stream);
+ palacios_spinlock_deinit(&(stream->lock));
+ palacios_free(stream);
}
static int stream_deinit( void ) {
- if (!list_empty(&(global_streams))) {
- printk("Error removing module with open streams\n");
- printk("TODO: free old streams... \n");
+ struct stream_state * stream = NULL;
+ struct stream_state * tmp = NULL;
+
+ list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) {
+ free_ringbuf(stream->out_ring);
+ list_del(&(stream->stream_node));
+ palacios_free(stream);
}
return 0;
if (copy_from_user(name, argp, STREAM_NAME_LEN)) {
- printk("%s(%d): copy from user error...\n", __FILE__, __LINE__);
+ ERROR("%s(%d): copy from user error...\n", __FILE__, __LINE__);
return -EFAULT;
}
stream = find_stream_by_name(guest, name);
if (stream == NULL) {
- printk("Could not find stream (%s)\n", name);
+ ERROR("Could not find stream (%s)\n", name);
return -EFAULT;
}
- spin_lock_irqsave(&(stream->lock), flags);
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
if (stream->connected == 0) {
stream->connected = 1;
ret = 1;
}
- spin_unlock_irqrestore(&(stream->lock), flags);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
if (ret == -1) {
- printk("Stream (%s) already connected\n", name);
+ ERROR("Stream (%s) already connected\n", name);
return -EFAULT;
}
stream_fd = anon_inode_getfd("v3-stream", &stream_fops, stream, O_RDWR);
if (stream_fd < 0) {
- printk("Error creating stream inode for (%s)\n", name);
+ ERROR("Error creating stream inode for (%s)\n", name);
return stream_fd;
}
- printk("Stream (%s) connected\n", name);
+ INFO("Stream (%s) connected\n", name);
return stream_fd;
}
static int guest_stream_init(struct v3_guest * guest, void ** vm_data) {
- struct vm_global_streams * state = kmalloc(sizeof(struct vm_global_streams), GFP_KERNEL);
+ struct vm_global_streams * state = palacios_alloc(sizeof(struct vm_global_streams));
+
+ if (!state) {
+ ERROR("Unable to allocate state in stream init\n");
+ return -1;
+ }
INIT_LIST_HEAD(&(state->open_streams));
*vm_data = state;
static int guest_stream_deinit(struct v3_guest * guest, void * vm_data) {
struct vm_global_streams * state = vm_data;
- if (!list_empty(&(state->open_streams))) {
- printk("Error shutting down VM with open streams\n");
+
+ struct stream_state * stream = NULL;
+ struct stream_state * tmp = NULL;
+
+
+ remove_guest_ctrl(guest, V3_VM_STREAM_CONNECT);
+
+ list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) {
+ free_ringbuf(stream->out_ring);
+ list_del(&(stream->stream_node));
+ palacios_free(stream);
}
+
+ palacios_free(state);
+
return 0;
}