using opentelemetry::proto::collector::metrics::v1::MetricsService;
using opentelemetry::proto::common::v1::AnyValue;
using opentelemetry::proto::common::v1::KeyValue;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_CUMULATIVE;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_DELTA;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_UNSPECIFIED;
+using opentelemetry::proto::metrics::v1::AggregationTemporality;
using opentelemetry::proto::metrics::v1::Gauge;
using opentelemetry::proto::metrics::v1::Metric;
using opentelemetry::proto::metrics::v1::NumberDataPoint;
static grpc::Status unmarshal_data_point(NumberDataPoint dp,
metric_family_t *fam,
- bool is_cumulative) {
+ AggregationTemporality agg) {
metric_t m = {
+ .family = fam, // family needs to be populated for uc_get_value().
.time = NS_TO_CDTIME_T(dp.time_unix_nano()),
};
+ bool is_cumulative = (agg == AGGREGATION_TEMPORALITY_DELTA ||
+ agg == AGGREGATION_TEMPORALITY_CUMULATIVE);
+
+ value_t offset = {0};
+ if (agg == AGGREGATION_TEMPORALITY_DELTA) {
+ int err = uc_get_value(&m, &offset);
+ switch (err) {
+ case ENOENT:
+ case EAGAIN:
+ offset = (value_t){0};
+ break;
+ case 0:
+ // no-op
+ break;
+ default:
+ return wrap_error(err);
+ }
+ }
+
switch (dp.value_case()) {
case NumberDataPoint::kAsDouble:
-// TODO(octo): enable once floating point counters have been merged (#4266)
-#if 0
if (is_cumulative) {
- fam->type = METRIC_TYPE_FPCOUNTER;
- m.value.fpcounter = dp.as_double();
+ // TODO(octo): enable once floating point counters have been merged
+ // (#4266)
+ // fam->type = METRIC_TYPE_FPCOUNTER;
+ // m.value.fpcounter = dp.as_double();
+ fam->type = METRIC_TYPE_COUNTER;
+ m.value.counter = offset.counter + (counter_t)dp.as_double();
break;
}
-#endif
m.value.gauge = dp.as_double();
break;
case NumberDataPoint::kAsInt:
if (is_cumulative) {
fam->type = METRIC_TYPE_COUNTER;
- m.value.counter = (counter_t)dp.as_int();
+ m.value.counter = offset.counter + (counter_t)dp.as_int();
break;
}
m.value.gauge = (gauge_t)dp.as_int();
// TODO(octo): get the first metric time from the cache and detect counter
// resets.
- // TODO(octo): when aggregation temporality == AGGREGATION_TEMPORALITY_DELTA,
- // get the previous value from the cache and add to it.
int err = metric_family_metric_append(fam, m);
static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) {
for (auto db : g.data_points()) {
- grpc::Status s = unmarshal_data_point(db, fam, false);
+ grpc::Status s =
+ unmarshal_data_point(db, fam, AGGREGATION_TEMPORALITY_UNSPECIFIED);
if (!s.ok()) {
return s;
}
return grpc::Status::OK;
}
-static grpc::Status unmarshal_sum_metric(Sum s, metric_family_t *fam) {
+static grpc::Status unmarshal_sum_metric(Sum sum, metric_family_t *fam) {
// TODO(octo): check is_monotonic and aggregation temporality
- for (auto db : s.data_points()) {
- grpc::Status s = unmarshal_data_point(db, fam, true);
+ for (auto db : sum.data_points()) {
+ grpc::Status s =
+ unmarshal_data_point(db, fam, sum.aggregation_temporality());
if (!s.ok()) {
return s;
}