char *hosts, *keyspace, *user, *password;
CassConsistency read_consistency, write_consistency, delete_consistency;
- CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
+ CassConsistency read_fallback_consistency, write_fallback_consistency;
+ CassConsistency delete_fallback_consistency;
CassLogLevel log_level;
bool debug_queries;
bool latency_aware_routing;
return -1;
}
-static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
+static void driver_cassandra_set_state(struct cassandra_db *db,
+ enum sql_db_state state)
{
/* switch back to original ioloop in case the caller wants to
add/remove timeouts */
/* success */
unsigned int i, count = ret / sizeof(ids[0]);
- for (i = 0; i < count && db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
+ for (i = 0; i < count &&
+ db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
driver_cassandra_input_id(db, ids[i]);
return;
}
driver_cassandra_input, db);
driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
- future = cass_session_connect_keyspace(db->session, db->cluster, db->keyspace);
+ future = cass_session_connect_keyspace(db->session, db->cluster,
+ db->keyspace);
driver_cassandra_set_callback(future, db, connect_callback, db);
return 0;
}
{
const char *const *args, *key, *value, *error;
string_t *hosts = t_str_new(64);
- bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
+ bool read_fallback_set = FALSE, write_fallback_set = FALSE;
+ bool delete_fallback_set = FALSE;
db->log_level = CASS_LOG_WARN;
db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
for (; *args != NULL; args++) {
value = strchr(*args, '=');
if (value == NULL) {
- *error_r = t_strdup_printf("Missing value in connect string: %s",
- *args);
+ *error_r = t_strdup_printf(
+ "Missing value in connect string: %s", *args);
return -1;
}
key = t_strdup_until(*args, value++);
str_append(hosts, value);
} else if (strcmp(key, "port") == 0) {
if (net_str2port(value, &db->port) < 0) {
- *error_r = t_strdup_printf("Invalid port: %s", value);
+ *error_r = t_strdup_printf(
+ "Invalid port: %s", value);
return -1;
}
} else if (strcmp(key, "dbname") == 0 ||
db->password = i_strdup(value);
} else if (strcmp(key, "read_consistency") == 0) {
if (consistency_parse(value, &db->read_consistency) < 0) {
- *error_r = t_strdup_printf("Unknown read_consistency: %s", value);
+ *error_r = t_strdup_printf(
+ "Unknown read_consistency: %s", value);
return -1;
}
} else if (strcmp(key, "read_fallback_consistency") == 0) {
if (consistency_parse(value, &db->read_fallback_consistency) < 0) {
- *error_r = t_strdup_printf("Unknown read_fallback_consistency: %s", value);
+ *error_r = t_strdup_printf(
+ "Unknown read_fallback_consistency: %s", value);
return -1;
}
read_fallback_set = TRUE;
} else if (strcmp(key, "write_consistency") == 0) {
- if (consistency_parse(value, &db->write_consistency) < 0) {
- *error_r = t_strdup_printf("Unknown write_consistency: %s", value);
+ if (consistency_parse(value,
+ &db->write_consistency) < 0) {
+ *error_r = t_strdup_printf(
+ "Unknown write_consistency: %s", value);
return -1;
}
} else if (strcmp(key, "write_fallback_consistency") == 0) {
- if (consistency_parse(value, &db->write_fallback_consistency) < 0) {
- *error_r = t_strdup_printf("Unknown write_fallback_consistency: %s", value);
+ if (consistency_parse(value,
+ &db->write_fallback_consistency) < 0) {
+ *error_r = t_strdup_printf(
+ "Unknown write_fallback_consistency: %s",
+ value);
return -1;
}
write_fallback_set = TRUE;
} else if (strcmp(key, "delete_consistency") == 0) {
- if (consistency_parse(value, &db->delete_consistency) < 0) {
- *error_r = t_strdup_printf("Unknown delete_consistency: %s", value);
+ if (consistency_parse(value,
+ &db->delete_consistency) < 0) {
+ *error_r = t_strdup_printf(
+ "Unknown delete_consistency: %s", value);
return -1;
}
} else if (strcmp(key, "delete_fallback_consistency") == 0) {
- if (consistency_parse(value, &db->delete_fallback_consistency) < 0) {
- *error_r = t_strdup_printf("Unknown delete_fallback_consistency: %s", value);
+ if (consistency_parse(value,
+ &db->delete_fallback_consistency) < 0) {
+ *error_r = t_strdup_printf(
+ "Unknown delete_fallback_consistency: %s",
+ value);
return -1;
}
delete_fallback_set = TRUE;
} else if (strcmp(key, "log_level") == 0) {
if (log_level_parse(value, &db->log_level) < 0) {
- *error_r = t_strdup_printf("Unknown log_level: %s", value);
+ *error_r = t_strdup_printf(
+ "Unknown log_level: %s", value);
return -1;
}
} else if (strcmp(key, "debug_queries") == 0) {
db->latency_aware_routing = TRUE;
} else if (strcmp(key, "version") == 0) {
if (str_to_uint(value, &db->protocol_version) < 0) {
- *error_r = t_strdup_printf("Invalid version: %s", value);
+ *error_r = t_strdup_printf(
+ "Invalid version: %s", value);
return -1;
}
} else if (strcmp(key, "num_threads") == 0) {
if (str_to_uint(value, &db->num_threads) < 0) {
- *error_r = t_strdup_printf("Invalid num_threads: %s", value);
+ *error_r = t_strdup_printf(
+ "Invalid num_threads: %s", value);
return -1;
}
} else if (strcmp(key, "heartbeat_interval") == 0) {
- if (settings_get_time(value, &db->heartbeat_interval_secs, &error) < 0) {
- *error_r = t_strdup_printf("Invalid heartbeat_interval '%s': %s", value, error);
+ if (settings_get_time(value, &db->heartbeat_interval_secs,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid heartbeat_interval '%s': %s",
+ value, error);
return -1;
}
} else if (strcmp(key, "idle_timeout") == 0) {
- if (settings_get_time(value, &db->idle_timeout_secs, &error) < 0) {
- *error_r = t_strdup_printf("Invalid idle_timeout '%s': %s", value, error);
+ if (settings_get_time(value, &db->idle_timeout_secs,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid idle_timeout '%s': %s",
+ value, error);
return -1;
}
} else if (strcmp(key, "connect_timeout") == 0) {
- if (settings_get_time_msecs(value, &db->connect_timeout_msecs, &error) < 0) {
- *error_r = t_strdup_printf("Invalid connect_timeout '%s': %s", value, error);
+ if (settings_get_time_msecs(value,
+ &db->connect_timeout_msecs,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid connect_timeout '%s': %s",
+ value, error);
return -1;
}
} else if (strcmp(key, "request_timeout") == 0) {
- if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0) {
- *error_r = t_strdup_printf("Invalid request_timeout '%s': %s", value, error);
+ if (settings_get_time_msecs(value,
+ &db->request_timeout_msecs,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid request_timeout '%s': %s",
+ value, error);
return -1;
}
} else if (strcmp(key, "warn_timeout") == 0) {
- if (settings_get_time_msecs(value, &db->warn_timeout_msecs, &error) < 0) {
- *error_r = t_strdup_printf("Invalid warn_timeout '%s': %s", value, error);
+ if (settings_get_time_msecs(value,
+ &db->warn_timeout_msecs,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid warn_timeout '%s': %s",
+ value, error);
return -1;
}
} else if (strcmp(key, "metrics") == 0) {
i_free(db->metrics_path);
db->metrics_path = i_strdup(value);
} else if (strcmp(key, "execution_retry_interval") == 0) {
- if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0) {
- *error_r = t_strdup_printf("Invalid execution_retry_interval '%s': %s", value, error);
+ if (settings_get_time_msecs(value,
+ &db->execution_retry_interval_msecs,
+ &error) < 0) {
+ *error_r = t_strdup_printf(
+ "Invalid execution_retry_interval '%s': %s",
+ value, error);
return -1;
}
#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
- *error_r = t_strdup_printf("This cassandra version does not support execution_retry_interval");
+ *error_r = t_strdup_printf(
+ "This cassandra version does not support execution_retry_interval");
return -1;
#endif
} else if (strcmp(key, "execution_retry_times") == 0) {
if (str_to_uint(value, &db->execution_retry_times) < 0) {
- *error_r = t_strdup_printf("Invalid execution_retry_times %s", value);
+ *error_r = t_strdup_printf(
+ "Invalid execution_retry_times %s",
+ value);
return -1;
}
#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
- *error_r = t_strdup_printf("This cassandra version does not support execution_retry_times");
+ *error_r = t_strdup_printf(
+ "This cassandra version does not support execution_retry_times");
return -1;
#endif
} else if (strcmp(key, "page_size") == 0) {
if (str_to_uint(value, &db->page_size) < 0) {
- *error_r = t_strdup_printf("Invalid page_size: %s", value);
+ *error_r = t_strdup_printf(
+ "Invalid page_size: %s",
+ value);
return -1;
}
} else {
- *error_r = t_strdup_printf("Unknown connect string: %s", key);
+ *error_r = t_strdup_printf(
+ "Unknown connect string: %s", key);
return -1;
}
}
driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
{
#define ADD_UINT64(_struct, _field) \
- str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
+ str_printfa(dest, "\""#_field"\": %llu,", \
+ (unsigned long long)metrics._struct._field);
#define ADD_DOUBLE(_struct, _field) \
str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
CassMetrics metrics;
}
static int driver_cassandra_init_full_v(const struct sql_settings *set,
- struct sql_db **db_r, const char **error_r)
+ struct sql_db **db_r,
+ const char **error_r)
{
struct cassandra_db *db;
char *error = NULL;
T_BEGIN {
const char *tmp;
- if ((ret = driver_cassandra_parse_connect_string(db, set->connect_string,
+ if ((ret = driver_cassandra_parse_connect_string(db,
+ set->connect_string,
&tmp)) < 0) {
error = i_strdup(tmp);
}
if (db->latency_aware_routing)
cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
if (db->heartbeat_interval_secs != 0)
- cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
+ cass_cluster_set_connection_heartbeat_interval(db->cluster,
+ db->heartbeat_interval_secs);
if (db->idle_timeout_secs != 0)
- cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
+ cass_cluster_set_connection_idle_timeout(db->cluster,
+ db->idle_timeout_secs);
#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
- cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
+ cass_cluster_set_constant_speculative_execution_policy(
+ db->cluster, db->execution_retry_interval_msecs,
+ db->execution_retry_times);
#endif
db->session = cass_session_new();
if (db->metrics_path != NULL)
- db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
+ db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write,
+ db);
i_array_init(&db->results, 16);
i_array_init(&db->callbacks, 16);
i_array_init(&db->pending_prepares, 16);
{
struct cassandra_db *db = (struct cassandra_db *)_db;
- driver_cassandra_close(db, "Deinitialized");
+ driver_cassandra_close(db, "Deinitialized");
i_assert(array_count(&db->callbacks) == 0);
array_free(&db->callbacks);
static void driver_cassandra_result_free(struct sql_result *_result)
{
struct cassandra_db *db = (struct cassandra_db *)_result->db;
- struct cassandra_result *result = (struct cassandra_result *)_result;
+ struct cassandra_result *result = (struct cassandra_result *)_result;
long long reply_usecs;
i_assert(!result->api.callback);
if (_result == db->sync_result)
db->sync_result = NULL;
- reply_usecs = timeval_diff_usecs(&result->finish_time, &result->start_time);
+ reply_usecs = timeval_diff_usecs(&result->finish_time,
+ &result->start_time);
driver_cassandra_log_result(result, FALSE, reply_usecs);
if (result->page_num > 0 && !result->paging_continues) {
ioloop_time - db->last_fallback_warning[result->query_type];
if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
- e_warning(db->api.event, "%s - retrying future %s queries with consistency %s (instead of %s)",
+ e_warning(db->api.event,
+ "%s - retrying future %s queries with consistency %s (instead of %s)",
result->error, cassandra_query_type_names[result->query_type],
cass_consistency_string(result->fallback_consistency),
cass_consistency_string(result->consistency));
not. Also _SERVER_UNAVAILABLE could have actually written
enough copies of the data for the query to succeed. */
result->api.error_type = driver_cassandra_error_is_uncertain(error);
- result->error = i_strdup_printf("Query '%s' failed: %.*s (in %u.%03u secs%s)",
+ result->error = i_strdup_printf(
+ "Query '%s' failed: %.*s (in %u.%03u secs%s)",
result->query, (int)errsize, errmsg, msecs/1000, msecs%1000,
- result->page_num == 0 ? "" : t_strdup_printf(", page %u", result->page_num));
+ result->page_num == 0 ?
+ "" :
+ t_strdup_printf(", page %u", result->page_num));
if (query_error_want_fallback(error) &&
result->fallback_consistency != result->consistency) {
static bool
driver_cassandra_want_fallback_query(struct cassandra_result *result)
{
- struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+ struct cassandra_db *db = (struct cassandra_db *)result->api.db;
unsigned int failure_count = db->fallback_failures[result->query_type];
unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
struct timeval tv;
static int driver_cassandra_send_query(struct cassandra_result *result)
{
- struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+ struct cassandra_db *db = (struct cassandra_db *)result->api.db;
int ret;
if (!SQL_DB_IS_READY(&db->api)) {
if ((ret = sql_connect(&db->api)) <= 0) {
if (ret < 0)
- driver_cassandra_close(db, "Couldn't connect to Cassandra");
+ driver_cassandra_close(db,
+ "Couldn't connect to Cassandra");
return ret;
}
}
static void driver_cassandra_exec(struct sql_db *db, const char *query)
{
- driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE, exec_callback, NULL);
+ driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE,
+ exec_callback, NULL);
}
static void driver_cassandra_query(struct sql_db *db, const char *query,
sql_query_callback_t *callback, void *context)
{
- driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ, callback, context);
+ driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ,
+ callback, context);
}
static void cassandra_query_s_callback(struct sql_result *result, void *context)
{
- struct cassandra_db *db = context;
+ struct cassandra_db *db = context;
db->sync_result = result;
}
/* callers that don't support sql_query_more() will still get a useful
error message. */
i_free(result->error);
- result->error = i_strdup("Paged query has more results, but not supported by the caller");
+ result->error = i_strdup(
+ "Paged query has more results, but not supported by the caller");
return SQL_RESULT_NEXT_MORE;
}
i_assert(stmt->prep != NULL);
/* statements require exactly correct value type */
- data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, column_idx);
+ data_type = cass_prepared_parameter_data_type(stmt->prep->prepared,
+ column_idx);
value_type = cass_data_type_type(data_type);
switch (value_type) {
case CASS_VALUE_TYPE_INT:
if (value < -2147483648 || value > 2147483647)
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
- return cass_statement_bind_int32(stmt->cass_stmt, column_idx, value);
+ return cass_statement_bind_int32(stmt->cass_stmt, column_idx,
+ value);
case CASS_VALUE_TYPE_TIMESTAMP:
case CASS_VALUE_TYPE_BIGINT:
- return cass_statement_bind_int64(stmt->cass_stmt, column_idx, value);
+ return cass_statement_bind_int64(stmt->cass_stmt, column_idx,
+ value);
case CASS_VALUE_TYPE_SMALL_INT:
if (value < -32768 || value > 32767)
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
- return cass_statement_bind_int16(stmt->cass_stmt, column_idx, value);
+ return cass_statement_bind_int16(stmt->cass_stmt, column_idx,
+ value);
case CASS_VALUE_TYPE_TINY_INT:
if (value < -128 || value > 127)
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
- return cass_statement_bind_int8(stmt->cass_stmt, column_idx, value);
+ return cass_statement_bind_int8(stmt->cass_stmt, column_idx,
+ value);
default:
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
}
arg->value_int64);
}
if (rc != CASS_OK) {
- e_error(stmt->stmt.db->event, "Statement '%s': Failed to bind column %u: %s",
+ e_error(stmt->stmt.db->event,
+ "Statement '%s': Failed to bind column %u: %s",
stmt->stmt.query_template, arg->column_idx,
cass_error_desc(rc));
}
/* clear the current error in case we're retrying */
i_free_and_null(prep_stmt->error);
- future = cass_session_prepare(db->session, prep_stmt->prep_stmt.query_template);
+ future = cass_session_prepare(db->session,
+ prep_stmt->prep_stmt.query_template);
driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
}
i_assert(affected_rows == NULL);
if (ctx->query != NULL || ctx->stmt != NULL) {
- transaction_set_failed(ctx, "Multiple changes in transaction not supported");
+ transaction_set_failed(ctx,
+ "Multiple changes in transaction not supported");
return;
}
if (stmt->prep != NULL)