#include "net.h"
#include "write-full.h"
#include "time-util.h"
+#include "var-expand.h"
#include "settings-parser.h"
#include "sql-api-private.h"
#ifdef BUILD_CASSANDRA
+#include <fcntl.h>
#include <unistd.h>
#include <cassandra.h>
ARRAY(struct cassandra_result *) results;
unsigned int callback_ids;
+ char *metrics_path;
+ struct timeout *to_metrics;
+
struct timeval first_fallback_sent[CASSANDRA_QUERY_TYPE_COUNT];
time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
} else if (strcmp(key, "request_timeout") == 0) {
if (settings_get_time_msecs(value, &db->request_timeout_msecs, &error) < 0)
i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error);
+ } else if (strcmp(key, "metrics") == 0) {
+ i_free(db->metrics_path);
+ db->metrics_path = i_strdup(value);
} else {
i_fatal("cassandra: Unknown connect string: %s", key);
}
db->hosts = i_strdup(str_c(hosts));
}
+static void
+driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
+{
+#define ADD_UINT64(_struct, _field) \
+ str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field);
+#define ADD_DOUBLE(_struct, _field) \
+ str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
+ CassMetrics metrics;
+
+ cass_session_get_metrics(db->session, &metrics);
+ str_append(dest, "{ \"requests\": {");
+ ADD_UINT64(requests, min);
+ ADD_UINT64(requests, max);
+ ADD_UINT64(requests, mean);
+ ADD_UINT64(requests, stddev);
+ ADD_UINT64(requests, median);
+ ADD_UINT64(requests, percentile_75th);
+ ADD_UINT64(requests, percentile_95th);
+ ADD_UINT64(requests, percentile_98th);
+ ADD_UINT64(requests, percentile_99th);
+ ADD_UINT64(requests, percentile_999th);
+ ADD_DOUBLE(requests, mean_rate);
+ ADD_DOUBLE(requests, one_minute_rate);
+ ADD_DOUBLE(requests, five_minute_rate);
+ ADD_DOUBLE(requests, fifteen_minute_rate);
+ str_truncate(dest, str_len(dest)-1);
+ str_append(dest, "}, \"stats\": {");
+ ADD_UINT64(stats, total_connections);
+ ADD_UINT64(stats, available_connections);
+ ADD_UINT64(stats, exceeded_pending_requests_water_mark);
+ ADD_UINT64(stats, exceeded_write_bytes_water_mark);
+ str_truncate(dest, str_len(dest)-1);
+ str_append(dest, "}, \"errors\": {");
+ ADD_UINT64(errors, connection_timeouts);
+ ADD_UINT64(errors, pending_request_timeouts);
+ ADD_UINT64(errors, request_timeouts);
+ str_truncate(dest, str_len(dest)-1);
+ str_append(dest, "}}");
+}
+
+static void driver_cassandra_metrics_write(struct cassandra_db *db)
+{
+ struct var_expand_table tab[] = {
+ { '\0', NULL, NULL }
+ };
+ string_t *path = t_str_new(64);
+ string_t *data;
+ int fd;
+
+ var_expand(path, db->metrics_path, tab);
+
+ fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
+ if (fd == -1) {
+ i_error("creat(%s) failed: %m", str_c(path));
+ return;
+ }
+ data = t_str_new(1024);
+ driver_cassandra_get_metrics_json(db, data);
+ if (write_full(fd, str_data(data), str_len(data)) < 0)
+ i_error("write(%s) failed: %m", str_c(path));
+ i_close_fd(&fd);
+}
+
static struct sql_db *driver_cassandra_init_v(const char *connect_string)
{
struct cassandra_db *db;
if (db->num_threads != 0)
cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
db->session = cass_session_new();
+ if (db->metrics_path != NULL)
+ db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
i_array_init(&db->results, 16);
i_array_init(&db->callbacks, 16);
return &db->api;
cass_session_free(db->session);
cass_cluster_free(db->cluster);
cass_timestamp_gen_free(db->timestamp_gen);
+ if (db->to_metrics != NULL)
+ timeout_remove(&db->to_metrics);
+ i_free(db->metrics_path);
i_free(db->hosts);
i_free(db->error);
i_free(db->keyspace);