comparison src/c/urweb.c @ 2285:ad3ce1528f71

Fix committing multiple stores/flushes. Locking is WIP.
author Ziv Scully <ziv@mit.edu>
date Thu, 12 Nov 2015 16:36:35 -0500
parents 472b4504aef2
children 0bdfec16a01d
comparison
equal deleted inserted replaced
2284:472b4504aef2 2285:ad3ce1528f71
422 char *name; 422 char *name;
423 void *data; 423 void *data;
424 void (*free)(void*); 424 void (*free)(void*);
425 } global; 425 } global;
426 426
427 typedef struct uw_Sqlcache_Inval { 427 typedef struct uw_Sqlcache_Update {
428 uw_Sqlcache_Cache *cache; 428 uw_Sqlcache_Cache *cache;
429 char **keys; 429 char **keys;
430 struct uw_Sqlcache_Inval *next; 430 uw_Sqlcache_Value *value;
431 } uw_Sqlcache_Inval; 431 struct uw_Sqlcache_Update *next;
432 } uw_Sqlcache_Update;
432 433
433 struct uw_context { 434 struct uw_context {
434 uw_app *app; 435 uw_app *app;
435 int id; 436 int id;
436 437
495 size_t output_buffer_size; 496 size_t output_buffer_size;
496 497
497 // Sqlcache. 498 // Sqlcache.
498 int numRecording; 499 int numRecording;
499 int recordingOffset; 500 int recordingOffset;
500 uw_Sqlcache_Inval *inval; 501 uw_Sqlcache_Update *cacheUpdate;
502 uw_Sqlcache_Update *cacheUpdateTail;
501 503
502 int remoteSock; 504 int remoteSock;
503 }; 505 };
504 506
505 size_t uw_headers_max = SIZE_MAX; 507 size_t uw_headers_max = SIZE_MAX;
506 size_t uw_page_max = SIZE_MAX; 508 size_t uw_page_max = SIZE_MAX;
507 size_t uw_heap_max = SIZE_MAX; 509 size_t uw_heap_max = SIZE_MAX;
508 size_t uw_script_max = SIZE_MAX; 510 size_t uw_script_max = SIZE_MAX;
509 511
510 uw_context uw_init(int id, uw_loggers *lg) { 512 uw_context uw_init(int id, uw_loggers *lg) {
513 puts("Initializing");
511 uw_context ctx = malloc(sizeof(struct uw_context)); 514 uw_context ctx = malloc(sizeof(struct uw_context));
512 515
513 ctx->app = NULL; 516 ctx->app = NULL;
514 ctx->id = id; 517 ctx->id = id;
515 518
583 ctx->output_buffer = malloc(1); 586 ctx->output_buffer = malloc(1);
584 ctx->output_buffer_size = 1; 587 ctx->output_buffer_size = 1;
585 588
586 ctx->numRecording = 0; 589 ctx->numRecording = 0;
587 ctx->recordingOffset = 0; 590 ctx->recordingOffset = 0;
588 ctx->inval = NULL; 591 ctx->cacheUpdate = NULL;
592 ctx->cacheUpdateTail = NULL;
589 593
590 ctx->remoteSock = -1; 594 ctx->remoteSock = -1;
591 595
592 return ctx; 596 return ctx;
593 } 597 }
4627 *buf++ = uw_Sqlcache_keySep; 4631 *buf++ = uw_Sqlcache_keySep;
4628 return stpcpy(buf, key); 4632 return stpcpy(buf, key);
4629 } 4633 }
4630 4634
4631 // The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn". 4635 // The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn".
4632 // TODO: strlen(key) = buf - key? 4636
4633 4637 uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
4634 uw_Sqlcache_Value *uw_Sqlcache_check(uw_Sqlcache_Cache *cache, char **keys) { 4638 pthread_rwlock_rdlock(&cache->lock);
4635 //pthread_rwlock_rdlock(cache->lock);
4636 size_t numKeys = cache->numKeys; 4639 size_t numKeys = cache->numKeys;
4637 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); 4640 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
4638 char *buf = key; 4641 char *buf = key;
4639 time_t timeInvalid = cache->timeInvalid; 4642 time_t timeInvalid = cache->timeInvalid;
4640 uw_Sqlcache_Entry *entry; 4643 uw_Sqlcache_Entry *entry;
4642 buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]); 4645 buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
4643 size_t len = buf - key; 4646 size_t len = buf - key;
4644 entry = uw_Sqlcache_find(cache, key, len, 1); 4647 entry = uw_Sqlcache_find(cache, key, len, 1);
4645 if (!entry) { 4648 if (!entry) {
4646 free(key); 4649 free(key);
4650 pthread_rwlock_unlock(&cache->lock);
4647 return NULL; 4651 return NULL;
4648 } 4652 }
4649 timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid); 4653 timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid);
4650 } 4654 }
4651 free(key); 4655 free(key);
4656 // TODO: pass back copy of value and free it in the generated code... or use uw_malloc?
4652 uw_Sqlcache_Value *value = entry->value; 4657 uw_Sqlcache_Value *value = entry->value;
4653 //pthread_rwlock_unlock(cache->lock); 4658 pthread_rwlock_unlock(&cache->lock);
4654 return value && value->timeValid > timeInvalid ? value : NULL; 4659 return value && value->timeValid > timeInvalid ? value : NULL;
4655 } 4660 }
4656 4661
4657 void uw_Sqlcache_store(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) { 4662 void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
4658 //pthread_rwlock_wrlock(cache->lock); 4663 pthread_rwlock_wrlock(&cache->lock);
4659 size_t numKeys = cache->numKeys; 4664 size_t numKeys = cache->numKeys;
4660 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); 4665 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
4661 char *buf = key; 4666 char *buf = key;
4662 time_t timeNow = uw_Sqlcache_getTimeNow(cache); 4667 time_t timeNow = uw_Sqlcache_getTimeNow(cache);
4663 uw_Sqlcache_Entry *entry; 4668 uw_Sqlcache_Entry *entry;
4667 entry = uw_Sqlcache_find(cache, key, len, 1); 4672 entry = uw_Sqlcache_find(cache, key, len, 1);
4668 if (!entry) { 4673 if (!entry) {
4669 entry = malloc(sizeof(uw_Sqlcache_Entry)); 4674 entry = malloc(sizeof(uw_Sqlcache_Entry));
4670 entry->key = strdup(key); 4675 entry->key = strdup(key);
4671 entry->value = NULL; 4676 entry->value = NULL;
4672 entry->timeInvalid = 0; // ASK: is this okay? 4677 entry->timeInvalid = 0;
4673 uw_Sqlcache_add(cache, entry, len); 4678 uw_Sqlcache_add(cache, entry, len);
4674 } 4679 }
4675 } 4680 }
4676 free(key); 4681 free(key);
4677 uw_Sqlcache_freeValue(entry->value); 4682 uw_Sqlcache_freeValue(entry->value);
4678 entry->value = value; 4683 entry->value = value;
4679 entry->value->timeValid = timeNow; 4684 entry->value->timeValid = timeNow;
4680 //pthread_rwlock_unlock(cache->lock); 4685 pthread_rwlock_unlock(&cache->lock);
4681 } 4686 }
4682 4687
4683 void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) { 4688 void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
4689 pthread_rwlock_wrlock(&cache->lock);
4684 size_t numKeys = cache->numKeys; 4690 size_t numKeys = cache->numKeys;
4685 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); 4691 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
4686 char *buf = key; 4692 char *buf = key;
4687 time_t timeNow = uw_Sqlcache_getTimeNow(cache); 4693 time_t timeNow = uw_Sqlcache_getTimeNow(cache);
4688 uw_Sqlcache_Entry *entry; 4694 uw_Sqlcache_Entry *entry;
4707 } 4713 }
4708 } 4714 }
4709 free(key); 4715 free(key);
4710 // All the keys were non-null and the relevant entry is present, so we delete it. 4716 // All the keys were non-null and the relevant entry is present, so we delete it.
4711 uw_Sqlcache_delete(cache, entry); 4717 uw_Sqlcache_delete(cache, entry);
4712 } 4718 pthread_rwlock_unlock(&cache->lock);
4713 4719 }
4714 void uw_Sqlcache_flushFree(void *data, int dontCare) { 4720
4715 uw_Sqlcache_Inval *inval = (uw_Sqlcache_Inval *)data; 4721 void uw_Sqlcache_freeUpdate(void *data, int dontCare) {
4716 while (inval) { 4722 uw_context ctx = (uw_context)data;
4717 char** keys = inval->keys; 4723 uw_Sqlcache_Update *update = ctx->cacheUpdate;
4718 size_t numKeys = inval->cache->numKeys; 4724 while (update) {
4725 char** keys = update->keys;
4726 size_t numKeys = update->cache->numKeys;
4719 while (numKeys-- > 0) { 4727 while (numKeys-- > 0) {
4720 free(keys[numKeys]); 4728 free(keys[numKeys]);
4721 } 4729 }
4722 free(keys); 4730 free(keys);
4723 uw_Sqlcache_Inval *nextInval = inval->next; 4731 // Don't free [update->value]: it's in the cache now!
4724 free(inval); 4732 uw_Sqlcache_Update *nextUpdate = update->next;
4725 inval = nextInval; 4733 free(update);
4726 } 4734 update = nextUpdate;
4727 } 4735 }
4728 4736 ctx->cacheUpdate = NULL;
4729 void uw_Sqlcache_flushCommit(void *data) { 4737 ctx->cacheUpdateTail = NULL;
4730 uw_Sqlcache_Inval *inval = (uw_Sqlcache_Inval *)data; 4738 }
4731 while (inval) { 4739
4732 uw_Sqlcache_Cache *cache = inval->cache; 4740 void uw_Sqlcache_commitUpdate(void *data) {
4733 char **keys = inval->keys; 4741 uw_context ctx = (uw_context)data;
4734 uw_Sqlcache_flushCommitOne(cache, keys); 4742 uw_Sqlcache_Update *update = ctx->cacheUpdate;
4735 inval = inval->next; 4743 while (update) {
4744 uw_Sqlcache_Cache *cache = update->cache;
4745 char **keys = update->keys;
4746 if (update->value) {
4747 uw_Sqlcache_storeCommitOne(cache, keys, update->value);
4748 } else {
4749 uw_Sqlcache_flushCommitOne(cache, keys);
4750 }
4751 update = update->next;
4736 } 4752 }
4737 } 4753 }
4738 4754
4739 char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) { 4755 char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) {
4740 char **copy = malloc(sizeof(char *) * numKeys); 4756 char **copy = malloc(sizeof(char *) * numKeys);
4741 while (numKeys-- > 0) { 4757 while (numKeys-- > 0) {
4742 char * k = keys[numKeys]; 4758 char *k = keys[numKeys];
4743 copy[numKeys] = k ? strdup(k) : NULL; 4759 copy[numKeys] = k ? strdup(k) : NULL;
4744 } 4760 }
4745 return copy; 4761 return copy;
4746 } 4762 }
4747 4763
4764 void uw_Sqlcache_store(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
4765 uw_Sqlcache_Update *update = malloc(sizeof(uw_Sqlcache_Update));
4766 update->cache = cache;
4767 update->keys = uw_Sqlcache_copyKeys(keys, cache->numKeys);
4768 update->value = value;
4769 update->next = NULL;
4770 if (ctx->cacheUpdateTail) {
4771 // An update is already registered, so just extend it.
4772 ctx->cacheUpdateTail->next = update;
4773 } else {
4774 ctx->cacheUpdate = update;
4775 uw_register_transactional(ctx, ctx, uw_Sqlcache_commitUpdate, NULL, uw_Sqlcache_freeUpdate);
4776 }
4777 ctx->cacheUpdateTail = update;
4778 }
4779
4748 void uw_Sqlcache_flush(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) { 4780 void uw_Sqlcache_flush(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
4749 //pthread_rwlock_wrlock(cache->lock); 4781 // A flush is represented in the queue as storing NULL.
4750 uw_Sqlcache_Inval *inval = malloc(sizeof(uw_Sqlcache_Inval)); 4782 uw_Sqlcache_store(ctx, cache, keys, NULL);
4751 inval->cache = cache; 4783 }
4752 inval->keys = uw_Sqlcache_copyKeys(keys, cache->numKeys);
4753 inval->next = NULL;
4754 if (ctx->inval) {
4755 // An invalidation is already registered, so just extend it.
4756 ctx->inval->next = inval;
4757 } else {
4758 uw_register_transactional(ctx, inval, uw_Sqlcache_flushCommit, NULL, uw_Sqlcache_flushFree);
4759 }
4760 // [ctx->inval] should always point to the last invalidation.
4761 ctx->inval = inval;
4762 //pthread_rwlock_unlock(cache->lock);
4763 }