Mercurial > urweb
comparison src/c/urweb.c @ 671:729e65db2e2f
Transactionalize channel operations
author | Adam Chlipala <adamc@hcoop.net> |
---|---|
date | Tue, 24 Mar 2009 14:44:45 -0400 |
parents | f68eee90dbcf |
children | df6eb58de040 |
comparison
equal
deleted
inserted
replaced
670:f73913d97a40 | 671:729e65db2e2f |
---|---|
96 pthread_mutex_t lock; | 96 pthread_mutex_t lock; |
97 int pass; | 97 int pass; |
98 buf msgs; | 98 buf msgs; |
99 int sock; | 99 int sock; |
100 time_t last_contact; | 100 time_t last_contact; |
101 unsigned refcount; | |
101 } used; | 102 } used; |
102 } data; | 103 } data; |
103 } client; | 104 } client; |
104 | 105 |
105 static client **clients, *clients_free; | 106 static client **clients, *clients_free; |
127 c->mode = USED; | 128 c->mode = USED; |
128 pthread_mutex_init(&c->data.used.lock, NULL); | 129 pthread_mutex_init(&c->data.used.lock, NULL); |
129 c->data.used.pass = rand(); | 130 c->data.used.pass = rand(); |
130 c->data.used.sock = -1; | 131 c->data.used.sock = -1; |
131 c->data.used.last_contact = time(NULL); | 132 c->data.used.last_contact = time(NULL); |
133 c->data.used.refcount = 0; | |
132 buf_init(&c->data.used.msgs, 0); | 134 buf_init(&c->data.used.msgs, 0); |
133 | 135 |
134 pthread_mutex_unlock(&clients_mutex); | 136 pthread_mutex_unlock(&clients_mutex); |
135 | 137 |
136 return c; | 138 return c; |
152 | 154 |
153 if (c->mode != USED) { | 155 if (c->mode != USED) { |
154 pthread_mutex_unlock(&clients_mutex); | 156 pthread_mutex_unlock(&clients_mutex); |
155 return NULL; | 157 return NULL; |
156 } | 158 } |
157 | 159 |
160 pthread_mutex_lock(&c->data.used.lock); | |
161 ++c->data.used.refcount; | |
162 pthread_mutex_unlock(&c->data.used.lock); | |
158 pthread_mutex_unlock(&clients_mutex); | 163 pthread_mutex_unlock(&clients_mutex); |
159 return c; | 164 return c; |
165 } | |
166 | |
167 static void uw_release_client(client *c) { | |
168 pthread_mutex_lock(&c->data.used.lock); | |
169 --c->data.used.refcount; | |
170 pthread_mutex_unlock(&c->data.used.lock); | |
160 } | 171 } |
161 | 172 |
162 void uw_client_connect(size_t id, int pass, int sock) { | 173 void uw_client_connect(size_t id, int pass, int sock) { |
163 client *c = uw_find_client(id); | 174 client *c = uw_find_client(id); |
164 | 175 |
165 if (c == NULL) { | 176 if (c == NULL) { |
166 close(sock); | 177 close(sock); |
167 fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); | 178 fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); |
168 return; | 179 return; |
169 } | 180 } |
181 | |
182 uw_release_client(c); | |
170 | 183 |
171 pthread_mutex_lock(&c->data.used.lock); | 184 pthread_mutex_lock(&c->data.used.lock); |
172 | 185 |
173 if (pass != c->data.used.pass) { | 186 if (pass != c->data.used.pass) { |
174 pthread_mutex_unlock(&c->data.used.lock); | 187 pthread_mutex_unlock(&c->data.used.lock); |
217 cutoff = time(NULL) - timeout; | 230 cutoff = time(NULL) - timeout; |
218 | 231 |
219 pthread_mutex_lock(&clients_mutex); | 232 pthread_mutex_lock(&clients_mutex); |
220 | 233 |
221 for (i = 0; i < n_clients; ++i) { | 234 for (i = 0; i < n_clients; ++i) { |
222 if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff) | 235 if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff |
236 && clients[i]->data.used.refcount == 0) | |
223 uw_free_client(clients[i]); | 237 uw_free_client(clients[i]); |
224 } | 238 } |
225 | 239 |
226 pthread_mutex_unlock(&clients_mutex); | 240 pthread_mutex_unlock(&clients_mutex); |
227 } | 241 } |
240 union { | 254 union { |
241 struct channel *next; | 255 struct channel *next; |
242 struct { | 256 struct { |
243 pthread_mutex_t lock; | 257 pthread_mutex_t lock; |
244 client_list *clients; | 258 client_list *clients; |
259 unsigned refcount; | |
245 } used; | 260 } used; |
246 } data; | 261 } data; |
247 } channel; | 262 } channel; |
248 | 263 |
249 static channel **channels, *channels_free; | 264 static channel **channels, *channels_free; |
269 } | 284 } |
270 | 285 |
271 ch->mode = USED; | 286 ch->mode = USED; |
272 pthread_mutex_init(&ch->data.used.lock, NULL); | 287 pthread_mutex_init(&ch->data.used.lock, NULL); |
273 ch->data.used.clients = NULL; | 288 ch->data.used.clients = NULL; |
289 ch->data.used.refcount = 0; | |
274 | 290 |
275 pthread_mutex_unlock(&channels_mutex); | 291 pthread_mutex_unlock(&channels_mutex); |
276 | 292 |
277 return ch; | 293 return ch; |
294 } | |
295 | |
296 static void uw_free_channel(channel *ch) { | |
297 if (ch->mode == USED) { | |
298 client_list *cs; | |
299 | |
300 for (cs = ch->data.used.clients; cs; ) { | |
301 client_list *tmp = cs->next; | |
302 free(cs); | |
303 cs = tmp; | |
304 } | |
305 pthread_mutex_destroy(&ch->data.used.lock); | |
306 ch->mode = UNUSED; | |
307 ch->data.next = channels_free; | |
308 channels_free = ch; | |
309 } | |
278 } | 310 } |
279 | 311 |
280 static channel *uw_find_channel(size_t id) { | 312 static channel *uw_find_channel(size_t id) { |
281 channel *ch = NULL; | 313 channel *ch = NULL; |
282 | 314 |
283 pthread_mutex_lock(&channels_mutex); | 315 pthread_mutex_lock(&channels_mutex); |
284 | 316 |
285 if (id < n_channels && channels[id]->mode == USED) | 317 if (id < n_channels && channels[id]->mode == USED) { |
286 ch = channels[id]; | 318 ch = channels[id]; |
287 | 319 |
320 pthread_mutex_lock(&ch->data.used.lock); | |
321 ++ch->data.used.refcount; | |
322 pthread_mutex_unlock(&ch->data.used.lock); | |
323 } | |
324 | |
288 pthread_mutex_unlock(&channels_mutex); | 325 pthread_mutex_unlock(&channels_mutex); |
289 | 326 |
290 return ch; | 327 return ch; |
328 } | |
329 | |
330 static void uw_release_channel(channel *ch) { | |
331 pthread_mutex_lock(&ch->data.used.lock); | |
332 ++ch->data.used.refcount; | |
333 pthread_mutex_unlock(&ch->data.used.lock); | |
291 } | 334 } |
292 | 335 |
293 static void uw_subscribe(channel *ch, client *c) { | 336 static void uw_subscribe(channel *ch, client *c) { |
294 client_list *cs = malloc(sizeof(client_list)); | 337 client_list *cs = malloc(sizeof(client_list)); |
295 | 338 |
340 client *c = cs->data; | 383 client *c = cs->data; |
341 | 384 |
342 pthread_mutex_lock(&c->data.used.lock); | 385 pthread_mutex_lock(&c->data.used.lock); |
343 | 386 |
344 if (c->data.used.sock != -1) { | 387 if (c->data.used.sock != -1) { |
345 printf("Immediate send\n"); | |
346 uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); | 388 uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); |
347 uw_really_send(c->data.used.sock, pre, preLen); | 389 uw_really_send(c->data.used.sock, pre, preLen); |
348 uw_really_send(c->data.used.sock, msg, len); | 390 uw_really_send(c->data.used.sock, msg, len); |
349 uw_really_send(c->data.used.sock, "\n", 1); | 391 uw_really_send(c->data.used.sock, "\n", 1); |
350 close(c->data.used.sock); | 392 close(c->data.used.sock); |
351 c->data.used.sock = -1; | 393 c->data.used.sock = -1; |
352 } else { | 394 } else { |
353 printf("Delayed send\n"); | |
354 buf_append(&c->data.used.msgs, pre, preLen); | 395 buf_append(&c->data.used.msgs, pre, preLen); |
355 buf_append(&c->data.used.msgs, msg, len); | 396 buf_append(&c->data.used.msgs, msg, len); |
356 buf_append(&c->data.used.msgs, "\n", 1); | 397 buf_append(&c->data.used.msgs, "\n", 1); |
357 } | 398 } |
358 | 399 |
384 typedef struct { | 425 typedef struct { |
385 void (*func)(void*); | 426 void (*func)(void*); |
386 void *arg; | 427 void *arg; |
387 } cleanup; | 428 } cleanup; |
388 | 429 |
430 typedef struct { | |
431 usage mode; | |
432 channel *ch; | |
433 enum { OLD, NEW } newness; | |
434 | |
435 size_t n_subscribed; | |
436 client **subscribed; | |
437 | |
438 buf msgs; | |
439 } channel_delta; | |
440 | |
389 struct uw_context { | 441 struct uw_context { |
390 char *headers, *headers_end; | 442 char *headers, *headers_end; |
391 | 443 |
392 buf outHeaders, page, heap, script; | 444 buf outHeaders, page, heap, script; |
393 char **inputs; | 445 char **inputs; |
401 regions *regions; | 453 regions *regions; |
402 | 454 |
403 cleanup *cleanup, *cleanup_front, *cleanup_back; | 455 cleanup *cleanup, *cleanup_front, *cleanup_back; |
404 | 456 |
405 const char *script_header, *url_prefix; | 457 const char *script_header, *url_prefix; |
458 | |
459 size_t n_deltas; | |
460 channel_delta *deltas; | |
406 | 461 |
407 char error_message[ERROR_BUF_LEN]; | 462 char error_message[ERROR_BUF_LEN]; |
408 }; | 463 }; |
409 | 464 |
410 extern int uw_inputs_len; | 465 extern int uw_inputs_len; |
433 | 488 |
434 ctx->error_message[0] = 0; | 489 ctx->error_message[0] = 0; |
435 | 490 |
436 ctx->source_count = 0; | 491 ctx->source_count = 0; |
437 | 492 |
493 ctx->n_deltas = 0; | |
494 ctx->deltas = malloc(0); | |
495 | |
438 return ctx; | 496 return ctx; |
439 } | 497 } |
440 | 498 |
441 void uw_set_db(uw_context ctx, void *db) { | 499 void uw_set_db(uw_context ctx, void *db) { |
442 ctx->db = db; | 500 ctx->db = db; |
445 void *uw_get_db(uw_context ctx) { | 503 void *uw_get_db(uw_context ctx) { |
446 return ctx->db; | 504 return ctx->db; |
447 } | 505 } |
448 | 506 |
449 void uw_free(uw_context ctx) { | 507 void uw_free(uw_context ctx) { |
508 size_t i; | |
509 | |
450 buf_free(&ctx->outHeaders); | 510 buf_free(&ctx->outHeaders); |
451 buf_free(&ctx->script); | 511 buf_free(&ctx->script); |
452 buf_free(&ctx->page); | 512 buf_free(&ctx->page); |
453 buf_free(&ctx->heap); | 513 buf_free(&ctx->heap); |
454 free(ctx->inputs); | 514 free(ctx->inputs); |
455 free(ctx->cleanup); | 515 free(ctx->cleanup); |
516 | |
517 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { | |
518 free(ctx->deltas[i].subscribed); | |
519 buf_free(&ctx->deltas[i].msgs); | |
520 } | |
521 | |
456 free(ctx); | 522 free(ctx); |
457 } | 523 } |
458 | 524 |
459 void uw_reset_keep_error_message(uw_context ctx) { | 525 void uw_reset_keep_error_message(uw_context ctx) { |
460 buf_reset(&ctx->outHeaders); | 526 buf_reset(&ctx->outHeaders); |
463 buf_reset(&ctx->page); | 529 buf_reset(&ctx->page); |
464 buf_reset(&ctx->heap); | 530 buf_reset(&ctx->heap); |
465 ctx->regions = NULL; | 531 ctx->regions = NULL; |
466 ctx->cleanup_front = ctx->cleanup; | 532 ctx->cleanup_front = ctx->cleanup; |
467 ctx->source_count = 0; | 533 ctx->source_count = 0; |
534 if (ctx->n_deltas > 0) | |
535 ctx->deltas[0].mode = UNUSED; | |
468 } | 536 } |
469 | 537 |
470 void uw_reset_keep_request(uw_context ctx) { | 538 void uw_reset_keep_request(uw_context ctx) { |
471 uw_reset_keep_error_message(ctx); | 539 uw_reset_keep_error_message(ctx); |
472 ctx->error_message[0] = 0; | 540 ctx->error_message[0] = 0; |
504 } | 572 } |
505 | 573 |
506 ctx->headers_end = s; | 574 ctx->headers_end = s; |
507 } | 575 } |
508 | 576 |
509 failure_kind uw_begin(uw_context ctx, char *path) { | 577 int uw_db_begin(uw_context); |
510 int r = setjmp(ctx->jmp_buf); | |
511 | |
512 if (r == 0) | |
513 uw_handle(ctx, path); | |
514 | |
515 return r; | |
516 } | |
517 | 578 |
518 __attribute__((noreturn)) void uw_error(uw_context ctx, failure_kind fk, const char *fmt, ...) { | 579 __attribute__((noreturn)) void uw_error(uw_context ctx, failure_kind fk, const char *fmt, ...) { |
519 cleanup *cl; | 580 cleanup *cl; |
520 | 581 |
521 va_list ap; | 582 va_list ap; |
544 } | 605 } |
545 | 606 |
546 ctx->cleanup_front->func = func; | 607 ctx->cleanup_front->func = func; |
547 ctx->cleanup_front->arg = arg; | 608 ctx->cleanup_front->arg = arg; |
548 ++ctx->cleanup_front; | 609 ++ctx->cleanup_front; |
610 } | |
611 | |
612 failure_kind uw_begin(uw_context ctx, char *path) { | |
613 int r = setjmp(ctx->jmp_buf); | |
614 | |
615 if (r == 0) { | |
616 if (uw_db_begin(ctx)) | |
617 uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); | |
618 uw_handle(ctx, path); | |
619 } | |
620 | |
621 return r; | |
549 } | 622 } |
550 | 623 |
551 void uw_pop_cleanup(uw_context ctx) { | 624 void uw_pop_cleanup(uw_context ctx) { |
552 if (ctx->cleanup_front == ctx->cleanup) | 625 if (ctx->cleanup_front == ctx->cleanup) |
553 uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); | 626 uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); |
1726 uw_write_header(ctx, "\r\n"); | 1799 uw_write_header(ctx, "\r\n"); |
1727 | 1800 |
1728 return uw_unit_v; | 1801 return uw_unit_v; |
1729 } | 1802 } |
1730 | 1803 |
1804 static channel_delta *allocate_delta(uw_context ctx, channel *ch) { | |
1805 size_t i; | |
1806 channel_delta *cd; | |
1807 | |
1808 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i); | |
1809 | |
1810 if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch) | |
1811 return &ctx->deltas[i]; | |
1812 | |
1813 if (i < ctx->n_deltas) | |
1814 cd = &ctx->deltas[i]; | |
1815 else { | |
1816 ++ctx->n_deltas; | |
1817 ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas); | |
1818 cd = &ctx->deltas[ctx->n_deltas-1]; | |
1819 } | |
1820 | |
1821 cd->mode = USED; | |
1822 cd->newness = OLD; | |
1823 cd->ch = ch; | |
1824 if (cd->n_subscribed > 0) | |
1825 cd->subscribed[0] = NULL; | |
1826 buf_reset(&cd->msgs); | |
1827 return cd; | |
1828 } | |
1829 | |
1731 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { | 1830 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { |
1732 return uw_new_channel()->id; | 1831 size_t i; |
1832 channel *ch = uw_new_channel(); | |
1833 ++ch->data.used.refcount; | |
1834 channel_delta *cd = allocate_delta(ctx, ch); | |
1835 | |
1836 cd->newness = NEW; | |
1837 | |
1838 return ch->id; | |
1839 } | |
1840 | |
1841 static int delta_used(channel_delta *cd) { | |
1842 return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]); | |
1733 } | 1843 } |
1734 | 1844 |
1735 uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { | 1845 uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { |
1736 channel *ch = uw_find_channel(chn); | 1846 channel *ch = uw_find_channel(chn); |
1737 | 1847 |
1740 else { | 1850 else { |
1741 size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); | 1851 size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); |
1742 int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); | 1852 int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); |
1743 client *c = uw_find_client(id); | 1853 client *c = uw_find_client(id); |
1744 | 1854 |
1745 if (c == NULL) | 1855 if (c == NULL) { |
1856 uw_release_channel(ch); | |
1746 uw_error(ctx, FATAL, "Unknown client ID in subscription request"); | 1857 uw_error(ctx, FATAL, "Unknown client ID in subscription request"); |
1747 else if (c->data.used.pass != pass) | 1858 } else if (c->data.used.pass != pass) { |
1859 uw_release_channel(ch); | |
1860 uw_release_client(c); | |
1748 uw_error(ctx, FATAL, "Wrong client password in subscription request"); | 1861 uw_error(ctx, FATAL, "Wrong client password in subscription request"); |
1749 else | 1862 } else { |
1750 uw_subscribe(ch, c); | 1863 size_t i; |
1751 } | 1864 channel_delta *cd = allocate_delta(ctx, ch); |
1865 | |
1866 if (delta_used(cd)) | |
1867 uw_release_channel(ch); | |
1868 | |
1869 for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i); | |
1870 | |
1871 if (i < cd->n_subscribed) | |
1872 cd->subscribed[i] = c; | |
1873 else { | |
1874 ++cd->n_subscribed; | |
1875 cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed); | |
1876 cd->subscribed[cd->n_subscribed-1] = c; | |
1877 } | |
1878 } | |
1879 } | |
1880 | |
1881 return uw_unit_v; | |
1752 } | 1882 } |
1753 | 1883 |
1754 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { | 1884 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { |
1755 channel *ch = uw_find_channel(chn); | 1885 channel *ch = uw_find_channel(chn); |
1756 | 1886 |
1757 if (ch == NULL) | 1887 if (ch == NULL) |
1758 uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); | 1888 uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); |
1759 else | 1889 else { |
1760 uw_channel_send(ch, msg); | 1890 channel_delta *cd = allocate_delta(ctx, ch); |
1761 } | 1891 if (delta_used(cd)) |
1892 uw_release_channel(ch); | |
1893 buf_append(&cd->msgs, msg, strlen(msg)); | |
1894 } | |
1895 | |
1896 return uw_unit_v; | |
1897 } | |
1898 | |
1899 int uw_db_commit(uw_context); | |
1900 int uw_db_rollback(uw_context); | |
1901 | |
1902 void uw_commit(uw_context ctx) { | |
1903 size_t i, j; | |
1904 | |
1905 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { | |
1906 channel *ch = ctx->deltas[i].ch; | |
1907 | |
1908 for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { | |
1909 client *c = ctx->deltas[i].subscribed[j]; | |
1910 | |
1911 uw_subscribe(ch, c); | |
1912 uw_release_client(c); | |
1913 } | |
1914 | |
1915 if (buf_used(&ctx->deltas[i].msgs) > 0) { | |
1916 uw_channel_send(ch, ctx->deltas[i].msgs.start); | |
1917 } | |
1918 | |
1919 uw_release_channel(ch); | |
1920 } | |
1921 | |
1922 if (uw_db_commit(ctx)) | |
1923 uw_error(ctx, FATAL, "Error running SQL COMMIT"); | |
1924 } | |
1925 | |
1926 int uw_rollback(uw_context ctx) { | |
1927 size_t i, j; | |
1928 | |
1929 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { | |
1930 channel *ch = ctx->deltas[i].ch; | |
1931 | |
1932 for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { | |
1933 client *c = ctx->deltas[i].subscribed[j]; | |
1934 | |
1935 uw_release_client(c); | |
1936 } | |
1937 | |
1938 if (ctx->deltas[i].newness == NEW) | |
1939 uw_free_channel(ch); | |
1940 else | |
1941 uw_release_channel(ch); | |
1942 } | |
1943 | |
1944 return uw_db_rollback(ctx); | |
1945 } |