cass_int64_t timestamp;
struct cassandra_result *result;
+ struct cassandra_transaction_context *pending_transaction;
};
struct cassandra_sql_prepared_statement {
return &ctx->ctx;
}
+static void
+cassandra_prepared_statement_remove_pending(struct cassandra_sql_statement *stmt)
+{
+ struct cassandra_sql_prepared_statement *prep = stmt->prep;
+ struct cassandra_sql_statement *const *iter_stmt;
+
+ if (prep == NULL)
+ return;
+
+ array_foreach(&prep->pending_statements, iter_stmt) {
+ if (stmt == *iter_stmt) {
+ array_delete(&prep->pending_statements,
+ array_foreach_idx(&prep->pending_statements, iter_stmt), 1);
+ break;
+ }
+ }
+}
+
static void
driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
{
if (--ctx->refcount > 0)
return;
+ if (ctx->stmt != NULL) {
+ cassandra_prepared_statement_remove_pending(ctx->stmt);
+ pool_unref(&ctx->stmt->stmt.pool);
+ }
event_unref(&ctx->ctx.event);
i_free(ctx->error);
i_free(ctx);
}
static void
-cassandra_statement_send_query(struct cassandra_sql_statement **_stmt)
+cassandra_statement_send_query(struct cassandra_sql_statement *stmt)
{
- struct cassandra_sql_statement *stmt = *_stmt;
-
- *_stmt = NULL;
stmt->result->statement = stmt->cass_stmt;
stmt->result->timestamp = stmt->timestamp;
(void)cassandra_result_connect_and_send_query(stmt->result);
- pool_unref(&stmt->stmt.pool);
}
-static void
-driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
- sql_commit_callback_t *callback, void *context)
+static void cassandra_transaction_finish(struct cassandra_transaction_context *ctx,
+ const char *error)
{
- struct cassandra_transaction_context *ctx =
- (struct cassandra_transaction_context *)_ctx;
- struct cassandra_db *db = container_of(_ctx->db, struct cassandra_db, api);
- struct sql_commit_result result;
-
- i_zero(&result);
- ctx->callback = callback;
- ctx->context = context;
-
- if (ctx->failed || ctx->stmt == NULL) {
- if (ctx->failed)
- result.error = ctx->error;
+ struct cassandra_db *db =
+ container_of(ctx->ctx.db, struct cassandra_db, api);
- e_debug(sql_transaction_finished_event(_ctx)->
- add_str("error", "Rolled back")->event(),
- "Transaction rolled back");
- callback(&result, context);
+ if (error != NULL) {
+ struct sql_commit_result result = {
+ .error = error,
+ };
+ e_error(sql_transaction_finished_event(&ctx->ctx)->
+ add_str("error", error)->event(),
+ "Transaction failed: %s", error);
+ ctx->callback(&result, ctx->context);
driver_cassandra_transaction_unref(&ctx);
return;
}
return;
}
}
- cassandra_statement_send_query(&ctx->stmt);
+ cassandra_statement_send_query(ctx->stmt);
+}
+
+static void
+driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
+ sql_commit_callback_t *callback, void *context)
+{
+ struct cassandra_transaction_context *ctx =
+ (struct cassandra_transaction_context *)_ctx;
+ struct sql_commit_result result;
+
+ i_zero(&result);
+ ctx->callback = callback;
+ ctx->context = context;
+
+ if (ctx->failed || ctx->stmt == NULL) {
+ if (ctx->failed)
+ result.error = ctx->error;
+
+ e_debug(sql_transaction_finished_event(_ctx)->
+ add_str("error", "Rolled back")->event(),
+ "Transaction rolled back");
+ callback(&result, context);
+ driver_cassandra_transaction_unref(&ctx);
+ return;
+ }
+
+ i_assert(ctx->stmt->pending_transaction == NULL);
+ if (ctx->stmt->prep != NULL && ctx->stmt->cass_stmt == NULL) {
+ /* wait for prepare to finish */
+ ctx->stmt->pending_transaction = ctx;
+ return;
+ }
+ cassandra_transaction_finish(ctx, NULL);
}
static void
if (stmt->prep->prepared == NULL) {
i_assert(stmt->prep->error != NULL);
- if (stmt->result != NULL) {
+ if (stmt->pending_transaction != NULL) {
+ const char *error = t_strdup_printf(
+ "Failed to prepare statement '%s': %s",
+ sql_statement_get_log_query(&stmt->stmt),
+ stmt->prep->error);
+ cassandra_transaction_finish(stmt->pending_transaction,
+ error);
+ } else if (stmt->result != NULL) {
stmt->result->error = i_strdup(stmt->prep->error);
result_finish(stmt->result);
+ pool_unref(&stmt->stmt.pool);
+ } else {
+ pool_unref(&stmt->stmt.pool);
}
- pool_unref(&stmt->stmt.pool);
return;
}
stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);
array_foreach(&stmt->pending_args, arg)
prepare_finish_arg(stmt, arg);
}
- if (stmt->result != NULL)
- cassandra_statement_send_query(&stmt);
+ if (stmt->pending_transaction != NULL)
+ cassandra_transaction_finish(stmt->pending_transaction, NULL);
+ else if (stmt->result != NULL) {
+ cassandra_statement_send_query(stmt);
+ pool_unref(&stmt->stmt.pool);
+ }
}
static void