# HG changeset patch # User Adam Chlipala # Date 1238347801 14400 # Node ID 9a2c18dab11d5d071f4fde05e53b9dd0d220421a # Parent 5bbb542243e83556bbc82ba047ad6b1bb3c7bd60 Expunging non-nullable rows diff -r 5bbb542243e8 -r 9a2c18dab11d include/urweb.h --- a/include/urweb.h Sun Mar 29 11:37:29 2009 -0400 +++ b/include/urweb.h Sun Mar 29 13:30:01 2009 -0400 @@ -9,7 +9,7 @@ void uw_global_init(void); void uw_client_connect(unsigned id, int pass, int sock); -void uw_prune_clients(time_t timeout); +void uw_prune_clients(uw_context); uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len); void uw_set_db(uw_context, void*); diff -r 5bbb542243e8 -r 9a2c18dab11d lib/ur/basis.urs --- a/lib/ur/basis.urs Sun Mar 29 11:37:29 2009 -0400 +++ b/lib/ur/basis.urs Sun Mar 29 13:30:01 2009 -0400 @@ -207,15 +207,12 @@ val sql_float : sql_injectable_prim float val sql_string : sql_injectable_prim string val sql_time : sql_injectable_prim time - -class sql_injectable_nullable -val sql_channel : t ::: Type -> sql_injectable_nullable (channel t) -val sql_client : sql_injectable_nullable client +val sql_channel : t ::: Type -> sql_injectable_prim (channel t) +val sql_client : sql_injectable_prim client class sql_injectable val sql_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable t val sql_option_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable (option t) -val sql_nullable : t ::: Type -> sql_injectable_nullable t -> sql_injectable (option t) val sql_inject : tables ::: {{Type}} -> agg ::: {{Type}} -> exps ::: {Type} -> t ::: Type diff -r 5bbb542243e8 -r 9a2c18dab11d lib/ur/top.ur --- a/lib/ur/top.ur Sun Mar 29 11:37:29 2009 -0400 +++ b/lib/ur/top.ur Sun Mar 29 13:30:01 2009 -0400 @@ -200,7 +200,7 @@ functor Broadcast(M : sig type t end) = struct sequence s - table t : {Id : int, Client : option client, Channel : option (channel M.t)} + table t : {Id : int, Client : client, Channel : channel M.t} type topic = int @@ -208,27 +208,17 @@ val create = nextval s - val cleanup = - dml (DELETE FROM t WHERE Client IS NULL) - fun subscribe id = cli <- self; - cleanup; - ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[Some cli]}); + ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[cli]}); case ro of None => ch <- channel; - dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[Some cli]}, {[Some ch]})); + dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[cli]}, {[ch]})); return ch - | Some r => - case r.T.Channel of - None => error Broadcast.subscribe: Got null result - | Some ch => return ch + | Some r => return r.T.Channel fun send id msg = - cleanup; queryI (SELECT t.Channel FROM t WHERE t.Id = {[id]}) - (fn r => case r.T.Channel of - None => error Broadcast.send: Got null result - | Some ch => Basis.send ch msg) + (fn r => Basis.send r.T.Channel msg) end diff -r 5bbb542243e8 -r 9a2c18dab11d src/c/driver.c --- a/src/c/driver.c Sun Mar 29 11:37:29 2009 -0400 +++ b/src/c/driver.c Sun Mar 29 13:30:01 2009 -0400 @@ -278,8 +278,11 @@ } static void *client_pruner(void *data) { + uw_context ctx = uw_init(0, 0, 0, 0); + uw_db_init(ctx); + while (1) { - uw_prune_clients(5); + uw_prune_clients(ctx); sleep(5); } } diff -r 5bbb542243e8 -r 9a2c18dab11d src/c/urweb.c --- a/src/c/urweb.c Sun Mar 29 11:37:29 2009 -0400 +++ b/src/c/urweb.c Sun Mar 29 13:30:01 2009 -0400 @@ -208,7 +208,7 @@ } static void free_client(client *c) { - printf("Freeing client %d\n", c->id); + printf("Freeing client %u\n", c->id); c->mode = UNUSED; c->pass = -1; @@ -217,34 +217,6 @@ clients_free = c; } -extern int uw_timeout; - -void uw_prune_clients() { - client *c, *next, *prev = NULL; - time_t cutoff; - - cutoff = time(NULL) - uw_timeout; - - pthread_mutex_lock(&clients_mutex); - - for (c = clients_used; c; c = next) { - next = c->next; - pthread_mutex_lock(&c->lock); - if (c->last_contact < cutoff) { - if (prev) - prev->next = next; - else - clients_used = next; - free_client(c); - } - else - prev = c; - pthread_mutex_unlock(&c->lock); - } - - pthread_mutex_unlock(&clients_mutex); -} - static uw_Basis_channel new_channel(client *c) { uw_Basis_channel ch = {c->id, c->n_channels++}; return ch; @@ -322,7 +294,7 @@ char error_message[ERROR_BUF_LEN]; }; -extern int uw_inputs_len; +extern int uw_inputs_len, uw_timeout; uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len) { uw_context ctx = malloc(sizeof(struct uw_context)); @@ -1888,3 +1860,53 @@ return uw_db_rollback(ctx); } + + +// "Garbage collection" + +void uw_expunger(uw_context ctx, uw_Basis_client cli); + +static failure_kind uw_expunge(uw_context ctx, uw_Basis_client cli) { + int r = setjmp(ctx->jmp_buf); + + if (r == 0) + uw_expunger(ctx, cli); + + return r; +} + +void uw_prune_clients(uw_context ctx) { + client *c, *next, *prev = NULL; + time_t cutoff; + + cutoff = time(NULL) - uw_timeout; + + pthread_mutex_lock(&clients_mutex); + + for (c = clients_used; c; c = next) { + next = c->next; + pthread_mutex_lock(&c->lock); + if (c->last_contact < cutoff) { + failure_kind fk = UNLIMITED_RETRY; + if (prev) + prev->next = next; + else + clients_used = next; + while (fk == UNLIMITED_RETRY) { + uw_reset(ctx); + fk = uw_expunge(ctx, c->id); + if (fk == SUCCESS) { + free_client(c); + break; + } + } + if (fk != SUCCESS) + printf("Expunge blocked by error: %s\n", uw_error_message(ctx)); + } + else + prev = c; + pthread_mutex_unlock(&c->lock); + } + + pthread_mutex_unlock(&clients_mutex); +} diff -r 5bbb542243e8 -r 9a2c18dab11d src/cjr.sml --- a/src/cjr.sml Sun Mar 29 11:37:29 2009 -0400 +++ b/src/cjr.sml Sun Mar 29 13:30:01 2009 -0400 @@ -106,7 +106,7 @@ | DTable of string * (string * typ) list | DSequence of string - | DDatabase of string + | DDatabase of string * int | DPreparedStatements of (string * int) list | DJavaScript of string diff -r 5bbb542243e8 -r 9a2c18dab11d src/cjr_print.sml --- a/src/cjr_print.sml Sun Mar 29 11:37:29 2009 -0400 +++ b/src/cjr_print.sml Sun Mar 29 13:30:01 2009 -0400 @@ -1937,119 +1937,128 @@ string x, string " */", newline] - | DDatabase s => box [string "static void uw_db_validate(uw_context);", - newline, - string "static void uw_db_prepare(uw_context);", - newline, - newline, - string "void uw_db_init(uw_context ctx) {", - newline, - string "PGconn *conn = PQconnectdb(\"", - string (String.toString s), - string "\");", - newline, - string "if (conn == NULL) uw_error(ctx, BOUNDED_RETRY, ", - string "\"libpq can't allocate a connection.\");", - newline, - string "if (PQstatus(conn) != CONNECTION_OK) {", - newline, - box [string "char msg[1024];", + | DDatabase (s, n) => box [string "static void uw_db_validate(uw_context);", newline, - string "strncpy(msg, PQerrorMessage(conn), 1024);", + string "static void uw_db_prepare(uw_context);", newline, - string "msg[1023] = 0;", newline, - string "PQfinish(conn);", + string "void uw_db_init(uw_context ctx) {", newline, - string "uw_error(ctx, BOUNDED_RETRY, ", - string "\"Connection to Postgres server failed: %s\", msg);"], - newline, - string "}", - newline, - string "uw_set_db(ctx, conn);", - newline, - string "uw_db_validate(ctx);", - newline, - string "uw_db_prepare(ctx);", - newline, - string "}", - newline, - newline, - string "void uw_db_close(uw_context ctx) {", - newline, - string "PQfinish(uw_get_db(ctx));", - newline, - string "}", - newline, - newline, + string "PGconn *conn = PQconnectdb(\"", + string (String.toString s), + string "\");", + newline, + string "if (conn == NULL) uw_error(ctx, BOUNDED_RETRY, ", + string "\"libpq can't allocate a connection.\");", + newline, + string "if (PQstatus(conn) != CONNECTION_OK) {", + newline, + box [string "char msg[1024];", + newline, + string "strncpy(msg, PQerrorMessage(conn), 1024);", + newline, + string "msg[1023] = 0;", + newline, + string "PQfinish(conn);", + newline, + string "uw_error(ctx, BOUNDED_RETRY, ", + string "\"Connection to Postgres server failed: %s\", msg);"], + newline, + string "}", + newline, + string "uw_set_db(ctx, conn);", + newline, + string "uw_db_validate(ctx);", + newline, + string "uw_db_prepare(ctx);", + newline, + string "}", + newline, + newline, + string "void uw_db_close(uw_context ctx) {", + newline, + string "PQfinish(uw_get_db(ctx));", + newline, + string "}", + newline, + newline, - string "int uw_db_begin(uw_context ctx) {", - newline, - string "PGconn *conn = uw_get_db(ctx);", - newline, - string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");", - newline, - newline, - string "if (res == NULL) return 1;", - newline, - newline, - string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", - box [string "PQclear(res);", + string "int uw_db_begin(uw_context ctx) {", newline, - string "return 1;", - newline], - string "}", - newline, - string "return 0;", - newline, - string "}", - newline, - newline, + string "PGconn *conn = uw_get_db(ctx);", + newline, + string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");", + newline, + newline, + string "if (res == NULL) return 1;", + newline, + newline, + string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", + box [string "PQclear(res);", + newline, + string "return 1;", + newline], + string "}", + newline, + string "return 0;", + newline, + string "}", + newline, + newline, - string "int uw_db_commit(uw_context ctx) {", - newline, - string "PGconn *conn = uw_get_db(ctx);", - newline, - string "PGresult *res = PQexec(conn, \"COMMIT\");", - newline, - newline, - string "if (res == NULL) return 1;", - newline, - newline, - string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", - box [string "PQclear(res);", + string "int uw_db_commit(uw_context ctx) {", newline, - string "return 1;", - newline], - string "}", - newline, - string "return 0;", - newline, - string "}", - newline, - newline, + string "PGconn *conn = uw_get_db(ctx);", + newline, + string "PGresult *res = PQexec(conn, \"COMMIT\");", + newline, + newline, + string "if (res == NULL) return 1;", + newline, + newline, + string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", + box [string "PQclear(res);", + newline, + string "return 1;", + newline], + string "}", + newline, + string "return 0;", + newline, + string "}", + newline, + newline, - string "int uw_db_rollback(uw_context ctx) {", - newline, - string "PGconn *conn = uw_get_db(ctx);", - newline, - string "PGresult *res = PQexec(conn, \"ROLLBACK\");", - newline, - newline, - string "if (res == NULL) return 1;", - newline, - newline, - string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", - box [string "PQclear(res);", + string "int uw_db_rollback(uw_context ctx) {", newline, - string "return 1;", - newline], - string "}", - newline, - string "return 0;", - newline, - string "}", - newline] + string "PGconn *conn = uw_get_db(ctx);", + newline, + string "PGresult *res = PQexec(conn, \"ROLLBACK\");", + newline, + newline, + string "if (res == NULL) return 1;", + newline, + newline, + string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", + box [string "PQclear(res);", + newline, + string "return 1;", + newline], + string "}", + newline, + string "return 0;", + newline, + string "}", + newline, + newline, + + string "void uw_expunger(uw_context ctx, uw_Basis_client cli) {", + newline, + box [p_enamed env n, + string "(ctx, cli);", + newline], + string "}", + newline] | DPreparedStatements [] => box [string "static void uw_db_prepare(uw_context ctx) {", diff -r 5bbb542243e8 -r 9a2c18dab11d src/mono.sml --- a/src/mono.sml Sun Mar 29 11:37:29 2009 -0400 +++ b/src/mono.sml Sun Mar 29 13:30:01 2009 -0400 @@ -122,7 +122,7 @@ | DTable of string * (string * typ) list | DSequence of string - | DDatabase of string + | DDatabase of string * int | DJavaScript of string diff -r 5bbb542243e8 -r 9a2c18dab11d src/mono_print.sml --- a/src/mono_print.sml Sun Mar 29 11:37:29 2009 -0400 +++ b/src/mono_print.sml Sun Mar 29 13:30:01 2009 -0400 @@ -413,9 +413,13 @@ | DSequence s => box [string "(* SQL sequence ", string s, string "*)"] - | DDatabase s => box [string "database", - space, - string s] + | DDatabase (s, n) => box [string "database", + space, + string s, + space, + string "(", + p_enamed env n, + string ")"] | DJavaScript s => box [string "JavaScript(", string s, string ")"] diff -r 5bbb542243e8 -r 9a2c18dab11d src/mono_shake.sml --- a/src/mono_shake.sml Sun Mar 29 11:37:29 2009 -0400 +++ b/src/mono_shake.sml Sun Mar 29 13:30:01 2009 -0400 @@ -45,6 +45,7 @@ let val page_es = List.foldl (fn ((DExport (_, _, n, _, _), _), page_es) => n :: page_es + | ((DDatabase (_, n), _), page_es) => n :: page_es | (_, page_es) => page_es) [] file val (cdef, edef) = foldl (fn ((DDatatype (_, n, xncs), _), (cdef, edef)) => diff -r 5bbb542243e8 -r 9a2c18dab11d src/monoize.sml --- a/src/monoize.sml Sun Mar 29 11:37:29 2009 -0400 +++ b/src/monoize.sml Sun Mar 29 13:30:01 2009 -0400 @@ -165,8 +165,6 @@ | L.CApp ((L.CFfi ("Basis", "sql_injectable_prim"), _), t) => (L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc) - | L.CApp ((L.CFfi ("Basis", "sql_injectable_nullable"), _), t) => - (L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc) | L.CApp ((L.CFfi ("Basis", "sql_injectable"), _), t) => (L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc) | L.CApp ((L.CApp ((L.CFfi ("Basis", "sql_unary"), _), _), _), _) => @@ -248,6 +246,8 @@ val lookup : t -> foo_kind -> int -> (int -> t -> L'.decl * t) -> t * int val enter : t -> t val decls : t -> L'.decl list + + val freshName : t -> int * t end = struct structure M = BinaryMapFn(struct @@ -274,6 +274,7 @@ } fun enter ({count, map, ...} : t) = {count = count, map = map, decls = []} +fun freshName {count, map, decls} = (count, {count = count + 1, map = map, decls = decls}) fun decls ({decls, ...} : t) = decls fun lookup (t as {count, map, decls}) k n thunk = @@ -1455,26 +1456,6 @@ result = s}), loc)), loc)), loc), fm) end - | L.ECApp ((L.EFfi ("Basis", "sql_nullable"), _), t) => - let - val t = monoType env t - val s = (L'.TFfi ("Basis", "string"), loc) - in - ((L'.EAbs ("f", - (L'.TFun (t, s), loc), - (L'.TFun ((L'.TOption t, loc), s), loc), - (L'.EAbs ("x", - (L'.TOption t, loc), - s, - (L'.ECase ((L'.ERel 0, loc), - [((L'.PNone t, loc), - (L'.EPrim (Prim.String "NULL"), loc)), - ((L'.PSome (t, (L'.PVar ("y", t), loc)), loc), - (L'.EApp ((L'.ERel 2, loc), (L'.ERel 0, loc)), loc))], - {disc = (L'.TOption t, loc), - result = s}), loc)), loc)), loc), - fm) - end | L.ECApp ((L.EFfi ("Basis", "sql_subset"), _), _) => ((L'.ERecord [], loc), fm) @@ -2464,7 +2445,7 @@ [(L'.DSequence s, loc), (L'.DVal (x, n, t', e, s), loc)]) end - | L.DDatabase s => SOME (env, fm, [(L'.DDatabase s, loc)]) + | L.DDatabase _ => NONE | L.DCookie (x, n, t, s) => let val t = (L.CFfi ("Basis", "string"), loc) @@ -2477,7 +2458,9 @@ end end -fun monoize env ds = +datatype expungable = Client | Channel + +fun monoize env file = let val p = !urlPrefix val () = @@ -2488,14 +2471,100 @@ else () + val loc = E.dummySpan + val client = (L'.TFfi ("Basis", "client"), loc) + val unit = (L'.TRecord [], loc) + fun expunger () = + let + val target = (L'.EFfiApp ("Basis", "sqlifyClient", [(L'.ERel 0, loc)]), loc) + + fun doTable (tab, xts, e) = + case xts of + L.CRecord (_, xts) => + let + val (nullable, notNullable) = + foldl (fn ((x, t), st as (nullable, notNullable)) => + case #1 x of + L.CName x => + (case #1 t of + L.CFfi ("Basis", "client") => + (nullable, (x, Client) :: notNullable) + | L.CApp ((L.CFfi ("Basis", "option"), _), + (L.CFfi ("Basis", "client"), _)) => + ((x, Client) :: nullable, notNullable) + | L.CApp ((L.CFfi ("Basis", "channel"), _), _) => + (nullable, (x, Channel) :: notNullable) + | L.CApp ((L.CFfi ("Basis", "option"), _), + (L.CApp ((L.CFfi ("Basis", "channel"), _), _), _)) => + ((x, Channel) :: nullable, notNullable) + | _ => st) + | _ => st) ([], []) xts + + val e = + case notNullable of + [] => e + | eb :: ebs => + let + fun cond (x, v) = + (L'.EStrcat ((L'.EPrim (Prim.String ("uw_" ^ x + ^ (case v of + Client => "" + | Channel => " >> 32") + ^ " = ")), loc), + target), loc) + in + (L'.ESeq ( + (L'.EDml (foldl + (fn (eb, s) => + (L'.EStrcat (s, + (L'.EStrcat ((L'.EPrim (Prim.String " AND "), + loc), + cond eb), loc)), loc)) + (L'.EStrcat ((L'.EPrim (Prim.String ("DELETE FROM uw_" + ^ tab + ^ " WHERE ")), loc), + cond eb), loc) + ebs), loc), + e), loc) + end + in + e + end + | _ => e + + val e = (L'.ERecord [], loc) + in + foldl (fn ((d, _), e) => + case d of + L.DTable (_, _, xts, tab) => doTable (tab, #1 xts, e) + | _ => e) e file + end + val (_, _, ds) = List.foldl (fn (d, (env, fm, ds)) => - case monoDecl (env, fm) d of - NONE => (env, fm, ds) - | SOME (env, fm, ds') => - (env, - Fm.enter fm, - ds' @ Fm.decls fm @ ds)) - (env, Fm.empty (CoreUtil.File.maxName ds + 1), []) ds + case #1 d of + L.DDatabase s => + let + val (n, fm) = Fm.freshName fm + + + val d = L'.DVal ("expunger", + n, + (L'.TFun (client, unit), loc), + (L'.EAbs ("cli", client, unit, expunger ()), loc), + "expunger") + in + (env, Fm.enter fm, (L'.DDatabase (s, n), loc) + :: (d, loc) + :: ds) + end + | _ => + case monoDecl (env, fm) d of + NONE => (env, fm, ds) + | SOME (env, fm, ds') => + (env, + Fm.enter fm, + ds' @ Fm.decls fm @ ds)) + (env, Fm.empty (CoreUtil.File.maxName file + 1), []) file in rev ds end