Palacios Public Git Repository

To checkout Palacios execute

  git clone http://v3vee.org/palacios/palacios.web/palacios.git
This will give you the master branch. You probably want the devel branch or one of the release branches. To switch to the devel branch, simply execute
  cd palacios
  git checkout --track -b devel origin/devel
The other branches are similar.


Linux kernel compatability enhancements (through 3.19)
[palacios.git] / linux_module / iface-keyed-stream.c
1 /*
2  * Palacios keyed stream interface
3  *
4  * Plus implementations for mem, file, and user space implementations
5  *
6  * (c) Peter Dinda, 2011 (interface, mem + file implementations + recooked user impl)
7  * (c) Clint Sbisa, 2011 (initial user space implementation on which this is based)
8  * (c) Diana Palsetia & Steve Rangel, 2012 (network based implementation)       
9  * (c) Peter Dinda, 2012 (updated interface, textfile)
10  */
11
12 #include <linux/fs.h>
13 #include <linux/file.h>
14 #include <linux/uaccess.h>
15 #include <linux/namei.h>
16 #include <linux/poll.h>
17 #include <linux/anon_inodes.h>
18
19 #include "palacios.h"
20 #include "util-hashtable.h"
21 #include "linux-exts.h"
22 #include "vm.h"
23
24 #define sint64_t int64_t
25 #include <interfaces/vmm_keyed_stream.h>
26
27 #include "iface-keyed-stream-user.h"
28
29 #include <interfaces/vmm_socket.h>
30
31 #include <linux/spinlock.h>
32 #include <asm/uaccess.h>
33 #include <linux/inet.h>
34 #include <linux/kthread.h>
35 #include <linux/netdevice.h>
36 #include <linux/ip.h>
37 #include <linux/in.h>
38 #include <linux/string.h>
39 #include <linux/preempt.h>
40 #include <linux/sched.h>
41 #include <linux/list.h>
42 #include <linux/syscalls.h>
43
44 #include <linux/module.h>
45 #include <linux/kernel.h>
46 #include <linux/socket.h>
47 #include <linux/net.h>
48 #include <linux/slab.h>
49
50
51 /*
52   This is an implementation of the Palacios keyed stream interface
53   that supports four flavors of streams:
54
55   "mem:"   Streams are stored in a hash table
56   The values for this hash table are hash tables associated with 
57   each stream.   A key maps to an expanding memory buffer.
58   Data is stored in a buffer like:
59
60   [boundarytag][taglen][tag][datalen][data]
61   [boundarytag][taglen][tag][datalen][data]
62   ...
63
64   "file:"  Streams are stored in files.  Each high-level
65   open corresponds to a directory, while a key corresponds to
66   a distinct file in that directory.   Data is stored in a file
67   like:
68
69   [boundarytag][taglen][tag][datalen][data]
70   [boundarytag][taglen][tag][datalen][data]
71   ...
72
73   "textfile:" Same as file, but data is stored in text format, like a
74   windows style .ini file.  A key maps to a file, and data is stored
75   in a file like:
76
77   [key]
78   tag=data_in_hex
79   tag=data_in_hex
80
81   This format makes it possible to concentenate the files to
82   produce a single "ini" file with "sections".
83
84
85   "net:"  Streams are carried over the network.  Each
86    high level open corresponds to a TCP connection, while
87    each key corresponds to a context on the stream.
88       "net:a:<ip>:<port>" => Bind to <ip>:<port> and accept a connection
89       "net:c:<ip>:<port>" => Connect to <ip>:<port>
90    "c" (client) 
91      open_stream: connect
92    "a" (server) 
93      open_stream: accept
94    "c" or "a":
95      open_key:  send [keylen-lastbyte-high-bit][key] (writer)
96            or   recv (same format as above)          (reader)
97      close_key: send [keylen-lastbyte-high-bit][key] (writer)
98            or   recv (same format as above)          (reader)
99      write_key: send [boundarytag][taglen][tag][datalen][data]
100      read_key:  recv (same format as above)
101      close_stream: close socket
102
103   "user:" Stream requests are bounced to user space to be 
104    handled there.  A rendezvous approach similar to the host 
105    device userland support is used
106
107    All keyed streams store the tags.
108    
109 */
110
111 #define STREAM_GENERIC 0
112 #define STREAM_MEM     1
113 #define STREAM_FILE    2
114 #define STREAM_USER    3
115 #define STREAM_NETWORK 4
116 #define STREAM_TEXTFILE 5
117
118 /*
119   All keyed streams and streams indicate their implementation type within the first field
120  */
121 struct generic_keyed_stream {
122     int stype;
123 };
124
125 struct generic_stream {
126     int stype;
127 };
128   
129 /*
130   boundary tags are used for some othe raw formats. 
131 */
132 static uint32_t BOUNDARY_TAG=0xabcd0123;
133
134
135 /****************************************************************************************
136    Memory-based implementation  ("mem:")
137 ****************************************************************************************/
138
139 #define DEF_NUM_STREAMS 16
140 #define DEF_NUM_KEYS    128
141 #define DEF_SIZE        128
142
143 /*
144   A memory keyed stream is a pointer to the underlying hash table
145   while a memory stream contains an extensible buffer for the stream
146  */
147 struct mem_keyed_stream {
148     int stype;
149     v3_keyed_stream_open_t ot;
150     struct hashtable *ht;
151 };
152
153 struct mem_stream {
154     int       stype;
155     char     *data;
156     uint32_t  size;
157     uint32_t  data_max;
158     uint32_t  ptr;
159 };
160
161 static struct mem_stream *create_mem_stream_internal(uint64_t size)
162 {
163     struct mem_stream *m = palacios_alloc(sizeof(struct mem_stream));
164
165     if (!m) {
166         return 0;
167     }
168
169
170     m->data = palacios_valloc(size);
171     
172     if (!m->data) { 
173         palacios_free(m);
174         return 0;
175     }
176
177     m->stype = STREAM_MEM;
178     m->size=size;
179     m->ptr=0;
180     m->data_max=0;
181     
182     return m;
183 }
184
185
186 static struct mem_stream *create_mem_stream(void)
187 {
188     return create_mem_stream_internal(DEF_SIZE);
189 }
190
191 static void destroy_mem_stream(struct mem_stream *m)
192 {
193     if (m) {
194         if (m->data) {
195             palacios_vfree(m->data);
196         }
197         m->data=0;
198         palacios_free(m);
199     }
200 }
201     
202 static int expand_mem_stream(struct mem_stream *m, uint32_t new_size)
203 {
204     void *data = palacios_valloc(new_size);
205     uint32_t nc;
206
207     if (!data) { 
208         return -1;
209     }
210     
211     nc = (new_size<m->data_max) ? new_size : m->data_max;
212
213     memcpy(data,m->data,nc);
214
215     palacios_vfree(m->data);
216
217     m->data=data;
218     m->size=new_size;
219     if (m->size<m->data_max) { 
220         m->data_max=m->size;
221     }
222    
223     return 0;
224 }
225
226 static uint32_t write_mem_stream(struct mem_stream *m,
227                                  void *data,
228                                  uint32_t len)
229 {
230     if ((m->ptr + len) > m->size) { 
231         if (expand_mem_stream(m,m->ptr + len)) { 
232             return 0;
233         }
234     }
235     memcpy(m->data+m->ptr,data,len);
236     m->ptr+=len;
237     m->data_max=m->ptr;
238     
239     return len;
240
241 }
242
243
244
245 static uint32_t read_mem_stream(struct mem_stream *m,
246                                 void *data,
247                                 uint32_t len)
248 {
249     if ((m->ptr + len) > m->data_max) { 
250         return 0;
251     }
252     memcpy(data,m->data+m->ptr,len);
253     m->ptr+=len;
254     
255     return len;
256
257 }
258
259
260 static void reset_mem_stream(struct mem_stream *m)
261 {
262     m->ptr=0;
263 }
264
265
266 static inline uint_t hash_func(addr_t key)
267 {
268     return palacios_hash_buffer((uchar_t*)key,strlen((uchar_t*)key));
269 }
270
271 static inline int hash_comp(addr_t k1, addr_t k2)
272 {
273     return strcasecmp((char*)k1,(char*)k2)==0;
274 }
275
276
277 // This stores all the memory keyed streams streams
278 static struct hashtable *mem_streams=0;
279
280
281 static v3_keyed_stream_t open_stream_mem(char *url,
282                                          v3_keyed_stream_open_t ot)
283 {
284
285     if (strncasecmp(url,"mem:",4)) { 
286         WARNING("illegitimate attempt to open memory stream \"%s\"\n",url);
287         return 0;
288     }
289
290     switch (ot) { 
291         case V3_KS_RD_ONLY:
292         case V3_KS_WR_ONLY: {
293             struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
294             if (mks) { 
295                 mks->ot=ot;
296             }
297             return (v3_keyed_stream_t) mks;
298         }
299             break;
300
301         case V3_KS_WR_ONLY_CREATE: {
302             struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
303             if (!mks) { 
304                 char *mykey;
305
306                 mykey = palacios_alloc(strlen(url+4)+1);
307
308                 if (!mykey) { 
309                     ERROR("cannot allocate space for new in-memory keyed stream %s\n",url);
310                     return 0;
311                 }
312
313                 strcpy(mykey,url+4); // will fit
314                 
315                 mks = (struct mem_keyed_stream *) palacios_alloc(sizeof(struct mem_keyed_stream));
316
317                 if (!mks) { 
318                     palacios_free(mykey);
319                     ERROR("cannot allocate in-memory keyed stream %s\n",url);
320                     return 0;
321                 }
322             
323                 mks->ht = (void*) palacios_create_htable(DEF_NUM_KEYS,hash_func,hash_comp);
324                 if (!mks->ht) { 
325                     palacios_free(mks);
326                     palacios_free(mykey);
327                     ERROR("cannot allocate in-memory keyed stream %s\n",url);
328                     return 0;
329                 }
330
331                 
332                 if (!palacios_htable_insert(mem_streams,(addr_t)(mykey),(addr_t)mks)) { 
333                     palacios_free_htable(mks->ht,1,1);
334                     palacios_free(mks);
335                     palacios_free(mykey);
336                     ERROR("cannot insert in-memory keyed stream %s\n",url);
337                     return 0;
338                 }
339                 mks->stype=STREAM_MEM;
340             }
341
342             mks->ot=V3_KS_WR_ONLY;
343             
344             return mks;
345             
346         }
347             break;
348
349         default:
350             ERROR("unsupported open type in open_stream_mem\n");
351             break;
352     }
353     
354     return 0;
355         
356 }
357
358
359
360 static void close_stream_mem(v3_keyed_stream_t stream)
361 {
362     // nothing to do
363     return;
364 }
365
366
367 static v3_keyed_stream_key_t open_key_mem(v3_keyed_stream_t stream,
368                                           char *key)
369 {
370     struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
371     struct hashtable *s = mks->ht;
372
373     struct mem_stream *m;
374
375     m = (struct mem_stream *) palacios_htable_search(s,(addr_t)key);
376
377     if (!m) { 
378         char *mykey = palacios_alloc(strlen(key)+1);
379
380         if (!mykey) { 
381             ERROR("cannot allocate copy of key for key %s\n",key);
382             return 0;
383         }
384
385         strcpy(mykey,key); // will fit
386
387         m = create_mem_stream();
388         
389         if (!m) { 
390             palacios_free(mykey);
391             ERROR("cannot allocate mem keyed stream for key %s\n",key);
392             return 0;
393         }
394
395         if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) {
396             destroy_mem_stream(m);
397             palacios_free(mykey);
398             ERROR("cannot insert mem keyed stream for key %s\n",key);
399             return 0;
400         }
401     }
402
403     reset_mem_stream(m);
404     return m;
405
406 }
407
408
409 static void preallocate_hint_key_mem(v3_keyed_stream_t stream,
410                                      char *key,
411                                      uint64_t size)
412 {
413     struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
414     struct hashtable *s = mks->ht;
415
416     struct mem_stream *m;
417
418     if (mks->ot != V3_KS_WR_ONLY) { 
419         return;
420     }
421
422     m = (struct mem_stream *) palacios_htable_search(s,(addr_t)key);
423
424     if (!m) {
425         char *mykey;
426         
427         mykey=palacios_alloc(strlen(key)+1);
428         
429         if (!mykey) { 
430             ERROR("cannot allocate key space for preallocte for key %s\n",key);
431             return;
432         }
433         
434         strcpy(mykey,key); // will fit
435        
436         m = create_mem_stream_internal(size);
437         
438         if (!m) { 
439             ERROR("cannot preallocate mem keyed stream for key %s\n",key);
440             return;
441         }
442
443         if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) {
444             ERROR("cannot insert preallocated mem keyed stream for key %s\n",key);
445             destroy_mem_stream(m);
446             return;
447         }
448     } else {
449         if (m->data_max < size) { 
450             if (expand_mem_stream(m,size)) { 
451                 ERROR("cannot expand key for preallocation for key %s\n",key);
452                 return;
453             }
454         }
455     }
456
457     return;
458
459 }
460
461 static void close_key_mem(v3_keyed_stream_t stream, 
462                           v3_keyed_stream_key_t key)
463 {
464     // nothing to do
465     return;
466 }
467
468 static sint64_t write_key_mem(v3_keyed_stream_t stream, 
469                               v3_keyed_stream_key_t key,
470                               void *tag,
471                               sint64_t taglen,
472                               void *buf,
473                               sint64_t len)
474 {
475   struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
476   struct mem_stream *m = (struct mem_stream *) key;
477   uint32_t mylen;
478   uint32_t writelen;
479   
480   if (mks->ot!=V3_KS_WR_ONLY) {
481     return -1;
482   }
483   
484   if (taglen<0 || len<0) { 
485     ERROR("Negative taglen or data len\n");
486     return -1;
487   }
488   
489   if (taglen>0xffffffffULL || len>0xffffffffULL) { 
490     ERROR("taglen or data len is too large\n");
491     return -1;
492   }
493       
494   writelen=write_mem_stream(m,&BOUNDARY_TAG,sizeof(BOUNDARY_TAG));
495   
496   if (writelen!=sizeof(BOUNDARY_TAG)) { 
497     ERROR("failed to write all data for boundary tag\n");
498     return -1;
499   }
500   
501   writelen=write_mem_stream(m,&taglen,sizeof(taglen));
502   
503   if (writelen!=sizeof(taglen)) { 
504     ERROR("failed to write taglen\n");
505     return -1;
506   }
507   
508   mylen = (uint32_t) taglen;
509   
510   writelen=write_mem_stream(m,tag,mylen);
511   
512   if (writelen!=mylen) { 
513     ERROR("failed to write all data for tag\n");
514     return -1;
515   } 
516
517   writelen=write_mem_stream(m,&len,sizeof(len));
518   
519   if (writelen!=sizeof(len)) { 
520     ERROR("failed to write datalen\n");
521     return -1;
522   }
523   
524   mylen = (uint32_t) len;
525
526   writelen=write_mem_stream(m,buf,mylen);
527
528   if (writelen!=mylen) { 
529     ERROR("failed to write all data for key\n");
530     return -1;
531   } else {
532     return (sint64_t)writelen;
533   }
534 }
535
536 static sint64_t read_key_mem(v3_keyed_stream_t stream, 
537                              v3_keyed_stream_key_t key,
538                              void *tag,
539                              sint64_t taglen,
540                              void *buf,
541                              sint64_t len)
542 {
543   struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
544   struct mem_stream *m = (struct mem_stream *) key;
545   uint32_t mylen;
546   uint32_t readlen;
547   void *temptag;
548   uint32_t tempbt;
549   sint64_t templen;
550   
551   
552   if (mks->ot!=V3_KS_RD_ONLY) {
553     return -1;
554   }
555   
556   if (len<0 || taglen<0) { 
557     ERROR("taglen or data len is negative\n");
558     return -1;
559   }
560
561   if (len>0xffffffffULL || taglen>0xffffffffULL) { 
562     ERROR("taglen or data len is too large\n");
563     return -1;
564   }
565   
566   readlen=read_mem_stream(m,&tempbt,sizeof(tempbt));
567   
568   if (readlen!=sizeof(tempbt)) { 
569     ERROR("failed to read all data for boundary tag\n");
570     return -1;
571   } 
572   
573   if (tempbt!=BOUNDARY_TAG) { 
574     ERROR("boundary tag not found (read 0x%x)\n",tempbt);
575     return -1;
576   }
577   
578   readlen=read_mem_stream(m,&templen,sizeof(templen));
579
580   if (readlen!=sizeof(templen)) { 
581     ERROR("failed to read all data for taglen\n");
582     return -1;
583   } 
584
585   if (templen!=taglen) { 
586     ERROR("tag size mismatch (requested=%lld, actual=%lld)\n",taglen,templen);
587     return -1;
588   }
589   
590   temptag = palacios_alloc(taglen);
591     
592   if (!temptag) { 
593     ERROR("cannot allocate temporary tag\n");
594     return -1;
595   }
596
597   mylen = (uint32_t) len;
598     
599   readlen=read_mem_stream(m,temptag,mylen);
600     
601   if (readlen!=mylen) { 
602     ERROR("failed to read all data for tag\n");
603     palacios_free(temptag);
604     return -1;
605   } 
606
607   if (memcmp(tag,temptag,taglen)) { 
608     ERROR("tag mismatch\n");
609     palacios_free(temptag);
610     return -1;
611   }
612   
613   palacios_free(temptag);
614
615   readlen=read_mem_stream(m,&templen,sizeof(templen));
616
617   if (readlen!=sizeof(templen)) { 
618     ERROR("failed to read all data for data len\n");
619     return -1;
620   } 
621   
622   if (templen!=len) { 
623     ERROR("data size mismatch (requested=%lld, actual=%lld)\n",len,templen);
624     return -1;
625   }
626
627   mylen = (uint32_t) len;
628   
629   readlen=read_mem_stream(m,buf,mylen);
630   
631   if (readlen!=mylen) { 
632     ERROR("failed to read all data for key\n");
633     return -1;
634   } else {
635     return (sint64_t)readlen;
636   }
637 }
638
639
640 /***************************************************************************************************
641   File-based implementation  ("file:")
642 *************************************************************************************************/
643
644 /*
645   A file keyed stream contains the fd of the directory
646   and a path
647 */
648
649 struct file_keyed_stream {
650     int   stype;
651     v3_keyed_stream_open_t ot;
652     char  *path;
653 };
654
655 struct file_stream {
656     int   stype;
657     struct file *f;   // the opened file
658 };
659
660 /* lookup directory, see if it is writeable, and if so, create it if asked*/
661
662 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,41)
663
664 static int lookup_check_mkdir(const char *path, int need_write, int create, unsigned short perms)
665 {
666   struct nameidata nd;
667
668   if (path_lookup(path,LOOKUP_DIRECTORY|LOOKUP_FOLLOW,&nd)) { 
669     
670     // directory does does not exist.  
671     
672     if (!create) { 
673       // we are not being asked to create it
674       ERROR("attempt to open %s, which does not exist\n",path);
675       return -1;
676     } else {
677       // We are being asked to create it
678
679       struct dentry *de;
680       int err;
681       
682       // Find its parent
683       if (path_lookup(path,LOOKUP_PARENT|LOOKUP_FOLLOW,&nd)) { 
684         ERROR("attempt to create %s failed because its parent cannot be looked up\n",path);
685         return -1;
686       }
687       
688       // Can we write to the parent?
689       
690       if (inode_permission(nd.path.dentry->d_inode, MAY_WRITE | MAY_EXEC)) { 
691         ERROR("attempt to open %s, which has the wrong permissions for directory creation\n",path);
692         return -1;
693       }
694
695       // OK, we can, so let's create it
696       
697       de = lookup_create(&nd,1);
698       
699       if (!de || IS_ERR(de)) { 
700         ERROR("cannot allocate dentry\n");
701         return -1;
702       }
703
704       err = vfs_mkdir(nd.path.dentry->d_inode, de, perms);
705       
706       // lookup_create locks this for us!
707       
708       mutex_unlock(&(nd.path.dentry->d_inode->i_mutex));
709
710       if (err) {
711         ERROR("attempt to create %s failed because mkdir failed\n",path);
712         return -1;
713       }
714
715       // successfully created it. 
716       return 0;
717
718     }
719
720   } else {
721     
722     // it exists, can we read (and write, if needed) to it?
723     
724     if (inode_permission(nd.path.dentry->d_inode, MAY_EXEC | MAY_READ | need_write ? MAY_WRITE : 0 )) {
725       ERROR("attempt to open %s, which has the wrong permissions\n",path);
726       return -1;
727     }
728     
729     return 0;
730   }
731 }
732 #endif
733
734 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,41)
735 static int lookup_check_mkdir(const char *path, int need_write, int create, unsigned short perms)
736 {
737   struct path p; 
738
739   if (kern_path(path, LOOKUP_DIRECTORY|LOOKUP_FOLLOW, &p)) { 
740     
741     // directory does does not exist.  
742     
743     if (!create) { 
744       // we are not being asked to create it
745       ERROR("attempt to open %s, which does not exist\n",path);
746       return -1;
747     } else {
748       // We are being asked to create it
749
750       struct dentry *de;
751       int err;
752       
753       // Find its parent
754       if (kern_path(path,LOOKUP_PARENT|LOOKUP_FOLLOW,&p)) { 
755         ERROR("attempt to create %s failed because its parent cannot be looked up\n",path);
756         return -1;
757       }
758       
759       // Can we write to the parent?
760       
761       if (inode_permission(p.dentry->d_inode, MAY_WRITE | MAY_EXEC)) { 
762         ERROR("attempt to open %s, which has the wrong permissions for directory creation\n",path);
763         return -1;
764       }
765
766       // OK, we can, so let's create it
767       
768       de = kern_path_create(AT_FDCWD,path,&p,1);
769       
770       if (!de || IS_ERR(de)) { 
771         ERROR("cannot allocate dentry\n");
772         return -1;
773       }
774
775       err = vfs_mkdir(p.dentry->d_inode, de, perms);
776       
777       // lookup_create locks this for us!
778       
779       mutex_unlock(&(p.dentry->d_inode->i_mutex));
780
781       if (err) {
782         ERROR("attempt to create %s failed because mkdir failed\n",path);
783         return -1;
784       }
785
786       // successfully created it. 
787       return 0;
788
789     }
790     
791   } else {
792     
793     // it exists, can we read (and write, if needed) to it?
794     
795     if (inode_permission(p.dentry->d_inode, MAY_EXEC | MAY_READ | need_write ? MAY_WRITE : 0 )) {
796       ERROR("attempt to open %s, which has the wrong permissions\n",path);
797       return -1;
798     }
799     
800     return 0;
801   }
802 }
803
804 #endif
805
806
807 static v3_keyed_stream_t open_stream_file(char *url,
808                                           v3_keyed_stream_open_t ot)
809 {
810     struct file_keyed_stream *fks;
811
812     if (strncasecmp(url,"file:",5)) { 
813         WARNING("illegitimate attempt to open file stream \"%s\"\n",url);
814         return 0;
815     }
816
817     fks = palacios_alloc(sizeof(struct file_keyed_stream));
818     
819     if (!fks) { 
820         ERROR("cannot allocate space for file stream\n");
821         return 0;
822     }
823
824     fks->path = (char*)palacios_alloc(strlen(url+5)+1);
825     
826     if (!(fks->path)) { 
827         ERROR("cannot allocate space for file stream\n");
828         palacios_free(fks);
829         return 0;
830     }
831     
832     strcpy(fks->path,url+5); // will fit
833     
834     fks->stype=STREAM_FILE;
835
836     fks->ot= ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
837
838     if (lookup_check_mkdir(fks->path,ot!=V3_KS_RD_ONLY,ot==V3_KS_WR_ONLY_CREATE,0700)) { 
839       ERROR("cannot find or create directory for stream\n");
840       goto fail_out;
841     }
842
843     return fks;
844
845  fail_out:
846     palacios_free(fks->path);
847     palacios_free(fks);
848     return 0;
849
850 }
851
852 static void close_stream_file(v3_keyed_stream_t stream)
853 {
854     struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
855     
856     palacios_free(fks->path);
857     palacios_free(fks);
858
859 }
860
861 static void preallocate_hint_key_file(v3_keyed_stream_t stream,
862                                       char *key,
863                                       uint64_t size)
864 {
865     return;
866 }
867
868 static v3_keyed_stream_key_t open_key_file(v3_keyed_stream_t stream,
869                                            char *key)
870 {
871     struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
872     struct file_stream *fs;
873     char *path;
874
875     // the path is the stream's path plus the key name
876     // file:/home/foo + "regext" => "/home/foo/regext"
877     path = (char *) palacios_alloc(strlen(fks->path)+strlen(key)+2);
878     if (!path) {                                
879         ERROR("cannot allocate file keyed stream for key %s\n",key);
880         return 0;
881     }
882     // this sequence will fit and terminate with a zero
883     strcpy(path,fks->path);
884     strcat(path,"/");
885     strcat(path,key);
886     
887     fs = (struct file_stream *) palacios_alloc(sizeof(struct file_stream));
888     
889     if (!fs) { 
890         ERROR("cannot allocate file keyed stream for key %s\n",key);
891         palacios_free(path);
892         return 0;
893     }
894
895     fs->stype=STREAM_FILE;
896
897     fs->f = filp_open(path,O_RDWR|O_CREAT|O_LARGEFILE,0600);
898
899     if (!fs->f || IS_ERR(fs->f)) {
900         ERROR("cannot open relevent file \"%s\" for stream \"file:%s\" and key \"%s\"\n",path,fks->path,key);
901         palacios_free(fs);
902         palacios_free(path);
903         return 0;
904     }
905
906     palacios_free(path);
907
908     return fs;
909 }
910
911
912 static void close_key_file(v3_keyed_stream_t stream, 
913                            v3_keyed_stream_key_t key)
914 {
915     struct file_stream *fs = (struct file_stream *) key;
916
917     filp_close(fs->f,NULL);
918
919     palacios_free(fs);
920 }
921
922
923 static sint64_t write_file(struct file_stream *fs, void *buf, sint64_t len)
924 {
925     ssize_t done, left, total;
926     mm_segment_t old_fs;
927
928     total=len;
929     left=len;
930
931     while (left>0) {
932         old_fs = get_fs();
933         set_fs(get_ds());
934         done = fs->f->f_op->write(fs->f, buf+(total-left), left, &(fs->f->f_pos));
935         set_fs(old_fs);
936         if (done<=0) {
937             return -1;
938         } else {
939             left -= done;
940         }
941     }
942
943     return len;
944 }
945
946 static sint64_t write_key_file(v3_keyed_stream_t stream, 
947                                v3_keyed_stream_key_t key,
948                                void *tag,
949                                sint64_t taglen,
950                                void *buf,
951                                sint64_t len)
952 {
953   struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
954   struct file_stream *fs = (struct file_stream *) key;
955   sint64_t writelen;
956   
957   if (fks->ot!=V3_KS_WR_ONLY) { 
958     return -1;
959   }
960   
961   if (taglen<0 || len<0) { 
962     ERROR("Negative taglen or data len\n");
963     return -1;
964   }
965   
966   writelen=write_file(fs,&BOUNDARY_TAG,sizeof(BOUNDARY_TAG));
967   
968   if (writelen!=sizeof(BOUNDARY_TAG)) { 
969     ERROR("failed to write all data for boundary tag\n");
970     return -1;
971   }
972   
973   writelen=write_file(fs,&taglen,sizeof(taglen));
974   
975   if (writelen!=sizeof(taglen)) { 
976     ERROR("failed to write taglen\n");
977     return -1;
978   }
979   
980   if (write_file(fs,tag,taglen)!=taglen) { 
981     ERROR("failed to write tag\n");
982     return -1;
983   }
984
985   writelen=write_file(fs,&len,sizeof(len));
986   
987   if (writelen!=sizeof(len)) { 
988     ERROR("failed to write data len\n");
989     return -1;
990   }
991   
992   return write_file(fs,buf,len);
993 }
994
995 static sint64_t read_file(struct file_stream *fs, void *buf, sint64_t len)
996 {
997     ssize_t done, left, total;
998     mm_segment_t old_fs;
999
1000     total=len;
1001     left=len;
1002
1003
1004     while (left>0) {
1005         old_fs = get_fs();
1006         set_fs(get_ds());
1007         done = fs->f->f_op->read(fs->f, buf+(total-left), left, &(fs->f->f_pos));
1008         set_fs(old_fs);
1009         if (done<=0) {
1010             return -1;
1011         } else {
1012             left -= done;
1013         }
1014     }
1015
1016     return len;
1017 }
1018
1019
1020 static sint64_t read_key_file(v3_keyed_stream_t stream, 
1021                               v3_keyed_stream_key_t key,
1022                               void *tag,
1023                               sint64_t taglen,
1024                               void *buf,
1025                               sint64_t len)
1026 {
1027   struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
1028   struct file_stream *fs = (struct file_stream *) key;
1029   void *temptag;
1030   uint32_t tempbt;
1031   sint64_t templen;
1032   sint64_t readlen;
1033   
1034   if (fks->ot!=V3_KS_RD_ONLY) { 
1035     return -1;
1036   }
1037   
1038   if (len<0 || taglen<0) { 
1039     ERROR("taglen or data len is negative\n");
1040     return -1;
1041   }
1042
1043   readlen=read_file(fs,&tempbt,sizeof(tempbt));
1044   
1045   if (readlen!=sizeof(tempbt)) { 
1046     ERROR("failed to read all data for boundary tag\n");
1047     return -1;
1048   } 
1049   
1050   if (tempbt!=BOUNDARY_TAG) { 
1051     ERROR("boundary tag not found (read 0x%x)\n",tempbt);
1052     return -1;
1053   }
1054
1055   readlen=read_file(fs,&templen,sizeof(templen));
1056   
1057   if (readlen!=sizeof(templen)) { 
1058     ERROR("failed to read all data for tag len\n");
1059     return -1;
1060   } 
1061
1062   if (templen!=taglen) { 
1063     ERROR("tag size mismatch (requested=%lld, actual=%lld)\n",taglen,templen);
1064     return -1;
1065   }
1066
1067   temptag=palacios_alloc(taglen);
1068
1069   if (!temptag) { 
1070     ERROR("Cannot allocate temptag\n");
1071     return -1;
1072   }
1073   
1074   if (read_file(fs,temptag,taglen)!=taglen) { 
1075     ERROR("Cannot read tag\n");
1076     palacios_free(temptag);
1077     return -1;
1078   }
1079
1080   if (memcmp(temptag,tag,taglen)) { 
1081     ERROR("Tag mismatch\n");
1082     palacios_free(temptag);
1083     return -1;
1084   }
1085   
1086   palacios_free(temptag);
1087
1088   readlen=read_file(fs,&templen,sizeof(templen));
1089   
1090   if (readlen!=sizeof(templen)) { 
1091     ERROR("failed to read all data for data len\n");
1092     return -1;
1093   } 
1094
1095   if (templen!=len) { 
1096     ERROR("datasize mismatch (requested=%lld, actual=%lld)\n",len,templen);
1097     return -1;
1098   }
1099
1100   return read_file(fs,buf,len);
1101
1102 }
1103
1104
1105 /***************************************************************************************************
1106   Textfile-based implementation  ("textfile:")
1107
1108   Note that this implementation uses the internal structure and functions of the 
1109   "file:" implementation
1110 *************************************************************************************************/
1111
1112 // optimize the reading and decoding of hex data
1113 // this weakens the parser, so that:
1114 // tag =0A0B0D 
1115 // will work, but
1116 // tag = 0A 0B 0D
1117 // will not.  Note the leading whitespace
1118 #define TEXTFILE_OPT_HEX 0
1119
1120 //
1121 // The number of bytes handled at a time by the hex putter and getter
1122 //
1123 #define MAX_HEX_SEQ 64
1124
1125 /*
1126   A text file keyed stream is a file_keyed_stream,
1127   only with a different stype
1128 */
1129
1130 #define PAUSE()  
1131 //#define PAUSE() ssleep(5) 
1132
1133
1134 typedef struct file_keyed_stream textfile_keyed_stream;
1135
1136 typedef struct file_stream textfile_stream;
1137
1138
1139 static v3_keyed_stream_t open_stream_textfile(char *url,
1140                                               v3_keyed_stream_open_t ot)
1141 {
1142   textfile_keyed_stream *me;
1143
1144   if (strncasecmp(url,"textfile:",9)) { 
1145     WARNING("illegitimate attempt to open textfile stream \"%s\"\n",url);
1146     return 0;
1147   }
1148
1149   me = (textfile_keyed_stream *) open_stream_file(url+4, ot);
1150
1151   if (!me) {
1152     ERROR("could not create underlying file stream\n");
1153     return 0;
1154   }
1155
1156   me->stype=STREAM_TEXTFILE;
1157
1158   return me;
1159 }
1160
1161   
1162
1163 static void close_stream_textfile(v3_keyed_stream_t stream)
1164 {
1165   textfile_keyed_stream *me = stream;
1166
1167   me->stype=STREAM_FILE;
1168
1169   close_stream_file(me);
1170
1171 }
1172
1173 static void preallocate_hint_key_textfile(v3_keyed_stream_t stream,
1174                                           char *key,
1175                                           uint64_t size)
1176 {
1177   textfile_keyed_stream *me = stream;
1178   
1179   me->stype=STREAM_FILE;
1180
1181   preallocate_hint_key_file(me,key,size);
1182   
1183   me->stype=STREAM_TEXTFILE;
1184  
1185 }
1186
1187
1188 static inline int isoneof(char c, char *sl, int m)
1189 {
1190   int i;
1191   
1192   for (i=0;i<m;i++) { 
1193     if (c==sl[i]) { 
1194       return 1;
1195     }
1196   }
1197   
1198   return 0;
1199 }
1200
1201 static char get_next_char(textfile_stream *s)
1202 {
1203   char buf;
1204   if (read_file(s,&buf,1)!=1) { 
1205     return -1;
1206   } 
1207   return buf;
1208 }
1209
1210 static char hexify_nybble(char c)
1211 {
1212   if (c>=0 && c<=9) { 
1213     return '0'+c;
1214   } else if (c>=0xa && c<=0xf) { 
1215     return 'a'+(c-0xa);
1216   } else {
1217     return -1;
1218   }
1219 }
1220
1221 static int hexify_byte(char *c, char b)
1222 {
1223   char n;
1224   n = hexify_nybble( (b >> 4) & 0xf);
1225   if (n==-1) { 
1226     return -1;
1227   }
1228   c[0] = n;
1229   n = hexify_nybble( b & 0xf);
1230   if (n==-1) { 
1231     return -1;
1232   }
1233   c[1] = n;
1234   return 0;
1235 }
1236
1237
1238 static char dehexify_nybble(char c)
1239 {
1240   if (c>='0' && c<='9') { 
1241     return c-'0';
1242   } else if (c>='a' && c<='f') {
1243     return 0xa + (c-'a');
1244   } else if (c>='A' && c<='F') { 
1245     return 0xa + (c-'A');
1246   } else {
1247     return -1;
1248   }
1249 }
1250
1251 static int dehexify_byte(char *c, char *b)
1252 {
1253   char n;
1254   n = dehexify_nybble(c[0]);
1255   if (n==-1) { 
1256     return -1;
1257   }
1258   *b = n << 4;
1259   n = dehexify_nybble(c[1]);
1260   if (n==-1) { 
1261     return -1;
1262   }
1263   *b |= n;
1264   return 0;
1265 }
1266
1267
1268 #if TEXTFILE_OPT_HEX
1269
1270
1271 // Here the sl array, of length m is the number
1272 static int get_hexbytes_as_data(textfile_stream *s, char *buf, int n)
1273 {
1274   char rbuf[MAX_HEX_SEQ*2];
1275   int left = n;
1276   int off = 0;
1277   int cur = 0;
1278   int i;
1279
1280   while (left>0) {
1281     cur = left > MAX_HEX_SEQ ? MAX_HEX_SEQ : left;
1282     if (read_file(s,rbuf,cur*2)!=cur*2) { 
1283       ERROR("Cannot read data in getting hexbytes as data\n");
1284       return -1;
1285     }
1286     
1287     for (i=0;i<cur;i++) {
1288       if (dehexify_byte(rbuf+(i*2),buf+off+i)==-1) { 
1289         ERROR("Cannot decode data as hex in getting hexbytes as data\n");
1290         return -1;
1291       }
1292     }
1293     left-=cur;
1294     off+=cur;
1295   } 
1296
1297   return 0;
1298 }    
1299
1300 #endif
1301
1302 // Here the sl array, of length m is the set of characters to skip (e.g., whitespace)
1303 static int get_hexbytes_as_data_skip(textfile_stream *s, char *buf, int n, char *sl, int m)
1304 {
1305   char rbuf[2];
1306   int which = 0;
1307   int cur=0;
1308     
1309   while (cur<n) {
1310     which=0;
1311     while (which<2) { 
1312       rbuf[which] = get_next_char(s);
1313       if (rbuf[which]==-1) { 
1314         ERROR("Cannot read char in getting hexbytes as data with skiplist");
1315         return -1;
1316       }
1317       if (isoneof(rbuf[which],sl,m)) { 
1318         continue;
1319       } else {
1320         which++;
1321       }
1322     }
1323     if (dehexify_byte(rbuf,buf+cur)==-1) { 
1324       ERROR("Cannot decode data as hex in getting hexbytes as data with skiplist\n");
1325       return -1;
1326     } else {
1327       cur++;
1328     }
1329   }
1330   return 0;
1331 }    
1332
1333 /*
1334 static int put_next_char(textfile_stream *s, char d)
1335 {
1336   return write_file(s,&d,1);
1337 }
1338 */
1339
1340
1341 static int put_data_as_hexbytes(textfile_stream *s, char *buf, int n)
1342 {
1343   char rbuf[MAX_HEX_SEQ*2];
1344   int left = n;
1345   int off = 0;
1346   int cur = 0;
1347   int i;
1348
1349   while (left>0) {
1350     cur = left > MAX_HEX_SEQ ? MAX_HEX_SEQ : left;
1351     for (i=0;i<cur;i++) {
1352       if (hexify_byte(rbuf+(i*2),*(buf+off+i))==-1) { 
1353         ERROR("Cannot encode data as hex in putting data as hexbytes\n");
1354         return -1;
1355       }
1356     }
1357     if (write_file(s,rbuf,cur*2)!=cur*2) { 
1358       ERROR("Cannot write data in putting data as hexbytes\n");
1359       return -1;
1360     }
1361     left-=cur;
1362     off+=cur;
1363   } 
1364
1365   return 0;
1366 }    
1367
1368
1369 static int put_string_n(textfile_stream *s, char *buf, int n)
1370 {
1371   int rc;
1372
1373   rc = write_file(s,buf,n);
1374   
1375   if (rc!=n) { 
1376     return -1;
1377   } else {
1378     return 0;
1379   }
1380 }
1381
1382 static int put_string(textfile_stream *s, char *buf)
1383 {
1384   int n=strlen(buf);
1385
1386   return put_string_n(s,buf,n);
1387 }
1388
1389
1390
1391 static int search_for(textfile_stream *s, char d)
1392 {
1393   char c;
1394   do {
1395     c=get_next_char(s);
1396   } while (c!=-1 && c!=d);
1397   
1398   if (c==d) { 
1399     return 0;
1400   } else {
1401     return -1;
1402   }
1403 }
1404
1405 /*
1406 static int skip_matching(textfile_stream *s, char *m, int n)
1407 {
1408   char c;
1409   int rc = 0;
1410   int i;
1411
1412   while (rc==0) { 
1413     c=get_next_char(s);
1414     if (c==-1) { 
1415       rc=-1;
1416     } else {
1417       for (i=0;i<n;i++) { 
1418         if (c==m[i]) {
1419           rc=1;
1420           break;
1421         } 
1422       }
1423     }
1424   }
1425   
1426   if (rc==1) { 
1427     return 0;  // found
1428   } else {
1429     return rc; // unknown
1430   }
1431 }
1432
1433 */
1434
1435
1436 static int token_scan(textfile_stream *s, char *token, int n, char *sl, int m)
1437 {
1438   char c;
1439   int cur;
1440
1441   // Skip whitespace
1442   do {
1443     c=get_next_char(s);
1444     if (c==-1) { 
1445       ERROR("Failed to get character during token scan (preceding whitespace)\n");
1446       return -1;
1447     }
1448   } while (isoneof(c,sl,m));
1449
1450
1451   token[0]=c;
1452   
1453   // Record
1454   cur=1;
1455   while (cur<(n-1)) { 
1456     c=get_next_char(s);
1457     if (c==-1) { 
1458       ERROR("Failed to get character during token scan (token)\n");
1459       return -1;
1460     }
1461     if (isoneof(c,sl,m)) { 
1462       break;
1463     } else {
1464       token[cur]=c;
1465       cur++;
1466     } 
1467   }
1468   token[cur]=0;
1469   return 0;
1470 }
1471
1472
1473
1474 static v3_keyed_stream_key_t open_key_textfile(v3_keyed_stream_t stream,
1475                                                char *key)
1476 {
1477   textfile_keyed_stream *mks = stream;
1478   textfile_stream *ms;
1479
1480   mks->stype=STREAM_FILE;
1481
1482   ms = open_key_file(mks,key);
1483
1484   if (!ms) { 
1485     ERROR("cannot open underlying file keyed stream for key %s\n",key);
1486     mks->stype=STREAM_TEXTFILE;
1487     return 0;
1488   }
1489
1490   if (mks->ot==V3_KS_WR_ONLY) { 
1491     
1492     // Now we write the section header
1493
1494     ms->stype=STREAM_FILE;
1495     
1496     if (put_string(ms,"[")) { 
1497       close_key_file(mks,ms);
1498       mks->stype=STREAM_TEXTFILE;
1499       return 0;
1500     }
1501     
1502     if (put_string(ms,key)) { 
1503       close_key_file(mks,ms);
1504       mks->stype=STREAM_TEXTFILE;
1505       return 0;
1506     }
1507     
1508     if (put_string(ms,"]\n")) {
1509       close_key_file(mks,ms);
1510       mks->stype=STREAM_TEXTFILE;
1511       return 0;
1512     }
1513     
1514     
1515     mks->stype=STREAM_TEXTFILE;
1516     ms->stype=STREAM_TEXTFILE;
1517
1518     return ms;
1519
1520   } else if (mks->ot == V3_KS_RD_ONLY) {
1521     // Now we readthe section header
1522     int keylen=strlen(key);
1523     char *tempkey = palacios_alloc(keylen+3);
1524
1525     ms->stype=STREAM_FILE;
1526
1527     if (!tempkey) { 
1528       ERROR("Allocation failed in opening key\n");
1529       close_key_file(mks,ms);
1530       mks->stype=STREAM_FILE;
1531       return 0;
1532     }
1533
1534
1535     if (token_scan(ms,tempkey,keylen+3,"\t\r\n",3)) { 
1536       ERROR("Cannot scan for token (key search)\n");
1537       close_key_file(mks,ms);
1538       mks->stype=STREAM_TEXTFILE;
1539       palacios_free(tempkey);
1540       return 0;
1541     }
1542     
1543     tempkey[keylen+2] = 0;
1544     
1545     // Should now have [key]0
1546
1547     if (tempkey[0]!='[' ||
1548         tempkey[keylen+1]!=']' ||
1549         memcmp(key,tempkey+1,keylen)) {
1550       ERROR("key mismatch: target key=%s, found %s\n",key,tempkey);
1551       palacios_free(tempkey);
1552       close_key_file(mks,ms);
1553       mks->stype=STREAM_TEXTFILE;
1554       return 0;
1555     }
1556
1557     // key match done, success
1558
1559     mks->stype=STREAM_TEXTFILE;
1560     ms->stype=STREAM_TEXTFILE;
1561
1562     palacios_free(tempkey);
1563
1564     return ms;
1565     
1566   } else {
1567     ERROR("Unknown open type in open_key_textfile\n");
1568     ms->stype=STREAM_FILE;
1569     close_key_file(mks,ms);
1570     return 0;
1571   }
1572
1573 }
1574
1575
1576
1577 static void close_key_textfile(v3_keyed_stream_t stream, 
1578                                v3_keyed_stream_key_t key)
1579 {
1580   textfile_keyed_stream *mks = stream;
1581   textfile_stream *ms=key;
1582
1583   mks->stype=STREAM_FILE;
1584   ms->stype=STREAM_FILE;
1585
1586   close_key_file(mks,ms);
1587
1588   mks->stype=STREAM_TEXTFILE;
1589
1590 }
1591
1592
1593 static sint64_t read_key_textfile(v3_keyed_stream_t stream, 
1594                                   v3_keyed_stream_key_t key,
1595                                   void *tag,
1596                                   sint64_t taglen,
1597                                   void *buf,
1598                                   sint64_t len)
1599 {
1600     textfile_stream *ms = (textfile_stream *) key;
1601     char tags[32];
1602     char *temptag;
1603
1604
1605
1606     memcpy(tags,tag,taglen<31 ? taglen : 31);
1607     tags[taglen<32? taglen : 31 ]=0;
1608     
1609     temptag=palacios_alloc(taglen+1);
1610     if (!temptag) { 
1611       ERROR("Unable to allocate temptag in textfile read key\n");
1612       return -1;
1613     }
1614
1615     ms->stype=STREAM_FILE;
1616     
1617     if (token_scan(ms,temptag,taglen+1," \t\r\n=",5)) { 
1618       ERROR("Cannot scan for token (tag search)\n");
1619       ms->stype=STREAM_TEXTFILE;
1620       palacios_free(temptag);
1621       return -1;
1622     }
1623
1624     if (memcmp(tag,temptag,taglen)) { 
1625       ERROR("Tag mismatch in reading tag from textfile: desired tag=%s, actual tag=%s\n",tags,temptag);
1626       ms->stype=STREAM_TEXTFILE;
1627       palacios_free(temptag);
1628       return -1;
1629     }
1630
1631     // tag matches, let's go and find our =
1632     palacios_free(temptag);
1633
1634     if (search_for(ms,'=')) { 
1635       ERROR("Unable to find = sign in tag data parse (tag=%s)\n", tags);
1636       ms->stype=STREAM_TEXTFILE;
1637       return -1;
1638     }
1639
1640
1641 #if TEXTFILE_OPT_HEX
1642     if (get_hexbytes_as_data(ms,buf,len)) { 
1643       ERROR("Cannot read data in hex format (opt path) in textfile for tag %s\n",tags);
1644       ms->stype=STREAM_TEXTFILE;
1645       return -1;
1646     }
1647 #else
1648     if (get_hexbytes_as_data_skip(ms,buf,len," \t\r\n",4)) { 
1649       ERROR("Cannot read data in hex format (unopt path) in textfile for tag %s\n",tags);
1650       ms->stype=STREAM_TEXTFILE;
1651       return -1;
1652     }
1653 #endif
1654
1655     ms->stype=STREAM_TEXTFILE;
1656
1657     return len;
1658 }
1659
1660 static sint64_t write_key_textfile(v3_keyed_stream_t stream, 
1661                                    v3_keyed_stream_key_t key,
1662                                    void *tag,
1663                                    sint64_t taglen,
1664                                    void *buf,
1665                                    sint64_t len)
1666 {
1667     textfile_stream *ms = (textfile_stream *) key;
1668     char tags[32];
1669
1670
1671
1672     memcpy(tags,tag,taglen<31 ? taglen : 31);
1673     tags[taglen<32? taglen : 31 ]=0;
1674
1675     /*    if (taglen>100000 || len>100000) { 
1676       ERROR("Too big\n");
1677       return -1;
1678     }
1679     */
1680
1681     ms->stype=STREAM_FILE;
1682
1683     if (put_string_n(ms,tag,taglen)) { 
1684       ERROR("Cannot write tag %s in textfile\n",tags);
1685       ms->stype=STREAM_TEXTFILE;
1686       return -1;
1687     }
1688
1689     if (put_string(ms,"=")) { 
1690       ERROR("Cannot write = in textfile for tag %s\n",tags);
1691       ms->stype=STREAM_TEXTFILE;
1692       return -1;
1693     }
1694
1695     if (put_data_as_hexbytes(ms,buf,len)) { 
1696       ERROR("Cannot write data in hex format in textfile for tag %s\n",tags);
1697       ms->stype=STREAM_TEXTFILE;
1698       return -1;
1699     }
1700
1701     if (put_string(ms,"\n")) { 
1702       ERROR("Cannot write trailing lf in textfile for tag %s\n",tags);
1703       ms->stype=STREAM_TEXTFILE;
1704       return -1;
1705     }
1706
1707     ms->stype=STREAM_TEXTFILE;
1708
1709     return len;
1710 }
1711
1712
1713
1714 /***************************************************************************************************
1715   User implementation   ("user:")
1716 *************************************************************************************************/
1717
1718
1719 // List of all user keyed stream connections for the guest
1720 struct user_keyed_streams {
1721     spinlock_t lock;
1722     struct list_head streams;
1723 };
1724
1725
1726 // A single keyed stream connection to user space
1727 struct user_keyed_stream {
1728     int stype;
1729     v3_keyed_stream_open_t otype;
1730
1731     char *url;
1732     spinlock_t lock;
1733     int waiting;
1734
1735     wait_queue_head_t user_wait_queue;
1736     wait_queue_head_t host_wait_queue;
1737
1738     struct palacios_user_keyed_stream_op *op;
1739
1740     struct list_head node;
1741 };
1742
1743
1744 //
1745 // List of all of the user streams
1746 //
1747 static struct user_keyed_streams *user_streams;
1748
1749
1750
1751 static int resize_op(struct palacios_user_keyed_stream_op **op, uint64_t buf_len)
1752 {
1753     struct palacios_user_keyed_stream_op *old = *op;
1754     struct palacios_user_keyed_stream_op *new;
1755     
1756     if (!old) {
1757         new = palacios_alloc(sizeof(struct palacios_user_keyed_stream_op)+buf_len);
1758         if (!new) { 
1759             return -1;
1760         } else {
1761             new->len=sizeof(struct palacios_user_keyed_stream_op)+buf_len;
1762             new->buf_len=buf_len;
1763             *op=new;
1764             return 0;
1765         }
1766     } else {
1767         if ((old->len-sizeof(struct palacios_user_keyed_stream_op)) >= buf_len) { 
1768             old->buf_len=buf_len;
1769             return 0;
1770         } else {
1771             palacios_free(old);
1772             *op = 0 ;
1773             return resize_op(op,buf_len);
1774         }
1775     }
1776 }
1777
1778 //
1779 // The assumption is that we enter this with the stream locked
1780 // and we will return with it locked;  additionally, the op structure
1781 // will be overwritten with the response
1782 // 
1783 static int do_request_to_response(struct user_keyed_stream *s, unsigned long *flags)
1784 {
1785
1786     if (s->waiting) {
1787         ERROR("user keyed stream request attempted while one is already in progress on %s\n",s->url);
1788         return -1;
1789     }
1790
1791     // we are now waiting for a response
1792     s->waiting = 1;
1793
1794     // release the stream
1795     palacios_spinlock_unlock_irqrestore(&(s->lock), *flags);
1796
1797     // wake up anyone waiting on it
1798     wake_up_interruptible(&(s->user_wait_queue));
1799
1800     // wait for someone to give us a response
1801     while (wait_event_interruptible(s->host_wait_queue, (s->waiting == 0)) != 0) {}
1802
1803     // reacquire the lock for our called
1804     palacios_spinlock_lock_irqsave(&(s->lock), *flags);
1805
1806     return 0;
1807 }
1808
1809 //
1810 // The assumption is that we enter this with the stream locked
1811 // and we will return with it UNlocked
1812 // 
1813 static int do_response_to_request(struct user_keyed_stream *s, unsigned long *flags)
1814 {
1815
1816     if (!(s->waiting)) {
1817         ERROR("user keyed stream response while no request is in progress on %s\n",s->url);
1818         return -1;
1819     }
1820
1821     // we are now waiting for a request
1822     s->waiting = 0;
1823
1824     // release the stream
1825     palacios_spinlock_unlock_irqrestore(&(s->lock), *flags);
1826
1827     // wake up anyone waiting on it
1828     wake_up_interruptible(&(s->host_wait_queue));
1829     
1830     return 0;
1831 }
1832
1833
1834
1835 static unsigned int keyed_stream_poll_user(struct file *filp, poll_table *wait)
1836 {
1837     struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
1838     unsigned long flags;
1839     
1840     if (!s) {
1841         return POLLERR;
1842     }
1843     
1844     palacios_spinlock_lock_irqsave(&(s->lock), flags);
1845
1846     poll_wait(filp, &(s->user_wait_queue), wait);
1847
1848     if (s->waiting) {
1849         palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1850         return POLLIN | POLLRDNORM;
1851     }
1852     
1853     palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1854
1855     return 0;
1856 }
1857
1858 static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsigned long arg)
1859 {
1860     void __user *argp = (void __user *)arg;
1861     unsigned long flags;
1862     uint64_t size;
1863     
1864     struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
1865     
1866     switch (ioctl) {
1867
1868         case V3_KSTREAM_REQUEST_SIZE_IOCTL:
1869             
1870             // inform request size
1871             
1872             palacios_spinlock_lock_irqsave(&(s->lock), flags);
1873             
1874             if (!(s->waiting)) {
1875                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1876                 return 0;
1877             }
1878
1879             size =  sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
1880             
1881             if (copy_to_user((void * __user) argp, &size, sizeof(uint64_t))) {
1882                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1883                 ERROR("palacios user key size request failed to copy data\n");
1884                 return -EFAULT;
1885             }
1886             
1887             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1888             
1889             return 1;
1890             
1891             break;
1892
1893         case V3_KSTREAM_REQUEST_PULL_IOCTL: 
1894                 
1895             // pull the request
1896             
1897             palacios_spinlock_lock_irqsave(&(s->lock), flags);
1898
1899             if (!(s->waiting)) {
1900                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1901                 ERROR("palacios user key pull request when not waiting\n");
1902                 return 0;
1903             }
1904
1905             size =  sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
1906
1907
1908             if (copy_to_user((void __user *) argp, s->op, size)) {
1909                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1910                 ERROR("palacios user key pull request failed to copy data\n");
1911                 return -EFAULT;
1912             }
1913
1914             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1915             
1916             return 1;
1917             
1918          
1919             break;
1920
1921     case V3_KSTREAM_RESPONSE_PUSH_IOCTL:
1922
1923         // push the response
1924
1925         palacios_spinlock_lock_irqsave(&(s->lock), flags);
1926
1927         if (!(s->waiting)) {
1928             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1929             ERROR("palacios user key push response when not waiting\n");
1930             return 0;
1931         }
1932         
1933         if (copy_from_user(&size, (void __user *) argp, sizeof(uint64_t))) {
1934             ERROR("palacios user key push response failed to copy size\n");
1935             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1936             return -EFAULT;
1937         }
1938
1939         // overflow possible here for very large request
1940         if (resize_op(&(s->op),size-sizeof(struct palacios_user_keyed_stream_op))) {
1941             ERROR("unable to resize op in user key push response\n");
1942             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1943             return -EFAULT;
1944         }
1945
1946         if (copy_from_user(s->op, (void __user *) argp, size)) {
1947             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1948             return -EFAULT;
1949         }
1950
1951         do_response_to_request(s,&flags);
1952         // this will have unlocked s for us
1953
1954         return 1;
1955
1956         break;
1957         
1958     default:
1959         ERROR("unknown ioctl in user keyed stream\n");
1960
1961         return -EFAULT;
1962
1963         break;
1964         
1965     }
1966 }
1967
1968
1969 static int keyed_stream_release_user(struct inode *inode, struct file *filp)
1970 {
1971     struct user_keyed_stream *s = filp->private_data;
1972     unsigned long f1,f2;
1973
1974     palacios_spinlock_lock_irqsave(&(user_streams->lock),f1);
1975     palacios_spinlock_lock_irqsave(&(s->lock), f2);
1976
1977     list_del(&(s->node));
1978
1979     palacios_spinlock_unlock_irqrestore(&(s->lock), f2);
1980     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), f1);
1981     
1982     palacios_free(s->url);
1983     palacios_free(s);
1984
1985     return 0;
1986 }
1987
1988 static struct file_operations user_keyed_stream_fops = {
1989     .poll = keyed_stream_poll_user,
1990     .compat_ioctl = keyed_stream_ioctl_user,
1991     .unlocked_ioctl = keyed_stream_ioctl_user,
1992     .release = keyed_stream_release_user,
1993 };
1994
1995
1996 /*
1997   user_keyed_streams are allocated on user connect, and deallocated on user release
1998   
1999   palacios-side opens and closes only manipulate the open type
2000 */
2001
2002 int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned long arg, void *priv_data)
2003 {
2004     int fd;
2005     unsigned long flags;
2006     char *url;
2007     uint64_t len;
2008     struct user_keyed_stream *s;
2009     
2010     if (!user_streams) { 
2011         ERROR("no user space keyed streams!\n");
2012         return -1;
2013     }
2014
2015     // get the url
2016     if (copy_from_user(&len,(void __user *)arg,sizeof(len))) { 
2017         ERROR("cannot copy url len from user\n");
2018         return -1;
2019     }
2020
2021     // overflow possible here, but only if this is a huge guest request (>4GB)
2022     url = palacios_alloc(len);
2023     
2024     if (!url) { 
2025         ERROR("cannot allocate url for user keyed stream\n");
2026         return -1;
2027     }
2028
2029     if (copy_from_user(url,((void __user *)arg)+sizeof(len),len)) {
2030         ERROR("cannot copy url from user\n");
2031         return -1;
2032     }
2033     url[len-1]=0;
2034         
2035     
2036     // Check for duplicate handler
2037     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
2038     list_for_each_entry(s, &(user_streams->streams), node) {
2039         if (!strncasecmp(url, s->url, len)) {
2040             ERROR("user keyed stream connection with url \"%s\" already exists\n", url);
2041             palacios_free(url);
2042             return -1;
2043         }
2044     }
2045     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2046     
2047     // Create connection
2048     s = palacios_alloc(sizeof(struct user_keyed_stream));
2049     
2050     if (!s) {
2051         ERROR("cannot allocate new user keyed stream for %s\n",url);
2052         palacios_free(url);
2053         return -1;
2054     }
2055     
2056     
2057     // Get file descriptor
2058     fd = anon_inode_getfd("v3-kstream", &user_keyed_stream_fops, s, 0);
2059
2060     if (fd < 0) {
2061         ERROR("cannot allocate file descriptor for new user keyed stream for %s\n",url);
2062         palacios_free(s);
2063         palacios_free(url);
2064         return -1;
2065     }
2066     
2067     memset(s, 0, sizeof(struct user_keyed_stream));
2068     
2069     s->stype=STREAM_USER;
2070     s->url=url;
2071     
2072     init_waitqueue_head(&(s->user_wait_queue));
2073     init_waitqueue_head(&(s->host_wait_queue));
2074     
2075     // Insert connection into list
2076     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
2077     list_add(&(s->node), &(user_streams->streams));
2078     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2079     
2080     return fd;
2081 }
2082     
2083 static struct user_keyed_stream *keyed_stream_user_find(char *url)
2084 {
2085     unsigned long flags;
2086     struct user_keyed_stream *s;
2087     
2088     if (!user_streams) { 
2089         ERROR("no user space keyed streams available\n");
2090         return NULL;
2091     }
2092     
2093     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
2094     list_for_each_entry(s, &(user_streams->streams), node) {
2095         if (!strcasecmp(url, s->url)) {
2096             palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2097             return s;
2098         }
2099     }
2100     
2101     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2102     
2103     return NULL;
2104 }
2105     
2106     
2107 static v3_keyed_stream_t open_stream_user(char *url, v3_keyed_stream_open_t ot)
2108 {
2109     unsigned long flags;
2110     struct user_keyed_stream *s;
2111     
2112     s = keyed_stream_user_find(url);
2113     
2114     if (!s) {
2115         ERROR("cannot open user stream %s as it does not exist yet\n",url);
2116         return NULL;
2117     }
2118
2119     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2120
2121     if (s->waiting) {
2122         palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
2123         ERROR("cannot open user stream %s as it is already in waiting state\n",url);
2124         return NULL;
2125     }
2126     
2127     s->otype = ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
2128     
2129     palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
2130     
2131     return s;
2132     
2133 }
2134     
2135 // close stream does not do anything.  Creation of the stream and its cleanup
2136 // are driven by the user side, not the palacios side
2137 // might eventually want to reference count this, though
2138 static void close_stream_user(v3_keyed_stream_t stream)
2139 {
2140     return;
2141 }
2142
2143 static void preallocate_hint_key_user(v3_keyed_stream_t stream,
2144                                       char *key,
2145                                       uint64_t size)
2146 {
2147     return;
2148 }
2149
2150
2151
2152
2153 static v3_keyed_stream_key_t open_key_user(v3_keyed_stream_t stream, char *key)
2154 {
2155     unsigned long flags;
2156     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2157     uint64_t   len = strlen(key)+1;
2158     void *user_key;
2159
2160     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2161
2162
2163     if (resize_op(&(s->op),len)) {
2164         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2165         ERROR("cannot resize op in opening key %s on user keyed stream %s\n",key,s->url);
2166         return NULL;
2167     }
2168
2169     s->op->type = PALACIOS_KSTREAM_OPEN_KEY;
2170     s->op->buf_len = len;
2171     strncpy(s->op->buf,key,len); // will terminate buffer
2172
2173     // enter with it locked
2174     if (do_request_to_response(s,&flags)) { 
2175         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2176         ERROR("request/response handling failed\n");
2177         return NULL;
2178     }
2179     // return with it locked
2180
2181     user_key=s->op->user_key;
2182
2183     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2184
2185     return user_key;
2186 }
2187
2188 static void close_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key)
2189 {
2190     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2191     uint64_t   len = 0;
2192     unsigned long flags;
2193     
2194     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2195
2196     if (resize_op(&(s->op),len)) {
2197         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2198         ERROR("cannot resize op in closing key 0x%p on user keyed stream %s\n",key,s->url);
2199         return;
2200     }
2201
2202     s->op->type = PALACIOS_KSTREAM_CLOSE_KEY;
2203     s->op->buf_len = len;
2204     s->op->user_key = key;
2205
2206     // enter with it locked
2207     if (do_request_to_response(s,&flags)) { 
2208         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2209         ERROR("request/response handling failed\n");
2210         return;
2211     }
2212     // return with it locked
2213
2214     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2215
2216     return;
2217 }
2218
2219
2220
2221 static sint64_t read_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2222                               void *tag,
2223                               sint64_t taglen,
2224                               void *buf, sint64_t rlen)
2225 {
2226
2227     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2228     uint64_t   len = taglen ;
2229     sint64_t   xfer;
2230     unsigned long flags;
2231
2232     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2233
2234     if (s->otype != V3_KS_RD_ONLY) { 
2235         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2236         ERROR("attempt to read key from stream that is not in read state on %s\n",s->url);
2237     }   
2238
2239
2240     if (resize_op(&(s->op),len)) {
2241         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2242         ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
2243         return -1;
2244     }
2245
2246     s->op->type = PALACIOS_KSTREAM_READ_KEY;
2247     s->op->buf_len = len ;
2248     s->op->xfer = rlen;
2249     s->op->user_key = key;
2250     s->op->data_off = taglen;
2251
2252     memcpy(s->op->buf,tag,taglen);
2253
2254     // enter with it locked
2255     if (do_request_to_response(s,&flags)) { 
2256         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2257         ERROR("request/response handling failed\n");
2258         return -1;
2259     }
2260     // return with it locked
2261
2262
2263     if (s->op->xfer>0) { 
2264         // data_off must be zero
2265         memcpy(buf,s->op->buf,s->op->xfer);
2266     }
2267
2268     xfer=s->op->xfer;
2269
2270     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2271
2272     return xfer;
2273 }
2274
2275
2276 static sint64_t write_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2277                                void *tag,
2278                                sint64_t taglen,
2279                                void *buf, sint64_t wlen)
2280 {
2281
2282     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2283     uint64_t   len = taglen + wlen ;
2284     sint64_t   xfer;
2285     unsigned long flags;
2286
2287
2288     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2289
2290     if (s->otype != V3_KS_WR_ONLY) { 
2291         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2292         ERROR("attempt to write key on stream that is not in write state on %s\n",s->url);
2293     }   
2294
2295     if (resize_op(&(s->op),len)) {
2296         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2297         ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
2298         return -1;
2299     }
2300
2301     s->op->type = PALACIOS_KSTREAM_WRITE_KEY;
2302     s->op->buf_len = len;
2303     s->op->xfer = wlen;
2304     s->op->user_key = key;
2305     s->op->data_off = taglen;
2306
2307     memcpy(s->op->buf,tag,taglen);
2308     memcpy(s->op->buf+taglen,buf,wlen);
2309
2310     // enter with it locked
2311     if (do_request_to_response(s,&flags)) { 
2312         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2313         ERROR("request/response handling failed\n");
2314         return -1;
2315     }
2316     // return with it locked
2317
2318     // no data comes back, xfer should be size of data write (not tag)
2319
2320     xfer=s->op->xfer;
2321
2322     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2323
2324     return xfer;
2325 }
2326
2327
2328
2329 /****************************************************************************************
2330  *    Network-based implementation  ("net:")
2331  *****************************************************************************************/
2332
2333
2334 #define NET_MAX_KEY_LEN 128
2335
2336 struct net_keyed_stream {
2337     int stype;
2338     int ot;
2339     struct net_stream * ns;
2340 };
2341
2342 struct net_stream {
2343     int stype;
2344     struct socket *sock;
2345 };
2346
2347
2348 //ignore the arguments given here currently
2349 static struct net_stream * create_net_stream(void) 
2350 {
2351     struct net_stream * ns = NULL;
2352
2353     ns = palacios_alloc(sizeof(struct net_stream));
2354     
2355     if (!ns) { 
2356         ERROR("Cannot allocate a net_stream\n");
2357         return 0;
2358     }
2359
2360     memset(ns, 0, sizeof(struct net_stream));
2361
2362     ns->stype = STREAM_NETWORK;
2363
2364     return ns;
2365 }
2366
2367 static void close_socket(v3_keyed_stream_t stream)
2368 {
2369     struct net_keyed_stream *nks = (struct net_keyed_stream *) stream;
2370
2371     if (nks) { 
2372         struct net_stream *ns = nks->ns;
2373
2374         if (ns) {
2375             ns->sock->ops->release(ns->sock);
2376             palacios_free(ns);
2377             ERROR("Close Socket\n");
2378         }
2379         
2380         palacios_free(ns);
2381     }
2382 }
2383
2384
2385 static void close_stream_net(v3_keyed_stream_t stream)
2386 {
2387         close_socket(stream);
2388 }
2389
2390 static int connect_to_ip(struct net_stream *ns, int hostip, int port)
2391 {
2392     struct sockaddr_in client;
2393
2394     if (ns == NULL) {
2395         return -1;
2396     }
2397
2398     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { 
2399         ERROR("Cannot create accept socket\n");
2400         return -1;
2401     }
2402         
2403
2404     client.sin_family = AF_INET;
2405     client.sin_port = htons(port);
2406     client.sin_addr.s_addr = hostip;//in_aton(hostip);
2407
2408     return ns->sock->ops->connect(ns->sock, (struct sockaddr *)&client, sizeof(client), 0);
2409 }
2410
2411 static int send_msg(struct net_stream *ns,  char * buf, int len)
2412 {
2413     int left=len;
2414
2415     if (!ns) { 
2416         ERROR("Send message on null net_stream\n");
2417         return -1;
2418     }
2419
2420     if (!(ns->sock)) { 
2421         ERROR("Send message on net_stream without socket\n");
2422         return -1;
2423     }
2424
2425     while (left>0) {
2426
2427         struct msghdr msg;
2428         mm_segment_t oldfs;
2429         struct iovec iov;
2430         int err = 0;
2431         
2432
2433         msg.msg_flags = MSG_NOSIGNAL;//MSG_DONTWAIT;
2434         msg.msg_name = 0;
2435         msg.msg_namelen = 0;
2436         msg.msg_control = NULL;
2437         msg.msg_controllen = 0;
2438 #if LINUX_VERSION_CODE < KERNEL_VERSION(3,19,0)
2439         msg.msg_iov = &iov;
2440         msg.msg_iovlen = 1;
2441 #else
2442         iov_iter_init(&(msg.msg_iter),WRITE,&iov,1,0);
2443 #endif
2444
2445         iov.iov_base = (char *)&(buf[len-left]);
2446         iov.iov_len = (size_t)left;
2447
2448         oldfs = get_fs();
2449         set_fs(KERNEL_DS);
2450
2451         err = sock_sendmsg(ns->sock, &msg, (size_t)left);
2452
2453         set_fs(oldfs);
2454         
2455         if (err<0) {
2456             ERROR("Send msg error %d\n",err);
2457             return err;
2458         } else {
2459             left-=len;
2460         }
2461     }
2462
2463     return len;
2464 }
2465
2466
2467
2468 static int recv_msg(struct net_stream *ns, char * buf, int len)
2469 {
2470
2471     int left=len;
2472
2473     if (!ns) { 
2474         ERROR("Receive message on null net_stream\n");
2475         return -1;
2476     }
2477
2478     if (!(ns->sock)) { 
2479         ERROR("Receive  message on net_stream without socket\n");
2480         return -1;
2481     }
2482     
2483     
2484     while (left>0) {
2485         
2486         struct msghdr msg;
2487         mm_segment_t oldfs;
2488         struct iovec iov;
2489         int err;
2490         
2491         msg.msg_flags = 0;
2492         msg.msg_name = 0;
2493         msg.msg_namelen = 0;
2494         msg.msg_control = NULL;
2495         msg.msg_controllen = 0;
2496 #if LINUX_VERSION_CODE < KERNEL_VERSION(3,19,0)
2497         msg.msg_iov = &iov;
2498         msg.msg_iovlen = 1;
2499 #else
2500         iov_iter_init(&(msg.msg_iter),READ,&iov,1,0);
2501 #endif
2502         
2503         iov.iov_base = (void *)&(buf[len-left]);
2504         iov.iov_len = (size_t)left;
2505         
2506         oldfs = get_fs();
2507         set_fs(KERNEL_DS);
2508         
2509         err = sock_recvmsg(ns->sock, &msg, (size_t)left, 0);
2510         
2511         set_fs(oldfs);
2512         
2513         if (err<0) { 
2514             return err;
2515         } else {
2516             left -= err;
2517         }
2518     }
2519     return len;
2520 }
2521
2522 static struct net_stream * accept_once(struct net_stream * ns, const int port)
2523 {
2524     struct socket *accept_sock;
2525     struct sockaddr_in addr;
2526     int err;
2527     
2528     if (!ns) { 
2529         ERROR("Accept called on null net_stream\n");
2530         return 0;
2531     }
2532     
2533     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&accept_sock)<0) { 
2534         ERROR("Cannot create accept socket\n"); 
2535         return NULL;
2536     }
2537     
2538
2539     addr.sin_family = AF_INET;
2540     addr.sin_port = htons(port);
2541     addr.sin_addr.s_addr = INADDR_ANY;
2542     
2543     err = accept_sock->ops->bind(accept_sock, (struct sockaddr *)&addr, sizeof(addr));
2544     
2545     if (err<0) {
2546         ERROR("Bind err: %d\n",err);
2547         return NULL;
2548     }
2549
2550     err = accept_sock->ops->listen(accept_sock,2);
2551     
2552     if (err<0) {
2553         ERROR("Listen err: %d\n",err);
2554         return NULL;
2555     }
2556     
2557     // Init the socket in the network strream
2558
2559     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { 
2560         ERROR("Cannot create socket\n");
2561         return NULL;
2562     }
2563     
2564     
2565     // Do the actual accept 
2566
2567     if (accept_sock->ops->accept(accept_sock,ns->sock,0)<0) {
2568         ERROR("accept failed");
2569         return NULL;
2570     }
2571     
2572     // close the accept socket
2573     accept_sock->ops->release(accept_sock);
2574     palacios_free(accept_sock);
2575
2576     return ns;
2577 }
2578
2579
2580 static struct v3_keyed_stream_t * open_stream_net(char * url,v3_keyed_stream_open_t ot)
2581 {
2582     struct net_keyed_stream * nks;
2583     int url_len;
2584     int i;
2585     int delimit[3];
2586     int k;
2587     char mode;
2588     int ip_len;
2589     int port_len;
2590
2591     nks = palacios_alloc(sizeof(struct net_keyed_stream)); 
2592
2593     if (!nks) { 
2594         ERROR("Could not allocate space in open_stream_net\n");
2595         return 0;
2596     }
2597     
2598     nks->ot = ot == V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
2599
2600     nks->stype = STREAM_NETWORK; 
2601
2602     nks->ns = create_net_stream();
2603     
2604     if (!(nks->ns)) { 
2605         ERROR("Could not create network stream\n");
2606         palacios_free(nks);
2607         return 0;
2608     }
2609
2610     url_len=strlen(url);
2611     k=0;
2612
2613
2614     for(i = 0; i < url_len;i++){
2615         if(url[i] == ':'){
2616             delimit[k] = i;
2617             k++;        
2618         }
2619     }
2620
2621     mode = url[delimit[0] + 1];
2622     ip_len = delimit[2] - delimit[1];
2623     port_len = url_len - delimit[2];
2624
2625
2626     {
2627         char ip[ip_len];
2628         char port[port_len];
2629         int host_ip;
2630         int host_port;
2631
2632
2633         strncpy(ip,url + delimit[1]+1,ip_len-1);
2634         ip[ip_len-1]='\0';
2635         
2636         host_ip = in_aton(ip);
2637         
2638         strncpy(port,url+ delimit[2]+1,port_len-1);
2639         port[port_len-1]='\0';
2640         
2641         host_port = simple_strtol(port,NULL,10);
2642
2643         INFO("ip is %s\n",ip); 
2644         INFO("host_ip is %x\n", host_ip);
2645         INFO("port is %s (%d)\n",port,host_port);
2646         
2647         if (mode == 'a'){
2648             // accept a request
2649             INFO("Accepting Connection on INADDR_ANY port:%d\n",host_port);
2650             nks->ns = accept_once(nks->ns, host_port);
2651         } else if (mode == 'c'){
2652             // call connect to ip
2653             INFO("Connecting to %s:%d\n",ip,host_port);
2654             connect_to_ip(nks->ns,host_ip, host_port);
2655         } else {
2656             ERROR("Mode not recognized\n");
2657             palacios_free(nks);
2658             return NULL;
2659         }
2660         
2661         return (v3_keyed_stream_t)nks;
2662     }
2663 }
2664
2665 static void preallocate_hint_key_net(v3_keyed_stream_t stream, char *key,uint64_t size)
2666 {
2667     //do nothing
2668 }
2669
2670 static v3_keyed_stream_key_t open_key_net(v3_keyed_stream_t stream,char *key)
2671 {
2672    struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2673
2674    // reciever of the key name 
2675    if (nks->ot==V3_KS_WR_ONLY)
2676    {
2677        unsigned short keylen = strlen(key);
2678
2679        if (keylen>NET_MAX_KEY_LEN || keylen>=32768) { 
2680            ERROR("Key is too long\n");
2681            return NULL;
2682        }
2683
2684        {
2685            // on-stack allocation here demands that we
2686            // keep key length low...
2687            char msg[keylen+3];
2688            int next = 0;
2689            
2690            // Opening a key for writing sends a notice of the 
2691            // key length and the key name on the channel
2692            
2693            msg[next++]=keylen & 0xFF;
2694            msg[next]=(keylen>>8) & 0xFF;
2695            // Set flag bit
2696            msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will flip leading bit to 1
2697            
2698            strncpy(msg+2,key,keylen);  // will also copy trailing zero
2699            
2700            if (send_msg(nks->ns,msg,keylen+2) != keylen+2) { 
2701                ERROR("Unable to open key for writing on net_stream (send key len+name)\n");
2702                return NULL;
2703            }
2704        }
2705    }
2706
2707    if (nks->ot==V3_KS_RD_ONLY)   {
2708        char msg_info[2];
2709        int next = 0;
2710        int keylen = 0;
2711
2712        if (recv_msg(nks->ns,msg_info,2) != 2) { 
2713            ERROR("Unable to open key for reading on net_stream (recv key len)\n");
2714            return NULL;
2715        }
2716
2717        next = 0;
2718        keylen = 0;
2719
2720        keylen |= msg_info[next++];
2721
2722        if ((msg_info[next] & 0x80) != 0x80)  {
2723            ERROR("Flag bit not set on receive of key length\n");
2724            return NULL;
2725        } else {
2726            msg_info[next] &= 0x7F; // flip the msb back to zero (clear flag)
2727        }
2728        
2729        keylen |= msg_info[next]<<8;
2730
2731        if (keylen > NET_MAX_KEY_LEN) { 
2732            ERROR("Received key length is too big\n");
2733            return NULL;
2734        }
2735        
2736        {
2737            
2738            char msg[keylen+1];
2739            
2740            if (recv_msg(nks->ns,msg,keylen) != keylen) { 
2741                ERROR("Unable to receive key\n");
2742                return NULL;
2743            }
2744            msg[keylen]=0;
2745            
2746            if (strncmp(key,msg,keylen)!=0) {
2747                ERROR("Key mismatch in open_key_net - expect %s but got %s\n",key,msg);
2748                return NULL;
2749            }
2750        }
2751    }
2752    
2753    return (v3_keyed_stream_key_t)key;
2754 }
2755
2756 static void close_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t input_key)
2757 {
2758     char * key = (char*)input_key;
2759     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2760
2761     
2762     if (nks->ot==V3_KS_WR_ONLY) {
2763         unsigned short keylen = strlen(key);
2764
2765         if (keylen > NET_MAX_KEY_LEN || keylen>=32768) {
2766             ERROR("Key length too long in close_key_net\n");
2767             return;
2768         }
2769
2770         {
2771             char msg[keylen+3];
2772             int next = 0;
2773             
2774             msg[next++]=keylen & 0xFF;
2775             msg[next]=(keylen>>8) & 0xFF;
2776             // flag
2777             msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will filp leading bit to 1
2778             strncpy(msg+2,key,keylen); // will copy the zero
2779             msg[keylen+2]=0;
2780             if (send_msg(nks->ns,msg,keylen+2)!=keylen+2) { 
2781                 ERROR("Cannot send key on close_key_net\n");
2782                 return;
2783             }
2784         }
2785     }
2786     
2787     if (nks->ot==V3_KS_RD_ONLY)   {
2788         char msg_info[2];
2789         int next;
2790         int keylen;
2791         
2792         if (recv_msg(nks->ns,msg_info,2) != 2) { 
2793             ERROR("Cannot recv key length on close_key_net\n");
2794             return;
2795         }
2796         
2797         next = 0;
2798         keylen = 0;
2799         
2800         keylen |= msg_info[next++];
2801         
2802         if ((msg_info[next] & 0x80) != 0x80) {
2803             ERROR("Missing flag in close_key_net receive\n");
2804             return;
2805         } 
2806         
2807         msg_info[next] &= 0x7F; // flip the msb back to zero
2808         
2809         keylen |= msg_info[next]<<8;
2810         
2811         {
2812             char msg[keylen+1];
2813             
2814             if (recv_msg(nks->ns,msg,keylen)!=keylen) { 
2815                 ERROR("Did not receive all of key in close_key_net receive\n");
2816                 return;
2817             }
2818             
2819             msg[keylen]=0;
2820             
2821             if (strncmp(key,msg,keylen)!=0)  {
2822                 ERROR("Key mismatch in close_key_net - expect %s but got %s\n",key,msg);
2823                 return;
2824             }
2825         }
2826     }
2827 }
2828
2829 static sint64_t write_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key, 
2830                               void *tag,
2831                               sint64_t taglen,
2832                               void *buf, sint64_t len) 
2833 {
2834     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2835
2836     if (!buf) { 
2837         ERROR("Buf is NULL in write_key_net\n");
2838         return -1;
2839     }
2840
2841     if (!tag) { 
2842         ERROR("Tag is NULL in write_key_net\n");
2843         return -1;
2844     }
2845
2846     if (len<0) {
2847         ERROR("len is negative in write_key_net\n");
2848         return -1;
2849     }
2850
2851     if (taglen<0) {
2852         ERROR("taglen is negative in write_key_net\n");
2853         return -1;
2854     }
2855     
2856     if (!key){
2857        ERROR("write_key: key is NULL in write_key_net\n");
2858        return -1;
2859     }
2860     
2861     
2862     if (!nks)  {
2863         ERROR("nks is NULL in write_key_net\n");
2864         return -1;
2865     }
2866     
2867     if (nks->ot==V3_KS_WR_ONLY) {
2868         if (send_msg(nks->ns,(char*)(&BOUNDARY_TAG),sizeof(BOUNDARY_TAG))!=sizeof(BOUNDARY_TAG)) { 
2869             ERROR("Could not send boundary tag in write_key_net\n");
2870             return -1;
2871         } 
2872         if (send_msg(nks->ns,(char*)(&taglen),sizeof(taglen))!=sizeof(taglen)) { 
2873             ERROR("Could not send tag length in write_key_net\n");
2874             return -1;
2875         } 
2876         if (send_msg(nks->ns,tag,taglen)!=taglen) { 
2877             ERROR("Could not send tag in write_key_net\n");
2878             return -1;
2879         }
2880         if (send_msg(nks->ns,(char*)(&len),sizeof(len))!=sizeof(len)) { 
2881             ERROR("Could not send data length in write_key_net\n");
2882             return -1;
2883         } 
2884         if (send_msg(nks->ns,buf,len)!=len) { 
2885             ERROR("Could not send data in write_key_net\n");
2886             return -1;
2887         }
2888     }  else {
2889         ERROR("Permission not correct in write_key_net\n");
2890         return -1;
2891     }
2892     
2893     return len;
2894 }
2895
2896
2897 static sint64_t read_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2898                              void *tag,
2899                              sint64_t taglen,
2900                              void *buf, sint64_t len)
2901 {
2902     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2903     void *temptag;
2904
2905     if (!buf) {
2906         ERROR("Buf is NULL in read_key_net\n");
2907         return -1;
2908     }
2909
2910     if (!tag) {
2911         ERROR("Tag is NULL in read_key_net\n");
2912         return -1;
2913     }
2914     
2915     if(len<0) {
2916         ERROR("len is negative in read_key_net\n");
2917         return -1;
2918     }
2919
2920     if(taglen<0) {
2921         ERROR("taglen is negative in read_key_net\n");
2922         return -1;
2923     }
2924     
2925     if (!key) {
2926         ERROR("read_key: key is NULL in read_key_net\n");
2927         return -1;
2928     }
2929
2930
2931     if (nks->ot==V3_KS_RD_ONLY) {
2932         
2933         sint64_t slen;
2934         uint32_t tempbt;
2935         
2936         if (recv_msg(nks->ns,(char*)(&tempbt),sizeof(tempbt))!=sizeof(tempbt)) { 
2937             ERROR("Cannot receive boundary tag in read_key_net\n");
2938             return -1;
2939         }
2940
2941         if (tempbt!=BOUNDARY_TAG) { 
2942           ERROR("Invalid boundary tag (received 0x%x\n",tempbt);
2943           return -1;
2944         }
2945            
2946         temptag=palacios_alloc(taglen);
2947         if (!temptag) {
2948           ERROR("failed to allocate temptag\n");
2949           return -1;
2950         }
2951
2952         if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { 
2953             ERROR("Cannot receive tag len in read_key_net\n");
2954             palacios_free(temptag);
2955             return -1;
2956         }
2957
2958         if (slen!=taglen) {
2959             ERROR("Tag len expected does not matched tag len decoded in read_key_net\n");
2960             palacios_free(temptag);
2961             return -1;
2962         }
2963
2964         if (recv_msg(nks->ns,temptag,taglen)!=taglen) { 
2965             ERROR("Cannot recieve tag in read_key_net\n");
2966             palacios_free(temptag);
2967             return -1;
2968         }
2969
2970         if (memcmp(temptag,tag,taglen)) { 
2971           ERROR("Tag mismatch\n");
2972           palacios_free(temptag);
2973           return -1;
2974         }
2975
2976         if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { 
2977             ERROR("Cannot receive data len in read_key_net\n");
2978             palacios_free(temptag);
2979             return -1;
2980         }
2981
2982         if (slen!=len) {
2983             ERROR("Data len expected does not matched data len decoded in read_key_net\n");
2984             palacios_free(temptag);
2985             return -1;
2986         }
2987
2988         if (recv_msg(nks->ns,buf,len)!=len) { 
2989             ERROR("Cannot recieve data in read_key_net\n");
2990             palacios_free(temptag);
2991             return -1;
2992         }
2993
2994         palacios_free(temptag);
2995         
2996     } else {
2997         ERROR("Permissions incorrect for the stream in read_key_net\n");
2998         return -1;
2999     }
3000
3001     return len;
3002     
3003 }
3004
3005
3006 /***************************************************************************************************
3007   Generic interface
3008 *************************************************************************************************/
3009
3010 static v3_keyed_stream_t open_stream(char *url,
3011                                      v3_keyed_stream_open_t ot)
3012 {
3013     if (!strncasecmp(url,"mem:",4)) { 
3014         return open_stream_mem(url,ot);
3015     } else if (!strncasecmp(url,"file:",5)) { 
3016         return open_stream_file(url,ot);
3017     } else if (!strncasecmp(url,"user:",5)) { 
3018         return open_stream_user(url,ot);
3019     } else if (!strncasecmp(url,"net:",4)){
3020         return open_stream_net(url,ot);
3021     } else if (!strncasecmp(url,"textfile:",9)) { 
3022         return open_stream_textfile(url,ot);
3023     } else {
3024         ERROR("unsupported type in attempt to open keyed stream \"%s\"\n",url);
3025         return 0;
3026     }
3027 }
3028
3029 static void close_stream(v3_keyed_stream_t stream)
3030 {
3031     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3032     switch (gks->stype){ 
3033         case STREAM_MEM:
3034             return close_stream_mem(stream);
3035             break;
3036         case STREAM_FILE:
3037             return close_stream_file(stream);
3038             break;
3039         case STREAM_TEXTFILE:
3040             return close_stream_textfile(stream);
3041             break;
3042         case STREAM_USER:
3043             return close_stream_user(stream);
3044             break;
3045         case STREAM_NETWORK:
3046             return close_stream_net(stream);
3047             break;
3048         default:
3049             ERROR("unknown stream type %d in close\n",gks->stype);
3050             break;
3051     }
3052 }
3053
3054 static void preallocate_hint_key(v3_keyed_stream_t stream,
3055                                  char *key,
3056                                  uint64_t size)
3057 {
3058     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3059     switch (gks->stype){ 
3060         case STREAM_MEM:
3061             preallocate_hint_key_mem(stream,key,size);
3062             break;
3063         case STREAM_FILE:
3064             preallocate_hint_key_file(stream,key,size);
3065             break;
3066         case STREAM_TEXTFILE:
3067             preallocate_hint_key_textfile(stream,key,size);
3068             break;
3069         case STREAM_USER:
3070             return preallocate_hint_key_user(stream,key,size);
3071             break;
3072         case STREAM_NETWORK:
3073             return preallocate_hint_key_net(stream,key,size);
3074             break;
3075         default:
3076             ERROR("unknown stream type %d in preallocate_hint_key\n",gks->stype);
3077             break;
3078     }
3079     return;
3080 }
3081
3082
3083 static v3_keyed_stream_key_t open_key(v3_keyed_stream_t stream,
3084                                       char *key)
3085 {
3086     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3087     switch (gks->stype){ 
3088         case STREAM_MEM:
3089             return open_key_mem(stream,key);
3090             break;
3091         case STREAM_FILE:
3092             return open_key_file(stream,key);
3093             break;
3094         case STREAM_TEXTFILE:
3095             return open_key_textfile(stream,key);
3096             break;
3097         case STREAM_USER:
3098             return open_key_user(stream,key);
3099             break;
3100         case STREAM_NETWORK:
3101             return open_key_net(stream, key);
3102             break;
3103         default:
3104             ERROR("unknown stream type %d in open_key\n",gks->stype);
3105             break;
3106     }
3107     return 0;
3108 }
3109
3110
3111 static void close_key(v3_keyed_stream_t stream, 
3112                       v3_keyed_stream_key_t key)
3113 {
3114     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3115     switch (gks->stype){ 
3116         case STREAM_MEM:
3117             return close_key_mem(stream,key);
3118             break;
3119         case STREAM_FILE:
3120             return close_key_file(stream,key);
3121             break;
3122         case STREAM_TEXTFILE:
3123             return close_key_textfile(stream,key);
3124             break;
3125         case STREAM_USER:
3126             return close_key_user(stream,key);
3127             break;
3128          case STREAM_NETWORK:
3129             return close_key_net(stream, key);
3130             break;      
3131         default:
3132             ERROR("unknown stream type %d in close_key\n",gks->stype);
3133             break;
3134     }
3135     // nothing to do
3136     return;
3137 }
3138
3139 static sint64_t write_key(v3_keyed_stream_t stream, 
3140                           v3_keyed_stream_key_t key,
3141                           void *tag,
3142                           sint64_t taglen,
3143                           void *buf,
3144                           sint64_t len)
3145 {
3146     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3147     switch (gks->stype){ 
3148         case STREAM_MEM:
3149             return write_key_mem(stream,key,tag,taglen,buf,len);
3150             break;
3151         case STREAM_FILE:
3152             return write_key_file(stream,key,tag,taglen,buf,len);
3153             break;
3154         case STREAM_TEXTFILE:
3155             return write_key_textfile(stream,key,tag,taglen,buf,len);
3156             break;
3157         case STREAM_USER:
3158             return write_key_user(stream,key,tag,taglen,buf,len);
3159             break;
3160         case STREAM_NETWORK:
3161             return write_key_net(stream,key,tag,taglen,buf,len);
3162             break;
3163         default:
3164             ERROR("unknown stream type %d in write_key\n",gks->stype);
3165             return -1;
3166             break;
3167     }
3168     return -1;
3169 }
3170
3171
3172 static sint64_t read_key(v3_keyed_stream_t stream, 
3173                          v3_keyed_stream_key_t key,
3174                          void *tag,
3175                          sint64_t taglen,
3176                          void *buf,
3177                          sint64_t len)
3178 {
3179     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3180     switch (gks->stype){ 
3181         case STREAM_MEM:
3182           return read_key_mem(stream,key,tag,taglen,buf,len);
3183             break;
3184         case STREAM_FILE:
3185             return read_key_file(stream,key,tag,taglen,buf,len);
3186             break;
3187         case STREAM_TEXTFILE:
3188             return read_key_textfile(stream,key,tag,taglen,buf,len);
3189             break;
3190         case STREAM_USER:
3191             return read_key_user(stream,key,tag,taglen,buf,len);
3192             break;
3193         case STREAM_NETWORK:
3194             return read_key_net(stream,key,tag,taglen,buf,len);
3195             break;
3196         default:
3197             ERROR("unknown stream type %d in read_key\n",gks->stype);
3198             return -1;
3199             break;
3200     }
3201     return -1;
3202 }
3203
3204
3205
3206
3207 /***************************************************************************************************
3208   Hooks to palacios and inititialization
3209 *************************************************************************************************/
3210
3211     
3212 static struct v3_keyed_stream_hooks hooks = {
3213     .open = open_stream,
3214     .close = close_stream,
3215     .preallocate_hint_key = preallocate_hint_key,
3216     .open_key = open_key,
3217     .close_key = close_key,
3218     .read_key = read_key,
3219     .write_key = write_key
3220 };
3221
3222
3223 static int init_keyed_streams( void )
3224 {
3225     mem_streams = palacios_create_htable(DEF_NUM_STREAMS,hash_func,hash_comp);
3226
3227     if (!mem_streams) { 
3228         ERROR("failed to allocated stream pool for in-memory streams\n");
3229         return -1;
3230     }
3231
3232     user_streams = palacios_alloc(sizeof(struct user_keyed_streams));
3233
3234     if (!user_streams) { 
3235         ERROR("failed to allocated list for user streams\n");
3236         return -1;
3237     }
3238
3239     INIT_LIST_HEAD(&(user_streams->streams));
3240     
3241     palacios_spinlock_init(&(user_streams->lock));
3242
3243     V3_Init_Keyed_Streams(&hooks);
3244
3245     return 0;
3246
3247 }
3248
3249 static int deinit_keyed_streams( void )
3250 {
3251     palacios_free_htable(mem_streams,1,1);
3252
3253     palacios_spinlock_deinit(&(user_streams->lock));
3254
3255     palacios_free(user_streams);
3256
3257     WARNING("Deinit of Palacios Keyed Streams likely leaked memory\n");
3258
3259     return 0;
3260 }
3261
3262
3263 static int guest_init_keyed_streams(struct v3_guest * guest, void ** vm_data ) 
3264 {
3265     
3266     add_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT, keyed_stream_connect_user, 0);
3267     
3268     return 0;
3269 }
3270
3271
3272 static int guest_deinit_keyed_streams(struct v3_guest * guest, void * vm_data)
3273 {
3274     remove_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT);
3275
3276     return 0;
3277 }
3278
3279
3280
3281
3282 static struct linux_ext key_stream_ext = {
3283     .name = "KEYED_STREAM_INTERFACE",
3284     .init = init_keyed_streams,
3285     .deinit = deinit_keyed_streams,
3286     .guest_init = guest_init_keyed_streams,
3287     .guest_deinit = guest_deinit_keyed_streams, 
3288 };
3289
3290
3291 register_extension(&key_stream_ext);