char *port;
char *path;
- c_avl_tree_t *cached_metrics; // char* metric_identity() -> NULL
- c_avl_tree_t *cached_metric_families; // char* fam->name -> metric_family_t*
+ c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL
+ c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
pthread_mutex_t mu;
} ot_callback_t;
static int ot_send_buffer(ot_callback_t *cb) {
- size_t families_num = (size_t)c_avl_size(cb->cached_metric_families);
+ size_t families_num = (size_t)c_avl_size(cb->staged_metric_families);
metric_family_t *families[families_num];
memset(families, 0, sizeof(families));
- c_avl_iterator_t *iter = c_avl_get_iterator(cb->cached_metric_families);
+ c_avl_iterator_t *iter = c_avl_get_iterator(cb->staged_metric_families);
for (size_t i = 0; i < families_num; i++) {
metric_family_t *fam = NULL;
int status = c_avl_iterator_next(iter, /* key = */ NULL, (void **)&fam);
static void ot_reset_buffer(ot_callback_t *cb) {
void *dummy = NULL;
metric_family_t *fam = NULL;
- while (c_avl_pick(cb->cached_metric_families, &dummy, (void **)&fam) == 0) {
+ while (c_avl_pick(cb->staged_metric_families, &dummy, (void **)&fam) == 0) {
metric_family_free(fam);
}
char *id = NULL;
- while (c_avl_pick(cb->cached_metrics, (void **)&id, NULL) == 0) {
+ while (c_avl_pick(cb->staged_metrics, (void **)&id, NULL) == 0) {
sfree(id);
}
}
/* NOTE: You must hold cb->send_lock when calling this function! */
static int ot_flush_nolock(cdtime_t timeout, ot_callback_t *cb) {
+#if 0
DEBUG("write_open_telemetry plugin: ot_flush_nolock: timeout = %.3f; "
"send_buf_fill = %" PRIsz ";",
(double)timeout, cb->send_buf_fill);
-#if 0
/* timeout == 0 => flush unconditionally */
if (timeout > 0) {
cdtime_t now;
ot_flush_nolock(/* timeout = */ 0, cb);
- c_avl_destroy(cb->cached_metrics);
- c_avl_destroy(cb->cached_metric_families);
+ c_avl_destroy(cb->staged_metrics);
+ c_avl_destroy(cb->staged_metric_families);
sfree(cb->host);
sfree(cb->port);
return status;
}
-static bool ot_metric_is_cached(ot_callback_t *cb, metric_t const *m) {
+static bool ot_metric_is_staged(ot_callback_t *cb, metric_t const *m) {
strbuf_t id = STRBUF_CREATE;
metric_identity(&id, m);
- int status = c_avl_get(cb->cached_metrics, id.ptr, NULL);
+ int status = c_avl_get(cb->staged_metrics, id.ptr, NULL);
STRBUF_DESTROY(id);
return status == 0;
}
static bool ot_need_flush(ot_callback_t *cb, metric_family_t const *fam) {
- int status = c_avl_get(cb->cached_metric_families, fam->name, NULL);
+ int status = c_avl_get(cb->staged_metric_families, fam->name, NULL);
if (status != 0) {
return false;
}
- /* if any of the metrics are already cached, we should flush before adding
+ /* if any of the metrics are already staged, we should flush before adding
* this metric family. */
for (size_t i = 0; i < fam->metric.num; i++) {
- bool ok = ot_metric_is_cached(cb, fam->metric.ptr + i);
+ bool ok = ot_metric_is_staged(cb, fam->metric.ptr + i);
if (ok) {
return true;
}
return false;
}
-static int ot_mark_metric_cached(ot_callback_t *cb, metric_t const *m) {
+static int ot_mark_metric_staged(ot_callback_t *cb, metric_t const *m) {
strbuf_t buf = STRBUF_CREATE;
int status = metric_identity(&buf, m);
if (status != 0) {
return errno;
}
- status = c_avl_insert(cb->cached_metrics, id, /* value = */ NULL);
+ status = c_avl_insert(cb->staged_metrics, id, /* value = */ NULL);
if (status != 0) {
ERROR("write_open_telemetry plugin: c_avl_insert(\"%s\") failed: %d",
buf.ptr, status);
return status;
}
+ DEBUG("write_open_telemetry plugin: Successfully marked metric \"%s\" as "
+ "staged",
+ id);
STRBUF_DESTROY(buf);
return 0;
}
-static metric_family_t *ot_cached_metric_family(ot_callback_t *cb,
+static metric_family_t *ot_staged_metric_family(ot_callback_t *cb,
metric_family_t const *fam) {
metric_family_t *ret = NULL;
- int status = c_avl_get(cb->cached_metric_families, fam->name, (void **)&ret);
+ int status = c_avl_get(cb->staged_metric_families, fam->name, (void **)&ret);
if (status == 0) {
+ DEBUG("write_open_telemetry plugin: Found staged metric family \"%s\"",
+ ret->name);
return ret;
}
ret = metric_family_clone_shallow(fam);
- c_avl_insert(cb->cached_metric_families, ret->name, ret);
+ c_avl_insert(cb->staged_metric_families, ret->name, ret);
+ DEBUG("write_open_telemetry plugin: Successfully staged metric family \"%s\"",
+ ret->name);
return ret;
}
ot_flush_nolock(timeout, cb);
}
- metric_family_t *cache = ot_cached_metric_family(cb, fam);
- size_t offset = cache->metric.num;
+ metric_family_t *stage = ot_staged_metric_family(cb, fam);
+ size_t offset = stage->metric.num;
- int status = metric_family_append_list(cache, fam->metric);
+ int status = metric_family_append_list(stage, fam->metric);
if (status != 0) {
ERROR("write_open_telemetry plugin: metric_list_append_list failed: %d",
status);
+ pthread_mutex_unlock(&cb->mu);
return status;
}
- for (size_t i = offset; i < cache->metric.num; i++) {
- ot_mark_metric_cached(cb, &cache->metric.ptr[i]);
+ for (size_t i = offset; i < stage->metric.num; i++) {
+ ot_mark_metric_staged(cb, &stage->metric.ptr[i]);
}
+ pthread_mutex_unlock(&cb->mu);
return 0;
}
cb->port = strdup(OT_DEFAULT_PORT);
cb->path = strdup(OT_DEFAULT_PATH);
- cb->cached_metrics =
+ cb->staged_metrics =
c_avl_create((int (*)(const void *, const void *))strcmp);
- cb->cached_metric_families =
+ cb->staged_metric_families =
c_avl_create((int (*)(const void *, const void *))strcmp);
pthread_mutex_init(&cb->mu, /* attr = */ NULL);
}
}
- char callback_name[DATA_MAX_NAME_LEN];
- ssnprintf(callback_name, sizeof(callback_name), "write_open_telemetry/%s",
- cb->name);
+ strbuf_t callback_name = STRBUF_CREATE;
+ strbuf_printf(&callback_name, "write_open_telemetry/%s", cb->name);
user_data_t user_data = {
.data = cb,
};
cb->reference_count++;
- plugin_register_write(callback_name, ot_write, &user_data);
+ plugin_register_write(callback_name.ptr, ot_write, &user_data);
cb->reference_count++;
- plugin_register_flush(callback_name, ot_flush, &user_data);
+ plugin_register_flush(callback_name.ptr, ot_flush, &user_data);
ot_callback_decref(cb);
+ STRBUF_DESTROY(callback_name);
return 0;
}