]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
write_open_telemetry plugin: Implement gRPC sending logic.
authorFlorian Forster <octo@collectd.org>
Mon, 4 Dec 2023 14:10:53 +0000 (15:10 +0100)
committerFlorian Forster <octo@collectd.org>
Wed, 3 Jan 2024 16:16:28 +0000 (17:16 +0100)
build.sh
src/utils/format_open_telemetry/format_open_telemetry.cc
src/utils/format_open_telemetry/format_open_telemetry.h
src/write_open_telemetry.cc

index 0fe9142dad643788245ca1fd052d6fb989a682bb..4c36775c0a214bbc5627b6190b82bfe7eb797e6f 100755 (executable)
--- a/build.sh
+++ b/build.sh
@@ -56,9 +56,10 @@ build()
     && automake --add-missing --copy \
     && autoconf
 
-    for f in common/v1/common.proto metrics/v1/metrics.proto resource/v1/resource.proto; do
+    for f in common/v1/common.proto metrics/v1/metrics.proto resource/v1/resource.proto collector/metrics/v1/metrics_service.proto; do
       protoc -Iopentelemetry-proto --cpp_out src/ "opentelemetry-proto/opentelemetry/proto/${f}"
     done
+    protoc -Iopentelemetry-proto --grpc_out src/ --plugin="protoc-gen-grpc=$(which grpc_cpp_plugin)" "opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto"
 }
 
 build_cygwin()
index 771332a8c3e564e16c6efb44c32139b36a64f20a..3739b1ee26ee2960bbc0405c5a7b8eea5bf9a5b9 100644 (file)
 extern "C" {
 #include "collectd.h"
 #include "metric.h"
+}
 
 #include "utils/format_open_telemetry/format_open_telemetry.h"
-}
 
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
 #include "opentelemetry/proto/common/v1/common.pb.h"
 #include "opentelemetry/proto/metrics/v1/metrics.pb.h"
 #include "opentelemetry/proto/resource/v1/resource.pb.h"
 
+using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
 using opentelemetry::proto::common::v1::AnyValue;
 using opentelemetry::proto::common::v1::InstrumentationScope;
 using opentelemetry::proto::common::v1::KeyValue;
@@ -46,7 +48,8 @@ using opentelemetry::proto::metrics::v1::ResourceMetrics;
 using opentelemetry::proto::metrics::v1::ScopeMetrics;
 using opentelemetry::proto::metrics::v1::Sum;
 
-static void metric_to_number_data_point(NumberDataPoint *dp, metric_t const *m) {
+static void metric_to_number_data_point(NumberDataPoint *dp,
+                                        metric_t const *m) {
   for (size_t i = 0; i < m->label.num; i++) {
     label_pair_t *l = m->label.ptr + i;
 
@@ -61,19 +64,24 @@ static void metric_to_number_data_point(NumberDataPoint *dp, metric_t const *m)
   // when we've seen a metric for the first time.
 
   switch (m->family->type) {
-    case METRIC_TYPE_COUNTER:
-      dp->set_as_int(m->value.derive);
-    case METRIC_TYPE_GAUGE:
-      dp->set_as_double(m->value.gauge);
-    case METRIC_TYPE_UNTYPED:
-      // TODO
-      assert(0);
+  case METRIC_TYPE_COUNTER:
+    dp->set_as_int(m->value.derive);
+    break;
+  case METRIC_TYPE_GAUGE:
+    dp->set_as_double(m->value.gauge);
+    break;
+  case METRIC_TYPE_UNTYPED:
+    // TODO
+    assert(0);
   }
 }
 
 static void set_sum(Metric *m, metric_family_t const *fam) {
   Sum *s = m->mutable_sum();
   for (size_t i = 0; i < fam->metric.num; i++) {
+    metric_t const *m = fam->metric.ptr + i;
+    assert(m->family == fam);
+
     NumberDataPoint *dp = s->add_data_points();
     metric_to_number_data_point(dp, fam->metric.ptr + i);
   }
@@ -99,15 +107,15 @@ static void add_metric(ScopeMetrics *sm, metric_family_t const *fam) {
   }
 
   switch (fam->type) {
-    case METRIC_TYPE_COUNTER:
-      set_sum(m, fam);
-      return;
-    case METRIC_TYPE_GAUGE:
-      set_gauge(m, fam);
-      return;
-    case METRIC_TYPE_UNTYPED:
-      // TODO
-      assert(0);
+  case METRIC_TYPE_COUNTER:
+    set_sum(m, fam);
+    return;
+  case METRIC_TYPE_GAUGE:
+    set_gauge(m, fam);
+    return;
+  case METRIC_TYPE_UNTYPED:
+    // TODO
+    assert(0);
   }
 }
 
@@ -117,18 +125,31 @@ static void set_instrumentation_scope(ScopeMetrics *sm) {
   is->set_version(PACKAGE_VERSION);
 }
 
-static void set_scope_metrics(ResourceMetrics *rm, metric_family_t const *fam) {
+static void set_scope_metrics(ResourceMetrics *rm, metric_family_t const **fam,
+                              size_t fam_num) {
   ScopeMetrics *sm = rm->add_scope_metrics();
 
   set_instrumentation_scope(sm);
 
-  add_metric(sm, fam);
+  for (size_t i = 0; i < fam_num; i++) {
+    add_metric(sm, fam[i]);
+  }
+}
+
+ResourceMetrics *
+format_open_telemetry_resource_metrics_serialized(metric_family_t const **fam,
+                                                  size_t fam_num) {
+  ResourceMetrics *rm = new ResourceMetrics();
+
+  set_scope_metrics(rm, fam, fam_num);
+  return rm;
 }
 
-int format_open_telemetry(strbuf_t *sb, metric_family_t const *fam) {
+int format_open_telemetry_resource_metrics_serialized(
+    strbuf_t *sb, metric_family_t const **fam, size_t fam_num) {
   ResourceMetrics rm;
 
-  set_scope_metrics(&rm, fam);
+  set_scope_metrics(&rm, fam, fam_num);
 
   std::string serialization;
   bool ok = rm.SerializeToString(&serialization);
@@ -140,3 +161,13 @@ int format_open_telemetry(strbuf_t *sb, metric_family_t const *fam) {
   return 0;
 }
 
+ExportMetricsServiceRequest *
+format_open_telemetry_export_metrics_service_request(
+    metric_family_t const **fam, size_t fam_num) {
+  ExportMetricsServiceRequest *req = new ExportMetricsServiceRequest();
+
+  ResourceMetrics *rm = req->add_resource_metrics();
+  set_scope_metrics(rm, fam, fam_num);
+
+  return req;
+}
index 4120dc9b6f13f345fc11a583dbbc47988aa81ef7..fa3c36106070f7d811f45a3a7288c8c2606d83e1 100644 (file)
 #ifndef UTILS_FORMAT_OPEN_TELEMETRY_H
 #define UTILS_FORMAT_OPEN_TELEMETRY_H 1
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include "collectd.h"
 #include "metric.h"
 
-int format_open_telemetry(strbuf_t *sb, metric_family_t const *fam); // TODO: lacks return value
+int format_open_telemetry_resource_metrics_serialized(
+    strbuf_t *sb, metric_family_t const **fam, size_t fam_num);
+
+#ifdef __cplusplus
+}
+
+#include "opentelemetry/proto/metrics/v1/metrics.pb.h"
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
+
+opentelemetry::proto::metrics::v1::ResourceMetrics *
+format_open_telemetry_resource_metrics_serialized(metric_family_t const **fam,
+                                                  size_t fam_num);
+
+opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest *
+format_open_telemetry_export_metrics_service_request(
+    metric_family_t const **fam, size_t fam_num);
+#endif
 
 #endif /* UTILS_FORMAT_OPEN_TELEMETRY_H */
index be289d0716c7534bb83474553b0108946019c08e..f1c99d085c142efb02bf58961be61a9233511b55 100644 (file)
@@ -42,41 +42,32 @@ extern "C" {
 #include "utils/common/common.h"
 
 #include "utils/avltree/avltree.h"
-#include "utils/format_open_telemetry/format_open_telemetry.h"
 #include "utils/strbuf/strbuf.h"
 #include "utils_complain.h"
 
 #include <netdb.h>
 }
 
+#include <grpc++/grpc++.h>
+
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
+#include "utils/format_open_telemetry/format_open_telemetry.h"
+
 #ifndef OT_DEFAULT_HOST
 #define OT_DEFAULT_HOST "localhost"
 #endif
 
 #ifndef OT_DEFAULT_PORT
-#define OT_DEFAULT_PORT "8080"
+#define OT_DEFAULT_PORT "4317"
 #endif
 
 #ifndef OT_DEFAULT_PATH
 #define OT_DEFAULT_PATH "/v1/metrics"
 #endif
 
-#ifndef OT_DEFAULT_LOG_SEND_ERRORS
-#define OT_DEFAULT_LOG_SEND_ERRORS true
-#endif
-
-#ifndef OT_DEFAULT_ESCAPE
-#define OT_DEFAULT_ESCAPE '_'
-#endif
-
-/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */
-#ifndef OT_SEND_BUF_SIZE
-#define OT_SEND_BUF_SIZE 1428
-#endif
-
-#ifndef OT_MIN_RECONNECT_INTERVAL
-#define OT_MIN_RECONNECT_INTERVAL TIME_T_TO_CDTIME_T(1)
-#endif
+using opentelemetry::proto::collector::metrics::v1::
+    ExportMetricsServiceResponse;
+using opentelemetry::proto::collector::metrics::v1::MetricsService;
 
 /*
  * Private variables
@@ -93,12 +84,14 @@ typedef struct {
   c_avl_tree_t *staged_metrics;         // char* metric_identity() -> NULL
   c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
 
+  std::unique_ptr<MetricsService::Stub> stub;
+
   pthread_mutex_t mu;
 } ot_callback_t;
 
 static int ot_send_buffer(ot_callback_t *cb) {
   size_t families_num = (size_t)c_avl_size(cb->staged_metric_families);
-  metric_family_t *families[families_num];
+  metric_family_t const *families[families_num];
 
   memset(families, 0, sizeof(families));
 
@@ -115,8 +108,42 @@ static int ot_send_buffer(ot_callback_t *cb) {
     families[i] = fam;
   }
 
-  DEBUG("write_open_telemetry plugin: TODO(octo): send %zu metric families",
-        families_num);
+  if (cb->stub == NULL) {
+    strbuf_t buf = STRBUF_CREATE;
+    strbuf_printf(&buf, "%s:%s", cb->host, cb->port);
+
+    auto chan =
+        grpc::CreateChannel(buf.ptr, grpc::InsecureChannelCredentials());
+    cb->stub = MetricsService::NewStub(chan);
+
+    STRBUF_DESTROY(buf);
+  }
+
+  auto req = format_open_telemetry_export_metrics_service_request(families,
+                                                                  families_num);
+
+  grpc::ClientContext context;
+  ExportMetricsServiceResponse resp;
+  grpc::Status status = cb->stub->Export(&context, *req, &resp);
+
+  if (!status.ok()) {
+    ERROR("write_open_telemetry plugin: Exporting metrics failed: %s",
+          status.error_message().c_str());
+    return -1;
+  }
+
+  if (resp.has_partial_success() &&
+      resp.partial_success().rejected_data_points() > 0) {
+    auto ps = resp.partial_success();
+    NOTICE("write_open_telemetry plugin: %" PRId64
+           " data points were rejected: %s",
+           ps.rejected_data_points(), ps.error_message().c_str());
+  } else {
+    DEBUG("write_open_telemetry plugin: Successfully sent %zu metric families",
+          families_num);
+  }
+
+  delete req;
   return 0;
 }
 
@@ -173,6 +200,8 @@ static void ot_callback_decref(void *data) {
 
   ot_flush_nolock(/* timeout = */ 0, cb);
 
+  cb->stub.reset();
+
   c_avl_destroy(cb->staged_metrics);
   c_avl_destroy(cb->staged_metric_families);