Mercurial > urweb
comparison src/c/urweb.c @ 2304:6fb9232ade99
Merge Sqlcache
author | Adam Chlipala <adam@chlipala.net> |
---|---|
date | Sun, 20 Dec 2015 14:18:52 -0500 |
parents | 42079884e34a |
children |
comparison
equal
deleted
inserted
replaced
2201:1091227f535a | 2304:6fb9232ade99 |
---|---|
20 | 20 |
21 #include <pthread.h> | 21 #include <pthread.h> |
22 | 22 |
23 #include "types.h" | 23 #include "types.h" |
24 | 24 |
25 #include "uthash.h" | |
26 | |
25 uw_unit uw_unit_v = 0; | 27 uw_unit uw_unit_v = 0; |
26 | 28 |
27 | 29 |
28 // Socket extras | 30 // Socket extras |
29 | 31 |
68 free(b->start); | 70 free(b->start); |
69 } | 71 } |
70 | 72 |
71 void uw_buffer_reset(uw_buffer *b) { | 73 void uw_buffer_reset(uw_buffer *b) { |
72 b->front = b->start; | 74 b->front = b->start; |
75 if (b->front != b->back) { | |
76 *b->front = 0; | |
77 } | |
73 } | 78 } |
74 | 79 |
75 int uw_buffer_check(uw_buffer *b, size_t extra) { | 80 int uw_buffer_check(uw_buffer *b, size_t extra) { |
76 if (b->back - b->front < extra) { | 81 if (b->back - b->front < extra) { |
77 size_t desired = b->front - b->start + extra, next; | 82 size_t desired = b->front - b->start + extra, next; |
359 void uw_global_init() { | 364 void uw_global_init() { |
360 clients = malloc(0); | 365 clients = malloc(0); |
361 | 366 |
362 uw_global_custom(); | 367 uw_global_custom(); |
363 uw_init_crypto(); | 368 uw_init_crypto(); |
369 | |
370 // Fast non-cryptographic strength randomness for Sqlcache. | |
371 srandom(clock()); | |
364 } | 372 } |
365 | 373 |
366 void uw_app_init(uw_app *app) { | 374 void uw_app_init(uw_app *app) { |
367 app->client_init(); | 375 app->client_init(); |
368 } | 376 } |
417 char *name; | 425 char *name; |
418 void *data; | 426 void *data; |
419 void (*free)(void*); | 427 void (*free)(void*); |
420 } global; | 428 } global; |
421 | 429 |
430 typedef struct uw_Sqlcache_Update { | |
431 uw_Sqlcache_Cache *cache; | |
432 char **keys; | |
433 uw_Sqlcache_Value *value; | |
434 struct uw_Sqlcache_Update *next; | |
435 } uw_Sqlcache_Update; | |
436 | |
437 typedef struct uw_Sqlcache_Unlock { | |
438 pthread_rwlock_t *lock; | |
439 struct uw_Sqlcache_Unlock *next; | |
440 } uw_Sqlcache_Unlock; | |
441 | |
422 struct uw_context { | 442 struct uw_context { |
423 uw_app *app; | 443 uw_app *app; |
424 int id; | 444 int id; |
425 | 445 |
426 char *(*get_header)(void *, const char *); | 446 char *(*get_header)(void *, const char *); |
480 | 500 |
481 int usedSig, needsResig; | 501 int usedSig, needsResig; |
482 | 502 |
483 char *output_buffer; | 503 char *output_buffer; |
484 size_t output_buffer_size; | 504 size_t output_buffer_size; |
505 | |
506 // Sqlcache. | |
507 int numRecording, recordingCapacity; | |
508 int *recordingOffsets; | |
509 uw_Sqlcache_Update *cacheUpdate; | |
510 uw_Sqlcache_Update *cacheUpdateTail; | |
511 uw_Sqlcache_Unlock *cacheUnlock; | |
485 | 512 |
486 int remoteSock; | 513 int remoteSock; |
487 }; | 514 }; |
488 | 515 |
489 size_t uw_headers_max = SIZE_MAX; | 516 size_t uw_headers_max = SIZE_MAX; |
565 ctx->needsResig = 0; | 592 ctx->needsResig = 0; |
566 | 593 |
567 ctx->output_buffer = malloc(1); | 594 ctx->output_buffer = malloc(1); |
568 ctx->output_buffer_size = 1; | 595 ctx->output_buffer_size = 1; |
569 | 596 |
597 ctx->numRecording = 0; | |
598 ctx->recordingCapacity = 0; | |
599 ctx->recordingOffsets = malloc(0); | |
600 ctx->cacheUpdate = NULL; | |
601 ctx->cacheUpdateTail = NULL; | |
602 | |
570 ctx->remoteSock = -1; | 603 ctx->remoteSock = -1; |
604 | |
605 ctx->cacheUnlock = NULL; | |
571 | 606 |
572 return ctx; | 607 return ctx; |
573 } | 608 } |
574 | 609 |
575 size_t uw_inputs_max = SIZE_MAX; | 610 size_t uw_inputs_max = SIZE_MAX; |
631 if (ctx->globals[i].free) | 666 if (ctx->globals[i].free) |
632 ctx->globals[i].free(ctx->globals[i].data); | 667 ctx->globals[i].free(ctx->globals[i].data); |
633 free(ctx->globals); | 668 free(ctx->globals); |
634 | 669 |
635 free(ctx->output_buffer); | 670 free(ctx->output_buffer); |
671 | |
672 free(ctx->recordingOffsets); | |
636 | 673 |
637 free(ctx); | 674 free(ctx); |
638 } | 675 } |
639 | 676 |
640 void uw_reset_keep_error_message(uw_context ctx) { | 677 void uw_reset_keep_error_message(uw_context ctx) { |
655 ctx->nextId = 0; | 692 ctx->nextId = 0; |
656 ctx->amInitializing = 0; | 693 ctx->amInitializing = 0; |
657 ctx->usedSig = 0; | 694 ctx->usedSig = 0; |
658 ctx->needsResig = 0; | 695 ctx->needsResig = 0; |
659 ctx->remoteSock = -1; | 696 ctx->remoteSock = -1; |
697 ctx->numRecording = 0; | |
660 } | 698 } |
661 | 699 |
662 void uw_reset_keep_request(uw_context ctx) { | 700 void uw_reset_keep_request(uw_context ctx) { |
663 uw_reset_keep_error_message(ctx); | 701 uw_reset_keep_error_message(ctx); |
664 ctx->error_message[0] = 0; | 702 ctx->error_message[0] = 0; |
1699 | 1737 |
1700 void uw_write(uw_context ctx, const char* s) { | 1738 void uw_write(uw_context ctx, const char* s) { |
1701 uw_check(ctx, strlen(s) + 1); | 1739 uw_check(ctx, strlen(s) + 1); |
1702 uw_write_unsafe(ctx, s); | 1740 uw_write_unsafe(ctx, s); |
1703 *ctx->page.front = 0; | 1741 *ctx->page.front = 0; |
1742 } | |
1743 | |
1744 void uw_recordingStart(uw_context ctx) { | |
1745 if (ctx->numRecording == ctx->recordingCapacity) { | |
1746 ++ctx->recordingCapacity; | |
1747 ctx->recordingOffsets = realloc(ctx->recordingOffsets, sizeof(int) * ctx->recordingCapacity); | |
1748 } | |
1749 ctx->recordingOffsets[ctx->numRecording] = ctx->page.front - ctx->page.start; | |
1750 ++ctx->numRecording; | |
1751 } | |
1752 | |
1753 char *uw_recordingRead(uw_context ctx) { | |
1754 char *recording = ctx->page.start + ctx->recordingOffsets[--ctx->numRecording]; | |
1755 return strdup(recording); | |
1704 } | 1756 } |
1705 | 1757 |
1706 char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) { | 1758 char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) { |
1707 char *result; | 1759 char *result; |
1708 int len; | 1760 int len; |
3631 int r = setjmp(ctx->jmp_buf); | 3683 int r = setjmp(ctx->jmp_buf); |
3632 | 3684 |
3633 if (r == 0) { | 3685 if (r == 0) { |
3634 uw_ensure_transaction(ctx); | 3686 uw_ensure_transaction(ctx); |
3635 ctx->app->initializer(ctx); | 3687 ctx->app->initializer(ctx); |
3636 if (ctx->app->db_commit(ctx)) | 3688 if (uw_commit(ctx)) |
3637 uw_error(ctx, FATAL, "Error running SQL COMMIT"); | 3689 uw_error(ctx, FATAL, "Error running SQL COMMIT"); |
3638 } | 3690 } |
3639 | 3691 |
3640 return r; | 3692 return r; |
3641 } | 3693 } |
4504 } | 4556 } |
4505 | 4557 |
4506 void uw_set_remoteSock(uw_context ctx, int sock) { | 4558 void uw_set_remoteSock(uw_context ctx, int sock) { |
4507 ctx->remoteSock = sock; | 4559 ctx->remoteSock = sock; |
4508 } | 4560 } |
4561 | |
4562 | |
4563 // Sqlcache | |
4564 | |
4565 typedef struct uw_Sqlcache_Entry { | |
4566 char *key; | |
4567 uw_Sqlcache_Value *value; | |
4568 unsigned long timeInvalid; | |
4569 UT_hash_handle hh; | |
4570 } uw_Sqlcache_Entry; | |
4571 | |
4572 static void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) { | |
4573 if (value) { | |
4574 free(value->result); | |
4575 free(value->output); | |
4576 free(value); | |
4577 } | |
4578 } | |
4579 | |
4580 static void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) { | |
4581 if (entry) { | |
4582 free(entry->key); | |
4583 uw_Sqlcache_freeValue(entry->value); | |
4584 free(entry); | |
4585 } | |
4586 } | |
4587 | |
4588 // TODO: pick a number. | |
4589 static unsigned int uw_Sqlcache_maxSize = 1234567890; | |
4590 | |
4591 static void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) { | |
4592 if (entry) { | |
4593 HASH_DEL(cache->table, entry); | |
4594 uw_Sqlcache_freeEntry(entry); | |
4595 } | |
4596 } | |
4597 | |
4598 static uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) { | |
4599 uw_Sqlcache_Entry *entry = NULL; | |
4600 HASH_FIND(hh, cache->table, key, len, entry); | |
4601 if (entry && bump) { | |
4602 // Bump for LRU purposes. | |
4603 HASH_DEL(cache->table, entry); | |
4604 // Important that we use [entry->key], because [key] might be ephemeral. | |
4605 HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry); | |
4606 } | |
4607 return entry; | |
4608 } | |
4609 | |
4610 static void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) { | |
4611 HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry); | |
4612 if (HASH_COUNT(cache->table) > uw_Sqlcache_maxSize) { | |
4613 // Deletes the first element of the cache. | |
4614 uw_Sqlcache_delete(cache, cache->table); | |
4615 } | |
4616 } | |
4617 | |
4618 static unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) { | |
4619 // TODO: verify that this makes time comparisons do the Right Thing. | |
4620 return cache->timeNow++; | |
4621 } | |
4622 | |
4623 static unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) { | |
4624 return x > y ? x : y; | |
4625 } | |
4626 | |
4627 static char uw_Sqlcache_keySep = '_'; | |
4628 | |
4629 static char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) { | |
4630 size_t len = 0; | |
4631 while (numKeys-- > 0) { | |
4632 char* k = keys[numKeys]; | |
4633 if (!k) { | |
4634 // Can only happen when flushing, in which case we don't need anything past the null key. | |
4635 break; | |
4636 } | |
4637 // Leave room for separator. | |
4638 len += 1 + strlen(k); | |
4639 } | |
4640 char *buf = malloc(len+1); | |
4641 // If nothing is copied into the buffer, it should look like it has length 0. | |
4642 buf[0] = 0; | |
4643 return buf; | |
4644 } | |
4645 | |
4646 static char *uw_Sqlcache_keyCopy(char *buf, char *key) { | |
4647 *buf++ = uw_Sqlcache_keySep; | |
4648 return stpcpy(buf, key); | |
4649 } | |
4650 | |
4651 // The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn". | |
4652 | |
4653 uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) { | |
4654 int doBump = random() % 1024 == 0; | |
4655 if (doBump) { | |
4656 pthread_rwlock_wrlock(&cache->lockIn); | |
4657 } else { | |
4658 pthread_rwlock_rdlock(&cache->lockIn); | |
4659 } | |
4660 size_t numKeys = cache->numKeys; | |
4661 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); | |
4662 char *buf = key; | |
4663 time_t timeInvalid = cache->timeInvalid; | |
4664 uw_Sqlcache_Entry *entry; | |
4665 if (numKeys == 0) { | |
4666 entry = cache->table; | |
4667 if (!entry) { | |
4668 free(key); | |
4669 pthread_rwlock_unlock(&cache->lockIn); | |
4670 return NULL; | |
4671 } | |
4672 } else { | |
4673 while (numKeys-- > 0) { | |
4674 buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]); | |
4675 size_t len = buf - key; | |
4676 entry = uw_Sqlcache_find(cache, key, len, doBump); | |
4677 if (!entry) { | |
4678 free(key); | |
4679 pthread_rwlock_unlock(&cache->lockIn); | |
4680 return NULL; | |
4681 } | |
4682 timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid); | |
4683 } | |
4684 free(key); | |
4685 } | |
4686 uw_Sqlcache_Value *value = entry->value; | |
4687 pthread_rwlock_unlock(&cache->lockIn); | |
4688 // ASK: though the argument isn't trivial, this is safe, right? | |
4689 // Returning outside the lock is safe because updates happen at commit time. | |
4690 // Those are the only times the returned value or its strings can get freed. | |
4691 // Handler output is a new string, so it's safe to free this at commit time. | |
4692 return value && timeInvalid < value->timeValid ? value : NULL; | |
4693 } | |
4694 | |
4695 static void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) { | |
4696 pthread_rwlock_wrlock(&cache->lockIn); | |
4697 size_t numKeys = cache->numKeys; | |
4698 time_t timeNow = uw_Sqlcache_getTimeNow(cache); | |
4699 uw_Sqlcache_Entry *entry; | |
4700 if (numKeys == 0) { | |
4701 entry = cache->table; | |
4702 if (!entry) { | |
4703 entry = calloc(1, sizeof(uw_Sqlcache_Entry)); | |
4704 entry->key = NULL; | |
4705 entry->value = NULL; | |
4706 entry->timeInvalid = 0; | |
4707 cache->table = entry; | |
4708 } | |
4709 } else { | |
4710 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); | |
4711 char *buf = key; | |
4712 while (numKeys-- > 0) { | |
4713 buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]); | |
4714 size_t len = buf - key; | |
4715 | |
4716 entry = uw_Sqlcache_find(cache, key, len, 1); | |
4717 if (!entry) { | |
4718 entry = calloc(1, sizeof(uw_Sqlcache_Entry)); | |
4719 entry->key = strdup(key); | |
4720 entry->value = NULL; | |
4721 entry->timeInvalid = 0; | |
4722 uw_Sqlcache_add(cache, entry, len); | |
4723 } | |
4724 } | |
4725 free(key); | |
4726 } | |
4727 if (!entry->value || entry->value->timeValid < value->timeValid) { | |
4728 uw_Sqlcache_freeValue(entry->value); | |
4729 entry->value = value; | |
4730 entry->value->timeValid = timeNow; | |
4731 } | |
4732 pthread_rwlock_unlock(&cache->lockIn); | |
4733 } | |
4734 | |
4735 static void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) { | |
4736 } | |
4737 | |
4738 static void uw_Sqlcache_commit(void *data) { | |
4739 uw_context ctx = (uw_context)data; | |
4740 uw_Sqlcache_Update *update = ctx->cacheUpdate; | |
4741 while (update) { | |
4742 uw_Sqlcache_Cache *cache = update->cache; | |
4743 char **keys = update->keys; | |
4744 if (update->value) { | |
4745 uw_Sqlcache_storeCommitOne(cache, keys, update->value); | |
4746 } else { | |
4747 uw_Sqlcache_flushCommitOne(cache, keys); | |
4748 } | |
4749 update = update->next; | |
4750 } | |
4751 } | |
4752 | |
4753 static void uw_Sqlcache_free(void *data, int dontCare) { | |
4754 uw_context ctx = (uw_context)data; | |
4755 uw_Sqlcache_Update *update = ctx->cacheUpdate; | |
4756 while (update) { | |
4757 char** keys = update->keys; | |
4758 size_t numKeys = update->cache->numKeys; | |
4759 while (numKeys-- > 0) { | |
4760 free(keys[numKeys]); | |
4761 } | |
4762 free(keys); | |
4763 // Don't free [update->value]: it's in the cache now! | |
4764 uw_Sqlcache_Update *nextUpdate = update->next; | |
4765 free(update); | |
4766 update = nextUpdate; | |
4767 } | |
4768 ctx->cacheUpdate = NULL; | |
4769 ctx->cacheUpdateTail = NULL; | |
4770 uw_Sqlcache_Unlock *unlock = ctx->cacheUnlock; | |
4771 while (unlock) { | |
4772 pthread_rwlock_unlock(unlock->lock); | |
4773 uw_Sqlcache_Unlock *nextUnlock = unlock->next; | |
4774 free(unlock); | |
4775 unlock = nextUnlock; | |
4776 } | |
4777 ctx->cacheUnlock = NULL; | |
4778 } | |
4779 | |
4780 static void uw_Sqlcache_pushUnlock(uw_context ctx, pthread_rwlock_t *lock) { | |
4781 if (!ctx->cacheUnlock) { | |
4782 // Just need one registered commit for both updating and unlocking. | |
4783 uw_register_transactional(ctx, ctx, uw_Sqlcache_commit, NULL, uw_Sqlcache_free); | |
4784 } | |
4785 uw_Sqlcache_Unlock *unlock = malloc(sizeof(uw_Sqlcache_Unlock)); | |
4786 unlock->lock = lock; | |
4787 unlock->next = ctx->cacheUnlock; | |
4788 ctx->cacheUnlock = unlock; | |
4789 } | |
4790 | |
4791 void uw_Sqlcache_rlock(uw_context ctx, uw_Sqlcache_Cache *cache) { | |
4792 pthread_rwlock_rdlock(&cache->lockOut); | |
4793 uw_Sqlcache_pushUnlock(ctx, &cache->lockOut); | |
4794 } | |
4795 | |
4796 void uw_Sqlcache_wlock(uw_context ctx, uw_Sqlcache_Cache *cache) { | |
4797 pthread_rwlock_wrlock(&cache->lockOut); | |
4798 uw_Sqlcache_pushUnlock(ctx, &cache->lockOut); | |
4799 } | |
4800 | |
4801 static char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) { | |
4802 char **copy = malloc(sizeof(char *) * numKeys); | |
4803 while (numKeys-- > 0) { | |
4804 char *k = keys[numKeys]; | |
4805 copy[numKeys] = k ? strdup(k) : NULL; | |
4806 } | |
4807 return copy; | |
4808 } | |
4809 | |
4810 void uw_Sqlcache_store(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) { | |
4811 uw_Sqlcache_Update *update = malloc(sizeof(uw_Sqlcache_Update)); | |
4812 update->cache = cache; | |
4813 update->keys = uw_Sqlcache_copyKeys(keys, cache->numKeys); | |
4814 update->value = value; | |
4815 update->next = NULL; | |
4816 // Can't use [uw_Sqlcache_getTimeNow] because it modifies state and we don't have the lock. | |
4817 pthread_rwlock_rdlock(&cache->lockIn); | |
4818 value->timeValid = cache->timeNow; | |
4819 pthread_rwlock_unlock(&cache->lockIn); | |
4820 if (ctx->cacheUpdateTail) { | |
4821 ctx->cacheUpdateTail->next = update; | |
4822 } else { | |
4823 ctx->cacheUpdate = update; | |
4824 } | |
4825 ctx->cacheUpdateTail = update; | |
4826 } | |
4827 | |
4828 void uw_Sqlcache_flush(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) { | |
4829 // A flush has to happen immediately so that subsequent stores in the same transaction fail. | |
4830 // This is safe to do because we will always call [uw_Sqlcache_wlock] earlier. | |
4831 // If the transaction fails, the only harm done is a few extra cache misses. | |
4832 pthread_rwlock_wrlock(&cache->lockIn); | |
4833 size_t numKeys = cache->numKeys; | |
4834 if (numKeys == 0) { | |
4835 uw_Sqlcache_Entry *entry = cache->table; | |
4836 if (entry) { | |
4837 uw_Sqlcache_freeValue(entry->value); | |
4838 entry->value = NULL; | |
4839 } | |
4840 } else { | |
4841 char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys); | |
4842 char *buf = key; | |
4843 time_t timeNow = uw_Sqlcache_getTimeNow(cache); | |
4844 while (numKeys-- > 0) { | |
4845 char *k = keys[numKeys]; | |
4846 if (!k) { | |
4847 size_t len = buf - key; | |
4848 if (len == 0) { | |
4849 // The first key was null. | |
4850 cache->timeInvalid = timeNow; | |
4851 } else { | |
4852 uw_Sqlcache_Entry *entry = uw_Sqlcache_find(cache, key, len, 0); | |
4853 if (entry) { | |
4854 entry->timeInvalid = timeNow; | |
4855 } | |
4856 } | |
4857 free(key); | |
4858 pthread_rwlock_unlock(&cache->lockIn); | |
4859 return; | |
4860 } | |
4861 buf = uw_Sqlcache_keyCopy(buf, k); | |
4862 } | |
4863 // All the keys were non-null, so we delete the pointed-to entry. | |
4864 size_t len = buf - key; | |
4865 uw_Sqlcache_Entry *entry = uw_Sqlcache_find(cache, key, len, 0); | |
4866 free(key); | |
4867 uw_Sqlcache_delete(cache, entry); | |
4868 } | |
4869 pthread_rwlock_unlock(&cache->lockIn); | |
4870 } |