changeset 2288:98f96a976ede

Finish locking, but it's not yet tested rigorously.
author Ziv Scully <ziv@mit.edu>
date Fri, 13 Nov 2015 11:03:09 -0500
parents 08203f93dbc3
children 78820fa8f5a7
files include/urweb/types_cpp.h include/urweb/urweb_cpp.h src/c/urweb.c src/lru_cache.sml src/sqlcache.sml
diffstat 5 files changed, 164 insertions(+), 75 deletions(-) [+]
line wrap: on
line diff
--- a/include/urweb/types_cpp.h	Fri Nov 13 01:05:22 2015 -0500
+++ b/include/urweb/types_cpp.h	Fri Nov 13 11:03:09 2015 -0500
@@ -133,7 +133,8 @@
 typedef struct uw_Sqlcache_Entry uw_Sqlcache_Entry;
 
 typedef struct uw_Sqlcache_Cache {
-  pthread_rwlock_t lock;
+  pthread_rwlock_t lockOut;
+  pthread_rwlock_t lockIn;
   uw_Sqlcache_Entry *table;
   unsigned long timeInvalid;
   unsigned long timeNow;
--- a/include/urweb/urweb_cpp.h	Fri Nov 13 01:05:22 2015 -0500
+++ b/include/urweb/urweb_cpp.h	Fri Nov 13 11:03:09 2015 -0500
@@ -406,6 +406,8 @@
 
 // Sqlcache.
 
+void *uw_Sqlcache_rlock(struct uw_context *, uw_Sqlcache_Cache *);
+void *uw_Sqlcache_wlock(struct uw_context *, uw_Sqlcache_Cache *);
 uw_Sqlcache_Value *uw_Sqlcache_check(struct uw_context *, uw_Sqlcache_Cache *, char **);
 void *uw_Sqlcache_store(struct uw_context *, uw_Sqlcache_Cache *, char **, uw_Sqlcache_Value *);
 void *uw_Sqlcache_flush(struct uw_context *, uw_Sqlcache_Cache *, char **);
--- a/src/c/urweb.c	Fri Nov 13 01:05:22 2015 -0500
+++ b/src/c/urweb.c	Fri Nov 13 11:03:09 2015 -0500
@@ -366,6 +366,9 @@
 
   uw_global_custom();
   uw_init_crypto();
+
+  // Fast non-cryptographic strength randomness for Sqlcache.
+  srandom(clock());
 }
 
 void uw_app_init(uw_app *app) {
@@ -431,6 +434,11 @@
   struct uw_Sqlcache_Update *next;
 } uw_Sqlcache_Update;
 
+typedef struct uw_Sqlcache_Unlock {
+  pthread_rwlock_t *lock;
+  struct uw_Sqlcache_Unlock *next;
+} uw_Sqlcache_Unlock;
+
 struct uw_context {
   uw_app *app;
   int id;
@@ -500,6 +508,7 @@
   int recordingOffset;
   uw_Sqlcache_Update *cacheUpdate;
   uw_Sqlcache_Update *cacheUpdateTail;
+  uw_Sqlcache_Unlock *cacheUnlock;
 
   int remoteSock;
 };
@@ -4556,7 +4565,7 @@
   UT_hash_handle hh;
 } uw_Sqlcache_Entry;
 
-void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) {
+static void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) {
   if (value) {
     free(value->result);
     free(value->output);
@@ -4564,7 +4573,7 @@
   }
 }
 
-void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) {
+static void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) {
   if (entry) {
     free(entry->key);
     uw_Sqlcache_freeValue(entry->value);
@@ -4573,14 +4582,14 @@
 }
 
 // TODO: pick a number.
-unsigned int uw_Sqlcache_maxSize = 1234567890;
-
-void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) {
+static unsigned int uw_Sqlcache_maxSize = 1234567890;
+
+static void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) {
   HASH_DEL(cache->table, entry);
   uw_Sqlcache_freeEntry(entry);
 }
 
-uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) {
+static uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) {
   uw_Sqlcache_Entry *entry = NULL;
   HASH_FIND(hh, cache->table, key, len, entry);
   if (entry && bump) {
@@ -4592,7 +4601,7 @@
   return entry;
 }
 
-void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) {
+static void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) {
   HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry);
   if (HASH_COUNT(cache->table) > uw_Sqlcache_maxSize) {
     // Deletes the first element of the cache.
@@ -4600,17 +4609,17 @@
   }
 }
 
-unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) {
+static unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) {
   return ++cache->timeNow;
 }
 
-unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) {
+static unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) {
   return x > y ? x : y;
 }
 
-char uw_Sqlcache_keySep = '_';
-
-char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) {
+static char uw_Sqlcache_keySep = '_';
+
+static char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) {
   size_t len = 0;
   while (numKeys-- > 0) {
     char* k = keys[numKeys];
@@ -4627,7 +4636,7 @@
   return buf;
 }
 
-char *uw_Sqlcache_keyCopy(char *buf, char *key) {
+static char *uw_Sqlcache_keyCopy(char *buf, char *key) {
   *buf++ = uw_Sqlcache_keySep;
   return stpcpy(buf, key);
 }
@@ -4635,7 +4644,12 @@
 // The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn".
 
 uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
-  pthread_rwlock_rdlock(&cache->lock);
+  int doBump = random() % 1024 == 0;
+  if (doBump) {
+    pthread_rwlock_wrlock(&cache->lockIn);
+  } else {
+    pthread_rwlock_rdlock(&cache->lockIn);
+  }
   size_t numKeys = cache->numKeys;
   char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
   char *buf = key;
@@ -4645,46 +4659,49 @@
     entry = cache->table;
     if (!entry) {
       free(key);
-      pthread_rwlock_unlock(&cache->lock);
+      pthread_rwlock_unlock(&cache->lockIn);
       return NULL;
     }
   } else {
     while (numKeys-- > 0) {
       buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
       size_t len = buf - key;
-      entry = uw_Sqlcache_find(cache, key, len, 1);
+      entry = uw_Sqlcache_find(cache, key, len, doBump);
       if (!entry) {
         free(key);
-        pthread_rwlock_unlock(&cache->lock);
+        pthread_rwlock_unlock(&cache->lockIn);
         return NULL;
       }
       timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid);
     }
     free(key);
   }
-  // TODO: pass back copy of value and free it in the generated code... or use uw_malloc?
   uw_Sqlcache_Value *value = entry->value;
-  pthread_rwlock_unlock(&cache->lock);
+  pthread_rwlock_unlock(&cache->lockIn);
+  // ASK: though the argument isn't trivial, this is safe, right?
+  // Returning outside the lock is safe because updates happen at commit time.
+  // Those are the only times the returned value or its strings can get freed.
+  // Handler output is a new string, so it's safe to free this at commit time.
   return value && value->timeValid > timeInvalid ? value : NULL;
 }
 
-void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
-  pthread_rwlock_wrlock(&cache->lock);
+static void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
+  pthread_rwlock_wrlock(&cache->lockIn);
   size_t numKeys = cache->numKeys;
-  char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
-  char *buf = key;
   time_t timeNow = uw_Sqlcache_getTimeNow(cache);
   uw_Sqlcache_Entry *entry;
   if (numKeys == 0) {
     entry = cache->table;
     if (!entry) {
       entry = malloc(sizeof(uw_Sqlcache_Entry));
-      entry->key = strdup(key);
+      entry->key = NULL;
       entry->value = NULL;
       entry->timeInvalid = 0;
       cache->table = entry;
     }
   } else {
+    char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
+    char *buf = key;
     while (numKeys-- > 0) {
       buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
       size_t len = buf - key;
@@ -4702,23 +4719,23 @@
   uw_Sqlcache_freeValue(entry->value);
   entry->value = value;
   entry->value->timeValid = timeNow;
-  pthread_rwlock_unlock(&cache->lock);
-}
-
-void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
-  pthread_rwlock_wrlock(&cache->lock);
+  pthread_rwlock_unlock(&cache->lockIn);
+}
+
+static void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
+  pthread_rwlock_wrlock(&cache->lockIn);
   size_t numKeys = cache->numKeys;
-  char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
-  char *buf = key;
-  time_t timeNow = uw_Sqlcache_getTimeNow(cache);
-  uw_Sqlcache_Entry *entry;
   if (numKeys == 0) {
-    entry = cache->table;
+    uw_Sqlcache_Entry *entry = cache->table;
     if (entry) {
       uw_Sqlcache_freeValue(entry->value);
       entry->value = NULL;
     }
   } else {
+    char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
+    char *buf = key;
+    time_t timeNow = uw_Sqlcache_getTimeNow(cache);
+    uw_Sqlcache_Entry *entry = NULL;
     while (numKeys-- > 0) {
       char *k = keys[numKeys];
       if (!k) {
@@ -4729,15 +4746,16 @@
           cache->timeInvalid = timeNow;
         }
         free(key);
-        pthread_rwlock_unlock(&cache->lock);
+        pthread_rwlock_unlock(&cache->lockIn);
         return;
       }
       buf = uw_Sqlcache_keyCopy(buf, k);
       size_t len = buf - key;
       entry = uw_Sqlcache_find(cache, key, len, 0);
       if (!entry) {
+        // Nothing in the cache to flush.
         free(key);
-        pthread_rwlock_unlock(&cache->lock);
+        pthread_rwlock_unlock(&cache->lockIn);
         return;
       }
     }
@@ -4745,10 +4763,25 @@
     // All the keys were non-null and the relevant entry is present, so we delete it.
     uw_Sqlcache_delete(cache, entry);
   }
-  pthread_rwlock_unlock(&cache->lock);
-}
-
-void uw_Sqlcache_freeUpdate(void *data, int dontCare) {
+  pthread_rwlock_unlock(&cache->lockIn);
+}
+
+static void uw_Sqlcache_commit(void *data) {
+  uw_context ctx = (uw_context)data;
+  uw_Sqlcache_Update *update = ctx->cacheUpdate;
+  while (update) {
+    uw_Sqlcache_Cache *cache = update->cache;
+    char **keys = update->keys;
+    if (update->value) {
+      uw_Sqlcache_storeCommitOne(cache, keys, update->value);
+    } else {
+      uw_Sqlcache_flushCommitOne(cache, keys);
+    }
+    update = update->next;
+  }
+}
+
+static void uw_Sqlcache_free(void *data, int dontCare) {
   uw_context ctx = (uw_context)data;
   uw_Sqlcache_Update *update = ctx->cacheUpdate;
   while (update) {
@@ -4765,24 +4798,38 @@
   }
   ctx->cacheUpdate = NULL;
   ctx->cacheUpdateTail = NULL;
-}
-
-void uw_Sqlcache_commitUpdate(void *data) {
-  uw_context ctx = (uw_context)data;
-  uw_Sqlcache_Update *update = ctx->cacheUpdate;
-  while (update) {
-    uw_Sqlcache_Cache *cache = update->cache;
-    char **keys = update->keys;
-    if (update->value) {
-      uw_Sqlcache_storeCommitOne(cache, keys, update->value);
-    } else {
-      uw_Sqlcache_flushCommitOne(cache, keys);
-    }
-    update = update->next;
+  uw_Sqlcache_Unlock *unlock = ctx->cacheUnlock;
+  while (unlock) {
+    pthread_rwlock_unlock(unlock->lock);
+    uw_Sqlcache_Unlock *nextUnlock = unlock->next;
+    free(unlock);
+    unlock = nextUnlock;
   }
-}
-
-char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) {
+  ctx->cacheUnlock = NULL;
+}
+
+static void uw_Sqlcache_pushUnlock(uw_context ctx, pthread_rwlock_t *lock) {
+  if (!ctx->cacheUnlock) {
+    // Just need one registered commit for both updating and unlocking.
+    uw_register_transactional(ctx, ctx, uw_Sqlcache_commit, NULL, uw_Sqlcache_free);
+  }
+  uw_Sqlcache_Unlock *unlock = malloc(sizeof(uw_Sqlcache_Unlock));
+  unlock->lock = lock;
+  unlock->next = ctx->cacheUnlock;
+  ctx->cacheUnlock = unlock;
+}
+
+void uw_Sqlcache_rlock(uw_context ctx, uw_Sqlcache_Cache *cache) {
+  pthread_rwlock_rdlock(&cache->lockOut);
+  uw_Sqlcache_pushUnlock(ctx, &cache->lockOut);
+}
+
+void uw_Sqlcache_wlock(uw_context ctx, uw_Sqlcache_Cache *cache) {
+  pthread_rwlock_wrlock(&cache->lockOut);
+  uw_Sqlcache_pushUnlock(ctx, &cache->lockOut);
+}
+
+static char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) {
   char **copy = malloc(sizeof(char *) * numKeys);
   while (numKeys-- > 0) {
     char *k = keys[numKeys];
@@ -4798,11 +4845,9 @@
   update->value = value;
   update->next = NULL;
   if (ctx->cacheUpdateTail) {
-    // An update is already registered, so just extend it.
     ctx->cacheUpdateTail->next = update;
   } else {
     ctx->cacheUpdate = update;
-    uw_register_transactional(ctx, ctx, uw_Sqlcache_commitUpdate, NULL, uw_Sqlcache_freeUpdate);
   }
   ctx->cacheUpdateTail = update;
 }
--- a/src/lru_cache.sml	Fri Nov 13 01:05:22 2015 -0500
+++ b/src/lru_cache.sml	Fri Nov 13 11:03:09 2015 -0500
@@ -69,7 +69,9 @@
         Print.box
             [string ("static uw_Sqlcache_Cache cacheStruct" ^ i ^ " = {"),
              newline,
-             string "  .lock = PTHREAD_RWLOCK_INITIALIZER,",
+             string "  .lockIn = PTHREAD_RWLOCK_INITIALIZER,",
+             newline,
+             string "  .lockOut = PTHREAD_RWLOCK_INITIALIZER,",
              newline,
              string "  .table = NULL,",
              newline,
@@ -83,6 +85,22 @@
              newline,
              newline,
 
+             string ("static void uw_Sqlcache_rlock" ^ i ^ "(uw_context ctx) {"),
+             newline,
+             string ("  uw_Sqlcache_rlock(ctx, cache" ^ i ^ ");"),
+             newline,
+             string "}",
+             newline,
+             newline,
+
+             string ("static void uw_Sqlcache_wlock" ^ i ^ "(uw_context ctx) {"),
+             newline,
+             string ("  uw_Sqlcache_wlock(ctx, cache" ^ i ^ ");"),
+             newline,
+             string "}",
+             newline,
+             newline,
+
              string ("static uw_Basis_string uw_Sqlcache_check" ^ i),
              string ("(uw_context ctx" ^ typedArgs ^ ") {"),
              newline,
--- a/src/sqlcache.sml	Fri Nov 13 01:05:22 2015 -0500
+++ b/src/sqlcache.sml	Fri Nov 13 11:03:09 2015 -0500
@@ -913,7 +913,7 @@
 (* Program Instrumentation Utilities *)
 (*************************************)
 
-val {check, store, flush, ...} = getCache ()
+val {check, store, flush, lock, ...} = getCache ()
 
 val dummyTyp = (TRecord [], dummyLoc)
 
@@ -1431,7 +1431,7 @@
 (* Locking *)
 (***********)
 
-(* TODO: do this less evil-ly by not relying on specific FFI names, please? *)
+(* TODO: do this less evilly by not relying on specific FFI names, please? *)
 fun locksNeeded file =
     transitiveAnalysis
         (fn ((_, name, _, e, _), state) =>
@@ -1439,14 +1439,14 @@
                 {typ = #2,
                  exp = fn (EFfiApp ("Sqlcache", x, _), state as {store, flush}) =>
                           (case Int.fromString (String.extract (x, 5, NONE)) of
-                               NONE => raise Match
+                               NONE => state
                              | SOME index =>
-                               if String.isPrefix "store" x
+                               if String.isPrefix "flush" x
+                               then {store = store, flush = IIMM.insert (flush, name, index)}
+                               else if String.isPrefix "store" x
                                then {store = IIMM.insert (store, name, index), flush = flush}
-                               else if String.isPrefix "flush" x
-                               then {store = store, flush = IIMM.insert (flush, name, index)}
                                else state)
-                        | _ => state}
+                        | (_, state) => state}
                 state
                 e)
         {store = IIMM.empty, flush = IIMM.empty}
@@ -1459,13 +1459,36 @@
                IS.empty
                decls
 
-(* fun addLocking file = *)
-(*     let *)
-(*         val whichLocks = locksNeeded file *)
-(*         val needsLocks = exports file *)
-(*     in *)
+fun wrapLocks (locks, (exp', loc)) =
+    case exp' of
+        EAbs (s, t1, t2, exp) => (EAbs (s, t1, t2, wrapLocks (locks, exp)), loc)
+      | _ => (List.foldr (fn (l, e') => sequence [lock l, e']) exp' locks, loc)
 
-(*     end *)
+fun addLocking file =
+    let
+        val {store, flush} = locksNeeded file
+        fun locks n =
+            let
+                val wlocks = IIMM.findSet (flush, n)
+                val rlocks = IIMM.findSet (store, n)
+                val ls = map (fn i => (i, true)) (IS.listItems wlocks)
+                         @ map (fn i => (i, false)) (IS.listItems (IS.difference (rlocks, wlocks)))
+            in
+                ListMergeSort.sort (fn ((i, _), (j, _)) => i > j) ls
+            end
+        val expts = exports file
+        fun doVal (v as (x, n, t, exp, s)) =
+            if IS.member (expts, n)
+            then (x, n, t, wrapLocks ((locks n), exp), s)
+            else v
+        val doDecl =
+         fn (DVal v, loc) => (DVal (doVal v), loc)
+          | (DValRec vs, loc) => (DValRec (map doVal vs), loc)
+          | decl => decl
+    in
+        mapFst (map doDecl) file
+    end
+
 
 (************************)
 (* Compiler Entry Point *)
@@ -1494,7 +1517,7 @@
         (datatypes @ newDecls @ others, sideInfo)
     end
 
-val go' = addFlushing o addCaching o simplifySql o inlineSql
+val go' = addLocking o addFlushing o addCaching o simplifySql o inlineSql
 
 fun go file =
     let