changeset 2033:ea0ecd5fa9df

Add locking to enforce atomicity of message sends from one transaction
author Adam Chlipala <adam@chlipala.net>
date Fri, 27 Jun 2014 14:39:31 -0400
parents 884673e5f7d5
children 1d36654c2d21
files src/c/urweb.c
diffstat 1 files changed, 25 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/src/c/urweb.c	Wed Jun 25 14:19:58 2014 -0400
+++ b/src/c/urweb.c	Fri Jun 27 14:39:31 2014 -0400
@@ -3304,6 +3304,8 @@
   return s;
 }
 
+static pthread_mutex_t message_send_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 int uw_commit(uw_context ctx) {
   int i;
   char *sig;
@@ -3323,10 +3325,17 @@
         }
       }
 
+  // Here's an important lock to provide the abstraction that all messages from one transaction are sent as an atomic unit.
+  if (ctx->used_deltas > 0)
+    pthread_mutex_lock(&message_send_mutex);
+
   if (ctx->transaction_started) {
     int code = ctx->app->db_commit(ctx);
 
     if (code) {
+      if (ctx->used_deltas > 0)
+        pthread_mutex_unlock(&message_send_mutex);
+
       if (ctx->client)
         release_client(ctx->client);
 
@@ -3360,16 +3369,19 @@
       if (ctx->transactionals[i].commit) {
         ctx->transactionals[i].commit(ctx->transactionals[i].data);
         if (uw_has_error(ctx)) {
-           if (ctx->client)
-             release_client(ctx->client);
-
-           for (i = ctx->used_transactionals-1; i >= 0; --i)
-             if (ctx->transactionals[i].rollback != NULL)
-               ctx->transactionals[i].rollback(ctx->transactionals[i].data);
-
-           for (i = ctx->used_transactionals-1; i >= 0; --i)
-             if (ctx->transactionals[i].free)
-               ctx->transactionals[i].free(ctx->transactionals[i].data, 0);
+          if (ctx->used_deltas > 0)
+            pthread_mutex_unlock(&message_send_mutex);
+
+          if (ctx->client)
+            release_client(ctx->client);
+
+          for (i = ctx->used_transactionals-1; i >= 0; --i)
+            if (ctx->transactionals[i].rollback != NULL)
+              ctx->transactionals[i].rollback(ctx->transactionals[i].data);
+
+          for (i = ctx->used_transactionals-1; i >= 0; --i)
+            if (ctx->transactionals[i].free)
+              ctx->transactionals[i].free(ctx->transactionals[i].data, 0);
 
           return 0;
         }
@@ -3385,6 +3397,9 @@
     client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script));
   }
 
+  if (ctx->used_deltas > 0)
+    pthread_mutex_unlock(&message_send_mutex);
+
   if (ctx->client)
     release_client(ctx->client);