#include "net.h"
#include "write-full.h"
#include "time-util.h"
+#include "settings-parser.h"
#include "sql-api-private.h"
#ifdef BUILD_CASSANDRA
CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
CassLogLevel log_level;
unsigned int protocol_version;
+ unsigned int num_threads;
+ unsigned int connect_timeout_secs, request_timeout_secs;
in_port_t port;
CassCluster *cluster;
static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
const char *connect_string)
{
- const char *const *args, *key, *value;
+ const char *const *args, *key, *value, *error;
string_t *hosts = t_str_new(64);
bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
+ db->connect_timeout_secs = SQL_CONNECT_TIMEOUT_SECS;
+ db->request_timeout_secs = SQL_QUERY_TIMEOUT_SECS;
args = t_strsplit_spaces(connect_string, " ");
for (; *args != NULL; args++) {
} else if (strcmp(key, "version") == 0) {
if (str_to_uint(value, &db->protocol_version) < 0)
i_fatal("cassandra: Invalid version: %s", value);
+ } else if (strcmp(key, "num_threads") == 0) {
+ if (str_to_uint(value, &db->num_threads) < 0)
+ i_fatal("cassandra: Invalid num_threads: %s", value);
+ } else if (strcmp(key, "connect_timeout") == 0) {
+ if (settings_get_time(value, &db->connect_timeout_secs, &error) < 0)
+ i_fatal("cassandra: Invalid connect_timeout '%s': %s", value, error);
+ } else if (strcmp(key, "request_timeout") == 0) {
+ if (settings_get_time(value, &db->request_timeout_secs, &error) < 0)
+ i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
} else {
i_fatal("cassandra: Unknown connect string: %s", key);
}
db->timestamp_gen = cass_timestamp_gen_monotonic_new();
db->cluster = cass_cluster_new();
cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
- cass_cluster_set_connect_timeout(db->cluster, SQL_CONNECT_TIMEOUT_SECS * 1000);
- cass_cluster_set_request_timeout(db->cluster, SQL_QUERY_TIMEOUT_SECS * 1000);
+ cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_secs * 1000);
+ cass_cluster_set_request_timeout(db->cluster, db->request_timeout_secs * 1000);
cass_cluster_set_contact_points(db->cluster, db->hosts);
if (db->user != NULL && db->password != NULL)
cass_cluster_set_credentials(db->cluster, db->user, db->password);
cass_cluster_set_port(db->cluster, db->port);
if (db->protocol_version != 0)
cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
+ if (db->num_threads != 0)
+ cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
db->session = cass_session_new();
i_array_init(&db->results, 16);
i_array_init(&db->callbacks, 16);