]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
cassandra: Prepare support for multiple statements in transaction
authorTimo Sirainen <timo.sirainen@open-xchange.com>
Thu, 9 Nov 2023 20:24:16 +0000 (22:24 +0200)
committeraki.tuomi <aki.tuomi@open-xchange.com>
Fri, 10 Nov 2023 16:48:25 +0000 (16:48 +0000)
Prepare the code for an array of statements, but don't actually support more
than one yet.

src/lib-sql/driver-cassandra.c

index 39b39cf65c338339258ef17ed76ec3cac064b470..8bcbfff87cbc9b7f0d4480b9a31b353cbfc3a201 100644 (file)
@@ -184,7 +184,7 @@ struct cassandra_transaction_context {
        sql_commit_callback_t *callback;
        void *context;
 
-       struct cassandra_sql_statement *stmt;
+       ARRAY(struct cassandra_sql_statement *) statements;
        enum cassandra_query_type query_type;
        char *error;
 
@@ -2033,6 +2033,7 @@ driver_cassandra_transaction_begin(struct sql_db *db)
        ctx->ctx.db = db;
        ctx->ctx.event = event_create(db->event);
        ctx->refcount = 1;
+       i_array_init(&ctx->statements, 8);
        return &ctx->ctx;
 }
 
@@ -2058,16 +2059,18 @@ static void
 driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
 {
        struct cassandra_transaction_context *ctx = *_ctx;
+       struct cassandra_sql_statement *stmt;
 
        *_ctx = NULL;
        i_assert(ctx->refcount > 0);
        if (--ctx->refcount > 0)
                return;
 
-       if (ctx->stmt != NULL) {
-               cassandra_prepared_statement_remove_pending(ctx->stmt);
-               pool_unref(&ctx->stmt->stmt.pool);
+       array_foreach_elem(&ctx->statements, stmt) {
+               cassandra_prepared_statement_remove_pending(stmt);
+               pool_unref(&stmt->stmt.pool);
        }
+       array_free(&ctx->statements);
        event_unref(&ctx->ctx.event);
        i_free(ctx->error);
        i_free(ctx);
@@ -2133,30 +2136,55 @@ static void cassandra_transaction_finish(struct cassandra_transaction_context *c
                return;
        }
 
-       /* just a single query, send it */
-       enum cassandra_result_type result_type =
-               ctx->stmt->prep != NULL ? CASSANDRA_RESULT_TYPE_PREPARED :
-               CASSANDRA_RESULT_TYPE_QUERY;
+       struct cassandra_sql_statement *stmt;
+       bool have_nonprepared = FALSE;
+       const char *log_query = NULL;
+       array_foreach_elem(&ctx->statements, stmt) {
+               if (stmt->prep != NULL && stmt->cass_stmt == NULL) {
+                       /* wait for prepare to finish in
+                          prepare_finish_statement() */
+                       stmt->pending_transaction = ctx;
+                       return;
+               }
+
+               if (stmt->prep == NULL)
+                       have_nonprepared = TRUE;
+               if (stmt->cass_stmt == NULL) {
+                       stmt->cass_stmt = cass_statement_new(
+                               sql_statement_get_query(&stmt->stmt), 0);
+                       if (stmt->timestamp != 0) {
+                               cass_statement_set_timestamp(stmt->cass_stmt,
+                                                            stmt->timestamp);
+                       }
+               }
+               if (log_query == NULL)
+                       log_query = sql_statement_get_log_query(&stmt->stmt);
+               else
+                       i_unreached(); /* only single statement is possible now */
+       }
+
+       enum cassandra_result_type result_type;
+       if (have_nonprepared)
+               result_type = CASSANDRA_RESULT_TYPE_QUERY;
+       else
+               result_type = CASSANDRA_RESULT_TYPE_PREPARED;
+
        struct cassandra_result *cass_result =
-               driver_cassandra_result_init(db,
-                       sql_statement_get_log_query(&ctx->stmt->stmt),
+               driver_cassandra_result_init(db, log_query,
                        ctx->query_type, result_type,
                        transaction_commit_callback, ctx);
-       if (ctx->stmt->prep == NULL) {
-               ctx->stmt->cass_stmt =
-                       cass_statement_new(sql_statement_get_query(&ctx->stmt->stmt), 0);
-               if (ctx->stmt->timestamp != 0) {
-                       cass_statement_set_timestamp(ctx->stmt->cass_stmt,
-                               ctx->stmt->timestamp);
-               }
-       } else {
-               ctx->stmt->result = cass_result;
-               if (ctx->stmt->cass_stmt == NULL) {
-                       /* wait for prepare to finish */
-                       return;
-               }
+       switch (result_type) {
+       case CASSANDRA_RESULT_TYPE_QUERY:
+       case CASSANDRA_RESULT_TYPE_PREPARED: {
+               struct cassandra_sql_statement *stmt =
+                       array_idx_elem(&ctx->statements, 0);
+               stmt->result = cass_result;
+               cassandra_statement_send_query(stmt);
+               break;
+       }
+       case CASSANDRA_RESULT_TYPE_COUNT:
+               i_unreached();
        }
-       cassandra_statement_send_query(ctx->stmt);
 }
 
 static void
@@ -2171,7 +2199,7 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
        ctx->callback = callback;
        ctx->context = context;
 
-       if (ctx->failed || ctx->stmt == NULL) {
+       if (ctx->failed || array_count(&ctx->statements) == 0) {
                if (ctx->failed)
                        result.error = ctx->error;
 
@@ -2183,12 +2211,6 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
                return;
        }
 
-       i_assert(ctx->stmt->pending_transaction == NULL);
-       if (ctx->stmt->prep != NULL && ctx->stmt->cass_stmt == NULL) {
-               /* wait for prepare to finish */
-               ctx->stmt->pending_transaction = ctx;
-               return;
-       }
        cassandra_transaction_finish(ctx, NULL);
 }
 
@@ -2199,10 +2221,20 @@ driver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
        struct cassandra_db *db = container_of(_ctx->db, struct cassandra_db, api);
        struct sql_result *result = NULL;
 
+       /* simplify the code for now: these aren't currently needed */
+       if (array_count(&ctx->statements) != 1)
+               i_panic("cassandra: sql_transaction_commit_s() not supported for multiple statements");
+       struct cassandra_sql_statement *stmt =
+               array_idx_elem(&ctx->statements, 0);
+       if (stmt->prep != NULL) {
+               /* nothing should be using this - don't bother implementing */
+               i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
+       }
+
        /* just a single query, send it */
        driver_cassandra_sync_init(db);
        result = driver_cassandra_sync_query(db,
-                       sql_statement_get_query(&ctx->stmt->stmt),
+                       sql_statement_get_query(&stmt->stmt),
                        ctx->query_type);
        driver_cassandra_sync_deinit(db);
 
@@ -2218,12 +2250,7 @@ driver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
        struct cassandra_transaction_context *ctx =
                (struct cassandra_transaction_context *)_ctx;
 
-       if (ctx->stmt != NULL && ctx->stmt->prep != NULL) {
-               /* nothing should be using this - don't bother implementing */
-               i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
-       }
-
-       if (ctx->stmt != NULL && !ctx->failed)
+       if (array_count(&ctx->statements) > 0 && !ctx->failed)
                driver_cassandra_try_commit_s(ctx);
        *error_r = t_strdup(ctx->error);
 
@@ -2243,6 +2270,25 @@ driver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
        driver_cassandra_transaction_unref(&ctx);
 }
 
+static bool
+driver_cassandra_update_query_type(struct cassandra_transaction_context *ctx,
+                                  const char *query)
+{
+       enum cassandra_query_type query_type;
+
+       if (str_begins_icase_with(query, "DELETE "))
+               query_type = CASSANDRA_QUERY_TYPE_DELETE;
+       else
+               query_type = CASSANDRA_QUERY_TYPE_WRITE;
+       if (array_count(&ctx->statements) > 0) {
+               transaction_set_failed(ctx,
+                       "Multiple changes in transaction not supported");
+               return FALSE;
+       }
+       ctx->query_type = query_type;
+       return TRUE;
+}
+
 static void
 driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
                        unsigned int *affected_rows)
@@ -2252,18 +2298,13 @@ driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
 
        i_assert(affected_rows == NULL);
 
-       if (ctx->stmt != NULL) {
-               transaction_set_failed(ctx, "Multiple changes in transaction not supported");
+       if (!driver_cassandra_update_query_type(ctx, query))
                return;
-       }
-
-       if (str_begins_icase_with(query, "DELETE "))
-               ctx->query_type = CASSANDRA_QUERY_TYPE_DELETE;
-       else
-               ctx->query_type = CASSANDRA_QUERY_TYPE_WRITE;
 
        struct sql_statement *_stmt = sql_statement_init(_ctx->db, query);
-       ctx->stmt = container_of(_stmt, struct cassandra_sql_statement, stmt);
+       struct cassandra_sql_statement *stmt =
+               container_of(_stmt, struct cassandra_sql_statement, stmt);
+       array_push_back(&ctx->statements, &stmt);
 }
 
 static const char *
@@ -2717,18 +2758,11 @@ driver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
 
        i_assert(affected_rows == NULL);
 
-       if (ctx->stmt != NULL) {
-               transaction_set_failed(ctx,
-                       "Multiple changes in transaction not supported");
+       if (!driver_cassandra_update_query_type(ctx,
+                       sql_statement_get_query(_stmt)))
                return;
-       }
 
-       const char *query = sql_statement_get_query(_stmt);
-       if (str_begins_icase_with(query, "DELETE "))
-               ctx->query_type = CASSANDRA_QUERY_TYPE_DELETE;
-       else
-               ctx->query_type = CASSANDRA_QUERY_TYPE_WRITE;
-       ctx->stmt = stmt;
+       array_push_back(&ctx->statements, &stmt);
 }
 
 static bool driver_cassandra_have_work(struct cassandra_db *db)