From: Timo Sirainen Date: Tue, 15 Sep 2015 06:20:08 +0000 (+0900) Subject: cassandra: Added delete_consistency parameter. X-Git-Tag: 2.2.19.rc1~48 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=4db61af2cfe2b206113bcc4b6153521679702bb4;p=thirdparty%2Fdovecot%2Fcore.git cassandra: Added delete_consistency parameter. --- diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c index d218f8c1ac..81a8202e5a 100644 --- a/src/lib-sql/driver-cassandra.c +++ b/src/lib-sql/driver-cassandra.c @@ -18,6 +18,12 @@ typedef void driver_cassandra_callback_t(CassFuture *future, void *context); +enum cassandra_query_type { + CASSANDRA_QUERY_TYPE_READ, + CASSANDRA_QUERY_TYPE_WRITE, + CASSANDRA_QUERY_TYPE_DELETE +}; + struct cassandra_callback { unsigned int id; CassFuture *future; @@ -30,7 +36,7 @@ struct cassandra_db { struct sql_db api; char *hosts, *keyspace; - CassConsistency read_consistency, write_consistency; + CassConsistency read_consistency, write_consistency, delete_consistency; CassLogLevel log_level; CassCluster *cluster; @@ -56,6 +62,7 @@ struct cassandra_result { CassIterator *iterator; char *query; char *error; + enum cassandra_query_type query_type; pool_t row_pool; ARRAY_TYPE(const_string) fields; @@ -66,7 +73,6 @@ struct cassandra_result { unsigned int query_sent:1; unsigned int finished:1; - unsigned int write_query:1; }; struct cassandra_transaction_context { @@ -343,6 +349,7 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db, db->log_level = CASS_LOG_WARN; db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; + db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; args = t_strsplit_spaces(connect_string, " "); for (; *args != NULL; args++) { @@ -367,6 +374,9 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db, } else if (strcmp(key, "write_consistency") == 0) { if (consistency_parse(value, &db->write_consistency) < 0) i_fatal("cassandra: Unknown write_consistency: %s", value); + } else if (strcmp(key, "delete_consistency") == 0) { + if (consistency_parse(value, &db->delete_consistency) < 0) + i_fatal("cassandra: Unknown delete_consistency: %s", value); } else if (strcmp(key, "log_level") == 0) { if (log_level_parse(value, &db->log_level) < 0) i_fatal("cassandra: Unknown log_level: %s", value); @@ -531,10 +541,17 @@ static int driver_cassandra_send_query(struct cassandra_result *result) result->row_pool = pool_alloconly_create("cassandra result", 512); result->statement = cass_statement_new(result->query, 0); - if (result->write_query) - cass_statement_set_consistency(result->statement, db->write_consistency); - else + switch (result->query_type) { + case CASSANDRA_QUERY_TYPE_READ: cass_statement_set_consistency(result->statement, db->read_consistency); + break; + case CASSANDRA_QUERY_TYPE_WRITE: + cass_statement_set_consistency(result->statement, db->write_consistency); + break; + case CASSANDRA_QUERY_TYPE_DELETE: + cass_statement_set_consistency(result->statement, db->delete_consistency); + break; + } future = cass_session_execute(db->session, result->statement); driver_cassandra_set_callback(future, db, query_callback, result); result->query_sent = TRUE; @@ -561,7 +578,8 @@ static void exec_callback(struct sql_result *_result ATTR_UNUSED, } static void -driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_query, +driver_cassandra_query_full(struct sql_db *_db, const char *query, + enum cassandra_query_type query_type, sql_query_callback_t *callback, void *context) { struct cassandra_db *db = (struct cassandra_db *)_db; @@ -573,7 +591,7 @@ driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_qu result->api.refcount = 1; result->callback = callback; result->context = context; - result->write_query = write_query; + result->query_type = query_type; result->query = i_strdup(query); array_append(&db->results, &result, 1); @@ -582,13 +600,13 @@ driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_qu static void driver_cassandra_exec(struct sql_db *db, const char *query) { - driver_cassandra_query_full(db, query, TRUE, exec_callback, NULL); + driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL); } static void driver_cassandra_query(struct sql_db *db, const char *query, sql_query_callback_t *callback, void *context) { - driver_cassandra_query_full(db, query, FALSE, callback, context); + driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context); } static void cassandra_query_s_callback(struct sql_result *result, void *context) @@ -865,6 +883,7 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx, { struct cassandra_transaction_context *ctx = (struct cassandra_transaction_context *)_ctx; + enum cassandra_query_type query_type; ctx->callback = callback; ctx->context = context; @@ -874,7 +893,11 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx, driver_cassandra_transaction_unref(&ctx); } else if (_ctx->head->next == NULL) { /* just a single query, send it */ - driver_cassandra_query_full(_ctx->db, _ctx->head->query, TRUE, + if (strncasecmp(_ctx->head->query, "DELETE ", 7) == 0) + query_type = CASSANDRA_QUERY_TYPE_DELETE; + else + query_type = CASSANDRA_QUERY_TYPE_WRITE; + driver_cassandra_query_full(_ctx->db, _ctx->head->query, query_type, transaction_commit_callback, ctx); } else { /* multiple queries - we don't actually have a transaction though */