From: Florian Forster Date: Sat, 2 Dec 2023 21:42:45 +0000 (+0100) Subject: write_open_telemetry plugin: s/cached/staged/ X-Git-Tag: 6.0.0-rc0~17^2~32 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=ed9dc8f2c6cda0ef5578430e0e3ec7377042664d;p=thirdparty%2Fcollectd.git write_open_telemetry plugin: s/cached/staged/ --- diff --git a/src/write_open_telemetry.cc b/src/write_open_telemetry.cc index f26f37af1..e306f6cfd 100644 --- a/src/write_open_telemetry.cc +++ b/src/write_open_telemetry.cc @@ -89,19 +89,19 @@ typedef struct { char *port; char *path; - c_avl_tree_t *cached_metrics; // char* metric_identity() -> NULL - c_avl_tree_t *cached_metric_families; // char* fam->name -> metric_family_t* + c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL + c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t* pthread_mutex_t mu; } ot_callback_t; static int ot_send_buffer(ot_callback_t *cb) { - size_t families_num = (size_t)c_avl_size(cb->cached_metric_families); + size_t families_num = (size_t)c_avl_size(cb->staged_metric_families); metric_family_t *families[families_num]; memset(families, 0, sizeof(families)); - c_avl_iterator_t *iter = c_avl_get_iterator(cb->cached_metric_families); + c_avl_iterator_t *iter = c_avl_get_iterator(cb->staged_metric_families); for (size_t i = 0; i < families_num; i++) { metric_family_t *fam = NULL; int status = c_avl_iterator_next(iter, /* key = */ NULL, (void **)&fam); @@ -122,23 +122,23 @@ static int ot_send_buffer(ot_callback_t *cb) { static void ot_reset_buffer(ot_callback_t *cb) { void *dummy = NULL; metric_family_t *fam = NULL; - while (c_avl_pick(cb->cached_metric_families, &dummy, (void **)&fam) == 0) { + while (c_avl_pick(cb->staged_metric_families, &dummy, (void **)&fam) == 0) { metric_family_free(fam); } char *id = NULL; - while (c_avl_pick(cb->cached_metrics, (void **)&id, NULL) == 0) { + while (c_avl_pick(cb->staged_metrics, (void **)&id, NULL) == 0) { sfree(id); } } /* NOTE: You must hold cb->send_lock when calling this function! */ static int ot_flush_nolock(cdtime_t timeout, ot_callback_t *cb) { +#if 0 DEBUG("write_open_telemetry plugin: ot_flush_nolock: timeout = %.3f; " "send_buf_fill = %" PRIsz ";", (double)timeout, cb->send_buf_fill); -#if 0 /* timeout == 0 => flush unconditionally */ if (timeout > 0) { cdtime_t now; @@ -174,8 +174,8 @@ static void ot_callback_decref(void *data) { ot_flush_nolock(/* timeout = */ 0, cb); - c_avl_destroy(cb->cached_metrics); - c_avl_destroy(cb->cached_metric_families); + c_avl_destroy(cb->staged_metrics); + c_avl_destroy(cb->staged_metric_families); sfree(cb->host); sfree(cb->port); @@ -202,25 +202,25 @@ static int ot_flush(cdtime_t timeout, return status; } -static bool ot_metric_is_cached(ot_callback_t *cb, metric_t const *m) { +static bool ot_metric_is_staged(ot_callback_t *cb, metric_t const *m) { strbuf_t id = STRBUF_CREATE; metric_identity(&id, m); - int status = c_avl_get(cb->cached_metrics, id.ptr, NULL); + int status = c_avl_get(cb->staged_metrics, id.ptr, NULL); STRBUF_DESTROY(id); return status == 0; } static bool ot_need_flush(ot_callback_t *cb, metric_family_t const *fam) { - int status = c_avl_get(cb->cached_metric_families, fam->name, NULL); + int status = c_avl_get(cb->staged_metric_families, fam->name, NULL); if (status != 0) { return false; } - /* if any of the metrics are already cached, we should flush before adding + /* if any of the metrics are already staged, we should flush before adding * this metric family. */ for (size_t i = 0; i < fam->metric.num; i++) { - bool ok = ot_metric_is_cached(cb, fam->metric.ptr + i); + bool ok = ot_metric_is_staged(cb, fam->metric.ptr + i); if (ok) { return true; } @@ -229,7 +229,7 @@ static bool ot_need_flush(ot_callback_t *cb, metric_family_t const *fam) { return false; } -static int ot_mark_metric_cached(ot_callback_t *cb, metric_t const *m) { +static int ot_mark_metric_staged(ot_callback_t *cb, metric_t const *m) { strbuf_t buf = STRBUF_CREATE; int status = metric_identity(&buf, m); if (status != 0) { @@ -244,7 +244,7 @@ static int ot_mark_metric_cached(ot_callback_t *cb, metric_t const *m) { return errno; } - status = c_avl_insert(cb->cached_metrics, id, /* value = */ NULL); + status = c_avl_insert(cb->staged_metrics, id, /* value = */ NULL); if (status != 0) { ERROR("write_open_telemetry plugin: c_avl_insert(\"%s\") failed: %d", buf.ptr, status); @@ -252,20 +252,27 @@ static int ot_mark_metric_cached(ot_callback_t *cb, metric_t const *m) { return status; } + DEBUG("write_open_telemetry plugin: Successfully marked metric \"%s\" as " + "staged", + id); STRBUF_DESTROY(buf); return 0; } -static metric_family_t *ot_cached_metric_family(ot_callback_t *cb, +static metric_family_t *ot_staged_metric_family(ot_callback_t *cb, metric_family_t const *fam) { metric_family_t *ret = NULL; - int status = c_avl_get(cb->cached_metric_families, fam->name, (void **)&ret); + int status = c_avl_get(cb->staged_metric_families, fam->name, (void **)&ret); if (status == 0) { + DEBUG("write_open_telemetry plugin: Found staged metric family \"%s\"", + ret->name); return ret; } ret = metric_family_clone_shallow(fam); - c_avl_insert(cb->cached_metric_families, ret->name, ret); + c_avl_insert(cb->staged_metric_families, ret->name, ret); + DEBUG("write_open_telemetry plugin: Successfully staged metric family \"%s\"", + ret->name); return ret; } @@ -282,20 +289,22 @@ static int ot_write(metric_family_t const *fam, user_data_t *user_data) { ot_flush_nolock(timeout, cb); } - metric_family_t *cache = ot_cached_metric_family(cb, fam); - size_t offset = cache->metric.num; + metric_family_t *stage = ot_staged_metric_family(cb, fam); + size_t offset = stage->metric.num; - int status = metric_family_append_list(cache, fam->metric); + int status = metric_family_append_list(stage, fam->metric); if (status != 0) { ERROR("write_open_telemetry plugin: metric_list_append_list failed: %d", status); + pthread_mutex_unlock(&cb->mu); return status; } - for (size_t i = offset; i < cache->metric.num; i++) { - ot_mark_metric_cached(cb, &cache->metric.ptr[i]); + for (size_t i = offset; i < stage->metric.num; i++) { + ot_mark_metric_staged(cb, &stage->metric.ptr[i]); } + pthread_mutex_unlock(&cb->mu); return 0; } @@ -312,9 +321,9 @@ static int ot_config_node(oconfig_item_t *ci) { cb->port = strdup(OT_DEFAULT_PORT); cb->path = strdup(OT_DEFAULT_PATH); - cb->cached_metrics = + cb->staged_metrics = c_avl_create((int (*)(const void *, const void *))strcmp); - cb->cached_metric_families = + cb->staged_metric_families = c_avl_create((int (*)(const void *, const void *))strcmp); pthread_mutex_init(&cb->mu, /* attr = */ NULL); @@ -342,9 +351,8 @@ static int ot_config_node(oconfig_item_t *ci) { } } - char callback_name[DATA_MAX_NAME_LEN]; - ssnprintf(callback_name, sizeof(callback_name), "write_open_telemetry/%s", - cb->name); + strbuf_t callback_name = STRBUF_CREATE; + strbuf_printf(&callback_name, "write_open_telemetry/%s", cb->name); user_data_t user_data = { .data = cb, @@ -352,12 +360,13 @@ static int ot_config_node(oconfig_item_t *ci) { }; cb->reference_count++; - plugin_register_write(callback_name, ot_write, &user_data); + plugin_register_write(callback_name.ptr, ot_write, &user_data); cb->reference_count++; - plugin_register_flush(callback_name, ot_flush, &user_data); + plugin_register_flush(callback_name.ptr, ot_flush, &user_data); ot_callback_decref(cb); + STRBUF_DESTROY(callback_name); return 0; }