From: Florian Forster Date: Thu, 1 Feb 2024 06:58:29 +0000 (+0100) Subject: open_telemetry_collector: Handle `AGGREGATION_TEMPORALITY_DELTA` correctly. X-Git-Tag: collectd-6.0.0.rc3~4^2~19 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=e4065ea0ad02be26523c8aef48c1d184acfdebcf;p=thirdparty%2Fcollectd.git open_telemetry_collector: Handle `AGGREGATION_TEMPORALITY_DELTA` correctly. --- diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index 102460945..e4af1fe03 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -49,6 +49,10 @@ using opentelemetry::proto::collector::metrics::v1:: using opentelemetry::proto::collector::metrics::v1::MetricsService; using opentelemetry::proto::common::v1::AnyValue; using opentelemetry::proto::common::v1::KeyValue; +using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_CUMULATIVE; +using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_DELTA; +using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_UNSPECIFIED; +using opentelemetry::proto::metrics::v1::AggregationTemporality; using opentelemetry::proto::metrics::v1::Gauge; using opentelemetry::proto::metrics::v1::Metric; using opentelemetry::proto::metrics::v1::NumberDataPoint; @@ -135,27 +139,48 @@ static grpc::Status unmarshal_label_pair(KeyValue kv, label_set_t *labels) { static grpc::Status unmarshal_data_point(NumberDataPoint dp, metric_family_t *fam, - bool is_cumulative) { + AggregationTemporality agg) { metric_t m = { + .family = fam, // family needs to be populated for uc_get_value(). .time = NS_TO_CDTIME_T(dp.time_unix_nano()), }; + bool is_cumulative = (agg == AGGREGATION_TEMPORALITY_DELTA || + agg == AGGREGATION_TEMPORALITY_CUMULATIVE); + + value_t offset = {0}; + if (agg == AGGREGATION_TEMPORALITY_DELTA) { + int err = uc_get_value(&m, &offset); + switch (err) { + case ENOENT: + case EAGAIN: + offset = (value_t){0}; + break; + case 0: + // no-op + break; + default: + return wrap_error(err); + } + } + switch (dp.value_case()) { case NumberDataPoint::kAsDouble: -// TODO(octo): enable once floating point counters have been merged (#4266) -#if 0 if (is_cumulative) { - fam->type = METRIC_TYPE_FPCOUNTER; - m.value.fpcounter = dp.as_double(); + // TODO(octo): enable once floating point counters have been merged + // (#4266) + // fam->type = METRIC_TYPE_FPCOUNTER; + // m.value.fpcounter = dp.as_double(); + fam->type = METRIC_TYPE_COUNTER; + m.value.counter = offset.counter + (counter_t)dp.as_double(); break; } -#endif m.value.gauge = dp.as_double(); break; case NumberDataPoint::kAsInt: if (is_cumulative) { fam->type = METRIC_TYPE_COUNTER; - m.value.counter = (counter_t)dp.as_int(); + m.value.counter = offset.counter + (counter_t)dp.as_int(); break; } m.value.gauge = (gauge_t)dp.as_int(); @@ -175,8 +200,6 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp, // TODO(octo): get the first metric time from the cache and detect counter // resets. - // TODO(octo): when aggregation temporality == AGGREGATION_TEMPORALITY_DELTA, - // get the previous value from the cache and add to it. int err = metric_family_metric_append(fam, m); @@ -186,7 +209,8 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp, static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) { for (auto db : g.data_points()) { - grpc::Status s = unmarshal_data_point(db, fam, false); + grpc::Status s = + unmarshal_data_point(db, fam, AGGREGATION_TEMPORALITY_UNSPECIFIED); if (!s.ok()) { return s; } @@ -195,10 +219,11 @@ static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) { return grpc::Status::OK; } -static grpc::Status unmarshal_sum_metric(Sum s, metric_family_t *fam) { +static grpc::Status unmarshal_sum_metric(Sum sum, metric_family_t *fam) { // TODO(octo): check is_monotonic and aggregation temporality - for (auto db : s.data_points()) { - grpc::Status s = unmarshal_data_point(db, fam, true); + for (auto db : sum.data_points()) { + grpc::Status s = + unmarshal_data_point(db, fam, sum.aggregation_temporality()); if (!s.ok()) { return s; }