struct cassandra_result *result;
struct cassandra_transaction_context *pending_transaction;
};
+ARRAY_DEFINE_TYPE(cassandra_sql_statement, struct cassandra_sql_statement *);
struct cassandra_sql_prepared_statement {
struct sql_prepared_statement prep_stmt;
/* NULL, until the prepare is asynchronously finished */
const CassPrepared *prepared;
/* statements waiting for prepare to finish */
- ARRAY(struct cassandra_sql_statement *) pending_statements;
+ ARRAY_TYPE(cassandra_sql_statement) pending_statements;
/* an error here will cause the prepare to be retried on the next
execution attempt. */
char *error;
}
}
+static void cassandra_sql_statement_free(struct cassandra_sql_statement *stmt)
+{
+ cassandra_prepared_statement_remove_pending(stmt);
+ if (stmt->cass_stmt != NULL)
+ cass_statement_free(stmt->cass_stmt);
+ pool_unref(&stmt->stmt.pool);
+}
+
static void
driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
{
if (--ctx->refcount > 0)
return;
- array_foreach_elem(&ctx->statements, stmt) {
- cassandra_prepared_statement_remove_pending(stmt);
- pool_unref(&stmt->stmt.pool);
- }
+ array_foreach_elem(&ctx->statements, stmt)
+ cassandra_sql_statement_free(stmt);
array_free(&ctx->statements);
event_unref(&ctx->ctx.event);
i_free(ctx->error);
cassandra_statement_send_query(struct cassandra_sql_statement *stmt)
{
stmt->result->statement = stmt->cass_stmt;
+ stmt->cass_stmt = NULL;
stmt->result->timestamp = stmt->timestamp;
(void)cassandra_result_connect_and_send_query(stmt->result);
}
} else if (stmt->result != NULL) {
stmt->result->error = i_strdup(stmt->prep->error);
result_finish(stmt->result);
- pool_unref(&stmt->stmt.pool);
+ cassandra_sql_statement_free(stmt);
} else {
- pool_unref(&stmt->stmt.pool);
+ cassandra_sql_statement_free(stmt);
}
return;
}
cassandra_transaction_finish(stmt->pending_transaction, NULL);
else if (stmt->result != NULL) {
cassandra_statement_send_query(stmt);
- pool_unref(&stmt->stmt.pool);
+ cassandra_sql_statement_free(stmt);
}
}
prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt)
{
struct cassandra_sql_statement *stmt;
+ ARRAY_TYPE(cassandra_sql_statement) copy;
- array_foreach_elem(&prep_stmt->pending_statements, stmt)
- prepare_finish_statement(stmt);
+ /* clear pending_statements first before iterating, so any
+ cassandra_sql_statement_free() calls won't break the iteration */
+ t_array_init(©, array_count(&prep_stmt->pending_statements));
+ array_append_array(©, &prep_stmt->pending_statements);
array_clear(&prep_stmt->pending_statements);
+
+ array_foreach_elem(©, stmt)
+ prepare_finish_statement(stmt);
}
static void prepare_callback(CassFuture *future, void *context)
result_type, callback, context);
if (stmt->cass_stmt != NULL) {
stmt->result->statement = stmt->cass_stmt;
+ stmt->cass_stmt = NULL;
stmt->result->timestamp = stmt->timestamp;
} else if (stmt->prep != NULL) {
/* wait for prepare to finish */
}
}
(void)cassandra_result_connect_and_send_query(stmt->result);
- pool_unref(&_stmt->pool);
+ cassandra_sql_statement_free(stmt);
}
static struct sql_result *
i_assert(affected_rows == NULL);
if (!driver_cassandra_update_query_type(ctx,
- sql_statement_get_query(_stmt)))
+ sql_statement_get_query(_stmt))) {
+ cassandra_sql_statement_free(stmt);
return;
+ }
array_push_back(&ctx->statements, &stmt);
}