enum cassandra_query_type {
CASSANDRA_QUERY_TYPE_READ,
+ CASSANDRA_QUERY_TYPE_READ_MORE,
CASSANDRA_QUERY_TYPE_WRITE,
CASSANDRA_QUERY_TYPE_DELETE
};
unsigned int warn_timeout_secs;
unsigned int heartbeat_interval_secs, idle_timeout_secs;
unsigned int execution_retry_interval_msecs, execution_retry_times;
+ unsigned int page_size;
in_port_t port;
CassCluster *cluster;
#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
i_fatal("cassandra: This cassandra version does not support execution_retry_times");
#endif
+ } else if (strcmp(key, "page_size") == 0) {
+ if (str_to_uint(value, &db->page_size) < 0)
+ i_fatal("cassandra: Invalid page_size: %s", value);
} else {
i_fatal("cassandra: Unknown connect string: %s", key);
}
static void driver_cassandra_init_statement(struct cassandra_result *result)
{
+ struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+
+ if (result->statement != NULL) {
+ /* continuing a paged result */
+ return;
+ }
result->statement = cass_statement_new(result->query, 0);
cass_statement_set_consistency(result->statement, result->consistency);
#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
cass_statement_set_is_idempotent(result->statement, cass_true);
#endif
+ if (db->page_size > 0)
+ cass_statement_set_paging_size(result->statement, db->page_size);
}
static void driver_cassandra_result_send_query(struct cassandra_result *result)
result->consistency = db->read_consistency;
result->fallback_consistency = db->read_fallback_consistency;
break;
+ case CASSANDRA_QUERY_TYPE_READ_MORE:
+ /* consistency is already set and we don't want to fallback
+ at this point anymore. */
+ result->fallback_consistency = result->consistency;
+ break;
case CASSANDRA_QUERY_TYPE_WRITE:
result->consistency = db->write_consistency;
result->fallback_consistency = db->write_fallback_consistency;
return 0;
}
+static int driver_cassandra_result_next_page(struct cassandra_result *result)
+{
+ struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+
+ if (db->page_size == 0) {
+ /* no paging */
+ return 0;
+ }
+ if (cass_result_has_more_pages(result->result) == cass_false)
+ return 0;
+
+ /* callers that don't support sql_query_more() will still get a useful
+ error message. */
+ i_free(result->error);
+ result->error = i_strdup("Paged query has more results, but not supported by the caller");
+ return SQL_RESULT_NEXT_MORE;
+}
+
static int driver_cassandra_result_next_row(struct sql_result *_result)
{
struct cassandra_result *result = (struct cassandra_result *)_result;
return -1;
if (!cass_iterator_next(result->iterator))
- return 0;
+ return driver_cassandra_result_next_page(result);
result->row_count++;
p_clear(result->row_pool);
return ret;
}
+static void
+driver_cassandra_result_more(struct sql_result **_result, bool async,
+ sql_query_callback_t *callback, void *context)
+{
+ struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
+ struct cassandra_result *new_result;
+ struct cassandra_result *old_result =
+ (struct cassandra_result *)*_result;
+
+ /* Initialize the next page as a new sql_result */
+ new_result = driver_cassandra_query_init(db, old_result->query,
+ CASSANDRA_QUERY_TYPE_READ_MORE,
+ callback, context);
+
+ /* Preserve the statement and update its paging state */
+ new_result->statement = old_result->statement;
+ old_result->statement = NULL;
+ cass_statement_set_paging_state(new_result->statement,
+ old_result->result);
+
+ sql_result_unref(*_result);
+ *_result = NULL;
+
+ if (async)
+ (void)driver_cassandra_send_query(new_result);
+ else {
+ i_assert(db->api.state == SQL_DB_STATE_IDLE);
+ driver_cassandra_sync_init(db);
+ (void)driver_cassandra_send_query(new_result);
+ if (new_result->result == NULL) {
+ db->io_pipe = io_loop_move_io(&db->io_pipe);
+ io_loop_run(db->ioloop);
+ }
+ driver_cassandra_sync_deinit(db);
+
+ callback(&new_result->api, context);
+ }
+}
+
static unsigned int
driver_cassandra_result_get_fields_count(struct sql_result *_result)
{
driver_cassandra_result_find_field_value,
driver_cassandra_result_get_values,
driver_cassandra_result_get_error,
- NULL,
+ driver_cassandra_result_more,
}
};