# HG changeset patch # User Adam Chlipala # Date 1238341049 14400 # Node ID 5bbb542243e83556bbc82ba047ad6b1bb3c7bd60 # Parent 6c9b8875f34743bd0f6457b6192755413015a3a3 Redo channels, making them single-client diff -r 6c9b8875f347 -r 5bbb542243e8 include/types.h --- a/include/types.h Sat Mar 28 11:15:42 2009 -0400 +++ b/include/types.h Sun Mar 29 11:37:29 2009 -0400 @@ -17,7 +17,11 @@ typedef uw_Basis_string uw_Basis_xhtml; typedef uw_Basis_string uw_Basis_page; -typedef size_t uw_Basis_channel; + +typedef unsigned uw_Basis_client; +typedef struct { + unsigned cli, chn; +} uw_Basis_channel; typedef enum { SUCCESS, FATAL, BOUNDED_RETRY, UNLIMITED_RETRY } failure_kind; diff -r 6c9b8875f347 -r 5bbb542243e8 include/urweb.h --- a/include/urweb.h Sat Mar 28 11:15:42 2009 -0400 +++ b/include/urweb.h Sun Mar 29 11:37:29 2009 -0400 @@ -8,7 +8,7 @@ void uw_global_init(void); -void uw_client_connect(size_t id, int pass, int sock); +void uw_client_connect(unsigned id, int pass, int sock); void uw_prune_clients(time_t timeout); uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len); @@ -22,6 +22,7 @@ failure_kind uw_begin_init(uw_context); void uw_set_headers(uw_context, char *headers); failure_kind uw_begin(uw_context, char *path); +void uw_login(uw_context); void uw_commit(uw_context); int uw_rollback(uw_context); @@ -55,7 +56,6 @@ char *uw_Basis_htmlifyString(uw_context, uw_Basis_string); char *uw_Basis_htmlifyBool(uw_context, uw_Basis_bool); char *uw_Basis_htmlifyTime(uw_context, uw_Basis_time); -char *uw_Basis_htmlifyChannel(uw_context, uw_Basis_channel); uw_unit uw_Basis_htmlifyInt_w(uw_context, uw_Basis_int); uw_unit uw_Basis_htmlifyFloat_w(uw_context, uw_Basis_float); @@ -66,7 +66,9 @@ char *uw_Basis_attrifyInt(uw_context, uw_Basis_int); char *uw_Basis_attrifyFloat(uw_context, uw_Basis_float); char *uw_Basis_attrifyString(uw_context, uw_Basis_string); +char *uw_Basis_attrifyTime(uw_context, uw_Basis_time); char *uw_Basis_attrifyChannel(uw_context, uw_Basis_channel); +char *uw_Basis_attrifyClient(uw_context, uw_Basis_client); uw_unit uw_Basis_attrifyInt_w(uw_context, uw_Basis_int); uw_unit uw_Basis_attrifyFloat_w(uw_context, uw_Basis_float); @@ -90,7 +92,6 @@ uw_Basis_string uw_Basis_unurlifyString(uw_context, char **); uw_Basis_bool uw_Basis_unurlifyBool(uw_context, char **); uw_Basis_time uw_Basis_unurlifyTime(uw_context, char **); -uw_Basis_channel uw_Basis_unurlifyChannel(uw_context, char **); uw_Basis_string uw_Basis_strcat(uw_context, const char *, const char *); uw_Basis_string uw_Basis_strdup(uw_context, const char *); @@ -102,6 +103,7 @@ uw_Basis_string uw_Basis_sqlifyBool(uw_context, uw_Basis_bool); uw_Basis_string uw_Basis_sqlifyTime(uw_context, uw_Basis_time); uw_Basis_string uw_Basis_sqlifyChannel(uw_context, uw_Basis_channel); +uw_Basis_string uw_Basis_sqlifyClient(uw_context, uw_Basis_client); uw_Basis_string uw_Basis_sqlifyIntN(uw_context, uw_Basis_int*); uw_Basis_string uw_Basis_sqlifyFloatN(uw_context, uw_Basis_float*); @@ -112,6 +114,7 @@ char *uw_Basis_ensqlBool(uw_Basis_bool); char *uw_Basis_jsifyString(uw_context, uw_Basis_string); +char *uw_Basis_jsifyChannel(uw_context, uw_Basis_channel); uw_Basis_string uw_Basis_intToString(uw_context, uw_Basis_int); uw_Basis_string uw_Basis_floatToString(uw_context, uw_Basis_float); @@ -122,13 +125,13 @@ uw_Basis_float *uw_Basis_stringToFloat(uw_context, uw_Basis_string); uw_Basis_bool *uw_Basis_stringToBool(uw_context, uw_Basis_string); uw_Basis_time *uw_Basis_stringToTime(uw_context, uw_Basis_string); -uw_Basis_channel *uw_Basis_stringToChannel(uw_context, uw_Basis_string); uw_Basis_int uw_Basis_stringToInt_error(uw_context, uw_Basis_string); uw_Basis_float uw_Basis_stringToFloat_error(uw_context, uw_Basis_string); uw_Basis_bool uw_Basis_stringToBool_error(uw_context, uw_Basis_string); uw_Basis_time uw_Basis_stringToTime_error(uw_context, uw_Basis_string); uw_Basis_channel uw_Basis_stringToChannel_error(uw_context, uw_Basis_string); +uw_Basis_client uw_Basis_stringToClient_error(uw_context, uw_Basis_string); uw_Basis_string uw_Basis_requestHeader(uw_context, uw_Basis_string); @@ -138,5 +141,6 @@ uw_unit uw_Basis_set_cookie(uw_context, uw_Basis_string prefix, uw_Basis_string c, uw_Basis_string v); uw_Basis_channel uw_Basis_new_channel(uw_context, uw_unit); -uw_unit uw_Basis_subscribe(uw_context, uw_Basis_channel); uw_unit uw_Basis_send(uw_context, uw_Basis_channel, uw_Basis_string); + +uw_Basis_client uw_Basis_self(uw_context, uw_unit); diff -r 6c9b8875f347 -r 5bbb542243e8 lib/js/urweb.js --- a/lib/js/urweb.js Sat Mar 28 11:15:42 2009 -0400 +++ b/lib/js/urweb.js Sun Mar 29 11:37:29 2009 -0400 @@ -301,6 +301,9 @@ } function rv(chn, parse, k) { + if (chn == null) + return; + if (chn < 0) whine("Out-of-bounds channel receive"); diff -r 6c9b8875f347 -r 5bbb542243e8 lib/ur/basis.urs --- a/lib/ur/basis.urs Sat Mar 28 11:15:42 2009 -0400 +++ b/lib/ur/basis.urs Sun Mar 29 11:37:29 2009 -0400 @@ -115,6 +115,9 @@ val send : t ::: Type -> channel t -> t -> transaction unit val recv : t ::: Type -> channel t -> transaction t +type client +val self : transaction client + (** SQL *) @@ -207,6 +210,7 @@ class sql_injectable_nullable val sql_channel : t ::: Type -> sql_injectable_nullable (channel t) +val sql_client : sql_injectable_nullable client class sql_injectable val sql_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable t diff -r 6c9b8875f347 -r 5bbb542243e8 lib/ur/top.ur --- a/lib/ur/top.ur Sat Mar 28 11:15:42 2009 -0400 +++ b/lib/ur/top.ur Sun Mar 29 11:37:29 2009 -0400 @@ -143,6 +143,14 @@ {f [nm] [t] [rest] ! r1 r2}{acc}) +fun queryI (tables ::: {{Type}}) (exps ::: {Type}) + [tables ~ exps] (q : sql_query tables exps) + (f : $(exps ++ map (fn fields :: {Type} => $fields) tables) + -> transaction unit) = + query q + (fn fs _ => f fs) + () + fun queryX (tables ::: {{Type}}) (exps ::: {Type}) (ctx ::: {Unit}) [tables ~ exps] (q : sql_query tables exps) (f : $(exps ++ map (fn fields :: {Type} => $fields) tables) @@ -188,3 +196,39 @@ case e2 of None => (SQL {e1} IS NULL) | Some _ => sql_binary sql_eq e1 (sql_inject e2) + + +functor Broadcast(M : sig type t end) = struct + sequence s + table t : {Id : int, Client : option client, Channel : option (channel M.t)} + + type topic = int + + val inj : sql_injectable topic = _ + + val create = nextval s + + val cleanup = + dml (DELETE FROM t WHERE Client IS NULL) + + fun subscribe id = + cli <- self; + cleanup; + ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[Some cli]}); + case ro of + None => + ch <- channel; + dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[Some cli]}, {[Some ch]})); + return ch + | Some r => + case r.T.Channel of + None => error Broadcast.subscribe: Got null result + | Some ch => return ch + + fun send id msg = + cleanup; + queryI (SELECT t.Channel FROM t WHERE t.Id = {[id]}) + (fn r => case r.T.Channel of + None => error Broadcast.send: Got null result + | Some ch => Basis.send ch msg) +end diff -r 6c9b8875f347 -r 5bbb542243e8 lib/ur/top.urs --- a/lib/ur/top.urs Sat Mar 28 11:15:42 2009 -0400 +++ b/lib/ur/top.urs Sun Mar 29 11:37:29 2009 -0400 @@ -87,6 +87,13 @@ -> r :: {K} -> folder r -> $(map tf1 r) -> $(map tf2 r) -> xml ctx [] [] +val queryI : tables ::: {{Type}} -> exps ::: {Type} + -> [tables ~ exps] => + sql_query tables exps + -> ($(exps ++ map (fn fields :: {Type} => $fields) tables) + -> transaction unit) + -> transaction unit + val queryX : tables ::: {{Type}} -> exps ::: {Type} -> ctx ::: {Unit} -> [tables ~ exps] => sql_query tables exps @@ -127,3 +134,14 @@ -> sql_exp tables agg exps (option t) -> option t -> sql_exp tables agg exps bool + + +functor Broadcast(M : sig type t end) : sig + type topic + + val inj : sql_injectable topic + + val create : transaction topic + val subscribe : topic -> transaction (channel M.t) + val send : topic -> M.t -> transaction unit +end diff -r 6c9b8875f347 -r 5bbb542243e8 src/c/driver.c --- a/src/c/driver.c Sat Mar 28 11:15:42 2009 -0400 +++ b/src/c/driver.c Sun Mar 29 11:37:29 2009 -0400 @@ -171,10 +171,10 @@ char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass"); if (id && pass) { - size_t idn = atoi(id); + unsigned idn = atoi(id); uw_client_connect(idn, atoi(pass), sock); dont_close = 1; - fprintf(stderr, "Processed request for messages by client %d\n\n", (int)idn); + fprintf(stderr, "Processed request for messages by client %u\n\n", idn); } break; } @@ -217,6 +217,8 @@ else { printf("Fatal error (out of retries): %s\n", uw_error_message(ctx)); + try_rollback(ctx); + uw_reset_keep_error_message(ctx); uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r"); uw_write_header(ctx, "Content-type: text/plain\r\n"); @@ -224,8 +226,6 @@ uw_write(ctx, uw_error_message(ctx)); uw_write(ctx, "\n"); - try_rollback(ctx); - break; } } else if (fk == UNLIMITED_RETRY) @@ -233,6 +233,8 @@ else if (fk == FATAL) { printf("Fatal error: %s\n", uw_error_message(ctx)); + try_rollback(ctx); + uw_reset_keep_error_message(ctx); uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\r\n"); uw_write_header(ctx, "Content-type: text/html\r\n"); @@ -241,26 +243,24 @@ uw_write(ctx, uw_error_message(ctx)); uw_write(ctx, "\n"); - try_rollback(ctx); - break; } else { printf("Unknown uw_handle return code!\n"); + try_rollback(ctx); + uw_reset_keep_request(ctx); uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r"); uw_write_header(ctx, "Content-type: text/plain\r\n"); uw_write(ctx, "Unknown uw_handle return code!\n"); - try_rollback(ctx); - break; } - uw_reset_keep_request(ctx); - if (try_rollback(ctx)) break; + + uw_reset_keep_request(ctx); } uw_send(ctx, sock); diff -r 6c9b8875f347 -r 5bbb542243e8 src/c/urweb.c --- a/src/c/urweb.c Sat Mar 28 11:15:42 2009 -0400 +++ b/src/c/urweb.c Sun Mar 29 11:37:29 2009 -0400 @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -88,79 +89,56 @@ typedef enum { UNUSED, USED } usage; -typedef struct channel_list { - struct channel *data; - struct channel_list *next; -} channel_list; - typedef struct client { - size_t id; + unsigned id; usage mode; - union { - struct client *next; - struct { - pthread_mutex_t lock; - int pass; - buf msgs; - int sock; - time_t last_contact; - unsigned refcount; - channel_list *channels; - } used; - } data; + int pass; + struct client *next; + pthread_mutex_t lock; + buf msgs; + int sock; + time_t last_contact; + unsigned n_channels; } client; -typedef struct client_list { - client *data; - struct client_list *next; -} client_list; - -typedef struct channel { - size_t id; - usage mode; - union { - struct channel *next; - struct { - pthread_mutex_t lock; - client_list *clients; - unsigned refcount; - } used; - } data; -} channel; - // Persistent client state -static client **clients, *clients_free; -static size_t n_clients; +static client **clients, *clients_free, *clients_used; +static unsigned n_clients; static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; -static client *uw_new_client() { +static client *new_client() { client *c; pthread_mutex_lock(&clients_mutex); if (clients_free) { c = clients_free; - clients_free = clients_free->data.next; + clients_free = clients_free->next; } else { ++n_clients; clients = realloc(clients, sizeof(client) * n_clients); c = malloc(sizeof(client)); c->id = n_clients-1; + pthread_mutex_init(&c->lock, NULL); + buf_init(&c->msgs, 0); clients[n_clients-1] = c; } + pthread_mutex_lock(&c->lock); c->mode = USED; - pthread_mutex_init(&c->data.used.lock, NULL); - c->data.used.pass = rand(); - c->data.used.sock = -1; - c->data.used.last_contact = time(NULL); - buf_init(&c->data.used.msgs, 0); - c->data.used.refcount = 0; - c->data.used.channels = NULL; + c->pass = rand(); + c->sock = -1; + c->last_contact = time(NULL); + buf_reset(&c->msgs); + c->n_channels = 0; + pthread_mutex_unlock(&c->lock); + + c->next = clients_used; + clients_used = c; pthread_mutex_unlock(&clients_mutex); @@ -169,7 +147,7 @@ static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; -static client *uw_find_client(size_t id) { +static client *find_client(unsigned id) { client *c; pthread_mutex_lock(&clients_mutex); @@ -181,282 +159,111 @@ c = clients[id]; - if (c->mode != USED) { - pthread_mutex_unlock(&clients_mutex); - return NULL; - } - - pthread_mutex_lock(&c->data.used.lock); - ++c->data.used.refcount; - pthread_mutex_unlock(&c->data.used.lock); pthread_mutex_unlock(&clients_mutex); return c; } -static void uw_release_client(client *c) { - pthread_mutex_lock(&c->data.used.lock); - --c->data.used.refcount; - pthread_mutex_unlock(&c->data.used.lock); -} - -void uw_client_connect(size_t id, int pass, int sock) { - client *c = uw_find_client(id); +void uw_client_connect(unsigned id, int pass, int sock) { + client *c = find_client(id); if (c == NULL) { close(sock); - fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); + fprintf(stderr, "Out-of-bounds client request (%u)\n", id); return; } - uw_release_client(c); + pthread_mutex_lock(&c->lock); - pthread_mutex_lock(&c->data.used.lock); - - if (pass != c->data.used.pass) { - pthread_mutex_unlock(&c->data.used.lock); + if (c->mode != USED) { + pthread_mutex_unlock(&c->lock); close(sock); - fprintf(stderr, "Wrong client password (%d)\n", (int)id); + fprintf(stderr, "Client request for unused slot (%u)\n", id); return; } - if (c->data.used.sock != -1) { - close(c->data.used.sock); - c->data.used.sock = -1; + if (pass != c->pass) { + pthread_mutex_unlock(&c->lock); + close(sock); + fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass); + return; } - c->data.used.last_contact = time(NULL); + if (c->sock != -1) { + close(c->sock); + c->sock = -1; + } - if (buf_used(&c->data.used.msgs) > 0) { + c->last_contact = time(NULL); + + if (buf_used(&c->msgs) > 0) { uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); - buf_reset(&c->data.used.msgs); + uw_really_send(sock, c->msgs.start, buf_used(&c->msgs)); + buf_reset(&c->msgs); close(sock); } else - c->data.used.sock = sock; + c->sock = sock; - pthread_mutex_unlock(&c->data.used.lock); + pthread_mutex_unlock(&c->lock); } - -static void uw_free_client(client *c) { - channel_list *chs; - +static void free_client(client *c) { printf("Freeing client %d\n", c->id); - if (c->mode == USED) { - pthread_mutex_lock(&c->data.used.lock); + c->mode = UNUSED; + c->pass = -1; - for (chs = c->data.used.channels; chs; ) { - client_list *prev, *cs; - - channel *ch = chs->data; - channel_list *tmp = chs->next; - free(chs); - chs = tmp; - - pthread_mutex_lock(&ch->data.used.lock); - for (prev = NULL, cs = ch->data.used.clients; cs; ) { - if (cs->data == c) { - client_list *tmp = cs->next; - free(cs); - cs = tmp; - if (prev) - prev->next = cs; - else - ch->data.used.clients = cs; - } - else { - prev = cs; - cs = cs->next; - } - } - pthread_mutex_unlock(&ch->data.used.lock); - } - - if (c->data.used.sock != -1) - close(c->data.used.sock); - - pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_destroy(&c->data.used.lock); - buf_free(&c->data.used.msgs); - c->mode = UNUSED; - - c->data.next = clients_free; - clients_free = c; - } + c->next = clients_free; + clients_free = c; } extern int uw_timeout; void uw_prune_clients() { - size_t i; + client *c, *next, *prev = NULL; time_t cutoff; cutoff = time(NULL) - uw_timeout; pthread_mutex_lock(&clients_mutex); - for (i = 0; i < n_clients; ++i) { - if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff - && clients[i]->data.used.refcount == 0) - uw_free_client(clients[i]); + for (c = clients_used; c; c = next) { + next = c->next; + pthread_mutex_lock(&c->lock); + if (c->last_contact < cutoff) { + if (prev) + prev->next = next; + else + clients_used = next; + free_client(c); + } + else + prev = c; + pthread_mutex_unlock(&c->lock); } pthread_mutex_unlock(&clients_mutex); } - -// Persistent channel state - - -static channel **channels, *channels_free; -static size_t n_channels; - -static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER; - -static channel *uw_new_channel() { - channel *ch; - - pthread_mutex_lock(&channels_mutex); - - if (channels_free) { - ch = channels_free; - channels_free = channels_free->data.next; - } - else { - ++n_channels; - channels = realloc(channels, sizeof(channels) * n_channels); - ch = malloc(sizeof(channel)); - ch->id = n_channels-1; - channels[n_channels-1] = ch; - } - - ch->mode = USED; - pthread_mutex_init(&ch->data.used.lock, NULL); - ch->data.used.clients = NULL; - ch->data.used.refcount = 0; - - pthread_mutex_unlock(&channels_mutex); - +static uw_Basis_channel new_channel(client *c) { + uw_Basis_channel ch = {c->id, c->n_channels++}; return ch; } -static void uw_free_channel(channel *ch) { - if (ch->mode == USED) { - client_list *cs; +static void client_send(int already_locked, client *c, buf *msg) { + if (!already_locked) + pthread_mutex_lock(&c->lock); - for (cs = ch->data.used.clients; cs; ) { - client_list *tmp = cs->next; - free(cs); - cs = tmp; - } - pthread_mutex_destroy(&ch->data.used.lock); - ch->mode = UNUSED; - ch->data.next = channels_free; - channels_free = ch; - } -} + if (c->sock != -1) { + uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1); + uw_really_send(c->sock, msg->start, buf_used(msg)); + close(c->sock); + c->sock = -1; + } else + buf_append(&c->msgs, msg->start, buf_used(msg)); -static channel *uw_find_channel(size_t id) { - channel *ch = NULL; - - pthread_mutex_lock(&channels_mutex); - - if (id < n_channels && channels[id]->mode == USED) { - ch = channels[id]; - - pthread_mutex_lock(&ch->data.used.lock); - ++ch->data.used.refcount; - pthread_mutex_unlock(&ch->data.used.lock); - } - - pthread_mutex_unlock(&channels_mutex); - - return ch; -} - -static void uw_release_channel(channel *ch) { - pthread_mutex_lock(&ch->data.used.lock); - ++ch->data.used.refcount; - pthread_mutex_unlock(&ch->data.used.lock); -} - -static void uw_subscribe(channel *ch, client *c) { - client_list *cs; - - pthread_mutex_lock(&ch->data.used.lock); - - for (cs = ch->data.used.clients; cs; cs = cs->next) - if (cs->data == c) { - pthread_mutex_unlock(&ch->data.used.lock); - return; - } - - cs = malloc(sizeof(client_list)); - cs->data = c; - cs->next = ch->data.used.clients; - ch->data.used.clients = cs; - - pthread_mutex_unlock(&ch->data.used.lock); -} - -static void uw_unsubscribe(channel *ch, client *c) { - client_list *prev, *cur, *tmp; - - pthread_mutex_lock(&ch->data.used.lock); - - for (prev = NULL, cur = ch->data.used.clients; cur; ) { - if (cur->data == c) { - if (prev) - prev->next = cur->next; - else - ch->data.used.clients = cur->next; - tmp = cur; - cur = cur->next; - free(tmp); - } - else { - prev = cur; - cur = cur->next; - } - } - - pthread_mutex_unlock(&ch->data.used.lock); -} - -static void uw_channel_send(channel *ch, const char *msg) { - size_t len = strlen(msg), preLen; - char pre[INTS_MAX + 2]; - client_list *cs; - - sprintf(pre, "%d\n", (int)ch->id); - preLen = strlen(pre); - - pthread_mutex_lock(&ch->data.used.lock); - - for (cs = ch->data.used.clients; cs; cs = cs->next) { - client *c = cs->data; - - pthread_mutex_lock(&c->data.used.lock); - - if (c->data.used.sock != -1) { - uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(c->data.used.sock, pre, preLen); - uw_really_send(c->data.used.sock, msg, len); - uw_really_send(c->data.used.sock, "\n", 1); - close(c->data.used.sock); - c->data.used.sock = -1; - } else { - buf_append(&c->data.used.msgs, pre, preLen); - buf_append(&c->data.used.msgs, msg, len); - buf_append(&c->data.used.msgs, "\n", 1); - } - - pthread_mutex_unlock(&c->data.used.lock); - } - - pthread_mutex_unlock(&ch->data.used.lock); + if (!already_locked) + pthread_mutex_unlock(&c->lock); } @@ -466,7 +273,6 @@ srand(time(NULL) ^ getpid()); clients = malloc(0); - channels = malloc(0); } @@ -484,15 +290,9 @@ } cleanup; typedef struct { - usage mode; - channel *ch; - enum { OLD, NEW } newness; - - size_t n_subscribed; - client **subscribed; - + unsigned client; buf msgs; -} channel_delta; +} delta; struct uw_context { char *headers, *headers_end; @@ -512,11 +312,13 @@ const char *script_header, *url_prefix; - size_t n_deltas; - channel_delta *deltas; + size_t n_deltas, used_deltas; + delta *deltas; int timeout; + client *client; + char error_message[ERROR_BUF_LEN]; }; @@ -548,7 +350,7 @@ ctx->source_count = 0; - ctx->n_deltas = 0; + ctx->n_deltas = ctx->used_deltas = 0; ctx->deltas = malloc(0); ctx->timeout = uw_timeout; @@ -574,10 +376,8 @@ free(ctx->inputs); free(ctx->cleanup); - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - free(ctx->deltas[i].subscribed); + for (i = 0; i < ctx->n_deltas; ++i) buf_free(&ctx->deltas[i].msgs); - } free(ctx); } @@ -591,8 +391,8 @@ ctx->regions = NULL; ctx->cleanup_front = ctx->cleanup; ctx->source_count = 0; - if (ctx->n_deltas > 0) - ctx->deltas[0].mode = UNUSED; + ctx->used_deltas = 0; + ctx->client = NULL; } void uw_reset_keep_request(uw_context ctx) { @@ -669,18 +469,73 @@ ++ctx->cleanup_front; } +uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) { + int len = strlen(h); + char *s = ctx->headers, *p; + + while (p = strchr(s, ':')) { + if (p - s == len && !strncasecmp(s, h, len)) { + return p + 2; + } else { + if ((s = strchr(p, 0)) && s < ctx->headers_end) + s += 2; + else + return NULL; + } + } + + return NULL; +} + +void uw_login(uw_context ctx) { + if (ctx->script_header[0]) { + char *id_s, *pass_s; + + if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client")) + && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) { + unsigned id = atoi(id_s); + int pass = atoi(pass_s); + client *c = find_client(id); + + if (c == NULL) + uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s); + else { + pthread_mutex_lock(&c->lock); + ctx->client = c; + + if (c->mode != USED) + uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id); + if (c->pass != pass) + uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass); + } + } else { + client *c = new_client(); + pthread_mutex_lock(&c->lock); + ctx->client = c; + } + } +} + failure_kind uw_begin(uw_context ctx, char *path) { int r = setjmp(ctx->jmp_buf); if (r == 0) { if (uw_db_begin(ctx)) uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); + uw_handle(ctx, path); } return r; } +uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) { + if (ctx->client == NULL) + uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code"); + + return ctx->client->id; +} + void uw_pop_cleanup(uw_context ctx) { if (ctx->cleanup_front == ctx->cleanup) uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); @@ -843,9 +698,6 @@ if (ctx->script_header[0] == 0) return ""; else { - int pass; - client *c = uw_new_client(&pass); - char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script)); sprintf(r, "%s", ctx->script_header, @@ -855,16 +707,13 @@ } const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) { - if (ctx->script_header[0] == 0) + if (ctx->client == NULL) return ""; else { - int pass; - client *c = uw_new_client(&pass); - char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload)); - sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", - (int)c->id, - c->data.used.pass, + sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", + ctx->client->id, + ctx->client->pass, ctx->url_prefix, ctx->timeout, onload); @@ -942,6 +791,21 @@ return r; } +char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client == NULL || chn.cli != ctx->client->id) + return "null"; + else { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", chn.chn, &len); + ctx->heap.front += len+1; + return r; + } +} + uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) { int len; size_t s_len = strlen(s); @@ -1007,16 +871,6 @@ return result; } -char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) { - char *result; - int len; - uw_check_heap(ctx, INTS_MAX); - result = ctx->heap.front; - sprintf(result, "%lld%n", (long long)n, &len); - ctx->heap.front += len+1; - return result; -} - char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) { char *result; int len; @@ -1116,15 +970,19 @@ return r; } -char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) { - int len; - char *r; +char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client == NULL || chn.cli != ctx->client->id) + return ""; + else { + int len; + char *r; - uw_check_heap(ctx, INTS_MAX); - r = ctx->heap.front; - sprintf(r, "%lld%n", (long long)n, &len); - ctx->heap.front += len+1; - return r; + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", chn.chn, &len); + ctx->heap.front += len+1; + return r; + } } char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) { @@ -1182,13 +1040,15 @@ return uw_unit_v; } -uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) { - int len; - - uw_check(ctx, INTS_MAX); - sprintf(ctx->page.front, "%lld%n", (long long)n, &len); - ctx->page.front += len; - +uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client != NULL && chn.cli == ctx->client->id) { + int len; + + uw_check(ctx, INTS_MAX + 1); + sprintf(ctx->page.front, "%u%n", chn.chn, &len); + ctx->page.front += len; + } + return uw_unit_v; } @@ -1255,10 +1115,6 @@ return r; } -uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) { - return uw_Basis_unurlifyInt(ctx, s); -} - uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { char *new_s = uw_unurlify_advance(*s); uw_Basis_float r; @@ -1350,10 +1206,6 @@ return uw_unit_v; } -char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) { - return uw_Basis_htmlifyInt(ctx, ch); -} - char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { int len; char *r; @@ -1541,17 +1393,6 @@ return r; } -char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) { - int len; - char *r; - - uw_check_heap(ctx, INTS_MAX + 6); - r = ctx->heap.front; - sprintf(r, "%lld::int4%n", (long long)n, &len); - ctx->heap.front += len+1; - return r; -} - char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) { if (n == NULL) return "NULL"; @@ -1614,6 +1455,52 @@ return r; } +char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) { + int len; + char *r; + unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn; + + uw_check_heap(ctx, INTS_MAX + 7); + r = ctx->heap.front; + sprintf(r, "%lld::int8%n", combo, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) { + int len; + char *r; + unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%lld%n", combo, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 7); + r = ctx->heap.front; + sprintf(r, "%u::int4%n", cli, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", cli, &len); + ctx->heap.front += len+1; + return r; +} + uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) { if (s == NULL) return "NULL"; @@ -1637,6 +1524,21 @@ char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) { size_t len; + char *r, *s; + struct tm stm; + + if (localtime_r(&t, &stm)) { + s = uw_malloc(ctx, TIMES_MAX); + len = strftime(s, TIMES_MAX, TIME_FMT, &stm); + r = uw_malloc(ctx, len + 14); + sprintf(r, "'%s'::timestamp", s); + return r; + } else + return ""; +} + +char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) { + size_t len; char *r; struct tm stm; @@ -1723,18 +1625,6 @@ return NULL; } -uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) { - char *endptr; - uw_Basis_channel n = strtoll(s, &endptr, 10); - - if (*s != '\0' && *endptr == '\0') { - uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel)); - *r = n; - return r; - } else - return NULL; -} - uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) { char *endptr; uw_Basis_float n = strtod(s, &endptr); @@ -1802,14 +1692,27 @@ uw_error(ctx, FATAL, "Can't parse int: %s", s); } +#include + uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) { + unsigned long long n; + + if (sscanf(s, "%llu", &n) < 1) + uw_error(ctx, FATAL, "Can't parse channel: %s", s); + else { + uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)}; + return ch; + } +} + +uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) { char *endptr; - uw_Basis_channel n = strtoll(s, &endptr, 10); + unsigned long n = strtoul(s, &endptr, 10); if (*s != '\0' && *endptr == '\0') return n; else - uw_error(ctx, FATAL, "Can't parse channel int: %s", s); + uw_error(ctx, FATAL, "Can't parse client: %s", s); } uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) { @@ -1856,22 +1759,6 @@ } } -uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) { - int len = strlen(h); - char *s = ctx->headers, *p; - - while (p = strchr(s, ':')) { - if (p - s == len && !strncasecmp(s, h, len)) { - return p + 2; - } else { - if ((s = strchr(p, 0)) && s < ctx->headers_end) - s += 2; - else - return NULL; - } - } -} - uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) { int len = strlen(c); char *s = ctx->headers, *p = ctx->outHeaders.start; @@ -1930,100 +1817,45 @@ return uw_unit_v; } -static channel_delta *allocate_delta(uw_context ctx, channel *ch) { - size_t i; - channel_delta *cd; +static delta *allocate_delta(uw_context ctx, unsigned client) { + unsigned i; + delta *d; - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i); + for (i = 0; i < ctx->used_deltas; ++i) + if (ctx->deltas[i].client == client) + return &ctx->deltas[i]; - if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch) - return &ctx->deltas[i]; + if (ctx->used_deltas >= ctx->n_deltas) { + ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas); + buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0); + } - if (i < ctx->n_deltas) - cd = &ctx->deltas[i]; - else { - ++ctx->n_deltas; - ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas); - cd = &ctx->deltas[ctx->n_deltas-1]; - cd->n_subscribed = 0; - cd->subscribed = malloc(0); - buf_init(&cd->msgs, 0); - } - - cd->mode = USED; - cd->newness = OLD; - cd->ch = ch; - if (cd->n_subscribed > 0) - cd->subscribed[0] = NULL; - buf_reset(&cd->msgs); - return cd; + d = &ctx->deltas[ctx->used_deltas++]; + d->client = client; + buf_reset(&d->msgs); + return d; } uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { - size_t i; - channel *ch = uw_new_channel(); - ++ch->data.used.refcount; - channel_delta *cd = allocate_delta(ctx, ch); + if (ctx->client == NULL) + uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection"); - cd->newness = NEW; - - return ch->id; -} - -static int delta_used(channel_delta *cd) { - return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]); -} - -uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { - channel *ch = uw_find_channel(chn); - - if (ch == NULL) - uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); - else { - size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); - int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); - client *c = uw_find_client(id); - - if (c == NULL) { - uw_release_channel(ch); - uw_error(ctx, FATAL, "Unknown client ID in subscription request"); - } else if (c->data.used.pass != pass) { - uw_release_channel(ch); - uw_release_client(c); - uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass); - } else { - size_t i; - channel_delta *cd = allocate_delta(ctx, ch); - - if (delta_used(cd)) - uw_release_channel(ch); - - for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i); - - if (i < cd->n_subscribed) - cd->subscribed[i] = c; - else { - ++cd->n_subscribed; - cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed); - cd->subscribed[cd->n_subscribed-1] = c; - } - } - } - - return uw_unit_v; + return new_channel(ctx->client); } uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { - channel *ch = uw_find_channel(chn); + delta *d = allocate_delta(ctx, chn.cli); + size_t len; + int preLen; + char pre[INTS_MAX + 2]; - if (ch == NULL) - uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); - else { - channel_delta *cd = allocate_delta(ctx, ch); - if (delta_used(cd)) - uw_release_channel(ch); - buf_append(&cd->msgs, msg, strlen(msg)); - } + len = strlen(msg); + + sprintf(pre, "%u\n%n", chn.chn, &preLen); + + buf_append(&d->msgs, pre, preLen); + buf_append(&d->msgs, msg, len); + buf_append(&d->msgs, "\n", 1); return uw_unit_v; } @@ -2032,46 +1864,27 @@ int uw_db_rollback(uw_context); void uw_commit(uw_context ctx) { - size_t i, j; - - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - channel *ch = ctx->deltas[i].ch; - - for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { - client *c = ctx->deltas[i].subscribed[j]; - - uw_subscribe(ch, c); - uw_release_client(c); - } - - if (buf_used(&ctx->deltas[i].msgs) > 0) { - uw_channel_send(ch, ctx->deltas[i].msgs.start); - } - - uw_release_channel(ch); - } + unsigned i; if (uw_db_commit(ctx)) uw_error(ctx, FATAL, "Error running SQL COMMIT"); + + for (i = 0; i < ctx->used_deltas; ++i) { + delta *d = &ctx->deltas[i]; + client *c = find_client(d->client); + + assert (c != NULL && c->mode == USED); + + client_send(c == ctx->client, c, &d->msgs); + } + + if (ctx->client) + pthread_mutex_unlock(&ctx->client->lock); } int uw_rollback(uw_context ctx) { - size_t i, j; - - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - channel *ch = ctx->deltas[i].ch; - - for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { - client *c = ctx->deltas[i].subscribed[j]; - - uw_release_client(c); - } - - if (ctx->deltas[i].newness == NEW) - uw_free_channel(ch); - else - uw_release_channel(ch); - } + if (ctx->client) + pthread_mutex_unlock(&ctx->client->lock); return uw_db_rollback(ctx); } diff -r 6c9b8875f347 -r 5bbb542243e8 src/cjr_print.sml --- a/src/cjr_print.sml Sat Mar 28 11:15:42 2009 -0400 +++ b/src/cjr_print.sml Sun Mar 29 11:37:29 2009 -0400 @@ -404,6 +404,7 @@ | TFfi ("Basis", "bool") => box [string "uw_Basis_stringToBool_error(ctx, ", e, string ")"] | TFfi ("Basis", "time") => box [string "uw_Basis_stringToTime_error(ctx, ", e, string ")"] | TFfi ("Basis", "channel") => box [string "uw_Basis_stringToChannel_error(ctx, ", e, string ")"] + | TFfi ("Basis", "client") => box [string "uw_Basis_stringToClient_error(ctx, ", e, string ")"] | _ => (ErrorMsg.errorAt loc "Don't know how to unmarshal type from SQL"; Print.eprefaces' [("Type", p_typ env tAll)]; @@ -447,6 +448,7 @@ | Bool | Time | Channel + | Client | Nullable of sql_type fun p_sql_type' t = @@ -457,6 +459,7 @@ | Bool => "uw_Basis_bool" | Time => "uw_Basis_time" | Channel => "uw_Basis_channel" + | Client => "uw_Basis_client" | Nullable String => "uw_Basis_string" | Nullable t => p_sql_type' t ^ "*" @@ -473,6 +476,7 @@ | EFfiApp ("Basis", "sqlifyBool", [e]) => [(e, Bool)] | EFfiApp ("Basis", "sqlifyTime", [e]) => [(e, Time)] | EFfiApp ("Basis", "sqlifyChannel", [e]) => [(e, Channel)] + | EFfiApp ("Basis", "sqlifyClient", [e]) => [(e, Client)] | ECase (e, [((PNone _, _), @@ -496,8 +500,9 @@ | Float => box [string "uw_Basis_attrifyFloat(ctx, ", e, string ")"] | String => e | Bool => box [string "(", e, string " ? \"TRUE\" : \"FALSE\")"] - | Time => box [string "uw_Basis_sqlifyTime(ctx, ", e, string ")"] + | Time => box [string "uw_Basis_attrifyTime(ctx, ", e, string ")"] | Channel => box [string "uw_Basis_attrifyChannel(ctx, ", e, string ")"] + | Client => box [string "uw_Basis_attrifyClient(ctx, ", e, string ")"] | Nullable String => e | Nullable t => box [string "(", e, @@ -1982,7 +1987,7 @@ newline, string "PGconn *conn = uw_get_db(ctx);", newline, - string "PGresult *res = PQexec(conn, \"BEGIN\");", + string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");", newline, newline, string "if (res == NULL) return 1;", @@ -2108,7 +2113,8 @@ | TFfi ("Basis", "string") => "text" | TFfi ("Basis", "bool") => "bool" | TFfi ("Basis", "time") => "timestamp" - | TFfi ("Basis", "channel") => "int4" + | TFfi ("Basis", "channel") => "int8" + | TFfi ("Basis", "client") => "int4" | _ => (ErrorMsg.errorAt loc "Don't know SQL equivalent of type"; Print.eprefaces' [("Type", p_typ env tAll)]; "ERROR") @@ -2368,6 +2374,8 @@ string (!Monoize.urlPrefix), string "\");", newline]), + string "uw_login(ctx);", + newline, box [string "{", newline, box (ListUtil.mapi (fn (i, t) => box [p_typ env t, diff -r 6c9b8875f347 -r 5bbb542243e8 src/jscomp.sml --- a/src/jscomp.sml Sat Mar 28 11:15:42 2009 -0400 +++ b/src/jscomp.sml Sun Mar 29 11:37:29 2009 -0400 @@ -49,7 +49,6 @@ (("Basis", "urlifyInt"), "ts"), (("Basis", "urlifyFloat"), "ts"), (("Basis", "urlifyString"), "uf"), - (("Basis", "urlifyChannel"), "ts"), (("Basis", "recv"), "rv")] structure FM = BinaryMapFn(struct @@ -220,7 +219,7 @@ | TFfi ("Basis", "string") => ((EFfiApp ("Basis", "jsifyString", [e]), loc), st) | TFfi ("Basis", "int") => ((EFfiApp ("Basis", "htmlifyInt", [e]), loc), st) | TFfi ("Basis", "float") => ((EFfiApp ("Basis", "htmlifyFloat", [e]), loc), st) - | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "htmlifyChannel", [e]), loc), st) + | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "jsifyChannel", [e]), loc), st) | TFfi ("Basis", "bool") => ((ECase (e, [((PCon (Enum, PConFfi {mod = "Basis", @@ -348,7 +347,7 @@ | TFfi ("Basis", "string") => ("uu(t[i++])", st) | TFfi ("Basis", "int") => ("parseInt(t[i++])", st) | TFfi ("Basis", "float") => ("parseFloat(t[i++])", st) - | TFfi ("Basis", "channel") => ("parseInt(t[i++])", st) + | TFfi ("Basis", "channel") => ("(t[i++].length > 0 ? parseInt(t[i]) : null)", st) | TFfi ("Basis", "bool") => ("t[i++] == \"True\"", st) diff -r 6c9b8875f347 -r 5bbb542243e8 src/monoize.sml --- a/src/monoize.sml Sat Mar 28 11:15:42 2009 -0400 +++ b/src/monoize.sml Sun Mar 29 11:37:29 2009 -0400 @@ -1110,14 +1110,6 @@ ((L'.EAbs ("_", (L'.TRecord [], loc), (L'.TFfi ("Basis", "channel"), loc), (L'.EFfiApp ("Basis", "new_channel", [(L'.ERecord [], loc)]), loc)), loc), fm) - | L.ECApp ((L.EFfi ("Basis", "subscribe"), _), t) => - ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc), - (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc), - (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc), - (L'.EFfiApp ("Basis", "subscribe", - [(L'.ERel 1, loc)]), - loc)), loc)), loc), - fm) | L.ECApp ((L.EFfi ("Basis", "send"), _), t) => let val t = monoType env t @@ -1431,6 +1423,10 @@ ((L'.EAbs ("x", (L'.TFfi ("Basis", "channel"), loc), (L'.TFfi ("Basis", "string"), loc), (L'.EFfiApp ("Basis", "sqlifyChannel", [(L'.ERel 0, loc)]), loc)), loc), fm) + | L.EFfi ("Basis", "sql_client") => + ((L'.EAbs ("x", (L'.TFfi ("Basis", "client"), loc), (L'.TFfi ("Basis", "string"), loc), + (L'.EFfiApp ("Basis", "sqlifyClient", [(L'.ERel 0, loc)]), loc)), loc), + fm) | L.ECApp ((L.EFfi ("Basis", "sql_prim"), _), t) => let val t = monoType env t diff -r 6c9b8875f347 -r 5bbb542243e8 src/prepare.sml --- a/src/prepare.sml Sat Mar 28 11:15:42 2009 -0400 +++ b/src/prepare.sml Sun Mar 29 11:37:29 2009 -0400 @@ -48,6 +48,8 @@ | EFfiApp ("Basis", "sqlifyTime", [e]) => SOME ("$" ^ Int.toString (n + 1) ^ "::timestamp" :: ss, n + 1) | EFfiApp ("Basis", "sqlifyChannel", [e]) => + SOME ("$" ^ Int.toString (n + 1) ^ "::int8" :: ss, n + 1) + | EFfiApp ("Basis", "sqlifyClient", [e]) => SOME ("$" ^ Int.toString (n + 1) ^ "::int4" :: ss, n + 1) | ECase (e, diff -r 6c9b8875f347 -r 5bbb542243e8 tests/chat.ur --- a/tests/chat.ur Sat Mar 28 11:15:42 2009 -0400 +++ b/tests/chat.ur Sun Mar 29 11:37:29 2009 -0400 @@ -9,32 +9,22 @@ log <- signal logS; return (render log) +structure Room = Broadcast(struct + type t = string + end) + sequence s -table t : { Id : int, Title : string, Chan : option (channel string) } +table t : { Id : int, Title : string, Room : Room.topic } fun chat id = - r <- oneRow (SELECT t.Title, t.Chan FROM t WHERE t.Id = {[id]}); - ch <- (case r.T.Chan of - None => (ch <- channel; - dml (UPDATE t SET Chan = {[Some ch]} WHERE Id = {[id]}); - return ch) - | Some ch => return ch); + r <- oneRow (SELECT t.Title, t.Room FROM t WHERE t.Id = {[id]}); + ch <- Room.subscribe r.T.Room; newLine <- source ""; logHead <- source End; logTail <- source logHead; let - fun getCh () = - r <- oneRow (SELECT t.Chan FROM t WHERE t.Id = {[id]}); - case r.T.Chan of - None => error Channel disappeared - | Some ch => return ch - - fun join () = - ch <- getCh (); - subscribe ch - fun onload () = let fun listener () = @@ -45,13 +35,16 @@ set logTail newTail; listener () in - join (); listener () end + fun getRoom () = + r <- oneRow (SELECT t.Room FROM t WHERE t.Id = {[id]}); + return r.T.Room + fun speak line = - ch <- getCh (); - send ch line + room <- getRoom (); + Room.send room line fun doSpeak () = line <- get newLine; @@ -84,7 +77,8 @@ let fun create r = id <- nextval s; - dml (INSERT INTO t (Id, Title, Chan) VALUES ({[id]}, {[r.Title]}, NULL)); + room <- Room.create; + dml (INSERT INTO t (Id, Title, Room) VALUES ({[id]}, {[r.Title]}, {[room]})); main () in ls <- list ();