Palacios Public Git Repository

To checkout Palacios execute

  git clone http://v3vee.org/palacios/palacios.web/palacios.git
This will give you the master branch. You probably want the devel branch or one of the release branches. To switch to the devel branch, simply execute
  cd palacios
  git checkout --track -b devel origin/devel
The other branches are similar.


99524e1b459e5eee235c752b982d53a71bc48ebd
[palacios.git] / linux_module / iface-keyed-stream.c
1 /*
2  * Palacios keyed stream interface
3  *
4  * Plus implementations for mem, file, and user space implementations
5  *
6  * (c) Peter Dinda, 2011 (interface, mem + file implementations + recooked user impl)
7  * (c) Clint Sbisa, 2011 (initial user space implementation on which this is based)
8  * (c) Diana Palsetia & Steve Rangel, 2012 (network based implementation)       
9  * (c) Peter Dinda, 2012 (updated interface, textfile)
10  */
11
12 #include <linux/fs.h>
13 #include <linux/file.h>
14 #include <linux/uaccess.h>
15 #include <linux/namei.h>
16 #include <linux/poll.h>
17 #include <linux/anon_inodes.h>
18
19 #include "palacios.h"
20 #include "util-hashtable.h"
21 #include "linux-exts.h"
22 #include "vm.h"
23
24 #define sint64_t int64_t
25 #include <interfaces/vmm_keyed_stream.h>
26
27 #include "iface-keyed-stream-user.h"
28
29 #include <interfaces/vmm_socket.h>
30
31 #include <linux/spinlock.h>
32 #include <asm/uaccess.h>
33 #include <linux/inet.h>
34 #include <linux/kthread.h>
35 #include <linux/netdevice.h>
36 #include <linux/ip.h>
37 #include <linux/in.h>
38 #include <linux/string.h>
39 #include <linux/preempt.h>
40 #include <linux/sched.h>
41 #include <linux/list.h>
42 #include <linux/syscalls.h>
43
44 #include <linux/module.h>
45 #include <linux/kernel.h>
46 #include <linux/socket.h>
47 #include <linux/net.h>
48 #include <linux/slab.h>
49
50
51 /*
52   This is an implementation of the Palacios keyed stream interface
53   that supports four flavors of streams:
54
55   "mem:"   Streams are stored in a hash table
56   The values for this hash table are hash tables associated with 
57   each stream.   A key maps to an expanding memory buffer.
58   Data is stored in a buffer like:
59
60   [boundarytag][taglen][tag][datalen][data]
61   [boundarytag][taglen][tag][datalen][data]
62   ...
63
64   "file:"  Streams are stored in files.  Each high-level
65   open corresponds to a directory, while a key corresponds to
66   a distinct file in that directory.   Data is stored in a file
67   like:
68
69   [boundarytag][taglen][tag][datalen][data]
70   [boundarytag][taglen][tag][datalen][data]
71   ...
72
73   "textfile:" Same as file, but data is stored in text format, like a
74   windows style .ini file.  A key maps to a file, and data is stored
75   in a file like:
76
77   [key]
78   tag=data_in_hex
79   tag=data_in_hex
80
81   This format makes it possible to concentenate the files to
82   produce a single "ini" file with "sections".
83
84
85   "net:"  Streams are carried over the network.  Each
86    high level open corresponds to a TCP connection, while
87    each key corresponds to a context on the stream.
88       "net:a:<ip>:<port>" => Bind to <ip>:<port> and accept a connection
89       "net:c:<ip>:<port>" => Connect to <ip>:<port>
90    "c" (client) 
91      open_stream: connect
92    "a" (server) 
93      open_stream: accept
94    "c" or "a":
95      open_key:  send [keylen-lastbyte-high-bit][key] (writer)
96            or   recv (same format as above)          (reader)
97      close_key: send [keylen-lastbyte-high-bit][key] (writer)
98            or   recv (same format as above)          (reader)
99      write_key: send [boundarytag][taglen][tag][datalen][data]
100      read_key:  recv (same format as above)
101      close_stream: close socket
102
103   "user:" Stream requests are bounced to user space to be 
104    handled there.  A rendezvous approach similar to the host 
105    device userland support is used
106
107    All keyed streams store the tags.
108    
109 */
110
111 #define STREAM_GENERIC 0
112 #define STREAM_MEM     1
113 #define STREAM_FILE    2
114 #define STREAM_USER    3
115 #define STREAM_NETWORK 4
116 #define STREAM_TEXTFILE 5
117
118 /*
119   All keyed streams and streams indicate their implementation type within the first field
120  */
121 struct generic_keyed_stream {
122     int stype;
123 };
124
125 struct generic_stream {
126     int stype;
127 };
128   
129 /*
130   boundary tags are used for some othe raw formats. 
131 */
132 static uint32_t BOUNDARY_TAG=0xabcd0123;
133
134
135 /****************************************************************************************
136    Memory-based implementation  ("mem:")
137 ****************************************************************************************/
138
139 #define DEF_NUM_STREAMS 16
140 #define DEF_NUM_KEYS    128
141 #define DEF_SIZE        128
142
143 /*
144   A memory keyed stream is a pointer to the underlying hash table
145   while a memory stream contains an extensible buffer for the stream
146  */
147 struct mem_keyed_stream {
148     int stype;
149     v3_keyed_stream_open_t ot;
150     struct hashtable *ht;
151 };
152
153 struct mem_stream {
154     int       stype;
155     char     *data;
156     uint32_t  size;
157     uint32_t  data_max;
158     uint32_t  ptr;
159 };
160
161 static struct mem_stream *create_mem_stream_internal(uint64_t size)
162 {
163     struct mem_stream *m = palacios_alloc(sizeof(struct mem_stream));
164
165     if (!m) {
166         return 0;
167     }
168
169
170     m->data = palacios_valloc(size);
171     
172     if (!m->data) { 
173         palacios_free(m);
174         return 0;
175     }
176
177     m->stype = STREAM_MEM;
178     m->size=size;
179     m->ptr=0;
180     m->data_max=0;
181     
182     return m;
183 }
184
185
186 static struct mem_stream *create_mem_stream(void)
187 {
188     return create_mem_stream_internal(DEF_SIZE);
189 }
190
191 static void destroy_mem_stream(struct mem_stream *m)
192 {
193     if (m) {
194         if (m->data) {
195             palacios_vfree(m->data);
196         }
197         m->data=0;
198         palacios_free(m);
199     }
200 }
201     
202 static int expand_mem_stream(struct mem_stream *m, uint32_t new_size)
203 {
204     void *data = palacios_valloc(new_size);
205     uint32_t nc;
206
207     if (!data) { 
208         return -1;
209     }
210     
211     nc = (new_size<m->data_max) ? new_size : m->data_max;
212
213     memcpy(data,m->data,nc);
214
215     palacios_vfree(m->data);
216
217     m->data=data;
218     m->size=new_size;
219     if (m->size<m->data_max) { 
220         m->data_max=m->size;
221     }
222    
223     return 0;
224 }
225
226 static uint32_t write_mem_stream(struct mem_stream *m,
227                                  void *data,
228                                  uint32_t len)
229 {
230     if ((m->ptr + len) > m->size) { 
231         if (expand_mem_stream(m,m->ptr + len)) { 
232             return 0;
233         }
234     }
235     memcpy(m->data+m->ptr,data,len);
236     m->ptr+=len;
237     m->data_max=m->ptr;
238     
239     return len;
240
241 }
242
243
244
245 static uint32_t read_mem_stream(struct mem_stream *m,
246                                 void *data,
247                                 uint32_t len)
248 {
249     if ((m->ptr + len) > m->data_max) { 
250         return 0;
251     }
252     memcpy(data,m->data+m->ptr,len);
253     m->ptr+=len;
254     
255     return len;
256
257 }
258
259
260 static void reset_mem_stream(struct mem_stream *m)
261 {
262     m->ptr=0;
263 }
264
265
266 static inline uint_t hash_func(addr_t key)
267 {
268     return palacios_hash_buffer((uchar_t*)key,strlen((uchar_t*)key));
269 }
270
271 static inline int hash_comp(addr_t k1, addr_t k2)
272 {
273     return strcasecmp((char*)k1,(char*)k2)==0;
274 }
275
276
277 // This stores all the memory keyed streams streams
278 static struct hashtable *mem_streams=0;
279
280
281 static v3_keyed_stream_t open_stream_mem(char *url,
282                                          v3_keyed_stream_open_t ot)
283 {
284
285     if (strncasecmp(url,"mem:",4)) { 
286         WARNING("illegitimate attempt to open memory stream \"%s\"\n",url);
287         return 0;
288     }
289
290     switch (ot) { 
291         case V3_KS_RD_ONLY:
292         case V3_KS_WR_ONLY: {
293             struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
294             if (mks) { 
295                 mks->ot=ot;
296             }
297             return (v3_keyed_stream_t) mks;
298         }
299             break;
300
301         case V3_KS_WR_ONLY_CREATE: {
302             struct mem_keyed_stream *mks = (struct mem_keyed_stream *) palacios_htable_search(mem_streams,(addr_t)(url+4));
303             if (!mks) { 
304                 char *mykey;
305
306                 mykey = palacios_alloc(strlen(url+4)+1);
307
308                 if (!mykey) { 
309                     ERROR("cannot allocate space for new in-memory keyed stream %s\n",url);
310                     return 0;
311                 }
312
313                 strcpy(mykey,url+4); // will fit
314                 
315                 mks = (struct mem_keyed_stream *) palacios_alloc(sizeof(struct mem_keyed_stream));
316
317                 if (!mks) { 
318                     palacios_free(mykey);
319                     ERROR("cannot allocate in-memory keyed stream %s\n",url);
320                     return 0;
321                 }
322             
323                 mks->ht = (void*) palacios_create_htable(DEF_NUM_KEYS,hash_func,hash_comp);
324                 if (!mks->ht) { 
325                     palacios_free(mks);
326                     palacios_free(mykey);
327                     ERROR("cannot allocate in-memory keyed stream %s\n",url);
328                     return 0;
329                 }
330
331                 
332                 if (!palacios_htable_insert(mem_streams,(addr_t)(mykey),(addr_t)mks)) { 
333                     palacios_free_htable(mks->ht,1,1);
334                     palacios_free(mks);
335                     palacios_free(mykey);
336                     ERROR("cannot insert in-memory keyed stream %s\n",url);
337                     return 0;
338                 }
339                 mks->stype=STREAM_MEM;
340             }
341
342             mks->ot=V3_KS_WR_ONLY;
343             
344             return mks;
345             
346         }
347             break;
348
349         default:
350             ERROR("unsupported open type in open_stream_mem\n");
351             break;
352     }
353     
354     return 0;
355         
356 }
357
358
359
360 static void close_stream_mem(v3_keyed_stream_t stream)
361 {
362     // nothing to do
363     return;
364 }
365
366
367 static v3_keyed_stream_key_t open_key_mem(v3_keyed_stream_t stream,
368                                           char *key)
369 {
370     struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
371     struct hashtable *s = mks->ht;
372
373     struct mem_stream *m;
374
375     m = (struct mem_stream *) palacios_htable_search(s,(addr_t)key);
376
377     if (!m) { 
378         char *mykey = palacios_alloc(strlen(key)+1);
379
380         if (!mykey) { 
381             ERROR("cannot allocate copy of key for key %s\n",key);
382             return 0;
383         }
384
385         strcpy(mykey,key); // will fit
386
387         m = create_mem_stream();
388         
389         if (!m) { 
390             palacios_free(mykey);
391             ERROR("cannot allocate mem keyed stream for key %s\n",key);
392             return 0;
393         }
394
395         if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) {
396             destroy_mem_stream(m);
397             palacios_free(mykey);
398             ERROR("cannot insert mem keyed stream for key %s\n",key);
399             return 0;
400         }
401     }
402
403     reset_mem_stream(m);
404     return m;
405
406 }
407
408
409 static void preallocate_hint_key_mem(v3_keyed_stream_t stream,
410                                      char *key,
411                                      uint64_t size)
412 {
413     struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
414     struct hashtable *s = mks->ht;
415
416     struct mem_stream *m;
417
418     if (mks->ot != V3_KS_WR_ONLY) { 
419         return;
420     }
421
422     m = (struct mem_stream *) palacios_htable_search(s,(addr_t)key);
423
424     if (!m) {
425         char *mykey;
426         
427         mykey=palacios_alloc(strlen(key)+1);
428         
429         if (!mykey) { 
430             ERROR("cannot allocate key space for preallocte for key %s\n",key);
431             return;
432         }
433         
434         strcpy(mykey,key); // will fit
435        
436         m = create_mem_stream_internal(size);
437         
438         if (!m) { 
439             ERROR("cannot preallocate mem keyed stream for key %s\n",key);
440             return;
441         }
442
443         if (!palacios_htable_insert(s,(addr_t)mykey,(addr_t)m)) {
444             ERROR("cannot insert preallocated mem keyed stream for key %s\n",key);
445             destroy_mem_stream(m);
446             return;
447         }
448     } else {
449         if (m->data_max < size) { 
450             if (expand_mem_stream(m,size)) { 
451                 ERROR("cannot expand key for preallocation for key %s\n",key);
452                 return;
453             }
454         }
455     }
456
457     return;
458
459 }
460
461 static void close_key_mem(v3_keyed_stream_t stream, 
462                           v3_keyed_stream_key_t key)
463 {
464     // nothing to do
465     return;
466 }
467
468 static sint64_t write_key_mem(v3_keyed_stream_t stream, 
469                               v3_keyed_stream_key_t key,
470                               void *tag,
471                               sint64_t taglen,
472                               void *buf,
473                               sint64_t len)
474 {
475   struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
476   struct mem_stream *m = (struct mem_stream *) key;
477   uint32_t mylen;
478   uint32_t writelen;
479   
480   if (mks->ot!=V3_KS_WR_ONLY) {
481     return -1;
482   }
483   
484   if (taglen<0 || len<0) { 
485     ERROR("Negative taglen or data len\n");
486     return -1;
487   }
488   
489   if (taglen>0xffffffffULL || len>0xffffffffULL) { 
490     ERROR("taglen or data len is too large\n");
491     return -1;
492   }
493       
494   writelen=write_mem_stream(m,&BOUNDARY_TAG,sizeof(BOUNDARY_TAG));
495   
496   if (writelen!=sizeof(BOUNDARY_TAG)) { 
497     ERROR("failed to write all data for boundary tag\n");
498     return -1;
499   }
500   
501   writelen=write_mem_stream(m,&taglen,sizeof(taglen));
502   
503   if (writelen!=sizeof(taglen)) { 
504     ERROR("failed to write taglen\n");
505     return -1;
506   }
507   
508   mylen = (uint32_t) taglen;
509   
510   writelen=write_mem_stream(m,tag,mylen);
511   
512   if (writelen!=mylen) { 
513     ERROR("failed to write all data for tag\n");
514     return -1;
515   } 
516
517   writelen=write_mem_stream(m,&len,sizeof(len));
518   
519   if (writelen!=sizeof(len)) { 
520     ERROR("failed to write datalen\n");
521     return -1;
522   }
523   
524   mylen = (uint32_t) len;
525
526   writelen=write_mem_stream(m,buf,mylen);
527
528   if (writelen!=mylen) { 
529     ERROR("failed to write all data for key\n");
530     return -1;
531   } else {
532     return (sint64_t)writelen;
533   }
534 }
535
536 static sint64_t read_key_mem(v3_keyed_stream_t stream, 
537                              v3_keyed_stream_key_t key,
538                              void *tag,
539                              sint64_t taglen,
540                              void *buf,
541                              sint64_t len)
542 {
543   struct mem_keyed_stream *mks = (struct mem_keyed_stream *) stream;
544   struct mem_stream *m = (struct mem_stream *) key;
545   uint32_t mylen;
546   uint32_t readlen;
547   void *temptag;
548   uint32_t tempbt;
549   sint64_t templen;
550   
551   
552   if (mks->ot!=V3_KS_RD_ONLY) {
553     return -1;
554   }
555   
556   if (len<0 || taglen<0) { 
557     ERROR("taglen or data len is negative\n");
558     return -1;
559   }
560
561   if (len>0xffffffffULL || taglen>0xffffffffULL) { 
562     ERROR("taglen or data len is too large\n");
563     return -1;
564   }
565   
566   readlen=read_mem_stream(m,&tempbt,sizeof(tempbt));
567   
568   if (readlen!=sizeof(tempbt)) { 
569     ERROR("failed to read all data for boundary tag\n");
570     return -1;
571   } 
572   
573   if (tempbt!=BOUNDARY_TAG) { 
574     ERROR("boundary tag not found (read 0x%x)\n",tempbt);
575     return -1;
576   }
577   
578   readlen=read_mem_stream(m,&templen,sizeof(templen));
579
580   if (readlen!=sizeof(templen)) { 
581     ERROR("failed to read all data for taglen\n");
582     return -1;
583   } 
584
585   if (templen!=taglen) { 
586     ERROR("tag size mismatch (requested=%lld, actual=%lld)\n",taglen,templen);
587     return -1;
588   }
589   
590   temptag = palacios_alloc(taglen);
591     
592   if (!temptag) { 
593     ERROR("cannot allocate temporary tag\n");
594     return -1;
595   }
596
597   mylen = (uint32_t) len;
598     
599   readlen=read_mem_stream(m,temptag,mylen);
600     
601   if (readlen!=mylen) { 
602     ERROR("failed to read all data for tag\n");
603     palacios_free(temptag);
604     return -1;
605   } 
606
607   if (memcmp(tag,temptag,taglen)) { 
608     ERROR("tag mismatch\n");
609     palacios_free(temptag);
610     return -1;
611   }
612   
613   palacios_free(temptag);
614
615   readlen=read_mem_stream(m,&templen,sizeof(templen));
616
617   if (readlen!=sizeof(templen)) { 
618     ERROR("failed to read all data for data len\n");
619     return -1;
620   } 
621   
622   if (templen!=len) { 
623     ERROR("data size mismatch (requested=%lld, actual=%lld)\n",len,templen);
624     return -1;
625   }
626
627   mylen = (uint32_t) len;
628   
629   readlen=read_mem_stream(m,buf,mylen);
630   
631   if (readlen!=mylen) { 
632     ERROR("failed to read all data for key\n");
633     return -1;
634   } else {
635     return (sint64_t)readlen;
636   }
637 }
638
639
640 /***************************************************************************************************
641   File-based implementation  ("file:")
642 *************************************************************************************************/
643
644 /*
645   A file keyed stream contains the fd of the directory
646   and a path
647 */
648
649 struct file_keyed_stream {
650     int   stype;
651     v3_keyed_stream_open_t ot;
652     char  *path;
653 };
654
655 struct file_stream {
656     int   stype;
657     struct file *f;   // the opened file
658 };
659
660
661 static v3_keyed_stream_t open_stream_file(char *url,
662                                           v3_keyed_stream_open_t ot)
663 {
664     struct file_keyed_stream *fks;
665     struct nameidata nd;
666
667     if (strncasecmp(url,"file:",5)) { 
668         WARNING("illegitimate attempt to open file stream \"%s\"\n",url);
669         return 0;
670     }
671
672     fks = palacios_alloc(sizeof(struct file_keyed_stream));
673     
674     if (!fks) { 
675         ERROR("cannot allocate space for file stream\n");
676         return 0;
677     }
678
679     fks->path = (char*)palacios_alloc(strlen(url+5)+1);
680     
681     if (!(fks->path)) { 
682         ERROR("cannot allocate space for file stream\n");
683         palacios_free(fks);
684         return 0;
685     }
686     
687     strcpy(fks->path,url+5); // will fit
688     
689     fks->stype=STREAM_FILE;
690
691     fks->ot= ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
692
693     // Does the directory exist, and can we read/write it?
694    
695     if (path_lookup(fks->path,LOOKUP_DIRECTORY|LOOKUP_FOLLOW,&nd)) { 
696
697         // directory does does not exist.  
698
699         if (ot==V3_KS_RD_ONLY || ot==V3_KS_WR_ONLY) { 
700
701             // we are not being asked to create it
702             ERROR("attempt to open %s, which does not exist\n",fks->path);
703             goto fail_out;
704
705         } else {
706
707             // We are being asked to create it
708
709             struct dentry *de;
710             int err;
711
712             // Find its parent
713             if (path_lookup(fks->path,LOOKUP_PARENT|LOOKUP_FOLLOW,&nd)) { 
714                 ERROR("attempt to create %s failed because its parent cannot be looked up\n",fks->path);
715                 goto fail_out;
716             }
717
718             // Can we write to the parent?
719
720             if (inode_permission(nd.path.dentry->d_inode, MAY_WRITE | MAY_EXEC)) { 
721                 ERROR("attempt to open %s, which has the wrong permissions for directory creation\n",fks->path);
722                 goto fail_out;
723             }
724
725             // OK, we can, so let's create it
726
727             de = lookup_create(&nd,1);
728
729             if (!de || IS_ERR(de)) { 
730                 ERROR("cannot allocate dentry\n");
731                 goto fail_out;
732             }
733
734             err = vfs_mkdir(nd.path.dentry->d_inode, de, 0700);
735
736             // lookup_create locks this for us!
737
738             mutex_unlock(&(nd.path.dentry->d_inode->i_mutex));
739
740             if (err) {
741                 ERROR("attempt to create %s failed because mkdir failed\n",fks->path);
742                 goto fail_out;
743             }
744
745             // now the directory should exist and have reasonable permissions
746             return (v3_keyed_stream_t) fks;
747         }
748     } 
749
750     
751     // we must be in V3_KS_RD_ONLY or V3_KS_WR_ONLY, 
752     // and the directory exists, so we must check the permissions
753
754     if (inode_permission(nd.path.dentry->d_inode, MAY_EXEC | (ot==V3_KS_RD_ONLY ? MAY_READ : MAY_WRITE))) {
755         ERROR("attempt to open %s, which has the wrong permissions\n",fks->path);
756         goto fail_out;
757     } else {
758         return (v3_keyed_stream_t) fks;
759     }
760
761
762  fail_out:
763     palacios_free(fks->path);
764     palacios_free(fks);
765     return 0;
766
767 }
768
769 static void close_stream_file(v3_keyed_stream_t stream)
770 {
771     struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
772     
773     palacios_free(fks->path);
774     palacios_free(fks);
775
776 }
777
778 static void preallocate_hint_key_file(v3_keyed_stream_t stream,
779                                       char *key,
780                                       uint64_t size)
781 {
782     return;
783 }
784
785 static v3_keyed_stream_key_t open_key_file(v3_keyed_stream_t stream,
786                                            char *key)
787 {
788     struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
789     struct file_stream *fs;
790     char *path;
791
792     // the path is the stream's path plus the key name
793     // file:/home/foo + "regext" => "/home/foo/regext"
794     path = (char *) palacios_alloc(strlen(fks->path)+strlen(key)+2);
795     if (!path) {                                
796         ERROR("cannot allocate file keyed stream for key %s\n",key);
797         return 0;
798     }
799     // this sequence will fit and terminate with a zero
800     strcpy(path,fks->path);
801     strcat(path,"/");
802     strcat(path,key);
803     
804     fs = (struct file_stream *) palacios_alloc(sizeof(struct file_stream));
805     
806     if (!fs) { 
807         ERROR("cannot allocate file keyed stream for key %s\n",key);
808         palacios_free(path);
809         return 0;
810     }
811
812     fs->stype=STREAM_FILE;
813
814     fs->f = filp_open(path,O_RDWR|O_CREAT|O_LARGEFILE,0600);
815
816     if (!fs->f || IS_ERR(fs->f)) {
817         ERROR("cannot open relevent file \"%s\" for stream \"file:%s\" and key \"%s\"\n",path,fks->path,key);
818         palacios_free(fs);
819         palacios_free(path);
820         return 0;
821     }
822
823     palacios_free(path);
824
825     return fs;
826 }
827
828
829 static void close_key_file(v3_keyed_stream_t stream, 
830                            v3_keyed_stream_key_t key)
831 {
832     struct file_stream *fs = (struct file_stream *) key;
833
834     filp_close(fs->f,NULL);
835
836     palacios_free(fs);
837 }
838
839
840 static sint64_t write_file(struct file_stream *fs, void *buf, sint64_t len)
841 {
842     ssize_t done, left, total;
843     mm_segment_t old_fs;
844
845     total=len;
846     left=len;
847
848     while (left>0) {
849         old_fs = get_fs();
850         set_fs(get_ds());
851         done = fs->f->f_op->write(fs->f, buf+(total-left), left, &(fs->f->f_pos));
852         set_fs(old_fs);
853         if (done<=0) {
854             return -1;
855         } else {
856             left -= done;
857         }
858     }
859
860     return len;
861 }
862
863 static sint64_t write_key_file(v3_keyed_stream_t stream, 
864                                v3_keyed_stream_key_t key,
865                                void *tag,
866                                sint64_t taglen,
867                                void *buf,
868                                sint64_t len)
869 {
870   struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
871   struct file_stream *fs = (struct file_stream *) key;
872   sint64_t writelen;
873   
874   if (fks->ot!=V3_KS_WR_ONLY) { 
875     return -1;
876   }
877   
878   if (taglen<0 || len<0) { 
879     ERROR("Negative taglen or data len\n");
880     return -1;
881   }
882   
883   writelen=write_file(fs,&BOUNDARY_TAG,sizeof(BOUNDARY_TAG));
884   
885   if (writelen!=sizeof(BOUNDARY_TAG)) { 
886     ERROR("failed to write all data for boundary tag\n");
887     return -1;
888   }
889   
890   writelen=write_file(fs,&taglen,sizeof(taglen));
891   
892   if (writelen!=sizeof(taglen)) { 
893     ERROR("failed to write taglen\n");
894     return -1;
895   }
896   
897   if (write_file(fs,tag,taglen)!=taglen) { 
898     ERROR("failed to write tag\n");
899     return -1;
900   }
901
902   writelen=write_file(fs,&len,sizeof(len));
903   
904   if (writelen!=sizeof(len)) { 
905     ERROR("failed to write data len\n");
906     return -1;
907   }
908   
909   return write_file(fs,buf,len);
910 }
911
912 static sint64_t read_file(struct file_stream *fs, void *buf, sint64_t len)
913 {
914     ssize_t done, left, total;
915     mm_segment_t old_fs;
916
917     total=len;
918     left=len;
919
920
921     while (left>0) {
922         old_fs = get_fs();
923         set_fs(get_ds());
924         done = fs->f->f_op->read(fs->f, buf+(total-left), left, &(fs->f->f_pos));
925         set_fs(old_fs);
926         if (done<=0) {
927             return -1;
928         } else {
929             left -= done;
930         }
931     }
932
933     return len;
934 }
935
936
937 static sint64_t read_key_file(v3_keyed_stream_t stream, 
938                               v3_keyed_stream_key_t key,
939                               void *tag,
940                               sint64_t taglen,
941                               void *buf,
942                               sint64_t len)
943 {
944   struct file_keyed_stream *fks = (struct file_keyed_stream *) stream;
945   struct file_stream *fs = (struct file_stream *) key;
946   void *temptag;
947   uint32_t tempbt;
948   sint64_t templen;
949   sint64_t readlen;
950   
951   if (fks->ot!=V3_KS_RD_ONLY) { 
952     return -1;
953   }
954   
955   if (len<0 || taglen<0) { 
956     ERROR("taglen or data len is negative\n");
957     return -1;
958   }
959
960   readlen=read_file(fs,&tempbt,sizeof(tempbt));
961   
962   if (readlen!=sizeof(tempbt)) { 
963     ERROR("failed to read all data for boundary tag\n");
964     return -1;
965   } 
966   
967   if (tempbt!=BOUNDARY_TAG) { 
968     ERROR("boundary tag not found (read 0x%x)\n",tempbt);
969     return -1;
970   }
971
972   readlen=read_file(fs,&templen,sizeof(templen));
973   
974   if (readlen!=sizeof(templen)) { 
975     ERROR("failed to read all data for tag len\n");
976     return -1;
977   } 
978
979   if (templen!=taglen) { 
980     ERROR("tag size mismatch (requested=%lld, actual=%lld)\n",taglen,templen);
981     return -1;
982   }
983
984   temptag=palacios_alloc(taglen);
985
986   if (!temptag) { 
987     ERROR("Cannot allocate temptag\n");
988     return -1;
989   }
990   
991   if (read_file(fs,temptag,taglen)!=taglen) { 
992     ERROR("Cannot read tag\n");
993     palacios_free(temptag);
994     return -1;
995   }
996
997   if (memcmp(temptag,tag,taglen)) { 
998     ERROR("Tag mismatch\n");
999     palacios_free(temptag);
1000     return -1;
1001   }
1002   
1003   palacios_free(temptag);
1004
1005   readlen=read_file(fs,&templen,sizeof(templen));
1006   
1007   if (readlen!=sizeof(templen)) { 
1008     ERROR("failed to read all data for data len\n");
1009     return -1;
1010   } 
1011
1012   if (templen!=len) { 
1013     ERROR("datasize mismatch (requested=%lld, actual=%lld)\n",len,templen);
1014     return -1;
1015   }
1016
1017   return read_file(fs,buf,len);
1018
1019 }
1020
1021
1022 /***************************************************************************************************
1023   Textfile-based implementation  ("textfile:")
1024
1025   Note that this implementation uses the internal structure and functions of the 
1026   "file:" implementation
1027 *************************************************************************************************/
1028
1029 // optimize the reading and decoding of hex data
1030 // this weakens the parser, so that:
1031 // tag =0A0B0D 
1032 // will work, but
1033 // tag = 0A 0B 0D
1034 // will not.  Note the leading whitespace
1035 #define TEXTFILE_OPT_HEX 0
1036
1037 //
1038 // The number of bytes handled at a time by the hex putter and getter
1039 //
1040 #define MAX_HEX_SEQ 64
1041
1042 /*
1043   A text file keyed stream is a file_keyed_stream,
1044   only with a different stype
1045 */
1046
1047 #define PAUSE()  
1048 //#define PAUSE() ssleep(5) 
1049
1050
1051 typedef struct file_keyed_stream textfile_keyed_stream;
1052
1053 typedef struct file_stream textfile_stream;
1054
1055
1056 static v3_keyed_stream_t open_stream_textfile(char *url,
1057                                               v3_keyed_stream_open_t ot)
1058 {
1059   textfile_keyed_stream *me;
1060
1061   if (strncasecmp(url,"textfile:",9)) { 
1062     WARNING("illegitimate attempt to open textfile stream \"%s\"\n",url);
1063     return 0;
1064   }
1065
1066   me = (textfile_keyed_stream *) open_stream_file(url+4, ot);
1067
1068   if (!me) {
1069     ERROR("could not create underlying file stream\n");
1070     return 0;
1071   }
1072
1073   me->stype=STREAM_TEXTFILE;
1074
1075   return me;
1076 }
1077
1078   
1079
1080 static void close_stream_textfile(v3_keyed_stream_t stream)
1081 {
1082   textfile_keyed_stream *me = stream;
1083
1084   me->stype=STREAM_FILE;
1085
1086   close_stream_file(me);
1087
1088 }
1089
1090 static void preallocate_hint_key_textfile(v3_keyed_stream_t stream,
1091                                           char *key,
1092                                           uint64_t size)
1093 {
1094   textfile_keyed_stream *me = stream;
1095   
1096   me->stype=STREAM_FILE;
1097
1098   preallocate_hint_key_file(me,key,size);
1099   
1100   me->stype=STREAM_TEXTFILE;
1101  
1102 }
1103
1104
1105 static inline int isoneof(char c, char *sl, int m)
1106 {
1107   int i;
1108   
1109   for (i=0;i<m;i++) { 
1110     if (c==sl[i]) { 
1111       return 1;
1112     }
1113   }
1114   
1115   return 0;
1116 }
1117
1118 static char get_next_char(textfile_stream *s)
1119 {
1120   char buf;
1121   if (read_file(s,&buf,1)!=1) { 
1122     return -1;
1123   } 
1124   return buf;
1125 }
1126
1127 static char hexify_nybble(char c)
1128 {
1129   if (c>=0 && c<=9) { 
1130     return '0'+c;
1131   } else if (c>=0xa && c<=0xf) { 
1132     return 'a'+(c-0xa);
1133   } else {
1134     return -1;
1135   }
1136 }
1137
1138 static int hexify_byte(char *c, char b)
1139 {
1140   char n;
1141   n = hexify_nybble( (b >> 4) & 0xf);
1142   if (n==-1) { 
1143     return -1;
1144   }
1145   c[0] = n;
1146   n = hexify_nybble( b & 0xf);
1147   if (n==-1) { 
1148     return -1;
1149   }
1150   c[1] = n;
1151   return 0;
1152 }
1153
1154
1155 static char dehexify_nybble(char c)
1156 {
1157   if (c>='0' && c<='9') { 
1158     return c-'0';
1159   } else if (c>='a' && c<='f') {
1160     return 0xa + (c-'a');
1161   } else if (c>='A' && c<='F') { 
1162     return 0xa + (c-'A');
1163   } else {
1164     return -1;
1165   }
1166 }
1167
1168 static int dehexify_byte(char *c, char *b)
1169 {
1170   char n;
1171   n = dehexify_nybble(c[0]);
1172   if (n==-1) { 
1173     return -1;
1174   }
1175   *b = n << 4;
1176   n = dehexify_nybble(c[1]);
1177   if (n==-1) { 
1178     return -1;
1179   }
1180   *b |= n;
1181   return 0;
1182 }
1183
1184
1185 #if TEXTFILE_OPT_HEX
1186
1187
1188 // Here the sl array, of length m is the number
1189 static int get_hexbytes_as_data(textfile_stream *s, char *buf, int n)
1190 {
1191   char rbuf[MAX_HEX_SEQ*2];
1192   int left = n;
1193   int off = 0;
1194   int cur = 0;
1195   int i;
1196
1197   while (left>0) {
1198     cur = left > MAX_HEX_SEQ ? MAX_HEX_SEQ : left;
1199     if (read_file(s,rbuf,cur*2)!=cur*2) { 
1200       ERROR("Cannot read data in getting hexbytes as data\n");
1201       return -1;
1202     }
1203     
1204     for (i=0;i<cur;i++) {
1205       if (dehexify_byte(rbuf+(i*2),buf+off+i)==-1) { 
1206         ERROR("Cannot decode data as hex in getting hexbytes as data\n");
1207         return -1;
1208       }
1209     }
1210     left-=cur;
1211     off+=cur;
1212   } 
1213
1214   return 0;
1215 }    
1216
1217 #endif
1218
1219 // Here the sl array, of length m is the set of characters to skip (e.g., whitespace)
1220 static int get_hexbytes_as_data_skip(textfile_stream *s, char *buf, int n, char *sl, int m)
1221 {
1222   char rbuf[2];
1223   int which = 0;
1224   int cur=0;
1225     
1226   while (cur<n) {
1227     which=0;
1228     while (which<2) { 
1229       rbuf[which] = get_next_char(s);
1230       if (rbuf[which]==-1) { 
1231         ERROR("Cannot read char in getting hexbytes as data with skiplist");
1232         return -1;
1233       }
1234       if (isoneof(rbuf[which],sl,m)) { 
1235         continue;
1236       } else {
1237         which++;
1238       }
1239     }
1240     if (dehexify_byte(rbuf,buf+cur)==-1) { 
1241       ERROR("Cannot decode data as hex in getting hexbytes as data with skiplist\n");
1242       return -1;
1243     } else {
1244       cur++;
1245     }
1246   }
1247   return 0;
1248 }    
1249
1250 /*
1251 static int put_next_char(textfile_stream *s, char d)
1252 {
1253   return write_file(s,&d,1);
1254 }
1255 */
1256
1257
1258 static int put_data_as_hexbytes(textfile_stream *s, char *buf, int n)
1259 {
1260   char rbuf[MAX_HEX_SEQ*2];
1261   int left = n;
1262   int off = 0;
1263   int cur = 0;
1264   int i;
1265
1266   while (left>0) {
1267     cur = left > MAX_HEX_SEQ ? MAX_HEX_SEQ : left;
1268     for (i=0;i<cur;i++) {
1269       if (hexify_byte(rbuf+(i*2),*(buf+off+i))==-1) { 
1270         ERROR("Cannot encode data as hex in putting data as hexbytes\n");
1271         return -1;
1272       }
1273     }
1274     if (write_file(s,rbuf,cur*2)!=cur*2) { 
1275       ERROR("Cannot write data in putting data as hexbytes\n");
1276       return -1;
1277     }
1278     left-=cur;
1279     off+=cur;
1280   } 
1281
1282   return 0;
1283 }    
1284
1285
1286 static int put_string_n(textfile_stream *s, char *buf, int n)
1287 {
1288   int rc;
1289
1290   rc = write_file(s,buf,n);
1291   
1292   if (rc!=n) { 
1293     return -1;
1294   } else {
1295     return 0;
1296   }
1297 }
1298
1299 static int put_string(textfile_stream *s, char *buf)
1300 {
1301   int n=strlen(buf);
1302
1303   return put_string_n(s,buf,n);
1304 }
1305
1306
1307
1308 static int search_for(textfile_stream *s, char d)
1309 {
1310   char c;
1311   do {
1312     c=get_next_char(s);
1313   } while (c!=-1 && c!=d);
1314   
1315   if (c==d) { 
1316     return 0;
1317   } else {
1318     return -1;
1319   }
1320 }
1321
1322 /*
1323 static int skip_matching(textfile_stream *s, char *m, int n)
1324 {
1325   char c;
1326   int rc = 0;
1327   int i;
1328
1329   while (rc==0) { 
1330     c=get_next_char(s);
1331     if (c==-1) { 
1332       rc=-1;
1333     } else {
1334       for (i=0;i<n;i++) { 
1335         if (c==m[i]) {
1336           rc=1;
1337           break;
1338         } 
1339       }
1340     }
1341   }
1342   
1343   if (rc==1) { 
1344     return 0;  // found
1345   } else {
1346     return rc; // unknown
1347   }
1348 }
1349
1350 */
1351
1352
1353 static int token_scan(textfile_stream *s, char *token, int n, char *sl, int m)
1354 {
1355   char c;
1356   int cur;
1357
1358   // Skip whitespace
1359   do {
1360     c=get_next_char(s);
1361     if (c==-1) { 
1362       ERROR("Failed to get character during token scan (preceding whitespace)\n");
1363       return -1;
1364     }
1365   } while (isoneof(c,sl,m));
1366
1367
1368   token[0]=c;
1369   
1370   // Record
1371   cur=1;
1372   while (cur<(n-1)) { 
1373     c=get_next_char(s);
1374     if (c==-1) { 
1375       ERROR("Failed to get character during token scan (token)\n");
1376       return -1;
1377     }
1378     if (isoneof(c,sl,m)) { 
1379       break;
1380     } else {
1381       token[cur]=c;
1382       cur++;
1383     } 
1384   }
1385   token[cur]=0;
1386   return 0;
1387 }
1388
1389
1390
1391 static v3_keyed_stream_key_t open_key_textfile(v3_keyed_stream_t stream,
1392                                                char *key)
1393 {
1394   textfile_keyed_stream *mks = stream;
1395   textfile_stream *ms;
1396
1397   mks->stype=STREAM_FILE;
1398
1399   ms = open_key_file(mks,key);
1400
1401   if (!ms) { 
1402     ERROR("cannot open underlying file keyed stream for key %s\n",key);
1403     mks->stype=STREAM_TEXTFILE;
1404     return 0;
1405   }
1406
1407   if (mks->ot==V3_KS_WR_ONLY) { 
1408     
1409     // Now we write the section header
1410
1411     ms->stype=STREAM_FILE;
1412     
1413     if (put_string(ms,"[")) { 
1414       close_key_file(mks,ms);
1415       mks->stype=STREAM_TEXTFILE;
1416       return 0;
1417     }
1418     
1419     if (put_string(ms,key)) { 
1420       close_key_file(mks,ms);
1421       mks->stype=STREAM_TEXTFILE;
1422       return 0;
1423     }
1424     
1425     if (put_string(ms,"]\n")) {
1426       close_key_file(mks,ms);
1427       mks->stype=STREAM_TEXTFILE;
1428       return 0;
1429     }
1430     
1431     
1432     mks->stype=STREAM_TEXTFILE;
1433     ms->stype=STREAM_TEXTFILE;
1434
1435     return ms;
1436
1437   } else if (mks->ot == V3_KS_RD_ONLY) {
1438     // Now we readthe section header
1439     int keylen=strlen(key);
1440     char *tempkey = palacios_alloc(keylen+3);
1441
1442     ms->stype=STREAM_FILE;
1443
1444     if (!tempkey) { 
1445       ERROR("Allocation failed in opening key\n");
1446       close_key_file(mks,ms);
1447       mks->stype=STREAM_FILE;
1448       return 0;
1449     }
1450
1451
1452     if (token_scan(ms,tempkey,keylen+3,"\t\r\n",3)) { 
1453       ERROR("Cannot scan for token (key search)\n");
1454       close_key_file(mks,ms);
1455       mks->stype=STREAM_TEXTFILE;
1456       palacios_free(tempkey);
1457       return 0;
1458     }
1459     
1460     tempkey[keylen+2] = 0;
1461     
1462     // Should now have [key]0
1463
1464     if (tempkey[0]!='[' ||
1465         tempkey[keylen+1]!=']' ||
1466         memcmp(key,tempkey+1,keylen)) {
1467       ERROR("key mismatch: target key=%s, found %s\n",key,tempkey);
1468       palacios_free(tempkey);
1469       close_key_file(mks,ms);
1470       mks->stype=STREAM_TEXTFILE;
1471       return 0;
1472     }
1473
1474     // key match done, success
1475
1476     mks->stype=STREAM_TEXTFILE;
1477     ms->stype=STREAM_TEXTFILE;
1478
1479     palacios_free(tempkey);
1480
1481     return ms;
1482     
1483   } else {
1484     ERROR("Unknown open type in open_key_textfile\n");
1485     ms->stype=STREAM_FILE;
1486     close_key_file(mks,ms);
1487     return 0;
1488   }
1489
1490 }
1491
1492
1493
1494 static void close_key_textfile(v3_keyed_stream_t stream, 
1495                                v3_keyed_stream_key_t key)
1496 {
1497   textfile_keyed_stream *mks = stream;
1498   textfile_stream *ms=key;
1499
1500   mks->stype=STREAM_FILE;
1501   ms->stype=STREAM_FILE;
1502
1503   close_key_file(mks,ms);
1504
1505   mks->stype=STREAM_TEXTFILE;
1506
1507 }
1508
1509
1510 static sint64_t read_key_textfile(v3_keyed_stream_t stream, 
1511                                   v3_keyed_stream_key_t key,
1512                                   void *tag,
1513                                   sint64_t taglen,
1514                                   void *buf,
1515                                   sint64_t len)
1516 {
1517     textfile_stream *ms = (textfile_stream *) key;
1518     char tags[32];
1519     char *temptag;
1520
1521
1522
1523     memcpy(tags,tag,taglen<31 ? taglen : 31);
1524     tags[taglen<32? taglen : 31 ]=0;
1525     
1526     temptag=palacios_alloc(taglen+1);
1527     if (!temptag) { 
1528       ERROR("Unable to allocate temptag in textfile read key\n");
1529       return -1;
1530     }
1531
1532     ms->stype=STREAM_FILE;
1533     
1534     if (token_scan(ms,temptag,taglen+1," \t\r\n=",5)) { 
1535       ERROR("Cannot scan for token (tag search)\n");
1536       ms->stype=STREAM_TEXTFILE;
1537       palacios_free(temptag);
1538       return -1;
1539     }
1540
1541     if (memcmp(tag,temptag,taglen)) { 
1542       ERROR("Tag mismatch in reading tag from textfile: desired tag=%s, actual tag=%s\n",tags,temptag);
1543       ms->stype=STREAM_TEXTFILE;
1544       palacios_free(temptag);
1545       return -1;
1546     }
1547
1548     // tag matches, let's go and find our =
1549     palacios_free(temptag);
1550
1551     if (search_for(ms,'=')) { 
1552       ERROR("Unable to find = sign in tag data parse (tag=%s)\n", tags);
1553       ms->stype=STREAM_TEXTFILE;
1554       return -1;
1555     }
1556
1557
1558 #if TEXTFILE_OPT_HEX
1559     if (get_hexbytes_as_data(ms,buf,len)) { 
1560       ERROR("Cannot read data in hex format (opt path) in textfile for tag %s\n",tags);
1561       ms->stype=STREAM_TEXTFILE;
1562       return -1;
1563     }
1564 #else
1565     if (get_hexbytes_as_data_skip(ms,buf,len," \t\r\n",4)) { 
1566       ERROR("Cannot read data in hex format (unopt path) in textfile for tag %s\n",tags);
1567       ms->stype=STREAM_TEXTFILE;
1568       return -1;
1569     }
1570 #endif
1571
1572     ms->stype=STREAM_TEXTFILE;
1573
1574     return len;
1575 }
1576
1577 static sint64_t write_key_textfile(v3_keyed_stream_t stream, 
1578                                    v3_keyed_stream_key_t key,
1579                                    void *tag,
1580                                    sint64_t taglen,
1581                                    void *buf,
1582                                    sint64_t len)
1583 {
1584     textfile_stream *ms = (textfile_stream *) key;
1585     char tags[32];
1586
1587
1588
1589     memcpy(tags,tag,taglen<31 ? taglen : 31);
1590     tags[taglen<32? taglen : 31 ]=0;
1591
1592     /*    if (taglen>100000 || len>100000) { 
1593       ERROR("Too big\n");
1594       return -1;
1595     }
1596     */
1597
1598     ms->stype=STREAM_FILE;
1599
1600     if (put_string_n(ms,tag,taglen)) { 
1601       ERROR("Cannot write tag %s in textfile\n",tags);
1602       ms->stype=STREAM_TEXTFILE;
1603       return -1;
1604     }
1605
1606     if (put_string(ms,"=")) { 
1607       ERROR("Cannot write = in textfile for tag %s\n",tags);
1608       ms->stype=STREAM_TEXTFILE;
1609       return -1;
1610     }
1611
1612     if (put_data_as_hexbytes(ms,buf,len)) { 
1613       ERROR("Cannot write data in hex format in textfile for tag %s\n",tags);
1614       ms->stype=STREAM_TEXTFILE;
1615       return -1;
1616     }
1617
1618     if (put_string(ms,"\n")) { 
1619       ERROR("Cannot write trailing lf in textfile for tag %s\n",tags);
1620       ms->stype=STREAM_TEXTFILE;
1621       return -1;
1622     }
1623
1624     ms->stype=STREAM_TEXTFILE;
1625
1626     return len;
1627 }
1628
1629
1630
1631 /***************************************************************************************************
1632   User implementation   ("user:")
1633 *************************************************************************************************/
1634
1635
1636 // List of all user keyed stream connections for the guest
1637 struct user_keyed_streams {
1638     spinlock_t lock;
1639     struct list_head streams;
1640 };
1641
1642
1643 // A single keyed stream connection to user space
1644 struct user_keyed_stream {
1645     int stype;
1646     v3_keyed_stream_open_t otype;
1647
1648     char *url;
1649     spinlock_t lock;
1650     int waiting;
1651
1652     wait_queue_head_t user_wait_queue;
1653     wait_queue_head_t host_wait_queue;
1654
1655     struct palacios_user_keyed_stream_op *op;
1656
1657     struct list_head node;
1658 };
1659
1660
1661 //
1662 // List of all of the user streams
1663 //
1664 static struct user_keyed_streams *user_streams;
1665
1666
1667
1668 static int resize_op(struct palacios_user_keyed_stream_op **op, uint64_t buf_len)
1669 {
1670     struct palacios_user_keyed_stream_op *old = *op;
1671     struct palacios_user_keyed_stream_op *new;
1672     
1673     if (!old) {
1674         new = palacios_alloc(sizeof(struct palacios_user_keyed_stream_op)+buf_len);
1675         if (!new) { 
1676             return -1;
1677         } else {
1678             new->len=sizeof(struct palacios_user_keyed_stream_op)+buf_len;
1679             new->buf_len=buf_len;
1680             *op=new;
1681             return 0;
1682         }
1683     } else {
1684         if ((old->len-sizeof(struct palacios_user_keyed_stream_op)) >= buf_len) { 
1685             old->buf_len=buf_len;
1686             return 0;
1687         } else {
1688             palacios_free(old);
1689             *op = 0 ;
1690             return resize_op(op,buf_len);
1691         }
1692     }
1693 }
1694
1695 //
1696 // The assumption is that we enter this with the stream locked
1697 // and we will return with it locked;  additionally, the op structure
1698 // will be overwritten with the response
1699 // 
1700 static int do_request_to_response(struct user_keyed_stream *s, unsigned long *flags)
1701 {
1702
1703     if (s->waiting) {
1704         ERROR("user keyed stream request attempted while one is already in progress on %s\n",s->url);
1705         return -1;
1706     }
1707
1708     // we are now waiting for a response
1709     s->waiting = 1;
1710
1711     // release the stream
1712     palacios_spinlock_unlock_irqrestore(&(s->lock), *flags);
1713
1714     // wake up anyone waiting on it
1715     wake_up_interruptible(&(s->user_wait_queue));
1716
1717     // wait for someone to give us a response
1718     while (wait_event_interruptible(s->host_wait_queue, (s->waiting == 0)) != 0) {}
1719
1720     // reacquire the lock for our called
1721     palacios_spinlock_lock_irqsave(&(s->lock), *flags);
1722
1723     return 0;
1724 }
1725
1726 //
1727 // The assumption is that we enter this with the stream locked
1728 // and we will return with it UNlocked
1729 // 
1730 static int do_response_to_request(struct user_keyed_stream *s, unsigned long *flags)
1731 {
1732
1733     if (!(s->waiting)) {
1734         ERROR("user keyed stream response while no request is in progress on %s\n",s->url);
1735         return -1;
1736     }
1737
1738     // we are now waiting for a request
1739     s->waiting = 0;
1740
1741     // release the stream
1742     palacios_spinlock_unlock_irqrestore(&(s->lock), *flags);
1743
1744     // wake up anyone waiting on it
1745     wake_up_interruptible(&(s->host_wait_queue));
1746     
1747     return 0;
1748 }
1749
1750
1751
1752 static unsigned int keyed_stream_poll_user(struct file *filp, poll_table *wait)
1753 {
1754     struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
1755     unsigned long flags;
1756     
1757     if (!s) {
1758         return POLLERR;
1759     }
1760     
1761     palacios_spinlock_lock_irqsave(&(s->lock), flags);
1762
1763     poll_wait(filp, &(s->user_wait_queue), wait);
1764
1765     if (s->waiting) {
1766         palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1767         return POLLIN | POLLRDNORM;
1768     }
1769     
1770     palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1771
1772     return 0;
1773 }
1774
1775 static long keyed_stream_ioctl_user(struct file * filp, unsigned int ioctl, unsigned long arg)
1776 {
1777     void __user *argp = (void __user *)arg;
1778     unsigned long flags;
1779     uint64_t size;
1780     
1781     struct user_keyed_stream *s = (struct user_keyed_stream *) (filp->private_data);
1782     
1783     switch (ioctl) {
1784
1785         case V3_KSTREAM_REQUEST_SIZE_IOCTL:
1786             
1787             // inform request size
1788             
1789             palacios_spinlock_lock_irqsave(&(s->lock), flags);
1790             
1791             if (!(s->waiting)) {
1792                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1793                 return 0;
1794             }
1795
1796             size =  sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
1797             
1798             if (copy_to_user((void * __user) argp, &size, sizeof(uint64_t))) {
1799                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1800                 ERROR("palacios user key size request failed to copy data\n");
1801                 return -EFAULT;
1802             }
1803             
1804             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1805             
1806             return 1;
1807             
1808             break;
1809
1810         case V3_KSTREAM_REQUEST_PULL_IOCTL: 
1811                 
1812             // pull the request
1813             
1814             palacios_spinlock_lock_irqsave(&(s->lock), flags);
1815
1816             if (!(s->waiting)) {
1817                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1818                 ERROR("palacios user key pull request when not waiting\n");
1819                 return 0;
1820             }
1821
1822             size =  sizeof(struct palacios_user_keyed_stream_op) + s->op->buf_len;
1823
1824
1825             if (copy_to_user((void __user *) argp, s->op, size)) {
1826                 palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1827                 ERROR("palacios user key pull request failed to copy data\n");
1828                 return -EFAULT;
1829             }
1830
1831             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1832             
1833             return 1;
1834             
1835          
1836             break;
1837
1838     case V3_KSTREAM_RESPONSE_PUSH_IOCTL:
1839
1840         // push the response
1841
1842         palacios_spinlock_lock_irqsave(&(s->lock), flags);
1843
1844         if (!(s->waiting)) {
1845             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1846             ERROR("palacios user key push response when not waiting\n");
1847             return 0;
1848         }
1849         
1850         if (copy_from_user(&size, (void __user *) argp, sizeof(uint64_t))) {
1851             ERROR("palacios user key push response failed to copy size\n");
1852             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1853             return -EFAULT;
1854         }
1855
1856         // overflow possible here for very large request
1857         if (resize_op(&(s->op),size-sizeof(struct palacios_user_keyed_stream_op))) {
1858             ERROR("unable to resize op in user key push response\n");
1859             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1860             return -EFAULT;
1861         }
1862
1863         if (copy_from_user(s->op, (void __user *) argp, size)) {
1864             palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
1865             return -EFAULT;
1866         }
1867
1868         do_response_to_request(s,&flags);
1869         // this will have unlocked s for us
1870
1871         return 1;
1872
1873         break;
1874         
1875     default:
1876         ERROR("unknown ioctl in user keyed stream\n");
1877
1878         return -EFAULT;
1879
1880         break;
1881         
1882     }
1883 }
1884
1885
1886 static int keyed_stream_release_user(struct inode *inode, struct file *filp)
1887 {
1888     struct user_keyed_stream *s = filp->private_data;
1889     unsigned long f1,f2;
1890
1891     palacios_spinlock_lock_irqsave(&(user_streams->lock),f1);
1892     palacios_spinlock_lock_irqsave(&(s->lock), f2);
1893
1894     list_del(&(s->node));
1895
1896     palacios_spinlock_unlock_irqrestore(&(s->lock), f2);
1897     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), f1);
1898     
1899     palacios_free(s->url);
1900     palacios_free(s);
1901
1902     return 0;
1903 }
1904
1905 static struct file_operations user_keyed_stream_fops = {
1906     .poll = keyed_stream_poll_user,
1907     .compat_ioctl = keyed_stream_ioctl_user,
1908     .unlocked_ioctl = keyed_stream_ioctl_user,
1909     .release = keyed_stream_release_user,
1910 };
1911
1912
1913 /*
1914   user_keyed_streams are allocated on user connect, and deallocated on user release
1915   
1916   palacios-side opens and closes only manipulate the open type
1917 */
1918
1919 int keyed_stream_connect_user(struct v3_guest *guest, unsigned int cmd, unsigned long arg, void *priv_data)
1920 {
1921     int fd;
1922     unsigned long flags;
1923     char *url;
1924     uint64_t len;
1925     struct user_keyed_stream *s;
1926     
1927     if (!user_streams) { 
1928         ERROR("no user space keyed streams!\n");
1929         return -1;
1930     }
1931
1932     // get the url
1933     if (copy_from_user(&len,(void __user *)arg,sizeof(len))) { 
1934         ERROR("cannot copy url len from user\n");
1935         return -1;
1936     }
1937
1938     // overflow possible here, but only if this is a huge guest request (>4GB)
1939     url = palacios_alloc(len);
1940     
1941     if (!url) { 
1942         ERROR("cannot allocate url for user keyed stream\n");
1943         return -1;
1944     }
1945
1946     if (copy_from_user(url,((void __user *)arg)+sizeof(len),len)) {
1947         ERROR("cannot copy url from user\n");
1948         return -1;
1949     }
1950     url[len-1]=0;
1951         
1952     
1953     // Check for duplicate handler
1954     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
1955     list_for_each_entry(s, &(user_streams->streams), node) {
1956         if (!strncasecmp(url, s->url, len)) {
1957             ERROR("user keyed stream connection with url \"%s\" already exists\n", url);
1958             palacios_free(url);
1959             return -1;
1960         }
1961     }
1962     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
1963     
1964     // Create connection
1965     s = palacios_alloc(sizeof(struct user_keyed_stream));
1966     
1967     if (!s) {
1968         ERROR("cannot allocate new user keyed stream for %s\n",url);
1969         palacios_free(url);
1970         return -1;
1971     }
1972     
1973     
1974     // Get file descriptor
1975     fd = anon_inode_getfd("v3-kstream", &user_keyed_stream_fops, s, 0);
1976
1977     if (fd < 0) {
1978         ERROR("cannot allocate file descriptor for new user keyed stream for %s\n",url);
1979         palacios_free(s);
1980         palacios_free(url);
1981         return -1;
1982     }
1983     
1984     memset(s, 0, sizeof(struct user_keyed_stream));
1985     
1986     s->stype=STREAM_USER;
1987     s->url=url;
1988     
1989     init_waitqueue_head(&(s->user_wait_queue));
1990     init_waitqueue_head(&(s->host_wait_queue));
1991     
1992     // Insert connection into list
1993     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
1994     list_add(&(s->node), &(user_streams->streams));
1995     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
1996     
1997     return fd;
1998 }
1999     
2000 static struct user_keyed_stream *keyed_stream_user_find(char *url)
2001 {
2002     unsigned long flags;
2003     struct user_keyed_stream *s;
2004     
2005     if (!user_streams) { 
2006         ERROR("no user space keyed streams available\n");
2007         return NULL;
2008     }
2009     
2010     palacios_spinlock_lock_irqsave(&(user_streams->lock), flags);
2011     list_for_each_entry(s, &(user_streams->streams), node) {
2012         if (!strcasecmp(url, s->url)) {
2013             palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2014             return s;
2015         }
2016     }
2017     
2018     palacios_spinlock_unlock_irqrestore(&(user_streams->lock), flags);
2019     
2020     return NULL;
2021 }
2022     
2023     
2024 static v3_keyed_stream_t open_stream_user(char *url, v3_keyed_stream_open_t ot)
2025 {
2026     unsigned long flags;
2027     struct user_keyed_stream *s;
2028     
2029     s = keyed_stream_user_find(url);
2030     
2031     if (!s) {
2032         ERROR("cannot open user stream %s as it does not exist yet\n",url);
2033         return NULL;
2034     }
2035
2036     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2037
2038     if (s->waiting) {
2039         palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
2040         ERROR("cannot open user stream %s as it is already in waiting state\n",url);
2041         return NULL;
2042     }
2043     
2044     s->otype = ot==V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
2045     
2046     palacios_spinlock_unlock_irqrestore(&(s->lock), flags);
2047     
2048     return s;
2049     
2050 }
2051     
2052 // close stream does not do anything.  Creation of the stream and its cleanup
2053 // are driven by the user side, not the palacios side
2054 // might eventually want to reference count this, though
2055 static void close_stream_user(v3_keyed_stream_t stream)
2056 {
2057     return;
2058 }
2059
2060 static void preallocate_hint_key_user(v3_keyed_stream_t stream,
2061                                       char *key,
2062                                       uint64_t size)
2063 {
2064     return;
2065 }
2066
2067
2068
2069
2070 static v3_keyed_stream_key_t open_key_user(v3_keyed_stream_t stream, char *key)
2071 {
2072     unsigned long flags;
2073     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2074     uint64_t   len = strlen(key)+1;
2075     void *user_key;
2076
2077     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2078
2079
2080     if (resize_op(&(s->op),len)) {
2081         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2082         ERROR("cannot resize op in opening key %s on user keyed stream %s\n",key,s->url);
2083         return NULL;
2084     }
2085
2086     s->op->type = PALACIOS_KSTREAM_OPEN_KEY;
2087     s->op->buf_len = len;
2088     strncpy(s->op->buf,key,len); // will terminate buffer
2089
2090     // enter with it locked
2091     if (do_request_to_response(s,&flags)) { 
2092         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2093         ERROR("request/response handling failed\n");
2094         return NULL;
2095     }
2096     // return with it locked
2097
2098     user_key=s->op->user_key;
2099
2100     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2101
2102     return user_key;
2103 }
2104
2105 static void close_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key)
2106 {
2107     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2108     uint64_t   len = 0;
2109     unsigned long flags;
2110     
2111     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2112
2113     if (resize_op(&(s->op),len)) {
2114         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2115         ERROR("cannot resize op in closing key 0x%p on user keyed stream %s\n",key,s->url);
2116         return;
2117     }
2118
2119     s->op->type = PALACIOS_KSTREAM_CLOSE_KEY;
2120     s->op->buf_len = len;
2121     s->op->user_key = key;
2122
2123     // enter with it locked
2124     if (do_request_to_response(s,&flags)) { 
2125         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2126         ERROR("request/response handling failed\n");
2127         return;
2128     }
2129     // return with it locked
2130
2131     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2132
2133     return;
2134 }
2135
2136
2137
2138 static sint64_t read_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2139                               void *tag,
2140                               sint64_t taglen,
2141                               void *buf, sint64_t rlen)
2142 {
2143
2144     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2145     uint64_t   len = taglen ;
2146     sint64_t   xfer;
2147     unsigned long flags;
2148
2149     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2150
2151     if (s->otype != V3_KS_RD_ONLY) { 
2152         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2153         ERROR("attempt to read key from stream that is not in read state on %s\n",s->url);
2154     }   
2155
2156
2157     if (resize_op(&(s->op),len)) {
2158         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2159         ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
2160         return -1;
2161     }
2162
2163     s->op->type = PALACIOS_KSTREAM_READ_KEY;
2164     s->op->buf_len = len ;
2165     s->op->xfer = rlen;
2166     s->op->user_key = key;
2167     s->op->data_off = taglen;
2168
2169     memcpy(s->op->buf,tag,taglen);
2170
2171     // enter with it locked
2172     if (do_request_to_response(s,&flags)) { 
2173         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2174         ERROR("request/response handling failed\n");
2175         return -1;
2176     }
2177     // return with it locked
2178
2179
2180     if (s->op->xfer>0) { 
2181         // data_off must be zero
2182         memcpy(buf,s->op->buf,s->op->xfer);
2183     }
2184
2185     xfer=s->op->xfer;
2186
2187     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2188
2189     return xfer;
2190 }
2191
2192
2193 static sint64_t write_key_user(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2194                                void *tag,
2195                                sint64_t taglen,
2196                                void *buf, sint64_t wlen)
2197 {
2198
2199     struct user_keyed_stream *s = (struct user_keyed_stream *) stream;
2200     uint64_t   len = taglen + wlen ;
2201     sint64_t   xfer;
2202     unsigned long flags;
2203
2204
2205     palacios_spinlock_lock_irqsave(&(s->lock), flags);
2206
2207     if (s->otype != V3_KS_WR_ONLY) { 
2208         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2209         ERROR("attempt to write key on stream that is not in write state on %s\n",s->url);
2210     }   
2211
2212     if (resize_op(&(s->op),len)) {
2213         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2214         ERROR("cannot resize op in reading key 0x%p on user keyed stream %s\n",key,s->url);
2215         return -1;
2216     }
2217
2218     s->op->type = PALACIOS_KSTREAM_WRITE_KEY;
2219     s->op->buf_len = len;
2220     s->op->xfer = wlen;
2221     s->op->user_key = key;
2222     s->op->data_off = taglen;
2223
2224     memcpy(s->op->buf,tag,taglen);
2225     memcpy(s->op->buf+taglen,buf,wlen);
2226
2227     // enter with it locked
2228     if (do_request_to_response(s,&flags)) { 
2229         palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2230         ERROR("request/response handling failed\n");
2231         return -1;
2232     }
2233     // return with it locked
2234
2235     // no data comes back, xfer should be size of data write (not tag)
2236
2237     xfer=s->op->xfer;
2238
2239     palacios_spinlock_unlock_irqrestore(&(s->lock),flags);
2240
2241     return xfer;
2242 }
2243
2244
2245
2246 /****************************************************************************************
2247  *    Network-based implementation  ("net:")
2248  *****************************************************************************************/
2249
2250
2251 #define NET_MAX_KEY_LEN 128
2252
2253 struct net_keyed_stream {
2254     int stype;
2255     int ot;
2256     struct net_stream * ns;
2257 };
2258
2259 struct net_stream {
2260     int stype;
2261     struct socket *sock;
2262 };
2263
2264
2265 //ignore the arguments given here currently
2266 static struct net_stream * create_net_stream(void) 
2267 {
2268     struct net_stream * ns = NULL;
2269
2270     ns = palacios_alloc(sizeof(struct net_stream));
2271     
2272     if (!ns) { 
2273         ERROR("Cannot allocate a net_stream\n");
2274         return 0;
2275     }
2276
2277     memset(ns, 0, sizeof(struct net_stream));
2278
2279     ns->stype = STREAM_NETWORK;
2280
2281     return ns;
2282 }
2283
2284 static void close_socket(v3_keyed_stream_t stream)
2285 {
2286     struct net_keyed_stream *nks = (struct net_keyed_stream *) stream;
2287
2288     if (nks) { 
2289         struct net_stream *ns = nks->ns;
2290
2291         if (ns) {
2292             ns->sock->ops->release(ns->sock);
2293             palacios_free(ns);
2294             ERROR("Close Socket\n");
2295         }
2296         
2297         palacios_free(ns);
2298     }
2299 }
2300
2301
2302 static void close_stream_net(v3_keyed_stream_t stream)
2303 {
2304         close_socket(stream);
2305 }
2306
2307 static int connect_to_ip(struct net_stream *ns, int hostip, int port)
2308 {
2309     struct sockaddr_in client;
2310
2311     if (ns == NULL) {
2312         return -1;
2313     }
2314
2315     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { 
2316         ERROR("Cannot create accept socket\n");
2317         return -1;
2318     }
2319         
2320
2321     client.sin_family = AF_INET;
2322     client.sin_port = htons(port);
2323     client.sin_addr.s_addr = hostip;//in_aton(hostip);
2324
2325     return ns->sock->ops->connect(ns->sock, (struct sockaddr *)&client, sizeof(client), 0);
2326 }
2327
2328 static int send_msg(struct net_stream *ns,  char * buf, int len)
2329 {
2330     int left=len;
2331
2332     if (!ns) { 
2333         ERROR("Send message on null net_stream\n");
2334         return -1;
2335     }
2336
2337     if (!(ns->sock)) { 
2338         ERROR("Send message on net_stream without socket\n");
2339         return -1;
2340     }
2341
2342     while (left>0) {
2343
2344         struct msghdr msg;
2345         mm_segment_t oldfs;
2346         struct iovec iov;
2347         int err = 0;
2348         
2349
2350         msg.msg_flags = MSG_NOSIGNAL;//MSG_DONTWAIT;
2351         msg.msg_name = 0;
2352         msg.msg_namelen = 0;
2353         msg.msg_control = NULL;
2354         msg.msg_controllen = 0;
2355         msg.msg_iov = &iov;
2356         msg.msg_iovlen = 1;
2357
2358         iov.iov_base = (char *)&(buf[len-left]);
2359         iov.iov_len = (size_t)left;
2360
2361         oldfs = get_fs();
2362         set_fs(KERNEL_DS);
2363
2364         err = sock_sendmsg(ns->sock, &msg, (size_t)left);
2365
2366         set_fs(oldfs);
2367         
2368         if (err<0) {
2369             ERROR("Send msg error %d\n",err);
2370             return err;
2371         } else {
2372             left-=len;
2373         }
2374     }
2375
2376     return len;
2377 }
2378
2379
2380
2381 static int recv_msg(struct net_stream *ns, char * buf, int len)
2382 {
2383
2384     int left=len;
2385
2386     if (!ns) { 
2387         ERROR("Receive message on null net_stream\n");
2388         return -1;
2389     }
2390
2391     if (!(ns->sock)) { 
2392         ERROR("Receive  message on net_stream without socket\n");
2393         return -1;
2394     }
2395     
2396     
2397     while (left>0) {
2398         
2399         struct msghdr msg;
2400         mm_segment_t oldfs;
2401         struct iovec iov;
2402         int err;
2403         
2404         msg.msg_flags = 0;
2405         msg.msg_name = 0;
2406         msg.msg_namelen = 0;
2407         msg.msg_control = NULL;
2408         msg.msg_controllen = 0;
2409         msg.msg_iov = &iov;
2410         msg.msg_iovlen = 1;
2411         
2412         iov.iov_base = (void *)&(buf[len-left]);
2413         iov.iov_len = (size_t)left;
2414         
2415         oldfs = get_fs();
2416         set_fs(KERNEL_DS);
2417         
2418         err = sock_recvmsg(ns->sock, &msg, (size_t)left, 0);
2419         
2420         set_fs(oldfs);
2421         
2422         if (err<0) { 
2423             return err;
2424         } else {
2425             left -= err;
2426         }
2427     }
2428     return len;
2429 }
2430
2431 static struct net_stream * accept_once(struct net_stream * ns, const int port)
2432 {
2433     struct socket *accept_sock;
2434     struct sockaddr_in addr;
2435     int err;
2436     
2437     if (!ns) { 
2438         ERROR("Accept called on null net_stream\n");
2439         return 0;
2440     }
2441     
2442     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&accept_sock)<0) { 
2443         ERROR("Cannot create accept socket\n"); 
2444         return NULL;
2445     }
2446     
2447
2448     addr.sin_family = AF_INET;
2449     addr.sin_port = htons(port);
2450     addr.sin_addr.s_addr = INADDR_ANY;
2451     
2452     err = accept_sock->ops->bind(accept_sock, (struct sockaddr *)&addr, sizeof(addr));
2453     
2454     if (err<0) {
2455         ERROR("Bind err: %d\n",err);
2456         return NULL;
2457     }
2458
2459     err = accept_sock->ops->listen(accept_sock,2);
2460     
2461     if (err<0) {
2462         ERROR("Listen err: %d\n",err);
2463         return NULL;
2464     }
2465     
2466     // Init the socket in the network strream
2467
2468     if (sock_create(PF_INET,SOCK_STREAM,IPPROTO_TCP,&(ns->sock))<0) { 
2469         ERROR("Cannot create socket\n");
2470         return NULL;
2471     }
2472     
2473     
2474     // Do the actual accept 
2475
2476     if (accept_sock->ops->accept(accept_sock,ns->sock,0)<0) {
2477         ERROR("accept failed");
2478         return NULL;
2479     }
2480     
2481     // close the accept socket
2482     accept_sock->ops->release(accept_sock);
2483     palacios_free(accept_sock);
2484
2485     return ns;
2486 }
2487
2488
2489 static struct v3_keyed_stream_t * open_stream_net(char * url,v3_keyed_stream_open_t ot)
2490 {
2491     struct net_keyed_stream * nks;
2492     int url_len;
2493     int i;
2494     int delimit[3];
2495     int k;
2496     char mode;
2497     int ip_len;
2498     int port_len;
2499
2500     nks = palacios_alloc(sizeof(struct net_keyed_stream)); 
2501
2502     if (!nks) { 
2503         ERROR("Could not allocate space in open_stream_net\n");
2504         return 0;
2505     }
2506     
2507     nks->ot = ot == V3_KS_WR_ONLY_CREATE ? V3_KS_WR_ONLY : ot;
2508
2509     nks->stype = STREAM_NETWORK; 
2510
2511     nks->ns = create_net_stream();
2512     
2513     if (!(nks->ns)) { 
2514         ERROR("Could not create network stream\n");
2515         palacios_free(nks);
2516         return 0;
2517     }
2518
2519     url_len=strlen(url);
2520     k=0;
2521
2522
2523     for(i = 0; i < url_len;i++){
2524         if(url[i] == ':'){
2525             delimit[k] = i;
2526             k++;        
2527         }
2528     }
2529
2530     mode = url[delimit[0] + 1];
2531     ip_len = delimit[2] - delimit[1];
2532     port_len = url_len - delimit[2];
2533
2534
2535     {
2536         char ip[ip_len];
2537         char port[port_len];
2538         int host_ip;
2539         int host_port;
2540
2541
2542         strncpy(ip,url + delimit[1]+1,ip_len-1);
2543         ip[ip_len-1]='\0';
2544         
2545         host_ip = in_aton(ip);
2546         
2547         strncpy(port,url+ delimit[2]+1,port_len-1);
2548         port[port_len-1]='\0';
2549         
2550         host_port = simple_strtol(port,NULL,10);
2551
2552         INFO("ip is %s\n",ip); 
2553         INFO("host_ip is %x\n", host_ip);
2554         INFO("port is %s (%d)\n",port,host_port);
2555         
2556         if (mode == 'a'){
2557             // accept a request
2558             INFO("Accepting Connection on INADDR_ANY port:%d\n",host_port);
2559             nks->ns = accept_once(nks->ns, host_port);
2560         } else if (mode == 'c'){
2561             // call connect to ip
2562             INFO("Connecting to %s:%d\n",ip,host_port);
2563             connect_to_ip(nks->ns,host_ip, host_port);
2564         } else {
2565             ERROR("Mode not recognized\n");
2566             palacios_free(nks);
2567             return NULL;
2568         }
2569         
2570         return (v3_keyed_stream_t)nks;
2571     }
2572 }
2573
2574 static void preallocate_hint_key_net(v3_keyed_stream_t stream, char *key,uint64_t size)
2575 {
2576     //do nothing
2577 }
2578
2579 static v3_keyed_stream_key_t open_key_net(v3_keyed_stream_t stream,char *key)
2580 {
2581    struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2582
2583    // reciever of the key name 
2584    if (nks->ot==V3_KS_WR_ONLY)
2585    {
2586        unsigned short keylen = strlen(key);
2587
2588        if (keylen>NET_MAX_KEY_LEN || keylen>=32768) { 
2589            ERROR("Key is too long\n");
2590            return NULL;
2591        }
2592
2593        {
2594            // on-stack allocation here demands that we
2595            // keep key length low...
2596            char msg[keylen+3];
2597            int next = 0;
2598            
2599            // Opening a key for writing sends a notice of the 
2600            // key length and the key name on the channel
2601            
2602            msg[next++]=keylen & 0xFF;
2603            msg[next]=(keylen>>8) & 0xFF;
2604            // Set flag bit
2605            msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will flip leading bit to 1
2606            
2607            strncpy(msg+2,key,keylen);  // will also copy trailing zero
2608            
2609            if (send_msg(nks->ns,msg,keylen+2) != keylen+2) { 
2610                ERROR("Unable to open key for writing on net_stream (send key len+name)\n");
2611                return NULL;
2612            }
2613        }
2614    }
2615
2616    if (nks->ot==V3_KS_RD_ONLY)   {
2617        char msg_info[2];
2618        int next = 0;
2619        int keylen = 0;
2620
2621        if (recv_msg(nks->ns,msg_info,2) != 2) { 
2622            ERROR("Unable to open key for reading on net_stream (recv key len)\n");
2623            return NULL;
2624        }
2625
2626        next = 0;
2627        keylen = 0;
2628
2629        keylen |= msg_info[next++];
2630
2631        if ((msg_info[next] & 0x80) != 0x80)  {
2632            ERROR("Flag bit not set on receive of key length\n");
2633            return NULL;
2634        } else {
2635            msg_info[next] &= 0x7F; // flip the msb back to zero (clear flag)
2636        }
2637        
2638        keylen |= msg_info[next]<<8;
2639
2640        if (keylen > NET_MAX_KEY_LEN) { 
2641            ERROR("Received key length is too big\n");
2642            return NULL;
2643        }
2644        
2645        {
2646            
2647            char msg[keylen+1];
2648            
2649            if (recv_msg(nks->ns,msg,keylen) != keylen) { 
2650                ERROR("Unable to receive key\n");
2651                return NULL;
2652            }
2653            msg[keylen]=0;
2654            
2655            if (strncmp(key,msg,keylen)!=0) {
2656                ERROR("Key mismatch in open_key_net - expect %s but got %s\n",key,msg);
2657                return NULL;
2658            }
2659        }
2660    }
2661    
2662    return (v3_keyed_stream_key_t)key;
2663 }
2664
2665 static void close_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t input_key)
2666 {
2667     char * key = (char*)input_key;
2668     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2669
2670     
2671     if (nks->ot==V3_KS_WR_ONLY) {
2672         unsigned short keylen = strlen(key);
2673
2674         if (keylen > NET_MAX_KEY_LEN || keylen>=32768) {
2675             ERROR("Key length too long in close_key_net\n");
2676             return;
2677         }
2678
2679         {
2680             char msg[keylen+3];
2681             int next = 0;
2682             
2683             msg[next++]=keylen & 0xFF;
2684             msg[next]=(keylen>>8) & 0xFF;
2685             // flag
2686             msg[next]=msg[next] | 0x80; // 0x80 is 128 and OR will filp leading bit to 1
2687             strncpy(msg+2,key,keylen); // will copy the zero
2688             msg[keylen+2]=0;
2689             if (send_msg(nks->ns,msg,keylen+2)!=keylen+2) { 
2690                 ERROR("Cannot send key on close_key_net\n");
2691                 return;
2692             }
2693         }
2694     }
2695     
2696     if (nks->ot==V3_KS_RD_ONLY)   {
2697         char msg_info[2];
2698         int next;
2699         int keylen;
2700         
2701         if (recv_msg(nks->ns,msg_info,2) != 2) { 
2702             ERROR("Cannot recv key length on close_key_net\n");
2703             return;
2704         }
2705         
2706         next = 0;
2707         keylen = 0;
2708         
2709         keylen |= msg_info[next++];
2710         
2711         if ((msg_info[next] & 0x80) != 0x80) {
2712             ERROR("Missing flag in close_key_net receive\n");
2713             return;
2714         } 
2715         
2716         msg_info[next] &= 0x7F; // flip the msb back to zero
2717         
2718         keylen |= msg_info[next]<<8;
2719         
2720         {
2721             char msg[keylen+1];
2722             
2723             if (recv_msg(nks->ns,msg,keylen)!=keylen) { 
2724                 ERROR("Did not receive all of key in close_key_net receive\n");
2725                 return;
2726             }
2727             
2728             msg[keylen]=0;
2729             
2730             if (strncmp(key,msg,keylen)!=0)  {
2731                 ERROR("Key mismatch in close_key_net - expect %s but got %s\n",key,msg);
2732                 return;
2733             }
2734         }
2735     }
2736 }
2737
2738 static sint64_t write_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key, 
2739                               void *tag,
2740                               sint64_t taglen,
2741                               void *buf, sint64_t len) 
2742 {
2743     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2744
2745     if (!buf) { 
2746         ERROR("Buf is NULL in write_key_net\n");
2747         return -1;
2748     }
2749
2750     if (!tag) { 
2751         ERROR("Tag is NULL in write_key_net\n");
2752         return -1;
2753     }
2754
2755     if (len<0) {
2756         ERROR("len is negative in write_key_net\n");
2757         return -1;
2758     }
2759
2760     if (taglen<0) {
2761         ERROR("taglen is negative in write_key_net\n");
2762         return -1;
2763     }
2764     
2765     if (!key){
2766        ERROR("write_key: key is NULL in write_key_net\n");
2767        return -1;
2768     }
2769     
2770     
2771     if (!nks)  {
2772         ERROR("nks is NULL in write_key_net\n");
2773         return -1;
2774     }
2775     
2776     if (nks->ot==V3_KS_WR_ONLY) {
2777         if (send_msg(nks->ns,(char*)(&BOUNDARY_TAG),sizeof(BOUNDARY_TAG))!=sizeof(BOUNDARY_TAG)) { 
2778             ERROR("Could not send boundary tag in write_key_net\n");
2779             return -1;
2780         } 
2781         if (send_msg(nks->ns,(char*)(&taglen),sizeof(taglen))!=sizeof(taglen)) { 
2782             ERROR("Could not send tag length in write_key_net\n");
2783             return -1;
2784         } 
2785         if (send_msg(nks->ns,tag,taglen)!=taglen) { 
2786             ERROR("Could not send tag in write_key_net\n");
2787             return -1;
2788         }
2789         if (send_msg(nks->ns,(char*)(&len),sizeof(len))!=sizeof(len)) { 
2790             ERROR("Could not send data length in write_key_net\n");
2791             return -1;
2792         } 
2793         if (send_msg(nks->ns,buf,len)!=len) { 
2794             ERROR("Could not send data in write_key_net\n");
2795             return -1;
2796         }
2797     }  else {
2798         ERROR("Permission not correct in write_key_net\n");
2799         return -1;
2800     }
2801     
2802     return len;
2803 }
2804
2805
2806 static sint64_t read_key_net(v3_keyed_stream_t stream, v3_keyed_stream_key_t key,
2807                              void *tag,
2808                              sint64_t taglen,
2809                              void *buf, sint64_t len)
2810 {
2811     struct net_keyed_stream * nks = (struct net_keyed_stream *)stream;
2812     void *temptag;
2813
2814     if (!buf) {
2815         ERROR("Buf is NULL in read_key_net\n");
2816         return -1;
2817     }
2818
2819     if (!tag) {
2820         ERROR("Tag is NULL in read_key_net\n");
2821         return -1;
2822     }
2823     
2824     if(len<0) {
2825         ERROR("len is negative in read_key_net\n");
2826         return -1;
2827     }
2828
2829     if(taglen<0) {
2830         ERROR("taglen is negative in read_key_net\n");
2831         return -1;
2832     }
2833     
2834     if (!key) {
2835         ERROR("read_key: key is NULL in read_key_net\n");
2836         return -1;
2837     }
2838
2839
2840     if (nks->ot==V3_KS_RD_ONLY) {
2841         
2842         sint64_t slen;
2843         uint32_t tempbt;
2844         
2845         if (recv_msg(nks->ns,(char*)(&tempbt),sizeof(tempbt))!=sizeof(tempbt)) { 
2846             ERROR("Cannot receive boundary tag in read_key_net\n");
2847             return -1;
2848         }
2849
2850         if (tempbt!=BOUNDARY_TAG) { 
2851           ERROR("Invalid boundary tag (received 0x%x\n",tempbt);
2852           return -1;
2853         }
2854            
2855         temptag=palacios_alloc(taglen);
2856         if (!temptag) {
2857           ERROR("failed to allocate temptag\n");
2858           return -1;
2859         }
2860
2861         if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { 
2862             ERROR("Cannot receive tag len in read_key_net\n");
2863             palacios_free(temptag);
2864             return -1;
2865         }
2866
2867         if (slen!=taglen) {
2868             ERROR("Tag len expected does not matched tag len decoded in read_key_net\n");
2869             palacios_free(temptag);
2870             return -1;
2871         }
2872
2873         if (recv_msg(nks->ns,temptag,taglen)!=taglen) { 
2874             ERROR("Cannot recieve tag in read_key_net\n");
2875             palacios_free(temptag);
2876             return -1;
2877         }
2878
2879         if (memcmp(temptag,tag,taglen)) { 
2880           ERROR("Tag mismatch\n");
2881           palacios_free(temptag);
2882           return -1;
2883         }
2884
2885         if (recv_msg(nks->ns,(char*)(&slen),sizeof(slen))!=sizeof(slen)) { 
2886             ERROR("Cannot receive data len in read_key_net\n");
2887             palacios_free(temptag);
2888             return -1;
2889         }
2890
2891         if (slen!=len) {
2892             ERROR("Data len expected does not matched data len decoded in read_key_net\n");
2893             palacios_free(temptag);
2894             return -1;
2895         }
2896
2897         if (recv_msg(nks->ns,buf,len)!=len) { 
2898             ERROR("Cannot recieve data in read_key_net\n");
2899             palacios_free(temptag);
2900             return -1;
2901         }
2902
2903         palacios_free(temptag);
2904         
2905     } else {
2906         ERROR("Permissions incorrect for the stream in read_key_net\n");
2907         return -1;
2908     }
2909
2910     return len;
2911     
2912 }
2913
2914
2915 /***************************************************************************************************
2916   Generic interface
2917 *************************************************************************************************/
2918
2919 static v3_keyed_stream_t open_stream(char *url,
2920                                      v3_keyed_stream_open_t ot)
2921 {
2922     if (!strncasecmp(url,"mem:",4)) { 
2923         return open_stream_mem(url,ot);
2924     } else if (!strncasecmp(url,"file:",5)) { 
2925         return open_stream_file(url,ot);
2926     } else if (!strncasecmp(url,"user:",5)) { 
2927         return open_stream_user(url,ot);
2928     } else if (!strncasecmp(url,"net:",4)){
2929         return open_stream_net(url,ot);
2930     } else if (!strncasecmp(url,"textfile:",9)) { 
2931         return open_stream_textfile(url,ot);
2932     } else {
2933         ERROR("unsupported type in attempt to open keyed stream \"%s\"\n",url);
2934         return 0;
2935     }
2936 }
2937
2938 static void close_stream(v3_keyed_stream_t stream)
2939 {
2940     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2941     switch (gks->stype){ 
2942         case STREAM_MEM:
2943             return close_stream_mem(stream);
2944             break;
2945         case STREAM_FILE:
2946             return close_stream_file(stream);
2947             break;
2948         case STREAM_TEXTFILE:
2949             return close_stream_textfile(stream);
2950             break;
2951         case STREAM_USER:
2952             return close_stream_user(stream);
2953             break;
2954         case STREAM_NETWORK:
2955             return close_stream_net(stream);
2956             break;
2957         default:
2958             ERROR("unknown stream type %d in close\n",gks->stype);
2959             break;
2960     }
2961 }
2962
2963 static void preallocate_hint_key(v3_keyed_stream_t stream,
2964                                  char *key,
2965                                  uint64_t size)
2966 {
2967     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2968     switch (gks->stype){ 
2969         case STREAM_MEM:
2970             preallocate_hint_key_mem(stream,key,size);
2971             break;
2972         case STREAM_FILE:
2973             preallocate_hint_key_file(stream,key,size);
2974             break;
2975         case STREAM_TEXTFILE:
2976             preallocate_hint_key_textfile(stream,key,size);
2977             break;
2978         case STREAM_USER:
2979             return preallocate_hint_key_user(stream,key,size);
2980             break;
2981         case STREAM_NETWORK:
2982             return preallocate_hint_key_net(stream,key,size);
2983             break;
2984         default:
2985             ERROR("unknown stream type %d in preallocate_hint_key\n",gks->stype);
2986             break;
2987     }
2988     return;
2989 }
2990
2991
2992 static v3_keyed_stream_key_t open_key(v3_keyed_stream_t stream,
2993                                       char *key)
2994 {
2995     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
2996     switch (gks->stype){ 
2997         case STREAM_MEM:
2998             return open_key_mem(stream,key);
2999             break;
3000         case STREAM_FILE:
3001             return open_key_file(stream,key);
3002             break;
3003         case STREAM_TEXTFILE:
3004             return open_key_textfile(stream,key);
3005             break;
3006         case STREAM_USER:
3007             return open_key_user(stream,key);
3008             break;
3009         case STREAM_NETWORK:
3010             return open_key_net(stream, key);
3011             break;
3012         default:
3013             ERROR("unknown stream type %d in open_key\n",gks->stype);
3014             break;
3015     }
3016     return 0;
3017 }
3018
3019
3020 static void close_key(v3_keyed_stream_t stream, 
3021                       v3_keyed_stream_key_t key)
3022 {
3023     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3024     switch (gks->stype){ 
3025         case STREAM_MEM:
3026             return close_key_mem(stream,key);
3027             break;
3028         case STREAM_FILE:
3029             return close_key_file(stream,key);
3030             break;
3031         case STREAM_TEXTFILE:
3032             return close_key_textfile(stream,key);
3033             break;
3034         case STREAM_USER:
3035             return close_key_user(stream,key);
3036             break;
3037          case STREAM_NETWORK:
3038             return close_key_net(stream, key);
3039             break;      
3040         default:
3041             ERROR("unknown stream type %d in close_key\n",gks->stype);
3042             break;
3043     }
3044     // nothing to do
3045     return;
3046 }
3047
3048 static sint64_t write_key(v3_keyed_stream_t stream, 
3049                           v3_keyed_stream_key_t key,
3050                           void *tag,
3051                           sint64_t taglen,
3052                           void *buf,
3053                           sint64_t len)
3054 {
3055     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3056     switch (gks->stype){ 
3057         case STREAM_MEM:
3058             return write_key_mem(stream,key,tag,taglen,buf,len);
3059             break;
3060         case STREAM_FILE:
3061             return write_key_file(stream,key,tag,taglen,buf,len);
3062             break;
3063         case STREAM_TEXTFILE:
3064             return write_key_textfile(stream,key,tag,taglen,buf,len);
3065             break;
3066         case STREAM_USER:
3067             return write_key_user(stream,key,tag,taglen,buf,len);
3068             break;
3069         case STREAM_NETWORK:
3070             return write_key_net(stream,key,tag,taglen,buf,len);
3071             break;
3072         default:
3073             ERROR("unknown stream type %d in write_key\n",gks->stype);
3074             return -1;
3075             break;
3076     }
3077     return -1;
3078 }
3079
3080
3081 static sint64_t read_key(v3_keyed_stream_t stream, 
3082                          v3_keyed_stream_key_t key,
3083                          void *tag,
3084                          sint64_t taglen,
3085                          void *buf,
3086                          sint64_t len)
3087 {
3088     struct generic_keyed_stream *gks = (struct generic_keyed_stream *) stream;
3089     switch (gks->stype){ 
3090         case STREAM_MEM:
3091           return read_key_mem(stream,key,tag,taglen,buf,len);
3092             break;
3093         case STREAM_FILE:
3094             return read_key_file(stream,key,tag,taglen,buf,len);
3095             break;
3096         case STREAM_TEXTFILE:
3097             return read_key_textfile(stream,key,tag,taglen,buf,len);
3098             break;
3099         case STREAM_USER:
3100             return read_key_user(stream,key,tag,taglen,buf,len);
3101             break;
3102         case STREAM_NETWORK:
3103             return read_key_net(stream,key,tag,taglen,buf,len);
3104             break;
3105         default:
3106             ERROR("unknown stream type %d in read_key\n",gks->stype);
3107             return -1;
3108             break;
3109     }
3110     return -1;
3111 }
3112
3113
3114
3115
3116 /***************************************************************************************************
3117   Hooks to palacios and inititialization
3118 *************************************************************************************************/
3119
3120     
3121 static struct v3_keyed_stream_hooks hooks = {
3122     .open = open_stream,
3123     .close = close_stream,
3124     .preallocate_hint_key = preallocate_hint_key,
3125     .open_key = open_key,
3126     .close_key = close_key,
3127     .read_key = read_key,
3128     .write_key = write_key
3129 };
3130
3131
3132 static int init_keyed_streams( void )
3133 {
3134     mem_streams = palacios_create_htable(DEF_NUM_STREAMS,hash_func,hash_comp);
3135
3136     if (!mem_streams) { 
3137         ERROR("failed to allocated stream pool for in-memory streams\n");
3138         return -1;
3139     }
3140
3141     user_streams = palacios_alloc(sizeof(struct user_keyed_streams));
3142
3143     if (!user_streams) { 
3144         ERROR("failed to allocated list for user streams\n");
3145         return -1;
3146     }
3147
3148     INIT_LIST_HEAD(&(user_streams->streams));
3149     
3150     palacios_spinlock_init(&(user_streams->lock));
3151
3152     V3_Init_Keyed_Streams(&hooks);
3153
3154     return 0;
3155
3156 }
3157
3158 static int deinit_keyed_streams( void )
3159 {
3160     palacios_free_htable(mem_streams,1,1);
3161
3162     palacios_spinlock_deinit(&(user_streams->lock));
3163
3164     palacios_free(user_streams);
3165
3166     WARNING("Deinit of Palacios Keyed Streams likely leaked memory\n");
3167
3168     return 0;
3169 }
3170
3171
3172 static int guest_init_keyed_streams(struct v3_guest * guest, void ** vm_data ) 
3173 {
3174     
3175     add_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT, keyed_stream_connect_user, 0);
3176     
3177     return 0;
3178 }
3179
3180
3181 static int guest_deinit_keyed_streams(struct v3_guest * guest, void * vm_data)
3182 {
3183     remove_guest_ctrl(guest, V3_VM_KSTREAM_USER_CONNECT);
3184
3185     return 0;
3186 }
3187
3188
3189
3190
3191 static struct linux_ext key_stream_ext = {
3192     .name = "KEYED_STREAM_INTERFACE",
3193     .init = init_keyed_streams,
3194     .deinit = deinit_keyed_streams,
3195     .guest_init = guest_init_keyed_streams,
3196     .guest_deinit = guest_deinit_keyed_streams, 
3197 };
3198
3199
3200 register_extension(&key_stream_ext);