/*
* Palacios keyed stream interface
- * (c) Peter Dinda, 2011
+ *
+ * Plus implementations for mem, file, and user space implementations
+ *
+ * (c) Peter Dinda, 2011 (interface, mem + file implementations + recooked user impl)
+ * (c) Clint Sbisa, 2011 (initial user space implementation on which this is based)
*/
#include <linux/fs.h>
#include <linux/uaccess.h>
#include <linux/namei.h>
#include <linux/vmalloc.h>
+#include <linux/poll.h>
+#include <linux/anon_inodes.h>
#include "palacios.h"
#include "util-hashtable.h"
#include "linux-exts.h"
+#include "vm.h"
#define sint64_t int64_t
#include <interfaces/vmm_keyed_stream.h>
+#include "iface-keyed-stream-user.h"
+
/*
This is an implementation of the Palacios keyed stream interface
- that supports two flavors of streams:
+ that supports three flavors of streams:
"mem:" Streams are stored in a hash table
The values for this hash table are hash tables associated with
open corresponds to a directory, while key corresponds to
a distinct file in that directory.
+ "user:" Stream requests are bounced to user space to be
+ handled there. A rendezvous approach similar to the host
+ device userland support is used
+
*/
#define STREAM_GENERIC 0
#define STREAM_MEM 1
#define STREAM_FILE 2
-
+#define STREAM_USER 3
/*
All keyed streams and streams indicate their implementation type within the first field
// This stores all the memory keyed streams streams
-static struct hashtable *streams=0;
+static struct hashtable *mem_streams=0;
static v3_keyed_stream_t open_stream_mem(char *url,
switch (ot) {
case V3_KS_RD_ONLY:
case V3_KS_WR_ONLY: {
- struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(streams,(addr_t)(url+4));
+ struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
if (mks) {
mks->ot=ot;
}
break;
case V3_KS_WR_ONLY_CREATE: {
- struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(streams,(addr_t)(url+4));
+ struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
if (!mks) {
char *mykey;
}
- if (!palacios_htable_insert(streams,(addr_t)(mykey),(addr_t)mks)) {
+ if (!palacios_htable_insert(mem_streams,(addr_t)(mykey),(addr_t)mks)) {
palacios_free_htable(mks->ht,1,1);
kfree(mks);
kfree(mykey);
/***************************************************************************************************
+ User implementation ("user:")
+*************************************************************************************************/
+
+
+// List of all user keyed stream connections for the guest
+struct user_keyed_streams {
+ spinlock_t lock;
+ struct list_head streams;
+};
+
+
+// A single keyed stream connection to user space
+struct user_keyed_stream {
+ int stype;
+ v3_keyed_stream_open_t otype;
+
+ char *url;
+ spinlock_t lock;
+ int waiting;
+
+ wait_queue_head_t user_wait_queue;
+ wait_queue_head_t host_wait_queue;
+
+ struct palacios_user_keyed_stream_op *op;
+
+ struct list_head node;
+};
+
+//
+// List of all of the user streams
+//
+static struct user_keyed_streams *user_streams;
+
+
+
+static int resize_op(struct palacios_user_keyed_stream_op **op, uint64_t buf_len)
+{
+ struct palacios_user_keyed_stream_op *old = *op;
+ struct palacios_user_keyed_stream_op *new;
+
+ if (!old) {
+ new = kmalloc(sizeof(struct palacios_user_keyed_stream_op)+buf_len,GFP_ATOMIC);
+ if (!new) {
+ return -1;
+ } else {
+ new->len=sizeof(struct palacios_user_keyed_stream_op)+buf_len;
+ new->buf_len=buf_len;
+ *op=new;
+ return 0;
+ }
+ } else {
+ if ((old->len-sizeof(struct palacios_user_keyed_stream_op)) >= buf_len) {
+ old->buf_len=buf_len;
+ return 0;
+ } else {
+ kfree(old);
+ old=0;
+ return resize_op(op,buf_len);
+ }
+ }
+}
+
+//
+// The assumption is that we enter this with the stream locked
+// and we will return with it locked; additionally, the op structure
+// will be overwritten with the response
+//
+static int do_request_to_response(struct user_keyed_stream *s, unsigned long *flags)
+{
+
+ if (s->waiting) {
+ printk("palacios: user keyed stream request attempted while one is already in progress on %s\n",s->url);
+ return -1;
+ }
+
+ // we are now waiting for a response
+ s->waiting = 1;
+
+ // release the stream
+ spin_unlock_irqrestore(&(s->lock), *flags);
+
+ // wake up anyone waiting on it
+ wake_up_interruptible(&(s->user_wait_queue));
+
+ // wait for someone to give us a response
+ wait_event_interruptible(s->host_wait_queue, !(s->waiting));
+
+ // reacquire the lock for our called
+ spin_lock_irqsave(&(s->lock), *flags);
+
+ return 0;
+}
+
+//
+// The assumption is that we enter this with the stream locked
+// and we will return with it UNlocked
+//
+static int do_response_to_request(struct user_keyed_stream *s, unsigned long *flags)
+{
+ if (!(s->waiting)) {
+ printk("palacios: user keyed stream response while no request is in progress on %s\n",s->url);
+ return -1;
+ }
+
+ // we are now waiting for a request
+ s->waiting = 0;
+
+ // release the stream
+ spin_unlock_irqrestore(&(s->lock), *flags);
+
+ // wake up anyone waiting on it
+ wake_up_interruptible(&(s->host_wait_queue));
+
+ return 0;
+}
+
+
+
+static unsigned int keyed_stream_poll_user(struct file *filp, poll_table *wait)
+{
+ struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
+ unsigned long flags;
+ unsigned int mask = 0;
+
+ if (!s) {
+ return POLLERR;
+ }
+
+ poll_wait(filp, &(s->user_wait_queue), wait);
+
+ spin_lock_irqsave(&(s->lock), flags);
+ if (s->waiting) {
+ mask |= POLLIN | POLLRDNORM;
+ }
+ spin_unlock_irqrestore(&(s->lock), flags);
+
+ return mask;
+}
+
+
+static int keyed_stream_ioctl_user(struct inode *inode, struct file *filp, unsigned int ioctl, unsigned long arg)
+{
+ void __user *argp = (void __user *)arg;
+ unsigned long flags;
+ uint64_t size;
+
+ struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
+
+ switch (ioctl) {
+
+ case V3_KSTREAM_REQUEST_SIZE_IOCTL:
+
+ // inform request size
+
+ spin_lock_irqsave(&(s->lock), flags);
+
+ if (!(s->waiting)) {
+ spin_unlock_irqrestore(&(s->lock), flags);
+ return 0;
+ }
+
+ size = sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
+
+ if (copy_to_user(argp, &size, sizeof(uint64_t))) {
+ spin_unlock_irqrestore(&(s->lock), flags);
+ printk("palacios: palacios user key size request failed to copy data\n");
+ return -EFAULT;
+ }
+
+ spin_unlock_irqrestore(&(s->lock), flags);
+
+ return 1;
+
+ break;
+
+ case V3_KSTREAM_REQUEST_PULL_IOCTL:
+
+ // pull the request
+
+ spin_lock_irqsave(&(s->lock), flags);
+
+ if (!(s->waiting)) {
+ spin_unlock_irqrestore(&(s->lock), flags);
+ printk("palacios: palacios user key pull request when not waiting\n");
+ return 0;
+ }
+
+ size = sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
+
+
+ if (copy_to_user(argp, s->op, size)) {
+ spin_unlock_irqrestore(&(s->lock), flags);
+ printk("palacios: palacios user key pull request failed to copy data\n");
+ return -EFAULT;
+ }
+
+ spin_unlock_irqrestore(&(s->lock), flags);
+
+ return 1;
+
+
+ break;
+
+ case V3_KSTREAM_RESPONSE_PUSH_IOCTL:
+
+ // push the response
+
+ spin_lock_irqsave(&(s->lock), flags);
+
+ if (!(s->waiting)) {
+ spin_unlock_irqrestore(&(s->lock), flags);
+ printk("palacios: palacios user key push response when not waiting\n");
+ return 0;
+ }
+
+ if (copy_from_user(&size, argp, sizeof(uint64_t))) {
+ printk("palacios: palacios user key push response failed to copy size\n");
+ spin_unlock_irqrestore(&(s->lock), flags);
+ return -EFAULT;
+ }
+
+ if (resize_op(&(s->op),size-sizeof(struct palacios_user_keyed_stream_op))) {
+ printk("palacios: unable to resize op in user key push response\n");
+ spin_unlock_irqrestore(&(s->lock), flags);
+ return -EFAULT;
+ }
+
+ if (copy_from_user(&(s->op), argp, size)) {
+ spin_unlock_irqrestore(&(s->lock), flags);
+ return -EFAULT;
+ }
+
+ do_response_to_request(s,&flags);
+ // this will have unlocked s for us
+
+ return 1;
+
+ break;
+
+ default:
+ printk("palacios: unknown ioctl in user keyed stream\n");
+
+ return -EFAULT;
+
+ break;
+
+ }
+}
+
+static int keyed_stream_release_user(struct inode *inode, struct file *filp)
+{
+ struct user_keyed_stream *s = filp->private_data;
+ unsigned long f1,f2;
+
+ spin_lock_irqsave(&(user_streams->lock),f1);
+ spin_lock_irqsave(&(s->lock), f2);
+
+ // FIXME Need to handle case of a pending request
+
+ list_del(&(s->node));
+
+ spin_unlock_irqrestore(&(s->lock), f2);
+ spin_unlock_irqrestore(&(user_streams->lock), f1);
+
+ kfree(s->url);
+ kfree(s);
+
+ return 0;
+}
+
+static struct file_operations user_keyed_stream_fops = {
+ .poll = keyed_stream_poll_user,
+ .ioctl = keyed_stream_ioctl_user,
+ .release = keyed_stream_release_user,
+};
+
+
+/*
+ user_keyed_streams are allocated on user connect, and deallocated on user release
+
+ palacios-side opens and closes only manipulate the open type
+*/
+
+int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned long arg, void *priv_data)
+{
+ int fd;
+ unsigned long flags;
+ char *url;
+ uint64_t len;
+ struct user_keyed_stream *s;
+
+ if (!user_streams) {
+ printk("palacios: no user space keyed streams!\n");
+ return -1;
+ }
+
+ // get the url
+ if (copy_from_user(&len,(void __user *)arg,sizeof(len))) {
+ printk("palacios: cannot copy url len from user\n");
+ return -1;
+ }
+
+ url = kmalloc(len,GFP_KERNEL);
+
+ if (!url) {
+ printk("palacios: cannot allocate url for user keyed stream\n");
+ return -1;
+ }
+
+ if (copy_from_user(url,((void __user *)arg)+sizeof(len),len)) {
+ printk("palacios: cannot copy url from user\n");
+ return -1;
+ }
+ url[len-1]=0;
+
+
+ // Check for duplicate handler
+ spin_lock_irqsave(&(user_streams->lock), flags);
+ list_for_each_entry(s, &(user_streams->streams), node) {
+ if (!strncasecmp(url, s->url, len)) {
+ printk("palacios: user keyed stream connection with url \"%s\" already exists\n", url);
+ kfree(url);
+ return -1;
+ }
+ }
+ spin_unlock_irqrestore(&(user_streams->lock), flags);
+
+ // Create connection
+ s = kmalloc(sizeof(struct user_keyed_stream), GFP_KERNEL);
+
+ if (!s) {
+ printk("palacios: cannot allocate new user keyed stream for %s\n",url);
+ kfree(url);
+ return -1;
+ }
+
+
+ // Get file descriptor
+ fd = anon_inode_getfd("v3-kstream", &user_keyed_stream_fops, s, 0);
+
+ if (fd < 0) {
+ printk("palacios: cannot allocate file descriptor for new user keyed stream for %s\n",url);
+ kfree(s);
+ kfree(url);
+ return -1;
+ }
+
+ memset(s, 0, sizeof(struct user_keyed_stream));
+
+ s->stype=STREAM_USER;
+ s->url=url;
+
+ init_waitqueue_head(&(s->user_wait_queue));
+ init_waitqueue_head(&(s->host_wait_queue));
+
+ // Insert connection into list
+ spin_lock_irqsave(&(user_streams->lock), flags);
+ list_add(&(s->node), &(user_streams->streams));
+ spin_unlock_irqrestore(&(user_streams->lock), flags);
+
+ return fd;
+}
+
+static struct user_keyed_stream *keyed_stream_user_find(char *url)
+{
+ unsigned long flags;
+ struct user_keyed_stream *s;
+
+ if (!user_streams) {
+ printk("palacios: no user space keyed streams available\n");
+ return NULL;
+ }
+
+ spin_lock_irqsave(&(user_streams->lock), flags);
+ list_for_each_entry(s, &(user_streams->streams), node) {
+ if (!strcasecmp(url, s->url)) {
+ spin_unlock_irqrestore(&(user_streams->lock), flags);
+ return s;
+ }
+ }
+
+ spin_unlock_irqrestore(&(user_streams->lock), flags);
+
+ return NULL;
+}
+
+
+static v3_keyed_stream_t open_stream_user(char *url, v3_keyed_stream_open_t ot)
+{
+ unsigned long flags;
+ struct user_keyed_stream *s;
+
+ s = keyed_stream_user_find(url);
+
+ if (!s) {
+ printk("palacios: cannot open user stream %s as it does not exist yet\n",url);
+ return NULL;
+ }
+
+ spin_lock_irqsave(&(s->lock), flags);
+
+ if (s->waiting) {
+ spin_unlock_irqrestore(&(s->lock), flags);
+ printk("palacios: cannot open user stream %s as it is already in waiting state\n",url);
+ return NULL;
+ }
+
+
+ s->otype = ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
+
+ spin_unlock_irqrestore(&(s->lock), flags);
+
+ return s;
+
+}
+
+// close stream does not do anything. Creation of the stream and its cleanup
+// are driven by the user side, not the palacios side
+// might eventually want to reference count this, though
+static void close_stream_user(v3_keyed_stream_t stream)
+{
+ return;
+}
+
+static void preallocate_hint_key_user(v3_keyed_stream_t stream,
+ char *key,
+ uint64_t size)
+{
+ return;
+}
+
+
+
+
+static v3_keyed_stream_key_t open_key_user(v3_keyed_stream_t stream, char *key)
+{
+ unsigned long flags;
+ struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
+ struct palacios_user_keyed_stream_op *op;
+ uint64_t len = strlen(key)+1;
+ void *user_key;
+
+ spin_lock_irqsave(&(s->lock), flags);
+
+ if (resize_op(&(s->op),len)) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: cannot resize op in opening key %s on user keyed stream %s\n",key,s->url);
+ return NULL;
+ }
+
+ op = s->op;
+
+ op->type = PALACIOS_KSTREAM_OPEN_KEY;
+ op->buf_len = len;
+ strncpy(op->buf,key,len);
+
+
+ // enter with it locked
+ if (do_request_to_response(s,&flags)) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: request/response handling failed\n");
+ return NULL;
+ }
+ // return with it locked
+
+ user_key=s->op->user_key;
+
+ spin_unlock_irqrestore(&(s->lock),flags);
+
+ return user_key;
+}
+
+static void close_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key)
+{
+ struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
+ struct palacios_user_keyed_stream_op *op;
+ uint64_t len = 0;
+ unsigned long flags;
+
+ spin_lock_irqsave(&(s->lock), flags);
+
+ if (resize_op(&(s->op),len)) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: cannot resize op in closing key 0x%p on user keyed stream %s\n",key,s->url);
+ return;
+ }
+
+ op = s->op;
+
+ op->type = PALACIOS_KSTREAM_CLOSE_KEY;
+ op->buf_len = len;
+ op->user_key = key;
+
+ // enter with it locked
+ if (do_request_to_response(s,&flags)) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: request/response handling failed\n");
+ return;
+ }
+ // return with it locked
+
+ spin_unlock_irqrestore(&(s->lock),flags);
+
+ return;
+}
+
+
+
+static sint64_t read_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
+ void *buf, sint64_t rlen)
+{
+
+ struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
+ struct palacios_user_keyed_stream_op *op;
+ uint64_t len = 0 ;
+ sint64_t xfer;
+ unsigned long flags;
+
+ spin_lock_irqsave(&(s->lock), flags);
+
+ if (s->otype != V3_KS_RD_ONLY) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: attempt to read key from stream that is not in read state on %s\n",s->url);
+ }
+
+ if (resize_op(&(s->op),len)) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
+ return -1;
+ }
+
+ op = s->op;
+
+ op->type = PALACIOS_KSTREAM_READ_KEY;
+ op->buf_len = len;
+ op->user_key = key;
+
+ // enter with it locked
+ if (do_request_to_response(s,&flags)) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: request/response handling failed\n");
+ return -1;
+ }
+ // return with it locked
+
+ if (op->xfer>0) {
+ memcpy(buf,op->buf,op->xfer);
+ }
+
+ xfer=op->xfer;
+
+ spin_unlock_irqrestore(&(s->lock),flags);
+
+ return xfer;
+}
+
+
+static sint64_t write_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
+ void *buf, sint64_t wlen)
+{
+
+ struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
+ struct palacios_user_keyed_stream_op *op;
+ uint64_t len = wlen ;
+ sint64_t xfer;
+ unsigned long flags;
+
+ spin_lock_irqsave(&(s->lock), flags);
+
+ if (s->otype != V3_KS_WR_ONLY) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: attempt to write key on stream that is not in write state on %s\n",s->url);
+ }
+
+ if (resize_op(&(s->op),len)) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
+ return -1;
+ }
+
+ op = s->op;
+
+ op->type = PALACIOS_KSTREAM_WRITE_KEY;
+ op->buf_len = len;
+ op->user_key = key;
+
+ memcpy(op->buf,buf,wlen);
+
+ // enter with it locked
+ if (do_request_to_response(s,&flags)) {
+ spin_unlock_irqrestore(&(s->lock),flags);
+ printk("palacios: request/response handling failed\n");
+ return -1;
+ }
+ // return with it locked
+
+ xfer=op->xfer;
+
+ spin_unlock_irqrestore(&(s->lock),flags);
+
+ return xfer;
+}
+
+
+
+
+/***************************************************************************************************
Generic interface
*************************************************************************************************/
return open_stream_mem(url,ot);
} else if (!strncasecmp(url,"file:",5)) {
return open_stream_file(url,ot);
+ } else if (!strncasecmp(url,"user:",5)) {
+ return open_stream_user(url,ot);
} else {
printk("palacios: unsupported type in attempt to open keyed stream \"%s\"\n",url);
return 0;
case STREAM_FILE:
return close_stream_file(stream);
break;
+ case STREAM_USER:
+ return close_stream_user(stream);
+ break;
default:
printk("palacios: unknown stream type %d in close\n",gks->stype);
break;
case STREAM_FILE:
preallocate_hint_key_file(stream,key,size);
break;
+ case STREAM_USER:
+ return preallocate_hint_key_user(stream,key,size);
+ break;
default:
printk("palacios: unknown stream type %d in preallocate_hint_key\n",gks->stype);
break;
case STREAM_FILE:
return open_key_file(stream,key);
break;
+ case STREAM_USER:
+ return open_key_user(stream,key);
+ break;
default:
printk("palacios: unknown stream type %d in open_key\n",gks->stype);
break;
case STREAM_FILE:
return close_key_file(stream,key);
break;
+ case STREAM_USER:
+ return close_key_user(stream,key);
+ break;
default:
printk("palacios: unknown stream type %d in close_key\n",gks->stype);
break;
case STREAM_FILE:
return write_key_file(stream,key,buf,len);
break;
+ case STREAM_USER:
+ return write_key_user(stream,key,buf,len);
+ break;
default:
printk("palacios: unknown stream type %d in write_key\n",gks->stype);
return -1;
case STREAM_FILE:
return read_key_file(stream,key,buf,len);
break;
+ case STREAM_USER:
+ return read_key_user(stream,key,buf,len);
+ break;
default:
- printk("palacios: unknown stream type %d in write_key\n",gks->stype);
+ printk("palacios: unknown stream type %d in read_key\n",gks->stype);
return -1;
break;
}
static int init_keyed_streams( void )
{
- streams = palacios_create_htable(DEF_NUM_STREAMS,hash_func,hash_comp);
+ mem_streams = palacios_create_htable(DEF_NUM_STREAMS,hash_func,hash_comp);
- if (!streams) {
+ if (!mem_streams) {
printk("palacios: failed to allocated stream pool for in-memory streams\n");
return -1;
}
- V3_Init_Keyed_Streams(&hooks);
+ user_streams = kmalloc(sizeof(struct user_keyed_streams),GFP_KERNEL);
+ if (!user_streams) {
+ printk("palacios: failed to allocated list for user streams\n");
+ return -1;
+ }
+
+ INIT_LIST_HEAD(&(user_streams->streams));
+
+ spin_lock_init(&(user_streams->lock));
+
+ V3_Init_Keyed_Streams(&hooks);
return 0;
static int deinit_keyed_streams( void )
{
- printk("DEINIT OF PALACIOS KEYED STREAMS NOT IMPLEMENTED - WE HAVE JUST LEAKED MEMORY and/or file handles!\n");
- return -1;
+ palacios_free_htable(mem_streams,1,1);
+
+ kfree(user_streams);
+
+ printk("Deinit of Palacios Keyed Streams likely leaked memory\n");
+
+ return 0;
+}
+
+
+static int guest_init_keyed_streams(struct v3_guest * guest, void ** vm_data )
+{
+
+ add_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT, keyed_stream_connect_user, 0);
+
+ return 0;
}
+static int guest_deinit_keyed_streams(struct v3_guest * guest, void * vm_data)
+{
+
+ return 0;
+}
+
+
+
+
static struct linux_ext key_stream_ext = {
.name = "KEYED_STREAM_INTERFACE",
.init = init_keyed_streams,
.deinit = deinit_keyed_streams,
- .guest_init = NULL,
- .guest_deinit = NULL
+ .guest_init = guest_init_keyed_streams,
+ .guest_deinit = guest_deinit_keyed_streams,
};