changeset 667:a93d5324f400

Dummy message delivery to clients
author Adam Chlipala <adamc@hcoop.net>
date Thu, 19 Mar 2009 16:34:13 -0400
parents 5130228d2b29
children b0c1a46b1f15
files include/urweb.h lib/js/urweb.js src/c/driver.c src/c/urweb.c src/cjr_print.sml src/monoize.sml
diffstat 6 files changed, 334 insertions(+), 54 deletions(-) [+]
line wrap: on
line diff
--- a/include/urweb.h	Thu Mar 19 13:47:02 2009 -0400
+++ b/include/urweb.h	Thu Mar 19 16:34:13 2009 -0400
@@ -6,6 +6,11 @@
 
 extern uw_unit uw_unit_v;
 
+void uw_global_init(void);
+
+void uw_client_connect(size_t id, int pass, int sock);
+void uw_prune_clients(time_t timeout);
+
 uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len);
 void uw_set_db(uw_context, void*);
 void *uw_get_db(uw_context);
@@ -41,6 +46,7 @@
 
 void uw_set_script_header(uw_context, const char*);
 const char *uw_Basis_get_script(uw_context, uw_unit);
+const char *uw_Basis_get_listener(uw_context, uw_unit);
 
 char *uw_Basis_htmlifyInt(uw_context, uw_Basis_int);
 char *uw_Basis_htmlifyFloat(uw_context, uw_Basis_float);
--- a/lib/js/urweb.js	Thu Mar 19 13:47:02 2009 -0400
+++ b/lib/js/urweb.js	Thu Mar 19 16:34:13 2009 -0400
@@ -174,3 +174,40 @@
   xhr.open("GET", uri, true);
   xhr.send(null);
 }
+
+
+var client_id = 0;
+var client_pass = 0;
+var url_prefix = "/";
+
+function path_join(s1, s2) {
+  if (s1.length > 0 && s1[s1.length-1] == '/')
+    return s1 + s2;
+  else
+    return s1 + "/" + s2;
+}
+
+function listener() {
+  var xhr = getXHR();
+
+  xhr.onreadystatechange = function() {
+    if (xhr.readyState == 4) {
+      var isok = false;
+
+      try {
+        if (xhr.status == 200)
+          isok = true;
+      } catch (e) { }
+
+      if (isok)
+        alert("Messages: " + xhr.responseText);
+      else {
+        alert("Error querying remote server for messages!");
+        throw "Error querying remote server for messages!";
+      }
+    }
+  };
+
+  xhr.open("GET", path_join(url_prefix, ".msgs/" + client_id + "/" + client_pass), true);
+  xhr.send(null);
+}
--- a/src/c/driver.c	Thu Mar 19 13:47:02 2009 -0400
+++ b/src/c/driver.c	Thu Mar 19 16:34:13 2009 -0400
@@ -106,7 +106,7 @@
 
   while (1) {
     char buf[uw_bufsize+1], *back = buf, *s;
-    int sock;
+    int sock, dont_close = 0;
 
     pthread_mutex_lock(&queue_mutex);
     while (empty())
@@ -138,6 +138,7 @@
       if (s = strstr(buf, "\r\n\r\n")) {
         failure_kind fk;
         char *cmd, *path, *headers, path_copy[uw_bufsize+1], *inputs;
+        int id, pass;
 
         s[2] = 0;
 
@@ -168,6 +169,12 @@
           break;
         }
 
+        if (sscanf(path, "/.msgs/%d/%d", &id, &pass) == 2) {
+          uw_client_connect(id, pass, sock);
+          dont_close = 1;
+          break;
+        }
+
         if (inputs = strchr(path, '?')) {
           char *name, *value;
           *inputs++ = 0;
@@ -286,11 +293,19 @@
       }
     }
 
-    close(sock);
+    if (!dont_close)
+      close(sock);
     uw_reset(ctx);
   }
 }
 
+static void *client_pruner(void *data) {
+  while (1) {
+    uw_prune_clients(5);
+    sleep(5);
+  }
+}
+
 static void help(char *cmd) {
   printf("Usage: %s [-p <port>] [-t <thread-count>]\n", cmd);
 }
@@ -377,8 +392,20 @@
 
   sin_size = sizeof their_addr;
 
+  uw_global_init();
+
   printf("Listening on port %d....\n", uw_port);
 
+  {
+    pthread_t thread;
+    int name;
+
+    if (pthread_create(&thread, NULL, client_pruner, &name)) {
+      fprintf(stderr, "Error creating pruner thread\n");
+      return 1;
+    }
+  }
+
   for (i = 0; i < nthreads; ++i) {
     pthread_t thread;    
     names[i] = i;
--- a/src/c/urweb.c	Thu Mar 19 13:47:02 2009 -0400
+++ b/src/c/urweb.c	Thu Mar 19 16:34:13 2009 -0400
@@ -8,10 +8,234 @@
 #include <setjmp.h>
 #include <stdarg.h>
 
+#include <pthread.h>
+
 #include "types.h"
 
 uw_unit uw_unit_v = {};
 
+
+// Socket extras
+
+int uw_really_send(int sock, const void *buf, size_t len) {
+  while (len > 0) {
+    size_t n = send(sock, buf, len, 0);
+
+    if (n < 0)
+      return n;
+
+    buf += n;
+    len -= n;
+  }
+
+  return 0;
+}
+
+
+// Buffers
+
+typedef struct {
+  char *start, *front, *back;
+} buf;
+
+static void buf_init(buf *b, size_t s) {
+  b->front = b->start = malloc(s);
+  b->back = b->front + s;
+}
+
+static void buf_free(buf *b) {
+  free(b->start);
+}
+
+static void buf_reset(buf *b) {
+  b->front = b->start;
+}
+
+static void buf_check(buf *b, size_t extra) {
+  if (b->back - b->front < extra) {
+    size_t desired = b->front - b->start + extra, next;
+    char *new_heap;
+
+    next = b->back - b->start;
+    if (next == 0)
+      next = 1;
+    for (; next < desired; next *= 2);
+
+    new_heap = realloc(b->start, next);
+    b->front = new_heap + (b->front - b->start);
+    b->back = new_heap + next;
+    b->start = new_heap;
+  }
+}
+
+static size_t buf_used(buf *b) {
+  return b->front - b->start;
+}
+
+static size_t buf_avail(buf *b) {
+  return b->back - b->start;
+}
+
+
+// Cross-request state
+
+typedef enum { UNUSED, USED } usage;
+
+typedef struct client {
+  size_t id;
+  usage mode;
+  union {
+    struct client *next;
+    struct {
+      pthread_mutex_t lock;
+      int pass;
+      buf msgs;
+      int sock;
+      time_t last_contact;
+    } used;
+  } data;
+} client;
+
+static client **clients, *clients_free;
+static size_t n_clients;
+
+static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+void uw_global_init() {
+  srand(time(NULL) ^ getpid());
+
+  clients = malloc(0);
+}
+
+void uw_global_free() {
+  size_t i;
+
+  for (i = 0; i < n_clients; ++i)
+    free(clients[i]);
+
+  free(clients);
+}
+
+static client *uw_new_client() {
+  client *c;
+
+  pthread_mutex_lock(&clients_mutex);
+
+  if (clients_free) {
+    c = clients_free;
+    clients_free = clients_free->data.next;
+  }
+  else {
+    ++n_clients;
+    clients = realloc(clients, sizeof(client) * n_clients);
+    c = malloc(sizeof(client));
+    c->id = n_clients-1;
+    clients[n_clients-1] = c;
+  }
+
+  c->mode = USED;
+  pthread_mutex_init(&c->data.used.lock, NULL);
+  c->data.used.pass = rand();
+  c->data.used.sock = -1;
+  c->data.used.last_contact = time(NULL);
+  buf_init(&c->data.used.msgs, 0);
+
+  pthread_mutex_unlock(&clients_mutex);
+
+  return c;
+}
+
+static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
+
+void uw_client_connect(size_t id, int pass, int sock) {
+  client *c;
+
+  pthread_mutex_lock(&clients_mutex);
+
+  if (id >= n_clients) {
+    pthread_mutex_unlock(&clients_mutex);
+    close(sock);
+    fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
+    return;
+  }
+
+  c = clients[id];
+
+  if (c->mode != USED) {
+    pthread_mutex_unlock(&clients_mutex);
+    close(sock);
+    fprintf(stderr, "Client request for unused ID (%d)\n", (int)id);
+    return;
+  }
+
+  pthread_mutex_lock(&c->data.used.lock);
+
+  if (pass != c->data.used.pass) {
+    pthread_mutex_unlock(&c->data.used.lock);
+    pthread_mutex_unlock(&clients_mutex);
+    close(sock);
+    fprintf(stderr, "Wrong client password (%d)\n", (int)id);
+    return;
+  }
+
+  if (c->data.used.sock != -1) {
+    pthread_mutex_unlock(&c->data.used.lock);
+    pthread_mutex_unlock(&clients_mutex);
+    close(sock);
+    fprintf(stderr, "Duplicate client connection (%d)\n", (int)id);
+    return;
+  }
+
+  c->data.used.last_contact = time(NULL);
+
+  if (buf_used(&c->data.used.msgs) > 0) {
+    uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
+    uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
+    close(sock);
+  }
+  else {
+    uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
+    uw_really_send(sock, "Hi!", 3);
+    close(sock);
+    //c->data.used.sock = sock;
+  }
+
+  pthread_mutex_unlock(&c->data.used.lock);
+  pthread_mutex_unlock(&clients_mutex);
+}
+
+static void uw_free_client(client *c) {
+  printf("Freeing client %d\n", c->id);
+
+  if (c->mode == USED && c->data.used.sock != -1)
+    close(c->data.used.sock);
+
+  pthread_mutex_destroy(&c->data.used.lock);
+  buf_free(&c->data.used.msgs);
+  c->mode = UNUSED;
+  c->data.next = clients_free;
+  clients_free = c;
+}
+
+void uw_prune_clients(time_t timeout) {
+  size_t i;
+  time_t cutoff;
+
+  cutoff = time(NULL) - timeout;
+
+  pthread_mutex_lock(&clients_mutex);
+
+  for (i = 0; i < n_clients; ++i) {
+    if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff)
+      uw_free_client(clients[i]);
+  }
+
+  pthread_mutex_unlock(&clients_mutex);
+}
+
+
+// Single-request state
+
 #define ERROR_BUF_LEN 1024
 
 typedef struct regions {
@@ -23,10 +247,6 @@
   void *arg;
 } cleanup;
 
-typedef struct {
-  char *start, *front, *back;
-} buf;
-
 struct uw_context {
   char *headers, *headers_end;
 
@@ -43,18 +263,13 @@
 
   cleanup *cleanup, *cleanup_front, *cleanup_back;
 
-  const char *script_header;
+  const char *script_header, *url_prefix;
 
   char error_message[ERROR_BUF_LEN];
 };
 
 extern int uw_inputs_len;
 
-static void buf_init(buf *b, size_t s) {
-  b->front = b->start = malloc(s);
-  b->back = b->front + s;
-}
-
 uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len) {
   uw_context ctx = malloc(sizeof(struct uw_context));
 
@@ -64,6 +279,7 @@
   buf_init(&ctx->page, page_len);
   buf_init(&ctx->heap, heap_len);
   buf_init(&ctx->script, script_len);
+  ctx->script.start[0] = 0;
 
   ctx->inputs = calloc(uw_inputs_len, sizeof(char *));
 
@@ -74,6 +290,7 @@
   ctx->cleanup_front = ctx->cleanup_back = ctx->cleanup = malloc(0);
 
   ctx->script_header = "";
+  ctx->url_prefix = "/";
   
   ctx->error_message[0] = 0;
 
@@ -90,10 +307,6 @@
   return ctx->db;
 }
 
-static void buf_free(buf *b) {
-  free(b->front);
-}
-
 void uw_free(uw_context ctx) {
   buf_free(&ctx->outHeaders);
   buf_free(&ctx->script);
@@ -104,13 +317,10 @@
   free(ctx);
 }
 
-static void buf_reset(buf *b) {
-  b->front = b->start;
-}
-
 void uw_reset_keep_error_message(uw_context ctx) {
   buf_reset(&ctx->outHeaders);
   buf_reset(&ctx->script);
+  ctx->script.start[0] = 0;
   buf_reset(&ctx->page);
   buf_reset(&ctx->heap);
   ctx->regions = NULL;
@@ -249,7 +459,12 @@
   ctx->script_header = s;
 }
 
-static void buf_check(uw_context ctx, buf *b, size_t extra, const char *desc) {
+void uw_set_url_prefix(uw_context ctx, const char *s) {
+  ctx->url_prefix = s;
+}
+
+
+static void buf_check_ctx(uw_context ctx, buf *b, size_t extra, const char *desc) {
   if (b->back - b->front < extra) {
     size_t desired = b->front - b->start + extra, next;
     char *new_heap;
@@ -263,7 +478,7 @@
     b->front = new_heap + (b->front - b->start);
     b->back = new_heap + next;
 
-    if (desc && new_heap != b->start) {
+    if (new_heap != b->start) {
       b->start = new_heap;
       uw_error(ctx, UNLIMITED_RETRY, "Couldn't allocate new %s contiguously", desc);
     }
@@ -273,7 +488,7 @@
 }
 
 static void uw_check_heap(uw_context ctx, size_t extra) {
-  buf_check(ctx, &ctx->heap, extra, "heap chunk");
+  buf_check_ctx(ctx, &ctx->heap, extra, "heap chunk");
 }
 
 void *uw_malloc(uw_context ctx, size_t len) {
@@ -307,14 +522,6 @@
   ctx->regions = r->next;
 }
 
-static size_t buf_used(buf *b) {
-  return b->front - b->start;
-}
-
-static size_t buf_avail(buf *b) {
-  return b->back - b->start;
-}
-
 void uw_memstats(uw_context ctx) {
   printf("Headers: %d/%d\n", buf_used(&ctx->outHeaders), buf_avail(&ctx->outHeaders));
   printf("Script: %d/%d\n", buf_used(&ctx->script), buf_avail(&ctx->script));
@@ -322,20 +529,6 @@
   printf("Heap: %d/%d\n", buf_used(&ctx->heap), buf_avail(&ctx->heap));
 }
 
-int uw_really_send(int sock, const void *buf, size_t len) {
-  while (len > 0) {
-    size_t n = send(sock, buf, len, 0);
-
-    if (n < 0)
-      return n;
-
-    buf += n;
-    len -= n;
-  }
-
-  return 0;
-}
-
 int uw_send(uw_context ctx, int sock) {
   int n = uw_really_send(sock, ctx->outHeaders.start, ctx->outHeaders.front - ctx->outHeaders.start);
 
@@ -351,7 +544,7 @@
 }
 
 static void uw_check_headers(uw_context ctx, size_t extra) {
-  buf_check(ctx, &ctx->outHeaders, extra, NULL);
+  buf_check(&ctx->outHeaders, extra);
 }
 
 void uw_write_header(uw_context ctx, uw_Basis_string s) {
@@ -363,7 +556,7 @@
 }
 
 static void uw_check_script(uw_context ctx, size_t extra) {
-  buf_check(ctx, &ctx->script, extra, NULL);
+  buf_check(&ctx->script, extra);
 }
 
 void uw_write_script(uw_context ctx, uw_Basis_string s) {
@@ -375,16 +568,27 @@
 }
 
 const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
-  if (ctx->script.front == ctx->script.start) {
-    return ctx->script_header;
-  } else {
-    char *r = uw_malloc(ctx, 41 + (ctx->script.front - ctx->script.start) + strlen(ctx->script_header));
+  if (ctx->script_header[0] == 0)
+    return "";
+  else {
+    int pass;
+    client *c = uw_new_client(&pass);
 
-    sprintf(r, "%s<script>%s</script>", ctx->script_header, ctx->script.start);
+    char *r = uw_malloc(ctx, strlen(ctx->script_header) + 56 + 2 * INTS_MAX + buf_used(&ctx->script)
+                        + strlen(ctx->url_prefix));
+    sprintf(r, "%s<script>client_id=%d;client_pass=%d;url_prefix=\"%s\";%s</script>",
+            ctx->script_header, (int)c->id, c->data.used.pass, ctx->url_prefix, ctx->script.start);
     return r;
   }
 }
 
+const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) {
+  if (ctx->script_header[0] == 0)
+    return "";
+  else
+    return " onload='listener()'";
+}
+
 uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) {
   char *r, *s2;
 
@@ -486,7 +690,7 @@
 }
 
 static void uw_check(uw_context ctx, size_t extra) {
-  buf_check(ctx, &ctx->page, extra, NULL);
+  buf_check(&ctx->page, extra);
 }
 
 static void uw_writec_unsafe(uw_context ctx, char c) {
@@ -1370,3 +1574,5 @@
 
   return uw_unit_v;
 }
+
+
--- a/src/cjr_print.sml	Thu Mar 19 13:47:02 2009 -0400
+++ b/src/cjr_print.sml	Thu Mar 19 16:34:13 2009 -0400
@@ -2355,6 +2355,9 @@
                                                                    ^ "\\\"></script>\\n"
                                               | ServerOnly => ""),
                                     string "\");",
+                                    string "uw_set_url_prefix(ctx, \"",
+                                    string (!Monoize.urlPrefix),
+                                    string "\");",
                                     newline]),
                      box [string "{",
                           newline,
--- a/src/monoize.sml	Thu Mar 19 13:47:02 2009 -0400
+++ b/src/monoize.sml	Thu Mar 19 16:34:13 2009 -0400
@@ -1924,7 +1924,8 @@
                     end
             in
                 case tag of
-                    "body" => normal ("body", NONE,
+                    "body" => normal ("body",
+                                      SOME (L'.EFfiApp ("Basis", "get_listener", [(L'.ERecord [], loc)]), loc),
                                       SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc))
 
                   | "dyn" =>