]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_open_telemetry plugin: s/cached/staged/
authorFlorian Forster <octo@collectd.org>
Sat, 2 Dec 2023 21:42:45 +0000 (22:42 +0100)
committerFlorian Forster <octo@collectd.org>
Wed, 3 Jan 2024 16:16:28 +0000 (17:16 +0100)
src/write_open_telemetry.cc

index f26f37af1c8e7e91af8621016e2843cd8a5588cb..e306f6cfdc2915ee70cddee65df0194eb1e92a6b 100644 (file)
@@ -89,19 +89,19 @@ typedef struct {
   char *port;
   char *path;
 
-  c_avl_tree_t *cached_metrics;         // char* metric_identity() -> NULL
-  c_avl_tree_t *cached_metric_families; // char* fam->name -> metric_family_t*
+  c_avl_tree_t *staged_metrics;         // char* metric_identity() -> NULL
+  c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
 
   pthread_mutex_t mu;
 } ot_callback_t;
 
 static int ot_send_buffer(ot_callback_t *cb) {
-  size_t families_num = (size_t)c_avl_size(cb->cached_metric_families);
+  size_t families_num = (size_t)c_avl_size(cb->staged_metric_families);
   metric_family_t *families[families_num];
 
   memset(families, 0, sizeof(families));
 
-  c_avl_iterator_t *iter = c_avl_get_iterator(cb->cached_metric_families);
+  c_avl_iterator_t *iter = c_avl_get_iterator(cb->staged_metric_families);
   for (size_t i = 0; i < families_num; i++) {
     metric_family_t *fam = NULL;
     int status = c_avl_iterator_next(iter, /* key = */ NULL, (void **)&fam);
@@ -122,23 +122,23 @@ static int ot_send_buffer(ot_callback_t *cb) {
 static void ot_reset_buffer(ot_callback_t *cb) {
   void *dummy = NULL;
   metric_family_t *fam = NULL;
-  while (c_avl_pick(cb->cached_metric_families, &dummy, (void **)&fam) == 0) {
+  while (c_avl_pick(cb->staged_metric_families, &dummy, (void **)&fam) == 0) {
     metric_family_free(fam);
   }
 
   char *id = NULL;
-  while (c_avl_pick(cb->cached_metrics, (void **)&id, NULL) == 0) {
+  while (c_avl_pick(cb->staged_metrics, (void **)&id, NULL) == 0) {
     sfree(id);
   }
 }
 
 /* NOTE: You must hold cb->send_lock when calling this function! */
 static int ot_flush_nolock(cdtime_t timeout, ot_callback_t *cb) {
+#if 0
   DEBUG("write_open_telemetry plugin: ot_flush_nolock: timeout = %.3f; "
         "send_buf_fill = %" PRIsz ";",
         (double)timeout, cb->send_buf_fill);
 
-#if 0
   /* timeout == 0  => flush unconditionally */
   if (timeout > 0) {
     cdtime_t now;
@@ -174,8 +174,8 @@ static void ot_callback_decref(void *data) {
 
   ot_flush_nolock(/* timeout = */ 0, cb);
 
-  c_avl_destroy(cb->cached_metrics);
-  c_avl_destroy(cb->cached_metric_families);
+  c_avl_destroy(cb->staged_metrics);
+  c_avl_destroy(cb->staged_metric_families);
 
   sfree(cb->host);
   sfree(cb->port);
@@ -202,25 +202,25 @@ static int ot_flush(cdtime_t timeout,
   return status;
 }
 
-static bool ot_metric_is_cached(ot_callback_t *cb, metric_t const *m) {
+static bool ot_metric_is_staged(ot_callback_t *cb, metric_t const *m) {
   strbuf_t id = STRBUF_CREATE;
   metric_identity(&id, m);
 
-  int status = c_avl_get(cb->cached_metrics, id.ptr, NULL);
+  int status = c_avl_get(cb->staged_metrics, id.ptr, NULL);
   STRBUF_DESTROY(id);
   return status == 0;
 }
 
 static bool ot_need_flush(ot_callback_t *cb, metric_family_t const *fam) {
-  int status = c_avl_get(cb->cached_metric_families, fam->name, NULL);
+  int status = c_avl_get(cb->staged_metric_families, fam->name, NULL);
   if (status != 0) {
     return false;
   }
 
-  /* if any of the metrics are already cached, we should flush before adding
+  /* if any of the metrics are already staged, we should flush before adding
    * this metric family. */
   for (size_t i = 0; i < fam->metric.num; i++) {
-    bool ok = ot_metric_is_cached(cb, fam->metric.ptr + i);
+    bool ok = ot_metric_is_staged(cb, fam->metric.ptr + i);
     if (ok) {
       return true;
     }
@@ -229,7 +229,7 @@ static bool ot_need_flush(ot_callback_t *cb, metric_family_t const *fam) {
   return false;
 }
 
-static int ot_mark_metric_cached(ot_callback_t *cb, metric_t const *m) {
+static int ot_mark_metric_staged(ot_callback_t *cb, metric_t const *m) {
   strbuf_t buf = STRBUF_CREATE;
   int status = metric_identity(&buf, m);
   if (status != 0) {
@@ -244,7 +244,7 @@ static int ot_mark_metric_cached(ot_callback_t *cb, metric_t const *m) {
     return errno;
   }
 
-  status = c_avl_insert(cb->cached_metrics, id, /* value = */ NULL);
+  status = c_avl_insert(cb->staged_metrics, id, /* value = */ NULL);
   if (status != 0) {
     ERROR("write_open_telemetry plugin: c_avl_insert(\"%s\") failed: %d",
           buf.ptr, status);
@@ -252,20 +252,27 @@ static int ot_mark_metric_cached(ot_callback_t *cb, metric_t const *m) {
     return status;
   }
 
+  DEBUG("write_open_telemetry plugin: Successfully marked metric \"%s\" as "
+        "staged",
+        id);
   STRBUF_DESTROY(buf);
   return 0;
 }
 
-static metric_family_t *ot_cached_metric_family(ot_callback_t *cb,
+static metric_family_t *ot_staged_metric_family(ot_callback_t *cb,
                                                 metric_family_t const *fam) {
   metric_family_t *ret = NULL;
-  int status = c_avl_get(cb->cached_metric_families, fam->name, (void **)&ret);
+  int status = c_avl_get(cb->staged_metric_families, fam->name, (void **)&ret);
   if (status == 0) {
+    DEBUG("write_open_telemetry plugin: Found staged metric family \"%s\"",
+          ret->name);
     return ret;
   }
 
   ret = metric_family_clone_shallow(fam);
-  c_avl_insert(cb->cached_metric_families, ret->name, ret);
+  c_avl_insert(cb->staged_metric_families, ret->name, ret);
+  DEBUG("write_open_telemetry plugin: Successfully staged metric family \"%s\"",
+        ret->name);
   return ret;
 }
 
@@ -282,20 +289,22 @@ static int ot_write(metric_family_t const *fam, user_data_t *user_data) {
     ot_flush_nolock(timeout, cb);
   }
 
-  metric_family_t *cache = ot_cached_metric_family(cb, fam);
-  size_t offset = cache->metric.num;
+  metric_family_t *stage = ot_staged_metric_family(cb, fam);
+  size_t offset = stage->metric.num;
 
-  int status = metric_family_append_list(cache, fam->metric);
+  int status = metric_family_append_list(stage, fam->metric);
   if (status != 0) {
     ERROR("write_open_telemetry plugin: metric_list_append_list failed: %d",
           status);
+    pthread_mutex_unlock(&cb->mu);
     return status;
   }
 
-  for (size_t i = offset; i < cache->metric.num; i++) {
-    ot_mark_metric_cached(cb, &cache->metric.ptr[i]);
+  for (size_t i = offset; i < stage->metric.num; i++) {
+    ot_mark_metric_staged(cb, &stage->metric.ptr[i]);
   }
 
+  pthread_mutex_unlock(&cb->mu);
   return 0;
 }
 
@@ -312,9 +321,9 @@ static int ot_config_node(oconfig_item_t *ci) {
   cb->port = strdup(OT_DEFAULT_PORT);
   cb->path = strdup(OT_DEFAULT_PATH);
 
-  cb->cached_metrics =
+  cb->staged_metrics =
       c_avl_create((int (*)(const void *, const void *))strcmp);
-  cb->cached_metric_families =
+  cb->staged_metric_families =
       c_avl_create((int (*)(const void *, const void *))strcmp);
 
   pthread_mutex_init(&cb->mu, /* attr = */ NULL);
@@ -342,9 +351,8 @@ static int ot_config_node(oconfig_item_t *ci) {
     }
   }
 
-  char callback_name[DATA_MAX_NAME_LEN];
-  ssnprintf(callback_name, sizeof(callback_name), "write_open_telemetry/%s",
-            cb->name);
+  strbuf_t callback_name = STRBUF_CREATE;
+  strbuf_printf(&callback_name, "write_open_telemetry/%s", cb->name);
 
   user_data_t user_data = {
       .data = cb,
@@ -352,12 +360,13 @@ static int ot_config_node(oconfig_item_t *ci) {
   };
 
   cb->reference_count++;
-  plugin_register_write(callback_name, ot_write, &user_data);
+  plugin_register_write(callback_name.ptr, ot_write, &user_data);
 
   cb->reference_count++;
-  plugin_register_flush(callback_name, ot_flush, &user_data);
+  plugin_register_flush(callback_name.ptr, ot_flush, &user_data);
 
   ot_callback_decref(cb);
+  STRBUF_DESTROY(callback_name);
   return 0;
 }