From: Timo Sirainen Date: Thu, 9 Nov 2023 13:04:10 +0000 (+0200) Subject: cassandra: Support multiple statements in a transaction using batches X-Git-Tag: 2.4.0~2445 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=992985a423f403764c36704c7acdf56e1a5b0675;p=thirdparty%2Fdovecot%2Fcore.git cassandra: Support multiple statements in a transaction using batches --- diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c index 8bcbfff87c..acfba9f55e 100644 --- a/src/lib-sql/driver-cassandra.c +++ b/src/lib-sql/driver-cassandra.c @@ -78,12 +78,13 @@ static_assert_array_size(cassandra_query_type_names, enum cassandra_result_type { CASSANDRA_RESULT_TYPE_QUERY, CASSANDRA_RESULT_TYPE_PREPARED, + CASSANDRA_RESULT_TYPE_BATCH, CASSANDRA_RESULT_TYPE_COUNT, }; static const char *cassandra_result_type_prefixes[] = { - "", "prepared ", + "", "prepared ", "batch ", }; static_assert_array_size(cassandra_result_type_prefixes, CASSANDRA_RESULT_TYPE_COUNT); @@ -154,7 +155,10 @@ struct cassandra_result { struct sql_result api; enum cassandra_result_type type; + /* either batch or statement is non-NULL */ + CassBatch *batch; CassStatement *statement; + const CassResult *result; CassIterator *iterator; char *log_query; @@ -191,6 +195,7 @@ struct cassandra_transaction_context { bool begin_succeeded:1; bool begin_failed:1; bool failed:1; + bool query_type_set:1; }; enum cassandra_sql_arg_type { @@ -1254,6 +1259,8 @@ static void driver_cassandra_result_free(struct sql_result *_result) cass_result_free(result->result); if (result->iterator != NULL) cass_iterator_free(result->iterator); + if (result->batch != NULL) + cass_batch_free(result->batch); if (result->statement != NULL) cass_statement_free(result->statement); pool_unref(&result->row_pool); @@ -1492,8 +1499,13 @@ static void driver_cassandra_init_statement(struct cassandra_result *result) struct cassandra_db *db = container_of(result->api.db, struct cassandra_db, api); CassError rc; - rc = cass_statement_set_consistency(result->statement, - result->consistency); + if (result->batch != NULL) { + rc = cass_batch_set_consistency(result->batch, + result->consistency); + } else { + rc = cass_statement_set_consistency(result->statement, + result->consistency); + } if (rc != CASS_OK) { e_error(db->api.event, "Failed to set consistency %s for query '%s': %s", @@ -1503,9 +1515,12 @@ static void driver_cassandra_init_statement(struct cassandra_result *result) #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY - cass_statement_set_is_idempotent(result->statement, cass_true); + if (result->batch != NULL) + cass_batch_set_is_idempotent(result->batch, cass_true); + else + cass_statement_set_is_idempotent(result->statement, cass_true); #endif - if (db->page_size > 0) + if (db->page_size > 0 && result->batch == NULL) cass_statement_set_paging_size(result->statement, db->page_size); } @@ -1514,13 +1529,16 @@ static void driver_cassandra_result_send_query(struct cassandra_result *result) struct cassandra_db *db = container_of(result->api.db, struct cassandra_db, api); CassFuture *future; - i_assert(result->statement != NULL); + i_assert(result->batch != NULL || result->statement != NULL); db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++; if (result->query_type != CASSANDRA_QUERY_TYPE_READ_MORE) driver_cassandra_init_statement(result); - future = cass_session_execute(db->session, result->statement); + if (result->batch != NULL) + future = cass_session_execute_batch(db->session, result->batch); + else + future = cass_session_execute(db->session, result->statement); driver_cassandra_set_callback(future, db, query_callback, result); } @@ -1913,6 +1931,8 @@ driver_cassandra_result_more(struct sql_result **_result, bool async, struct cassandra_result *old_result = (struct cassandra_result *)*_result; + i_assert(old_result->statement != NULL); + /* Initialize the next page as a new sql_result */ new_result = driver_cassandra_result_init(db, old_result->log_query, CASSANDRA_QUERY_TYPE_READ_MORE, @@ -2160,11 +2180,13 @@ static void cassandra_transaction_finish(struct cassandra_transaction_context *c if (log_query == NULL) log_query = sql_statement_get_log_query(&stmt->stmt); else - i_unreached(); /* only single statement is possible now */ + log_query = ""; } enum cassandra_result_type result_type; - if (have_nonprepared) + if (array_count(&ctx->statements) > 1) + result_type = CASSANDRA_RESULT_TYPE_BATCH; + else if (have_nonprepared) result_type = CASSANDRA_RESULT_TYPE_QUERY; else result_type = CASSANDRA_RESULT_TYPE_PREPARED; @@ -2174,6 +2196,18 @@ static void cassandra_transaction_finish(struct cassandra_transaction_context *c ctx->query_type, result_type, transaction_commit_callback, ctx); switch (result_type) { + case CASSANDRA_RESULT_TYPE_BATCH: { + struct cassandra_sql_statement *stmt; + cass_result->batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED); + array_foreach_elem(&ctx->statements, stmt) { + cass_batch_add_statement(cass_result->batch, + stmt->cass_stmt); + cass_statement_free(stmt->cass_stmt); + stmt->cass_stmt = NULL; + } + (void)cassandra_result_connect_and_send_query(cass_result); + break; + } case CASSANDRA_RESULT_TYPE_QUERY: case CASSANDRA_RESULT_TYPE_PREPARED: { struct cassandra_sql_statement *stmt = @@ -2280,12 +2314,14 @@ driver_cassandra_update_query_type(struct cassandra_transaction_context *ctx, query_type = CASSANDRA_QUERY_TYPE_DELETE; else query_type = CASSANDRA_QUERY_TYPE_WRITE; - if (array_count(&ctx->statements) > 0) { - transaction_set_failed(ctx, - "Multiple changes in transaction not supported"); + if (ctx->query_type_set && ctx->query_type != query_type) { + transaction_set_failed(ctx, t_strdup_printf( + "Mixed types of queries in transaction not supported " + "(%d -> %d)", ctx->query_type, query_type)); return FALSE; } ctx->query_type = query_type; + ctx->query_type_set = TRUE; return TRUE; }