From: Florian Forster Date: Sat, 16 Dec 2023 19:59:02 +0000 (+0100) Subject: write_open_telemetry plugin: Use resource metrics to stage metric families. X-Git-Tag: 6.0.0-rc0~17^2~13 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=bbc7da757181b61de89b8797d597d0c7d19cb5fc;p=thirdparty%2Fcollectd.git write_open_telemetry plugin: Use resource metrics to stage metric families. --- diff --git a/src/write_open_telemetry.cc b/src/write_open_telemetry.cc index 669e2b253..916f2c66f 100644 --- a/src/write_open_telemetry.cc +++ b/src/write_open_telemetry.cc @@ -41,7 +41,7 @@ extern "C" { #include "plugin.h" #include "utils/common/common.h" -#include "utils/avltree/avltree.h" +#include "utils/resource_metrics/resource_metrics.h" #include "utils/strbuf/strbuf.h" #include "utils_complain.h" @@ -75,34 +75,15 @@ typedef struct { char *host; char *port; + resource_metrics_set_t resource_metrics; cdtime_t staged_time; - c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL - c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t* std::unique_ptr stub; pthread_mutex_t mu; } ot_callback_t; -static int ot_send_buffer(ot_callback_t *cb) { - size_t families_num = (size_t)c_avl_size(cb->staged_metric_families); - metric_family_t const *families[families_num]; - - memset(families, 0, sizeof(families)); - - c_avl_iterator_t *iter = c_avl_get_iterator(cb->staged_metric_families); - for (size_t i = 0; i < families_num; i++) { - metric_family_t *fam = NULL; - int status = c_avl_iterator_next(iter, /* key = */ NULL, (void **)&fam); - if (status != 0) { - ERROR("write_open_telemetry plugin: found %zu metric families, want %zu", - i, families_num); - return -1; - } - - families[i] = fam; - } - +static int export_metrics(ot_callback_t *cb) { if (cb->stub == NULL) { strbuf_t buf = STRBUF_CREATE; strbuf_printf(&buf, "%s:%s", cb->host, cb->port); @@ -114,8 +95,7 @@ static int ot_send_buffer(ot_callback_t *cb) { STRBUF_DESTROY(buf); } - auto req = format_open_telemetry_export_metrics_service_request(families, - families_num); + auto req = format_open_telemetry_export_metrics_service_request(cb->resource_metrics); grpc::ClientContext context; ExportMetricsServiceResponse resp; @@ -133,37 +113,15 @@ static int ot_send_buffer(ot_callback_t *cb) { NOTICE("write_open_telemetry plugin: %" PRId64 " data points were rejected: %s", ps.rejected_data_points(), ps.error_message().c_str()); - } else { - DEBUG("write_open_telemetry plugin: Successfully sent %zu metric families", - families_num); } delete req; return 0; } -static void ot_reset_buffer(ot_callback_t *cb) { - void *dummy = NULL; - metric_family_t *fam = NULL; - while (c_avl_pick(cb->staged_metric_families, &dummy, (void **)&fam) == 0) { - metric_family_free(fam); - } - - char *id = NULL; - while (c_avl_pick(cb->staged_metrics, (void **)&id, NULL) == 0) { - sfree(id); - } -} - /* NOTE: You must hold cb->send_lock when calling this function! */ static int ot_flush_nolock(cdtime_t timeout, ot_callback_t *cb) { - int staged_num = c_avl_size(cb->staged_metrics); - - DEBUG("write_open_telemetry plugin: ot_flush_nolock: timeout = %.3f; " - "send_buf_fill = %d;", - CDTIME_T_TO_DOUBLE(timeout), staged_num); - - if (staged_num == 0) { + if (cb->resource_metrics.num == 0) { cb->staged_time = cdtime(); return 0; } @@ -175,8 +133,8 @@ static int ot_flush_nolock(cdtime_t timeout, ot_callback_t *cb) { return 0; } - int status = ot_send_buffer(cb); - ot_reset_buffer(cb); + int status = export_metrics(cb); + resource_metrics_reset(&cb->resource_metrics); return status; } @@ -197,9 +155,6 @@ static void ot_callback_decref(void *data) { cb->stub.reset(); - c_avl_destroy(cb->staged_metrics); - c_avl_destroy(cb->staged_metric_families); - sfree(cb->host); sfree(cb->port); @@ -224,114 +179,16 @@ static int ot_flush(cdtime_t timeout, return status; } -static bool ot_metric_is_staged(ot_callback_t *cb, metric_t const *m) { - strbuf_t id = STRBUF_CREATE; - metric_identity(&id, m); - - int status = c_avl_get(cb->staged_metrics, id.ptr, NULL); - STRBUF_DESTROY(id); - return status == 0; -} - -static bool ot_need_flush(ot_callback_t *cb, metric_family_t const *fam) { - int status = c_avl_get(cb->staged_metric_families, fam, NULL); - if (status != 0) { - return false; - } - - /* if any of the metrics are already staged, we should flush before adding - * this metric family. */ - for (size_t i = 0; i < fam->metric.num; i++) { - bool ok = ot_metric_is_staged(cb, fam->metric.ptr + i); - if (ok) { - return true; - } - } - - return false; -} - -static int ot_mark_metric_staged(ot_callback_t *cb, metric_t const *m) { - strbuf_t buf = STRBUF_CREATE; - int status = metric_identity(&buf, m); - if (status != 0) { - ERROR("write_open_telemetry plugin: metric_identity failed: %d", status); - STRBUF_DESTROY(buf); - return status; - } - - char *id = strdup(buf.ptr); - if (id == NULL) { - STRBUF_DESTROY(buf); - return errno; - } - - status = c_avl_insert(cb->staged_metrics, id, /* value = */ NULL); - if (status != 0) { - ERROR("write_open_telemetry plugin: c_avl_insert(\"%s\") failed: %d", - buf.ptr, status); - STRBUF_DESTROY(buf); - return status; - } - - if (cb->staged_time == 0 || cb->staged_time > m->time) { - cb->staged_time = m->time; - } - - DEBUG("write_open_telemetry plugin: Successfully marked metric \"%s\" as " - "staged", - id); - STRBUF_DESTROY(buf); - return 0; -} - -static metric_family_t *ot_staged_metric_family(ot_callback_t *cb, - metric_family_t const *fam) { - metric_family_t *ret = NULL; - int status = c_avl_get(cb->staged_metric_families, fam, (void **)&ret); - if (status == 0) { - DEBUG("write_open_telemetry plugin: Found staged metric family \"%s\"", - ret->name); - return ret; - } - - ret = metric_family_clone_shallow(fam); - c_avl_insert(cb->staged_metric_families, ret, ret); - DEBUG("write_open_telemetry plugin: Successfully staged metric family \"%s\"", - ret->name); - return ret; -} - static int ot_write(metric_family_t const *fam, user_data_t *user_data) { - if ((fam == NULL) || (user_data == NULL)) { + ot_callback_t *cb = (ot_callback_t *)user_data->data; + if ((fam == NULL) || (cb == NULL)) { return EINVAL; } - - ot_callback_t *cb = (ot_callback_t *)user_data->data; pthread_mutex_lock(&cb->mu); - - if (ot_need_flush(cb, fam)) { - cdtime_t timeout = 0; - ot_flush_nolock(timeout, cb); - } - - metric_family_t *stage = ot_staged_metric_family(cb, fam); - size_t offset = stage->metric.num; - - int status = metric_family_append_list(stage, fam->metric); - if (status != 0) { - ERROR("write_open_telemetry plugin: metric_list_append_list failed: %d", - status); - pthread_mutex_unlock(&cb->mu); - return status; - } - - for (size_t i = offset; i < stage->metric.num; i++) { - ot_mark_metric_staged(cb, &stage->metric.ptr[i]); - } - + int status = resource_metrics_add(&cb->resource_metrics, fam); pthread_mutex_unlock(&cb->mu); - return 0; + + return status; } static int ot_config_node(oconfig_item_t *ci) { @@ -346,11 +203,6 @@ static int ot_config_node(oconfig_item_t *ci) { cb->host = strdup(OT_DEFAULT_HOST); cb->port = strdup(OT_DEFAULT_PORT); - cb->staged_metrics = - c_avl_create((int (*)(const void *, const void *))strcmp); - cb->staged_metric_families = - c_avl_create((int (*)(const void *, const void *))metric_family_compare); - pthread_mutex_init(&cb->mu, /* attr = */ NULL); for (int i = 0; i < ci->children_num; i++) {