enum cassandra_result_type {
CASSANDRA_RESULT_TYPE_QUERY,
CASSANDRA_RESULT_TYPE_PREPARED,
+ CASSANDRA_RESULT_TYPE_BATCH,
CASSANDRA_RESULT_TYPE_COUNT,
};
static const char *cassandra_result_type_prefixes[] = {
- "", "prepared ",
+ "", "prepared ", "batch ",
};
static_assert_array_size(cassandra_result_type_prefixes,
CASSANDRA_RESULT_TYPE_COUNT);
struct sql_result api;
enum cassandra_result_type type;
+ /* either batch or statement is non-NULL */
+ CassBatch *batch;
CassStatement *statement;
+
const CassResult *result;
CassIterator *iterator;
char *log_query;
bool begin_succeeded:1;
bool begin_failed:1;
bool failed:1;
+ bool query_type_set:1;
};
enum cassandra_sql_arg_type {
cass_result_free(result->result);
if (result->iterator != NULL)
cass_iterator_free(result->iterator);
+ if (result->batch != NULL)
+ cass_batch_free(result->batch);
if (result->statement != NULL)
cass_statement_free(result->statement);
pool_unref(&result->row_pool);
struct cassandra_db *db = container_of(result->api.db, struct cassandra_db, api);
CassError rc;
- rc = cass_statement_set_consistency(result->statement,
- result->consistency);
+ if (result->batch != NULL) {
+ rc = cass_batch_set_consistency(result->batch,
+ result->consistency);
+ } else {
+ rc = cass_statement_set_consistency(result->statement,
+ result->consistency);
+ }
if (rc != CASS_OK) {
e_error(db->api.event,
"Failed to set consistency %s for query '%s': %s",
#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
- cass_statement_set_is_idempotent(result->statement, cass_true);
+ if (result->batch != NULL)
+ cass_batch_set_is_idempotent(result->batch, cass_true);
+ else
+ cass_statement_set_is_idempotent(result->statement, cass_true);
#endif
- if (db->page_size > 0)
+ if (db->page_size > 0 && result->batch == NULL)
cass_statement_set_paging_size(result->statement, db->page_size);
}
struct cassandra_db *db = container_of(result->api.db, struct cassandra_db, api);
CassFuture *future;
- i_assert(result->statement != NULL);
+ i_assert(result->batch != NULL || result->statement != NULL);
db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
if (result->query_type != CASSANDRA_QUERY_TYPE_READ_MORE)
driver_cassandra_init_statement(result);
- future = cass_session_execute(db->session, result->statement);
+ if (result->batch != NULL)
+ future = cass_session_execute_batch(db->session, result->batch);
+ else
+ future = cass_session_execute(db->session, result->statement);
driver_cassandra_set_callback(future, db, query_callback, result);
}
struct cassandra_result *old_result =
(struct cassandra_result *)*_result;
+ i_assert(old_result->statement != NULL);
+
/* Initialize the next page as a new sql_result */
new_result = driver_cassandra_result_init(db, old_result->log_query,
CASSANDRA_QUERY_TYPE_READ_MORE,
if (log_query == NULL)
log_query = sql_statement_get_log_query(&stmt->stmt);
else
- i_unreached(); /* only single statement is possible now */
+ log_query = "<batch query>";
}
enum cassandra_result_type result_type;
- if (have_nonprepared)
+ if (array_count(&ctx->statements) > 1)
+ result_type = CASSANDRA_RESULT_TYPE_BATCH;
+ else if (have_nonprepared)
result_type = CASSANDRA_RESULT_TYPE_QUERY;
else
result_type = CASSANDRA_RESULT_TYPE_PREPARED;
ctx->query_type, result_type,
transaction_commit_callback, ctx);
switch (result_type) {
+ case CASSANDRA_RESULT_TYPE_BATCH: {
+ struct cassandra_sql_statement *stmt;
+ cass_result->batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
+ array_foreach_elem(&ctx->statements, stmt) {
+ cass_batch_add_statement(cass_result->batch,
+ stmt->cass_stmt);
+ cass_statement_free(stmt->cass_stmt);
+ stmt->cass_stmt = NULL;
+ }
+ (void)cassandra_result_connect_and_send_query(cass_result);
+ break;
+ }
case CASSANDRA_RESULT_TYPE_QUERY:
case CASSANDRA_RESULT_TYPE_PREPARED: {
struct cassandra_sql_statement *stmt =
query_type = CASSANDRA_QUERY_TYPE_DELETE;
else
query_type = CASSANDRA_QUERY_TYPE_WRITE;
- if (array_count(&ctx->statements) > 0) {
- transaction_set_failed(ctx,
- "Multiple changes in transaction not supported");
+ if (ctx->query_type_set && ctx->query_type != query_type) {
+ transaction_set_failed(ctx, t_strdup_printf(
+ "Mixed types of queries in transaction not supported "
+ "(%d -> %d)", ctx->query_type, query_type));
return FALSE;
}
ctx->query_type = query_type;
+ ctx->query_type_set = TRUE;
return TRUE;
}