comparison src/c/urweb.c @ 2033:ea0ecd5fa9df

Add locking to enforce atomicity of message sends from one transaction
author Adam Chlipala <adam@chlipala.net>
date Fri, 27 Jun 2014 14:39:31 -0400
parents 6add6d00ef5f
children 1d36654c2d21
comparison
equal deleted inserted replaced
2032:884673e5f7d5 2033:ea0ecd5fa9df
3302 return NULL; 3302 return NULL;
3303 3303
3304 return s; 3304 return s;
3305 } 3305 }
3306 3306
3307 static pthread_mutex_t message_send_mutex = PTHREAD_MUTEX_INITIALIZER;
3308
3307 int uw_commit(uw_context ctx) { 3309 int uw_commit(uw_context ctx) {
3308 int i; 3310 int i;
3309 char *sig; 3311 char *sig;
3310 3312
3311 if (uw_has_error(ctx)) { 3313 if (uw_has_error(ctx)) {
3321 uw_rollback(ctx, 0); 3323 uw_rollback(ctx, 0);
3322 return 0; 3324 return 0;
3323 } 3325 }
3324 } 3326 }
3325 3327
3328 // Here's an important lock to provide the abstraction that all messages from one transaction are sent as an atomic unit.
3329 if (ctx->used_deltas > 0)
3330 pthread_mutex_lock(&message_send_mutex);
3331
3326 if (ctx->transaction_started) { 3332 if (ctx->transaction_started) {
3327 int code = ctx->app->db_commit(ctx); 3333 int code = ctx->app->db_commit(ctx);
3328 3334
3329 if (code) { 3335 if (code) {
3336 if (ctx->used_deltas > 0)
3337 pthread_mutex_unlock(&message_send_mutex);
3338
3330 if (ctx->client) 3339 if (ctx->client)
3331 release_client(ctx->client); 3340 release_client(ctx->client);
3332 3341
3333 if (code == -1) { 3342 if (code == -1) {
3334 // This case is for a serialization failure, which is not really an "error." 3343 // This case is for a serialization failure, which is not really an "error."
3358 for (i = ctx->used_transactionals-1; i >= 0; --i) 3367 for (i = ctx->used_transactionals-1; i >= 0; --i)
3359 if (ctx->transactionals[i].rollback == NULL) 3368 if (ctx->transactionals[i].rollback == NULL)
3360 if (ctx->transactionals[i].commit) { 3369 if (ctx->transactionals[i].commit) {
3361 ctx->transactionals[i].commit(ctx->transactionals[i].data); 3370 ctx->transactionals[i].commit(ctx->transactionals[i].data);
3362 if (uw_has_error(ctx)) { 3371 if (uw_has_error(ctx)) {
3363 if (ctx->client) 3372 if (ctx->used_deltas > 0)
3364 release_client(ctx->client); 3373 pthread_mutex_unlock(&message_send_mutex);
3365 3374
3366 for (i = ctx->used_transactionals-1; i >= 0; --i) 3375 if (ctx->client)
3367 if (ctx->transactionals[i].rollback != NULL) 3376 release_client(ctx->client);
3368 ctx->transactionals[i].rollback(ctx->transactionals[i].data); 3377
3369 3378 for (i = ctx->used_transactionals-1; i >= 0; --i)
3370 for (i = ctx->used_transactionals-1; i >= 0; --i) 3379 if (ctx->transactionals[i].rollback != NULL)
3371 if (ctx->transactionals[i].free) 3380 ctx->transactionals[i].rollback(ctx->transactionals[i].data);
3372 ctx->transactionals[i].free(ctx->transactionals[i].data, 0); 3381
3382 for (i = ctx->used_transactionals-1; i >= 0; --i)
3383 if (ctx->transactionals[i].free)
3384 ctx->transactionals[i].free(ctx->transactionals[i].data, 0);
3373 3385
3374 return 0; 3386 return 0;
3375 } 3387 }
3376 } 3388 }
3377 3389
3382 assert (c != NULL); 3394 assert (c != NULL);
3383 assert(c->mode == USED); 3395 assert(c->mode == USED);
3384 3396
3385 client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script)); 3397 client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script));
3386 } 3398 }
3399
3400 if (ctx->used_deltas > 0)
3401 pthread_mutex_unlock(&message_send_mutex);
3387 3402
3388 if (ctx->client) 3403 if (ctx->client)
3389 release_client(ctx->client); 3404 release_client(ctx->client);
3390 3405
3391 for (i = ctx->used_transactionals-1; i >= 0; --i) 3406 for (i = ctx->used_transactionals-1; i >= 0; --i)