From: Timo Sirainen Date: Wed, 14 Feb 2024 12:29:51 +0000 (+0200) Subject: lib-sql: cassandra - Move settings to struct cassandra_settings X-Git-Tag: 2.4.1~978 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5084dfca5e15f13ccabea36464b3baaa9b025471;p=thirdparty%2Fdovecot%2Fcore.git lib-sql: cassandra - Move settings to struct cassandra_settings --- diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c index 259d23eec1..9e55898b1e 100644 --- a/src/lib-sql/driver-cassandra.c +++ b/src/lib-sql/driver-cassandra.c @@ -13,6 +13,8 @@ #include "time-util.h" #include "var-expand.h" #include "safe-memset.h" +#include "settings.h" +#include "ssl-settings.h" #include "sql-api-private.h" #ifdef BUILD_CASSANDRA @@ -89,6 +91,89 @@ static const char *cassandra_result_type_prefixes[] = { static_assert_array_size(cassandra_result_type_prefixes, CASSANDRA_RESULT_TYPE_COUNT); +struct cassandra_settings { + pool_t pool; + + ARRAY_TYPE(const_string) hosts; + in_port_t port; + const char *keyspace; + const char *user; + const char *password; + + const char *metrics_path; + const char *log_level; + bool debug_queries; + bool log_retries; + bool latency_aware_routing; + + const char *read_consistency; + const char *write_consistency; + const char *delete_consistency; + const char *read_fallback_consistency; + const char *write_fallback_consistency; + const char *delete_fallback_consistency; + + unsigned int connect_timeout_msecs; + unsigned int request_timeout_msecs; + unsigned int warn_timeout_msecs; + + unsigned int protocol_version; + unsigned int num_threads; + unsigned int heartbeat_interval_secs; + unsigned int idle_timeout_secs; + unsigned int execution_retry_interval_msecs; + unsigned int execution_retry_times; + unsigned int page_size; + + const char *ssl; + + /* generated: */ + CassLogLevel parsed_log_level; + CassConsistency parsed_read_consistency; + CassConsistency parsed_write_consistency; + CassConsistency parsed_delete_consistency; + CassConsistency parsed_read_fallback_consistency; + CassConsistency parsed_write_fallback_consistency; + CassConsistency parsed_delete_fallback_consistency; + bool parsed_use_ssl; + CassSslVerifyFlags parsed_ssl_verify_flags; +}; + +static struct cassandra_settings cassandra_default_settings = { + .hosts = ARRAY_INIT, + .port = 0, + .keyspace = "", + .user = "", + .password = "", + + .metrics_path = "", + .log_level = "warn:critical:error:info:debug:trace", + .debug_queries = FALSE, + .log_retries = FALSE, + .latency_aware_routing = FALSE, + + .read_consistency = "local-quorum", + .write_consistency = "local-quorum", + .delete_consistency = "local-quorum", + .read_fallback_consistency = "", + .write_fallback_consistency = "", + .delete_fallback_consistency = "", + + .connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS * 1000, + .request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS * 1000, + .warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS, + + .protocol_version = 0, + .num_threads = 0, + .heartbeat_interval_secs = 0, + .idle_timeout_secs = 0, + .execution_retry_interval_msecs = 0, + .execution_retry_times = 0, + .page_size = 0, + + .ssl = "no:cert-only:cert-ip:cert-dns", +}; + struct cassandra_callback { unsigned int id; struct timeout *to; @@ -101,23 +186,9 @@ struct cassandra_callback { struct cassandra_db { struct sql_db api; - char *hosts, *keyspace, *table_prefix, *user, *password; - CassConsistency read_consistency, write_consistency, delete_consistency; - CassConsistency read_fallback_consistency, write_fallback_consistency; - CassConsistency delete_fallback_consistency; - CassLogLevel log_level; - bool debug_queries; - bool log_retries; - bool latency_aware_routing; - bool init_ssl; - unsigned int protocol_version; - unsigned int num_threads; - unsigned int connect_timeout_msecs, request_timeout_msecs; - unsigned int warn_timeout_msecs; - unsigned int heartbeat_interval_secs, idle_timeout_secs; - unsigned int execution_retry_interval_msecs, execution_retry_times; - unsigned int page_size; - in_port_t port; + const struct cassandra_settings *set; + const struct ssl_settings *ssl_set; + char *table_prefix; CassCluster *cluster; CassSession *session; @@ -133,13 +204,6 @@ struct cassandra_db { ARRAY(struct cassandra_result *) results; unsigned int callback_ids; - char *metrics_path; - char *ssl_ca_file; - char *ssl_cert_file; - char *ssl_private_key_file; - char *ssl_private_key_password; - CassSslVerifyFlags ssl_verify_flags; - struct timeout *to_metrics; uint64_t counters[CASSANDRA_COUNTER_COUNT]; @@ -646,18 +710,27 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, const char *connect_string, const char **error_r) { + struct cassandra_settings *set; + pool_t pool; const char *const *args, *key, *value, *error; - string_t *hosts = t_str_new(64); bool read_fallback_set = FALSE, write_fallback_set = FALSE; bool delete_fallback_set = FALSE; - db->log_level = CASS_LOG_WARN; - db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; - db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; - db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; - db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000; - db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000; - db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS; + pool = pool_alloconly_create("cassandra settings", 1024); + set = p_new(pool, struct cassandra_settings, 1); + *set = cassandra_default_settings; + set->pool = pool; + + struct ssl_settings *ssl_set = p_new(pool, struct ssl_settings, 1); + *ssl_set = ssl_default_settings; + ssl_set->pool = pool; + pool_ref(pool); + + p_array_init(&set->hosts, pool, 8); + set->parsed_log_level = CASS_LOG_WARN; + set->parsed_read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; + set->parsed_write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; + set->parsed_delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM; args = t_strsplit_spaces(connect_string, " "); for (; *args != NULL; args++) { @@ -670,36 +743,32 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, key = t_strdup_until(*args, value++); if (str_begins_with(key, "ssl_")) - db->init_ssl = TRUE; + set->parsed_use_ssl = TRUE; if (strcmp(key, "host") == 0) { - if (str_len(hosts) > 0) - str_append_c(hosts, ','); - str_append(hosts, value); + value = p_strdup(pool, value); + array_push_back(&set->hosts, &value); } else if (strcmp(key, "port") == 0) { - if (net_str2port(value, &db->port) < 0) { + if (net_str2port(value, &set->port) < 0) { *error_r = t_strdup_printf( "Invalid port: %s", value); return -1; } } else if (strcmp(key, "dbname") == 0 || strcmp(key, "keyspace") == 0) { - i_free(db->keyspace); - db->keyspace = i_strdup(value); + set->keyspace = p_strdup(pool, value); } else if (strcmp(key, "user") == 0) { - i_free(db->user); - db->user = i_strdup(value); + set->user = p_strdup(pool, value); } else if (strcmp(key, "password") == 0) { - i_free(db->password); - db->password = i_strdup(value); + set->password = p_strdup(pool, value); } else if (strcmp(key, "read_consistency") == 0) { - if (consistency_parse(value, &db->read_consistency) < 0) { + if (consistency_parse(value, &set->parsed_read_consistency) < 0) { *error_r = t_strdup_printf( "Unknown read_consistency: %s", value); return -1; } } else if (strcmp(key, "read_fallback_consistency") == 0) { - if (consistency_parse(value, &db->read_fallback_consistency) < 0) { + if (consistency_parse(value, &set->parsed_read_fallback_consistency) < 0) { *error_r = t_strdup_printf( "Unknown read_fallback_consistency: %s", value); return -1; @@ -707,14 +776,14 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, read_fallback_set = TRUE; } else if (strcmp(key, "write_consistency") == 0) { if (consistency_parse(value, - &db->write_consistency) < 0) { + &set->parsed_write_consistency) < 0) { *error_r = t_strdup_printf( "Unknown write_consistency: %s", value); return -1; } } else if (strcmp(key, "write_fallback_consistency") == 0) { if (consistency_parse(value, - &db->write_fallback_consistency) < 0) { + &set->parsed_write_fallback_consistency) < 0) { *error_r = t_strdup_printf( "Unknown write_fallback_consistency: %s", value); @@ -723,14 +792,14 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, write_fallback_set = TRUE; } else if (strcmp(key, "delete_consistency") == 0) { if (consistency_parse(value, - &db->delete_consistency) < 0) { + &set->parsed_delete_consistency) < 0) { *error_r = t_strdup_printf( "Unknown delete_consistency: %s", value); return -1; } } else if (strcmp(key, "delete_fallback_consistency") == 0) { if (consistency_parse(value, - &db->delete_fallback_consistency) < 0) { + &set->parsed_delete_fallback_consistency) < 0) { *error_r = t_strdup_printf( "Unknown delete_fallback_consistency: %s", value); @@ -738,31 +807,31 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, } delete_fallback_set = TRUE; } else if (strcmp(key, "log_level") == 0) { - if (log_level_parse(value, &db->log_level) < 0) { + if (log_level_parse(value, &set->parsed_log_level) < 0) { *error_r = t_strdup_printf( "Unknown log_level: %s", value); return -1; } } else if (strcmp(key, "debug_queries") == 0) { - db->debug_queries = TRUE; + set->debug_queries = TRUE; } else if (strcmp(key, "log_retries") == 0) { - db->log_retries = TRUE; + set->log_retries = TRUE; } else if (strcmp(key, "latency_aware_routing") == 0) { - db->latency_aware_routing = TRUE; + set->latency_aware_routing = TRUE; } else if (strcmp(key, "version") == 0) { - if (str_to_uint(value, &db->protocol_version) < 0) { + if (str_to_uint(value, &set->protocol_version) < 0) { *error_r = t_strdup_printf( "Invalid version: %s", value); return -1; } } else if (strcmp(key, "num_threads") == 0) { - if (str_to_uint(value, &db->num_threads) < 0) { + if (str_to_uint(value, &set->num_threads) < 0) { *error_r = t_strdup_printf( "Invalid num_threads: %s", value); return -1; } } else if (strcmp(key, "heartbeat_interval") == 0) { - if (str_parse_get_interval(value, &db->heartbeat_interval_secs, + if (str_parse_get_interval(value, &set->heartbeat_interval_secs, &error) < 0) { *error_r = t_strdup_printf( "Invalid heartbeat_interval '%s': %s", @@ -770,7 +839,7 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, return -1; } } else if (strcmp(key, "idle_timeout") == 0) { - if (str_parse_get_interval(value, &db->idle_timeout_secs, + if (str_parse_get_interval(value, &set->idle_timeout_secs, &error) < 0) { *error_r = t_strdup_printf( "Invalid idle_timeout '%s': %s", @@ -779,7 +848,7 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, } } else if (strcmp(key, "connect_timeout") == 0) { if (str_parse_get_interval_msecs(value, - &db->connect_timeout_msecs, + &set->connect_timeout_msecs, &error) < 0) { *error_r = t_strdup_printf( "Invalid connect_timeout '%s': %s", @@ -788,7 +857,7 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, } } else if (strcmp(key, "request_timeout") == 0) { if (str_parse_get_interval_msecs(value, - &db->request_timeout_msecs, + &set->request_timeout_msecs, &error) < 0) { *error_r = t_strdup_printf( "Invalid request_timeout '%s': %s", @@ -797,7 +866,7 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, } } else if (strcmp(key, "warn_timeout") == 0) { if (str_parse_get_interval_msecs(value, - &db->warn_timeout_msecs, + &set->warn_timeout_msecs, &error) < 0) { *error_r = t_strdup_printf( "Invalid warn_timeout '%s': %s", @@ -805,11 +874,10 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, return -1; } } else if (strcmp(key, "metrics") == 0) { - i_free(db->metrics_path); - db->metrics_path = i_strdup(value); + set->metrics_path = p_strdup(pool, value); } else if (strcmp(key, "execution_retry_interval") == 0) { if (str_parse_get_interval_msecs(value, - &db->execution_retry_interval_msecs, + &set->execution_retry_interval_msecs, &error) < 0) { *error_r = t_strdup_printf( "Invalid execution_retry_interval '%s': %s", @@ -822,7 +890,7 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, return -1; #endif } else if (strcmp(key, "execution_retry_times") == 0) { - if (str_to_uint(value, &db->execution_retry_times) < 0) { + if (str_to_uint(value, &set->execution_retry_times) < 0) { *error_r = t_strdup_printf( "Invalid execution_retry_times %s", value); @@ -834,27 +902,48 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, return -1; #endif } else if (strcmp(key, "page_size") == 0) { - if (str_to_uint(value, &db->page_size) < 0) { + if (str_to_uint(value, &set->page_size) < 0) { *error_r = t_strdup_printf( "Invalid page_size: %s", value); return -1; } } else if (strcmp(key, "ssl_ca") == 0) { - db->ssl_ca_file = i_strdup(value); + if (settings_parse_read_file(value, value, pool, + &ssl_set->ssl_client_ca_file, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid ssl_ca: %s", error); + return -1; + } } else if (strcmp(key, "ssl_cert_file") == 0) { - db->ssl_cert_file = i_strdup(value); + if (settings_parse_read_file(value, value, pool, + &ssl_set->ssl_client_cert_file, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid ssl_cert_file: %s", error); + return -1; + } } else if (strcmp(key, "ssl_private_key_file") == 0) { - db->ssl_private_key_file = i_strdup(value); + if (settings_parse_read_file(value, value, pool, + &ssl_set->ssl_client_key_file, + &error) < 0) { + *error_r = t_strdup_printf( + "Invalid ssl_private_key_file: %s", error); + return -1; + } } else if (strcmp(key, "ssl_private_key_password") == 0) { - db->ssl_private_key_password = i_strdup(value); + ssl_set->ssl_client_key_password = + p_strdup(pool, value); } else if (strcmp(key, "ssl_verify") == 0) { if (strcmp(value, "none") == 0) { - db->ssl_verify_flags = CASS_SSL_VERIFY_NONE; + set->parsed_ssl_verify_flags = + CASS_SSL_VERIFY_NONE; } else if (strcmp(value, "cert") == 0) { - db->ssl_verify_flags = CASS_SSL_VERIFY_PEER_CERT; + set->parsed_ssl_verify_flags = + CASS_SSL_VERIFY_PEER_CERT; } else if (strcmp(value, "cert-ip") == 0) { - db->ssl_verify_flags = + set->parsed_ssl_verify_flags = CASS_SSL_VERIFY_PEER_CERT | CASS_SSL_VERIFY_PEER_IDENTITY; } else { @@ -870,32 +959,38 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db, } } - if (!read_fallback_set) - db->read_fallback_consistency = db->read_consistency; - if (!write_fallback_set) - db->write_fallback_consistency = db->write_consistency; - if (!delete_fallback_set) - db->delete_fallback_consistency = db->delete_consistency; + if (!read_fallback_set) { + set->parsed_read_fallback_consistency = + set->parsed_read_consistency; + } + if (!write_fallback_set) { + set->parsed_write_fallback_consistency = + set->parsed_write_consistency; + } + if (!delete_fallback_set) { + set->parsed_delete_fallback_consistency = + set->parsed_delete_consistency; + } - if (str_len(hosts) == 0) { + if (array_count(&set->hosts) == 0) { *error_r = t_strdup_printf("No hosts given in connect string"); return -1; } - if (db->keyspace == NULL) { + if (set->keyspace[0] == '\0') { *error_r = t_strdup_printf("No dbname given in connect string"); return -1; } else { i_free(db->table_prefix); - db->table_prefix = i_strdup_printf("%s.", db->keyspace); + db->table_prefix = i_strdup_printf("%s.", set->keyspace); } - if ((db->ssl_cert_file != NULL && db->ssl_private_key_file == NULL) || - (db->ssl_cert_file == NULL && db->ssl_private_key_file != NULL)) { + if ((ssl_set->ssl_client_cert_file[0] != '\0' && + ssl_set->ssl_client_key_file[0] == '\0') || + (ssl_set->ssl_client_cert_file[0] == '\0' && + ssl_set->ssl_client_key_file[0] != '\0')) { *error_r = "ssl_cert_file and ssl_private_key_file need to be both set"; return -1; } - - db->hosts = i_strdup(str_c(hosts)); return 0; } @@ -959,9 +1054,9 @@ static void driver_cassandra_metrics_write(struct cassandra_db *db) const char *error; int fd; - if (var_expand(path, db->metrics_path, tab, &error) <= 0) { + if (var_expand(path, db->set->metrics_path, tab, &error) <= 0) { e_error(db->api.event, "Failed to expand metrics_path=%s: %s", - db->metrics_path, error); + db->set->metrics_path, error); return; } @@ -983,71 +1078,61 @@ static void driver_cassandra_free(struct cassandra_db **_db) *_db = NULL; event_unref(&db->api.event); - i_free(db->metrics_path); - i_free(db->hosts); i_free(db->error); i_free(db->table_prefix); - i_free(db->keyspace); - i_free(db->user); - i_free(db->password); - i_free(db->ssl_ca_file); - i_free(db->ssl_cert_file); - i_free(db->ssl_private_key_file); - i_free_and_null(db->ssl_private_key_password); array_free(&db->api.module_contexts); if (db->ssl != NULL) cass_ssl_free(db->ssl); + settings_free(db->set); + settings_free(db->ssl_set); i_free(db); } static int driver_cassandra_init_ssl(struct cassandra_db *db, const char **error_r) { - buffer_t *buf = t_buffer_create(512); + const struct ssl_settings *ssl_set = db->ssl_set; + struct settings_file file; CassError c_err; db->ssl = cass_ssl_new(); i_assert(db->ssl != NULL); - if (db->ssl_ca_file != NULL) { - if (buffer_append_full_file(buf, db->ssl_ca_file, SIZE_MAX, - error_r) < 0) - return -1; - if ((c_err = cass_ssl_add_trusted_cert(db->ssl, str_c(buf))) != CASS_OK) { + if (ssl_set->ssl_client_ca_file[0] != '\0') { + settings_file_get(ssl_set->ssl_client_ca_file, + unsafe_data_stack_pool, &file); + if ((c_err = cass_ssl_add_trusted_cert(db->ssl, file.content)) != CASS_OK) { *error_r = cass_error_desc(c_err); return -1; } } - if (db->ssl_private_key_file != NULL && db->ssl_cert_file != NULL) { - buffer_set_used_size(buf, 0); - if (buffer_append_full_file(buf, db->ssl_private_key_file, - SIZE_MAX, error_r) < 0) - return -1; - c_err = cass_ssl_set_private_key(db->ssl, str_c(buf), - db->ssl_private_key_password); - safe_memset(buffer_get_modifiable_data(buf, NULL), 0, buf->used); + if (ssl_set->ssl_client_cert_file[0] != '\0') { + settings_file_get(ssl_set->ssl_client_key_file, + unsafe_data_stack_pool, &file); + c_err = cass_ssl_set_private_key(db->ssl, file.content, + ssl_set->ssl_client_key_password); if (c_err != CASS_OK) { *error_r = cass_error_desc(c_err); return -1; } - buffer_set_used_size(buf, 0); - if (buffer_append_full_file(buf, db->ssl_cert_file, SIZE_MAX, error_r) < 0) - return -1; - if ((c_err = cass_ssl_set_cert(db->ssl, str_c(buf))) != CASS_OK) { + settings_file_get(ssl_set->ssl_client_cert_file, + unsafe_data_stack_pool, &file); + if ((c_err = cass_ssl_set_cert(db->ssl, file.content)) != CASS_OK) { *error_r = cass_error_desc(c_err); return -1; } } - cass_ssl_set_verify_flags(db->ssl, db->ssl_verify_flags); + cass_ssl_set_verify_flags(db->ssl, db->set->parsed_ssl_verify_flags); return 0; } -static int driver_cassandra_init_full_v(const struct sql_legacy_settings *set, - struct sql_db **db_r, - const char **error_r) +static int +driver_cassandra_init_full_v(const struct sql_legacy_settings *legacy_set, + struct sql_db **db_r, + const char **error_r) { struct cassandra_db *db; int ret; @@ -1055,13 +1140,13 @@ static int driver_cassandra_init_full_v(const struct sql_legacy_settings *set, db = i_new(struct cassandra_db, 1); db->api = driver_cassandra_db; db->fd_pipe[0] = db->fd_pipe[1] = -1; - db->api.event = event_create(set->event_parent); + db->api.event = event_create(legacy_set->event_parent); event_add_category(db->api.event, &event_category_cassandra); event_set_append_log_prefix(db->api.event, "cassandra: "); T_BEGIN { ret = driver_cassandra_parse_connect_string(db, - set->connect_string, error_r); + legacy_set->connect_string, error_r); } T_END_PASS_STR_IF(ret < 0, error_r); if (ret < 0) { @@ -1069,17 +1154,18 @@ static int driver_cassandra_init_full_v(const struct sql_legacy_settings *set, return -1; } - if (db->init_ssl && driver_cassandra_init_ssl(db, error_r) < 0) { + const struct cassandra_settings *set = db->set; + if (set->parsed_use_ssl && driver_cassandra_init_ssl(db, error_r) < 0) { driver_cassandra_free(&db); return -1; } driver_cassandra_init_log(); - cass_log_set_level(db->log_level); - if (db->log_level >= CASS_LOG_DEBUG) + cass_log_set_level(set->parsed_log_level); + if (set->parsed_log_level >= CASS_LOG_DEBUG) event_set_forced_debug(db->api.event, TRUE); - if (db->protocol_version > 0 && db->protocol_version < 4) { + if (set->protocol_version > 0 && set->protocol_version < 4) { /* binding with column indexes requires v4 */ db->api.v.prepared_statement_init = NULL; db->api.v.prepared_statement_deinit = NULL; @@ -1089,44 +1175,45 @@ static int driver_cassandra_init_full_v(const struct sql_legacy_settings *set, db->timestamp_gen = cass_timestamp_gen_monotonic_new(); db->cluster = cass_cluster_new(); - cass_cluster_set_ssl(db->cluster, db->ssl); cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen); - cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs); - cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs); - cass_cluster_set_contact_points(db->cluster, db->hosts); - if (db->user != NULL && db->password != NULL) - cass_cluster_set_credentials(db->cluster, db->user, db->password); - if (db->port != 0) - cass_cluster_set_port(db->cluster, db->port); - if (db->protocol_version != 0) - cass_cluster_set_protocol_version(db->cluster, db->protocol_version); - if (db->num_threads != 0) - cass_cluster_set_num_threads_io(db->cluster, db->num_threads); - if (db->latency_aware_routing) + cass_cluster_set_connect_timeout(db->cluster, set->connect_timeout_msecs); + cass_cluster_set_request_timeout(db->cluster, set->request_timeout_msecs); + cass_cluster_set_contact_points(db->cluster, + p_array_const_string_join(unsafe_data_stack_pool, + &set->hosts, ",")); + if (set->user[0] != '\0' && set->password[0] != '\0') + cass_cluster_set_credentials(db->cluster, set->user, set->password); + if (set->port != 0) + cass_cluster_set_port(db->cluster, set->port); + if (set->protocol_version != 0) + cass_cluster_set_protocol_version(db->cluster, set->protocol_version); + if (set->num_threads != 0) + cass_cluster_set_num_threads_io(db->cluster, set->num_threads); + if (set->latency_aware_routing) cass_cluster_set_latency_aware_routing(db->cluster, cass_true); - if (db->heartbeat_interval_secs != 0) + if (set->heartbeat_interval_secs != 0) cass_cluster_set_connection_heartbeat_interval(db->cluster, - db->heartbeat_interval_secs); - if (db->log_retries) { + set->heartbeat_interval_secs); + if (set->log_retries) { db->default_policy = cass_retry_policy_default_new(); db->logging_policy = cass_retry_policy_logging_new(db->default_policy); cass_cluster_set_retry_policy(db->cluster, db->logging_policy); } - if (db->idle_timeout_secs != 0) + if (set->idle_timeout_secs != 0) cass_cluster_set_connection_idle_timeout(db->cluster, - db->idle_timeout_secs); + set->idle_timeout_secs); #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY - if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0) + if (set->execution_retry_times > 0 && set->execution_retry_interval_msecs > 0) cass_cluster_set_constant_speculative_execution_policy( - db->cluster, db->execution_retry_interval_msecs, - db->execution_retry_times); + db->cluster, set->execution_retry_interval_msecs, + set->execution_retry_times); #endif - if (db->ssl != NULL) { + if (set->parsed_use_ssl) { e_debug(db->api.event, "Enabling TLS for cluster"); cass_cluster_set_ssl(db->cluster, db->ssl); } db->session = cass_session_new(); - if (db->metrics_path != NULL) + if (set->metrics_path[0] != '\0') db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db); i_array_init(&db->results, 16); @@ -1233,9 +1320,9 @@ static void driver_cassandra_log_result(struct cassandra_result *result, e->add_str("error", result->error); struct event *event = e->event(); - if (db->debug_queries) + if (db->set->debug_queries) event_set_forced_debug(event, TRUE); - if (reply_usecs/1000 >= db->warn_timeout_msecs) { + if (reply_usecs/1000 >= db->set->warn_timeout_msecs) { db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++; e_warning(event, "%s", str_c(str)); } else { @@ -1537,8 +1624,10 @@ static void driver_cassandra_init_statement(struct cassandra_result *result) else cass_statement_set_is_idempotent(result->statement, cass_true); #endif - if (db->page_size > 0 && result->batch == NULL) - cass_statement_set_paging_size(result->statement, db->page_size); + if (db->set->page_size > 0 && result->batch == NULL) { + cass_statement_set_paging_size(result->statement, + db->set->page_size); + } } static void driver_cassandra_result_send_query(struct cassandra_result *result) @@ -1615,8 +1704,9 @@ cassandra_result_connect_and_send_query(struct cassandra_result *result) result->row_pool = pool_alloconly_create("cassandra result", 512); switch (result->query_type) { case CASSANDRA_QUERY_TYPE_READ: - result->consistency = db->read_consistency; - result->fallback_consistency = db->read_fallback_consistency; + result->consistency = db->set->parsed_read_consistency; + result->fallback_consistency = + db->set->parsed_read_fallback_consistency; break; case CASSANDRA_QUERY_TYPE_READ_MORE: /* consistency is already set and we don't want to fallback @@ -1624,12 +1714,14 @@ cassandra_result_connect_and_send_query(struct cassandra_result *result) result->fallback_consistency = result->consistency; break; case CASSANDRA_QUERY_TYPE_WRITE: - result->consistency = db->write_consistency; - result->fallback_consistency = db->write_fallback_consistency; + result->consistency = db->set->parsed_write_consistency; + result->fallback_consistency = + db->set->parsed_write_fallback_consistency; break; case CASSANDRA_QUERY_TYPE_DELETE: - result->consistency = db->delete_consistency; - result->fallback_consistency = db->delete_fallback_consistency; + result->consistency = db->set->parsed_delete_consistency; + result->fallback_consistency = + db->set->parsed_delete_fallback_consistency; break; case CASSANDRA_QUERY_TYPE_COUNT: i_unreached(); @@ -1893,7 +1985,7 @@ static int driver_cassandra_result_next_page(struct cassandra_result *result) { struct cassandra_db *db = container_of(result->api.db, struct cassandra_db, api); - if (db->page_size == 0) { + if (db->set->page_size == 0) { /* no paging */ return 0; }