#include <grpc++/grpc++.h>
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
#include "opentelemetry/proto/common/v1/common.pb.h"
#include "opentelemetry/proto/metrics/v1/metrics.pb.h"
#include "opentelemetry/proto/resource/v1/resource.pb.h"
+using opentelemetry::proto::collector::metrics::v1::ExportMetricsPartialSuccess;
using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
using opentelemetry::proto::collector::metrics::v1::
ExportMetricsServiceResponse;
return grpc::Status::OK;
}
-static grpc::Status dispatch_metric(Metric mpb, label_set_t resource) {
+static grpc::Status reject_data_points(std::string msg, int num,
+ ExportMetricsPartialSuccess *ps) {
+ int64_t rejected = ps->rejected_data_points();
+ rejected += (int64_t)num;
+ ps->set_rejected_data_points(rejected);
+
+ std::string *error_message = ps->mutable_error_message();
+ if (!error_message->empty()) {
+ error_message->append(", ");
+ }
+ error_message->append(msg);
+
+ return grpc::Status::OK;
+}
+
+static grpc::Status dispatch_metric(Metric mpb, label_set_t resource,
+ ExportMetricsPartialSuccess *ps) {
metric_family_t fam = {
.name = (char *)mpb.name().c_str(),
.help = (char *)mpb.description().c_str(),
grpc::Status s = unmarshal_gauge_metric(mpb.gauge(), &fam);
if (!s.ok()) {
metric_family_metric_reset(&fam);
- return s;
+ return reject_data_points(s.error_message(),
+ mpb.gauge().data_points().size(), ps);
}
break;
}
grpc::Status s = unmarshal_sum_metric(mpb.sum(), &fam);
if (!s.ok()) {
metric_family_metric_reset(&fam);
- return s;
+ return reject_data_points(s.error_message(),
+ mpb.sum().data_points().size(), ps);
}
break;
}
case Metric::kHistogram:
+ return reject_data_points(
+ std::string("histogram metrics are not supported"),
+ mpb.histogram().data_points().size(), ps);
case Metric::kExponentialHistogram:
- return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
- "histogram metrics are not supported");
- case Metric::kSummary:
- return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
- "summary metrics are not supported");
+ return reject_data_points(
+ std::string("exponential histogram metrics are not supported"),
+ mpb.exponential_histogram().data_points().size(), ps);
+ case Metric::kSummary: {
+ return reject_data_points(std::string("summary metrics are not supported"),
+ mpb.summary().data_points().size(), ps);
+ }
default:
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"unexpected data type");
return grpc::Status::OK;
}
-static grpc::Status dispatch_resource_metrics(ResourceMetrics rm) {
+static grpc::Status dispatch_resource_metrics(ResourceMetrics rm,
+ ExportMetricsPartialSuccess *ps) {
label_set_t resource = {0};
grpc::Status s = unmarshal_resource(rm.resource(), &resource);
for (auto sm : rm.scope_metrics()) {
for (auto m : sm.metrics()) {
- grpc::Status s = dispatch_metric(m, resource);
+ grpc::Status s = dispatch_metric(m, resource, ps);
if (!s.ok()) {
return s;
}
grpc::Status Export(grpc::ServerContext *context,
const ExportMetricsServiceRequest *req,
ExportMetricsServiceResponse *resp) {
- for (auto rm : req.resource_metrics()) {
- grpc::Status s = dispatch_resource_metrics(rm);
+ ExportMetricsPartialSuccess *ps = resp->mutable_partial_success();
+
+ for (auto rm : req->resource_metrics()) {
+ grpc::Status s = dispatch_resource_metrics(rm, ps);
if (!s.ok()) {
+ ERROR("open_telemetry_collector: dispatch_resource_metrics failed: %s",
+ s.error_message().c_str());
return s;
}
}
- return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
- "Export is not implemented yet");
+ return grpc::Status::OK;
}
};