pthread_mutex_t send_buffer_lock;
strbuf_t send_buffer;
cdtime_t send_buffer_init_time;
+ resource_metrics_set_t resource_metrics;
c_avl_tree_t *staged_metrics; // char* metric_identity() -> NULL
c_avl_tree_t *staged_metric_families; // char* fam->name -> metric_family_t*
curl_easy_setopt(cb->curl, CURLOPT_USERAGENT, COLLECTD_USERAGENT);
cb->headers = curl_slist_append(cb->headers, "Accept: */*");
- if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
+ switch (cb->format) {
+ case WH_FORMAT_JSON:
+ case WH_FORMAT_KAIROSDB:
+ case WH_FORMAT_OTLP_JSON:
cb->headers =
curl_slist_append(cb->headers, "Content-Type: application/json");
- else
+
+ default:
cb->headers = curl_slist_append(cb->headers, "Content-Type: text/plain");
+ }
cb->headers = curl_slist_append(cb->headers, "Expect:");
curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, cb->headers);
return 0;
} /* int wh_callback_init */
+static int flush_resource_metrics(wh_callback_t *cb) {
+ /* You must hold cb->send_buffer_lock when calling. */
+ strbuf_t buf = STRBUF_CREATE;
+ int status = 0;
+ switch (cb->format) {
+ case WH_FORMAT_OTLP_JSON:
+ status = format_json_open_telemetry(&buf, &cb->resource_metrics);
+ if (status != 0) {
+ ERROR("write_http plugin: format_json_open_telemetry failed: %s",
+ STRERROR(status));
+ }
+ break;
+
+ default:
+ ERROR("write_http plugin: Unexpected format: %d", cb->format);
+ status = EINVAL;
+ }
+
+ if (status != 0) {
+ pthread_mutex_unlock(&cb->send_buffer_lock);
+ STRBUF_DESTROY(buf);
+ return status;
+ }
+
+ resource_metrics_reset(&cb->resource_metrics);
+ cb->send_buffer_init_time = cdtime();
+
+ pthread_mutex_unlock(&cb->send_buffer_lock);
+
+ status = wh_post(cb, buf.ptr, buf.pos);
+ STRBUF_DESTROY(buf);
+ return status;
+}
+
static int wh_flush(cdtime_t timeout,
const char *identifier __attribute__((unused)),
user_data_t *user_data) {
}
}
+ if (cb->format == WH_FORMAT_OTLP_PROTO || cb->format == WH_FORMAT_OTLP_JSON) {
+ /* cb->send_buffer_lock is unlocked in flush_resource_metrics. */
+ return flush_resource_metrics(cb);
+ }
+
if (cb->send_buffer.pos == 0) {
cb->send_buffer_init_time = cdtime();
pthread_mutex_unlock(&cb->send_buffer_lock);
return 0;
} /* wh_write_influxdb */
-static int wh_write_otlp_proto(metric_family_t const *fam, wh_callback_t *cb) {
- ERROR("wh_write_otlp_proto: Not implemented yet.");
- return -1;
-}
+static int wh_write_resource_metrics(metric_family_t const *fam,
+ wh_callback_t *cb) {
+ pthread_mutex_lock(&cb->send_buffer_lock);
+ int status = resource_metrics_add(&cb->resource_metrics, fam);
+ pthread_mutex_unlock(&cb->send_buffer_lock);
-static int wh_write_otlp_json(metric_family_t const *fam, wh_callback_t *cb) {
- ERROR("wh_write_otlp_json: Not implemented yet.");
- return -1;
+ if (status != 0) {
+ ERROR("write_http plugin: resource_metrics_add failed: %s",
+ STRERROR(status));
+ return status;
+ }
+ return 0;
}
static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
status = wh_write_influxdb(fam, cb);
break;
case WH_FORMAT_OTLP_PROTO:
- status = wh_write_otlp_proto(fam, cb);
+ status = -1;
+ ERROR("wh_write_otlp_proto: Not implemented yet.");
break;
case WH_FORMAT_OTLP_JSON:
- status = wh_write_otlp_json(fam, cb);
+ status = wh_write_resource_metrics(fam, cb);
break;
default:
status = wh_write_command(fam, cb);