Palacios Public Git Repository

To checkout Palacios execute

  git clone http://v3vee.org/palacios/palacios.web/palacios.git
This will give you the master branch. You probably want the devel branch or one of the release branches. To switch to the devel branch, simply execute
  cd palacios
  git checkout --track -b devel origin/devel
The other branches are similar.


Cleanup and sanity-checking of before/after null-check and copy+paste errors (Coverit...
[palacios.git] / linux_module / iface-keyed-stream.c
1 /*
2  * Palacios keyed stream interface
3  *
4  * Plus implementations for mem, file, and user space implementations
5  *
6  * (c) Peter Dinda, 2011 (interface, mem + file implementations + recooked user impl)
7  * (c) Clint Sbisa, 2011 (initial user space implementation on which this is based)
8  * (c) Diana Palsetia & Steve Rangel, 2012 (network based implementation)       
9  * (c) Peter Dinda, 2012 (updated interface, textfile)
10  */
11
12 #include <linux/fs.h>
13 #include <linux/file.h>
14 #include <linux/uaccess.h>
15 #include <linux/namei.h>
16 #include <linux/poll.h>
17 #include <linux/anon_inodes.h>
18
19 #include "palacios.h"
20 #include "util-hashtable.h"
21 #include "linux-exts.h"
22 #include "vm.h"
23
24 #define sint64_t int64_t
25 #include <interfaces/vmm_keyed_stream.h>
26
27 #include "iface-keyed-stream-user.h"
28
29 #include <interfaces/vmm_socket.h>
30
31 #include <linux/spinlock.h>
32 #include <asm/uaccess.h>
33 #include <linux/inet.h>
34 #include <linux/kthread.h>
35 #include <linux/netdevice.h>
36 #include <linux/ip.h>
37 #include <linux/in.h>
38 #include <linux/string.h>
39 #include <linux/preempt.h>
40 #include <linux/sched.h>
41 #include <linux/list.h>
42 #include <linux/syscalls.h>
43
44 #include <linux/module.h>
45 #include <linux/kernel.h>
46 #include <linux/socket.h>
47 #include <linux/net.h>
48 #include <linux/slab.h>
49
50
51 /*
52   This is an implementation of the Palacios keyed stream interface
53   that supports four flavors of streams:
54
55   "mem:"   Streams are stored in a hash table
56   The values for this hash table are hash tables associated with 
57   each stream.   A key maps to an expanding memory buffer.
58   Data is stored in a buffer like:
59
60   [boundarytag][taglen][tag][datalen][data]
61   [boundarytag][taglen][tag][datalen][data]
62   ...
63
64   "file:"  Streams are stored in files.  Each high-level
65   open corresponds to a directory, while a key corresponds to
66   a distinct file in that directory.   Data is stored in a file
67   like:
68
69   [boundarytag][taglen][tag][datalen][data]
70   [boundarytag][taglen][tag][datalen][data]
71   ...
72
73   "textfile:" Same as file, but data is stored in text format, like a
74   windows style .ini file.  A key maps to a file, and data is stored
75   in a file like:
76
77   [key]
78   tag=data_in_hex
79   tag=data_in_hex
80
81   This format makes it possible to concentenate the files to
82   produce a single "ini" file with "sections".
83
84
85   "net:"  Streams are carried over the network.  Each
86    high level open corresponds to a TCP connection, while
87    each key corresponds to a context on the stream.
88       "net:a:<ip>:<port>" => Bind to <ip>:<port> and accept a connection
89       "net:c:<ip>:<port>" => Connect to <ip>:<port>
90    "c" (client) 
91      open_stream: connect
92    "a" (server) 
93      open_stream: accept
94    "c" or "a":
95      open_key:  send [keylen-lastbyte-high-bit][key] (writer)
96            or   recv (same format as above)          (reader)
97      close_key: send [keylen-lastbyte-high-bit][key] (writer)
98            or   recv (same format as above)          (reader)
99      write_key: send [boundarytag][taglen][tag][datalen][data]
100      read_key:  recv (same format as above)
101      close_stream: close socket
102
103   "user:" Stream requests are bounced to user space to be 
104    handled there.  A rendezvous approach similar to the host 
105    device userland support is used
106
107    All keyed streams store the tags.
108    
109 */
110
111 #define STREAM_GENERIC 0
112 #define STREAM_MEM     1
113 #define STREAM_FILE    2
114 #define STREAM_USER    3
115 #define STREAM_NETWORK 4
116 #define STREAM_TEXTFILE 5
117
118 /*
119   All keyed streams and streams indicate their implementation type within the first field
120  */
121 struct generic_keyed_stream {
122     int stype;
123 };
124
125 struct generic_stream {
126     int stype;
127 };
128   
129 /*
130   boundary tags are used for some othe raw formats. 
131 */
132 static uint32_t BOUNDARY_TAG=0xabcd0123;
133
134
135 /****************************************************************************************
136    Memory-based implementation  ("mem:")
137 ****************************************************************************************/
138
139 #define DEF_NUM_STREAMS 16
140 #define DEF_NUM_KEYS    128
141 #define DEF_SIZE        128
142
143 /*
144   A memory keyed stream is a pointer to the underlying hash table
145   while a memory stream contains an extensible buffer for the stream
146  */
147 struct mem_keyed_stream {
148     int stype;
149     v3_keyed_stream_open_t ot;
150     struct hashtable *ht;
151 };
152
153 struct mem_stream {
154     int       stype;
155     char     *data;
156     uint32_t  size;
157     uint32_t  data_max;
158     uint32_t  ptr;
159 };
160
161 static struct mem_stream *create_mem_stream_internal(uint64_t size)
162 {
163     struct mem_stream *m = palacios_alloc(sizeof(struct mem_stream));
164
165     if (!m) {
166         return 0;
167     }
168
169
170     m->data = palacios_valloc(size);
171     
172     if (!m->data) { 
173         palacios_free(m);
174         return 0;
175     }
176
177     m->stype = STREAM_MEM;
178     m->size=size;
179     m->ptr=0;
180     m->data_max=0;
181     
182     return m;
183 }
184
185
186 static struct mem_stream *create_mem_stream(void)
187 {
188     return create_mem_stream_internal(DEF_SIZE);
189 }
190
191 static void destroy_mem_stream(struct mem_stream *m)
192 {
193     if (m) {
194         if (m->data) {
195             palacios_vfree(m->data);
196         }
197         m->data=0;
198         palacios_free(m);
199     }
200 }
201     
202 static int expand_mem_stream(struct mem_stream *m, uint32_t new_size)
203 {
204     void *data = palacios_valloc(new_size);
205     uint32_t nc;
206
207     if (!data) { 
208         return -1;
209     }
210     
211     nc = (new_size<m->data_max) ? new_size : m->data_max;
212
213     memcpy(data,m->data,nc);
214
215     palacios_vfree(m->data);
216
217     m->data=data;
218     m->size=new_size;
219     if (m->size<m->data_max) { 
220         m->data_max=m->size;
221     }
222    
223     return 0;
224 }
225
226 static uint32_t write_mem_stream(struct mem_stream *m,
227                                  void *data,
228                                  uint32_t len)
229 {
230     if ((m->ptr + len) > m->size) { 
231         if (expand_mem_stream(m,m->ptr + len)) { 
232             return 0;
233         }
234     }
235     memcpy(m->data+m->ptr,data,len);
236     m->ptr+=len;
237     m->data_max=m->ptr;
238     
239     return len;
240
241 }
242
243
244
245 static uint32_t read_mem_stream(struct mem_stream *m,
246                                 void *data,
247                                 uint32_t len)
248 {
249     if ((m->ptr + len) > m->data_max) { 
250         return 0;
251     }
252     memcpy(data,m->data+m->ptr,len);
253     m->ptr+=len;
254     
255     return len;
256
257 }
258
259
260 static void reset_mem_stream(struct mem_stream *m)
261 {
262     m->ptr=0;
263 }
264
265
266 static inline uint_t hash_func(addr_t key)
267 {
268     return palacios_hash_buffer((uchar_t*)key,strlen((uchar_t*)key));
269 }
270
271 static inline int hash_comp(addr_t k1, addr_t k2)
272 {
273     return strcasecmp((char*)k1,(char*)k2)==0;
274 }
275
276
277 // This stores all the memory keyed streams streams
278 static struct hashtable *mem_streams=0;
279
280
281 static v3_keyed_stream_t open_stream_mem(char *url,
282                                          v3_keyed_stream_open_t ot)
283 {
284
285     if (strncasecmp(url,"mem:",4)) { 
286         WARNING("illegitimate attempt to open memory stream \"%s\"\n",url);
287         return 0;
288     }
289
290     switch (ot) { 
291         case V3_KS_RD_ONLY:
292         case V3_KS_WR_ONLY: {
293             struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
294             if (mks) { 
295                 mks->ot=ot;
296             }
297             return (v3_keyed_stream_t) mks;
298         }
299             break;
300
301         case V3_KS_WR_ONLY_CREATE: {
302             struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
303             if (!mks) { 
304                 char *mykey;
305
306                 mykey = palacios_alloc(strlen(url+4)+1);
307
308                 if (!mykey) { 
309                     ERROR("cannot allocate space for new in-memory keyed stream %s\n",url);
310                     return 0;
311                 }
312
313                 strcpy(mykey,url+4); // will fit
314                 
315                 mks = (struct mem_keyed_stream *) palacios_alloc(sizeof(struct mem_keyed_stream));
316
317                 if (!mks) { 
318                     palacios_free(mykey);
319                     ERROR("cannot allocate in-memory keyed stream %s\n",url);
320                     return 0;
321                 }
322             
323                 mks->ht = (void*) palacios_create_htable(DEF_NUM_KEYS,hash_func,hash_comp);
324                 if (!mks->ht) { 
325                     palacios_free(mks);
326                     palacios_free(mykey);
327                     ERROR("cannot allocate in-memory keyed stream %s\n",url);
328                     return 0;
329                 }
330
331                 
332                 if (!palacios_htable_insert(mem_streams,(addr_t)(mykey),(addr_t)mks)) { 
333                     palacios_free_htable(mks->ht,1,1);
334                     palacios_free(mks);
335                     palacios_free(mykey);
336                     ERROR("cannot insert in-memory keyed stream %s\n",url);
337                     return 0;
338                 }
339                 mks->stype=STREAM_MEM;
340             }
341
342             mks->ot=V3_KS_WR_ONLY;
343             
344             return mks;
345             
346         }
347             break;
348
349         default:
350             ERROR("unsupported open type in open_stream_mem\n");
351             break;
352     }
353     
354     return 0;
355         
356 }
357
358
359
360 static void close_stream_mem(v3_keyed_stream_t stream)
361 {
362     // nothing to do
363     return;
364 }
365
366
367 static v3_keyed_stream_key_t open_key_mem(v3_keyed_stream_t stream,
368                                           char *key)
369 {
370     struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
371     struct hashtable *s = mks->ht;
372
373     struct mem_stream *m;
374
375     m = (struct mem_stream *) palacios_htable_search(s,(addr_t)key);
376
377     if (!m) { 
378         char *mykey = palacios_alloc(strlen(key)+1);
379
380         if (!mykey) { 
381             ERROR("cannot allocate copy of key for key %s\n",key);
382             return 0;
383         }
384
385         strcpy(mykey,key); // will fit
386
387         m = create_mem_stream();
388         
389         if (!m) { 
390             palacios_free(mykey);
391             ERROR("cannot allocate mem keyed stream for key %s\n",key);
392             return 0;
393         }
394
395         if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) {
396             destroy_mem_stream(m);
397             palacios_free(mykey);
398             ERROR("cannot insert mem keyed stream for key %s\n",key);
399             return 0;
400         }
401     }
402
403     reset_mem_stream(m);
404     return m;
405
406 }
407
408
409 static void preallocate_hint_key_mem(v3_keyed_stream_t stream,
410                                      char *key,
411                                      uint64_t size)
412 {
413     struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
414     struct hashtable *s = mks->ht;
415
416     struct mem_stream *m;
417
418     if (mks->ot != V3_KS_WR_ONLY) { 
419         return;
420     }
421
422     m = (struct mem_stream *) palacios_htable_search(s,(addr_t)key);
423
424     if (!m) {
425         char *mykey;
426         
427         mykey=palacios_alloc(strlen(key)+1);
428         
429         if (!mykey) { 
430             ERROR("cannot allocate key space for preallocte for key %s\n",key);
431             return;
432         }
433         
434         strcpy(mykey,key); // will fit
435        
436         m = create_mem_stream_internal(size);
437         
438         if (!m) { 
439             ERROR("cannot preallocate mem keyed stream for key %s\n",key);
440             return;
441         }
442
443         if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) {
444             ERROR("cannot insert preallocated mem keyed stream for key %s\n",key);
445             destroy_mem_stream(m);
446             return;
447         }
448     } else {
449         if (m->data_max < size) { 
450             if (expand_mem_stream(m,size)) { 
451                 ERROR("cannot expand key for preallocation for key %s\n",key);
452                 return;
453             }
454         }
455     }
456
457     return;
458
459 }
460
461 static void close_key_mem(v3_keyed_stream_t stream, 
462                           v3_keyed_stream_key_t key)
463 {
464     // nothing to do
465     return;
466 }
467
468 static sint64_t write_key_mem(v3_keyed_stream_t stream, 
469                               v3_keyed_stream_key_t key,
470                               void *tag,
471                               sint64_t taglen,
472                               void *buf,
473                               sint64_t len)
474 {
475   struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
476   struct mem_stream *m = (struct mem_stream *) key;
477   uint32_t mylen;
478   uint32_t writelen;
479   
480   if (mks->ot!=V3_KS_WR_ONLY) {
481     return -1;
482   }
483   
484   if (taglen<0 || len<0) { 
485     ERROR("Negative taglen or data len\n");
486     return -1;
487   }
488   
489   if (taglen>0xffffffffULL || len>0xffffffffULL) { 
490     ERROR("taglen or data len is too large\n");
491     return -1;
492   }
493       
494   writelen=write_mem_stream(m,&BOUNDARY_TAG,sizeof(BOUNDARY_TAG));
495   
496   if (writelen!=sizeof(BOUNDARY_TAG)) { 
497     ERROR("failed to write all data for boundary tag\n");
498     return -1;
499   }
500   
501   writelen=write_mem_stream(m,&taglen,sizeof(taglen));
502   
503   if (writelen!=sizeof(taglen)) { 
504     ERROR("failed to write taglen\n");
505     return -1;
506   }
507   
508   mylen = (uint32_t) taglen;
509   
510   writelen=write_mem_stream(m,tag,mylen);
511   
512   if (writelen!=mylen) { 
513     ERROR("failed to write all data for tag\n");
514     return -1;
515   } 
516
517   writelen=write_mem_stream(m,&len,sizeof(len));
518   
519   if (writelen!=sizeof(len)) { 
520     ERROR("failed to write datalen\n");
521     return -1;
522   }
523   
524   mylen = (uint32_t) len;
525
526   writelen=write_mem_stream(m,buf,mylen);
527
528   if (writelen!=mylen) { 
529     ERROR("failed to write all data for key\n");
530     return -1;
531   } else {
532     return (sint64_t)writelen;
533   }
534 }
535
536 static sint64_t read_key_mem(v3_keyed_stream_t stream, 
537                              v3_keyed_stream_key_t key,
538                              void *tag,
539                              sint64_t taglen,
540                              void *buf,
541                              sint64_t len)
542 {
543   struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
544   struct mem_stream *m = (struct mem_stream *) key;
545   uint32_t mylen;
546   uint32_t readlen;
547   void *temptag;
548   uint32_t tempbt;
549   sint64_t templen;
550   
551   
552   if (mks->ot!=V3_KS_RD_ONLY) {
553     return -1;
554   }
555   
556   if (len<0 || taglen<0) { 
557     ERROR("taglen or data len is negative\n");
558     return -1;
559   }
560
561   if (len>0xffffffffULL || taglen>0xffffffffULL) { 
562     ERROR("taglen or data len is too large\n");
563     return -1;
564   }
565   
566   readlen=read_mem_stream(m,&tempbt,sizeof(tempbt));
567   
568   if (readlen!=sizeof(tempbt)) { 
569     ERROR("failed to read all data for boundary tag\n");
570     return -1;
571   } 
572   
573   if (tempbt!=BOUNDARY_TAG) { 
574     ERROR("boundary tag not found (read 0x%x)\n",tempbt);
575     return -1;
576   }
577   
578   readlen=read_mem_stream(m,&templen,sizeof(templen));
579
580   if (readlen!=sizeof(templen)) { 
581     ERROR("failed to read all data for taglen\n");
582     return -1;
583   } 
584
585   if (templen!=taglen) { 
586     ERROR("tag size mismatch (requested=%lld, actual=%lld)\n",taglen,templen);
587     return -1;
588   }
589   
590   temptag = palacios_alloc(taglen);
591     
592   if (!temptag) { 
593     ERROR("cannot allocate temporary tag\n");
594     return -1;
595   }
596
597   mylen = (uint32_t) len;
598     
599   readlen=read_mem_stream(m,temptag,mylen);
600     
601   if (readlen!=mylen) { 
602     ERROR("failed to read all data for tag\n");
603     palacios_free(temptag);
604     return -1;
605   } 
606
607   if (memcmp(tag,temptag,taglen)) { 
608     ERROR("tag mismatch\n");
609     palacios_free(temptag);
610     return -1;
611   }
612   
613   palacios_free(temptag);
614
615   readlen=read_mem_stream(m,&templen,sizeof(templen));
616
617   if (readlen!=sizeof(templen)) { 
618     ERROR("failed to read all data for data len\n");
619     return -1;
620   } 
621   
622   if (templen!=len) { 
623     ERROR("data size mismatch (requested=%lld, actual=%lld)\n",len,templen);
624     return -1;
625   }
626
627   mylen = (uint32_t) len;
628   
629   readlen=read_mem_stream(m,buf,mylen);
630   
631   if (readlen!=mylen) { 
632     ERROR("failed to read all data for key\n");
633     return -1;
634   } else {
635     return (sint64_t)readlen;
636   }
637 }
638
639
640 /***************************************************************************************************
641   File-based implementation  ("file:")
642 *************************************************************************************************/
643
644 /*
645   A file keyed stream contains the fd of the directory
646   and a path
647 */
648
649 struct file_keyed_stream {
650     int   stype;
651     v3_keyed_stream_open_t ot;
652     char  *path;
653 };
654
655 struct file_stream {
656     int   stype;
657     struct file *f;   // the opened file
658 };
659
660
661 static v3_keyed_stream_t open_stream_file(char *url,
662                                           v3_keyed_stream_open_t ot)
663 {
664     struct file_keyed_stream *fks;
665     struct nameidata nd;
666
667     if (strncasecmp(url,"file:",5)) { 
668         WARNING("illegitimate attempt to open file stream \"%s\"\n",url);
669         return 0;
670     }
671
672     fks = palacios_alloc(sizeof(struct file_keyed_stream));
673     
674     if (!fks) { 
675         ERROR("cannot allocate space for file stream\n");
676         return 0;
677     }
678
679     fks->path = (char*)palacios_alloc(strlen(url+5)+1);
680     
681     if (!(fks->path)) { 
682         ERROR("cannot allocate space for file stream\n");
683         palacios_free(fks);
684         return 0;
685     }
686     
687     strcpy(fks->path,url+5); // will fit
688     
689     fks->stype=STREAM_FILE;
690
691     fks->ot= ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
692
693     // Does the directory exist, and can we read/write it?
694    
695     if (path_lookup(fks->path,LOOKUP_DIRECTORY|LOOKUP_FOLLOW,&nd)) { 
696
697         // directory does does not exist.  
698
699         if (ot==V3_KS_RD_ONLY || ot==V3_KS_WR_ONLY) { 
700
701             // we are not being asked to create it
702             ERROR("attempt to open %s, which does not exist\n",fks->path);
703             goto fail_out;
704
705         } else {
706
707             // We are being asked to create it
708
709             struct dentry *de;
710             int err;
711
712             // Find its parent
713             if (path_lookup(fks->path,LOOKUP_PARENT|LOOKUP_FOLLOW,&nd)) { 
714                 ERROR("attempt to create %s failed because its parent cannot be looked up\n",fks->path);
715                 goto fail_out;
716             }
717
718             // Can we write to the parent?
719
720             if (inode_permission(nd.path.dentry->d_inode, MAY_WRITE | MAY_EXEC)) { 
721                 ERROR("attempt to open %s, which has the wrong permissions for directory creation\n",fks->path);
722                 goto fail_out;
723             }
724
725             // OK, we can, so let's create it
726
727             de = lookup_create(&nd,1);
728
729             if (!de || IS_ERR(de)) { 
730                 ERROR("cannot allocate dentry\n");
731                 goto fail_out;
732             }
733
734             err = vfs_mkdir(nd.path.dentry->d_inode, de, 0700);
735
736             // lookup_create locks this for us!
737
738             mutex_unlock(&(nd.path.dentry->d_inode->i_mutex));
739
740             if (err) {
741                 ERROR("attempt to create %s failed because mkdir failed\n",fks->path);
742                 goto fail_out;
743             }
744
745             // now the directory should exist and have reasonable permissions
746             return (v3_keyed_stream_t) fks;
747         }
748     } 
749
750     
751     // we must be in V3_KS_RD_ONLY or V3_KS_WR_ONLY, 
752     // and the directory exists, so we must check the permissions
753
754     if (inode_permission(nd.path.dentry->d_inode, MAY_EXEC | (ot==V3_KS_RD_ONLY ? MAY_READ : MAY_WRITE))) {
755         ERROR("attempt to open %s, which has the wrong permissions\n",fks->path);
756         goto fail_out;
757     } else {
758         return (v3_keyed_stream_t) fks;
759     }
760
761
762  fail_out:
763     palacios_free(fks->path);
764     palacios_free(fks);
765     return 0;
766
767 }
768
769 static void close_stream_file(v3_keyed_stream_t stream)
770 {
771     struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
772     
773     palacios_free(fks->path);
774     palacios_free(fks);
775
776 }
777
778 static void preallocate_hint_key_file(v3_keyed_stream_t stream,
779                                       char *key,
780                                       uint64_t size)
781 {
782     return;
783 }
784
785 static v3_keyed_stream_key_t open_key_file(v3_keyed_stream_t stream,
786                                            char *key)
787 {
788     struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
789     struct file_stream *fs;
790     char *path;
791
792     // the path is the stream's path plus the key name
793     // file:/home/foo + "regext" => "/home/foo/regext"
794     path = (char *) palacios_alloc(strlen(fks->path)+strlen(key)+2);
795     if (!path) {                                
796         ERROR("cannot allocate file keyed stream for key %s\n",key);
797         return 0;
798     }
799     // this sequence will fit and terminate with a zero
800     strcpy(path,fks->path);
801     strcat(path,"/");
802     strcat(path,key);
803     
804     fs = (struct file_stream *) palacios_alloc(sizeof(struct file_stream));
805     
806     if (!fs) { 
807         ERROR("cannot allocate file keyed stream for key %s\n",key);
808         palacios_free(path);
809         return 0;
810     }
811
812     fs->stype=STREAM_FILE;
813
814     fs->f = filp_open(path,O_RDWR|O_CREAT|O_LARGEFILE,0600);
815
816     if (!fs->f || IS_ERR(fs->f)) {
817         ERROR("cannot open relevent file \"%s\" for stream \"file:%s\" and key \"%s\"\n",path,fks->path,key);
818         palacios_free(fs);
819         palacios_free(path);
820         return 0;
821     }
822
823     palacios_free(path);
824
825     return fs;
826 }
827
828
829 static void close_key_file(v3_keyed_stream_t stream, 
830                            v3_keyed_stream_key_t key)
831 {
832     struct file_stream *fs = (struct file_stream *) key;
833
834     filp_close(fs->f,NULL);
835
836     palacios_free(fs);
837 }
838
839
840 static sint64_t write_file(struct file_stream *fs, void *buf, sint64_t len)
841 {
842     ssize_t done, left, total;
843     mm_segment_t old_fs;
844
845     total=len;
846     left=len;
847
848     while (left>0) {
849         old_fs = get_fs();
850         set_fs(get_ds());
851         done = fs->f->f_op->write(fs->f, buf+(total-left), left, &(fs->f->f_pos));
852         set_fs(old_fs);
853         if (done<=0) {
854             return -1;
855         } else {
856             left -= done;
857         }
858     }
859
860     return len;
861 }
862
863 static sint64_t write_key_file(v3_keyed_stream_t stream, 
864                                v3_keyed_stream_key_t key,
865                                void *tag,
866                                sint64_t taglen,
867                                void *buf,
868                                sint64_t len)
869 {
870   struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
871   struct file_stream *fs = (struct file_stream *) key;
872   sint64_t writelen;
873   
874   if (fks->ot!=V3_KS_WR_ONLY) { 
875     return -1;
876   }
877   
878   if (taglen<0 || len<0) { 
879     ERROR("Negative taglen or data len\n");
880     return -1;
881   }
882   
883   writelen=write_file(fs,&BOUNDARY_TAG,sizeof(BOUNDARY_TAG));
884   
885   if (writelen!=sizeof(BOUNDARY_TAG)) { 
886     ERROR("failed to write all data for boundary tag\n");
887     return -1;
888   }
889   
890   writelen=write_file(fs,&taglen,sizeof(taglen));
891   
892   if (writelen!=sizeof(taglen)) { 
893     ERROR("failed to write taglen\n");
894     return -1;
895   }
896   
897   if (write_file(fs,tag,taglen)!=taglen) { 
898     ERROR("failed to write tag\n");
899     return -1;
900   }
901
902   writelen=write_file(fs,&len,sizeof(len));
903   
904   if (writelen!=sizeof(len)) { 
905     ERROR("failed to write data len\n");
906     return -1;
907   }
908   
909   return write_file(fs,buf,len);
910 }
911
912 static sint64_t read_file(struct file_stream *fs, void *buf, sint64_t len)
913 {
914     ssize_t done, left, total;
915     mm_segment_t old_fs;
916
917     total=len;
918     left=len;
919
920
921     while (left>0) {
922         old_fs = get_fs();
923         set_fs(get_ds());
924         done = fs->f->f_op->read(fs->f, buf+(total-left), left, &(fs->f->f_pos));
925         set_fs(old_fs);
926         if (done<=0) {
927             return -1;
928         } else {
929             left -= done;
930         }
931     }
932
933     return len;
934 }
935
936
937 static sint64_t read_key_file(v3_keyed_stream_t stream, 
938                               v3_keyed_stream_key_t key,
939                               void *tag,
940                               sint64_t taglen,
941                               void *buf,
942                               sint64_t len)
943 {
944   struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
945   struct file_stream *fs = (struct file_stream *) key;
946   void *temptag;
947   uint32_t tempbt;
948   sint64_t templen;
949   sint64_t readlen;
950   
951   if (fks->ot!=V3_KS_RD_ONLY) { 
952     return -1;
953   }
954   
955   if (len<0 || taglen<0) { 
956     ERROR("taglen or data len is negative\n");
957     return -1;
958   }
959
960   readlen=read_file(fs,&tempbt,sizeof(tempbt));
961   
962   if (readlen!=sizeof(tempbt)) { 
963     ERROR("failed to read all data for boundary tag\n");
964     return -1;
965   } 
966   
967   if (tempbt!=BOUNDARY_TAG) { 
968     ERROR("boundary tag not found (read 0x%x)\n",tempbt);
969     return -1;
970   }
971
972   readlen=read_file(fs,&templen,sizeof(templen));
973   
974   if (readlen!=sizeof(templen)) { 
975     ERROR("failed to read all data for tag len\n");
976     return -1;
977   } 
978
979   if (templen!=taglen) { 
980     ERROR("tag size mismatch (requested=%lld, actual=%lld)\n",taglen,templen);
981     return -1;
982   }
983
984   temptag=palacios_alloc(taglen);
985
986   if (!temptag) { 
987     ERROR("Cannot allocate temptag\n");
988     return -1;
989   }
990   
991   if (read_file(fs,temptag,taglen)!=taglen) { 
992     ERROR("Cannot read tag\n");
993     palacios_free(temptag);
994     return -1;
995   }
996
997   if (memcmp(temptag,tag,taglen)) { 
998     ERROR("Tag mismatch\n");
999     palacios_free(temptag);
1000     return -1;
1001   }
1002   
1003   palacios_free(temptag);
1004
1005   readlen=read_file(fs,&templen,sizeof(templen));
1006   
1007   if (readlen!=sizeof(templen)) { 
1008     ERROR("failed to read all data for data len\n");
1009     return -1;
1010   } 
1011
1012   if (templen!=len) { 
1013     ERROR("datasize mismatch (requested=%lld, actual=%lld)\n",len,templen);
1014     return -1;
1015   }
1016
1017   return read_file(fs,buf,len);
1018
1019 }
1020
1021
1022 /***************************************************************************************************
1023   Textfile-based implementation  ("textfile:")
1024
1025   Note that this implementation uses the internal structure and functions of the 
1026   "file:" implementation
1027 *************************************************************************************************/
1028
1029 // optimize the reading and decoding of hex data
1030 // this weakens the parser, so that:
1031 // tag =0A0B0D 
1032 // will work, but
1033 // tag = 0A 0B 0D
1034 // will not.  Note the leading whitespace
1035 #define TEXTFILE_OPT_HEX 0
1036
1037 //
1038 // The number of bytes handled at a time by the hex putter and getter
1039 //
1040 #define MAX_HEX_SEQ 64
1041
1042 /*
1043   A text file keyed stream is a file_keyed_stream,
1044   only with a different stype
1045 */
1046
1047 #define PAUSE()  
1048 //#define PAUSE() ssleep(5) 
1049
1050
1051 typedef struct file_keyed_stream textfile_keyed_stream;
1052
1053 typedef struct file_stream textfile_stream;
1054
1055
1056 static v3_keyed_stream_t open_stream_textfile(char *url,
1057                                               v3_keyed_stream_open_t ot)
1058 {
1059   textfile_keyed_stream *me;
1060
1061   if (strncasecmp(url,"textfile:",9)) { 
1062     WARNING("illegitimate attempt to open textfile stream \"%s\"\n",url);
1063     return 0;
1064   }
1065
1066   me = (textfile_keyed_stream *) open_stream_file(url+4, ot);
1067
1068   if (!me) {
1069     ERROR("could not create underlying file stream\n");
1070     return 0;
1071   }
1072
1073   me->stype=STREAM_TEXTFILE;
1074
1075   return me;
1076 }
1077
1078   
1079
1080 static void close_stream_textfile(v3_keyed_stream_t stream)
1081 {
1082   textfile_keyed_stream *me = stream;
1083
1084   me->stype=STREAM_FILE;
1085
1086   close_stream_file(me);
1087
1088 }
1089
1090 static void preallocate_hint_key_textfile(v3_keyed_stream_t stream,
1091                                           char *key,
1092                                           uint64_t size)
1093 {
1094   textfile_keyed_stream *me = stream;
1095   
1096   me->stype=STREAM_FILE;
1097
1098   preallocate_hint_key_file(me,key,size);
1099   
1100   me->stype=STREAM_TEXTFILE;
1101  
1102 }
1103
1104
1105 static inline int isoneof(char c, char *sl, int m)
1106 {
1107   int i;
1108   
1109   for (i=0;i<m;i++) { 
1110     if (c==sl[i]) { 
1111       return 1;
1112     }
1113   }
1114   
1115   return 0;
1116 }
1117
1118 static char get_next_char(textfile_stream *s)
1119 {
1120   char buf;
1121   if (read_file(s,&buf,1)!=1) { 
1122     return -1;
1123   } 
1124   return buf;
1125 }
1126
1127 static char hexify_nybble(char c)
1128 {
1129   if (c>=0 && c<=9) { 
1130     return '0'+c;
1131   } else if (c>=0xa && c<=0xf) { 
1132     return 'a'+(c-0xa);
1133   } else {
1134     return -1;
1135   }
1136 }
1137
1138 static int hexify_byte(char *c, char b)
1139 {
1140   char n;
1141   n = hexify_nybble( (b >> 4) & 0xf);
1142   if (n==-1) { 
1143     return -1;
1144   }
1145   c[0] = n;
1146   n = hexify_nybble( b & 0xf);
1147   if (n==-1) { 
1148     return -1;
1149   }
1150   c[1] = n;
1151   return 0;
1152 }
1153
1154
1155 static char dehexify_nybble(char c)
1156 {
1157   if (c>='0' && c<='9') { 
1158     return c-'0';
1159   } else if (c>='a' && c<='f') {
1160     return 0xa + (c-'a');
1161   } else if (c>='A' && c<='F') { 
1162     return 0xa + (c-'A');
1163   } else {
1164     return -1;
1165   }
1166 }
1167
1168 static int dehexify_byte(char *c, char *b)
1169 {
1170   char n;
1171   n = dehexify_nybble(c[0]);
1172   if (n==-1) { 
1173     return -1;
1174   }
1175   *b = n << 4;
1176   n = dehexify_nybble(c[1]);
1177   if (n==-1) { 
1178     return -1;
1179   }
1180   *b |= n;
1181   return 0;
1182 }
1183
1184
1185 #if TEXTFILE_OPT_HEX
1186
1187
1188 // Here the sl array, of length m is the number
1189 static int get_hexbytes_as_data(textfile_stream *s, char *buf, int n)
1190 {
1191   char rbuf[MAX_HEX_SEQ*2];
1192   int left = n;
1193   int off = 0;
1194   int cur = 0;
1195   int i;
1196
1197   while (left>0) {
1198     cur = left > MAX_HEX_SEQ ? MAX_HEX_SEQ : left;
1199     if (read_file(s,rbuf,cur*2)!=cur*2) { 
1200       ERROR("Cannot read data in getting hexbytes as data\n");
1201       return -1;
1202     }
1203     
1204     for (i=0;i<cur;i++) {
1205       if (dehexify_byte(rbuf+(i*2),buf+off+i)==-1) { 
1206         ERROR("Cannot decode data as hex in getting hexbytes as data\n");
1207         return -1;
1208       }
1209     }
1210     left-=cur;
1211     off+=cur;
1212   } 
1213
1214   return 0;
1215 }    
1216
1217 #endif
1218
1219 // Here the sl array, of length m is the set of characters to skip (e.g., whitespace)
1220 static int get_hexbytes_as_data_skip(textfile_stream *s, char *buf, int n, char *sl, int m)
1221 {
1222   char rbuf[2];
1223   int which = 0;
1224   int cur=0;
1225     
1226   while (cur<n) {
1227     which=0;
1228     while (which<2) { 
1229       rbuf[which] = get_next_char(s);
1230       if (rbuf[which]==-1) { 
1231         ERROR("Cannot read char in getting hexbytes as data with skiplist");
1232         return -1;
1233       }
1234       if (isoneof(rbuf[which],sl,m)) { 
1235         continue;
1236       } else {
1237         which++;
1238       }
1239     }
1240     if (dehexify_byte(rbuf,buf+cur)==-1) { 
1241       ERROR("Cannot decode data as hex in getting hexbytes as data with skiplist\n");
1242       return -1;
1243     } else {
1244       cur++;
1245     }
1246   }
1247   return 0;
1248 }    
1249
1250 /*
1251 static int put_next_char(textfile_stream *s, char d)
1252 {
1253   return write_file(s,&d,1);
1254 }
1255 */
1256
1257
1258 static int put_data_as_hexbytes(textfile_stream *s, char *buf, int n)
1259 {
1260   char rbuf[MAX_HEX_SEQ*2];
1261   int left = n;
1262   int off = 0;
1263   int cur = 0;
1264   int i;
1265
1266   while (left>0) {
1267     cur = left > MAX_HEX_SEQ ? MAX_HEX_SEQ : left;
1268     for (i=0;i<cur;i++) {
1269       if (hexify_byte(rbuf+(i*2),*(buf+off+i))==-1) { 
1270         ERROR("Cannot encode data as hex in putting data as hexbytes\n");
1271         return -1;
1272       }
1273     }
1274     if (write_file(s,rbuf,cur*2)!=cur*2) { 
1275       ERROR("Cannot write data in putting data as hexbytes\n");
1276       return -1;
1277     }
1278     left-=cur;
1279     off+=cur;
1280   } 
1281
1282   return 0;
1283 }    
1284
1285
1286 static int put_string_n(textfile_stream *s, char *buf, int n)
1287 {
1288   int rc;
1289
1290   rc = write_file(s,buf,n);
1291   
1292   if (rc!=n) { 
1293     return -1;
1294   } else {
1295     return 0;
1296   }
1297 }
1298
1299 static int put_string(textfile_stream *s, char *buf)
1300 {
1301   int n=strlen(buf);
1302
1303   return put_string_n(s,buf,n);
1304 }
1305
1306
1307
1308 static int search_for(textfile_stream *s, char d)
1309 {
1310   char c;
1311   do {
1312     c=get_next_char(s);
1313   } while (c!=-1 && c!=d);
1314   
1315   if (c==d) { 
1316     return 0;
1317   } else {
1318     return -1;
1319   }
1320 }
1321
1322 /*
1323 static int skip_matching(textfile_stream *s, char *m, int n)
1324 {
1325   char c;
1326   int rc = 0;
1327   int i;
1328
1329   while (rc==0) { 
1330     c=get_next_char(s);
1331     if (c==-1) { 
1332       rc=-1;
1333     } else {
1334       for (i=0;i<n;i++) { 
1335         if (c==m[i]) {
1336           rc=1;
1337           break;
1338         } 
1339       }
1340     }
1341   }
1342   
1343   if (rc==1) { 
1344     return 0;  // found
1345   } else {
1346     return rc; // unknown
1347   }
1348 }
1349
1350 */
1351
1352
1353 static int token_scan(textfile_stream *s, char *token, int n, char *sl, int m)
1354 {
1355   char c;
1356   int cur;
1357
1358   // Skip whitespace
1359   do {
1360     c=get_next_char(s);
1361     if (c==-1) { 
1362       ERROR("Failed to get character during token scan (preceding whitespace)\n");
1363       return -1;
1364     }
1365   } while (isoneof(c,sl,m));
1366
1367
1368   token[0]=c;
1369   
1370   // Record
1371   cur=1;
1372   while (cur<(n-1)) { 
1373     c=get_next_char(s);
1374     if (c==-1) { 
1375       ERROR("Failed to get character during token scan (token)\n");
1376       return -1;
1377     }
1378     if (isoneof(c,sl,m)) { 
1379       break;
1380     } else {
1381       token[cur]=c;
1382       cur++;
1383     } 
1384   }
1385   token[cur]=0;
1386   return 0;
1387 }
1388
1389
1390
1391 static v3_keyed_stream_key_t open_key_textfile(v3_keyed_stream_t stream,
1392                                                char *key)
1393 {
1394   textfile_keyed_stream *mks = stream;
1395   textfile_stream *ms;
1396
1397   mks->stype=STREAM_FILE;
1398
1399   ms = open_key_file(mks,key);
1400
1401   if (!ms) { 
1402     ERROR("cannot open underlying file keyed stream for key %s\n",key);
1403     mks->stype=STREAM_TEXTFILE;
1404     return 0;
1405   }
1406
1407   if (mks->ot==V3_KS_WR_ONLY) { 
1408     
1409     // Now we write the section header
1410
1411     ms->stype=STREAM_FILE;
1412     
1413     if (put_string(ms,"[")) { 
1414       close_key_file(mks,ms);
1415       mks->stype=STREAM_TEXTFILE;
1416       return 0;
1417     }
1418     
1419     if (put_string(ms,key)) { 
1420       close_key_file(mks,ms);
1421       mks->stype=STREAM_TEXTFILE;
1422       return 0;
1423     }
1424     
1425     if (put_string(ms,"]\n")) {
1426       close_key_file(mks,ms);
1427       mks->stype=STREAM_TEXTFILE;
1428       return 0;
1429     }
1430     
1431     
1432     mks->stype=STREAM_TEXTFILE;
1433     ms->stype=STREAM_TEXTFILE;
1434
1435     return ms;
1436
1437   } else if (mks->ot == V3_KS_RD_ONLY) {
1438     // Now we readthe section header
1439     int keylen=strlen(key);
1440     char *tempkey = palacios_alloc(keylen+3);
1441
1442     ms->stype=STREAM_FILE;
1443
1444     if (!tempkey) { 
1445       ERROR("Allocation failed in opening key\n");
1446       close_key_file(mks,ms);
1447       mks->stype=STREAM_FILE;
1448       return 0;
1449     }
1450
1451
1452     if (token_scan(ms,tempkey,keylen+3,"\t\r\n",3)) { 
1453       ERROR("Cannot scan for token (key search)\n");
1454       close_key_file(mks,ms);
1455       mks->stype=STREAM_TEXTFILE;
1456       palacios_free(tempkey);
1457       return 0;
1458     }
1459     
1460     tempkey[keylen+2] = 0;
1461     
1462     // Should now have [key]0
1463
1464     if (tempkey[0]!='[' ||
1465         tempkey[keylen+1]!=']' ||
1466         memcmp(key,tempkey+1,keylen)) {
1467       ERROR("key mismatch: target key=%s, found %s\n",key,tempkey);
1468       palacios_free(tempkey);
1469       close_key_file(mks,ms);
1470       mks->stype=STREAM_TEXTFILE;
1471       return 0;
1472     }
1473
1474     // key match done, success
1475
1476     mks->stype=STREAM_TEXTFILE;
1477     ms->stype=STREAM_TEXTFILE;
1478
1479     palacios_free(tempkey);
1480
1481     return ms;
1482     
1483   } else {
1484     ERROR("Unknown open type in open_key_textfile\n");
1485     ms->stype=STREAM_FILE;
1486     close_key_file(mks,ms);
1487     return 0;
1488   }
1489
1490 }
1491
1492
1493
1494 static void close_key_textfile(v3_keyed_stream_t stream, 
1495                                v3_keyed_stream_key_t key)
1496 {
1497   textfile_keyed_stream *mks = stream;
1498   textfile_stream *ms=key;
1499
1500   mks->stype=STREAM_FILE;
1501   ms->stype=STREAM_FILE;
1502
1503   close_key_file(mks,ms);
1504
1505   mks->stype=STREAM_TEXTFILE;
1506
1507 }
1508
1509
1510 static sint64_t read_key_textfile(v3_keyed_stream_t stream, 
1511                                   v3_keyed_stream_key_t key,
1512                                   void *tag,
1513                                   sint64_t taglen,
1514                                   void *buf,
1515                                   sint64_t len)
1516 {
1517     textfile_stream *ms = (textfile_stream *) key;
1518     char tags[32];
1519     char *temptag;
1520
1521
1522
1523     memcpy(tags,tag,taglen<31 ? taglen : 31);
1524     tags[taglen<32? taglen : 31 ]=0;
1525     
1526     temptag=palacios_alloc(taglen+1);
1527     if (!temptag) { 
1528       ERROR("Unable to allocate temptag in textfile read key\n");
1529       return -1;
1530     }
1531
1532     ms->stype=STREAM_FILE;
1533     
1534     if (token_scan(ms,temptag,taglen+1," \t\r\n=",5)) { 
1535       ERROR("Cannot scan for token (tag search)\n");
1536       ms->stype=STREAM_TEXTFILE;
1537       palacios_free(temptag);
1538       return -1;
1539     }
1540
1541     if (memcmp(tag,temptag,taglen)) { 
1542       ERROR("Tag mismatch in reading tag from textfile: desired tag=%s, actual tag=%s\n",tags,temptag);
1543       ms->stype=STREAM_TEXTFILE;
1544       palacios_free(temptag);
1545       return -1;
1546     }
1547
1548     // tag matches, let's go and find our =
1549     palacios_free(temptag);
1550
1551     if (search_for(ms,'=')) { 
1552       ERROR("Unable to find = sign in tag data parse (tag=%s)\n", tags);
1553       ms->stype=STREAM_TEXTFILE;
1554       return -1;
1555     }
1556
1557
1558 #if TEXTFILE_OPT_HEX
1559     if (get_hexbytes_as_data(ms,buf,len)) { 
1560       ERROR("Cannot read data in hex format (opt path) in textfile for tag %s\n",tags);
1561       ms->stype=STREAM_TEXTFILE;
1562       return -1;
1563     }
1564 #else
1565     if (get_hexbytes_as_data_skip(ms,buf,len," \t\r\n",4)) { 
1566       ERROR("Cannot read data in hex format (unopt path) in textfile for tag %s\n",tags);
1567       ms->stype=STREAM_TEXTFILE;
1568       return -1;
1569     }
1570 #endif
1571
1572     ms->stype=STREAM_TEXTFILE;
1573
1574     return len;
1575 }
1576
1577 static sint64_t write_key_textfile(v3_keyed_stream_t stream, 
1578                                    v3_keyed_stream_key_t key,
1579                                    void *tag,
1580                                    sint64_t taglen,
1581                                    void *buf,
1582                                    sint64_t len)
1583 {
1584     textfile_stream *ms = (textfile_stream *) key;
1585     char tags[32];
1586
1587
1588
1589     memcpy(tags,tag,taglen<31 ? taglen : 31);
1590     tags[taglen<32? taglen : 31 ]=0;
1591
1592     /*    if (taglen>100000 || len>100000) { 
1593       ERROR("Too big\n");
1594       return -1;
1595     }
1596     */
1597
1598     ms->stype=STREAM_FILE;
1599
1600     if (put_string_n(ms,tag,taglen)) { 
1601       ERROR("Cannot write tag %s in textfile\n",tags);
1602       ms->stype=STREAM_TEXTFILE;
1603       return -1;
1604     }
1605
1606     if (put_string(ms,"=")) { 
1607       ERROR("Cannot write = in textfile for tag %s\n",tags);
1608       ms->stype=STREAM_TEXTFILE;
1609       return -1;
1610     }
1611
1612     if (put_data_as_hexbytes(ms,buf,len)) { 
1613       ERROR("Cannot write data in hex format in textfile for tag %s\n",tags);
1614       ms->stype=STREAM_TEXTFILE;
1615       return -1;
1616     }
1617
1618     if (put_string(ms,"\n")) { 
1619       ERROR("Cannot write trailing lf in textfile for tag %s\n",tags);
1620       ms->stype=STREAM_TEXTFILE;
1621       return -1;
1622     }
1623
1624     ms->stype=STREAM_TEXTFILE;
1625
1626     return len;
1627 }
1628
1629
1630
1631 /***************************************************************************************************
1632   User implementation   ("user:")
1633 *************************************************************************************************/
1634
1635
1636 // List of all user keyed stream connections for the guest
1637 struct user_keyed_streams {
1638     spinlock_t lock;
1639     struct list_head streams;
1640 };
1641
1642
1643 // A single keyed stream connection to user space
1644 struct user_keyed_stream {
1645     int stype;
1646     v3_keyed_stream_open_t otype;
1647
1648     char *url;
1649     spinlock_t lock;
1650     int waiting;
1651
1652     wait_queue_head_t user_wait_queue;
1653     wait_queue_head_t host_wait_queue;
1654
1655     struct palacios_user_keyed_stream_op *op;
1656
1657     struct list_head node;
1658 };
1659
1660
1661 //
1662 // List of all of the user streams
1663 //
1664 static struct user_keyed_streams *user_streams;
1665
1666
1667
1668 static int resize_op(struct palacios_user_keyed_stream_op **op, uint64_t buf_len)
1669 {
1670     struct palacios_user_keyed_stream_op *old = *op;
1671     struct palacios_user_keyed_stream_op *new;
1672     
1673     if (!old) {
1674         new = palacios_alloc(sizeof(struct palacios_user_keyed_stream_op)+buf_len);
1675         if (!new) { 
1676             return -1;
1677         } else {
1678             new->len=sizeof(struct palacios_user_keyed_stream_op)+buf_len;
1679             new->buf_len=buf_len;
1680             *op=new;
1681             return 0;
1682         }
1683     } else {
1684         if ((old->len-sizeof(struct palacios_user_keyed_stream_op)) >= buf_len) { 
1685             old->buf_len=buf_len;
1686             return 0;
1687         } else {
1688             palacios_free(old);
1689             *op = 0 ;
1690             return resize_op(op,buf_len);
1691         }
1692     }
1693 }
1694
1695 //
1696 // The assumption is that we enter this with the stream locked
1697 // and we will return with it locked;  additionally, the op structure
1698 // will be overwritten with the response
1699 // 
1700 static int do_request_to_response(struct user_keyed_stream *s, unsigned long *flags)
1701 {
1702
1703     if (s->waiting) {
1704         ERROR("user keyed stream request attempted while one is already in progress on %s\n",s->url);
1705         return -1;
1706     }
1707
1708     // we are now waiting for a response
1709     s->waiting = 1;
1710
1711     // release the stream
1712     palacios_spinlock_unlock_irqrestore(&(s->lock), *flags);
1713
1714     // wake up anyone waiting on it
1715     wake_up_interruptible(&(s->user_wait_queue));
1716
1717     // wait for someone to give us a response
1718     while (wait_event_interruptible(s->host_wait_queue, (s->waiting == 0)) != 0) {}
1719
1720     // reacquire the lock for our called
1721     palacios_spinlock_lock_irqsave(&(s->lock), *flags);
1722
1723     return 0;
1724 }
1725
1726 //
1727 // The assumption is that we enter this with the stream locked
1728 // and we will return with it UNlocked
1729 // 
1730 static int do_response_to_request(struct user_keyed_stream *s, unsigned long *flags)
1731 {
1732
1733     if (!(s->waiting)) {
1734         ERROR("user keyed stream response while no request is in progress on %s\n",s->url);
1735         return -1;
1736     }
1737
1738     // we are now waiting for a request
1739     s->waiting = 0;
1740
1741     // release the stream
1742     palacios_spinlock_unlock_irqrestore(&(s->lock), *flags);
1743
1744     // wake up anyone waiting on it
1745     wake_up_interruptible(&(s->host_wait_queue));
1746     
1747     return 0;
1748 }
1749
1750
1751
1752 static unsigned int keyed_stream_poll_user(struct file *filp, poll_table *wait)
1753 {
1754     struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
1755     unsigned long flags;
1756     
1757     if (!s) {
1758         return POLLERR;
1759     }
1760     
1761     palacios_spinlock_lock_irqsave(&(s->lock), flags);
1762
1763     poll_wait(filp, &(s->user_wait_queue), wait);
1764
1765     if (s->waiting) {
1766         palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1767         return POLLIN | POLLRDNORM;
1768     }
1769     
1770     palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1771
1772     return 0;
1773 }
1774
1775 static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsigned long arg)
1776 {
1777     void __user *argp = (void __user *)arg;
1778     unsigned long flags;
1779     uint64_t size;
1780     
1781     struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
1782     
1783     switch (ioctl) {
1784
1785         case V3_KSTREAM_REQUEST_SIZE_IOCTL:
1786             
1787             // inform request size
1788             
1789             palacios_spinlock_lock_irqsave(&(s->lock), flags);
1790             
1791             if (!(s->waiting)) {
1792                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1793                 return 0;
1794             }
1795
1796             size =  sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
1797             
1798             if (copy_to_user((void * __user) argp, &size, sizeof(uint64_t))) {
1799                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1800                 ERROR("palacios user key size request failed to copy data\n");
1801                 return -EFAULT;
1802             }
1803             
1804             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1805             
1806             return 1;
1807             
1808             break;
1809
1810         case V3_KSTREAM_REQUEST_PULL_IOCTL: 
1811                 
1812             // pull the request
1813             
1814             palacios_spinlock_lock_irqsave(&(s->lock), flags);
1815
1816             if (!(s->waiting)) {
1817                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1818                 ERROR("palacios user key pull request when not waiting\n");
1819                 return 0;
1820             }
1821
1822             size =  sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
1823
1824
1825             if (copy_to_user((void __user *) argp, s->op, size)) {
1826                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1827                 ERROR("palacios user key pull request failed to copy data\n");
1828                 return -EFAULT;
1829             }
1830
1831             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1832             
1833             return 1;
1834             
1835          
1836             break;
1837
1838     case V3_KSTREAM_RESPONSE_PUSH_IOCTL:
1839
1840         // push the response
1841
1842         palacios_spinlock_lock_irqsave(&(s->lock), flags);
1843
1844         if (!(s->waiting)) {
1845             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1846             ERROR("palacios user key push response when not waiting\n");
1847             return 0;
1848         }
1849         
1850         if (copy_from_user(&size, (void __user *) argp, sizeof(uint64_t))) {
1851             ERROR("palacios user key push response failed to copy size\n");
1852             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1853             return -EFAULT;
1854         }
1855
1856         if (resize_op(&(s->op),size-sizeof(struct palacios_user_keyed_stream_op))) {
1857             ERROR("unable to resize op in user key push response\n");
1858             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1859             return -EFAULT;
1860         }
1861
1862         if (copy_from_user(s->op, (void __user *) argp, size)) {
1863             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1864             return -EFAULT;
1865         }
1866
1867         do_response_to_request(s,&flags);
1868         // this will have unlocked s for us
1869
1870         return 1;
1871
1872         break;
1873         
1874     default:
1875         ERROR("unknown ioctl in user keyed stream\n");
1876
1877         return -EFAULT;
1878
1879         break;
1880         
1881     }
1882 }
1883
1884
1885 static int keyed_stream_release_user(struct inode *inode, struct file *filp)
1886 {
1887     struct user_keyed_stream *s = filp->private_data;
1888     unsigned long f1,f2;
1889
1890     palacios_spinlock_lock_irqsave(&(user_streams->lock),f1);
1891     palacios_spinlock_lock_irqsave(&(s->lock), f2);
1892
1893     list_del(&(s->node));
1894
1895     palacios_spinlock_unlock_irqrestore(&(s->lock), f2);
1896     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), f1);
1897     
1898     palacios_free(s->url);
1899     palacios_free(s);
1900
1901     return 0;
1902 }
1903
1904 static struct file_operations user_keyed_stream_fops = {
1905     .poll = keyed_stream_poll_user,
1906     .compat_ioctl = keyed_stream_ioctl_user,
1907     .unlocked_ioctl = keyed_stream_ioctl_user,
1908     .release = keyed_stream_release_user,
1909 };
1910
1911
1912 /*
1913   user_keyed_streams are allocated on user connect, and deallocated on user release
1914   
1915   palacios-side opens and closes only manipulate the open type
1916 */
1917
1918 int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned long arg, void *priv_data)
1919 {
1920     int fd;
1921     unsigned long flags;
1922     char *url;
1923     uint64_t len;
1924     struct user_keyed_stream *s;
1925     
1926     if (!user_streams) { 
1927         ERROR("no user space keyed streams!\n");
1928         return -1;
1929     }
1930
1931     // get the url
1932     if (copy_from_user(&len,(void __user *)arg,sizeof(len))) { 
1933         ERROR("cannot copy url len from user\n");
1934         return -1;
1935     }
1936
1937     url = palacios_alloc(len);
1938     
1939     if (!url) { 
1940         ERROR("cannot allocate url for user keyed stream\n");
1941         return -1;
1942     }
1943
1944     if (copy_from_user(url,((void __user *)arg)+sizeof(len),len)) {
1945         ERROR("cannot copy url from user\n");
1946         return -1;
1947     }
1948     url[len-1]=0;
1949         
1950     
1951     // Check for duplicate handler
1952     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
1953     list_for_each_entry(s, &(user_streams->streams), node) {
1954         if (!strncasecmp(url, s->url, len)) {
1955             ERROR("user keyed stream connection with url \"%s\" already exists\n", url);
1956             palacios_free(url);
1957             return -1;
1958         }
1959     }
1960     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
1961     
1962     // Create connection
1963     s = palacios_alloc(sizeof(struct user_keyed_stream));
1964     
1965     if (!s) {
1966         ERROR("cannot allocate new user keyed stream for %s\n",url);
1967         palacios_free(url);
1968         return -1;
1969     }
1970     
1971     
1972     // Get file descriptor
1973     fd = anon_inode_getfd("v3-kstream", &user_keyed_stream_fops, s, 0);
1974
1975     if (fd < 0) {
1976         ERROR("cannot allocate file descriptor for new user keyed stream for %s\n",url);
1977         palacios_free(s);
1978         palacios_free(url);
1979         return -1;
1980     }
1981     
1982     memset(s, 0, sizeof(struct user_keyed_stream));
1983     
1984     s->stype=STREAM_USER;
1985     s->url=url;
1986     
1987     init_waitqueue_head(&(s->user_wait_queue));
1988     init_waitqueue_head(&(s->host_wait_queue));
1989     
1990     // Insert connection into list
1991     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
1992     list_add(&(s->node), &(user_streams->streams));
1993     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
1994     
1995     return fd;
1996 }
1997     
1998 static struct user_keyed_stream *keyed_stream_user_find(char *url)
1999 {
2000     unsigned long flags;
2001     struct user_keyed_stream *s;
2002     
2003     if (!user_streams) { 
2004         ERROR("no user space keyed streams available\n");
2005         return NULL;
2006     }
2007     
2008     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
2009     list_for_each_entry(s, &(user_streams->streams), node) {
2010         if (!strcasecmp(url, s->url)) {
2011             palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2012             return s;
2013         }
2014     }
2015     
2016     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2017     
2018     return NULL;
2019 }
2020     
2021     
2022 static v3_keyed_stream_t open_stream_user(char *url, v3_keyed_stream_open_t ot)
2023 {
2024     unsigned long flags;
2025     struct user_keyed_stream *s;
2026     
2027     s = keyed_stream_user_find(url);
2028     
2029     if (!s) {
2030         ERROR("cannot open user stream %s as it does not exist yet\n",url);
2031         return NULL;
2032     }
2033
2034     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2035
2036     if (s->waiting) {
2037         palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
2038         ERROR("cannot open user stream %s as it is already in waiting state\n",url);
2039         return NULL;
2040     }
2041     
2042     s->otype = ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
2043     
2044     palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
2045     
2046     return s;
2047     
2048 }
2049     
2050 // close stream does not do anything.  Creation of the stream and its cleanup
2051 // are driven by the user side, not the palacios side
2052 // might eventually want to reference count this, though
2053 static void close_stream_user(v3_keyed_stream_t stream)
2054 {
2055     return;
2056 }
2057
2058 static void preallocate_hint_key_user(v3_keyed_stream_t stream,
2059                                       char *key,
2060                                       uint64_t size)
2061 {
2062     return;
2063 }
2064
2065
2066
2067
2068 static v3_keyed_stream_key_t open_key_user(v3_keyed_stream_t stream, char *key)
2069 {
2070     unsigned long flags;
2071     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2072     uint64_t   len = strlen(key)+1;
2073     void *user_key;
2074
2075     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2076
2077
2078     if (resize_op(&(s->op),len)) {
2079         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2080         ERROR("cannot resize op in opening key %s on user keyed stream %s\n",key,s->url);
2081         return NULL;
2082     }
2083
2084     s->op->type = PALACIOS_KSTREAM_OPEN_KEY;
2085     s->op->buf_len = len;
2086     strncpy(s->op->buf,key,len); // will terminate buffer
2087
2088     // enter with it locked
2089     if (do_request_to_response(s,&flags)) { 
2090         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2091         ERROR("request/response handling failed\n");
2092         return NULL;
2093     }
2094     // return with it locked
2095
2096     user_key=s->op->user_key;
2097
2098     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2099
2100     return user_key;
2101 }
2102
2103 static void close_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key)
2104 {
2105     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2106     uint64_t   len = 0;
2107     unsigned long flags;
2108     
2109     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2110
2111     if (resize_op(&(s->op),len)) {
2112         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2113         ERROR("cannot resize op in closing key 0x%p on user keyed stream %s\n",key,s->url);
2114         return;
2115     }
2116
2117     s->op->type = PALACIOS_KSTREAM_CLOSE_KEY;
2118     s->op->buf_len = len;
2119     s->op->user_key = key;
2120
2121     // enter with it locked
2122     if (do_request_to_response(s,&flags)) { 
2123         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2124         ERROR("request/response handling failed\n");
2125         return;
2126     }
2127     // return with it locked
2128
2129     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2130
2131     return;
2132 }
2133
2134
2135
2136 static sint64_t read_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2137                               void *tag,
2138                               sint64_t taglen,
2139                               void *buf, sint64_t rlen)
2140 {
2141
2142     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2143     uint64_t   len = taglen ;
2144     sint64_t   xfer;
2145     unsigned long flags;
2146
2147     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2148
2149     if (s->otype != V3_KS_RD_ONLY) { 
2150         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2151         ERROR("attempt to read key from stream that is not in read state on %s\n",s->url);
2152     }   
2153
2154
2155     if (resize_op(&(s->op),len)) {
2156         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2157         ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
2158         return -1;
2159     }
2160
2161     s->op->type = PALACIOS_KSTREAM_READ_KEY;
2162     s->op->buf_len = len ;
2163     s->op->xfer = rlen;
2164     s->op->user_key = key;
2165     s->op->data_off = taglen;
2166
2167     memcpy(s->op->buf,tag,taglen);
2168
2169     // enter with it locked
2170     if (do_request_to_response(s,&flags)) { 
2171         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2172         ERROR("request/response handling failed\n");
2173         return -1;
2174     }
2175     // return with it locked
2176
2177
2178     if (s->op->xfer>0) { 
2179         // data_off must be zero
2180         memcpy(buf,s->op->buf,s->op->xfer);
2181     }
2182
2183     xfer=s->op->xfer;
2184
2185     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2186
2187     return xfer;
2188 }
2189
2190
2191 static sint64_t write_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2192                                void *tag,
2193                                sint64_t taglen,
2194                                void *buf, sint64_t wlen)
2195 {
2196
2197     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2198     uint64_t   len = taglen + wlen ;
2199     sint64_t   xfer;
2200     unsigned long flags;
2201
2202
2203     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2204
2205     if (s->otype != V3_KS_WR_ONLY) { 
2206         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2207         ERROR("attempt to write key on stream that is not in write state on %s\n",s->url);
2208     }   
2209
2210     if (resize_op(&(s->op),len)) {
2211         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2212         ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
2213         return -1;
2214     }
2215
2216     s->op->type = PALACIOS_KSTREAM_WRITE_KEY;
2217     s->op->buf_len = len;
2218     s->op->xfer = wlen;
2219     s->op->user_key = key;
2220     s->op->data_off = taglen;
2221
2222     memcpy(s->op->buf,tag,taglen);
2223     memcpy(s->op->buf+taglen,buf,wlen);
2224
2225     // enter with it locked
2226     if (do_request_to_response(s,&flags)) { 
2227         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2228         ERROR("request/response handling failed\n");
2229         return -1;
2230     }
2231     // return with it locked
2232
2233     // no data comes back, xfer should be size of data write (not tag)
2234
2235     xfer=s->op->xfer;
2236
2237     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2238
2239     return xfer;
2240 }
2241
2242
2243
2244 /****************************************************************************************
2245  *    Network-based implementation  ("net:")
2246  *****************************************************************************************/
2247
2248
2249 #define NET_MAX_KEY_LEN 128
2250
2251 struct net_keyed_stream {
2252     int stype;
2253     int ot;
2254     struct net_stream * ns;
2255 };
2256
2257 struct net_stream {
2258     int stype;
2259     struct socket *sock;
2260 };
2261
2262
2263 //ignore the arguments given here currently
2264 static struct net_stream * create_net_stream(void) 
2265 {
2266     struct net_stream * ns = NULL;
2267
2268     ns = palacios_alloc(sizeof(struct net_stream));
2269     
2270     if (!ns) { 
2271         ERROR("Cannot allocate a net_stream\n");
2272         return 0;
2273     }
2274
2275     memset(ns, 0, sizeof(struct net_stream));
2276
2277     ns->stype = STREAM_NETWORK;
2278
2279     return ns;
2280 }
2281
2282 static void close_socket(v3_keyed_stream_t stream)
2283 {
2284     struct net_keyed_stream *nks = (struct net_keyed_stream *) stream;
2285
2286     if (nks) { 
2287         struct net_stream *ns = nks->ns;
2288
2289         if (ns) {
2290             ns->sock->ops->release(ns->sock);
2291             palacios_free(ns);
2292             ERROR("Close Socket\n");
2293         }
2294         
2295         palacios_free(ns);
2296     }
2297 }
2298
2299
2300 static void close_stream_net(v3_keyed_stream_t stream)
2301 {
2302         close_socket(stream);
2303 }
2304
2305 static int connect_to_ip(struct net_stream *ns, int hostip, int port)
2306 {
2307     struct sockaddr_in client;
2308
2309     if (ns == NULL) {
2310         return -1;
2311     }
2312
2313     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { 
2314         ERROR("Cannot create accept socket\n");
2315         return -1;
2316     }
2317         
2318
2319     client.sin_family = AF_INET;
2320     client.sin_port = htons(port);
2321     client.sin_addr.s_addr = hostip;//in_aton(hostip);
2322
2323     return ns->sock->ops->connect(ns->sock, (struct sockaddr *)&client, sizeof(client), 0);
2324 }
2325
2326 static int send_msg(struct net_stream *ns,  char * buf, int len)
2327 {
2328     int left=len;
2329
2330     if (!ns) { 
2331         ERROR("Send message on null net_stream\n");
2332         return -1;
2333     }
2334
2335     if (!(ns->sock)) { 
2336         ERROR("Send message on net_stream without socket\n");
2337         return -1;
2338     }
2339
2340     while (left>0) {
2341
2342         struct msghdr msg;
2343         mm_segment_t oldfs;
2344         struct iovec iov;
2345         int err = 0;
2346         
2347
2348         msg.msg_flags = MSG_NOSIGNAL;//MSG_DONTWAIT;
2349         msg.msg_name = 0;
2350         msg.msg_namelen = 0;
2351         msg.msg_control = NULL;
2352         msg.msg_controllen = 0;
2353         msg.msg_iov = &iov;
2354         msg.msg_iovlen = 1;
2355
2356         iov.iov_base = (char *)&(buf[len-left]);
2357         iov.iov_len = (size_t)left;
2358
2359         oldfs = get_fs();
2360         set_fs(KERNEL_DS);
2361
2362         err = sock_sendmsg(ns->sock, &msg, (size_t)left);
2363
2364         set_fs(oldfs);
2365         
2366         if (err<0) {
2367             ERROR("Send msg error %d\n",err);
2368             return err;
2369         } else {
2370             left-=len;
2371         }
2372     }
2373
2374     return len;
2375 }
2376
2377
2378
2379 static int recv_msg(struct net_stream *ns, char * buf, int len)
2380 {
2381
2382     int left=len;
2383
2384     if (!ns) { 
2385         ERROR("Receive message on null net_stream\n");
2386         return -1;
2387     }
2388
2389     if (!(ns->sock)) { 
2390         ERROR("Receive  message on net_stream without socket\n");
2391         return -1;
2392     }
2393     
2394     
2395     while (left>0) {
2396         
2397         struct msghdr msg;
2398         mm_segment_t oldfs;
2399         struct iovec iov;
2400         int err;
2401         
2402         msg.msg_flags = 0;
2403         msg.msg_name = 0;
2404         msg.msg_namelen = 0;
2405         msg.msg_control = NULL;
2406         msg.msg_controllen = 0;
2407         msg.msg_iov = &iov;
2408         msg.msg_iovlen = 1;
2409         
2410         iov.iov_base = (void *)&(buf[len-left]);
2411         iov.iov_len = (size_t)left;
2412         
2413         oldfs = get_fs();
2414         set_fs(KERNEL_DS);
2415         
2416         err = sock_recvmsg(ns->sock, &msg, (size_t)left, 0);
2417         
2418         set_fs(oldfs);
2419         
2420         if (err<0) { 
2421             return err;
2422         } else {
2423             left -= err;
2424         }
2425     }
2426     return len;
2427 }
2428
2429 static struct net_stream * accept_once(struct net_stream * ns, const int port)
2430 {
2431     struct socket *accept_sock;
2432     struct sockaddr_in addr;
2433     int err;
2434     
2435     if (!ns) { 
2436         ERROR("Accept called on null net_stream\n");
2437         return 0;
2438     }
2439     
2440     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&accept_sock)<0) { 
2441         ERROR("Cannot create accept socket\n"); 
2442         return NULL;
2443     }
2444     
2445
2446     addr.sin_family = AF_INET;
2447     addr.sin_port = htons(port);
2448     addr.sin_addr.s_addr = INADDR_ANY;
2449     
2450     err = accept_sock->ops->bind(accept_sock, (struct sockaddr *)&addr, sizeof(addr));
2451     
2452     if (err<0) {
2453         ERROR("Bind err: %d\n",err);
2454         return NULL;
2455     }
2456
2457     err = accept_sock->ops->listen(accept_sock,2);
2458     
2459     if (err<0) {
2460         ERROR("Listen err: %d\n",err);
2461         return NULL;
2462     }
2463     
2464     // Init the socket in the network strream
2465
2466     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { 
2467         ERROR("Cannot create socket\n");
2468         return NULL;
2469     }
2470     
2471     
2472     // Do the actual accept 
2473
2474     if (accept_sock->ops->accept(accept_sock,ns->sock,0)<0) {
2475         ERROR("accept failed");
2476         return NULL;
2477     }
2478     
2479     // close the accept socket
2480     accept_sock->ops->release(accept_sock);
2481     palacios_free(accept_sock);
2482
2483     return ns;
2484 }
2485
2486
2487 static struct v3_keyed_stream_t * open_stream_net(char * url,v3_keyed_stream_open_t ot)
2488 {
2489     struct net_keyed_stream * nks;
2490     int url_len;
2491     int i;
2492     int delimit[3];
2493     int k;
2494     char mode;
2495     int ip_len;
2496     int port_len;
2497
2498     nks = palacios_alloc(sizeof(struct net_keyed_stream)); 
2499
2500     if (!nks) { 
2501         ERROR("Could not allocate space in open_stream_net\n");
2502         return 0;
2503     }
2504     
2505     nks->ot = ot == V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
2506
2507     nks->stype = STREAM_NETWORK; 
2508
2509     nks->ns = create_net_stream();
2510     
2511     if (!(nks->ns)) { 
2512         ERROR("Could not create network stream\n");
2513         palacios_free(nks);
2514         return 0;
2515     }
2516
2517     url_len=strlen(url);
2518     k=0;
2519
2520
2521     for(i = 0; i < url_len;i++){
2522         if(url[i] == ':'){
2523             delimit[k] = i;
2524             k++;        
2525         }
2526     }
2527
2528     mode = url[delimit[0] + 1];
2529     ip_len = delimit[2] - delimit[1];
2530     port_len = url_len - delimit[2];
2531
2532
2533     {
2534         char ip[ip_len];
2535         char port[port_len];
2536         int host_ip;
2537         int host_port;
2538
2539
2540         strncpy(ip,url + delimit[1]+1,ip_len-1);
2541         ip[ip_len-1]='\0';
2542         
2543         host_ip = in_aton(ip);
2544         
2545         strncpy(port,url+ delimit[2]+1,port_len-1);
2546         port[port_len-1]='\0';
2547         
2548         host_port = simple_strtol(port,NULL,10);
2549
2550         INFO("ip is %s\n",ip); 
2551         INFO("host_ip is %x\n", host_ip);
2552         INFO("port is %s (%d)\n",port,host_port);
2553         
2554         if (mode == 'a'){
2555             // accept a request
2556             INFO("Accepting Connection on INADDR_ANY port:%d\n",host_port);
2557             nks->ns = accept_once(nks->ns, host_port);
2558         } else if (mode == 'c'){
2559             // call connect to ip
2560             INFO("Connecting to %s:%d\n",ip,host_port);
2561             connect_to_ip(nks->ns,host_ip, host_port);
2562         } else {
2563             ERROR("Mode not recognized\n");
2564             palacios_free(nks);
2565             return NULL;
2566         }
2567         
2568         return (v3_keyed_stream_t)nks;
2569     }
2570 }
2571
2572 static void preallocate_hint_key_net(v3_keyed_stream_t stream, char *key,uint64_t size)
2573 {
2574     //do nothing
2575 }
2576
2577 static v3_keyed_stream_key_t open_key_net(v3_keyed_stream_t stream,char *key)
2578 {
2579    struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2580
2581    // reciever of the key name 
2582    if (nks->ot==V3_KS_WR_ONLY)
2583    {
2584        unsigned short keylen = strlen(key);
2585
2586        if (keylen>NET_MAX_KEY_LEN || keylen>=32768) { 
2587            ERROR("Key is too long\n");
2588            return NULL;
2589        }
2590
2591        {
2592            // on-stack allocation here demands that we
2593            // keep key length low...
2594            char msg[keylen+3];
2595            int next = 0;
2596            
2597            // Opening a key for writing sends a notice of the 
2598            // key length and the key name on the channel
2599            
2600            msg[next++]=keylen & 0xFF;
2601            msg[next]=(keylen>>8) & 0xFF;
2602            // Set flag bit
2603            msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will flip leading bit to 1
2604            
2605            strncpy(msg+2,key,keylen);  // will also copy trailing zero
2606            
2607            if (send_msg(nks->ns,msg,keylen+2) != keylen+2) { 
2608                ERROR("Unable to open key for writing on net_stream (send key len+name)\n");
2609                return NULL;
2610            }
2611        }
2612    }
2613
2614    if (nks->ot==V3_KS_RD_ONLY)   {
2615        char msg_info[2];
2616        int next = 0;
2617        int keylen = 0;
2618
2619        if (recv_msg(nks->ns,msg_info,2) != 2) { 
2620            ERROR("Unable to open key for reading on net_stream (recv key len)\n");
2621            return NULL;
2622        }
2623
2624        next = 0;
2625        keylen = 0;
2626
2627        keylen |= msg_info[next++];
2628
2629        if ((msg_info[next] & 0x80) != 0x80)  {
2630            ERROR("Flag bit not set on receive of key length\n");
2631            return NULL;
2632        } else {
2633            msg_info[next] &= 0x7F; // flip the msb back to zero (clear flag)
2634        }
2635        
2636        keylen |= msg_info[next]<<8;
2637
2638        if (keylen > NET_MAX_KEY_LEN) { 
2639            ERROR("Received key length is too big\n");
2640            return NULL;
2641        }
2642        
2643        {
2644            
2645            char msg[keylen+1];
2646            
2647            if (recv_msg(nks->ns,msg,keylen) != keylen) { 
2648                ERROR("Unable to receive key\n");
2649                return NULL;
2650            }
2651            msg[keylen]=0;
2652            
2653            if (strncmp(key,msg,keylen)!=0) {
2654                ERROR("Key mismatch in open_key_net - expect %s but got %s\n",key,msg);
2655                return NULL;
2656            }
2657        }
2658    }
2659    
2660    return (v3_keyed_stream_key_t)key;
2661 }
2662
2663 static void close_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t input_key)
2664 {
2665     char * key = (char*)input_key;
2666     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2667
2668     
2669     if (nks->ot==V3_KS_WR_ONLY) {
2670         unsigned short keylen = strlen(key);
2671
2672         if (keylen > NET_MAX_KEY_LEN || keylen>=32768) {
2673             ERROR("Key length too long in close_key_net\n");
2674             return;
2675         }
2676
2677         {
2678             char msg[keylen+3];
2679             int next = 0;
2680             
2681             msg[next++]=keylen & 0xFF;
2682             msg[next]=(keylen>>8) & 0xFF;
2683             // flag
2684             msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will filp leading bit to 1
2685             strncpy(msg+2,key,keylen); // will copy the zero
2686             msg[keylen+2]=0;
2687             if (send_msg(nks->ns,msg,keylen+2)!=keylen+2) { 
2688                 ERROR("Cannot send key on close_key_net\n");
2689                 return;
2690             }
2691         }
2692     }
2693     
2694     if (nks->ot==V3_KS_RD_ONLY)   {
2695         char msg_info[2];
2696         int next;
2697         int keylen;
2698         
2699         if (recv_msg(nks->ns,msg_info,2) != 2) { 
2700             ERROR("Cannot recv key length on close_key_net\n");
2701             return;
2702         }
2703         
2704         next = 0;
2705         keylen = 0;
2706         
2707         keylen |= msg_info[next++];
2708         
2709         if ((msg_info[next] & 0x80) != 0x80) {
2710             ERROR("Missing flag in close_key_net receive\n");
2711             return;
2712         } 
2713         
2714         msg_info[next] &= 0x7F; // flip the msb back to zero
2715         
2716         keylen |= msg_info[next]<<8;
2717         
2718         {
2719             char msg[keylen+1];
2720             
2721             if (recv_msg(nks->ns,msg,keylen)!=keylen) { 
2722                 ERROR("Did not receive all of key in close_key_net receive\n");
2723                 return;
2724             }
2725             
2726             msg[keylen]=0;
2727             
2728             if (strncmp(key,msg,keylen)!=0)  {
2729                 ERROR("Key mismatch in close_key_net - expect %s but got %s\n",key,msg);
2730                 return;
2731             }
2732         }
2733     }
2734 }
2735
2736 static sint64_t write_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key, 
2737                               void *tag,
2738                               sint64_t taglen,
2739                               void *buf, sint64_t len) 
2740 {
2741     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2742
2743     if (!buf) { 
2744         ERROR("Buf is NULL in write_key_net\n");
2745         return -1;
2746     }
2747
2748     if (!tag) { 
2749         ERROR("Tag is NULL in write_key_net\n");
2750         return -1;
2751     }
2752
2753     if (len<0) {
2754         ERROR("len is negative in write_key_net\n");
2755         return -1;
2756     }
2757
2758     if (taglen<0) {
2759         ERROR("taglen is negative in write_key_net\n");
2760         return -1;
2761     }
2762     
2763     if (!key){
2764        ERROR("write_key: key is NULL in write_key_net\n");
2765        return -1;
2766     }
2767     
2768     
2769     if (!nks)  {
2770         ERROR("nks is NULL in write_key_net\n");
2771         return -1;
2772     }
2773     
2774     if (nks->ot==V3_KS_WR_ONLY) {
2775         if (send_msg(nks->ns,(char*)(&BOUNDARY_TAG),sizeof(BOUNDARY_TAG))!=sizeof(BOUNDARY_TAG)) { 
2776             ERROR("Could not send boundary tag in write_key_net\n");
2777             return -1;
2778         } 
2779         if (send_msg(nks->ns,(char*)(&taglen),sizeof(taglen))!=sizeof(taglen)) { 
2780             ERROR("Could not send tag length in write_key_net\n");
2781             return -1;
2782         } 
2783         if (send_msg(nks->ns,tag,taglen)!=taglen) { 
2784             ERROR("Could not send tag in write_key_net\n");
2785             return -1;
2786         }
2787         if (send_msg(nks->ns,(char*)(&len),sizeof(len))!=sizeof(len)) { 
2788             ERROR("Could not send data length in write_key_net\n");
2789             return -1;
2790         } 
2791         if (send_msg(nks->ns,buf,len)!=len) { 
2792             ERROR("Could not send data in write_key_net\n");
2793             return -1;
2794         }
2795     }  else {
2796         ERROR("Permission not correct in write_key_net\n");
2797         return -1;
2798     }
2799     
2800     return len;
2801 }
2802
2803
2804 static sint64_t read_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2805                              void *tag,
2806                              sint64_t taglen,
2807                              void *buf, sint64_t len)
2808 {
2809     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2810     void *temptag;
2811
2812     if (!buf) {
2813         ERROR("Buf is NULL in read_key_net\n");
2814         return -1;
2815     }
2816
2817     if (!tag) {
2818         ERROR("Tag is NULL in read_key_net\n");
2819         return -1;
2820     }
2821     
2822     if(len<0) {
2823         ERROR("len is negative in read_key_net\n");
2824         return -1;
2825     }
2826
2827     if(taglen<0) {
2828         ERROR("taglen is negative in read_key_net\n");
2829         return -1;
2830     }
2831     
2832     if (!key) {
2833         ERROR("read_key: key is NULL in read_key_net\n");
2834         return -1;
2835     }
2836
2837
2838     if (nks->ot==V3_KS_RD_ONLY) {
2839         
2840         sint64_t slen;
2841         uint32_t tempbt;
2842         
2843         if (recv_msg(nks->ns,(char*)(&tempbt),sizeof(tempbt))!=sizeof(tempbt)) { 
2844             ERROR("Cannot receive boundary tag in read_key_net\n");
2845             return -1;
2846         }
2847
2848         if (tempbt!=BOUNDARY_TAG) { 
2849           ERROR("Invalid boundary tag (received 0x%x\n",tempbt);
2850           return -1;
2851         }
2852            
2853         temptag=palacios_alloc(taglen);
2854         if (!temptag) {
2855           ERROR("failed to allocate temptag\n");
2856           return -1;
2857         }
2858
2859         if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { 
2860             ERROR("Cannot receive tag len in read_key_net\n");
2861             palacios_free(temptag);
2862             return -1;
2863         }
2864
2865         if (slen!=taglen) {
2866             ERROR("Tag len expected does not matched tag len decoded in read_key_net\n");
2867             palacios_free(temptag);
2868             return -1;
2869         }
2870
2871         if (recv_msg(nks->ns,temptag,taglen)!=taglen) { 
2872             ERROR("Cannot recieve tag in read_key_net\n");
2873             palacios_free(temptag);
2874             return -1;
2875         }
2876
2877         if (memcmp(temptag,tag,taglen)) { 
2878           ERROR("Tag mismatch\n");
2879           palacios_free(temptag);
2880           return -1;
2881         }
2882
2883         if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { 
2884             ERROR("Cannot receive data len in read_key_net\n");
2885             palacios_free(temptag);
2886             return -1;
2887         }
2888
2889         if (slen!=len) {
2890             ERROR("Data len expected does not matched data len decoded in read_key_net\n");
2891             palacios_free(temptag);
2892             return -1;
2893         }
2894
2895         if (recv_msg(nks->ns,buf,len)!=len) { 
2896             ERROR("Cannot recieve data in read_key_net\n");
2897             palacios_free(temptag);
2898             return -1;
2899         }
2900
2901         palacios_free(temptag);
2902         
2903     } else {
2904         ERROR("Permissions incorrect for the stream in read_key_net\n");
2905         return -1;
2906     }
2907
2908     return len;
2909     
2910 }
2911
2912
2913 /***************************************************************************************************
2914   Generic interface
2915 *************************************************************************************************/
2916
2917 static v3_keyed_stream_t open_stream(char *url,
2918                                      v3_keyed_stream_open_t ot)
2919 {
2920     if (!strncasecmp(url,"mem:",4)) { 
2921         return open_stream_mem(url,ot);
2922     } else if (!strncasecmp(url,"file:",5)) { 
2923         return open_stream_file(url,ot);
2924     } else if (!strncasecmp(url,"user:",5)) { 
2925         return open_stream_user(url,ot);
2926     } else if (!strncasecmp(url,"net:",4)){
2927         return open_stream_net(url,ot);
2928     } else if (!strncasecmp(url,"textfile:",9)) { 
2929         return open_stream_textfile(url,ot);
2930     } else {
2931         ERROR("unsupported type in attempt to open keyed stream \"%s\"\n",url);
2932         return 0;
2933     }
2934 }
2935
2936 static void close_stream(v3_keyed_stream_t stream)
2937 {
2938     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2939     switch (gks->stype){ 
2940         case STREAM_MEM:
2941             return close_stream_mem(stream);
2942             break;
2943         case STREAM_FILE:
2944             return close_stream_file(stream);
2945             break;
2946         case STREAM_TEXTFILE:
2947             return close_stream_textfile(stream);
2948             break;
2949         case STREAM_USER:
2950             return close_stream_user(stream);
2951             break;
2952         case STREAM_NETWORK:
2953             return close_stream_net(stream);
2954             break;
2955         default:
2956             ERROR("unknown stream type %d in close\n",gks->stype);
2957             break;
2958     }
2959 }
2960
2961 static void preallocate_hint_key(v3_keyed_stream_t stream,
2962                                  char *key,
2963                                  uint64_t size)
2964 {
2965     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2966     switch (gks->stype){ 
2967         case STREAM_MEM:
2968             preallocate_hint_key_mem(stream,key,size);
2969             break;
2970         case STREAM_FILE:
2971             preallocate_hint_key_file(stream,key,size);
2972             break;
2973         case STREAM_TEXTFILE:
2974             preallocate_hint_key_textfile(stream,key,size);
2975             break;
2976         case STREAM_USER:
2977             return preallocate_hint_key_user(stream,key,size);
2978             break;
2979         case STREAM_NETWORK:
2980             return preallocate_hint_key_net(stream,key,size);
2981             break;
2982         default:
2983             ERROR("unknown stream type %d in preallocate_hint_key\n",gks->stype);
2984             break;
2985     }
2986     return;
2987 }
2988
2989
2990 static v3_keyed_stream_key_t open_key(v3_keyed_stream_t stream,
2991                                       char *key)
2992 {
2993     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2994     switch (gks->stype){ 
2995         case STREAM_MEM:
2996             return open_key_mem(stream,key);
2997             break;
2998         case STREAM_FILE:
2999             return open_key_file(stream,key);
3000             break;
3001         case STREAM_TEXTFILE:
3002             return open_key_textfile(stream,key);
3003             break;
3004         case STREAM_USER:
3005             return open_key_user(stream,key);
3006             break;
3007         case STREAM_NETWORK:
3008             return open_key_net(stream, key);
3009             break;
3010         default:
3011             ERROR("unknown stream type %d in open_key\n",gks->stype);
3012             break;
3013     }
3014     return 0;
3015 }
3016
3017
3018 static void close_key(v3_keyed_stream_t stream, 
3019                       v3_keyed_stream_key_t key)
3020 {
3021     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3022     switch (gks->stype){ 
3023         case STREAM_MEM:
3024             return close_key_mem(stream,key);
3025             break;
3026         case STREAM_FILE:
3027             return close_key_file(stream,key);
3028             break;
3029         case STREAM_TEXTFILE:
3030             return close_key_textfile(stream,key);
3031             break;
3032         case STREAM_USER:
3033             return close_key_user(stream,key);
3034             break;
3035          case STREAM_NETWORK:
3036             return close_key_net(stream, key);
3037             break;      
3038         default:
3039             ERROR("unknown stream type %d in close_key\n",gks->stype);
3040             break;
3041     }
3042     // nothing to do
3043     return;
3044 }
3045
3046 static sint64_t write_key(v3_keyed_stream_t stream, 
3047                           v3_keyed_stream_key_t key,
3048                           void *tag,
3049                           sint64_t taglen,
3050                           void *buf,
3051                           sint64_t len)
3052 {
3053     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3054     switch (gks->stype){ 
3055         case STREAM_MEM:
3056             return write_key_mem(stream,key,tag,taglen,buf,len);
3057             break;
3058         case STREAM_FILE:
3059             return write_key_file(stream,key,tag,taglen,buf,len);
3060             break;
3061         case STREAM_TEXTFILE:
3062             return write_key_textfile(stream,key,tag,taglen,buf,len);
3063             break;
3064         case STREAM_USER:
3065             return write_key_user(stream,key,tag,taglen,buf,len);
3066             break;
3067         case STREAM_NETWORK:
3068             return write_key_net(stream,key,tag,taglen,buf,len);
3069             break;
3070         default:
3071             ERROR("unknown stream type %d in write_key\n",gks->stype);
3072             return -1;
3073             break;
3074     }
3075     return -1;
3076 }
3077
3078
3079 static sint64_t read_key(v3_keyed_stream_t stream, 
3080                          v3_keyed_stream_key_t key,
3081                          void *tag,
3082                          sint64_t taglen,
3083                          void *buf,
3084                          sint64_t len)
3085 {
3086     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3087     switch (gks->stype){ 
3088         case STREAM_MEM:
3089           return read_key_mem(stream,key,tag,taglen,buf,len);
3090             break;
3091         case STREAM_FILE:
3092             return read_key_file(stream,key,tag,taglen,buf,len);
3093             break;
3094         case STREAM_TEXTFILE:
3095             return read_key_textfile(stream,key,tag,taglen,buf,len);
3096             break;
3097         case STREAM_USER:
3098             return read_key_user(stream,key,tag,taglen,buf,len);
3099             break;
3100         case STREAM_NETWORK:
3101             return read_key_net(stream,key,tag,taglen,buf,len);
3102             break;
3103         default:
3104             ERROR("unknown stream type %d in read_key\n",gks->stype);
3105             return -1;
3106             break;
3107     }
3108     return -1;
3109 }
3110
3111
3112
3113
3114 /***************************************************************************************************
3115   Hooks to palacios and inititialization
3116 *************************************************************************************************/
3117
3118     
3119 static struct v3_keyed_stream_hooks hooks = {
3120     .open = open_stream,
3121     .close = close_stream,
3122     .preallocate_hint_key = preallocate_hint_key,
3123     .open_key = open_key,
3124     .close_key = close_key,
3125     .read_key = read_key,
3126     .write_key = write_key
3127 };
3128
3129
3130 static int init_keyed_streams( void )
3131 {
3132     mem_streams = palacios_create_htable(DEF_NUM_STREAMS,hash_func,hash_comp);
3133
3134     if (!mem_streams) { 
3135         ERROR("failed to allocated stream pool for in-memory streams\n");
3136         return -1;
3137     }
3138
3139     user_streams = palacios_alloc(sizeof(struct user_keyed_streams));
3140
3141     if (!user_streams) { 
3142         ERROR("failed to allocated list for user streams\n");
3143         return -1;
3144     }
3145
3146     INIT_LIST_HEAD(&(user_streams->streams));
3147     
3148     palacios_spinlock_init(&(user_streams->lock));
3149
3150     V3_Init_Keyed_Streams(&hooks);
3151
3152     return 0;
3153
3154 }
3155
3156 static int deinit_keyed_streams( void )
3157 {
3158     palacios_free_htable(mem_streams,1,1);
3159
3160     palacios_spinlock_deinit(&(user_streams->lock));
3161
3162     palacios_free(user_streams);
3163
3164     WARNING("Deinit of Palacios Keyed Streams likely leaked memory\n");
3165
3166     return 0;
3167 }
3168
3169
3170 static int guest_init_keyed_streams(struct v3_guest * guest, void ** vm_data ) 
3171 {
3172     
3173     add_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT, keyed_stream_connect_user, 0);
3174     
3175     return 0;
3176 }
3177
3178
3179 static int guest_deinit_keyed_streams(struct v3_guest * guest, void * vm_data)
3180 {
3181     remove_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT);
3182
3183     return 0;
3184 }
3185
3186
3187
3188
3189 static struct linux_ext key_stream_ext = {
3190     .name = "KEYED_STREAM_INTERFACE",
3191     .init = init_keyed_streams,
3192     .deinit = deinit_keyed_streams,
3193     .guest_init = guest_init_keyed_streams,
3194     .guest_deinit = guest_deinit_keyed_streams, 
3195 };
3196
3197
3198 register_extension(&key_stream_ext);