#include "plugin.h"
#include "utils/common/common.h"
-#include "utils/avltree/avltree.h"
+#include "utils/resource_metrics/resource_metrics.h"
#include "utils/strbuf/strbuf.h"
#include "utils_complain.h"
char *host;
char *port;
+ resource_metrics_set_t resource_metrics;
cdtime_t staged_time;
- c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL
- c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
std::unique_ptr<MetricsService::Stub> stub;
pthread_mutex_t mu;
} ot_callback_t;
-static int ot_send_buffer(ot_callback_t *cb) {
- size_t families_num = (size_t)c_avl_size(cb->staged_metric_families);
- metric_family_t const *families[families_num];
-
- memset(families, 0, sizeof(families));
-
- c_avl_iterator_t *iter = c_avl_get_iterator(cb->staged_metric_families);
- for (size_t i = 0; i < families_num; i++) {
- metric_family_t *fam = NULL;
- int status = c_avl_iterator_next(iter, /* key = */ NULL, (void **)&fam);
- if (status != 0) {
- ERROR("write_open_telemetry plugin: found %zu metric families, want %zu",
- i, families_num);
- return -1;
- }
-
- families[i] = fam;
- }
-
+static int export_metrics(ot_callback_t *cb) {
if (cb->stub == NULL) {
strbuf_t buf = STRBUF_CREATE;
strbuf_printf(&buf, "%s:%s", cb->host, cb->port);
STRBUF_DESTROY(buf);
}
- auto req = format_open_telemetry_export_metrics_service_request(families,
- families_num);
+ auto req = format_open_telemetry_export_metrics_service_request(cb->resource_metrics);
grpc::ClientContext context;
ExportMetricsServiceResponse resp;
NOTICE("write_open_telemetry plugin: %" PRId64
" data points were rejected: %s",
ps.rejected_data_points(), ps.error_message().c_str());
- } else {
- DEBUG("write_open_telemetry plugin: Successfully sent %zu metric families",
- families_num);
}
delete req;
return 0;
}
-static void ot_reset_buffer(ot_callback_t *cb) {
- void *dummy = NULL;
- metric_family_t *fam = NULL;
- while (c_avl_pick(cb->staged_metric_families, &dummy, (void **)&fam) == 0) {
- metric_family_free(fam);
- }
-
- char *id = NULL;
- while (c_avl_pick(cb->staged_metrics, (void **)&id, NULL) == 0) {
- sfree(id);
- }
-}
-
/* NOTE: You must hold cb->send_lock when calling this function! */
static int ot_flush_nolock(cdtime_t timeout, ot_callback_t *cb) {
- int staged_num = c_avl_size(cb->staged_metrics);
-
- DEBUG("write_open_telemetry plugin: ot_flush_nolock: timeout = %.3f; "
- "send_buf_fill = %d;",
- CDTIME_T_TO_DOUBLE(timeout), staged_num);
-
- if (staged_num == 0) {
+ if (cb->resource_metrics.num == 0) {
cb->staged_time = cdtime();
return 0;
}
return 0;
}
- int status = ot_send_buffer(cb);
- ot_reset_buffer(cb);
+ int status = export_metrics(cb);
+ resource_metrics_reset(&cb->resource_metrics);
return status;
}
cb->stub.reset();
- c_avl_destroy(cb->staged_metrics);
- c_avl_destroy(cb->staged_metric_families);
-
sfree(cb->host);
sfree(cb->port);
return status;
}
-static bool ot_metric_is_staged(ot_callback_t *cb, metric_t const *m) {
- strbuf_t id = STRBUF_CREATE;
- metric_identity(&id, m);
-
- int status = c_avl_get(cb->staged_metrics, id.ptr, NULL);
- STRBUF_DESTROY(id);
- return status == 0;
-}
-
-static bool ot_need_flush(ot_callback_t *cb, metric_family_t const *fam) {
- int status = c_avl_get(cb->staged_metric_families, fam, NULL);
- if (status != 0) {
- return false;
- }
-
- /* if any of the metrics are already staged, we should flush before adding
- * this metric family. */
- for (size_t i = 0; i < fam->metric.num; i++) {
- bool ok = ot_metric_is_staged(cb, fam->metric.ptr + i);
- if (ok) {
- return true;
- }
- }
-
- return false;
-}
-
-static int ot_mark_metric_staged(ot_callback_t *cb, metric_t const *m) {
- strbuf_t buf = STRBUF_CREATE;
- int status = metric_identity(&buf, m);
- if (status != 0) {
- ERROR("write_open_telemetry plugin: metric_identity failed: %d", status);
- STRBUF_DESTROY(buf);
- return status;
- }
-
- char *id = strdup(buf.ptr);
- if (id == NULL) {
- STRBUF_DESTROY(buf);
- return errno;
- }
-
- status = c_avl_insert(cb->staged_metrics, id, /* value = */ NULL);
- if (status != 0) {
- ERROR("write_open_telemetry plugin: c_avl_insert(\"%s\") failed: %d",
- buf.ptr, status);
- STRBUF_DESTROY(buf);
- return status;
- }
-
- if (cb->staged_time == 0 || cb->staged_time > m->time) {
- cb->staged_time = m->time;
- }
-
- DEBUG("write_open_telemetry plugin: Successfully marked metric \"%s\" as "
- "staged",
- id);
- STRBUF_DESTROY(buf);
- return 0;
-}
-
-static metric_family_t *ot_staged_metric_family(ot_callback_t *cb,
- metric_family_t const *fam) {
- metric_family_t *ret = NULL;
- int status = c_avl_get(cb->staged_metric_families, fam, (void **)&ret);
- if (status == 0) {
- DEBUG("write_open_telemetry plugin: Found staged metric family \"%s\"",
- ret->name);
- return ret;
- }
-
- ret = metric_family_clone_shallow(fam);
- c_avl_insert(cb->staged_metric_families, ret, ret);
- DEBUG("write_open_telemetry plugin: Successfully staged metric family \"%s\"",
- ret->name);
- return ret;
-}
-
static int ot_write(metric_family_t const *fam, user_data_t *user_data) {
- if ((fam == NULL) || (user_data == NULL)) {
+ ot_callback_t *cb = (ot_callback_t *)user_data->data;
+ if ((fam == NULL) || (cb == NULL)) {
return EINVAL;
}
-
- ot_callback_t *cb = (ot_callback_t *)user_data->data;
pthread_mutex_lock(&cb->mu);
-
- if (ot_need_flush(cb, fam)) {
- cdtime_t timeout = 0;
- ot_flush_nolock(timeout, cb);
- }
-
- metric_family_t *stage = ot_staged_metric_family(cb, fam);
- size_t offset = stage->metric.num;
-
- int status = metric_family_append_list(stage, fam->metric);
- if (status != 0) {
- ERROR("write_open_telemetry plugin: metric_list_append_list failed: %d",
- status);
- pthread_mutex_unlock(&cb->mu);
- return status;
- }
-
- for (size_t i = offset; i < stage->metric.num; i++) {
- ot_mark_metric_staged(cb, &stage->metric.ptr[i]);
- }
-
+ int status = resource_metrics_add(&cb->resource_metrics, fam);
pthread_mutex_unlock(&cb->mu);
- return 0;
+
+ return status;
}
static int ot_config_node(oconfig_item_t *ci) {
cb->host = strdup(OT_DEFAULT_HOST);
cb->port = strdup(OT_DEFAULT_PORT);
- cb->staged_metrics =
- c_avl_create((int (*)(const void *, const void *))strcmp);
- cb->staged_metric_families =
- c_avl_create((int (*)(const void *, const void *))metric_family_compare);
-
pthread_mutex_init(&cb->mu, /* attr = */ NULL);
for (int i = 0; i < ci->children_num; i++) {