]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-sql: Reformat driver-cassandra.c
authorAki Tuomi <aki.tuomi@open-xchange.com>
Fri, 28 Aug 2020 10:13:32 +0000 (13:13 +0300)
committeraki.tuomi <aki.tuomi@open-xchange.com>
Fri, 11 Sep 2020 05:31:07 +0000 (05:31 +0000)
src/lib-sql/driver-cassandra.c

index ef1dcd069113a6e9fdfb0e584ad10ab2767f3308..07ff5fdb9f4df48ed7186ae0d8bf630dc395be3e 100644 (file)
@@ -84,7 +84,8 @@ struct cassandra_db {
 
        char *hosts, *keyspace, *user, *password;
        CassConsistency read_consistency, write_consistency, delete_consistency;
-       CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
+       CassConsistency read_fallback_consistency, write_fallback_consistency;
+       CassConsistency delete_fallback_consistency;
        CassLogLevel log_level;
        bool debug_queries;
        bool latency_aware_routing;
@@ -333,7 +334,8 @@ static int log_level_parse(const char *str, CassLogLevel *log_level_r)
        return -1;
 }
 
-static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
+static void driver_cassandra_set_state(struct cassandra_db *db,
+                                      enum sql_db_state state)
 {
        /* switch back to original ioloop in case the caller wants to
           add/remove timeouts */
@@ -463,7 +465,8 @@ static void driver_cassandra_input(struct cassandra_db *db)
                /* success */
                unsigned int i, count = ret / sizeof(ids[0]);
 
-               for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
+               for (i = 0; i < count &&
+                           db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
                        driver_cassandra_input_id(db, ids[i]);
                return;
        }
@@ -533,7 +536,8 @@ static int driver_cassandra_connect(struct sql_db *_db)
                             driver_cassandra_input, db);
        driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
 
-       future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
+       future = cass_session_connect_keyspace(db->session, db->cluster,
+                                              db->keyspace);
        driver_cassandra_set_callback(future, db, connect_callback, db);
        return 0;
 }
@@ -569,7 +573,8 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db,
 {
        const char *const *args, *key, *value, *error;
        string_t *hosts = t_str_new(64);
-       bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
+       bool read_fallback_set = FALSE, write_fallback_set = FALSE;
+       bool delete_fallback_set = FALSE;
 
        db->log_level = CASS_LOG_WARN;
        db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
@@ -583,8 +588,8 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db,
        for (; *args != NULL; args++) {
                value = strchr(*args, '=');
                if (value == NULL) {
-                       *error_r = t_strdup_printf("Missing value in connect string: %s",
-                                                  *args);
+                       *error_r = t_strdup_printf(
+                               "Missing value in connect string: %s", *args);
                        return -1;
                }
                key = t_strdup_until(*args, value++);
@@ -595,7 +600,8 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db,
                        str_append(hosts, value);
                } else if (strcmp(key, "port") == 0) {
                        if (net_str2port(value, &db->port) < 0) {
-                               *error_r = t_strdup_printf("Invalid port: %s", value);
+                               *error_r = t_strdup_printf(
+                                       "Invalid port: %s", value);
                                return -1;
                        }
                } else if (strcmp(key, "dbname") == 0 ||
@@ -610,40 +616,53 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db,
                        db->password = i_strdup(value);
                } else if (strcmp(key, "read_consistency") == 0) {
                        if (consistency_parse(value, &db->read_consistency) < 0) {
-                               *error_r = t_strdup_printf("Unknown read_consistency: %s", value);
+                               *error_r = t_strdup_printf(
+                                       "Unknown read_consistency: %s", value);
                                return -1;
                        }
                } else if (strcmp(key, "read_fallback_consistency") == 0) {
                        if (consistency_parse(value, &db->read_fallback_consistency) < 0) {
-                               *error_r = t_strdup_printf("Unknown read_fallback_consistency: %s", value);
+                               *error_r = t_strdup_printf(
+                                       "Unknown read_fallback_consistency: %s", value);
                                return -1;
                        }
                        read_fallback_set = TRUE;
                } else if (strcmp(key, "write_consistency") == 0) {
-                       if (consistency_parse(value, &db->write_consistency) < 0) {
-                               *error_r = t_strdup_printf("Unknown write_consistency: %s", value);
+                       if (consistency_parse(value,
+                                             &db->write_consistency) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Unknown write_consistency: %s", value);
                                return -1;
                        }
                } else if (strcmp(key, "write_fallback_consistency") == 0) {
-                       if (consistency_parse(value, &db->write_fallback_consistency) < 0) {
-                               *error_r = t_strdup_printf("Unknown write_fallback_consistency: %s", value);
+                       if (consistency_parse(value,
+                                             &db->write_fallback_consistency) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Unknown write_fallback_consistency: %s",
+                                       value);
                                return -1;
                        }
                        write_fallback_set = TRUE;
                } else if (strcmp(key, "delete_consistency") == 0) {
-                       if (consistency_parse(value, &db->delete_consistency) < 0) {
-                               *error_r = t_strdup_printf("Unknown delete_consistency: %s", value);
+                       if (consistency_parse(value,
+                                             &db->delete_consistency) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Unknown delete_consistency: %s", value);
                                return -1;
                        }
                } else if (strcmp(key, "delete_fallback_consistency") == 0) {
-                       if (consistency_parse(value, &db->delete_fallback_consistency) < 0) {
-                               *error_r = t_strdup_printf("Unknown delete_fallback_consistency: %s", value);
+                       if (consistency_parse(value,
+                                             &db->delete_fallback_consistency) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Unknown delete_fallback_consistency: %s",
+                                       value);
                                return -1;
                        }
                        delete_fallback_set = TRUE;
                } else if (strcmp(key, "log_level") == 0) {
                        if (log_level_parse(value, &db->log_level) < 0) {
-                               *error_r = t_strdup_printf("Unknown log_level: %s", value);
+                               *error_r = t_strdup_printf(
+                                       "Unknown log_level: %s", value);
                                return -1;
                        }
                } else if (strcmp(key, "debug_queries") == 0) {
@@ -652,67 +671,98 @@ static int driver_cassandra_parse_connect_string(struct cassandra_db *db,
                        db->latency_aware_routing = TRUE;
                } else if (strcmp(key, "version") == 0) {
                        if (str_to_uint(value, &db->protocol_version) < 0) {
-                               *error_r = t_strdup_printf("Invalid version: %s", value);
+                               *error_r = t_strdup_printf(
+                                       "Invalid version: %s", value);
                                return -1;
                        }
                } else if (strcmp(key, "num_threads") == 0) {
                        if (str_to_uint(value, &db->num_threads) < 0) {
-                               *error_r = t_strdup_printf("Invalid num_threads: %s", value);
+                               *error_r = t_strdup_printf(
+                                       "Invalid num_threads: %s", value);
                                return -1;
                        }
                } else if (strcmp(key, "heartbeat_interval") == 0) {
-                       if (settings_get_time(value, &db->heartbeat_interval_secs, &error) < 0) {
-                               *error_r = t_strdup_printf("Invalid heartbeat_interval '%s': %s", value, error);
+                       if (settings_get_time(value, &db->heartbeat_interval_secs,
+                                             &error) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Invalid heartbeat_interval '%s': %s",
+                                       value, error);
                                return -1;
                        }
                } else if (strcmp(key, "idle_timeout") == 0) {
-                       if (settings_get_time(value, &db->idle_timeout_secs, &error) < 0) {
-                               *error_r = t_strdup_printf("Invalid idle_timeout '%s': %s", value, error);
+                       if (settings_get_time(value, &db->idle_timeout_secs,
+                                             &error) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Invalid idle_timeout '%s': %s",
+                                       value, error);
                                return -1;
                        }
                } else if (strcmp(key, "connect_timeout") == 0) {
-                       if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0) {
-                               *error_r = t_strdup_printf("Invalid connect_timeout '%s': %s", value, error);
+                       if (settings_get_time_msecs(value,
+                                                   &db->connect_timeout_msecs,
+                                                   &error) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Invalid connect_timeout '%s': %s",
+                                       value, error);
                                return -1;
                        }
                } else if (strcmp(key, "request_timeout") == 0) {
-                       if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0) {
-                               *error_r = t_strdup_printf("Invalid request_timeout '%s': %s", value, error);
+                       if (settings_get_time_msecs(value,
+                                                   &db->request_timeout_msecs,
+                                                   &error) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Invalid request_timeout '%s': %s",
+                                       value, error);
                                return -1;
                        }
                } else if (strcmp(key, "warn_timeout") == 0) {
-                       if (settings_get_time_msecs(value, &db->warn_timeout_msecs, &error) < 0) {
-                               *error_r = t_strdup_printf("Invalid warn_timeout '%s': %s", value, error);
+                       if (settings_get_time_msecs(value,
+                                                   &db->warn_timeout_msecs,
+                                                   &error) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Invalid warn_timeout '%s': %s",
+                                       value, error);
                                return -1;
                        }
                } else if (strcmp(key, "metrics") == 0) {
                        i_free(db->metrics_path);
                        db->metrics_path = i_strdup(value);
                } else if (strcmp(key, "execution_retry_interval") == 0) {
-                       if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0) {
-                               *error_r = t_strdup_printf("Invalid execution_retry_interval '%s': %s", value, error);
+                       if (settings_get_time_msecs(value,
+                                                   &db->execution_retry_interval_msecs,
+                                                   &error) < 0) {
+                               *error_r = t_strdup_printf(
+                                       "Invalid execution_retry_interval '%s': %s",
+                                       value, error);
                                return -1;
                        }
 #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
-                       *error_r = t_strdup_printf("This cassandra version does not support execution_retry_interval");
+                       *error_r = t_strdup_printf(
+       "This cassandra version does not support execution_retry_interval");
                        return -1;
 #endif
                } else if (strcmp(key, "execution_retry_times") == 0) {
                        if (str_to_uint(value, &db->execution_retry_times) < 0) {
-                               *error_r = t_strdup_printf("Invalid execution_retry_times %s", value);
+                               *error_r = t_strdup_printf(
+                                       "Invalid execution_retry_times %s",
+                                       value);
                                return -1;
                        }
 #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
-                       *error_r = t_strdup_printf("This cassandra version does not support execution_retry_times");
+                       *error_r = t_strdup_printf(
+       "This cassandra version does not support execution_retry_times");
                        return -1;
 #endif
                } else if (strcmp(key, "page_size") == 0) {
                        if (str_to_uint(value, &db->page_size) < 0) {
-                               *error_r = t_strdup_printf("Invalid page_size: %s", value);
+                               *error_r = t_strdup_printf(
+                                       "Invalid page_size: %s",
+                                       value);
                                return -1;
                        }
                } else {
-                       *error_r = t_strdup_printf("Unknown connect string: %s", key);
+                       *error_r = t_strdup_printf(
+                               "Unknown connect string: %s", key);
                        return -1;
                }
        }
@@ -740,7 +790,8 @@ static void
 driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
 {
 #define ADD_UINT64(_struct, _field) \
-       str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
+       str_printfa(dest, "\""#_field"\": %llu,", \
+               (unsigned long long)metrics._struct._field);
 #define ADD_DOUBLE(_struct, _field) \
        str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
        CassMetrics metrics;
@@ -830,7 +881,8 @@ static void driver_cassandra_free(struct cassandra_db **_db)
 }
 
 static int driver_cassandra_init_full_v(const struct sql_settings *set,
-                                       struct sql_db **db_r, const char **error_r)
+                                       struct sql_db **db_r,
+                                       const char **error_r)
 {
        struct cassandra_db *db;
        char *error = NULL;
@@ -845,7 +897,8 @@ static int driver_cassandra_init_full_v(const struct sql_settings *set,
 
        T_BEGIN {
                const char *tmp;
-               if ((ret = driver_cassandra_parse_connect_string(db, set->connect_string,
+               if ((ret = driver_cassandra_parse_connect_string(db,
+                                                                set->connect_string,
                                                                 &tmp)) < 0) {
                        error = i_strdup(tmp);
                }
@@ -887,16 +940,21 @@ static int driver_cassandra_init_full_v(const struct sql_settings *set,
        if (db->latency_aware_routing)
                cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
        if (db->heartbeat_interval_secs != 0)
-               cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
+               cass_cluster_set_connection_heartbeat_interval(db->cluster,
+                       db->heartbeat_interval_secs);
        if (db->idle_timeout_secs != 0)
-               cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
+               cass_cluster_set_connection_idle_timeout(db->cluster,
+                       db->idle_timeout_secs);
 #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
        if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
-               cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
+               cass_cluster_set_constant_speculative_execution_policy(
+                       db->cluster, db->execution_retry_interval_msecs,
+                       db->execution_retry_times);
 #endif
        db->session = cass_session_new();
        if (db->metrics_path != NULL)
-               db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
+               db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write,
+                                            db);
        i_array_init(&db->results, 16);
        i_array_init(&db->callbacks, 16);
        i_array_init(&db->pending_prepares, 16);
@@ -913,7 +971,7 @@ static void driver_cassandra_deinit_v(struct sql_db *_db)
 {
        struct cassandra_db *db = (struct cassandra_db *)_db;
 
-        driver_cassandra_close(db, "Deinitialized");
+       driver_cassandra_close(db, "Deinitialized");
 
        i_assert(array_count(&db->callbacks) == 0);
        array_free(&db->callbacks);
@@ -993,7 +1051,7 @@ static void driver_cassandra_log_result(struct cassandra_result *result,
 static void driver_cassandra_result_free(struct sql_result *_result)
 {
        struct cassandra_db *db = (struct cassandra_db *)_result->db;
-        struct cassandra_result *result = (struct cassandra_result *)_result;
+       struct cassandra_result *result = (struct cassandra_result *)_result;
        long long reply_usecs;
 
        i_assert(!result->api.callback);
@@ -1002,7 +1060,8 @@ static void driver_cassandra_result_free(struct sql_result *_result)
        if (_result == db->sync_result)
                db->sync_result = NULL;
 
-       reply_usecs = timeval_diff_usecs(&result->finish_time, &result->start_time);
+       reply_usecs = timeval_diff_usecs(&result->finish_time,
+                                        &result->start_time);
        driver_cassandra_log_result(result, FALSE, reply_usecs);
 
        if (result->page_num > 0 && !result->paging_continues) {
@@ -1060,7 +1119,8 @@ static void query_resend_with_fallback(struct cassandra_result *result)
                ioloop_time - db->last_fallback_warning[result->query_type];
 
        if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
-               e_warning(db->api.event, "%s - retrying future %s queries with consistency %s (instead of %s)",
+               e_warning(db->api.event,
+                         "%s - retrying future %s queries with consistency %s (instead of %s)",
                          result->error, cassandra_query_type_names[result->query_type],
                          cass_consistency_string(result->fallback_consistency),
                          cass_consistency_string(result->consistency));
@@ -1178,9 +1238,12 @@ static void query_callback(CassFuture *future, void *context)
                   not. Also _SERVER_UNAVAILABLE could have actually written
                   enough copies of the data for the query to succeed. */
                result->api.error_type = driver_cassandra_error_is_uncertain(error);
-               result->error = i_strdup_printf("Query '%s' failed: %.*s (in %u.%03u secs%s)",
+               result->error = i_strdup_printf(
+                       "Query '%s' failed: %.*s (in %u.%03u secs%s)",
                        result->query, (int)errsize, errmsg, msecs/1000, msecs%1000,
-                       result->page_num == 0 ? "" : t_strdup_printf(", page %u", result->page_num));
+                       result->page_num == 0 ?
+                               "" :
+                               t_strdup_printf(", page %u", result->page_num));
 
                if (query_error_want_fallback(error) &&
                    result->fallback_consistency != result->consistency) {
@@ -1235,7 +1298,7 @@ static void driver_cassandra_result_send_query(struct cassandra_result *result)
 static bool
 driver_cassandra_want_fallback_query(struct cassandra_result *result)
 {
-        struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+       struct cassandra_db *db = (struct cassandra_db *)result->api.db;
        unsigned int failure_count = db->fallback_failures[result->query_type];
        unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
        struct timeval tv;
@@ -1269,13 +1332,14 @@ driver_cassandra_want_fallback_query(struct cassandra_result *result)
 
 static int driver_cassandra_send_query(struct cassandra_result *result)
 {
-        struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+       struct cassandra_db *db = (struct cassandra_db *)result->api.db;
        int ret;
 
        if (!SQL_DB_IS_READY(&db->api)) {
                if ((ret = sql_connect(&db->api)) <= 0) {
                        if (ret < 0)
-                               driver_cassandra_close(db, "Couldn't connect to Cassandra");
+                               driver_cassandra_close(db,
+                                       "Couldn't connect to Cassandra");
                        return ret;
                }
        }
@@ -1373,18 +1437,20 @@ driver_cassandra_query_full(struct sql_db *_db, const char *query,
 
 static void driver_cassandra_exec(struct sql_db *db, const char *query)
 {
-       driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
+       driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE,
+                                   exec_callback, NULL);
 }
 
 static void driver_cassandra_query(struct sql_db *db, const char *query,
                                   sql_query_callback_t *callback, void *context)
 {
-       driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
+       driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ,
+                                   callback, context);
 }
 
 static void cassandra_query_s_callback(struct sql_result *result, void *context)
 {
-        struct cassandra_db *db = context;
+       struct cassandra_db *db = context;
 
        db->sync_result = result;
 }
@@ -1542,7 +1608,8 @@ static int driver_cassandra_result_next_page(struct cassandra_result *result)
        /* callers that don't support sql_query_more() will still get a useful
           error message. */
        i_free(result->error);
-       result->error = i_strdup("Paged query has more results, but not supported by the caller");
+       result->error = i_strdup(
+               "Paged query has more results, but not supported by the caller");
        return SQL_RESULT_NEXT_MORE;
 }
 
@@ -1916,25 +1983,30 @@ driver_cassandra_bind_int(struct cassandra_sql_statement *stmt,
        i_assert(stmt->prep != NULL);
 
        /* statements require exactly correct value type */
-       data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, column_idx);
+       data_type = cass_prepared_parameter_data_type(stmt->prep->prepared,
+                                                     column_idx);
        value_type = cass_data_type_type(data_type);
 
        switch (value_type) {
        case CASS_VALUE_TYPE_INT:
                if (value < -2147483648 || value > 2147483647)
                        return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
-               return cass_statement_bind_int32(stmt->cass_stmt, column_idx, value);
+               return cass_statement_bind_int32(stmt->cass_stmt, column_idx,
+                                                value);
        case CASS_VALUE_TYPE_TIMESTAMP:
        case CASS_VALUE_TYPE_BIGINT:
-               return cass_statement_bind_int64(stmt->cass_stmt, column_idx, value);
+               return cass_statement_bind_int64(stmt->cass_stmt, column_idx,
+                                                value);
        case CASS_VALUE_TYPE_SMALL_INT:
                if (value < -32768 || value > 32767)
                        return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
-               return cass_statement_bind_int16(stmt->cass_stmt, column_idx, value);
+               return cass_statement_bind_int16(stmt->cass_stmt, column_idx,
+                                                value);
        case CASS_VALUE_TYPE_TINY_INT:
                if (value < -128 || value > 127)
                        return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
-               return cass_statement_bind_int8(stmt->cass_stmt, column_idx, value);
+               return cass_statement_bind_int8(stmt->cass_stmt, column_idx,
+                                               value);
        default:
                return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
        }
@@ -1957,7 +2029,8 @@ static void prepare_finish_arg(struct cassandra_sql_statement *stmt,
                                               arg->value_int64);
        }
        if (rc != CASS_OK) {
-               e_error(stmt->stmt.db->event, "Statement '%s': Failed to bind column %u: %s",
+               e_error(stmt->stmt.db->event,
+                       "Statement '%s': Failed to bind column %u: %s",
                        stmt->stmt.query_template, arg->column_idx,
                        cass_error_desc(rc));
        }
@@ -2041,7 +2114,8 @@ static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt)
        /* clear the current error in case we're retrying */
        i_free_and_null(prep_stmt->error);
 
-       future = cass_session_prepare(db->session, prep_stmt->prep_stmt.query_template);
+       future = cass_session_prepare(db->session,
+                                     prep_stmt->prep_stmt.query_template);
        driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
 }
 
@@ -2267,7 +2341,8 @@ driver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
        i_assert(affected_rows == NULL);
 
        if (ctx->query != NULL || ctx->stmt != NULL) {
-               transaction_set_failed(ctx, "Multiple changes in transaction not supported");
+               transaction_set_failed(ctx,
+                       "Multiple changes in transaction not supported");
                return;
        }
        if (stmt->prep != NULL)