Mercurial > urweb
diff src/c/driver.c @ 138:d6d78055f001
Change driver to use Pthreads
author | Adam Chlipala <adamc@hcoop.net> |
---|---|
date | Sat, 19 Jul 2008 18:56:57 -0400 |
parents | 133fa2d51bb4 |
children | adfa2c7a75da |
line wrap: on
line diff
--- a/src/c/driver.c Thu Jul 17 14:32:49 2008 -0400 +++ b/src/c/driver.c Sat Jul 19 18:56:57 2008 -0400 @@ -5,6 +5,8 @@ #include <sys/socket.h> #include <netinet/in.h> +#include <pthread.h> + #include "lacweb.h" int lw_port = 8080; @@ -13,88 +15,145 @@ void lw_handle(lw_context, char*); -static void worker(int sock) { - char buf[lw_bufsize+1], *back = buf, *s; +typedef struct node { + int fd; + struct node *next; +} *node; + +static node front = NULL, back = NULL; + +static int empty() { + return front == NULL; +} + +static void enqueue(int fd) { + node n = malloc(sizeof(struct node)); + + n->fd = fd; + n->next = NULL; + if (back) + back->next = n; + else + front = n; + back = n; +} + +static int dequeue() { + int ret = front->fd; + + front = front->next; + if (!front) + back = NULL; + + return ret; +} + +static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; + +static void *worker(void *data) { + int me = *(int *)data; + lw_context ctx = lw_init(1024, 1024); while (1) { - int r = recv(sock, back, lw_bufsize - (back - buf), 0); + char buf[lw_bufsize+1], *back = buf, *s; + int sock; - if (r < 0) { - fprintf(stderr, "Recv failed\n"); - close(sock); - return; + pthread_mutex_lock(&queue_mutex); + while (empty()) + pthread_cond_wait(&queue_cond, &queue_mutex); + sock = dequeue(); + pthread_mutex_unlock(&queue_mutex); + + printf("Handling connection with thread #%d.\n", me); + + while (1) { + int r = recv(sock, back, lw_bufsize - (back - buf), 0); + + if (r < 0) { + fprintf(stderr, "Recv failed\n"); + break; + } + + if (r == 0) { + printf("Connection closed.\n"); + break; + } + + printf("Received %d bytes.\n", r); + + back += r; + *back = 0; + + if (s = strstr(buf, "\r\n\r\n")) { + char *cmd, *path; + + *s = 0; + + if (!(s = strstr(buf, "\r\n"))) { + fprintf(stderr, "No newline in buf\n"); + break; + } + + *s = 0; + cmd = s = buf; + + if (!strsep(&s, " ")) { + fprintf(stderr, "No first space in HTTP command\n"); + break; + } + + if (strcmp(cmd, "GET")) { + fprintf(stderr, "Not ready for non-get command: %s\n", cmd); + break; + } + + path = s; + if (!strsep(&s, " ")) { + fprintf(stderr, "No second space in HTTP command\n"); + break; + } + + printf("Serving URI %s....\n", path); + + ctx = lw_init(1024, 1024); + lw_write (ctx, "HTTP/1.1 200 OK\r\n"); + lw_write(ctx, "Content-type: text/html\r\n\r\n"); + lw_write(ctx, "<html>"); + lw_handle(ctx, path); + lw_write(ctx, "</html>"); + + lw_send(ctx, sock); + + printf("Done with client.\n\n"); + break; + } } - if (r == 0) { - printf("Connection closed.\n"); - close(sock); - return; - } - - printf("Received %d bytes.\n", r); - - back += r; - *back = 0; - - if (s = strstr(buf, "\r\n\r\n")) { - char *cmd, *path; - lw_context ctx; - - *s = 0; - - if (!(s = strstr(buf, "\r\n"))) { - fprintf(stderr, "No newline in buf\n"); - close(sock); - return; - } - - *s = 0; - cmd = s = buf; - - if (!strsep(&s, " ")) { - fprintf(stderr, "No first space in HTTP command\n"); - close(sock); - return; - } - - if (strcmp(cmd, "GET")) { - fprintf(stderr, "Not ready for non-get command: %s\n", cmd); - close(sock); - return; - } - - path = s; - if (!strsep(&s, " ")) { - fprintf(stderr, "No second space in HTTP command\n"); - close(sock); - return; - } - - printf("Serving URI %s....\n", path); - - ctx = lw_init(1024, 1024); - lw_write (ctx, "HTTP/1.1 200 OK\r\n"); - lw_write(ctx, "Content-type: text/html\r\n\r\n"); - lw_write(ctx, "<html>"); - lw_handle(ctx, path); - lw_write(ctx, "</html>"); - - lw_send(ctx, sock); - - lw_free(ctx); - printf("Done with client.\n\n"); - close(sock); - return; - } + close(sock); + lw_reset(ctx); } } -int main() { +int main(int argc, char *argv[]) { // The skeleton for this function comes from Beej's sockets tutorial. - int sockfd, new_fd; // listen on sock_fd, new connection on new_fd + int sockfd; // listen on sock_fd struct sockaddr_in my_addr; struct sockaddr_in their_addr; // connector's address information int sin_size, yes = 1; + int nthreads, i, *names; + + if (argc < 2) { + fprintf(stderr, "No thread count specified\n"); + return 1; + } + + nthreads = atoi(argv[1]); + if (nthreads <= 0) { + fprintf(stderr, "Invalid thread count\n"); + return 1; + } + names = calloc(nthreads, sizeof(int)); sockfd = socket(PF_INET, SOCK_STREAM, 0); // do some error checking! @@ -127,8 +186,17 @@ printf("Listening on port %d....\n", lw_port); + for (i = 0; i < nthreads; ++i) { + pthread_t thread; + names[i] = i; + if (pthread_create(&thread, NULL, worker, &names[i])) { + fprintf(stderr, "Error creating worker thread #%d\n", i); + return 1; + } + } + while (1) { - new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); + int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); if (new_fd < 0) { fprintf(stderr, "Socket accept failed\n"); @@ -136,6 +204,10 @@ } printf("Accepted connection.\n"); - worker(new_fd); + + pthread_mutex_lock(&queue_mutex); + enqueue(new_fd); + pthread_mutex_unlock(&queue_mutex); + pthread_cond_broadcast(&queue_cond); } }