comparison src/c/urweb.c @ 1121:0cee0c8d8c37

Support for protocol-specific expunger dispatch
author Adam Chlipala <adamc@hcoop.net>
date Sun, 10 Jan 2010 10:40:57 -0500
parents 74f2eb3b0606
children e1cf925e2074
comparison
equal deleted inserted replaced
1120:74f2eb3b0606 1121:0cee0c8d8c37
148 int (*send)(int sockfd, const void *buf, ssize_t len); 148 int (*send)(int sockfd, const void *buf, ssize_t len);
149 int (*close)(int fd); 149 int (*close)(int fd);
150 time_t last_contact; 150 time_t last_contact;
151 unsigned n_channels; 151 unsigned n_channels;
152 unsigned refcount; 152 unsigned refcount;
153 void *data;
153 } client; 154 } client;
154 155
155 156
156 // Persistent client state 157 // Persistent client state
157 158
160 161
161 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; 162 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
162 163
163 size_t uw_messages_max = SIZE_MAX; 164 size_t uw_messages_max = SIZE_MAX;
164 size_t uw_clients_max = SIZE_MAX; 165 size_t uw_clients_max = SIZE_MAX;
166
167 void *uw_init_client_data();
168 void uw_free_client_data(void *);
169 void uw_copy_client_data(void *dst, void *src);
165 170
166 static client *new_client() { 171 static client *new_client() {
167 client *c; 172 client *c;
168 173
169 pthread_mutex_lock(&clients_mutex); 174 pthread_mutex_lock(&clients_mutex);
191 c->sock = -1; 196 c->sock = -1;
192 c->last_contact = time(NULL); 197 c->last_contact = time(NULL);
193 buf_reset(&c->msgs); 198 buf_reset(&c->msgs);
194 c->n_channels = 0; 199 c->n_channels = 0;
195 c->refcount = 0; 200 c->refcount = 0;
201 c->data = uw_init_client_data();
196 pthread_mutex_unlock(&c->lock); 202 pthread_mutex_unlock(&c->lock);
197 203
198 c->next = clients_used; 204 c->next = clients_used;
199 clients_used = c; 205 clients_used = c;
200 206
430 436
431 char *current_url; 437 char *current_url;
432 438
433 int deadline; 439 int deadline;
434 440
441 void *client_data;
442
435 char error_message[ERROR_BUF_LEN]; 443 char error_message[ERROR_BUF_LEN];
436 }; 444 };
437 445
438 size_t uw_headers_max = SIZE_MAX; 446 size_t uw_headers_max = SIZE_MAX;
439 size_t uw_page_max = SIZE_MAX; 447 size_t uw_page_max = SIZE_MAX;
487 495
488 ctx->current_url = ""; 496 ctx->current_url = "";
489 497
490 ctx->deadline = INT_MAX; 498 ctx->deadline = INT_MAX;
491 499
500 ctx->client_data = uw_init_client_data();
501
492 return ctx; 502 return ctx;
493 } 503 }
494 504
495 size_t uw_inputs_max = SIZE_MAX; 505 size_t uw_inputs_max = SIZE_MAX;
506
507 uw_app *uw_get_app(uw_context ctx) {
508 return ctx->app;
509 }
496 510
497 int uw_set_app(uw_context ctx, uw_app *app) { 511 int uw_set_app(uw_context ctx, uw_app *app) {
498 ctx->app = app; 512 ctx->app = app;
499 513
500 if (app && app->inputs_len > ctx->sz_inputs) { 514 if (app && app->inputs_len > ctx->sz_inputs) {
503 517
504 ctx->sz_inputs = app->inputs_len; 518 ctx->sz_inputs = app->inputs_len;
505 ctx->inputs = realloc(ctx->inputs, ctx->sz_inputs * sizeof(input)); 519 ctx->inputs = realloc(ctx->inputs, ctx->sz_inputs * sizeof(input));
506 memset(ctx->inputs, 0, ctx->sz_inputs * sizeof(input)); 520 memset(ctx->inputs, 0, ctx->sz_inputs * sizeof(input));
507 } 521 }
522 }
523
524 void uw_set_client_data(uw_context ctx, void *data) {
525 uw_copy_client_data(ctx->client_data, data);
508 } 526 }
509 527
510 void uw_set_db(uw_context ctx, void *db) { 528 void uw_set_db(uw_context ctx, void *db) {
511 ctx->db = db; 529 ctx->db = db;
512 } 530 }
524 buf_free(&ctx->heap); 542 buf_free(&ctx->heap);
525 free(ctx->inputs); 543 free(ctx->inputs);
526 free(ctx->subinputs); 544 free(ctx->subinputs);
527 free(ctx->cleanup); 545 free(ctx->cleanup);
528 free(ctx->transactionals); 546 free(ctx->transactionals);
547 uw_free_client_data(ctx->client_data);
529 548
530 for (i = 0; i < ctx->n_deltas; ++i) 549 for (i = 0; i < ctx->n_deltas; ++i)
531 buf_free(&ctx->deltas[i].msgs); 550 buf_free(&ctx->deltas[i].msgs);
532 551
533 for (i = 0; i < ctx->n_globals; ++i) 552 for (i = 0; i < ctx->n_globals; ++i)
666 685
667 if (c == NULL) 686 if (c == NULL)
668 uw_error(ctx, FATAL, "Limit exceeded on number of message-passing clients"); 687 uw_error(ctx, FATAL, "Limit exceeded on number of message-passing clients");
669 688
670 use_client(c); 689 use_client(c);
690 uw_copy_client_data(c->data, ctx->client_data);
671 ctx->client = c; 691 ctx->client = c;
672 } 692 }
673 } 693 }
674 } 694 }
675 695
2975 } 2995 }
2976 2996
2977 2997
2978 // "Garbage collection" 2998 // "Garbage collection"
2979 2999
2980 static failure_kind uw_expunge(uw_context ctx, uw_Basis_client cli) { 3000 void uw_do_expunge(uw_context ctx, uw_Basis_client cli, void *data);
3001 void uw_post_expunge(uw_context ctx, void *data);
3002
3003 static failure_kind uw_expunge(uw_context ctx, uw_Basis_client cli, void *data) {
2981 int r = setjmp(ctx->jmp_buf); 3004 int r = setjmp(ctx->jmp_buf);
2982 3005
2983 if (r == 0) { 3006 if (r == 0)
2984 if (ctx->app->db_begin(ctx)) 3007 uw_do_expunge(ctx, cli, data);
2985 uw_error(ctx, FATAL, "Error running SQL BEGIN"); 3008 else
2986 ctx->app->expunger(ctx, cli); 3009 ctx->app->db_rollback(ctx);
2987 if (ctx->app->db_commit(ctx)) 3010
2988 uw_error(ctx, FATAL, "Error running SQL COMMIT"); 3011 uw_post_expunge(ctx, data);
2989 }
2990 3012
2991 return r; 3013 return r;
2992 } 3014 }
2993 3015
2994 void uw_prune_clients(uw_context ctx) { 3016 void uw_prune_clients(uw_context ctx) {
3008 prev->next = next; 3030 prev->next = next;
3009 else 3031 else
3010 clients_used = next; 3032 clients_used = next;
3011 uw_reset(ctx); 3033 uw_reset(ctx);
3012 while (fk == UNLIMITED_RETRY) { 3034 while (fk == UNLIMITED_RETRY) {
3013 fk = uw_expunge(ctx, c->id); 3035 fk = uw_expunge(ctx, c->id, c->data);
3014 if (fk == UNLIMITED_RETRY) { 3036 if (fk == UNLIMITED_RETRY)
3015 ctx->app->db_rollback(ctx);
3016 printf("Unlimited retry during expunge: %s\n", uw_error_message(ctx)); 3037 printf("Unlimited retry during expunge: %s\n", uw_error_message(ctx));
3017 }
3018 } 3038 }
3019 if (fk == SUCCESS) 3039 if (fk == SUCCESS)
3020 free_client(c); 3040 free_client(c);
3021 else { 3041 else
3022 ctx->app->db_rollback(ctx);
3023 fprintf(stderr, "Expunge blocked by error: %s\n", uw_error_message(ctx)); 3042 fprintf(stderr, "Expunge blocked by error: %s\n", uw_error_message(ctx));
3024 }
3025 } 3043 }
3026 else 3044 else
3027 prev = c; 3045 prev = c;
3028 pthread_mutex_unlock(&c->lock); 3046 pthread_mutex_unlock(&c->lock);
3029 } 3047 }