changeset 682:5bbb542243e8

Redo channels, making them single-client
author Adam Chlipala <adamc@hcoop.net>
date Sun, 29 Mar 2009 11:37:29 -0400 (2009-03-29)
parents 6c9b8875f347
children 9a2c18dab11d
files include/types.h include/urweb.h lib/js/urweb.js lib/ur/basis.urs lib/ur/top.ur lib/ur/top.urs src/c/driver.c src/c/urweb.c src/cjr_print.sml src/jscomp.sml src/monoize.sml src/prepare.sml tests/chat.ur
diffstat 13 files changed, 433 insertions(+), 544 deletions(-) [+]
line wrap: on
line diff
--- a/include/types.h	Sat Mar 28 11:15:42 2009 -0400
+++ b/include/types.h	Sun Mar 29 11:37:29 2009 -0400
@@ -17,7 +17,11 @@
 
 typedef uw_Basis_string uw_Basis_xhtml;
 typedef uw_Basis_string uw_Basis_page;
-typedef size_t uw_Basis_channel;
+
+typedef unsigned uw_Basis_client;
+typedef struct {
+  unsigned cli, chn;
+} uw_Basis_channel;
 
 
 typedef enum { SUCCESS, FATAL, BOUNDED_RETRY, UNLIMITED_RETRY } failure_kind;
--- a/include/urweb.h	Sat Mar 28 11:15:42 2009 -0400
+++ b/include/urweb.h	Sun Mar 29 11:37:29 2009 -0400
@@ -8,7 +8,7 @@
 
 void uw_global_init(void);
 
-void uw_client_connect(size_t id, int pass, int sock);
+void uw_client_connect(unsigned id, int pass, int sock);
 void uw_prune_clients(time_t timeout);
 
 uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len);
@@ -22,6 +22,7 @@
 failure_kind uw_begin_init(uw_context);
 void uw_set_headers(uw_context, char *headers);
 failure_kind uw_begin(uw_context, char *path);
+void uw_login(uw_context);
 void uw_commit(uw_context);
 int uw_rollback(uw_context);
 
@@ -55,7 +56,6 @@
 char *uw_Basis_htmlifyString(uw_context, uw_Basis_string);
 char *uw_Basis_htmlifyBool(uw_context, uw_Basis_bool);
 char *uw_Basis_htmlifyTime(uw_context, uw_Basis_time);
-char *uw_Basis_htmlifyChannel(uw_context, uw_Basis_channel);
 
 uw_unit uw_Basis_htmlifyInt_w(uw_context, uw_Basis_int);
 uw_unit uw_Basis_htmlifyFloat_w(uw_context, uw_Basis_float);
@@ -66,7 +66,9 @@
 char *uw_Basis_attrifyInt(uw_context, uw_Basis_int);
 char *uw_Basis_attrifyFloat(uw_context, uw_Basis_float);
 char *uw_Basis_attrifyString(uw_context, uw_Basis_string);
+char *uw_Basis_attrifyTime(uw_context, uw_Basis_time);
 char *uw_Basis_attrifyChannel(uw_context, uw_Basis_channel);
+char *uw_Basis_attrifyClient(uw_context, uw_Basis_client);
 
 uw_unit uw_Basis_attrifyInt_w(uw_context, uw_Basis_int);
 uw_unit uw_Basis_attrifyFloat_w(uw_context, uw_Basis_float);
@@ -90,7 +92,6 @@
 uw_Basis_string uw_Basis_unurlifyString(uw_context, char **);
 uw_Basis_bool uw_Basis_unurlifyBool(uw_context, char **);
 uw_Basis_time uw_Basis_unurlifyTime(uw_context, char **);
-uw_Basis_channel uw_Basis_unurlifyChannel(uw_context, char **);
 
 uw_Basis_string uw_Basis_strcat(uw_context, const char *, const char *);
 uw_Basis_string uw_Basis_strdup(uw_context, const char *);
@@ -102,6 +103,7 @@
 uw_Basis_string uw_Basis_sqlifyBool(uw_context, uw_Basis_bool);
 uw_Basis_string uw_Basis_sqlifyTime(uw_context, uw_Basis_time);
 uw_Basis_string uw_Basis_sqlifyChannel(uw_context, uw_Basis_channel);
+uw_Basis_string uw_Basis_sqlifyClient(uw_context, uw_Basis_client);
 
 uw_Basis_string uw_Basis_sqlifyIntN(uw_context, uw_Basis_int*);
 uw_Basis_string uw_Basis_sqlifyFloatN(uw_context, uw_Basis_float*);
@@ -112,6 +114,7 @@
 char *uw_Basis_ensqlBool(uw_Basis_bool);
 
 char *uw_Basis_jsifyString(uw_context, uw_Basis_string);
+char *uw_Basis_jsifyChannel(uw_context, uw_Basis_channel);
 
 uw_Basis_string uw_Basis_intToString(uw_context, uw_Basis_int);
 uw_Basis_string uw_Basis_floatToString(uw_context, uw_Basis_float);
@@ -122,13 +125,13 @@
 uw_Basis_float *uw_Basis_stringToFloat(uw_context, uw_Basis_string);
 uw_Basis_bool *uw_Basis_stringToBool(uw_context, uw_Basis_string);
 uw_Basis_time *uw_Basis_stringToTime(uw_context, uw_Basis_string);
-uw_Basis_channel *uw_Basis_stringToChannel(uw_context, uw_Basis_string);
 
 uw_Basis_int uw_Basis_stringToInt_error(uw_context, uw_Basis_string);
 uw_Basis_float uw_Basis_stringToFloat_error(uw_context, uw_Basis_string);
 uw_Basis_bool uw_Basis_stringToBool_error(uw_context, uw_Basis_string);
 uw_Basis_time uw_Basis_stringToTime_error(uw_context, uw_Basis_string);
 uw_Basis_channel uw_Basis_stringToChannel_error(uw_context, uw_Basis_string);
+uw_Basis_client uw_Basis_stringToClient_error(uw_context, uw_Basis_string);
 
 uw_Basis_string uw_Basis_requestHeader(uw_context, uw_Basis_string);
 
@@ -138,5 +141,6 @@
 uw_unit uw_Basis_set_cookie(uw_context, uw_Basis_string prefix, uw_Basis_string c, uw_Basis_string v);
 
 uw_Basis_channel uw_Basis_new_channel(uw_context, uw_unit);
-uw_unit uw_Basis_subscribe(uw_context, uw_Basis_channel);
 uw_unit uw_Basis_send(uw_context, uw_Basis_channel, uw_Basis_string);
+
+uw_Basis_client uw_Basis_self(uw_context, uw_unit);
--- a/lib/js/urweb.js	Sat Mar 28 11:15:42 2009 -0400
+++ b/lib/js/urweb.js	Sun Mar 29 11:37:29 2009 -0400
@@ -301,6 +301,9 @@
 }
 
 function rv(chn, parse, k) {
+  if (chn == null)
+    return;
+
   if (chn < 0)
     whine("Out-of-bounds channel receive");
 
--- a/lib/ur/basis.urs	Sat Mar 28 11:15:42 2009 -0400
+++ b/lib/ur/basis.urs	Sun Mar 29 11:37:29 2009 -0400
@@ -115,6 +115,9 @@
 val send : t ::: Type -> channel t -> t -> transaction unit
 val recv : t ::: Type -> channel t -> transaction t
 
+type client
+val self : transaction client
+
 
 (** SQL *)
 
@@ -207,6 +210,7 @@
 
 class sql_injectable_nullable
 val sql_channel : t ::: Type -> sql_injectable_nullable (channel t)
+val sql_client : sql_injectable_nullable client
 
 class sql_injectable
 val sql_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable t
--- a/lib/ur/top.ur	Sat Mar 28 11:15:42 2009 -0400
+++ b/lib/ur/top.ur	Sun Mar 29 11:37:29 2009 -0400
@@ -143,6 +143,14 @@
                <xml>{f [nm] [t] [rest] ! r1 r2}{acc}</xml>)
            <xml/>
 
+fun queryI (tables ::: {{Type}}) (exps ::: {Type})
+           [tables ~ exps] (q : sql_query tables exps)
+           (f : $(exps ++ map (fn fields :: {Type} => $fields) tables)
+                -> transaction unit) =
+    query q
+          (fn fs _ => f fs)
+          ()
+
 fun queryX (tables ::: {{Type}}) (exps ::: {Type}) (ctx ::: {Unit})
            [tables ~ exps] (q : sql_query tables exps)
            (f : $(exps ++ map (fn fields :: {Type} => $fields) tables)
@@ -188,3 +196,39 @@
     case e2 of
         None => (SQL {e1} IS NULL)
       | Some _ => sql_binary sql_eq e1 (sql_inject e2)
+
+
+functor Broadcast(M : sig type t end) = struct
+    sequence s
+    table t : {Id : int, Client : option client, Channel : option (channel M.t)}
+
+    type topic = int
+
+    val inj : sql_injectable topic = _
+
+    val create = nextval s
+
+    val cleanup =
+        dml (DELETE FROM t WHERE Client IS NULL)
+
+    fun subscribe id =
+        cli <- self;
+        cleanup;
+        ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[Some cli]});
+        case ro of
+            None =>
+            ch <- channel;
+            dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[Some cli]}, {[Some ch]}));
+            return ch
+          | Some r =>
+            case r.T.Channel of
+                None => error <xml>Broadcast.subscribe: Got null result</xml>
+              | Some ch => return ch
+
+    fun send id msg =
+        cleanup;
+        queryI (SELECT t.Channel FROM t WHERE t.Id = {[id]})
+        (fn r => case r.T.Channel of
+                     None => error <xml>Broadcast.send: Got null result</xml>
+                   | Some ch => Basis.send ch msg)
+end
--- a/lib/ur/top.urs	Sat Mar 28 11:15:42 2009 -0400
+++ b/lib/ur/top.urs	Sun Mar 29 11:37:29 2009 -0400
@@ -87,6 +87,13 @@
               -> r :: {K} -> folder r
               -> $(map tf1 r) -> $(map tf2 r) -> xml ctx [] []
 
+val queryI : tables ::: {{Type}} -> exps ::: {Type}
+             -> [tables ~ exps] =>
+             sql_query tables exps
+             -> ($(exps ++ map (fn fields :: {Type} => $fields) tables)
+                 -> transaction unit)
+             -> transaction unit
+
 val queryX : tables ::: {{Type}} -> exps ::: {Type} -> ctx ::: {Unit}
              -> [tables ~ exps] =>
              sql_query tables exps
@@ -127,3 +134,14 @@
                   -> sql_exp tables agg exps (option t)
                   -> option t
                   -> sql_exp tables agg exps bool
+
+
+functor Broadcast(M : sig type t end) : sig
+    type topic
+
+    val inj : sql_injectable topic
+
+    val create : transaction topic
+    val subscribe : topic -> transaction (channel M.t)
+    val send : topic -> M.t -> transaction unit
+end
--- a/src/c/driver.c	Sat Mar 28 11:15:42 2009 -0400
+++ b/src/c/driver.c	Sun Mar 29 11:37:29 2009 -0400
@@ -171,10 +171,10 @@
           char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass");
 
           if (id && pass) {
-            size_t idn = atoi(id);
+            unsigned idn = atoi(id);
             uw_client_connect(idn, atoi(pass), sock);
             dont_close = 1;
-            fprintf(stderr, "Processed request for messages by client %d\n\n", (int)idn);
+            fprintf(stderr, "Processed request for messages by client %u\n\n", idn);
           }
           break;
         }
@@ -217,6 +217,8 @@
             else {
               printf("Fatal error (out of retries): %s\n", uw_error_message(ctx));
 
+              try_rollback(ctx);
+
               uw_reset_keep_error_message(ctx);
               uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
               uw_write_header(ctx, "Content-type: text/plain\r\n");
@@ -224,8 +226,6 @@
               uw_write(ctx, uw_error_message(ctx));
               uw_write(ctx, "\n");
 
-              try_rollback(ctx);
-
               break;
             }
           } else if (fk == UNLIMITED_RETRY)
@@ -233,6 +233,8 @@
           else if (fk == FATAL) {
             printf("Fatal error: %s\n", uw_error_message(ctx));
 
+            try_rollback(ctx);
+
             uw_reset_keep_error_message(ctx);
             uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\r\n");
             uw_write_header(ctx, "Content-type: text/html\r\n");
@@ -241,26 +243,24 @@
             uw_write(ctx, uw_error_message(ctx));
             uw_write(ctx, "\n</body></html>");
 
-            try_rollback(ctx);
-
             break;
           } else {
             printf("Unknown uw_handle return code!\n");
 
+            try_rollback(ctx);
+
             uw_reset_keep_request(ctx);
             uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
             uw_write_header(ctx, "Content-type: text/plain\r\n");
             uw_write(ctx, "Unknown uw_handle return code!\n");
 
-            try_rollback(ctx);
-
             break;
           }
 
-          uw_reset_keep_request(ctx);
-
           if (try_rollback(ctx))
             break;
+
+          uw_reset_keep_request(ctx);
         }
 
         uw_send(ctx, sock);
--- a/src/c/urweb.c	Sat Mar 28 11:15:42 2009 -0400
+++ b/src/c/urweb.c	Sun Mar 29 11:37:29 2009 -0400
@@ -7,6 +7,7 @@
 #include <ctype.h>
 #include <setjmp.h>
 #include <stdarg.h>
+#include <assert.h>
 
 #include <pthread.h>
 
@@ -88,79 +89,56 @@
 
 typedef enum { UNUSED, USED } usage;
 
-typedef struct channel_list {
-  struct channel *data;
-  struct channel_list *next;
-} channel_list;
-
 typedef struct client {
-  size_t id;
+  unsigned id;
   usage mode;
-  union {
-    struct client *next;
-    struct {
-      pthread_mutex_t lock;
-      int pass;
-      buf msgs;
-      int sock;
-      time_t last_contact;
-      unsigned refcount;
-      channel_list *channels;
-    } used;
-  } data;
+  int pass;
+  struct client *next;
+  pthread_mutex_t lock;
+  buf msgs;
+  int sock;
+  time_t last_contact;
+  unsigned n_channels;
 } client;
 
-typedef struct client_list {
-  client *data;
-  struct client_list *next;
-} client_list;
-
-typedef struct channel {
-  size_t id;
-  usage mode;
-  union {
-    struct channel *next;
-    struct {
-      pthread_mutex_t lock;
-      client_list *clients;
-      unsigned refcount;
-    } used;
-  } data;
-} channel;
-
 
 // Persistent client state
 
-static client **clients, *clients_free;
-static size_t n_clients;
+static client **clients, *clients_free, *clients_used;
+static unsigned n_clients;
 
 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-static client *uw_new_client() {
+static client *new_client() {
   client *c;
 
   pthread_mutex_lock(&clients_mutex);
 
   if (clients_free) {
     c = clients_free;
-    clients_free = clients_free->data.next;
+    clients_free = clients_free->next;
   }
   else {
     ++n_clients;
     clients = realloc(clients, sizeof(client) * n_clients);
     c = malloc(sizeof(client));
     c->id = n_clients-1;
+    pthread_mutex_init(&c->lock, NULL);
+    buf_init(&c->msgs, 0);
     clients[n_clients-1] = c;
   }
 
+  pthread_mutex_lock(&c->lock);
   c->mode = USED;
-  pthread_mutex_init(&c->data.used.lock, NULL);
-  c->data.used.pass = rand();
-  c->data.used.sock = -1;
-  c->data.used.last_contact = time(NULL);
-  buf_init(&c->data.used.msgs, 0);
-  c->data.used.refcount = 0;
-  c->data.used.channels = NULL;
+  c->pass = rand();
+  c->sock = -1;
+  c->last_contact = time(NULL);
+  buf_reset(&c->msgs);
+  c->n_channels = 0;
+  pthread_mutex_unlock(&c->lock);
+
+  c->next = clients_used;
+  clients_used = c;
 
   pthread_mutex_unlock(&clients_mutex);
 
@@ -169,7 +147,7 @@
 
 static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
 
-static client *uw_find_client(size_t id) {
+static client *find_client(unsigned id) {
   client *c;
 
   pthread_mutex_lock(&clients_mutex);
@@ -181,282 +159,111 @@
 
   c = clients[id];
 
-  if (c->mode != USED) {
-    pthread_mutex_unlock(&clients_mutex);
-    return NULL;
-  }
-  
-  pthread_mutex_lock(&c->data.used.lock);
-  ++c->data.used.refcount;
-  pthread_mutex_unlock(&c->data.used.lock);
   pthread_mutex_unlock(&clients_mutex);
   return c;
 }
 
-static void uw_release_client(client *c) {
-  pthread_mutex_lock(&c->data.used.lock);
-  --c->data.used.refcount;
-  pthread_mutex_unlock(&c->data.used.lock);
-}
-
-void uw_client_connect(size_t id, int pass, int sock) {
-  client *c = uw_find_client(id);
+void uw_client_connect(unsigned id, int pass, int sock) {
+  client *c = find_client(id);
 
   if (c == NULL) {
     close(sock);
-    fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
+    fprintf(stderr, "Out-of-bounds client request (%u)\n", id);
     return;
   }
 
-  uw_release_client(c);
+  pthread_mutex_lock(&c->lock);
 
-  pthread_mutex_lock(&c->data.used.lock);
-
-  if (pass != c->data.used.pass) {
-    pthread_mutex_unlock(&c->data.used.lock);
+  if (c->mode != USED) {
+    pthread_mutex_unlock(&c->lock);
     close(sock);
-    fprintf(stderr, "Wrong client password (%d)\n", (int)id);
+    fprintf(stderr, "Client request for unused slot (%u)\n", id);
     return;
   }
 
-  if (c->data.used.sock != -1) {
-    close(c->data.used.sock);
-    c->data.used.sock = -1;
+  if (pass != c->pass) {
+    pthread_mutex_unlock(&c->lock);
+    close(sock);
+    fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass);
+    return;
   }
 
-  c->data.used.last_contact = time(NULL);
+  if (c->sock != -1) {
+    close(c->sock);
+    c->sock = -1;
+  }
 
-  if (buf_used(&c->data.used.msgs) > 0) {
+  c->last_contact = time(NULL);
+
+  if (buf_used(&c->msgs) > 0) {
     uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
-    uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
-    buf_reset(&c->data.used.msgs);
+    uw_really_send(sock, c->msgs.start, buf_used(&c->msgs));
+    buf_reset(&c->msgs);
     close(sock);
   }
   else
-    c->data.used.sock = sock;
+    c->sock = sock;
 
-  pthread_mutex_unlock(&c->data.used.lock);
+  pthread_mutex_unlock(&c->lock);
 }
 
-
-static void uw_free_client(client *c) {
-  channel_list *chs;
-
+static void free_client(client *c) {
   printf("Freeing client %d\n", c->id);
 
-  if (c->mode == USED) {
-    pthread_mutex_lock(&c->data.used.lock);
+  c->mode = UNUSED;
+  c->pass = -1;
 
-    for (chs = c->data.used.channels; chs; ) {
-      client_list *prev, *cs;
-      
-      channel *ch = chs->data;
-      channel_list *tmp = chs->next;
-      free(chs);
-      chs = tmp;
-
-      pthread_mutex_lock(&ch->data.used.lock);
-      for (prev = NULL, cs = ch->data.used.clients; cs; ) {
-        if (cs->data == c) {
-          client_list *tmp = cs->next;
-          free(cs);
-          cs = tmp;
-          if (prev)
-            prev->next = cs;
-          else
-            ch->data.used.clients = cs;
-        }
-        else {
-          prev = cs;
-          cs = cs->next;
-        }
-      }
-      pthread_mutex_unlock(&ch->data.used.lock);
-    }
-
-    if (c->data.used.sock != -1)
-      close(c->data.used.sock);
-
-    pthread_mutex_unlock(&c->data.used.lock);
-    pthread_mutex_destroy(&c->data.used.lock);
-    buf_free(&c->data.used.msgs);
-    c->mode = UNUSED;
-
-    c->data.next = clients_free;
-    clients_free = c;
-  }
+  c->next = clients_free;
+  clients_free = c;
 }
 
 extern int uw_timeout;
 
 void uw_prune_clients() {
-  size_t i;
+  client *c, *next, *prev = NULL;
   time_t cutoff;
 
   cutoff = time(NULL) - uw_timeout;
 
   pthread_mutex_lock(&clients_mutex);
 
-  for (i = 0; i < n_clients; ++i) {
-    if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff
-        && clients[i]->data.used.refcount == 0)
-      uw_free_client(clients[i]);
+  for (c = clients_used; c; c = next) {
+    next = c->next;
+    pthread_mutex_lock(&c->lock);
+    if (c->last_contact < cutoff) {
+      if (prev)
+        prev->next = next;
+      else
+        clients_used = next;
+      free_client(c);
+    }
+    else
+      prev = c;
+    pthread_mutex_unlock(&c->lock);
   }
 
   pthread_mutex_unlock(&clients_mutex);
 }
 
-
-// Persistent channel state
-
-
-static channel **channels, *channels_free;
-static size_t n_channels;
-
-static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-static channel *uw_new_channel() {
-  channel *ch;
-
-  pthread_mutex_lock(&channels_mutex);
-
-  if (channels_free) {
-    ch = channels_free;
-    channels_free = channels_free->data.next;
-  }
-  else {
-    ++n_channels;
-    channels = realloc(channels, sizeof(channels) * n_channels);
-    ch = malloc(sizeof(channel));
-    ch->id = n_channels-1;
-    channels[n_channels-1] = ch;
-  }
-
-  ch->mode = USED;
-  pthread_mutex_init(&ch->data.used.lock, NULL);
-  ch->data.used.clients = NULL;
-  ch->data.used.refcount = 0;
-
-  pthread_mutex_unlock(&channels_mutex);
-
+static uw_Basis_channel new_channel(client *c) {
+  uw_Basis_channel ch = {c->id, c->n_channels++};
   return ch;
 }
 
-static void uw_free_channel(channel *ch) {
-  if (ch->mode == USED) {
-    client_list *cs;
+static void client_send(int already_locked, client *c, buf *msg) {
+  if (!already_locked)
+    pthread_mutex_lock(&c->lock);
 
-    for (cs = ch->data.used.clients; cs; ) {
-      client_list *tmp = cs->next;
-      free(cs);
-      cs = tmp;
-    }
-    pthread_mutex_destroy(&ch->data.used.lock);
-    ch->mode = UNUSED;
-    ch->data.next = channels_free;
-    channels_free = ch;
-  }
-}
+  if (c->sock != -1) {
+    uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1);
+    uw_really_send(c->sock, msg->start, buf_used(msg));
+    close(c->sock);
+    c->sock = -1;
+  } else
+    buf_append(&c->msgs, msg->start, buf_used(msg));
 
-static channel *uw_find_channel(size_t id) {
-  channel *ch = NULL;
-
-  pthread_mutex_lock(&channels_mutex);
-
-  if (id < n_channels && channels[id]->mode == USED) {
-    ch = channels[id];
-
-    pthread_mutex_lock(&ch->data.used.lock);
-    ++ch->data.used.refcount;
-    pthread_mutex_unlock(&ch->data.used.lock);
-  }
-
-  pthread_mutex_unlock(&channels_mutex);
-
-  return ch;
-}
-
-static void uw_release_channel(channel *ch) {
-  pthread_mutex_lock(&ch->data.used.lock);
-  ++ch->data.used.refcount;
-  pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_subscribe(channel *ch, client *c) {
-  client_list *cs;
-
-  pthread_mutex_lock(&ch->data.used.lock);
-
-  for (cs = ch->data.used.clients; cs; cs = cs->next)
-    if (cs->data == c) {
-      pthread_mutex_unlock(&ch->data.used.lock);
-      return;
-    }
-
-  cs = malloc(sizeof(client_list));
-  cs->data = c;
-  cs->next = ch->data.used.clients;
-  ch->data.used.clients = cs;
-
-  pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_unsubscribe(channel *ch, client *c) {
-  client_list *prev, *cur, *tmp;
-
-  pthread_mutex_lock(&ch->data.used.lock);
-
-  for (prev = NULL, cur = ch->data.used.clients; cur; ) {
-    if (cur->data == c) {
-      if (prev)
-        prev->next = cur->next;
-      else
-        ch->data.used.clients = cur->next;
-      tmp = cur;
-      cur = cur->next;
-      free(tmp);
-    }
-    else {
-      prev = cur;
-      cur = cur->next;
-    }
-  }
-
-  pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_channel_send(channel *ch, const char *msg) {
-  size_t len = strlen(msg), preLen;
-  char pre[INTS_MAX + 2];
-  client_list *cs;
-
-  sprintf(pre, "%d\n", (int)ch->id);
-  preLen = strlen(pre);
-
-  pthread_mutex_lock(&ch->data.used.lock);
-
-  for (cs = ch->data.used.clients; cs; cs = cs->next) {
-    client *c = cs->data;
-
-    pthread_mutex_lock(&c->data.used.lock);
-
-    if (c->data.used.sock != -1) {
-      uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
-      uw_really_send(c->data.used.sock, pre, preLen);
-      uw_really_send(c->data.used.sock, msg, len);
-      uw_really_send(c->data.used.sock, "\n", 1);
-      close(c->data.used.sock);
-      c->data.used.sock = -1;
-    } else {
-      buf_append(&c->data.used.msgs, pre, preLen);
-      buf_append(&c->data.used.msgs, msg, len);
-      buf_append(&c->data.used.msgs, "\n", 1);
-    }
-
-    pthread_mutex_unlock(&c->data.used.lock);
-  }
-
-  pthread_mutex_unlock(&ch->data.used.lock);
+  if (!already_locked)
+    pthread_mutex_unlock(&c->lock);
 }
 
 
@@ -466,7 +273,6 @@
   srand(time(NULL) ^ getpid());
 
   clients = malloc(0);
-  channels = malloc(0);
 }
 
 
@@ -484,15 +290,9 @@
 } cleanup;
 
 typedef struct {
-  usage mode;
-  channel *ch;
-  enum { OLD, NEW } newness;
-
-  size_t n_subscribed;
-  client **subscribed;
-
+  unsigned client;
   buf msgs;
-} channel_delta;
+} delta;
 
 struct uw_context {
   char *headers, *headers_end;
@@ -512,11 +312,13 @@
 
   const char *script_header, *url_prefix;
 
-  size_t n_deltas;
-  channel_delta *deltas;
+  size_t n_deltas, used_deltas;
+  delta *deltas;
 
   int timeout;
 
+  client *client;
+
   char error_message[ERROR_BUF_LEN];
 };
 
@@ -548,7 +350,7 @@
 
   ctx->source_count = 0;
 
-  ctx->n_deltas = 0;
+  ctx->n_deltas = ctx->used_deltas = 0;
   ctx->deltas = malloc(0);
 
   ctx->timeout = uw_timeout;
@@ -574,10 +376,8 @@
   free(ctx->inputs);
   free(ctx->cleanup);
 
-  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
-    free(ctx->deltas[i].subscribed);
+  for (i = 0; i < ctx->n_deltas; ++i)
     buf_free(&ctx->deltas[i].msgs);
-  }
 
   free(ctx);
 }
@@ -591,8 +391,8 @@
   ctx->regions = NULL;
   ctx->cleanup_front = ctx->cleanup;
   ctx->source_count = 0;
-  if (ctx->n_deltas > 0)
-    ctx->deltas[0].mode = UNUSED;
+  ctx->used_deltas = 0;
+  ctx->client = NULL;
 }
 
 void uw_reset_keep_request(uw_context ctx) {
@@ -669,18 +469,73 @@
   ++ctx->cleanup_front;
 }
 
+uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
+  int len = strlen(h);
+  char *s = ctx->headers, *p;
+
+  while (p = strchr(s, ':')) {
+    if (p - s == len && !strncasecmp(s, h, len)) {
+      return p + 2;
+    } else {
+      if ((s = strchr(p, 0)) && s < ctx->headers_end)
+        s += 2;
+      else
+        return NULL;
+    }
+  }
+
+  return NULL;
+}
+
+void uw_login(uw_context ctx) {
+  if (ctx->script_header[0]) {
+    char *id_s, *pass_s;
+
+    if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client"))
+        && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) {
+      unsigned id = atoi(id_s);
+      int pass = atoi(pass_s);
+      client *c = find_client(id);
+
+      if (c == NULL)
+        uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s);
+      else {
+        pthread_mutex_lock(&c->lock);
+        ctx->client = c;
+
+        if (c->mode != USED)
+          uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id);
+        if (c->pass != pass)
+          uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass);
+      }
+    } else {
+      client *c = new_client();
+      pthread_mutex_lock(&c->lock);
+      ctx->client = c;
+    }  
+  }
+}
+
 failure_kind uw_begin(uw_context ctx, char *path) {
   int r = setjmp(ctx->jmp_buf);
 
   if (r == 0) {
     if (uw_db_begin(ctx))
       uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
+
     uw_handle(ctx, path);
   }
 
   return r;
 }
 
+uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) {
+  if (ctx->client == NULL)
+    uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code");
+
+  return ctx->client->id;
+}
+
 void uw_pop_cleanup(uw_context ctx) {
   if (ctx->cleanup_front == ctx->cleanup)
     uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack");
@@ -843,9 +698,6 @@
   if (ctx->script_header[0] == 0)
     return "";
   else {
-    int pass;
-    client *c = uw_new_client(&pass);
-
     char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script));
     sprintf(r, "%s<script>%s</script>",
             ctx->script_header,
@@ -855,16 +707,13 @@
 }
 
 const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) {
-  if (ctx->script_header[0] == 0)
+  if (ctx->client == NULL)
     return "";
   else {
-    int pass;
-    client *c = uw_new_client(&pass);
-
     char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload));
-    sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
-            (int)c->id,
-            c->data.used.pass,
+    sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
+            ctx->client->id,
+            ctx->client->pass,
             ctx->url_prefix,
             ctx->timeout,
             onload);
@@ -942,6 +791,21 @@
   return r;
 }
 
+char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) {
+  if (ctx->client == NULL || chn.cli != ctx->client->id)
+    return "null";
+  else {
+    int len;
+    char *r;
+
+    uw_check_heap(ctx, INTS_MAX + 1);
+    r = ctx->heap.front;
+    sprintf(r, "%u%n", chn.chn, &len);
+    ctx->heap.front += len+1;
+    return r;
+  }
+}
+
 uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) {
   int len;
   size_t s_len = strlen(s);
@@ -1007,16 +871,6 @@
   return result;
 }
 
-char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) {
-  char *result;
-  int len;
-  uw_check_heap(ctx, INTS_MAX);
-  result = ctx->heap.front;
-  sprintf(result, "%lld%n", (long long)n, &len);
-  ctx->heap.front += len+1;
-  return result;
-}
-
 char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) {
   char *result;
   int len;
@@ -1116,15 +970,19 @@
   return r;
 }
 
-char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) {
-  int len;
-  char *r;
+char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) {
+  if (ctx->client == NULL || chn.cli != ctx->client->id)
+    return "";
+  else {
+    int len;
+    char *r;
 
-  uw_check_heap(ctx, INTS_MAX);
-  r = ctx->heap.front;
-  sprintf(r, "%lld%n", (long long)n, &len);
-  ctx->heap.front += len+1;
-  return r;
+    uw_check_heap(ctx, INTS_MAX + 1);
+    r = ctx->heap.front;
+    sprintf(r, "%u%n", chn.chn, &len);
+    ctx->heap.front += len+1;
+    return r;
+  }
 }
 
 char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) {
@@ -1182,13 +1040,15 @@
   return uw_unit_v;
 }
 
-uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) {
-  int len;
-
-  uw_check(ctx, INTS_MAX);
-  sprintf(ctx->page.front, "%lld%n", (long long)n, &len);
-  ctx->page.front += len;
-
+uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) {
+  if (ctx->client != NULL && chn.cli == ctx->client->id) {
+    int len;
+    
+    uw_check(ctx, INTS_MAX + 1);
+    sprintf(ctx->page.front, "%u%n", chn.chn, &len);
+    ctx->page.front += len;
+  }
+    
   return uw_unit_v;
 }
 
@@ -1255,10 +1115,6 @@
   return r;
 }
 
-uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) {
-  return uw_Basis_unurlifyInt(ctx, s);
-}
-
 uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) {
   char *new_s = uw_unurlify_advance(*s);
   uw_Basis_float r;
@@ -1350,10 +1206,6 @@
   return uw_unit_v;
 }
 
-char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) {
-  return uw_Basis_htmlifyInt(ctx, ch);
-}
-
 char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) {
   int len;
   char *r;
@@ -1541,17 +1393,6 @@
   return r;
 }
 
-char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) {
-  int len;
-  char *r;
-
-  uw_check_heap(ctx, INTS_MAX + 6);
-  r = ctx->heap.front;
-  sprintf(r, "%lld::int4%n", (long long)n, &len);
-  ctx->heap.front += len+1;
-  return r;
-}
-
 char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) {
   if (n == NULL)
     return "NULL";
@@ -1614,6 +1455,52 @@
   return r;
 }
 
+char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) {
+  int len;
+  char *r;
+  unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
+
+  uw_check_heap(ctx, INTS_MAX + 7);
+  r = ctx->heap.front;
+  sprintf(r, "%lld::int8%n", combo, &len);
+  ctx->heap.front += len+1;
+  return r;
+}
+
+char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) {
+  int len;
+  char *r;
+  unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
+
+  uw_check_heap(ctx, INTS_MAX + 1);
+  r = ctx->heap.front;
+  sprintf(r, "%lld%n", combo, &len);
+  ctx->heap.front += len+1;
+  return r;
+}
+
+char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) {
+  int len;
+  char *r;
+
+  uw_check_heap(ctx, INTS_MAX + 7);
+  r = ctx->heap.front;
+  sprintf(r, "%u::int4%n", cli, &len);
+  ctx->heap.front += len+1;
+  return r;
+}
+
+char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) {
+  int len;
+  char *r;
+
+  uw_check_heap(ctx, INTS_MAX + 1);
+  r = ctx->heap.front;
+  sprintf(r, "%u%n", cli, &len);
+  ctx->heap.front += len+1;
+  return r;
+}
+
 uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) {
   if (s == NULL)
     return "NULL";
@@ -1637,6 +1524,21 @@
 
 char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) {
   size_t len;
+  char *r, *s;
+  struct tm stm;
+
+  if (localtime_r(&t, &stm)) {
+    s = uw_malloc(ctx, TIMES_MAX);
+    len = strftime(s, TIMES_MAX, TIME_FMT, &stm);
+    r = uw_malloc(ctx, len + 14);
+    sprintf(r, "'%s'::timestamp", s);
+    return r;
+  } else
+    return "<Invalid time>";
+}
+
+char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) {
+  size_t len;
   char *r;
   struct tm stm;
 
@@ -1723,18 +1625,6 @@
     return NULL;
 }
 
-uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) {
-  char *endptr;
-  uw_Basis_channel n = strtoll(s, &endptr, 10);
-
-  if (*s != '\0' && *endptr == '\0') {
-    uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel));
-    *r = n;
-    return r;
-  } else
-    return NULL;
-}
-
 uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) {
   char *endptr;
   uw_Basis_float n = strtod(s, &endptr);
@@ -1802,14 +1692,27 @@
     uw_error(ctx, FATAL, "Can't parse int: %s", s);
 }
 
+#include <errno.h>
+
 uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) {
+  unsigned long long n;
+
+  if (sscanf(s, "%llu", &n) < 1)
+    uw_error(ctx, FATAL, "Can't parse channel: %s", s);
+  else {
+    uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)};
+    return ch;
+  }
+}
+
+uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) {
   char *endptr;
-  uw_Basis_channel n = strtoll(s, &endptr, 10);
+  unsigned long n = strtoul(s, &endptr, 10);
 
   if (*s != '\0' && *endptr == '\0')
     return n;
   else
-    uw_error(ctx, FATAL, "Can't parse channel int: %s", s);
+    uw_error(ctx, FATAL, "Can't parse client: %s", s);
 }
 
 uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) {
@@ -1856,22 +1759,6 @@
   }
 }
 
-uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
-  int len = strlen(h);
-  char *s = ctx->headers, *p;
-
-  while (p = strchr(s, ':')) {
-    if (p - s == len && !strncasecmp(s, h, len)) {
-      return p + 2;
-    } else {
-      if ((s = strchr(p, 0)) && s < ctx->headers_end)
-        s += 2;
-      else
-        return NULL;
-    }
-  }
-}
-
 uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) {
   int len = strlen(c);
   char *s = ctx->headers, *p = ctx->outHeaders.start;
@@ -1930,100 +1817,45 @@
   return uw_unit_v;
 }
 
-static channel_delta *allocate_delta(uw_context ctx, channel *ch) {
-  size_t i;
-  channel_delta *cd;
+static delta *allocate_delta(uw_context ctx, unsigned client) {
+  unsigned i;
+  delta *d;
 
-  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i);
+  for (i = 0; i < ctx->used_deltas; ++i)
+    if (ctx->deltas[i].client == client)
+      return &ctx->deltas[i];
 
-  if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch)
-    return &ctx->deltas[i];
+  if (ctx->used_deltas >= ctx->n_deltas) {
+    ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas);
+    buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0);
+  }
 
-  if (i < ctx->n_deltas)
-    cd = &ctx->deltas[i];
-  else {
-    ++ctx->n_deltas;
-    ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas);
-    cd = &ctx->deltas[ctx->n_deltas-1];
-    cd->n_subscribed = 0;
-    cd->subscribed = malloc(0);
-    buf_init(&cd->msgs, 0);
-  }
-   
-  cd->mode = USED;
-  cd->newness = OLD;
-  cd->ch = ch;
-  if (cd->n_subscribed > 0)
-    cd->subscribed[0] = NULL;
-  buf_reset(&cd->msgs);
-  return cd;
+  d = &ctx->deltas[ctx->used_deltas++];
+  d->client = client;
+  buf_reset(&d->msgs);
+  return d;
 }
 
 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
-  size_t i;
-  channel *ch = uw_new_channel();
-  ++ch->data.used.refcount;
-  channel_delta *cd = allocate_delta(ctx, ch);
+  if (ctx->client == NULL)
+    uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection");
 
-  cd->newness = NEW;
-
-  return ch->id;
-}
-
-static int delta_used(channel_delta *cd) {
-  return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]);
-}
-
-uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
-  channel *ch = uw_find_channel(chn);
-
-  if (ch == NULL)
-    uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
-  else {
-    size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
-    int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
-    client *c = uw_find_client(id);
-
-    if (c == NULL) {
-      uw_release_channel(ch);
-      uw_error(ctx, FATAL, "Unknown client ID in subscription request");
-    } else if (c->data.used.pass != pass) {
-      uw_release_channel(ch);
-      uw_release_client(c);
-      uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass);
-    } else {
-      size_t i;
-      channel_delta *cd = allocate_delta(ctx, ch);
-
-      if (delta_used(cd))
-        uw_release_channel(ch);
-
-      for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i);
-
-      if (i < cd->n_subscribed)
-        cd->subscribed[i] = c;
-      else {
-        ++cd->n_subscribed;
-        cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed);
-        cd->subscribed[cd->n_subscribed-1] = c;
-      }
-    }
-  }
-
-  return uw_unit_v;
+  return new_channel(ctx->client);
 }
 
 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
-  channel *ch = uw_find_channel(chn);
+  delta *d = allocate_delta(ctx, chn.cli);
+  size_t len;
+  int preLen;
+  char pre[INTS_MAX + 2];
 
-  if (ch == NULL)
-    uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
-  else {
-    channel_delta *cd = allocate_delta(ctx, ch);
-    if (delta_used(cd))
-      uw_release_channel(ch);
-    buf_append(&cd->msgs, msg, strlen(msg));
-  }
+  len = strlen(msg);
+
+  sprintf(pre, "%u\n%n", chn.chn, &preLen);
+
+  buf_append(&d->msgs, pre, preLen);
+  buf_append(&d->msgs, msg, len);
+  buf_append(&d->msgs, "\n", 1);
 
   return uw_unit_v;
 }
@@ -2032,46 +1864,27 @@
 int uw_db_rollback(uw_context);
 
 void uw_commit(uw_context ctx) {
-  size_t i, j;
-
-  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
-    channel *ch = ctx->deltas[i].ch;
-
-    for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
-      client *c = ctx->deltas[i].subscribed[j];
-
-      uw_subscribe(ch, c);
-      uw_release_client(c);
-    }
-
-    if (buf_used(&ctx->deltas[i].msgs) > 0) {
-      uw_channel_send(ch, ctx->deltas[i].msgs.start);
-    }
-
-    uw_release_channel(ch);
-  }
+  unsigned i;
 
   if (uw_db_commit(ctx))
     uw_error(ctx, FATAL, "Error running SQL COMMIT");
+
+  for (i = 0; i < ctx->used_deltas; ++i) {
+    delta *d = &ctx->deltas[i];
+    client *c = find_client(d->client);
+
+    assert (c != NULL && c->mode == USED);
+
+    client_send(c == ctx->client, c, &d->msgs);
+  }
+
+  if (ctx->client)
+    pthread_mutex_unlock(&ctx->client->lock);
 }
 
 int uw_rollback(uw_context ctx) {
-  size_t i, j;
-
-  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
-    channel *ch = ctx->deltas[i].ch;
-
-    for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
-      client *c = ctx->deltas[i].subscribed[j];
-
-      uw_release_client(c);
-    }
-
-    if (ctx->deltas[i].newness == NEW)
-      uw_free_channel(ch);
-    else
-      uw_release_channel(ch);
-  }
+  if (ctx->client)
+    pthread_mutex_unlock(&ctx->client->lock);
 
   return uw_db_rollback(ctx);
 }
--- a/src/cjr_print.sml	Sat Mar 28 11:15:42 2009 -0400
+++ b/src/cjr_print.sml	Sun Mar 29 11:37:29 2009 -0400
@@ -404,6 +404,7 @@
       | TFfi ("Basis", "bool") => box [string "uw_Basis_stringToBool_error(ctx, ", e, string ")"]
       | TFfi ("Basis", "time") => box [string "uw_Basis_stringToTime_error(ctx, ", e, string ")"]
       | TFfi ("Basis", "channel") => box [string "uw_Basis_stringToChannel_error(ctx, ", e, string ")"]
+      | TFfi ("Basis", "client") => box [string "uw_Basis_stringToClient_error(ctx, ", e, string ")"]
 
       | _ => (ErrorMsg.errorAt loc "Don't know how to unmarshal type from SQL";
               Print.eprefaces' [("Type", p_typ env tAll)];
@@ -447,6 +448,7 @@
        | Bool
        | Time
        | Channel
+       | Client
        | Nullable of sql_type
 
 fun p_sql_type' t =
@@ -457,6 +459,7 @@
       | Bool => "uw_Basis_bool"
       | Time => "uw_Basis_time"
       | Channel => "uw_Basis_channel"
+      | Client => "uw_Basis_client"
       | Nullable String => "uw_Basis_string"
       | Nullable t => p_sql_type' t ^ "*"
 
@@ -473,6 +476,7 @@
       | EFfiApp ("Basis", "sqlifyBool", [e]) => [(e, Bool)]
       | EFfiApp ("Basis", "sqlifyTime", [e]) => [(e, Time)]
       | EFfiApp ("Basis", "sqlifyChannel", [e]) => [(e, Channel)]
+      | EFfiApp ("Basis", "sqlifyClient", [e]) => [(e, Client)]
 
       | ECase (e,
                [((PNone _, _),
@@ -496,8 +500,9 @@
       | Float => box [string "uw_Basis_attrifyFloat(ctx, ", e, string ")"]
       | String => e
       | Bool => box [string "(", e, string " ? \"TRUE\" : \"FALSE\")"]
-      | Time => box [string "uw_Basis_sqlifyTime(ctx, ", e, string ")"]
+      | Time => box [string "uw_Basis_attrifyTime(ctx, ", e, string ")"]
       | Channel => box [string "uw_Basis_attrifyChannel(ctx, ", e, string ")"]
+      | Client => box [string "uw_Basis_attrifyClient(ctx, ", e, string ")"]
       | Nullable String => e
       | Nullable t => box [string "(",
                            e,
@@ -1982,7 +1987,7 @@
                             newline,
                             string "PGconn *conn = uw_get_db(ctx);",
                             newline,
-                            string "PGresult *res = PQexec(conn, \"BEGIN\");",
+                            string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");",
                             newline,
                             newline,
                             string "if (res == NULL) return 1;",
@@ -2108,7 +2113,8 @@
       | TFfi ("Basis", "string") => "text"
       | TFfi ("Basis", "bool") => "bool"
       | TFfi ("Basis", "time") => "timestamp"
-      | TFfi ("Basis", "channel") => "int4"
+      | TFfi ("Basis", "channel") => "int8"
+      | TFfi ("Basis", "client") => "int4"
       | _ => (ErrorMsg.errorAt loc "Don't know SQL equivalent of type";
               Print.eprefaces' [("Type", p_typ env tAll)];
               "ERROR")
@@ -2368,6 +2374,8 @@
                                     string (!Monoize.urlPrefix),
                                     string "\");",
                                     newline]),
+                     string "uw_login(ctx);",
+                     newline,
                      box [string "{",
                           newline,
                           box (ListUtil.mapi (fn (i, t) => box [p_typ env t,
--- a/src/jscomp.sml	Sat Mar 28 11:15:42 2009 -0400
+++ b/src/jscomp.sml	Sun Mar 29 11:37:29 2009 -0400
@@ -49,7 +49,6 @@
              (("Basis", "urlifyInt"), "ts"),
              (("Basis", "urlifyFloat"), "ts"),
              (("Basis", "urlifyString"), "uf"),
-             (("Basis", "urlifyChannel"), "ts"),
              (("Basis", "recv"), "rv")]
 
 structure FM = BinaryMapFn(struct
@@ -220,7 +219,7 @@
               | TFfi ("Basis", "string") => ((EFfiApp ("Basis", "jsifyString", [e]), loc), st)
               | TFfi ("Basis", "int") => ((EFfiApp ("Basis", "htmlifyInt", [e]), loc), st)
               | TFfi ("Basis", "float") => ((EFfiApp ("Basis", "htmlifyFloat", [e]), loc), st)
-              | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "htmlifyChannel", [e]), loc), st)
+              | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "jsifyChannel", [e]), loc), st)
 
               | TFfi ("Basis", "bool") => ((ECase (e,
                                                    [((PCon (Enum, PConFfi {mod = "Basis",
@@ -348,7 +347,7 @@
               | TFfi ("Basis", "string") => ("uu(t[i++])", st)
               | TFfi ("Basis", "int") => ("parseInt(t[i++])", st)
               | TFfi ("Basis", "float") => ("parseFloat(t[i++])", st)
-              | TFfi ("Basis", "channel") => ("parseInt(t[i++])", st)
+              | TFfi ("Basis", "channel") => ("(t[i++].length > 0 ? parseInt(t[i]) : null)", st)
 
               | TFfi ("Basis", "bool") => ("t[i++] == \"True\"", st)
 
--- a/src/monoize.sml	Sat Mar 28 11:15:42 2009 -0400
+++ b/src/monoize.sml	Sun Mar 29 11:37:29 2009 -0400
@@ -1110,14 +1110,6 @@
                 ((L'.EAbs ("_", (L'.TRecord [], loc), (L'.TFfi ("Basis", "channel"), loc),
                            (L'.EFfiApp ("Basis", "new_channel", [(L'.ERecord [], loc)]), loc)), loc),
                  fm)
-          | L.ECApp ((L.EFfi ("Basis", "subscribe"), _), t) =>
-            ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc),
-                       (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc),
-                       (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc),
-                                 (L'.EFfiApp ("Basis", "subscribe",
-                                              [(L'.ERel 1, loc)]),
-                                  loc)), loc)), loc),
-             fm)
           | L.ECApp ((L.EFfi ("Basis", "send"), _), t) =>
             let
                 val t = monoType env t
@@ -1431,6 +1423,10 @@
             ((L'.EAbs ("x", (L'.TFfi ("Basis", "channel"), loc), (L'.TFfi ("Basis", "string"), loc),
                        (L'.EFfiApp ("Basis", "sqlifyChannel", [(L'.ERel 0, loc)]), loc)), loc),
              fm)
+          | L.EFfi ("Basis", "sql_client") =>
+            ((L'.EAbs ("x", (L'.TFfi ("Basis", "client"), loc), (L'.TFfi ("Basis", "string"), loc),
+                       (L'.EFfiApp ("Basis", "sqlifyClient", [(L'.ERel 0, loc)]), loc)), loc),
+             fm)
           | L.ECApp ((L.EFfi ("Basis", "sql_prim"), _), t) =>
             let
                 val t = monoType env t
--- a/src/prepare.sml	Sat Mar 28 11:15:42 2009 -0400
+++ b/src/prepare.sml	Sun Mar 29 11:37:29 2009 -0400
@@ -48,6 +48,8 @@
       | EFfiApp ("Basis", "sqlifyTime", [e]) =>
         SOME ("$" ^ Int.toString (n + 1) ^ "::timestamp" :: ss, n + 1)
       | EFfiApp ("Basis", "sqlifyChannel", [e]) =>
+        SOME ("$" ^ Int.toString (n + 1) ^ "::int8" :: ss, n + 1)
+      | EFfiApp ("Basis", "sqlifyClient", [e]) =>
         SOME ("$" ^ Int.toString (n + 1) ^ "::int4" :: ss, n + 1)
 
       | ECase (e,
--- a/tests/chat.ur	Sat Mar 28 11:15:42 2009 -0400
+++ b/tests/chat.ur	Sun Mar 29 11:37:29 2009 -0400
@@ -9,32 +9,22 @@
     log <- signal logS;
     return (render log)
 
+structure Room = Broadcast(struct
+                               type t = string
+                           end)
+
 sequence s
-table t : { Id : int, Title : string, Chan : option (channel string) }
+table t : { Id : int, Title : string, Room : Room.topic }
 
 fun chat id =
-    r <- oneRow (SELECT t.Title, t.Chan FROM t WHERE t.Id = {[id]});
-    ch <- (case r.T.Chan of
-               None => (ch <- channel;
-                        dml (UPDATE t SET Chan = {[Some ch]} WHERE Id = {[id]});
-                        return ch)
-             | Some ch => return ch);
+    r <- oneRow (SELECT t.Title, t.Room FROM t WHERE t.Id = {[id]});
+    ch <- Room.subscribe r.T.Room;
 
     newLine <- source "";
     logHead <- source End;
     logTail <- source logHead;
 
     let
-        fun getCh () =
-            r <- oneRow (SELECT t.Chan FROM t WHERE t.Id = {[id]});
-            case r.T.Chan of
-                None => error <xml>Channel disappeared</xml>
-              | Some ch => return ch
-
-        fun join () =
-            ch <- getCh ();
-            subscribe ch
-
         fun onload () =
             let
                 fun listener () =
@@ -45,13 +35,16 @@
                     set logTail newTail;
                     listener ()
             in
-                join ();
                 listener ()
             end
 
+        fun getRoom () =
+            r <- oneRow (SELECT t.Room FROM t WHERE t.Id = {[id]});
+            return r.T.Room
+
         fun speak line =
-            ch <- getCh ();
-            send ch line
+            room <- getRoom ();
+            Room.send room line
 
         fun doSpeak () =
             line <- get newLine;
@@ -84,7 +77,8 @@
     let
         fun create r =
             id <- nextval s;
-            dml (INSERT INTO t (Id, Title, Chan) VALUES ({[id]}, {[r.Title]}, NULL));
+            room <- Room.create;
+            dml (INSERT INTO t (Id, Title, Room) VALUES ({[id]}, {[r.Title]}, {[room]}));
             main ()
     in
         ls <- list ();