]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
open_telemetry_collector plugin: Report failing metrics as "partial success".
authorFlorian Forster <octo@collectd.org>
Thu, 1 Feb 2024 11:28:02 +0000 (12:28 +0100)
committerFlorian Forster <octo@collectd.org>
Tue, 20 Feb 2024 14:28:50 +0000 (15:28 +0100)
src/open_telemetry_collector.cc

index 8efd1020f9b7254832ad65754e4851d951aa14a2..b4326747a99659837491d8c65f854b083dc5238a 100644 (file)
@@ -39,10 +39,12 @@ extern "C" {
 #include <grpc++/grpc++.h>
 
 #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
 #include "opentelemetry/proto/common/v1/common.pb.h"
 #include "opentelemetry/proto/metrics/v1/metrics.pb.h"
 #include "opentelemetry/proto/resource/v1/resource.pb.h"
 
+using opentelemetry::proto::collector::metrics::v1::ExportMetricsPartialSuccess;
 using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
 using opentelemetry::proto::collector::metrics::v1::
     ExportMetricsServiceResponse;
@@ -232,7 +234,23 @@ static grpc::Status unmarshal_sum_metric(Sum sum, metric_family_t *fam) {
   return grpc::Status::OK;
 }
 
-static grpc::Status dispatch_metric(Metric mpb, label_set_t resource) {
+static grpc::Status reject_data_points(std::string msg, int num,
+                                       ExportMetricsPartialSuccess *ps) {
+  int64_t rejected = ps->rejected_data_points();
+  rejected += (int64_t)num;
+  ps->set_rejected_data_points(rejected);
+
+  std::string *error_message = ps->mutable_error_message();
+  if (!error_message->empty()) {
+    error_message->append(", ");
+  }
+  error_message->append(msg);
+
+  return grpc::Status::OK;
+}
+
+static grpc::Status dispatch_metric(Metric mpb, label_set_t resource,
+                                    ExportMetricsPartialSuccess *ps) {
   metric_family_t fam = {
       .name = (char *)mpb.name().c_str(),
       .help = (char *)mpb.description().c_str(),
@@ -246,7 +264,8 @@ static grpc::Status dispatch_metric(Metric mpb, label_set_t resource) {
     grpc::Status s = unmarshal_gauge_metric(mpb.gauge(), &fam);
     if (!s.ok()) {
       metric_family_metric_reset(&fam);
-      return s;
+      return reject_data_points(s.error_message(),
+                                mpb.gauge().data_points().size(), ps);
     }
     break;
   }
@@ -254,17 +273,23 @@ static grpc::Status dispatch_metric(Metric mpb, label_set_t resource) {
     grpc::Status s = unmarshal_sum_metric(mpb.sum(), &fam);
     if (!s.ok()) {
       metric_family_metric_reset(&fam);
-      return s;
+      return reject_data_points(s.error_message(),
+                                mpb.sum().data_points().size(), ps);
     }
     break;
   }
   case Metric::kHistogram:
+    return reject_data_points(
+        std::string("histogram metrics are not supported"),
+        mpb.histogram().data_points().size(), ps);
   case Metric::kExponentialHistogram:
-    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
-                        "histogram metrics are not supported");
-  case Metric::kSummary:
-    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
-                        "summary metrics are not supported");
+    return reject_data_points(
+        std::string("exponential histogram metrics are not supported"),
+        mpb.exponential_histogram().data_points().size(), ps);
+  case Metric::kSummary: {
+    return reject_data_points(std::string("summary metrics are not supported"),
+                              mpb.summary().data_points().size(), ps);
+  }
   default:
     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
                         "unexpected data type");
@@ -287,7 +312,8 @@ static grpc::Status unmarshal_resource(Resource rpb, label_set_t *resource) {
   return grpc::Status::OK;
 }
 
-static grpc::Status dispatch_resource_metrics(ResourceMetrics rm) {
+static grpc::Status dispatch_resource_metrics(ResourceMetrics rm,
+                                              ExportMetricsPartialSuccess *ps) {
   label_set_t resource = {0};
 
   grpc::Status s = unmarshal_resource(rm.resource(), &resource);
@@ -297,7 +323,7 @@ static grpc::Status dispatch_resource_metrics(ResourceMetrics rm) {
 
   for (auto sm : rm.scope_metrics()) {
     for (auto m : sm.metrics()) {
-      grpc::Status s = dispatch_metric(m, resource);
+      grpc::Status s = dispatch_metric(m, resource, ps);
       if (!s.ok()) {
         return s;
       }
@@ -316,15 +342,18 @@ public:
   grpc::Status Export(grpc::ServerContext *context,
                       const ExportMetricsServiceRequest *req,
                       ExportMetricsServiceResponse *resp) {
-    for (auto rm : req.resource_metrics()) {
-      grpc::Status s = dispatch_resource_metrics(rm);
+    ExportMetricsPartialSuccess *ps = resp->mutable_partial_success();
+
+    for (auto rm : req->resource_metrics()) {
+      grpc::Status s = dispatch_resource_metrics(rm, ps);
       if (!s.ok()) {
+        ERROR("open_telemetry_collector: dispatch_resource_metrics failed: %s",
+              s.error_message().c_str());
         return s;
       }
     }
 
-    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
-                        "Export is not implemented yet");
+    return grpc::Status::OK;
   }
 };