libavltree.la \
libcmds.la \
libcommon.la \
+ libformat_influxdb.la \
libformat_graphite.la \
libformat_json.la \
libheap.la \
libplugin_mock_la_CPPFLAGS = $(AM_CPPFLAGS) -DMOCK_TIME
libplugin_mock_la_LIBADD = libcommon.la libignorelist.la $(COMMON_LIBS)
+libformat_influxdb_la_SOURCES = \
+ src/utils/format_influxdb/format_influxdb.c \
+ src/utils/format_influxdb/format_influxdb.h
+
libformat_graphite_la_SOURCES = \
src/utils/format_graphite/format_graphite.c \
src/utils/format_graphite/format_graphite.h
src/utils/format_kairosdb/format_kairosdb.h
write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS)
write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-write_http_la_LIBADD = libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
+write_http_la_LIBADD = libformat_influxdb.la libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
endif
if BUILD_PLUGIN_WRITE_INFLUXDB_UDP
write_influxdb_udp_la_SOURCES = src/write_influxdb_udp.c
write_influxdb_udp_la_CPPFLAGS = $(AM_CPPFLAGS)
write_influxdb_udp_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+write_influxdb_udp_la_LIBADD = libformat_influxdb.la
if BUILD_WITH_LIBSOCKET
-write_influxdb_udp_la_LIBADD = -lsocket
+write_influxdb_udp_la_LIBADD += -lsocket
endif
endif
--- /dev/null
+/**
+ * collectd - src/utils_format_influxdb.c
+ * Copyright (C) 2007-2009 Florian octo Forster
+ * Copyright (C) 2009 Aman Gupta
+ * Copyright (C) 2019 Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Florian octo Forster <octo at collectd.org>
+ * Aman Gupta <aman at tmm1.net>
+ * Carlos Peon Costa <carlospeon at gmail.com>
+ * multiple Server directives by:
+ * Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com>
+ **/
+
+#include "collectd.h"
+
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/metadata/meta_data.h"
+#include "utils_cache.h"
+
+#include "utils/format_influxdb/format_influxdb.h"
+
+static int format_influxdb_escape_string(char *buffer, size_t buffer_size,
+ const char *string) {
+
+ if ((buffer == NULL) || (string == NULL))
+ return -EINVAL;
+
+ if (buffer_size < 3)
+ return -ENOMEM;
+
+ int dst_pos = 0;
+
+#define BUFFER_ADD(c) \
+ do { \
+ if (dst_pos >= (buffer_size - 1)) { \
+ buffer[buffer_size - 1] = '\0'; \
+ return -ENOMEM; \
+ } \
+ buffer[dst_pos] = (c); \
+ dst_pos++; \
+ } while (0)
+
+ /* Escape special characters */
+ for (int src_pos = 0; string[src_pos] != 0; src_pos++) {
+ if ((string[src_pos] == '\\') || (string[src_pos] == ' ') ||
+ (string[src_pos] == ',') || (string[src_pos] == '=') ||
+ (string[src_pos] == '"')) {
+ BUFFER_ADD('\\');
+ BUFFER_ADD(string[src_pos]);
+ } else
+ BUFFER_ADD(string[src_pos]);
+ } /* for */
+ buffer[dst_pos] = 0;
+
+#undef BUFFER_ADD
+
+ return dst_pos;
+} /* int format_influxdb_escape_string */
+
+int format_influxdb_value_list(
+ char *buffer, int buffer_len, const data_set_t *ds, const value_list_t *vl,
+ bool store_rates, format_influxdb_time_precision_t time_precision) {
+ int status;
+ int offset = 0;
+ gauge_t *rates = NULL;
+ bool have_values = false;
+
+ assert(0 == strcmp(ds->type, vl->type));
+
+#define BUFFER_ADD_ESCAPE(...) \
+ do { \
+ status = format_influxdb_escape_string(buffer + offset, \
+ buffer_len - offset, __VA_ARGS__); \
+ if (status < 0) \
+ return status; \
+ offset += status; \
+ } while (0)
+
+#define BUFFER_ADD(...) \
+ do { \
+ status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \
+ if ((status < 0) || (status >= (buffer_len - offset))) { \
+ sfree(rates); \
+ return -ENOMEM; \
+ } \
+ offset += status; \
+ } while (0)
+
+ BUFFER_ADD_ESCAPE(vl->plugin);
+ BUFFER_ADD(",host=");
+ BUFFER_ADD_ESCAPE(vl->host);
+ if (strcmp(vl->plugin_instance, "") != 0) {
+ BUFFER_ADD(",instance=");
+ BUFFER_ADD_ESCAPE(vl->plugin_instance);
+ }
+ if (strcmp(vl->type, "") != 0) {
+ BUFFER_ADD(",type=");
+ BUFFER_ADD_ESCAPE(vl->type);
+ }
+ if (strcmp(vl->type_instance, "") != 0) {
+ BUFFER_ADD(",type_instance=");
+ BUFFER_ADD_ESCAPE(vl->type_instance);
+ }
+ if (vl->meta) {
+ for (meta_entry_t *it = meta_data_iter(vl->meta); it != NULL;
+ it = meta_data_iter_next(it)) {
+ const char *key = meta_data_iter_key(it);
+ char *value;
+
+ if (meta_data_iter_type(it) != MD_TYPE_STRING ||
+ meta_data_iter_get_string(vl->meta, it, &value) != 0)
+ continue;
+
+ BUFFER_ADD(",");
+ BUFFER_ADD_ESCAPE(key);
+ BUFFER_ADD("=");
+ BUFFER_ADD_ESCAPE(value);
+ free(value);
+ }
+ }
+
+ BUFFER_ADD(" ");
+ for (size_t i = 0; i < ds->ds_num; i++) {
+ if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
+ (ds->ds[i].type != DS_TYPE_GAUGE) &&
+ (ds->ds[i].type != DS_TYPE_DERIVE) &&
+ (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
+ sfree(rates);
+ return -EINVAL;
+ }
+
+ if (ds->ds[i].type == DS_TYPE_GAUGE) {
+ if (isnan(vl->values[i].gauge))
+ continue;
+ if (have_values)
+ BUFFER_ADD(",");
+ BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge);
+ have_values = true;
+ } else if (store_rates) {
+ if (rates == NULL)
+ rates = uc_get_rate(ds, vl);
+ if (rates == NULL) {
+ WARNING("format_influxdb: "
+ "uc_get_rate failed.");
+ return -EINVAL;
+ }
+ if (isnan(rates[i]))
+ continue;
+ if (have_values)
+ BUFFER_ADD(",");
+ BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]);
+ have_values = true;
+ } else if (ds->ds[i].type == DS_TYPE_COUNTER) {
+ if (have_values)
+ BUFFER_ADD(",");
+ BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name,
+ (uint64_t)vl->values[i].counter);
+ have_values = true;
+ } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
+ if (have_values)
+ BUFFER_ADD(",");
+ BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive);
+ have_values = true;
+ } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) {
+ if (have_values)
+ BUFFER_ADD(",");
+ BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute);
+ have_values = true;
+ }
+
+ } /* for ds->ds_num */
+ sfree(rates);
+
+ if (!have_values)
+ return 0;
+
+ uint64_t influxdb_time = 0;
+ switch (time_precision) {
+ case NS:
+ influxdb_time = CDTIME_T_TO_NS(vl->time);
+ break;
+ case US:
+ influxdb_time = CDTIME_T_TO_US(vl->time);
+ break;
+ case MS:
+ influxdb_time = CDTIME_T_TO_MS(vl->time);
+ break;
+ }
+
+ BUFFER_ADD(" %" PRIu64 "\n", influxdb_time);
+
+#undef BUFFER_ADD_ESCAPE
+#undef BUFFER_ADD
+
+ return offset;
+} /* int format_influxdb_value_list */
--- /dev/null
+/**
+ * collectd - src/utils_format_influxdb.h
+ * Copyright (C) 2007-2009 Florian octo Forster
+ * Copyright (C) 2009 Aman Gupta
+ * Copyright (C) 2019 Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Florian octo Forster <octo at collectd.org>
+ * Aman Gupta <aman at tmm1.net>
+ * Carlos Peon Costa <carlospeon at gmail.com>
+ * multiple Server directives by:
+ * Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com>
+ **/
+
+#ifndef UTILS_FORMAT_INFLUXDB_H
+#define UTILS_FORMAT_INFLUXDB_H 1
+
+#include "collectd.h"
+
+#include "plugin.h"
+
+typedef enum {
+ NS,
+ US,
+ MS,
+} format_influxdb_time_precision_t;
+
+int format_influxdb_value_list(char *buffer, int buffer_len,
+ const data_set_t *ds, const value_list_t *vl,
+ bool store_rates,
+ format_influxdb_time_precision_t time_precision);
+
+#endif /* UTILS_FORMAT_INFLUXDB_H */
};
typedef union meta_value_u meta_value_t;
-struct meta_entry_s;
-typedef struct meta_entry_s meta_entry_t;
struct meta_entry_s {
char *key;
meta_value_t value;
/*
* Get functions
*/
+int _meta_data_get_string(meta_data_t *md, meta_entry_t *e, char **value) {
+ char *temp;
+
+ if (e->type != MD_TYPE_STRING) {
+ ERROR("meta_data_get_string: Type mismatch for key `%s'", e->key);
+ return -ENOENT;
+ }
+
+ temp = md_strdup(e->value.mv_string);
+ if (temp == NULL) {
+ ERROR("meta_data_get_string: md_strdup failed.");
+ return -ENOMEM;
+ }
+
+ *value = temp;
+
+ return 0;
+}
+
int meta_data_get_string(meta_data_t *md, /* {{{ */
const char *key, char **value) {
meta_entry_t *e;
- char *temp;
+ int res = 0;
if ((md == NULL) || (key == NULL) || (value == NULL))
return -EINVAL;
return -ENOENT;
}
- if (e->type != MD_TYPE_STRING) {
- ERROR("meta_data_get_string: Type mismatch for key `%s'", e->key);
- pthread_mutex_unlock(&md->lock);
- return -ENOENT;
- }
-
- temp = md_strdup(e->value.mv_string);
- if (temp == NULL) {
- pthread_mutex_unlock(&md->lock);
- ERROR("meta_data_get_string: md_strdup failed.");
- return -ENOMEM;
- }
+ res = _meta_data_get_string(md, e, value);
pthread_mutex_unlock(&md->lock);
-
- *value = temp;
-
- return 0;
+ return res;
} /* }}} int meta_data_get_string */
int meta_data_get_signed_int(meta_data_t *md, /* {{{ */
return 0;
} /* }}} int meta_data_as_string */
+
+meta_entry_t *meta_data_iter(meta_data_t *md) { return md->head; }
+
+meta_entry_t *meta_data_iter_next(meta_entry_t *iter) { return iter->next; }
+
+int meta_data_iter_type(meta_entry_t *iter) { return iter->type; }
+
+const char *meta_data_iter_key(meta_entry_t *iter) { return iter->key; }
+
+int meta_data_iter_get_string(meta_data_t *md, meta_entry_t *iter,
+ char **value) {
+ int res = 0;
+
+ if ((md == NULL) || (iter == NULL) || (value == NULL))
+ return -EINVAL;
+
+ pthread_mutex_lock(&md->lock);
+
+ res = _meta_data_get_string(md, iter, value);
+
+ pthread_mutex_unlock(&md->lock);
+ return res;
+}
struct meta_data_s;
typedef struct meta_data_s meta_data_t;
+struct meta_entry_s;
+typedef struct meta_entry_s meta_entry_t;
+
meta_data_t *meta_data_create(void);
meta_data_t *meta_data_clone(meta_data_t *orig);
int meta_data_clone_merge(meta_data_t **dest, meta_data_t *orig);
/* Returns the value as a string, regardless of the type. */
int meta_data_as_string(meta_data_t *md, const char *key, char **value);
+meta_entry_t *meta_data_iter(meta_data_t *md);
+meta_entry_t *meta_data_iter_next(meta_entry_t *iter);
+int meta_data_iter_type(meta_entry_t *iter);
+const char *meta_data_iter_key(meta_entry_t *iter);
+int meta_data_iter_get_string(meta_data_t *md, meta_entry_t *iter,
+ char **value);
+
#endif /* META_DATA_H */
#include "plugin.h"
#include "utils/common/common.h"
#include "utils/curl_stats/curl_stats.h"
+#include "utils/format_influxdb/format_influxdb.h"
#include "utils/format_json/format_json.h"
#include "utils/format_kairosdb/format_kairosdb.h"
#define WH_FORMAT_COMMAND 0
#define WH_FORMAT_JSON 1
#define WH_FORMAT_KAIROSDB 2
+#define WH_FORMAT_INFLUXDB 3
int format;
bool send_metrics;
bool send_notifications;
int data_ttl;
char *metrics_prefix;
+
+ char *unix_socket_path;
};
typedef struct wh_callback_s wh_callback_t;
} /* }}} wh_reset_buffer */
/* must hold cb->send_lock when calling */
-static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */
+static int wh_post_nolock(wh_callback_t *cb, char const *data,
+ long size) /* {{{ */
{
int status = 0;
curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location);
+ curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDSIZE, size);
curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data);
curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, &wh_curl_write_callback);
curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, (void *)cb);
if (cb->clientkeypass != NULL)
curl_easy_setopt(cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass);
}
+#ifdef CURL_VERSION_UNIX_SOCKETS
+ if (cb->unix_socket_path) {
+ curl_easy_setopt(cb->curl, CURLOPT_UNIX_SOCKET_PATH, cb->unix_socket_path);
+ }
+#endif // CURL_VERSION_UNIX_SOCKETS
wh_reset_buffer(cb);
return 0;
}
- status = wh_post_nolock(cb, cb->send_buffer);
+ status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill);
wh_reset_buffer(cb);
} else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) {
if (cb->send_buffer_fill <= 2) {
return status;
}
- status = wh_post_nolock(cb, cb->send_buffer);
+ status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill);
+ wh_reset_buffer(cb);
+ } else if (cb->format == WH_FORMAT_INFLUXDB) {
+ if (cb->send_buffer_fill == 0) {
+ cb->send_buffer_init_time = cdtime();
+ return 0;
+ }
+
+ status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill);
wh_reset_buffer(cb);
} else {
ERROR("write_http: wh_flush_nolock: "
return 0;
} /* }}} int wh_write_kairosdb */
+static int wh_write_influxdb(const data_set_t *ds,
+ const value_list_t *vl, /* {{{ */
+ wh_callback_t *cb) {
+ int status;
+
+ pthread_mutex_lock(&cb->send_lock);
+ if (wh_callback_init(cb) != 0) {
+ ERROR("write_http plugin: wh_callback_init failed.");
+ pthread_mutex_unlock(&cb->send_lock);
+ return -1;
+ }
+
+ status = format_influxdb_value_list(cb->send_buffer + cb->send_buffer_fill,
+ cb->send_buffer_free, ds, vl,
+ cb->store_rates, NS);
+ if (status == -ENOMEM) {
+ status = wh_flush_nolock(/* timeout = */ 0, cb);
+ if (status != 0) {
+ wh_reset_buffer(cb);
+ pthread_mutex_unlock(&cb->send_lock);
+ return status;
+ }
+
+ status = format_influxdb_value_list(cb->send_buffer + cb->send_buffer_fill,
+ cb->send_buffer_free, ds, vl,
+ cb->store_rates, NS);
+ }
+ if (status < 0) {
+ pthread_mutex_unlock(&cb->send_lock);
+ return status;
+ }
+
+ cb->send_buffer_fill += status;
+ cb->send_buffer_free -= status;
+
+ /* Check if we have enough space for this command. */
+ pthread_mutex_unlock(&cb->send_lock);
+
+ return 0;
+} /* }}} int wh_write_influxdb */
+
static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
user_data_t *user_data) {
wh_callback_t *cb;
case WH_FORMAT_KAIROSDB:
status = wh_write_kairosdb(ds, vl, cb);
break;
+ case WH_FORMAT_INFLUXDB:
+ status = wh_write_influxdb(ds, vl, cb);
+ break;
default:
status = wh_write_command(ds, vl, cb);
break;
return -1;
}
- status = wh_post_nolock(cb, alert);
+ status = wh_post_nolock(cb, alert, -1);
pthread_mutex_unlock(&cb->send_lock);
return status;
cb->format = WH_FORMAT_JSON;
else if (strcasecmp("KAIROSDB", string) == 0)
cb->format = WH_FORMAT_KAIROSDB;
+ else if (strcasecmp("INFLUXDB", string) == 0)
+ cb->format = WH_FORMAT_INFLUXDB;
else {
ERROR("write_http plugin: Invalid format string: %s", string);
return -1;
cb->data_ttl = 0;
cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX);
cb->curl_stats = NULL;
+ cb->unix_socket_path = NULL;
if (cb->metrics_prefix == NULL) {
ERROR("write_http plugin: strdup failed.");
status = cf_util_get_int(child, &cb->data_ttl);
} else if (strcasecmp("Prefix", child->key) == 0) {
status = cf_util_get_string(child, &cb->metrics_prefix);
+ } else if (strcasecmp("UnixSocket", child->key) == 0) {
+#ifdef CURL_VERSION_UNIX_SOCKETS
+ status = cf_util_get_string(child, &cb->unix_socket_path);
+#else
+ WARNING("libcurl < 7.40.0, UnixSocket config is ignored");
+#endif // CURL_VERSION_UNIX_SOCKETS
} else {
ERROR("write_http plugin: Invalid configuration "
"option: %s.",
#include "utils_cache.h"
#include "utils_complain.h"
+#include "utils/format_influxdb/format_influxdb.h"
+
#if HAVE_NETDB_H
#include <netdb.h>
#endif
#define NET_DEFAULT_PACKET_SIZE 1452
#define NET_DEFAULT_PORT "8089"
-typedef enum {
- NS,
- US,
- MS,
-} wifxudp_time_precision_t;
-
/*
* Private variables
*/
static int wifxudp_config_ttl;
static size_t wifxudp_config_packet_size = NET_DEFAULT_PACKET_SIZE;
static bool wifxudp_config_store_rates;
-static wifxudp_time_precision_t wifxudp_config_time_precision = MS;
+static format_influxdb_time_precision_t wifxudp_config_time_precision = MS;
static sockent_t *sending_sockets;
write_influxdb_udp_init_buffer();
}
-static int wifxudp_escape_string(char *buffer, size_t buffer_size,
- const char *string) {
-
- if ((buffer == NULL) || (string == NULL))
- return -EINVAL;
-
- if (buffer_size < 3)
- return -ENOMEM;
-
- int dst_pos = 0;
-
-#define BUFFER_ADD(c) \
- do { \
- if (dst_pos >= (buffer_size - 1)) { \
- buffer[buffer_size - 1] = '\0'; \
- return -ENOMEM; \
- } \
- buffer[dst_pos] = (c); \
- dst_pos++; \
- } while (0)
-
- /* Escape special characters */
- for (int src_pos = 0; string[src_pos] != 0; src_pos++) {
- if ((string[src_pos] == '\\') || (string[src_pos] == ' ') ||
- (string[src_pos] == ',') || (string[src_pos] == '=') ||
- (string[src_pos] == '"')) {
- BUFFER_ADD('\\');
- BUFFER_ADD(string[src_pos]);
- } else
- BUFFER_ADD(string[src_pos]);
- } /* for */
- buffer[dst_pos] = 0;
-
-#undef BUFFER_ADD
-
- return dst_pos;
-} /* int wifxudp_escape_string */
-
-static int write_influxdb_point(char *buffer, int buffer_len,
- const data_set_t *ds, const value_list_t *vl) {
- int status;
- int offset = 0;
- gauge_t *rates = NULL;
- bool have_values = false;
-
- assert(0 == strcmp(ds->type, vl->type));
-
-#define BUFFER_ADD_ESCAPE(...) \
- do { \
- status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \
- __VA_ARGS__); \
- if (status < 0) \
- return -1; \
- offset += status; \
- } while (0)
-
-#define BUFFER_ADD(...) \
- do { \
- status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \
- if ((status < 0) || (status >= (buffer_len - offset))) { \
- sfree(rates); \
- return -1; \
- } \
- offset += status; \
- } while (0)
-
- BUFFER_ADD_ESCAPE(vl->plugin);
- BUFFER_ADD(",host=");
- BUFFER_ADD_ESCAPE(vl->host);
- if (strcmp(vl->plugin_instance, "") != 0) {
- BUFFER_ADD(",instance=");
- BUFFER_ADD_ESCAPE(vl->plugin_instance);
- }
- if (strcmp(vl->type, "") != 0) {
- BUFFER_ADD(",type=");
- BUFFER_ADD_ESCAPE(vl->type);
- }
- if (strcmp(vl->type_instance, "") != 0) {
- BUFFER_ADD(",type_instance=");
- BUFFER_ADD_ESCAPE(vl->type_instance);
- }
-
- BUFFER_ADD(" ");
- for (size_t i = 0; i < ds->ds_num; i++) {
- if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
- (ds->ds[i].type != DS_TYPE_GAUGE) &&
- (ds->ds[i].type != DS_TYPE_DERIVE) &&
- (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
- sfree(rates);
- return -1;
- }
-
- if (ds->ds[i].type == DS_TYPE_GAUGE) {
- if (isnan(vl->values[i].gauge))
- continue;
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge);
- have_values = true;
- } else if (wifxudp_config_store_rates) {
- if (rates == NULL)
- rates = uc_get_rate(ds, vl);
- if (rates == NULL) {
- WARNING("write_influxdb_udp plugin: "
- "uc_get_rate failed.");
- return -1;
- }
- if (isnan(rates[i]))
- continue;
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]);
- have_values = true;
- } else if (ds->ds[i].type == DS_TYPE_COUNTER) {
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name,
- (uint64_t)vl->values[i].counter);
- have_values = true;
- } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive);
- have_values = true;
- } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) {
- if (have_values)
- BUFFER_ADD(",");
- BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute);
- have_values = true;
- }
-
- } /* for ds->ds_num */
- sfree(rates);
-
- if (!have_values)
- return 0;
-
- uint64_t influxdb_time = 0;
- switch (wifxudp_config_time_precision) {
- case NS:
- influxdb_time = CDTIME_T_TO_NS(vl->time);
- break;
- case US:
- influxdb_time = CDTIME_T_TO_US(vl->time);
- break;
- case MS:
- influxdb_time = CDTIME_T_TO_MS(vl->time);
- break;
- }
-
- BUFFER_ADD(" %" PRIu64 "\n", influxdb_time);
-
-#undef BUFFER_ADD_ESCAPE
-#undef BUFFER_ADD
-
- return offset;
-} /* int write_influxdb_point */
-
static int
write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl,
user_data_t __attribute__((unused)) * user_data) {
char buffer[NET_DEFAULT_PACKET_SIZE];
- int status = write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, ds, vl);
+ int status = format_influxdb_value_list(buffer, NET_DEFAULT_PACKET_SIZE, ds,
+ vl, wifxudp_config_store_rates,
+ wifxudp_config_time_precision);
if (status < 0) {
ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");