Mercurial > urweb
changeset 2288:98f96a976ede
Finish locking, but it's not yet tested rigorously.
author | Ziv Scully <ziv@mit.edu> |
---|---|
date | Fri, 13 Nov 2015 11:03:09 -0500 (2015-11-13) |
parents | 08203f93dbc3 |
children | 78820fa8f5a7 |
files | include/urweb/types_cpp.h include/urweb/urweb_cpp.h src/c/urweb.c src/lru_cache.sml src/sqlcache.sml |
diffstat | 5 files changed, 164 insertions(+), 75 deletions(-) [+] |
line wrap: on
line diff
--- a/include/urweb/types_cpp.h Fri Nov 13 01:05:22 2015 -0500 +++ b/include/urweb/types_cpp.h Fri Nov 13 11:03:09 2015 -0500 @@ -133,7 +133,8 @@ typedef struct uw_Sqlcache_Entry uw_Sqlcache_Entry; typedef struct uw_Sqlcache_Cache { - pthread_rwlock_t lock; + pthread_rwlock_t lockOut; + pthread_rwlock_t lockIn; uw_Sqlcache_Entry *table; unsigned long timeInvalid; unsigned long timeNow;
--- a/include/urweb/urweb_cpp.h Fri Nov 13 01:05:22 2015 -0500 +++ b/include/urweb/urweb_cpp.h Fri Nov 13 11:03:09 2015 -0500 @@ -406,6 +406,8 @@ // Sqlcache. +void *uw_Sqlcache_rlock(struct uw_context *, uw_Sqlcache_Cache *); +void *uw_Sqlcache_wlock(struct uw_context *, uw_Sqlcache_Cache *); uw_Sqlcache_Value *uw_Sqlcache_check(struct uw_context *, uw_Sqlcache_Cache *, char **); void *uw_Sqlcache_store(struct uw_context *, uw_Sqlcache_Cache *, char **, uw_Sqlcache_Value *); void *uw_Sqlcache_flush(struct uw_context *, uw_Sqlcache_Cache *, char **);
--- a/src/c/urweb.c Fri Nov 13 01:05:22 2015 -0500 +++ b/src/c/urweb.c Fri Nov 13 11:03:09 2015 -0500 @@ -366,6 +366,9 @@ uw_global_custom(); uw_init_crypto(); + + // Fast non-cryptographic strength randomness for Sqlcache. + srandom(clock()); } void uw_app_init(uw_app *app) { @@ -431,6 +434,11 @@ struct uw_Sqlcache_Update *next; } uw_Sqlcache_Update; +typedef struct uw_Sqlcache_Unlock { + pthread_rwlock_t *lock; + struct uw_Sqlcache_Unlock *next; +} uw_Sqlcache_Unlock; + struct uw_context { uw_app *app; int id; @@ -500,6 +508,7 @@ int recordingOffset; uw_Sqlcache_Update *cacheUpdate; uw_Sqlcache_Update *cacheUpdateTail; + uw_Sqlcache_Unlock *cacheUnlock; int remoteSock; }; @@ -4556,7 +4565,7 @@ UT_hash_handle hh; } uw_Sqlcache_Entry; -void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) { +static void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) { if (value) { free(value->result); free(value->output); @@ -4564,7 +4573,7 @@ } } -void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) { +static void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) { if (entry) { free(entry->key); uw_Sqlcache_freeValue(entry->value); @@ -4573,14 +4582,14 @@ } // TODO: pick a number. -unsigned int uw_Sqlcache_maxSize = 1234567890; - -void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) { +static unsigned int uw_Sqlcache_maxSize = 1234567890; + +static void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) { HASH_DEL(cache->table, entry); uw_Sqlcache_freeEntry(entry); } -uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) { +static uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) { uw_Sqlcache_Entry *entry = NULL; HASH_FIND(hh, cache->table, key, len, entry); if (entry && bump) { @@ -4592,7 +4601,7 @@ return entry; } -void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) { +static void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) { HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry); if (HASH_COUNT(cache->table) > uw_Sqlcache_maxSize) { // Deletes the first element of the cache. @@ -4600,17 +4609,17 @@ } } -unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) { +static unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) { return ++cache->timeNow; } -unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) { +static unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) { return x > y ? x : y; } -char uw_Sqlcache_keySep = '_'; - -char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) { +static char uw_Sqlcache_keySep = '_'; + +static char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) { size_t len = 0; while (numKeys-- > 0) { char* k = keys[numKeys]; @@ -4627,7 +4636,7 @@ return buf; } -char *uw_Sqlcache_keyCopy(char *buf, char *key) { +static char *uw_Sqlcache_keyCopy(char *buf, char *key) { *buf++ = uw_Sqlcache_keySep; return stpcpy(buf, key); } @@ -4635,7 +4644,12 @@ // The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn". uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) { - pthread_rwlock_rdlock(&cache->lock); + int doBump = random() % 1024 == 0; + if (doBump) { + pthread_rwlock_wrlock(&cache->lockIn); + } else { + pthread_rwlock_rdlock(&cache->lockIn); + } size_t numKeys = cache->numKeys; char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); char *buf = key; @@ -4645,46 +4659,49 @@ entry = cache->table; if (!entry) { free(key); - pthread_rwlock_unlock(&cache->lock); + pthread_rwlock_unlock(&cache->lockIn); return NULL; } } else { while (numKeys-- > 0) { buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]); size_t len = buf - key; - entry = uw_Sqlcache_find(cache, key, len, 1); + entry = uw_Sqlcache_find(cache, key, len, doBump); if (!entry) { free(key); - pthread_rwlock_unlock(&cache->lock); + pthread_rwlock_unlock(&cache->lockIn); return NULL; } timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid); } free(key); } - // TODO: pass back copy of value and free it in the generated code... or use uw_malloc? uw_Sqlcache_Value *value = entry->value; - pthread_rwlock_unlock(&cache->lock); + pthread_rwlock_unlock(&cache->lockIn); + // ASK: though the argument isn't trivial, this is safe, right? + // Returning outside the lock is safe because updates happen at commit time. + // Those are the only times the returned value or its strings can get freed. + // Handler output is a new string, so it's safe to free this at commit time. return value && value->timeValid > timeInvalid ? value : NULL; } -void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) { - pthread_rwlock_wrlock(&cache->lock); +static void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) { + pthread_rwlock_wrlock(&cache->lockIn); size_t numKeys = cache->numKeys; - char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); - char *buf = key; time_t timeNow = uw_Sqlcache_getTimeNow(cache); uw_Sqlcache_Entry *entry; if (numKeys == 0) { entry = cache->table; if (!entry) { entry = malloc(sizeof(uw_Sqlcache_Entry)); - entry->key = strdup(key); + entry->key = NULL; entry->value = NULL; entry->timeInvalid = 0; cache->table = entry; } } else { + char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); + char *buf = key; while (numKeys-- > 0) { buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]); size_t len = buf - key; @@ -4702,23 +4719,23 @@ uw_Sqlcache_freeValue(entry->value); entry->value = value; entry->value->timeValid = timeNow; - pthread_rwlock_unlock(&cache->lock); -} - -void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) { - pthread_rwlock_wrlock(&cache->lock); + pthread_rwlock_unlock(&cache->lockIn); +} + +static void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) { + pthread_rwlock_wrlock(&cache->lockIn); size_t numKeys = cache->numKeys; - char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); - char *buf = key; - time_t timeNow = uw_Sqlcache_getTimeNow(cache); - uw_Sqlcache_Entry *entry; if (numKeys == 0) { - entry = cache->table; + uw_Sqlcache_Entry *entry = cache->table; if (entry) { uw_Sqlcache_freeValue(entry->value); entry->value = NULL; } } else { + char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); + char *buf = key; + time_t timeNow = uw_Sqlcache_getTimeNow(cache); + uw_Sqlcache_Entry *entry = NULL; while (numKeys-- > 0) { char *k = keys[numKeys]; if (!k) { @@ -4729,15 +4746,16 @@ cache->timeInvalid = timeNow; } free(key); - pthread_rwlock_unlock(&cache->lock); + pthread_rwlock_unlock(&cache->lockIn); return; } buf = uw_Sqlcache_keyCopy(buf, k); size_t len = buf - key; entry = uw_Sqlcache_find(cache, key, len, 0); if (!entry) { + // Nothing in the cache to flush. free(key); - pthread_rwlock_unlock(&cache->lock); + pthread_rwlock_unlock(&cache->lockIn); return; } } @@ -4745,10 +4763,25 @@ // All the keys were non-null and the relevant entry is present, so we delete it. uw_Sqlcache_delete(cache, entry); } - pthread_rwlock_unlock(&cache->lock); -} - -void uw_Sqlcache_freeUpdate(void *data, int dontCare) { + pthread_rwlock_unlock(&cache->lockIn); +} + +static void uw_Sqlcache_commit(void *data) { + uw_context ctx = (uw_context)data; + uw_Sqlcache_Update *update = ctx->cacheUpdate; + while (update) { + uw_Sqlcache_Cache *cache = update->cache; + char **keys = update->keys; + if (update->value) { + uw_Sqlcache_storeCommitOne(cache, keys, update->value); + } else { + uw_Sqlcache_flushCommitOne(cache, keys); + } + update = update->next; + } +} + +static void uw_Sqlcache_free(void *data, int dontCare) { uw_context ctx = (uw_context)data; uw_Sqlcache_Update *update = ctx->cacheUpdate; while (update) { @@ -4765,24 +4798,38 @@ } ctx->cacheUpdate = NULL; ctx->cacheUpdateTail = NULL; -} - -void uw_Sqlcache_commitUpdate(void *data) { - uw_context ctx = (uw_context)data; - uw_Sqlcache_Update *update = ctx->cacheUpdate; - while (update) { - uw_Sqlcache_Cache *cache = update->cache; - char **keys = update->keys; - if (update->value) { - uw_Sqlcache_storeCommitOne(cache, keys, update->value); - } else { - uw_Sqlcache_flushCommitOne(cache, keys); - } - update = update->next; + uw_Sqlcache_Unlock *unlock = ctx->cacheUnlock; + while (unlock) { + pthread_rwlock_unlock(unlock->lock); + uw_Sqlcache_Unlock *nextUnlock = unlock->next; + free(unlock); + unlock = nextUnlock; } -} - -char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) { + ctx->cacheUnlock = NULL; +} + +static void uw_Sqlcache_pushUnlock(uw_context ctx, pthread_rwlock_t *lock) { + if (!ctx->cacheUnlock) { + // Just need one registered commit for both updating and unlocking. + uw_register_transactional(ctx, ctx, uw_Sqlcache_commit, NULL, uw_Sqlcache_free); + } + uw_Sqlcache_Unlock *unlock = malloc(sizeof(uw_Sqlcache_Unlock)); + unlock->lock = lock; + unlock->next = ctx->cacheUnlock; + ctx->cacheUnlock = unlock; +} + +void uw_Sqlcache_rlock(uw_context ctx, uw_Sqlcache_Cache *cache) { + pthread_rwlock_rdlock(&cache->lockOut); + uw_Sqlcache_pushUnlock(ctx, &cache->lockOut); +} + +void uw_Sqlcache_wlock(uw_context ctx, uw_Sqlcache_Cache *cache) { + pthread_rwlock_wrlock(&cache->lockOut); + uw_Sqlcache_pushUnlock(ctx, &cache->lockOut); +} + +static char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) { char **copy = malloc(sizeof(char *) * numKeys); while (numKeys-- > 0) { char *k = keys[numKeys]; @@ -4798,11 +4845,9 @@ update->value = value; update->next = NULL; if (ctx->cacheUpdateTail) { - // An update is already registered, so just extend it. ctx->cacheUpdateTail->next = update; } else { ctx->cacheUpdate = update; - uw_register_transactional(ctx, ctx, uw_Sqlcache_commitUpdate, NULL, uw_Sqlcache_freeUpdate); } ctx->cacheUpdateTail = update; }
--- a/src/lru_cache.sml Fri Nov 13 01:05:22 2015 -0500 +++ b/src/lru_cache.sml Fri Nov 13 11:03:09 2015 -0500 @@ -69,7 +69,9 @@ Print.box [string ("static uw_Sqlcache_Cache cacheStruct" ^ i ^ " = {"), newline, - string " .lock = PTHREAD_RWLOCK_INITIALIZER,", + string " .lockIn = PTHREAD_RWLOCK_INITIALIZER,", + newline, + string " .lockOut = PTHREAD_RWLOCK_INITIALIZER,", newline, string " .table = NULL,", newline, @@ -83,6 +85,22 @@ newline, newline, + string ("static void uw_Sqlcache_rlock" ^ i ^ "(uw_context ctx) {"), + newline, + string (" uw_Sqlcache_rlock(ctx, cache" ^ i ^ ");"), + newline, + string "}", + newline, + newline, + + string ("static void uw_Sqlcache_wlock" ^ i ^ "(uw_context ctx) {"), + newline, + string (" uw_Sqlcache_wlock(ctx, cache" ^ i ^ ");"), + newline, + string "}", + newline, + newline, + string ("static uw_Basis_string uw_Sqlcache_check" ^ i), string ("(uw_context ctx" ^ typedArgs ^ ") {"), newline,
--- a/src/sqlcache.sml Fri Nov 13 01:05:22 2015 -0500 +++ b/src/sqlcache.sml Fri Nov 13 11:03:09 2015 -0500 @@ -913,7 +913,7 @@ (* Program Instrumentation Utilities *) (*************************************) -val {check, store, flush, ...} = getCache () +val {check, store, flush, lock, ...} = getCache () val dummyTyp = (TRecord [], dummyLoc) @@ -1431,7 +1431,7 @@ (* Locking *) (***********) -(* TODO: do this less evil-ly by not relying on specific FFI names, please? *) +(* TODO: do this less evilly by not relying on specific FFI names, please? *) fun locksNeeded file = transitiveAnalysis (fn ((_, name, _, e, _), state) => @@ -1439,14 +1439,14 @@ {typ = #2, exp = fn (EFfiApp ("Sqlcache", x, _), state as {store, flush}) => (case Int.fromString (String.extract (x, 5, NONE)) of - NONE => raise Match + NONE => state | SOME index => - if String.isPrefix "store" x + if String.isPrefix "flush" x + then {store = store, flush = IIMM.insert (flush, name, index)} + else if String.isPrefix "store" x then {store = IIMM.insert (store, name, index), flush = flush} - else if String.isPrefix "flush" x - then {store = store, flush = IIMM.insert (flush, name, index)} else state) - | _ => state} + | (_, state) => state} state e) {store = IIMM.empty, flush = IIMM.empty} @@ -1459,13 +1459,36 @@ IS.empty decls -(* fun addLocking file = *) -(* let *) -(* val whichLocks = locksNeeded file *) -(* val needsLocks = exports file *) -(* in *) +fun wrapLocks (locks, (exp', loc)) = + case exp' of + EAbs (s, t1, t2, exp) => (EAbs (s, t1, t2, wrapLocks (locks, exp)), loc) + | _ => (List.foldr (fn (l, e') => sequence [lock l, e']) exp' locks, loc) -(* end *) +fun addLocking file = + let + val {store, flush} = locksNeeded file + fun locks n = + let + val wlocks = IIMM.findSet (flush, n) + val rlocks = IIMM.findSet (store, n) + val ls = map (fn i => (i, true)) (IS.listItems wlocks) + @ map (fn i => (i, false)) (IS.listItems (IS.difference (rlocks, wlocks))) + in + ListMergeSort.sort (fn ((i, _), (j, _)) => i > j) ls + end + val expts = exports file + fun doVal (v as (x, n, t, exp, s)) = + if IS.member (expts, n) + then (x, n, t, wrapLocks ((locks n), exp), s) + else v + val doDecl = + fn (DVal v, loc) => (DVal (doVal v), loc) + | (DValRec vs, loc) => (DValRec (map doVal vs), loc) + | decl => decl + in + mapFst (map doDecl) file + end + (************************) (* Compiler Entry Point *) @@ -1494,7 +1517,7 @@ (datatypes @ newDecls @ others, sideInfo) end -val go' = addFlushing o addCaching o simplifySql o inlineSql +val go' = addLocking o addFlushing o addCaching o simplifySql o inlineSql fun go file = let