]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
cassandra: Added delete_consistency parameter.
authorTimo Sirainen <tss@iki.fi>
Tue, 15 Sep 2015 06:20:08 +0000 (15:20 +0900)
committerTimo Sirainen <tss@iki.fi>
Tue, 15 Sep 2015 06:20:08 +0000 (15:20 +0900)
src/lib-sql/driver-cassandra.c

index d218f8c1acd62e6c201387b093284058e4482f0c..81a8202e5ac152bd6bdf0c0ece3cfaeab62301d0 100644 (file)
 
 typedef void driver_cassandra_callback_t(CassFuture *future, void *context);
 
+enum cassandra_query_type {
+       CASSANDRA_QUERY_TYPE_READ,
+       CASSANDRA_QUERY_TYPE_WRITE,
+       CASSANDRA_QUERY_TYPE_DELETE
+};
+
 struct cassandra_callback {
        unsigned int id;
        CassFuture *future;
@@ -30,7 +36,7 @@ struct cassandra_db {
        struct sql_db api;
 
        char *hosts, *keyspace;
-       CassConsistency read_consistency, write_consistency;
+       CassConsistency read_consistency, write_consistency, delete_consistency;
        CassLogLevel log_level;
 
        CassCluster *cluster;
@@ -56,6 +62,7 @@ struct cassandra_result {
        CassIterator *iterator;
        char *query;
        char *error;
+       enum cassandra_query_type query_type;
 
        pool_t row_pool;
        ARRAY_TYPE(const_string) fields;
@@ -66,7 +73,6 @@ struct cassandra_result {
 
        unsigned int query_sent:1;
        unsigned int finished:1;
-       unsigned int write_query:1;
 };
 
 struct cassandra_transaction_context {
@@ -343,6 +349,7 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
        db->log_level = CASS_LOG_WARN;
        db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
        db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
+       db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
 
        args = t_strsplit_spaces(connect_string, " ");
        for (; *args != NULL; args++) {
@@ -367,6 +374,9 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
                } else if (strcmp(key, "write_consistency") == 0) {
                        if (consistency_parse(value, &db->write_consistency) < 0)
                                i_fatal("cassandra: Unknown write_consistency: %s", value);
+               } else if (strcmp(key, "delete_consistency") == 0) {
+                       if (consistency_parse(value, &db->delete_consistency) < 0)
+                               i_fatal("cassandra: Unknown delete_consistency: %s", value);
                } else if (strcmp(key, "log_level") == 0) {
                        if (log_level_parse(value, &db->log_level) < 0)
                                i_fatal("cassandra: Unknown log_level: %s", value);
@@ -531,10 +541,17 @@ static int driver_cassandra_send_query(struct cassandra_result *result)
 
        result->row_pool = pool_alloconly_create("cassandra result", 512);
        result->statement = cass_statement_new(result->query, 0);
-       if (result->write_query)
-               cass_statement_set_consistency(result->statement, db->write_consistency);
-       else
+       switch (result->query_type) {
+       case CASSANDRA_QUERY_TYPE_READ:
                cass_statement_set_consistency(result->statement, db->read_consistency);
+               break;
+       case CASSANDRA_QUERY_TYPE_WRITE:
+               cass_statement_set_consistency(result->statement, db->write_consistency);
+               break;
+       case CASSANDRA_QUERY_TYPE_DELETE:
+               cass_statement_set_consistency(result->statement, db->delete_consistency);
+               break;
+       }
        future = cass_session_execute(db->session, result->statement);
        driver_cassandra_set_callback(future, db, query_callback, result);
        result->query_sent = TRUE;
@@ -561,7 +578,8 @@ static void exec_callback(struct sql_result *_result ATTR_UNUSED,
 }
 
 static void
-driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_query,
+driver_cassandra_query_full(struct sql_db *_db, const char *query,
+                           enum cassandra_query_type query_type,
                            sql_query_callback_t *callback, void *context)
 {
         struct cassandra_db *db = (struct cassandra_db *)_db;
@@ -573,7 +591,7 @@ driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_qu
        result->api.refcount = 1;
        result->callback = callback;
        result->context = context;
-       result->write_query = write_query;
+       result->query_type = query_type;
        result->query = i_strdup(query);
        array_append(&db->results, &result, 1);
 
@@ -582,13 +600,13 @@ driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_qu
 
 static void driver_cassandra_exec(struct sql_db *db, const char *query)
 {
-       driver_cassandra_query_full(db, query, TRUE, exec_callback, NULL);
+       driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
 }
 
 static void driver_cassandra_query(struct sql_db *db, const char *query,
                                   sql_query_callback_t *callback, void *context)
 {
-       driver_cassandra_query_full(db, query, FALSE, callback, context);
+       driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
 }
 
 static void cassandra_query_s_callback(struct sql_result *result, void *context)
@@ -865,6 +883,7 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
 {
        struct cassandra_transaction_context *ctx =
                (struct cassandra_transaction_context *)_ctx;
+       enum cassandra_query_type query_type;
 
        ctx->callback = callback;
        ctx->context = context;
@@ -874,7 +893,11 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
                driver_cassandra_transaction_unref(&ctx);
        } else if (_ctx->head->next == NULL) {
                /* just a single query, send it */
-               driver_cassandra_query_full(_ctx->db, _ctx->head->query, TRUE,
+               if (strncasecmp(_ctx->head->query, "DELETE ", 7) == 0)
+                       query_type = CASSANDRA_QUERY_TYPE_DELETE;
+               else
+                       query_type = CASSANDRA_QUERY_TYPE_WRITE;
+               driver_cassandra_query_full(_ctx->db, _ctx->head->query, query_type,
                          transaction_commit_callback, ctx);
        } else {
                /* multiple queries - we don't actually have a transaction though */