]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
pgsql: Don't break when using multiple transactions.
authorTimo Sirainen <tss@iki.fi>
Mon, 26 Jan 2009 00:40:15 +0000 (19:40 -0500)
committerTimo Sirainen <tss@iki.fi>
Mon, 26 Jan 2009 00:40:15 +0000 (19:40 -0500)
--HG--
branch : HEAD

src/lib-sql/driver-mysql.c
src/lib-sql/driver-pgsql.c

index b0ee4adfdf2d3d6334900c67914d7d35380e3b7e..e4bd52ee166aa1c46bae576191a604d4c83c44f3 100644 (file)
@@ -654,10 +654,6 @@ driver_mysql_update(struct sql_transaction_context *_ctx, const char *query)
                (struct mysql_transaction_context *)_ctx;
        struct mysql_query_list *list;
 
-       /* FIXME: with mysql we're just appending everything into one big
-          linked list which gets committed in sql_transaction_commit().
-          we could avoid this if we knew for sure that transactions actually
-          worked, but I don't know how to do that.. */
        list = p_new(ctx->query_pool, struct mysql_query_list, 1);
        list->query = p_strdup(ctx->query_pool, query);
 
index 0360445d4a837cfeb8a4950ba4e3f850924df1c2..bede0f8e198299575c9a13ebf190fd7ad84d35e3 100644 (file)
@@ -75,15 +75,19 @@ struct pgsql_transaction_context {
        sql_commit_callback_t *callback;
        void *context;
 
-       char *first_update;
+       pool_t query_pool;
+       struct pgsql_query_list *head, *tail;
        const char *error;
 
-       unsigned int opened:1;
        unsigned int begin_succeeded:1;
        unsigned int begin_failed:1;
        unsigned int failed:1;
 };
 
+struct pgsql_query_list {
+       struct pgsql_query_list *next;
+       const char *query;
+};
 extern struct sql_db driver_pgsql_db;
 extern struct sql_result driver_pgsql_result;
 
@@ -318,7 +322,9 @@ static void result_finish(struct pgsql_result *result)
                } T_END;
                result->api.callback = FALSE;
                free_result = db->sync_result != &result->api;
-       }
+               if (db->queue == NULL && db->ioloop != NULL)
+                       io_loop_stop(db->ioloop);
+       }                            
 
        if (disconnected) {
                /* disconnected */
@@ -448,6 +454,9 @@ static void queue_abort_next(struct pgsql_db *db)
        i_free(queue->result);
        i_free(queue->query);
        i_free(queue);
+
+       if (db->queue == NULL && db->ioloop != NULL)
+               io_loop_stop(db->ioloop);
 }
 
 static void queue_drop_timed_out_queries(struct pgsql_db *db)
@@ -604,7 +613,6 @@ static void pgsql_query_s_callback(struct sql_result *result, void *context)
 
        db->query_finished = TRUE;
        db->sync_result = result;
-       io_loop_stop(db->ioloop);
 }
 
 static struct sql_result *
@@ -634,7 +642,8 @@ driver_pgsql_query_s(struct sql_db *_db, const char *query)
        }
 
        db->query_finished = FALSE;
-       driver_pgsql_query(_db, query, pgsql_query_s_callback, db);
+       if (query != NULL)
+               driver_pgsql_query(_db, query, pgsql_query_s_callback, db);
 
        if (!db->query_finished) {
                if ((db->connected || db->connecting) && db->io != NULL)
@@ -849,6 +858,9 @@ driver_pgsql_transaction_begin(struct sql_db *db)
        ctx = i_new(struct pgsql_transaction_context, 1);
        ctx->ctx.db = db;
        ctx->refcount = 1;
+       /* we need to be able to handle multiple open transactions, so at least
+          for now just keep them in memory until commit time. */
+       ctx->query_pool = pool_alloconly_create("pgsql transaction", 1024);
        return &ctx->ctx;
 }
 
@@ -859,7 +871,7 @@ driver_pgsql_transaction_unref(struct pgsql_transaction_context *ctx)
        if (--ctx->refcount > 0)
                return;
 
-       i_free(ctx->first_update);
+       pool_unref(&ctx->query_pool);
        i_free(ctx);
 }
 
@@ -873,6 +885,23 @@ transaction_commit_callback(struct sql_result *result, void *context)
                ctx->callback(sql_result_get_error(result), ctx->context);
        else
                ctx->callback(NULL, ctx->context);
+       driver_pgsql_transaction_unref(ctx);
+}
+
+static void
+transaction_update_callback(struct sql_result *result, void *context)
+{
+       struct pgsql_transaction_context *ctx = context;
+
+       if (sql_result_next_row(result) < 0) {
+               if (!ctx->begin_succeeded)
+                       ctx->begin_failed = TRUE;
+               ctx->failed = TRUE;
+               ctx->error = sql_result_get_error(result);
+       } else {
+               ctx->begin_succeeded = TRUE;
+       }
+       driver_pgsql_transaction_unref(ctx);
 }
 
 static void
@@ -882,29 +911,27 @@ driver_pgsql_transaction_commit(struct sql_transaction_context *_ctx,
        struct pgsql_transaction_context *ctx =
                (struct pgsql_transaction_context *)_ctx;
 
-       if (ctx->failed) {
-               callback(ctx->error, context);
-               if (!ctx->begin_failed)
-                       sql_exec(_ctx->db, "ROLLBACK");
-               driver_pgsql_transaction_unref(ctx);
-               return;
-       }
-
        ctx->callback = callback;
        ctx->context = context;
 
-       if (ctx->first_update != NULL) {
-               sql_query(_ctx->db, ctx->first_update,
+       if (ctx->failed || ctx->head == NULL) {
+               callback(ctx->failed ? ctx->error : NULL, context);
+               driver_pgsql_transaction_unref(ctx);
+       } else if (ctx->head->next == NULL) {
+               /* just a single query, send it */
+               sql_query(_ctx->db, ctx->head->query,
                          transaction_commit_callback, ctx);
-               i_free_and_null(ctx->first_update);
-       } else if (ctx->opened) {
-               driver_pgsql_query_full(_ctx->db, "COMMIT",
-                                       transaction_commit_callback, ctx,
-                                       FALSE);
        } else {
-               /* nothing done */
-               ctx->callback(NULL, ctx->context);
-               driver_pgsql_transaction_unref(ctx);
+               /* multiple queries, use a transaction */
+               ctx->refcount++;
+               sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx);
+               while (ctx->head != NULL) {
+                       ctx->refcount++;
+                       sql_query(_ctx->db, ctx->head->query,
+                                 transaction_update_callback, ctx);
+                       ctx->head = ctx->head->next;
+               }
+               sql_query(_ctx->db, "COMMIT", transaction_commit_callback, ctx);
        }
 }
 
@@ -916,31 +943,48 @@ driver_pgsql_transaction_commit_s(struct sql_transaction_context *_ctx,
                (struct pgsql_transaction_context *)_ctx;
        struct sql_result *result;
 
-       if (ctx->failed) {
+       *error_r = NULL;
+
+       if (ctx->failed || ctx->head == NULL) {
+               /* nothing to be done */
+               result = NULL;
+       } else if (ctx->head->next == NULL) {
+               /* just a single query, send it */
+               result = sql_query_s(_ctx->db, ctx->head->query);
+       } else {
+               /* multiple queries, use a transaction */
+               ctx->refcount++;
+               sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx);
+               while (ctx->head != NULL) {
+                       ctx->refcount++;
+                       sql_query(_ctx->db, ctx->head->query,
+                                 transaction_update_callback, ctx);
+                       ctx->head = ctx->head->next;
+               }
+               if (ctx->refcount > 1) {
+                       /* flush the previous queries */
+                       (void)driver_pgsql_query_s(_ctx->db, NULL);
+               }
+
+               if (ctx->begin_failed) {
+                       result = NULL;
+               } else if (ctx->failed) {
+                       result = sql_query_s(_ctx->db, "ROLLBACK");
+               } else {
+                       result = sql_query_s(_ctx->db, "COMMIT");
+               }
+       }
+
+       if (ctx->failed)
                *error_r = ctx->error;
-               if (!ctx->begin_failed)
-                       sql_exec(_ctx->db, "ROLLBACK");
-       } else if (ctx->first_update != NULL) {
-               result = sql_query_s(_ctx->db, ctx->first_update);
+       else if (result != NULL) {
                if (sql_result_next_row(result) < 0)
                        *error_r = sql_result_get_error(result);
-               else
-                       *error_r = NULL;
-               i_free_and_null(ctx->first_update);
-               sql_result_free(result);
-       } else if (!ctx->opened) {
-               *error_r = NULL;
-       } else {
-               result = sql_query_s(_ctx->db, "COMMIT");
-               if (ctx->failed)
-                       *error_r = ctx->error;
-               else if (sql_result_next_row(result) < 0)
-                       *error_r = sql_result_get_error(result);
-               else
-                       *error_r = NULL;
-               sql_result_free(result);
        }
+       if (result != NULL)
+               sql_result_free(result);
 
+       i_assert(ctx->refcount == 1);
        driver_pgsql_transaction_unref(ctx);
        return *error_r == NULL ? 0 : -1;
 }
@@ -951,24 +995,7 @@ driver_pgsql_transaction_rollback(struct sql_transaction_context *_ctx)
        struct pgsql_transaction_context *ctx =
                (struct pgsql_transaction_context *)_ctx;
 
-       if (!ctx->begin_failed)
-               sql_exec(_ctx->db, "ROLLBACK");
-       driver_pgsql_transaction_unref(ctx);
-}
-
-static void
-transaction_update_callback(struct sql_result *result, void *context)
-{
-       struct pgsql_transaction_context *ctx = context;
-
-       if (sql_result_next_row(result) < 0) {
-               if (!ctx->begin_succeeded)
-                       ctx->begin_failed = TRUE;
-               ctx->failed = TRUE;
-               ctx->error = sql_result_get_error(result);
-       } else {
-               ctx->begin_succeeded = TRUE;
-       }
+       i_assert(ctx->refcount == 1);
        driver_pgsql_transaction_unref(ctx);
 }
 
@@ -977,28 +1004,16 @@ driver_pgsql_update(struct sql_transaction_context *_ctx, const char *query)
 {
        struct pgsql_transaction_context *ctx =
                (struct pgsql_transaction_context *)_ctx;
+       struct pgsql_query_list *list;
 
-       if (ctx->failed)
-               return;
-
-       if (!ctx->opened) {
-               if (ctx->first_update == NULL) {
-                       /* delay sending the first update in case there is
-                          only one to be sent and we don't need BEGIN/COMMIT */
-                       ctx->first_update = i_strdup(query);
-                       return;
-               }
-               ctx->opened = TRUE;
-               ctx->refcount += 2;
-               sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx);
-               sql_query(_ctx->db, ctx->first_update,
-                         transaction_update_callback, ctx);
-               i_free_and_null(ctx->first_update);
-       }
+       list = p_new(ctx->query_pool, struct pgsql_query_list, 1);
+       list->query = p_strdup(ctx->query_pool, query);
 
-       ctx->refcount++;
-       driver_pgsql_query_full(_ctx->db, query,
-                               transaction_update_callback, ctx, FALSE);
+       if (ctx->head == NULL)
+               ctx->head = list;
+       else
+               ctx->tail->next = list;
+       ctx->tail = list;
 }
 
 struct sql_db driver_pgsql_db = {