Mercurial > urweb
diff src/c/urweb.c @ 668:b0c1a46b1f15
First message send delivered, but not interpreted
author | Adam Chlipala <adamc@hcoop.net> |
---|---|
date | Sun, 22 Mar 2009 15:05:07 -0400 |
parents | a93d5324f400 |
children | f68eee90dbcf |
line wrap: on
line diff
--- a/src/c/urweb.c Thu Mar 19 16:34:13 2009 -0400 +++ b/src/c/urweb.c Sun Mar 22 15:05:07 2009 -0400 @@ -76,8 +76,14 @@ return b->back - b->start; } +static void buf_append(buf *b, const char *s, size_t len) { + buf_check(b, len); + memcpy(b->front, s, len); + b->front += len; +} -// Cross-request state + +// Persistent client state typedef enum { UNUSED, USED } usage; @@ -101,21 +107,6 @@ static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; -void uw_global_init() { - srand(time(NULL) ^ getpid()); - - clients = malloc(0); -} - -void uw_global_free() { - size_t i; - - for (i = 0; i < n_clients; ++i) - free(clients[i]); - - free(clients); -} - static client *uw_new_client() { client *c; @@ -147,24 +138,33 @@ static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; -void uw_client_connect(size_t id, int pass, int sock) { +static client *uw_find_client(size_t id) { client *c; pthread_mutex_lock(&clients_mutex); if (id >= n_clients) { pthread_mutex_unlock(&clients_mutex); - close(sock); - fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); - return; + return NULL; } c = clients[id]; if (c->mode != USED) { pthread_mutex_unlock(&clients_mutex); + return NULL; + } + + pthread_mutex_unlock(&clients_mutex); + return c; +} + +void uw_client_connect(size_t id, int pass, int sock) { + client *c = uw_find_client(id); + + if (c == NULL) { close(sock); - fprintf(stderr, "Client request for unused ID (%d)\n", (int)id); + fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); return; } @@ -172,7 +172,6 @@ if (pass != c->data.used.pass) { pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); close(sock); fprintf(stderr, "Wrong client password (%d)\n", (int)id); return; @@ -180,7 +179,6 @@ if (c->data.used.sock != -1) { pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); close(sock); fprintf(stderr, "Duplicate client connection (%d)\n", (int)id); return; @@ -193,15 +191,10 @@ uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); close(sock); } - else { - uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(sock, "Hi!", 3); - close(sock); - //c->data.used.sock = sock; - } + else + c->data.used.sock = sock; pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); } static void uw_free_client(client *c) { @@ -234,6 +227,150 @@ } +// Persistent channel state + +typedef struct client_list { + client *data; + struct client_list *next; +} client_list; + +typedef struct channel { + size_t id; + usage mode; + union { + struct channel *next; + struct { + pthread_mutex_t lock; + client_list *clients; + } used; + } data; +} channel; + +static channel **channels, *channels_free; +static size_t n_channels; + +static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER; + +static channel *uw_new_channel() { + channel *ch; + + pthread_mutex_lock(&channels_mutex); + + if (channels_free) { + ch = channels_free; + channels_free = channels_free->data.next; + } + else { + ++n_channels; + channels = realloc(channels, sizeof(channels) * n_channels); + ch = malloc(sizeof(channel)); + ch->id = n_channels-1; + channels[n_channels-1] = ch; + } + + ch->mode = USED; + pthread_mutex_init(&ch->data.used.lock, NULL); + ch->data.used.clients = NULL; + + pthread_mutex_unlock(&channels_mutex); + + return ch; +} + +static channel *uw_find_channel(size_t id) { + channel *ch = NULL; + + pthread_mutex_lock(&channels_mutex); + + if (id < n_channels && channels[id]->mode == USED) + ch = channels[id]; + + pthread_mutex_unlock(&channels_mutex); + + return ch; +} + +static void uw_subscribe(channel *ch, client *c) { + client_list *cs = malloc(sizeof(client_list)); + + pthread_mutex_lock(&ch->data.used.lock); + + cs->data = c; + cs->next = ch->data.used.clients; + ch->data.used.clients = cs; + + pthread_mutex_unlock(&ch->data.used.lock); +} + +static void uw_unsubscribe(channel *ch, client *c) { + client_list *prev, *cur, *tmp; + + pthread_mutex_lock(&ch->data.used.lock); + + for (prev = NULL, cur = ch->data.used.clients; cur; ) { + if (cur->data == c) { + if (prev) + prev->next = cur->next; + else + ch->data.used.clients = cur->next; + tmp = cur; + cur = cur->next; + free(tmp); + } + else { + prev = cur; + cur = cur->next; + } + } + + pthread_mutex_unlock(&ch->data.used.lock); +} + +static void uw_channel_send(channel *ch, const char *msg) { + size_t len = strlen(msg), preLen; + char pre[INTS_MAX + 2]; + client_list *cs; + + sprintf(pre, "%d\n", (int)ch->id); + preLen = strlen(pre); + + pthread_mutex_lock(&ch->data.used.lock); + + for (cs = ch->data.used.clients; cs; cs = cs->next) { + client *c = cs->data; + + pthread_mutex_lock(&c->data.used.lock); + + if (c->data.used.sock != -1) { + uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); + uw_really_send(c->data.used.sock, pre, preLen); + uw_really_send(c->data.used.sock, msg, len); + uw_really_send(c->data.used.sock, "\n", 1); + close(c->data.used.sock); + c->data.used.sock = -1; + } else { + buf_append(&c->data.used.msgs, pre, preLen); + buf_append(&c->data.used.msgs, msg, len); + buf_append(&c->data.used.msgs, "\n", 1); + } + + pthread_mutex_unlock(&c->data.used.lock); + } + + pthread_mutex_unlock(&ch->data.used.lock); +} + + +// Global entry points + +void uw_global_init() { + srand(time(NULL) ^ getpid()); + + clients = malloc(0); + channels = malloc(0); +} + + // Single-request state #define ERROR_BUF_LEN 1024 @@ -582,11 +719,17 @@ } } -const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) { +const char *uw_Basis_get_listener(uw_context ctx, uw_Basis_string onload) { if (ctx->script_header[0] == 0) return ""; - else + else if (onload[0] == 0) return " onload='listener()'"; + else { + uw_Basis_string s = uw_malloc(ctx, strlen(onload) + 22); + + sprintf(s, " onload='listener();%s'", onload); + return s; + } } uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) { @@ -941,6 +1084,10 @@ return r; } +uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) { + return uw_Basis_unurlifyInt(ctx, s); +} + uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { char *new_s = uw_unurlify_advance(*s); uw_Basis_float r; @@ -1032,6 +1179,10 @@ return uw_unit_v; } +char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) { + return uw_Basis_htmlifyInt(ctx, ch); +} + char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { int len; char *r; @@ -1575,4 +1726,34 @@ return uw_unit_v; } +uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { + return uw_new_channel()->id; +} +uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { + channel *ch = uw_find_channel(chn); + + if (ch == NULL) + uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); + else { + size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); + int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); + client *c = uw_find_client(id); + + if (c == NULL) + uw_error(ctx, FATAL, "Unknown client ID in subscription request"); + else if (c->data.used.pass != pass) + uw_error(ctx, FATAL, "Wrong client password in subscription request"); + else + uw_subscribe(ch, c); + } +} + +uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { + channel *ch = uw_find_channel(chn); + + if (ch == NULL) + uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); + else + uw_channel_send(ch, msg); +}