]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
cassandra: Refactor continuing transaction after pending prepared statement is finished
authorTimo Sirainen <timo.sirainen@open-xchange.com>
Thu, 9 Nov 2023 12:44:13 +0000 (14:44 +0200)
committeraki.tuomi <aki.tuomi@open-xchange.com>
Fri, 10 Nov 2023 16:48:25 +0000 (16:48 +0000)
src/lib-sql/driver-cassandra.c

index 2573a209cb820278342887d326892f8b728370fa..39b39cf65c338339258ef17ed76ec3cac064b470 100644 (file)
@@ -223,6 +223,7 @@ struct cassandra_sql_statement {
        cass_int64_t timestamp;
 
        struct cassandra_result *result;
+       struct cassandra_transaction_context *pending_transaction;
 };
 
 struct cassandra_sql_prepared_statement {
@@ -2035,6 +2036,24 @@ driver_cassandra_transaction_begin(struct sql_db *db)
        return &ctx->ctx;
 }
 
+static void
+cassandra_prepared_statement_remove_pending(struct cassandra_sql_statement *stmt)
+{
+       struct cassandra_sql_prepared_statement *prep = stmt->prep;
+       struct cassandra_sql_statement *const *iter_stmt;
+
+       if (prep == NULL)
+               return;
+
+       array_foreach(&prep->pending_statements, iter_stmt) {
+               if (stmt == *iter_stmt) {
+                       array_delete(&prep->pending_statements,
+                               array_foreach_idx(&prep->pending_statements, iter_stmt), 1);
+                       break;
+               }
+       }
+}
+
 static void
 driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
 {
@@ -2045,6 +2064,10 @@ driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
        if (--ctx->refcount > 0)
                return;
 
+       if (ctx->stmt != NULL) {
+               cassandra_prepared_statement_remove_pending(ctx->stmt);
+               pool_unref(&ctx->stmt->stmt.pool);
+       }
        event_unref(&ctx->ctx.event);
        i_free(ctx->error);
        i_free(ctx);
@@ -2085,38 +2108,27 @@ transaction_commit_callback(struct sql_result *result, void *context)
 }
 
 static void
-cassandra_statement_send_query(struct cassandra_sql_statement **_stmt)
+cassandra_statement_send_query(struct cassandra_sql_statement *stmt)
 {
-       struct cassandra_sql_statement *stmt = *_stmt;
-
-       *_stmt = NULL;
        stmt->result->statement = stmt->cass_stmt;
        stmt->result->timestamp = stmt->timestamp;
        (void)cassandra_result_connect_and_send_query(stmt->result);
-       pool_unref(&stmt->stmt.pool);
 }
 
-static void
-driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
-                                   sql_commit_callback_t *callback, void *context)
+static void cassandra_transaction_finish(struct cassandra_transaction_context *ctx,
+                                        const char *error)
 {
-       struct cassandra_transaction_context *ctx =
-               (struct cassandra_transaction_context *)_ctx;
-       struct cassandra_db *db = container_of(_ctx->db, struct cassandra_db, api);
-       struct sql_commit_result result;
-
-       i_zero(&result);
-       ctx->callback = callback;
-       ctx->context = context;
-
-       if (ctx->failed || ctx->stmt == NULL) {
-               if (ctx->failed)
-                       result.error = ctx->error;
+       struct cassandra_db *db =
+               container_of(ctx->ctx.db, struct cassandra_db, api);
 
-               e_debug(sql_transaction_finished_event(_ctx)->
-                       add_str("error", "Rolled back")->event(),
-                       "Transaction rolled back");
-               callback(&result, context);
+       if (error != NULL) {
+               struct sql_commit_result result = {
+                       .error = error,
+               };
+               e_error(sql_transaction_finished_event(&ctx->ctx)->
+                       add_str("error", error)->event(),
+                       "Transaction failed: %s", error);
+               ctx->callback(&result, ctx->context);
                driver_cassandra_transaction_unref(&ctx);
                return;
        }
@@ -2144,7 +2156,40 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
                        return;
                }
        }
-       cassandra_statement_send_query(&ctx->stmt);
+       cassandra_statement_send_query(ctx->stmt);
+}
+
+static void
+driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
+                                   sql_commit_callback_t *callback, void *context)
+{
+       struct cassandra_transaction_context *ctx =
+               (struct cassandra_transaction_context *)_ctx;
+       struct sql_commit_result result;
+
+       i_zero(&result);
+       ctx->callback = callback;
+       ctx->context = context;
+
+       if (ctx->failed || ctx->stmt == NULL) {
+               if (ctx->failed)
+                       result.error = ctx->error;
+
+               e_debug(sql_transaction_finished_event(_ctx)->
+                       add_str("error", "Rolled back")->event(),
+                       "Transaction rolled back");
+               callback(&result, context);
+               driver_cassandra_transaction_unref(&ctx);
+               return;
+       }
+
+       i_assert(ctx->stmt->pending_transaction == NULL);
+       if (ctx->stmt->prep != NULL && ctx->stmt->cass_stmt == NULL) {
+               /* wait for prepare to finish */
+               ctx->stmt->pending_transaction = ctx;
+               return;
+       }
+       cassandra_transaction_finish(ctx, NULL);
 }
 
 static void
@@ -2318,11 +2363,20 @@ static void prepare_finish_statement(struct cassandra_sql_statement *stmt)
        if (stmt->prep->prepared == NULL) {
                i_assert(stmt->prep->error != NULL);
 
-               if (stmt->result != NULL) {
+               if (stmt->pending_transaction != NULL) {
+                       const char *error = t_strdup_printf(
+                               "Failed to prepare statement '%s': %s",
+                               sql_statement_get_log_query(&stmt->stmt),
+                               stmt->prep->error);
+                       cassandra_transaction_finish(stmt->pending_transaction,
+                                                    error);
+               } else if (stmt->result != NULL) {
                        stmt->result->error = i_strdup(stmt->prep->error);
                        result_finish(stmt->result);
+                       pool_unref(&stmt->stmt.pool);
+               } else {
+                       pool_unref(&stmt->stmt.pool);
                }
-               pool_unref(&stmt->stmt.pool);
                return;
        }
        stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);
@@ -2334,8 +2388,12 @@ static void prepare_finish_statement(struct cassandra_sql_statement *stmt)
                array_foreach(&stmt->pending_args, arg)
                        prepare_finish_arg(stmt, arg);
        }
-       if (stmt->result != NULL)
-               cassandra_statement_send_query(&stmt);
+       if (stmt->pending_transaction != NULL)
+               cassandra_transaction_finish(stmt->pending_transaction, NULL);
+       else if (stmt->result != NULL) {
+               cassandra_statement_send_query(stmt);
+               pool_unref(&stmt->stmt.pool);
+       }
 }
 
 static void