]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_open_telemetry plugin: Use resource metrics to stage metric families.
authorFlorian Forster <octo@collectd.org>
Sat, 16 Dec 2023 19:59:02 +0000 (20:59 +0100)
committerFlorian Forster <octo@collectd.org>
Wed, 3 Jan 2024 16:16:28 +0000 (17:16 +0100)
src/write_open_telemetry.cc

index 669e2b2531d68458a90634e7ec6d110920e3b4d4..916f2c66f8ec902b11fce0cf12ed4c90fbf4c6e0 100644 (file)
@@ -41,7 +41,7 @@ extern "C" {
 #include "plugin.h"
 #include "utils/common/common.h"
 
-#include "utils/avltree/avltree.h"
+#include "utils/resource_metrics/resource_metrics.h"
 #include "utils/strbuf/strbuf.h"
 #include "utils_complain.h"
 
@@ -75,34 +75,15 @@ typedef struct {
   char *host;
   char *port;
 
+  resource_metrics_set_t resource_metrics;
   cdtime_t staged_time;
-  c_avl_tree_t *staged_metrics;         // char* metric_identity() -> NULL
-  c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
 
   std::unique_ptr<MetricsService::Stub> stub;
 
   pthread_mutex_t mu;
 } ot_callback_t;
 
-static int ot_send_buffer(ot_callback_t *cb) {
-  size_t families_num = (size_t)c_avl_size(cb->staged_metric_families);
-  metric_family_t const *families[families_num];
-
-  memset(families, 0, sizeof(families));
-
-  c_avl_iterator_t *iter = c_avl_get_iterator(cb->staged_metric_families);
-  for (size_t i = 0; i < families_num; i++) {
-    metric_family_t *fam = NULL;
-    int status = c_avl_iterator_next(iter, /* key = */ NULL, (void **)&fam);
-    if (status != 0) {
-      ERROR("write_open_telemetry plugin: found %zu metric families, want %zu",
-            i, families_num);
-      return -1;
-    }
-
-    families[i] = fam;
-  }
-
+static int export_metrics(ot_callback_t *cb) {
   if (cb->stub == NULL) {
     strbuf_t buf = STRBUF_CREATE;
     strbuf_printf(&buf, "%s:%s", cb->host, cb->port);
@@ -114,8 +95,7 @@ static int ot_send_buffer(ot_callback_t *cb) {
     STRBUF_DESTROY(buf);
   }
 
-  auto req = format_open_telemetry_export_metrics_service_request(families,
-                                                                  families_num);
+  auto req = format_open_telemetry_export_metrics_service_request(cb->resource_metrics);
 
   grpc::ClientContext context;
   ExportMetricsServiceResponse resp;
@@ -133,37 +113,15 @@ static int ot_send_buffer(ot_callback_t *cb) {
     NOTICE("write_open_telemetry plugin: %" PRId64
            " data points were rejected: %s",
            ps.rejected_data_points(), ps.error_message().c_str());
-  } else {
-    DEBUG("write_open_telemetry plugin: Successfully sent %zu metric families",
-          families_num);
   }
 
   delete req;
   return 0;
 }
 
-static void ot_reset_buffer(ot_callback_t *cb) {
-  void *dummy = NULL;
-  metric_family_t *fam = NULL;
-  while (c_avl_pick(cb->staged_metric_families, &dummy, (void **)&fam) == 0) {
-    metric_family_free(fam);
-  }
-
-  char *id = NULL;
-  while (c_avl_pick(cb->staged_metrics, (void **)&id, NULL) == 0) {
-    sfree(id);
-  }
-}
-
 /* NOTE: You must hold cb->send_lock when calling this function! */
 static int ot_flush_nolock(cdtime_t timeout, ot_callback_t *cb) {
-  int staged_num = c_avl_size(cb->staged_metrics);
-
-  DEBUG("write_open_telemetry plugin: ot_flush_nolock: timeout = %.3f; "
-        "send_buf_fill = %d;",
-        CDTIME_T_TO_DOUBLE(timeout), staged_num);
-
-  if (staged_num == 0) {
+  if (cb->resource_metrics.num == 0) {
     cb->staged_time = cdtime();
     return 0;
   }
@@ -175,8 +133,8 @@ static int ot_flush_nolock(cdtime_t timeout, ot_callback_t *cb) {
       return 0;
   }
 
-  int status = ot_send_buffer(cb);
-  ot_reset_buffer(cb);
+  int status = export_metrics(cb);
+  resource_metrics_reset(&cb->resource_metrics);
 
   return status;
 }
@@ -197,9 +155,6 @@ static void ot_callback_decref(void *data) {
 
   cb->stub.reset();
 
-  c_avl_destroy(cb->staged_metrics);
-  c_avl_destroy(cb->staged_metric_families);
-
   sfree(cb->host);
   sfree(cb->port);
 
@@ -224,114 +179,16 @@ static int ot_flush(cdtime_t timeout,
   return status;
 }
 
-static bool ot_metric_is_staged(ot_callback_t *cb, metric_t const *m) {
-  strbuf_t id = STRBUF_CREATE;
-  metric_identity(&id, m);
-
-  int status = c_avl_get(cb->staged_metrics, id.ptr, NULL);
-  STRBUF_DESTROY(id);
-  return status == 0;
-}
-
-static bool ot_need_flush(ot_callback_t *cb, metric_family_t const *fam) {
-  int status = c_avl_get(cb->staged_metric_families, fam, NULL);
-  if (status != 0) {
-    return false;
-  }
-
-  /* if any of the metrics are already staged, we should flush before adding
-   * this metric family. */
-  for (size_t i = 0; i < fam->metric.num; i++) {
-    bool ok = ot_metric_is_staged(cb, fam->metric.ptr + i);
-    if (ok) {
-      return true;
-    }
-  }
-
-  return false;
-}
-
-static int ot_mark_metric_staged(ot_callback_t *cb, metric_t const *m) {
-  strbuf_t buf = STRBUF_CREATE;
-  int status = metric_identity(&buf, m);
-  if (status != 0) {
-    ERROR("write_open_telemetry plugin: metric_identity failed: %d", status);
-    STRBUF_DESTROY(buf);
-    return status;
-  }
-
-  char *id = strdup(buf.ptr);
-  if (id == NULL) {
-    STRBUF_DESTROY(buf);
-    return errno;
-  }
-
-  status = c_avl_insert(cb->staged_metrics, id, /* value = */ NULL);
-  if (status != 0) {
-    ERROR("write_open_telemetry plugin: c_avl_insert(\"%s\") failed: %d",
-          buf.ptr, status);
-    STRBUF_DESTROY(buf);
-    return status;
-  }
-
-  if (cb->staged_time == 0 || cb->staged_time > m->time) {
-    cb->staged_time = m->time;
-  }
-
-  DEBUG("write_open_telemetry plugin: Successfully marked metric \"%s\" as "
-        "staged",
-        id);
-  STRBUF_DESTROY(buf);
-  return 0;
-}
-
-static metric_family_t *ot_staged_metric_family(ot_callback_t *cb,
-                                                metric_family_t const *fam) {
-  metric_family_t *ret = NULL;
-  int status = c_avl_get(cb->staged_metric_families, fam, (void **)&ret);
-  if (status == 0) {
-    DEBUG("write_open_telemetry plugin: Found staged metric family \"%s\"",
-          ret->name);
-    return ret;
-  }
-
-  ret = metric_family_clone_shallow(fam);
-  c_avl_insert(cb->staged_metric_families, ret, ret);
-  DEBUG("write_open_telemetry plugin: Successfully staged metric family \"%s\"",
-        ret->name);
-  return ret;
-}
-
 static int ot_write(metric_family_t const *fam, user_data_t *user_data) {
-  if ((fam == NULL) || (user_data == NULL)) {
+  ot_callback_t *cb = (ot_callback_t *)user_data->data;
+  if ((fam == NULL) || (cb == NULL)) {
     return EINVAL;
   }
-
-  ot_callback_t *cb = (ot_callback_t *)user_data->data;
   pthread_mutex_lock(&cb->mu);
-
-  if (ot_need_flush(cb, fam)) {
-    cdtime_t timeout = 0;
-    ot_flush_nolock(timeout, cb);
-  }
-
-  metric_family_t *stage = ot_staged_metric_family(cb, fam);
-  size_t offset = stage->metric.num;
-
-  int status = metric_family_append_list(stage, fam->metric);
-  if (status != 0) {
-    ERROR("write_open_telemetry plugin: metric_list_append_list failed: %d",
-          status);
-    pthread_mutex_unlock(&cb->mu);
-    return status;
-  }
-
-  for (size_t i = offset; i < stage->metric.num; i++) {
-    ot_mark_metric_staged(cb, &stage->metric.ptr[i]);
-  }
-
+  int status = resource_metrics_add(&cb->resource_metrics, fam);
   pthread_mutex_unlock(&cb->mu);
-  return 0;
+
+  return status;
 }
 
 static int ot_config_node(oconfig_item_t *ci) {
@@ -346,11 +203,6 @@ static int ot_config_node(oconfig_item_t *ci) {
   cb->host = strdup(OT_DEFAULT_HOST);
   cb->port = strdup(OT_DEFAULT_PORT);
 
-  cb->staged_metrics =
-      c_avl_create((int (*)(const void *, const void *))strcmp);
-  cb->staged_metric_families =
-      c_avl_create((int (*)(const void *, const void *))metric_family_compare);
-
   pthread_mutex_init(&cb->mu, /* attr = */ NULL);
 
   for (int i = 0; i < ci->children_num; i++) {