comparison src/c/urweb.c @ 682:5bbb542243e8

Redo channels, making them single-client
author Adam Chlipala <adamc@hcoop.net>
date Sun, 29 Mar 2009 11:37:29 -0400
parents 6c9b8875f347
children 9a2c18dab11d
comparison
equal deleted inserted replaced
681:6c9b8875f347 682:5bbb542243e8
5 #include <string.h> 5 #include <string.h>
6 #include <strings.h> 6 #include <strings.h>
7 #include <ctype.h> 7 #include <ctype.h>
8 #include <setjmp.h> 8 #include <setjmp.h>
9 #include <stdarg.h> 9 #include <stdarg.h>
10 #include <assert.h>
10 11
11 #include <pthread.h> 12 #include <pthread.h>
12 13
13 #include "types.h" 14 #include "types.h"
14 15
86 87
87 // Persistent state types 88 // Persistent state types
88 89
89 typedef enum { UNUSED, USED } usage; 90 typedef enum { UNUSED, USED } usage;
90 91
91 typedef struct channel_list {
92 struct channel *data;
93 struct channel_list *next;
94 } channel_list;
95
96 typedef struct client { 92 typedef struct client {
97 size_t id; 93 unsigned id;
98 usage mode; 94 usage mode;
99 union { 95 int pass;
100 struct client *next; 96 struct client *next;
101 struct { 97 pthread_mutex_t lock;
102 pthread_mutex_t lock; 98 buf msgs;
103 int pass; 99 int sock;
104 buf msgs; 100 time_t last_contact;
105 int sock; 101 unsigned n_channels;
106 time_t last_contact;
107 unsigned refcount;
108 channel_list *channels;
109 } used;
110 } data;
111 } client; 102 } client;
112 103
113 typedef struct client_list {
114 client *data;
115 struct client_list *next;
116 } client_list;
117
118 typedef struct channel {
119 size_t id;
120 usage mode;
121 union {
122 struct channel *next;
123 struct {
124 pthread_mutex_t lock;
125 client_list *clients;
126 unsigned refcount;
127 } used;
128 } data;
129 } channel;
130
131 104
132 // Persistent client state 105 // Persistent client state
133 106
134 static client **clients, *clients_free; 107 static client **clients, *clients_free, *clients_used;
135 static size_t n_clients; 108 static unsigned n_clients;
136 109
137 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; 110 static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
138 111
139 static client *uw_new_client() { 112 static client *new_client() {
140 client *c; 113 client *c;
141 114
142 pthread_mutex_lock(&clients_mutex); 115 pthread_mutex_lock(&clients_mutex);
143 116
144 if (clients_free) { 117 if (clients_free) {
145 c = clients_free; 118 c = clients_free;
146 clients_free = clients_free->data.next; 119 clients_free = clients_free->next;
147 } 120 }
148 else { 121 else {
149 ++n_clients; 122 ++n_clients;
150 clients = realloc(clients, sizeof(client) * n_clients); 123 clients = realloc(clients, sizeof(client) * n_clients);
151 c = malloc(sizeof(client)); 124 c = malloc(sizeof(client));
152 c->id = n_clients-1; 125 c->id = n_clients-1;
126 pthread_mutex_init(&c->lock, NULL);
127 buf_init(&c->msgs, 0);
153 clients[n_clients-1] = c; 128 clients[n_clients-1] = c;
154 } 129 }
155 130
131 pthread_mutex_lock(&c->lock);
156 c->mode = USED; 132 c->mode = USED;
157 pthread_mutex_init(&c->data.used.lock, NULL); 133 c->pass = rand();
158 c->data.used.pass = rand(); 134 c->sock = -1;
159 c->data.used.sock = -1; 135 c->last_contact = time(NULL);
160 c->data.used.last_contact = time(NULL); 136 buf_reset(&c->msgs);
161 buf_init(&c->data.used.msgs, 0); 137 c->n_channels = 0;
162 c->data.used.refcount = 0; 138 pthread_mutex_unlock(&c->lock);
163 c->data.used.channels = NULL; 139
140 c->next = clients_used;
141 clients_used = c;
164 142
165 pthread_mutex_unlock(&clients_mutex); 143 pthread_mutex_unlock(&clients_mutex);
166 144
167 return c; 145 return c;
168 } 146 }
169 147
170 static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; 148 static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
171 149
172 static client *uw_find_client(size_t id) { 150 static client *find_client(unsigned id) {
173 client *c; 151 client *c;
174 152
175 pthread_mutex_lock(&clients_mutex); 153 pthread_mutex_lock(&clients_mutex);
176 154
177 if (id >= n_clients) { 155 if (id >= n_clients) {
179 return NULL; 157 return NULL;
180 } 158 }
181 159
182 c = clients[id]; 160 c = clients[id];
183 161
184 if (c->mode != USED) {
185 pthread_mutex_unlock(&clients_mutex);
186 return NULL;
187 }
188
189 pthread_mutex_lock(&c->data.used.lock);
190 ++c->data.used.refcount;
191 pthread_mutex_unlock(&c->data.used.lock);
192 pthread_mutex_unlock(&clients_mutex); 162 pthread_mutex_unlock(&clients_mutex);
193 return c; 163 return c;
194 } 164 }
195 165
196 static void uw_release_client(client *c) { 166 void uw_client_connect(unsigned id, int pass, int sock) {
197 pthread_mutex_lock(&c->data.used.lock); 167 client *c = find_client(id);
198 --c->data.used.refcount;
199 pthread_mutex_unlock(&c->data.used.lock);
200 }
201
202 void uw_client_connect(size_t id, int pass, int sock) {
203 client *c = uw_find_client(id);
204 168
205 if (c == NULL) { 169 if (c == NULL) {
206 close(sock); 170 close(sock);
207 fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); 171 fprintf(stderr, "Out-of-bounds client request (%u)\n", id);
208 return; 172 return;
209 } 173 }
210 174
211 uw_release_client(c); 175 pthread_mutex_lock(&c->lock);
212 176
213 pthread_mutex_lock(&c->data.used.lock); 177 if (c->mode != USED) {
214 178 pthread_mutex_unlock(&c->lock);
215 if (pass != c->data.used.pass) {
216 pthread_mutex_unlock(&c->data.used.lock);
217 close(sock); 179 close(sock);
218 fprintf(stderr, "Wrong client password (%d)\n", (int)id); 180 fprintf(stderr, "Client request for unused slot (%u)\n", id);
219 return; 181 return;
220 } 182 }
221 183
222 if (c->data.used.sock != -1) { 184 if (pass != c->pass) {
223 close(c->data.used.sock); 185 pthread_mutex_unlock(&c->lock);
224 c->data.used.sock = -1; 186 close(sock);
225 } 187 fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass);
226 188 return;
227 c->data.used.last_contact = time(NULL); 189 }
228 190
229 if (buf_used(&c->data.used.msgs) > 0) { 191 if (c->sock != -1) {
192 close(c->sock);
193 c->sock = -1;
194 }
195
196 c->last_contact = time(NULL);
197
198 if (buf_used(&c->msgs) > 0) {
230 uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); 199 uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
231 uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); 200 uw_really_send(sock, c->msgs.start, buf_used(&c->msgs));
232 buf_reset(&c->data.used.msgs); 201 buf_reset(&c->msgs);
233 close(sock); 202 close(sock);
234 } 203 }
235 else 204 else
236 c->data.used.sock = sock; 205 c->sock = sock;
237 206
238 pthread_mutex_unlock(&c->data.used.lock); 207 pthread_mutex_unlock(&c->lock);
239 } 208 }
240 209
241 210 static void free_client(client *c) {
242 static void uw_free_client(client *c) {
243 channel_list *chs;
244
245 printf("Freeing client %d\n", c->id); 211 printf("Freeing client %d\n", c->id);
246 212
247 if (c->mode == USED) { 213 c->mode = UNUSED;
248 pthread_mutex_lock(&c->data.used.lock); 214 c->pass = -1;
249 215
250 for (chs = c->data.used.channels; chs; ) { 216 c->next = clients_free;
251 client_list *prev, *cs; 217 clients_free = c;
252
253 channel *ch = chs->data;
254 channel_list *tmp = chs->next;
255 free(chs);
256 chs = tmp;
257
258 pthread_mutex_lock(&ch->data.used.lock);
259 for (prev = NULL, cs = ch->data.used.clients; cs; ) {
260 if (cs->data == c) {
261 client_list *tmp = cs->next;
262 free(cs);
263 cs = tmp;
264 if (prev)
265 prev->next = cs;
266 else
267 ch->data.used.clients = cs;
268 }
269 else {
270 prev = cs;
271 cs = cs->next;
272 }
273 }
274 pthread_mutex_unlock(&ch->data.used.lock);
275 }
276
277 if (c->data.used.sock != -1)
278 close(c->data.used.sock);
279
280 pthread_mutex_unlock(&c->data.used.lock);
281 pthread_mutex_destroy(&c->data.used.lock);
282 buf_free(&c->data.used.msgs);
283 c->mode = UNUSED;
284
285 c->data.next = clients_free;
286 clients_free = c;
287 }
288 } 218 }
289 219
290 extern int uw_timeout; 220 extern int uw_timeout;
291 221
292 void uw_prune_clients() { 222 void uw_prune_clients() {
293 size_t i; 223 client *c, *next, *prev = NULL;
294 time_t cutoff; 224 time_t cutoff;
295 225
296 cutoff = time(NULL) - uw_timeout; 226 cutoff = time(NULL) - uw_timeout;
297 227
298 pthread_mutex_lock(&clients_mutex); 228 pthread_mutex_lock(&clients_mutex);
299 229
300 for (i = 0; i < n_clients; ++i) { 230 for (c = clients_used; c; c = next) {
301 if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff 231 next = c->next;
302 && clients[i]->data.used.refcount == 0) 232 pthread_mutex_lock(&c->lock);
303 uw_free_client(clients[i]); 233 if (c->last_contact < cutoff) {
234 if (prev)
235 prev->next = next;
236 else
237 clients_used = next;
238 free_client(c);
239 }
240 else
241 prev = c;
242 pthread_mutex_unlock(&c->lock);
304 } 243 }
305 244
306 pthread_mutex_unlock(&clients_mutex); 245 pthread_mutex_unlock(&clients_mutex);
307 } 246 }
308 247
309 248 static uw_Basis_channel new_channel(client *c) {
310 // Persistent channel state 249 uw_Basis_channel ch = {c->id, c->n_channels++};
311
312
313 static channel **channels, *channels_free;
314 static size_t n_channels;
315
316 static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER;
317
318 static channel *uw_new_channel() {
319 channel *ch;
320
321 pthread_mutex_lock(&channels_mutex);
322
323 if (channels_free) {
324 ch = channels_free;
325 channels_free = channels_free->data.next;
326 }
327 else {
328 ++n_channels;
329 channels = realloc(channels, sizeof(channels) * n_channels);
330 ch = malloc(sizeof(channel));
331 ch->id = n_channels-1;
332 channels[n_channels-1] = ch;
333 }
334
335 ch->mode = USED;
336 pthread_mutex_init(&ch->data.used.lock, NULL);
337 ch->data.used.clients = NULL;
338 ch->data.used.refcount = 0;
339
340 pthread_mutex_unlock(&channels_mutex);
341
342 return ch; 250 return ch;
343 } 251 }
344 252
345 static void uw_free_channel(channel *ch) { 253 static void client_send(int already_locked, client *c, buf *msg) {
346 if (ch->mode == USED) { 254 if (!already_locked)
347 client_list *cs; 255 pthread_mutex_lock(&c->lock);
348 256
349 for (cs = ch->data.used.clients; cs; ) { 257 if (c->sock != -1) {
350 client_list *tmp = cs->next; 258 uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1);
351 free(cs); 259 uw_really_send(c->sock, msg->start, buf_used(msg));
352 cs = tmp; 260 close(c->sock);
353 } 261 c->sock = -1;
354 pthread_mutex_destroy(&ch->data.used.lock); 262 } else
355 ch->mode = UNUSED; 263 buf_append(&c->msgs, msg->start, buf_used(msg));
356 ch->data.next = channels_free; 264
357 channels_free = ch; 265 if (!already_locked)
358 } 266 pthread_mutex_unlock(&c->lock);
359 }
360
361 static channel *uw_find_channel(size_t id) {
362 channel *ch = NULL;
363
364 pthread_mutex_lock(&channels_mutex);
365
366 if (id < n_channels && channels[id]->mode == USED) {
367 ch = channels[id];
368
369 pthread_mutex_lock(&ch->data.used.lock);
370 ++ch->data.used.refcount;
371 pthread_mutex_unlock(&ch->data.used.lock);
372 }
373
374 pthread_mutex_unlock(&channels_mutex);
375
376 return ch;
377 }
378
379 static void uw_release_channel(channel *ch) {
380 pthread_mutex_lock(&ch->data.used.lock);
381 ++ch->data.used.refcount;
382 pthread_mutex_unlock(&ch->data.used.lock);
383 }
384
385 static void uw_subscribe(channel *ch, client *c) {
386 client_list *cs;
387
388 pthread_mutex_lock(&ch->data.used.lock);
389
390 for (cs = ch->data.used.clients; cs; cs = cs->next)
391 if (cs->data == c) {
392 pthread_mutex_unlock(&ch->data.used.lock);
393 return;
394 }
395
396 cs = malloc(sizeof(client_list));
397 cs->data = c;
398 cs->next = ch->data.used.clients;
399 ch->data.used.clients = cs;
400
401 pthread_mutex_unlock(&ch->data.used.lock);
402 }
403
404 static void uw_unsubscribe(channel *ch, client *c) {
405 client_list *prev, *cur, *tmp;
406
407 pthread_mutex_lock(&ch->data.used.lock);
408
409 for (prev = NULL, cur = ch->data.used.clients; cur; ) {
410 if (cur->data == c) {
411 if (prev)
412 prev->next = cur->next;
413 else
414 ch->data.used.clients = cur->next;
415 tmp = cur;
416 cur = cur->next;
417 free(tmp);
418 }
419 else {
420 prev = cur;
421 cur = cur->next;
422 }
423 }
424
425 pthread_mutex_unlock(&ch->data.used.lock);
426 }
427
428 static void uw_channel_send(channel *ch, const char *msg) {
429 size_t len = strlen(msg), preLen;
430 char pre[INTS_MAX + 2];
431 client_list *cs;
432
433 sprintf(pre, "%d\n", (int)ch->id);
434 preLen = strlen(pre);
435
436 pthread_mutex_lock(&ch->data.used.lock);
437
438 for (cs = ch->data.used.clients; cs; cs = cs->next) {
439 client *c = cs->data;
440
441 pthread_mutex_lock(&c->data.used.lock);
442
443 if (c->data.used.sock != -1) {
444 uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
445 uw_really_send(c->data.used.sock, pre, preLen);
446 uw_really_send(c->data.used.sock, msg, len);
447 uw_really_send(c->data.used.sock, "\n", 1);
448 close(c->data.used.sock);
449 c->data.used.sock = -1;
450 } else {
451 buf_append(&c->data.used.msgs, pre, preLen);
452 buf_append(&c->data.used.msgs, msg, len);
453 buf_append(&c->data.used.msgs, "\n", 1);
454 }
455
456 pthread_mutex_unlock(&c->data.used.lock);
457 }
458
459 pthread_mutex_unlock(&ch->data.used.lock);
460 } 267 }
461 268
462 269
463 // Global entry points 270 // Global entry points
464 271
465 void uw_global_init() { 272 void uw_global_init() {
466 srand(time(NULL) ^ getpid()); 273 srand(time(NULL) ^ getpid());
467 274
468 clients = malloc(0); 275 clients = malloc(0);
469 channels = malloc(0);
470 } 276 }
471 277
472 278
473 // Single-request state 279 // Single-request state
474 280
482 void (*func)(void*); 288 void (*func)(void*);
483 void *arg; 289 void *arg;
484 } cleanup; 290 } cleanup;
485 291
486 typedef struct { 292 typedef struct {
487 usage mode; 293 unsigned client;
488 channel *ch;
489 enum { OLD, NEW } newness;
490
491 size_t n_subscribed;
492 client **subscribed;
493
494 buf msgs; 294 buf msgs;
495 } channel_delta; 295 } delta;
496 296
497 struct uw_context { 297 struct uw_context {
498 char *headers, *headers_end; 298 char *headers, *headers_end;
499 299
500 buf outHeaders, page, heap, script; 300 buf outHeaders, page, heap, script;
510 310
511 cleanup *cleanup, *cleanup_front, *cleanup_back; 311 cleanup *cleanup, *cleanup_front, *cleanup_back;
512 312
513 const char *script_header, *url_prefix; 313 const char *script_header, *url_prefix;
514 314
515 size_t n_deltas; 315 size_t n_deltas, used_deltas;
516 channel_delta *deltas; 316 delta *deltas;
517 317
518 int timeout; 318 int timeout;
319
320 client *client;
519 321
520 char error_message[ERROR_BUF_LEN]; 322 char error_message[ERROR_BUF_LEN];
521 }; 323 };
522 324
523 extern int uw_inputs_len; 325 extern int uw_inputs_len;
546 348
547 ctx->error_message[0] = 0; 349 ctx->error_message[0] = 0;
548 350
549 ctx->source_count = 0; 351 ctx->source_count = 0;
550 352
551 ctx->n_deltas = 0; 353 ctx->n_deltas = ctx->used_deltas = 0;
552 ctx->deltas = malloc(0); 354 ctx->deltas = malloc(0);
553 355
554 ctx->timeout = uw_timeout; 356 ctx->timeout = uw_timeout;
555 357
556 return ctx; 358 return ctx;
572 buf_free(&ctx->page); 374 buf_free(&ctx->page);
573 buf_free(&ctx->heap); 375 buf_free(&ctx->heap);
574 free(ctx->inputs); 376 free(ctx->inputs);
575 free(ctx->cleanup); 377 free(ctx->cleanup);
576 378
577 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { 379 for (i = 0; i < ctx->n_deltas; ++i)
578 free(ctx->deltas[i].subscribed);
579 buf_free(&ctx->deltas[i].msgs); 380 buf_free(&ctx->deltas[i].msgs);
580 }
581 381
582 free(ctx); 382 free(ctx);
583 } 383 }
584 384
585 void uw_reset_keep_error_message(uw_context ctx) { 385 void uw_reset_keep_error_message(uw_context ctx) {
589 buf_reset(&ctx->page); 389 buf_reset(&ctx->page);
590 buf_reset(&ctx->heap); 390 buf_reset(&ctx->heap);
591 ctx->regions = NULL; 391 ctx->regions = NULL;
592 ctx->cleanup_front = ctx->cleanup; 392 ctx->cleanup_front = ctx->cleanup;
593 ctx->source_count = 0; 393 ctx->source_count = 0;
594 if (ctx->n_deltas > 0) 394 ctx->used_deltas = 0;
595 ctx->deltas[0].mode = UNUSED; 395 ctx->client = NULL;
596 } 396 }
597 397
598 void uw_reset_keep_request(uw_context ctx) { 398 void uw_reset_keep_request(uw_context ctx) {
599 uw_reset_keep_error_message(ctx); 399 uw_reset_keep_error_message(ctx);
600 ctx->error_message[0] = 0; 400 ctx->error_message[0] = 0;
667 ctx->cleanup_front->func = func; 467 ctx->cleanup_front->func = func;
668 ctx->cleanup_front->arg = arg; 468 ctx->cleanup_front->arg = arg;
669 ++ctx->cleanup_front; 469 ++ctx->cleanup_front;
670 } 470 }
671 471
472 uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
473 int len = strlen(h);
474 char *s = ctx->headers, *p;
475
476 while (p = strchr(s, ':')) {
477 if (p - s == len && !strncasecmp(s, h, len)) {
478 return p + 2;
479 } else {
480 if ((s = strchr(p, 0)) && s < ctx->headers_end)
481 s += 2;
482 else
483 return NULL;
484 }
485 }
486
487 return NULL;
488 }
489
490 void uw_login(uw_context ctx) {
491 if (ctx->script_header[0]) {
492 char *id_s, *pass_s;
493
494 if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client"))
495 && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) {
496 unsigned id = atoi(id_s);
497 int pass = atoi(pass_s);
498 client *c = find_client(id);
499
500 if (c == NULL)
501 uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s);
502 else {
503 pthread_mutex_lock(&c->lock);
504 ctx->client = c;
505
506 if (c->mode != USED)
507 uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id);
508 if (c->pass != pass)
509 uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass);
510 }
511 } else {
512 client *c = new_client();
513 pthread_mutex_lock(&c->lock);
514 ctx->client = c;
515 }
516 }
517 }
518
672 failure_kind uw_begin(uw_context ctx, char *path) { 519 failure_kind uw_begin(uw_context ctx, char *path) {
673 int r = setjmp(ctx->jmp_buf); 520 int r = setjmp(ctx->jmp_buf);
674 521
675 if (r == 0) { 522 if (r == 0) {
676 if (uw_db_begin(ctx)) 523 if (uw_db_begin(ctx))
677 uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); 524 uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
525
678 uw_handle(ctx, path); 526 uw_handle(ctx, path);
679 } 527 }
680 528
681 return r; 529 return r;
530 }
531
532 uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) {
533 if (ctx->client == NULL)
534 uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code");
535
536 return ctx->client->id;
682 } 537 }
683 538
684 void uw_pop_cleanup(uw_context ctx) { 539 void uw_pop_cleanup(uw_context ctx) {
685 if (ctx->cleanup_front == ctx->cleanup) 540 if (ctx->cleanup_front == ctx->cleanup)
686 uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); 541 uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack");
841 696
842 const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { 697 const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
843 if (ctx->script_header[0] == 0) 698 if (ctx->script_header[0] == 0)
844 return ""; 699 return "";
845 else { 700 else {
846 int pass;
847 client *c = uw_new_client(&pass);
848
849 char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script)); 701 char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script));
850 sprintf(r, "%s<script>%s</script>", 702 sprintf(r, "%s<script>%s</script>",
851 ctx->script_header, 703 ctx->script_header,
852 ctx->script.start); 704 ctx->script.start);
853 return r; 705 return r;
854 } 706 }
855 } 707 }
856 708
857 const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) { 709 const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) {
858 if (ctx->script_header[0] == 0) 710 if (ctx->client == NULL)
859 return ""; 711 return "";
860 else { 712 else {
861 int pass;
862 client *c = uw_new_client(&pass);
863
864 char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload)); 713 char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload));
865 sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", 714 sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
866 (int)c->id, 715 ctx->client->id,
867 c->data.used.pass, 716 ctx->client->pass,
868 ctx->url_prefix, 717 ctx->url_prefix,
869 ctx->timeout, 718 ctx->timeout,
870 onload); 719 onload);
871 return r; 720 return r;
872 } 721 }
940 strcpy(s2, "\""); 789 strcpy(s2, "\"");
941 ctx->script.front = s2 + 1; 790 ctx->script.front = s2 + 1;
942 return r; 791 return r;
943 } 792 }
944 793
794 char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) {
795 if (ctx->client == NULL || chn.cli != ctx->client->id)
796 return "null";
797 else {
798 int len;
799 char *r;
800
801 uw_check_heap(ctx, INTS_MAX + 1);
802 r = ctx->heap.front;
803 sprintf(r, "%u%n", chn.chn, &len);
804 ctx->heap.front += len+1;
805 return r;
806 }
807 }
808
945 uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) { 809 uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) {
946 int len; 810 int len;
947 size_t s_len = strlen(s); 811 size_t s_len = strlen(s);
948 812
949 uw_check_script(ctx, 12 + INTS_MAX + s_len); 813 uw_check_script(ctx, 12 + INTS_MAX + s_len);
1001 char *result; 865 char *result;
1002 int len; 866 int len;
1003 uw_check_heap(ctx, INTS_MAX); 867 uw_check_heap(ctx, INTS_MAX);
1004 result = ctx->heap.front; 868 result = ctx->heap.front;
1005 sprintf(result, "%lld%n", n, &len); 869 sprintf(result, "%lld%n", n, &len);
1006 ctx->heap.front += len+1;
1007 return result;
1008 }
1009
1010 char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) {
1011 char *result;
1012 int len;
1013 uw_check_heap(ctx, INTS_MAX);
1014 result = ctx->heap.front;
1015 sprintf(result, "%lld%n", (long long)n, &len);
1016 ctx->heap.front += len+1; 870 ctx->heap.front += len+1;
1017 return result; 871 return result;
1018 } 872 }
1019 873
1020 char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) { 874 char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) {
1114 sprintf(r, "%lld%n", n, &len); 968 sprintf(r, "%lld%n", n, &len);
1115 ctx->heap.front += len+1; 969 ctx->heap.front += len+1;
1116 return r; 970 return r;
1117 } 971 }
1118 972
1119 char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) { 973 char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) {
1120 int len; 974 if (ctx->client == NULL || chn.cli != ctx->client->id)
1121 char *r; 975 return "";
1122 976 else {
1123 uw_check_heap(ctx, INTS_MAX); 977 int len;
1124 r = ctx->heap.front; 978 char *r;
1125 sprintf(r, "%lld%n", (long long)n, &len); 979
1126 ctx->heap.front += len+1; 980 uw_check_heap(ctx, INTS_MAX + 1);
1127 return r; 981 r = ctx->heap.front;
982 sprintf(r, "%u%n", chn.chn, &len);
983 ctx->heap.front += len+1;
984 return r;
985 }
1128 } 986 }
1129 987
1130 char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) { 988 char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) {
1131 int len; 989 int len;
1132 char *r; 990 char *r;
1180 uw_Basis_urlifyInt_w_unsafe(ctx, n); 1038 uw_Basis_urlifyInt_w_unsafe(ctx, n);
1181 1039
1182 return uw_unit_v; 1040 return uw_unit_v;
1183 } 1041 }
1184 1042
1185 uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) { 1043 uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) {
1186 int len; 1044 if (ctx->client != NULL && chn.cli == ctx->client->id) {
1187 1045 int len;
1188 uw_check(ctx, INTS_MAX); 1046
1189 sprintf(ctx->page.front, "%lld%n", (long long)n, &len); 1047 uw_check(ctx, INTS_MAX + 1);
1190 ctx->page.front += len; 1048 sprintf(ctx->page.front, "%u%n", chn.chn, &len);
1191 1049 ctx->page.front += len;
1050 }
1051
1192 return uw_unit_v; 1052 return uw_unit_v;
1193 } 1053 }
1194 1054
1195 uw_unit uw_Basis_urlifyFloat_w(uw_context ctx, uw_Basis_float n) { 1055 uw_unit uw_Basis_urlifyFloat_w(uw_context ctx, uw_Basis_float n) {
1196 int len; 1056 int len;
1251 uw_Basis_int r; 1111 uw_Basis_int r;
1252 1112
1253 r = atoll(*s); 1113 r = atoll(*s);
1254 *s = new_s; 1114 *s = new_s;
1255 return r; 1115 return r;
1256 }
1257
1258 uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) {
1259 return uw_Basis_unurlifyInt(ctx, s);
1260 } 1116 }
1261 1117
1262 uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { 1118 uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) {
1263 char *new_s = uw_unurlify_advance(*s); 1119 char *new_s = uw_unurlify_advance(*s);
1264 uw_Basis_float r; 1120 uw_Basis_float r;
1346 uw_check(ctx, INTS_MAX); 1202 uw_check(ctx, INTS_MAX);
1347 sprintf(ctx->page.front, "%lld%n", n, &len); 1203 sprintf(ctx->page.front, "%lld%n", n, &len);
1348 ctx->page.front += len; 1204 ctx->page.front += len;
1349 1205
1350 return uw_unit_v; 1206 return uw_unit_v;
1351 }
1352
1353 char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) {
1354 return uw_Basis_htmlifyInt(ctx, ch);
1355 } 1207 }
1356 1208
1357 char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { 1209 char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) {
1358 int len; 1210 int len;
1359 char *r; 1211 char *r;
1539 sprintf(r, "%lld::int8%n", n, &len); 1391 sprintf(r, "%lld::int8%n", n, &len);
1540 ctx->heap.front += len+1; 1392 ctx->heap.front += len+1;
1541 return r; 1393 return r;
1542 } 1394 }
1543 1395
1544 char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) {
1545 int len;
1546 char *r;
1547
1548 uw_check_heap(ctx, INTS_MAX + 6);
1549 r = ctx->heap.front;
1550 sprintf(r, "%lld::int4%n", (long long)n, &len);
1551 ctx->heap.front += len+1;
1552 return r;
1553 }
1554
1555 char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) { 1396 char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) {
1556 if (n == NULL) 1397 if (n == NULL)
1557 return "NULL"; 1398 return "NULL";
1558 else 1399 else
1559 return uw_Basis_sqlifyInt(ctx, *n); 1400 return uw_Basis_sqlifyInt(ctx, *n);
1612 strcpy(s2, "'::text"); 1453 strcpy(s2, "'::text");
1613 ctx->heap.front = s2 + 8; 1454 ctx->heap.front = s2 + 8;
1614 return r; 1455 return r;
1615 } 1456 }
1616 1457
1458 char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) {
1459 int len;
1460 char *r;
1461 unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
1462
1463 uw_check_heap(ctx, INTS_MAX + 7);
1464 r = ctx->heap.front;
1465 sprintf(r, "%lld::int8%n", combo, &len);
1466 ctx->heap.front += len+1;
1467 return r;
1468 }
1469
1470 char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) {
1471 int len;
1472 char *r;
1473 unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
1474
1475 uw_check_heap(ctx, INTS_MAX + 1);
1476 r = ctx->heap.front;
1477 sprintf(r, "%lld%n", combo, &len);
1478 ctx->heap.front += len+1;
1479 return r;
1480 }
1481
1482 char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) {
1483 int len;
1484 char *r;
1485
1486 uw_check_heap(ctx, INTS_MAX + 7);
1487 r = ctx->heap.front;
1488 sprintf(r, "%u::int4%n", cli, &len);
1489 ctx->heap.front += len+1;
1490 return r;
1491 }
1492
1493 char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) {
1494 int len;
1495 char *r;
1496
1497 uw_check_heap(ctx, INTS_MAX + 1);
1498 r = ctx->heap.front;
1499 sprintf(r, "%u%n", cli, &len);
1500 ctx->heap.front += len+1;
1501 return r;
1502 }
1503
1617 uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) { 1504 uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) {
1618 if (s == NULL) 1505 if (s == NULL)
1619 return "NULL"; 1506 return "NULL";
1620 else 1507 else
1621 return uw_Basis_sqlifyString(ctx, s); 1508 return uw_Basis_sqlifyString(ctx, s);
1634 else 1521 else
1635 return uw_Basis_sqlifyBool(ctx, *b); 1522 return uw_Basis_sqlifyBool(ctx, *b);
1636 } 1523 }
1637 1524
1638 char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) { 1525 char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) {
1526 size_t len;
1527 char *r, *s;
1528 struct tm stm;
1529
1530 if (localtime_r(&t, &stm)) {
1531 s = uw_malloc(ctx, TIMES_MAX);
1532 len = strftime(s, TIMES_MAX, TIME_FMT, &stm);
1533 r = uw_malloc(ctx, len + 14);
1534 sprintf(r, "'%s'::timestamp", s);
1535 return r;
1536 } else
1537 return "<Invalid time>";
1538 }
1539
1540 char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) {
1639 size_t len; 1541 size_t len;
1640 char *r; 1542 char *r;
1641 struct tm stm; 1543 struct tm stm;
1642 1544
1643 if (localtime_r(&t, &stm)) { 1545 if (localtime_r(&t, &stm)) {
1721 return r; 1623 return r;
1722 } else 1624 } else
1723 return NULL; 1625 return NULL;
1724 } 1626 }
1725 1627
1726 uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) {
1727 char *endptr;
1728 uw_Basis_channel n = strtoll(s, &endptr, 10);
1729
1730 if (*s != '\0' && *endptr == '\0') {
1731 uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel));
1732 *r = n;
1733 return r;
1734 } else
1735 return NULL;
1736 }
1737
1738 uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) { 1628 uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) {
1739 char *endptr; 1629 char *endptr;
1740 uw_Basis_float n = strtod(s, &endptr); 1630 uw_Basis_float n = strtod(s, &endptr);
1741 1631
1742 if (*s != '\0' && *endptr == '\0') { 1632 if (*s != '\0' && *endptr == '\0') {
1800 return n; 1690 return n;
1801 else 1691 else
1802 uw_error(ctx, FATAL, "Can't parse int: %s", s); 1692 uw_error(ctx, FATAL, "Can't parse int: %s", s);
1803 } 1693 }
1804 1694
1695 #include <errno.h>
1696
1805 uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) { 1697 uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) {
1698 unsigned long long n;
1699
1700 if (sscanf(s, "%llu", &n) < 1)
1701 uw_error(ctx, FATAL, "Can't parse channel: %s", s);
1702 else {
1703 uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)};
1704 return ch;
1705 }
1706 }
1707
1708 uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) {
1806 char *endptr; 1709 char *endptr;
1807 uw_Basis_channel n = strtoll(s, &endptr, 10); 1710 unsigned long n = strtoul(s, &endptr, 10);
1808 1711
1809 if (*s != '\0' && *endptr == '\0') 1712 if (*s != '\0' && *endptr == '\0')
1810 return n; 1713 return n;
1811 else 1714 else
1812 uw_error(ctx, FATAL, "Can't parse channel int: %s", s); 1715 uw_error(ctx, FATAL, "Can't parse client: %s", s);
1813 } 1716 }
1814 1717
1815 uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) { 1718 uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) {
1816 char *endptr; 1719 char *endptr;
1817 uw_Basis_float n = strtod(s, &endptr); 1720 uw_Basis_float n = strtod(s, &endptr);
1851 return mktime(&stm); 1754 return mktime(&stm);
1852 else if (strptime(s, TIME_FMT, &stm) == end) 1755 else if (strptime(s, TIME_FMT, &stm) == end)
1853 return mktime(&stm); 1756 return mktime(&stm);
1854 else 1757 else
1855 uw_error(ctx, FATAL, "Can't parse time: %s", s); 1758 uw_error(ctx, FATAL, "Can't parse time: %s", s);
1856 }
1857 }
1858
1859 uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
1860 int len = strlen(h);
1861 char *s = ctx->headers, *p;
1862
1863 while (p = strchr(s, ':')) {
1864 if (p - s == len && !strncasecmp(s, h, len)) {
1865 return p + 2;
1866 } else {
1867 if ((s = strchr(p, 0)) && s < ctx->headers_end)
1868 s += 2;
1869 else
1870 return NULL;
1871 }
1872 } 1759 }
1873 } 1760 }
1874 1761
1875 uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) { 1762 uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) {
1876 int len = strlen(c); 1763 int len = strlen(c);
1928 uw_write_header(ctx, "\r\n"); 1815 uw_write_header(ctx, "\r\n");
1929 1816
1930 return uw_unit_v; 1817 return uw_unit_v;
1931 } 1818 }
1932 1819
1933 static channel_delta *allocate_delta(uw_context ctx, channel *ch) { 1820 static delta *allocate_delta(uw_context ctx, unsigned client) {
1934 size_t i; 1821 unsigned i;
1935 channel_delta *cd; 1822 delta *d;
1936 1823
1937 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i); 1824 for (i = 0; i < ctx->used_deltas; ++i)
1938 1825 if (ctx->deltas[i].client == client)
1939 if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch) 1826 return &ctx->deltas[i];
1940 return &ctx->deltas[i]; 1827
1941 1828 if (ctx->used_deltas >= ctx->n_deltas) {
1942 if (i < ctx->n_deltas) 1829 ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas);
1943 cd = &ctx->deltas[i]; 1830 buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0);
1944 else { 1831 }
1945 ++ctx->n_deltas; 1832
1946 ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas); 1833 d = &ctx->deltas[ctx->used_deltas++];
1947 cd = &ctx->deltas[ctx->n_deltas-1]; 1834 d->client = client;
1948 cd->n_subscribed = 0; 1835 buf_reset(&d->msgs);
1949 cd->subscribed = malloc(0); 1836 return d;
1950 buf_init(&cd->msgs, 0);
1951 }
1952
1953 cd->mode = USED;
1954 cd->newness = OLD;
1955 cd->ch = ch;
1956 if (cd->n_subscribed > 0)
1957 cd->subscribed[0] = NULL;
1958 buf_reset(&cd->msgs);
1959 return cd;
1960 } 1837 }
1961 1838
1962 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { 1839 uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
1963 size_t i; 1840 if (ctx->client == NULL)
1964 channel *ch = uw_new_channel(); 1841 uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection");
1965 ++ch->data.used.refcount; 1842
1966 channel_delta *cd = allocate_delta(ctx, ch); 1843 return new_channel(ctx->client);
1967
1968 cd->newness = NEW;
1969
1970 return ch->id;
1971 }
1972
1973 static int delta_used(channel_delta *cd) {
1974 return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]);
1975 }
1976
1977 uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
1978 channel *ch = uw_find_channel(chn);
1979
1980 if (ch == NULL)
1981 uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
1982 else {
1983 size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
1984 int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
1985 client *c = uw_find_client(id);
1986
1987 if (c == NULL) {
1988 uw_release_channel(ch);
1989 uw_error(ctx, FATAL, "Unknown client ID in subscription request");
1990 } else if (c->data.used.pass != pass) {
1991 uw_release_channel(ch);
1992 uw_release_client(c);
1993 uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass);
1994 } else {
1995 size_t i;
1996 channel_delta *cd = allocate_delta(ctx, ch);
1997
1998 if (delta_used(cd))
1999 uw_release_channel(ch);
2000
2001 for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i);
2002
2003 if (i < cd->n_subscribed)
2004 cd->subscribed[i] = c;
2005 else {
2006 ++cd->n_subscribed;
2007 cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed);
2008 cd->subscribed[cd->n_subscribed-1] = c;
2009 }
2010 }
2011 }
2012
2013 return uw_unit_v;
2014 } 1844 }
2015 1845
2016 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { 1846 uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
2017 channel *ch = uw_find_channel(chn); 1847 delta *d = allocate_delta(ctx, chn.cli);
2018 1848 size_t len;
2019 if (ch == NULL) 1849 int preLen;
2020 uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); 1850 char pre[INTS_MAX + 2];
2021 else { 1851
2022 channel_delta *cd = allocate_delta(ctx, ch); 1852 len = strlen(msg);
2023 if (delta_used(cd)) 1853
2024 uw_release_channel(ch); 1854 sprintf(pre, "%u\n%n", chn.chn, &preLen);
2025 buf_append(&cd->msgs, msg, strlen(msg)); 1855
2026 } 1856 buf_append(&d->msgs, pre, preLen);
1857 buf_append(&d->msgs, msg, len);
1858 buf_append(&d->msgs, "\n", 1);
2027 1859
2028 return uw_unit_v; 1860 return uw_unit_v;
2029 } 1861 }
2030 1862
2031 int uw_db_commit(uw_context); 1863 int uw_db_commit(uw_context);
2032 int uw_db_rollback(uw_context); 1864 int uw_db_rollback(uw_context);
2033 1865
2034 void uw_commit(uw_context ctx) { 1866 void uw_commit(uw_context ctx) {
2035 size_t i, j; 1867 unsigned i;
2036
2037 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
2038 channel *ch = ctx->deltas[i].ch;
2039
2040 for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
2041 client *c = ctx->deltas[i].subscribed[j];
2042
2043 uw_subscribe(ch, c);
2044 uw_release_client(c);
2045 }
2046
2047 if (buf_used(&ctx->deltas[i].msgs) > 0) {
2048 uw_channel_send(ch, ctx->deltas[i].msgs.start);
2049 }
2050
2051 uw_release_channel(ch);
2052 }
2053 1868
2054 if (uw_db_commit(ctx)) 1869 if (uw_db_commit(ctx))
2055 uw_error(ctx, FATAL, "Error running SQL COMMIT"); 1870 uw_error(ctx, FATAL, "Error running SQL COMMIT");
1871
1872 for (i = 0; i < ctx->used_deltas; ++i) {
1873 delta *d = &ctx->deltas[i];
1874 client *c = find_client(d->client);
1875
1876 assert (c != NULL && c->mode == USED);
1877
1878 client_send(c == ctx->client, c, &d->msgs);
1879 }
1880
1881 if (ctx->client)
1882 pthread_mutex_unlock(&ctx->client->lock);
2056 } 1883 }
2057 1884
2058 int uw_rollback(uw_context ctx) { 1885 int uw_rollback(uw_context ctx) {
2059 size_t i, j; 1886 if (ctx->client)
2060 1887 pthread_mutex_unlock(&ctx->client->lock);
2061 for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
2062 channel *ch = ctx->deltas[i].ch;
2063
2064 for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
2065 client *c = ctx->deltas[i].subscribed[j];
2066
2067 uw_release_client(c);
2068 }
2069
2070 if (ctx->deltas[i].newness == NEW)
2071 uw_free_channel(ch);
2072 else
2073 uw_release_channel(ch);
2074 }
2075 1888
2076 return uw_db_rollback(ctx); 1889 return uw_db_rollback(ctx);
2077 } 1890 }