Mercurial > urweb
comparison src/c/urweb.c @ 682:5bbb542243e8
Redo channels, making them single-client
author | Adam Chlipala <adamc@hcoop.net> |
---|---|
date | Sun, 29 Mar 2009 11:37:29 -0400 |
parents | 6c9b8875f347 |
children | 9a2c18dab11d |
comparison
equal
deleted
inserted
replaced
681:6c9b8875f347 | 682:5bbb542243e8 |
---|---|
5 #include <string.h> | 5 #include <string.h> |
6 #include <strings.h> | 6 #include <strings.h> |
7 #include <ctype.h> | 7 #include <ctype.h> |
8 #include <setjmp.h> | 8 #include <setjmp.h> |
9 #include <stdarg.h> | 9 #include <stdarg.h> |
10 #include <assert.h> | |
10 | 11 |
11 #include <pthread.h> | 12 #include <pthread.h> |
12 | 13 |
13 #include "types.h" | 14 #include "types.h" |
14 | 15 |
86 | 87 |
87 // Persistent state types | 88 // Persistent state types |
88 | 89 |
89 typedef enum { UNUSED, USED } usage; | 90 typedef enum { UNUSED, USED } usage; |
90 | 91 |
91 typedef struct channel_list { | |
92 struct channel *data; | |
93 struct channel_list *next; | |
94 } channel_list; | |
95 | |
96 typedef struct client { | 92 typedef struct client { |
97 size_t id; | 93 unsigned id; |
98 usage mode; | 94 usage mode; |
99 union { | 95 int pass; |
100 struct client *next; | 96 struct client *next; |
101 struct { | 97 pthread_mutex_t lock; |
102 pthread_mutex_t lock; | 98 buf msgs; |
103 int pass; | 99 int sock; |
104 buf msgs; | 100 time_t last_contact; |
105 int sock; | 101 unsigned n_channels; |
106 time_t last_contact; | |
107 unsigned refcount; | |
108 channel_list *channels; | |
109 } used; | |
110 } data; | |
111 } client; | 102 } client; |
112 | 103 |
113 typedef struct client_list { | |
114 client *data; | |
115 struct client_list *next; | |
116 } client_list; | |
117 | |
118 typedef struct channel { | |
119 size_t id; | |
120 usage mode; | |
121 union { | |
122 struct channel *next; | |
123 struct { | |
124 pthread_mutex_t lock; | |
125 client_list *clients; | |
126 unsigned refcount; | |
127 } used; | |
128 } data; | |
129 } channel; | |
130 | |
131 | 104 |
132 // Persistent client state | 105 // Persistent client state |
133 | 106 |
134 static client **clients, *clients_free; | 107 static client **clients, *clients_free, *clients_used; |
135 static size_t n_clients; | 108 static unsigned n_clients; |
136 | 109 |
137 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; | 110 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; |
138 | 111 |
139 static client *uw_new_client() { | 112 static client *new_client() { |
140 client *c; | 113 client *c; |
141 | 114 |
142 pthread_mutex_lock(&clients_mutex); | 115 pthread_mutex_lock(&clients_mutex); |
143 | 116 |
144 if (clients_free) { | 117 if (clients_free) { |
145 c = clients_free; | 118 c = clients_free; |
146 clients_free = clients_free->data.next; | 119 clients_free = clients_free->next; |
147 } | 120 } |
148 else { | 121 else { |
149 ++n_clients; | 122 ++n_clients; |
150 clients = realloc(clients, sizeof(client) * n_clients); | 123 clients = realloc(clients, sizeof(client) * n_clients); |
151 c = malloc(sizeof(client)); | 124 c = malloc(sizeof(client)); |
152 c->id = n_clients-1; | 125 c->id = n_clients-1; |
126 pthread_mutex_init(&c->lock, NULL); | |
127 buf_init(&c->msgs, 0); | |
153 clients[n_clients-1] = c; | 128 clients[n_clients-1] = c; |
154 } | 129 } |
155 | 130 |
131 pthread_mutex_lock(&c->lock); | |
156 c->mode = USED; | 132 c->mode = USED; |
157 pthread_mutex_init(&c->data.used.lock, NULL); | 133 c->pass = rand(); |
158 c->data.used.pass = rand(); | 134 c->sock = -1; |
159 c->data.used.sock = -1; | 135 c->last_contact = time(NULL); |
160 c->data.used.last_contact = time(NULL); | 136 buf_reset(&c->msgs); |
161 buf_init(&c->data.used.msgs, 0); | 137 c->n_channels = 0; |
162 c->data.used.refcount = 0; | 138 pthread_mutex_unlock(&c->lock); |
163 c->data.used.channels = NULL; | 139 |
140 c->next = clients_used; | |
141 clients_used = c; | |
164 | 142 |
165 pthread_mutex_unlock(&clients_mutex); | 143 pthread_mutex_unlock(&clients_mutex); |
166 | 144 |
167 return c; | 145 return c; |
168 } | 146 } |
169 | 147 |
170 static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; | 148 static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; |
171 | 149 |
172 static client *uw_find_client(size_t id) { | 150 static client *find_client(unsigned id) { |
173 client *c; | 151 client *c; |
174 | 152 |
175 pthread_mutex_lock(&clients_mutex); | 153 pthread_mutex_lock(&clients_mutex); |
176 | 154 |
177 if (id >= n_clients) { | 155 if (id >= n_clients) { |
179 return NULL; | 157 return NULL; |
180 } | 158 } |
181 | 159 |
182 c = clients[id]; | 160 c = clients[id]; |
183 | 161 |
184 if (c->mode != USED) { | |
185 pthread_mutex_unlock(&clients_mutex); | |
186 return NULL; | |
187 } | |
188 | |
189 pthread_mutex_lock(&c->data.used.lock); | |
190 ++c->data.used.refcount; | |
191 pthread_mutex_unlock(&c->data.used.lock); | |
192 pthread_mutex_unlock(&clients_mutex); | 162 pthread_mutex_unlock(&clients_mutex); |
193 return c; | 163 return c; |
194 } | 164 } |
195 | 165 |
196 static void uw_release_client(client *c) { | 166 void uw_client_connect(unsigned id, int pass, int sock) { |
197 pthread_mutex_lock(&c->data.used.lock); | 167 client *c = find_client(id); |
198 --c->data.used.refcount; | |
199 pthread_mutex_unlock(&c->data.used.lock); | |
200 } | |
201 | |
202 void uw_client_connect(size_t id, int pass, int sock) { | |
203 client *c = uw_find_client(id); | |
204 | 168 |
205 if (c == NULL) { | 169 if (c == NULL) { |
206 close(sock); | 170 close(sock); |
207 fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); | 171 fprintf(stderr, "Out-of-bounds client request (%u)\n", id); |
208 return; | 172 return; |
209 } | 173 } |
210 | 174 |
211 uw_release_client(c); | 175 pthread_mutex_lock(&c->lock); |
212 | 176 |
213 pthread_mutex_lock(&c->data.used.lock); | 177 if (c->mode != USED) { |
214 | 178 pthread_mutex_unlock(&c->lock); |
215 if (pass != c->data.used.pass) { | |
216 pthread_mutex_unlock(&c->data.used.lock); | |
217 close(sock); | 179 close(sock); |
218 fprintf(stderr, "Wrong client password (%d)\n", (int)id); | 180 fprintf(stderr, "Client request for unused slot (%u)\n", id); |
219 return; | 181 return; |
220 } | 182 } |
221 | 183 |
222 if (c->data.used.sock != -1) { | 184 if (pass != c->pass) { |
223 close(c->data.used.sock); | 185 pthread_mutex_unlock(&c->lock); |
224 c->data.used.sock = -1; | 186 close(sock); |
225 } | 187 fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass); |
226 | 188 return; |
227 c->data.used.last_contact = time(NULL); | 189 } |
228 | 190 |
229 if (buf_used(&c->data.used.msgs) > 0) { | 191 if (c->sock != -1) { |
192 close(c->sock); | |
193 c->sock = -1; | |
194 } | |
195 | |
196 c->last_contact = time(NULL); | |
197 | |
198 if (buf_used(&c->msgs) > 0) { | |
230 uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); | 199 uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); |
231 uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); | 200 uw_really_send(sock, c->msgs.start, buf_used(&c->msgs)); |
232 buf_reset(&c->data.used.msgs); | 201 buf_reset(&c->msgs); |
233 close(sock); | 202 close(sock); |
234 } | 203 } |
235 else | 204 else |
236 c->data.used.sock = sock; | 205 c->sock = sock; |
237 | 206 |
238 pthread_mutex_unlock(&c->data.used.lock); | 207 pthread_mutex_unlock(&c->lock); |
239 } | 208 } |
240 | 209 |
241 | 210 static void free_client(client *c) { |
242 static void uw_free_client(client *c) { | |
243 channel_list *chs; | |
244 | |
245 printf("Freeing client %d\n", c->id); | 211 printf("Freeing client %d\n", c->id); |
246 | 212 |
247 if (c->mode == USED) { | 213 c->mode = UNUSED; |
248 pthread_mutex_lock(&c->data.used.lock); | 214 c->pass = -1; |
249 | 215 |
250 for (chs = c->data.used.channels; chs; ) { | 216 c->next = clients_free; |
251 client_list *prev, *cs; | 217 clients_free = c; |
252 | |
253 channel *ch = chs->data; | |
254 channel_list *tmp = chs->next; | |
255 free(chs); | |
256 chs = tmp; | |
257 | |
258 pthread_mutex_lock(&ch->data.used.lock); | |
259 for (prev = NULL, cs = ch->data.used.clients; cs; ) { | |
260 if (cs->data == c) { | |
261 client_list *tmp = cs->next; | |
262 free(cs); | |
263 cs = tmp; | |
264 if (prev) | |
265 prev->next = cs; | |
266 else | |
267 ch->data.used.clients = cs; | |
268 } | |
269 else { | |
270 prev = cs; | |
271 cs = cs->next; | |
272 } | |
273 } | |
274 pthread_mutex_unlock(&ch->data.used.lock); | |
275 } | |
276 | |
277 if (c->data.used.sock != -1) | |
278 close(c->data.used.sock); | |
279 | |
280 pthread_mutex_unlock(&c->data.used.lock); | |
281 pthread_mutex_destroy(&c->data.used.lock); | |
282 buf_free(&c->data.used.msgs); | |
283 c->mode = UNUSED; | |
284 | |
285 c->data.next = clients_free; | |
286 clients_free = c; | |
287 } | |
288 } | 218 } |
289 | 219 |
290 extern int uw_timeout; | 220 extern int uw_timeout; |
291 | 221 |
292 void uw_prune_clients() { | 222 void uw_prune_clients() { |
293 size_t i; | 223 client *c, *next, *prev = NULL; |
294 time_t cutoff; | 224 time_t cutoff; |
295 | 225 |
296 cutoff = time(NULL) - uw_timeout; | 226 cutoff = time(NULL) - uw_timeout; |
297 | 227 |
298 pthread_mutex_lock(&clients_mutex); | 228 pthread_mutex_lock(&clients_mutex); |
299 | 229 |
300 for (i = 0; i < n_clients; ++i) { | 230 for (c = clients_used; c; c = next) { |
301 if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff | 231 next = c->next; |
302 && clients[i]->data.used.refcount == 0) | 232 pthread_mutex_lock(&c->lock); |
303 uw_free_client(clients[i]); | 233 if (c->last_contact < cutoff) { |
234 if (prev) | |
235 prev->next = next; | |
236 else | |
237 clients_used = next; | |
238 free_client(c); | |
239 } | |
240 else | |
241 prev = c; | |
242 pthread_mutex_unlock(&c->lock); | |
304 } | 243 } |
305 | 244 |
306 pthread_mutex_unlock(&clients_mutex); | 245 pthread_mutex_unlock(&clients_mutex); |
307 } | 246 } |
308 | 247 |
309 | 248 static uw_Basis_channel new_channel(client *c) { |
310 // Persistent channel state | 249 uw_Basis_channel ch = {c->id, c->n_channels++}; |
311 | |
312 | |
313 static channel **channels, *channels_free; | |
314 static size_t n_channels; | |
315 | |
316 static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER; | |
317 | |
318 static channel *uw_new_channel() { | |
319 channel *ch; | |
320 | |
321 pthread_mutex_lock(&channels_mutex); | |
322 | |
323 if (channels_free) { | |
324 ch = channels_free; | |
325 channels_free = channels_free->data.next; | |
326 } | |
327 else { | |
328 ++n_channels; | |
329 channels = realloc(channels, sizeof(channels) * n_channels); | |
330 ch = malloc(sizeof(channel)); | |
331 ch->id = n_channels-1; | |
332 channels[n_channels-1] = ch; | |
333 } | |
334 | |
335 ch->mode = USED; | |
336 pthread_mutex_init(&ch->data.used.lock, NULL); | |
337 ch->data.used.clients = NULL; | |
338 ch->data.used.refcount = 0; | |
339 | |
340 pthread_mutex_unlock(&channels_mutex); | |
341 | |
342 return ch; | 250 return ch; |
343 } | 251 } |
344 | 252 |
345 static void uw_free_channel(channel *ch) { | 253 static void client_send(int already_locked, client *c, buf *msg) { |
346 if (ch->mode == USED) { | 254 if (!already_locked) |
347 client_list *cs; | 255 pthread_mutex_lock(&c->lock); |
348 | 256 |
349 for (cs = ch->data.used.clients; cs; ) { | 257 if (c->sock != -1) { |
350 client_list *tmp = cs->next; | 258 uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1); |
351 free(cs); | 259 uw_really_send(c->sock, msg->start, buf_used(msg)); |
352 cs = tmp; | 260 close(c->sock); |
353 } | 261 c->sock = -1; |
354 pthread_mutex_destroy(&ch->data.used.lock); | 262 } else |
355 ch->mode = UNUSED; | 263 buf_append(&c->msgs, msg->start, buf_used(msg)); |
356 ch->data.next = channels_free; | 264 |
357 channels_free = ch; | 265 if (!already_locked) |
358 } | 266 pthread_mutex_unlock(&c->lock); |
359 } | |
360 | |
361 static channel *uw_find_channel(size_t id) { | |
362 channel *ch = NULL; | |
363 | |
364 pthread_mutex_lock(&channels_mutex); | |
365 | |
366 if (id < n_channels && channels[id]->mode == USED) { | |
367 ch = channels[id]; | |
368 | |
369 pthread_mutex_lock(&ch->data.used.lock); | |
370 ++ch->data.used.refcount; | |
371 pthread_mutex_unlock(&ch->data.used.lock); | |
372 } | |
373 | |
374 pthread_mutex_unlock(&channels_mutex); | |
375 | |
376 return ch; | |
377 } | |
378 | |
379 static void uw_release_channel(channel *ch) { | |
380 pthread_mutex_lock(&ch->data.used.lock); | |
381 ++ch->data.used.refcount; | |
382 pthread_mutex_unlock(&ch->data.used.lock); | |
383 } | |
384 | |
385 static void uw_subscribe(channel *ch, client *c) { | |
386 client_list *cs; | |
387 | |
388 pthread_mutex_lock(&ch->data.used.lock); | |
389 | |
390 for (cs = ch->data.used.clients; cs; cs = cs->next) | |
391 if (cs->data == c) { | |
392 pthread_mutex_unlock(&ch->data.used.lock); | |
393 return; | |
394 } | |
395 | |
396 cs = malloc(sizeof(client_list)); | |
397 cs->data = c; | |
398 cs->next = ch->data.used.clients; | |
399 ch->data.used.clients = cs; | |
400 | |
401 pthread_mutex_unlock(&ch->data.used.lock); | |
402 } | |
403 | |
404 static void uw_unsubscribe(channel *ch, client *c) { | |
405 client_list *prev, *cur, *tmp; | |
406 | |
407 pthread_mutex_lock(&ch->data.used.lock); | |
408 | |
409 for (prev = NULL, cur = ch->data.used.clients; cur; ) { | |
410 if (cur->data == c) { | |
411 if (prev) | |
412 prev->next = cur->next; | |
413 else | |
414 ch->data.used.clients = cur->next; | |
415 tmp = cur; | |
416 cur = cur->next; | |
417 free(tmp); | |
418 } | |
419 else { | |
420 prev = cur; | |
421 cur = cur->next; | |
422 } | |
423 } | |
424 | |
425 pthread_mutex_unlock(&ch->data.used.lock); | |
426 } | |
427 | |
428 static void uw_channel_send(channel *ch, const char *msg) { | |
429 size_t len = strlen(msg), preLen; | |
430 char pre[INTS_MAX + 2]; | |
431 client_list *cs; | |
432 | |
433 sprintf(pre, "%d\n", (int)ch->id); | |
434 preLen = strlen(pre); | |
435 | |
436 pthread_mutex_lock(&ch->data.used.lock); | |
437 | |
438 for (cs = ch->data.used.clients; cs; cs = cs->next) { | |
439 client *c = cs->data; | |
440 | |
441 pthread_mutex_lock(&c->data.used.lock); | |
442 | |
443 if (c->data.used.sock != -1) { | |
444 uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); | |
445 uw_really_send(c->data.used.sock, pre, preLen); | |
446 uw_really_send(c->data.used.sock, msg, len); | |
447 uw_really_send(c->data.used.sock, "\n", 1); | |
448 close(c->data.used.sock); | |
449 c->data.used.sock = -1; | |
450 } else { | |
451 buf_append(&c->data.used.msgs, pre, preLen); | |
452 buf_append(&c->data.used.msgs, msg, len); | |
453 buf_append(&c->data.used.msgs, "\n", 1); | |
454 } | |
455 | |
456 pthread_mutex_unlock(&c->data.used.lock); | |
457 } | |
458 | |
459 pthread_mutex_unlock(&ch->data.used.lock); | |
460 } | 267 } |
461 | 268 |
462 | 269 |
463 // Global entry points | 270 // Global entry points |
464 | 271 |
465 void uw_global_init() { | 272 void uw_global_init() { |
466 srand(time(NULL) ^ getpid()); | 273 srand(time(NULL) ^ getpid()); |
467 | 274 |
468 clients = malloc(0); | 275 clients = malloc(0); |
469 channels = malloc(0); | |
470 } | 276 } |
471 | 277 |
472 | 278 |
473 // Single-request state | 279 // Single-request state |
474 | 280 |
482 void (*func)(void*); | 288 void (*func)(void*); |
483 void *arg; | 289 void *arg; |
484 } cleanup; | 290 } cleanup; |
485 | 291 |
486 typedef struct { | 292 typedef struct { |
487 usage mode; | 293 unsigned client; |
488 channel *ch; | |
489 enum { OLD, NEW } newness; | |
490 | |
491 size_t n_subscribed; | |
492 client **subscribed; | |
493 | |
494 buf msgs; | 294 buf msgs; |
495 } channel_delta; | 295 } delta; |
496 | 296 |
497 struct uw_context { | 297 struct uw_context { |
498 char *headers, *headers_end; | 298 char *headers, *headers_end; |
499 | 299 |
500 buf outHeaders, page, heap, script; | 300 buf outHeaders, page, heap, script; |
510 | 310 |
511 cleanup *cleanup, *cleanup_front, *cleanup_back; | 311 cleanup *cleanup, *cleanup_front, *cleanup_back; |
512 | 312 |
513 const char *script_header, *url_prefix; | 313 const char *script_header, *url_prefix; |
514 | 314 |
515 size_t n_deltas; | 315 size_t n_deltas, used_deltas; |
516 channel_delta *deltas; | 316 delta *deltas; |
517 | 317 |
518 int timeout; | 318 int timeout; |
319 | |
320 client *client; | |
519 | 321 |
520 char error_message[ERROR_BUF_LEN]; | 322 char error_message[ERROR_BUF_LEN]; |
521 }; | 323 }; |
522 | 324 |
523 extern int uw_inputs_len; | 325 extern int uw_inputs_len; |
546 | 348 |
547 ctx->error_message[0] = 0; | 349 ctx->error_message[0] = 0; |
548 | 350 |
549 ctx->source_count = 0; | 351 ctx->source_count = 0; |
550 | 352 |
551 ctx->n_deltas = 0; | 353 ctx->n_deltas = ctx->used_deltas = 0; |
552 ctx->deltas = malloc(0); | 354 ctx->deltas = malloc(0); |
553 | 355 |
554 ctx->timeout = uw_timeout; | 356 ctx->timeout = uw_timeout; |
555 | 357 |
556 return ctx; | 358 return ctx; |
572 buf_free(&ctx->page); | 374 buf_free(&ctx->page); |
573 buf_free(&ctx->heap); | 375 buf_free(&ctx->heap); |
574 free(ctx->inputs); | 376 free(ctx->inputs); |
575 free(ctx->cleanup); | 377 free(ctx->cleanup); |
576 | 378 |
577 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { | 379 for (i = 0; i < ctx->n_deltas; ++i) |
578 free(ctx->deltas[i].subscribed); | |
579 buf_free(&ctx->deltas[i].msgs); | 380 buf_free(&ctx->deltas[i].msgs); |
580 } | |
581 | 381 |
582 free(ctx); | 382 free(ctx); |
583 } | 383 } |
584 | 384 |
585 void uw_reset_keep_error_message(uw_context ctx) { | 385 void uw_reset_keep_error_message(uw_context ctx) { |
589 buf_reset(&ctx->page); | 389 buf_reset(&ctx->page); |
590 buf_reset(&ctx->heap); | 390 buf_reset(&ctx->heap); |
591 ctx->regions = NULL; | 391 ctx->regions = NULL; |
592 ctx->cleanup_front = ctx->cleanup; | 392 ctx->cleanup_front = ctx->cleanup; |
593 ctx->source_count = 0; | 393 ctx->source_count = 0; |
594 if (ctx->n_deltas > 0) | 394 ctx->used_deltas = 0; |
595 ctx->deltas[0].mode = UNUSED; | 395 ctx->client = NULL; |
596 } | 396 } |
597 | 397 |
598 void uw_reset_keep_request(uw_context ctx) { | 398 void uw_reset_keep_request(uw_context ctx) { |
599 uw_reset_keep_error_message(ctx); | 399 uw_reset_keep_error_message(ctx); |
600 ctx->error_message[0] = 0; | 400 ctx->error_message[0] = 0; |
667 ctx->cleanup_front->func = func; | 467 ctx->cleanup_front->func = func; |
668 ctx->cleanup_front->arg = arg; | 468 ctx->cleanup_front->arg = arg; |
669 ++ctx->cleanup_front; | 469 ++ctx->cleanup_front; |
670 } | 470 } |
671 | 471 |
472 uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) { | |
473 int len = strlen(h); | |
474 char *s = ctx->headers, *p; | |
475 | |
476 while (p = strchr(s, ':')) { | |
477 if (p - s == len && !strncasecmp(s, h, len)) { | |
478 return p + 2; | |
479 } else { | |
480 if ((s = strchr(p, 0)) && s < ctx->headers_end) | |
481 s += 2; | |
482 else | |
483 return NULL; | |
484 } | |
485 } | |
486 | |
487 return NULL; | |
488 } | |
489 | |
490 void uw_login(uw_context ctx) { | |
491 if (ctx->script_header[0]) { | |
492 char *id_s, *pass_s; | |
493 | |
494 if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client")) | |
495 && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) { | |
496 unsigned id = atoi(id_s); | |
497 int pass = atoi(pass_s); | |
498 client *c = find_client(id); | |
499 | |
500 if (c == NULL) | |
501 uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s); | |
502 else { | |
503 pthread_mutex_lock(&c->lock); | |
504 ctx->client = c; | |
505 | |
506 if (c->mode != USED) | |
507 uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id); | |
508 if (c->pass != pass) | |
509 uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass); | |
510 } | |
511 } else { | |
512 client *c = new_client(); | |
513 pthread_mutex_lock(&c->lock); | |
514 ctx->client = c; | |
515 } | |
516 } | |
517 } | |
518 | |
672 failure_kind uw_begin(uw_context ctx, char *path) { | 519 failure_kind uw_begin(uw_context ctx, char *path) { |
673 int r = setjmp(ctx->jmp_buf); | 520 int r = setjmp(ctx->jmp_buf); |
674 | 521 |
675 if (r == 0) { | 522 if (r == 0) { |
676 if (uw_db_begin(ctx)) | 523 if (uw_db_begin(ctx)) |
677 uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); | 524 uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); |
525 | |
678 uw_handle(ctx, path); | 526 uw_handle(ctx, path); |
679 } | 527 } |
680 | 528 |
681 return r; | 529 return r; |
530 } | |
531 | |
532 uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) { | |
533 if (ctx->client == NULL) | |
534 uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code"); | |
535 | |
536 return ctx->client->id; | |
682 } | 537 } |
683 | 538 |
684 void uw_pop_cleanup(uw_context ctx) { | 539 void uw_pop_cleanup(uw_context ctx) { |
685 if (ctx->cleanup_front == ctx->cleanup) | 540 if (ctx->cleanup_front == ctx->cleanup) |
686 uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); | 541 uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); |
841 | 696 |
842 const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { | 697 const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { |
843 if (ctx->script_header[0] == 0) | 698 if (ctx->script_header[0] == 0) |
844 return ""; | 699 return ""; |
845 else { | 700 else { |
846 int pass; | |
847 client *c = uw_new_client(&pass); | |
848 | |
849 char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script)); | 701 char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script)); |
850 sprintf(r, "%s<script>%s</script>", | 702 sprintf(r, "%s<script>%s</script>", |
851 ctx->script_header, | 703 ctx->script_header, |
852 ctx->script.start); | 704 ctx->script.start); |
853 return r; | 705 return r; |
854 } | 706 } |
855 } | 707 } |
856 | 708 |
857 const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) { | 709 const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) { |
858 if (ctx->script_header[0] == 0) | 710 if (ctx->client == NULL) |
859 return ""; | 711 return ""; |
860 else { | 712 else { |
861 int pass; | |
862 client *c = uw_new_client(&pass); | |
863 | |
864 char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload)); | 713 char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload)); |
865 sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", | 714 sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", |
866 (int)c->id, | 715 ctx->client->id, |
867 c->data.used.pass, | 716 ctx->client->pass, |
868 ctx->url_prefix, | 717 ctx->url_prefix, |
869 ctx->timeout, | 718 ctx->timeout, |
870 onload); | 719 onload); |
871 return r; | 720 return r; |
872 } | 721 } |
940 strcpy(s2, "\""); | 789 strcpy(s2, "\""); |
941 ctx->script.front = s2 + 1; | 790 ctx->script.front = s2 + 1; |
942 return r; | 791 return r; |
943 } | 792 } |
944 | 793 |
794 char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) { | |
795 if (ctx->client == NULL || chn.cli != ctx->client->id) | |
796 return "null"; | |
797 else { | |
798 int len; | |
799 char *r; | |
800 | |
801 uw_check_heap(ctx, INTS_MAX + 1); | |
802 r = ctx->heap.front; | |
803 sprintf(r, "%u%n", chn.chn, &len); | |
804 ctx->heap.front += len+1; | |
805 return r; | |
806 } | |
807 } | |
808 | |
945 uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) { | 809 uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) { |
946 int len; | 810 int len; |
947 size_t s_len = strlen(s); | 811 size_t s_len = strlen(s); |
948 | 812 |
949 uw_check_script(ctx, 12 + INTS_MAX + s_len); | 813 uw_check_script(ctx, 12 + INTS_MAX + s_len); |
1001 char *result; | 865 char *result; |
1002 int len; | 866 int len; |
1003 uw_check_heap(ctx, INTS_MAX); | 867 uw_check_heap(ctx, INTS_MAX); |
1004 result = ctx->heap.front; | 868 result = ctx->heap.front; |
1005 sprintf(result, "%lld%n", n, &len); | 869 sprintf(result, "%lld%n", n, &len); |
1006 ctx->heap.front += len+1; | |
1007 return result; | |
1008 } | |
1009 | |
1010 char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) { | |
1011 char *result; | |
1012 int len; | |
1013 uw_check_heap(ctx, INTS_MAX); | |
1014 result = ctx->heap.front; | |
1015 sprintf(result, "%lld%n", (long long)n, &len); | |
1016 ctx->heap.front += len+1; | 870 ctx->heap.front += len+1; |
1017 return result; | 871 return result; |
1018 } | 872 } |
1019 | 873 |
1020 char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) { | 874 char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) { |
1114 sprintf(r, "%lld%n", n, &len); | 968 sprintf(r, "%lld%n", n, &len); |
1115 ctx->heap.front += len+1; | 969 ctx->heap.front += len+1; |
1116 return r; | 970 return r; |
1117 } | 971 } |
1118 | 972 |
1119 char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) { | 973 char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) { |
1120 int len; | 974 if (ctx->client == NULL || chn.cli != ctx->client->id) |
1121 char *r; | 975 return ""; |
1122 | 976 else { |
1123 uw_check_heap(ctx, INTS_MAX); | 977 int len; |
1124 r = ctx->heap.front; | 978 char *r; |
1125 sprintf(r, "%lld%n", (long long)n, &len); | 979 |
1126 ctx->heap.front += len+1; | 980 uw_check_heap(ctx, INTS_MAX + 1); |
1127 return r; | 981 r = ctx->heap.front; |
982 sprintf(r, "%u%n", chn.chn, &len); | |
983 ctx->heap.front += len+1; | |
984 return r; | |
985 } | |
1128 } | 986 } |
1129 | 987 |
1130 char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) { | 988 char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) { |
1131 int len; | 989 int len; |
1132 char *r; | 990 char *r; |
1180 uw_Basis_urlifyInt_w_unsafe(ctx, n); | 1038 uw_Basis_urlifyInt_w_unsafe(ctx, n); |
1181 | 1039 |
1182 return uw_unit_v; | 1040 return uw_unit_v; |
1183 } | 1041 } |
1184 | 1042 |
1185 uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) { | 1043 uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) { |
1186 int len; | 1044 if (ctx->client != NULL && chn.cli == ctx->client->id) { |
1187 | 1045 int len; |
1188 uw_check(ctx, INTS_MAX); | 1046 |
1189 sprintf(ctx->page.front, "%lld%n", (long long)n, &len); | 1047 uw_check(ctx, INTS_MAX + 1); |
1190 ctx->page.front += len; | 1048 sprintf(ctx->page.front, "%u%n", chn.chn, &len); |
1191 | 1049 ctx->page.front += len; |
1050 } | |
1051 | |
1192 return uw_unit_v; | 1052 return uw_unit_v; |
1193 } | 1053 } |
1194 | 1054 |
1195 uw_unit uw_Basis_urlifyFloat_w(uw_context ctx, uw_Basis_float n) { | 1055 uw_unit uw_Basis_urlifyFloat_w(uw_context ctx, uw_Basis_float n) { |
1196 int len; | 1056 int len; |
1251 uw_Basis_int r; | 1111 uw_Basis_int r; |
1252 | 1112 |
1253 r = atoll(*s); | 1113 r = atoll(*s); |
1254 *s = new_s; | 1114 *s = new_s; |
1255 return r; | 1115 return r; |
1256 } | |
1257 | |
1258 uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) { | |
1259 return uw_Basis_unurlifyInt(ctx, s); | |
1260 } | 1116 } |
1261 | 1117 |
1262 uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { | 1118 uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { |
1263 char *new_s = uw_unurlify_advance(*s); | 1119 char *new_s = uw_unurlify_advance(*s); |
1264 uw_Basis_float r; | 1120 uw_Basis_float r; |
1346 uw_check(ctx, INTS_MAX); | 1202 uw_check(ctx, INTS_MAX); |
1347 sprintf(ctx->page.front, "%lld%n", n, &len); | 1203 sprintf(ctx->page.front, "%lld%n", n, &len); |
1348 ctx->page.front += len; | 1204 ctx->page.front += len; |
1349 | 1205 |
1350 return uw_unit_v; | 1206 return uw_unit_v; |
1351 } | |
1352 | |
1353 char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) { | |
1354 return uw_Basis_htmlifyInt(ctx, ch); | |
1355 } | 1207 } |
1356 | 1208 |
1357 char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { | 1209 char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { |
1358 int len; | 1210 int len; |
1359 char *r; | 1211 char *r; |
1539 sprintf(r, "%lld::int8%n", n, &len); | 1391 sprintf(r, "%lld::int8%n", n, &len); |
1540 ctx->heap.front += len+1; | 1392 ctx->heap.front += len+1; |
1541 return r; | 1393 return r; |
1542 } | 1394 } |
1543 | 1395 |
1544 char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) { | |
1545 int len; | |
1546 char *r; | |
1547 | |
1548 uw_check_heap(ctx, INTS_MAX + 6); | |
1549 r = ctx->heap.front; | |
1550 sprintf(r, "%lld::int4%n", (long long)n, &len); | |
1551 ctx->heap.front += len+1; | |
1552 return r; | |
1553 } | |
1554 | |
1555 char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) { | 1396 char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) { |
1556 if (n == NULL) | 1397 if (n == NULL) |
1557 return "NULL"; | 1398 return "NULL"; |
1558 else | 1399 else |
1559 return uw_Basis_sqlifyInt(ctx, *n); | 1400 return uw_Basis_sqlifyInt(ctx, *n); |
1612 strcpy(s2, "'::text"); | 1453 strcpy(s2, "'::text"); |
1613 ctx->heap.front = s2 + 8; | 1454 ctx->heap.front = s2 + 8; |
1614 return r; | 1455 return r; |
1615 } | 1456 } |
1616 | 1457 |
1458 char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) { | |
1459 int len; | |
1460 char *r; | |
1461 unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn; | |
1462 | |
1463 uw_check_heap(ctx, INTS_MAX + 7); | |
1464 r = ctx->heap.front; | |
1465 sprintf(r, "%lld::int8%n", combo, &len); | |
1466 ctx->heap.front += len+1; | |
1467 return r; | |
1468 } | |
1469 | |
1470 char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) { | |
1471 int len; | |
1472 char *r; | |
1473 unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn; | |
1474 | |
1475 uw_check_heap(ctx, INTS_MAX + 1); | |
1476 r = ctx->heap.front; | |
1477 sprintf(r, "%lld%n", combo, &len); | |
1478 ctx->heap.front += len+1; | |
1479 return r; | |
1480 } | |
1481 | |
1482 char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) { | |
1483 int len; | |
1484 char *r; | |
1485 | |
1486 uw_check_heap(ctx, INTS_MAX + 7); | |
1487 r = ctx->heap.front; | |
1488 sprintf(r, "%u::int4%n", cli, &len); | |
1489 ctx->heap.front += len+1; | |
1490 return r; | |
1491 } | |
1492 | |
1493 char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) { | |
1494 int len; | |
1495 char *r; | |
1496 | |
1497 uw_check_heap(ctx, INTS_MAX + 1); | |
1498 r = ctx->heap.front; | |
1499 sprintf(r, "%u%n", cli, &len); | |
1500 ctx->heap.front += len+1; | |
1501 return r; | |
1502 } | |
1503 | |
1617 uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) { | 1504 uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) { |
1618 if (s == NULL) | 1505 if (s == NULL) |
1619 return "NULL"; | 1506 return "NULL"; |
1620 else | 1507 else |
1621 return uw_Basis_sqlifyString(ctx, s); | 1508 return uw_Basis_sqlifyString(ctx, s); |
1634 else | 1521 else |
1635 return uw_Basis_sqlifyBool(ctx, *b); | 1522 return uw_Basis_sqlifyBool(ctx, *b); |
1636 } | 1523 } |
1637 | 1524 |
1638 char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) { | 1525 char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) { |
1526 size_t len; | |
1527 char *r, *s; | |
1528 struct tm stm; | |
1529 | |
1530 if (localtime_r(&t, &stm)) { | |
1531 s = uw_malloc(ctx, TIMES_MAX); | |
1532 len = strftime(s, TIMES_MAX, TIME_FMT, &stm); | |
1533 r = uw_malloc(ctx, len + 14); | |
1534 sprintf(r, "'%s'::timestamp", s); | |
1535 return r; | |
1536 } else | |
1537 return "<Invalid time>"; | |
1538 } | |
1539 | |
1540 char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) { | |
1639 size_t len; | 1541 size_t len; |
1640 char *r; | 1542 char *r; |
1641 struct tm stm; | 1543 struct tm stm; |
1642 | 1544 |
1643 if (localtime_r(&t, &stm)) { | 1545 if (localtime_r(&t, &stm)) { |
1721 return r; | 1623 return r; |
1722 } else | 1624 } else |
1723 return NULL; | 1625 return NULL; |
1724 } | 1626 } |
1725 | 1627 |
1726 uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) { | |
1727 char *endptr; | |
1728 uw_Basis_channel n = strtoll(s, &endptr, 10); | |
1729 | |
1730 if (*s != '\0' && *endptr == '\0') { | |
1731 uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel)); | |
1732 *r = n; | |
1733 return r; | |
1734 } else | |
1735 return NULL; | |
1736 } | |
1737 | |
1738 uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) { | 1628 uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) { |
1739 char *endptr; | 1629 char *endptr; |
1740 uw_Basis_float n = strtod(s, &endptr); | 1630 uw_Basis_float n = strtod(s, &endptr); |
1741 | 1631 |
1742 if (*s != '\0' && *endptr == '\0') { | 1632 if (*s != '\0' && *endptr == '\0') { |
1800 return n; | 1690 return n; |
1801 else | 1691 else |
1802 uw_error(ctx, FATAL, "Can't parse int: %s", s); | 1692 uw_error(ctx, FATAL, "Can't parse int: %s", s); |
1803 } | 1693 } |
1804 | 1694 |
1695 #include <errno.h> | |
1696 | |
1805 uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) { | 1697 uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) { |
1698 unsigned long long n; | |
1699 | |
1700 if (sscanf(s, "%llu", &n) < 1) | |
1701 uw_error(ctx, FATAL, "Can't parse channel: %s", s); | |
1702 else { | |
1703 uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)}; | |
1704 return ch; | |
1705 } | |
1706 } | |
1707 | |
1708 uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) { | |
1806 char *endptr; | 1709 char *endptr; |
1807 uw_Basis_channel n = strtoll(s, &endptr, 10); | 1710 unsigned long n = strtoul(s, &endptr, 10); |
1808 | 1711 |
1809 if (*s != '\0' && *endptr == '\0') | 1712 if (*s != '\0' && *endptr == '\0') |
1810 return n; | 1713 return n; |
1811 else | 1714 else |
1812 uw_error(ctx, FATAL, "Can't parse channel int: %s", s); | 1715 uw_error(ctx, FATAL, "Can't parse client: %s", s); |
1813 } | 1716 } |
1814 | 1717 |
1815 uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) { | 1718 uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) { |
1816 char *endptr; | 1719 char *endptr; |
1817 uw_Basis_float n = strtod(s, &endptr); | 1720 uw_Basis_float n = strtod(s, &endptr); |
1851 return mktime(&stm); | 1754 return mktime(&stm); |
1852 else if (strptime(s, TIME_FMT, &stm) == end) | 1755 else if (strptime(s, TIME_FMT, &stm) == end) |
1853 return mktime(&stm); | 1756 return mktime(&stm); |
1854 else | 1757 else |
1855 uw_error(ctx, FATAL, "Can't parse time: %s", s); | 1758 uw_error(ctx, FATAL, "Can't parse time: %s", s); |
1856 } | |
1857 } | |
1858 | |
1859 uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) { | |
1860 int len = strlen(h); | |
1861 char *s = ctx->headers, *p; | |
1862 | |
1863 while (p = strchr(s, ':')) { | |
1864 if (p - s == len && !strncasecmp(s, h, len)) { | |
1865 return p + 2; | |
1866 } else { | |
1867 if ((s = strchr(p, 0)) && s < ctx->headers_end) | |
1868 s += 2; | |
1869 else | |
1870 return NULL; | |
1871 } | |
1872 } | 1759 } |
1873 } | 1760 } |
1874 | 1761 |
1875 uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) { | 1762 uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) { |
1876 int len = strlen(c); | 1763 int len = strlen(c); |
1928 uw_write_header(ctx, "\r\n"); | 1815 uw_write_header(ctx, "\r\n"); |
1929 | 1816 |
1930 return uw_unit_v; | 1817 return uw_unit_v; |
1931 } | 1818 } |
1932 | 1819 |
1933 static channel_delta *allocate_delta(uw_context ctx, channel *ch) { | 1820 static delta *allocate_delta(uw_context ctx, unsigned client) { |
1934 size_t i; | 1821 unsigned i; |
1935 channel_delta *cd; | 1822 delta *d; |
1936 | 1823 |
1937 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i); | 1824 for (i = 0; i < ctx->used_deltas; ++i) |
1938 | 1825 if (ctx->deltas[i].client == client) |
1939 if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch) | 1826 return &ctx->deltas[i]; |
1940 return &ctx->deltas[i]; | 1827 |
1941 | 1828 if (ctx->used_deltas >= ctx->n_deltas) { |
1942 if (i < ctx->n_deltas) | 1829 ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas); |
1943 cd = &ctx->deltas[i]; | 1830 buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0); |
1944 else { | 1831 } |
1945 ++ctx->n_deltas; | 1832 |
1946 ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas); | 1833 d = &ctx->deltas[ctx->used_deltas++]; |
1947 cd = &ctx->deltas[ctx->n_deltas-1]; | 1834 d->client = client; |
1948 cd->n_subscribed = 0; | 1835 buf_reset(&d->msgs); |
1949 cd->subscribed = malloc(0); | 1836 return d; |
1950 buf_init(&cd->msgs, 0); | |
1951 } | |
1952 | |
1953 cd->mode = USED; | |
1954 cd->newness = OLD; | |
1955 cd->ch = ch; | |
1956 if (cd->n_subscribed > 0) | |
1957 cd->subscribed[0] = NULL; | |
1958 buf_reset(&cd->msgs); | |
1959 return cd; | |
1960 } | 1837 } |
1961 | 1838 |
1962 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { | 1839 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { |
1963 size_t i; | 1840 if (ctx->client == NULL) |
1964 channel *ch = uw_new_channel(); | 1841 uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection"); |
1965 ++ch->data.used.refcount; | 1842 |
1966 channel_delta *cd = allocate_delta(ctx, ch); | 1843 return new_channel(ctx->client); |
1967 | |
1968 cd->newness = NEW; | |
1969 | |
1970 return ch->id; | |
1971 } | |
1972 | |
1973 static int delta_used(channel_delta *cd) { | |
1974 return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]); | |
1975 } | |
1976 | |
1977 uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { | |
1978 channel *ch = uw_find_channel(chn); | |
1979 | |
1980 if (ch == NULL) | |
1981 uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); | |
1982 else { | |
1983 size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); | |
1984 int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); | |
1985 client *c = uw_find_client(id); | |
1986 | |
1987 if (c == NULL) { | |
1988 uw_release_channel(ch); | |
1989 uw_error(ctx, FATAL, "Unknown client ID in subscription request"); | |
1990 } else if (c->data.used.pass != pass) { | |
1991 uw_release_channel(ch); | |
1992 uw_release_client(c); | |
1993 uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass); | |
1994 } else { | |
1995 size_t i; | |
1996 channel_delta *cd = allocate_delta(ctx, ch); | |
1997 | |
1998 if (delta_used(cd)) | |
1999 uw_release_channel(ch); | |
2000 | |
2001 for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i); | |
2002 | |
2003 if (i < cd->n_subscribed) | |
2004 cd->subscribed[i] = c; | |
2005 else { | |
2006 ++cd->n_subscribed; | |
2007 cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed); | |
2008 cd->subscribed[cd->n_subscribed-1] = c; | |
2009 } | |
2010 } | |
2011 } | |
2012 | |
2013 return uw_unit_v; | |
2014 } | 1844 } |
2015 | 1845 |
2016 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { | 1846 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { |
2017 channel *ch = uw_find_channel(chn); | 1847 delta *d = allocate_delta(ctx, chn.cli); |
2018 | 1848 size_t len; |
2019 if (ch == NULL) | 1849 int preLen; |
2020 uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); | 1850 char pre[INTS_MAX + 2]; |
2021 else { | 1851 |
2022 channel_delta *cd = allocate_delta(ctx, ch); | 1852 len = strlen(msg); |
2023 if (delta_used(cd)) | 1853 |
2024 uw_release_channel(ch); | 1854 sprintf(pre, "%u\n%n", chn.chn, &preLen); |
2025 buf_append(&cd->msgs, msg, strlen(msg)); | 1855 |
2026 } | 1856 buf_append(&d->msgs, pre, preLen); |
1857 buf_append(&d->msgs, msg, len); | |
1858 buf_append(&d->msgs, "\n", 1); | |
2027 | 1859 |
2028 return uw_unit_v; | 1860 return uw_unit_v; |
2029 } | 1861 } |
2030 | 1862 |
2031 int uw_db_commit(uw_context); | 1863 int uw_db_commit(uw_context); |
2032 int uw_db_rollback(uw_context); | 1864 int uw_db_rollback(uw_context); |
2033 | 1865 |
2034 void uw_commit(uw_context ctx) { | 1866 void uw_commit(uw_context ctx) { |
2035 size_t i, j; | 1867 unsigned i; |
2036 | |
2037 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { | |
2038 channel *ch = ctx->deltas[i].ch; | |
2039 | |
2040 for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { | |
2041 client *c = ctx->deltas[i].subscribed[j]; | |
2042 | |
2043 uw_subscribe(ch, c); | |
2044 uw_release_client(c); | |
2045 } | |
2046 | |
2047 if (buf_used(&ctx->deltas[i].msgs) > 0) { | |
2048 uw_channel_send(ch, ctx->deltas[i].msgs.start); | |
2049 } | |
2050 | |
2051 uw_release_channel(ch); | |
2052 } | |
2053 | 1868 |
2054 if (uw_db_commit(ctx)) | 1869 if (uw_db_commit(ctx)) |
2055 uw_error(ctx, FATAL, "Error running SQL COMMIT"); | 1870 uw_error(ctx, FATAL, "Error running SQL COMMIT"); |
1871 | |
1872 for (i = 0; i < ctx->used_deltas; ++i) { | |
1873 delta *d = &ctx->deltas[i]; | |
1874 client *c = find_client(d->client); | |
1875 | |
1876 assert (c != NULL && c->mode == USED); | |
1877 | |
1878 client_send(c == ctx->client, c, &d->msgs); | |
1879 } | |
1880 | |
1881 if (ctx->client) | |
1882 pthread_mutex_unlock(&ctx->client->lock); | |
2056 } | 1883 } |
2057 | 1884 |
2058 int uw_rollback(uw_context ctx) { | 1885 int uw_rollback(uw_context ctx) { |
2059 size_t i, j; | 1886 if (ctx->client) |
2060 | 1887 pthread_mutex_unlock(&ctx->client->lock); |
2061 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { | |
2062 channel *ch = ctx->deltas[i].ch; | |
2063 | |
2064 for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { | |
2065 client *c = ctx->deltas[i].subscribed[j]; | |
2066 | |
2067 uw_release_client(c); | |
2068 } | |
2069 | |
2070 if (ctx->deltas[i].newness == NEW) | |
2071 uw_free_channel(ch); | |
2072 else | |
2073 uw_release_channel(ch); | |
2074 } | |
2075 | 1888 |
2076 return uw_db_rollback(ctx); | 1889 return uw_db_rollback(ctx); |
2077 } | 1890 } |