From: Timo Sirainen Date: Wed, 25 May 2016 19:29:47 +0000 (+0300) Subject: cassandra: Added metrics=path connect setting. X-Git-Tag: 2.2.25.rc1~268 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a2f421418658fca58cfa0f6a2ae77b730992cb7f;p=thirdparty%2Fdovecot%2Fcore.git cassandra: Added metrics=path connect setting. Cassandra's metrics are written to the path in JSON format. It can be a file or a FIFO. The path supports expanding the standard global %variables, such as %{pid} --- diff --git a/src/lib-sql/driver-cassandra.c b/src/lib-sql/driver-cassandra.c index 13f46f4572..05cd3d9759 100644 --- a/src/lib-sql/driver-cassandra.c +++ b/src/lib-sql/driver-cassandra.c @@ -8,10 +8,12 @@ #include "net.h" #include "write-full.h" #include "time-util.h" +#include "var-expand.h" #include "settings-parser.h" #include "sql-api-private.h" #ifdef BUILD_CASSANDRA +#include #include #include @@ -66,6 +68,9 @@ struct cassandra_db { ARRAY(struct cassandra_result *) results; unsigned int callback_ids; + char *metrics_path; + struct timeout *to_metrics; + struct timeval first_fallback_sent[CASSANDRA_QUERY_TYPE_COUNT]; time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT]; unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT]; @@ -442,6 +447,9 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db, } else if (strcmp(key, "request_timeout") == 0) { if (settings_get_time(value, &db->request_timeout_secs, &error) < 0) i_fatal("cassandra: Invalid request_timeout '%s': %s", value, error); + } else if (strcmp(key, "metrics") == 0) { + i_free(db->metrics_path); + db->metrics_path = i_strdup(value); } else { i_fatal("cassandra: Unknown connect string: %s", key); } @@ -461,6 +469,69 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db, db->hosts = i_strdup(str_c(hosts)); } +static void +driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest) +{ +#define ADD_UINT64(_struct, _field) \ + str_printfa(dest, "\""#_field"\": %llu,", (unsigned long long)metrics._struct._field); +#define ADD_DOUBLE(_struct, _field) \ + str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field); + CassMetrics metrics; + + cass_session_get_metrics(db->session, &metrics); + str_append(dest, "{ \"requests\": {"); + ADD_UINT64(requests, min); + ADD_UINT64(requests, max); + ADD_UINT64(requests, mean); + ADD_UINT64(requests, stddev); + ADD_UINT64(requests, median); + ADD_UINT64(requests, percentile_75th); + ADD_UINT64(requests, percentile_95th); + ADD_UINT64(requests, percentile_98th); + ADD_UINT64(requests, percentile_99th); + ADD_UINT64(requests, percentile_999th); + ADD_DOUBLE(requests, mean_rate); + ADD_DOUBLE(requests, one_minute_rate); + ADD_DOUBLE(requests, five_minute_rate); + ADD_DOUBLE(requests, fifteen_minute_rate); + str_truncate(dest, str_len(dest)-1); + str_append(dest, "}, \"stats\": {"); + ADD_UINT64(stats, total_connections); + ADD_UINT64(stats, available_connections); + ADD_UINT64(stats, exceeded_pending_requests_water_mark); + ADD_UINT64(stats, exceeded_write_bytes_water_mark); + str_truncate(dest, str_len(dest)-1); + str_append(dest, "}, \"errors\": {"); + ADD_UINT64(errors, connection_timeouts); + ADD_UINT64(errors, pending_request_timeouts); + ADD_UINT64(errors, request_timeouts); + str_truncate(dest, str_len(dest)-1); + str_append(dest, "}}"); +} + +static void driver_cassandra_metrics_write(struct cassandra_db *db) +{ + struct var_expand_table tab[] = { + { '\0', NULL, NULL } + }; + string_t *path = t_str_new(64); + string_t *data; + int fd; + + var_expand(path, db->metrics_path, tab); + + fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600); + if (fd == -1) { + i_error("creat(%s) failed: %m", str_c(path)); + return; + } + data = t_str_new(1024); + driver_cassandra_get_metrics_json(db, data); + if (write_full(fd, str_data(data), str_len(data)) < 0) + i_error("write(%s) failed: %m", str_c(path)); + i_close_fd(&fd); +} + static struct sql_db *driver_cassandra_init_v(const char *connect_string) { struct cassandra_db *db; @@ -489,6 +560,8 @@ static struct sql_db *driver_cassandra_init_v(const char *connect_string) if (db->num_threads != 0) cass_cluster_set_num_threads_io(db->cluster, db->num_threads); db->session = cass_session_new(); + if (db->metrics_path != NULL) + db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db); i_array_init(&db->results, 16); i_array_init(&db->callbacks, 16); return &db->api; @@ -508,6 +581,9 @@ static void driver_cassandra_deinit_v(struct sql_db *_db) cass_session_free(db->session); cass_cluster_free(db->cluster); cass_timestamp_gen_free(db->timestamp_gen); + if (db->to_metrics != NULL) + timeout_remove(&db->to_metrics); + i_free(db->metrics_path); i_free(db->hosts); i_free(db->error); i_free(db->keyspace);