From: Matwey V. Kornilov Date: Thu, 28 Apr 2022 16:29:17 +0000 (+0300) Subject: Some improvements for InfluxDB support (#3938) X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ab869e58c1873036efd3c687823f070a3a07e6a5;p=thirdparty%2Fcollectd.git Some improvements for InfluxDB support (#3938) * write_http: Make use of CURLOPT_POSTFIELDSIZE CURLOPT_POSTFIELDSIZE allows to specify the data size, which is known in advance and equals to cb->send_buffer_fill. When CURLOPT_POSTFIELDSIZE is not set (or set to -1), then curl evaluates data size using strlen() function, which have O(N) complexity, so we save a few CPU cycles here. Signed-off-by: Matwey V. Kornilov * write_influxdb_udp: Split formatting functions to format_influxdb Signed-off-by: Matwey V. Kornilov * write_http: Add influxdb format Signed-off-by: Matwey V. Kornilov * write_http: Enable using unix socket in libcurl Signed-off-by: Matwey V. Kornilov * meta_data: Introduce basic iterator interface Currently, meta_data supports only the key lookup over forward list data structure, so iterating over the list would take O(N^2). Here we introduce meta_data_iter() and meta_data_iter_next() functions dealing with opaque iterator type. Signed-off-by: Matwey V. Kornilov * format_influxdb: Support serializing meta_data collectd 6.0 supports serializing series labels as influxdb tags. Here we backport this feature serializing string-values meta data keys as influxdb tags. Signed-off-by: Matwey V. Kornilov --- diff --git a/Makefile.am b/Makefile.am index 4f0320751..a7b5de974 100644 --- a/Makefile.am +++ b/Makefile.am @@ -133,6 +133,7 @@ noinst_LTLIBRARIES = \ libavltree.la \ libcmds.la \ libcommon.la \ + libformat_influxdb.la \ libformat_graphite.la \ libformat_json.la \ libheap.la \ @@ -428,6 +429,10 @@ libplugin_mock_la_SOURCES = \ libplugin_mock_la_CPPFLAGS = $(AM_CPPFLAGS) -DMOCK_TIME libplugin_mock_la_LIBADD = libcommon.la libignorelist.la $(COMMON_LIBS) +libformat_influxdb_la_SOURCES = \ + src/utils/format_influxdb/format_influxdb.c \ + src/utils/format_influxdb/format_influxdb.h + libformat_graphite_la_SOURCES = \ src/utils/format_graphite/format_graphite.c \ src/utils/format_graphite/format_graphite.h @@ -2264,7 +2269,7 @@ write_http_la_SOURCES = \ src/utils/format_kairosdb/format_kairosdb.h write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS) write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS) -write_http_la_LIBADD = libformat_json.la $(BUILD_WITH_LIBCURL_LIBS) +write_http_la_LIBADD = libformat_influxdb.la libformat_json.la $(BUILD_WITH_LIBCURL_LIBS) endif if BUILD_PLUGIN_WRITE_INFLUXDB_UDP @@ -2272,8 +2277,9 @@ pkglib_LTLIBRARIES += write_influxdb_udp.la write_influxdb_udp_la_SOURCES = src/write_influxdb_udp.c write_influxdb_udp_la_CPPFLAGS = $(AM_CPPFLAGS) write_influxdb_udp_la_LDFLAGS = $(PLUGIN_LDFLAGS) +write_influxdb_udp_la_LIBADD = libformat_influxdb.la if BUILD_WITH_LIBSOCKET -write_influxdb_udp_la_LIBADD = -lsocket +write_influxdb_udp_la_LIBADD += -lsocket endif endif diff --git a/src/utils/format_influxdb/format_influxdb.c b/src/utils/format_influxdb/format_influxdb.c new file mode 100644 index 000000000..b216509cc --- /dev/null +++ b/src/utils/format_influxdb/format_influxdb.c @@ -0,0 +1,211 @@ +/** + * collectd - src/utils_format_influxdb.c + * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2009 Aman Gupta + * Copyright (C) 2019 Carlos Peon Costa + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + * Aman Gupta + * Carlos Peon Costa + * multiple Server directives by: + * Paul (systemcrash) + **/ + +#include "collectd.h" + +#include "plugin.h" +#include "utils/common/common.h" +#include "utils/metadata/meta_data.h" +#include "utils_cache.h" + +#include "utils/format_influxdb/format_influxdb.h" + +static int format_influxdb_escape_string(char *buffer, size_t buffer_size, + const char *string) { + + if ((buffer == NULL) || (string == NULL)) + return -EINVAL; + + if (buffer_size < 3) + return -ENOMEM; + + int dst_pos = 0; + +#define BUFFER_ADD(c) \ + do { \ + if (dst_pos >= (buffer_size - 1)) { \ + buffer[buffer_size - 1] = '\0'; \ + return -ENOMEM; \ + } \ + buffer[dst_pos] = (c); \ + dst_pos++; \ + } while (0) + + /* Escape special characters */ + for (int src_pos = 0; string[src_pos] != 0; src_pos++) { + if ((string[src_pos] == '\\') || (string[src_pos] == ' ') || + (string[src_pos] == ',') || (string[src_pos] == '=') || + (string[src_pos] == '"')) { + BUFFER_ADD('\\'); + BUFFER_ADD(string[src_pos]); + } else + BUFFER_ADD(string[src_pos]); + } /* for */ + buffer[dst_pos] = 0; + +#undef BUFFER_ADD + + return dst_pos; +} /* int format_influxdb_escape_string */ + +int format_influxdb_value_list( + char *buffer, int buffer_len, const data_set_t *ds, const value_list_t *vl, + bool store_rates, format_influxdb_time_precision_t time_precision) { + int status; + int offset = 0; + gauge_t *rates = NULL; + bool have_values = false; + + assert(0 == strcmp(ds->type, vl->type)); + +#define BUFFER_ADD_ESCAPE(...) \ + do { \ + status = format_influxdb_escape_string(buffer + offset, \ + buffer_len - offset, __VA_ARGS__); \ + if (status < 0) \ + return status; \ + offset += status; \ + } while (0) + +#define BUFFER_ADD(...) \ + do { \ + status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \ + if ((status < 0) || (status >= (buffer_len - offset))) { \ + sfree(rates); \ + return -ENOMEM; \ + } \ + offset += status; \ + } while (0) + + BUFFER_ADD_ESCAPE(vl->plugin); + BUFFER_ADD(",host="); + BUFFER_ADD_ESCAPE(vl->host); + if (strcmp(vl->plugin_instance, "") != 0) { + BUFFER_ADD(",instance="); + BUFFER_ADD_ESCAPE(vl->plugin_instance); + } + if (strcmp(vl->type, "") != 0) { + BUFFER_ADD(",type="); + BUFFER_ADD_ESCAPE(vl->type); + } + if (strcmp(vl->type_instance, "") != 0) { + BUFFER_ADD(",type_instance="); + BUFFER_ADD_ESCAPE(vl->type_instance); + } + if (vl->meta) { + for (meta_entry_t *it = meta_data_iter(vl->meta); it != NULL; + it = meta_data_iter_next(it)) { + const char *key = meta_data_iter_key(it); + char *value; + + if (meta_data_iter_type(it) != MD_TYPE_STRING || + meta_data_iter_get_string(vl->meta, it, &value) != 0) + continue; + + BUFFER_ADD(","); + BUFFER_ADD_ESCAPE(key); + BUFFER_ADD("="); + BUFFER_ADD_ESCAPE(value); + free(value); + } + } + + BUFFER_ADD(" "); + for (size_t i = 0; i < ds->ds_num; i++) { + if ((ds->ds[i].type != DS_TYPE_COUNTER) && + (ds->ds[i].type != DS_TYPE_GAUGE) && + (ds->ds[i].type != DS_TYPE_DERIVE) && + (ds->ds[i].type != DS_TYPE_ABSOLUTE)) { + sfree(rates); + return -EINVAL; + } + + if (ds->ds[i].type == DS_TYPE_GAUGE) { + if (isnan(vl->values[i].gauge)) + continue; + if (have_values) + BUFFER_ADD(","); + BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge); + have_values = true; + } else if (store_rates) { + if (rates == NULL) + rates = uc_get_rate(ds, vl); + if (rates == NULL) { + WARNING("format_influxdb: " + "uc_get_rate failed."); + return -EINVAL; + } + if (isnan(rates[i])) + continue; + if (have_values) + BUFFER_ADD(","); + BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_COUNTER) { + if (have_values) + BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, + (uint64_t)vl->values[i].counter); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_DERIVE) { + if (have_values) + BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) { + if (have_values) + BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute); + have_values = true; + } + + } /* for ds->ds_num */ + sfree(rates); + + if (!have_values) + return 0; + + uint64_t influxdb_time = 0; + switch (time_precision) { + case NS: + influxdb_time = CDTIME_T_TO_NS(vl->time); + break; + case US: + influxdb_time = CDTIME_T_TO_US(vl->time); + break; + case MS: + influxdb_time = CDTIME_T_TO_MS(vl->time); + break; + } + + BUFFER_ADD(" %" PRIu64 "\n", influxdb_time); + +#undef BUFFER_ADD_ESCAPE +#undef BUFFER_ADD + + return offset; +} /* int format_influxdb_value_list */ diff --git a/src/utils/format_influxdb/format_influxdb.h b/src/utils/format_influxdb/format_influxdb.h new file mode 100644 index 000000000..53f5e88c3 --- /dev/null +++ b/src/utils/format_influxdb/format_influxdb.h @@ -0,0 +1,46 @@ +/** + * collectd - src/utils_format_influxdb.h + * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2009 Aman Gupta + * Copyright (C) 2019 Carlos Peon Costa + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + * Aman Gupta + * Carlos Peon Costa + * multiple Server directives by: + * Paul (systemcrash) + **/ + +#ifndef UTILS_FORMAT_INFLUXDB_H +#define UTILS_FORMAT_INFLUXDB_H 1 + +#include "collectd.h" + +#include "plugin.h" + +typedef enum { + NS, + US, + MS, +} format_influxdb_time_precision_t; + +int format_influxdb_value_list(char *buffer, int buffer_len, + const data_set_t *ds, const value_list_t *vl, + bool store_rates, + format_influxdb_time_precision_t time_precision); + +#endif /* UTILS_FORMAT_INFLUXDB_H */ diff --git a/src/utils/metadata/meta_data.c b/src/utils/metadata/meta_data.c index 963aebbe9..e23a30045 100644 --- a/src/utils/metadata/meta_data.c +++ b/src/utils/metadata/meta_data.c @@ -44,8 +44,6 @@ union meta_value_u { }; typedef union meta_value_u meta_value_t; -struct meta_entry_s; -typedef struct meta_entry_s meta_entry_t; struct meta_entry_s { char *key; meta_value_t value; @@ -543,10 +541,29 @@ int meta_data_add_boolean(meta_data_t *md, /* {{{ */ /* * Get functions */ +int _meta_data_get_string(meta_data_t *md, meta_entry_t *e, char **value) { + char *temp; + + if (e->type != MD_TYPE_STRING) { + ERROR("meta_data_get_string: Type mismatch for key `%s'", e->key); + return -ENOENT; + } + + temp = md_strdup(e->value.mv_string); + if (temp == NULL) { + ERROR("meta_data_get_string: md_strdup failed."); + return -ENOMEM; + } + + *value = temp; + + return 0; +} + int meta_data_get_string(meta_data_t *md, /* {{{ */ const char *key, char **value) { meta_entry_t *e; - char *temp; + int res = 0; if ((md == NULL) || (key == NULL) || (value == NULL)) return -EINVAL; @@ -559,24 +576,10 @@ int meta_data_get_string(meta_data_t *md, /* {{{ */ return -ENOENT; } - if (e->type != MD_TYPE_STRING) { - ERROR("meta_data_get_string: Type mismatch for key `%s'", e->key); - pthread_mutex_unlock(&md->lock); - return -ENOENT; - } - - temp = md_strdup(e->value.mv_string); - if (temp == NULL) { - pthread_mutex_unlock(&md->lock); - ERROR("meta_data_get_string: md_strdup failed."); - return -ENOMEM; - } + res = _meta_data_get_string(md, e, value); pthread_mutex_unlock(&md->lock); - - *value = temp; - - return 0; + return res; } /* }}} int meta_data_get_string */ int meta_data_get_signed_int(meta_data_t *md, /* {{{ */ @@ -745,3 +748,26 @@ int meta_data_as_string(meta_data_t *md, /* {{{ */ return 0; } /* }}} int meta_data_as_string */ + +meta_entry_t *meta_data_iter(meta_data_t *md) { return md->head; } + +meta_entry_t *meta_data_iter_next(meta_entry_t *iter) { return iter->next; } + +int meta_data_iter_type(meta_entry_t *iter) { return iter->type; } + +const char *meta_data_iter_key(meta_entry_t *iter) { return iter->key; } + +int meta_data_iter_get_string(meta_data_t *md, meta_entry_t *iter, + char **value) { + int res = 0; + + if ((md == NULL) || (iter == NULL) || (value == NULL)) + return -EINVAL; + + pthread_mutex_lock(&md->lock); + + res = _meta_data_get_string(md, iter, value); + + pthread_mutex_unlock(&md->lock); + return res; +} diff --git a/src/utils/metadata/meta_data.h b/src/utils/metadata/meta_data.h index 203b14607..ccb58817d 100644 --- a/src/utils/metadata/meta_data.h +++ b/src/utils/metadata/meta_data.h @@ -41,6 +41,9 @@ struct meta_data_s; typedef struct meta_data_s meta_data_t; +struct meta_entry_s; +typedef struct meta_entry_s meta_entry_t; + meta_data_t *meta_data_create(void); meta_data_t *meta_data_clone(meta_data_t *orig); int meta_data_clone_merge(meta_data_t **dest, meta_data_t *orig); @@ -68,4 +71,11 @@ int meta_data_get_boolean(meta_data_t *md, const char *key, bool *value); /* Returns the value as a string, regardless of the type. */ int meta_data_as_string(meta_data_t *md, const char *key, char **value); +meta_entry_t *meta_data_iter(meta_data_t *md); +meta_entry_t *meta_data_iter_next(meta_entry_t *iter); +int meta_data_iter_type(meta_entry_t *iter); +const char *meta_data_iter_key(meta_entry_t *iter); +int meta_data_iter_get_string(meta_data_t *md, meta_entry_t *iter, + char **value); + #endif /* META_DATA_H */ diff --git a/src/write_http.c b/src/write_http.c index 6da06b42b..3ca55661d 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -28,6 +28,7 @@ #include "plugin.h" #include "utils/common/common.h" #include "utils/curl_stats/curl_stats.h" +#include "utils/format_influxdb/format_influxdb.h" #include "utils/format_json/format_json.h" #include "utils/format_kairosdb/format_kairosdb.h" @@ -72,6 +73,7 @@ struct wh_callback_s { #define WH_FORMAT_COMMAND 0 #define WH_FORMAT_JSON 1 #define WH_FORMAT_KAIROSDB 2 +#define WH_FORMAT_INFLUXDB 3 int format; bool send_metrics; bool send_notifications; @@ -94,6 +96,8 @@ struct wh_callback_s { int data_ttl; char *metrics_prefix; + + char *unix_socket_path; }; typedef struct wh_callback_s wh_callback_t; @@ -161,11 +165,13 @@ static void wh_reset_buffer(wh_callback_t *cb) /* {{{ */ } /* }}} wh_reset_buffer */ /* must hold cb->send_lock when calling */ -static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */ +static int wh_post_nolock(wh_callback_t *cb, char const *data, + long size) /* {{{ */ { int status = 0; curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location); + curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDSIZE, size); curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data); curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, &wh_curl_write_callback); curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, (void *)cb); @@ -275,6 +281,11 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ if (cb->clientkeypass != NULL) curl_easy_setopt(cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass); } +#ifdef CURL_VERSION_UNIX_SOCKETS + if (cb->unix_socket_path) { + curl_easy_setopt(cb->curl, CURLOPT_UNIX_SOCKET_PATH, cb->unix_socket_path); + } +#endif // CURL_VERSION_UNIX_SOCKETS wh_reset_buffer(cb); @@ -304,7 +315,7 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ return 0; } - status = wh_post_nolock(cb, cb->send_buffer); + status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill); wh_reset_buffer(cb); } else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) { if (cb->send_buffer_fill <= 2) { @@ -321,7 +332,15 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ return status; } - status = wh_post_nolock(cb, cb->send_buffer); + status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill); + wh_reset_buffer(cb); + } else if (cb->format == WH_FORMAT_INFLUXDB) { + if (cb->send_buffer_fill == 0) { + cb->send_buffer_init_time = cdtime(); + return 0; + } + + status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill); wh_reset_buffer(cb); } else { ERROR("write_http: wh_flush_nolock: " @@ -573,6 +592,47 @@ static int wh_write_kairosdb(const data_set_t *ds, return 0; } /* }}} int wh_write_kairosdb */ +static int wh_write_influxdb(const data_set_t *ds, + const value_list_t *vl, /* {{{ */ + wh_callback_t *cb) { + int status; + + pthread_mutex_lock(&cb->send_lock); + if (wh_callback_init(cb) != 0) { + ERROR("write_http plugin: wh_callback_init failed."); + pthread_mutex_unlock(&cb->send_lock); + return -1; + } + + status = format_influxdb_value_list(cb->send_buffer + cb->send_buffer_fill, + cb->send_buffer_free, ds, vl, + cb->store_rates, NS); + if (status == -ENOMEM) { + status = wh_flush_nolock(/* timeout = */ 0, cb); + if (status != 0) { + wh_reset_buffer(cb); + pthread_mutex_unlock(&cb->send_lock); + return status; + } + + status = format_influxdb_value_list(cb->send_buffer + cb->send_buffer_fill, + cb->send_buffer_free, ds, vl, + cb->store_rates, NS); + } + if (status < 0) { + pthread_mutex_unlock(&cb->send_lock); + return status; + } + + cb->send_buffer_fill += status; + cb->send_buffer_free -= status; + + /* Check if we have enough space for this command. */ + pthread_mutex_unlock(&cb->send_lock); + + return 0; +} /* }}} int wh_write_influxdb */ + static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ user_data_t *user_data) { wh_callback_t *cb; @@ -591,6 +651,9 @@ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ case WH_FORMAT_KAIROSDB: status = wh_write_kairosdb(ds, vl, cb); break; + case WH_FORMAT_INFLUXDB: + status = wh_write_influxdb(ds, vl, cb); + break; default: status = wh_write_command(ds, vl, cb); break; @@ -623,7 +686,7 @@ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ return -1; } - status = wh_post_nolock(cb, alert); + status = wh_post_nolock(cb, alert, -1); pthread_mutex_unlock(&cb->send_lock); return status; @@ -647,6 +710,8 @@ static int config_set_format(wh_callback_t *cb, /* {{{ */ cb->format = WH_FORMAT_JSON; else if (strcasecmp("KAIROSDB", string) == 0) cb->format = WH_FORMAT_KAIROSDB; + else if (strcasecmp("INFLUXDB", string) == 0) + cb->format = WH_FORMAT_INFLUXDB; else { ERROR("write_http plugin: Invalid format string: %s", string); return -1; @@ -698,6 +763,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ cb->data_ttl = 0; cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX); cb->curl_stats = NULL; + cb->unix_socket_path = NULL; if (cb->metrics_prefix == NULL) { ERROR("write_http plugin: strdup failed."); @@ -821,6 +887,12 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_int(child, &cb->data_ttl); } else if (strcasecmp("Prefix", child->key) == 0) { status = cf_util_get_string(child, &cb->metrics_prefix); + } else if (strcasecmp("UnixSocket", child->key) == 0) { +#ifdef CURL_VERSION_UNIX_SOCKETS + status = cf_util_get_string(child, &cb->unix_socket_path); +#else + WARNING("libcurl < 7.40.0, UnixSocket config is ignored"); +#endif // CURL_VERSION_UNIX_SOCKETS } else { ERROR("write_http plugin: Invalid configuration " "option: %s.", diff --git a/src/write_influxdb_udp.c b/src/write_influxdb_udp.c index 6b4ddf3b6..7c85a28e2 100644 --- a/src/write_influxdb_udp.c +++ b/src/write_influxdb_udp.c @@ -32,6 +32,8 @@ #include "utils_cache.h" #include "utils_complain.h" +#include "utils/format_influxdb/format_influxdb.h" + #if HAVE_NETDB_H #include #endif @@ -70,12 +72,6 @@ typedef struct sockent { #define NET_DEFAULT_PACKET_SIZE 1452 #define NET_DEFAULT_PORT "8089" -typedef enum { - NS, - US, - MS, -} wifxudp_time_precision_t; - /* * Private variables */ @@ -83,7 +79,7 @@ typedef enum { static int wifxudp_config_ttl; static size_t wifxudp_config_packet_size = NET_DEFAULT_PACKET_SIZE; static bool wifxudp_config_store_rates; -static wifxudp_time_precision_t wifxudp_config_time_precision = MS; +static format_influxdb_time_precision_t wifxudp_config_time_precision = MS; static sockent_t *sending_sockets; @@ -353,170 +349,14 @@ static void flush_buffer(void) { write_influxdb_udp_init_buffer(); } -static int wifxudp_escape_string(char *buffer, size_t buffer_size, - const char *string) { - - if ((buffer == NULL) || (string == NULL)) - return -EINVAL; - - if (buffer_size < 3) - return -ENOMEM; - - int dst_pos = 0; - -#define BUFFER_ADD(c) \ - do { \ - if (dst_pos >= (buffer_size - 1)) { \ - buffer[buffer_size - 1] = '\0'; \ - return -ENOMEM; \ - } \ - buffer[dst_pos] = (c); \ - dst_pos++; \ - } while (0) - - /* Escape special characters */ - for (int src_pos = 0; string[src_pos] != 0; src_pos++) { - if ((string[src_pos] == '\\') || (string[src_pos] == ' ') || - (string[src_pos] == ',') || (string[src_pos] == '=') || - (string[src_pos] == '"')) { - BUFFER_ADD('\\'); - BUFFER_ADD(string[src_pos]); - } else - BUFFER_ADD(string[src_pos]); - } /* for */ - buffer[dst_pos] = 0; - -#undef BUFFER_ADD - - return dst_pos; -} /* int wifxudp_escape_string */ - -static int write_influxdb_point(char *buffer, int buffer_len, - const data_set_t *ds, const value_list_t *vl) { - int status; - int offset = 0; - gauge_t *rates = NULL; - bool have_values = false; - - assert(0 == strcmp(ds->type, vl->type)); - -#define BUFFER_ADD_ESCAPE(...) \ - do { \ - status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \ - __VA_ARGS__); \ - if (status < 0) \ - return -1; \ - offset += status; \ - } while (0) - -#define BUFFER_ADD(...) \ - do { \ - status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \ - if ((status < 0) || (status >= (buffer_len - offset))) { \ - sfree(rates); \ - return -1; \ - } \ - offset += status; \ - } while (0) - - BUFFER_ADD_ESCAPE(vl->plugin); - BUFFER_ADD(",host="); - BUFFER_ADD_ESCAPE(vl->host); - if (strcmp(vl->plugin_instance, "") != 0) { - BUFFER_ADD(",instance="); - BUFFER_ADD_ESCAPE(vl->plugin_instance); - } - if (strcmp(vl->type, "") != 0) { - BUFFER_ADD(",type="); - BUFFER_ADD_ESCAPE(vl->type); - } - if (strcmp(vl->type_instance, "") != 0) { - BUFFER_ADD(",type_instance="); - BUFFER_ADD_ESCAPE(vl->type_instance); - } - - BUFFER_ADD(" "); - for (size_t i = 0; i < ds->ds_num; i++) { - if ((ds->ds[i].type != DS_TYPE_COUNTER) && - (ds->ds[i].type != DS_TYPE_GAUGE) && - (ds->ds[i].type != DS_TYPE_DERIVE) && - (ds->ds[i].type != DS_TYPE_ABSOLUTE)) { - sfree(rates); - return -1; - } - - if (ds->ds[i].type == DS_TYPE_GAUGE) { - if (isnan(vl->values[i].gauge)) - continue; - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge); - have_values = true; - } else if (wifxudp_config_store_rates) { - if (rates == NULL) - rates = uc_get_rate(ds, vl); - if (rates == NULL) { - WARNING("write_influxdb_udp plugin: " - "uc_get_rate failed."); - return -1; - } - if (isnan(rates[i])) - continue; - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]); - have_values = true; - } else if (ds->ds[i].type == DS_TYPE_COUNTER) { - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, - (uint64_t)vl->values[i].counter); - have_values = true; - } else if (ds->ds[i].type == DS_TYPE_DERIVE) { - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive); - have_values = true; - } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) { - if (have_values) - BUFFER_ADD(","); - BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute); - have_values = true; - } - - } /* for ds->ds_num */ - sfree(rates); - - if (!have_values) - return 0; - - uint64_t influxdb_time = 0; - switch (wifxudp_config_time_precision) { - case NS: - influxdb_time = CDTIME_T_TO_NS(vl->time); - break; - case US: - influxdb_time = CDTIME_T_TO_US(vl->time); - break; - case MS: - influxdb_time = CDTIME_T_TO_MS(vl->time); - break; - } - - BUFFER_ADD(" %" PRIu64 "\n", influxdb_time); - -#undef BUFFER_ADD_ESCAPE -#undef BUFFER_ADD - - return offset; -} /* int write_influxdb_point */ - static int write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl, user_data_t __attribute__((unused)) * user_data) { char buffer[NET_DEFAULT_PACKET_SIZE]; - int status = write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, ds, vl); + int status = format_influxdb_value_list(buffer, NET_DEFAULT_PACKET_SIZE, ds, + vl, wifxudp_config_store_rates, + wifxudp_config_time_precision); if (status < 0) { ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");