From: Diana Palsetia and Steve Rangel Date: Wed, 20 Jun 2012 03:58:48 +0000 (-0500) Subject: Network support for keyed streams "net:" X-Git-Url: http://v3vee.org/palacios/gitweb/gitweb.cgi?a=commitdiff_plain;h=cfa677284e96d8284d4d010f88447b1071d7cbde;p=palacios.git Network support for keyed streams "net:" --- diff --git a/linux_module/iface-keyed-stream.c b/linux_module/iface-keyed-stream.c index 9a51cf3..68445d4 100644 --- a/linux_module/iface-keyed-stream.c +++ b/linux_module/iface-keyed-stream.c @@ -5,6 +5,7 @@ * * (c) Peter Dinda, 2011 (interface, mem + file implementations + recooked user impl) * (c) Clint Sbisa, 2011 (initial user space implementation on which this is based) + * (c) Diana Palsetia & Steve Rangel, 2012 (network based implementation) */ #include @@ -25,18 +26,46 @@ #include "iface-keyed-stream-user.h" +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + + /* This is an implementation of the Palacios keyed stream interface - that supports three flavors of streams: + that supports four flavors of streams: "mem:" Streams are stored in a hash table The values for this hash table are hash tables associated with each stream. "file:" Streams are stored in files. Each high-level - open corresponds to a directory, while key corresponds to + open corresponds to a directory, while a key corresponds to a distinct file in that directory. + "net:" Streams are carried over the network. Each + high level open corresponds to a TCP connection, while + each key corresponds to a context on the stream. + "net:a::" => Bind to : and accept a connection + "net:c::" => Connect to : + "user:" Stream requests are bounced to user space to be handled there. A rendezvous approach similar to the host device userland support is used @@ -47,6 +76,7 @@ #define STREAM_MEM 1 #define STREAM_FILE 2 #define STREAM_USER 3 +#define STREAM_NETWORK 4 /* All keyed streams and streams indicate their implementation type within the first field @@ -211,7 +241,7 @@ static v3_keyed_stream_t open_stream_mem(char *url, { if (strncasecmp(url,"mem:",4)) { - WARNING("palacios: illegitimate attempt to open memory stream \"%s\"\n",url); + WARNING("illegitimate attempt to open memory stream \"%s\"\n",url); return 0; } @@ -234,7 +264,7 @@ static v3_keyed_stream_t open_stream_mem(char *url, mykey = kmalloc(strlen(url+4)+1,GFP_KERNEL); if (!mykey) { - ERROR("palacios: cannot allocate space for new in-memory keyed stream %s\n",url); + ERROR("cannot allocate space for new in-memory keyed stream %s\n",url); return 0; } @@ -244,7 +274,7 @@ static v3_keyed_stream_t open_stream_mem(char *url, if (!mks) { kfree(mykey); - ERROR("palacios: cannot allocate in-memory keyed stream %s\n",url); + ERROR("cannot allocate in-memory keyed stream %s\n",url); return 0; } @@ -252,7 +282,7 @@ static v3_keyed_stream_t open_stream_mem(char *url, if (!mks->ht) { kfree(mks); kfree(mykey); - ERROR("palacios: cannot allocate in-memory keyed stream %s\n",url); + ERROR("cannot allocate in-memory keyed stream %s\n",url); return 0; } @@ -261,7 +291,7 @@ static v3_keyed_stream_t open_stream_mem(char *url, palacios_free_htable(mks->ht,1,1); kfree(mks); kfree(mykey); - ERROR("palacios: cannot insert in-memory keyed stream %s\n",url); + ERROR("cannot insert in-memory keyed stream %s\n",url); return 0; } mks->stype=STREAM_MEM; @@ -275,7 +305,7 @@ static v3_keyed_stream_t open_stream_mem(char *url, break; default: - ERROR("palacios: unsupported open type in open_stream_mem\n"); + ERROR("unsupported open type in open_stream_mem\n"); break; } @@ -306,7 +336,7 @@ static v3_keyed_stream_key_t open_key_mem(v3_keyed_stream_t stream, char *mykey = kmalloc(strlen(key)+1,GFP_KERNEL); if (!mykey) { - ERROR("palacios: cannot allocate copy of key for key %s\n",key); + ERROR("cannot allocate copy of key for key %s\n",key); return 0; } @@ -316,14 +346,14 @@ static v3_keyed_stream_key_t open_key_mem(v3_keyed_stream_t stream, if (!m) { kfree(mykey); - ERROR("palacios: cannot allocate mem keyed stream for key %s\n",key); + ERROR("cannot allocate mem keyed stream for key %s\n",key); return 0; } if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) { destroy_mem_stream(m); kfree(mykey); - ERROR("palacios: cannot insert mem keyed stream for key %s\n",key); + ERROR("cannot insert mem keyed stream for key %s\n",key); return 0; } } @@ -355,7 +385,7 @@ static void preallocate_hint_key_mem(v3_keyed_stream_t stream, mykey=kmalloc(strlen(key)+1,GFP_KERNEL); if (!mykey) { - ERROR("palacios: cannot allocate key space for preallocte for key %s\n",key); + ERROR("cannot allocate key space for preallocte for key %s\n",key); return; } @@ -364,19 +394,19 @@ static void preallocate_hint_key_mem(v3_keyed_stream_t stream, m = create_mem_stream_internal(size); if (!m) { - ERROR("palacios: cannot preallocate mem keyed stream for key %s\n",key); + ERROR("cannot preallocate mem keyed stream for key %s\n",key); return; } if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) { - ERROR("palacios: cannot insert preallocated mem keyed stream for key %s\n",key); + ERROR("cannot insert preallocated mem keyed stream for key %s\n",key); destroy_mem_stream(m); return; } } else { if (m->data_max < size) { if (expand_mem_stream(m,size)) { - ERROR("palacios: cannot expand key for preallocation for key %s\n",key); + ERROR("cannot expand key for preallocation for key %s\n",key); return; } } @@ -416,7 +446,7 @@ static sint64_t write_key_mem(v3_keyed_stream_t stream, writelen=write_mem_stream(m,buf,mylen); if (writelen!=mylen) { - ERROR("palacios: failed to write all data for key\n"); + ERROR("failed to write all data for key\n"); return -1; } else { return (sint64_t)writelen; @@ -446,7 +476,7 @@ static sint64_t read_key_mem(v3_keyed_stream_t stream, readlen=read_mem_stream(m,buf,mylen); if (readlen!=mylen) { - ERROR("palacios: failed to read all data for key\n"); + ERROR("failed to read all data for key\n"); return -1; } else { return (sint64_t)readlen; @@ -482,21 +512,21 @@ static v3_keyed_stream_t open_stream_file(char *url, struct nameidata nd; if (strncasecmp(url,"file:",5)) { - WARNING("palacios: illegitimate attempt to open file stream \"%s\"\n",url); + WARNING("illegitimate attempt to open file stream \"%s\"\n",url); return 0; } fks = kmalloc(sizeof(struct file_keyed_stream),GFP_KERNEL); if (!fks) { - ERROR("palacios: cannot allocate space for file stream\n"); + ERROR("cannot allocate space for file stream\n"); return 0; } fks->path = (char*)kmalloc(strlen(url+5)+1,GFP_KERNEL); if (!(fks->path)) { - ERROR("palacios: cannot allocate space for file stream\n"); + ERROR("cannot allocate space for file stream\n"); kfree(fks); return 0; } @@ -516,7 +546,7 @@ static v3_keyed_stream_t open_stream_file(char *url, if (ot==V3_KS_RD_ONLY || ot==V3_KS_WR_ONLY) { // we are not being asked to create it - ERROR("palacios: attempt to open %s, which does not exist\n",fks->path); + ERROR("attempt to open %s, which does not exist\n",fks->path); goto fail_out; } else { @@ -528,14 +558,14 @@ static v3_keyed_stream_t open_stream_file(char *url, // Find its parent if (path_lookup(fks->path,LOOKUP_PARENT|LOOKUP_FOLLOW,&nd)) { - ERROR("palacios: attempt to create %s failed because its parent cannot be looked up\n",fks->path); + ERROR("attempt to create %s failed because its parent cannot be looked up\n",fks->path); goto fail_out; } // Can we write to the parent? if (inode_permission(nd.path.dentry->d_inode, MAY_WRITE | MAY_EXEC)) { - ERROR("palacios: attempt to open %s, which has the wrong permissions for directory creation\n",fks->path); + ERROR("attempt to open %s, which has the wrong permissions for directory creation\n",fks->path); goto fail_out; } @@ -544,7 +574,7 @@ static v3_keyed_stream_t open_stream_file(char *url, de = lookup_create(&nd,1); if (IS_ERR(de)) { - ERROR("palacios: cannot allocate dentry\n"); + ERROR("cannot allocate dentry\n"); goto fail_out; } @@ -555,7 +585,7 @@ static v3_keyed_stream_t open_stream_file(char *url, mutex_unlock(&(nd.path.dentry->d_inode->i_mutex)); if (err) { - ERROR("palacios: attempt to create %s failed because mkdir failed\n",fks->path); + ERROR("attempt to create %s failed because mkdir failed\n",fks->path); goto fail_out; } @@ -569,7 +599,7 @@ static v3_keyed_stream_t open_stream_file(char *url, // and the directory exists, so we must check the permissions if (inode_permission(nd.path.dentry->d_inode, MAY_EXEC | (ot==V3_KS_RD_ONLY ? MAY_READ : MAY_WRITE))) { - ERROR("palacios: attempt to open %s, which has the wrong permissions\n",fks->path); + ERROR("attempt to open %s, which has the wrong permissions\n",fks->path); goto fail_out; } else { return (v3_keyed_stream_t) fks; @@ -610,7 +640,7 @@ static v3_keyed_stream_key_t open_key_file(v3_keyed_stream_t stream, // file:/home/foo + "regext" => "/home/foo/regext" path = (char *) kmalloc(strlen(fks->path)+strlen(key)+2,GFP_KERNEL); if (!path) { - ERROR("palacios: cannot allocate file keyed stream for key %s\n",key); + ERROR("cannot allocate file keyed stream for key %s\n",key); return 0; } strcpy(path,fks->path); @@ -620,7 +650,7 @@ static v3_keyed_stream_key_t open_key_file(v3_keyed_stream_t stream, fs = (struct file_stream *) kmalloc(sizeof(struct file_stream *),GFP_KERNEL); if (!fs) { - ERROR("palacios: cannot allocate file keyed stream for key %s\n",key); + ERROR("cannot allocate file keyed stream for key %s\n",key); kfree(path); return 0; } @@ -630,7 +660,7 @@ static v3_keyed_stream_key_t open_key_file(v3_keyed_stream_t stream, fs->f = filp_open(path,O_RDWR|O_CREAT,0600); if (IS_ERR(fs->f)) { - ERROR("palacios: cannot open relevent file \"%s\" for stream \"file:%s\" and key \"%s\"\n",path,fks->path,key); + ERROR("cannot open relevent file \"%s\" for stream \"file:%s\" and key \"%s\"\n",path,fks->path,key); kfree(fs); kfree(path); return 0; @@ -805,7 +835,7 @@ static int do_request_to_response(struct user_keyed_stream *s, unsigned long *fl { if (s->waiting) { - ERROR("palacios: user keyed stream request attempted while one is already in progress on %s\n",s->url); + ERROR("user keyed stream request attempted while one is already in progress on %s\n",s->url); return -1; } @@ -835,7 +865,7 @@ static int do_response_to_request(struct user_keyed_stream *s, unsigned long *fl { if (!(s->waiting)) { - ERROR("palacios: user keyed stream response while no request is in progress on %s\n",s->url); + ERROR("user keyed stream response while no request is in progress on %s\n",s->url); return -1; } @@ -901,7 +931,7 @@ static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsi if (copy_to_user((void * __user) argp, &size, sizeof(uint64_t))) { spin_unlock_irqrestore(&(s->lock), flags); - ERROR("palacios: palacios user key size request failed to copy data\n"); + ERROR("palacios user key size request failed to copy data\n"); return -EFAULT; } @@ -919,7 +949,7 @@ static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsi if (!(s->waiting)) { spin_unlock_irqrestore(&(s->lock), flags); - ERROR("palacios: palacios user key pull request when not waiting\n"); + ERROR("palacios user key pull request when not waiting\n"); return 0; } @@ -928,7 +958,7 @@ static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsi if (copy_to_user((void __user *) argp, s->op, size)) { spin_unlock_irqrestore(&(s->lock), flags); - ERROR("palacios: palacios user key pull request failed to copy data\n"); + ERROR("palacios user key pull request failed to copy data\n"); return -EFAULT; } @@ -947,18 +977,18 @@ static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsi if (!(s->waiting)) { spin_unlock_irqrestore(&(s->lock), flags); - ERROR("palacios: palacios user key push response when not waiting\n"); + ERROR("palacios user key push response when not waiting\n"); return 0; } if (copy_from_user(&size, (void __user *) argp, sizeof(uint64_t))) { - ERROR("palacios: palacios user key push response failed to copy size\n"); + ERROR("palacios user key push response failed to copy size\n"); spin_unlock_irqrestore(&(s->lock), flags); return -EFAULT; } if (resize_op(&(s->op),size-sizeof(struct palacios_user_keyed_stream_op))) { - ERROR("palacios: unable to resize op in user key push response\n"); + ERROR("unable to resize op in user key push response\n"); spin_unlock_irqrestore(&(s->lock), flags); return -EFAULT; } @@ -976,7 +1006,7 @@ static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsi break; default: - ERROR("palacios: unknown ioctl in user keyed stream\n"); + ERROR("unknown ioctl in user keyed stream\n"); return -EFAULT; @@ -1028,25 +1058,25 @@ int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned struct user_keyed_stream *s; if (!user_streams) { - ERROR("palacios: no user space keyed streams!\n"); + ERROR("no user space keyed streams!\n"); return -1; } // get the url if (copy_from_user(&len,(void __user *)arg,sizeof(len))) { - ERROR("palacios: cannot copy url len from user\n"); + ERROR("cannot copy url len from user\n"); return -1; } url = kmalloc(len,GFP_KERNEL); if (!url) { - ERROR("palacios: cannot allocate url for user keyed stream\n"); + ERROR("cannot allocate url for user keyed stream\n"); return -1; } if (copy_from_user(url,((void __user *)arg)+sizeof(len),len)) { - ERROR("palacios: cannot copy url from user\n"); + ERROR("cannot copy url from user\n"); return -1; } url[len-1]=0; @@ -1056,7 +1086,7 @@ int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned spin_lock_irqsave(&(user_streams->lock), flags); list_for_each_entry(s, &(user_streams->streams), node) { if (!strncasecmp(url, s->url, len)) { - ERROR("palacios: user keyed stream connection with url \"%s\" already exists\n", url); + ERROR("user keyed stream connection with url \"%s\" already exists\n", url); kfree(url); return -1; } @@ -1067,7 +1097,7 @@ int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned s = kmalloc(sizeof(struct user_keyed_stream), GFP_KERNEL); if (!s) { - ERROR("palacios: cannot allocate new user keyed stream for %s\n",url); + ERROR("cannot allocate new user keyed stream for %s\n",url); kfree(url); return -1; } @@ -1077,7 +1107,7 @@ int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned fd = anon_inode_getfd("v3-kstream", &user_keyed_stream_fops, s, 0); if (fd < 0) { - ERROR("palacios: cannot allocate file descriptor for new user keyed stream for %s\n",url); + ERROR("cannot allocate file descriptor for new user keyed stream for %s\n",url); kfree(s); kfree(url); return -1; @@ -1105,7 +1135,7 @@ static struct user_keyed_stream *keyed_stream_user_find(char *url) struct user_keyed_stream *s; if (!user_streams) { - ERROR("palacios: no user space keyed streams available\n"); + ERROR("no user space keyed streams available\n"); return NULL; } @@ -1131,7 +1161,7 @@ static v3_keyed_stream_t open_stream_user(char *url, v3_keyed_stream_open_t ot) s = keyed_stream_user_find(url); if (!s) { - ERROR("palacios: cannot open user stream %s as it does not exist yet\n",url); + ERROR("cannot open user stream %s as it does not exist yet\n",url); return NULL; } @@ -1139,7 +1169,7 @@ static v3_keyed_stream_t open_stream_user(char *url, v3_keyed_stream_open_t ot) if (s->waiting) { spin_unlock_irqrestore(&(s->lock), flags); - ERROR("palacios: cannot open user stream %s as it is already in waiting state\n",url); + ERROR("cannot open user stream %s as it is already in waiting state\n",url); return NULL; } @@ -1181,7 +1211,7 @@ static v3_keyed_stream_key_t open_key_user(v3_keyed_stream_t stream, char *key) if (resize_op(&(s->op),len)) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: cannot resize op in opening key %s on user keyed stream %s\n",key,s->url); + ERROR("cannot resize op in opening key %s on user keyed stream %s\n",key,s->url); return NULL; } @@ -1192,7 +1222,7 @@ static v3_keyed_stream_key_t open_key_user(v3_keyed_stream_t stream, char *key) // enter with it locked if (do_request_to_response(s,&flags)) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: request/response handling failed\n"); + ERROR("request/response handling failed\n"); return NULL; } // return with it locked @@ -1214,7 +1244,7 @@ static void close_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key) if (resize_op(&(s->op),len)) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: cannot resize op in closing key 0x%p on user keyed stream %s\n",key,s->url); + ERROR("cannot resize op in closing key 0x%p on user keyed stream %s\n",key,s->url); return; } @@ -1225,7 +1255,7 @@ static void close_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key) // enter with it locked if (do_request_to_response(s,&flags)) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: request/response handling failed\n"); + ERROR("request/response handling failed\n"); return; } // return with it locked @@ -1250,12 +1280,12 @@ static sint64_t read_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t ke if (s->otype != V3_KS_RD_ONLY) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: attempt to read key from stream that is not in read state on %s\n",s->url); + ERROR("attempt to read key from stream that is not in read state on %s\n",s->url); } if (resize_op(&(s->op),len)) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url); + ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url); return -1; } @@ -1267,7 +1297,7 @@ static sint64_t read_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t ke // enter with it locked if (do_request_to_response(s,&flags)) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: request/response handling failed\n"); + ERROR("request/response handling failed\n"); return -1; } // return with it locked @@ -1299,12 +1329,12 @@ static sint64_t write_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t k if (s->otype != V3_KS_WR_ONLY) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: attempt to write key on stream that is not in write state on %s\n",s->url); + ERROR("attempt to write key on stream that is not in write state on %s\n",s->url); } if (resize_op(&(s->op),len)) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url); + ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url); return -1; } @@ -1320,7 +1350,7 @@ static sint64_t write_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t k // enter with it locked if (do_request_to_response(s,&flags)) { spin_unlock_irqrestore(&(s->lock),flags); - ERROR("palacios: request/response handling failed\n"); + ERROR("request/response handling failed\n"); return -1; } // return with it locked @@ -1334,6 +1364,589 @@ static sint64_t write_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t k +/**************************************************************************************** + * Network-based implementation ("net:") + *****************************************************************************************/ + + +#define NET_MAX_KEY_LEN 128 + +struct net_keyed_stream { + int stype; + int ot; + struct net_stream * ns; +}; + +struct net_stream { + int stype; + struct socket *sock; +}; + + +//ignore the arguments given here currently +static struct net_stream * create_net_stream(void) +{ + struct net_stream * ns = NULL; + + ns = kmalloc(sizeof(struct net_stream), GFP_KERNEL); + + if (!ns) { + ERROR("Cannot allocate a net_stream\n"); + return 0; + } + + memset(ns, 0, sizeof(struct net_stream)); + + ns->stype = STREAM_NETWORK; + + return ns; +} + +static void close_socket(v3_keyed_stream_t stream) +{ + struct net_keyed_stream *nks = (struct net_keyed_stream *) stream; + + if (nks) { + struct net_stream *ns = nks->ns; + + if (ns) { + ns->sock->ops->release(ns->sock); + kfree(ns); + ERROR("Close Socket\n"); + } + + kfree(ns); + } +} + + +static void close_stream_net(v3_keyed_stream_t stream) +{ + close_socket(stream); +} + +static int connect_to_ip(struct net_stream *ns, int hostip, int port) +{ + struct sockaddr_in client; + + if (ns == NULL) { + return -1; + } + + if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { + ERROR("Cannot create accept socket\n"); + return -1; + } + + + client.sin_family = AF_INET; + client.sin_port = htons(port); + client.sin_addr.s_addr = hostip;//in_aton(hostip); + + return ns->sock->ops->connect(ns->sock, (struct sockaddr *)&client, sizeof(client), 0); +} + +static int send_msg(struct net_stream *ns, char * buf, int len) +{ + int left=len; + + if (!ns) { + ERROR("Send message on null net_stream\n"); + return -1; + } + + if (!(ns->sock)) { + ERROR("Send message on net_stream without socket\n"); + return -1; + } + + while (left>0) { + + struct msghdr msg; + mm_segment_t oldfs; + struct iovec iov; + int err = 0; + + + msg.msg_flags = MSG_NOSIGNAL;//MSG_DONTWAIT; + msg.msg_name = 0; + msg.msg_namelen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + iov.iov_base = (char *)&(buf[len-left]); + iov.iov_len = (size_t)left; + + oldfs = get_fs(); + set_fs(KERNEL_DS); + + err = sock_sendmsg(ns->sock, &msg, (size_t)left); + + set_fs(oldfs); + + if (err<0) { + ERROR("Send msg error %d\n",err); + return err; + } else { + left-=len; + } + } + + return len; +} + + + +static int recv_msg(struct net_stream *ns, char * buf, int len) +{ + + int left=len; + + if (!ns) { + ERROR("Receive message on null net_stream\n"); + return -1; + } + + if (!(ns->sock)) { + ERROR("Receive message on net_stream without socket\n"); + return -1; + } + + + while (left>0) { + + struct msghdr msg; + mm_segment_t oldfs; + struct iovec iov; + int err; + + msg.msg_flags = 0; + msg.msg_name = 0; + msg.msg_namelen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + iov.iov_base = (void *)&(buf[len-left]); + iov.iov_len = (size_t)left; + + oldfs = get_fs(); + set_fs(KERNEL_DS); + + err = sock_recvmsg(ns->sock, &msg, (size_t)left, 0); + + set_fs(oldfs); + + if (err<0) { + return err; + } else { + left -= err; + } + } + return len; +} + +static struct net_stream * accept_once(struct net_stream * ns, const int port) +{ + struct socket *accept_sock; + struct sockaddr_in addr; + int err; + + if (!ns) { + ERROR("Accept called on null net_stream\n"); + return 0; + } + + if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&accept_sock)<0) { + ERROR("Cannot create accept socket\n"); + return NULL; + } + + + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY; + + err = accept_sock->ops->bind(accept_sock, (struct sockaddr *)&addr, sizeof(addr)); + + if (err<0) { + ERROR("Bind err: %d\n",err); + return NULL; + } + + err = accept_sock->ops->listen(accept_sock,2); + + if (err<0) { + ERROR("Listen err: %d\n",err); + return NULL; + } + + // Init the socket in the network strream + + if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { + ERROR("Cannot create socket\n"); + return NULL; + } + + + // Do the actual accept + + if (accept_sock->ops->accept(accept_sock,ns->sock,0)<0) { + ERROR("accept failed"); + return NULL; + } + + // close the accept socket + accept_sock->ops->release(accept_sock); + kfree(accept_sock); + + return ns; +} + + +static struct v3_keyed_stream_t * open_stream_net(char * url,v3_keyed_stream_open_t ot) +{ + struct net_keyed_stream * nks; + int url_len; + int i; + int delimit[3]; + int k; + char mode; + int ip_len; + int port_len; + + nks = kmalloc(sizeof(struct net_keyed_stream),GFP_KERNEL); + + if (!nks) { + ERROR("Could not allocate space in open_stream_net\n"); + return 0; + } + + nks->ot = ot == V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot; + + nks->stype = STREAM_NETWORK; + + nks->ns = create_net_stream(); + + if (!(nks->ns)) { + ERROR("Could not create network stream\n"); + kfree(nks); + return 0; + } + + url_len=strlen(url); + k=0; + + + for(i = 0; i < url_len;i++){ + if(url[i] == ':'){ + delimit[k] = i; + k++; + } + } + + mode = url[delimit[0] + 1]; + ip_len = delimit[2] - delimit[1]; + port_len = url_len - delimit[2]; + + + { + char ip[ip_len]; + char port[port_len]; + int host_ip; + int host_port; + + + strncpy(ip,url + delimit[1]+1,ip_len-1); + ip[ip_len-1]='\0'; + + host_ip = in_aton(ip); + + strncpy(port,url+ delimit[2]+1,port_len-1); + port[port_len-1]='\0'; + + host_port = simple_strtol(port,NULL,10); + + INFO("ip is %s\n",ip); + INFO("host_ip is %x\n", host_ip); + INFO("port is %s (%d)\n",port,host_port); + + if (mode == 'a'){ + // accept a request + INFO("Accepting Connection on INADDR_ANY port:%d\n",host_port); + nks->ns = accept_once(nks->ns, host_port); + } else if (mode == 'c'){ + // call connect to ip + INFO("Connecting to %s:%d\n",ip,host_port); + connect_to_ip(nks->ns,host_ip, host_port); + } else { + ERROR("Mode not recognized\n"); + kfree(nks); + return NULL; + } + + return (v3_keyed_stream_t)nks; + } +} + +static void preallocate_hint_key_net(v3_keyed_stream_t stream, char *key,uint64_t size) +{ + //do nothing +} + +static v3_keyed_stream_key_t open_key_net(v3_keyed_stream_t stream,char *key) +{ + struct net_keyed_stream * nks = (struct net_keyed_stream *)stream; + + // reciever of the key name + if (nks->ot==V3_KS_WR_ONLY) + { + unsigned short keylen = strlen(key); + + if (keylen>NET_MAX_KEY_LEN || keylen>=32768) { + ERROR("Key is too long\n"); + return NULL; + } + + { + // on-stack allocation here demands that we + // keep key length low... + char msg[keylen+3]; + int next = 0; + + // Opening a key for writing sends a notice of the + // key length and the key name on the channel + + msg[next++]=keylen & 0xFF; + msg[next]=(keylen>>8) & 0xFF; + // Set flag bit + msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will flip leading bit to 1 + + strncpy(msg+2,key,keylen); // will also copy trailing zero + + if (send_msg(nks->ns,msg,keylen+2) != keylen+2) { + ERROR("Unable to open key for writing on net_stream (send key len+name)\n"); + return NULL; + } + } + } + + if (nks->ot==V3_KS_RD_ONLY) { + char msg_info[2]; + int next = 0; + int keylen = 0; + + if (recv_msg(nks->ns,msg_info,2) != 2) { + ERROR("Unable to open key for reading on net_stream (recv key len)\n"); + return NULL; + } + + next = 0; + keylen = 0; + + keylen |= msg_info[next++]; + + if ((msg_info[next] & 0x80) != 0x80) { + ERROR("Flag bit not set on receive of key length\n"); + return NULL; + } else { + msg_info[next] &= 0x7F; // flip the msb back to zero (clear flag) + } + + keylen |= msg_info[next]<<8; + + if (keylen > NET_MAX_KEY_LEN) { + ERROR("Received key length is too big\n"); + return NULL; + } + + { + + char msg[keylen+1]; + + if (recv_msg(nks->ns,msg,keylen) != keylen) { + ERROR("Unable to receive key\n"); + return NULL; + } + msg[keylen]=0; + + if (strncmp(key,msg,keylen)!=0) { + ERROR("Key mismatch in open_key_net - expect %s but got %s\n",key,msg); + return NULL; + } + } + } + + return (v3_keyed_stream_key_t)key; +} + +static void close_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t input_key) +{ + char * key = (char*)input_key; + struct net_keyed_stream * nks = (struct net_keyed_stream *)stream; + + + if (nks->ot==V3_KS_WR_ONLY) { + unsigned short keylen = strlen(key); + + if (keylen > NET_MAX_KEY_LEN || keylen>=32768) { + ERROR("Key length too long in close_key_net\n"); + return; + } + + { + char msg[keylen+3]; + int next = 0; + + msg[next++]=keylen & 0xFF; + msg[next]=(keylen>>8) & 0xFF; + // flag + msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will filp leading bit to 1 + strncpy(msg+2,key,keylen); // will copy the zero + msg[keylen+2]=0; + if (send_msg(nks->ns,msg,keylen+2)!=keylen+2) { + ERROR("Cannot send key on close_key_net\n"); + return; + } + } + } + + if (nks->ot==V3_KS_RD_ONLY) { + char msg_info[2]; + int next; + int keylen; + + if (recv_msg(nks->ns,msg_info,2) != 2) { + ERROR("Cannot recv key length on close_key_net\n"); + return; + } + + next = 0; + keylen = 0; + + keylen |= msg_info[next++]; + + if ((msg_info[next] & 0x80) != 0x80) { + ERROR("Missing flag in close_key_net receive\n"); + return; + } + + msg_info[next] &= 0x7F; // flip the msb back to zero + + keylen |= msg_info[next]<<8; + + { + char msg[keylen+1]; + + if (recv_msg(nks->ns,msg,keylen)!=keylen) { + ERROR("Did not receive all of key in close_key_net receive\n"); + return; + } + + msg[keylen]=0; + + if (strncmp(key,msg,keylen)!=0) { + ERROR("Key mismatch in close_key_net - expect %s but got %s\n",key,msg); + return; + } + } + } +} + +static sint64_t write_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key, void *buf, sint64_t len) +{ + struct net_keyed_stream * nks = (struct net_keyed_stream *)stream; + + if (!buf) { + ERROR("Buf is NULL in write_key_net\n"); + return -1; + } + + if (len<0) { + ERROR("len is negative in write_key_net\n"); + return -1; + } + + if (!key){ + ERROR("write_key: key is NULL in write_key_net\n"); + return -1; + } + + + if (!nks) { + ERROR("nks is NULL in write_key_net\n"); + return -1; + } + + if (nks->ot==V3_KS_WR_ONLY) { + if (send_msg(nks->ns,(char*)(&len),sizeof(len))!=sizeof(len)) { + ERROR("Could not send data length in write_key_net\n"); + return -1; + } + if (send_msg(nks->ns,buf,len)!=len) { + ERROR("Could not send data in write_key_net\n"); + return -1; + } + } else { + ERROR("Permission not correct in write_key_net\n"); + return -1; + } + + return len; +} + + +static sint64_t read_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key, void *buf, sint64_t len) +{ + struct net_keyed_stream * nks = (struct net_keyed_stream *)stream; + + if (!buf) { + ERROR("Buf is NULL in read_key_net\n"); + return -1; + } + + if(len<0) { + ERROR("len is negative in read_key_net\n"); + return -1; + } + + if (!key) { + ERROR("read_key: key is NULL in read_key_net\n"); + return -1; + } + + + if (nks->ot==V3_KS_RD_ONLY) { + + sint64_t slen; + + if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { + ERROR("Cannot receive data len in read_key_net\n"); + return -1; + } + + if (slen!=len) { + ERROR("Data len expected does not matched data len decoded in read_key_net\n"); + return -1; + } + + if (recv_msg(nks->ns,buf,len)!=len) { + ERROR("Cannot recieve data in read_key_net\n"); + return -1; + } + + } else { + ERROR("Permissions incorrect for the stream in read_key_net\n"); + return -1; + } + + return len; + +} + /*************************************************************************************************** Generic interface @@ -1348,8 +1961,10 @@ static v3_keyed_stream_t open_stream(char *url, return open_stream_file(url,ot); } else if (!strncasecmp(url,"user:",5)) { return open_stream_user(url,ot); + } else if(!strncasecmp(url,"net:",4)){ + return open_stream_net(url,ot); } else { - ERROR("palacios: unsupported type in attempt to open keyed stream \"%s\"\n",url); + ERROR("unsupported type in attempt to open keyed stream \"%s\"\n",url); return 0; } } @@ -1367,8 +1982,11 @@ static void close_stream(v3_keyed_stream_t stream) case STREAM_USER: return close_stream_user(stream); break; + case STREAM_NETWORK: + return close_stream_net(stream); + break; default: - ERROR("palacios: unknown stream type %d in close\n",gks->stype); + ERROR("unknown stream type %d in close\n",gks->stype); break; } } @@ -1388,8 +2006,11 @@ static void preallocate_hint_key(v3_keyed_stream_t stream, case STREAM_USER: return preallocate_hint_key_user(stream,key,size); break; + case STREAM_NETWORK: + return preallocate_hint_key_net(stream,key,size); + break; default: - ERROR("palacios: unknown stream type %d in preallocate_hint_key\n",gks->stype); + ERROR("unknown stream type %d in preallocate_hint_key\n",gks->stype); break; } return; @@ -1410,8 +2031,11 @@ static v3_keyed_stream_key_t open_key(v3_keyed_stream_t stream, case STREAM_USER: return open_key_user(stream,key); break; + case STREAM_NETWORK: + return open_key_net(stream, key); + break; default: - ERROR("palacios: unknown stream type %d in open_key\n",gks->stype); + ERROR("unknown stream type %d in open_key\n",gks->stype); break; } return 0; @@ -1432,8 +2056,11 @@ static void close_key(v3_keyed_stream_t stream, case STREAM_USER: return close_key_user(stream,key); break; + case STREAM_NETWORK: + return close_key_net(stream, key); + break; default: - ERROR("palacios: unknown stream type %d in close_key\n",gks->stype); + ERROR("unknown stream type %d in close_key\n",gks->stype); break; } // nothing to do @@ -1456,8 +2083,11 @@ static sint64_t write_key(v3_keyed_stream_t stream, case STREAM_USER: return write_key_user(stream,key,buf,len); break; + case STREAM_NETWORK: + return write_key_net(stream,key,buf,len); + break; default: - ERROR("palacios: unknown stream type %d in write_key\n",gks->stype); + ERROR("unknown stream type %d in write_key\n",gks->stype); return -1; break; } @@ -1481,8 +2111,11 @@ static sint64_t read_key(v3_keyed_stream_t stream, case STREAM_USER: return read_key_user(stream,key,buf,len); break; + case STREAM_NETWORK: + return read_key_net(stream,key,buf,len); + break; default: - ERROR("palacios: unknown stream type %d in read_key\n",gks->stype); + ERROR("unknown stream type %d in read_key\n",gks->stype); return -1; break; } @@ -1513,14 +2146,14 @@ static int init_keyed_streams( void ) mem_streams = palacios_create_htable(DEF_NUM_STREAMS,hash_func,hash_comp); if (!mem_streams) { - ERROR("palacios: failed to allocated stream pool for in-memory streams\n"); + ERROR("failed to allocated stream pool for in-memory streams\n"); return -1; } user_streams = kmalloc(sizeof(struct user_keyed_streams),GFP_KERNEL); if (!user_streams) { - ERROR("palacios: failed to allocated list for user streams\n"); + ERROR("failed to allocated list for user streams\n"); return -1; }