sql_commit_callback_t *callback;
void *context;
- struct cassandra_sql_statement *stmt;
+ ARRAY(struct cassandra_sql_statement *) statements;
enum cassandra_query_type query_type;
char *error;
ctx->ctx.db = db;
ctx->ctx.event = event_create(db->event);
ctx->refcount = 1;
+ i_array_init(&ctx->statements, 8);
return &ctx->ctx;
}
driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
{
struct cassandra_transaction_context *ctx = *_ctx;
+ struct cassandra_sql_statement *stmt;
*_ctx = NULL;
i_assert(ctx->refcount > 0);
if (--ctx->refcount > 0)
return;
- if (ctx->stmt != NULL) {
- cassandra_prepared_statement_remove_pending(ctx->stmt);
- pool_unref(&ctx->stmt->stmt.pool);
+ array_foreach_elem(&ctx->statements, stmt) {
+ cassandra_prepared_statement_remove_pending(stmt);
+ pool_unref(&stmt->stmt.pool);
}
+ array_free(&ctx->statements);
event_unref(&ctx->ctx.event);
i_free(ctx->error);
i_free(ctx);
return;
}
- /* just a single query, send it */
- enum cassandra_result_type result_type =
- ctx->stmt->prep != NULL ? CASSANDRA_RESULT_TYPE_PREPARED :
- CASSANDRA_RESULT_TYPE_QUERY;
+ struct cassandra_sql_statement *stmt;
+ bool have_nonprepared = FALSE;
+ const char *log_query = NULL;
+ array_foreach_elem(&ctx->statements, stmt) {
+ if (stmt->prep != NULL && stmt->cass_stmt == NULL) {
+ /* wait for prepare to finish in
+ prepare_finish_statement() */
+ stmt->pending_transaction = ctx;
+ return;
+ }
+
+ if (stmt->prep == NULL)
+ have_nonprepared = TRUE;
+ if (stmt->cass_stmt == NULL) {
+ stmt->cass_stmt = cass_statement_new(
+ sql_statement_get_query(&stmt->stmt), 0);
+ if (stmt->timestamp != 0) {
+ cass_statement_set_timestamp(stmt->cass_stmt,
+ stmt->timestamp);
+ }
+ }
+ if (log_query == NULL)
+ log_query = sql_statement_get_log_query(&stmt->stmt);
+ else
+ i_unreached(); /* only single statement is possible now */
+ }
+
+ enum cassandra_result_type result_type;
+ if (have_nonprepared)
+ result_type = CASSANDRA_RESULT_TYPE_QUERY;
+ else
+ result_type = CASSANDRA_RESULT_TYPE_PREPARED;
+
struct cassandra_result *cass_result =
- driver_cassandra_result_init(db,
- sql_statement_get_log_query(&ctx->stmt->stmt),
+ driver_cassandra_result_init(db, log_query,
ctx->query_type, result_type,
transaction_commit_callback, ctx);
- if (ctx->stmt->prep == NULL) {
- ctx->stmt->cass_stmt =
- cass_statement_new(sql_statement_get_query(&ctx->stmt->stmt), 0);
- if (ctx->stmt->timestamp != 0) {
- cass_statement_set_timestamp(ctx->stmt->cass_stmt,
- ctx->stmt->timestamp);
- }
- } else {
- ctx->stmt->result = cass_result;
- if (ctx->stmt->cass_stmt == NULL) {
- /* wait for prepare to finish */
- return;
- }
+ switch (result_type) {
+ case CASSANDRA_RESULT_TYPE_QUERY:
+ case CASSANDRA_RESULT_TYPE_PREPARED: {
+ struct cassandra_sql_statement *stmt =
+ array_idx_elem(&ctx->statements, 0);
+ stmt->result = cass_result;
+ cassandra_statement_send_query(stmt);
+ break;
+ }
+ case CASSANDRA_RESULT_TYPE_COUNT:
+ i_unreached();
}
- cassandra_statement_send_query(ctx->stmt);
}
static void
ctx->callback = callback;
ctx->context = context;
- if (ctx->failed || ctx->stmt == NULL) {
+ if (ctx->failed || array_count(&ctx->statements) == 0) {
if (ctx->failed)
result.error = ctx->error;
return;
}
- i_assert(ctx->stmt->pending_transaction == NULL);
- if (ctx->stmt->prep != NULL && ctx->stmt->cass_stmt == NULL) {
- /* wait for prepare to finish */
- ctx->stmt->pending_transaction = ctx;
- return;
- }
cassandra_transaction_finish(ctx, NULL);
}
struct cassandra_db *db = container_of(_ctx->db, struct cassandra_db, api);
struct sql_result *result = NULL;
+ /* simplify the code for now: these aren't currently needed */
+ if (array_count(&ctx->statements) != 1)
+ i_panic("cassandra: sql_transaction_commit_s() not supported for multiple statements");
+ struct cassandra_sql_statement *stmt =
+ array_idx_elem(&ctx->statements, 0);
+ if (stmt->prep != NULL) {
+ /* nothing should be using this - don't bother implementing */
+ i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
+ }
+
/* just a single query, send it */
driver_cassandra_sync_init(db);
result = driver_cassandra_sync_query(db,
- sql_statement_get_query(&ctx->stmt->stmt),
+ sql_statement_get_query(&stmt->stmt),
ctx->query_type);
driver_cassandra_sync_deinit(db);
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
- if (ctx->stmt != NULL && ctx->stmt->prep != NULL) {
- /* nothing should be using this - don't bother implementing */
- i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
- }
-
- if (ctx->stmt != NULL && !ctx->failed)
+ if (array_count(&ctx->statements) > 0 && !ctx->failed)
driver_cassandra_try_commit_s(ctx);
*error_r = t_strdup(ctx->error);
driver_cassandra_transaction_unref(&ctx);
}
+static bool
+driver_cassandra_update_query_type(struct cassandra_transaction_context *ctx,
+ const char *query)
+{
+ enum cassandra_query_type query_type;
+
+ if (str_begins_icase_with(query, "DELETE "))
+ query_type = CASSANDRA_QUERY_TYPE_DELETE;
+ else
+ query_type = CASSANDRA_QUERY_TYPE_WRITE;
+ if (array_count(&ctx->statements) > 0) {
+ transaction_set_failed(ctx,
+ "Multiple changes in transaction not supported");
+ return FALSE;
+ }
+ ctx->query_type = query_type;
+ return TRUE;
+}
+
static void
driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
unsigned int *affected_rows)
i_assert(affected_rows == NULL);
- if (ctx->stmt != NULL) {
- transaction_set_failed(ctx, "Multiple changes in transaction not supported");
+ if (!driver_cassandra_update_query_type(ctx, query))
return;
- }
-
- if (str_begins_icase_with(query, "DELETE "))
- ctx->query_type = CASSANDRA_QUERY_TYPE_DELETE;
- else
- ctx->query_type = CASSANDRA_QUERY_TYPE_WRITE;
struct sql_statement *_stmt = sql_statement_init(_ctx->db, query);
- ctx->stmt = container_of(_stmt, struct cassandra_sql_statement, stmt);
+ struct cassandra_sql_statement *stmt =
+ container_of(_stmt, struct cassandra_sql_statement, stmt);
+ array_push_back(&ctx->statements, &stmt);
}
static const char *
i_assert(affected_rows == NULL);
- if (ctx->stmt != NULL) {
- transaction_set_failed(ctx,
- "Multiple changes in transaction not supported");
+ if (!driver_cassandra_update_query_type(ctx,
+ sql_statement_get_query(_stmt)))
return;
- }
- const char *query = sql_statement_get_query(_stmt);
- if (str_begins_icase_with(query, "DELETE "))
- ctx->query_type = CASSANDRA_QUERY_TYPE_DELETE;
- else
- ctx->query_type = CASSANDRA_QUERY_TYPE_WRITE;
- ctx->stmt = stmt;
+ array_push_back(&ctx->statements, &stmt);
}
static bool driver_cassandra_have_work(struct cassandra_db *db)