diff src/c/urweb.c @ 2304:6fb9232ade99

Merge Sqlcache
author Adam Chlipala <adam@chlipala.net>
date Sun, 20 Dec 2015 14:18:52 -0500
parents 42079884e34a
children
line wrap: on
line diff
--- a/src/c/urweb.c	Sun Dec 20 13:41:35 2015 -0500
+++ b/src/c/urweb.c	Sun Dec 20 14:18:52 2015 -0500
@@ -22,6 +22,8 @@
 
 #include "types.h"
 
+#include "uthash.h"
+
 uw_unit uw_unit_v = 0;
 
 
@@ -70,6 +72,9 @@
 
 void uw_buffer_reset(uw_buffer *b) {
   b->front = b->start;
+  if (b->front != b->back) {
+    *b->front = 0;
+  }
 }
 
 int uw_buffer_check(uw_buffer *b, size_t extra) {
@@ -361,6 +366,9 @@
 
   uw_global_custom();
   uw_init_crypto();
+
+  // Fast non-cryptographic strength randomness for Sqlcache.
+  srandom(clock());
 }
 
 void uw_app_init(uw_app *app) {
@@ -419,6 +427,18 @@
   void (*free)(void*);
 } global;
 
+typedef struct uw_Sqlcache_Update {
+  uw_Sqlcache_Cache *cache;
+  char **keys;
+  uw_Sqlcache_Value *value;
+  struct uw_Sqlcache_Update *next;
+} uw_Sqlcache_Update;
+
+typedef struct uw_Sqlcache_Unlock {
+  pthread_rwlock_t *lock;
+  struct uw_Sqlcache_Unlock *next;
+} uw_Sqlcache_Unlock;
+
 struct uw_context {
   uw_app *app;
   int id;
@@ -483,6 +503,13 @@
   char *output_buffer;
   size_t output_buffer_size;
 
+  // Sqlcache.
+  int numRecording, recordingCapacity;
+  int *recordingOffsets;
+  uw_Sqlcache_Update *cacheUpdate;
+  uw_Sqlcache_Update *cacheUpdateTail;
+  uw_Sqlcache_Unlock *cacheUnlock;
+
   int remoteSock;
 };
 
@@ -567,8 +594,16 @@
   ctx->output_buffer = malloc(1);
   ctx->output_buffer_size = 1;
 
+  ctx->numRecording = 0;
+  ctx->recordingCapacity = 0;
+  ctx->recordingOffsets = malloc(0);
+  ctx->cacheUpdate = NULL;
+  ctx->cacheUpdateTail = NULL;
+
   ctx->remoteSock = -1;
 
+  ctx->cacheUnlock = NULL;
+
   return ctx;
 }
 
@@ -634,6 +669,8 @@
 
   free(ctx->output_buffer);
 
+  free(ctx->recordingOffsets);
+
   free(ctx);
 }
 
@@ -657,6 +694,7 @@
   ctx->usedSig = 0;
   ctx->needsResig = 0;
   ctx->remoteSock = -1;
+  ctx->numRecording = 0;
 }
 
 void uw_reset_keep_request(uw_context ctx) {
@@ -1703,6 +1741,20 @@
   *ctx->page.front = 0;
 }
 
+void uw_recordingStart(uw_context ctx) {
+  if (ctx->numRecording == ctx->recordingCapacity) {
+    ++ctx->recordingCapacity;
+    ctx->recordingOffsets = realloc(ctx->recordingOffsets, sizeof(int) * ctx->recordingCapacity);
+  }
+  ctx->recordingOffsets[ctx->numRecording] = ctx->page.front - ctx->page.start;
+  ++ctx->numRecording;
+}
+
+char *uw_recordingRead(uw_context ctx) {
+  char *recording = ctx->page.start + ctx->recordingOffsets[--ctx->numRecording];
+  return strdup(recording);
+}
+
 char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) {
   char *result;
   int len;
@@ -3633,7 +3685,7 @@
   if (r == 0) {
     uw_ensure_transaction(ctx);
     ctx->app->initializer(ctx);
-    if (ctx->app->db_commit(ctx))
+    if (uw_commit(ctx))
       uw_error(ctx, FATAL, "Error running SQL COMMIT");
   }
 
@@ -4506,3 +4558,313 @@
 void uw_set_remoteSock(uw_context ctx, int sock) {
   ctx->remoteSock = sock;
 }
+
+
+// Sqlcache
+
+typedef struct uw_Sqlcache_Entry {
+  char *key;
+  uw_Sqlcache_Value *value;
+  unsigned long timeInvalid;
+  UT_hash_handle hh;
+} uw_Sqlcache_Entry;
+
+static void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) {
+  if (value) {
+    free(value->result);
+    free(value->output);
+    free(value);
+  }
+}
+
+static void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) {
+  if (entry) {
+    free(entry->key);
+    uw_Sqlcache_freeValue(entry->value);
+    free(entry);
+  }
+}
+
+// TODO: pick a number.
+static unsigned int uw_Sqlcache_maxSize = 1234567890;
+
+static void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) {
+  if (entry) {
+    HASH_DEL(cache->table, entry);
+    uw_Sqlcache_freeEntry(entry);
+  }
+}
+
+static uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) {
+  uw_Sqlcache_Entry *entry = NULL;
+  HASH_FIND(hh, cache->table, key, len, entry);
+  if (entry && bump) {
+    // Bump for LRU purposes.
+    HASH_DEL(cache->table, entry);
+    // Important that we use [entry->key], because [key] might be ephemeral.
+    HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry);
+  }
+  return entry;
+}
+
+static void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) {
+  HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry);
+  if (HASH_COUNT(cache->table) > uw_Sqlcache_maxSize) {
+    // Deletes the first element of the cache.
+    uw_Sqlcache_delete(cache, cache->table);
+  }
+}
+
+static unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) {
+  // TODO: verify that this makes time comparisons do the Right Thing.
+  return cache->timeNow++;
+}
+
+static unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) {
+  return x > y ? x : y;
+}
+
+static char uw_Sqlcache_keySep = '_';
+
+static char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) {
+  size_t len = 0;
+  while (numKeys-- > 0) {
+    char* k = keys[numKeys];
+    if (!k) {
+      // Can only happen when flushing, in which case we don't need anything past the null key.
+      break;
+    }
+    // Leave room for separator.
+    len += 1 + strlen(k);
+  }
+  char *buf = malloc(len+1);
+  // If nothing is copied into the buffer, it should look like it has length 0.
+  buf[0] = 0;
+  return buf;
+}
+
+static char *uw_Sqlcache_keyCopy(char *buf, char *key) {
+  *buf++ = uw_Sqlcache_keySep;
+  return stpcpy(buf, key);
+}
+
+// The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn".
+
+uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
+  int doBump = random() % 1024 == 0;
+  if (doBump) {
+    pthread_rwlock_wrlock(&cache->lockIn);
+  } else {
+    pthread_rwlock_rdlock(&cache->lockIn);
+  }
+  size_t numKeys = cache->numKeys;
+  char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
+  char *buf = key;
+  time_t timeInvalid = cache->timeInvalid;
+  uw_Sqlcache_Entry *entry;
+  if (numKeys == 0) {
+    entry = cache->table;
+    if (!entry) {
+      free(key);
+      pthread_rwlock_unlock(&cache->lockIn);
+      return NULL;
+    }
+  } else {
+    while (numKeys-- > 0) {
+      buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
+      size_t len = buf - key;
+      entry = uw_Sqlcache_find(cache, key, len, doBump);
+      if (!entry) {
+        free(key);
+        pthread_rwlock_unlock(&cache->lockIn);
+        return NULL;
+      }
+      timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid);
+    }
+    free(key);
+  }
+  uw_Sqlcache_Value *value = entry->value;
+  pthread_rwlock_unlock(&cache->lockIn);
+  // ASK: though the argument isn't trivial, this is safe, right?
+  // Returning outside the lock is safe because updates happen at commit time.
+  // Those are the only times the returned value or its strings can get freed.
+  // Handler output is a new string, so it's safe to free this at commit time.
+  return value && timeInvalid < value->timeValid ? value : NULL;
+}
+
+static void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
+  pthread_rwlock_wrlock(&cache->lockIn);
+  size_t numKeys = cache->numKeys;
+  time_t timeNow = uw_Sqlcache_getTimeNow(cache);
+  uw_Sqlcache_Entry *entry;
+  if (numKeys == 0) {
+    entry = cache->table;
+    if (!entry) {
+      entry = calloc(1, sizeof(uw_Sqlcache_Entry));
+      entry->key = NULL;
+      entry->value = NULL;
+      entry->timeInvalid = 0;
+      cache->table = entry;
+    }
+  } else {
+    char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
+    char *buf = key;
+    while (numKeys-- > 0) {
+      buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
+      size_t len = buf - key;
+
+      entry = uw_Sqlcache_find(cache, key, len, 1);
+      if (!entry) {
+        entry = calloc(1, sizeof(uw_Sqlcache_Entry));
+        entry->key = strdup(key);
+        entry->value = NULL;
+        entry->timeInvalid = 0;
+        uw_Sqlcache_add(cache, entry, len);
+      }
+    }
+    free(key);
+  }
+  if (!entry->value || entry->value->timeValid < value->timeValid) {
+    uw_Sqlcache_freeValue(entry->value);
+    entry->value = value;
+    entry->value->timeValid = timeNow;
+  }
+  pthread_rwlock_unlock(&cache->lockIn);
+}
+
+static void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
+}
+
+static void uw_Sqlcache_commit(void *data) {
+  uw_context ctx = (uw_context)data;
+  uw_Sqlcache_Update *update = ctx->cacheUpdate;
+  while (update) {
+    uw_Sqlcache_Cache *cache = update->cache;
+    char **keys = update->keys;
+    if (update->value) {
+      uw_Sqlcache_storeCommitOne(cache, keys, update->value);
+    } else {
+      uw_Sqlcache_flushCommitOne(cache, keys);
+    }
+    update = update->next;
+  }
+}
+
+static void uw_Sqlcache_free(void *data, int dontCare) {
+  uw_context ctx = (uw_context)data;
+  uw_Sqlcache_Update *update = ctx->cacheUpdate;
+  while (update) {
+    char** keys = update->keys;
+    size_t numKeys = update->cache->numKeys;
+    while (numKeys-- > 0) {
+      free(keys[numKeys]);
+    }
+    free(keys);
+    // Don't free [update->value]: it's in the cache now!
+    uw_Sqlcache_Update *nextUpdate = update->next;
+    free(update);
+    update = nextUpdate;
+  }
+  ctx->cacheUpdate = NULL;
+  ctx->cacheUpdateTail = NULL;
+  uw_Sqlcache_Unlock *unlock = ctx->cacheUnlock;
+  while (unlock) {
+    pthread_rwlock_unlock(unlock->lock);
+    uw_Sqlcache_Unlock *nextUnlock = unlock->next;
+    free(unlock);
+    unlock = nextUnlock;
+  }
+  ctx->cacheUnlock = NULL;
+}
+
+static void uw_Sqlcache_pushUnlock(uw_context ctx, pthread_rwlock_t *lock) {
+  if (!ctx->cacheUnlock) {
+    // Just need one registered commit for both updating and unlocking.
+    uw_register_transactional(ctx, ctx, uw_Sqlcache_commit, NULL, uw_Sqlcache_free);
+  }
+  uw_Sqlcache_Unlock *unlock = malloc(sizeof(uw_Sqlcache_Unlock));
+  unlock->lock = lock;
+  unlock->next = ctx->cacheUnlock;
+  ctx->cacheUnlock = unlock;
+}
+
+void uw_Sqlcache_rlock(uw_context ctx, uw_Sqlcache_Cache *cache) {
+  pthread_rwlock_rdlock(&cache->lockOut);
+  uw_Sqlcache_pushUnlock(ctx, &cache->lockOut);
+}
+
+void uw_Sqlcache_wlock(uw_context ctx, uw_Sqlcache_Cache *cache) {
+  pthread_rwlock_wrlock(&cache->lockOut);
+  uw_Sqlcache_pushUnlock(ctx, &cache->lockOut);
+}
+
+static char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) {
+  char **copy = malloc(sizeof(char *) * numKeys);
+  while (numKeys-- > 0) {
+    char *k = keys[numKeys];
+    copy[numKeys] = k ? strdup(k) : NULL;
+  }
+  return copy;
+}
+
+void uw_Sqlcache_store(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
+  uw_Sqlcache_Update *update = malloc(sizeof(uw_Sqlcache_Update));
+  update->cache = cache;
+  update->keys = uw_Sqlcache_copyKeys(keys, cache->numKeys);
+  update->value = value;
+  update->next = NULL;
+  // Can't use [uw_Sqlcache_getTimeNow] because it modifies state and we don't have the lock.
+  pthread_rwlock_rdlock(&cache->lockIn);
+  value->timeValid = cache->timeNow;
+  pthread_rwlock_unlock(&cache->lockIn);
+  if (ctx->cacheUpdateTail) {
+    ctx->cacheUpdateTail->next = update;
+  } else {
+    ctx->cacheUpdate = update;
+  }
+  ctx->cacheUpdateTail = update;
+}
+
+void uw_Sqlcache_flush(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
+  // A flush has to happen immediately so that subsequent stores in the same transaction fail.
+  // This is safe to do because we will always call [uw_Sqlcache_wlock] earlier.
+  // If the transaction fails, the only harm done is a few extra cache misses.
+  pthread_rwlock_wrlock(&cache->lockIn);
+  size_t numKeys = cache->numKeys;
+  if (numKeys == 0) {
+    uw_Sqlcache_Entry *entry = cache->table;
+    if (entry) {
+      uw_Sqlcache_freeValue(entry->value);
+      entry->value = NULL;
+    }
+  } else {
+    char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
+    char *buf = key;
+    time_t timeNow = uw_Sqlcache_getTimeNow(cache);
+    while (numKeys-- > 0) {
+      char *k = keys[numKeys];
+      if (!k) {
+        size_t len = buf - key;
+        if (len == 0) {
+          // The first key was null.
+          cache->timeInvalid = timeNow;
+        } else {
+          uw_Sqlcache_Entry *entry = uw_Sqlcache_find(cache, key, len, 0);
+          if (entry) {
+            entry->timeInvalid = timeNow;
+          }
+        }
+        free(key);
+        pthread_rwlock_unlock(&cache->lockIn);
+        return;
+      }
+      buf = uw_Sqlcache_keyCopy(buf, k);
+    }
+    // All the keys were non-null, so we delete the pointed-to entry.
+    size_t len = buf - key;
+    uw_Sqlcache_Entry *entry = uw_Sqlcache_find(cache, key, len, 0);
+    free(key);
+    uw_Sqlcache_delete(cache, entry);
+  }
+  pthread_rwlock_unlock(&cache->lockIn);
+}