]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
open_telemetry_collector: Handle `AGGREGATION_TEMPORALITY_DELTA` correctly.
authorFlorian Forster <octo@collectd.org>
Thu, 1 Feb 2024 06:58:29 +0000 (07:58 +0100)
committerFlorian Forster <octo@collectd.org>
Tue, 20 Feb 2024 14:28:50 +0000 (15:28 +0100)
src/open_telemetry_collector.cc

index 1024609452322f98bce0def1fba8d34088340947..e4af1fe0370d2cf2a5c63954d8313f5c390b2731 100644 (file)
@@ -49,6 +49,10 @@ using opentelemetry::proto::collector::metrics::v1::
 using opentelemetry::proto::collector::metrics::v1::MetricsService;
 using opentelemetry::proto::common::v1::AnyValue;
 using opentelemetry::proto::common::v1::KeyValue;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_CUMULATIVE;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_DELTA;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_UNSPECIFIED;
+using opentelemetry::proto::metrics::v1::AggregationTemporality;
 using opentelemetry::proto::metrics::v1::Gauge;
 using opentelemetry::proto::metrics::v1::Metric;
 using opentelemetry::proto::metrics::v1::NumberDataPoint;
@@ -135,27 +139,48 @@ static grpc::Status unmarshal_label_pair(KeyValue kv, label_set_t *labels) {
 
 static grpc::Status unmarshal_data_point(NumberDataPoint dp,
                                          metric_family_t *fam,
-                                         bool is_cumulative) {
+                                         AggregationTemporality agg) {
   metric_t m = {
+      .family = fam, // family needs to be populated for uc_get_value().
       .time = NS_TO_CDTIME_T(dp.time_unix_nano()),
   };
 
+  bool is_cumulative = (agg == AGGREGATION_TEMPORALITY_DELTA ||
+                        agg == AGGREGATION_TEMPORALITY_CUMULATIVE);
+
+  value_t offset = {0};
+  if (agg == AGGREGATION_TEMPORALITY_DELTA) {
+    int err = uc_get_value(&m, &offset);
+    switch (err) {
+    case ENOENT:
+    case EAGAIN:
+      offset = (value_t){0};
+      break;
+    case 0:
+      // no-op
+      break;
+    default:
+      return wrap_error(err);
+    }
+  }
+
   switch (dp.value_case()) {
   case NumberDataPoint::kAsDouble:
-// TODO(octo): enable once floating point counters have been merged (#4266)
-#if 0
     if (is_cumulative) {
-      fam->type = METRIC_TYPE_FPCOUNTER;
-      m.value.fpcounter = dp.as_double();
+      // TODO(octo): enable once floating point counters have been merged
+      // (#4266)
+      // fam->type = METRIC_TYPE_FPCOUNTER;
+      // m.value.fpcounter = dp.as_double();
+      fam->type = METRIC_TYPE_COUNTER;
+      m.value.counter = offset.counter + (counter_t)dp.as_double();
       break;
     }
-#endif
     m.value.gauge = dp.as_double();
     break;
   case NumberDataPoint::kAsInt:
     if (is_cumulative) {
       fam->type = METRIC_TYPE_COUNTER;
-      m.value.counter = (counter_t)dp.as_int();
+      m.value.counter = offset.counter + (counter_t)dp.as_int();
       break;
     }
     m.value.gauge = (gauge_t)dp.as_int();
@@ -175,8 +200,6 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp,
 
   // TODO(octo): get the first metric time from the cache and detect counter
   // resets.
-  // TODO(octo): when aggregation temporality == AGGREGATION_TEMPORALITY_DELTA,
-  // get the previous value from the cache and add to it.
 
   int err = metric_family_metric_append(fam, m);
 
@@ -186,7 +209,8 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp,
 
 static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) {
   for (auto db : g.data_points()) {
-    grpc::Status s = unmarshal_data_point(db, fam, false);
+    grpc::Status s =
+        unmarshal_data_point(db, fam, AGGREGATION_TEMPORALITY_UNSPECIFIED);
     if (!s.ok()) {
       return s;
     }
@@ -195,10 +219,11 @@ static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) {
   return grpc::Status::OK;
 }
 
-static grpc::Status unmarshal_sum_metric(Sum s, metric_family_t *fam) {
+static grpc::Status unmarshal_sum_metric(Sum sum, metric_family_t *fam) {
   // TODO(octo): check is_monotonic and aggregation temporality
-  for (auto db : s.data_points()) {
-    grpc::Status s = unmarshal_data_point(db, fam, true);
+  for (auto db : sum.data_points()) {
+    grpc::Status s =
+        unmarshal_data_point(db, fam, sum.aggregation_temporality());
     if (!s.ok()) {
       return s;
     }