diff src/c/urweb.c @ 2285:ad3ce1528f71

Fix committing multiple stores/flushes. Locking is WIP.
author Ziv Scully <ziv@mit.edu>
date Thu, 12 Nov 2015 16:36:35 -0500
parents 472b4504aef2
children 0bdfec16a01d
line wrap: on
line diff
--- a/src/c/urweb.c	Thu Nov 12 11:44:21 2015 -0500
+++ b/src/c/urweb.c	Thu Nov 12 16:36:35 2015 -0500
@@ -424,11 +424,12 @@
   void (*free)(void*);
 } global;
 
-typedef struct uw_Sqlcache_Inval {
+typedef struct uw_Sqlcache_Update {
   uw_Sqlcache_Cache *cache;
   char **keys;
-  struct uw_Sqlcache_Inval *next;
-} uw_Sqlcache_Inval;
+  uw_Sqlcache_Value *value;
+  struct uw_Sqlcache_Update *next;
+} uw_Sqlcache_Update;
 
 struct uw_context {
   uw_app *app;
@@ -497,7 +498,8 @@
   // Sqlcache.
   int numRecording;
   int recordingOffset;
-  uw_Sqlcache_Inval *inval;
+  uw_Sqlcache_Update *cacheUpdate;
+  uw_Sqlcache_Update *cacheUpdateTail;
 
   int remoteSock;
 };
@@ -508,6 +510,7 @@
 size_t uw_script_max = SIZE_MAX;
 
 uw_context uw_init(int id, uw_loggers *lg) {
+  puts("Initializing");
   uw_context ctx = malloc(sizeof(struct uw_context));
 
   ctx->app = NULL;
@@ -585,7 +588,8 @@
 
   ctx->numRecording = 0;
   ctx->recordingOffset = 0;
-  ctx->inval = NULL;
+  ctx->cacheUpdate = NULL;
+  ctx->cacheUpdateTail = NULL;
 
   ctx->remoteSock = -1;
 
@@ -4629,10 +4633,9 @@
 }
 
 // The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn".
-// TODO: strlen(key) = buf - key?
-
-uw_Sqlcache_Value *uw_Sqlcache_check(uw_Sqlcache_Cache *cache, char **keys) {
-  //pthread_rwlock_rdlock(cache->lock);
+
+uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
+  pthread_rwlock_rdlock(&cache->lock);
   size_t numKeys = cache->numKeys;
   char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
   char *buf = key;
@@ -4644,18 +4647,20 @@
     entry = uw_Sqlcache_find(cache, key, len, 1);
     if (!entry) {
       free(key);
+      pthread_rwlock_unlock(&cache->lock);
       return NULL;
     }
     timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid);
   }
   free(key);
+  // TODO: pass back copy of value and free it in the generated code... or use uw_malloc?
   uw_Sqlcache_Value *value = entry->value;
-  //pthread_rwlock_unlock(cache->lock);
+  pthread_rwlock_unlock(&cache->lock);
   return value && value->timeValid > timeInvalid ? value : NULL;
 }
 
-void uw_Sqlcache_store(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
-  //pthread_rwlock_wrlock(cache->lock);
+void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
+  pthread_rwlock_wrlock(&cache->lock);
   size_t numKeys = cache->numKeys;
   char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
   char *buf = key;
@@ -4669,7 +4674,7 @@
       entry = malloc(sizeof(uw_Sqlcache_Entry));
       entry->key = strdup(key);
       entry->value = NULL;
-      entry->timeInvalid = 0; // ASK: is this okay?
+      entry->timeInvalid = 0;
       uw_Sqlcache_add(cache, entry, len);
     }
   }
@@ -4677,10 +4682,11 @@
   uw_Sqlcache_freeValue(entry->value);
   entry->value = value;
   entry->value->timeValid = timeNow;
-  //pthread_rwlock_unlock(cache->lock);
+  pthread_rwlock_unlock(&cache->lock);
 }
 
 void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
+  pthread_rwlock_wrlock(&cache->lock);
   size_t numKeys = cache->numKeys;
   char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
   char *buf = key;
@@ -4709,55 +4715,69 @@
   free(key);
   // All the keys were non-null and the relevant entry is present, so we delete it.
   uw_Sqlcache_delete(cache, entry);
-}
-
-void uw_Sqlcache_flushFree(void *data, int dontCare) {
-  uw_Sqlcache_Inval *inval = (uw_Sqlcache_Inval *)data;
-  while (inval) {
-    char** keys = inval->keys;
-    size_t numKeys = inval->cache->numKeys;
+  pthread_rwlock_unlock(&cache->lock);
+}
+
+void uw_Sqlcache_freeUpdate(void *data, int dontCare) {
+  uw_context ctx = (uw_context)data;
+  uw_Sqlcache_Update *update = ctx->cacheUpdate;
+  while (update) {
+    char** keys = update->keys;
+    size_t numKeys = update->cache->numKeys;
     while (numKeys-- > 0) {
       free(keys[numKeys]);
     }
     free(keys);
-    uw_Sqlcache_Inval *nextInval = inval->next;
-    free(inval);
-    inval = nextInval;
+    // Don't free [update->value]: it's in the cache now!
+    uw_Sqlcache_Update *nextUpdate = update->next;
+    free(update);
+    update = nextUpdate;
   }
-}
-
-void uw_Sqlcache_flushCommit(void *data) {
-  uw_Sqlcache_Inval *inval = (uw_Sqlcache_Inval *)data;
-  while (inval) {
-    uw_Sqlcache_Cache *cache = inval->cache;
-    char **keys = inval->keys;
-    uw_Sqlcache_flushCommitOne(cache, keys);
-    inval = inval->next;
+  ctx->cacheUpdate = NULL;
+  ctx->cacheUpdateTail = NULL;
+}
+
+void uw_Sqlcache_commitUpdate(void *data) {
+  uw_context ctx = (uw_context)data;
+  uw_Sqlcache_Update *update = ctx->cacheUpdate;
+  while (update) {
+    uw_Sqlcache_Cache *cache = update->cache;
+    char **keys = update->keys;
+    if (update->value) {
+      uw_Sqlcache_storeCommitOne(cache, keys, update->value);
+    } else {
+      uw_Sqlcache_flushCommitOne(cache, keys);
+    }
+    update = update->next;
   }
 }
 
 char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) {
   char **copy = malloc(sizeof(char *) * numKeys);
   while (numKeys-- > 0) {
-    char * k = keys[numKeys];
+    char *k = keys[numKeys];
     copy[numKeys] = k ? strdup(k) : NULL;
   }
   return copy;
 }
 
+void uw_Sqlcache_store(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
+  uw_Sqlcache_Update *update = malloc(sizeof(uw_Sqlcache_Update));
+  update->cache = cache;
+  update->keys = uw_Sqlcache_copyKeys(keys, cache->numKeys);
+  update->value = value;
+  update->next = NULL;
+  if (ctx->cacheUpdateTail) {
+    // An update is already registered, so just extend it.
+    ctx->cacheUpdateTail->next = update;
+  } else {
+    ctx->cacheUpdate = update;
+    uw_register_transactional(ctx, ctx, uw_Sqlcache_commitUpdate, NULL, uw_Sqlcache_freeUpdate);
+  }
+  ctx->cacheUpdateTail = update;
+}
+
 void uw_Sqlcache_flush(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
-  //pthread_rwlock_wrlock(cache->lock);
-  uw_Sqlcache_Inval *inval = malloc(sizeof(uw_Sqlcache_Inval));
-  inval->cache = cache;
-  inval->keys = uw_Sqlcache_copyKeys(keys, cache->numKeys);
-  inval->next = NULL;
-  if (ctx->inval) {
-    // An invalidation is already registered, so just extend it.
-    ctx->inval->next = inval;
-  } else {
-    uw_register_transactional(ctx, inval, uw_Sqlcache_flushCommit, NULL, uw_Sqlcache_flushFree);
-  }
-  // [ctx->inval] should always point to the last invalidation.
-  ctx->inval = inval;
-  //pthread_rwlock_unlock(cache->lock);
-}
+  // A flush is represented in the queue as storing NULL.
+  uw_Sqlcache_store(ctx, cache, keys, NULL);
+}