diff src/c/urweb.c @ 668:b0c1a46b1f15

First message send delivered, but not interpreted
author Adam Chlipala <adamc@hcoop.net>
date Sun, 22 Mar 2009 15:05:07 -0400
parents a93d5324f400
children f68eee90dbcf
line wrap: on
line diff
--- a/src/c/urweb.c	Thu Mar 19 16:34:13 2009 -0400
+++ b/src/c/urweb.c	Sun Mar 22 15:05:07 2009 -0400
@@ -76,8 +76,14 @@
   return b->back - b->start;
 }
 
+static void buf_append(buf *b, const char *s, size_t len) {
+  buf_check(b, len);
+  memcpy(b->front, s, len);
+  b->front += len;
+}
 
-// Cross-request state
+
+// Persistent client state
 
 typedef enum { UNUSED, USED } usage;
 
@@ -101,21 +107,6 @@
 
 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-void uw_global_init() {
-  srand(time(NULL) ^ getpid());
-
-  clients = malloc(0);
-}
-
-void uw_global_free() {
-  size_t i;
-
-  for (i = 0; i < n_clients; ++i)
-    free(clients[i]);
-
-  free(clients);
-}
-
 static client *uw_new_client() {
   client *c;
 
@@ -147,24 +138,33 @@
 
 static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
 
-void uw_client_connect(size_t id, int pass, int sock) {
+static client *uw_find_client(size_t id) {
   client *c;
 
   pthread_mutex_lock(&clients_mutex);
 
   if (id >= n_clients) {
     pthread_mutex_unlock(&clients_mutex);
-    close(sock);
-    fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
-    return;
+    return NULL;
   }
 
   c = clients[id];
 
   if (c->mode != USED) {
     pthread_mutex_unlock(&clients_mutex);
+    return NULL;
+  }
+
+  pthread_mutex_unlock(&clients_mutex);
+  return c;
+}
+
+void uw_client_connect(size_t id, int pass, int sock) {
+  client *c = uw_find_client(id);
+
+  if (c == NULL) {
     close(sock);
-    fprintf(stderr, "Client request for unused ID (%d)\n", (int)id);
+    fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
     return;
   }
 
@@ -172,7 +172,6 @@
 
   if (pass != c->data.used.pass) {
     pthread_mutex_unlock(&c->data.used.lock);
-    pthread_mutex_unlock(&clients_mutex);
     close(sock);
     fprintf(stderr, "Wrong client password (%d)\n", (int)id);
     return;
@@ -180,7 +179,6 @@
 
   if (c->data.used.sock != -1) {
     pthread_mutex_unlock(&c->data.used.lock);
-    pthread_mutex_unlock(&clients_mutex);
     close(sock);
     fprintf(stderr, "Duplicate client connection (%d)\n", (int)id);
     return;
@@ -193,15 +191,10 @@
     uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
     close(sock);
   }
-  else {
-    uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
-    uw_really_send(sock, "Hi!", 3);
-    close(sock);
-    //c->data.used.sock = sock;
-  }
+  else
+    c->data.used.sock = sock;
 
   pthread_mutex_unlock(&c->data.used.lock);
-  pthread_mutex_unlock(&clients_mutex);
 }
 
 static void uw_free_client(client *c) {
@@ -234,6 +227,150 @@
 }
 
 
+// Persistent channel state
+
+typedef struct client_list {
+  client *data;
+  struct client_list *next;
+} client_list;
+
+typedef struct channel {
+  size_t id;
+  usage mode;
+  union {
+    struct channel *next;
+    struct {
+      pthread_mutex_t lock;
+      client_list *clients;
+    } used;
+  } data;
+} channel;
+
+static channel **channels, *channels_free;
+static size_t n_channels;
+
+static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static channel *uw_new_channel() {
+  channel *ch;
+
+  pthread_mutex_lock(&channels_mutex);
+
+  if (channels_free) {
+    ch = channels_free;
+    channels_free = channels_free->data.next;
+  }
+  else {
+    ++n_channels;
+    channels = realloc(channels, sizeof(channels) * n_channels);
+    ch = malloc(sizeof(channel));
+    ch->id = n_channels-1;
+    channels[n_channels-1] = ch;
+  }
+
+  ch->mode = USED;
+  pthread_mutex_init(&ch->data.used.lock, NULL);
+  ch->data.used.clients = NULL;
+
+  pthread_mutex_unlock(&channels_mutex);
+
+  return ch;
+}
+
+static channel *uw_find_channel(size_t id) {
+  channel *ch = NULL;
+
+  pthread_mutex_lock(&channels_mutex);
+
+  if (id < n_channels && channels[id]->mode == USED)
+    ch = channels[id];
+
+  pthread_mutex_unlock(&channels_mutex);
+
+  return ch;
+}
+
+static void uw_subscribe(channel *ch, client *c) {
+  client_list *cs = malloc(sizeof(client_list));
+
+  pthread_mutex_lock(&ch->data.used.lock);
+
+  cs->data = c;
+  cs->next = ch->data.used.clients;
+  ch->data.used.clients = cs;
+
+  pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+static void uw_unsubscribe(channel *ch, client *c) {
+  client_list *prev, *cur, *tmp;
+
+  pthread_mutex_lock(&ch->data.used.lock);
+
+  for (prev = NULL, cur = ch->data.used.clients; cur; ) {
+    if (cur->data == c) {
+      if (prev)
+        prev->next = cur->next;
+      else
+        ch->data.used.clients = cur->next;
+      tmp = cur;
+      cur = cur->next;
+      free(tmp);
+    }
+    else {
+      prev = cur;
+      cur = cur->next;
+    }
+  }
+
+  pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+static void uw_channel_send(channel *ch, const char *msg) {
+  size_t len = strlen(msg), preLen;
+  char pre[INTS_MAX + 2];
+  client_list *cs;
+
+  sprintf(pre, "%d\n", (int)ch->id);
+  preLen = strlen(pre);
+
+  pthread_mutex_lock(&ch->data.used.lock);
+
+  for (cs = ch->data.used.clients; cs; cs = cs->next) {
+    client *c = cs->data;
+
+    pthread_mutex_lock(&c->data.used.lock);
+
+    if (c->data.used.sock != -1) {
+      uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
+      uw_really_send(c->data.used.sock, pre, preLen);
+      uw_really_send(c->data.used.sock, msg, len);
+      uw_really_send(c->data.used.sock, "\n", 1);
+      close(c->data.used.sock);
+      c->data.used.sock = -1;
+    } else {
+      buf_append(&c->data.used.msgs, pre, preLen);
+      buf_append(&c->data.used.msgs, msg, len);
+      buf_append(&c->data.used.msgs, "\n", 1);
+    }
+
+    pthread_mutex_unlock(&c->data.used.lock);
+  }
+
+  pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+
+// Global entry points
+
+void uw_global_init() {
+  srand(time(NULL) ^ getpid());
+
+  clients = malloc(0);
+  channels = malloc(0);
+}
+
+
 // Single-request state
 
 #define ERROR_BUF_LEN 1024
@@ -582,11 +719,17 @@
   }
 }
 
-const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) {
+const char *uw_Basis_get_listener(uw_context ctx, uw_Basis_string onload) {
   if (ctx->script_header[0] == 0)
     return "";
-  else
+  else if (onload[0] == 0)
     return " onload='listener()'";
+  else {
+    uw_Basis_string s = uw_malloc(ctx, strlen(onload) + 22);
+
+    sprintf(s, " onload='listener();%s'", onload);
+    return s;
+  }
 }
 
 uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) {
@@ -941,6 +1084,10 @@
   return r;
 }
 
+uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) {
+  return uw_Basis_unurlifyInt(ctx, s);
+}
+
 uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) {
   char *new_s = uw_unurlify_advance(*s);
   uw_Basis_float r;
@@ -1032,6 +1179,10 @@
   return uw_unit_v;
 }
 
+char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) {
+  return uw_Basis_htmlifyInt(ctx, ch);
+}
+
 char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) {
   int len;
   char *r;
@@ -1575,4 +1726,34 @@
   return uw_unit_v;
 }
 
+uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
+  return uw_new_channel()->id;
+}
 
+uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
+  channel *ch = uw_find_channel(chn);
+
+  if (ch == NULL)
+    uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
+  else {
+    size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
+    int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
+    client *c = uw_find_client(id);
+
+    if (c == NULL)
+      uw_error(ctx, FATAL, "Unknown client ID in subscription request");
+    else if (c->data.used.pass != pass)
+      uw_error(ctx, FATAL, "Wrong client password in subscription request");
+    else
+      uw_subscribe(ch, c);
+  }
+}
+
+uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
+  channel *ch = uw_find_channel(chn);
+
+  if (ch == NULL)
+    uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
+  else
+    uw_channel_send(ch, msg);
+}