&& automake --add-missing --copy \
&& autoconf
- for f in common/v1/common.proto metrics/v1/metrics.proto resource/v1/resource.proto; do
+ for f in common/v1/common.proto metrics/v1/metrics.proto resource/v1/resource.proto collector/metrics/v1/metrics_service.proto; do
protoc -Iopentelemetry-proto --cpp_out src/ "opentelemetry-proto/opentelemetry/proto/${f}"
done
+ protoc -Iopentelemetry-proto --grpc_out src/ --plugin="protoc-gen-grpc=$(which grpc_cpp_plugin)" "opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto"
}
build_cygwin()
extern "C" {
#include "collectd.h"
#include "metric.h"
+}
#include "utils/format_open_telemetry/format_open_telemetry.h"
-}
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
#include "opentelemetry/proto/common/v1/common.pb.h"
#include "opentelemetry/proto/metrics/v1/metrics.pb.h"
#include "opentelemetry/proto/resource/v1/resource.pb.h"
+using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
using opentelemetry::proto::common::v1::AnyValue;
using opentelemetry::proto::common::v1::InstrumentationScope;
using opentelemetry::proto::common::v1::KeyValue;
using opentelemetry::proto::metrics::v1::ScopeMetrics;
using opentelemetry::proto::metrics::v1::Sum;
-static void metric_to_number_data_point(NumberDataPoint *dp, metric_t const *m) {
+static void metric_to_number_data_point(NumberDataPoint *dp,
+ metric_t const *m) {
for (size_t i = 0; i < m->label.num; i++) {
label_pair_t *l = m->label.ptr + i;
// when we've seen a metric for the first time.
switch (m->family->type) {
- case METRIC_TYPE_COUNTER:
- dp->set_as_int(m->value.derive);
- case METRIC_TYPE_GAUGE:
- dp->set_as_double(m->value.gauge);
- case METRIC_TYPE_UNTYPED:
- // TODO
- assert(0);
+ case METRIC_TYPE_COUNTER:
+ dp->set_as_int(m->value.derive);
+ break;
+ case METRIC_TYPE_GAUGE:
+ dp->set_as_double(m->value.gauge);
+ break;
+ case METRIC_TYPE_UNTYPED:
+ // TODO
+ assert(0);
}
}
static void set_sum(Metric *m, metric_family_t const *fam) {
Sum *s = m->mutable_sum();
for (size_t i = 0; i < fam->metric.num; i++) {
+ metric_t const *m = fam->metric.ptr + i;
+ assert(m->family == fam);
+
NumberDataPoint *dp = s->add_data_points();
metric_to_number_data_point(dp, fam->metric.ptr + i);
}
}
switch (fam->type) {
- case METRIC_TYPE_COUNTER:
- set_sum(m, fam);
- return;
- case METRIC_TYPE_GAUGE:
- set_gauge(m, fam);
- return;
- case METRIC_TYPE_UNTYPED:
- // TODO
- assert(0);
+ case METRIC_TYPE_COUNTER:
+ set_sum(m, fam);
+ return;
+ case METRIC_TYPE_GAUGE:
+ set_gauge(m, fam);
+ return;
+ case METRIC_TYPE_UNTYPED:
+ // TODO
+ assert(0);
}
}
is->set_version(PACKAGE_VERSION);
}
-static void set_scope_metrics(ResourceMetrics *rm, metric_family_t const *fam) {
+static void set_scope_metrics(ResourceMetrics *rm, metric_family_t const **fam,
+ size_t fam_num) {
ScopeMetrics *sm = rm->add_scope_metrics();
set_instrumentation_scope(sm);
- add_metric(sm, fam);
+ for (size_t i = 0; i < fam_num; i++) {
+ add_metric(sm, fam[i]);
+ }
+}
+
+ResourceMetrics *
+format_open_telemetry_resource_metrics_serialized(metric_family_t const **fam,
+ size_t fam_num) {
+ ResourceMetrics *rm = new ResourceMetrics();
+
+ set_scope_metrics(rm, fam, fam_num);
+ return rm;
}
-int format_open_telemetry(strbuf_t *sb, metric_family_t const *fam) {
+int format_open_telemetry_resource_metrics_serialized(
+ strbuf_t *sb, metric_family_t const **fam, size_t fam_num) {
ResourceMetrics rm;
- set_scope_metrics(&rm, fam);
+ set_scope_metrics(&rm, fam, fam_num);
std::string serialization;
bool ok = rm.SerializeToString(&serialization);
return 0;
}
+ExportMetricsServiceRequest *
+format_open_telemetry_export_metrics_service_request(
+ metric_family_t const **fam, size_t fam_num) {
+ ExportMetricsServiceRequest *req = new ExportMetricsServiceRequest();
+
+ ResourceMetrics *rm = req->add_resource_metrics();
+ set_scope_metrics(rm, fam, fam_num);
+
+ return req;
+}
#ifndef UTILS_FORMAT_OPEN_TELEMETRY_H
#define UTILS_FORMAT_OPEN_TELEMETRY_H 1
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "collectd.h"
#include "metric.h"
-int format_open_telemetry(strbuf_t *sb, metric_family_t const *fam); // TODO: lacks return value
+int format_open_telemetry_resource_metrics_serialized(
+ strbuf_t *sb, metric_family_t const **fam, size_t fam_num);
+
+#ifdef __cplusplus
+}
+
+#include "opentelemetry/proto/metrics/v1/metrics.pb.h"
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
+
+opentelemetry::proto::metrics::v1::ResourceMetrics *
+format_open_telemetry_resource_metrics_serialized(metric_family_t const **fam,
+ size_t fam_num);
+
+opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest *
+format_open_telemetry_export_metrics_service_request(
+ metric_family_t const **fam, size_t fam_num);
+#endif
#endif /* UTILS_FORMAT_OPEN_TELEMETRY_H */
#include "utils/common/common.h"
#include "utils/avltree/avltree.h"
-#include "utils/format_open_telemetry/format_open_telemetry.h"
#include "utils/strbuf/strbuf.h"
#include "utils_complain.h"
#include <netdb.h>
}
+#include <grpc++/grpc++.h>
+
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
+#include "utils/format_open_telemetry/format_open_telemetry.h"
+
#ifndef OT_DEFAULT_HOST
#define OT_DEFAULT_HOST "localhost"
#endif
#ifndef OT_DEFAULT_PORT
-#define OT_DEFAULT_PORT "8080"
+#define OT_DEFAULT_PORT "4317"
#endif
#ifndef OT_DEFAULT_PATH
#define OT_DEFAULT_PATH "/v1/metrics"
#endif
-#ifndef OT_DEFAULT_LOG_SEND_ERRORS
-#define OT_DEFAULT_LOG_SEND_ERRORS true
-#endif
-
-#ifndef OT_DEFAULT_ESCAPE
-#define OT_DEFAULT_ESCAPE '_'
-#endif
-
-/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */
-#ifndef OT_SEND_BUF_SIZE
-#define OT_SEND_BUF_SIZE 1428
-#endif
-
-#ifndef OT_MIN_RECONNECT_INTERVAL
-#define OT_MIN_RECONNECT_INTERVAL TIME_T_TO_CDTIME_T(1)
-#endif
+using opentelemetry::proto::collector::metrics::v1::
+ ExportMetricsServiceResponse;
+using opentelemetry::proto::collector::metrics::v1::MetricsService;
/*
* Private variables
c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL
c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
+ std::unique_ptr<MetricsService::Stub> stub;
+
pthread_mutex_t mu;
} ot_callback_t;
static int ot_send_buffer(ot_callback_t *cb) {
size_t families_num = (size_t)c_avl_size(cb->staged_metric_families);
- metric_family_t *families[families_num];
+ metric_family_t const *families[families_num];
memset(families, 0, sizeof(families));
families[i] = fam;
}
- DEBUG("write_open_telemetry plugin: TODO(octo): send %zu metric families",
- families_num);
+ if (cb->stub == NULL) {
+ strbuf_t buf = STRBUF_CREATE;
+ strbuf_printf(&buf, "%s:%s", cb->host, cb->port);
+
+ auto chan =
+ grpc::CreateChannel(buf.ptr, grpc::InsecureChannelCredentials());
+ cb->stub = MetricsService::NewStub(chan);
+
+ STRBUF_DESTROY(buf);
+ }
+
+ auto req = format_open_telemetry_export_metrics_service_request(families,
+ families_num);
+
+ grpc::ClientContext context;
+ ExportMetricsServiceResponse resp;
+ grpc::Status status = cb->stub->Export(&context, *req, &resp);
+
+ if (!status.ok()) {
+ ERROR("write_open_telemetry plugin: Exporting metrics failed: %s",
+ status.error_message().c_str());
+ return -1;
+ }
+
+ if (resp.has_partial_success() &&
+ resp.partial_success().rejected_data_points() > 0) {
+ auto ps = resp.partial_success();
+ NOTICE("write_open_telemetry plugin: %" PRId64
+ " data points were rejected: %s",
+ ps.rejected_data_points(), ps.error_message().c_str());
+ } else {
+ DEBUG("write_open_telemetry plugin: Successfully sent %zu metric families",
+ families_num);
+ }
+
+ delete req;
return 0;
}
ot_flush_nolock(/* timeout = */ 0, cb);
+ cb->stub.reset();
+
c_avl_destroy(cb->staged_metrics);
c_avl_destroy(cb->staged_metric_families);