# HG changeset patch # User Adam Chlipala # Date 1216508217 14400 # Node ID d6d78055f001dd2c631859473fda48a87c2dab0a # Parent 4ffdbf429e8d6fc0d6e588ab6a76eb09109c501d Change driver to use Pthreads diff -r 4ffdbf429e8d -r d6d78055f001 include/lacweb.h --- a/include/lacweb.h Thu Jul 17 14:32:49 2008 -0400 +++ b/include/lacweb.h Sat Jul 19 18:56:57 2008 -0400 @@ -8,6 +8,7 @@ lw_context lw_init(size_t page_len, size_t heap_len); void lw_free(lw_context); +void lw_reset(lw_context); void *lw_malloc(lw_context, size_t); int lw_send(lw_context, int sock); diff -r 4ffdbf429e8d -r d6d78055f001 src/c/driver.c --- a/src/c/driver.c Thu Jul 17 14:32:49 2008 -0400 +++ b/src/c/driver.c Sat Jul 19 18:56:57 2008 -0400 @@ -5,6 +5,8 @@ #include #include +#include + #include "lacweb.h" int lw_port = 8080; @@ -13,88 +15,145 @@ void lw_handle(lw_context, char*); -static void worker(int sock) { - char buf[lw_bufsize+1], *back = buf, *s; +typedef struct node { + int fd; + struct node *next; +} *node; + +static node front = NULL, back = NULL; + +static int empty() { + return front == NULL; +} + +static void enqueue(int fd) { + node n = malloc(sizeof(struct node)); + + n->fd = fd; + n->next = NULL; + if (back) + back->next = n; + else + front = n; + back = n; +} + +static int dequeue() { + int ret = front->fd; + + front = front->next; + if (!front) + back = NULL; + + return ret; +} + +static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; + +static void *worker(void *data) { + int me = *(int *)data; + lw_context ctx = lw_init(1024, 1024); while (1) { - int r = recv(sock, back, lw_bufsize - (back - buf), 0); + char buf[lw_bufsize+1], *back = buf, *s; + int sock; - if (r < 0) { - fprintf(stderr, "Recv failed\n"); - close(sock); - return; + pthread_mutex_lock(&queue_mutex); + while (empty()) + pthread_cond_wait(&queue_cond, &queue_mutex); + sock = dequeue(); + pthread_mutex_unlock(&queue_mutex); + + printf("Handling connection with thread #%d.\n", me); + + while (1) { + int r = recv(sock, back, lw_bufsize - (back - buf), 0); + + if (r < 0) { + fprintf(stderr, "Recv failed\n"); + break; + } + + if (r == 0) { + printf("Connection closed.\n"); + break; + } + + printf("Received %d bytes.\n", r); + + back += r; + *back = 0; + + if (s = strstr(buf, "\r\n\r\n")) { + char *cmd, *path; + + *s = 0; + + if (!(s = strstr(buf, "\r\n"))) { + fprintf(stderr, "No newline in buf\n"); + break; + } + + *s = 0; + cmd = s = buf; + + if (!strsep(&s, " ")) { + fprintf(stderr, "No first space in HTTP command\n"); + break; + } + + if (strcmp(cmd, "GET")) { + fprintf(stderr, "Not ready for non-get command: %s\n", cmd); + break; + } + + path = s; + if (!strsep(&s, " ")) { + fprintf(stderr, "No second space in HTTP command\n"); + break; + } + + printf("Serving URI %s....\n", path); + + ctx = lw_init(1024, 1024); + lw_write (ctx, "HTTP/1.1 200 OK\r\n"); + lw_write(ctx, "Content-type: text/html\r\n\r\n"); + lw_write(ctx, ""); + lw_handle(ctx, path); + lw_write(ctx, ""); + + lw_send(ctx, sock); + + printf("Done with client.\n\n"); + break; + } } - if (r == 0) { - printf("Connection closed.\n"); - close(sock); - return; - } - - printf("Received %d bytes.\n", r); - - back += r; - *back = 0; - - if (s = strstr(buf, "\r\n\r\n")) { - char *cmd, *path; - lw_context ctx; - - *s = 0; - - if (!(s = strstr(buf, "\r\n"))) { - fprintf(stderr, "No newline in buf\n"); - close(sock); - return; - } - - *s = 0; - cmd = s = buf; - - if (!strsep(&s, " ")) { - fprintf(stderr, "No first space in HTTP command\n"); - close(sock); - return; - } - - if (strcmp(cmd, "GET")) { - fprintf(stderr, "Not ready for non-get command: %s\n", cmd); - close(sock); - return; - } - - path = s; - if (!strsep(&s, " ")) { - fprintf(stderr, "No second space in HTTP command\n"); - close(sock); - return; - } - - printf("Serving URI %s....\n", path); - - ctx = lw_init(1024, 1024); - lw_write (ctx, "HTTP/1.1 200 OK\r\n"); - lw_write(ctx, "Content-type: text/html\r\n\r\n"); - lw_write(ctx, ""); - lw_handle(ctx, path); - lw_write(ctx, ""); - - lw_send(ctx, sock); - - lw_free(ctx); - printf("Done with client.\n\n"); - close(sock); - return; - } + close(sock); + lw_reset(ctx); } } -int main() { +int main(int argc, char *argv[]) { // The skeleton for this function comes from Beej's sockets tutorial. - int sockfd, new_fd; // listen on sock_fd, new connection on new_fd + int sockfd; // listen on sock_fd struct sockaddr_in my_addr; struct sockaddr_in their_addr; // connector's address information int sin_size, yes = 1; + int nthreads, i, *names; + + if (argc < 2) { + fprintf(stderr, "No thread count specified\n"); + return 1; + } + + nthreads = atoi(argv[1]); + if (nthreads <= 0) { + fprintf(stderr, "Invalid thread count\n"); + return 1; + } + names = calloc(nthreads, sizeof(int)); sockfd = socket(PF_INET, SOCK_STREAM, 0); // do some error checking! @@ -127,8 +186,17 @@ printf("Listening on port %d....\n", lw_port); + for (i = 0; i < nthreads; ++i) { + pthread_t thread; + names[i] = i; + if (pthread_create(&thread, NULL, worker, &names[i])) { + fprintf(stderr, "Error creating worker thread #%d\n", i); + return 1; + } + } + while (1) { - new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); + int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); if (new_fd < 0) { fprintf(stderr, "Socket accept failed\n"); @@ -136,6 +204,10 @@ } printf("Accepted connection.\n"); - worker(new_fd); + + pthread_mutex_lock(&queue_mutex); + enqueue(new_fd); + pthread_mutex_unlock(&queue_mutex); + pthread_cond_broadcast(&queue_cond); } } diff -r 4ffdbf429e8d -r d6d78055f001 src/c/lacweb.c --- a/src/c/lacweb.c Thu Jul 17 14:32:49 2008 -0400 +++ b/src/c/lacweb.c Sat Jul 19 18:56:57 2008 -0400 @@ -31,6 +31,11 @@ free(ctx); } +void lw_reset(lw_context ctx) { + ctx->page_front = ctx->page; + ctx->heap_front = ctx->heap; +} + static void lw_check_heap(lw_context ctx, size_t extra) { if (ctx->heap_back - ctx->heap_front < extra) { size_t desired = ctx->heap_back - ctx->heap_front + extra, next; diff -r 4ffdbf429e8d -r d6d78055f001 src/compiler.sml --- a/src/compiler.sml Thu Jul 17 14:32:49 2008 -0400 +++ b/src/compiler.sml Sat Jul 19 18:56:57 2008 -0400 @@ -432,7 +432,7 @@ val ename = "/tmp/webapp" val compile = "gcc -O3 -I include -c " ^ cname ^ " -o " ^ oname - val link = "gcc -O3 clib/lacweb.o " ^ oname ^ " clib/driver.o -o " ^ ename + val link = "gcc -pthread -O3 clib/lacweb.o " ^ oname ^ " clib/driver.o -o " ^ ename val outf = TextIO.openOut cname val s = TextIOPP.openOut {dst = outf, wid = 80}