From: Florian Forster Date: Mon, 4 Dec 2023 14:10:53 +0000 (+0100) Subject: write_open_telemetry plugin: Implement gRPC sending logic. X-Git-Tag: 6.0.0-rc0~17^2~30 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fa25e4d70dfb6352b5c8253ac8277de58920b80b;p=thirdparty%2Fcollectd.git write_open_telemetry plugin: Implement gRPC sending logic. --- diff --git a/build.sh b/build.sh index 0fe9142da..4c36775c0 100755 --- a/build.sh +++ b/build.sh @@ -56,9 +56,10 @@ build() && automake --add-missing --copy \ && autoconf - for f in common/v1/common.proto metrics/v1/metrics.proto resource/v1/resource.proto; do + for f in common/v1/common.proto metrics/v1/metrics.proto resource/v1/resource.proto collector/metrics/v1/metrics_service.proto; do protoc -Iopentelemetry-proto --cpp_out src/ "opentelemetry-proto/opentelemetry/proto/${f}" done + protoc -Iopentelemetry-proto --grpc_out src/ --plugin="protoc-gen-grpc=$(which grpc_cpp_plugin)" "opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto" } build_cygwin() diff --git a/src/utils/format_open_telemetry/format_open_telemetry.cc b/src/utils/format_open_telemetry/format_open_telemetry.cc index 771332a8c..3739b1ee2 100644 --- a/src/utils/format_open_telemetry/format_open_telemetry.cc +++ b/src/utils/format_open_telemetry/format_open_telemetry.cc @@ -27,14 +27,16 @@ extern "C" { #include "collectd.h" #include "metric.h" +} #include "utils/format_open_telemetry/format_open_telemetry.h" -} +#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" #include "opentelemetry/proto/common/v1/common.pb.h" #include "opentelemetry/proto/metrics/v1/metrics.pb.h" #include "opentelemetry/proto/resource/v1/resource.pb.h" +using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; using opentelemetry::proto::common::v1::AnyValue; using opentelemetry::proto::common::v1::InstrumentationScope; using opentelemetry::proto::common::v1::KeyValue; @@ -46,7 +48,8 @@ using opentelemetry::proto::metrics::v1::ResourceMetrics; using opentelemetry::proto::metrics::v1::ScopeMetrics; using opentelemetry::proto::metrics::v1::Sum; -static void metric_to_number_data_point(NumberDataPoint *dp, metric_t const *m) { +static void metric_to_number_data_point(NumberDataPoint *dp, + metric_t const *m) { for (size_t i = 0; i < m->label.num; i++) { label_pair_t *l = m->label.ptr + i; @@ -61,19 +64,24 @@ static void metric_to_number_data_point(NumberDataPoint *dp, metric_t const *m) // when we've seen a metric for the first time. switch (m->family->type) { - case METRIC_TYPE_COUNTER: - dp->set_as_int(m->value.derive); - case METRIC_TYPE_GAUGE: - dp->set_as_double(m->value.gauge); - case METRIC_TYPE_UNTYPED: - // TODO - assert(0); + case METRIC_TYPE_COUNTER: + dp->set_as_int(m->value.derive); + break; + case METRIC_TYPE_GAUGE: + dp->set_as_double(m->value.gauge); + break; + case METRIC_TYPE_UNTYPED: + // TODO + assert(0); } } static void set_sum(Metric *m, metric_family_t const *fam) { Sum *s = m->mutable_sum(); for (size_t i = 0; i < fam->metric.num; i++) { + metric_t const *m = fam->metric.ptr + i; + assert(m->family == fam); + NumberDataPoint *dp = s->add_data_points(); metric_to_number_data_point(dp, fam->metric.ptr + i); } @@ -99,15 +107,15 @@ static void add_metric(ScopeMetrics *sm, metric_family_t const *fam) { } switch (fam->type) { - case METRIC_TYPE_COUNTER: - set_sum(m, fam); - return; - case METRIC_TYPE_GAUGE: - set_gauge(m, fam); - return; - case METRIC_TYPE_UNTYPED: - // TODO - assert(0); + case METRIC_TYPE_COUNTER: + set_sum(m, fam); + return; + case METRIC_TYPE_GAUGE: + set_gauge(m, fam); + return; + case METRIC_TYPE_UNTYPED: + // TODO + assert(0); } } @@ -117,18 +125,31 @@ static void set_instrumentation_scope(ScopeMetrics *sm) { is->set_version(PACKAGE_VERSION); } -static void set_scope_metrics(ResourceMetrics *rm, metric_family_t const *fam) { +static void set_scope_metrics(ResourceMetrics *rm, metric_family_t const **fam, + size_t fam_num) { ScopeMetrics *sm = rm->add_scope_metrics(); set_instrumentation_scope(sm); - add_metric(sm, fam); + for (size_t i = 0; i < fam_num; i++) { + add_metric(sm, fam[i]); + } +} + +ResourceMetrics * +format_open_telemetry_resource_metrics_serialized(metric_family_t const **fam, + size_t fam_num) { + ResourceMetrics *rm = new ResourceMetrics(); + + set_scope_metrics(rm, fam, fam_num); + return rm; } -int format_open_telemetry(strbuf_t *sb, metric_family_t const *fam) { +int format_open_telemetry_resource_metrics_serialized( + strbuf_t *sb, metric_family_t const **fam, size_t fam_num) { ResourceMetrics rm; - set_scope_metrics(&rm, fam); + set_scope_metrics(&rm, fam, fam_num); std::string serialization; bool ok = rm.SerializeToString(&serialization); @@ -140,3 +161,13 @@ int format_open_telemetry(strbuf_t *sb, metric_family_t const *fam) { return 0; } +ExportMetricsServiceRequest * +format_open_telemetry_export_metrics_service_request( + metric_family_t const **fam, size_t fam_num) { + ExportMetricsServiceRequest *req = new ExportMetricsServiceRequest(); + + ResourceMetrics *rm = req->add_resource_metrics(); + set_scope_metrics(rm, fam, fam_num); + + return req; +} diff --git a/src/utils/format_open_telemetry/format_open_telemetry.h b/src/utils/format_open_telemetry/format_open_telemetry.h index 4120dc9b6..fa3c36106 100644 --- a/src/utils/format_open_telemetry/format_open_telemetry.h +++ b/src/utils/format_open_telemetry/format_open_telemetry.h @@ -27,9 +27,29 @@ #ifndef UTILS_FORMAT_OPEN_TELEMETRY_H #define UTILS_FORMAT_OPEN_TELEMETRY_H 1 +#ifdef __cplusplus +extern "C" { +#endif + #include "collectd.h" #include "metric.h" -int format_open_telemetry(strbuf_t *sb, metric_family_t const *fam); // TODO: lacks return value +int format_open_telemetry_resource_metrics_serialized( + strbuf_t *sb, metric_family_t const **fam, size_t fam_num); + +#ifdef __cplusplus +} + +#include "opentelemetry/proto/metrics/v1/metrics.pb.h" +#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" + +opentelemetry::proto::metrics::v1::ResourceMetrics * +format_open_telemetry_resource_metrics_serialized(metric_family_t const **fam, + size_t fam_num); + +opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest * +format_open_telemetry_export_metrics_service_request( + metric_family_t const **fam, size_t fam_num); +#endif #endif /* UTILS_FORMAT_OPEN_TELEMETRY_H */ diff --git a/src/write_open_telemetry.cc b/src/write_open_telemetry.cc index be289d071..f1c99d085 100644 --- a/src/write_open_telemetry.cc +++ b/src/write_open_telemetry.cc @@ -42,41 +42,32 @@ extern "C" { #include "utils/common/common.h" #include "utils/avltree/avltree.h" -#include "utils/format_open_telemetry/format_open_telemetry.h" #include "utils/strbuf/strbuf.h" #include "utils_complain.h" #include } +#include + +#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" +#include "utils/format_open_telemetry/format_open_telemetry.h" + #ifndef OT_DEFAULT_HOST #define OT_DEFAULT_HOST "localhost" #endif #ifndef OT_DEFAULT_PORT -#define OT_DEFAULT_PORT "8080" +#define OT_DEFAULT_PORT "4317" #endif #ifndef OT_DEFAULT_PATH #define OT_DEFAULT_PATH "/v1/metrics" #endif -#ifndef OT_DEFAULT_LOG_SEND_ERRORS -#define OT_DEFAULT_LOG_SEND_ERRORS true -#endif - -#ifndef OT_DEFAULT_ESCAPE -#define OT_DEFAULT_ESCAPE '_' -#endif - -/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */ -#ifndef OT_SEND_BUF_SIZE -#define OT_SEND_BUF_SIZE 1428 -#endif - -#ifndef OT_MIN_RECONNECT_INTERVAL -#define OT_MIN_RECONNECT_INTERVAL TIME_T_TO_CDTIME_T(1) -#endif +using opentelemetry::proto::collector::metrics::v1:: + ExportMetricsServiceResponse; +using opentelemetry::proto::collector::metrics::v1::MetricsService; /* * Private variables @@ -93,12 +84,14 @@ typedef struct { c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t* + std::unique_ptr stub; + pthread_mutex_t mu; } ot_callback_t; static int ot_send_buffer(ot_callback_t *cb) { size_t families_num = (size_t)c_avl_size(cb->staged_metric_families); - metric_family_t *families[families_num]; + metric_family_t const *families[families_num]; memset(families, 0, sizeof(families)); @@ -115,8 +108,42 @@ static int ot_send_buffer(ot_callback_t *cb) { families[i] = fam; } - DEBUG("write_open_telemetry plugin: TODO(octo): send %zu metric families", - families_num); + if (cb->stub == NULL) { + strbuf_t buf = STRBUF_CREATE; + strbuf_printf(&buf, "%s:%s", cb->host, cb->port); + + auto chan = + grpc::CreateChannel(buf.ptr, grpc::InsecureChannelCredentials()); + cb->stub = MetricsService::NewStub(chan); + + STRBUF_DESTROY(buf); + } + + auto req = format_open_telemetry_export_metrics_service_request(families, + families_num); + + grpc::ClientContext context; + ExportMetricsServiceResponse resp; + grpc::Status status = cb->stub->Export(&context, *req, &resp); + + if (!status.ok()) { + ERROR("write_open_telemetry plugin: Exporting metrics failed: %s", + status.error_message().c_str()); + return -1; + } + + if (resp.has_partial_success() && + resp.partial_success().rejected_data_points() > 0) { + auto ps = resp.partial_success(); + NOTICE("write_open_telemetry plugin: %" PRId64 + " data points were rejected: %s", + ps.rejected_data_points(), ps.error_message().c_str()); + } else { + DEBUG("write_open_telemetry plugin: Successfully sent %zu metric families", + families_num); + } + + delete req; return 0; } @@ -173,6 +200,8 @@ static void ot_callback_decref(void *data) { ot_flush_nolock(/* timeout = */ 0, cb); + cb->stub.reset(); + c_avl_destroy(cb->staged_metrics); c_avl_destroy(cb->staged_metric_families);