]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
cassandra: Add page_size setting to enable paged results for queries
authorTimo Sirainen <timo.sirainen@dovecot.fi>
Mon, 17 Jul 2017 14:54:07 +0000 (17:54 +0300)
committerTimo Sirainen <timo.sirainen@dovecot.fi>
Tue, 18 Jul 2017 12:22:41 +0000 (15:22 +0300)
src/lib-sql/driver-cassandra.c

index e8239b952fab81d97774ac797cdcafcdd8337e0c..7c8d0e775c6ef5745f82c53eaf75b142c2f2a320 100644 (file)
@@ -56,6 +56,7 @@ static const char *counter_names[CASSANDRA_COUNTER_COUNT] = {
 
 enum cassandra_query_type {
        CASSANDRA_QUERY_TYPE_READ,
+       CASSANDRA_QUERY_TYPE_READ_MORE,
        CASSANDRA_QUERY_TYPE_WRITE,
        CASSANDRA_QUERY_TYPE_DELETE
 };
@@ -88,6 +89,7 @@ struct cassandra_db {
        unsigned int warn_timeout_secs;
        unsigned int heartbeat_interval_secs, idle_timeout_secs;
        unsigned int execution_retry_interval_msecs, execution_retry_times;
+       unsigned int page_size;
        in_port_t port;
 
        CassCluster *cluster;
@@ -516,6 +518,9 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
 #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
                        i_fatal("cassandra: This cassandra version does not support execution_retry_times");
 #endif
+               } else if (strcmp(key, "page_size") == 0) {
+                       if (str_to_uint(value, &db->page_size) < 0)
+                               i_fatal("cassandra: Invalid page_size: %s", value);
                } else {
                        i_fatal("cassandra: Unknown connect string: %s", key);
                }
@@ -886,12 +891,20 @@ static void query_callback(CassFuture *future, void *context)
 
 static void driver_cassandra_init_statement(struct cassandra_result *result)
 {
+       struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+
+       if (result->statement != NULL) {
+               /* continuing a paged result */
+               return;
+       }
        result->statement = cass_statement_new(result->query, 0);
        cass_statement_set_consistency(result->statement, result->consistency);
 
 #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
        cass_statement_set_is_idempotent(result->statement, cass_true);
 #endif
+       if (db->page_size > 0)
+               cass_statement_set_paging_size(result->statement, db->page_size);
 }
 
 static void driver_cassandra_result_send_query(struct cassandra_result *result)
@@ -961,6 +974,11 @@ static int driver_cassandra_send_query(struct cassandra_result *result)
                result->consistency = db->read_consistency;
                result->fallback_consistency = db->read_fallback_consistency;
                break;
+       case CASSANDRA_QUERY_TYPE_READ_MORE:
+               /* consistency is already set and we don't want to fallback
+                  at this point anymore. */
+               result->fallback_consistency = result->consistency;
+               break;
        case CASSANDRA_QUERY_TYPE_WRITE:
                result->consistency = db->write_consistency;
                result->fallback_consistency = db->write_fallback_consistency;
@@ -1184,6 +1202,24 @@ driver_cassandra_get_value(struct cassandra_result *result,
        return 0;
 }
 
+static int driver_cassandra_result_next_page(struct cassandra_result *result)
+{
+       struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+
+       if (db->page_size == 0) {
+               /* no paging */
+               return 0;
+       }
+       if (cass_result_has_more_pages(result->result) == cass_false)
+               return 0;
+
+       /* callers that don't support sql_query_more() will still get a useful
+          error message. */
+       i_free(result->error);
+       result->error = i_strdup("Paged query has more results, but not supported by the caller");
+       return SQL_RESULT_NEXT_MORE;
+}
+
 static int driver_cassandra_result_next_row(struct sql_result *_result)
 {
        struct cassandra_result *result = (struct cassandra_result *)_result;
@@ -1198,7 +1234,7 @@ static int driver_cassandra_result_next_row(struct sql_result *_result)
                return -1;
 
        if (!cass_iterator_next(result->iterator))
-               return 0;
+               return driver_cassandra_result_next_page(result);
        result->row_count++;
 
        p_clear(result->row_pool);
@@ -1217,6 +1253,45 @@ static int driver_cassandra_result_next_row(struct sql_result *_result)
        return ret;
 }
 
+static void
+driver_cassandra_result_more(struct sql_result **_result, bool async,
+                            sql_query_callback_t *callback, void *context)
+{
+       struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
+       struct cassandra_result *new_result;
+       struct cassandra_result *old_result =
+               (struct cassandra_result *)*_result;
+
+       /* Initialize the next page as a new sql_result */
+       new_result = driver_cassandra_query_init(db, old_result->query,
+                                                CASSANDRA_QUERY_TYPE_READ_MORE,
+                                                callback, context);
+
+       /* Preserve the statement and update its paging state */
+       new_result->statement = old_result->statement;
+       old_result->statement = NULL;
+       cass_statement_set_paging_state(new_result->statement,
+                                       old_result->result);
+
+       sql_result_unref(*_result);
+       *_result = NULL;
+
+       if (async)
+               (void)driver_cassandra_send_query(new_result);
+       else {
+               i_assert(db->api.state == SQL_DB_STATE_IDLE);
+               driver_cassandra_sync_init(db);
+               (void)driver_cassandra_send_query(new_result);
+               if (new_result->result == NULL) {
+                       db->io_pipe = io_loop_move_io(&db->io_pipe);
+                       io_loop_run(db->ioloop);
+               }
+               driver_cassandra_sync_deinit(db);
+
+               callback(&new_result->api, context);
+       }
+}
+
 static unsigned int
 driver_cassandra_result_get_fields_count(struct sql_result *_result)
 {
@@ -1538,7 +1613,7 @@ const struct sql_result driver_cassandra_result = {
                driver_cassandra_result_find_field_value,
                driver_cassandra_result_get_values,
                driver_cassandra_result_get_error,
-               NULL,
+               driver_cassandra_result_more,
        }
 };