From: Timo Sirainen Date: Thu, 9 Nov 2023 20:24:16 +0000 (+0200) Subject: cassandra: Prepare support for multiple statements in transaction X-Git-Tag: 2.4.0~2446 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3eb765c5c62b3d74035ca33111a17bd294d7d8a2;p=thirdparty%2Fdovecot%2Fcore.git cassandra: Prepare support for multiple statements in transaction Prepare the code for an array of statements, but don't actually support more than one yet. --- diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c index 39b39cf65c..8bcbfff87c 100644 --- a/src/lib-sql/driver-cassandra.c +++ b/src/lib-sql/driver-cassandra.c @@ -184,7 +184,7 @@ struct cassandra_transaction_context { sql_commit_callback_t *callback; void *context; - struct cassandra_sql_statement *stmt; + ARRAY(struct cassandra_sql_statement *) statements; enum cassandra_query_type query_type; char *error; @@ -2033,6 +2033,7 @@ driver_cassandra_transaction_begin(struct sql_db *db) ctx->ctx.db = db; ctx->ctx.event = event_create(db->event); ctx->refcount = 1; + i_array_init(&ctx->statements, 8); return &ctx->ctx; } @@ -2058,16 +2059,18 @@ static void driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx) { struct cassandra_transaction_context *ctx = *_ctx; + struct cassandra_sql_statement *stmt; *_ctx = NULL; i_assert(ctx->refcount > 0); if (--ctx->refcount > 0) return; - if (ctx->stmt != NULL) { - cassandra_prepared_statement_remove_pending(ctx->stmt); - pool_unref(&ctx->stmt->stmt.pool); + array_foreach_elem(&ctx->statements, stmt) { + cassandra_prepared_statement_remove_pending(stmt); + pool_unref(&stmt->stmt.pool); } + array_free(&ctx->statements); event_unref(&ctx->ctx.event); i_free(ctx->error); i_free(ctx); @@ -2133,30 +2136,55 @@ static void cassandra_transaction_finish(struct cassandra_transaction_context *c return; } - /* just a single query, send it */ - enum cassandra_result_type result_type = - ctx->stmt->prep != NULL ? CASSANDRA_RESULT_TYPE_PREPARED : - CASSANDRA_RESULT_TYPE_QUERY; + struct cassandra_sql_statement *stmt; + bool have_nonprepared = FALSE; + const char *log_query = NULL; + array_foreach_elem(&ctx->statements, stmt) { + if (stmt->prep != NULL && stmt->cass_stmt == NULL) { + /* wait for prepare to finish in + prepare_finish_statement() */ + stmt->pending_transaction = ctx; + return; + } + + if (stmt->prep == NULL) + have_nonprepared = TRUE; + if (stmt->cass_stmt == NULL) { + stmt->cass_stmt = cass_statement_new( + sql_statement_get_query(&stmt->stmt), 0); + if (stmt->timestamp != 0) { + cass_statement_set_timestamp(stmt->cass_stmt, + stmt->timestamp); + } + } + if (log_query == NULL) + log_query = sql_statement_get_log_query(&stmt->stmt); + else + i_unreached(); /* only single statement is possible now */ + } + + enum cassandra_result_type result_type; + if (have_nonprepared) + result_type = CASSANDRA_RESULT_TYPE_QUERY; + else + result_type = CASSANDRA_RESULT_TYPE_PREPARED; + struct cassandra_result *cass_result = - driver_cassandra_result_init(db, - sql_statement_get_log_query(&ctx->stmt->stmt), + driver_cassandra_result_init(db, log_query, ctx->query_type, result_type, transaction_commit_callback, ctx); - if (ctx->stmt->prep == NULL) { - ctx->stmt->cass_stmt = - cass_statement_new(sql_statement_get_query(&ctx->stmt->stmt), 0); - if (ctx->stmt->timestamp != 0) { - cass_statement_set_timestamp(ctx->stmt->cass_stmt, - ctx->stmt->timestamp); - } - } else { - ctx->stmt->result = cass_result; - if (ctx->stmt->cass_stmt == NULL) { - /* wait for prepare to finish */ - return; - } + switch (result_type) { + case CASSANDRA_RESULT_TYPE_QUERY: + case CASSANDRA_RESULT_TYPE_PREPARED: { + struct cassandra_sql_statement *stmt = + array_idx_elem(&ctx->statements, 0); + stmt->result = cass_result; + cassandra_statement_send_query(stmt); + break; + } + case CASSANDRA_RESULT_TYPE_COUNT: + i_unreached(); } - cassandra_statement_send_query(ctx->stmt); } static void @@ -2171,7 +2199,7 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx, ctx->callback = callback; ctx->context = context; - if (ctx->failed || ctx->stmt == NULL) { + if (ctx->failed || array_count(&ctx->statements) == 0) { if (ctx->failed) result.error = ctx->error; @@ -2183,12 +2211,6 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx, return; } - i_assert(ctx->stmt->pending_transaction == NULL); - if (ctx->stmt->prep != NULL && ctx->stmt->cass_stmt == NULL) { - /* wait for prepare to finish */ - ctx->stmt->pending_transaction = ctx; - return; - } cassandra_transaction_finish(ctx, NULL); } @@ -2199,10 +2221,20 @@ driver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx) struct cassandra_db *db = container_of(_ctx->db, struct cassandra_db, api); struct sql_result *result = NULL; + /* simplify the code for now: these aren't currently needed */ + if (array_count(&ctx->statements) != 1) + i_panic("cassandra: sql_transaction_commit_s() not supported for multiple statements"); + struct cassandra_sql_statement *stmt = + array_idx_elem(&ctx->statements, 0); + if (stmt->prep != NULL) { + /* nothing should be using this - don't bother implementing */ + i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements"); + } + /* just a single query, send it */ driver_cassandra_sync_init(db); result = driver_cassandra_sync_query(db, - sql_statement_get_query(&ctx->stmt->stmt), + sql_statement_get_query(&stmt->stmt), ctx->query_type); driver_cassandra_sync_deinit(db); @@ -2218,12 +2250,7 @@ driver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx, struct cassandra_transaction_context *ctx = (struct cassandra_transaction_context *)_ctx; - if (ctx->stmt != NULL && ctx->stmt->prep != NULL) { - /* nothing should be using this - don't bother implementing */ - i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements"); - } - - if (ctx->stmt != NULL && !ctx->failed) + if (array_count(&ctx->statements) > 0 && !ctx->failed) driver_cassandra_try_commit_s(ctx); *error_r = t_strdup(ctx->error); @@ -2243,6 +2270,25 @@ driver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx) driver_cassandra_transaction_unref(&ctx); } +static bool +driver_cassandra_update_query_type(struct cassandra_transaction_context *ctx, + const char *query) +{ + enum cassandra_query_type query_type; + + if (str_begins_icase_with(query, "DELETE ")) + query_type = CASSANDRA_QUERY_TYPE_DELETE; + else + query_type = CASSANDRA_QUERY_TYPE_WRITE; + if (array_count(&ctx->statements) > 0) { + transaction_set_failed(ctx, + "Multiple changes in transaction not supported"); + return FALSE; + } + ctx->query_type = query_type; + return TRUE; +} + static void driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query, unsigned int *affected_rows) @@ -2252,18 +2298,13 @@ driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query, i_assert(affected_rows == NULL); - if (ctx->stmt != NULL) { - transaction_set_failed(ctx, "Multiple changes in transaction not supported"); + if (!driver_cassandra_update_query_type(ctx, query)) return; - } - - if (str_begins_icase_with(query, "DELETE ")) - ctx->query_type = CASSANDRA_QUERY_TYPE_DELETE; - else - ctx->query_type = CASSANDRA_QUERY_TYPE_WRITE; struct sql_statement *_stmt = sql_statement_init(_ctx->db, query); - ctx->stmt = container_of(_stmt, struct cassandra_sql_statement, stmt); + struct cassandra_sql_statement *stmt = + container_of(_stmt, struct cassandra_sql_statement, stmt); + array_push_back(&ctx->statements, &stmt); } static const char * @@ -2717,18 +2758,11 @@ driver_cassandra_update_stmt(struct sql_transaction_context *_ctx, i_assert(affected_rows == NULL); - if (ctx->stmt != NULL) { - transaction_set_failed(ctx, - "Multiple changes in transaction not supported"); + if (!driver_cassandra_update_query_type(ctx, + sql_statement_get_query(_stmt))) return; - } - const char *query = sql_statement_get_query(_stmt); - if (str_begins_icase_with(query, "DELETE ")) - ctx->query_type = CASSANDRA_QUERY_TYPE_DELETE; - else - ctx->query_type = CASSANDRA_QUERY_TYPE_WRITE; - ctx->stmt = stmt; + array_push_back(&ctx->statements, &stmt); } static bool driver_cassandra_have_work(struct cassandra_db *db)