Palacios Public Git Repository

To checkout Palacios execute

  git clone http://v3vee.org/palacios/palacios.web/palacios.git
This will give you the master branch. You probably want the devel branch or one of the release branches. To switch to the devel branch, simply execute
  cd palacios
  git checkout --track -b devel origin/devel
The other branches are similar.


Merge branch 'devel' of newskysaw.cs.northwestern.edu:/home/palacios/palacios into...
[palacios.git] / linux_module / iface-stream.c
1 /*
2  * Stream Implementation
3  * (c) Lei Xia  2010
4  */
5  
6  
7 #include <linux/errno.h>
8 #include <linux/percpu.h>
9 #include <linux/sched.h>
10 #include <linux/uaccess.h>
11 #include <linux/fs.h>
12 #include <linux/poll.h>
13 #include <linux/anon_inodes.h>
14 #include <linux/file.h>
15
16
17 #include <interfaces/vmm_stream.h>
18 #include "linux-exts.h"
19 #include "util-ringbuffer.h"
20 #include "vm.h"
21 #include "iface-stream.h"
22
23
24 // This is probably overkill
25 #define STREAM_RING_LEN 4096
26
27
28
29
30 static struct list_head global_streams;
31
32
33
34 struct stream_state {
35     char name[STREAM_NAME_LEN];
36
37     struct ringbuf * out_ring;
38
39     int connected;
40
41     wait_queue_head_t intr_queue;
42     spinlock_t lock;
43
44     struct v3_guest * guest;
45     struct list_head stream_node;
46
47     struct v3_stream * v3_stream;
48 };
49
50
51 // Currently just the list of open streams
52 struct vm_global_streams {
53     struct list_head open_streams;
54 };
55
56
57
58
59
60 static struct stream_state * find_stream_by_name(struct v3_guest * guest, const char * name) {
61     struct stream_state * stream = NULL;
62     struct list_head * stream_list = NULL;
63     struct vm_global_streams * vm_state = NULL;
64
65     if (guest == NULL) {
66         stream_list = &global_streams;
67     } else {
68         vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
69
70         if (vm_state == NULL) {
71             printk("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
72             return NULL;
73         }
74
75         stream_list = &(vm_state->open_streams);
76     }
77
78     list_for_each_entry(stream,  stream_list, stream_node) {
79         if (strncmp(stream->name, name, STREAM_NAME_LEN) == 0) {
80             return stream;
81         }
82     }
83
84     return NULL;
85 }
86
87
88
89 #define TMP_BUF_LEN 128
90
91 static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) {
92     struct stream_state * stream = filp->private_data;
93     ssize_t bytes_read = 0;
94     ssize_t bytes_left = size;
95     unsigned long flags;
96     char tmp_buf[TMP_BUF_LEN];
97     ssize_t total_bytes_left = 0;
98
99     // memset(tmp_buf, 0, TMP_BUF_LEN);
100
101     while (bytes_left > 0) {
102         int tmp_len = (TMP_BUF_LEN > bytes_left) ? bytes_left : TMP_BUF_LEN;
103         int tmp_read = 0;
104
105         spin_lock_irqsave(&(stream->lock), flags);
106         tmp_read = ringbuf_read(stream->out_ring, tmp_buf, tmp_len);
107         spin_unlock_irqrestore(&(stream->lock), flags);
108
109         if (tmp_read == 0) {
110             // If userspace reads more than we have
111             break;
112         }
113
114         if (copy_to_user(buf + bytes_read, tmp_buf, tmp_read)) {
115             printk("Read Fault\n");
116             return -EFAULT;
117         }
118         
119         bytes_left -= tmp_read;
120         bytes_read += tmp_read;
121     }
122     
123
124     spin_lock_irqsave(&(stream->lock), flags); 
125     total_bytes_left = ringbuf_data_len(stream->out_ring);
126     spin_unlock_irqrestore(&(stream->lock), flags);
127
128     if (total_bytes_left > 0) {
129         wake_up_interruptible(&(stream->intr_queue));
130     }
131
132     return bytes_read;
133 }
134
135 static unsigned int 
136 stream_poll(struct file * filp, struct poll_table_struct * poll_tb) {
137     struct stream_state * stream = filp->private_data;
138     unsigned int mask = POLLIN | POLLRDNORM;
139     unsigned long flags;
140     int data_avail = 0;
141
142     poll_wait(filp, &(stream->intr_queue), poll_tb);
143
144     spin_lock_irqsave(&(stream->lock), flags);
145     data_avail = ringbuf_data_len(stream->out_ring);
146     spin_unlock_irqrestore(&(stream->lock), flags);
147
148     if (data_avail > 0) {
149         return mask;
150     }
151
152     return 0;
153
154 }
155
156 static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) {
157     struct stream_state * stream = filp->private_data;
158     char * kern_buf = NULL;
159     ssize_t bytes_written = 0;
160     
161     kern_buf = kmalloc(size, GFP_KERNEL);
162
163     if (copy_from_user(kern_buf, buf, size)) {
164         printk("Stream Write Failed\n");
165         return -EFAULT;
166     };
167     
168     bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size);
169
170     kfree(kern_buf);
171
172     return bytes_written;
173 }
174
175
176 static int stream_release(struct inode * i, struct file * filp) {
177     struct stream_state * stream = filp->private_data;
178     unsigned long flags;
179     
180     spin_lock_irqsave(&(stream->lock), flags);
181     stream->connected = 0;
182     spin_unlock_irqrestore(&(stream->lock), flags);
183
184     
185     return 0;
186
187 }
188
189 static struct file_operations stream_fops = {
190     .read = stream_read,
191     .write = stream_write,
192     .release = stream_release,
193     .poll = stream_poll,
194 };
195
196
197
198 static void * palacios_stream_open(struct v3_stream * v3_stream, const char * name, void * private_data) {
199     struct v3_guest * guest = (struct v3_guest *)private_data;
200     struct stream_state * stream = NULL;
201     struct vm_global_streams * vm_state = NULL;
202
203     if (guest != NULL) {
204         vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
205
206         if (vm_state == NULL) {
207             printk("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
208             return NULL;
209         }
210     }
211
212     if (find_stream_by_name(guest, name) != NULL) {
213         printk("Stream already exists\n");
214         return NULL;
215     }
216
217     stream = kmalloc(sizeof(struct stream_state), GFP_KERNEL);
218     memset(stream, 0, sizeof(struct stream_state));
219
220     stream->out_ring = create_ringbuf(STREAM_RING_LEN);
221     stream->v3_stream = v3_stream;
222     stream->guest = guest;
223     stream->connected = 0;
224
225     strncpy(stream->name, name, STREAM_NAME_LEN - 1);
226
227     init_waitqueue_head(&(stream->intr_queue));
228     spin_lock_init(&(stream->lock));
229
230     if (guest == NULL) {
231         list_add(&(stream->stream_node), &(global_streams));
232     } else {
233         list_add(&(stream->stream_node), &(vm_state->open_streams));
234     } 
235
236     return stream;
237 }
238
239
240 static uint64_t palacios_stream_output(struct v3_stream * v3_stream, char * buf, int len) {
241     struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
242     int bytes_written = 0;
243     unsigned long flags;
244     
245
246     if (stream->connected == 0) {
247         return 0;
248     }
249
250     while (bytes_written < len) {
251         spin_lock_irqsave(&(stream->lock), flags);
252         bytes_written += ringbuf_write(stream->out_ring, buf + bytes_written, len - bytes_written);
253         spin_unlock_irqrestore(&(stream->lock), flags);
254
255         wake_up_interruptible(&(stream->intr_queue));
256
257         if (bytes_written < len) {
258             // not enough space in ringbuffer, activate user space to drain it
259             schedule();
260         }
261     }
262
263     
264     return bytes_written;
265 }
266
267
268 static void palacios_stream_close(struct v3_stream * v3_stream) {
269     struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
270
271     free_ringbuf(stream->out_ring);
272     list_del(&(stream->stream_node));
273     kfree(stream);
274
275 }
276
277 static struct v3_stream_hooks palacios_stream_hooks = {
278     .open = palacios_stream_open,
279     .output = palacios_stream_output,
280     .close = palacios_stream_close,
281 };
282
283
284 static int stream_init( void ) {
285     INIT_LIST_HEAD(&(global_streams));
286     V3_Init_Stream(&palacios_stream_hooks);
287     
288     return 0;
289 }
290
291
292 static int stream_deinit( void ) {
293     if (!list_empty(&(global_streams))) {
294         printk("Error removing module with open streams\n");
295         printk("TODO: free old streams... \n");
296     }
297
298     return 0;
299 }
300
301
302
303
304
305 static int stream_connect(struct v3_guest * guest, unsigned int cmd, unsigned long arg, void * priv_data) {
306     void __user * argp = (void __user *)arg;
307     struct stream_state * stream = NULL;
308     int stream_fd = 0;
309     char name[STREAM_NAME_LEN];
310     unsigned long flags = 0;
311     int ret = -1;
312     
313     
314     if (copy_from_user(name, argp, STREAM_NAME_LEN)) {
315         printk("%s(%d): copy from user error...\n", __FILE__, __LINE__);
316         return -EFAULT;
317     }
318
319     stream = find_stream_by_name(guest, name);
320
321     if (stream == NULL) {
322         printk("Could not find stream (%s)\n", name);
323         return -EFAULT;
324     }
325
326     spin_lock_irqsave(&(stream->lock), flags);
327     if (stream->connected == 0) {
328         stream->connected = 1;
329         ret = 1;
330     }
331     spin_unlock_irqrestore(&(stream->lock), flags);
332
333
334     if (ret == -1) {
335         printk("Stream (%s) already connected\n", name);
336         return -EFAULT;
337     }
338
339     
340     stream_fd = anon_inode_getfd("v3-stream", &stream_fops, stream, O_RDWR);
341
342     if (stream_fd < 0) {
343         printk("Error creating stream inode for (%s)\n", name);
344         return stream_fd;
345     }
346
347     printk("Stream (%s) connected\n", name);
348
349     return stream_fd;
350 }
351
352
353 static int guest_stream_init(struct v3_guest * guest, void ** vm_data) {
354     struct vm_global_streams * state = kmalloc(sizeof(struct vm_global_streams), GFP_KERNEL);
355
356     INIT_LIST_HEAD(&(state->open_streams));
357     *vm_data = state;
358
359     add_guest_ctrl(guest, V3_VM_STREAM_CONNECT, stream_connect, state);
360
361     return 0;
362 }
363
364
365 static int guest_stream_deinit(struct v3_guest * guest, void * vm_data) {
366     struct vm_global_streams * state = vm_data;
367     if (!list_empty(&(state->open_streams))) {
368         printk("Error shutting down VM with open streams\n");
369     }
370
371     return 0;
372 }
373
374
375
376 static struct linux_ext stream_ext = {
377     .name = "STREAM_INTERFACE",
378     .init = stream_init,
379     .deinit = stream_deinit,
380     .guest_init = guest_stream_init,
381     .guest_deinit = guest_stream_deinit
382 };
383
384
385 register_extension(&stream_ext);