From 7d7bf06d06ba88a90b98cc1a04e2230fd4a005df Mon Sep 17 00:00:00 2001 From: Timo Sirainen Date: Fri, 16 Feb 2024 09:46:17 +0200 Subject: [PATCH] lib-sql: cassandra - Implement new init() API --- src/config/Makefile.am | 3 +- src/lib-sql/driver-cassandra.c | 172 ++++++++++++++++++++++++++++++++- 2 files changed, 171 insertions(+), 4 deletions(-) diff --git a/src/config/Makefile.am b/src/config/Makefile.am index 8bd38c9e35..5f0cf3772b 100644 --- a/src/config/Makefile.am +++ b/src/config/Makefile.am @@ -24,7 +24,8 @@ AM_CPPFLAGS = \ -DMODULEDIR=\""$(moduledir)"\" \ -DSSLDIR=\""$(ssldir)\"" \ -DSYSCONFDIR=\""$(pkgsysconfdir)"\" \ - $(BINARY_CFLAGS) + $(BINARY_CFLAGS) \ + $(SQL_CFLAGS) noinst_LTLIBRARIES = libconfig.la diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c index 3b3f940f14..6a430bfcfa 100644 --- a/src/lib-sql/driver-cassandra.c +++ b/src/lib-sql/driver-cassandra.c @@ -21,7 +21,9 @@ #include #include #include +/* */ #include +/* */ #include #define IS_CONNECTED(db) \ @@ -91,6 +93,10 @@ static const char *cassandra_result_type_prefixes[] = { static_assert_array_size(cassandra_result_type_prefixes, CASSANDRA_RESULT_TYPE_COUNT); +/* */ +static bool +cassandra_settings_check(void *_set, pool_t pool, const char **error_r); + struct cassandra_settings { pool_t pool; @@ -138,6 +144,55 @@ struct cassandra_settings { bool parsed_use_ssl; CassSslVerifyFlags parsed_ssl_verify_flags; }; +/* */ + +#undef DEF +#define DEF(type, name) \ + SETTING_DEFINE_STRUCT_##type("cassandra_"#name, name, struct cassandra_settings) +#undef DEF_SECS +#define DEF_SECS(name) \ + SETTING_DEFINE_STRUCT_TIME("cassandra_"#name, name##_secs, struct cassandra_settings) +#undef DEF_MSECS +#define DEF_MSECS(name) \ + SETTING_DEFINE_STRUCT_TIME_MSECS("cassandra_"#name, name##_msecs, struct cassandra_settings) +static const struct setting_define cassandra_setting_defines[] = { + { .type = SET_FILTER_NAME, .key = "cassandra", }, + + DEF(BOOLLIST, hosts), + DEF(IN_PORT, port), + DEF(STR, keyspace), + DEF(STR, user), + DEF(STR, password), + + DEF(STR, metrics_path), + DEF(ENUM, log_level), + DEF(BOOL, debug_queries), + DEF(BOOL, log_retries), + DEF(BOOL, latency_aware_routing), + + DEF(STR, read_consistency), + DEF(STR, write_consistency), + DEF(STR, delete_consistency), + DEF(STR, read_fallback_consistency), + DEF(STR, write_fallback_consistency), + DEF(STR, delete_fallback_consistency), + + DEF_MSECS(connect_timeout), + DEF_MSECS(request_timeout), + DEF_MSECS(warn_timeout), + + DEF(UINT, protocol_version), + DEF(UINT, num_threads), + DEF_SECS(heartbeat_interval), + DEF_SECS(idle_timeout), + DEF_MSECS(execution_retry_interval), + DEF(UINT, execution_retry_times), + DEF(UINT, page_size), + + DEF(ENUM, ssl), + + SETTING_DEFINE_LIST_END +}; static struct cassandra_settings cassandra_default_settings = { .hosts = ARRAY_INIT, @@ -171,7 +226,18 @@ static struct cassandra_settings cassandra_default_settings = { .execution_retry_times = 0, .page_size = 0, - .ssl = "no:cert-only:cert-ip:cert-dns", + .ssl = "no:cert-only:cert-ip", +}; + +const struct setting_parser_info cassandra_setting_parser_info = { + .name = "cassandra", + + .defines = cassandra_setting_defines, + .defaults = &cassandra_default_settings, + + .struct_size = sizeof(struct cassandra_settings), + .pool_offset1 = 1 + offsetof(struct cassandra_settings, pool), + .check_func = cassandra_settings_check, }; struct cassandra_callback { @@ -317,6 +383,7 @@ struct cassandra_sql_prepared_statement { extern const struct sql_db driver_cassandra_db; extern const struct sql_result driver_cassandra_result; +/* */ static struct { CassConsistency consistency; const char *name; @@ -345,6 +412,7 @@ static struct { { CASS_LOG_DEBUG, "debug" }, { CASS_LOG_TRACE, "trace" } }; +/* */ static struct event_category event_category_cassandra = { .parent = &event_category_sql, @@ -433,6 +501,7 @@ static void driver_cassandra_init_log(void) } } +/* */ static int consistency_parse(const char *str, CassConsistency *consistency_r) { unsigned int i; @@ -459,6 +528,76 @@ static int log_level_parse(const char *str, CassLogLevel *log_level_r) return -1; } +static bool +cassandra_settings_check(void *_set, pool_t pool ATTR_UNUSED, + const char **error_r) +{ + struct cassandra_settings *set = _set; + const struct { + const char *set_name; + const char *set_value; + const CassConsistency *empty_default; + CassConsistency *output; + } consistencies[] = { + { "read_consistency", set->read_consistency, NULL, + &set->parsed_read_consistency }, + { "write_consistency", set->write_consistency, NULL, + &set->parsed_write_consistency }, + { "delete_consistency", set->delete_consistency, NULL, + &set->parsed_delete_consistency }, + /* Parse fallback consistencies after the ones above, so + empty_default can access already checked consistency + values. */ + { "read_fallback_consistency", set->read_fallback_consistency, + &set->parsed_read_consistency, + &set->parsed_read_fallback_consistency }, + { "write_fallback_consistency", set->write_fallback_consistency, + &set->parsed_write_consistency, + &set->parsed_write_fallback_consistency }, + { "delete_fallback_consistency", set->delete_fallback_consistency, + &set->parsed_delete_consistency, + &set->parsed_delete_fallback_consistency }, + }; + + for (unsigned int i = 0; i < N_ELEMENTS(consistencies); i++) { + if (consistencies[i].set_value[0] == '\0' && + consistencies[i].empty_default != NULL) { + *consistencies[i].output = + *consistencies[i].empty_default; + } else if (consistency_parse(consistencies[i].set_value, + consistencies[i].output) < 0) { + *error_r = t_strdup_printf( + "Unknown cassandra_%s: %s", + consistencies[i].set_name, + consistencies[i].set_value); + return FALSE; + } + } + if (log_level_parse(set->log_level, &set->parsed_log_level) < 0) { + *error_r = t_strdup_printf( + "Unknown cassandra_log_level: %s", set->log_level); + return FALSE; + } + + if (strcmp(set->ssl, "no") != 0) { + set->parsed_use_ssl = TRUE; + if (strcmp(set->ssl, "cert-only") == 0) { + set->parsed_ssl_verify_flags = + CASS_SSL_VERIFY_PEER_CERT; + } else if (strcmp(set->ssl, "cert-ip") == 0) { + set->parsed_ssl_verify_flags = + CASS_SSL_VERIFY_PEER_CERT | + CASS_SSL_VERIFY_PEER_IDENTITY; + } else { + *error_r = t_strdup_printf( + "Unsupported cassandra_ssl: '%s'", set->ssl); + return FALSE; + } + } + return TRUE; +} +/* */ + static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state) { @@ -1124,8 +1263,12 @@ static int driver_cassandra_init_ssl(struct cassandra_db *db, const char **error } } - cass_ssl_set_verify_flags(db->ssl, db->set->parsed_ssl_verify_flags); - + if (!ssl_set->ssl_client_require_valid_cert) + cass_ssl_set_verify_flags(db->ssl, CASS_SSL_VERIFY_NONE); + else { + cass_ssl_set_verify_flags(db->ssl, + db->set->parsed_ssl_verify_flags); + } return 0; } @@ -1220,6 +1363,28 @@ driver_cassandra_init_common(struct event *event_parent, return 0; } +static int +driver_cassandra_init_v(struct event *event, struct sql_db **db_r, + const char **error_r) +{ + const struct cassandra_settings *set; + const struct ssl_settings *ssl_set; + + if (settings_get(event, &cassandra_setting_parser_info, 0, + &set, error_r) < 0) + return -1; + + if (strcmp(set->ssl, "no") != 0) { + if (settings_get(event, &ssl_setting_parser_info, 0, + &ssl_set, error_r) < 0) { + settings_free(set); + return -1; + } + } + + return driver_cassandra_init_common(event, set, ssl_set, db_r, error_r); +} + static int driver_cassandra_init_full_v(const struct sql_legacy_settings *legacy_set, struct sql_db **db_r, @@ -2983,6 +3148,7 @@ const struct sql_db driver_cassandra_db = { .flags = SQL_DB_FLAG_PREP_STATEMENTS, .v = { + .init = driver_cassandra_init_v, .init_legacy_full = driver_cassandra_init_full_v, .deinit = driver_cassandra_deinit_v, .connect = driver_cassandra_connect, -- 2.47.3