# HG changeset patch # User Adam Chlipala # Date 1237920285 14400 # Node ID 729e65db2e2fa66a22fe5f3b9b1b7761d590e9db # Parent f73913d97a40f85414cce813cc1e87dc7b56a147 Transactionalize channel operations diff -r f73913d97a40 -r 729e65db2e2f include/urweb.h --- a/include/urweb.h Sun Mar 22 16:03:45 2009 -0400 +++ b/include/urweb.h Tue Mar 24 14:44:45 2009 -0400 @@ -22,6 +22,8 @@ failure_kind uw_begin_init(uw_context); void uw_set_headers(uw_context, char *headers); failure_kind uw_begin(uw_context, char *path); +void uw_commit(uw_context); +int uw_rollback(uw_context); __attribute__((noreturn)) void uw_error(uw_context, failure_kind, const char *fmt, ...); char *uw_error_message(uw_context); diff -r f73913d97a40 -r 729e65db2e2f lib/js/urweb.js --- a/lib/js/urweb.js Sun Mar 22 16:03:45 2009 -0400 +++ b/lib/js/urweb.js Tue Mar 24 14:44:45 2009 -0400 @@ -246,7 +246,7 @@ if (isok) { var lines = xhr.responseText.split("\n"); if (lines.length < 2) - whine("Empty message from remote server"); + throw "Empty message from remote server"; for (var i = 0; i+1 < lines.length; i += 2) { var chn = lines[i]; diff -r f73913d97a40 -r 729e65db2e2f src/c/driver.c --- a/src/c/driver.c Sun Mar 22 16:03:45 2009 -0400 +++ b/src/c/driver.c Tue Mar 24 14:44:45 2009 -0400 @@ -53,12 +53,8 @@ #define MAX_RETRIES 5 -int uw_db_begin(uw_context); -int uw_db_commit(uw_context); -int uw_db_rollback(uw_context); - static int try_rollback(uw_context ctx) { - int r = uw_db_rollback(ctx); + int r = uw_rollback(ctx); if (r) { printf("Error running SQL ROLLBACK\n"); @@ -206,36 +202,12 @@ printf("Serving URI %s....\n", path); while (1) { - if (uw_db_begin(ctx)) { - printf("Error running SQL BEGIN\n"); - if (retries_left) - --retries_left; - else { - fk = FATAL; - uw_reset(ctx); - uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r"); - uw_write_header(ctx, "Content-type: text/plain\r\n\r\n"); - uw_write(ctx, "Error running SQL BEGIN\n"); - - break; - } - } - uw_write_header(ctx, "HTTP/1.1 200 OK\r\n"); strcpy(path_copy, path); fk = uw_begin(ctx, path_copy); if (fk == SUCCESS) { - if (uw_db_commit(ctx)) { - fk = FATAL; - - printf("Error running SQL COMMIT\n"); - uw_reset(ctx); - uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r"); - uw_write_header(ctx, "Content-type: text/plain\r\n"); - uw_write(ctx, "Error running SQL COMMIT\n"); - } - + uw_commit(ctx); break; } else if (fk == BOUNDED_RETRY) { if (retries_left) { diff -r f73913d97a40 -r 729e65db2e2f src/c/urweb.c --- a/src/c/urweb.c Sun Mar 22 16:03:45 2009 -0400 +++ b/src/c/urweb.c Tue Mar 24 14:44:45 2009 -0400 @@ -98,6 +98,7 @@ buf msgs; int sock; time_t last_contact; + unsigned refcount; } used; } data; } client; @@ -129,6 +130,7 @@ c->data.used.pass = rand(); c->data.used.sock = -1; c->data.used.last_contact = time(NULL); + c->data.used.refcount = 0; buf_init(&c->data.used.msgs, 0); pthread_mutex_unlock(&clients_mutex); @@ -154,11 +156,20 @@ pthread_mutex_unlock(&clients_mutex); return NULL; } - + + pthread_mutex_lock(&c->data.used.lock); + ++c->data.used.refcount; + pthread_mutex_unlock(&c->data.used.lock); pthread_mutex_unlock(&clients_mutex); return c; } +static void uw_release_client(client *c) { + pthread_mutex_lock(&c->data.used.lock); + --c->data.used.refcount; + pthread_mutex_unlock(&c->data.used.lock); +} + void uw_client_connect(size_t id, int pass, int sock) { client *c = uw_find_client(id); @@ -168,6 +179,8 @@ return; } + uw_release_client(c); + pthread_mutex_lock(&c->data.used.lock); if (pass != c->data.used.pass) { @@ -219,7 +232,8 @@ pthread_mutex_lock(&clients_mutex); for (i = 0; i < n_clients; ++i) { - if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff) + if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff + && clients[i]->data.used.refcount == 0) uw_free_client(clients[i]); } @@ -242,6 +256,7 @@ struct { pthread_mutex_t lock; client_list *clients; + unsigned refcount; } used; } data; } channel; @@ -271,25 +286,53 @@ ch->mode = USED; pthread_mutex_init(&ch->data.used.lock, NULL); ch->data.used.clients = NULL; + ch->data.used.refcount = 0; pthread_mutex_unlock(&channels_mutex); return ch; } +static void uw_free_channel(channel *ch) { + if (ch->mode == USED) { + client_list *cs; + + for (cs = ch->data.used.clients; cs; ) { + client_list *tmp = cs->next; + free(cs); + cs = tmp; + } + pthread_mutex_destroy(&ch->data.used.lock); + ch->mode = UNUSED; + ch->data.next = channels_free; + channels_free = ch; + } +} + static channel *uw_find_channel(size_t id) { channel *ch = NULL; pthread_mutex_lock(&channels_mutex); - if (id < n_channels && channels[id]->mode == USED) + if (id < n_channels && channels[id]->mode == USED) { ch = channels[id]; + pthread_mutex_lock(&ch->data.used.lock); + ++ch->data.used.refcount; + pthread_mutex_unlock(&ch->data.used.lock); + } + pthread_mutex_unlock(&channels_mutex); return ch; } +static void uw_release_channel(channel *ch) { + pthread_mutex_lock(&ch->data.used.lock); + ++ch->data.used.refcount; + pthread_mutex_unlock(&ch->data.used.lock); +} + static void uw_subscribe(channel *ch, client *c) { client_list *cs = malloc(sizeof(client_list)); @@ -342,7 +385,6 @@ pthread_mutex_lock(&c->data.used.lock); if (c->data.used.sock != -1) { - printf("Immediate send\n"); uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); uw_really_send(c->data.used.sock, pre, preLen); uw_really_send(c->data.used.sock, msg, len); @@ -350,7 +392,6 @@ close(c->data.used.sock); c->data.used.sock = -1; } else { - printf("Delayed send\n"); buf_append(&c->data.used.msgs, pre, preLen); buf_append(&c->data.used.msgs, msg, len); buf_append(&c->data.used.msgs, "\n", 1); @@ -386,6 +427,17 @@ void *arg; } cleanup; +typedef struct { + usage mode; + channel *ch; + enum { OLD, NEW } newness; + + size_t n_subscribed; + client **subscribed; + + buf msgs; +} channel_delta; + struct uw_context { char *headers, *headers_end; @@ -404,6 +456,9 @@ const char *script_header, *url_prefix; + size_t n_deltas; + channel_delta *deltas; + char error_message[ERROR_BUF_LEN]; }; @@ -435,6 +490,9 @@ ctx->source_count = 0; + ctx->n_deltas = 0; + ctx->deltas = malloc(0); + return ctx; } @@ -447,12 +505,20 @@ } void uw_free(uw_context ctx) { + size_t i; + buf_free(&ctx->outHeaders); buf_free(&ctx->script); buf_free(&ctx->page); buf_free(&ctx->heap); free(ctx->inputs); free(ctx->cleanup); + + for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { + free(ctx->deltas[i].subscribed); + buf_free(&ctx->deltas[i].msgs); + } + free(ctx); } @@ -465,6 +531,8 @@ ctx->regions = NULL; ctx->cleanup_front = ctx->cleanup; ctx->source_count = 0; + if (ctx->n_deltas > 0) + ctx->deltas[0].mode = UNUSED; } void uw_reset_keep_request(uw_context ctx) { @@ -506,14 +574,7 @@ ctx->headers_end = s; } -failure_kind uw_begin(uw_context ctx, char *path) { - int r = setjmp(ctx->jmp_buf); - - if (r == 0) - uw_handle(ctx, path); - - return r; -} +int uw_db_begin(uw_context); __attribute__((noreturn)) void uw_error(uw_context ctx, failure_kind fk, const char *fmt, ...) { cleanup *cl; @@ -548,6 +609,18 @@ ++ctx->cleanup_front; } +failure_kind uw_begin(uw_context ctx, char *path) { + int r = setjmp(ctx->jmp_buf); + + if (r == 0) { + if (uw_db_begin(ctx)) + uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); + uw_handle(ctx, path); + } + + return r; +} + void uw_pop_cleanup(uw_context ctx) { if (ctx->cleanup_front == ctx->cleanup) uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); @@ -1728,8 +1801,45 @@ return uw_unit_v; } +static channel_delta *allocate_delta(uw_context ctx, channel *ch) { + size_t i; + channel_delta *cd; + + for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i); + + if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch) + return &ctx->deltas[i]; + + if (i < ctx->n_deltas) + cd = &ctx->deltas[i]; + else { + ++ctx->n_deltas; + ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas); + cd = &ctx->deltas[ctx->n_deltas-1]; + } + + cd->mode = USED; + cd->newness = OLD; + cd->ch = ch; + if (cd->n_subscribed > 0) + cd->subscribed[0] = NULL; + buf_reset(&cd->msgs); + return cd; +} + uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { - return uw_new_channel()->id; + size_t i; + channel *ch = uw_new_channel(); + ++ch->data.used.refcount; + channel_delta *cd = allocate_delta(ctx, ch); + + cd->newness = NEW; + + return ch->id; +} + +static int delta_used(channel_delta *cd) { + return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]); } uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { @@ -1742,13 +1852,33 @@ int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); client *c = uw_find_client(id); - if (c == NULL) + if (c == NULL) { + uw_release_channel(ch); uw_error(ctx, FATAL, "Unknown client ID in subscription request"); - else if (c->data.used.pass != pass) + } else if (c->data.used.pass != pass) { + uw_release_channel(ch); + uw_release_client(c); uw_error(ctx, FATAL, "Wrong client password in subscription request"); - else - uw_subscribe(ch, c); + } else { + size_t i; + channel_delta *cd = allocate_delta(ctx, ch); + + if (delta_used(cd)) + uw_release_channel(ch); + + for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i); + + if (i < cd->n_subscribed) + cd->subscribed[i] = c; + else { + ++cd->n_subscribed; + cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed); + cd->subscribed[cd->n_subscribed-1] = c; + } + } } + + return uw_unit_v; } uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { @@ -1756,6 +1886,60 @@ if (ch == NULL) uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); - else - uw_channel_send(ch, msg); + else { + channel_delta *cd = allocate_delta(ctx, ch); + if (delta_used(cd)) + uw_release_channel(ch); + buf_append(&cd->msgs, msg, strlen(msg)); + } + + return uw_unit_v; } + +int uw_db_commit(uw_context); +int uw_db_rollback(uw_context); + +void uw_commit(uw_context ctx) { + size_t i, j; + + for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { + channel *ch = ctx->deltas[i].ch; + + for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { + client *c = ctx->deltas[i].subscribed[j]; + + uw_subscribe(ch, c); + uw_release_client(c); + } + + if (buf_used(&ctx->deltas[i].msgs) > 0) { + uw_channel_send(ch, ctx->deltas[i].msgs.start); + } + + uw_release_channel(ch); + } + + if (uw_db_commit(ctx)) + uw_error(ctx, FATAL, "Error running SQL COMMIT"); +} + +int uw_rollback(uw_context ctx) { + size_t i, j; + + for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { + channel *ch = ctx->deltas[i].ch; + + for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { + client *c = ctx->deltas[i].subscribed[j]; + + uw_release_client(c); + } + + if (ctx->deltas[i].newness == NEW) + uw_free_channel(ch); + else + uw_release_channel(ch); + } + + return uw_db_rollback(ctx); +}