From 39af0e036e9e21663e81ea9793720be27e36649b Mon Sep 17 00:00:00 2001 From: Alberto Leiva Popper Date: Tue, 5 Aug 2025 18:42:34 -0600 Subject: [PATCH] Implement Prometheus Fixes #50. For now, it only implements the following stats: fort_valid_vrps_total{ta="<TA>",proto="ipv<4|6>"} Total VRPs generated from TA (and the given protocol) during the previous cycle. "<TA>" is inferred from the TAL's file name. fort_rtr_current_connections Number of active RTR clients. To activate the server, set --mode=server and --prometheus.port to an allowed and available port number. Adds libmicrohttpd as a dependency. --- configure.ac | 1 + src/Makefile.am | 4 +- src/config.c | 25 ++++++ src/config.h | 1 + src/main.c | 14 +++- src/object/tal.c | 6 ++ src/prometheus.c | 100 +++++++++++++++++++++++ src/prometheus.h | 7 ++ src/rtr/db/db_table.c | 33 ++++++-- src/rtr/db/db_table.h | 2 + src/rtr/rtr.c | 3 + src/stats.c | 184 ++++++++++++++++++++++++++++++++++++++++++ src/stats.h | 15 ++++ 13 files changed, 385 insertions(+), 10 deletions(-) create mode 100644 src/prometheus.c create mode 100644 src/prometheus.h create mode 100644 src/stats.c create mode 100644 src/stats.h diff --git a/configure.ac b/configure.ac index 4000561a..6de3506c 100644 --- a/configure.ac +++ b/configure.ac @@ -45,6 +45,7 @@ AM_CONDITIONAL([BACKTRACE_ENABLED], [test "x$have_backtrace" != "xno"]) PKG_CHECK_MODULES([JANSSON], [jansson]) PKG_CHECK_MODULES([CURL], [libcurl]) PKG_CHECK_MODULES([XML2], [libxml-2.0]) +PKG_CHECK_MODULES([MICROHTTPD], [libmicrohttpd]) PKG_CHECK_MODULES([CHECK], [check], [usetests=yes], [usetests=no]) AM_CONDITIONAL([USE_TESTS], [test "x$usetests" = "xyes"]) diff --git a/src/Makefile.am b/src/Makefile.am index 8d9c2d0e..215e29ca 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -24,11 +24,13 @@ fort_SOURCES += log.h log.c fort_SOURCES += nid.h nid.c fort_SOURCES += output_printer.h output_printer.c fort_SOURCES += print_file.h print_file.c +fort_SOURCES += prometheus.h prometheus.c fort_SOURCES += 
resource.h resource.c fort_SOURCES += rpp.h rpp.c fort_SOURCES += rrdp.h rrdp.c fort_SOURCES += sorted_array.h sorted_array.c fort_SOURCES += state.h state.c +fort_SOURCES += stats.h stats.c fort_SOURCES += str_token.h str_token.c fort_SOURCES += thread_var.h thread_var.c fort_SOURCES += json_handler.h json_handler.c @@ -124,7 +126,7 @@ if BACKTRACE_ENABLED fort_CFLAGS += -DBACKTRACE_ENABLED endif fort_LDFLAGS = $(LDFLAGS_DEBUG) -fort_LDADD = ${JANSSON_LIBS} ${CURL_LIBS} ${XML2_LIBS} +fort_LDADD = ${JANSSON_LIBS} ${CURL_LIBS} ${XML2_LIBS} ${MICROHTTPD_LIBS} # I'm tired of scrolling up, but feel free to comment this out. GCC_WARNS = -fmax-errors=1 diff --git a/src/config.c b/src/config.c index e3e807c9..22a25dac 100644 --- a/src/config.c +++ b/src/config.c @@ -75,6 +75,10 @@ struct rpki_config { unsigned int deltas_lifetime; } server; + struct { + unsigned int port; + } prometheus; + struct { /* Enables the protocol */ bool enabled; @@ -420,6 +424,19 @@ static const struct option_field options[] = { .max = UINT_MAX, }, + /* Prometheus fields */ + { + .id = 14000, + .name = "prometheus.port", + .type = >_uint, + .offset = offsetof(struct rpki_config, prometheus.port), + .doc = "Port to bind the Prometheus server to. " + "Prometheus requires this value and 'server' mode to start. 
" + "Unlike server.port, prometheus.port will not be resolved.", + .min = 1, + .max = 0xFFFF, + }, + /* RSYNC fields */ { .id = 3000, @@ -955,6 +972,8 @@ set_default_values(void) rpki_config.server.interval.expire = 7200; rpki_config.server.deltas_lifetime = 2; + rpki_config.prometheus.port = 0; + rpki_config.rsync.enabled = true; rpki_config.rsync.priority = 50; rpki_config.rsync.strategy = pstrdup(""); @@ -1228,6 +1247,12 @@ config_get_deltas_lifetime(void) return rpki_config.server.deltas_lifetime; } +unsigned int +config_get_prometheus_port(void) +{ + return rpki_config.prometheus.port; +} + char const * config_get_slurm(void) { diff --git a/src/config.h b/src/config.h index a40f0f15..3a0b3feb 100644 --- a/src/config.h +++ b/src/config.h @@ -27,6 +27,7 @@ unsigned int config_get_interval_refresh(void); unsigned int config_get_interval_retry(void); unsigned int config_get_interval_expire(void); unsigned int config_get_deltas_lifetime(void); +unsigned int config_get_prometheus_port(void); char const *config_get_slurm(void); char const *config_get_tal(void); diff --git a/src/main.c b/src/main.c index 1d6295a3..e8a8989c 100644 --- a/src/main.c +++ b/src/main.c @@ -6,6 +6,8 @@ #include "log.h" #include "nid.h" #include "print_file.h" +#include "prometheus.h" +#include "stats.h" #include "rtr/rtr.h" #include "thread_var.h" #include "xml/relax_ng.h" @@ -127,9 +129,15 @@ main(int argc, char **argv) error = handle_flags_config(argc, argv); if (error) goto revert_log; - error = nid_init(); + error = stats_setup(); if (error) goto revert_config; + error = prometheus_setup(); + if (error) + goto revert_stats; + error = nid_init(); + if (error) + goto revert_prometheus; error = extension_init(); if (error) goto revert_nid; @@ -167,6 +175,10 @@ revert_http: http_cleanup(); revert_nid: nid_destroy(); +revert_prometheus: + prometheus_teardown(); +revert_stats: + stats_teardown(); revert_config: free_rpki_config(); revert_log: diff --git a/src/object/tal.c b/src/object/tal.c 
index 41d8d17a..33515748 100644 --- a/src/object/tal.c +++ b/src/object/tal.c @@ -10,6 +10,7 @@ #include "crypto/base64.h" #include "line_file.h" #include "log.h" +#include "stats.h" #include "thread_var.h" typedef int (*foreach_uri_cb)(struct tal *, struct rpki_uri *, void *); @@ -528,6 +529,11 @@ perform_standalone_validation(void) } if (!error) { + stats_set_tal_vrps(thread->tal_file, "ipv4", + db_table_roa_count_v4(thread->db)); + stats_set_tal_vrps(thread->tal_file, "ipv6", + db_table_roa_count_v6(thread->db)); + if (db == NULL) { db = thread->db; thread->db = NULL;
diff --git a/src/prometheus.c b/src/prometheus.c
new file mode 100644
index 00000000..663801f7
--- /dev/null
+++ b/src/prometheus.c
@@ -0,0 +1,151 @@
+#include "prometheus.h"
+
+#include <microhttpd.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "config.h"
+#include "log.h"
+#include "stats.h"
+
+#define CONTENT_TYPE "application/openmetrics-text; version=1.0.0; charset=utf-8"
+
+/* Running HTTP server; NULL while the Prometheus endpoint is not running. */
+static struct MHD_Daemon *prometheus_daemon;
+
+/*
+ * Queues a response made out of HTTP status code @status and body @msg.
+ *
+ * @msg has to outlive the response (ie. be a string literal), because
+ * MHD_RESPMEM_PERSISTENT tells libmicrohttpd to neither copy nor free it.
+ *
+ * Returns MHD_YES on success, MHD_NO if the response could not be queued.
+ */
+static enum MHD_Result
+respond(struct MHD_Connection *conn, char *msg, unsigned int status)
+{
+	struct MHD_Response *response;
+	enum MHD_Result result;
+
+	response = MHD_create_response_from_buffer(strlen(msg), msg,
+	    MHD_RESPMEM_PERSISTENT);
+	if (response == NULL)
+		return MHD_NO;
+
+	result = MHD_queue_response(conn, status, response);
+	MHD_destroy_response(response);
+
+	return result;
+}
+
+/*
+ * Answers the request with the current stats, as rendered by stats_export().
+ *
+ * Returns MHD_YES on success, MHD_NO if the response could not be queued.
+ */
+static enum MHD_Result
+send_metrics(struct MHD_Connection *conn)
+{
+	char *stats;
+	struct MHD_Response *res;
+	enum MHD_Result ret;
+
+	pr_op_debug("Handling Prometheus request...");
+
+	stats = stats_export();
+	if (stats == NULL) {
+		/* stats_export() already logged the cause. */
+		return respond(conn, "Internal Server Error\n",
+		    MHD_HTTP_INTERNAL_SERVER_ERROR);
+	}
+
+	/* From now on, @res owns @stats; it releases it through free(). */
+	res = MHD_create_response_from_buffer_with_free_callback(strlen(stats),
+	    stats, free);
+	if (res == NULL) {
+		free(stats);
+		return MHD_NO;
+	}
+
+	ret = MHD_add_response_header(res, "Content-Type", CONTENT_TYPE);
+	if (ret != MHD_YES) {
+		pr_op_debug("Could not set Content-Type HTTP header.");
+		/* Keep going; maybe the client won't care. */
+	}
+
+	ret = MHD_queue_response(conn, MHD_HTTP_OK, res);
+	MHD_destroy_response(res);
+
+	pr_op_debug("Prometheus request handled.");
+	return ret;
+}
+
+/*
+ * libmicrohttpd request handler. Only GET is accepted: "/" answers "OK",
+ * "/metrics" answers the stats, and everything else is rejected.
+ */
+static enum MHD_Result
+handle_prometheus_req(void *cls, struct MHD_Connection *conn,
+    const char *url, const char *method, const char *version,
+    const char *upload, size_t *uplen, void **state)
+{
+	if (strcmp(method, "GET") != 0)
+		return respond(conn, "Invalid HTTP Method\n",
+		    MHD_HTTP_METHOD_NOT_ALLOWED);
+
+	if (strcmp(url, "/") == 0)
+		return respond(conn, "OK\n", MHD_HTTP_OK);
+	if (strcmp(url, "/metrics") == 0)
+		return send_metrics(conn);
+
+	return respond(conn, "Not Found\n", MHD_HTTP_NOT_FOUND);
+}
+
+/*
+ * Starts the Prometheus HTTP server, provided the configuration asks for it
+ * (server mode and a nonzero prometheus.port).
+ *
+ * Returns 0 if the server started or is disabled; otherwise whatever
+ * pr_op_err() returns.
+ */
+int
+prometheus_setup(void)
+{
+	unsigned int port;
+
+	port = config_get_prometheus_port();
+	if (config_get_mode() != SERVER || port == 0)
+		return 0;
+
+	pr_op_debug("Starting Prometheus server...");
+
+	/*
+	 * According to libmicrohttpd's documentation, thread-per-connection
+	 * has to be combined with MHD_USE_INTERNAL_POLLING_THREAD.
+	 */
+	prometheus_daemon = MHD_start_daemon(
+	    MHD_USE_INTERNAL_POLLING_THREAD
+	        | MHD_USE_THREAD_PER_CONNECTION,	/* flags */
+	    port,				/* port */
+	    NULL, NULL,				/* accept policy */
+	    &handle_prometheus_req, NULL,	/* handler */
+	    MHD_OPTION_END			/* options */
+	);
+
+	if (prometheus_daemon == NULL)
+		return pr_op_err("Could not start Prometheus server; Unknown error");
+
+	pr_op_debug("Prometheus server started.");
+	return 0;
+}
+
+/* Stops the Prometheus HTTP server. Does nothing if it is not running. */
+void
+prometheus_teardown(void)
+{
+	if (prometheus_daemon == NULL)
+		return;
+
+	MHD_stop_daemon(prometheus_daemon);
+	prometheus_daemon = NULL;
+}
diff --git a/src/prometheus.h b/src/prometheus.h
new file mode 100644
index 00000000..dfba6b52
--- /dev/null
+++ b/src/prometheus.h
@@ -0,0 +1,7 @@
+#ifndef SRC_PROMETHEUS_H_
+#define SRC_PROMETHEUS_H_
+
+int prometheus_setup(void);
+void prometheus_teardown(void);
+
+#endif /* SRC_PROMETHEUS_H_ */
diff --git a/src/rtr/db/db_table.c b/src/rtr/db/db_table.c index f2736e4a..2038381d 100644 --- a/src/rtr/db/db_table.c +++ b/src/rtr/db/db_table.c @@ -19,18 +19,15 @@ struct hashable_key { struct db_table { struct hashable_roa *roas; struct hashable_key *router_keys; + + unsigned int total_roas_v4; + unsigned int total_roas_v6; }; struct 
db_table * db_table_create(void) { - struct db_table *table; - - table = pmalloc(sizeof(struct db_table)); - table->roas = NULL; - table->router_keys = NULL; - - return table; + return pzalloc(sizeof(struct db_table)); } void @@ -64,13 +61,21 @@ add_roa(struct db_table *table, struct hashable_roa *new) errno = 0; HASH_REPLACE(hh, table->roas, data, sizeof(new->data), new, old); error = errno; + if (error) { pr_val_err("ROA couldn't be added to hash table: %s", strerror(error)); return -error; } - if (old != NULL) + + if (old == NULL) { + switch (new->data.addr_fam) { + case AF_INET: table->total_roas_v4++; break; + case AF_INET6: table->total_roas_v6++; break; + } + } else { free(old); + } return 0; } @@ -157,6 +162,18 @@ db_table_roa_count(struct db_table *table) return HASH_COUNT(table->roas); } +unsigned int +db_table_roa_count_v4(struct db_table *table) +{ + return table->total_roas_v4; +} + +unsigned int +db_table_roa_count_v6(struct db_table *table) +{ + return table->total_roas_v6; +} + unsigned int db_table_router_key_count(struct db_table *table) { diff --git a/src/rtr/db/db_table.h b/src/rtr/db/db_table.h index a763c957..814d3af0 100644 --- a/src/rtr/db/db_table.h +++ b/src/rtr/db/db_table.h @@ -12,6 +12,8 @@ void db_table_destroy(struct db_table *); int db_table_join(struct db_table *, struct db_table *); unsigned int db_table_roa_count(struct db_table *); +unsigned int db_table_roa_count_v4(struct db_table *); +unsigned int db_table_roa_count_v6(struct db_table *); unsigned int db_table_router_key_count(struct db_table *); int db_table_foreach_roa(struct db_table const *, vrp_foreach_cb, void *); diff --git a/src/rtr/rtr.c b/src/rtr/rtr.c index 3fc7501d..d52e0d9e 100644 --- a/src/rtr/rtr.c +++ b/src/rtr/rtr.c @@ -8,6 +8,7 @@ #include "config.h" #include "data_structure/array_list.h" #include "log.h" +#include "stats.h" #include "rtr/db/vrps.h" #include "rtr/pdu_handler.h" #include "rtr/pdu_sender.h" @@ -644,6 +645,8 @@ fddb_poll(void) mutex_lock(&lock); 
apply_pollfds(pollfds, nclients); mutex_unlock(&lock); + + stats_gauge_set(stat_rtr_connections, clients.len); /* Fall through */ success:
diff --git a/src/stats.c b/src/stats.c
new file mode 100644
index 00000000..c23f0b4a
--- /dev/null
+++ b/src/stats.c
@@ -0,0 +1,225 @@
+#include "stats.h"
+
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+#include "alloc.h"
+#include "common.h"
+#include "log.h"
+#include "data_structure/uthash.h"
+
+struct stats_gauge {
+	char *name;		/* Hash key; includes the labels, if any */
+	unsigned int value;
+	time_t timestamp;	/* When @value was last set */
+
+	UT_hash_handle hh;
+};
+
+/* Protects @gauges, as well as the value and timestamp of every gauge. */
+static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+static struct stats_gauge *gauges;
+
+struct stats_gauge *stat_rtr_connections;
+
+/*
+ * Sets gauge @name to @value, creating it if it does not exist yet.
+ * Returns the gauge.
+ *
+ * @namelen is the length of @name, terminator excluded. If it's zero, it is
+ * computed through strlen().
+ *
+ * Steals ownership of @name.
+ */
+static struct stats_gauge *
+add_gauge(char *name, size_t namelen, unsigned int value)
+{
+	struct stats_gauge *old;
+	struct stats_gauge *new;
+	struct stats_gauge *delete;
+	struct stats_gauge *result;
+
+	new = pzalloc(sizeof(struct stats_gauge));
+	new->name = name;
+	new->value = value;
+	new->timestamp = time(NULL);
+
+	if (namelen == 0)
+		namelen = strlen(name);
+
+	mutex_lock(&lock);
+	HASH_FIND(hh, gauges, name, namelen, old);
+	if (old != NULL) {
+		old->value = value;
+		old->timestamp = new->timestamp;
+		delete = new;
+		result = old;
+	} else {
+		HASH_ADD_KEYPTR(hh, gauges, name, namelen, new);
+		delete = NULL;
+		result = new;
+	}
+	mutex_unlock(&lock);
+
+	/* The gauge already existed; the new one (and @name) is redundant. */
+	if (delete) {
+		free(delete->name);
+		free(delete);
+	}
+
+	return result;
+}
+
+#define ADD_GAUGE(name) add_gauge(pstrdup(name), 0, 0)
+
+/* Registers the gauges that exist for the program's whole lifetime. */
+int
+stats_setup(void)
+{
+	stat_rtr_connections = ADD_GAUGE("fort_rtr_current_connections");
+	return 0;
+}
+
+/* Releases every gauge. */
+void
+stats_teardown(void)
+{
+	struct stats_gauge *gauge, *tmp;
+
+	HASH_ITER(hh, gauges, gauge, tmp) {
+		HASH_DEL(gauges, gauge);
+		free(gauge->name);
+		free(gauge);
+	}
+
+	/* It pointed to one of the gauges that were just freed. */
+	stat_rtr_connections = NULL;
+}
+
+/* Updates the value (and timestamp) of @gauge. Ignores NULL gauges. */
+void
+stats_gauge_set(struct stats_gauge *gauge, unsigned int value)
+{
+	time_t now;
+
+	if (gauge == NULL)
+		return;
+
+	now = time(NULL);
+
+	mutex_lock(&lock);
+	gauge->value = value;
+	gauge->timestamp = now;
+	mutex_unlock(&lock);
+}
+
+/*
+ * Sets gauge fort_valid_vrps_total{ta="<TA>",proto="@proto"} to @value.
+ * <TA> is the file name of @tal_path, minus its directory and extension.
+ */
+void
+stats_set_tal_vrps(char const *tal_path, char const *proto, unsigned int value)
+{
+	char const *ta, *dot;
+	size_t talen;
+
+	size_t baselen;
+	size_t keylen;
+	char *key;
+	int chars;
+
+	ta = strrchr(tal_path, '/');
+	ta = (ta == NULL) ? tal_path : (ta + 1);
+	dot = strrchr(ta, '.');
+	talen = dot ? (dot - ta) : strlen(ta);
+
+	baselen = strlen("fort_valid_vrps_total{ta=\"\",proto=\"\"}");
+	keylen = baselen + talen + strlen(proto) + 1;
+
+	key = pmalloc(keylen);
+	chars = snprintf(key, keylen,
+	    "fort_valid_vrps_total{ta=\"%.*s\",proto=\"%s\"}",
+	    (int)talen, ta, proto);
+	if (chars < 0 || keylen <= (size_t)chars) {
+		free(key);
+		pr_op_warn("Cannot create valid_vrps_total stat: %d", chars);
+		return;
+	}
+
+	add_gauge(key, keylen - 1, value);
+}
+
+/*
+ * Prints @gauge as one exposition line ("name value [timestamp]\n") into
+ * @buf, snprintf()-style: At most @size bytes are written, and the return
+ * value is the length the whole line needs (negative on error).
+ */
+static int
+print_gauge(char *buf, size_t size, struct stats_gauge const *gauge)
+{
+	if (gauge->timestamp == ((time_t)-1))
+		return snprintf(buf, size, "%s %u\n", gauge->name,
+		    gauge->value);
+
+	return snprintf(buf, size, "%s %u %jd\n", gauge->name, gauge->value,
+	    (intmax_t)gauge->timestamp);
+}
+
+/*
+ * Renders every gauge in text exposition format, "# EOF" trailer included.
+ *
+ * Returns a heap-allocated string the caller needs to free(), or NULL (after
+ * logging) if the stats could not be formatted.
+ */
+char *
+stats_export(void)
+{
+	static char const TRAILER[] = "# EOF\n";
+	struct stats_gauge *gauge, *tmp;
+	char *result;
+	size_t capacity;
+	size_t offset;
+	int len;
+
+	result = NULL;
+
+	/*
+	 * This may run while other threads are inside add_gauge() or
+	 * stats_gauge_set(); the lock keeps the table (and both passes)
+	 * coherent.
+	 */
+	mutex_lock(&lock);
+
+	/* First pass: Measure. (This is why there's no cap on the stats.) */
+	capacity = sizeof(TRAILER);
+	HASH_ITER(hh, gauges, gauge, tmp) {
+		len = print_gauge(NULL, 0, gauge);
+		if (len < 0)
+			goto cancel;
+		capacity += len;
+	}
+
+	/* Second pass: Print. */
+	result = pmalloc(capacity);
+	offset = 0;
+	HASH_ITER(hh, gauges, gauge, tmp) {
+		len = print_gauge(result + offset, capacity - offset, gauge);
+		if (len < 0 || capacity - offset < (size_t)len + sizeof(TRAILER))
+			goto cancel;
+		offset += len;
+	}
+	memcpy(result + offset, TRAILER, sizeof(TRAILER));
+
+	mutex_unlock(&lock);
+	return result;
+
+cancel:
+	mutex_unlock(&lock);
+	free(result);
+	pr_op_err("Cannot create Prometheus response: Formatting error");
+	return NULL;
+}
diff --git a/src/stats.h b/src/stats.h
new file mode 100644
index 00000000..8d190301
--- /dev/null
+++ b/src/stats.h
@@ -0,0 +1,15 @@
+#ifndef SRC_STATS_H_
+#define SRC_STATS_H_
+
+struct stats_gauge;
+extern struct stats_gauge *stat_rtr_connections;
+
+int stats_setup(void);
+void stats_teardown(void);
+
+void stats_gauge_set(struct stats_gauge *, unsigned int);
+void stats_set_tal_vrps(char const *, char const *, unsigned int);
+
+char *stats_export(void);
+
+#endif /* SRC_STATS_H_ */
-- 
2.47.3