diff src/c/urweb.c @ 671:729e65db2e2f

Transactionalize channel operations
author Adam Chlipala <adamc@hcoop.net>
date Tue, 24 Mar 2009 14:44:45 -0400
parents f68eee90dbcf
children df6eb58de040
line wrap: on
line diff
--- a/src/c/urweb.c	Sun Mar 22 16:03:45 2009 -0400
+++ b/src/c/urweb.c	Tue Mar 24 14:44:45 2009 -0400
@@ -98,6 +98,7 @@
       buf msgs;
       int sock;
       time_t last_contact;
+      unsigned refcount;
     } used;
   } data;
 } client;
@@ -129,6 +130,7 @@
   c->data.used.pass = rand();
   c->data.used.sock = -1;
   c->data.used.last_contact = time(NULL);
+  c->data.used.refcount = 0;
   buf_init(&c->data.used.msgs, 0);
 
   pthread_mutex_unlock(&clients_mutex);
@@ -154,11 +156,20 @@
     pthread_mutex_unlock(&clients_mutex);
     return NULL;
   }
-
+  
+  pthread_mutex_lock(&c->data.used.lock);
+  ++c->data.used.refcount;
+  pthread_mutex_unlock(&c->data.used.lock);
   pthread_mutex_unlock(&clients_mutex);
   return c;
 }
 
+static void uw_release_client(client *c) {
+  pthread_mutex_lock(&c->data.used.lock);
+  --c->data.used.refcount;
+  pthread_mutex_unlock(&c->data.used.lock);
+}
+
 void uw_client_connect(size_t id, int pass, int sock) {
   client *c = uw_find_client(id);
 
@@ -168,6 +179,8 @@
     return;
   }
 
+  uw_release_client(c);
+
   pthread_mutex_lock(&c->data.used.lock);
 
   if (pass != c->data.used.pass) {
@@ -219,7 +232,8 @@
   pthread_mutex_lock(&clients_mutex);
 
   for (i = 0; i < n_clients; ++i) {
-    if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff)
+    if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff
+        && clients[i]->data.used.refcount == 0)
       uw_free_client(clients[i]);
   }
 
@@ -242,6 +256,7 @@
     struct {
       pthread_mutex_t lock;
       client_list *clients;
+      unsigned refcount;
     } used;
   } data;
 } channel;
@@ -271,25 +286,53 @@
   ch->mode = USED;
   pthread_mutex_init(&ch->data.used.lock, NULL);
   ch->data.used.clients = NULL;
+  ch->data.used.refcount = 0;
 
   pthread_mutex_unlock(&channels_mutex);
 
   return ch;
 }
 
+static void uw_free_channel(channel *ch) {
+  if (ch->mode == USED) {
+    client_list *cs;
+
+    for (cs = ch->data.used.clients; cs; ) {
+      client_list *tmp = cs->next;
+      free(cs);
+      cs = tmp;
+    }
+    pthread_mutex_destroy(&ch->data.used.lock);
+    ch->mode = UNUSED;
+    ch->data.next = channels_free;
+    channels_free = ch;
+  }
+}
+
 static channel *uw_find_channel(size_t id) {
   channel *ch = NULL;
 
   pthread_mutex_lock(&channels_mutex);
 
-  if (id < n_channels && channels[id]->mode == USED)
+  if (id < n_channels && channels[id]->mode == USED) {
     ch = channels[id];
 
+    pthread_mutex_lock(&ch->data.used.lock);
+    ++ch->data.used.refcount;
+    pthread_mutex_unlock(&ch->data.used.lock);
+  }
+
   pthread_mutex_unlock(&channels_mutex);
 
   return ch;
 }
 
+static void uw_release_channel(channel *ch) {
+  pthread_mutex_lock(&ch->data.used.lock);
+  ++ch->data.used.refcount;
+  pthread_mutex_unlock(&ch->data.used.lock);
+}
+
 static void uw_subscribe(channel *ch, client *c) {
   client_list *cs = malloc(sizeof(client_list));
 
@@ -342,7 +385,6 @@
     pthread_mutex_lock(&c->data.used.lock);
 
     if (c->data.used.sock != -1) {
-      printf("Immediate send\n");
       uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
       uw_really_send(c->data.used.sock, pre, preLen);
       uw_really_send(c->data.used.sock, msg, len);
@@ -350,7 +392,6 @@
       close(c->data.used.sock);
       c->data.used.sock = -1;
     } else {
-      printf("Delayed send\n");
       buf_append(&c->data.used.msgs, pre, preLen);
       buf_append(&c->data.used.msgs, msg, len);
       buf_append(&c->data.used.msgs, "\n", 1);
@@ -386,6 +427,17 @@
   void *arg;
 } cleanup;
 
+typedef struct {
+  usage mode;
+  channel *ch;
+  enum { OLD, NEW } newness;
+
+  size_t n_subscribed;
+  client **subscribed;
+
+  buf msgs;
+} channel_delta;
+
 struct uw_context {
   char *headers, *headers_end;
 
@@ -404,6 +456,9 @@
 
   const char *script_header, *url_prefix;
 
+  size_t n_deltas;
+  channel_delta *deltas;
+
   char error_message[ERROR_BUF_LEN];
 };
 
@@ -435,6 +490,9 @@
 
   ctx->source_count = 0;
 
+  ctx->n_deltas = 0;
+  ctx->deltas = malloc(0);
+
   return ctx;
 }
 
@@ -447,12 +505,20 @@
 }
 
 void uw_free(uw_context ctx) {
+  size_t i;
+
   buf_free(&ctx->outHeaders);
   buf_free(&ctx->script);
   buf_free(&ctx->page);
   buf_free(&ctx->heap);
   free(ctx->inputs);
   free(ctx->cleanup);
+
+  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
+    free(ctx->deltas[i].subscribed);
+    buf_free(&ctx->deltas[i].msgs);
+  }
+
   free(ctx);
 }
 
@@ -465,6 +531,8 @@
   ctx->regions = NULL;
   ctx->cleanup_front = ctx->cleanup;
   ctx->source_count = 0;
+  if (ctx->n_deltas > 0)
+    ctx->deltas[0].mode = UNUSED;
 }
 
 void uw_reset_keep_request(uw_context ctx) {
@@ -506,14 +574,7 @@
   ctx->headers_end = s;
 }
 
-failure_kind uw_begin(uw_context ctx, char *path) {
-  int r = setjmp(ctx->jmp_buf);
-
-  if (r == 0)
-    uw_handle(ctx, path);
-
-  return r;
-}
+int uw_db_begin(uw_context);
 
 __attribute__((noreturn)) void uw_error(uw_context ctx, failure_kind fk, const char *fmt, ...) {
   cleanup *cl;
@@ -548,6 +609,18 @@
   ++ctx->cleanup_front;
 }
 
+failure_kind uw_begin(uw_context ctx, char *path) {
+  int r = setjmp(ctx->jmp_buf);
+
+  if (r == 0) {
+    if (uw_db_begin(ctx))
+      uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
+    uw_handle(ctx, path);
+  }
+
+  return r;
+}
+
 void uw_pop_cleanup(uw_context ctx) {
   if (ctx->cleanup_front == ctx->cleanup)
     uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack");
@@ -1728,8 +1801,45 @@
   return uw_unit_v;
 }
 
+static channel_delta *allocate_delta(uw_context ctx, channel *ch) {
+  size_t i;
+  channel_delta *cd;
+
+  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i);
+
+  if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch)
+    return &ctx->deltas[i];
+
+  if (i < ctx->n_deltas)
+    cd = &ctx->deltas[i];
+  else {
+    ++ctx->n_deltas;
+    ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas);
+    cd = &ctx->deltas[ctx->n_deltas-1];
+  }
+   
+  cd->mode = USED;
+  cd->newness = OLD;
+  cd->ch = ch;
+  if (cd->n_subscribed > 0)
+    cd->subscribed[0] = NULL;
+  buf_reset(&cd->msgs);
+  return cd;
+}
+
 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
-  return uw_new_channel()->id;
+  size_t i;
+  channel *ch = uw_new_channel();
+  ++ch->data.used.refcount;
+  channel_delta *cd = allocate_delta(ctx, ch);
+
+  cd->newness = NEW;
+
+  return ch->id;
+}
+
+static int delta_used(channel_delta *cd) {
+  return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]);
 }
 
 uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
@@ -1742,13 +1852,33 @@
     int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
     client *c = uw_find_client(id);
 
-    if (c == NULL)
+    if (c == NULL) {
+      uw_release_channel(ch);
       uw_error(ctx, FATAL, "Unknown client ID in subscription request");
-    else if (c->data.used.pass != pass)
+    } else if (c->data.used.pass != pass) {
+      uw_release_channel(ch);
+      uw_release_client(c);
       uw_error(ctx, FATAL, "Wrong client password in subscription request");
-    else
-      uw_subscribe(ch, c);
+    } else {
+      size_t i;
+      channel_delta *cd = allocate_delta(ctx, ch);
+
+      if (delta_used(cd))
+        uw_release_channel(ch);
+
+      for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i);
+
+      if (i < cd->n_subscribed)
+        cd->subscribed[i] = c;
+      else {
+        ++cd->n_subscribed;
+        cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed);
+        cd->subscribed[cd->n_subscribed-1] = c;
+      }
+    }
   }
+
+  return uw_unit_v;
 }
 
 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
@@ -1756,6 +1886,60 @@
 
   if (ch == NULL)
     uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
-  else
-    uw_channel_send(ch, msg);
+  else {
+    channel_delta *cd = allocate_delta(ctx, ch);
+    if (delta_used(cd))
+      uw_release_channel(ch);
+    buf_append(&cd->msgs, msg, strlen(msg));
+  }
+
+  return uw_unit_v;
 }
+
+int uw_db_commit(uw_context);
+int uw_db_rollback(uw_context);
+
+void uw_commit(uw_context ctx) {
+  size_t i, j;
+
+  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
+    channel *ch = ctx->deltas[i].ch;
+
+    for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
+      client *c = ctx->deltas[i].subscribed[j];
+
+      uw_subscribe(ch, c);
+      uw_release_client(c);
+    }
+
+    if (buf_used(&ctx->deltas[i].msgs) > 0) {
+      uw_channel_send(ch, ctx->deltas[i].msgs.start);
+    }
+
+    uw_release_channel(ch);
+  }
+
+  if (uw_db_commit(ctx))
+    uw_error(ctx, FATAL, "Error running SQL COMMIT");
+}
+
+int uw_rollback(uw_context ctx) {
+  size_t i, j;
+
+  for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
+    channel *ch = ctx->deltas[i].ch;
+
+    for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
+      client *c = ctx->deltas[i].subscribed[j];
+
+      uw_release_client(c);
+    }
+
+    if (ctx->deltas[i].newness == NEW)
+      uw_free_channel(ch);
+    else
+      uw_release_channel(ch);
+  }
+
+  return uw_db_rollback(ctx);
+}