#include "time-util.h"
#include "var-expand.h"
#include "safe-memset.h"
+#include "settings.h"
+#include "ssl-settings.h"
#include "sql-api-private.h"
#ifdef BUILD_CASSANDRA
static_assert_array_size(cassandra_result_type_prefixes,
CASSANDRA_RESULT_TYPE_COUNT);
+struct cassandra_settings {
+ pool_t pool;
+
+ ARRAY_TYPE(const_string) hosts;
+ in_port_t port;
+ const char *keyspace;
+ const char *user;
+ const char *password;
+
+ const char *metrics_path;
+ const char *log_level;
+ bool debug_queries;
+ bool log_retries;
+ bool latency_aware_routing;
+
+ const char *read_consistency;
+ const char *write_consistency;
+ const char *delete_consistency;
+ const char *read_fallback_consistency;
+ const char *write_fallback_consistency;
+ const char *delete_fallback_consistency;
+
+ unsigned int connect_timeout_msecs;
+ unsigned int request_timeout_msecs;
+ unsigned int warn_timeout_msecs;
+
+ unsigned int protocol_version;
+ unsigned int num_threads;
+ unsigned int heartbeat_interval_secs;
+ unsigned int idle_timeout_secs;
+ unsigned int execution_retry_interval_msecs;
+ unsigned int execution_retry_times;
+ unsigned int page_size;
+
+ const char *ssl;
+
+ /* generated: */
+ CassLogLevel parsed_log_level;
+ CassConsistency parsed_read_consistency;
+ CassConsistency parsed_write_consistency;
+ CassConsistency parsed_delete_consistency;
+ CassConsistency parsed_read_fallback_consistency;
+ CassConsistency parsed_write_fallback_consistency;
+ CassConsistency parsed_delete_fallback_consistency;
+ bool parsed_use_ssl;
+ CassSslVerifyFlags parsed_ssl_verify_flags;
+};
+
+static struct cassandra_settings cassandra_default_settings = {
+ .hosts = ARRAY_INIT,
+ .port = 0,
+ .keyspace = "",
+ .user = "",
+ .password = "",
+
+ .metrics_path = "",
+ .log_level = "warn:critical:error:info:debug:trace",
+ .debug_queries = FALSE,
+ .log_retries = FALSE,
+ .latency_aware_routing = FALSE,
+
+ .read_consistency = "local-quorum",
+ .write_consistency = "local-quorum",
+ .delete_consistency = "local-quorum",
+ .read_fallback_consistency = "",
+ .write_fallback_consistency = "",
+ .delete_fallback_consistency = "",
+
+ .connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS * 1000,
+ .request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS * 1000,
+ .warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS,
+
+ .protocol_version = 0,
+ .num_threads = 0,
+ .heartbeat_interval_secs = 0,
+ .idle_timeout_secs = 0,
+ .execution_retry_interval_msecs = 0,
+ .execution_retry_times = 0,
+ .page_size = 0,
+
+ .ssl = "no:cert-only:cert-ip:cert-dns",
+};
+
struct cassandra_callback {
unsigned int id;
struct timeout *to;
struct cassandra_db {
struct sql_db api;
- char *hosts, *keyspace, *table_prefix, *user, *password;
- CassConsistency read_consistency, write_consistency, delete_consistency;
- CassConsistency read_fallback_consistency, write_fallback_consistency;
- CassConsistency delete_fallback_consistency;
- CassLogLevel log_level;
- bool debug_queries;
- bool log_retries;
- bool latency_aware_routing;
- bool init_ssl;
- unsigned int protocol_version;
- unsigned int num_threads;
- unsigned int connect_timeout_msecs, request_timeout_msecs;
- unsigned int warn_timeout_msecs;
- unsigned int heartbeat_interval_secs, idle_timeout_secs;
- unsigned int execution_retry_interval_msecs, execution_retry_times;
- unsigned int page_size;
- in_port_t port;
+ const struct cassandra_settings *set;
+ const struct ssl_settings *ssl_set;
+ char *table_prefix;
CassCluster *cluster;
CassSession *session;
ARRAY(struct cassandra_result *) results;
unsigned int callback_ids;
- char *metrics_path;
- char *ssl_ca_file;
- char *ssl_cert_file;
- char *ssl_private_key_file;
- char *ssl_private_key_password;
- CassSslVerifyFlags ssl_verify_flags;
-
struct timeout *to_metrics;
uint64_t counters[CASSANDRA_COUNTER_COUNT];
const char *connect_string,
const char **error_r)
{
+ struct cassandra_settings *set;
+ pool_t pool;
const char *const *args, *key, *value, *error;
- string_t *hosts = t_str_new(64);
bool read_fallback_set = FALSE, write_fallback_set = FALSE;
bool delete_fallback_set = FALSE;
- db->log_level = CASS_LOG_WARN;
- db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
- db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
- db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
- db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
- db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
- db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
+ pool = pool_alloconly_create("cassandra settings", 1024);
+ set = p_new(pool, struct cassandra_settings, 1);
+ *set = cassandra_default_settings;
+ set->pool = pool;
+
+ struct ssl_settings *ssl_set = p_new(pool, struct ssl_settings, 1);
+ *ssl_set = ssl_default_settings;
+ ssl_set->pool = pool;
+ pool_ref(pool);
+
+ p_array_init(&set->hosts, pool, 8);
+ set->parsed_log_level = CASS_LOG_WARN;
+ set->parsed_read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
+ set->parsed_write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
+ set->parsed_delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
args = t_strsplit_spaces(connect_string, " ");
for (; *args != NULL; args++) {
key = t_strdup_until(*args, value++);
if (str_begins_with(key, "ssl_"))
- db->init_ssl = TRUE;
+ set->parsed_use_ssl = TRUE;
if (strcmp(key, "host") == 0) {
- if (str_len(hosts) > 0)
- str_append_c(hosts, ',');
- str_append(hosts, value);
+ value = p_strdup(pool, value);
+ array_push_back(&set->hosts, &value);
} else if (strcmp(key, "port") == 0) {
- if (net_str2port(value, &db->port) < 0) {
+ if (net_str2port(value, &set->port) < 0) {
*error_r = t_strdup_printf(
"Invalid port: %s", value);
return -1;
}
} else if (strcmp(key, "dbname") == 0 ||
strcmp(key, "keyspace") == 0) {
- i_free(db->keyspace);
- db->keyspace = i_strdup(value);
+ set->keyspace = p_strdup(pool, value);
} else if (strcmp(key, "user") == 0) {
- i_free(db->user);
- db->user = i_strdup(value);
+ set->user = p_strdup(pool, value);
} else if (strcmp(key, "password") == 0) {
- i_free(db->password);
- db->password = i_strdup(value);
+ set->password = p_strdup(pool, value);
} else if (strcmp(key, "read_consistency") == 0) {
- if (consistency_parse(value, &db->read_consistency) < 0) {
+ if (consistency_parse(value, &set->parsed_read_consistency) < 0) {
*error_r = t_strdup_printf(
"Unknown read_consistency: %s", value);
return -1;
}
} else if (strcmp(key, "read_fallback_consistency") == 0) {
- if (consistency_parse(value, &db->read_fallback_consistency) < 0) {
+ if (consistency_parse(value, &set->parsed_read_fallback_consistency) < 0) {
*error_r = t_strdup_printf(
"Unknown read_fallback_consistency: %s", value);
return -1;
read_fallback_set = TRUE;
} else if (strcmp(key, "write_consistency") == 0) {
if (consistency_parse(value,
- &db->write_consistency) < 0) {
+ &set->parsed_write_consistency) < 0) {
*error_r = t_strdup_printf(
"Unknown write_consistency: %s", value);
return -1;
}
} else if (strcmp(key, "write_fallback_consistency") == 0) {
if (consistency_parse(value,
- &db->write_fallback_consistency) < 0) {
+ &set->parsed_write_fallback_consistency) < 0) {
*error_r = t_strdup_printf(
"Unknown write_fallback_consistency: %s",
value);
write_fallback_set = TRUE;
} else if (strcmp(key, "delete_consistency") == 0) {
if (consistency_parse(value,
- &db->delete_consistency) < 0) {
+ &set->parsed_delete_consistency) < 0) {
*error_r = t_strdup_printf(
"Unknown delete_consistency: %s", value);
return -1;
}
} else if (strcmp(key, "delete_fallback_consistency") == 0) {
if (consistency_parse(value,
- &db->delete_fallback_consistency) < 0) {
+ &set->parsed_delete_fallback_consistency) < 0) {
*error_r = t_strdup_printf(
"Unknown delete_fallback_consistency: %s",
value);
}
delete_fallback_set = TRUE;
} else if (strcmp(key, "log_level") == 0) {
- if (log_level_parse(value, &db->log_level) < 0) {
+ if (log_level_parse(value, &set->parsed_log_level) < 0) {
*error_r = t_strdup_printf(
"Unknown log_level: %s", value);
return -1;
}
} else if (strcmp(key, "debug_queries") == 0) {
- db->debug_queries = TRUE;
+ set->debug_queries = TRUE;
} else if (strcmp(key, "log_retries") == 0) {
- db->log_retries = TRUE;
+ set->log_retries = TRUE;
} else if (strcmp(key, "latency_aware_routing") == 0) {
- db->latency_aware_routing = TRUE;
+ set->latency_aware_routing = TRUE;
} else if (strcmp(key, "version") == 0) {
- if (str_to_uint(value, &db->protocol_version) < 0) {
+ if (str_to_uint(value, &set->protocol_version) < 0) {
*error_r = t_strdup_printf(
"Invalid version: %s", value);
return -1;
}
} else if (strcmp(key, "num_threads") == 0) {
- if (str_to_uint(value, &db->num_threads) < 0) {
+ if (str_to_uint(value, &set->num_threads) < 0) {
*error_r = t_strdup_printf(
"Invalid num_threads: %s", value);
return -1;
}
} else if (strcmp(key, "heartbeat_interval") == 0) {
- if (str_parse_get_interval(value, &db->heartbeat_interval_secs,
+ if (str_parse_get_interval(value, &set->heartbeat_interval_secs,
&error) < 0) {
*error_r = t_strdup_printf(
"Invalid heartbeat_interval '%s': %s",
return -1;
}
} else if (strcmp(key, "idle_timeout") == 0) {
- if (str_parse_get_interval(value, &db->idle_timeout_secs,
+ if (str_parse_get_interval(value, &set->idle_timeout_secs,
&error) < 0) {
*error_r = t_strdup_printf(
"Invalid idle_timeout '%s': %s",
}
} else if (strcmp(key, "connect_timeout") == 0) {
if (str_parse_get_interval_msecs(value,
- &db->connect_timeout_msecs,
+ &set->connect_timeout_msecs,
&error) < 0) {
*error_r = t_strdup_printf(
"Invalid connect_timeout '%s': %s",
}
} else if (strcmp(key, "request_timeout") == 0) {
if (str_parse_get_interval_msecs(value,
- &db->request_timeout_msecs,
+ &set->request_timeout_msecs,
&error) < 0) {
*error_r = t_strdup_printf(
"Invalid request_timeout '%s': %s",
}
} else if (strcmp(key, "warn_timeout") == 0) {
if (str_parse_get_interval_msecs(value,
- &db->warn_timeout_msecs,
+ &set->warn_timeout_msecs,
&error) < 0) {
*error_r = t_strdup_printf(
"Invalid warn_timeout '%s': %s",
return -1;
}
} else if (strcmp(key, "metrics") == 0) {
- i_free(db->metrics_path);
- db->metrics_path = i_strdup(value);
+ set->metrics_path = p_strdup(pool, value);
} else if (strcmp(key, "execution_retry_interval") == 0) {
if (str_parse_get_interval_msecs(value,
- &db->execution_retry_interval_msecs,
+ &set->execution_retry_interval_msecs,
&error) < 0) {
*error_r = t_strdup_printf(
"Invalid execution_retry_interval '%s': %s",
return -1;
#endif
} else if (strcmp(key, "execution_retry_times") == 0) {
- if (str_to_uint(value, &db->execution_retry_times) < 0) {
+ if (str_to_uint(value, &set->execution_retry_times) < 0) {
*error_r = t_strdup_printf(
"Invalid execution_retry_times %s",
value);
return -1;
#endif
} else if (strcmp(key, "page_size") == 0) {
- if (str_to_uint(value, &db->page_size) < 0) {
+ if (str_to_uint(value, &set->page_size) < 0) {
*error_r = t_strdup_printf(
"Invalid page_size: %s",
value);
return -1;
}
} else if (strcmp(key, "ssl_ca") == 0) {
- db->ssl_ca_file = i_strdup(value);
+ if (settings_parse_read_file(value, value, pool,
+ &ssl_set->ssl_client_ca_file,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid ssl_ca: %s", error);
+ return -1;
+ }
} else if (strcmp(key, "ssl_cert_file") == 0) {
- db->ssl_cert_file = i_strdup(value);
+ if (settings_parse_read_file(value, value, pool,
+ &ssl_set->ssl_client_cert_file,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid ssl_cert_file: %s", error);
+ return -1;
+ }
} else if (strcmp(key, "ssl_private_key_file") == 0) {
- db->ssl_private_key_file = i_strdup(value);
+ if (settings_parse_read_file(value, value, pool,
+ &ssl_set->ssl_client_key_file,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid ssl_private_key_file: %s", error);
+ return -1;
+ }
} else if (strcmp(key, "ssl_private_key_password") == 0) {
- db->ssl_private_key_password = i_strdup(value);
+ ssl_set->ssl_client_key_password =
+ p_strdup(pool, value);
} else if (strcmp(key, "ssl_verify") == 0) {
if (strcmp(value, "none") == 0) {
- db->ssl_verify_flags = CASS_SSL_VERIFY_NONE;
+ set->parsed_ssl_verify_flags =
+ CASS_SSL_VERIFY_NONE;
} else if (strcmp(value, "cert") == 0) {
- db->ssl_verify_flags = CASS_SSL_VERIFY_PEER_CERT;
+ set->parsed_ssl_verify_flags =
+ CASS_SSL_VERIFY_PEER_CERT;
} else if (strcmp(value, "cert-ip") == 0) {
- db->ssl_verify_flags =
+ set->parsed_ssl_verify_flags =
CASS_SSL_VERIFY_PEER_CERT |
CASS_SSL_VERIFY_PEER_IDENTITY;
} else {
}
}
- if (!read_fallback_set)
- db->read_fallback_consistency = db->read_consistency;
- if (!write_fallback_set)
- db->write_fallback_consistency = db->write_consistency;
- if (!delete_fallback_set)
- db->delete_fallback_consistency = db->delete_consistency;
+ if (!read_fallback_set) {
+ set->parsed_read_fallback_consistency =
+ set->parsed_read_consistency;
+ }
+ if (!write_fallback_set) {
+ set->parsed_write_fallback_consistency =
+ set->parsed_write_consistency;
+ }
+ if (!delete_fallback_set) {
+ set->parsed_delete_fallback_consistency =
+ set->parsed_delete_consistency;
+ }
- if (str_len(hosts) == 0) {
+ if (array_count(&set->hosts) == 0) {
*error_r = t_strdup_printf("No hosts given in connect string");
return -1;
}
- if (db->keyspace == NULL) {
+ if (set->keyspace[0] == '\0') {
*error_r = t_strdup_printf("No dbname given in connect string");
return -1;
} else {
i_free(db->table_prefix);
- db->table_prefix = i_strdup_printf("%s.", db->keyspace);
+ db->table_prefix = i_strdup_printf("%s.", set->keyspace);
}
- if ((db->ssl_cert_file != NULL && db->ssl_private_key_file == NULL) ||
- (db->ssl_cert_file == NULL && db->ssl_private_key_file != NULL)) {
+ if ((ssl_set->ssl_client_cert_file[0] != '\0' &&
+ ssl_set->ssl_client_key_file[0] == '\0') ||
+ (ssl_set->ssl_client_cert_file[0] == '\0' &&
+ ssl_set->ssl_client_key_file[0] != '\0')) {
*error_r = "ssl_cert_file and ssl_private_key_file need to be both set";
return -1;
}
-
- db->hosts = i_strdup(str_c(hosts));
return 0;
}
const char *error;
int fd;
- if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
+ if (var_expand(path, db->set->metrics_path, tab, &error) <= 0) {
e_error(db->api.event, "Failed to expand metrics_path=%s: %s",
- db->metrics_path, error);
+ db->set->metrics_path, error);
return;
}
*_db = NULL;
event_unref(&db->api.event);
- i_free(db->metrics_path);
- i_free(db->hosts);
i_free(db->error);
i_free(db->table_prefix);
- i_free(db->keyspace);
- i_free(db->user);
- i_free(db->password);
- i_free(db->ssl_ca_file);
- i_free(db->ssl_cert_file);
- i_free(db->ssl_private_key_file);
- i_free_and_null(db->ssl_private_key_password);
array_free(&db->api.module_contexts);
if (db->ssl != NULL)
cass_ssl_free(db->ssl);
+ settings_free(db->set);
+ settings_free(db->ssl_set);
i_free(db);
}
static int driver_cassandra_init_ssl(struct cassandra_db *db, const char **error_r)
{
- buffer_t *buf = t_buffer_create(512);
+ const struct ssl_settings *ssl_set = db->ssl_set;
+ struct settings_file file;
CassError c_err;
db->ssl = cass_ssl_new();
i_assert(db->ssl != NULL);
- if (db->ssl_ca_file != NULL) {
- if (buffer_append_full_file(buf, db->ssl_ca_file, SIZE_MAX,
- error_r) < 0)
- return -1;
- if ((c_err = cass_ssl_add_trusted_cert(db->ssl, str_c(buf))) != CASS_OK) {
+ if (ssl_set->ssl_client_ca_file[0] != '\0') {
+ settings_file_get(ssl_set->ssl_client_ca_file,
+ unsafe_data_stack_pool, &file);
+ if ((c_err = cass_ssl_add_trusted_cert(db->ssl, file.content)) != CASS_OK) {
*error_r = cass_error_desc(c_err);
return -1;
}
}
- if (db->ssl_private_key_file != NULL && db->ssl_cert_file != NULL) {
- buffer_set_used_size(buf, 0);
- if (buffer_append_full_file(buf, db->ssl_private_key_file,
- SIZE_MAX, error_r) < 0)
- return -1;
- c_err = cass_ssl_set_private_key(db->ssl, str_c(buf),
- db->ssl_private_key_password);
- safe_memset(buffer_get_modifiable_data(buf, NULL), 0, buf->used);
+ if (ssl_set->ssl_client_cert_file[0] != '\0') {
+ settings_file_get(ssl_set->ssl_client_key_file,
+ unsafe_data_stack_pool, &file);
+ c_err = cass_ssl_set_private_key(db->ssl, file.content,
+ ssl_set->ssl_client_key_password);
if (c_err != CASS_OK) {
*error_r = cass_error_desc(c_err);
return -1;
}
- buffer_set_used_size(buf, 0);
- if (buffer_append_full_file(buf, db->ssl_cert_file, SIZE_MAX, error_r) < 0)
- return -1;
- if ((c_err = cass_ssl_set_cert(db->ssl, str_c(buf))) != CASS_OK) {
+ settings_file_get(ssl_set->ssl_client_cert_file,
+ unsafe_data_stack_pool, &file);
+ if ((c_err = cass_ssl_set_cert(db->ssl, file.content)) != CASS_OK) {
*error_r = cass_error_desc(c_err);
return -1;
}
}
- cass_ssl_set_verify_flags(db->ssl, db->ssl_verify_flags);
+ cass_ssl_set_verify_flags(db->ssl, db->set->parsed_ssl_verify_flags);
return 0;
}
-static int driver_cassandra_init_full_v(const struct sql_legacy_settings *set,
- struct sql_db **db_r,
- const char **error_r)
+static int
+driver_cassandra_init_full_v(const struct sql_legacy_settings *legacy_set,
+ struct sql_db **db_r,
+ const char **error_r)
{
struct cassandra_db *db;
int ret;
db = i_new(struct cassandra_db, 1);
db->api = driver_cassandra_db;
db->fd_pipe[0] = db->fd_pipe[1] = -1;
- db->api.event = event_create(set->event_parent);
+ db->api.event = event_create(legacy_set->event_parent);
event_add_category(db->api.event, &event_category_cassandra);
event_set_append_log_prefix(db->api.event, "cassandra: ");
T_BEGIN {
ret = driver_cassandra_parse_connect_string(db,
- set->connect_string, error_r);
+ legacy_set->connect_string, error_r);
} T_END_PASS_STR_IF(ret < 0, error_r);
if (ret < 0) {
return -1;
}
- if (db->init_ssl && driver_cassandra_init_ssl(db, error_r) < 0) {
+ const struct cassandra_settings *set = db->set;
+ if (set->parsed_use_ssl && driver_cassandra_init_ssl(db, error_r) < 0) {
driver_cassandra_free(&db);
return -1;
}
driver_cassandra_init_log();
- cass_log_set_level(db->log_level);
- if (db->log_level >= CASS_LOG_DEBUG)
+ cass_log_set_level(set->parsed_log_level);
+ if (set->parsed_log_level >= CASS_LOG_DEBUG)
event_set_forced_debug(db->api.event, TRUE);
- if (db->protocol_version > 0 && db->protocol_version < 4) {
+ if (set->protocol_version > 0 && set->protocol_version < 4) {
/* binding with column indexes requires v4 */
db->api.v.prepared_statement_init = NULL;
db->api.v.prepared_statement_deinit = NULL;
db->timestamp_gen = cass_timestamp_gen_monotonic_new();
db->cluster = cass_cluster_new();
- cass_cluster_set_ssl(db->cluster, db->ssl);
cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
- cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
- cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
- cass_cluster_set_contact_points(db->cluster, db->hosts);
- if (db->user != NULL && db->password != NULL)
- cass_cluster_set_credentials(db->cluster, db->user, db->password);
- if (db->port != 0)
- cass_cluster_set_port(db->cluster, db->port);
- if (db->protocol_version != 0)
- cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
- if (db->num_threads != 0)
- cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
- if (db->latency_aware_routing)
+ cass_cluster_set_connect_timeout(db->cluster, set->connect_timeout_msecs);
+ cass_cluster_set_request_timeout(db->cluster, set->request_timeout_msecs);
+ cass_cluster_set_contact_points(db->cluster,
+ p_array_const_string_join(unsafe_data_stack_pool,
+ &set->hosts, ","));
+ if (set->user[0] != '\0' && set->password[0] != '\0')
+ cass_cluster_set_credentials(db->cluster, set->user, set->password);
+ if (set->port != 0)
+ cass_cluster_set_port(db->cluster, set->port);
+ if (set->protocol_version != 0)
+ cass_cluster_set_protocol_version(db->cluster, set->protocol_version);
+ if (set->num_threads != 0)
+ cass_cluster_set_num_threads_io(db->cluster, set->num_threads);
+ if (set->latency_aware_routing)
cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
- if (db->heartbeat_interval_secs != 0)
+ if (set->heartbeat_interval_secs != 0)
cass_cluster_set_connection_heartbeat_interval(db->cluster,
- db->heartbeat_interval_secs);
- if (db->log_retries) {
+ set->heartbeat_interval_secs);
+ if (set->log_retries) {
db->default_policy = cass_retry_policy_default_new();
db->logging_policy = cass_retry_policy_logging_new(db->default_policy);
cass_cluster_set_retry_policy(db->cluster, db->logging_policy);
}
- if (db->idle_timeout_secs != 0)
+ if (set->idle_timeout_secs != 0)
cass_cluster_set_connection_idle_timeout(db->cluster,
- db->idle_timeout_secs);
+ set->idle_timeout_secs);
#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
- if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
+ if (set->execution_retry_times > 0 && set->execution_retry_interval_msecs > 0)
cass_cluster_set_constant_speculative_execution_policy(
- db->cluster, db->execution_retry_interval_msecs,
- db->execution_retry_times);
+ db->cluster, set->execution_retry_interval_msecs,
+ set->execution_retry_times);
#endif
- if (db->ssl != NULL) {
+ if (set->parsed_use_ssl) {
e_debug(db->api.event, "Enabling TLS for cluster");
cass_cluster_set_ssl(db->cluster, db->ssl);
}
db->session = cass_session_new();
- if (db->metrics_path != NULL)
+ if (set->metrics_path[0] != '\0')
db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write,
db);
i_array_init(&db->results, 16);
e->add_str("error", result->error);
struct event *event = e->event();
- if (db->debug_queries)
+ if (db->set->debug_queries)
event_set_forced_debug(event, TRUE);
- if (reply_usecs/1000 >= db->warn_timeout_msecs) {
+ if (reply_usecs/1000 >= db->set->warn_timeout_msecs) {
db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
e_warning(event, "%s", str_c(str));
} else {
else
cass_statement_set_is_idempotent(result->statement, cass_true);
#endif
- if (db->page_size > 0 && result->batch == NULL)
- cass_statement_set_paging_size(result->statement, db->page_size);
+ if (db->set->page_size > 0 && result->batch == NULL) {
+ cass_statement_set_paging_size(result->statement,
+ db->set->page_size);
+ }
}
static void driver_cassandra_result_send_query(struct cassandra_result *result)
result->row_pool = pool_alloconly_create("cassandra result", 512);
switch (result->query_type) {
case CASSANDRA_QUERY_TYPE_READ:
- result->consistency = db->read_consistency;
- result->fallback_consistency = db->read_fallback_consistency;
+ result->consistency = db->set->parsed_read_consistency;
+ result->fallback_consistency =
+ db->set->parsed_read_fallback_consistency;
break;
case CASSANDRA_QUERY_TYPE_READ_MORE:
/* consistency is already set and we don't want to fallback
result->fallback_consistency = result->consistency;
break;
case CASSANDRA_QUERY_TYPE_WRITE:
- result->consistency = db->write_consistency;
- result->fallback_consistency = db->write_fallback_consistency;
+ result->consistency = db->set->parsed_write_consistency;
+ result->fallback_consistency =
+ db->set->parsed_write_fallback_consistency;
break;
case CASSANDRA_QUERY_TYPE_DELETE:
- result->consistency = db->delete_consistency;
- result->fallback_consistency = db->delete_fallback_consistency;
+ result->consistency = db->set->parsed_delete_consistency;
+ result->fallback_consistency =
+ db->set->parsed_delete_fallback_consistency;
break;
case CASSANDRA_QUERY_TYPE_COUNT:
i_unreached();
{
struct cassandra_db *db = container_of(result->api.db, struct cassandra_db, api);
- if (db->page_size == 0) {
+ if (db->set->page_size == 0) {
/* no paging */
return 0;
}