#include <linux/file.h>
+#define sint64_t int64_t
#include <interfaces/vmm_stream.h>
#include "linux-exts.h"
#include "util-ringbuffer.h"
int connected;
- wait_queue_head_t intr_queue;
+ wait_queue_head_t user_poll_queue;
+
spinlock_t lock;
struct v3_guest * guest;
}
-
-#define TMP_BUF_LEN 128
-
+// host->user nonblocking data flow
static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) {
- struct stream_state * stream = filp->private_data;
- ssize_t bytes_read = 0;
- ssize_t bytes_left = size;
- unsigned long flags;
- char tmp_buf[TMP_BUF_LEN];
- ssize_t total_bytes_left = 0;
-
- // memset(tmp_buf, 0, TMP_BUF_LEN);
-
- while (bytes_left > 0) {
- int tmp_len = (TMP_BUF_LEN > bytes_left) ? bytes_left : TMP_BUF_LEN;
- int tmp_read = 0;
-
- palacios_spinlock_lock_irqsave(&(stream->lock), flags);
- tmp_read = ringbuf_read(stream->out_ring, tmp_buf, tmp_len);
- palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
-
- if (tmp_read == 0) {
- // If userspace reads more than we have
- break;
- }
-
- if (copy_to_user(buf + bytes_read, tmp_buf, tmp_read)) {
- ERROR("Read Fault\n");
- return -EFAULT;
- }
-
- bytes_left -= tmp_read;
- bytes_read += tmp_read;
- }
+ struct stream_state * stream = filp->private_data;
+ ssize_t bytes_read = 0;
+ unsigned long flags;
+ char *kern_buf;
+
+
+ kern_buf = palacios_alloc(size);
+
+ if (!kern_buf) {
+ ERROR("Cannot allocate space for buffer\n");
+ return -1;
+ }
+
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ bytes_read = ringbuf_read(stream->out_ring,kern_buf,size);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
-
- palacios_spinlock_lock_irqsave(&(stream->lock), flags);
- total_bytes_left = ringbuf_data_len(stream->out_ring);
- palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
-
- if (total_bytes_left > 0) {
- wake_up_interruptible(&(stream->intr_queue));
+ if (bytes_read>0) {
+ if (copy_to_user(buf, kern_buf, bytes_read)) {
+ ERROR("Read Fault\n");
+ palacios_free(kern_buf);
+ return -EFAULT;
+ } else {
+ palacios_free(kern_buf);
+ return bytes_read;
}
-
- return bytes_read;
+ } else if (bytes_read==0) {
+ // out of space
+ palacios_free(kern_buf);
+ return -EWOULDBLOCK;
+ } else { // bytes_read<0
+ ERROR("Read failed\n");
+ palacios_free(kern_buf);
+ return -EFAULT;
+ }
}
+
static unsigned int
stream_poll(struct file * filp, struct poll_table_struct * poll_tb) {
struct stream_state * stream = filp->private_data;
- unsigned int mask = POLLIN | POLLRDNORM;
unsigned long flags;
int data_avail = 0;
- poll_wait(filp, &(stream->intr_queue), poll_tb);
+ if (!stream) {
+ return POLLERR;
+ }
+
+ poll_wait(filp, &(stream->user_poll_queue), poll_tb);
palacios_spinlock_lock_irqsave(&(stream->lock), flags);
data_avail = ringbuf_data_len(stream->out_ring);
palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
if (data_avail > 0) {
- return mask;
+ return POLLIN | POLLRDNORM;
}
return 0;
}
+//
+// Non-blocking user->Host->VM data flow
+//
static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) {
struct stream_state * stream = filp->private_data;
char * kern_buf = NULL;
bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size);
- palacios_free(kern_buf);
+ // could end up with zero here
+ if (bytes_written<0) {
+ ERROR("Error on writing to stream\n");
+ palacios_free(kern_buf);
+ return -EFAULT;
+ }
+ if (bytes_written==0) {
+ // This is somewhat bogus, since
+ // the FD is treated as non-blocking regardless
+ palacios_free(kern_buf);
+ return -EWOULDBLOCK;
+ }
+
+ palacios_free(kern_buf);
return bytes_written;
}
strncpy(stream->name, name, STREAM_NAME_LEN - 1);
- init_waitqueue_head(&(stream->intr_queue));
+ init_waitqueue_head(&(stream->user_poll_queue));
palacios_spinlock_init(&(stream->lock));
if (guest == NULL) {
}
-static uint64_t palacios_stream_output(struct v3_stream * v3_stream, char * buf, int len) {
+//
+// Non-blocking VM->host data flow
+//
+static sint64_t palacios_stream_output(struct v3_stream * v3_stream, uint8_t * buf, sint64_t len) {
struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
- int bytes_written = 0;
+ sint64_t bytes_written = 0;
unsigned long flags;
- if (stream->connected == 0) {
- return 0;
- }
- while (bytes_written < len) {
- palacios_spinlock_lock_irqsave(&(stream->lock), flags);
- bytes_written += ringbuf_write(stream->out_ring, buf + bytes_written, len - bytes_written);
- palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
-
- wake_up_interruptible(&(stream->intr_queue));
+ palacios_spinlock_lock_irqsave(&(stream->lock), flags);
+ bytes_written=ringbuf_write(stream->out_ring, buf, len);
+ palacios_spinlock_unlock_irqrestore(&(stream->lock), flags);
- if (bytes_written < len) {
- // not enough space in ringbuffer, activate user space to drain it
- schedule();
- }
- }
+ if (bytes_written<0) {
+ // we ended in an error, so push back to VM
+ return bytes_written;
+ } else {
+ // normal situation, tell it how much we handled
+ wake_up_interruptible(&(stream->user_poll_queue));
+ return bytes_written;
+ }
-
- return bytes_written;
}
struct v3_stream {
void * host_stream_data;
void * guest_stream_data;
- uint64_t (*input)(struct v3_stream * stream, uint8_t * buf, uint64_t len);
+ // the semantics are:
+ // negative return = error
+ // zero return = would block
+ // otherwise = amount of data transfered
+ sint64_t (*input)(struct v3_stream * stream, uint8_t * buf, sint64_t len);
};
/* VM Can be NULL */
struct v3_stream * v3_stream_open(struct v3_vm_info * vm, const char * name,
- uint64_t (*input)(struct v3_stream * stream, uint8_t * buf, uint64_t len),
+ sint64_t (*input)(struct v3_stream * stream, uint8_t * buf, sint64_t len),
void * guest_stream_data);
-uint64_t v3_stream_output(struct v3_stream * stream, uint8_t * buf, uint32_t len);
+// the semantics are:
+// negative return = error
+// zero return = would block
+// otherwise = amount of data transfered
+sint64_t v3_stream_output(struct v3_stream * stream, uint8_t * buf, sint64_t len);
void v3_stream_close(struct v3_stream * stream);
struct v3_stream_hooks {
void *(*open)(struct v3_stream * stream, const char * name, void * host_vm_data);
- uint64_t (*output)(struct v3_stream * stream, char * buf, int len);
+ sint64_t (*output)(struct v3_stream * stream, uint8_t * buf, sint64_t len);
void (*close)(struct v3_stream * stream);
};