typedef void driver_cassandra_callback_t(CassFuture *future, void *context);
+enum cassandra_counter_type {
+ CASSANDRA_COUNTER_TYPE_QUERY_SENT,
+ CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK,
+ CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS,
+ CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL,
+ CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT,
+ CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT,
+ CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE,
+ CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER,
+
+ CASSANDRA_COUNTER_COUNT
+};
+static const char *counter_names[CASSANDRA_COUNTER_COUNT] = {
+ "sent",
+ "recv_ok",
+ "recv_err_no_hosts",
+ "recv_err_queue_full",
+ "recv_err_client_timeout",
+ "recv_err_server_timeout",
+ "recv_err_server_unavailable",
+ "recv_err_other",
+};
+
enum cassandra_query_type {
CASSANDRA_QUERY_TYPE_READ,
CASSANDRA_QUERY_TYPE_WRITE,
char *metrics_path;
struct timeout *to_metrics;
+ uint64_t counters[CASSANDRA_COUNTER_COUNT];
struct timeval first_fallback_sent[CASSANDRA_QUERY_TYPE_COUNT];
time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
ADD_DOUBLE(requests, five_minute_rate);
ADD_DOUBLE(requests, fifteen_minute_rate);
str_truncate(dest, str_len(dest)-1);
+
str_append(dest, "}, \"stats\": {");
ADD_UINT64(stats, total_connections);
ADD_UINT64(stats, available_connections);
ADD_UINT64(stats, exceeded_pending_requests_water_mark);
ADD_UINT64(stats, exceeded_write_bytes_water_mark);
str_truncate(dest, str_len(dest)-1);
+
str_append(dest, "}, \"errors\": {");
ADD_UINT64(errors, connection_timeouts);
ADD_UINT64(errors, pending_request_timeouts);
ADD_UINT64(errors, request_timeouts);
str_truncate(dest, str_len(dest)-1);
+
+ str_append(dest, "}, \"queries\": {");
+ for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
+ str_printfa(dest, "\"%s\": %llu,", counter_names[i],
+ (unsigned long long)db->counters[i]);
+ }
+ str_truncate(dest, str_len(dest)-1);
str_append(dest, "}}");
}
driver_cassandra_result_send_query(result);
}
+static void counters_inc_error(struct cassandra_db *db, CassError error)
+{
+ switch (error) {
+ case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
+ db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++;
+ break;
+ case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
+ db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++;
+ break;
+ case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
+ db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++;
+ break;
+ case CASS_ERROR_SERVER_WRITE_TIMEOUT:
+ db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++;
+ break;
+ case CASS_ERROR_SERVER_UNAVAILABLE:
+ db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++;
+ break;
+ default:
+ db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++;
+ break;
+ }
+}
+
static void query_callback(CassFuture *future, void *context)
{
struct cassandra_result *result = context;
i_free(result->error);
msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time);
+ counters_inc_error(db, error);
result->api.error_type = error == CASS_ERROR_SERVER_WRITE_TIMEOUT ||
error == CASS_ERROR_LIB_REQUEST_TIMED_OUT ?
SQL_RESULT_ERROR_TYPE_WRITE_UNCERTAIN :
SQL_RESULT_ERROR_TYPE_UNKNOWN;
result->error = i_strdup_printf("Query '%s' failed: %.*s (in %u.%03u secs)",
result->query, (int)errsize, errmsg, msecs/1000, msecs%1000);
+
/* unavailable = cassandra server knows that there aren't
enough nodes available.
result_finish(result);
return;
}
+ db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK]++;
if (result->fallback_consistency != result->consistency) {
/* non-fallback query finished successfully. if there had been
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
CassFuture *future;
+ db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
+
result->statement = cass_statement_new(result->query, 0);
cass_statement_set_consistency(result->statement, result->consistency);