typedef void driver_cassandra_callback_t(CassFuture *future, void *context);
+enum cassandra_query_type {
+ CASSANDRA_QUERY_TYPE_READ,
+ CASSANDRA_QUERY_TYPE_WRITE,
+ CASSANDRA_QUERY_TYPE_DELETE
+};
+
struct cassandra_callback {
unsigned int id;
CassFuture *future;
struct sql_db api;
char *hosts, *keyspace;
- CassConsistency read_consistency, write_consistency;
+ CassConsistency read_consistency, write_consistency, delete_consistency;
CassLogLevel log_level;
CassCluster *cluster;
CassIterator *iterator;
char *query;
char *error;
+ enum cassandra_query_type query_type;
pool_t row_pool;
ARRAY_TYPE(const_string) fields;
unsigned int query_sent:1;
unsigned int finished:1;
- unsigned int write_query:1;
};
struct cassandra_transaction_context {
db->log_level = CASS_LOG_WARN;
db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
+ db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
args = t_strsplit_spaces(connect_string, " ");
for (; *args != NULL; args++) {
} else if (strcmp(key, "write_consistency") == 0) {
if (consistency_parse(value, &db->write_consistency) < 0)
i_fatal("cassandra: Unknown write_consistency: %s", value);
+ } else if (strcmp(key, "delete_consistency") == 0) {
+ if (consistency_parse(value, &db->delete_consistency) < 0)
+ i_fatal("cassandra: Unknown delete_consistency: %s", value);
} else if (strcmp(key, "log_level") == 0) {
if (log_level_parse(value, &db->log_level) < 0)
i_fatal("cassandra: Unknown log_level: %s", value);
result->row_pool = pool_alloconly_create("cassandra result", 512);
result->statement = cass_statement_new(result->query, 0);
- if (result->write_query)
- cass_statement_set_consistency(result->statement, db->write_consistency);
- else
+ switch (result->query_type) {
+ case CASSANDRA_QUERY_TYPE_READ:
cass_statement_set_consistency(result->statement, db->read_consistency);
+ break;
+ case CASSANDRA_QUERY_TYPE_WRITE:
+ cass_statement_set_consistency(result->statement, db->write_consistency);
+ break;
+ case CASSANDRA_QUERY_TYPE_DELETE:
+ cass_statement_set_consistency(result->statement, db->delete_consistency);
+ break;
+ }
future = cass_session_execute(db->session, result->statement);
driver_cassandra_set_callback(future, db, query_callback, result);
result->query_sent = TRUE;
}
static void
-driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_query,
+driver_cassandra_query_full(struct sql_db *_db, const char *query,
+ enum cassandra_query_type query_type,
sql_query_callback_t *callback, void *context)
{
struct cassandra_db *db = (struct cassandra_db *)_db;
result->api.refcount = 1;
result->callback = callback;
result->context = context;
- result->write_query = write_query;
+ result->query_type = query_type;
result->query = i_strdup(query);
array_append(&db->results, &result, 1);
static void driver_cassandra_exec(struct sql_db *db, const char *query)
{
- driver_cassandra_query_full(db, query, TRUE, exec_callback, NULL);
+ driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
}
static void driver_cassandra_query(struct sql_db *db, const char *query,
sql_query_callback_t *callback, void *context)
{
- driver_cassandra_query_full(db, query, FALSE, callback, context);
+ driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
}
static void cassandra_query_s_callback(struct sql_result *result, void *context)
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
+ enum cassandra_query_type query_type;
ctx->callback = callback;
ctx->context = context;
driver_cassandra_transaction_unref(&ctx);
} else if (_ctx->head->next == NULL) {
/* just a single query, send it */
- driver_cassandra_query_full(_ctx->db, _ctx->head->query, TRUE,
+ if (strncasecmp(_ctx->head->query, "DELETE ", 7) == 0)
+ query_type = CASSANDRA_QUERY_TYPE_DELETE;
+ else
+ query_type = CASSANDRA_QUERY_TYPE_WRITE;
+ driver_cassandra_query_full(_ctx->db, _ctx->head->query, query_type,
transaction_commit_callback, ctx);
} else {
/* multiple queries - we don't actually have a transaction though */