]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
cassandra: Handle async queries internally - don't use sql pooling code.
authorTimo Sirainen <tss@iki.fi>
Thu, 3 Sep 2015 18:37:09 +0000 (21:37 +0300)
committerTimo Sirainen <tss@iki.fi>
Thu, 3 Sep 2015 18:37:09 +0000 (21:37 +0300)
There's no need to create multiple Cassandra instances, since the single
instance is capable of doing multiple asynchronous requests in parallel.

src/lib-sql/driver-cassandra.c

index 41a9032134706a033f1bbfbdefd5e64822628941..d218f8c1acd62e6c201387b093284058e4482f0c 100644 (file)
@@ -39,9 +39,10 @@ struct cassandra_db {
        int fd_pipe[2];
        struct io *io_pipe;
        ARRAY(struct cassandra_callback *) callbacks;
+       ARRAY(struct cassandra_result *) results;
        unsigned int callback_ids;
 
-       struct cassandra_result *cur_result;
+       /* for synchronous queries: */
        struct ioloop *ioloop, *orig_ioloop;
        struct sql_result *sync_result;
 
@@ -63,6 +64,7 @@ struct cassandra_result {
        sql_query_callback_t *callback;
        void *context;
 
+       unsigned int query_sent:1;
        unsigned int finished:1;
        unsigned int write_query:1;
 };
@@ -116,6 +118,7 @@ static struct {
        { CASS_LOG_TRACE, "trace" }
 };
 
+static void driver_cassandra_send_queries(struct cassandra_db *db);
 static void result_finish(struct cassandra_result *result);
 
 static int consistency_parse(const char *str, CassConsistency *consistency_r)
@@ -146,8 +149,6 @@ static int log_level_parse(const char *str, CassLogLevel *log_level_r)
 
 static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
 {
-       i_assert(state == SQL_DB_STATE_BUSY || db->cur_result == NULL);
-
        /* switch back to original ioloop in case the caller wants to
           add/remove timeouts */
        if (db->ioloop != NULL)
@@ -157,8 +158,10 @@ static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_stat
                io_loop_set_current(db->ioloop);
 }
 
-static void driver_cassandra_close(struct cassandra_db *db)
+static void driver_cassandra_close(struct cassandra_db *db, const char *error)
 {
+       struct cassandra_result *const *resultp;
+
        if (db->io_pipe != NULL)
                io_remove(&db->io_pipe);
        if (db->fd_pipe[0] != -1) {
@@ -167,6 +170,13 @@ static void driver_cassandra_close(struct cassandra_db *db)
        }
        driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
 
+       while (array_count(&db->results) > 0) {
+               resultp = array_idx(&db->results, 0);
+               if ((*resultp)->error == NULL)
+                       (*resultp)->error = i_strdup(error);
+               result_finish(*resultp);
+       }
+
        if (db->ioloop != NULL) {
                /* running a sync query, stop it */
                io_loop_stop(db->ioloop);
@@ -238,7 +248,7 @@ static void driver_cassandra_input(struct cassandra_db *db)
                        driver_cassandra_input_id(db, ids[i]);
                return;
        }
-       driver_cassandra_close(db);
+       driver_cassandra_close(db, "IPC pipe closed");
 }
 
 static void
@@ -267,7 +277,7 @@ static void connect_callback(CassFuture *future, void *context)
        if ((rc = cass_future_error_code(future)) != CASS_OK) {
                driver_cassandra_log_error(future,
                                           "Couldn't connect to Cassandra");
-               driver_cassandra_close(db);
+               driver_cassandra_close(db, "Couldn't connect to Cassandra");
                return;
        }
        driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
@@ -276,6 +286,7 @@ static void connect_callback(CassFuture *future, void *context)
                   finish */
                io_loop_stop(db->ioloop);
        }
+       driver_cassandra_send_queries(db);
 }
 
 static int driver_cassandra_connect(struct sql_db *_db)
@@ -302,12 +313,7 @@ static void driver_cassandra_disconnect(struct sql_db *_db)
 {
        struct cassandra_db *db = (struct cassandra_db *)_db;
 
-       if (db->cur_result != NULL && !db->cur_result->finished) {
-               if (db->cur_result->error == NULL)
-                       db->cur_result->error = i_strdup("disconnecting");
-               result_finish(db->cur_result);
-       }
-       driver_cassandra_close(db);
+       driver_cassandra_close(db, "Disconnected");
 }
 
 static const char *
@@ -394,6 +400,7 @@ static struct sql_db *driver_cassandra_init_v(const char *connect_string)
        cass_cluster_set_request_timeout(db->cluster, SQL_QUERY_TIMEOUT_SECS * 1000);
        cass_cluster_set_contact_points(db->cluster, db->hosts);
        db->session = cass_session_new();
+       i_array_init(&db->results, 16);
        i_array_init(&db->callbacks, 16);
        return &db->api;
 }
@@ -402,15 +409,12 @@ static void driver_cassandra_deinit_v(struct sql_db *_db)
 {
        struct cassandra_db *db = (struct cassandra_db *)_db;
 
-       if (db->cur_result != NULL && !db->cur_result->finished) {
-               if (db->cur_result->error == NULL)
-                       db->cur_result->error = i_strdup("deinitializing");
-               result_finish(db->cur_result);
-       }
-        driver_cassandra_close(db);
+        driver_cassandra_close(db, "Deinitialized");
 
        i_assert(array_count(&db->callbacks) == 0);
        array_free(&db->callbacks);
+       i_assert(array_count(&db->results) == 0);
+       array_free(&db->results);
 
        cass_session_free(db->session);
        cass_cluster_free(db->cluster);
@@ -421,11 +425,20 @@ static void driver_cassandra_deinit_v(struct sql_db *_db)
        i_free(db);
 }
 
-static void driver_cassandra_set_idle(struct cassandra_db *db)
+static void driver_cassandra_result_unlink(struct cassandra_db *db,
+                                          struct cassandra_result *result)
 {
-       i_assert(db->api.state == SQL_DB_STATE_BUSY);
+       struct cassandra_result *const *results;
+       unsigned int i, count;
 
-       driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
+       results = array_get(&db->results, &count);
+       for (i = 0; i < count; i++) {
+               if (results[i] == result) {
+                       array_delete(&db->results, i, 1);
+                       return;
+               }
+       }
+       i_unreached();
 }
 
 static void driver_cassandra_result_free(struct sql_result *_result)
@@ -434,14 +447,11 @@ static void driver_cassandra_result_free(struct sql_result *_result)
         struct cassandra_result *result = (struct cassandra_result *)_result;
 
        i_assert(!result->api.callback);
-       i_assert(db->cur_result == result);
        i_assert(result->callback == NULL);
 
        if (_result == db->sync_result)
                db->sync_result = NULL;
-       db->cur_result = NULL;
 
-       driver_cassandra_set_idle(db);
        if (result->result != NULL)
                cass_result_free(result->result);
        if (result->iterator != NULL)
@@ -459,6 +469,7 @@ static void result_finish(struct cassandra_result *result)
        bool free_result = TRUE;
 
        result->finished = TRUE;
+       driver_cassandra_result_unlink(db, result);
 
        if (db->log_level >= CASS_LOG_DEBUG) {
                i_debug("cassandra: Finished query '%s': %s", result->query,
@@ -504,26 +515,44 @@ static void query_callback(CassFuture *future, void *context)
        result_finish(result);
 }
 
-static void do_query(struct cassandra_result *result, const char *query)
+static int driver_cassandra_send_query(struct cassandra_result *result)
 {
         struct cassandra_db *db = (struct cassandra_db *)result->api.db;
        CassFuture *future;
+       int ret;
 
-       i_assert(SQL_DB_IS_READY(&db->api));
-       i_assert(db->cur_result == NULL);
-
-       driver_cassandra_set_state(db, SQL_DB_STATE_BUSY);
-       db->cur_result = result;
+       if (!SQL_DB_IS_READY(&db->api)) {
+               if ((ret = sql_connect(&db->api)) <= 0) {
+                       if (ret < 0)
+                               driver_cassandra_close(db, "Couldn't connect to Cassandra");
+                       return ret;
+               }
+       }
 
-       result->query = i_strdup(query);
        result->row_pool = pool_alloconly_create("cassandra result", 512);
-       result->statement = cass_statement_new(query, 0);
+       result->statement = cass_statement_new(result->query, 0);
        if (result->write_query)
                cass_statement_set_consistency(result->statement, db->write_consistency);
        else
                cass_statement_set_consistency(result->statement, db->read_consistency);
        future = cass_session_execute(db->session, result->statement);
        driver_cassandra_set_callback(future, db, query_callback, result);
+       result->query_sent = TRUE;
+       return 1;
+}
+
+static void driver_cassandra_send_queries(struct cassandra_db *db)
+{
+       struct cassandra_result *const *results;
+       unsigned int i, count;
+
+       results = array_get(&db->results, &count);
+       for (i = 0; i < count; i++) {
+               if (!results[i]->query_sent) {
+                       if (driver_cassandra_send_query(results[i]) <= 0)
+                               break;
+               }
+       }
 }
 
 static void exec_callback(struct sql_result *_result ATTR_UNUSED,
@@ -532,19 +561,23 @@ static void exec_callback(struct sql_result *_result ATTR_UNUSED,
 }
 
 static void
-driver_cassandra_query_full(struct sql_db *db, const char *query, bool write_query,
+driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_query,
                            sql_query_callback_t *callback, void *context)
 {
+        struct cassandra_db *db = (struct cassandra_db *)_db;
        struct cassandra_result *result;
 
        result = i_new(struct cassandra_result, 1);
        result->api = driver_cassandra_result;
-       result->api.db = db;
+       result->api.db = _db;
        result->api.refcount = 1;
        result->callback = callback;
        result->context = context;
        result->write_query = write_query;
-       do_query(result, query);
+       result->query = i_strdup(query);
+       array_append(&db->results, &result, 1);
+
+       (void)driver_cassandra_send_query(result);
 }
 
 static void driver_cassandra_exec(struct sql_db *db, const char *query)
@@ -971,7 +1004,7 @@ driver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
 
 const struct sql_db driver_cassandra_db = {
        .name = "cassandra",
-       .flags = SQL_DB_FLAG_POOLED,
+       .flags = 0,
 
        .v = {
                driver_cassandra_init_v,