annotate src/c/driver.c @ 736:796e42c93c48

Cookie signatures for RPCs
author Adam Chlipala <adamc@hcoop.net>
date Thu, 23 Apr 2009 16:13:02 -0400
parents f2a2be93331c
children d049d31a1966
rev   line source
adamc@116 1 #include <stdio.h>
adamc@116 2
adamc@116 3 #include <string.h>
adamc@502 4 #include <stdlib.h>
adamc@116 5 #include <sys/types.h>
adamc@116 6 #include <sys/socket.h>
adamc@116 7 #include <netinet/in.h>
adamc@472 8 #include <unistd.h>
adamc@502 9 #include <signal.h>
adamc@116 10
adamc@138 11 #include <pthread.h>
adamc@138 12
adamc@734 13 #include <mhash.h>
adamc@734 14
adamc@244 15 #include "urweb.h"
adamc@117 16
adamc@311 17 int uw_backlog = 10;
adamc@311 18 int uw_bufsize = 1024;
adamc@116 19
adamc@138 20 typedef struct node {
adamc@138 21 int fd;
adamc@138 22 struct node *next;
adamc@138 23 } *node;
adamc@138 24
adamc@138 25 static node front = NULL, back = NULL;
adamc@138 26
adamc@138 27 static int empty() {
adamc@138 28 return front == NULL;
adamc@138 29 }
adamc@138 30
adamc@138 31 static void enqueue(int fd) {
adamc@138 32 node n = malloc(sizeof(struct node));
adamc@138 33
adamc@138 34 n->fd = fd;
adamc@138 35 n->next = NULL;
adamc@138 36 if (back)
adamc@138 37 back->next = n;
adamc@138 38 else
adamc@138 39 front = n;
adamc@138 40 back = n;
adamc@138 41 }
adamc@138 42
adamc@138 43 static int dequeue() {
adamc@138 44 int ret = front->fd;
adamc@138 45
adamc@138 46 front = front->next;
adamc@138 47 if (!front)
adamc@138 48 back = NULL;
adamc@138 49
adamc@138 50 return ret;
adamc@138 51 }
adamc@138 52
adamc@138 53 static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
adamc@138 54 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
adamc@138 55
adamc@167 56 #define MAX_RETRIES 5
adamc@167 57
adamc@424 58 static int try_rollback(uw_context ctx) {
adamc@671 59 int r = uw_rollback(ctx);
adamc@424 60
adamc@424 61 if (r) {
adamc@424 62 printf("Error running SQL ROLLBACK\n");
adamc@424 63 uw_reset(ctx);
adamc@424 64 uw_write(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
adamc@424 65 uw_write(ctx, "Content-type: text/plain\r\n\r\n");
adamc@424 66 uw_write(ctx, "Error running SQL ROLLBACK\n");
adamc@424 67 }
adamc@424 68
adamc@424 69 return r;
adamc@424 70 }
adamc@424 71
adamc@698 72 static uw_context new_context() {
adamc@687 73 uw_context ctx = uw_init();
adamc@698 74 int retries_left = MAX_RETRIES;
adamc@698 75
adamc@272 76 while (1) {
adamc@311 77 failure_kind fk = uw_begin_init(ctx);
adamc@272 78
adamc@272 79 if (fk == SUCCESS) {
adamc@272 80 printf("Database connection initialized.\n");
adamc@272 81 break;
adamc@272 82 } else if (fk == BOUNDED_RETRY) {
adamc@272 83 if (retries_left) {
adamc@311 84 printf("Initialization error triggers bounded retry: %s\n", uw_error_message(ctx));
adamc@272 85 --retries_left;
adamc@272 86 } else {
adamc@311 87 printf("Fatal initialization error (out of retries): %s\n", uw_error_message(ctx));
adamc@311 88 uw_free(ctx);
adamc@272 89 return NULL;
adamc@272 90 }
adamc@272 91 } else if (fk == UNLIMITED_RETRY)
adamc@311 92 printf("Initialization error triggers unlimited retry: %s\n", uw_error_message(ctx));
adamc@272 93 else if (fk == FATAL) {
adamc@311 94 printf("Fatal initialization error: %s\n", uw_error_message(ctx));
adamc@311 95 uw_free(ctx);
adamc@272 96 return NULL;
adamc@272 97 } else {
adamc@698 98 printf("Unknown uw_begin_init return code!\n");
adamc@311 99 uw_free(ctx);
adamc@272 100 return NULL;
adamc@272 101 }
adamc@272 102 }
adamc@116 103
adamc@698 104 return ctx;
adamc@698 105 }
adamc@698 106
adamc@734 107 #define KEYSIZE 16
adamc@734 108 #define PASSSIZE 4
adamc@734 109
adamc@734 110 #define HASH_ALGORITHM MHASH_SHA256
adamc@734 111 #define HASH_BLOCKSIZE 32
adamc@734 112 #define KEYGEN_ALGORITHM KEYGEN_MCRYPT
adamc@734 113
adamc@734 114 int uw_hash_blocksize = HASH_BLOCKSIZE;
adamc@734 115
adamc@734 116 static int password[PASSSIZE];
adamc@734 117 static unsigned char private_key[KEYSIZE];
adamc@734 118
adamc@734 119 static void init_crypto() {
adamc@734 120 KEYGEN kg = {{HASH_ALGORITHM, HASH_ALGORITHM}};
adamc@734 121 int i;
adamc@734 122
adamc@734 123 assert(mhash_get_block_size(HASH_ALGORITHM) == HASH_BLOCKSIZE);
adamc@734 124
adamc@734 125 for (i = 0; i < PASSSIZE; ++i)
adamc@734 126 password[i] = rand();
adamc@734 127
adamc@734 128 if (mhash_keygen_ext(KEYGEN_ALGORITHM, kg,
adamc@734 129 private_key, sizeof(private_key),
adamc@734 130 (unsigned char*)password, sizeof(password)) < 0) {
adamc@734 131 printf("Key generation failed\n");
adamc@734 132 exit(1);
adamc@734 133 }
adamc@734 134 }
adamc@734 135
adamc@734 136 void uw_sign(const char *in, char *out) {
adamc@734 137 MHASH td;
adamc@734 138
adamc@734 139 td = mhash_hmac_init(HASH_ALGORITHM, private_key, sizeof(private_key),
adamc@734 140 mhash_get_hash_pblock(HASH_ALGORITHM));
adamc@734 141
adamc@734 142 mhash(td, in, strlen(in));
adamc@734 143 if (mhash_hmac_deinit(td, out) < 0)
adamc@734 144 printf("Signing failed");
adamc@734 145 }
adamc@734 146
adamc@698 147 static void *worker(void *data) {
adamc@698 148 int me = *(int *)data, retries_left = MAX_RETRIES;
adamc@698 149 uw_context ctx = new_context();
adamc@698 150
adamc@116 151 while (1) {
adamc@730 152 char buf[uw_bufsize+1], *back = buf, *s, *post;
adamc@667 153 int sock, dont_close = 0;
adamc@116 154
adamc@138 155 pthread_mutex_lock(&queue_mutex);
adamc@138 156 while (empty())
adamc@138 157 pthread_cond_wait(&queue_cond, &queue_mutex);
adamc@138 158 sock = dequeue();
adamc@138 159 pthread_mutex_unlock(&queue_mutex);
adamc@138 160
adamc@138 161 printf("Handling connection with thread #%d.\n", me);
adamc@138 162
adamc@138 163 while (1) {
adamc@167 164 unsigned retries_left = MAX_RETRIES;
adamc@311 165 int r = recv(sock, back, uw_bufsize - (back - buf), 0);
adamc@138 166
adamc@138 167 if (r < 0) {
adamc@138 168 fprintf(stderr, "Recv failed\n");
adamc@138 169 break;
adamc@138 170 }
adamc@138 171
adamc@138 172 if (r == 0) {
adamc@138 173 printf("Connection closed.\n");
adamc@138 174 break;
adamc@138 175 }
adamc@138 176
adamc@730 177 //printf("Received %d bytes.\n", r);
adamc@138 178
adamc@138 179 back += r;
adamc@138 180 *back = 0;
adamc@730 181
adamc@138 182 if (s = strstr(buf, "\r\n\r\n")) {
adamc@424 183 failure_kind fk;
adamc@730 184 int is_post = 0;
adamc@730 185 char *cmd, *path, *headers, path_copy[uw_bufsize+1], *inputs, *after_headers;
adamc@138 186
adamc@475 187 s[2] = 0;
adamc@730 188 after_headers = s + 4;
adamc@144 189
adamc@138 190 if (!(s = strstr(buf, "\r\n"))) {
adamc@138 191 fprintf(stderr, "No newline in buf\n");
adamc@138 192 break;
adamc@138 193 }
adamc@138 194
adamc@138 195 *s = 0;
adamc@457 196 headers = s + 2;
adamc@138 197 cmd = s = buf;
adamc@401 198
adamc@401 199 printf("Read: %s\n", buf);
adamc@138 200
adamc@138 201 if (!strsep(&s, " ")) {
adamc@138 202 fprintf(stderr, "No first space in HTTP command\n");
adamc@138 203 break;
adamc@138 204 }
adamc@138 205
adamc@730 206 uw_set_headers(ctx, headers);
adamc@730 207
adamc@730 208 if (!strcmp(cmd, "POST")) {
adamc@730 209 char *clen_s = uw_Basis_requestHeader(ctx, "Content-length");
adamc@730 210 if (!clen_s) {
adamc@730 211 printf("No Content-length with POST\n");
adamc@730 212 goto done;
adamc@730 213 }
adamc@730 214 int clen = atoi(clen_s);
adamc@730 215 if (clen < 0) {
adamc@730 216 printf("Negative Content-length with POST\n");
adamc@730 217 goto done;
adamc@730 218 }
adamc@730 219
adamc@730 220 while (back - after_headers < clen) {
adamc@730 221 r = recv(sock, back, uw_bufsize - (back - buf), 0);
adamc@730 222
adamc@730 223 if (r < 0) {
adamc@730 224 fprintf(stderr, "Recv failed\n");
adamc@730 225 goto done;
adamc@730 226 }
adamc@730 227
adamc@730 228 if (r == 0) {
adamc@730 229 printf("Connection closed.\n");
adamc@730 230 goto done;
adamc@730 231 }
adamc@730 232
adamc@730 233 back += r;
adamc@730 234 *back = 0;
adamc@730 235 }
adamc@730 236
adamc@730 237 is_post = 1;
adamc@730 238 } else if (strcmp(cmd, "GET")) {
adamc@730 239 fprintf(stderr, "Not ready for non-GET/POST command: %s\n", cmd);
adamc@138 240 break;
adamc@138 241 }
adamc@138 242
adamc@138 243 path = s;
adamc@138 244 if (!strsep(&s, " ")) {
adamc@138 245 fprintf(stderr, "No second space in HTTP command\n");
adamc@138 246 break;
adamc@138 247 }
adamc@138 248
adamc@668 249 if (!strcmp(path, "/.msgs")) {
adamc@668 250 char *id = uw_Basis_requestHeader(ctx, "UrWeb-Client");
adamc@668 251 char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass");
adamc@668 252
adamc@668 253 if (id && pass) {
adamc@682 254 unsigned idn = atoi(id);
adamc@668 255 uw_client_connect(idn, atoi(pass), sock);
adamc@668 256 dont_close = 1;
adamc@682 257 fprintf(stderr, "Processed request for messages by client %u\n\n", idn);
adamc@668 258 }
adamc@703 259 else {
adamc@703 260 fprintf(stderr, "Missing fields in .msgs request: %s, %s\n\n", id, pass);
adamc@703 261 }
adamc@667 262 break;
adamc@667 263 }
adamc@667 264
adamc@730 265 if (is_post)
adamc@730 266 inputs = after_headers;
adamc@730 267 else if (inputs = strchr(path, '?'))
adamc@730 268 *inputs++ = 0;
adamc@730 269 if (inputs) {
adamc@144 270 char *name, *value;
adamc@144 271
adamc@144 272 while (*inputs) {
adamc@144 273 name = inputs;
adamc@145 274 if (inputs = strchr(inputs, '&'))
adamc@145 275 *inputs++ = 0;
adamc@145 276 else
adamc@145 277 inputs = strchr(name, 0);
adamc@145 278
adamc@145 279 if (value = strchr(name, '=')) {
adamc@144 280 *value++ = 0;
adamc@311 281 uw_set_input(ctx, name, value);
adamc@144 282 }
adamc@145 283 else
adamc@311 284 uw_set_input(ctx, name, "");
adamc@144 285 }
adamc@144 286 }
adamc@144 287
adamc@138 288 printf("Serving URI %s....\n", path);
adamc@138 289
adamc@167 290 while (1) {
adamc@462 291 uw_write_header(ctx, "HTTP/1.1 200 OK\r\n");
adamc@167 292
adamc@400 293 strcpy(path_copy, path);
adamc@458 294 fk = uw_begin(ctx, path_copy);
adamc@167 295 if (fk == SUCCESS) {
adamc@671 296 uw_commit(ctx);
adamc@167 297 break;
adamc@167 298 } else if (fk == BOUNDED_RETRY) {
adamc@167 299 if (retries_left) {
adamc@311 300 printf("Error triggers bounded retry: %s\n", uw_error_message(ctx));
adamc@167 301 --retries_left;
adamc@167 302 }
adamc@167 303 else {
adamc@311 304 printf("Fatal error (out of retries): %s\n", uw_error_message(ctx));
adamc@167 305
adamc@682 306 try_rollback(ctx);
adamc@682 307
adamc@311 308 uw_reset_keep_error_message(ctx);
adamc@464 309 uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
adamc@464 310 uw_write_header(ctx, "Content-type: text/plain\r\n");
adamc@311 311 uw_write(ctx, "Fatal error (out of retries): ");
adamc@311 312 uw_write(ctx, uw_error_message(ctx));
adamc@311 313 uw_write(ctx, "\n");
adamc@424 314
adamc@424 315 break;
adamc@167 316 }
adamc@167 317 } else if (fk == UNLIMITED_RETRY)
adamc@311 318 printf("Error triggers unlimited retry: %s\n", uw_error_message(ctx));
adamc@167 319 else if (fk == FATAL) {
adamc@311 320 printf("Fatal error: %s\n", uw_error_message(ctx));
adamc@167 321
adamc@682 322 try_rollback(ctx);
adamc@682 323
adamc@311 324 uw_reset_keep_error_message(ctx);
adamc@464 325 uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\r\n");
adamc@548 326 uw_write_header(ctx, "Content-type: text/html\r\n");
adamc@548 327 uw_write(ctx, "<html><head><title>Fatal Error</title></head><body>");
adamc@311 328 uw_write(ctx, "Fatal error: ");
adamc@311 329 uw_write(ctx, uw_error_message(ctx));
adamc@548 330 uw_write(ctx, "\n</body></html>");
adamc@167 331
adamc@167 332 break;
adamc@167 333 } else {
adamc@311 334 printf("Unknown uw_handle return code!\n");
adamc@167 335
adamc@682 336 try_rollback(ctx);
adamc@682 337
adamc@311 338 uw_reset_keep_request(ctx);
adamc@464 339 uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
adamc@464 340 uw_write_header(ctx, "Content-type: text/plain\r\n");
adamc@311 341 uw_write(ctx, "Unknown uw_handle return code!\n");
adamc@167 342
adamc@167 343 break;
adamc@167 344 }
adamc@167 345
adamc@424 346 if (try_rollback(ctx))
adamc@424 347 break;
adamc@682 348
adamc@682 349 uw_reset_keep_request(ctx);
adamc@167 350 }
adamc@138 351
adamc@311 352 uw_send(ctx, sock);
adamc@138 353
adamc@138 354 printf("Done with client.\n\n");
adamc@324 355 uw_memstats(ctx);
adamc@138 356 break;
adamc@138 357 }
adamc@116 358 }
adamc@116 359
adamc@730 360 done:
adamc@667 361 if (!dont_close)
adamc@667 362 close(sock);
adamc@311 363 uw_reset(ctx);
adamc@116 364 }
adamc@116 365 }
adamc@102 366
adamc@667 367 static void *client_pruner(void *data) {
adamc@698 368 uw_context ctx = new_context();
adamc@698 369
adamc@698 370 if (!ctx)
adamc@698 371 exit(1);
adamc@683 372
adamc@667 373 while (1) {
adamc@683 374 uw_prune_clients(ctx);
adamc@667 375 sleep(5);
adamc@667 376 }
adamc@667 377 }
adamc@667 378
adamc@477 379 static void help(char *cmd) {
adamc@477 380 printf("Usage: %s [-p <port>] [-t <thread-count>]\n", cmd);
adamc@477 381 }
adamc@477 382
adamc@502 383 static void sigint(int signum) {
adamc@502 384 printf("Exiting....\n");
adamc@502 385 exit(0);
adamc@502 386 }
adamc@502 387
adamc@687 388 static void initialize() {
adamc@734 389 uw_context ctx;
adamc@700 390 failure_kind fk;
adamc@687 391
adamc@734 392 init_crypto();
adamc@734 393
adamc@734 394 ctx = new_context();
adamc@734 395
adamc@698 396 if (!ctx)
adamc@698 397 exit(1);
adamc@698 398
adamc@700 399 for (fk = uw_initialize(ctx); fk == UNLIMITED_RETRY; fk = uw_initialize(ctx)) {
adamc@700 400 printf("Unlimited retry during init: %s\n", uw_error_message(ctx));
adamc@700 401 uw_db_rollback(ctx);
adamc@700 402 uw_reset(ctx);
adamc@700 403 }
adamc@700 404
adamc@700 405 if (fk != SUCCESS) {
adamc@700 406 printf("Failed to initialize database! %s\n", uw_error_message(ctx));
adamc@687 407 uw_db_rollback(ctx);
adamc@687 408 exit(1);
adamc@687 409 }
adamc@687 410
adamc@687 411 uw_free(ctx);
adamc@687 412 }
adamc@687 413
adamc@138 414 int main(int argc, char *argv[]) {
adamc@116 415 // The skeleton for this function comes from Beej's sockets tutorial.
adamc@138 416 int sockfd; // listen on sock_fd
adamc@116 417 struct sockaddr_in my_addr;
adamc@116 418 struct sockaddr_in their_addr; // connector's address information
adamc@116 419 int sin_size, yes = 1;
adamc@472 420 int uw_port = 8080, nthreads = 1, i, *names, opt;
adamc@502 421
adamc@502 422 signal(SIGINT, sigint);
adamc@505 423 signal(SIGPIPE, SIG_IGN);
adamc@505 424
adamc@477 425 while ((opt = getopt(argc, argv, "hp:t:")) != -1) {
adamc@472 426 switch (opt) {
adamc@472 427 case '?':
adamc@472 428 fprintf(stderr, "Unknown command-line option");
adamc@477 429 help(argv[0]);
adamc@472 430 return 1;
adamc@138 431
adamc@477 432 case 'h':
adamc@477 433 help(argv[0]);
adamc@477 434 return 0;
adamc@477 435
adamc@472 436 case 'p':
adamc@472 437 uw_port = atoi(optarg);
adamc@472 438 if (uw_port <= 0) {
adamc@472 439 fprintf(stderr, "Invalid port number\n");
adamc@477 440 help(argv[0]);
adamc@472 441 return 1;
adamc@472 442 }
adamc@472 443 break;
adamc@472 444
adamc@472 445 case 't':
adamc@472 446 nthreads = atoi(optarg);
adamc@472 447 if (nthreads <= 0) {
adamc@472 448 fprintf(stderr, "Invalid thread count\n");
adamc@477 449 help(argv[0]);
adamc@472 450 return 1;
adamc@472 451 }
adamc@472 452 break;
adamc@472 453
adamc@472 454 default:
adamc@472 455 fprintf(stderr, "Unexpected getopt() behavior\n");
adamc@472 456 return 1;
adamc@472 457 }
adamc@138 458 }
adamc@138 459
adamc@734 460 uw_global_init();
adamc@687 461 initialize();
adamc@687 462
adamc@138 463 names = calloc(nthreads, sizeof(int));
adamc@116 464
adamc@116 465 sockfd = socket(PF_INET, SOCK_STREAM, 0); // do some error checking!
adamc@116 466
adamc@116 467 if (sockfd < 0) {
adamc@116 468 fprintf(stderr, "Listener socket creation failed\n");
adamc@116 469 return 1;
adamc@116 470 }
adamc@116 471
adamc@116 472 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) < 0) {
adamc@116 473 fprintf(stderr, "Listener socket option setting failed\n");
adamc@116 474 return 1;
adamc@116 475 }
adamc@116 476
adamc@116 477 my_addr.sin_family = AF_INET; // host byte order
adamc@311 478 my_addr.sin_port = htons(uw_port); // short, network byte order
adamc@116 479 my_addr.sin_addr.s_addr = INADDR_ANY; // auto-fill with my IP
adamc@116 480 memset(my_addr.sin_zero, '\0', sizeof my_addr.sin_zero);
adamc@116 481
adamc@116 482 if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof my_addr) < 0) {
adamc@116 483 fprintf(stderr, "Listener socket bind failed\n");
adamc@116 484 return 1;
adamc@116 485 }
adamc@116 486
adamc@311 487 if (listen(sockfd, uw_backlog) < 0) {
adamc@116 488 fprintf(stderr, "Socket listen failed\n");
adamc@116 489 return 1;
adamc@116 490 }
adamc@116 491
adamc@116 492 sin_size = sizeof their_addr;
adamc@116 493
adamc@311 494 printf("Listening on port %d....\n", uw_port);
adamc@116 495
adamc@667 496 {
adamc@667 497 pthread_t thread;
adamc@667 498 int name;
adamc@667 499
adamc@667 500 if (pthread_create(&thread, NULL, client_pruner, &name)) {
adamc@667 501 fprintf(stderr, "Error creating pruner thread\n");
adamc@667 502 return 1;
adamc@667 503 }
adamc@667 504 }
adamc@667 505
adamc@138 506 for (i = 0; i < nthreads; ++i) {
adamc@138 507 pthread_t thread;
adamc@138 508 names[i] = i;
adamc@138 509 if (pthread_create(&thread, NULL, worker, &names[i])) {
adamc@138 510 fprintf(stderr, "Error creating worker thread #%d\n", i);
adamc@138 511 return 1;
adamc@138 512 }
adamc@138 513 }
adamc@138 514
adamc@116 515 while (1) {
adamc@138 516 int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
adamc@116 517
adamc@116 518 if (new_fd < 0) {
adamc@116 519 fprintf(stderr, "Socket accept failed\n");
adamc@116 520 return 1;
adamc@116 521 }
adamc@116 522
adamc@116 523 printf("Accepted connection.\n");
adamc@138 524
adamc@138 525 pthread_mutex_lock(&queue_mutex);
adamc@138 526 enqueue(new_fd);
adamc@139 527 pthread_cond_broadcast(&queue_cond);
adamc@138 528 pthread_mutex_unlock(&queue_mutex);
adamc@116 529 }
adamc@102 530 }