changeset 138:d6d78055f001

Change driver to use Pthreads
author Adam Chlipala <adamc@hcoop.net>
date Sat, 19 Jul 2008 18:56:57 -0400
parents 4ffdbf429e8d
children adfa2c7a75da
files include/lacweb.h src/c/driver.c src/c/lacweb.c src/compiler.sml
diffstat 4 files changed, 151 insertions(+), 73 deletions(-) [+]
line wrap: on
line diff
--- a/include/lacweb.h	Thu Jul 17 14:32:49 2008 -0400
+++ b/include/lacweb.h	Sat Jul 19 18:56:57 2008 -0400
@@ -8,6 +8,7 @@
 
 lw_context lw_init(size_t page_len, size_t heap_len);
 void lw_free(lw_context);
+void lw_reset(lw_context);
 void *lw_malloc(lw_context, size_t);
 int lw_send(lw_context, int sock);
 
--- a/src/c/driver.c	Thu Jul 17 14:32:49 2008 -0400
+++ b/src/c/driver.c	Sat Jul 19 18:56:57 2008 -0400
@@ -5,6 +5,8 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
 
+#include <pthread.h>
+
 #include "lacweb.h"
 
 int lw_port = 8080;
@@ -13,88 +15,145 @@
 
 void lw_handle(lw_context, char*);
 
-static void worker(int sock) {
-  char buf[lw_bufsize+1], *back = buf, *s;
+typedef struct node {
+  int fd;
+  struct node *next;
+} *node;
+
+static node front = NULL, back = NULL;
+
+static int empty() {
+  return front == NULL;
+}
+
+static void enqueue(int fd) {
+  node n = malloc(sizeof(struct node));
+
+  n->fd = fd;
+  n->next = NULL;
+  if (back)
+    back->next = n;
+  else
+    front = n;
+  back = n;
+}
+
+static int dequeue() {
+  int ret = front->fd;
+
+  front = front->next;
+  if (!front)
+    back = NULL;
+
+  return ret;
+}
+
+static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
+
+static void *worker(void *data) {
+  int me = *(int *)data;
+  lw_context ctx = lw_init(1024, 1024);
 
   while (1) {
-    int r = recv(sock, back, lw_bufsize - (back - buf), 0);
+    char buf[lw_bufsize+1], *back = buf, *s;
+    int sock;
 
-    if (r < 0) {
-      fprintf(stderr, "Recv failed\n");
-      close(sock);
-      return;
+    pthread_mutex_lock(&queue_mutex);
+    while (empty())
+      pthread_cond_wait(&queue_cond, &queue_mutex);
+    sock = dequeue();
+    pthread_mutex_unlock(&queue_mutex);
+
+    printf("Handling connection with thread #%d.\n", me);
+
+    while (1) {
+      int r = recv(sock, back, lw_bufsize - (back - buf), 0);
+
+      if (r < 0) {
+        fprintf(stderr, "Recv failed\n");
+        break;
+      }
+
+      if (r == 0) {
+        printf("Connection closed.\n");
+        break;
+      }
+
+      printf("Received %d bytes.\n", r);
+
+      back += r;
+      *back = 0;
+    
+      if (s = strstr(buf, "\r\n\r\n")) {
+        char *cmd, *path;
+
+        *s = 0;
+      
+        if (!(s = strstr(buf, "\r\n"))) {
+          fprintf(stderr, "No newline in buf\n");
+          break;
+        }
+
+        *s = 0;
+        cmd = s = buf;
+      
+        if (!strsep(&s, " ")) {
+          fprintf(stderr, "No first space in HTTP command\n");
+          break;
+        }
+
+        if (strcmp(cmd, "GET")) {
+          fprintf(stderr, "Not ready for non-get command: %s\n", cmd);
+          break;
+        }
+
+        path = s;
+        if (!strsep(&s, " ")) {
+          fprintf(stderr, "No second space in HTTP command\n");
+          break;
+        }
+
+        printf("Serving URI %s....\n", path);
+
+        ctx = lw_init(1024, 1024);
+        lw_write (ctx, "HTTP/1.1 200 OK\r\n");
+        lw_write(ctx, "Content-type: text/html\r\n\r\n");
+        lw_write(ctx, "<html>");
+        lw_handle(ctx, path);
+        lw_write(ctx, "</html>");
+
+        lw_send(ctx, sock);
+
+        printf("Done with client.\n\n");
+        break;
+      }
     }
 
-    if (r == 0) {
-      printf("Connection closed.\n");
-      close(sock);
-      return;
-    }
-
-    printf("Received %d bytes.\n", r);
-
-    back += r;
-    *back = 0;
-    
-    if (s = strstr(buf, "\r\n\r\n")) {
-      char *cmd, *path;
-      lw_context ctx;
-
-      *s = 0;
-      
-      if (!(s = strstr(buf, "\r\n"))) {
-        fprintf(stderr, "No newline in buf\n");
-        close(sock);
-        return;
-      }
-
-      *s = 0;
-      cmd = s = buf;
-      
-      if (!strsep(&s, " ")) {
-        fprintf(stderr, "No first space in HTTP command\n");
-        close(sock);
-        return;
-      }
-
-      if (strcmp(cmd, "GET")) {
-        fprintf(stderr, "Not ready for non-get command: %s\n", cmd);
-        close(sock);
-        return;
-      }
-
-      path = s;
-      if (!strsep(&s, " ")) {
-        fprintf(stderr, "No second space in HTTP command\n");
-        close(sock);
-        return;
-      }
-
-      printf("Serving URI %s....\n", path);
-
-      ctx = lw_init(1024, 1024);
-      lw_write (ctx, "HTTP/1.1 200 OK\r\n");
-      lw_write(ctx, "Content-type: text/html\r\n\r\n");
-      lw_write(ctx, "<html>");
-      lw_handle(ctx, path);
-      lw_write(ctx, "</html>");
-
-      lw_send(ctx, sock);
-
-      lw_free(ctx);
-      printf("Done with client.\n\n");
-      close(sock);
-      return;
-    }
+    close(sock);
+    lw_reset(ctx);
   }
 }
 
-int main() {
+int main(int argc, char *argv[]) {
   // The skeleton for this function comes from Beej's sockets tutorial.
-  int sockfd, new_fd;  // listen on sock_fd, new connection on new_fd
+  int sockfd;  // listen on sock_fd
   struct sockaddr_in my_addr;
   struct sockaddr_in their_addr; // connector's address information
   int sin_size, yes = 1;
+  int nthreads, i, *names;
+
+  if (argc < 2) {
+    fprintf(stderr, "No thread count specified\n");
+    return 1;
+  }
+
+  nthreads = atoi(argv[1]);
+  if (nthreads <= 0) {
+    fprintf(stderr, "Invalid thread count\n");
+    return 1;
+  }
+  names = calloc(nthreads, sizeof(int));
 
   sockfd = socket(PF_INET, SOCK_STREAM, 0); // do some error checking!
 
@@ -127,8 +186,17 @@
 
   printf("Listening on port %d....\n", lw_port);
 
+  for (i = 0; i < nthreads; ++i) {
+    pthread_t thread;    
+    names[i] = i;
+    if (pthread_create(&thread, NULL, worker, &names[i])) {
+      fprintf(stderr, "Error creating worker thread #%d\n", i);
+      return 1;
+    }
+  }
+
   while (1) {
-    new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
+    int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
 
     if (new_fd < 0) {
       fprintf(stderr, "Socket accept failed\n");
@@ -136,6 +204,10 @@
     }
 
     printf("Accepted connection.\n");
-    worker(new_fd);
+
+    pthread_mutex_lock(&queue_mutex);
+    enqueue(new_fd);
+    pthread_mutex_unlock(&queue_mutex);
+    pthread_cond_broadcast(&queue_cond);
   }
 }
--- a/src/c/lacweb.c	Thu Jul 17 14:32:49 2008 -0400
+++ b/src/c/lacweb.c	Sat Jul 19 18:56:57 2008 -0400
@@ -31,6 +31,11 @@
   free(ctx);
 }
 
+void lw_reset(lw_context ctx) {
+  ctx->page_front = ctx->page;
+  ctx->heap_front = ctx->heap;
+}
+
 static void lw_check_heap(lw_context ctx, size_t extra) {
   if (ctx->heap_back - ctx->heap_front < extra) {
     size_t desired = ctx->heap_back - ctx->heap_front + extra, next;
--- a/src/compiler.sml	Thu Jul 17 14:32:49 2008 -0400
+++ b/src/compiler.sml	Sat Jul 19 18:56:57 2008 -0400
@@ -432,7 +432,7 @@
             val ename = "/tmp/webapp"
 
             val compile = "gcc -O3 -I include -c " ^ cname ^ " -o " ^ oname
-            val link = "gcc -O3 clib/lacweb.o " ^ oname ^ " clib/driver.o -o " ^ ename
+            val link = "gcc -pthread -O3 clib/lacweb.o " ^ oname ^ " clib/driver.o -o " ^ ename
 
             val outf = TextIO.openOut cname
             val s = TextIOPP.openOut {dst = outf, wid = 80}