]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_http plugin: Add OTLP JSON support using resource metrics.
authorFlorian Forster <octo@collectd.org>
Sat, 16 Dec 2023 13:18:48 +0000 (14:18 +0100)
committerFlorian Forster <octo@collectd.org>
Wed, 3 Jan 2024 15:39:36 +0000 (16:39 +0100)
src/write_http.c

index e1e335564ee00ca152fa54a6efba8be1d78c6c40..cebc2c8c18122e1e590d0cef68971c9a93b616d1 100644 (file)
@@ -87,6 +87,7 @@ struct wh_callback_s {
   pthread_mutex_t send_buffer_lock;
   strbuf_t send_buffer;
   cdtime_t send_buffer_init_time;
+  resource_metrics_set_t resource_metrics;
 
   c_avl_tree_t *staged_metrics;         // char* metric_identity() -> NULL
   c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
@@ -206,11 +207,16 @@ static int wh_callback_init(wh_callback_t *cb) {
   curl_easy_setopt(cb->curl, CURLOPT_USERAGENT, COLLECTD_USERAGENT);
 
   cb->headers = curl_slist_append(cb->headers, "Accept:  */*");
-  if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
+  switch (cb->format) {
+  case WH_FORMAT_JSON:
+  case WH_FORMAT_KAIROSDB:
+  case WH_FORMAT_OTLP_JSON:
     cb->headers =
         curl_slist_append(cb->headers, "Content-Type: application/json");
-  else
+
+  default:
     cb->headers = curl_slist_append(cb->headers, "Content-Type: text/plain");
+  }
   cb->headers = curl_slist_append(cb->headers, "Expect:");
   curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, cb->headers);
 
@@ -269,6 +275,40 @@ static int wh_callback_init(wh_callback_t *cb) {
   return 0;
 } /* int wh_callback_init */
 
+static int flush_resource_metrics(wh_callback_t *cb) {
+  /* You must hold cb->send_buffer_lock when calling. */
+  strbuf_t buf = STRBUF_CREATE;
+  int status = 0;
+  switch (cb->format) {
+  case WH_FORMAT_OTLP_JSON:
+    status = format_json_open_telemetry(&buf, &cb->resource_metrics);
+    if (status != 0) {
+      ERROR("write_http plugin: format_json_open_telemetry failed: %s",
+            STRERROR(status));
+    }
+    break;
+
+  default:
+    ERROR("write_http plugin: Unexpected format: %d", cb->format);
+    status = EINVAL;
+  }
+
+  if (status != 0) {
+    pthread_mutex_unlock(&cb->send_buffer_lock);
+    STRBUF_DESTROY(buf);
+    return status;
+  }
+
+  resource_metrics_reset(&cb->resource_metrics);
+  cb->send_buffer_init_time = cdtime();
+
+  pthread_mutex_unlock(&cb->send_buffer_lock);
+
+  status = wh_post(cb, buf.ptr, buf.pos);
+  STRBUF_DESTROY(buf);
+  return status;
+}
+
 static int wh_flush(cdtime_t timeout,
                     const char *identifier __attribute__((unused)),
                     user_data_t *user_data) {
@@ -293,6 +333,11 @@ static int wh_flush(cdtime_t timeout,
     }
   }
 
+  if (cb->format == WH_FORMAT_OTLP_PROTO || cb->format == WH_FORMAT_OTLP_JSON) {
+    /* cb->send_buffer_lock is unlocked in flush_resource_metrics. */
+    return flush_resource_metrics(cb);
+  }
+
   if (cb->send_buffer.pos == 0) {
     cb->send_buffer_init_time = cdtime();
     pthread_mutex_unlock(&cb->send_buffer_lock);
@@ -443,14 +488,18 @@ static int wh_write_influxdb(metric_family_t const *fam, wh_callback_t *cb) {
   return 0;
 } /* wh_write_influxdb */
 
-static int wh_write_otlp_proto(metric_family_t const *fam, wh_callback_t *cb) {
-  ERROR("wh_write_otlp_proto: Not implemented yet.");
-  return -1;
-}
+static int wh_write_resource_metrics(metric_family_t const *fam,
+                                     wh_callback_t *cb) {
+  pthread_mutex_lock(&cb->send_buffer_lock);
+  int status = resource_metrics_add(&cb->resource_metrics, fam);
+  pthread_mutex_unlock(&cb->send_buffer_lock);
 
-static int wh_write_otlp_json(metric_family_t const *fam, wh_callback_t *cb) {
-  ERROR("wh_write_otlp_json: Not implemented yet.");
-  return -1;
+  if (status != 0) {
+    ERROR("write_http plugin: resource_metrics_add failed: %s",
+          STRERROR(status));
+    return status;
+  }
+  return 0;
 }
 
 static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
@@ -474,10 +523,11 @@ static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
     status = wh_write_influxdb(fam, cb);
     break;
   case WH_FORMAT_OTLP_PROTO:
-    status = wh_write_otlp_proto(fam, cb);
+    status = -1;
+    ERROR("wh_write_otlp_proto: Not implemented yet.");
     break;
   case WH_FORMAT_OTLP_JSON:
-    status = wh_write_otlp_json(fam, cb);
+    status = wh_write_resource_metrics(fam, cb);
     break;
   default:
     status = wh_write_command(fam, cb);