]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
cassandra: Split consistency setting to read_consistency and write_consistency.
authorTimo Sirainen <tss@iki.fi>
Mon, 31 Aug 2015 18:31:46 +0000 (21:31 +0300)
committerTimo Sirainen <tss@iki.fi>
Mon, 31 Aug 2015 18:31:46 +0000 (21:31 +0300)
src/lib-sql/driver-cassandra.c

index 077c8c9057e7572c79c4e3c8d020a69fbd618937..b5aa05c88a05ef1360880c779e00f3af1f5aa1c1 100644 (file)
@@ -29,7 +29,7 @@ struct cassandra_db {
        struct sql_db api;
 
        char *hosts, *keyspace;
-       CassConsistency consistency;
+       CassConsistency read_consistency, write_consistency;
        CassLogLevel log_level;
 
        CassCluster *cluster;
@@ -46,7 +46,8 @@ struct cassandra_db {
 
        char *error;
 
-       unsigned int set_consistency:1;
+       unsigned int set_read_consistency:1;
+       unsigned int set_write_consistency:1;
 };
 
 struct cassandra_result {
@@ -64,6 +65,7 @@ struct cassandra_result {
        void *context;
 
        unsigned int finished:1;
+       unsigned int write_query:1;
 };
 
 struct cassandra_transaction_context {
@@ -352,10 +354,14 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
                           strcmp(key, "keyspace") == 0) {
                        i_free(db->keyspace);
                        db->keyspace = i_strdup(value);
-               } else if (strcmp(key, "consistency") == 0) {
-                       if (consistency_parse(value, &db->consistency) < 0)
-                               i_fatal("cassandra: Unknown consistency: %s", value);
-                       db->set_consistency = TRUE;
+               } else if (strcmp(key, "read_consistency") == 0) {
+                       if (consistency_parse(value, &db->read_consistency) < 0)
+                               i_fatal("cassandra: Unknown read_consistency: %s", value);
+                       db->set_read_consistency = TRUE;
+               } else if (strcmp(key, "write_consistency") == 0) {
+                       if (consistency_parse(value, &db->write_consistency) < 0)
+                               i_fatal("cassandra: Unknown write_consistency: %s", value);
+                       db->set_write_consistency = TRUE;
                } else if (strcmp(key, "log_level") == 0) {
                        if (log_level_parse(value, &db->log_level) < 0)
                                i_fatal("cassandra: Unknown log_level: %s", value);
@@ -518,8 +524,13 @@ static void do_query(struct cassandra_result *result, const char *query)
        result->query = i_strdup(query);
        result->row_pool = pool_alloconly_create("cassandra result", 512);
        result->statement = cass_statement_new(query, 0);
-       if (db->set_consistency)
-               cass_statement_set_consistency(result->statement, db->consistency);
+       if (result->write_query) {
+               if (db->set_write_consistency)
+                       cass_statement_set_consistency(result->statement, db->write_consistency);
+       } else {
+               if (db->set_read_consistency)
+                       cass_statement_set_consistency(result->statement, db->read_consistency);
+       }
        future = cass_session_execute(db->session, result->statement);
        driver_cassandra_set_callback(future, db, query_callback, result);
 }
@@ -529,7 +540,9 @@ static void exec_callback(struct sql_result *_result ATTR_UNUSED,
 {
 }
 
-static void driver_cassandra_exec(struct sql_db *db, const char *query)
+static void
+driver_cassandra_query_full(struct sql_db *db, const char *query, bool write_query,
+                           sql_query_callback_t *callback, void *context)
 {
        struct cassandra_result *result;
 
@@ -537,22 +550,21 @@ static void driver_cassandra_exec(struct sql_db *db, const char *query)
        result->api = driver_cassandra_result;
        result->api.db = db;
        result->api.refcount = 1;
-       result->callback = exec_callback;
+       result->callback = callback;
+       result->context = context;
+       result->write_query = write_query;
        do_query(result, query);
 }
 
+static void driver_cassandra_exec(struct sql_db *db, const char *query)
+{
+       driver_cassandra_query_full(db, query, TRUE, exec_callback, NULL);
+}
+
 static void driver_cassandra_query(struct sql_db *db, const char *query,
                                   sql_query_callback_t *callback, void *context)
 {
-       struct cassandra_result *result;
-
-       result = i_new(struct cassandra_result, 1);
-       result->api = driver_cassandra_result;
-       result->api.db = db;
-       result->api.refcount = 1;
-       result->callback = callback;
-       result->context = context;
-       do_query(result, query);
+       driver_cassandra_query_full(db, query, FALSE, callback, context);
 }
 
 static void cassandra_query_s_callback(struct sql_result *result, void *context)
@@ -797,22 +809,10 @@ transaction_set_failed(struct cassandra_transaction_context *ctx,
 }
 
 static void
-transaction_begin_callback(struct sql_result *result,
-                          struct cassandra_transaction_context *ctx)
+transaction_commit_callback(struct sql_result *result, void *context)
 {
-       if (sql_result_next_row(result) < 0) {
-               ctx->begin_failed = TRUE;
-               transaction_set_failed(ctx, sql_result_get_error(result));
-       } else {
-               ctx->begin_succeeded = TRUE;
-       }
-       driver_cassandra_transaction_unref(&ctx);
-}
+       struct cassandra_transaction_context *ctx = context;
 
-static void
-transaction_commit_callback(struct sql_result *result,
-                           struct cassandra_transaction_context *ctx)
-{
        if (sql_result_next_row(result) < 0)
                ctx->callback(sql_result_get_error(result), ctx->context);
        else
@@ -820,21 +820,9 @@ transaction_commit_callback(struct sql_result *result,
        driver_cassandra_transaction_unref(&ctx);
 }
 
-static void
-transaction_update_callback(struct sql_result *result,
-                           struct sql_transaction_query *query)
-{
-       struct cassandra_transaction_context *ctx =
-               (struct cassandra_transaction_context *)query->trans;
-
-       if (sql_result_next_row(result) < 0)
-               transaction_set_failed(ctx, sql_result_get_error(result));
-       driver_cassandra_transaction_unref(&ctx);
-}
-
 static void
 driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
-                               sql_commit_callback_t *callback, void *context)
+                                   sql_commit_callback_t *callback, void *context)
 {
        struct cassandra_transaction_context *ctx =
                (struct cassandra_transaction_context *)_ctx;
@@ -847,19 +835,11 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
                driver_cassandra_transaction_unref(&ctx);
        } else if (_ctx->head->next == NULL) {
                /* just a single query, send it */
-               sql_query(_ctx->db, _ctx->head->query,
+               driver_cassandra_query_full(_ctx->db, _ctx->head->query, TRUE,
                          transaction_commit_callback, ctx);
        } else {
-               /* multiple queries, use a transaction */
-               ctx->refcount++;
-               sql_query(_ctx->db, "BEGIN", transaction_begin_callback, ctx);
-               while (_ctx->head != NULL) {
-                       ctx->refcount++;
-                       sql_query(_ctx->db, _ctx->head->query,
-                                 transaction_update_callback, _ctx->head);
-                       _ctx->head = _ctx->head->next;
-               }
-               sql_query(_ctx->db, "COMMIT", transaction_commit_callback, ctx);
+               /* multiple queries - we don't actually have a transaction though */
+               callback("Multiple changes in transaction not supported", context);
        }
 }