2 * Stream Implementation
7 #include <linux/errno.h>
8 #include <linux/percpu.h>
9 #include <linux/sched.h>
10 #include <linux/uaccess.h>
12 #include <linux/poll.h>
13 #include <linux/anon_inodes.h>
14 #include <linux/file.h>
17 #include <interfaces/vmm_stream.h>
18 #include "linux-exts.h"
19 #include "util-ringbuffer.h"
24 // This is probably overkill
25 #define STREAM_RING_LEN 4096
26 #define STREAM_NAME_LEN 128
30 static struct list_head global_streams;
35 char name[STREAM_NAME_LEN];
37 struct ringbuf * out_ring;
41 wait_queue_head_t intr_queue;
44 struct v3_guest * guest;
45 struct list_head stream_node;
47 struct v3_stream * v3_stream;
51 // Currently just the list of open streams
52 struct vm_global_streams {
53 struct list_head open_streams;
60 static struct stream_state * find_stream_by_name(struct v3_guest * guest, const char * name) {
61 struct stream_state * stream = NULL;
62 struct list_head * stream_list = NULL;
63 struct vm_global_streams * vm_state = NULL;
66 stream_list = &global_streams;
68 vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
70 if (vm_state == NULL) {
71 ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
75 stream_list = &(vm_state->open_streams);
78 list_for_each_entry(stream, stream_list, stream_node) {
79 if (strncmp(stream->name, name, STREAM_NAME_LEN) == 0) {
89 #define TMP_BUF_LEN 128
91 static ssize_t stream_read(struct file * filp, char __user * buf, size_t size, loff_t * offset) {
92 struct stream_state * stream = filp->private_data;
93 ssize_t bytes_read = 0;
94 ssize_t bytes_left = size;
96 char tmp_buf[TMP_BUF_LEN];
97 ssize_t total_bytes_left = 0;
99 // memset(tmp_buf, 0, TMP_BUF_LEN);
101 while (bytes_left > 0) {
102 int tmp_len = (TMP_BUF_LEN > bytes_left) ? bytes_left : TMP_BUF_LEN;
105 spin_lock_irqsave(&(stream->lock), flags);
106 tmp_read = ringbuf_read(stream->out_ring, tmp_buf, tmp_len);
107 spin_unlock_irqrestore(&(stream->lock), flags);
110 // If userspace reads more than we have
114 if (copy_to_user(buf + bytes_read, tmp_buf, tmp_read)) {
115 ERROR("Read Fault\n");
119 bytes_left -= tmp_read;
120 bytes_read += tmp_read;
124 spin_lock_irqsave(&(stream->lock), flags);
125 total_bytes_left = ringbuf_data_len(stream->out_ring);
126 spin_unlock_irqrestore(&(stream->lock), flags);
128 if (total_bytes_left > 0) {
129 wake_up_interruptible(&(stream->intr_queue));
136 stream_poll(struct file * filp, struct poll_table_struct * poll_tb) {
137 struct stream_state * stream = filp->private_data;
138 unsigned int mask = POLLIN | POLLRDNORM;
142 poll_wait(filp, &(stream->intr_queue), poll_tb);
144 spin_lock_irqsave(&(stream->lock), flags);
145 data_avail = ringbuf_data_len(stream->out_ring);
146 spin_unlock_irqrestore(&(stream->lock), flags);
148 if (data_avail > 0) {
156 static ssize_t stream_write(struct file * filp, const char __user * buf, size_t size, loff_t * offset) {
157 struct stream_state * stream = filp->private_data;
158 char * kern_buf = NULL;
159 ssize_t bytes_written = 0;
161 kern_buf = kmalloc(size, GFP_KERNEL);
163 if (copy_from_user(kern_buf, buf, size)) {
164 ERROR("Stream Write Failed\n");
168 bytes_written = stream->v3_stream->input(stream->v3_stream, kern_buf, size);
172 return bytes_written;
176 static int stream_release(struct inode * i, struct file * filp) {
177 struct stream_state * stream = filp->private_data;
180 spin_lock_irqsave(&(stream->lock), flags);
181 stream->connected = 0;
182 spin_unlock_irqrestore(&(stream->lock), flags);
189 static struct file_operations stream_fops = {
191 .write = stream_write,
192 .release = stream_release,
198 static void * palacios_stream_open(struct v3_stream * v3_stream, const char * name, void * private_data) {
199 struct v3_guest * guest = (struct v3_guest *)private_data;
200 struct stream_state * stream = NULL;
201 struct vm_global_streams * vm_state = NULL;
204 vm_state = get_vm_ext_data(guest, "STREAM_INTERFACE");
206 if (vm_state == NULL) {
207 ERROR("ERROR: Could not locate vm stream state for extension STREAM_INTERFACE\n");
212 if (find_stream_by_name(guest, name) != NULL) {
213 ERROR("Stream already exists\n");
217 stream = kmalloc(sizeof(struct stream_state), GFP_KERNEL);
218 memset(stream, 0, sizeof(struct stream_state));
220 stream->out_ring = create_ringbuf(STREAM_RING_LEN);
221 stream->v3_stream = v3_stream;
222 stream->guest = guest;
223 stream->connected = 0;
225 strncpy(stream->name, name, STREAM_NAME_LEN - 1);
227 init_waitqueue_head(&(stream->intr_queue));
228 spin_lock_init(&(stream->lock));
231 list_add(&(stream->stream_node), &(global_streams));
233 list_add(&(stream->stream_node), &(vm_state->open_streams));
240 static uint64_t palacios_stream_output(struct v3_stream * v3_stream, char * buf, int len) {
241 struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
242 int bytes_written = 0;
246 if (stream->connected == 0) {
250 while (bytes_written < len) {
251 spin_lock_irqsave(&(stream->lock), flags);
252 bytes_written += ringbuf_write(stream->out_ring, buf + bytes_written, len - bytes_written);
253 spin_unlock_irqrestore(&(stream->lock), flags);
255 wake_up_interruptible(&(stream->intr_queue));
257 if (bytes_written < len) {
258 // not enough space in ringbuffer, activate user space to drain it
264 return bytes_written;
268 static void palacios_stream_close(struct v3_stream * v3_stream) {
269 struct stream_state * stream = (struct stream_state *)v3_stream->host_stream_data;
271 free_ringbuf(stream->out_ring);
272 list_del(&(stream->stream_node));
277 static struct v3_stream_hooks palacios_stream_hooks = {
278 .open = palacios_stream_open,
279 .output = palacios_stream_output,
280 .close = palacios_stream_close,
284 static int stream_init( void ) {
285 INIT_LIST_HEAD(&(global_streams));
286 V3_Init_Stream(&palacios_stream_hooks);
292 static int stream_deinit( void ) {
293 if (!list_empty(&(global_streams))) {
294 ERROR("Error removing module with open streams\n");
295 DEBUG("TODO: free old streams... \n");
305 static int stream_connect(struct v3_guest * guest, unsigned int cmd, unsigned long arg, void * priv_data) {
306 void __user * argp = (void __user *)arg;
307 struct stream_state * stream = NULL;
309 char name[STREAM_NAME_LEN];
310 unsigned long flags = 0;
314 if (copy_from_user(name, argp, STREAM_NAME_LEN)) {
315 ERROR("%s(%d): copy from user error...\n", __FILE__, __LINE__);
319 stream = find_stream_by_name(guest, name);
321 if (stream == NULL) {
322 ERROR("Could not find stream (%s)\n", name);
326 spin_lock_irqsave(&(stream->lock), flags);
327 if (stream->connected == 0) {
328 stream->connected = 1;
331 spin_unlock_irqrestore(&(stream->lock), flags);
335 ERROR("Stream (%s) already connected\n", name);
340 stream_fd = anon_inode_getfd("v3-stream", &stream_fops, stream, O_RDWR);
343 ERROR("Error creating stream inode for (%s)\n", name);
347 INFO("Stream (%s) connected\n", name);
353 static int guest_stream_init(struct v3_guest * guest, void ** vm_data) {
354 struct vm_global_streams * state = kmalloc(sizeof(struct vm_global_streams), GFP_KERNEL);
356 INIT_LIST_HEAD(&(state->open_streams));
359 add_guest_ctrl(guest, V3_VM_STREAM_CONNECT, stream_connect, state);
365 static int guest_stream_deinit(struct v3_guest * guest, void * vm_data) {
366 struct vm_global_streams * state = vm_data;
367 if (!list_empty(&(state->open_streams))) {
368 ERROR("Error shutting down VM with open streams\n");
376 static struct linux_ext stream_ext = {
377 .name = "STREAM_INTERFACE",
379 .deinit = stream_deinit,
380 .guest_init = guest_stream_init,
381 .guest_deinit = guest_stream_deinit
385 register_extension(&stream_ext);