From 956cc5158883815a0d23b8c1b180e7196e523c8a Mon Sep 17 00:00:00 2001 From: "Matwey V. Kornilov" Date: Wed, 8 Jun 2022 12:56:42 +0300 Subject: [PATCH] [collectd 6] Port #3938 for collectd 6 (#3999) * write_http: Make use of CURLOPT_POSTFIELDSIZE CURLOPT_POSTFIELDSIZE allows to specify the data size, which is known in advance and equals to cb->send_buffer_fill. When CURLOPT_POSTFIELDSIZE is not set (or set to -1), then curl evaluates data size using strlen() function, which have O(N) complexity, so we save a few CPU cycles here. Signed-off-by: Matwey V. Kornilov * write_influxdb_udp: Split formatting functions to format_influxdb Signed-off-by: Matwey V. Kornilov * write_http: Add influxdb format Signed-off-by: Matwey V. Kornilov * write_http: Enable using unix socket in libcurl Signed-off-by: Matwey V. Kornilov Co-authored-by: Matthias Runge --- Makefile.am | 10 +- src/utils/format_influxdb/format_influxdb.c | 103 ++++++++++++++ src/utils/format_influxdb/format_influxdb.h | 35 +++++ src/write_http.c | 50 ++++++- src/write_influxdb_udp.c | 149 +++----------------- 5 files changed, 210 insertions(+), 137 deletions(-) create mode 100644 src/utils/format_influxdb/format_influxdb.c create mode 100644 src/utils/format_influxdb/format_influxdb.h diff --git a/Makefile.am b/Makefile.am index 35a3678b3..7da618886 100644 --- a/Makefile.am +++ b/Makefile.am @@ -130,6 +130,7 @@ noinst_LTLIBRARIES = \ libavltree.la \ libcmds.la \ libcommon.la \ + libformat_influxdb.la \ libformat_graphite.la \ libheap.la \ libignorelist.la \ @@ -437,6 +438,10 @@ libplugin_mock_la_SOURCES = \ libplugin_mock_la_CPPFLAGS = $(AM_CPPFLAGS) -DMOCK_TIME libplugin_mock_la_LIBADD = libcommon.la libignorelist.la libmetadata.la $(COMMON_LIBS) +libformat_influxdb_la_SOURCES = \ + src/utils/format_influxdb/format_influxdb.c \ + src/utils/format_influxdb/format_influxdb.h + libformat_graphite_la_SOURCES = \ src/utils/format_graphite/format_graphite.c \ src/utils/format_graphite/format_graphite.h @@ -2263,7 +2268,7 @@ write_http_la_SOURCES = \ src/utils/format_kairosdb/format_kairosdb.h write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS) write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS) -write_http_la_LIBADD = libcmds.la libformat_json.la $(BUILD_WITH_LIBCURL_LIBS) +write_http_la_LIBADD = libcmds.la libformat_json.la libformat_influxdb.la $(BUILD_WITH_LIBCURL_LIBS) endif if BUILD_PLUGIN_WRITE_INFLUXDB_UDP @@ -2271,8 +2276,9 @@ pkglib_LTLIBRARIES += write_influxdb_udp.la write_influxdb_udp_la_SOURCES = src/write_influxdb_udp.c write_influxdb_udp_la_CPPFLAGS = $(AM_CPPFLAGS) write_influxdb_udp_la_LDFLAGS = $(PLUGIN_LDFLAGS) +write_influxdb_udp_la_LIBADD = libformat_influxdb.la if BUILD_WITH_LIBSOCKET -write_influxdb_udp_la_LIBADD = -lsocket +write_influxdb_udp_la_LIBADD += -lsocket endif endif diff --git a/src/utils/format_influxdb/format_influxdb.c b/src/utils/format_influxdb/format_influxdb.c new file mode 100644 index 000000000..2e9588bbf --- /dev/null +++ b/src/utils/format_influxdb/format_influxdb.c @@ -0,0 +1,103 @@ +/** + * collectd - src/utils_format_influxdb.c + * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2009 Aman Gupta + * Copyright (C) 2019 Carlos Peon Costa + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + * Aman Gupta + * Carlos Peon Costa + **/ + +#include "collectd.h" + +#include "plugin.h" +#include "utils/common/common.h" +#include "utils_cache.h" + +#include "utils/format_influxdb/format_influxdb.h" + +int format_influxdb_point(strbuf_t *sb, metric_t metric, bool store_rates) { + bool have_values = false; + +#define BUFFER_ADD_ESCAPE(...) \ + do { \ + int status = strbuf_print_escaped(sb, __VA_ARGS__, "\\ ,=\"", '\\'); \ + if (status != 0) \ + return status; \ + } while (0) + +#define BUFFER_ADD(...) \ + do { \ + int status = strbuf_printf(sb, __VA_ARGS__); \ + if (status != 0) \ + return status; \ + } while (0) + + have_values = false; + BUFFER_ADD_ESCAPE(metric.family->name); + for (size_t j = 0; j < metric.label.num; j++) { + label_pair_t label = metric.label.ptr[j]; + BUFFER_ADD(","); + BUFFER_ADD_ESCAPE(label.name); + BUFFER_ADD("="); + BUFFER_ADD_ESCAPE(label.value); + } + BUFFER_ADD(" "); + + if (store_rates && (metric.family->type == METRIC_TYPE_COUNTER)) { + gauge_t rate; + if (uc_get_rate(&metric, &rate) != 0) { + WARNING("write_influxdb_udp plugin: " + "uc_get_rate failed."); + return EINVAL; + } + if (!isnan(rate)) { + BUFFER_ADD("value=" GAUGE_FORMAT, rate); + have_values = true; + } + } else { + switch (metric.family->type) { + case METRIC_TYPE_GAUGE: + case METRIC_TYPE_UNTYPED: + if (!isnan(metric.value.gauge)) { + BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge); + have_values = true; + } + break; + case METRIC_TYPE_COUNTER: + BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter); + have_values = true; + break; + default: + WARNING("write_influxdb_udp plugin: " + "unknown family type."); + return EINVAL; + break; + } + } + + if (!have_values) + return 0; + + BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time)); + +#undef BUFFER_ADD_ESCAPE +#undef BUFFER_ADD + + return 0; +} /* int write_influxdb_point */ diff --git a/src/utils/format_influxdb/format_influxdb.h b/src/utils/format_influxdb/format_influxdb.h new file mode 100644 index 000000000..bd6b9746b --- /dev/null +++ b/src/utils/format_influxdb/format_influxdb.h @@ -0,0 +1,35 @@ +/** + * collectd - src/utils_format_influxdb.h + * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2009 Aman Gupta + * Copyright (C) 2019 Carlos Peon Costa + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + * Aman Gupta + * Carlos Peon Costa + **/ + +#ifndef UTILS_FORMAT_INFLUXDB_H +#define UTILS_FORMAT_INFLUXDB_H 1 + +#include "collectd.h" + +#include "plugin.h" + +int format_influxdb_point(strbuf_t *sb, metric_t metric, bool store_rates); + +#endif /* UTILS_FORMAT_INFLUXDB_H */ diff --git a/src/write_http.c b/src/write_http.c index 1986f6804..e31877c46 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -29,6 +29,7 @@ #include "utils/cmds/putmetric.h" #include "utils/common/common.h" #include "utils/curl_stats/curl_stats.h" +#include "utils/format_influxdb/format_influxdb.h" #include "utils/format_json/format_json.h" #include "utils/format_kairosdb/format_kairosdb.h" @@ -69,6 +70,7 @@ struct wh_callback_s { #define WH_FORMAT_COMMAND 0 #define WH_FORMAT_JSON 1 #define WH_FORMAT_KAIROSDB 2 +#define WH_FORMAT_INFLUXDB 3 int format; bool send_metrics; bool send_notifications; @@ -88,6 +90,8 @@ struct wh_callback_s { int data_ttl; char *metrics_prefix; + + char *unix_socket_path; }; typedef struct wh_callback_s wh_callback_t; @@ -133,10 +137,11 @@ static void wh_log_http_error(wh_callback_t *cb) { } /* must hold cb->curl_lock when calling */ -static int wh_post(wh_callback_t *cb, char const *data) { +static int wh_post(wh_callback_t *cb, char const *data, long size) { pthread_mutex_lock(&cb->curl_lock); curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location); + curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDSIZE, size); curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data); curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, &wh_curl_write_callback); curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, (void *)cb); @@ -245,6 +250,11 @@ static int wh_callback_init(wh_callback_t *cb) { if (cb->clientkeypass != NULL) curl_easy_setopt(cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass); } +#ifdef CURL_VERSION_UNIX_SOCKETS + if (cb->unix_socket_path) { + curl_easy_setopt(cb->curl, CURLOPT_UNIX_SOCKET_PATH, cb->unix_socket_path); + } +#endif // CURL_VERSION_UNIX_SOCKETS strbuf_reset(&cb->send_buffer); @@ -282,6 +292,7 @@ static int wh_flush(cdtime_t timeout, } char const *json = strdup(cb->send_buffer.ptr); + const size_t size = cb->send_buffer.pos; strbuf_reset(&cb->send_buffer); cb->send_buffer_init_time = cdtime(); pthread_mutex_unlock(&cb->send_buffer_lock); @@ -290,7 +301,7 @@ static int wh_flush(cdtime_t timeout, return ENOMEM; } - return wh_post(cb, json); + return wh_post(cb, json, size); } /* int wh_flush */ static void wh_callback_free(void *data) { @@ -388,6 +399,25 @@ static int wh_write_kairosdb(metric_family_t const *fam, wh_callback_t *cb) { return 0; } /* int wh_write_kairosdb */ +static int wh_write_influxdb(metric_family_t const *fam, wh_callback_t *cb) { + pthread_mutex_lock(&cb->send_buffer_lock); + + for (size_t i = 0; i < fam->metric.num; i++) { + metric_t metric = fam->metric.ptr[i]; + int status = + format_influxdb_point(&cb->send_buffer, metric, cb->store_rates); + if (status != 0) { + pthread_mutex_unlock(&cb->send_buffer_lock); + ERROR("write_http plugin: format_influxdb_point failed: %s", + STRERROR(status)); + return status; + } + } + + pthread_mutex_unlock(&cb->send_buffer_lock); + return 0; +} /* wh_write_influxdb */ + static int wh_write(metric_family_t const *fam, user_data_t *user_data) { if ((fam == NULL) || (user_data == NULL)) { return EINVAL; @@ -405,6 +435,9 @@ static int wh_write(metric_family_t const *fam, user_data_t *user_data) { case WH_FORMAT_KAIROSDB: status = wh_write_kairosdb(fam, cb); break; + case WH_FORMAT_INFLUXDB: + status = wh_write_influxdb(fam, cb); + break; default: status = wh_write_command(fam, cb); break; @@ -436,7 +469,7 @@ static int wh_notify(notification_t const *n, user_data_t *ud) { return -1; } - status = wh_post(cb, alert); + status = wh_post(cb, alert, -1); pthread_mutex_unlock(&cb->send_buffer_lock); return status; @@ -459,6 +492,8 @@ static int config_set_format(wh_callback_t *cb, oconfig_item_t *ci) { cb->format = WH_FORMAT_JSON; else if (strcasecmp("KAIROSDB", string) == 0) cb->format = WH_FORMAT_KAIROSDB; + else if (strcasecmp("INFLUXDB", string) == 0) + cb->format = WH_FORMAT_INFLUXDB; else { ERROR("write_http plugin: Invalid format string: %s", string); return -1; @@ -508,6 +543,7 @@ static int wh_config_node(oconfig_item_t *ci) { cb->data_ttl = 0; cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX); cb->curl_stats = NULL; + cb->unix_socket_path = NULL; if (cb->metrics_prefix == NULL) { ERROR("write_http plugin: strdup failed."); @@ -605,7 +641,13 @@ static int wh_config_node(oconfig_item_t *ci) { status = cf_util_get_int(child, &cb->data_ttl); else if (strcasecmp("Prefix", child->key) == 0) status = cf_util_get_string(child, &cb->metrics_prefix); - else { + else if (strcasecmp("UnixSocket", child->key) == 0) { +#ifdef CURL_VERSION_UNIX_SOCKETS + status = cf_util_get_string(child, &cb->unix_socket_path); +#else + WARNING("libcurl < 7.40.0, UnixSocket config is ignored"); +#endif // CURL_VERSION_UNIX_SOCKETS + } else { ERROR("write_http plugin: Invalid configuration " "option: %s.", child->key); diff --git a/src/write_influxdb_udp.c b/src/write_influxdb_udp.c index 1019de084..4f9c9c4cd 100644 --- a/src/write_influxdb_udp.c +++ b/src/write_influxdb_udp.c @@ -27,9 +27,12 @@ #include "plugin.h" #include "utils/common/common.h" +#include "utils/strbuf/strbuf.h" #include "utils_cache.h" #include "utils_complain.h" +#include "utils/format_influxdb/format_influxdb.h" + #if HAVE_NETDB_H #include #endif @@ -339,121 +342,6 @@ static void fill_send_buffer(char const *buffer, size_t len) { pthread_mutex_unlock(&send_buffer_lock); } -static int wifxudp_escape_string(char *buffer, size_t buffer_size, - const char *string) { - - if ((buffer == NULL) || (string == NULL)) - return -EINVAL; - - int dst_pos = 0; - -#define BUFFER_ADD(c) \ - do { \ - if (dst_pos >= (buffer_size - 1)) { \ - buffer[buffer_size - 1] = 0; \ - return dst_pos++; \ - } \ - buffer[dst_pos] = (c); \ - dst_pos++; \ - } while (0) - - /* Escape special characters */ - for (int src_pos = 0; string[src_pos] != 0; src_pos++) { - if ((string[src_pos] == '\\') || (string[src_pos] == ' ') || - (string[src_pos] == ',') || (string[src_pos] == '=') || - (string[src_pos] == '"')) { - BUFFER_ADD('\\'); - BUFFER_ADD(string[src_pos]); - } else - BUFFER_ADD(string[src_pos]); - } /* for */ - buffer[dst_pos] = 0; - -#undef BUFFER_ADD - - return dst_pos; -} /* int wifxudp_escape_string */ - -static int write_influxdb_point(char *buffer, int buffer_len, metric_t metric) { - int offset = 0; - bool have_values = false; - -#define BUFFER_ADD_ESCAPE(...) \ - do { \ - int status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \ - __VA_ARGS__); \ - if (status < 0) \ - return status; \ - offset += status; \ - if (status >= (buffer_len - offset)) \ - return offset; \ - } while (0) - -#define BUFFER_ADD(...) \ - do { \ - int status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \ - if (status < 0) \ - return status; \ - offset += status; \ - if (status >= (buffer_len - offset)) \ - return offset; \ - } while (0) - - have_values = false; - BUFFER_ADD_ESCAPE(metric.family->name); - for (size_t j = 0; j < metric.label.num; j++) { - label_pair_t label = metric.label.ptr[j]; - BUFFER_ADD(","); - BUFFER_ADD_ESCAPE(label.name); - BUFFER_ADD("="); - BUFFER_ADD_ESCAPE(label.value); - } - BUFFER_ADD(" "); - - if (wifxudp_config_store_rates && - (metric.family->type == METRIC_TYPE_COUNTER)) { - gauge_t rate; - if (uc_get_rate(&metric, &rate) != 0) { - WARNING("write_influxdb_udp plugin: " - "uc_get_rate failed."); - return -1; - } - if (!isnan(rate)) { - BUFFER_ADD("value=" GAUGE_FORMAT, rate); - have_values = true; - } - } else { - switch (metric.family->type) { - case METRIC_TYPE_GAUGE: - case METRIC_TYPE_UNTYPED: - if (!isnan(metric.value.gauge)) { - BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge); - have_values = true; - } - break; - case METRIC_TYPE_COUNTER: - BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter); - have_values = true; - break; - default: - WARNING("write_influxdb_udp plugin: " - "unknown family type."); - return -1; - break; - } - } - - if (!have_values) - return 0; - - BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time)); - -#undef BUFFER_ADD_ESCAPE -#undef BUFFER_ADD - - return offset; -} /* int write_influxdb_point */ - static int write_influxdb_udp_write(metric_family_t const *fam, user_data_t __attribute__((unused)) * user_data) { @@ -461,28 +349,27 @@ static int write_influxdb_udp_write(metric_family_t const *fam, return EINVAL; char buffer[NET_DEFAULT_PACKET_SIZE]; - int buffer_len = NET_DEFAULT_PACKET_SIZE; - int offset = 0; + const size_t buffer_len = + (wifxudp_config_packet_size < NET_DEFAULT_PACKET_SIZE + ? wifxudp_config_packet_size + : NET_DEFAULT_PACKET_SIZE); + strbuf_t sb = STRBUF_CREATE_FIXED(buffer, buffer_len); - if (wifxudp_config_packet_size < buffer_len) - buffer_len = wifxudp_config_packet_size; - - for (size_t i = 0; i < fam->metric.num; i++) { + for (size_t i = 0; i < fam->metric.num;) { metric_t metric = fam->metric.ptr[i]; - int status = - write_influxdb_point(buffer + offset, buffer_len - offset, metric); - if (status < 0) { // error + const size_t pos = sb.pos; + int status = format_influxdb_point(&sb, metric, wifxudp_config_store_rates); + if (status == ENOSPC) { + fill_send_buffer(sb.ptr, pos); + strbuf_reset(&sb); + } else if (status != 0) { // error ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed."); - return -1; - } else if (status >= buffer_len - offset) { // full - fill_send_buffer(buffer, offset); - offset = 0; - buffer[0] = 0; + return status; } else { - offset += status; + ++i; } } - fill_send_buffer(buffer, offset); + fill_send_buffer(sb.ptr, sb.pos); return 0; } /* int write_influxdb_udp_write */ -- 2.47.3