From: Timo Sirainen Date: Mon, 26 Jan 2009 00:40:15 +0000 (-0500) Subject: pgsql: Don't break when using multiple transactions. X-Git-Tag: 1.2.beta1~59 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=892b3cbf0eba9ba455448adcf71864a409345c6d;p=thirdparty%2Fdovecot%2Fcore.git pgsql: Don't break when using multiple transactions. --HG-- branch : HEAD --- diff --git a/src/lib-sql/driver-mysql.c b/src/lib-sql/driver-mysql.c index b0ee4adfdf..e4bd52ee16 100644 --- a/src/lib-sql/driver-mysql.c +++ b/src/lib-sql/driver-mysql.c @@ -654,10 +654,6 @@ driver_mysql_update(struct sql_transaction_context *_ctx, const char *query) (struct mysql_transaction_context *)_ctx; struct mysql_query_list *list; - /* FIXME: with mysql we're just appending everything into one big - linked list which gets committed in sql_transaction_commit(). - we could avoid this if we knew for sure that transactions actually - worked, but I don't know how to do that.. */ list = p_new(ctx->query_pool, struct mysql_query_list, 1); list->query = p_strdup(ctx->query_pool, query); diff --git a/src/lib-sql/driver-pgsql.c b/src/lib-sql/driver-pgsql.c index 0360445d4a..bede0f8e19 100644 --- a/src/lib-sql/driver-pgsql.c +++ b/src/lib-sql/driver-pgsql.c @@ -75,15 +75,19 @@ struct pgsql_transaction_context { sql_commit_callback_t *callback; void *context; - char *first_update; + pool_t query_pool; + struct pgsql_query_list *head, *tail; const char *error; - unsigned int opened:1; unsigned int begin_succeeded:1; unsigned int begin_failed:1; unsigned int failed:1; }; +struct pgsql_query_list { + struct pgsql_query_list *next; + const char *query; +}; extern struct sql_db driver_pgsql_db; extern struct sql_result driver_pgsql_result; @@ -318,7 +322,9 @@ static void result_finish(struct pgsql_result *result) } T_END; result->api.callback = FALSE; free_result = db->sync_result != &result->api; - } + if (db->queue == NULL && db->ioloop != NULL) + io_loop_stop(db->ioloop); + } if (disconnected) { /* disconnected */ @@ -448,6 +454,9 @@ static void queue_abort_next(struct pgsql_db *db) i_free(queue->result); i_free(queue->query); i_free(queue); + + if (db->queue == NULL && db->ioloop != NULL) + io_loop_stop(db->ioloop); } static void queue_drop_timed_out_queries(struct pgsql_db *db) @@ -604,7 +613,6 @@ static void pgsql_query_s_callback(struct sql_result *result, void *context) db->query_finished = TRUE; db->sync_result = result; - io_loop_stop(db->ioloop); } static struct sql_result * @@ -634,7 +642,8 @@ driver_pgsql_query_s(struct sql_db *_db, const char *query) } db->query_finished = FALSE; - driver_pgsql_query(_db, query, pgsql_query_s_callback, db); + if (query != NULL) + driver_pgsql_query(_db, query, pgsql_query_s_callback, db); if (!db->query_finished) { if ((db->connected || db->connecting) && db->io != NULL) @@ -849,6 +858,9 @@ driver_pgsql_transaction_begin(struct sql_db *db) ctx = i_new(struct pgsql_transaction_context, 1); ctx->ctx.db = db; ctx->refcount = 1; + /* we need to be able to handle multiple open transactions, so at least + for now just keep them in memory until commit time. */ + ctx->query_pool = pool_alloconly_create("pgsql transaction", 1024); return &ctx->ctx; } @@ -859,7 +871,7 @@ driver_pgsql_transaction_unref(struct pgsql_transaction_context *ctx) if (--ctx->refcount > 0) return; - i_free(ctx->first_update); + pool_unref(&ctx->query_pool); i_free(ctx); } @@ -873,6 +885,23 @@ transaction_commit_callback(struct sql_result *result, void *context) ctx->callback(sql_result_get_error(result), ctx->context); else ctx->callback(NULL, ctx->context); + driver_pgsql_transaction_unref(ctx); +} + +static void +transaction_update_callback(struct sql_result *result, void *context) +{ + struct pgsql_transaction_context *ctx = context; + + if (sql_result_next_row(result) < 0) { + if (!ctx->begin_succeeded) + ctx->begin_failed = TRUE; + ctx->failed = TRUE; + ctx->error = sql_result_get_error(result); + } else { + ctx->begin_succeeded = TRUE; + } + driver_pgsql_transaction_unref(ctx); } static void @@ -882,29 +911,27 @@ driver_pgsql_transaction_commit(struct sql_transaction_context *_ctx, struct pgsql_transaction_context *ctx = (struct pgsql_transaction_context *)_ctx; - if (ctx->failed) { - callback(ctx->error, context); - if (!ctx->begin_failed) - sql_exec(_ctx->db, "ROLLBACK"); - driver_pgsql_transaction_unref(ctx); - return; - } - ctx->callback = callback; ctx->context = context; - if (ctx->first_update != NULL) { - sql_query(_ctx->db, ctx->first_update, + if (ctx->failed || ctx->head == NULL) { + callback(ctx->failed ? ctx->error : NULL, context); + driver_pgsql_transaction_unref(ctx); + } else if (ctx->head->next == NULL) { + /* just a single query, send it */ + sql_query(_ctx->db, ctx->head->query, transaction_commit_callback, ctx); - i_free_and_null(ctx->first_update); - } else if (ctx->opened) { - driver_pgsql_query_full(_ctx->db, "COMMIT", - transaction_commit_callback, ctx, - FALSE); } else { - /* nothing done */ - ctx->callback(NULL, ctx->context); - driver_pgsql_transaction_unref(ctx); + /* multiple queries, use a transaction */ + ctx->refcount++; + sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx); + while (ctx->head != NULL) { + ctx->refcount++; + sql_query(_ctx->db, ctx->head->query, + transaction_update_callback, ctx); + ctx->head = ctx->head->next; + } + sql_query(_ctx->db, "COMMIT", transaction_commit_callback, ctx); } } @@ -916,31 +943,48 @@ driver_pgsql_transaction_commit_s(struct sql_transaction_context *_ctx, (struct pgsql_transaction_context *)_ctx; struct sql_result *result; - if (ctx->failed) { + *error_r = NULL; + + if (ctx->failed || ctx->head == NULL) { + /* nothing to be done */ + result = NULL; + } else if (ctx->head->next == NULL) { + /* just a single query, send it */ + result = sql_query_s(_ctx->db, ctx->head->query); + } else { + /* multiple queries, use a transaction */ + ctx->refcount++; + sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx); + while (ctx->head != NULL) { + ctx->refcount++; + sql_query(_ctx->db, ctx->head->query, + transaction_update_callback, ctx); + ctx->head = ctx->head->next; + } + if (ctx->refcount > 1) { + /* flush the previous queries */ + (void)driver_pgsql_query_s(_ctx->db, NULL); + } + + if (ctx->begin_failed) { + result = NULL; + } else if (ctx->failed) { + result = sql_query_s(_ctx->db, "ROLLBACK"); + } else { + result = sql_query_s(_ctx->db, "COMMIT"); + } + } + + if (ctx->failed) *error_r = ctx->error; - if (!ctx->begin_failed) - sql_exec(_ctx->db, "ROLLBACK"); - } else if (ctx->first_update != NULL) { - result = sql_query_s(_ctx->db, ctx->first_update); + else if (result != NULL) { if (sql_result_next_row(result) < 0) *error_r = sql_result_get_error(result); - else - *error_r = NULL; - i_free_and_null(ctx->first_update); - sql_result_free(result); - } else if (!ctx->opened) { - *error_r = NULL; - } else { - result = sql_query_s(_ctx->db, "COMMIT"); - if (ctx->failed) - *error_r = ctx->error; - else if (sql_result_next_row(result) < 0) - *error_r = sql_result_get_error(result); - else - *error_r = NULL; - sql_result_free(result); } + if (result != NULL) + sql_result_free(result); + i_assert(ctx->refcount == 1); driver_pgsql_transaction_unref(ctx); return *error_r == NULL ? 0 : -1; } @@ -951,24 +995,7 @@ driver_pgsql_transaction_rollback(struct sql_transaction_context *_ctx) struct pgsql_transaction_context *ctx = (struct pgsql_transaction_context *)_ctx; - if (!ctx->begin_failed) - sql_exec(_ctx->db, "ROLLBACK"); - driver_pgsql_transaction_unref(ctx); -} - -static void -transaction_update_callback(struct sql_result *result, void *context) -{ - struct pgsql_transaction_context *ctx = context; - - if (sql_result_next_row(result) < 0) { - if (!ctx->begin_succeeded) - ctx->begin_failed = TRUE; - ctx->failed = TRUE; - ctx->error = sql_result_get_error(result); - } else { - ctx->begin_succeeded = TRUE; - } + i_assert(ctx->refcount == 1); driver_pgsql_transaction_unref(ctx); } @@ -977,28 +1004,16 @@ driver_pgsql_update(struct sql_transaction_context *_ctx, const char *query) { struct pgsql_transaction_context *ctx = (struct pgsql_transaction_context *)_ctx; + struct pgsql_query_list *list; - if (ctx->failed) - return; - - if (!ctx->opened) { - if (ctx->first_update == NULL) { - /* delay sending the first update in case there is - only one to be sent and we don't need BEGIN/COMMIT */ - ctx->first_update = i_strdup(query); - return; - } - ctx->opened = TRUE; - ctx->refcount += 2; - sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx); - sql_query(_ctx->db, ctx->first_update, - transaction_update_callback, ctx); - i_free_and_null(ctx->first_update); - } + list = p_new(ctx->query_pool, struct pgsql_query_list, 1); + list->query = p_strdup(ctx->query_pool, query); - ctx->refcount++; - driver_pgsql_query_full(_ctx->db, query, - transaction_update_callback, ctx, FALSE); + if (ctx->head == NULL) + ctx->head = list; + else + ctx->tail->next = list; + ctx->tail = list; } struct sql_db driver_pgsql_db = {