diff src/c/urweb.c @ 667:a93d5324f400

Dummy message delivery to clients
author Adam Chlipala <adamc@hcoop.net>
date Thu, 19 Mar 2009 16:34:13 -0400
parents 5130228d2b29
children b0c1a46b1f15
line wrap: on
line diff
--- a/src/c/urweb.c	Thu Mar 19 13:47:02 2009 -0400
+++ b/src/c/urweb.c	Thu Mar 19 16:34:13 2009 -0400
@@ -8,10 +8,234 @@
 #include <setjmp.h>
 #include <stdarg.h>
 
+#include <pthread.h>
+
 #include "types.h"
 
 uw_unit uw_unit_v = {};
 
+
+// Socket extras
+
+int uw_really_send(int sock, const void *buf, size_t len) {
+  while (len > 0) {
+    size_t n = send(sock, buf, len, 0);
+
+    if (n < 0)
+      return n;
+
+    buf += n;
+    len -= n;
+  }
+
+  return 0;
+}
+
+
+// Buffers
+
+typedef struct {
+  char *start, *front, *back;
+} buf;
+
+static void buf_init(buf *b, size_t s) {
+  b->front = b->start = malloc(s);
+  b->back = b->front + s;
+}
+
+static void buf_free(buf *b) {
+  free(b->start);
+}
+
+static void buf_reset(buf *b) {
+  b->front = b->start;
+}
+
+static void buf_check(buf *b, size_t extra) {
+  if (b->back - b->front < extra) {
+    size_t desired = b->front - b->start + extra, next;
+    char *new_heap;
+
+    next = b->back - b->start;
+    if (next == 0)
+      next = 1;
+    for (; next < desired; next *= 2);
+
+    new_heap = realloc(b->start, next);
+    b->front = new_heap + (b->front - b->start);
+    b->back = new_heap + next;
+    b->start = new_heap;
+  }
+}
+
+static size_t buf_used(buf *b) {
+  return b->front - b->start;
+}
+
+static size_t buf_avail(buf *b) {
+  return b->back - b->start;
+}
+
+
+// Cross-request state
+
+typedef enum { UNUSED, USED } usage;
+
+typedef struct client {
+  size_t id;
+  usage mode;
+  union {
+    struct client *next;
+    struct {
+      pthread_mutex_t lock;
+      int pass;
+      buf msgs;
+      int sock;
+      time_t last_contact;
+    } used;
+  } data;
+} client;
+
+static client **clients, *clients_free;
+static size_t n_clients;
+
+static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+void uw_global_init() {
+  srand(time(NULL) ^ getpid());
+
+  clients = malloc(0);
+}
+
+void uw_global_free() {
+  size_t i;
+
+  for (i = 0; i < n_clients; ++i)
+    free(clients[i]);
+
+  free(clients);
+}
+
+static client *uw_new_client() {
+  client *c;
+
+  pthread_mutex_lock(&clients_mutex);
+
+  if (clients_free) {
+    c = clients_free;
+    clients_free = clients_free->data.next;
+  }
+  else {
+    ++n_clients;
+    clients = realloc(clients, sizeof(client) * n_clients);
+    c = malloc(sizeof(client));
+    c->id = n_clients-1;
+    clients[n_clients-1] = c;
+  }
+
+  c->mode = USED;
+  pthread_mutex_init(&c->data.used.lock, NULL);
+  c->data.used.pass = rand();
+  c->data.used.sock = -1;
+  c->data.used.last_contact = time(NULL);
+  buf_init(&c->data.used.msgs, 0);
+
+  pthread_mutex_unlock(&clients_mutex);
+
+  return c;
+}
+
+static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
+
+void uw_client_connect(size_t id, int pass, int sock) {
+  client *c;
+
+  pthread_mutex_lock(&clients_mutex);
+
+  if (id >= n_clients) {
+    pthread_mutex_unlock(&clients_mutex);
+    close(sock);
+    fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
+    return;
+  }
+
+  c = clients[id];
+
+  if (c->mode != USED) {
+    pthread_mutex_unlock(&clients_mutex);
+    close(sock);
+    fprintf(stderr, "Client request for unused ID (%d)\n", (int)id);
+    return;
+  }
+
+  pthread_mutex_lock(&c->data.used.lock);
+
+  if (pass != c->data.used.pass) {
+    pthread_mutex_unlock(&c->data.used.lock);
+    pthread_mutex_unlock(&clients_mutex);
+    close(sock);
+    fprintf(stderr, "Wrong client password (%d)\n", (int)id);
+    return;
+  }
+
+  if (c->data.used.sock != -1) {
+    pthread_mutex_unlock(&c->data.used.lock);
+    pthread_mutex_unlock(&clients_mutex);
+    close(sock);
+    fprintf(stderr, "Duplicate client connection (%d)\n", (int)id);
+    return;
+  }
+
+  c->data.used.last_contact = time(NULL);
+
+  if (buf_used(&c->data.used.msgs) > 0) {
+    uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
+    uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
+    close(sock);
+  }
+  else {
+    uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
+    uw_really_send(sock, "Hi!", 3);
+    close(sock);
+    //c->data.used.sock = sock;
+  }
+
+  pthread_mutex_unlock(&c->data.used.lock);
+  pthread_mutex_unlock(&clients_mutex);
+}
+
+static void uw_free_client(client *c) {
+  printf("Freeing client %d\n", c->id);
+
+  if (c->mode == USED && c->data.used.sock != -1)
+    close(c->data.used.sock);
+
+  pthread_mutex_destroy(&c->data.used.lock);
+  buf_free(&c->data.used.msgs);
+  c->mode = UNUSED;
+  c->data.next = clients_free;
+  clients_free = c;
+}
+
+void uw_prune_clients(time_t timeout) {
+  size_t i;
+  time_t cutoff;
+
+  cutoff = time(NULL) - timeout;
+
+  pthread_mutex_lock(&clients_mutex);
+
+  for (i = 0; i < n_clients; ++i) {
+    if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff)
+      uw_free_client(clients[i]);
+  }
+
+  pthread_mutex_unlock(&clients_mutex);
+}
+
+
+// Single-request state
+
 #define ERROR_BUF_LEN 1024
 
 typedef struct regions {
@@ -23,10 +247,6 @@
   void *arg;
 } cleanup;
 
-typedef struct {
-  char *start, *front, *back;
-} buf;
-
 struct uw_context {
   char *headers, *headers_end;
 
@@ -43,18 +263,13 @@
 
   cleanup *cleanup, *cleanup_front, *cleanup_back;
 
-  const char *script_header;
+  const char *script_header, *url_prefix;
 
   char error_message[ERROR_BUF_LEN];
 };
 
 extern int uw_inputs_len;
 
-static void buf_init(buf *b, size_t s) {
-  b->front = b->start = malloc(s);
-  b->back = b->front + s;
-}
-
 uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len) {
   uw_context ctx = malloc(sizeof(struct uw_context));
 
@@ -64,6 +279,7 @@
   buf_init(&ctx->page, page_len);
   buf_init(&ctx->heap, heap_len);
   buf_init(&ctx->script, script_len);
+  ctx->script.start[0] = 0;
 
   ctx->inputs = calloc(uw_inputs_len, sizeof(char *));
 
@@ -74,6 +290,7 @@
   ctx->cleanup_front = ctx->cleanup_back = ctx->cleanup = malloc(0);
 
   ctx->script_header = "";
+  ctx->url_prefix = "/";
   
   ctx->error_message[0] = 0;
 
@@ -90,10 +307,6 @@
   return ctx->db;
 }
 
-static void buf_free(buf *b) {
-  free(b->front);
-}
-
 void uw_free(uw_context ctx) {
   buf_free(&ctx->outHeaders);
   buf_free(&ctx->script);
@@ -104,13 +317,10 @@
   free(ctx);
 }
 
-static void buf_reset(buf *b) {
-  b->front = b->start;
-}
-
 void uw_reset_keep_error_message(uw_context ctx) {
   buf_reset(&ctx->outHeaders);
   buf_reset(&ctx->script);
+  ctx->script.start[0] = 0;
   buf_reset(&ctx->page);
   buf_reset(&ctx->heap);
   ctx->regions = NULL;
@@ -249,7 +459,12 @@
   ctx->script_header = s;
 }
 
-static void buf_check(uw_context ctx, buf *b, size_t extra, const char *desc) {
+void uw_set_url_prefix(uw_context ctx, const char *s) {
+  ctx->url_prefix = s;
+}
+
+
+static void buf_check_ctx(uw_context ctx, buf *b, size_t extra, const char *desc) {
   if (b->back - b->front < extra) {
     size_t desired = b->front - b->start + extra, next;
     char *new_heap;
@@ -263,7 +478,7 @@
     b->front = new_heap + (b->front - b->start);
     b->back = new_heap + next;
 
-    if (desc && new_heap != b->start) {
+    if (new_heap != b->start) {
       b->start = new_heap;
       uw_error(ctx, UNLIMITED_RETRY, "Couldn't allocate new %s contiguously", desc);
     }
@@ -273,7 +488,7 @@
 }
 
 static void uw_check_heap(uw_context ctx, size_t extra) {
-  buf_check(ctx, &ctx->heap, extra, "heap chunk");
+  buf_check_ctx(ctx, &ctx->heap, extra, "heap chunk");
 }
 
 void *uw_malloc(uw_context ctx, size_t len) {
@@ -307,14 +522,6 @@
   ctx->regions = r->next;
 }
 
-static size_t buf_used(buf *b) {
-  return b->front - b->start;
-}
-
-static size_t buf_avail(buf *b) {
-  return b->back - b->start;
-}
-
 void uw_memstats(uw_context ctx) {
   printf("Headers: %d/%d\n", buf_used(&ctx->outHeaders), buf_avail(&ctx->outHeaders));
   printf("Script: %d/%d\n", buf_used(&ctx->script), buf_avail(&ctx->script));
@@ -322,20 +529,6 @@
   printf("Heap: %d/%d\n", buf_used(&ctx->heap), buf_avail(&ctx->heap));
 }
 
-int uw_really_send(int sock, const void *buf, size_t len) {
-  while (len > 0) {
-    size_t n = send(sock, buf, len, 0);
-
-    if (n < 0)
-      return n;
-
-    buf += n;
-    len -= n;
-  }
-
-  return 0;
-}
-
 int uw_send(uw_context ctx, int sock) {
   int n = uw_really_send(sock, ctx->outHeaders.start, ctx->outHeaders.front - ctx->outHeaders.start);
 
@@ -351,7 +544,7 @@
 }
 
 static void uw_check_headers(uw_context ctx, size_t extra) {
-  buf_check(ctx, &ctx->outHeaders, extra, NULL);
+  buf_check(&ctx->outHeaders, extra);
 }
 
 void uw_write_header(uw_context ctx, uw_Basis_string s) {
@@ -363,7 +556,7 @@
 }
 
 static void uw_check_script(uw_context ctx, size_t extra) {
-  buf_check(ctx, &ctx->script, extra, NULL);
+  buf_check(&ctx->script, extra);
 }
 
 void uw_write_script(uw_context ctx, uw_Basis_string s) {
@@ -375,16 +568,27 @@
 }
 
 const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
-  if (ctx->script.front == ctx->script.start) {
-    return ctx->script_header;
-  } else {
-    char *r = uw_malloc(ctx, 41 + (ctx->script.front - ctx->script.start) + strlen(ctx->script_header));
+  if (ctx->script_header[0] == 0)
+    return "";
+  else {
+    int pass;
+    client *c = uw_new_client(&pass);
 
-    sprintf(r, "%s<script>%s</script>", ctx->script_header, ctx->script.start);
+    char *r = uw_malloc(ctx, strlen(ctx->script_header) + 56 + 2 * INTS_MAX + buf_used(&ctx->script)
+                        + strlen(ctx->url_prefix));
+    sprintf(r, "%s<script>client_id=%d;client_pass=%d;url_prefix=\"%s\";%s</script>",
+            ctx->script_header, (int)c->id, c->data.used.pass, ctx->url_prefix, ctx->script.start);
     return r;
   }
 }
 
+const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) {
+  if (ctx->script_header[0] == 0)
+    return "";
+  else
+    return " onload='listener()'";
+}
+
 uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) {
   char *r, *s2;
 
@@ -486,7 +690,7 @@
 }
 
 static void uw_check(uw_context ctx, size_t extra) {
-  buf_check(ctx, &ctx->page, extra, NULL);
+  buf_check(&ctx->page, extra);
 }
 
 static void uw_writec_unsafe(uw_context ctx, char c) {
@@ -1370,3 +1574,5 @@
 
   return uw_unit_v;
 }
+
+