From: Timo Sirainen Date: Thu, 9 Nov 2023 12:44:13 +0000 (+0200) Subject: cassandra: Refactor continuing transaction after pending prepared statement is finished X-Git-Tag: 2.4.0~2447 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9d91f5ae705894b913eef1883f4f50fa5279a5d9;p=thirdparty%2Fdovecot%2Fcore.git cassandra: Refactor continuing transaction after pending prepared statement is finished --- diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c index 2573a209cb..39b39cf65c 100644 --- a/src/lib-sql/driver-cassandra.c +++ b/src/lib-sql/driver-cassandra.c @@ -223,6 +223,7 @@ struct cassandra_sql_statement { cass_int64_t timestamp; struct cassandra_result *result; + struct cassandra_transaction_context *pending_transaction; }; struct cassandra_sql_prepared_statement { @@ -2035,6 +2036,24 @@ driver_cassandra_transaction_begin(struct sql_db *db) return &ctx->ctx; } +static void +cassandra_prepared_statement_remove_pending(struct cassandra_sql_statement *stmt) +{ + struct cassandra_sql_prepared_statement *prep = stmt->prep; + struct cassandra_sql_statement *const *iter_stmt; + + if (prep == NULL) + return; + + array_foreach(&prep->pending_statements, iter_stmt) { + if (stmt == *iter_stmt) { + array_delete(&prep->pending_statements, + array_foreach_idx(&prep->pending_statements, iter_stmt), 1); + break; + } + } +} + static void driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx) { @@ -2045,6 +2064,10 @@ driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx) if (--ctx->refcount > 0) return; + if (ctx->stmt != NULL) { + cassandra_prepared_statement_remove_pending(ctx->stmt); + pool_unref(&ctx->stmt->stmt.pool); + } event_unref(&ctx->ctx.event); i_free(ctx->error); i_free(ctx); @@ -2085,38 +2108,27 @@ transaction_commit_callback(struct sql_result *result, void *context) } static void -cassandra_statement_send_query(struct cassandra_sql_statement **_stmt) +cassandra_statement_send_query(struct cassandra_sql_statement *stmt) { - struct cassandra_sql_statement *stmt = *_stmt; - - *_stmt = NULL; stmt->result->statement = stmt->cass_stmt; stmt->result->timestamp = stmt->timestamp; (void)cassandra_result_connect_and_send_query(stmt->result); - pool_unref(&stmt->stmt.pool); } -static void -driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx, - sql_commit_callback_t *callback, void *context) +static void cassandra_transaction_finish(struct cassandra_transaction_context *ctx, + const char *error) { - struct cassandra_transaction_context *ctx = - (struct cassandra_transaction_context *)_ctx; - struct cassandra_db *db = container_of(_ctx->db, struct cassandra_db, api); - struct sql_commit_result result; - - i_zero(&result); - ctx->callback = callback; - ctx->context = context; - - if (ctx->failed || ctx->stmt == NULL) { - if (ctx->failed) - result.error = ctx->error; + struct cassandra_db *db = + container_of(ctx->ctx.db, struct cassandra_db, api); - e_debug(sql_transaction_finished_event(_ctx)-> - add_str("error", "Rolled back")->event(), - "Transaction rolled back"); - callback(&result, context); + if (error != NULL) { + struct sql_commit_result result = { + .error = error, + }; + e_error(sql_transaction_finished_event(&ctx->ctx)-> + add_str("error", error)->event(), + "Transaction failed: %s", error); + ctx->callback(&result, ctx->context); driver_cassandra_transaction_unref(&ctx); return; } @@ -2144,7 +2156,40 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx, return; } } - cassandra_statement_send_query(&ctx->stmt); + cassandra_statement_send_query(ctx->stmt); +} + +static void +driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx, + sql_commit_callback_t *callback, void *context) +{ + struct cassandra_transaction_context *ctx = + (struct cassandra_transaction_context *)_ctx; + struct sql_commit_result result; + + i_zero(&result); + ctx->callback = callback; + ctx->context = context; + + if (ctx->failed || ctx->stmt == NULL) { + if (ctx->failed) + result.error = ctx->error; + + e_debug(sql_transaction_finished_event(_ctx)-> + add_str("error", "Rolled back")->event(), + "Transaction rolled back"); + callback(&result, context); + driver_cassandra_transaction_unref(&ctx); + return; + } + + i_assert(ctx->stmt->pending_transaction == NULL); + if (ctx->stmt->prep != NULL && ctx->stmt->cass_stmt == NULL) { + /* wait for prepare to finish */ + ctx->stmt->pending_transaction = ctx; + return; + } + cassandra_transaction_finish(ctx, NULL); } static void @@ -2318,11 +2363,20 @@ static void prepare_finish_statement(struct cassandra_sql_statement *stmt) if (stmt->prep->prepared == NULL) { i_assert(stmt->prep->error != NULL); - if (stmt->result != NULL) { + if (stmt->pending_transaction != NULL) { + const char *error = t_strdup_printf( + "Failed to prepare statement '%s': %s", + sql_statement_get_log_query(&stmt->stmt), + stmt->prep->error); + cassandra_transaction_finish(stmt->pending_transaction, + error); + } else if (stmt->result != NULL) { stmt->result->error = i_strdup(stmt->prep->error); result_finish(stmt->result); + pool_unref(&stmt->stmt.pool); + } else { + pool_unref(&stmt->stmt.pool); } - pool_unref(&stmt->stmt.pool); return; } stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared); @@ -2334,8 +2388,12 @@ static void prepare_finish_statement(struct cassandra_sql_statement *stmt) array_foreach(&stmt->pending_args, arg) prepare_finish_arg(stmt, arg); } - if (stmt->result != NULL) - cassandra_statement_send_query(&stmt); + if (stmt->pending_transaction != NULL) + cassandra_transaction_finish(stmt->pending_transaction, NULL); + else if (stmt->result != NULL) { + cassandra_statement_send_query(stmt); + pool_unref(&stmt->stmt.pool); + } } static void