]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
cassandra: Support multiple statements in a transaction using batches
authorTimo Sirainen <timo.sirainen@open-xchange.com>
Thu, 9 Nov 2023 13:04:10 +0000 (15:04 +0200)
committeraki.tuomi <aki.tuomi@open-xchange.com>
Fri, 10 Nov 2023 16:48:25 +0000 (16:48 +0000)
src/lib-sql/driver-cassandra.c

index 8bcbfff87cbc9b7f0d4480b9a31b353cbfc3a201..acfba9f55edbb8dc2faba696d73300a83b9018fd 100644 (file)
@@ -78,12 +78,13 @@ static_assert_array_size(cassandra_query_type_names,
 enum cassandra_result_type {
        CASSANDRA_RESULT_TYPE_QUERY,
        CASSANDRA_RESULT_TYPE_PREPARED,
+       CASSANDRA_RESULT_TYPE_BATCH,
 
        CASSANDRA_RESULT_TYPE_COUNT,
 };
 
 static const char *cassandra_result_type_prefixes[] = {
-       "", "prepared ",
+       "", "prepared ", "batch ",
 };
 static_assert_array_size(cassandra_result_type_prefixes,
                         CASSANDRA_RESULT_TYPE_COUNT);
@@ -154,7 +155,10 @@ struct cassandra_result {
        struct sql_result api;
        enum cassandra_result_type type;
 
+       /* either batch or statement is non-NULL */
+       CassBatch *batch;
        CassStatement *statement;
+
        const CassResult *result;
        CassIterator *iterator;
        char *log_query;
@@ -191,6 +195,7 @@ struct cassandra_transaction_context {
        bool begin_succeeded:1;
        bool begin_failed:1;
        bool failed:1;
+       bool query_type_set:1;
 };
 
 enum cassandra_sql_arg_type {
@@ -1254,6 +1259,8 @@ static void driver_cassandra_result_free(struct sql_result *_result)
                cass_result_free(result->result);
        if (result->iterator != NULL)
                cass_iterator_free(result->iterator);
+       if (result->batch != NULL)
+               cass_batch_free(result->batch);
        if (result->statement != NULL)
                cass_statement_free(result->statement);
        pool_unref(&result->row_pool);
@@ -1492,8 +1499,13 @@ static void driver_cassandra_init_statement(struct cassandra_result *result)
        struct cassandra_db *db = container_of(result->api.db, struct cassandra_db, api);
        CassError rc;
 
-       rc = cass_statement_set_consistency(result->statement,
-                                           result->consistency);
+       if (result->batch != NULL) {
+               rc = cass_batch_set_consistency(result->batch,
+                                               result->consistency);
+       } else {
+               rc = cass_statement_set_consistency(result->statement,
+                                                   result->consistency);
+       }
        if (rc != CASS_OK) {
                e_error(db->api.event,
                        "Failed to set consistency %s for query '%s': %s",
@@ -1503,9 +1515,12 @@ static void driver_cassandra_init_statement(struct cassandra_result *result)
 
 
 #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
-       cass_statement_set_is_idempotent(result->statement, cass_true);
+       if (result->batch != NULL)
+               cass_batch_set_is_idempotent(result->batch, cass_true);
+       else
+               cass_statement_set_is_idempotent(result->statement, cass_true);
 #endif
-       if (db->page_size > 0)
+       if (db->page_size > 0 && result->batch == NULL)
                cass_statement_set_paging_size(result->statement, db->page_size);
 }
 
@@ -1514,13 +1529,16 @@ static void driver_cassandra_result_send_query(struct cassandra_result *result)
        struct cassandra_db *db = container_of(result->api.db, struct cassandra_db, api);
        CassFuture *future;
 
-       i_assert(result->statement != NULL);
+       i_assert(result->batch != NULL || result->statement != NULL);
 
        db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
        if (result->query_type != CASSANDRA_QUERY_TYPE_READ_MORE)
                driver_cassandra_init_statement(result);
 
-       future = cass_session_execute(db->session, result->statement);
+       if (result->batch != NULL)
+               future = cass_session_execute_batch(db->session, result->batch);
+       else
+               future = cass_session_execute(db->session, result->statement);
        driver_cassandra_set_callback(future, db, query_callback, result);
 }
 
@@ -1913,6 +1931,8 @@ driver_cassandra_result_more(struct sql_result **_result, bool async,
        struct cassandra_result *old_result =
                (struct cassandra_result *)*_result;
 
+       i_assert(old_result->statement != NULL);
+
        /* Initialize the next page as a new sql_result */
        new_result = driver_cassandra_result_init(db, old_result->log_query,
                                                 CASSANDRA_QUERY_TYPE_READ_MORE,
@@ -2160,11 +2180,13 @@ static void cassandra_transaction_finish(struct cassandra_transaction_context *c
                if (log_query == NULL)
                        log_query = sql_statement_get_log_query(&stmt->stmt);
                else
-                       i_unreached(); /* only single statement is possible now */
+                       log_query = "<batch query>";
        }
 
        enum cassandra_result_type result_type;
-       if (have_nonprepared)
+       if (array_count(&ctx->statements) > 1)
+               result_type = CASSANDRA_RESULT_TYPE_BATCH;
+       else if (have_nonprepared)
                result_type = CASSANDRA_RESULT_TYPE_QUERY;
        else
                result_type = CASSANDRA_RESULT_TYPE_PREPARED;
@@ -2174,6 +2196,18 @@ static void cassandra_transaction_finish(struct cassandra_transaction_context *c
                        ctx->query_type, result_type,
                        transaction_commit_callback, ctx);
        switch (result_type) {
+       case CASSANDRA_RESULT_TYPE_BATCH: {
+               struct cassandra_sql_statement *stmt;
+               cass_result->batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
+               array_foreach_elem(&ctx->statements, stmt) {
+                       cass_batch_add_statement(cass_result->batch,
+                                                stmt->cass_stmt);
+                       cass_statement_free(stmt->cass_stmt);
+                       stmt->cass_stmt = NULL;
+               }
+               (void)cassandra_result_connect_and_send_query(cass_result);
+               break;
+       }
        case CASSANDRA_RESULT_TYPE_QUERY:
        case CASSANDRA_RESULT_TYPE_PREPARED: {
                struct cassandra_sql_statement *stmt =
@@ -2280,12 +2314,14 @@ driver_cassandra_update_query_type(struct cassandra_transaction_context *ctx,
                query_type = CASSANDRA_QUERY_TYPE_DELETE;
        else
                query_type = CASSANDRA_QUERY_TYPE_WRITE;
-       if (array_count(&ctx->statements) > 0) {
-               transaction_set_failed(ctx,
-                       "Multiple changes in transaction not supported");
+       if (ctx->query_type_set && ctx->query_type != query_type) {
+               transaction_set_failed(ctx, t_strdup_printf(
+                       "Mixed types of queries in transaction not supported "
+                       "(%d -> %d)", ctx->query_type, query_type));
                return FALSE;
        }
        ctx->query_type = query_type;
+       ctx->query_type_set = TRUE;
        return TRUE;
 }