Mercurial > urweb
comparison src/c/urweb.c @ 2033:ea0ecd5fa9df
Add locking to enforce atomicity of message sends from one transaction
author | Adam Chlipala <adam@chlipala.net> |
---|---|
date | Fri, 27 Jun 2014 14:39:31 -0400 |
parents | 6add6d00ef5f |
children | 1d36654c2d21 |
comparison
equal
deleted
inserted
replaced
2032:884673e5f7d5 | 2033:ea0ecd5fa9df |
---|---|
3302 return NULL; | 3302 return NULL; |
3303 | 3303 |
3304 return s; | 3304 return s; |
3305 } | 3305 } |
3306 | 3306 |
3307 static pthread_mutex_t message_send_mutex = PTHREAD_MUTEX_INITIALIZER; | |
3308 | |
3307 int uw_commit(uw_context ctx) { | 3309 int uw_commit(uw_context ctx) { |
3308 int i; | 3310 int i; |
3309 char *sig; | 3311 char *sig; |
3310 | 3312 |
3311 if (uw_has_error(ctx)) { | 3313 if (uw_has_error(ctx)) { |
3321 uw_rollback(ctx, 0); | 3323 uw_rollback(ctx, 0); |
3322 return 0; | 3324 return 0; |
3323 } | 3325 } |
3324 } | 3326 } |
3325 | 3327 |
3328 // Here's an important lock to provide the abstraction that all messages from one transaction are sent as an atomic unit. | |
3329 if (ctx->used_deltas > 0) | |
3330 pthread_mutex_lock(&message_send_mutex); | |
3331 | |
3326 if (ctx->transaction_started) { | 3332 if (ctx->transaction_started) { |
3327 int code = ctx->app->db_commit(ctx); | 3333 int code = ctx->app->db_commit(ctx); |
3328 | 3334 |
3329 if (code) { | 3335 if (code) { |
3336 if (ctx->used_deltas > 0) | |
3337 pthread_mutex_unlock(&message_send_mutex); | |
3338 | |
3330 if (ctx->client) | 3339 if (ctx->client) |
3331 release_client(ctx->client); | 3340 release_client(ctx->client); |
3332 | 3341 |
3333 if (code == -1) { | 3342 if (code == -1) { |
3334 // This case is for a serialization failure, which is not really an "error." | 3343 // This case is for a serialization failure, which is not really an "error." |
3358 for (i = ctx->used_transactionals-1; i >= 0; --i) | 3367 for (i = ctx->used_transactionals-1; i >= 0; --i) |
3359 if (ctx->transactionals[i].rollback == NULL) | 3368 if (ctx->transactionals[i].rollback == NULL) |
3360 if (ctx->transactionals[i].commit) { | 3369 if (ctx->transactionals[i].commit) { |
3361 ctx->transactionals[i].commit(ctx->transactionals[i].data); | 3370 ctx->transactionals[i].commit(ctx->transactionals[i].data); |
3362 if (uw_has_error(ctx)) { | 3371 if (uw_has_error(ctx)) { |
3363 if (ctx->client) | 3372 if (ctx->used_deltas > 0) |
3364 release_client(ctx->client); | 3373 pthread_mutex_unlock(&message_send_mutex); |
3365 | 3374 |
3366 for (i = ctx->used_transactionals-1; i >= 0; --i) | 3375 if (ctx->client) |
3367 if (ctx->transactionals[i].rollback != NULL) | 3376 release_client(ctx->client); |
3368 ctx->transactionals[i].rollback(ctx->transactionals[i].data); | 3377 |
3369 | 3378 for (i = ctx->used_transactionals-1; i >= 0; --i) |
3370 for (i = ctx->used_transactionals-1; i >= 0; --i) | 3379 if (ctx->transactionals[i].rollback != NULL) |
3371 if (ctx->transactionals[i].free) | 3380 ctx->transactionals[i].rollback(ctx->transactionals[i].data); |
3372 ctx->transactionals[i].free(ctx->transactionals[i].data, 0); | 3381 |
3382 for (i = ctx->used_transactionals-1; i >= 0; --i) | |
3383 if (ctx->transactionals[i].free) | |
3384 ctx->transactionals[i].free(ctx->transactionals[i].data, 0); | |
3373 | 3385 |
3374 return 0; | 3386 return 0; |
3375 } | 3387 } |
3376 } | 3388 } |
3377 | 3389 |
3382 assert (c != NULL); | 3394 assert (c != NULL); |
3383 assert(c->mode == USED); | 3395 assert(c->mode == USED); |
3384 | 3396 |
3385 client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script)); | 3397 client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script)); |
3386 } | 3398 } |
3399 | |
3400 if (ctx->used_deltas > 0) | |
3401 pthread_mutex_unlock(&message_send_mutex); | |
3387 | 3402 |
3388 if (ctx->client) | 3403 if (ctx->client) |
3389 release_client(ctx->client); | 3404 release_client(ctx->client); |
3390 | 3405 |
3391 for (i = ctx->used_transactionals-1; i >= 0; --i) | 3406 for (i = ctx->used_transactionals-1; i >= 0; --i) |