comparison src/c/urweb.c @ 2304:6fb9232ade99

Merge Sqlcache
author Adam Chlipala <adam@chlipala.net>
date Sun, 20 Dec 2015 14:18:52 -0500
parents 42079884e34a
children
comparison
equal deleted inserted replaced
2201:1091227f535a 2304:6fb9232ade99
20 20
21 #include <pthread.h> 21 #include <pthread.h>
22 22
23 #include "types.h" 23 #include "types.h"
24 24
25 #include "uthash.h"
26
25 uw_unit uw_unit_v = 0; 27 uw_unit uw_unit_v = 0;
26 28
27 29
28 // Socket extras 30 // Socket extras
29 31
68 free(b->start); 70 free(b->start);
69 } 71 }
70 72
71 void uw_buffer_reset(uw_buffer *b) { 73 void uw_buffer_reset(uw_buffer *b) {
72 b->front = b->start; 74 b->front = b->start;
75 if (b->front != b->back) {
76 *b->front = 0;
77 }
73 } 78 }
74 79
75 int uw_buffer_check(uw_buffer *b, size_t extra) { 80 int uw_buffer_check(uw_buffer *b, size_t extra) {
76 if (b->back - b->front < extra) { 81 if (b->back - b->front < extra) {
77 size_t desired = b->front - b->start + extra, next; 82 size_t desired = b->front - b->start + extra, next;
// Process-wide initialization: sets up the (initially empty) client array,
// runs the custom and crypto initializers, and seeds the random() generator.
void uw_global_init() {
  // Zero-byte allocation as the starting value of [clients].
  clients = malloc(0);

  uw_global_custom();
  uw_init_crypto();

  // Fast non-cryptographic strength randomness for Sqlcache.
  // uw_Sqlcache_check uses random() to pick roughly 1 in 1024 lookups for an LRU bump.
  // NOTE(review): clock() is process CPU time, so the seed is likely similar across runs -- confirm that is acceptable.
  srandom(clock());
}
365 373
// Runs the application's [client_init] callback.
void uw_app_init(uw_app *app) {
  app->client_init();
}
417 char *name; 425 char *name;
418 void *data; 426 void *data;
419 void (*free)(void*); 427 void (*free)(void*);
420 } global; 428 } global;
421 429
// One pending Sqlcache operation, queued on a context by uw_Sqlcache_store
// and applied by uw_Sqlcache_commit. Nodes form a singly linked list.
typedef struct uw_Sqlcache_Update {
  // Cache the operation applies to.
  uw_Sqlcache_Cache *cache;
  // Private copy of the key components ([cache->numKeys] of them); freed by uw_Sqlcache_free.
  char **keys;
  // Value to store; uw_Sqlcache_commit treats a NULL value as a flush request.
  uw_Sqlcache_Value *value;
  // Next queued operation, or NULL at the tail.
  struct uw_Sqlcache_Update *next;
} uw_Sqlcache_Update;
436
// Node in a context's list of read/write locks that must be released when the
// transaction finishes. Pushed by uw_Sqlcache_pushUnlock, drained by uw_Sqlcache_free.
typedef struct uw_Sqlcache_Unlock {
  // Lock currently held on behalf of the context.
  pthread_rwlock_t *lock;
  // Next lock to release, or NULL at the end of the list.
  struct uw_Sqlcache_Unlock *next;
} uw_Sqlcache_Unlock;
441
422 struct uw_context { 442 struct uw_context {
423 uw_app *app; 443 uw_app *app;
424 int id; 444 int id;
425 445
426 char *(*get_header)(void *, const char *); 446 char *(*get_header)(void *, const char *);
480 500
481 int usedSig, needsResig; 501 int usedSig, needsResig;
482 502
483 char *output_buffer; 503 char *output_buffer;
484 size_t output_buffer_size; 504 size_t output_buffer_size;
505
506 // Sqlcache.
507 int numRecording, recordingCapacity;
508 int *recordingOffsets;
509 uw_Sqlcache_Update *cacheUpdate;
510 uw_Sqlcache_Update *cacheUpdateTail;
511 uw_Sqlcache_Unlock *cacheUnlock;
485 512
486 int remoteSock; 513 int remoteSock;
487 }; 514 };
488 515
489 size_t uw_headers_max = SIZE_MAX; 516 size_t uw_headers_max = SIZE_MAX;
565 ctx->needsResig = 0; 592 ctx->needsResig = 0;
566 593
567 ctx->output_buffer = malloc(1); 594 ctx->output_buffer = malloc(1);
568 ctx->output_buffer_size = 1; 595 ctx->output_buffer_size = 1;
569 596
597 ctx->numRecording = 0;
598 ctx->recordingCapacity = 0;
599 ctx->recordingOffsets = malloc(0);
600 ctx->cacheUpdate = NULL;
601 ctx->cacheUpdateTail = NULL;
602
570 ctx->remoteSock = -1; 603 ctx->remoteSock = -1;
604
605 ctx->cacheUnlock = NULL;
571 606
572 return ctx; 607 return ctx;
573 } 608 }
574 609
575 size_t uw_inputs_max = SIZE_MAX; 610 size_t uw_inputs_max = SIZE_MAX;
631 if (ctx->globals[i].free) 666 if (ctx->globals[i].free)
632 ctx->globals[i].free(ctx->globals[i].data); 667 ctx->globals[i].free(ctx->globals[i].data);
633 free(ctx->globals); 668 free(ctx->globals);
634 669
635 free(ctx->output_buffer); 670 free(ctx->output_buffer);
671
672 free(ctx->recordingOffsets);
636 673
637 free(ctx); 674 free(ctx);
638 } 675 }
639 676
640 void uw_reset_keep_error_message(uw_context ctx) { 677 void uw_reset_keep_error_message(uw_context ctx) {
655 ctx->nextId = 0; 692 ctx->nextId = 0;
656 ctx->amInitializing = 0; 693 ctx->amInitializing = 0;
657 ctx->usedSig = 0; 694 ctx->usedSig = 0;
658 ctx->needsResig = 0; 695 ctx->needsResig = 0;
659 ctx->remoteSock = -1; 696 ctx->remoteSock = -1;
697 ctx->numRecording = 0;
660 } 698 }
661 699
662 void uw_reset_keep_request(uw_context ctx) { 700 void uw_reset_keep_request(uw_context ctx) {
663 uw_reset_keep_error_message(ctx); 701 uw_reset_keep_error_message(ctx);
664 ctx->error_message[0] = 0; 702 ctx->error_message[0] = 0;
1699 1737
1700 void uw_write(uw_context ctx, const char* s) { 1738 void uw_write(uw_context ctx, const char* s) {
1701 uw_check(ctx, strlen(s) + 1); 1739 uw_check(ctx, strlen(s) + 1);
1702 uw_write_unsafe(ctx, s); 1740 uw_write_unsafe(ctx, s);
1703 *ctx->page.front = 0; 1741 *ctx->page.front = 0;
1742 }
1743
1744 void uw_recordingStart(uw_context ctx) {
1745 if (ctx->numRecording == ctx->recordingCapacity) {
1746 ++ctx->recordingCapacity;
1747 ctx->recordingOffsets = realloc(ctx->recordingOffsets, sizeof(int) * ctx->recordingCapacity);
1748 }
1749 ctx->recordingOffsets[ctx->numRecording] = ctx->page.front - ctx->page.start;
1750 ++ctx->numRecording;
1751 }
1752
1753 char *uw_recordingRead(uw_context ctx) {
1754 char *recording = ctx->page.start + ctx->recordingOffsets[--ctx->numRecording];
1755 return strdup(recording);
1704 } 1756 }
1705 1757
1706 char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) { 1758 char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) {
1707 char *result; 1759 char *result;
1708 int len; 1760 int len;
3631 int r = setjmp(ctx->jmp_buf); 3683 int r = setjmp(ctx->jmp_buf);
3632 3684
3633 if (r == 0) { 3685 if (r == 0) {
3634 uw_ensure_transaction(ctx); 3686 uw_ensure_transaction(ctx);
3635 ctx->app->initializer(ctx); 3687 ctx->app->initializer(ctx);
3636 if (ctx->app->db_commit(ctx)) 3688 if (uw_commit(ctx))
3637 uw_error(ctx, FATAL, "Error running SQL COMMIT"); 3689 uw_error(ctx, FATAL, "Error running SQL COMMIT");
3638 } 3690 }
3639 3691
3640 return r; 3692 return r;
3641 } 3693 }
4504 } 4556 }
4505 4557
// Records [sock] as the context's remote socket (the field is reset to -1 elsewhere in this file).
void uw_set_remoteSock(uw_context ctx, int sock) {
  ctx->remoteSock = sock;
}
4561
4562
4563 // Sqlcache
4564
// One node of a cache's hash table, keyed by a prefix string of the form "_k1_k2...".
typedef struct uw_Sqlcache_Entry {
  // Heap-allocated key string; owned by the entry and used as the uthash key pointer.
  char *key;
  // Cached value for this key, or NULL when the entry exists only as a prefix/invalidation marker.
  uw_Sqlcache_Value *value;
  // Logical time of the last flush recorded at this prefix; set by uw_Sqlcache_flush.
  unsigned long timeInvalid;
  // uthash bookkeeping.
  UT_hash_handle hh;
} uw_Sqlcache_Entry;
4571
4572 static void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) {
4573 if (value) {
4574 free(value->result);
4575 free(value->output);
4576 free(value);
4577 }
4578 }
4579
4580 static void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) {
4581 if (entry) {
4582 free(entry->key);
4583 uw_Sqlcache_freeValue(entry->value);
4584 free(entry);
4585 }
4586 }
4587
// TODO: pick a number.
// Upper bound on the number of entries in one cache table: once an insertion pushes
// the count above this, uw_Sqlcache_add deletes the first element of the table.
static unsigned int uw_Sqlcache_maxSize = 1234567890;
4590
4591 static void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) {
4592 if (entry) {
4593 HASH_DEL(cache->table, entry);
4594 uw_Sqlcache_freeEntry(entry);
4595 }
4596 }
4597
4598 static uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) {
4599 uw_Sqlcache_Entry *entry = NULL;
4600 HASH_FIND(hh, cache->table, key, len, entry);
4601 if (entry && bump) {
4602 // Bump for LRU purposes.
4603 HASH_DEL(cache->table, entry);
4604 // Important that we use [entry->key], because [key] might be ephemeral.
4605 HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry);
4606 }
4607 return entry;
4608 }
4609
4610 static void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) {
4611 HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry);
4612 if (HASH_COUNT(cache->table) > uw_Sqlcache_maxSize) {
4613 // Deletes the first element of the cache.
4614 uw_Sqlcache_delete(cache, cache->table);
4615 }
4616 }
4617
4618 static unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) {
4619 // TODO: verify that this makes time comparisons do the Right Thing.
4620 return cache->timeNow++;
4621 }
4622
// Returns the later (numerically larger) of two logical timestamps.
static unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) {
  if (x > y) {
    return x;
  }
  return y;
}
4626
// Separator byte that uw_Sqlcache_keyCopy writes in front of every key component.
static char uw_Sqlcache_keySep = '_';
4628
// Allocates a buffer large enough to hold the concatenated key built from [keys]:
// one separator byte plus the text of each component, plus a terminating NUL.
// Components are measured from the last index downwards, stopping at the first NULL.
// The buffer is returned holding an empty string; the caller frees it.
static char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) {
  size_t total = 1;  // Terminating NUL.
  for (size_t i = numKeys; i > 0; --i) {
    const char *component = keys[i - 1];
    if (component == NULL) {
      // Can only happen when flushing, in which case nothing past the null key is needed.
      break;
    }
    // Separator plus the component text.
    total += strlen(component) + 1;
  }
  char *buffer = malloc(total);
  // If nothing is copied into the buffer, it should look like it has length 0.
  *buffer = '\0';
  return buffer;
}
4645
4646 static char *uw_Sqlcache_keyCopy(char *buf, char *key) {
4647 *buf++ = uw_Sqlcache_keySep;
4648 return stpcpy(buf, key);
4649 }
4650
4651 // The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn".
4652
4653 uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
4654 int doBump = random() % 1024 == 0;
4655 if (doBump) {
4656 pthread_rwlock_wrlock(&cache->lockIn);
4657 } else {
4658 pthread_rwlock_rdlock(&cache->lockIn);
4659 }
4660 size_t numKeys = cache->numKeys;
4661 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
4662 char *buf = key;
4663 time_t timeInvalid = cache->timeInvalid;
4664 uw_Sqlcache_Entry *entry;
4665 if (numKeys == 0) {
4666 entry = cache->table;
4667 if (!entry) {
4668 free(key);
4669 pthread_rwlock_unlock(&cache->lockIn);
4670 return NULL;
4671 }
4672 } else {
4673 while (numKeys-- > 0) {
4674 buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
4675 size_t len = buf - key;
4676 entry = uw_Sqlcache_find(cache, key, len, doBump);
4677 if (!entry) {
4678 free(key);
4679 pthread_rwlock_unlock(&cache->lockIn);
4680 return NULL;
4681 }
4682 timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid);
4683 }
4684 free(key);
4685 }
4686 uw_Sqlcache_Value *value = entry->value;
4687 pthread_rwlock_unlock(&cache->lockIn);
4688 // ASK: though the argument isn't trivial, this is safe, right?
4689 // Returning outside the lock is safe because updates happen at commit time.
4690 // Those are the only times the returned value or its strings can get freed.
4691 // Handler output is a new string, so it's safe to free this at commit time.
4692 return value && timeInvalid < value->timeValid ? value : NULL;
4693 }
4694
4695 static void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
4696 pthread_rwlock_wrlock(&cache->lockIn);
4697 size_t numKeys = cache->numKeys;
4698 time_t timeNow = uw_Sqlcache_getTimeNow(cache);
4699 uw_Sqlcache_Entry *entry;
4700 if (numKeys == 0) {
4701 entry = cache->table;
4702 if (!entry) {
4703 entry = calloc(1, sizeof(uw_Sqlcache_Entry));
4704 entry->key = NULL;
4705 entry->value = NULL;
4706 entry->timeInvalid = 0;
4707 cache->table = entry;
4708 }
4709 } else {
4710 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
4711 char *buf = key;
4712 while (numKeys-- > 0) {
4713 buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
4714 size_t len = buf - key;
4715
4716 entry = uw_Sqlcache_find(cache, key, len, 1);
4717 if (!entry) {
4718 entry = calloc(1, sizeof(uw_Sqlcache_Entry));
4719 entry->key = strdup(key);
4720 entry->value = NULL;
4721 entry->timeInvalid = 0;
4722 uw_Sqlcache_add(cache, entry, len);
4723 }
4724 }
4725 free(key);
4726 }
4727 if (!entry->value || entry->value->timeValid < value->timeValid) {
4728 uw_Sqlcache_freeValue(entry->value);
4729 entry->value = value;
4730 entry->value->timeValid = timeNow;
4731 }
4732 pthread_rwlock_unlock(&cache->lockIn);
4733 }
4734
// Commit-time hook for a queued flush (an update with a NULL value). Intentionally empty:
// uw_Sqlcache_flush performs the flush immediately instead of queuing it.
static void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
}
4737
4738 static void uw_Sqlcache_commit(void *data) {
4739 uw_context ctx = (uw_context)data;
4740 uw_Sqlcache_Update *update = ctx->cacheUpdate;
4741 while (update) {
4742 uw_Sqlcache_Cache *cache = update->cache;
4743 char **keys = update->keys;
4744 if (update->value) {
4745 uw_Sqlcache_storeCommitOne(cache, keys, update->value);
4746 } else {
4747 uw_Sqlcache_flushCommitOne(cache, keys);
4748 }
4749 update = update->next;
4750 }
4751 }
4752
4753 static void uw_Sqlcache_free(void *data, int dontCare) {
4754 uw_context ctx = (uw_context)data;
4755 uw_Sqlcache_Update *update = ctx->cacheUpdate;
4756 while (update) {
4757 char** keys = update->keys;
4758 size_t numKeys = update->cache->numKeys;
4759 while (numKeys-- > 0) {
4760 free(keys[numKeys]);
4761 }
4762 free(keys);
4763 // Don't free [update->value]: it's in the cache now!
4764 uw_Sqlcache_Update *nextUpdate = update->next;
4765 free(update);
4766 update = nextUpdate;
4767 }
4768 ctx->cacheUpdate = NULL;
4769 ctx->cacheUpdateTail = NULL;
4770 uw_Sqlcache_Unlock *unlock = ctx->cacheUnlock;
4771 while (unlock) {
4772 pthread_rwlock_unlock(unlock->lock);
4773 uw_Sqlcache_Unlock *nextUnlock = unlock->next;
4774 free(unlock);
4775 unlock = nextUnlock;
4776 }
4777 ctx->cacheUnlock = NULL;
4778 }
4779
4780 static void uw_Sqlcache_pushUnlock(uw_context ctx, pthread_rwlock_t *lock) {
4781 if (!ctx->cacheUnlock) {
4782 // Just need one registered commit for both updating and unlocking.
4783 uw_register_transactional(ctx, ctx, uw_Sqlcache_commit, NULL, uw_Sqlcache_free);
4784 }
4785 uw_Sqlcache_Unlock *unlock = malloc(sizeof(uw_Sqlcache_Unlock));
4786 unlock->lock = lock;
4787 unlock->next = ctx->cacheUnlock;
4788 ctx->cacheUnlock = unlock;
4789 }
4790
4791 void uw_Sqlcache_rlock(uw_context ctx, uw_Sqlcache_Cache *cache) {
4792 pthread_rwlock_rdlock(&cache->lockOut);
4793 uw_Sqlcache_pushUnlock(ctx, &cache->lockOut);
4794 }
4795
4796 void uw_Sqlcache_wlock(uw_context ctx, uw_Sqlcache_Cache *cache) {
4797 pthread_rwlock_wrlock(&cache->lockOut);
4798 uw_Sqlcache_pushUnlock(ctx, &cache->lockOut);
4799 }
4800
// Returns a newly allocated array of [numKeys] pointers, each a strdup'd copy of
// the corresponding string in [keys]; NULL components stay NULL.
// The caller owns the array and every non-NULL string in it.
static char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) {
  char **copy = malloc(sizeof(char *) * numKeys);
  for (size_t i = numKeys; i > 0; --i) {
    char *original = keys[i - 1];
    if (original) {
      copy[i - 1] = strdup(original);
    } else {
      copy[i - 1] = NULL;
    }
  }
  return copy;
}
4809
4810 void uw_Sqlcache_store(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
4811 uw_Sqlcache_Update *update = malloc(sizeof(uw_Sqlcache_Update));
4812 update->cache = cache;
4813 update->keys = uw_Sqlcache_copyKeys(keys, cache->numKeys);
4814 update->value = value;
4815 update->next = NULL;
4816 // Can't use [uw_Sqlcache_getTimeNow] because it modifies state and we don't have the lock.
4817 pthread_rwlock_rdlock(&cache->lockIn);
4818 value->timeValid = cache->timeNow;
4819 pthread_rwlock_unlock(&cache->lockIn);
4820 if (ctx->cacheUpdateTail) {
4821 ctx->cacheUpdateTail->next = update;
4822 } else {
4823 ctx->cacheUpdate = update;
4824 }
4825 ctx->cacheUpdateTail = update;
4826 }
4827
4828 void uw_Sqlcache_flush(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
4829 // A flush has to happen immediately so that subsequent stores in the same transaction fail.
4830 // This is safe to do because we will always call [uw_Sqlcache_wlock] earlier.
4831 // If the transaction fails, the only harm done is a few extra cache misses.
4832 pthread_rwlock_wrlock(&cache->lockIn);
4833 size_t numKeys = cache->numKeys;
4834 if (numKeys == 0) {
4835 uw_Sqlcache_Entry *entry = cache->table;
4836 if (entry) {
4837 uw_Sqlcache_freeValue(entry->value);
4838 entry->value = NULL;
4839 }
4840 } else {
4841 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
4842 char *buf = key;
4843 time_t timeNow = uw_Sqlcache_getTimeNow(cache);
4844 while (numKeys-- > 0) {
4845 char *k = keys[numKeys];
4846 if (!k) {
4847 size_t len = buf - key;
4848 if (len == 0) {
4849 // The first key was null.
4850 cache->timeInvalid = timeNow;
4851 } else {
4852 uw_Sqlcache_Entry *entry = uw_Sqlcache_find(cache, key, len, 0);
4853 if (entry) {
4854 entry->timeInvalid = timeNow;
4855 }
4856 }
4857 free(key);
4858 pthread_rwlock_unlock(&cache->lockIn);
4859 return;
4860 }
4861 buf = uw_Sqlcache_keyCopy(buf, k);
4862 }
4863 // All the keys were non-null, so we delete the pointed-to entry.
4864 size_t len = buf - key;
4865 uw_Sqlcache_Entry *entry = uw_Sqlcache_find(cache, key, len, 0);
4866 free(key);
4867 uw_Sqlcache_delete(cache, entry);
4868 }
4869 pthread_rwlock_unlock(&cache->lockIn);
4870 }