-
-/*
- * VM specific Controls
- * (c) Lei Xia, 2010
+/*
+ * Stream Implementation
+ * (c) Lei Xia 2010
*/
+
+
#include <linux/errno.h>
#include <linux/percpu.h>
#include <linux/sched.h>
#include <linux/uaccess.h>
+#include <linux/fs.h>
+#include <linux/poll.h>
+#include <linux/anon_inodes.h>
+#include <linux/file.h>
#include <interfaces/vmm_stream.h>
#include "linux-exts.h"
#include "util-ringbuffer.h"
#include "vm.h"
-#include "iface-stream.h"
-#define STREAM_BUF_SIZE 1024
+// This is probably overkill
+#define STREAM_RING_LEN 4096
+#define STREAM_NAME_LEN 128
+
static struct list_head global_streams;
-struct stream_buffer {
+
+
+struct stream_state {
char name[STREAM_NAME_LEN];
- struct ringbuf * buf;
+
+ struct ringbuf * out_ring;
+
+ int connected;
wait_queue_head_t intr_queue;
spinlock_t lock;
struct v3_guest * guest;
struct list_head stream_node;
+
+ struct v3_stream * v3_stream;
};
// Currently just the list of open streams
-struct vm_stream_state {
+struct vm_global_streams {
struct list_head open_streams;
};
-static int stream_enqueue(struct stream_buffer * stream, char * buf, int len) {
- int bytes = 0;
- bytes = ringbuf_write(stream->buf, buf, len);
- return bytes;
-}
-
-
-static int stream_dequeue(struct stream_buffer * stream, char * buf, int len) {
- int bytes = 0;
-
- bytes = ringbuf_read(stream->buf, buf, len);
- return bytes;
-}
-
-static int stream_datalen(struct stream_buffer * stream){
- return ringbuf_data_len(stream->buf);
-}
-
-
-
-
-static struct stream_buffer * find_stream_by_name(struct v3_guest * guest, const char * name) {
- struct stream_buffer * stream = NULL;
+static struct stream_state * find_stream_by_name(struct v3_guest * guest, const char * name) {
+ struct stream_state * stream = NULL;
struct list_head * stream_list = NULL;
- struct vm_stream_state * vm_state = NULL;
-
+ struct vm_global_streams * vm_state = NULL;
if (guest == NULL) {
stream_list = &global_streams;
vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
if (vm_state == NULL) {
- printk("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
+ ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
return NULL;
}
}
-static void * palacios_stream_open(const char * name, void * private_data) {
+
+#define TMP_BUF_LEN 128
+
+static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) {
+ struct stream_state * stream = filp->private_data;
+ ssize_t bytes_read = 0;
+ ssize_t bytes_left = size;
+ unsigned long flags;
+ char tmp_buf[TMP_BUF_LEN];
+ ssize_t total_bytes_left = 0;
+
+ // memset(tmp_buf, 0, TMP_BUF_LEN);
+
+ while (bytes_left > 0) {
+ int tmp_len = (TMP_BUF_LEN > bytes_left) ? bytes_left : TMP_BUF_LEN;
+ int tmp_read = 0;
+
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ tmp_read = ringbuf_read(stream->out_ring, tmp_buf, tmp_len);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
+
+ if (tmp_read == 0) {
+ // If userspace reads more than we have
+ break;
+ }
+
+ if (copy_to_user(buf + bytes_read, tmp_buf, tmp_read)) {
+ ERROR("Read Fault\n");
+ return -EFAULT;
+ }
+
+ bytes_left -= tmp_read;
+ bytes_read += tmp_read;
+ }
+
+
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ total_bytes_left = ringbuf_data_len(stream->out_ring);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
+
+ if (total_bytes_left > 0) {
+ wake_up_interruptible(&(stream->intr_queue));
+ }
+
+ return bytes_read;
+}
+
+static unsigned int
+stream_poll(struct file * filp, struct poll_table_struct * poll_tb) {
+ struct stream_state * stream = filp->private_data;
+ unsigned int mask = POLLIN | POLLRDNORM;
+ unsigned long flags;
+ int data_avail = 0;
+
+ poll_wait(filp, &(stream->intr_queue), poll_tb);
+
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ data_avail = ringbuf_data_len(stream->out_ring);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
+
+ if (data_avail > 0) {
+ return mask;
+ }
+
+ return 0;
+
+}
+
+static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) {
+ struct stream_state * stream = filp->private_data;
+ char * kern_buf = NULL;
+ ssize_t bytes_written = 0;
+
+ kern_buf = palacios_alloc(size);
+
+ if (!kern_buf) {
+ ERROR("Cannot allocate buffer in stream interface\n");
+ return -EFAULT;
+ }
+
+ if (copy_from_user(kern_buf, buf, size)) {
+ ERROR("Stream Write Failed\n");
+ palacios_free(kern_buf);
+ return -EFAULT;
+ };
+
+ bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size);
+
+ palacios_free(kern_buf);
+
+ return bytes_written;
+}
+
+
+static int stream_release(struct inode * i, struct file * filp) {
+ struct stream_state * stream = filp->private_data;
+ unsigned long flags;
+
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ stream->connected = 0;
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
+
+
+ return 0;
+
+}
+
+static struct file_operations stream_fops = {
+ .read = stream_read,
+ .write = stream_write,
+ .release = stream_release,
+ .poll = stream_poll,
+};
+
+
+
+static void * palacios_stream_open(struct v3_stream * v3_stream, const char * name, void * private_data) {
struct v3_guest * guest = (struct v3_guest *)private_data;
- struct stream_buffer * stream = NULL;
- struct vm_stream_state * vm_state = NULL;
+ struct stream_state * stream = NULL;
+ struct vm_global_streams * vm_state = NULL;
if (guest != NULL) {
vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
if (vm_state == NULL) {
- printk("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
+ ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
return NULL;
}
}
if (find_stream_by_name(guest, name) != NULL) {
- printk("Stream already exists\n");
+ ERROR("Stream already exists\n");
return NULL;
}
- stream = kmalloc(sizeof(struct stream_buffer), GFP_KERNEL);
-
- stream->buf = create_ringbuf(STREAM_BUF_SIZE);
+ stream = palacios_alloc(sizeof(struct stream_state));
+ if (!stream) {
+ ERROR("Unable to allocate stream\n");
+ return NULL;
+ }
+ memset(stream, 0, sizeof(struct stream_state));
+
+ stream->out_ring = create_ringbuf(STREAM_RING_LEN);
+ stream->v3_stream = v3_stream;
stream->guest = guest;
+ stream->connected = 0;
strncpy(stream->name, name, STREAM_NAME_LEN - 1);
init_waitqueue_head(&(stream->intr_queue));
- spin_lock_init(&(stream->lock));
+ palacios_spinlock_init(&(stream->lock));
if (guest == NULL) {
list_add(&(stream->stream_node), &(global_streams));
}
-static int palacios_stream_write(void * stream_ptr, char * buf, int len) {
- struct stream_buffer * stream = (struct stream_buffer *)stream_ptr;
- int ret = 0;
+static uint64_t palacios_stream_output(struct v3_stream * v3_stream, char * buf, int len) {
+ struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
+ int bytes_written = 0;
+ unsigned long flags;
+
- ret = stream_enqueue(stream, buf, len);
+ if (stream->connected == 0) {
+ return 0;
+ }
+
+ while (bytes_written < len) {
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ bytes_written += ringbuf_write(stream->out_ring, buf + bytes_written, len - bytes_written);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
- if (ret > 0) {
wake_up_interruptible(&(stream->intr_queue));
+
+ if (bytes_written < len) {
+ // not enough space in ringbuffer, activate user space to drain it
+ schedule();
+ }
}
- return ret;
+
+ return bytes_written;
}
-static void palacios_stream_close(void * stream_ptr) {
- struct stream_buffer * stream = (struct stream_buffer *)stream_ptr;
+static void palacios_stream_close(struct v3_stream * v3_stream) {
+ struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
- free_ringbuf(stream->buf);
+ free_ringbuf(stream->out_ring);
list_del(&(stream->stream_node));
- kfree(stream);
+ palacios_free(stream);
}
static struct v3_stream_hooks palacios_stream_hooks = {
.open = palacios_stream_open,
- .write = palacios_stream_write,
+ .output = palacios_stream_output,
.close = palacios_stream_close,
};
static int stream_deinit( void ) {
- if (!list_empty(&(global_streams))) {
- printk("Error removing module with open streams\n");
+ struct stream_state * stream = NULL;
+ struct stream_state * tmp = NULL;
+
+ list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) {
+ free_ringbuf(stream->out_ring);
+ list_del(&(stream->stream_node));
+ palacios_free(stream);
}
return 0;
}
+
+
+
+
static int stream_connect(struct v3_guest * guest, unsigned int cmd, unsigned long arg, void * priv_data) {
void __user * argp = (void __user *)arg;
- char path_name[STREAM_NAME_LEN];
+ struct stream_state * stream = NULL;
+ int stream_fd = 0;
+ char name[STREAM_NAME_LEN];
+ unsigned long flags = 0;
+ int ret = -1;
+
- if (copy_from_user(path_name, argp, STREAM_NAME_LEN)) {
- printk("%s(%d): copy from user error...\n", __FILE__, __LINE__);
+ if (copy_from_user(name, argp, STREAM_NAME_LEN)) {
+ ERROR("%s(%d): copy from user error...\n", __FILE__, __LINE__);
+ return -EFAULT;
+ }
+
+ stream = find_stream_by_name(guest, name);
+
+ if (stream == NULL) {
+ ERROR("Could not find stream (%s)\n", name);
+ return -EFAULT;
+ }
+
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ if (stream->connected == 0) {
+ stream->connected = 1;
+ ret = 1;
+ }
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
+
+
+ if (ret == -1) {
+ ERROR("Stream (%s) already connected\n", name);
return -EFAULT;
}
+
+ stream_fd = anon_inode_getfd("v3-stream", &stream_fops, stream, O_RDWR);
+ if (stream_fd < 0) {
+ ERROR("Error creating stream inode for (%s)\n", name);
+ return stream_fd;
+ }
- printk("ERROR: Opening Streams is currently not implemented...\n");
+ INFO("Stream (%s) connected\n", name);
- return -EFAULT;
+ return stream_fd;
}
static int guest_stream_init(struct v3_guest * guest, void ** vm_data) {
- struct vm_stream_state * state = kmalloc(sizeof(struct vm_stream_state), GFP_KERNEL);
+ struct vm_global_streams * state = palacios_alloc(sizeof(struct vm_global_streams));
+
+ if (!state) {
+ ERROR("Unable to allocate state in stream init\n");
+ return -1;
+ }
INIT_LIST_HEAD(&(state->open_streams));
*vm_data = state;
-
add_guest_ctrl(guest, V3_VM_STREAM_CONNECT, stream_connect, state);
return 0;
static int guest_stream_deinit(struct v3_guest * guest, void * vm_data) {
- struct vm_stream_state * state = vm_data;
- if (!list_empty(&(state->open_streams))) {
- printk("Error shutting down VM with open streams\n");
- }
+ struct vm_global_streams * state = vm_data;
+
+ struct stream_state * stream = NULL;
+ struct stream_state * tmp = NULL;
+ list_for_each_entry_safe(stream, tmp, &(global_streams), stream_node) {
+ free_ringbuf(stream->out_ring);
+ list_del(&(stream->stream_node));
+ palacios_free(stream);
+ }
+
+ palacios_free(state);
+
return 0;
}