Palacios Public Git Repository

To checkout Palacios execute

  git clone http://v3vee.org/palacios/palacios.web/palacios.git
This will give you the master branch. You probably want the devel branch or one of the release branches. To switch to the devel branch, simply execute
  cd palacios
  git checkout --track -b devel origin/devel
The other branches are similar.


5bce4315dc23ee52c27a9fac4f8a569896870968
[palacios.git] / linux_module / iface-keyed-stream.c
1 /*
2  * Palacios keyed stream interface
3  *
4  * Plus implementations for mem, file, and user space implementations
5  *
6  * (c) Peter Dinda, 2011 (interface, mem + file implementations + recooked user impl)
7  * (c) Clint Sbisa, 2011 (initial user space implementation on which this is based)
8  * (c) Diana Palsetia & Steve Rangel, 2012 (network based implementation)       
9  * (c) Peter Dinda, 2012 (updated interface, textfile)
10  */
11
12 #include <linux/fs.h>
13 #include <linux/file.h>
14 #include <linux/uaccess.h>
15 #include <linux/namei.h>
16 #include <linux/poll.h>
17 #include <linux/anon_inodes.h>
18
19 #include "palacios.h"
20 #include "util-hashtable.h"
21 #include "linux-exts.h"
22 #include "vm.h"
23
24 #define sint64_t int64_t
25 #include <interfaces/vmm_keyed_stream.h>
26
27 #include "iface-keyed-stream-user.h"
28
29 #include <interfaces/vmm_socket.h>
30
31 #include <linux/spinlock.h>
32 #include <asm/uaccess.h>
33 #include <linux/inet.h>
34 #include <linux/kthread.h>
35 #include <linux/netdevice.h>
36 #include <linux/ip.h>
37 #include <linux/in.h>
38 #include <linux/string.h>
39 #include <linux/preempt.h>
40 #include <linux/sched.h>
41 #include <linux/list.h>
42 #include <linux/syscalls.h>
43
44 #include <linux/module.h>
45 #include <linux/kernel.h>
46 #include <linux/socket.h>
47 #include <linux/net.h>
48 #include <linux/slab.h>
49
50
51 /*
52   This is an implementation of the Palacios keyed stream interface
53   that supports four flavors of streams:
54
55   "mem:"   Streams are stored in a hash table
56   The values for this hash table are hash tables associated with 
57   each stream.   A key maps to an expanding memory buffer.
58   Data is stored in a buffer like:
59
60   [boundarytag][taglen][tag][datalen][data]
61   [boundarytag][taglen][tag][datalen][data]
62   ...
63
64   "file:"  Streams are stored in files.  Each high-level
65   open corresponds to a directory, while a key corresponds to
66   a distinct file in that directory.   Data is stored in a file
67   like:
68
69   [boundarytag][taglen][tag][datalen][data]
70   [boundarytag][taglen][tag][datalen][data]
71   ...
72
73   "textfile:" Same as file, but data is stored in text format, like a
74   windows style .ini file.  A key maps to a file, and data is stored
75   in a file like:
76
77   [key]
78   tag=data_in_hex
79   tag=data_in_hex
80
81   This format makes it possible to concentenate the files to
82   produce a single "ini" file with "sections".
83
84
85   "net:"  Streams are carried over the network.  Each
86    high level open corresponds to a TCP connection, while
87    each key corresponds to a context on the stream.
88       "net:a:<ip>:<port>" => Bind to <ip>:<port> and accept a connection
89       "net:c:<ip>:<port>" => Connect to <ip>:<port>
90    "c" (client) 
91      open_stream: connect
92    "a" (server) 
93      open_stream: accept
94    "c" or "a":
95      open_key:  send [keylen-lastbyte-high-bit][key] (writer)
96            or   recv (same format as above)          (reader)
97      close_key: send [keylen-lastbyte-high-bit][key] (writer)
98            or   recv (same format as above)          (reader)
99      write_key: send [boundarytag][taglen][tag][datalen][data]
100      read_key:  recv (same format as above)
101      close_stream: close socket
102
103   "user:" Stream requests are bounced to user space to be 
104    handled there.  A rendezvous approach similar to the host 
105    device userland support is used
106
107    All keyed streams store the tags.
108    
109 */
110
111 #define STREAM_GENERIC 0
112 #define STREAM_MEM     1
113 #define STREAM_FILE    2
114 #define STREAM_USER    3
115 #define STREAM_NETWORK 4
116 #define STREAM_TEXTFILE 5
117
118 /*
119   All keyed streams and streams indicate their implementation type within the first field
120  */
121 struct generic_keyed_stream {
122     int stype;
123 };
124
125 struct generic_stream {
126     int stype;
127 };
128   
129 /*
130   boundary tags are used for some othe raw formats. 
131 */
132 static uint32_t BOUNDARY_TAG=0xabcd0123;
133
134
135 /****************************************************************************************
136    Memory-based implementation  ("mem:")
137 ****************************************************************************************/
138
139 #define DEF_NUM_STREAMS 16
140 #define DEF_NUM_KEYS    128
141 #define DEF_SIZE        128
142
143 /*
144   A memory keyed stream is a pointer to the underlying hash table
145   while a memory stream contains an extensible buffer for the stream
146  */
147 struct mem_keyed_stream {
148     int stype;
149     v3_keyed_stream_open_t ot;
150     struct hashtable *ht;
151 };
152
153 struct mem_stream {
154     int       stype;
155     char     *data;
156     uint32_t  size;
157     uint32_t  data_max;
158     uint32_t  ptr;
159 };
160
161 static struct mem_stream *create_mem_stream_internal(uint64_t size)
162 {
163     struct mem_stream *m = palacios_alloc(sizeof(struct mem_stream));
164
165     if (!m) {
166         return 0;
167     }
168
169
170     m->data = palacios_valloc(size);
171     
172     if (!m->data) { 
173         palacios_free(m);
174         return 0;
175     }
176
177     m->stype = STREAM_MEM;
178     m->size=size;
179     m->ptr=0;
180     m->data_max=0;
181     
182     return m;
183 }
184
185
186 static struct mem_stream *create_mem_stream(void)
187 {
188     return create_mem_stream_internal(DEF_SIZE);
189 }
190
191 static void destroy_mem_stream(struct mem_stream *m)
192 {
193     if (m) {
194         if (m->data) {
195             palacios_vfree(m->data);
196         }
197         m->data=0;
198         palacios_free(m);
199     }
200 }
201     
202 static int expand_mem_stream(struct mem_stream *m, uint32_t new_size)
203 {
204     void *data = palacios_valloc(new_size);
205     uint32_t nc;
206
207     if (!data) { 
208         return -1;
209     }
210     
211     nc = (new_size<m->data_max) ? new_size : m->data_max;
212
213     memcpy(data,m->data,nc);
214
215     palacios_vfree(m->data);
216
217     m->data=data;
218     m->size=new_size;
219     if (m->size<m->data_max) { 
220         m->data_max=m->size;
221     }
222    
223     return 0;
224 }
225
226 static uint32_t write_mem_stream(struct mem_stream *m,
227                                  void *data,
228                                  uint32_t len)
229 {
230     if ((m->ptr + len) > m->size) { 
231         if (expand_mem_stream(m,m->ptr + len)) { 
232             return 0;
233         }
234     }
235     memcpy(m->data+m->ptr,data,len);
236     m->ptr+=len;
237     m->data_max=m->ptr;
238     
239     return len;
240
241 }
242
243
244
245 static uint32_t read_mem_stream(struct mem_stream *m,
246                                 void *data,
247                                 uint32_t len)
248 {
249     if ((m->ptr + len) > m->data_max) { 
250         return 0;
251     }
252     memcpy(data,m->data+m->ptr,len);
253     m->ptr+=len;
254     
255     return len;
256
257 }
258
259
260 static void reset_mem_stream(struct mem_stream *m)
261 {
262     m->ptr=0;
263 }
264
265
266 static inline uint_t hash_func(addr_t key)
267 {
268     return palacios_hash_buffer((uchar_t*)key,strlen((uchar_t*)key));
269 }
270
271 static inline int hash_comp(addr_t k1, addr_t k2)
272 {
273     return strcasecmp((char*)k1,(char*)k2)==0;
274 }
275
276
277 // This stores all the memory keyed streams streams
278 static struct hashtable *mem_streams=0;
279
280
281 static v3_keyed_stream_t open_stream_mem(char *url,
282                                          v3_keyed_stream_open_t ot)
283 {
284
285     if (strncasecmp(url,"mem:",4)) { 
286         WARNING("illegitimate attempt to open memory stream \"%s\"\n",url);
287         return 0;
288     }
289
290     switch (ot) { 
291         case V3_KS_RD_ONLY:
292         case V3_KS_WR_ONLY: {
293             struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
294             if (mks) { 
295                 mks->ot=ot;
296             }
297             return (v3_keyed_stream_t) mks;
298         }
299             break;
300
301         case V3_KS_WR_ONLY_CREATE: {
302             struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
303             if (!mks) { 
304                 char *mykey;
305
306                 mykey = palacios_alloc(strlen(url+4)+1);
307
308                 if (!mykey) { 
309                     ERROR("cannot allocate space for new in-memory keyed stream %s\n",url);
310                     return 0;
311                 }
312
313                 strcpy(mykey,url+4);
314                 
315                 mks = (struct mem_keyed_stream *) palacios_alloc(sizeof(struct mem_keyed_stream));
316
317                 if (!mks) { 
318                     palacios_free(mykey);
319                     ERROR("cannot allocate in-memory keyed stream %s\n",url);
320                     return 0;
321                 }
322             
323                 mks->ht = (void*) palacios_create_htable(DEF_NUM_KEYS,hash_func,hash_comp);
324                 if (!mks->ht) { 
325                     palacios_free(mks);
326                     palacios_free(mykey);
327                     ERROR("cannot allocate in-memory keyed stream %s\n",url);
328                     return 0;
329                 }
330
331                 
332                 if (!palacios_htable_insert(mem_streams,(addr_t)(mykey),(addr_t)mks)) { 
333                     palacios_free_htable(mks->ht,1,1);
334                     palacios_free(mks);
335                     palacios_free(mykey);
336                     ERROR("cannot insert in-memory keyed stream %s\n",url);
337                     return 0;
338                 }
339                 mks->stype=STREAM_MEM;
340             }
341
342             mks->ot=V3_KS_WR_ONLY;
343             
344             return mks;
345             
346         }
347             break;
348
349         default:
350             ERROR("unsupported open type in open_stream_mem\n");
351             break;
352     }
353     
354     return 0;
355         
356 }
357
358
359
360 static void close_stream_mem(v3_keyed_stream_t stream)
361 {
362     // nothing to do
363     return;
364 }
365
366
367 static v3_keyed_stream_key_t open_key_mem(v3_keyed_stream_t stream,
368                                           char *key)
369 {
370     struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
371     struct hashtable *s = mks->ht;
372
373     struct mem_stream *m;
374
375     m = (struct mem_stream *) palacios_htable_search(s,(addr_t)key);
376
377     if (!m) { 
378         char *mykey = palacios_alloc(strlen(key)+1);
379
380         if (!mykey) { 
381             ERROR("cannot allocate copy of key for key %s\n",key);
382             return 0;
383         }
384
385         strcpy(mykey,key);
386
387         m = create_mem_stream();
388         
389         if (!m) { 
390             palacios_free(mykey);
391             ERROR("cannot allocate mem keyed stream for key %s\n",key);
392             return 0;
393         }
394
395         if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) {
396             destroy_mem_stream(m);
397             palacios_free(mykey);
398             ERROR("cannot insert mem keyed stream for key %s\n",key);
399             return 0;
400         }
401     }
402
403     reset_mem_stream(m);
404     return m;
405
406 }
407
408
409 static void preallocate_hint_key_mem(v3_keyed_stream_t stream,
410                                      char *key,
411                                      uint64_t size)
412 {
413     struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
414     struct hashtable *s = mks->ht;
415
416     struct mem_stream *m;
417
418     if (mks->ot != V3_KS_WR_ONLY) { 
419         return;
420     }
421
422     m = (struct mem_stream *) palacios_htable_search(s,(addr_t)key);
423
424     if (!m) {
425         char *mykey;
426         
427         mykey=palacios_alloc(strlen(key)+1);
428         
429         if (!mykey) { 
430             ERROR("cannot allocate key space for preallocte for key %s\n",key);
431             return;
432         }
433         
434         strcpy(mykey,key);
435        
436         m = create_mem_stream_internal(size);
437         
438         if (!m) { 
439             ERROR("cannot preallocate mem keyed stream for key %s\n",key);
440             return;
441         }
442
443         if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) {
444             ERROR("cannot insert preallocated mem keyed stream for key %s\n",key);
445             destroy_mem_stream(m);
446             return;
447         }
448     } else {
449         if (m->data_max < size) { 
450             if (expand_mem_stream(m,size)) { 
451                 ERROR("cannot expand key for preallocation for key %s\n",key);
452                 return;
453             }
454         }
455     }
456
457     return;
458
459 }
460
461 static void close_key_mem(v3_keyed_stream_t stream, 
462                           v3_keyed_stream_key_t key)
463 {
464     // nothing to do
465     return;
466 }
467
468 static sint64_t write_key_mem(v3_keyed_stream_t stream, 
469                               v3_keyed_stream_key_t key,
470                               void *tag,
471                               sint64_t taglen,
472                               void *buf,
473                               sint64_t len)
474 {
475   struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
476   struct mem_stream *m = (struct mem_stream *) key;
477   uint32_t mylen;
478   uint32_t writelen;
479   
480   if (mks->ot!=V3_KS_WR_ONLY) {
481     return -1;
482   }
483   
484   if (taglen<0 || len<0) { 
485     ERROR("Negative taglen or data len\n");
486     return -1;
487   }
488   
489   if (taglen>0xffffffffULL || len>0xffffffffULL) { 
490     ERROR("taglen or data len is too large\n");
491     return -1;
492   }
493       
494   writelen=write_mem_stream(m,&BOUNDARY_TAG,sizeof(BOUNDARY_TAG));
495   
496   if (writelen!=sizeof(BOUNDARY_TAG)) { 
497     ERROR("failed to write all data for boundary tag\n");
498     return -1;
499   }
500   
501   writelen=write_mem_stream(m,&taglen,sizeof(taglen));
502   
503   if (writelen!=sizeof(taglen)) { 
504     ERROR("failed to write taglen\n");
505     return -1;
506   }
507   
508   mylen = (uint32_t) taglen;
509   
510   writelen=write_mem_stream(m,tag,mylen);
511   
512   if (writelen!=mylen) { 
513     ERROR("failed to write all data for tag\n");
514     return -1;
515   } 
516
517   writelen=write_mem_stream(m,&len,sizeof(len));
518   
519   if (writelen!=sizeof(len)) { 
520     ERROR("failed to write datalen\n");
521     return -1;
522   }
523   
524   mylen = (uint32_t) len;
525
526   writelen=write_mem_stream(m,buf,mylen);
527
528   if (writelen!=mylen) { 
529     ERROR("failed to write all data for key\n");
530     return -1;
531   } else {
532     return (sint64_t)writelen;
533   }
534 }
535
536 static sint64_t read_key_mem(v3_keyed_stream_t stream, 
537                              v3_keyed_stream_key_t key,
538                              void *tag,
539                              sint64_t taglen,
540                              void *buf,
541                              sint64_t len)
542 {
543   struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
544   struct mem_stream *m = (struct mem_stream *) key;
545   uint32_t mylen;
546   uint32_t readlen;
547   void *temptag;
548   uint32_t tempbt;
549   sint64_t templen;
550   
551   
552   if (mks->ot!=V3_KS_RD_ONLY) {
553     return -1;
554   }
555   
556   if (len<0 || taglen<0) { 
557     ERROR("taglen or data len is negative\n");
558     return -1;
559   }
560
561   if (len>0xffffffffULL || taglen>0xffffffffULL) { 
562     ERROR("taglen or data len is too large\n");
563     return -1;
564   }
565   
566   readlen=read_mem_stream(m,&tempbt,sizeof(tempbt));
567   
568   if (readlen!=sizeof(tempbt)) { 
569     ERROR("failed to read all data for boundary tag\n");
570     return -1;
571   } 
572   
573   if (tempbt!=BOUNDARY_TAG) { 
574     ERROR("boundary tag not found (read 0x%x)\n",tempbt);
575     return -1;
576   }
577   
578   readlen=read_mem_stream(m,&templen,sizeof(templen));
579
580   if (readlen!=sizeof(templen)) { 
581     ERROR("failed to read all data for taglen\n");
582     return -1;
583   } 
584
585   if (templen!=taglen) { 
586     ERROR("tag size mismatch (requested=%lld, actual=%lld)\n",taglen,templen);
587     return -1;
588   }
589   
590   temptag = palacios_alloc(taglen);
591     
592   if (!temptag) { 
593     ERROR("cannot allocate temporary tag\n");
594     return -1;
595   }
596
597   mylen = (uint32_t) len;
598     
599   readlen=read_mem_stream(m,temptag,mylen);
600     
601   if (readlen!=mylen) { 
602     ERROR("failed to read all data for tag\n");
603     palacios_free(temptag);
604     return -1;
605   } 
606
607   if (memcmp(tag,temptag,taglen)) { 
608     ERROR("tag mismatch\n");
609     palacios_free(temptag);
610     return -1;
611   }
612   
613   palacios_free(temptag);
614
615   readlen=read_mem_stream(m,&templen,sizeof(templen));
616
617   if (readlen!=sizeof(templen)) { 
618     ERROR("failed to read all data for data len\n");
619     return -1;
620   } 
621   
622   if (templen!=len) { 
623     ERROR("data size mismatch (requested=%lld, actual=%lld)\n",len,templen);
624     return -1;
625   }
626
627   mylen = (uint32_t) len;
628   
629   readlen=read_mem_stream(m,buf,mylen);
630   
631   if (readlen!=mylen) { 
632     ERROR("failed to read all data for key\n");
633     return -1;
634   } else {
635     return (sint64_t)readlen;
636   }
637 }
638
639
640 /***************************************************************************************************
641   File-based implementation  ("file:")
642 *************************************************************************************************/
643
644 /*
645   A file keyed stream contains the fd of the directory
646   and a path
647 */
648
649 struct file_keyed_stream {
650     int   stype;
651     v3_keyed_stream_open_t ot;
652     char  *path;
653 };
654
655 struct file_stream {
656     int   stype;
657     struct file *f;   // the opened file
658 };
659
660
661 static v3_keyed_stream_t open_stream_file(char *url,
662                                           v3_keyed_stream_open_t ot)
663 {
664     struct file_keyed_stream *fks;
665     struct nameidata nd;
666
667     if (strncasecmp(url,"file:",5)) { 
668         WARNING("illegitimate attempt to open file stream \"%s\"\n",url);
669         return 0;
670     }
671
672     fks = palacios_alloc(sizeof(struct file_keyed_stream));
673     
674     if (!fks) { 
675         ERROR("cannot allocate space for file stream\n");
676         return 0;
677     }
678
679     fks->path = (char*)palacios_alloc(strlen(url+5)+1);
680     
681     if (!(fks->path)) { 
682         ERROR("cannot allocate space for file stream\n");
683         palacios_free(fks);
684         return 0;
685     }
686     
687     strcpy(fks->path,url+5);
688     
689     fks->stype=STREAM_FILE;
690
691     fks->ot= ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
692
693     // Does the directory exist, and can we read/write it?
694    
695     if (path_lookup(fks->path,LOOKUP_DIRECTORY|LOOKUP_FOLLOW,&nd)) { 
696
697         // directory does does not exist.  
698
699         if (ot==V3_KS_RD_ONLY || ot==V3_KS_WR_ONLY) { 
700
701             // we are not being asked to create it
702             ERROR("attempt to open %s, which does not exist\n",fks->path);
703             goto fail_out;
704
705         } else {
706
707             // We are being asked to create it
708
709             struct dentry *de;
710             int err;
711
712             // Find its parent
713             if (path_lookup(fks->path,LOOKUP_PARENT|LOOKUP_FOLLOW,&nd)) { 
714                 ERROR("attempt to create %s failed because its parent cannot be looked up\n",fks->path);
715                 goto fail_out;
716             }
717
718             // Can we write to the parent?
719
720             if (inode_permission(nd.path.dentry->d_inode, MAY_WRITE | MAY_EXEC)) { 
721                 ERROR("attempt to open %s, which has the wrong permissions for directory creation\n",fks->path);
722                 goto fail_out;
723             }
724
725             // OK, we can, so let's create it
726
727             de = lookup_create(&nd,1);
728
729             if (!de || IS_ERR(de)) { 
730                 ERROR("cannot allocate dentry\n");
731                 goto fail_out;
732             }
733
734             err = vfs_mkdir(nd.path.dentry->d_inode, de, 0700);
735
736             // lookup_create locks this for us!
737
738             mutex_unlock(&(nd.path.dentry->d_inode->i_mutex));
739
740             if (err) {
741                 ERROR("attempt to create %s failed because mkdir failed\n",fks->path);
742                 goto fail_out;
743             }
744
745             // now the directory should exist and have reasonable permissions
746             return (v3_keyed_stream_t) fks;
747         }
748     } 
749
750     
751     // we must be in V3_KS_RD_ONLY or V3_KS_WR_ONLY, 
752     // and the directory exists, so we must check the permissions
753
754     if (inode_permission(nd.path.dentry->d_inode, MAY_EXEC | (ot==V3_KS_RD_ONLY ? MAY_READ : MAY_WRITE))) {
755         ERROR("attempt to open %s, which has the wrong permissions\n",fks->path);
756         goto fail_out;
757     } else {
758         return (v3_keyed_stream_t) fks;
759     }
760
761
762  fail_out:
763     palacios_free(fks->path);
764     palacios_free(fks);
765     return 0;
766
767 }
768
769 static void close_stream_file(v3_keyed_stream_t stream)
770 {
771     struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
772     
773     palacios_free(fks->path);
774     palacios_free(fks);
775
776 }
777
778 static void preallocate_hint_key_file(v3_keyed_stream_t stream,
779                                       char *key,
780                                       uint64_t size)
781 {
782     return;
783 }
784
785 static v3_keyed_stream_key_t open_key_file(v3_keyed_stream_t stream,
786                                            char *key)
787 {
788     struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
789     struct file_stream *fs;
790     char *path;
791
792     // the path is the stream's path plus the key name
793     // file:/home/foo + "regext" => "/home/foo/regext"
794     path = (char *) palacios_alloc(strlen(fks->path)+strlen(key)+2);
795     if (!path) {                                
796         ERROR("cannot allocate file keyed stream for key %s\n",key);
797         return 0;
798     }
799     strcpy(path,fks->path);
800     strcat(path,"/");
801     strcat(path,key);
802     
803     fs = (struct file_stream *) palacios_alloc(sizeof(struct file_stream));
804     
805     if (!fs) { 
806         ERROR("cannot allocate file keyed stream for key %s\n",key);
807         palacios_free(path);
808         return 0;
809     }
810
811     fs->stype=STREAM_FILE;
812
813     fs->f = filp_open(path,O_RDWR|O_CREAT|O_LARGEFILE,0600);
814
815     if (!fs->f || IS_ERR(fs->f)) {
816         ERROR("cannot open relevent file \"%s\" for stream \"file:%s\" and key \"%s\"\n",path,fks->path,key);
817         palacios_free(fs);
818         palacios_free(path);
819         return 0;
820     }
821
822     palacios_free(path);
823
824     return fs;
825 }
826
827
828 static void close_key_file(v3_keyed_stream_t stream, 
829                            v3_keyed_stream_key_t key)
830 {
831     struct file_stream *fs = (struct file_stream *) key;
832
833     filp_close(fs->f,NULL);
834
835     palacios_free(fs);
836 }
837
838
839 static sint64_t write_file(struct file_stream *fs, void *buf, sint64_t len)
840 {
841     ssize_t done, left, total;
842     mm_segment_t old_fs;
843
844     total=len;
845     left=len;
846
847     while (left>0) {
848         old_fs = get_fs();
849         set_fs(get_ds());
850         done = fs->f->f_op->write(fs->f, buf+(total-left), left, &(fs->f->f_pos));
851         set_fs(old_fs);
852         if (done<=0) {
853             return -1;
854         } else {
855             left -= done;
856         }
857     }
858
859     return len;
860 }
861
862 static sint64_t write_key_file(v3_keyed_stream_t stream, 
863                                v3_keyed_stream_key_t key,
864                                void *tag,
865                                sint64_t taglen,
866                                void *buf,
867                                sint64_t len)
868 {
869   struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
870   struct file_stream *fs = (struct file_stream *) key;
871   sint64_t writelen;
872   
873   if (fks->ot!=V3_KS_WR_ONLY) { 
874     return -1;
875   }
876   
877   if (taglen<0 || len<0) { 
878     ERROR("Negative taglen or data len\n");
879     return -1;
880   }
881   
882   writelen=write_file(fs,&BOUNDARY_TAG,sizeof(BOUNDARY_TAG));
883   
884   if (writelen!=sizeof(BOUNDARY_TAG)) { 
885     ERROR("failed to write all data for boundary tag\n");
886     return -1;
887   }
888   
889   writelen=write_file(fs,&taglen,sizeof(taglen));
890   
891   if (writelen!=sizeof(taglen)) { 
892     ERROR("failed to write taglen\n");
893     return -1;
894   }
895   
896   if (write_file(fs,tag,taglen)!=taglen) { 
897     ERROR("failed to write tag\n");
898     return -1;
899   }
900
901   writelen=write_file(fs,&len,sizeof(len));
902   
903   if (writelen!=sizeof(len)) { 
904     ERROR("failed to write data len\n");
905     return -1;
906   }
907   
908   return write_file(fs,buf,len);
909 }
910
911 static sint64_t read_file(struct file_stream *fs, void *buf, sint64_t len)
912 {
913     ssize_t done, left, total;
914     mm_segment_t old_fs;
915
916     total=len;
917     left=len;
918
919
920     while (left>0) {
921         old_fs = get_fs();
922         set_fs(get_ds());
923         done = fs->f->f_op->read(fs->f, buf+(total-left), left, &(fs->f->f_pos));
924         set_fs(old_fs);
925         if (done<=0) {
926             return -1;
927         } else {
928             left -= done;
929         }
930     }
931
932     return len;
933 }
934
935
936 static sint64_t read_key_file(v3_keyed_stream_t stream, 
937                               v3_keyed_stream_key_t key,
938                               void *tag,
939                               sint64_t taglen,
940                               void *buf,
941                               sint64_t len)
942 {
943   struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
944   struct file_stream *fs = (struct file_stream *) key;
945   void *temptag;
946   uint32_t tempbt;
947   sint64_t templen;
948   sint64_t readlen;
949   
950   if (fks->ot!=V3_KS_RD_ONLY) { 
951     return -1;
952   }
953   
954   if (len<0 || taglen<0) { 
955     ERROR("taglen or data len is negative\n");
956     return -1;
957   }
958
959   readlen=read_file(fs,&tempbt,sizeof(tempbt));
960   
961   if (readlen!=sizeof(tempbt)) { 
962     ERROR("failed to read all data for boundary tag\n");
963     return -1;
964   } 
965   
966   if (tempbt!=BOUNDARY_TAG) { 
967     ERROR("boundary tag not found (read 0x%x)\n",tempbt);
968     return -1;
969   }
970
971   readlen=read_file(fs,&templen,sizeof(templen));
972   
973   if (readlen!=sizeof(templen)) { 
974     ERROR("failed to read all data for tag len\n");
975     return -1;
976   } 
977
978   if (templen!=taglen) { 
979     ERROR("tag size mismatch (requested=%lld, actual=%lld)\n",taglen,templen);
980     return -1;
981   }
982
983   temptag=palacios_alloc(taglen);
984
985   if (!temptag) { 
986     ERROR("Cannot allocate temptag\n");
987     return -1;
988   }
989   
990   if (read_file(fs,temptag,taglen)!=taglen) { 
991     ERROR("Cannot read tag\n");
992     palacios_free(temptag);
993     return -1;
994   }
995
996   if (memcmp(temptag,tag,taglen)) { 
997     ERROR("Tag mismatch\n");
998     palacios_free(temptag);
999     return -1;
1000   }
1001   
1002   palacios_free(temptag);
1003
1004   readlen=read_file(fs,&templen,sizeof(templen));
1005   
1006   if (readlen!=sizeof(templen)) { 
1007     ERROR("failed to read all data for data len\n");
1008     return -1;
1009   } 
1010
1011   if (templen!=len) { 
1012     ERROR("datasize mismatch (requested=%lld, actual=%lld)\n",len,templen);
1013     return -1;
1014   }
1015
1016   return read_file(fs,buf,len);
1017
1018 }
1019
1020
1021 /***************************************************************************************************
1022   Textfile-based implementation  ("textfile:")
1023
1024   Note that this implementation uses the internal structure and functions of the 
1025   "file:" implementation
1026 *************************************************************************************************/
1027
1028 // optimize the reading and decoding of hex data
1029 // this weakens the parser, so that:
1030 // tag =0A0B0D 
1031 // will work, but
1032 // tag = 0A 0B 0D
1033 // will not.  Note the leading whitespace
1034 #define TEXTFILE_OPT_HEX 0
1035
1036 //
1037 // The number of bytes handled at a time by the hex putter and getter
1038 //
1039 #define MAX_HEX_SEQ 64
1040
1041 /*
1042   A text file keyed stream is a file_keyed_stream,
1043   only with a different stype
1044 */
1045
1046 #define PAUSE()  
1047 //#define PAUSE() ssleep(5) 
1048
1049
1050 typedef struct file_keyed_stream textfile_keyed_stream;
1051
1052 typedef struct file_stream textfile_stream;
1053
1054
1055 static v3_keyed_stream_t open_stream_textfile(char *url,
1056                                               v3_keyed_stream_open_t ot)
1057 {
1058   textfile_keyed_stream *me;
1059
1060   if (strncasecmp(url,"textfile:",9)) { 
1061     WARNING("illegitimate attempt to open textfile stream \"%s\"\n",url);
1062     return 0;
1063   }
1064
1065   me = (textfile_keyed_stream *) open_stream_file(url+4, ot);
1066
1067   if (!me) {
1068     ERROR("could not create underlying file stream\n");
1069     return 0;
1070   }
1071
1072   me->stype=STREAM_TEXTFILE;
1073
1074   return me;
1075 }
1076
1077   
1078
1079 static void close_stream_textfile(v3_keyed_stream_t stream)
1080 {
1081   textfile_keyed_stream *me = stream;
1082
1083   me->stype=STREAM_FILE;
1084
1085   close_stream_file(me);
1086
1087 }
1088
1089 static void preallocate_hint_key_textfile(v3_keyed_stream_t stream,
1090                                           char *key,
1091                                           uint64_t size)
1092 {
1093   textfile_keyed_stream *me = stream;
1094   
1095   me->stype=STREAM_FILE;
1096
1097   preallocate_hint_key_file(me,key,size);
1098   
1099   me->stype=STREAM_TEXTFILE;
1100  
1101 }
1102
1103
1104 static inline int isoneof(char c, char *sl, int m)
1105 {
1106   int i;
1107   
1108   for (i=0;i<m;i++) { 
1109     if (c==sl[i]) { 
1110       return 1;
1111     }
1112   }
1113   
1114   return 0;
1115 }
1116
1117 static char get_next_char(textfile_stream *s)
1118 {
1119   char buf;
1120   if (read_file(s,&buf,1)!=1) { 
1121     return -1;
1122   } 
1123   return buf;
1124 }
1125
1126 static char hexify_nybble(char c)
1127 {
1128   if (c>=0 && c<=9) { 
1129     return '0'+c;
1130   } else if (c>=0xa && c<=0xf) { 
1131     return 'a'+(c-0xa);
1132   } else {
1133     return -1;
1134   }
1135 }
1136
1137 static int hexify_byte(char *c, char b)
1138 {
1139   char n;
1140   n = hexify_nybble( (b >> 4) & 0xf);
1141   if (n==-1) { 
1142     return -1;
1143   }
1144   c[0] = n;
1145   n = hexify_nybble( b & 0xf);
1146   if (n==-1) { 
1147     return -1;
1148   }
1149   c[1] = n;
1150   return 0;
1151 }
1152
1153
1154 static char dehexify_nybble(char c)
1155 {
1156   if (c>='0' && c<='9') { 
1157     return c-'0';
1158   } else if (c>='a' && c<='f') {
1159     return 0xa + (c-'a');
1160   } else if (c>='A' && c<='F') { 
1161     return 0xa + (c-'A');
1162   } else {
1163     return -1;
1164   }
1165 }
1166
1167 static int dehexify_byte(char *c, char *b)
1168 {
1169   char n;
1170   n = dehexify_nybble(c[0]);
1171   if (n==-1) { 
1172     return -1;
1173   }
1174   *b = n << 4;
1175   n = dehexify_nybble(c[1]);
1176   if (n==-1) { 
1177     return -1;
1178   }
1179   *b |= n;
1180   return 0;
1181 }
1182
1183
1184 #if TEXTFILE_OPT_HEX
1185
1186
1187 // Here the sl array, of length m is the number
1188 static int get_hexbytes_as_data(textfile_stream *s, char *buf, int n)
1189 {
1190   char rbuf[MAX_HEX_SEQ*2];
1191   int left = n;
1192   int off = 0;
1193   int cur = 0;
1194   int i;
1195
1196   while (left>0) {
1197     cur = left > MAX_HEX_SEQ ? MAX_HEX_SEQ : left;
1198     if (read_file(s,rbuf,cur*2)!=cur*2) { 
1199       ERROR("Cannot read data in getting hexbytes as data\n");
1200       return -1;
1201     }
1202     
1203     for (i=0;i<cur;i++) {
1204       if (dehexify_byte(rbuf+(i*2),buf+off+i)==-1) { 
1205         ERROR("Cannot decode data as hex in getting hexbytes as data\n");
1206         return -1;
1207       }
1208     }
1209     left-=cur;
1210     off+=cur;
1211   } 
1212
1213   return 0;
1214 }    
1215
1216 #endif
1217
1218 // Here the sl array, of length m is the set of characters to skip (e.g., whitespace)
1219 static int get_hexbytes_as_data_skip(textfile_stream *s, char *buf, int n, char *sl, int m)
1220 {
1221   char rbuf[2];
1222   int which = 0;
1223   int cur=0;
1224     
1225   while (cur<n) {
1226     which=0;
1227     while (which<2) { 
1228       rbuf[which] = get_next_char(s);
1229       if (rbuf[which]==-1) { 
1230         ERROR("Cannot read char in getting hexbytes as data with skiplist");
1231         return -1;
1232       }
1233       if (isoneof(rbuf[which],sl,m)) { 
1234         continue;
1235       } else {
1236         which++;
1237       }
1238     }
1239     if (dehexify_byte(rbuf,buf+cur)==-1) { 
1240       ERROR("Cannot decode data as hex in getting hexbytes as data with skiplist\n");
1241       return -1;
1242     } else {
1243       cur++;
1244     }
1245   }
1246   return 0;
1247 }    
1248
1249 /*
1250 static int put_next_char(textfile_stream *s, char d)
1251 {
1252   return write_file(s,&d,1);
1253 }
1254 */
1255
1256
1257 static int put_data_as_hexbytes(textfile_stream *s, char *buf, int n)
1258 {
1259   char rbuf[MAX_HEX_SEQ*2];
1260   int left = n;
1261   int off = 0;
1262   int cur = 0;
1263   int i;
1264
1265   while (left>0) {
1266     cur = left > MAX_HEX_SEQ ? MAX_HEX_SEQ : left;
1267     for (i=0;i<cur;i++) {
1268       if (hexify_byte(rbuf+(i*2),*(buf+off+i))==-1) { 
1269         ERROR("Cannot encode data as hex in putting data as hexbytes\n");
1270         return -1;
1271       }
1272     }
1273     if (write_file(s,rbuf,cur*2)!=cur*2) { 
1274       ERROR("Cannot write data in putting data as hexbytes\n");
1275       return -1;
1276     }
1277     left-=cur;
1278     off+=cur;
1279   } 
1280
1281   return 0;
1282 }    
1283
1284
1285 static int put_string_n(textfile_stream *s, char *buf, int n)
1286 {
1287   int rc;
1288
1289   rc = write_file(s,buf,n);
1290   
1291   if (rc!=n) { 
1292     return -1;
1293   } else {
1294     return 0;
1295   }
1296 }
1297
1298 static int put_string(textfile_stream *s, char *buf)
1299 {
1300   int n=strlen(buf);
1301
1302   return put_string_n(s,buf,n);
1303 }
1304
1305
1306
1307 static int search_for(textfile_stream *s, char d)
1308 {
1309   char c;
1310   do {
1311     c=get_next_char(s);
1312   } while (c!=-1 && c!=d);
1313   
1314   if (c==d) { 
1315     return 0;
1316   } else {
1317     return -1;
1318   }
1319 }
1320
1321 /*
1322 static int skip_matching(textfile_stream *s, char *m, int n)
1323 {
1324   char c;
1325   int rc = 0;
1326   int i;
1327
1328   while (rc==0) { 
1329     c=get_next_char(s);
1330     if (c==-1) { 
1331       rc=-1;
1332     } else {
1333       for (i=0;i<n;i++) { 
1334         if (c==m[i]) {
1335           rc=1;
1336           break;
1337         } 
1338       }
1339     }
1340   }
1341   
1342   if (rc==1) { 
1343     return 0;  // found
1344   } else {
1345     return rc; // unknown
1346   }
1347 }
1348
1349 */
1350
1351
1352 static int token_scan(textfile_stream *s, char *token, int n, char *sl, int m)
1353 {
1354   char c;
1355   int cur;
1356
1357   // Skip whitespace
1358   do {
1359     c=get_next_char(s);
1360     if (c==-1) { 
1361       ERROR("Failed to get character during token scan (preceding whitespace)\n");
1362       return -1;
1363     }
1364   } while (isoneof(c,sl,m));
1365
1366
1367   token[0]=c;
1368   
1369   // Record
1370   cur=1;
1371   while (cur<(n-1)) { 
1372     c=get_next_char(s);
1373     if (c==-1) { 
1374       ERROR("Failed to get character during token scan (token)\n");
1375       return -1;
1376     }
1377     if (isoneof(c,sl,m)) { 
1378       break;
1379     } else {
1380       token[cur]=c;
1381       cur++;
1382     } 
1383   }
1384   token[cur]=0;
1385   return 0;
1386 }
1387
1388
1389
1390 static v3_keyed_stream_key_t open_key_textfile(v3_keyed_stream_t stream,
1391                                                char *key)
1392 {
1393   textfile_keyed_stream *mks = stream;
1394   textfile_stream *ms;
1395
1396   mks->stype=STREAM_FILE;
1397
1398   ms = open_key_file(mks,key);
1399
1400   if (!ms) { 
1401     ERROR("cannot open underlying file keyed stream for key %s\n",key);
1402     mks->stype=STREAM_TEXTFILE;
1403     return 0;
1404   }
1405
1406   if (mks->ot==V3_KS_WR_ONLY) { 
1407     
1408     // Now we write the section header
1409
1410     ms->stype=STREAM_FILE;
1411     
1412     if (put_string(ms,"[")) { 
1413       close_key_file(mks,ms);
1414       mks->stype=STREAM_TEXTFILE;
1415       return 0;
1416     }
1417     
1418     if (put_string(ms,key)) { 
1419       close_key_file(mks,ms);
1420       mks->stype=STREAM_TEXTFILE;
1421       return 0;
1422     }
1423     
1424     if (put_string(ms,"]\n")) {
1425       close_key_file(mks,ms);
1426       mks->stype=STREAM_TEXTFILE;
1427       return 0;
1428     }
1429     
1430     
1431     mks->stype=STREAM_TEXTFILE;
1432     ms->stype=STREAM_TEXTFILE;
1433
1434     return ms;
1435
1436   } else if (mks->ot == V3_KS_RD_ONLY) {
1437     // Now we readthe section header
1438     int keylen=strlen(key);
1439     char *tempkey = palacios_alloc(keylen+3);
1440
1441     ms->stype=STREAM_FILE;
1442
1443     if (!tempkey) { 
1444       ERROR("Allocation failed in opening key\n");
1445       close_key_file(mks,ms);
1446       mks->stype=STREAM_FILE;
1447       return 0;
1448     }
1449
1450
1451     if (token_scan(ms,tempkey,keylen+3,"\t\r\n",3)) { 
1452       ERROR("Cannot scan for token (key search)\n");
1453       close_key_file(mks,ms);
1454       mks->stype=STREAM_TEXTFILE;
1455       palacios_free(tempkey);
1456       return 0;
1457     }
1458     
1459     tempkey[keylen+2] = 0;
1460     
1461     // Should now have [key]0
1462
1463     if (tempkey[0]!='[' ||
1464         tempkey[keylen+1]!=']' ||
1465         memcmp(key,tempkey+1,keylen)) {
1466       ERROR("key mismatch: target key=%s, found %s\n",key,tempkey);
1467       palacios_free(tempkey);
1468       close_key_file(mks,ms);
1469       mks->stype=STREAM_TEXTFILE;
1470       return 0;
1471     }
1472
1473     // key match done, success
1474
1475     mks->stype=STREAM_TEXTFILE;
1476     ms->stype=STREAM_TEXTFILE;
1477
1478     palacios_free(tempkey);
1479
1480     return ms;
1481     
1482   } else {
1483     ERROR("Unknown open type in open_key_textfile\n");
1484     ms->stype=STREAM_FILE;
1485     close_key_file(mks,ms);
1486     return 0;
1487   }
1488
1489 }
1490
1491
1492
1493 static void close_key_textfile(v3_keyed_stream_t stream, 
1494                                v3_keyed_stream_key_t key)
1495 {
1496   textfile_keyed_stream *mks = stream;
1497   textfile_stream *ms=key;
1498
1499   mks->stype=STREAM_FILE;
1500   ms->stype=STREAM_FILE;
1501
1502   close_key_file(mks,ms);
1503
1504   mks->stype=STREAM_TEXTFILE;
1505
1506 }
1507
1508
1509 static sint64_t read_key_textfile(v3_keyed_stream_t stream, 
1510                                   v3_keyed_stream_key_t key,
1511                                   void *tag,
1512                                   sint64_t taglen,
1513                                   void *buf,
1514                                   sint64_t len)
1515 {
1516     textfile_stream *ms = (textfile_stream *) key;
1517     char tags[32];
1518     char *temptag;
1519
1520
1521
1522     memcpy(tags,tag,taglen<31 ? taglen : 31);
1523     tags[taglen<32? taglen : 31 ]=0;
1524     
1525     temptag=palacios_alloc(taglen+1);
1526     if (!temptag) { 
1527       ERROR("Unable to allocate temptag in textfile read key\n");
1528       return -1;
1529     }
1530
1531     ms->stype=STREAM_FILE;
1532     
1533     if (token_scan(ms,temptag,taglen+1," \t\r\n=",5)) { 
1534       ERROR("Cannot scan for token (tag search)\n");
1535       ms->stype=STREAM_TEXTFILE;
1536       palacios_free(temptag);
1537       return -1;
1538     }
1539
1540     if (memcmp(tag,temptag,taglen)) { 
1541       ERROR("Tag mismatch in reading tag from textfile: desired tag=%s, actual tag=%s\n",tags,temptag);
1542       ms->stype=STREAM_TEXTFILE;
1543       palacios_free(temptag);
1544       return -1;
1545     }
1546
1547     // tag matches, let's go and find our =
1548     palacios_free(temptag);
1549
1550     if (search_for(ms,'=')) { 
1551       ERROR("Unable to find = sign in tag data parse (tag=%s)\n", tags);
1552       ms->stype=STREAM_TEXTFILE;
1553       return -1;
1554     }
1555
1556
1557 #if TEXTFILE_OPT_HEX
1558     if (get_hexbytes_as_data(ms,buf,len)) { 
1559       ERROR("Cannot read data in hex format (opt path) in textfile for tag %s\n",tags);
1560       ms->stype=STREAM_TEXTFILE;
1561       return -1;
1562     }
1563 #else
1564     if (get_hexbytes_as_data_skip(ms,buf,len," \t\r\n",4)) { 
1565       ERROR("Cannot read data in hex format (unopt path) in textfile for tag %s\n",tags);
1566       ms->stype=STREAM_TEXTFILE;
1567       return -1;
1568     }
1569 #endif
1570
1571     ms->stype=STREAM_TEXTFILE;
1572
1573     return len;
1574 }
1575
1576 static sint64_t write_key_textfile(v3_keyed_stream_t stream, 
1577                                    v3_keyed_stream_key_t key,
1578                                    void *tag,
1579                                    sint64_t taglen,
1580                                    void *buf,
1581                                    sint64_t len)
1582 {
1583     textfile_stream *ms = (textfile_stream *) key;
1584     char tags[32];
1585
1586
1587
1588     memcpy(tags,tag,taglen<31 ? taglen : 31);
1589     tags[taglen<32? taglen : 31 ]=0;
1590
1591     /*    if (taglen>100000 || len>100000) { 
1592       ERROR("Too big\n");
1593       return -1;
1594     }
1595     */
1596
1597     ms->stype=STREAM_FILE;
1598
1599     if (put_string_n(ms,tag,taglen)) { 
1600       ERROR("Cannot write tag %s in textfile\n",tags);
1601       ms->stype=STREAM_TEXTFILE;
1602       return -1;
1603     }
1604
1605     if (put_string(ms,"=")) { 
1606       ERROR("Cannot write = in textfile for tag %s\n",tags);
1607       ms->stype=STREAM_TEXTFILE;
1608       return -1;
1609     }
1610
1611     if (put_data_as_hexbytes(ms,buf,len)) { 
1612       ERROR("Cannot write data in hex format in textfile for tag %s\n",tags);
1613       ms->stype=STREAM_TEXTFILE;
1614       return -1;
1615     }
1616
1617     if (put_string(ms,"\n")) { 
1618       ERROR("Cannot write trailing lf in textfile for tag %s\n",tags);
1619       ms->stype=STREAM_TEXTFILE;
1620       return -1;
1621     }
1622
1623     ms->stype=STREAM_TEXTFILE;
1624
1625     return len;
1626 }
1627
1628
1629
1630 /***************************************************************************************************
1631   User implementation   ("user:")
1632 *************************************************************************************************/
1633
1634
1635 // List of all user keyed stream connections for the guest
1636 struct user_keyed_streams {
1637     spinlock_t lock;
1638     struct list_head streams;
1639 };
1640
1641
1642 // A single keyed stream connection to user space
1643 struct user_keyed_stream {
1644     int stype;
1645     v3_keyed_stream_open_t otype;
1646
1647     char *url;
1648     spinlock_t lock;
1649     int waiting;
1650
1651     wait_queue_head_t user_wait_queue;
1652     wait_queue_head_t host_wait_queue;
1653
1654     struct palacios_user_keyed_stream_op *op;
1655
1656     struct list_head node;
1657 };
1658
1659
1660 //
1661 // List of all of the user streams
1662 //
1663 static struct user_keyed_streams *user_streams;
1664
1665
1666
1667 static int resize_op(struct palacios_user_keyed_stream_op **op, uint64_t buf_len)
1668 {
1669     struct palacios_user_keyed_stream_op *old = *op;
1670     struct palacios_user_keyed_stream_op *new;
1671     
1672     if (!old) {
1673         new = palacios_alloc(sizeof(struct palacios_user_keyed_stream_op)+buf_len);
1674         if (!new) { 
1675             return -1;
1676         } else {
1677             new->len=sizeof(struct palacios_user_keyed_stream_op)+buf_len;
1678             new->buf_len=buf_len;
1679             *op=new;
1680             return 0;
1681         }
1682     } else {
1683         if ((old->len-sizeof(struct palacios_user_keyed_stream_op)) >= buf_len) { 
1684             old->buf_len=buf_len;
1685             return 0;
1686         } else {
1687             palacios_free(old);
1688             *op = 0 ;
1689             return resize_op(op,buf_len);
1690         }
1691     }
1692 }
1693
1694 //
1695 // The assumption is that we enter this with the stream locked
1696 // and we will return with it locked;  additionally, the op structure
1697 // will be overwritten with the response
1698 // 
1699 static int do_request_to_response(struct user_keyed_stream *s, unsigned long *flags)
1700 {
1701
1702     if (s->waiting) {
1703         ERROR("user keyed stream request attempted while one is already in progress on %s\n",s->url);
1704         return -1;
1705     }
1706
1707     // we are now waiting for a response
1708     s->waiting = 1;
1709
1710     // release the stream
1711     palacios_spinlock_unlock_irqrestore(&(s->lock), *flags);
1712
1713     // wake up anyone waiting on it
1714     wake_up_interruptible(&(s->user_wait_queue));
1715
1716     // wait for someone to give us a response
1717     while (wait_event_interruptible(s->host_wait_queue, (s->waiting == 0)) != 0) {}
1718
1719     // reacquire the lock for our called
1720     palacios_spinlock_lock_irqsave(&(s->lock), *flags);
1721
1722     return 0;
1723 }
1724
1725 //
1726 // The assumption is that we enter this with the stream locked
1727 // and we will return with it UNlocked
1728 // 
1729 static int do_response_to_request(struct user_keyed_stream *s, unsigned long *flags)
1730 {
1731
1732     if (!(s->waiting)) {
1733         ERROR("user keyed stream response while no request is in progress on %s\n",s->url);
1734         return -1;
1735     }
1736
1737     // we are now waiting for a request
1738     s->waiting = 0;
1739
1740     // release the stream
1741     palacios_spinlock_unlock_irqrestore(&(s->lock), *flags);
1742
1743     // wake up anyone waiting on it
1744     wake_up_interruptible(&(s->host_wait_queue));
1745     
1746     return 0;
1747 }
1748
1749
1750
1751 static unsigned int keyed_stream_poll_user(struct file *filp, poll_table *wait)
1752 {
1753     struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
1754     unsigned long flags;
1755     
1756     if (!s) {
1757         return POLLERR;
1758     }
1759     
1760     palacios_spinlock_lock_irqsave(&(s->lock), flags);
1761
1762     poll_wait(filp, &(s->user_wait_queue), wait);
1763
1764     if (s->waiting) {
1765         palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1766         return POLLIN | POLLRDNORM;
1767     }
1768     
1769     palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1770
1771     return 0;
1772 }
1773
1774 static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsigned long arg)
1775 {
1776     void __user *argp = (void __user *)arg;
1777     unsigned long flags;
1778     uint64_t size;
1779     
1780     struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
1781     
1782     switch (ioctl) {
1783
1784         case V3_KSTREAM_REQUEST_SIZE_IOCTL:
1785             
1786             // inform request size
1787             
1788             palacios_spinlock_lock_irqsave(&(s->lock), flags);
1789             
1790             if (!(s->waiting)) {
1791                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1792                 return 0;
1793             }
1794
1795             size =  sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
1796             
1797             if (copy_to_user((void * __user) argp, &size, sizeof(uint64_t))) {
1798                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1799                 ERROR("palacios user key size request failed to copy data\n");
1800                 return -EFAULT;
1801             }
1802             
1803             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1804             
1805             return 1;
1806             
1807             break;
1808
1809         case V3_KSTREAM_REQUEST_PULL_IOCTL: 
1810                 
1811             // pull the request
1812             
1813             palacios_spinlock_lock_irqsave(&(s->lock), flags);
1814
1815             if (!(s->waiting)) {
1816                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1817                 ERROR("palacios user key pull request when not waiting\n");
1818                 return 0;
1819             }
1820
1821             size =  sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
1822
1823
1824             if (copy_to_user((void __user *) argp, s->op, size)) {
1825                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1826                 ERROR("palacios user key pull request failed to copy data\n");
1827                 return -EFAULT;
1828             }
1829
1830             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1831             
1832             return 1;
1833             
1834          
1835             break;
1836
1837     case V3_KSTREAM_RESPONSE_PUSH_IOCTL:
1838
1839         // push the response
1840
1841         palacios_spinlock_lock_irqsave(&(s->lock), flags);
1842
1843         if (!(s->waiting)) {
1844             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1845             ERROR("palacios user key push response when not waiting\n");
1846             return 0;
1847         }
1848         
1849         if (copy_from_user(&size, (void __user *) argp, sizeof(uint64_t))) {
1850             ERROR("palacios user key push response failed to copy size\n");
1851             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1852             return -EFAULT;
1853         }
1854
1855         if (resize_op(&(s->op),size-sizeof(struct palacios_user_keyed_stream_op))) {
1856             ERROR("unable to resize op in user key push response\n");
1857             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1858             return -EFAULT;
1859         }
1860
1861         if (copy_from_user(s->op, (void __user *) argp, size)) {
1862             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1863             return -EFAULT;
1864         }
1865
1866         do_response_to_request(s,&flags);
1867         // this will have unlocked s for us
1868
1869         return 1;
1870
1871         break;
1872         
1873     default:
1874         ERROR("unknown ioctl in user keyed stream\n");
1875
1876         return -EFAULT;
1877
1878         break;
1879         
1880     }
1881 }
1882
1883
1884 static int keyed_stream_release_user(struct inode *inode, struct file *filp)
1885 {
1886     struct user_keyed_stream *s = filp->private_data;
1887     unsigned long f1,f2;
1888
1889     palacios_spinlock_lock_irqsave(&(user_streams->lock),f1);
1890     palacios_spinlock_lock_irqsave(&(s->lock), f2);
1891
1892     list_del(&(s->node));
1893
1894     palacios_spinlock_unlock_irqrestore(&(s->lock), f2);
1895     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), f1);
1896     
1897     palacios_free(s->url);
1898     palacios_free(s);
1899
1900     return 0;
1901 }
1902
1903 static struct file_operations user_keyed_stream_fops = {
1904     .poll = keyed_stream_poll_user,
1905     .compat_ioctl = keyed_stream_ioctl_user,
1906     .unlocked_ioctl = keyed_stream_ioctl_user,
1907     .release = keyed_stream_release_user,
1908 };
1909
1910
1911 /*
1912   user_keyed_streams are allocated on user connect, and deallocated on user release
1913   
1914   palacios-side opens and closes only manipulate the open type
1915 */
1916
1917 int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned long arg, void *priv_data)
1918 {
1919     int fd;
1920     unsigned long flags;
1921     char *url;
1922     uint64_t len;
1923     struct user_keyed_stream *s;
1924     
1925     if (!user_streams) { 
1926         ERROR("no user space keyed streams!\n");
1927         return -1;
1928     }
1929
1930     // get the url
1931     if (copy_from_user(&len,(void __user *)arg,sizeof(len))) { 
1932         ERROR("cannot copy url len from user\n");
1933         return -1;
1934     }
1935
1936     url = palacios_alloc(len);
1937     
1938     if (!url) { 
1939         ERROR("cannot allocate url for user keyed stream\n");
1940         return -1;
1941     }
1942
1943     if (copy_from_user(url,((void __user *)arg)+sizeof(len),len)) {
1944         ERROR("cannot copy url from user\n");
1945         return -1;
1946     }
1947     url[len-1]=0;
1948         
1949     
1950     // Check for duplicate handler
1951     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
1952     list_for_each_entry(s, &(user_streams->streams), node) {
1953         if (!strncasecmp(url, s->url, len)) {
1954             ERROR("user keyed stream connection with url \"%s\" already exists\n", url);
1955             palacios_free(url);
1956             return -1;
1957         }
1958     }
1959     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
1960     
1961     // Create connection
1962     s = palacios_alloc(sizeof(struct user_keyed_stream));
1963     
1964     if (!s) {
1965         ERROR("cannot allocate new user keyed stream for %s\n",url);
1966         palacios_free(url);
1967         return -1;
1968     }
1969     
1970     
1971     // Get file descriptor
1972     fd = anon_inode_getfd("v3-kstream", &user_keyed_stream_fops, s, 0);
1973
1974     if (fd < 0) {
1975         ERROR("cannot allocate file descriptor for new user keyed stream for %s\n",url);
1976         palacios_free(s);
1977         palacios_free(url);
1978         return -1;
1979     }
1980     
1981     memset(s, 0, sizeof(struct user_keyed_stream));
1982     
1983     s->stype=STREAM_USER;
1984     s->url=url;
1985     
1986     init_waitqueue_head(&(s->user_wait_queue));
1987     init_waitqueue_head(&(s->host_wait_queue));
1988     
1989     // Insert connection into list
1990     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
1991     list_add(&(s->node), &(user_streams->streams));
1992     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
1993     
1994     return fd;
1995 }
1996     
1997 static struct user_keyed_stream *keyed_stream_user_find(char *url)
1998 {
1999     unsigned long flags;
2000     struct user_keyed_stream *s;
2001     
2002     if (!user_streams) { 
2003         ERROR("no user space keyed streams available\n");
2004         return NULL;
2005     }
2006     
2007     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
2008     list_for_each_entry(s, &(user_streams->streams), node) {
2009         if (!strcasecmp(url, s->url)) {
2010             palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2011             return s;
2012         }
2013     }
2014     
2015     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2016     
2017     return NULL;
2018 }
2019     
2020     
2021 static v3_keyed_stream_t open_stream_user(char *url, v3_keyed_stream_open_t ot)
2022 {
2023     unsigned long flags;
2024     struct user_keyed_stream *s;
2025     
2026     s = keyed_stream_user_find(url);
2027     
2028     if (!s) {
2029         ERROR("cannot open user stream %s as it does not exist yet\n",url);
2030         return NULL;
2031     }
2032
2033     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2034
2035     if (s->waiting) {
2036         palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
2037         ERROR("cannot open user stream %s as it is already in waiting state\n",url);
2038         return NULL;
2039     }
2040     
2041     s->otype = ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
2042     
2043     palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
2044     
2045     return s;
2046     
2047 }
2048     
2049 // close stream does not do anything.  Creation of the stream and its cleanup
2050 // are driven by the user side, not the palacios side
2051 // might eventually want to reference count this, though
2052 static void close_stream_user(v3_keyed_stream_t stream)
2053 {
2054     return;
2055 }
2056
2057 static void preallocate_hint_key_user(v3_keyed_stream_t stream,
2058                                       char *key,
2059                                       uint64_t size)
2060 {
2061     return;
2062 }
2063
2064
2065
2066
2067 static v3_keyed_stream_key_t open_key_user(v3_keyed_stream_t stream, char *key)
2068 {
2069     unsigned long flags;
2070     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2071     uint64_t   len = strlen(key)+1;
2072     void *user_key;
2073
2074     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2075
2076
2077     if (resize_op(&(s->op),len)) {
2078         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2079         ERROR("cannot resize op in opening key %s on user keyed stream %s\n",key,s->url);
2080         return NULL;
2081     }
2082
2083     s->op->type = PALACIOS_KSTREAM_OPEN_KEY;
2084     s->op->buf_len = len;
2085     strncpy(s->op->buf,key,len);
2086
2087     // enter with it locked
2088     if (do_request_to_response(s,&flags)) { 
2089         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2090         ERROR("request/response handling failed\n");
2091         return NULL;
2092     }
2093     // return with it locked
2094
2095     user_key=s->op->user_key;
2096
2097     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2098
2099     return user_key;
2100 }
2101
2102 static void close_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key)
2103 {
2104     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2105     uint64_t   len = 0;
2106     unsigned long flags;
2107     
2108     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2109
2110     if (resize_op(&(s->op),len)) {
2111         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2112         ERROR("cannot resize op in closing key 0x%p on user keyed stream %s\n",key,s->url);
2113         return;
2114     }
2115
2116     s->op->type = PALACIOS_KSTREAM_CLOSE_KEY;
2117     s->op->buf_len = len;
2118     s->op->user_key = key;
2119
2120     // enter with it locked
2121     if (do_request_to_response(s,&flags)) { 
2122         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2123         ERROR("request/response handling failed\n");
2124         return;
2125     }
2126     // return with it locked
2127
2128     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2129
2130     return;
2131 }
2132
2133
2134
2135 static sint64_t read_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2136                               void *tag,
2137                               sint64_t taglen,
2138                               void *buf, sint64_t rlen)
2139 {
2140
2141     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2142     uint64_t   len = taglen ;
2143     sint64_t   xfer;
2144     unsigned long flags;
2145
2146     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2147
2148     if (s->otype != V3_KS_RD_ONLY) { 
2149         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2150         ERROR("attempt to read key from stream that is not in read state on %s\n",s->url);
2151     }   
2152
2153
2154     if (resize_op(&(s->op),len)) {
2155         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2156         ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
2157         return -1;
2158     }
2159
2160     s->op->type = PALACIOS_KSTREAM_READ_KEY;
2161     s->op->buf_len = len ;
2162     s->op->xfer = rlen;
2163     s->op->user_key = key;
2164     s->op->data_off = taglen;
2165
2166     memcpy(s->op->buf,tag,taglen);
2167
2168     // enter with it locked
2169     if (do_request_to_response(s,&flags)) { 
2170         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2171         ERROR("request/response handling failed\n");
2172         return -1;
2173     }
2174     // return with it locked
2175
2176
2177     if (s->op->xfer>0) { 
2178         // data_off must be zero
2179         memcpy(buf,s->op->buf,s->op->xfer);
2180     }
2181
2182     xfer=s->op->xfer;
2183
2184     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2185
2186     return xfer;
2187 }
2188
2189
2190 static sint64_t write_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2191                                void *tag,
2192                                sint64_t taglen,
2193                                void *buf, sint64_t wlen)
2194 {
2195
2196     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2197     uint64_t   len = taglen + wlen ;
2198     sint64_t   xfer;
2199     unsigned long flags;
2200
2201
2202     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2203
2204     if (s->otype != V3_KS_WR_ONLY) { 
2205         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2206         ERROR("attempt to write key on stream that is not in write state on %s\n",s->url);
2207     }   
2208
2209     if (resize_op(&(s->op),len)) {
2210         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2211         ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
2212         return -1;
2213     }
2214
2215     s->op->type = PALACIOS_KSTREAM_WRITE_KEY;
2216     s->op->buf_len = len;
2217     s->op->xfer = wlen;
2218     s->op->user_key = key;
2219     s->op->data_off = taglen;
2220
2221     memcpy(s->op->buf,tag,taglen);
2222     memcpy(s->op->buf+taglen,buf,wlen);
2223
2224     // enter with it locked
2225     if (do_request_to_response(s,&flags)) { 
2226         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2227         ERROR("request/response handling failed\n");
2228         return -1;
2229     }
2230     // return with it locked
2231
2232     // no data comes back, xfer should be size of data write (not tag)
2233
2234     xfer=s->op->xfer;
2235
2236     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2237
2238     return xfer;
2239 }
2240
2241
2242
2243 /****************************************************************************************
2244  *    Network-based implementation  ("net:")
2245  *****************************************************************************************/
2246
2247
2248 #define NET_MAX_KEY_LEN 128
2249
2250 struct net_keyed_stream {
2251     int stype;
2252     int ot;
2253     struct net_stream * ns;
2254 };
2255
2256 struct net_stream {
2257     int stype;
2258     struct socket *sock;
2259 };
2260
2261
2262 //ignore the arguments given here currently
2263 static struct net_stream * create_net_stream(void) 
2264 {
2265     struct net_stream * ns = NULL;
2266
2267     ns = palacios_alloc(sizeof(struct net_stream));
2268     
2269     if (!ns) { 
2270         ERROR("Cannot allocate a net_stream\n");
2271         return 0;
2272     }
2273
2274     memset(ns, 0, sizeof(struct net_stream));
2275
2276     ns->stype = STREAM_NETWORK;
2277
2278     return ns;
2279 }
2280
2281 static void close_socket(v3_keyed_stream_t stream)
2282 {
2283     struct net_keyed_stream *nks = (struct net_keyed_stream *) stream;
2284
2285     if (nks) { 
2286         struct net_stream *ns = nks->ns;
2287
2288         if (ns) {
2289             ns->sock->ops->release(ns->sock);
2290             palacios_free(ns);
2291             ERROR("Close Socket\n");
2292         }
2293         
2294         palacios_free(ns);
2295     }
2296 }
2297
2298
2299 static void close_stream_net(v3_keyed_stream_t stream)
2300 {
2301         close_socket(stream);
2302 }
2303
2304 static int connect_to_ip(struct net_stream *ns, int hostip, int port)
2305 {
2306     struct sockaddr_in client;
2307
2308     if (ns == NULL) {
2309         return -1;
2310     }
2311
2312     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { 
2313         ERROR("Cannot create accept socket\n");
2314         return -1;
2315     }
2316         
2317
2318     client.sin_family = AF_INET;
2319     client.sin_port = htons(port);
2320     client.sin_addr.s_addr = hostip;//in_aton(hostip);
2321
2322     return ns->sock->ops->connect(ns->sock, (struct sockaddr *)&client, sizeof(client), 0);
2323 }
2324
2325 static int send_msg(struct net_stream *ns,  char * buf, int len)
2326 {
2327     int left=len;
2328
2329     if (!ns) { 
2330         ERROR("Send message on null net_stream\n");
2331         return -1;
2332     }
2333
2334     if (!(ns->sock)) { 
2335         ERROR("Send message on net_stream without socket\n");
2336         return -1;
2337     }
2338
2339     while (left>0) {
2340
2341         struct msghdr msg;
2342         mm_segment_t oldfs;
2343         struct iovec iov;
2344         int err = 0;
2345         
2346
2347         msg.msg_flags = MSG_NOSIGNAL;//MSG_DONTWAIT;
2348         msg.msg_name = 0;
2349         msg.msg_namelen = 0;
2350         msg.msg_control = NULL;
2351         msg.msg_controllen = 0;
2352         msg.msg_iov = &iov;
2353         msg.msg_iovlen = 1;
2354
2355         iov.iov_base = (char *)&(buf[len-left]);
2356         iov.iov_len = (size_t)left;
2357
2358         oldfs = get_fs();
2359         set_fs(KERNEL_DS);
2360
2361         err = sock_sendmsg(ns->sock, &msg, (size_t)left);
2362
2363         set_fs(oldfs);
2364         
2365         if (err<0) {
2366             ERROR("Send msg error %d\n",err);
2367             return err;
2368         } else {
2369             left-=len;
2370         }
2371     }
2372
2373     return len;
2374 }
2375
2376
2377
2378 static int recv_msg(struct net_stream *ns, char * buf, int len)
2379 {
2380
2381     int left=len;
2382
2383     if (!ns) { 
2384         ERROR("Receive message on null net_stream\n");
2385         return -1;
2386     }
2387
2388     if (!(ns->sock)) { 
2389         ERROR("Receive  message on net_stream without socket\n");
2390         return -1;
2391     }
2392     
2393     
2394     while (left>0) {
2395         
2396         struct msghdr msg;
2397         mm_segment_t oldfs;
2398         struct iovec iov;
2399         int err;
2400         
2401         msg.msg_flags = 0;
2402         msg.msg_name = 0;
2403         msg.msg_namelen = 0;
2404         msg.msg_control = NULL;
2405         msg.msg_controllen = 0;
2406         msg.msg_iov = &iov;
2407         msg.msg_iovlen = 1;
2408         
2409         iov.iov_base = (void *)&(buf[len-left]);
2410         iov.iov_len = (size_t)left;
2411         
2412         oldfs = get_fs();
2413         set_fs(KERNEL_DS);
2414         
2415         err = sock_recvmsg(ns->sock, &msg, (size_t)left, 0);
2416         
2417         set_fs(oldfs);
2418         
2419         if (err<0) { 
2420             return err;
2421         } else {
2422             left -= err;
2423         }
2424     }
2425     return len;
2426 }
2427
2428 static struct net_stream * accept_once(struct net_stream * ns, const int port)
2429 {
2430     struct socket *accept_sock;
2431     struct sockaddr_in addr;
2432     int err;
2433     
2434     if (!ns) { 
2435         ERROR("Accept called on null net_stream\n");
2436         return 0;
2437     }
2438     
2439     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&accept_sock)<0) { 
2440         ERROR("Cannot create accept socket\n"); 
2441         return NULL;
2442     }
2443     
2444
2445     addr.sin_family = AF_INET;
2446     addr.sin_port = htons(port);
2447     addr.sin_addr.s_addr = INADDR_ANY;
2448     
2449     err = accept_sock->ops->bind(accept_sock, (struct sockaddr *)&addr, sizeof(addr));
2450     
2451     if (err<0) {
2452         ERROR("Bind err: %d\n",err);
2453         return NULL;
2454     }
2455
2456     err = accept_sock->ops->listen(accept_sock,2);
2457     
2458     if (err<0) {
2459         ERROR("Listen err: %d\n",err);
2460         return NULL;
2461     }
2462     
2463     // Init the socket in the network strream
2464
2465     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { 
2466         ERROR("Cannot create socket\n");
2467         return NULL;
2468     }
2469     
2470     
2471     // Do the actual accept 
2472
2473     if (accept_sock->ops->accept(accept_sock,ns->sock,0)<0) {
2474         ERROR("accept failed");
2475         return NULL;
2476     }
2477     
2478     // close the accept socket
2479     accept_sock->ops->release(accept_sock);
2480     palacios_free(accept_sock);
2481
2482     return ns;
2483 }
2484
2485
2486 static struct v3_keyed_stream_t * open_stream_net(char * url,v3_keyed_stream_open_t ot)
2487 {
2488     struct net_keyed_stream * nks;
2489     int url_len;
2490     int i;
2491     int delimit[3];
2492     int k;
2493     char mode;
2494     int ip_len;
2495     int port_len;
2496
2497     nks = palacios_alloc(sizeof(struct net_keyed_stream)); 
2498
2499     if (!nks) { 
2500         ERROR("Could not allocate space in open_stream_net\n");
2501         return 0;
2502     }
2503     
2504     nks->ot = ot == V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
2505
2506     nks->stype = STREAM_NETWORK; 
2507
2508     nks->ns = create_net_stream();
2509     
2510     if (!(nks->ns)) { 
2511         ERROR("Could not create network stream\n");
2512         palacios_free(nks);
2513         return 0;
2514     }
2515
2516     url_len=strlen(url);
2517     k=0;
2518
2519
2520     for(i = 0; i < url_len;i++){
2521         if(url[i] == ':'){
2522             delimit[k] = i;
2523             k++;        
2524         }
2525     }
2526
2527     mode = url[delimit[0] + 1];
2528     ip_len = delimit[2] - delimit[1];
2529     port_len = url_len - delimit[2];
2530
2531
2532     {
2533         char ip[ip_len];
2534         char port[port_len];
2535         int host_ip;
2536         int host_port;
2537
2538
2539         strncpy(ip,url + delimit[1]+1,ip_len-1);
2540         ip[ip_len-1]='\0';
2541         
2542         host_ip = in_aton(ip);
2543         
2544         strncpy(port,url+ delimit[2]+1,port_len-1);
2545         port[port_len-1]='\0';
2546         
2547         host_port = simple_strtol(port,NULL,10);
2548
2549         INFO("ip is %s\n",ip); 
2550         INFO("host_ip is %x\n", host_ip);
2551         INFO("port is %s (%d)\n",port,host_port);
2552         
2553         if (mode == 'a'){
2554             // accept a request
2555             INFO("Accepting Connection on INADDR_ANY port:%d\n",host_port);
2556             nks->ns = accept_once(nks->ns, host_port);
2557         } else if (mode == 'c'){
2558             // call connect to ip
2559             INFO("Connecting to %s:%d\n",ip,host_port);
2560             connect_to_ip(nks->ns,host_ip, host_port);
2561         } else {
2562             ERROR("Mode not recognized\n");
2563             palacios_free(nks);
2564             return NULL;
2565         }
2566         
2567         return (v3_keyed_stream_t)nks;
2568     }
2569 }
2570
2571 static void preallocate_hint_key_net(v3_keyed_stream_t stream, char *key,uint64_t size)
2572 {
2573     //do nothing
2574 }
2575
2576 static v3_keyed_stream_key_t open_key_net(v3_keyed_stream_t stream,char *key)
2577 {
2578    struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2579
2580    // reciever of the key name 
2581    if (nks->ot==V3_KS_WR_ONLY)
2582    {
2583        unsigned short keylen = strlen(key);
2584
2585        if (keylen>NET_MAX_KEY_LEN || keylen>=32768) { 
2586            ERROR("Key is too long\n");
2587            return NULL;
2588        }
2589
2590        {
2591            // on-stack allocation here demands that we
2592            // keep key length low...
2593            char msg[keylen+3];
2594            int next = 0;
2595            
2596            // Opening a key for writing sends a notice of the 
2597            // key length and the key name on the channel
2598            
2599            msg[next++]=keylen & 0xFF;
2600            msg[next]=(keylen>>8) & 0xFF;
2601            // Set flag bit
2602            msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will flip leading bit to 1
2603            
2604            strncpy(msg+2,key,keylen);  // will also copy trailing zero
2605            
2606            if (send_msg(nks->ns,msg,keylen+2) != keylen+2) { 
2607                ERROR("Unable to open key for writing on net_stream (send key len+name)\n");
2608                return NULL;
2609            }
2610        }
2611    }
2612
2613    if (nks->ot==V3_KS_RD_ONLY)   {
2614        char msg_info[2];
2615        int next = 0;
2616        int keylen = 0;
2617
2618        if (recv_msg(nks->ns,msg_info,2) != 2) { 
2619            ERROR("Unable to open key for reading on net_stream (recv key len)\n");
2620            return NULL;
2621        }
2622
2623        next = 0;
2624        keylen = 0;
2625
2626        keylen |= msg_info[next++];
2627
2628        if ((msg_info[next] & 0x80) != 0x80)  {
2629            ERROR("Flag bit not set on receive of key length\n");
2630            return NULL;
2631        } else {
2632            msg_info[next] &= 0x7F; // flip the msb back to zero (clear flag)
2633        }
2634        
2635        keylen |= msg_info[next]<<8;
2636
2637        if (keylen > NET_MAX_KEY_LEN) { 
2638            ERROR("Received key length is too big\n");
2639            return NULL;
2640        }
2641        
2642        {
2643            
2644            char msg[keylen+1];
2645            
2646            if (recv_msg(nks->ns,msg,keylen) != keylen) { 
2647                ERROR("Unable to receive key\n");
2648                return NULL;
2649            }
2650            msg[keylen]=0;
2651            
2652            if (strncmp(key,msg,keylen)!=0) {
2653                ERROR("Key mismatch in open_key_net - expect %s but got %s\n",key,msg);
2654                return NULL;
2655            }
2656        }
2657    }
2658    
2659    return (v3_keyed_stream_key_t)key;
2660 }
2661
2662 static void close_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t input_key)
2663 {
2664     char * key = (char*)input_key;
2665     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2666
2667     
2668     if (nks->ot==V3_KS_WR_ONLY) {
2669         unsigned short keylen = strlen(key);
2670
2671         if (keylen > NET_MAX_KEY_LEN || keylen>=32768) {
2672             ERROR("Key length too long in close_key_net\n");
2673             return;
2674         }
2675
2676         {
2677             char msg[keylen+3];
2678             int next = 0;
2679             
2680             msg[next++]=keylen & 0xFF;
2681             msg[next]=(keylen>>8) & 0xFF;
2682             // flag
2683             msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will filp leading bit to 1
2684             strncpy(msg+2,key,keylen); // will copy the zero
2685             msg[keylen+2]=0;
2686             if (send_msg(nks->ns,msg,keylen+2)!=keylen+2) { 
2687                 ERROR("Cannot send key on close_key_net\n");
2688                 return;
2689             }
2690         }
2691     }
2692     
2693     if (nks->ot==V3_KS_RD_ONLY)   {
2694         char msg_info[2];
2695         int next;
2696         int keylen;
2697         
2698         if (recv_msg(nks->ns,msg_info,2) != 2) { 
2699             ERROR("Cannot recv key length on close_key_net\n");
2700             return;
2701         }
2702         
2703         next = 0;
2704         keylen = 0;
2705         
2706         keylen |= msg_info[next++];
2707         
2708         if ((msg_info[next] & 0x80) != 0x80) {
2709             ERROR("Missing flag in close_key_net receive\n");
2710             return;
2711         } 
2712         
2713         msg_info[next] &= 0x7F; // flip the msb back to zero
2714         
2715         keylen |= msg_info[next]<<8;
2716         
2717         {
2718             char msg[keylen+1];
2719             
2720             if (recv_msg(nks->ns,msg,keylen)!=keylen) { 
2721                 ERROR("Did not receive all of key in close_key_net receive\n");
2722                 return;
2723             }
2724             
2725             msg[keylen]=0;
2726             
2727             if (strncmp(key,msg,keylen)!=0)  {
2728                 ERROR("Key mismatch in close_key_net - expect %s but got %s\n",key,msg);
2729                 return;
2730             }
2731         }
2732     }
2733 }
2734
2735 static sint64_t write_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key, 
2736                               void *tag,
2737                               sint64_t taglen,
2738                               void *buf, sint64_t len) 
2739 {
2740     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2741
2742     if (!buf) { 
2743         ERROR("Buf is NULL in write_key_net\n");
2744         return -1;
2745     }
2746
2747     if (!tag) { 
2748         ERROR("Tag is NULL in write_key_net\n");
2749         return -1;
2750     }
2751
2752     if (len<0) {
2753         ERROR("len is negative in write_key_net\n");
2754         return -1;
2755     }
2756
2757     if (taglen<0) {
2758         ERROR("taglen is negative in write_key_net\n");
2759         return -1;
2760     }
2761     
2762     if (!key){
2763        ERROR("write_key: key is NULL in write_key_net\n");
2764        return -1;
2765     }
2766     
2767     
2768     if (!nks)  {
2769         ERROR("nks is NULL in write_key_net\n");
2770         return -1;
2771     }
2772     
2773     if (nks->ot==V3_KS_WR_ONLY) {
2774         if (send_msg(nks->ns,(char*)(&BOUNDARY_TAG),sizeof(BOUNDARY_TAG))!=sizeof(BOUNDARY_TAG)) { 
2775             ERROR("Could not send boundary tag in write_key_net\n");
2776             return -1;
2777         } 
2778         if (send_msg(nks->ns,(char*)(&taglen),sizeof(taglen))!=sizeof(taglen)) { 
2779             ERROR("Could not send tag length in write_key_net\n");
2780             return -1;
2781         } 
2782         if (send_msg(nks->ns,tag,taglen)!=len) { 
2783             ERROR("Could not send tag in write_key_net\n");
2784             return -1;
2785         }
2786         if (send_msg(nks->ns,(char*)(&len),sizeof(len))!=sizeof(len)) { 
2787             ERROR("Could not send data length in write_key_net\n");
2788             return -1;
2789         } 
2790         if (send_msg(nks->ns,buf,len)!=len) { 
2791             ERROR("Could not send data in write_key_net\n");
2792             return -1;
2793         }
2794     }  else {
2795         ERROR("Permission not correct in write_key_net\n");
2796         return -1;
2797     }
2798     
2799     return len;
2800 }
2801
2802
2803 static sint64_t read_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2804                              void *tag,
2805                              sint64_t taglen,
2806                              void *buf, sint64_t len)
2807 {
2808     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2809     void *temptag;
2810
2811     if (!buf) {
2812         ERROR("Buf is NULL in read_key_net\n");
2813         return -1;
2814     }
2815
2816     if (!tag) {
2817         ERROR("Tag is NULL in read_key_net\n");
2818         return -1;
2819     }
2820     
2821     if(len<0) {
2822         ERROR("len is negative in read_key_net\n");
2823         return -1;
2824     }
2825
2826     if(taglen<0) {
2827         ERROR("taglen is negative in read_key_net\n");
2828         return -1;
2829     }
2830     
2831     if (!key) {
2832         ERROR("read_key: key is NULL in read_key_net\n");
2833         return -1;
2834     }
2835
2836
2837     if (nks->ot==V3_KS_RD_ONLY) {
2838         
2839         sint64_t slen;
2840         uint32_t tempbt;
2841         
2842         if (recv_msg(nks->ns,(char*)(&tempbt),sizeof(tempbt))!=sizeof(tempbt)) { 
2843             ERROR("Cannot receive boundary tag in read_key_net\n");
2844             return -1;
2845         }
2846
2847         if (tempbt!=BOUNDARY_TAG) { 
2848           ERROR("Invalid boundary tag (received 0x%x\n",tempbt);
2849           return -1;
2850         }
2851            
2852         temptag=palacios_alloc(taglen);
2853         if (!temptag) {
2854           ERROR("failed to allocate temptag\n");
2855           return -1;
2856         }
2857
2858         if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { 
2859             ERROR("Cannot receive tag len in read_key_net\n");
2860             palacios_free(temptag);
2861             return -1;
2862         }
2863
2864         if (slen!=taglen) {
2865             ERROR("Tag len expected does not matched tag len decoded in read_key_net\n");
2866             palacios_free(temptag);
2867             return -1;
2868         }
2869
2870         if (recv_msg(nks->ns,temptag,taglen)!=taglen) { 
2871             ERROR("Cannot recieve tag in read_key_net\n");
2872             palacios_free(temptag);
2873             return -1;
2874         }
2875
2876         if (memcmp(temptag,tag,taglen)) { 
2877           ERROR("Tag mismatch\n");
2878           palacios_free(temptag);
2879           return -1;
2880         }
2881
2882         if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { 
2883             ERROR("Cannot receive data len in read_key_net\n");
2884             palacios_free(temptag);
2885             return -1;
2886         }
2887
2888         if (slen!=len) {
2889             ERROR("Data len expected does not matched data len decoded in read_key_net\n");
2890             palacios_free(temptag);
2891             return -1;
2892         }
2893
2894         if (recv_msg(nks->ns,buf,len)!=len) { 
2895             ERROR("Cannot recieve data in read_key_net\n");
2896             palacios_free(temptag);
2897             return -1;
2898         }
2899
2900         palacios_free(temptag);
2901         
2902     } else {
2903         ERROR("Permissions incorrect for the stream in read_key_net\n");
2904         return -1;
2905     }
2906
2907     return len;
2908     
2909 }
2910
2911
2912 /***************************************************************************************************
2913   Generic interface
2914 *************************************************************************************************/
2915
2916 static v3_keyed_stream_t open_stream(char *url,
2917                                      v3_keyed_stream_open_t ot)
2918 {
2919     if (!strncasecmp(url,"mem:",4)) { 
2920         return open_stream_mem(url,ot);
2921     } else if (!strncasecmp(url,"file:",5)) { 
2922         return open_stream_file(url,ot);
2923     } else if (!strncasecmp(url,"user:",5)) { 
2924         return open_stream_user(url,ot);
2925     } else if (!strncasecmp(url,"net:",4)){
2926         return open_stream_net(url,ot);
2927     } else if (!strncasecmp(url,"textfile:",9)) { 
2928         return open_stream_textfile(url,ot);
2929     } else {
2930         ERROR("unsupported type in attempt to open keyed stream \"%s\"\n",url);
2931         return 0;
2932     }
2933 }
2934
2935 static void close_stream(v3_keyed_stream_t stream)
2936 {
2937     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2938     switch (gks->stype){ 
2939         case STREAM_MEM:
2940             return close_stream_mem(stream);
2941             break;
2942         case STREAM_FILE:
2943             return close_stream_file(stream);
2944             break;
2945         case STREAM_TEXTFILE:
2946             return close_stream_textfile(stream);
2947             break;
2948         case STREAM_USER:
2949             return close_stream_user(stream);
2950             break;
2951         case STREAM_NETWORK:
2952             return close_stream_net(stream);
2953             break;
2954         default:
2955             ERROR("unknown stream type %d in close\n",gks->stype);
2956             break;
2957     }
2958 }
2959
2960 static void preallocate_hint_key(v3_keyed_stream_t stream,
2961                                  char *key,
2962                                  uint64_t size)
2963 {
2964     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2965     switch (gks->stype){ 
2966         case STREAM_MEM:
2967             preallocate_hint_key_mem(stream,key,size);
2968             break;
2969         case STREAM_FILE:
2970             preallocate_hint_key_file(stream,key,size);
2971             break;
2972         case STREAM_TEXTFILE:
2973             preallocate_hint_key_textfile(stream,key,size);
2974             break;
2975         case STREAM_USER:
2976             return preallocate_hint_key_user(stream,key,size);
2977             break;
2978         case STREAM_NETWORK:
2979             return preallocate_hint_key_net(stream,key,size);
2980             break;
2981         default:
2982             ERROR("unknown stream type %d in preallocate_hint_key\n",gks->stype);
2983             break;
2984     }
2985     return;
2986 }
2987
2988
2989 static v3_keyed_stream_key_t open_key(v3_keyed_stream_t stream,
2990                                       char *key)
2991 {
2992     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2993     switch (gks->stype){ 
2994         case STREAM_MEM:
2995             return open_key_mem(stream,key);
2996             break;
2997         case STREAM_FILE:
2998             return open_key_file(stream,key);
2999             break;
3000         case STREAM_TEXTFILE:
3001             return open_key_textfile(stream,key);
3002             break;
3003         case STREAM_USER:
3004             return open_key_user(stream,key);
3005             break;
3006         case STREAM_NETWORK:
3007             return open_key_net(stream, key);
3008             break;
3009         default:
3010             ERROR("unknown stream type %d in open_key\n",gks->stype);
3011             break;
3012     }
3013     return 0;
3014 }
3015
3016
3017 static void close_key(v3_keyed_stream_t stream, 
3018                       v3_keyed_stream_key_t key)
3019 {
3020     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3021     switch (gks->stype){ 
3022         case STREAM_MEM:
3023             return close_key_mem(stream,key);
3024             break;
3025         case STREAM_FILE:
3026             return close_key_file(stream,key);
3027             break;
3028         case STREAM_TEXTFILE:
3029             return close_key_textfile(stream,key);
3030             break;
3031         case STREAM_USER:
3032             return close_key_user(stream,key);
3033             break;
3034          case STREAM_NETWORK:
3035             return close_key_net(stream, key);
3036             break;      
3037         default:
3038             ERROR("unknown stream type %d in close_key\n",gks->stype);
3039             break;
3040     }
3041     // nothing to do
3042     return;
3043 }
3044
3045 static sint64_t write_key(v3_keyed_stream_t stream, 
3046                           v3_keyed_stream_key_t key,
3047                           void *tag,
3048                           sint64_t taglen,
3049                           void *buf,
3050                           sint64_t len)
3051 {
3052     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3053     switch (gks->stype){ 
3054         case STREAM_MEM:
3055             return write_key_mem(stream,key,tag,taglen,buf,len);
3056             break;
3057         case STREAM_FILE:
3058             return write_key_file(stream,key,tag,taglen,buf,len);
3059             break;
3060         case STREAM_TEXTFILE:
3061             return write_key_textfile(stream,key,tag,taglen,buf,len);
3062             break;
3063         case STREAM_USER:
3064             return write_key_user(stream,key,tag,taglen,buf,len);
3065             break;
3066         case STREAM_NETWORK:
3067             return write_key_net(stream,key,tag,taglen,buf,len);
3068             break;
3069         default:
3070             ERROR("unknown stream type %d in write_key\n",gks->stype);
3071             return -1;
3072             break;
3073     }
3074     return -1;
3075 }
3076
3077
3078 static sint64_t read_key(v3_keyed_stream_t stream, 
3079                          v3_keyed_stream_key_t key,
3080                          void *tag,
3081                          sint64_t taglen,
3082                          void *buf,
3083                          sint64_t len)
3084 {
3085     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3086     switch (gks->stype){ 
3087         case STREAM_MEM:
3088           return read_key_mem(stream,key,tag,taglen,buf,len);
3089             break;
3090         case STREAM_FILE:
3091             return read_key_file(stream,key,tag,taglen,buf,len);
3092             break;
3093         case STREAM_TEXTFILE:
3094             return read_key_textfile(stream,key,tag,taglen,buf,len);
3095             break;
3096         case STREAM_USER:
3097             return read_key_user(stream,key,tag,taglen,buf,len);
3098             break;
3099         case STREAM_NETWORK:
3100             return read_key_net(stream,key,tag,taglen,buf,len);
3101             break;
3102         default:
3103             ERROR("unknown stream type %d in read_key\n",gks->stype);
3104             return -1;
3105             break;
3106     }
3107     return -1;
3108 }
3109
3110
3111
3112
3113 /***************************************************************************************************
3114   Hooks to palacios and inititialization
3115 *************************************************************************************************/
3116
3117     
3118 static struct v3_keyed_stream_hooks hooks = {
3119     .open = open_stream,
3120     .close = close_stream,
3121     .preallocate_hint_key = preallocate_hint_key,
3122     .open_key = open_key,
3123     .close_key = close_key,
3124     .read_key = read_key,
3125     .write_key = write_key
3126 };
3127
3128
3129 static int init_keyed_streams( void )
3130 {
3131     mem_streams = palacios_create_htable(DEF_NUM_STREAMS,hash_func,hash_comp);
3132
3133     if (!mem_streams) { 
3134         ERROR("failed to allocated stream pool for in-memory streams\n");
3135         return -1;
3136     }
3137
3138     user_streams = palacios_alloc(sizeof(struct user_keyed_streams));
3139
3140     if (!user_streams) { 
3141         ERROR("failed to allocated list for user streams\n");
3142         return -1;
3143     }
3144
3145     INIT_LIST_HEAD(&(user_streams->streams));
3146     
3147     palacios_spinlock_init(&(user_streams->lock));
3148
3149     V3_Init_Keyed_Streams(&hooks);
3150
3151     return 0;
3152
3153 }
3154
3155 static int deinit_keyed_streams( void )
3156 {
3157     palacios_free_htable(mem_streams,1,1);
3158
3159     palacios_spinlock_deinit(&(user_streams->lock));
3160
3161     palacios_free(user_streams);
3162
3163     WARNING("Deinit of Palacios Keyed Streams likely leaked memory\n");
3164
3165     return 0;
3166 }
3167
3168
3169 static int guest_init_keyed_streams(struct v3_guest * guest, void ** vm_data ) 
3170 {
3171     
3172     add_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT, keyed_stream_connect_user, 0);
3173     
3174     return 0;
3175 }
3176
3177
3178 static int guest_deinit_keyed_streams(struct v3_guest * guest, void * vm_data)
3179 {
3180     remove_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT);
3181
3182     return 0;
3183 }
3184
3185
3186
3187
3188 static struct linux_ext key_stream_ext = {
3189     .name = "KEYED_STREAM_INTERFACE",
3190     .init = init_keyed_streams,
3191     .deinit = deinit_keyed_streams,
3192     .guest_init = guest_init_keyed_streams,
3193     .guest_deinit = guest_deinit_keyed_streams, 
3194 };
3195
3196
3197 register_extension(&key_stream_ext);