diff src/c/urweb.c @ 682:5bbb542243e8

Redo channels, making them single-client
author Adam Chlipala <adamc@hcoop.net>
date Sun, 29 Mar 2009 11:37:29 -0400
parents 6c9b8875f347
children 9a2c18dab11d
line wrap: on
line diff
--- a/src/c/urweb.c	Sat Mar 28 11:15:42 2009 -0400
+++ b/src/c/urweb.c	Sun Mar 29 11:37:29 2009 -0400
@@ -7,6 +7,7 @@
 #include <ctype.h>
 #include <setjmp.h>
 #include <stdarg.h>
+#include <assert.h>
 
 #include <pthread.h>
 
@@ -88,79 +89,56 @@
 
 typedef enum { UNUSED, USED } usage;
 
-typedef struct channel_list {
-  struct channel *data;
-  struct channel_list *next;
-} channel_list;
-
 typedef struct client {
-  size_t id;
+  unsigned id;
   usage mode;
-  union {
-    struct client *next;
-    struct {
-      pthread_mutex_t lock;
-      int pass;
-      buf msgs;
-      int sock;
-      time_t last_contact;
-      unsigned refcount;
-      channel_list *channels;
-    } used;
-  } data;
+  int pass;
+  struct client *next;
+  pthread_mutex_t lock;
+  buf msgs;
+  int sock;
+  time_t last_contact;
+  unsigned n_channels;
 } client;
 
-typedef struct client_list {
-  client *data;
-  struct client_list *next;
-} client_list;
-
-typedef struct channel {
-  size_t id;
-  usage mode;
-  union {
-    struct channel *next;
-    struct {
-      pthread_mutex_t lock;
-      client_list *clients;
-      unsigned refcount;
-    } used;
-  } data;
-} channel;
-
 
 // Persistent client state
 
-static client **clients, *clients_free;
-static size_t n_clients;
+static client **clients, *clients_free, *clients_used;
+static unsigned n_clients;
 
 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-static client *uw_new_client() {
+static client *new_client() {
   client *c;
 
   pthread_mutex_lock(&clients_mutex);
 
   if (clients_free) {
     c = clients_free;
-    clients_free = clients_free->data.next;
+    clients_free = clients_free->next;
   }
   else {
     ++n_clients;
     clients = realloc(clients, sizeof(client) * n_clients);
     c = malloc(sizeof(client));
     c->id = n_clients-1;
+    pthread_mutex_init(&c->lock, NULL);
+    buf_init(&c->msgs, 0);
     clients[n_clients-1] = c;
   }
 
+  pthread_mutex_lock(&c->lock);
   c->mode = USED;
-  pthread_mutex_init(&c->data.used.lock, NULL);
-  c->data.used.pass = rand();
-  c->data.used.sock = -1;
-  c->data.used.last_contact = time(NULL);
-  buf_init(&c->data.used.msgs, 0);
-  c->data.used.refcount = 0;
-  c->data.used.channels = NULL;
+  c->pass = rand();
+  c->sock = -1;
+  c->last_contact = time(NULL);
+  buf_reset(&c->msgs);
+  c->n_channels = 0;
+  pthread_mutex_unlock(&c->lock);
+
+  c->next = clients_used;
+  clients_used = c;
 
   pthread_mutex_unlock(&clients_mutex);
 
@@ -169,7 +147,7 @@
 
 static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
 
-static client *uw_find_client(size_t id) {
+static client *find_client(unsigned id) {
   client *c;
 
   pthread_mutex_lock(&clients_mutex);
@@ -181,282 +159,111 @@
 
   c = clients[id];
 
-  if (c->mode != USED) {
-    pthread_mutex_unlock(&clients_mutex);
-    return NULL;
-  }
-  
-  pthread_mutex_lock(&c->data.used.lock);
-  ++c->data.used.refcount;
-  pthread_mutex_unlock(&c->data.used.lock);
   pthread_mutex_unlock(&clients_mutex);
   return c;
 }
 
-static void uw_release_client(client *c) {
-  pthread_mutex_lock(&c->data.used.lock);
-  --c->data.used.refcount;
-  pthread_mutex_unlock(&c->data.used.lock);
-}
-
-void uw_client_connect(size_t id, int pass, int sock) {
-  client *c = uw_find_client(id);
+void uw_client_connect(unsigned id, int pass, int sock) {
+  client *c = find_client(id);
 
   if (c == NULL) {
     close(sock);
-    fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
+    fprintf(stderr, "Out-of-bounds client request (%u)\n", id);
     return;
   }
 
-  uw_release_client(c);
+  pthread_mutex_lock(&c->lock);
 
-  pthread_mutex_lock(&c->data.used.lock);
-
-  if (pass != c->data.used.pass) {
-    pthread_mutex_unlock(&c->data.used.lock);
+  if (c->mode != USED) {
+    pthread_mutex_unlock(&c->lock);
     close(sock);
-    fprintf(stderr, "Wrong client password (%d)\n", (int)id);
+    fprintf(stderr, "Client request for unused slot (%u)\n", id);
     return;
   }
 
-  if (c->data.used.sock != -1) {
-    close(c->data.used.sock);
-    c->data.used.sock = -1;
+  if (pass != c->pass) {
+    pthread_mutex_unlock(&c->lock);
+    close(sock);
+    fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass);
+    return;
   }
 
-  c->data.used.last_contact = time(NULL);
+  if (c->sock != -1) {
+    close(c->sock);
+    c->sock = -1;
+  }
 
-  if (buf_used(&c->data.used.msgs) > 0) {
+  c->last_contact = time(NULL);
+
+  if (buf_used(&c->msgs) > 0) {
     uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
-    uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
-    buf_reset(&c->data.used.msgs);
+    uw_really_send(sock, c->msgs.start, buf_used(&c->msgs));
+    buf_reset(&c->msgs);
     close(sock);
   }
   else
-    c->data.used.sock = sock;
+    c->sock = sock;
 
-  pthread_mutex_unlock(&c->data.used.lock);
+  pthread_mutex_unlock(&c->lock);
 }
 
-
-static void uw_free_client(client *c) {
-  channel_list *chs;
-
+static void free_client(client *c) {
   printf("Freeing client %d\n", c->id);
 
-  if (c->mode == USED) {
-    pthread_mutex_lock(&c->data.used.lock);
+  c->mode = UNUSED;
+  c->pass = -1;
 
-    for (chs = c->data.used.channels; chs; ) {
-      client_list *prev, *cs;
-      
-      channel *ch = chs->data;
-      channel_list *tmp = chs->next;
-      free(chs);
-      chs = tmp;
-
-      pthread_mutex_lock(&ch->data.used.lock);
-      for (prev = NULL, cs = ch->data.used.clients; cs; ) {
-        if (cs->data == c) {
-          client_list *tmp = cs->next;
-          free(cs);
-          cs = tmp;
-          if (prev)
-            prev->next = cs;
-          else
-            ch->data.used.clients = cs;
-        }
-        else {
-          prev = cs;
-          cs = cs->next;
-        }
-      }
-      pthread_mutex_unlock(&ch->data.used.lock);
-    }
-
-    if (c->data.used.sock != -1)
-      close(c->data.used.sock);
-
-    pthread_mutex_unlock(&c->data.used.lock);
-    pthread_mutex_destroy(&c->data.used.lock);
-    buf_free(&c->data.used.msgs);
-    c->mode = UNUSED;
-
-    c->data.next = clients_free;
-    clients_free = c;
-  }
+  c->next = clients_free;
+  clients_free = c;
 }
 
 extern int uw_timeout;
 
 void uw_prune_clients() {
-  size_t i;
+  client *c, *next, *prev = NULL;
   time_t cutoff;
 
   cutoff = time(NULL) - uw_timeout;
 
   pthread_mutex_lock(&clients_mutex);
 
-  for (i = 0; i < n_clients; ++i) {
-    if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff
-        && clients[i]->data.used.refcount == 0)
-      uw_free_client(clients[i]);
+  for (c = clients_used; c; c = next) {
+    next = c->next;
+    pthread_mutex_lock(&c->lock);
+    if (c->last_contact < cutoff) {
+      if (prev)
+        prev->next = next;
+      else
+        clients_used = next;
+      free_client(c);
+    }
+    else
+      prev = c;
+    pthread_mutex_unlock(&c->lock);
   }
 
   pthread_mutex_unlock(&clients_mutex);
 }
 
-
-// Persistent channel state
-
-
-static channel **channels, *channels_free;
-static size_t n_channels;
-
-static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-static channel *uw_new_channel() {
-  channel *ch;
-
-  pthread_mutex_lock(&channels_mutex);
-
-  if (channels_free) {
-    ch = channels_free;
-    channels_free = channels_free->data.next;
-  }
-  else {
-    ++n_channels;
-    channels = realloc(channels, sizeof(channels) * n_channels);
-    ch = malloc(sizeof(channel));
-    ch->id = n_channels-1;
-    channels[n_channels-1] = ch;
-  }
-
-  ch->mode = USED;
-  pthread_mutex_init(&ch->data.used.lock, NULL);
-  ch->data.used.clients = NULL;
-  ch->data.used.refcount = 0;
-
-  pthread_mutex_unlock(&channels_mutex);
-
+static uw_Basis_channel new_channel(client *c) {
+  uw_Basis_channel ch = {c->id, c->n_channels++};
   return ch;
 }
 
-static void uw_free_channel(channel *ch) {
-  if (ch->mode == USED) {
-    client_list *cs;
+static void client_send(int already_locked, client *c, buf *msg) {
+  if (!already_locked)
+    pthread_mutex_lock(&c->lock);
 
-    for (cs = ch->data.used.clients; cs; ) {
-      client_list *tmp = cs->next;
-      free(cs);
-      cs = tmp;
-    }
-    pthread_mutex_destroy(&ch->data.used.lock);
-    ch->mode = UNUSED;
-    ch->data.next = channels_free;
-    channels_free = ch;
-  }
-}
+  if (c->sock != -1) {
+    uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1);
+    uw_really_send(c->sock, msg->start, buf_used(msg));
+    close(c->sock);
+    c->sock = -1;
+  } else
+    buf_append(&c->msgs, msg->start, buf_used(msg));
 
-static channel *uw_find_channel(size_t id) {
-  channel *ch = NULL;
-
-  pthread_mutex_lock(&channels_mutex);
-
-  if (id < n_channels && channels[id]->mode == USED) {
-    ch = channels[id];
-
-    pthread_mutex_lock(&ch->data.used.lock);
-    ++ch->data.used.refcount;
-    pthread_mutex_unlock(&ch->data.used.lock);
-  }
-
-  pthread_mutex_unlock(&channels_mutex);
-
-  return ch;
-}
-
-static void uw_release_channel(channel *ch) {
-  pthread_mutex_lock(&ch->data.used.lock);
-  ++ch->data.used.refcount;
-  pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_subscribe(channel *ch, client *c) {
-  client_list *cs;
-
-  pthread_mutex_lock(&ch->data.used.lock);
-
-  for (cs = ch->data.used.clients; cs; cs = cs->next)
-    if (cs->data == c) {
-      pthread_mutex_unlock(&ch->data.used.lock);
-      return;
-    }
-
-  cs = malloc(sizeof(client_list));
-  cs->data = c;
-  cs->next = ch->data.used.clients;
-  ch->data.used.clients = cs;
-
-  pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_unsubscribe(channel *ch, client *c) {
-  client_list *prev, *cur, *tmp;
-
-  pthread_mutex_lock(&ch->data.used.lock);
-
-  for (prev = NULL, cur = ch->data.used.clients; cur; ) {
-    if (cur->data == c) {
-      if (prev)
-        prev->next = cur->next;
-      else
-        ch->data.used.clients = cur->next;
-      tmp = cur;
-      cur = cur->next;
-      free(tmp);
-    }
-    else {
-      prev = cur;
-      cur = cur->next;
-    }
-  }
-
-  pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_channel_send(channel *ch, const char *msg) {
-  size_t len = strlen(msg), preLen;
-  char pre[INTS_MAX + 2];
-  client_list *cs;
-
-  sprintf(pre, "%d\n", (int)ch->id);
-  preLen = strlen(pre);
-
-  pthread_mutex_lock(&ch->data.used.lock);
-
-  for (cs = ch->data.used.clients; cs; cs = cs->next) {
-    client *c = cs->data;
-
-    pthread_mutex_lock(&c->data.used.lock);
-
-    if (c->data.used.sock != -1) {
-      uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
-      uw_really_send(c->data.used.sock, pre, preLen);
-      uw_really_send(c->data.used.sock, msg, len);
-      uw_really_send(c->data.used.sock, "\n", 1);
-      close(c->data.used.sock);
-      c->data.used.sock = -1;
-    } else {
-      buf_append(&c->data.used.msgs, pre, preLen);
-      buf_append(&c->data.used.msgs, msg, len);
-      buf_append(&c->data.used.msgs, "\n", 1);
-    }
-
-    pthread_mutex_unlock(&c->data.used.lock);
-  }
-
-  pthread_mutex_unlock(&ch->data.used.lock);
+  if (!already_locked)
+    pthread_mutex_unlock(&c->lock);
 }
 
 
@@ -466,7 +273,6 @@
   srand(time(NULL) ^ getpid());
 
   clients = malloc(0);
-  channels = malloc(0);
 }
 
 
@@ -484,15 +290,9 @@
 } cleanup;
 
 typedef struct {
-  usage mode;
-  channel *ch;
-  enum { OLD, NEW } newness;
-
-  size_t n_subscribed;
-  client **subscribed;
-
+  unsigned client;
   buf msgs;
-} channel_delta;
+} delta;
 
 struct uw_context {
   char *headers, *headers_end;
@@ -512,11 +312,13 @@
 
   const char *script_header, *url_prefix;
 
-  size_t n_deltas;
-  channel_delta *deltas;
+  size_t n_deltas, used_deltas;
+  delta *deltas;
 
   int timeout;
 
+  client *client;
+
   char error_message[ERROR_BUF_LEN];
 };
 
@@ -548,7 +350,7 @@
 
   ctx->source_count = 0;
 
-  ctx->n_deltas = 0;
+  ctx->n_deltas = ctx->used_deltas = 0;
   ctx->deltas = malloc(0);
 
   ctx->timeout = uw_timeout;
@@ -574,10 +376,8 @@
   free(ctx->inputs);
   free(ctx->cleanup);
 
-  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
-    free(ctx->deltas[i].subscribed);
+  for (i = 0; i < ctx->n_deltas; ++i)
     buf_free(&ctx->deltas[i].msgs);
-  }
 
   free(ctx);
 }
@@ -591,8 +391,8 @@
   ctx->regions = NULL;
   ctx->cleanup_front = ctx->cleanup;
   ctx->source_count = 0;
-  if (ctx->n_deltas > 0)
-    ctx->deltas[0].mode = UNUSED;
+  ctx->used_deltas = 0;
+  ctx->client = NULL;
 }
 
 void uw_reset_keep_request(uw_context ctx) {
@@ -669,18 +469,73 @@
   ++ctx->cleanup_front;
 }
 
+uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
+  int len = strlen(h);
+  char *s = ctx->headers, *p;
+
+  while (p = strchr(s, ':')) {
+    if (p - s == len && !strncasecmp(s, h, len)) {
+      return p + 2;
+    } else {
+      if ((s = strchr(p, 0)) && s < ctx->headers_end)
+        s += 2;
+      else
+        return NULL;
+    }
+  }
+
+  return NULL;
+}
+
+void uw_login(uw_context ctx) {
+  if (ctx->script_header[0]) {
+    char *id_s, *pass_s;
+
+    if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client"))
+        && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) {
+      unsigned id = atoi(id_s);
+      int pass = atoi(pass_s);
+      client *c = find_client(id);
+
+      if (c == NULL)
+        uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s);
+      else {
+        pthread_mutex_lock(&c->lock);
+        ctx->client = c;
+
+        if (c->mode != USED)
+          uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id);
+        if (c->pass != pass)
+          uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass);
+      }
+    } else {
+      client *c = new_client();
+      pthread_mutex_lock(&c->lock);
+      ctx->client = c;
+    }  
+  }
+}
+
 failure_kind uw_begin(uw_context ctx, char *path) {
   int r = setjmp(ctx->jmp_buf);
 
   if (r == 0) {
     if (uw_db_begin(ctx))
       uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
+
     uw_handle(ctx, path);
   }
 
   return r;
 }
 
+uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) {
+  if (ctx->client == NULL)
+    uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code");
+
+  return ctx->client->id;
+}
+
 void uw_pop_cleanup(uw_context ctx) {
   if (ctx->cleanup_front == ctx->cleanup)
     uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack");
@@ -843,9 +698,6 @@
   if (ctx->script_header[0] == 0)
     return "";
   else {
-    int pass;
-    client *c = uw_new_client(&pass);
-
     char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script));
     sprintf(r, "%s<script>%s</script>",
             ctx->script_header,
@@ -855,16 +707,13 @@
 }
 
 const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) {
-  if (ctx->script_header[0] == 0)
+  if (ctx->client == NULL)
     return "";
   else {
-    int pass;
-    client *c = uw_new_client(&pass);
-
     char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload));
-    sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
-            (int)c->id,
-            c->data.used.pass,
+    sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
+            ctx->client->id,
+            ctx->client->pass,
             ctx->url_prefix,
             ctx->timeout,
             onload);
@@ -942,6 +791,21 @@
   return r;
 }
 
+char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) {
+  if (ctx->client == NULL || chn.cli != ctx->client->id)
+    return "null";
+  else {
+    int len;
+    char *r;
+
+    uw_check_heap(ctx, INTS_MAX + 1);
+    r = ctx->heap.front;
+    sprintf(r, "%u%n", chn.chn, &len);
+    ctx->heap.front += len+1;
+    return r;
+  }
+}
+
 uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) {
   int len;
   size_t s_len = strlen(s);
@@ -1007,16 +871,6 @@
   return result;
 }
 
-char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) {
-  char *result;
-  int len;
-  uw_check_heap(ctx, INTS_MAX);
-  result = ctx->heap.front;
-  sprintf(result, "%lld%n", (long long)n, &len);
-  ctx->heap.front += len+1;
-  return result;
-}
-
 char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) {
   char *result;
   int len;
@@ -1116,15 +970,19 @@
   return r;
 }
 
-char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) {
-  int len;
-  char *r;
+char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) {
+  if (ctx->client == NULL || chn.cli != ctx->client->id)
+    return "";
+  else {
+    int len;
+    char *r;
 
-  uw_check_heap(ctx, INTS_MAX);
-  r = ctx->heap.front;
-  sprintf(r, "%lld%n", (long long)n, &len);
-  ctx->heap.front += len+1;
-  return r;
+    uw_check_heap(ctx, INTS_MAX + 1);
+    r = ctx->heap.front;
+    sprintf(r, "%u%n", chn.chn, &len);
+    ctx->heap.front += len+1;
+    return r;
+  }
 }
 
 char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) {
@@ -1182,13 +1040,15 @@
   return uw_unit_v;
 }
 
-uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) {
-  int len;
-
-  uw_check(ctx, INTS_MAX);
-  sprintf(ctx->page.front, "%lld%n", (long long)n, &len);
-  ctx->page.front += len;
-
+uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) {
+  if (ctx->client != NULL && chn.cli == ctx->client->id) {
+    int len;
+    
+    uw_check(ctx, INTS_MAX + 1);
+    sprintf(ctx->page.front, "%u%n", chn.chn, &len);
+    ctx->page.front += len;
+  }
+    
   return uw_unit_v;
 }
 
@@ -1255,10 +1115,6 @@
   return r;
 }
 
-uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) {
-  return uw_Basis_unurlifyInt(ctx, s);
-}
-
 uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) {
   char *new_s = uw_unurlify_advance(*s);
   uw_Basis_float r;
@@ -1350,10 +1206,6 @@
   return uw_unit_v;
 }
 
-char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) {
-  return uw_Basis_htmlifyInt(ctx, ch);
-}
-
 char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) {
   int len;
   char *r;
@@ -1541,17 +1393,6 @@
   return r;
 }
 
-char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) {
-  int len;
-  char *r;
-
-  uw_check_heap(ctx, INTS_MAX + 6);
-  r = ctx->heap.front;
-  sprintf(r, "%lld::int4%n", (long long)n, &len);
-  ctx->heap.front += len+1;
-  return r;
-}
-
 char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) {
   if (n == NULL)
     return "NULL";
@@ -1614,6 +1455,52 @@
   return r;
 }
 
+char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) {
+  int len;
+  char *r;
+  unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
+
+  uw_check_heap(ctx, INTS_MAX + 7);
+  r = ctx->heap.front;
+  sprintf(r, "%lld::int8%n", combo, &len);
+  ctx->heap.front += len+1;
+  return r;
+}
+
+char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) {
+  int len;
+  char *r;
+  unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
+
+  uw_check_heap(ctx, INTS_MAX + 1);
+  r = ctx->heap.front;
+  sprintf(r, "%lld%n", combo, &len);
+  ctx->heap.front += len+1;
+  return r;
+}
+
+char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) {
+  int len;
+  char *r;
+
+  uw_check_heap(ctx, INTS_MAX + 7);
+  r = ctx->heap.front;
+  sprintf(r, "%u::int4%n", cli, &len);
+  ctx->heap.front += len+1;
+  return r;
+}
+
+char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) {
+  int len;
+  char *r;
+
+  uw_check_heap(ctx, INTS_MAX + 1);
+  r = ctx->heap.front;
+  sprintf(r, "%u%n", cli, &len);
+  ctx->heap.front += len+1;
+  return r;
+}
+
 uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) {
   if (s == NULL)
     return "NULL";
@@ -1637,6 +1524,21 @@
 
 char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) {
   size_t len;
+  char *r, *s;
+  struct tm stm;
+
+  if (localtime_r(&t, &stm)) {
+    s = uw_malloc(ctx, TIMES_MAX);
+    len = strftime(s, TIMES_MAX, TIME_FMT, &stm);
+    r = uw_malloc(ctx, len + 14);
+    sprintf(r, "'%s'::timestamp", s);
+    return r;
+  } else
+    return "<Invalid time>";
+}
+
+char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) {
+  size_t len;
   char *r;
   struct tm stm;
 
@@ -1723,18 +1625,6 @@
     return NULL;
 }
 
-uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) {
-  char *endptr;
-  uw_Basis_channel n = strtoll(s, &endptr, 10);
-
-  if (*s != '\0' && *endptr == '\0') {
-    uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel));
-    *r = n;
-    return r;
-  } else
-    return NULL;
-}
-
 uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) {
   char *endptr;
   uw_Basis_float n = strtod(s, &endptr);
@@ -1802,14 +1692,27 @@
     uw_error(ctx, FATAL, "Can't parse int: %s", s);
 }
 
+#include <errno.h>
+
 uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) {
+  unsigned long long n;
+
+  if (sscanf(s, "%llu", &n) < 1)
+    uw_error(ctx, FATAL, "Can't parse channel: %s", s);
+  else {
+    uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)};
+    return ch;
+  }
+}
+
+uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) {
   char *endptr;
-  uw_Basis_channel n = strtoll(s, &endptr, 10);
+  unsigned long n = strtoul(s, &endptr, 10);
 
   if (*s != '\0' && *endptr == '\0')
     return n;
   else
-    uw_error(ctx, FATAL, "Can't parse channel int: %s", s);
+    uw_error(ctx, FATAL, "Can't parse client: %s", s);
 }
 
 uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) {
@@ -1856,22 +1759,6 @@
   }
 }
 
-uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
-  int len = strlen(h);
-  char *s = ctx->headers, *p;
-
-  while (p = strchr(s, ':')) {
-    if (p - s == len && !strncasecmp(s, h, len)) {
-      return p + 2;
-    } else {
-      if ((s = strchr(p, 0)) && s < ctx->headers_end)
-        s += 2;
-      else
-        return NULL;
-    }
-  }
-}
-
 uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) {
   int len = strlen(c);
   char *s = ctx->headers, *p = ctx->outHeaders.start;
@@ -1930,100 +1817,45 @@
   return uw_unit_v;
 }
 
-static channel_delta *allocate_delta(uw_context ctx, channel *ch) {
-  size_t i;
-  channel_delta *cd;
+static delta *allocate_delta(uw_context ctx, unsigned client) {
+  unsigned i;
+  delta *d;
 
-  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i);
+  for (i = 0; i < ctx->used_deltas; ++i)
+    if (ctx->deltas[i].client == client)
+      return &ctx->deltas[i];
 
-  if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch)
-    return &ctx->deltas[i];
+  if (ctx->used_deltas >= ctx->n_deltas) {
+    ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas);
+    buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0);
+  }
 
-  if (i < ctx->n_deltas)
-    cd = &ctx->deltas[i];
-  else {
-    ++ctx->n_deltas;
-    ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas);
-    cd = &ctx->deltas[ctx->n_deltas-1];
-    cd->n_subscribed = 0;
-    cd->subscribed = malloc(0);
-    buf_init(&cd->msgs, 0);
-  }
-   
-  cd->mode = USED;
-  cd->newness = OLD;
-  cd->ch = ch;
-  if (cd->n_subscribed > 0)
-    cd->subscribed[0] = NULL;
-  buf_reset(&cd->msgs);
-  return cd;
+  d = &ctx->deltas[ctx->used_deltas++];
+  d->client = client;
+  buf_reset(&d->msgs);
+  return d;
 }
 
 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
-  size_t i;
-  channel *ch = uw_new_channel();
-  ++ch->data.used.refcount;
-  channel_delta *cd = allocate_delta(ctx, ch);
+  if (ctx->client == NULL)
+    uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection");
 
-  cd->newness = NEW;
-
-  return ch->id;
-}
-
-static int delta_used(channel_delta *cd) {
-  return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]);
-}
-
-uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
-  channel *ch = uw_find_channel(chn);
-
-  if (ch == NULL)
-    uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
-  else {
-    size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
-    int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
-    client *c = uw_find_client(id);
-
-    if (c == NULL) {
-      uw_release_channel(ch);
-      uw_error(ctx, FATAL, "Unknown client ID in subscription request");
-    } else if (c->data.used.pass != pass) {
-      uw_release_channel(ch);
-      uw_release_client(c);
-      uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass);
-    } else {
-      size_t i;
-      channel_delta *cd = allocate_delta(ctx, ch);
-
-      if (delta_used(cd))
-        uw_release_channel(ch);
-
-      for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i);
-
-      if (i < cd->n_subscribed)
-        cd->subscribed[i] = c;
-      else {
-        ++cd->n_subscribed;
-        cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed);
-        cd->subscribed[cd->n_subscribed-1] = c;
-      }
-    }
-  }
-
-  return uw_unit_v;
+  return new_channel(ctx->client);
 }
 
 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
-  channel *ch = uw_find_channel(chn);
+  delta *d = allocate_delta(ctx, chn.cli);
+  size_t len;
+  int preLen;
+  char pre[INTS_MAX + 2];
 
-  if (ch == NULL)
-    uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
-  else {
-    channel_delta *cd = allocate_delta(ctx, ch);
-    if (delta_used(cd))
-      uw_release_channel(ch);
-    buf_append(&cd->msgs, msg, strlen(msg));
-  }
+  len = strlen(msg);
+
+  sprintf(pre, "%u\n%n", chn.chn, &preLen);
+
+  buf_append(&d->msgs, pre, preLen);
+  buf_append(&d->msgs, msg, len);
+  buf_append(&d->msgs, "\n", 1);
 
   return uw_unit_v;
 }
@@ -2032,46 +1864,27 @@
 int uw_db_rollback(uw_context);
 
 void uw_commit(uw_context ctx) {
-  size_t i, j;
-
-  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
-    channel *ch = ctx->deltas[i].ch;
-
-    for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
-      client *c = ctx->deltas[i].subscribed[j];
-
-      uw_subscribe(ch, c);
-      uw_release_client(c);
-    }
-
-    if (buf_used(&ctx->deltas[i].msgs) > 0) {
-      uw_channel_send(ch, ctx->deltas[i].msgs.start);
-    }
-
-    uw_release_channel(ch);
-  }
+  unsigned i;
 
   if (uw_db_commit(ctx))
     uw_error(ctx, FATAL, "Error running SQL COMMIT");
+
+  for (i = 0; i < ctx->used_deltas; ++i) {
+    delta *d = &ctx->deltas[i];
+    client *c = find_client(d->client);
+
+    assert (c != NULL && c->mode == USED);
+
+    client_send(c == ctx->client, c, &d->msgs);
+  }
+
+  if (ctx->client)
+    pthread_mutex_unlock(&ctx->client->lock);
 }
 
 int uw_rollback(uw_context ctx) {
-  size_t i, j;
-
-  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
-    channel *ch = ctx->deltas[i].ch;
-
-    for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
-      client *c = ctx->deltas[i].subscribed[j];
-
-      uw_release_client(c);
-    }
-
-    if (ctx->deltas[i].newness == NEW)
-      uw_free_channel(ch);
-    else
-      uw_release_channel(ch);
-  }
+  if (ctx->client)
+    pthread_mutex_unlock(&ctx->client->lock);
 
   return uw_db_rollback(ctx);
 }