--- /dev/null
+#include <unistd.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <malloc.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "v3_user_keyed_stream.h"
+
+
+int v3_user_keyed_stream_attach(char *vmdev, char *url)
+{
+ int vmfd;
+ int devfd;
+
+ struct palacios_user_keyed_stream_url *u;
+
+ u=malloc(sizeof(struct palacios_user_keyed_stream_url)+strlen(url)+1);
+
+ if (!u) {
+ return -1;
+ }
+
+ strcpy(u->url,url);
+ u->len = strlen(url)+1;
+
+
+ if ((vmfd=open(vmdev,O_RDWR))<0) {
+ free(u);
+ return -1;
+ }
+
+ devfd = ioctl(vmfd,V3_VM_KSTREAM_USER_CONNECT,u);
+
+ close(vmfd);
+
+ free(u);
+
+ return devfd;
+
+}
+int v3_user_keyed_stream_detach(int devfd)
+{
+ return close(devfd);
+}
+
+
+int v3_user_keyed_stream_have_request(int devfd)
+{
+ uint64_t len;
+
+ int rc=ioctl(devfd,V3_KSTREAM_REQUEST_SIZE_IOCTL,&len);
+
+ return rc==1;
+}
+
+int v3_user_keyed_stream_pull_request(int devfd, struct palacios_user_keyed_stream_op **req)
+{
+ uint64_t len;
+ int rc;
+
+ rc=ioctl(devfd,V3_KSTREAM_REQUEST_SIZE_IOCTL,&len);
+
+ if (rc<=0) {
+ return -1;
+ } else {
+ struct palacios_user_keyed_stream_op *r = malloc(len);
+
+ if (!r) {
+ fprintf(stderr,"malloc failed\n");
+ return -1;
+ }
+
+ rc=ioctl(devfd, V3_KSTREAM_REQUEST_PULL_IOCTL,r);
+
+
+ if (rc<=0) {
+ free(r);
+ return -1;
+ } else {
+ *req=r;
+ return 0;
+ }
+ }
+}
+
+
+int v3_user_keyed_stream_push_response(int devfd, struct palacios_user_keyed_stream_op *resp)
+{
+ int rc;
+
+ rc=ioctl(devfd,V3_KSTREAM_RESPONSE_PUSH_IOCTL,resp);
+
+ if (rc<=0) {
+ return -1;
+ } else {
+ return 0;
+ }
+}
+
+
+
+
+
--- /dev/null
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/select.h>
+#include <malloc.h>
+
+#define sint64_t int64_t
+
+#include "v3_user_keyed_stream.h"
+
+void usage()
+{
+ fprintf(stderr,"v3_user_keyed_stream_example /dev/v3-vm0 user:mystreamtype:mystream busywait|select\n");
+}
+
+
+int do_work(struct palacios_user_keyed_stream_op *req,
+ struct palacios_user_keyed_stream_op **resp)
+{
+ uint64_t datasize;
+
+ //
+ //
+ // Process request here
+ //
+ // req->len : total structure length
+ // req->type : request type (currently open/close key and read/write key
+ // req->xfer : unused
+ // req->user_key : the opaque key previously provided by you by an open key
+ // req->buf_len : length of data
+ // req->buf : buffer (contains key name (open key) or value (write key))
+ //
+
+ // now built a response
+ *resp = malloc(sizeof(struct palacios_user_keyed_stream_op) + datasize);
+ (*resp)->len = sizeof(struct palacios_user_keyed_stream_op) + datasize;
+ (*resp)->buf_len = datasize;
+ (*resp)->type = req->type;
+ (*resp)->user_key = req->user_key;
+
+ //
+ // The response
+ //
+ // resp->len : total structure length
+ // resp->type : response type - must match the request
+ // resp->xfer : contains the size of data read or written (in read key or write key)
+ // resp->user_key : unused
+ // resp->buf_len : length of data following
+ // resp->buf : buffer (contains the data (read key))
+
+
+ return 0;
+}
+
+int main(int argc, char *argv[])
+{
+ int devfd;
+ int mode=0;
+ char *vm, *url;
+
+ if (argc!=4) {
+ usage();
+ exit(-1);
+ }
+
+ vm=argv[1];
+ url=argv[2];
+ mode = argv[3][0]=='s';
+
+ // The URL should begin with user:
+ // the remainder can be used to demultiplex internally
+ // for example user:file:foo might refer to a user-side file-based implementation
+ //
+
+ if (strncmp(url,"user:",5)) {
+ fprintf(stderr, "URL %s is not a user: url\n");
+ exit(-1);
+ }
+
+ fprintf(stderr,"Attempting to attach to vm %s as url %s\n", vm, url);
+
+ if ((devfd = v3_user_keyed_stream_attach(vm,url))<0) {
+ perror("failed to attach");
+ exit(-1);
+ }
+
+ fprintf(stderr,"Attachment succeeded, I will now operate in %s mode\n", mode==0 ? "busywait" : "select");
+
+ if (mode==0) {
+ //busywait
+
+ struct palacios_user_keyed_stream_op *req;
+ struct palacios_user_keyed_stream_op *resp;
+ uint64_t datasize;
+
+ while (1) {
+ while (!(v3_user_keyed_stream_have_request(devfd))) {
+ }
+ v3_user_keyed_stream_pull_request(devfd, &req);
+
+ do_work(req, &resp);
+
+ v3_user_keyed_stream_push_response(devfd, resp);
+
+ free(resp);
+ free(req);
+ }
+ } else {
+
+ struct palacios_user_keyed_stream_op *req;
+ struct palacios_user_keyed_stream_op *resp;
+ uint64_t datasize;
+ fd_set readset;
+ int rc;
+
+ // select-based operation so that you can wait for multiple things
+
+ while (1) {
+ FD_ZERO(&readset);
+ FD_SET(devfd,&readset);
+
+ rc = select(devfd+1, &readset, 0, 0, 0); // pick whatever you want to select on, just include devfd
+
+ if (rc>0) {
+ if (FD_ISSET(devfd,&readset)) {
+ // a request is read for us!
+ v3_user_keyed_stream_pull_request(devfd, &req);
+
+ do_work(req, &resp);
+
+ v3_user_keyed_stream_push_response(devfd, resp);
+
+ free(resp);
+ free(req);
+ }
+ }
+ }
+ }
+
+ v3_user_keyed_stream_detatch(devfd);
+
+ return 0;
+
+}
--- /dev/null
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/select.h>
+#include <malloc.h>
+#include <alloca.h>
+#include <sys/types.h>
+#include <dirent.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#define sint64_t int64_t
+
+#include "v3_user_keyed_stream.h"
+
+void usage()
+{
+ fprintf(stderr,"v3_user_keyed_stream_file /dev/v3-vm0 user:file:stream\n");
+}
+
+char *dir;
+
+int dir_setup(char *dir)
+{
+ DIR *d;
+ int created=0;
+ int fd;
+
+ char buf[strlen(dir)+strlen("/.palacios_keyed_stream_user_file")+1];
+
+
+ strcpy(buf,dir);
+ strcat(buf,"/.palacios_keyed_stream_user_file");
+
+ d=opendir(dir);
+
+ // does the directory exist or can we create it
+ if (d) {
+ closedir(d);
+ } else {
+ if (mkdir(dir,0700)<0) {
+ perror("cannot create directory");
+ return -1;
+ } else {
+ created=1;
+ }
+ }
+
+ // can we write to it?
+
+ fd = open(buf,O_RDWR | O_CREAT,0600);
+
+ if (fd<0) {
+ perror("cannot write directory");
+ if (created) {
+ rmdir(dir);
+ }
+ return -1;
+ }
+
+ // ok, we are done
+
+ close(fd);
+
+ return 0;
+}
+
+
+
+int handle_open_key(struct palacios_user_keyed_stream_op *req,
+ struct palacios_user_keyed_stream_op **resp,
+ char *dir)
+{
+ int fd;
+ char fn[strlen(dir)+req->buf_len+1];
+
+ strcpy(fn,dir);
+ strcat(fn,"/");
+ strcat(fn,req->buf);
+
+
+ fd = open(fn,O_RDWR | O_CREAT,0600);
+
+ (*resp) = malloc(sizeof(struct palacios_user_keyed_stream_op)+0);
+
+ if (!(*resp)) {
+ return -1;
+ }
+
+ (*resp)->len=sizeof(struct palacios_user_keyed_stream_op);
+ (*resp)->type=req->type;
+ (*resp)->xfer=0;
+ (*resp)->user_key=(void*)fd;
+ (*resp)->buf_len=0;
+
+ return 0;
+
+}
+
+int handle_close_key(struct palacios_user_keyed_stream_op *req,
+ struct palacios_user_keyed_stream_op **resp,
+ char *dir)
+{
+ int fd;
+ int rc;
+
+ fd = (int) (req->user_key);
+
+ rc = close(fd);
+
+ (*resp) = malloc(sizeof(struct palacios_user_keyed_stream_op)+0);
+
+ if (!(*resp)) {
+ return -1;
+ }
+
+ (*resp)->len=sizeof(struct palacios_user_keyed_stream_op);
+ (*resp)->type=req->type;
+ (*resp)->xfer=rc;
+ (*resp)->user_key=(void*)fd;
+ (*resp)->buf_len=0;
+
+ return 0;
+
+}
+
+int read_all(int fd, char *buf, int len)
+{
+ int xfer;
+ int left;
+
+ left=len;
+
+ while (left>0) {
+ xfer=read(fd, buf+len-left,left);
+ if (xfer<0) {
+ perror("cannot read file");
+ return -1;
+ } else {
+ left-=xfer;
+ }
+ }
+ return len;
+}
+
+int write_all(int fd, char *buf, int len)
+{
+ int xfer;
+ int left;
+
+ left=len;
+
+ while (left>0) {
+ xfer=write(fd, buf+len-left,left);
+ if (xfer<0) {
+ perror("cannot write file");
+ return -1;
+ } else {
+ left-=xfer;
+ }
+ }
+ return len;
+}
+
+
+int handle_write_key(struct palacios_user_keyed_stream_op *req,
+ struct palacios_user_keyed_stream_op **resp,
+ char *dir)
+{
+ int fd;
+ int rc;
+
+ fd = (int) (req->user_key);
+
+ rc = write_all(fd,req->buf,req->xfer);
+
+ (*resp) = malloc(sizeof(struct palacios_user_keyed_stream_op)+0);
+
+ if (!(*resp)) {
+ return -1;
+ }
+
+ (*resp)->len=sizeof(struct palacios_user_keyed_stream_op);
+ (*resp)->type=req->type;
+ (*resp)->xfer=rc;
+ (*resp)->user_key=(void*)fd;
+ (*resp)->buf_len=0;
+
+
+ return 0;
+
+}
+
+int handle_read_key(struct palacios_user_keyed_stream_op *req,
+ struct palacios_user_keyed_stream_op **resp,
+ char *dir)
+{
+ int fd;
+ int rc;
+
+ fd = (int) (req->user_key);
+
+ (*resp) = malloc(sizeof(struct palacios_user_keyed_stream_op)+req->xfer);
+
+ if (!(*resp)) {
+ return -1;
+ }
+
+ rc = read_all(fd,(*resp)->buf,req->xfer);
+
+ (*resp)->len=sizeof(struct palacios_user_keyed_stream_op) + (rc>0 ? rc : 0);
+ (*resp)->type=req->type;
+ (*resp)->xfer=rc;
+ (*resp)->user_key=(void*)fd;
+ (*resp)->buf_len=rc>0 ? rc : 0;
+
+
+ return 0;
+
+}
+
+
+
+
+int handle_request(struct palacios_user_keyed_stream_op *req,
+ struct palacios_user_keyed_stream_op **resp,
+ char *dir)
+{
+ uint64_t datasize;
+
+ switch (req->type) {
+ case PALACIOS_KSTREAM_OPEN:
+ case PALACIOS_KSTREAM_CLOSE:
+ fprintf(stderr,"unsupported stream open or close\n");
+ return -1;
+ break;
+
+ case PALACIOS_KSTREAM_OPEN_KEY:
+ return handle_open_key(req,resp,dir);
+ break;
+ case PALACIOS_KSTREAM_CLOSE_KEY:
+ return handle_close_key(req,resp,dir);
+ break;
+ case PALACIOS_KSTREAM_READ_KEY:
+ return handle_read_key(req,resp,dir);
+ break;
+ case PALACIOS_KSTREAM_WRITE_KEY:
+ return handle_write_key(req,resp,dir);
+ break;
+ default:
+ fprintf(stderr,"unknown request type\n");
+ return -1;
+ break;
+ }
+
+ return 0;
+}
+
+
+int run(int devfd, char *dir)
+{
+ struct palacios_user_keyed_stream_op *req;
+ struct palacios_user_keyed_stream_op *resp;
+ fd_set readset;
+ int rc;
+
+ while (1) {
+ FD_ZERO(&readset);
+ FD_SET(devfd,&readset);
+
+ rc = select(devfd+1, &readset, 0, 0, 0);
+
+ if (rc>0) {
+ if (FD_ISSET(devfd,&readset)) {
+
+ int err;
+
+ if (v3_user_keyed_stream_pull_request(devfd, &req)) {
+ fprintf(stderr, "could not get request\n");
+ free(req);
+ return -1;
+ }
+
+ err=handle_request(req, &resp, dir);
+
+ if (v3_user_keyed_stream_push_response(devfd, resp)) {
+ fprintf(stderr,"could not send response\n");
+ free(req);
+ free(resp);
+ return -1;
+ }
+
+ if (err) {
+ fprintf(stderr, "request handling resulted in an error, continuing\n");
+ }
+
+ free(req);
+ free(resp);
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+
+int main(int argc, char *argv[])
+{
+ int devfd;
+ char *vm, *url;
+ char *dir;
+
+ if (argc!=3) {
+ usage();
+ exit(-1);
+ }
+
+ vm=argv[1];
+ url=argv[2];
+
+ if (strncmp(url,"user:file:",10)) {
+ fprintf(stderr, "Url %s is not a user:file: url\n",url);
+ exit(-1);
+ }
+
+ dir = url+10;
+
+ if (dir_setup(dir)) {
+ fprintf(stderr,"Unable to open or create directory %s\n",dir);
+ return -1;
+ }
+
+ fprintf(stderr,"Attempting to attach to vm %s as url %s\n", vm, url);
+
+ if ((devfd = v3_user_keyed_stream_attach(vm,url))<0) {
+ perror("failed to attach");
+ exit(-1);
+ }
+
+ fprintf(stderr,"Attached and running\n");
+
+ run(devfd,dir);
+
+ v3_user_keyed_stream_detach(devfd);
+
+ return 0;
+
+}
+