# HG changeset patch # User Adam Chlipala # Date 1237748707 14400 # Node ID b0c1a46b1f15c920f0d9d090516ecb9fda2cd681 # Parent a93d5324f400069f8c4fdcf0fd61291a5e1b6fd0 First message send delivered, but not interpreted diff -r a93d5324f400 -r b0c1a46b1f15 include/types.h --- a/include/types.h Thu Mar 19 16:34:13 2009 -0400 +++ b/include/types.h Sun Mar 22 15:05:07 2009 -0400 @@ -17,6 +17,7 @@ typedef uw_Basis_string uw_Basis_xhtml; typedef uw_Basis_string uw_Basis_page; +typedef size_t uw_Basis_channel; typedef enum { SUCCESS, FATAL, BOUNDED_RETRY, UNLIMITED_RETRY } failure_kind; @@ -25,3 +26,4 @@ #define INTS_MAX 50 #define FLOATS_MAX 100 #define TIMES_MAX 100 + diff -r a93d5324f400 -r b0c1a46b1f15 include/urweb.h --- a/include/urweb.h Thu Mar 19 16:34:13 2009 -0400 +++ b/include/urweb.h Sun Mar 22 15:05:07 2009 -0400 @@ -46,13 +46,14 @@ void uw_set_script_header(uw_context, const char*); const char *uw_Basis_get_script(uw_context, uw_unit); -const char *uw_Basis_get_listener(uw_context, uw_unit); +const char *uw_Basis_get_listener(uw_context, uw_Basis_string); char *uw_Basis_htmlifyInt(uw_context, uw_Basis_int); char *uw_Basis_htmlifyFloat(uw_context, uw_Basis_float); char *uw_Basis_htmlifyString(uw_context, uw_Basis_string); char *uw_Basis_htmlifyBool(uw_context, uw_Basis_bool); char *uw_Basis_htmlifyTime(uw_context, uw_Basis_time); +char *uw_Basis_htmlifyChannel(uw_context, uw_Basis_channel); uw_unit uw_Basis_htmlifyInt_w(uw_context, uw_Basis_int); uw_unit uw_Basis_htmlifyFloat_w(uw_context, uw_Basis_float); @@ -84,6 +85,7 @@ uw_Basis_string uw_Basis_unurlifyString(uw_context, char **); uw_Basis_bool uw_Basis_unurlifyBool(uw_context, char **); uw_Basis_time uw_Basis_unurlifyTime(uw_context, char **); +uw_Basis_channel uw_Basis_unurlifyChannel(uw_context, char **); uw_Basis_string uw_Basis_strcat(uw_context, const char *, const char *); uw_Basis_string uw_Basis_strdup(uw_context, const char *); @@ -126,3 +128,7 @@ uw_Basis_string uw_Basis_get_cookie(uw_context, uw_Basis_string c); uw_unit uw_Basis_set_cookie(uw_context, uw_Basis_string prefix, uw_Basis_string c, uw_Basis_string v); + +uw_Basis_channel uw_Basis_new_channel(uw_context, uw_unit); +uw_unit uw_Basis_subscribe(uw_context, uw_Basis_channel); +uw_unit uw_Basis_send(uw_context, uw_Basis_channel, uw_Basis_string); diff -r a93d5324f400 -r b0c1a46b1f15 lib/js/urweb.js --- a/lib/js/urweb.js Thu Mar 19 16:34:13 2009 -0400 +++ b/lib/js/urweb.js Sun Mar 22 15:05:07 2009 -0400 @@ -133,7 +133,11 @@ } -function getXHR() +var client_id = 0; +var client_pass = 0; +var url_prefix = "/"; + +function getXHR(uri) { try { return new XMLHttpRequest(); @@ -150,6 +154,17 @@ } } +function requestUri(xhr, uri) { + xhr.open("GET", uri, true); + + if (client_id != 0) { + xhr.setRequestHeader("UrWeb-Client", client_id.toString()); + xhr.setRequestHeader("UrWeb-Pass", client_pass.toString()); + } + + xhr.send(null); +} + function rc(uri, parse, k) { var xhr = getXHR(); @@ -171,15 +186,10 @@ } }; - xhr.open("GET", uri, true); - xhr.send(null); + requestUri(xhr, uri); } -var client_id = 0; -var client_pass = 0; -var url_prefix = "/"; - function path_join(s1, s2) { if (s1.length > 0 && s1[s1.length-1] == '/') return s1 + s2; @@ -188,6 +198,7 @@ } function listener() { + var uri = path_join(url_prefix, ".msgs"); var xhr = getXHR(); xhr.onreadystatechange = function() { @@ -199,8 +210,10 @@ isok = true; } catch (e) { } - if (isok) + if (isok) { alert("Messages: " + xhr.responseText); + requestUri(xhr, uri); + } else { alert("Error querying remote server for messages!"); throw "Error querying remote server for messages!"; @@ -208,6 +221,5 @@ } }; - xhr.open("GET", path_join(url_prefix, ".msgs/" + client_id + "/" + client_pass), true); - xhr.send(null); + requestUri(xhr, uri); } diff -r a93d5324f400 -r b0c1a46b1f15 lib/ur/basis.urs --- a/lib/ur/basis.urs Thu Mar 19 16:34:13 2009 -0400 +++ b/lib/ur/basis.urs Sun Mar 22 15:05:07 2009 -0400 @@ -367,7 +367,7 @@ val head : unit -> tag [] html head [] [] val title : unit -> tag [] head [] [] [] -val body : unit -> tag [] html body [] [] +val body : unit -> tag [Onload = transaction unit] html body [] [] con bodyTag = fn (attrs :: {Type}) => ctx ::: {Unit} -> [[Body] ~ ctx] => @@ -452,3 +452,11 @@ (** Aborting *) val error : t ::: Type -> xml [Body] [] [] -> t + + +(** Channels *) + +con channel :: Type -> Type +val channel : t ::: Type -> transaction (channel t) +val subscribe : t ::: Type -> channel t -> transaction unit +val send : t ::: Type -> channel t -> t -> transaction unit diff -r a93d5324f400 -r b0c1a46b1f15 src/c/driver.c --- a/src/c/driver.c Thu Mar 19 16:34:13 2009 -0400 +++ b/src/c/driver.c Sun Mar 22 15:05:07 2009 -0400 @@ -138,7 +138,6 @@ if (s = strstr(buf, "\r\n\r\n")) { failure_kind fk; char *cmd, *path, *headers, path_copy[uw_bufsize+1], *inputs; - int id, pass; s[2] = 0; @@ -169,9 +168,18 @@ break; } - if (sscanf(path, "/.msgs/%d/%d", &id, &pass) == 2) { - uw_client_connect(id, pass, sock); - dont_close = 1; + uw_set_headers(ctx, headers); + + if (!strcmp(path, "/.msgs")) { + char *id = uw_Basis_requestHeader(ctx, "UrWeb-Client"); + char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass"); + + if (id && pass) { + size_t idn = atoi(id); + uw_client_connect(idn, atoi(pass), sock); + dont_close = 1; + fprintf(stderr, "Processed request for messages by client %d\n\n", (int)idn); + } break; } @@ -197,8 +205,6 @@ printf("Serving URI %s....\n", path); - uw_set_headers(ctx, headers); - while (1) { if (uw_db_begin(ctx)) { printf("Error running SQL BEGIN\n"); diff -r a93d5324f400 -r b0c1a46b1f15 src/c/urweb.c --- a/src/c/urweb.c Thu Mar 19 16:34:13 2009 -0400 +++ b/src/c/urweb.c Sun Mar 22 15:05:07 2009 -0400 @@ -76,8 +76,14 @@ return b->back - b->start; } +static void buf_append(buf *b, const char *s, size_t len) { + buf_check(b, len); + memcpy(b->front, s, len); + b->front += len; +} -// Cross-request state + +// Persistent client state typedef enum { UNUSED, USED } usage; @@ -101,21 +107,6 @@ static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; -void uw_global_init() { - srand(time(NULL) ^ getpid()); - - clients = malloc(0); -} - -void uw_global_free() { - size_t i; - - for (i = 0; i < n_clients; ++i) - free(clients[i]); - - free(clients); -} - static client *uw_new_client() { client *c; @@ -147,24 +138,33 @@ static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; -void uw_client_connect(size_t id, int pass, int sock) { +static client *uw_find_client(size_t id) { client *c; pthread_mutex_lock(&clients_mutex); if (id >= n_clients) { pthread_mutex_unlock(&clients_mutex); - close(sock); - fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); - return; + return NULL; } c = clients[id]; if (c->mode != USED) { pthread_mutex_unlock(&clients_mutex); + return NULL; + } + + pthread_mutex_unlock(&clients_mutex); + return c; +} + +void uw_client_connect(size_t id, int pass, int sock) { + client *c = uw_find_client(id); + + if (c == NULL) { close(sock); - fprintf(stderr, "Client request for unused ID (%d)\n", (int)id); + fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); return; } @@ -172,7 +172,6 @@ if (pass != c->data.used.pass) { pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); close(sock); fprintf(stderr, "Wrong client password (%d)\n", (int)id); return; @@ -180,7 +179,6 @@ if (c->data.used.sock != -1) { pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); close(sock); fprintf(stderr, "Duplicate client connection (%d)\n", (int)id); return; @@ -193,15 +191,10 @@ uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); close(sock); } - else { - uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(sock, "Hi!", 3); - close(sock); - //c->data.used.sock = sock; - } + else + c->data.used.sock = sock; pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); } static void uw_free_client(client *c) { @@ -234,6 +227,150 @@ } +// Persistent channel state + +typedef struct client_list { + client *data; + struct client_list *next; +} client_list; + +typedef struct channel { + size_t id; + usage mode; + union { + struct channel *next; + struct { + pthread_mutex_t lock; + client_list *clients; + } used; + } data; +} channel; + +static channel **channels, *channels_free; +static size_t n_channels; + +static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER; + +static channel *uw_new_channel() { + channel *ch; + + pthread_mutex_lock(&channels_mutex); + + if (channels_free) { + ch = channels_free; + channels_free = channels_free->data.next; + } + else { + ++n_channels; + channels = realloc(channels, sizeof(channels) * n_channels); + ch = malloc(sizeof(channel)); + ch->id = n_channels-1; + channels[n_channels-1] = ch; + } + + ch->mode = USED; + pthread_mutex_init(&ch->data.used.lock, NULL); + ch->data.used.clients = NULL; + + pthread_mutex_unlock(&channels_mutex); + + return ch; +} + +static channel *uw_find_channel(size_t id) { + channel *ch = NULL; + + pthread_mutex_lock(&channels_mutex); + + if (id < n_channels && channels[id]->mode == USED) + ch = channels[id]; + + pthread_mutex_unlock(&channels_mutex); + + return ch; +} + +static void uw_subscribe(channel *ch, client *c) { + client_list *cs = malloc(sizeof(client_list)); + + pthread_mutex_lock(&ch->data.used.lock); + + cs->data = c; + cs->next = ch->data.used.clients; + ch->data.used.clients = cs; + + pthread_mutex_unlock(&ch->data.used.lock); +} + +static void uw_unsubscribe(channel *ch, client *c) { + client_list *prev, *cur, *tmp; + + pthread_mutex_lock(&ch->data.used.lock); + + for (prev = NULL, cur = ch->data.used.clients; cur; ) { + if (cur->data == c) { + if (prev) + prev->next = cur->next; + else + ch->data.used.clients = cur->next; + tmp = cur; + cur = cur->next; + free(tmp); + } + else { + prev = cur; + cur = cur->next; + } + } + + pthread_mutex_unlock(&ch->data.used.lock); +} + +static void uw_channel_send(channel *ch, const char *msg) { + size_t len = strlen(msg), preLen; + char pre[INTS_MAX + 2]; + client_list *cs; + + sprintf(pre, "%d\n", (int)ch->id); + preLen = strlen(pre); + + pthread_mutex_lock(&ch->data.used.lock); + + for (cs = ch->data.used.clients; cs; cs = cs->next) { + client *c = cs->data; + + pthread_mutex_lock(&c->data.used.lock); + + if (c->data.used.sock != -1) { + uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); + uw_really_send(c->data.used.sock, pre, preLen); + uw_really_send(c->data.used.sock, msg, len); + uw_really_send(c->data.used.sock, "\n", 1); + close(c->data.used.sock); + c->data.used.sock = -1; + } else { + buf_append(&c->data.used.msgs, pre, preLen); + buf_append(&c->data.used.msgs, msg, len); + buf_append(&c->data.used.msgs, "\n", 1); + } + + pthread_mutex_unlock(&c->data.used.lock); + } + + pthread_mutex_unlock(&ch->data.used.lock); +} + + +// Global entry points + +void uw_global_init() { + srand(time(NULL) ^ getpid()); + + clients = malloc(0); + channels = malloc(0); +} + + // Single-request state #define ERROR_BUF_LEN 1024 @@ -582,11 +719,17 @@ } } -const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) { +const char *uw_Basis_get_listener(uw_context ctx, uw_Basis_string onload) { if (ctx->script_header[0] == 0) return ""; - else + else if (onload[0] == 0) return " onload='listener()'"; + else { + uw_Basis_string s = uw_malloc(ctx, strlen(onload) + 22); + + sprintf(s, " onload='listener();%s'", onload); + return s; + } } uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) { @@ -941,6 +1084,10 @@ return r; } +uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) { + return uw_Basis_unurlifyInt(ctx, s); +} + uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { char *new_s = uw_unurlify_advance(*s); uw_Basis_float r; @@ -1032,6 +1179,10 @@ return uw_unit_v; } +char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) { + return uw_Basis_htmlifyInt(ctx, ch); +} + char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { int len; char *r; @@ -1575,4 +1726,34 @@ return uw_unit_v; } +uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { + return uw_new_channel()->id; +} +uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { + channel *ch = uw_find_channel(chn); + + if (ch == NULL) + uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); + else { + size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); + int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); + client *c = uw_find_client(id); + + if (c == NULL) + uw_error(ctx, FATAL, "Unknown client ID in subscription request"); + else if (c->data.used.pass != pass) + uw_error(ctx, FATAL, "Wrong client password in subscription request"); + else + uw_subscribe(ch, c); + } +} + +uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { + channel *ch = uw_find_channel(chn); + + if (ch == NULL) + uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); + else + uw_channel_send(ch, msg); +} diff -r a93d5324f400 -r b0c1a46b1f15 src/jscomp.sml --- a/src/jscomp.sml Thu Mar 19 16:34:13 2009 -0400 +++ b/src/jscomp.sml Sun Mar 22 15:05:07 2009 -0400 @@ -48,7 +48,8 @@ (("Basis", "stringToInt_error"), "pi"), (("Basis", "urlifyInt"), "ts"), (("Basis", "urlifyFloat"), "ts"), - (("Basis", "urlifyString"), "escape")] + (("Basis", "urlifyString"), "escape"), + (("Basis", "urlifyChannel"), "ts")] structure FM = BinaryMapFn(struct type ord_key = string * string @@ -216,6 +217,7 @@ | TFfi ("Basis", "string") => ((EFfiApp ("Basis", "jsifyString", [e]), loc), st) | TFfi ("Basis", "int") => ((EFfiApp ("Basis", "htmlifyInt", [e]), loc), st) | TFfi ("Basis", "float") => ((EFfiApp ("Basis", "htmlifyFloat", [e]), loc), st) + | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "htmlifyChannel", [e]), loc), st) | TFfi ("Basis", "bool") => ((ECase (e, [((PCon (Enum, PConFfi {mod = "Basis", diff -r a93d5324f400 -r b0c1a46b1f15 src/mono_reduce.sml --- a/src/mono_reduce.sml Thu Mar 19 16:34:13 2009 -0400 +++ b/src/mono_reduce.sml Sun Mar 22 15:05:07 2009 -0400 @@ -57,6 +57,9 @@ | EFfiApp ("Basis", "new_client_source", _) => true | EFfiApp ("Basis", "set_client_source", _) => true | EFfiApp ("Basis", "alert", _) => true + | EFfiApp ("Basis", "new_channel", _) => true + | EFfiApp ("Basis", "subscribe", _) => true + | EFfiApp ("Basis", "send", _) => true | EFfiApp _ => false | EApp ((EFfi _, _), _) => false | EApp _ => true @@ -256,6 +259,8 @@ fun summarize d (e, _) = let + fun ffi es = List.concat (map (summarize d) es) @ [Unsure] + val s = case e of EPrim _ => [] @@ -266,10 +271,13 @@ | ENone _ => [] | ESome (_, e) => summarize d e | EFfi _ => [] - | EFfiApp ("Basis", "set_cookie", es) => List.concat (map (summarize d) es) @ [Unsure] - | EFfiApp ("Basis", "new_client_source", es) => List.concat (map (summarize d) es) @ [Unsure] - | EFfiApp ("Basis", "set_client_source", es) => List.concat (map (summarize d) es) @ [Unsure] - | EFfiApp ("Basis", "alert", es) => List.concat (map (summarize d) es) @ [Unsure] + | EFfiApp ("Basis", "set_cookie", es) => ffi es + | EFfiApp ("Basis", "new_client_source", es) => ffi es + | EFfiApp ("Basis", "set_client_source", es) => ffi es + | EFfiApp ("Basis", "alert", es) => ffi es + | EFfiApp ("Basis", "new_channel", es) => ffi es + | EFfiApp ("Basis", "subscribe", es) => ffi es + | EFfiApp ("Basis", "send", es) => ffi es | EFfiApp (_, _, es) => List.concat (map (summarize d) es) | EApp ((EFfi _, _), e) => summarize d e | EApp _ => diff -r a93d5324f400 -r b0c1a46b1f15 src/monoize.sml --- a/src/monoize.sml Thu Mar 19 16:34:13 2009 -0400 +++ b/src/monoize.sml Sun Mar 22 15:05:07 2009 -0400 @@ -180,6 +180,9 @@ | L.CApp ((L.CFfi ("Basis", "sql_nfunc"), _), _) => (L'.TFfi ("Basis", "string"), loc) + | L.CApp ((L.CFfi ("Basis", "channel"), _), _) => + (L'.TFfi ("Basis", "channel"), loc) + | L.CRel _ => poly () | L.CNamed n => (case IM.find (dtmap, n) of @@ -1081,6 +1084,34 @@ fm) end + | L.ECApp ((L.EFfi ("Basis", "channel"), _), t) => + ((L'.EAbs ("_", (L'.TRecord [], loc), (L'.TFfi ("Basis", "channel"), loc), + (L'.EFfiApp ("Basis", "new_channel", [(L'.ERecord [], loc)]), loc)), loc), + fm) + | L.ECApp ((L.EFfi ("Basis", "subscribe"), _), t) => + ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc), + (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc), + (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc), + (L'.EFfiApp ("Basis", "subscribe", + [(L'.ERel 1, loc)]), + loc)), loc)), loc), + fm) + | L.ECApp ((L.EFfi ("Basis", "send"), _), t) => + let + val t = monoType env t + val (e, fm) = urlifyExp env fm ((L'.ERel 1, loc), t) + in + ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc), + (L'.TFun (t, (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc)), loc), + (L'.EAbs ("v", t, (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc), + (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc), + (L'.EFfiApp ("Basis", "send", + [(L'.ERel 2, loc), + e]), + loc)), loc)), loc)), loc), + fm) + end + | L.EFfiApp ("Basis", "dml", [e]) => let val (e, fm) = monoExp (env, st, fm) e @@ -1781,6 +1812,14 @@ L'.ERecord xes => xes | _ => raise Fail "Non-record attributes!" + fun findOnload (attrs, acc) = + case attrs of + [] => (NONE, acc) + | ("Onload", e, _) :: rest => (SOME e, List.revAppend (acc, rest)) + | x :: rest => findOnload (rest, x :: acc) + + val (onload, attrs) = findOnload (attrs, []) + fun lowercaseFirst "" = "" | lowercaseFirst s = String.str (Char.toLower (String.sub (s, 0))) ^ String.extract (s, 1, NONE) @@ -1924,9 +1963,21 @@ end in case tag of - "body" => normal ("body", - SOME (L'.EFfiApp ("Basis", "get_listener", [(L'.ERecord [], loc)]), loc), - SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc)) + "body" => + let + val onload = case onload of + NONE => (L'.EPrim (Prim.String ""), loc) + | SOME e => + let + val e = (L'.EApp (e, (L'.ERecord [], loc)), loc) + in + (L'.EJavaScript (L'.Attribute, e, NONE), loc) + end + in + normal ("body", + SOME (L'.EFfiApp ("Basis", "get_listener", [onload]), loc), + SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc)) + end | "dyn" => (case attrs of diff -r a93d5324f400 -r b0c1a46b1f15 src/rpcify.sml --- a/src/rpcify.sml Thu Mar 19 16:34:13 2009 -0400 +++ b/src/rpcify.sml Sun Mar 22 15:05:07 2009 -0400 @@ -50,7 +50,10 @@ ["requestHeader", "query", "dml", - "nextval"]) + "nextval", + "new_channel", + "subscribe", + "send"]) val csBasis = SS.addList (SS.empty, ["source", diff -r a93d5324f400 -r b0c1a46b1f15 src/scriptcheck.sml --- a/src/scriptcheck.sml Thu Mar 19 16:34:13 2009 -0400 +++ b/src/scriptcheck.sml Sun Mar 22 15:05:07 2009 -0400 @@ -38,7 +38,10 @@ val csBasis = SS.addList (SS.empty, ["new_client_source", "get_client_source", - "set_client_source"]) + "set_client_source", + "new_channel", + "subscribe", + "recv"]) val scriptWords = [" + + + end diff -r a93d5324f400 -r b0c1a46b1f15 tests/channel.urp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/channel.urp Sun Mar 22 15:05:07 2009 -0400 @@ -0,0 +1,3 @@ +debug + +channel