annotate src/c/driver.c @ 731:e0dd85ea58e1

Label exported symbols by effect-ness; factor out some common datatypes
author Adam Chlipala <adamc@hcoop.net>
date Thu, 16 Apr 2009 14:49:25 -0400
parents 1b1047992ecf
children f2a2be93331c
rev   line source
adamc@116 1 #include <stdio.h>
adamc@116 2
adamc@116 3 #include <string.h>
adamc@502 4 #include <stdlib.h>
adamc@116 5 #include <sys/types.h>
adamc@116 6 #include <sys/socket.h>
adamc@116 7 #include <netinet/in.h>
adamc@472 8 #include <unistd.h>
adamc@502 9 #include <signal.h>
adamc@116 10
adamc@138 11 #include <pthread.h>
adamc@138 12
adamc@244 13 #include "urweb.h"
adamc@117 14
/* Pending-connection backlog handed to listen() on the listening socket in main(). */
adamc@311 15 int uw_backlog = 10;
/* Capacity in bytes (excluding the terminating NUL) of each worker's request
 * buffer and of its scratch copy of the request path.
 * NOTE(review): both variables have external linkage; presumably other
 * translation units may read or override them - confirm. */
adamc@311 16 int uw_bufsize = 1024;
adamc@116 17
/*
 * FIFO of accepted client sockets waiting for a worker thread.
 *
 * None of these functions take a lock themselves; the callers in this file
 * hold queue_mutex around every call.
 */
typedef struct node {
  int fd;             /* connected client socket */
  struct node *next;  /* entry queued after this one, or NULL at the tail */
} *node;

/* Head (next to dequeue) and tail (last enqueued); both NULL when empty. */
static node front = NULL, back = NULL;

/* Returns nonzero iff no socket is queued. */
static int empty(void) {
  return front == NULL;
}

/*
 * Appends fd to the tail of the queue.
 *
 * If the queue node cannot be allocated, the connection is dropped (fd is
 * closed) rather than dereferencing a NULL pointer.
 */
static void enqueue(int fd) {
  node n = malloc(sizeof *n);

  if (!n) {
    fprintf(stderr, "Out of memory queueing connection; dropping it\n");
    close(fd);
    return;
  }

  n->fd = fd;
  n->next = NULL;
  if (back)
    back->next = n;
  else
    front = n;
  back = n;
}

/*
 * Removes the head of the queue and returns its socket.
 *
 * Precondition: the queue is not empty (callers check empty() first).
 * The unlinked node is freed here; it used to be leaked on every call.
 */
static int dequeue(void) {
  node n = front;
  int ret = n->fd;

  front = n->next;
  if (!front)
    back = NULL;

  free(n);
  return ret;
}
adamc@138 50
/* Held by main() while enqueueing and by worker() while waiting on / dequeueing
 * from the connection queue. */
adamc@138 51 static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Broadcast by main() after each enqueue; workers wait on it while the queue
 * is empty. */
adamc@138 52 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
adamc@138 53
/* Number of BOUNDED_RETRY results tolerated (per context initialization and
 * per request) before the error is treated as fatal. */
adamc@167 54 #define MAX_RETRIES 5
adamc@167 55
adamc@424 56 static int try_rollback(uw_context ctx) {
adamc@671 57 int r = uw_rollback(ctx);
adamc@424 58
adamc@424 59 if (r) {
adamc@424 60 printf("Error running SQL ROLLBACK\n");
adamc@424 61 uw_reset(ctx);
adamc@424 62 uw_write(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
adamc@424 63 uw_write(ctx, "Content-type: text/plain\r\n\r\n");
adamc@424 64 uw_write(ctx, "Error running SQL ROLLBACK\n");
adamc@424 65 }
adamc@424 66
adamc@424 67 return r;
adamc@424 68 }
adamc@424 69
adamc@698 70 static uw_context new_context() {
adamc@687 71 uw_context ctx = uw_init();
adamc@698 72 int retries_left = MAX_RETRIES;
adamc@698 73
adamc@272 74 while (1) {
adamc@311 75 failure_kind fk = uw_begin_init(ctx);
adamc@272 76
adamc@272 77 if (fk == SUCCESS) {
adamc@272 78 printf("Database connection initialized.\n");
adamc@272 79 break;
adamc@272 80 } else if (fk == BOUNDED_RETRY) {
adamc@272 81 if (retries_left) {
adamc@311 82 printf("Initialization error triggers bounded retry: %s\n", uw_error_message(ctx));
adamc@272 83 --retries_left;
adamc@272 84 } else {
adamc@311 85 printf("Fatal initialization error (out of retries): %s\n", uw_error_message(ctx));
adamc@311 86 uw_free(ctx);
adamc@272 87 return NULL;
adamc@272 88 }
adamc@272 89 } else if (fk == UNLIMITED_RETRY)
adamc@311 90 printf("Initialization error triggers unlimited retry: %s\n", uw_error_message(ctx));
adamc@272 91 else if (fk == FATAL) {
adamc@311 92 printf("Fatal initialization error: %s\n", uw_error_message(ctx));
adamc@311 93 uw_free(ctx);
adamc@272 94 return NULL;
adamc@272 95 } else {
adamc@698 96 printf("Unknown uw_begin_init return code!\n");
adamc@311 97 uw_free(ctx);
adamc@272 98 return NULL;
adamc@272 99 }
adamc@272 100 }
adamc@116 101
adamc@698 102 return ctx;
adamc@698 103 }
adamc@698 104
adamc@698 105 static void *worker(void *data) {
adamc@698 106 int me = *(int *)data, retries_left = MAX_RETRIES;
adamc@698 107 uw_context ctx = new_context();
adamc@698 108
adamc@116 109 while (1) {
adamc@730 110 char buf[uw_bufsize+1], *back = buf, *s, *post;
adamc@667 111 int sock, dont_close = 0;
adamc@116 112
adamc@138 113 pthread_mutex_lock(&queue_mutex);
adamc@138 114 while (empty())
adamc@138 115 pthread_cond_wait(&queue_cond, &queue_mutex);
adamc@138 116 sock = dequeue();
adamc@138 117 pthread_mutex_unlock(&queue_mutex);
adamc@138 118
adamc@138 119 printf("Handling connection with thread #%d.\n", me);
adamc@138 120
adamc@138 121 while (1) {
adamc@167 122 unsigned retries_left = MAX_RETRIES;
adamc@311 123 int r = recv(sock, back, uw_bufsize - (back - buf), 0);
adamc@138 124
adamc@138 125 if (r < 0) {
adamc@138 126 fprintf(stderr, "Recv failed\n");
adamc@138 127 break;
adamc@138 128 }
adamc@138 129
adamc@138 130 if (r == 0) {
adamc@138 131 printf("Connection closed.\n");
adamc@138 132 break;
adamc@138 133 }
adamc@138 134
adamc@730 135 //printf("Received %d bytes.\n", r);
adamc@138 136
adamc@138 137 back += r;
adamc@138 138 *back = 0;
adamc@730 139
adamc@138 140 if (s = strstr(buf, "\r\n\r\n")) {
adamc@424 141 failure_kind fk;
adamc@730 142 int is_post = 0;
adamc@730 143 char *cmd, *path, *headers, path_copy[uw_bufsize+1], *inputs, *after_headers;
adamc@138 144
adamc@475 145 s[2] = 0;
adamc@730 146 after_headers = s + 4;
adamc@144 147
adamc@138 148 if (!(s = strstr(buf, "\r\n"))) {
adamc@138 149 fprintf(stderr, "No newline in buf\n");
adamc@138 150 break;
adamc@138 151 }
adamc@138 152
adamc@138 153 *s = 0;
adamc@457 154 headers = s + 2;
adamc@138 155 cmd = s = buf;
adamc@401 156
adamc@401 157 printf("Read: %s\n", buf);
adamc@138 158
adamc@138 159 if (!strsep(&s, " ")) {
adamc@138 160 fprintf(stderr, "No first space in HTTP command\n");
adamc@138 161 break;
adamc@138 162 }
adamc@138 163
adamc@730 164 uw_set_headers(ctx, headers);
adamc@730 165
adamc@730 166 if (!strcmp(cmd, "POST")) {
adamc@730 167 char *clen_s = uw_Basis_requestHeader(ctx, "Content-length");
adamc@730 168 if (!clen_s) {
adamc@730 169 printf("No Content-length with POST\n");
adamc@730 170 goto done;
adamc@730 171 }
adamc@730 172 int clen = atoi(clen_s);
adamc@730 173 if (clen < 0) {
adamc@730 174 printf("Negative Content-length with POST\n");
adamc@730 175 goto done;
adamc@730 176 }
adamc@730 177
adamc@730 178 while (back - after_headers < clen) {
adamc@730 179 r = recv(sock, back, uw_bufsize - (back - buf), 0);
adamc@730 180
adamc@730 181 if (r < 0) {
adamc@730 182 fprintf(stderr, "Recv failed\n");
adamc@730 183 goto done;
adamc@730 184 }
adamc@730 185
adamc@730 186 if (r == 0) {
adamc@730 187 printf("Connection closed.\n");
adamc@730 188 goto done;
adamc@730 189 }
adamc@730 190
adamc@730 191 back += r;
adamc@730 192 *back = 0;
adamc@730 193 }
adamc@730 194
adamc@730 195 is_post = 1;
adamc@730 196 } else if (strcmp(cmd, "GET")) {
adamc@730 197 fprintf(stderr, "Not ready for non-GET/POST command: %s\n", cmd);
adamc@138 198 break;
adamc@138 199 }
adamc@138 200
adamc@138 201 path = s;
adamc@138 202 if (!strsep(&s, " ")) {
adamc@138 203 fprintf(stderr, "No second space in HTTP command\n");
adamc@138 204 break;
adamc@138 205 }
adamc@138 206
adamc@668 207 if (!strcmp(path, "/.msgs")) {
adamc@668 208 char *id = uw_Basis_requestHeader(ctx, "UrWeb-Client");
adamc@668 209 char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass");
adamc@668 210
adamc@668 211 if (id && pass) {
adamc@682 212 unsigned idn = atoi(id);
adamc@668 213 uw_client_connect(idn, atoi(pass), sock);
adamc@668 214 dont_close = 1;
adamc@682 215 fprintf(stderr, "Processed request for messages by client %u\n\n", idn);
adamc@668 216 }
adamc@703 217 else {
adamc@703 218 fprintf(stderr, "Missing fields in .msgs request: %s, %s\n\n", id, pass);
adamc@703 219 }
adamc@667 220 break;
adamc@667 221 }
adamc@667 222
adamc@730 223 if (is_post)
adamc@730 224 inputs = after_headers;
adamc@730 225 else if (inputs = strchr(path, '?'))
adamc@730 226 *inputs++ = 0;
adamc@730 227 if (inputs) {
adamc@144 228 char *name, *value;
adamc@144 229
adamc@144 230 while (*inputs) {
adamc@144 231 name = inputs;
adamc@145 232 if (inputs = strchr(inputs, '&'))
adamc@145 233 *inputs++ = 0;
adamc@145 234 else
adamc@145 235 inputs = strchr(name, 0);
adamc@145 236
adamc@145 237 if (value = strchr(name, '=')) {
adamc@144 238 *value++ = 0;
adamc@311 239 uw_set_input(ctx, name, value);
adamc@144 240 }
adamc@145 241 else
adamc@311 242 uw_set_input(ctx, name, "");
adamc@144 243 }
adamc@144 244 }
adamc@144 245
adamc@138 246 printf("Serving URI %s....\n", path);
adamc@138 247
adamc@167 248 while (1) {
adamc@462 249 uw_write_header(ctx, "HTTP/1.1 200 OK\r\n");
adamc@167 250
adamc@400 251 strcpy(path_copy, path);
adamc@458 252 fk = uw_begin(ctx, path_copy);
adamc@167 253 if (fk == SUCCESS) {
adamc@671 254 uw_commit(ctx);
adamc@167 255 break;
adamc@167 256 } else if (fk == BOUNDED_RETRY) {
adamc@167 257 if (retries_left) {
adamc@311 258 printf("Error triggers bounded retry: %s\n", uw_error_message(ctx));
adamc@167 259 --retries_left;
adamc@167 260 }
adamc@167 261 else {
adamc@311 262 printf("Fatal error (out of retries): %s\n", uw_error_message(ctx));
adamc@167 263
adamc@682 264 try_rollback(ctx);
adamc@682 265
adamc@311 266 uw_reset_keep_error_message(ctx);
adamc@464 267 uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
adamc@464 268 uw_write_header(ctx, "Content-type: text/plain\r\n");
adamc@311 269 uw_write(ctx, "Fatal error (out of retries): ");
adamc@311 270 uw_write(ctx, uw_error_message(ctx));
adamc@311 271 uw_write(ctx, "\n");
adamc@424 272
adamc@424 273 break;
adamc@167 274 }
adamc@167 275 } else if (fk == UNLIMITED_RETRY)
adamc@311 276 printf("Error triggers unlimited retry: %s\n", uw_error_message(ctx));
adamc@167 277 else if (fk == FATAL) {
adamc@311 278 printf("Fatal error: %s\n", uw_error_message(ctx));
adamc@167 279
adamc@682 280 try_rollback(ctx);
adamc@682 281
adamc@311 282 uw_reset_keep_error_message(ctx);
adamc@464 283 uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\r\n");
adamc@548 284 uw_write_header(ctx, "Content-type: text/html\r\n");
adamc@548 285 uw_write(ctx, "<html><head><title>Fatal Error</title></head><body>");
adamc@311 286 uw_write(ctx, "Fatal error: ");
adamc@311 287 uw_write(ctx, uw_error_message(ctx));
adamc@548 288 uw_write(ctx, "\n</body></html>");
adamc@167 289
adamc@167 290 break;
adamc@167 291 } else {
adamc@311 292 printf("Unknown uw_handle return code!\n");
adamc@167 293
adamc@682 294 try_rollback(ctx);
adamc@682 295
adamc@311 296 uw_reset_keep_request(ctx);
adamc@464 297 uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
adamc@464 298 uw_write_header(ctx, "Content-type: text/plain\r\n");
adamc@311 299 uw_write(ctx, "Unknown uw_handle return code!\n");
adamc@167 300
adamc@167 301 break;
adamc@167 302 }
adamc@167 303
adamc@424 304 if (try_rollback(ctx))
adamc@424 305 break;
adamc@682 306
adamc@682 307 uw_reset_keep_request(ctx);
adamc@167 308 }
adamc@138 309
adamc@311 310 uw_send(ctx, sock);
adamc@138 311
adamc@138 312 printf("Done with client.\n\n");
adamc@324 313 uw_memstats(ctx);
adamc@138 314 break;
adamc@138 315 }
adamc@116 316 }
adamc@116 317
adamc@730 318 done:
adamc@667 319 if (!dont_close)
adamc@667 320 close(sock);
adamc@311 321 uw_reset(ctx);
adamc@116 322 }
adamc@116 323 }
adamc@102 324
adamc@667 325 static void *client_pruner(void *data) {
adamc@698 326 uw_context ctx = new_context();
adamc@698 327
adamc@698 328 if (!ctx)
adamc@698 329 exit(1);
adamc@683 330
adamc@667 331 while (1) {
adamc@683 332 uw_prune_clients(ctx);
adamc@667 333 sleep(5);
adamc@667 334 }
adamc@667 335 }
adamc@667 336
/* Prints the command-line usage summary to stdout; cmd is the program name. */
static void help(char *cmd) {
  fprintf(stdout, "Usage: %s [-p <port>] [-t <thread-count>]\n", cmd);
}
adamc@477 340
/*
 * SIGINT handler: announces shutdown and terminates the process with
 * status 0.
 *
 * Only async-signal-safe calls are used. The previous printf()/exit() pair
 * is not async-signal-safe and could deadlock or corrupt stdio state if the
 * signal arrived while another thread was inside stdio. As a consequence,
 * output still sitting in stdio buffers is not flushed and atexit handlers
 * do not run.
 */
static void sigint(int signum) {
  static const char msg[] = "Exiting....\n";
  ssize_t ignored;

  (void)signum;

  /* Best effort: there is nothing useful to do if the write fails. */
  ignored = write(STDOUT_FILENO, msg, sizeof msg - 1);
  (void)ignored;

  _exit(0);
}
adamc@502 345
adamc@687 346 static void initialize() {
adamc@698 347 uw_context ctx = new_context();
adamc@700 348 failure_kind fk;
adamc@687 349
adamc@698 350 if (!ctx)
adamc@698 351 exit(1);
adamc@698 352
adamc@700 353 for (fk = uw_initialize(ctx); fk == UNLIMITED_RETRY; fk = uw_initialize(ctx)) {
adamc@700 354 printf("Unlimited retry during init: %s\n", uw_error_message(ctx));
adamc@700 355 uw_db_rollback(ctx);
adamc@700 356 uw_reset(ctx);
adamc@700 357 }
adamc@700 358
adamc@700 359 if (fk != SUCCESS) {
adamc@700 360 printf("Failed to initialize database! %s\n", uw_error_message(ctx));
adamc@687 361 uw_db_rollback(ctx);
adamc@687 362 exit(1);
adamc@687 363 }
adamc@687 364
adamc@687 365 uw_free(ctx);
adamc@687 366 }
adamc@687 367
adamc@138 368 int main(int argc, char *argv[]) {
adamc@116 369 // The skeleton for this function comes from Beej's sockets tutorial.
adamc@138 370 int sockfd; // listen on sock_fd
adamc@116 371 struct sockaddr_in my_addr;
adamc@116 372 struct sockaddr_in their_addr; // connector's address information
adamc@116 373 int sin_size, yes = 1;
adamc@472 374 int uw_port = 8080, nthreads = 1, i, *names, opt;
adamc@502 375
adamc@502 376 signal(SIGINT, sigint);
adamc@505 377 signal(SIGPIPE, SIG_IGN);
adamc@505 378
adamc@477 379 while ((opt = getopt(argc, argv, "hp:t:")) != -1) {
adamc@472 380 switch (opt) {
adamc@472 381 case '?':
adamc@472 382 fprintf(stderr, "Unknown command-line option");
adamc@477 383 help(argv[0]);
adamc@472 384 return 1;
adamc@138 385
adamc@477 386 case 'h':
adamc@477 387 help(argv[0]);
adamc@477 388 return 0;
adamc@477 389
adamc@472 390 case 'p':
adamc@472 391 uw_port = atoi(optarg);
adamc@472 392 if (uw_port <= 0) {
adamc@472 393 fprintf(stderr, "Invalid port number\n");
adamc@477 394 help(argv[0]);
adamc@472 395 return 1;
adamc@472 396 }
adamc@472 397 break;
adamc@472 398
adamc@472 399 case 't':
adamc@472 400 nthreads = atoi(optarg);
adamc@472 401 if (nthreads <= 0) {
adamc@472 402 fprintf(stderr, "Invalid thread count\n");
adamc@477 403 help(argv[0]);
adamc@472 404 return 1;
adamc@472 405 }
adamc@472 406 break;
adamc@472 407
adamc@472 408 default:
adamc@472 409 fprintf(stderr, "Unexpected getopt() behavior\n");
adamc@472 410 return 1;
adamc@472 411 }
adamc@138 412 }
adamc@138 413
adamc@687 414 initialize();
adamc@687 415
adamc@138 416 names = calloc(nthreads, sizeof(int));
adamc@116 417
adamc@116 418 sockfd = socket(PF_INET, SOCK_STREAM, 0); // do some error checking!
adamc@116 419
adamc@116 420 if (sockfd < 0) {
adamc@116 421 fprintf(stderr, "Listener socket creation failed\n");
adamc@116 422 return 1;
adamc@116 423 }
adamc@116 424
adamc@116 425 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) < 0) {
adamc@116 426 fprintf(stderr, "Listener socket option setting failed\n");
adamc@116 427 return 1;
adamc@116 428 }
adamc@116 429
adamc@116 430 my_addr.sin_family = AF_INET; // host byte order
adamc@311 431 my_addr.sin_port = htons(uw_port); // short, network byte order
adamc@116 432 my_addr.sin_addr.s_addr = INADDR_ANY; // auto-fill with my IP
adamc@116 433 memset(my_addr.sin_zero, '\0', sizeof my_addr.sin_zero);
adamc@116 434
adamc@116 435 if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof my_addr) < 0) {
adamc@116 436 fprintf(stderr, "Listener socket bind failed\n");
adamc@116 437 return 1;
adamc@116 438 }
adamc@116 439
adamc@311 440 if (listen(sockfd, uw_backlog) < 0) {
adamc@116 441 fprintf(stderr, "Socket listen failed\n");
adamc@116 442 return 1;
adamc@116 443 }
adamc@116 444
adamc@116 445 sin_size = sizeof their_addr;
adamc@116 446
adamc@667 447 uw_global_init();
adamc@667 448
adamc@311 449 printf("Listening on port %d....\n", uw_port);
adamc@116 450
adamc@667 451 {
adamc@667 452 pthread_t thread;
adamc@667 453 int name;
adamc@667 454
adamc@667 455 if (pthread_create(&thread, NULL, client_pruner, &name)) {
adamc@667 456 fprintf(stderr, "Error creating pruner thread\n");
adamc@667 457 return 1;
adamc@667 458 }
adamc@667 459 }
adamc@667 460
adamc@138 461 for (i = 0; i < nthreads; ++i) {
adamc@138 462 pthread_t thread;
adamc@138 463 names[i] = i;
adamc@138 464 if (pthread_create(&thread, NULL, worker, &names[i])) {
adamc@138 465 fprintf(stderr, "Error creating worker thread #%d\n", i);
adamc@138 466 return 1;
adamc@138 467 }
adamc@138 468 }
adamc@138 469
adamc@116 470 while (1) {
adamc@138 471 int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
adamc@116 472
adamc@116 473 if (new_fd < 0) {
adamc@116 474 fprintf(stderr, "Socket accept failed\n");
adamc@116 475 return 1;
adamc@116 476 }
adamc@116 477
adamc@116 478 printf("Accepted connection.\n");
adamc@138 479
adamc@138 480 pthread_mutex_lock(&queue_mutex);
adamc@138 481 enqueue(new_fd);
adamc@139 482 pthread_cond_broadcast(&queue_cond);
adamc@138 483 pthread_mutex_unlock(&queue_mutex);
adamc@116 484 }
adamc@102 485 }