From: Florian Forster Date: Sat, 16 Dec 2023 13:18:48 +0000 (+0100) Subject: write_http plugin: Add OTLP JSON support using resource metrics. X-Git-Tag: 6.0.0-rc0~18^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=79d0e80b7cc8c2b7c4aa788dc18b4a39c218cf04;p=thirdparty%2Fcollectd.git write_http plugin: Add OTLP JSON support using resource metrics. --- diff --git a/src/write_http.c b/src/write_http.c index e1e335564..cebc2c8c1 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -87,6 +87,7 @@ struct wh_callback_s { pthread_mutex_t send_buffer_lock; strbuf_t send_buffer; cdtime_t send_buffer_init_time; + resource_metrics_set_t resource_metrics; c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t* @@ -206,11 +207,16 @@ static int wh_callback_init(wh_callback_t *cb) { curl_easy_setopt(cb->curl, CURLOPT_USERAGENT, COLLECTD_USERAGENT); cb->headers = curl_slist_append(cb->headers, "Accept: */*"); - if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) + switch (cb->format) { + case WH_FORMAT_JSON: + case WH_FORMAT_KAIROSDB: + case WH_FORMAT_OTLP_JSON: cb->headers = curl_slist_append(cb->headers, "Content-Type: application/json"); - else + + default: cb->headers = curl_slist_append(cb->headers, "Content-Type: text/plain"); + } cb->headers = curl_slist_append(cb->headers, "Expect:"); curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, cb->headers); @@ -269,6 +275,40 @@ static int wh_callback_init(wh_callback_t *cb) { return 0; } /* int wh_callback_init */ +static int flush_resource_metrics(wh_callback_t *cb) { + /* You must hold cb->send_buffer_lock when calling. */ + strbuf_t buf = STRBUF_CREATE; + int status = 0; + switch (cb->format) { + case WH_FORMAT_OTLP_JSON: + status = format_json_open_telemetry(&buf, &cb->resource_metrics); + if (status != 0) { + ERROR("write_http plugin: format_json_open_telemetry failed: %s", + STRERROR(status)); + } + break; + + default: + ERROR("write_http plugin: Unexpected format: %d", cb->format); + status = EINVAL; + } + + if (status != 0) { + pthread_mutex_unlock(&cb->send_buffer_lock); + STRBUF_DESTROY(buf); + return status; + } + + resource_metrics_reset(&cb->resource_metrics); + cb->send_buffer_init_time = cdtime(); + + pthread_mutex_unlock(&cb->send_buffer_lock); + + status = wh_post(cb, buf.ptr, buf.pos); + STRBUF_DESTROY(buf); + return status; +} + static int wh_flush(cdtime_t timeout, const char *identifier __attribute__((unused)), user_data_t *user_data) { @@ -293,6 +333,11 @@ static int wh_flush(cdtime_t timeout, } } + if (cb->format == WH_FORMAT_OTLP_PROTO || cb->format == WH_FORMAT_OTLP_JSON) { + /* cb->send_buffer_lock is unlocked in flush_resource_metrics. */ + return flush_resource_metrics(cb); + } + if (cb->send_buffer.pos == 0) { cb->send_buffer_init_time = cdtime(); pthread_mutex_unlock(&cb->send_buffer_lock); @@ -443,14 +488,18 @@ static int wh_write_influxdb(metric_family_t const *fam, wh_callback_t *cb) { return 0; } /* wh_write_influxdb */ -static int wh_write_otlp_proto(metric_family_t const *fam, wh_callback_t *cb) { - ERROR("wh_write_otlp_proto: Not implemented yet."); - return -1; -} +static int wh_write_resource_metrics(metric_family_t const *fam, + wh_callback_t *cb) { + pthread_mutex_lock(&cb->send_buffer_lock); + int status = resource_metrics_add(&cb->resource_metrics, fam); + pthread_mutex_unlock(&cb->send_buffer_lock); -static int wh_write_otlp_json(metric_family_t const *fam, wh_callback_t *cb) { - ERROR("wh_write_otlp_json: Not implemented yet."); - return -1; + if (status != 0) { + ERROR("write_http plugin: resource_metrics_add failed: %s", + STRERROR(status)); + return status; + } + return 0; } static int wh_write(metric_family_t const *fam, user_data_t *user_data) { @@ -474,10 +523,11 @@ static int wh_write(metric_family_t const *fam, user_data_t *user_data) { status = wh_write_influxdb(fam, cb); break; case WH_FORMAT_OTLP_PROTO: - status = wh_write_otlp_proto(fam, cb); + status = -1; + ERROR("wh_write_otlp_proto: Not implemented yet."); break; case WH_FORMAT_OTLP_JSON: - status = wh_write_otlp_json(fam, cb); + status = wh_write_resource_metrics(fam, cb); break; default: status = wh_write_command(fam, cb);