comparison src/c/urweb.c @ 671:729e65db2e2f

Transactionalize channel operations
author Adam Chlipala <adamc@hcoop.net>
date Tue, 24 Mar 2009 14:44:45 -0400
parents f68eee90dbcf
children df6eb58de040
comparison
equal deleted inserted replaced
670:f73913d97a40 671:729e65db2e2f
96 pthread_mutex_t lock; 96 pthread_mutex_t lock;
97 int pass; 97 int pass;
98 buf msgs; 98 buf msgs;
99 int sock; 99 int sock;
100 time_t last_contact; 100 time_t last_contact;
101 unsigned refcount;
101 } used; 102 } used;
102 } data; 103 } data;
103 } client; 104 } client;
104 105
105 static client **clients, *clients_free; 106 static client **clients, *clients_free;
127 c->mode = USED; 128 c->mode = USED;
128 pthread_mutex_init(&c->data.used.lock, NULL); 129 pthread_mutex_init(&c->data.used.lock, NULL);
129 c->data.used.pass = rand(); 130 c->data.used.pass = rand();
130 c->data.used.sock = -1; 131 c->data.used.sock = -1;
131 c->data.used.last_contact = time(NULL); 132 c->data.used.last_contact = time(NULL);
133 c->data.used.refcount = 0;
132 buf_init(&c->data.used.msgs, 0); 134 buf_init(&c->data.used.msgs, 0);
133 135
134 pthread_mutex_unlock(&clients_mutex); 136 pthread_mutex_unlock(&clients_mutex);
135 137
136 return c; 138 return c;
152 154
153 if (c->mode != USED) { 155 if (c->mode != USED) {
154 pthread_mutex_unlock(&clients_mutex); 156 pthread_mutex_unlock(&clients_mutex);
155 return NULL; 157 return NULL;
156 } 158 }
157 159
160 pthread_mutex_lock(&c->data.used.lock);
161 ++c->data.used.refcount;
162 pthread_mutex_unlock(&c->data.used.lock);
158 pthread_mutex_unlock(&clients_mutex); 163 pthread_mutex_unlock(&clients_mutex);
159 return c; 164 return c;
165 }
166
167 static void uw_release_client(client *c) {
168 pthread_mutex_lock(&c->data.used.lock);
169 --c->data.used.refcount;
170 pthread_mutex_unlock(&c->data.used.lock);
160 } 171 }
161 172
162 void uw_client_connect(size_t id, int pass, int sock) { 173 void uw_client_connect(size_t id, int pass, int sock) {
163 client *c = uw_find_client(id); 174 client *c = uw_find_client(id);
164 175
165 if (c == NULL) { 176 if (c == NULL) {
166 close(sock); 177 close(sock);
167 fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); 178 fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
168 return; 179 return;
169 } 180 }
181
182 uw_release_client(c);
170 183
171 pthread_mutex_lock(&c->data.used.lock); 184 pthread_mutex_lock(&c->data.used.lock);
172 185
173 if (pass != c->data.used.pass) { 186 if (pass != c->data.used.pass) {
174 pthread_mutex_unlock(&c->data.used.lock); 187 pthread_mutex_unlock(&c->data.used.lock);
217 cutoff = time(NULL) - timeout; 230 cutoff = time(NULL) - timeout;
218 231
219 pthread_mutex_lock(&clients_mutex); 232 pthread_mutex_lock(&clients_mutex);
220 233
221 for (i = 0; i < n_clients; ++i) { 234 for (i = 0; i < n_clients; ++i) {
222 if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff) 235 if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff
236 && clients[i]->data.used.refcount == 0)
223 uw_free_client(clients[i]); 237 uw_free_client(clients[i]);
224 } 238 }
225 239
226 pthread_mutex_unlock(&clients_mutex); 240 pthread_mutex_unlock(&clients_mutex);
227 } 241 }
240 union { 254 union {
241 struct channel *next; 255 struct channel *next;
242 struct { 256 struct {
243 pthread_mutex_t lock; 257 pthread_mutex_t lock;
244 client_list *clients; 258 client_list *clients;
259 unsigned refcount;
245 } used; 260 } used;
246 } data; 261 } data;
247 } channel; 262 } channel;
248 263
249 static channel **channels, *channels_free; 264 static channel **channels, *channels_free;
269 } 284 }
270 285
271 ch->mode = USED; 286 ch->mode = USED;
272 pthread_mutex_init(&ch->data.used.lock, NULL); 287 pthread_mutex_init(&ch->data.used.lock, NULL);
273 ch->data.used.clients = NULL; 288 ch->data.used.clients = NULL;
289 ch->data.used.refcount = 0;
274 290
275 pthread_mutex_unlock(&channels_mutex); 291 pthread_mutex_unlock(&channels_mutex);
276 292
277 return ch; 293 return ch;
294 }
295
296 static void uw_free_channel(channel *ch) {
297 if (ch->mode == USED) {
298 client_list *cs;
299
300 for (cs = ch->data.used.clients; cs; ) {
301 client_list *tmp = cs->next;
302 free(cs);
303 cs = tmp;
304 }
305 pthread_mutex_destroy(&ch->data.used.lock);
306 ch->mode = UNUSED;
307 ch->data.next = channels_free;
308 channels_free = ch;
309 }
278 } 310 }
279 311
280 static channel *uw_find_channel(size_t id) { 312 static channel *uw_find_channel(size_t id) {
281 channel *ch = NULL; 313 channel *ch = NULL;
282 314
283 pthread_mutex_lock(&channels_mutex); 315 pthread_mutex_lock(&channels_mutex);
284 316
285 if (id < n_channels && channels[id]->mode == USED) 317 if (id < n_channels && channels[id]->mode == USED) {
286 ch = channels[id]; 318 ch = channels[id];
287 319
320 pthread_mutex_lock(&ch->data.used.lock);
321 ++ch->data.used.refcount;
322 pthread_mutex_unlock(&ch->data.used.lock);
323 }
324
288 pthread_mutex_unlock(&channels_mutex); 325 pthread_mutex_unlock(&channels_mutex);
289 326
290 return ch; 327 return ch;
328 }
329
330 static void uw_release_channel(channel *ch) {
331 pthread_mutex_lock(&ch->data.used.lock);
332 ++ch->data.used.refcount;
333 pthread_mutex_unlock(&ch->data.used.lock);
291 } 334 }
292 335
293 static void uw_subscribe(channel *ch, client *c) { 336 static void uw_subscribe(channel *ch, client *c) {
294 client_list *cs = malloc(sizeof(client_list)); 337 client_list *cs = malloc(sizeof(client_list));
295 338
340 client *c = cs->data; 383 client *c = cs->data;
341 384
342 pthread_mutex_lock(&c->data.used.lock); 385 pthread_mutex_lock(&c->data.used.lock);
343 386
344 if (c->data.used.sock != -1) { 387 if (c->data.used.sock != -1) {
345 printf("Immediate send\n");
346 uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); 388 uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
347 uw_really_send(c->data.used.sock, pre, preLen); 389 uw_really_send(c->data.used.sock, pre, preLen);
348 uw_really_send(c->data.used.sock, msg, len); 390 uw_really_send(c->data.used.sock, msg, len);
349 uw_really_send(c->data.used.sock, "\n", 1); 391 uw_really_send(c->data.used.sock, "\n", 1);
350 close(c->data.used.sock); 392 close(c->data.used.sock);
351 c->data.used.sock = -1; 393 c->data.used.sock = -1;
352 } else { 394 } else {
353 printf("Delayed send\n");
354 buf_append(&c->data.used.msgs, pre, preLen); 395 buf_append(&c->data.used.msgs, pre, preLen);
355 buf_append(&c->data.used.msgs, msg, len); 396 buf_append(&c->data.used.msgs, msg, len);
356 buf_append(&c->data.used.msgs, "\n", 1); 397 buf_append(&c->data.used.msgs, "\n", 1);
357 } 398 }
358 399
/* One entry of a context's cleanup-action stack: a callback together with
 * the argument stored alongside it. */
typedef struct {
  void (*func)(void *arg);  /* action to run */
  void *arg;                /* value recorded for func */
} cleanup;
388 429
430 typedef struct {
431 usage mode;
432 channel *ch;
433 enum { OLD, NEW } newness;
434
435 size_t n_subscribed;
436 client **subscribed;
437
438 buf msgs;
439 } channel_delta;
440
389 struct uw_context { 441 struct uw_context {
390 char *headers, *headers_end; 442 char *headers, *headers_end;
391 443
392 buf outHeaders, page, heap, script; 444 buf outHeaders, page, heap, script;
393 char **inputs; 445 char **inputs;
401 regions *regions; 453 regions *regions;
402 454
403 cleanup *cleanup, *cleanup_front, *cleanup_back; 455 cleanup *cleanup, *cleanup_front, *cleanup_back;
404 456
405 const char *script_header, *url_prefix; 457 const char *script_header, *url_prefix;
458
459 size_t n_deltas;
460 channel_delta *deltas;
406 461
407 char error_message[ERROR_BUF_LEN]; 462 char error_message[ERROR_BUF_LEN];
408 }; 463 };
409 464
410 extern int uw_inputs_len; 465 extern int uw_inputs_len;
433 488
434 ctx->error_message[0] = 0; 489 ctx->error_message[0] = 0;
435 490
436 ctx->source_count = 0; 491 ctx->source_count = 0;
437 492
493 ctx->n_deltas = 0;
494 ctx->deltas = malloc(0);
495
438 return ctx; 496 return ctx;
439 } 497 }
440 498
441 void uw_set_db(uw_context ctx, void *db) { 499 void uw_set_db(uw_context ctx, void *db) {
442 ctx->db = db; 500 ctx->db = db;
445 void *uw_get_db(uw_context ctx) { 503 void *uw_get_db(uw_context ctx) {
446 return ctx->db; 504 return ctx->db;
447 } 505 }
448 506
449 void uw_free(uw_context ctx) { 507 void uw_free(uw_context ctx) {
508 size_t i;
509
450 buf_free(&ctx->outHeaders); 510 buf_free(&ctx->outHeaders);
451 buf_free(&ctx->script); 511 buf_free(&ctx->script);
452 buf_free(&ctx->page); 512 buf_free(&ctx->page);
453 buf_free(&ctx->heap); 513 buf_free(&ctx->heap);
454 free(ctx->inputs); 514 free(ctx->inputs);
455 free(ctx->cleanup); 515 free(ctx->cleanup);
516
517 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
518 free(ctx->deltas[i].subscribed);
519 buf_free(&ctx->deltas[i].msgs);
520 }
521
456 free(ctx); 522 free(ctx);
457 } 523 }
458 524
459 void uw_reset_keep_error_message(uw_context ctx) { 525 void uw_reset_keep_error_message(uw_context ctx) {
460 buf_reset(&ctx->outHeaders); 526 buf_reset(&ctx->outHeaders);
463 buf_reset(&ctx->page); 529 buf_reset(&ctx->page);
464 buf_reset(&ctx->heap); 530 buf_reset(&ctx->heap);
465 ctx->regions = NULL; 531 ctx->regions = NULL;
466 ctx->cleanup_front = ctx->cleanup; 532 ctx->cleanup_front = ctx->cleanup;
467 ctx->source_count = 0; 533 ctx->source_count = 0;
534 if (ctx->n_deltas > 0)
535 ctx->deltas[0].mode = UNUSED;
468 } 536 }
469 537
470 void uw_reset_keep_request(uw_context ctx) { 538 void uw_reset_keep_request(uw_context ctx) {
471 uw_reset_keep_error_message(ctx); 539 uw_reset_keep_error_message(ctx);
472 ctx->error_message[0] = 0; 540 ctx->error_message[0] = 0;
504 } 572 }
505 573
506 ctx->headers_end = s; 574 ctx->headers_end = s;
507 } 575 }
508 576
509 failure_kind uw_begin(uw_context ctx, char *path) { 577 int uw_db_begin(uw_context);
510 int r = setjmp(ctx->jmp_buf);
511
512 if (r == 0)
513 uw_handle(ctx, path);
514
515 return r;
516 }
517 578
518 __attribute__((noreturn)) void uw_error(uw_context ctx, failure_kind fk, const char *fmt, ...) { 579 __attribute__((noreturn)) void uw_error(uw_context ctx, failure_kind fk, const char *fmt, ...) {
519 cleanup *cl; 580 cleanup *cl;
520 581
521 va_list ap; 582 va_list ap;
544 } 605 }
545 606
546 ctx->cleanup_front->func = func; 607 ctx->cleanup_front->func = func;
547 ctx->cleanup_front->arg = arg; 608 ctx->cleanup_front->arg = arg;
548 ++ctx->cleanup_front; 609 ++ctx->cleanup_front;
610 }
611
612 failure_kind uw_begin(uw_context ctx, char *path) {
613 int r = setjmp(ctx->jmp_buf);
614
615 if (r == 0) {
616 if (uw_db_begin(ctx))
617 uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
618 uw_handle(ctx, path);
619 }
620
621 return r;
549 } 622 }
550 623
551 void uw_pop_cleanup(uw_context ctx) { 624 void uw_pop_cleanup(uw_context ctx) {
552 if (ctx->cleanup_front == ctx->cleanup) 625 if (ctx->cleanup_front == ctx->cleanup)
553 uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); 626 uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack");
1726 uw_write_header(ctx, "\r\n"); 1799 uw_write_header(ctx, "\r\n");
1727 1800
1728 return uw_unit_v; 1801 return uw_unit_v;
1729 } 1802 }
1730 1803
1804 static channel_delta *allocate_delta(uw_context ctx, channel *ch) {
1805 size_t i;
1806 channel_delta *cd;
1807
1808 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i);
1809
1810 if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch)
1811 return &ctx->deltas[i];
1812
1813 if (i < ctx->n_deltas)
1814 cd = &ctx->deltas[i];
1815 else {
1816 ++ctx->n_deltas;
1817 ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas);
1818 cd = &ctx->deltas[ctx->n_deltas-1];
1819 }
1820
1821 cd->mode = USED;
1822 cd->newness = OLD;
1823 cd->ch = ch;
1824 if (cd->n_subscribed > 0)
1825 cd->subscribed[0] = NULL;
1826 buf_reset(&cd->msgs);
1827 return cd;
1828 }
1829
1731 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { 1830 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
1732 return uw_new_channel()->id; 1831 size_t i;
1832 channel *ch = uw_new_channel();
1833 ++ch->data.used.refcount;
1834 channel_delta *cd = allocate_delta(ctx, ch);
1835
1836 cd->newness = NEW;
1837
1838 return ch->id;
1839 }
1840
1841 static int delta_used(channel_delta *cd) {
1842 return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]);
1733 } 1843 }
1734 1844
1735 uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { 1845 uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
1736 channel *ch = uw_find_channel(chn); 1846 channel *ch = uw_find_channel(chn);
1737 1847
1740 else { 1850 else {
1741 size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); 1851 size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
1742 int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); 1852 int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
1743 client *c = uw_find_client(id); 1853 client *c = uw_find_client(id);
1744 1854
1745 if (c == NULL) 1855 if (c == NULL) {
1856 uw_release_channel(ch);
1746 uw_error(ctx, FATAL, "Unknown client ID in subscription request"); 1857 uw_error(ctx, FATAL, "Unknown client ID in subscription request");
1747 else if (c->data.used.pass != pass) 1858 } else if (c->data.used.pass != pass) {
1859 uw_release_channel(ch);
1860 uw_release_client(c);
1748 uw_error(ctx, FATAL, "Wrong client password in subscription request"); 1861 uw_error(ctx, FATAL, "Wrong client password in subscription request");
1749 else 1862 } else {
1750 uw_subscribe(ch, c); 1863 size_t i;
1751 } 1864 channel_delta *cd = allocate_delta(ctx, ch);
1865
1866 if (delta_used(cd))
1867 uw_release_channel(ch);
1868
1869 for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i);
1870
1871 if (i < cd->n_subscribed)
1872 cd->subscribed[i] = c;
1873 else {
1874 ++cd->n_subscribed;
1875 cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed);
1876 cd->subscribed[cd->n_subscribed-1] = c;
1877 }
1878 }
1879 }
1880
1881 return uw_unit_v;
1752 } 1882 }
1753 1883
1754 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { 1884 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
1755 channel *ch = uw_find_channel(chn); 1885 channel *ch = uw_find_channel(chn);
1756 1886
1757 if (ch == NULL) 1887 if (ch == NULL)
1758 uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); 1888 uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
1759 else 1889 else {
1760 uw_channel_send(ch, msg); 1890 channel_delta *cd = allocate_delta(ctx, ch);
1761 } 1891 if (delta_used(cd))
1892 uw_release_channel(ch);
1893 buf_append(&cd->msgs, msg, strlen(msg));
1894 }
1895
1896 return uw_unit_v;
1897 }
1898
1899 int uw_db_commit(uw_context);
1900 int uw_db_rollback(uw_context);
1901
1902 void uw_commit(uw_context ctx) {
1903 size_t i, j;
1904
1905 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
1906 channel *ch = ctx->deltas[i].ch;
1907
1908 for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
1909 client *c = ctx->deltas[i].subscribed[j];
1910
1911 uw_subscribe(ch, c);
1912 uw_release_client(c);
1913 }
1914
1915 if (buf_used(&ctx->deltas[i].msgs) > 0) {
1916 uw_channel_send(ch, ctx->deltas[i].msgs.start);
1917 }
1918
1919 uw_release_channel(ch);
1920 }
1921
1922 if (uw_db_commit(ctx))
1923 uw_error(ctx, FATAL, "Error running SQL COMMIT");
1924 }
1925
1926 int uw_rollback(uw_context ctx) {
1927 size_t i, j;
1928
1929 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
1930 channel *ch = ctx->deltas[i].ch;
1931
1932 for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
1933 client *c = ctx->deltas[i].subscribed[j];
1934
1935 uw_release_client(c);
1936 }
1937
1938 if (ctx->deltas[i].newness == NEW)
1939 uw_free_channel(ch);
1940 else
1941 uw_release_channel(ch);
1942 }
1943
1944 return uw_db_rollback(ctx);
1945 }