libavltree.la \
libcmds.la \
libcommon.la \
+ libformat_influxdb.la \
libformat_graphite.la \
libheap.la \
libignorelist.la \
libplugin_mock_la_CPPFLAGS = $(AM_CPPFLAGS) -DMOCK_TIME
libplugin_mock_la_LIBADD = libcommon.la libignorelist.la libmetadata.la $(COMMON_LIBS)
+libformat_influxdb_la_SOURCES = \
+ src/utils/format_influxdb/format_influxdb.c \
+ src/utils/format_influxdb/format_influxdb.h
+
libformat_graphite_la_SOURCES = \
src/utils/format_graphite/format_graphite.c \
src/utils/format_graphite/format_graphite.h
src/utils/format_kairosdb/format_kairosdb.h
write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS)
write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-write_http_la_LIBADD = libcmds.la libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
+write_http_la_LIBADD = libcmds.la libformat_json.la libformat_influxdb.la $(BUILD_WITH_LIBCURL_LIBS)
endif
if BUILD_PLUGIN_WRITE_INFLUXDB_UDP
write_influxdb_udp_la_SOURCES = src/write_influxdb_udp.c
write_influxdb_udp_la_CPPFLAGS = $(AM_CPPFLAGS)
write_influxdb_udp_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+write_influxdb_udp_la_LIBADD = libformat_influxdb.la
if BUILD_WITH_LIBSOCKET
-write_influxdb_udp_la_LIBADD = -lsocket
+write_influxdb_udp_la_LIBADD += -lsocket
endif
endif
--- /dev/null
+/**
+ * collectd - src/utils_format_influxdb.c
+ * Copyright (C) 2007-2009 Florian octo Forster
+ * Copyright (C) 2009 Aman Gupta
+ * Copyright (C) 2019 Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Florian octo Forster <octo at collectd.org>
+ * Aman Gupta <aman at tmm1.net>
+ * Carlos Peon Costa <carlospeon at gmail.com>
+ **/
+
+#include "collectd.h"
+
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils_cache.h"
+
+#include "utils/format_influxdb/format_influxdb.h"
+
+int format_influxdb_point(strbuf_t *sb, metric_t metric, bool store_rates) {
+ bool have_values = false;
+
+#define BUFFER_ADD_ESCAPE(...) \
+ do { \
+ int status = strbuf_print_escaped(sb, __VA_ARGS__, "\\ ,=\"", '\\'); \
+ if (status != 0) \
+ return status; \
+ } while (0)
+
+#define BUFFER_ADD(...) \
+ do { \
+ int status = strbuf_printf(sb, __VA_ARGS__); \
+ if (status != 0) \
+ return status; \
+ } while (0)
+
+ have_values = false;
+ BUFFER_ADD_ESCAPE(metric.family->name);
+ for (size_t j = 0; j < metric.label.num; j++) {
+ label_pair_t label = metric.label.ptr[j];
+ BUFFER_ADD(",");
+ BUFFER_ADD_ESCAPE(label.name);
+ BUFFER_ADD("=");
+ BUFFER_ADD_ESCAPE(label.value);
+ }
+ BUFFER_ADD(" ");
+
+ if (store_rates && (metric.family->type == METRIC_TYPE_COUNTER)) {
+ gauge_t rate;
+ if (uc_get_rate(&metric, &rate) != 0) {
+ WARNING("write_influxdb_udp plugin: "
+ "uc_get_rate failed.");
+ return EINVAL;
+ }
+ if (!isnan(rate)) {
+ BUFFER_ADD("value=" GAUGE_FORMAT, rate);
+ have_values = true;
+ }
+ } else {
+ switch (metric.family->type) {
+ case METRIC_TYPE_GAUGE:
+ case METRIC_TYPE_UNTYPED:
+ if (!isnan(metric.value.gauge)) {
+ BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge);
+ have_values = true;
+ }
+ break;
+ case METRIC_TYPE_COUNTER:
+ BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
+ have_values = true;
+ break;
+ default:
+ WARNING("write_influxdb_udp plugin: "
+ "unknown family type.");
+ return EINVAL;
+ break;
+ }
+ }
+
+ if (!have_values)
+ return 0;
+
+ BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
+
+#undef BUFFER_ADD_ESCAPE
+#undef BUFFER_ADD
+
+ return 0;
+} /* int write_influxdb_point */
--- /dev/null
+/**
+ * collectd - src/utils_format_influxdb.h
+ * Copyright (C) 2007-2009 Florian octo Forster
+ * Copyright (C) 2009 Aman Gupta
+ * Copyright (C) 2019 Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Florian octo Forster <octo at collectd.org>
+ * Aman Gupta <aman at tmm1.net>
+ * Carlos Peon Costa <carlospeon at gmail.com>
+ **/
+
+#ifndef UTILS_FORMAT_INFLUXDB_H
+#define UTILS_FORMAT_INFLUXDB_H 1
+
+#include "collectd.h"
+
+#include "plugin.h"
+
+int format_influxdb_point(strbuf_t *sb, metric_t metric, bool store_rates);
+
+#endif /* UTILS_FORMAT_INFLUXDB_H */
#include "utils/cmds/putmetric.h"
#include "utils/common/common.h"
#include "utils/curl_stats/curl_stats.h"
+#include "utils/format_influxdb/format_influxdb.h"
#include "utils/format_json/format_json.h"
#include "utils/format_kairosdb/format_kairosdb.h"
#define WH_FORMAT_COMMAND 0
#define WH_FORMAT_JSON 1
#define WH_FORMAT_KAIROSDB 2
+#define WH_FORMAT_INFLUXDB 3
int format;
bool send_metrics;
bool send_notifications;
int data_ttl;
char *metrics_prefix;
+
+ char *unix_socket_path;
};
typedef struct wh_callback_s wh_callback_t;
}
/* must hold cb->curl_lock when calling */
-static int wh_post(wh_callback_t *cb, char const *data) {
+static int wh_post(wh_callback_t *cb, char const *data, long size) {
pthread_mutex_lock(&cb->curl_lock);
curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location);
+ curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDSIZE, size);
curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data);
curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, &wh_curl_write_callback);
curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, (void *)cb);
if (cb->clientkeypass != NULL)
curl_easy_setopt(cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass);
}
+#ifdef CURL_VERSION_UNIX_SOCKETS
+ if (cb->unix_socket_path) {
+ curl_easy_setopt(cb->curl, CURLOPT_UNIX_SOCKET_PATH, cb->unix_socket_path);
+ }
+#endif // CURL_VERSION_UNIX_SOCKETS
strbuf_reset(&cb->send_buffer);
}
char const *json = strdup(cb->send_buffer.ptr);
+ const size_t size = cb->send_buffer.pos;
strbuf_reset(&cb->send_buffer);
cb->send_buffer_init_time = cdtime();
pthread_mutex_unlock(&cb->send_buffer_lock);
return ENOMEM;
}
- return wh_post(cb, json);
+ return wh_post(cb, json, size);
} /* int wh_flush */
static void wh_callback_free(void *data) {
return 0;
} /* int wh_write_kairosdb */
+static int wh_write_influxdb(metric_family_t const *fam, wh_callback_t *cb) {
+ pthread_mutex_lock(&cb->send_buffer_lock);
+
+ for (size_t i = 0; i < fam->metric.num; i++) {
+ metric_t metric = fam->metric.ptr[i];
+ int status =
+ format_influxdb_point(&cb->send_buffer, metric, cb->store_rates);
+ if (status != 0) {
+ pthread_mutex_unlock(&cb->send_buffer_lock);
+ ERROR("write_http plugin: format_influxdb_point failed: %s",
+ STRERROR(status));
+ return status;
+ }
+ }
+
+ pthread_mutex_unlock(&cb->send_buffer_lock);
+ return 0;
+} /* wh_write_influxdb */
+
static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
if ((fam == NULL) || (user_data == NULL)) {
return EINVAL;
case WH_FORMAT_KAIROSDB:
status = wh_write_kairosdb(fam, cb);
break;
+ case WH_FORMAT_INFLUXDB:
+ status = wh_write_influxdb(fam, cb);
+ break;
default:
status = wh_write_command(fam, cb);
break;
return -1;
}
- status = wh_post(cb, alert);
+ status = wh_post(cb, alert, -1);
pthread_mutex_unlock(&cb->send_buffer_lock);
return status;
cb->format = WH_FORMAT_JSON;
else if (strcasecmp("KAIROSDB", string) == 0)
cb->format = WH_FORMAT_KAIROSDB;
+ else if (strcasecmp("INFLUXDB", string) == 0)
+ cb->format = WH_FORMAT_INFLUXDB;
else {
ERROR("write_http plugin: Invalid format string: %s", string);
return -1;
cb->data_ttl = 0;
cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX);
cb->curl_stats = NULL;
+ cb->unix_socket_path = NULL;
if (cb->metrics_prefix == NULL) {
ERROR("write_http plugin: strdup failed.");
status = cf_util_get_int(child, &cb->data_ttl);
else if (strcasecmp("Prefix", child->key) == 0)
status = cf_util_get_string(child, &cb->metrics_prefix);
- else {
+ else if (strcasecmp("UnixSocket", child->key) == 0) {
+#ifdef CURL_VERSION_UNIX_SOCKETS
+ status = cf_util_get_string(child, &cb->unix_socket_path);
+#else
+ WARNING("libcurl < 7.40.0, UnixSocket config is ignored");
+#endif // CURL_VERSION_UNIX_SOCKETS
+ } else {
ERROR("write_http plugin: Invalid configuration "
"option: %s.",
child->key);
#include "plugin.h"
#include "utils/common/common.h"
+#include "utils/strbuf/strbuf.h"
#include "utils_cache.h"
#include "utils_complain.h"
+#include "utils/format_influxdb/format_influxdb.h"
+
#if HAVE_NETDB_H
#include <netdb.h>
#endif
pthread_mutex_unlock(&send_buffer_lock);
}
-static int wifxudp_escape_string(char *buffer, size_t buffer_size,
- const char *string) {
-
- if ((buffer == NULL) || (string == NULL))
- return -EINVAL;
-
- int dst_pos = 0;
-
-#define BUFFER_ADD(c) \
- do { \
- if (dst_pos >= (buffer_size - 1)) { \
- buffer[buffer_size - 1] = 0; \
- return dst_pos++; \
- } \
- buffer[dst_pos] = (c); \
- dst_pos++; \
- } while (0)
-
- /* Escape special characters */
- for (int src_pos = 0; string[src_pos] != 0; src_pos++) {
- if ((string[src_pos] == '\\') || (string[src_pos] == ' ') ||
- (string[src_pos] == ',') || (string[src_pos] == '=') ||
- (string[src_pos] == '"')) {
- BUFFER_ADD('\\');
- BUFFER_ADD(string[src_pos]);
- } else
- BUFFER_ADD(string[src_pos]);
- } /* for */
- buffer[dst_pos] = 0;
-
-#undef BUFFER_ADD
-
- return dst_pos;
-} /* int wifxudp_escape_string */
-
-static int write_influxdb_point(char *buffer, int buffer_len, metric_t metric) {
- int offset = 0;
- bool have_values = false;
-
-#define BUFFER_ADD_ESCAPE(...) \
- do { \
- int status = wifxudp_escape_string(buffer + offset, buffer_len - offset, \
- __VA_ARGS__); \
- if (status < 0) \
- return status; \
- offset += status; \
- if (status >= (buffer_len - offset)) \
- return offset; \
- } while (0)
-
-#define BUFFER_ADD(...) \
- do { \
- int status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \
- if (status < 0) \
- return status; \
- offset += status; \
- if (status >= (buffer_len - offset)) \
- return offset; \
- } while (0)
-
- have_values = false;
- BUFFER_ADD_ESCAPE(metric.family->name);
- for (size_t j = 0; j < metric.label.num; j++) {
- label_pair_t label = metric.label.ptr[j];
- BUFFER_ADD(",");
- BUFFER_ADD_ESCAPE(label.name);
- BUFFER_ADD("=");
- BUFFER_ADD_ESCAPE(label.value);
- }
- BUFFER_ADD(" ");
-
- if (wifxudp_config_store_rates &&
- (metric.family->type == METRIC_TYPE_COUNTER)) {
- gauge_t rate;
- if (uc_get_rate(&metric, &rate) != 0) {
- WARNING("write_influxdb_udp plugin: "
- "uc_get_rate failed.");
- return -1;
- }
- if (!isnan(rate)) {
- BUFFER_ADD("value=" GAUGE_FORMAT, rate);
- have_values = true;
- }
- } else {
- switch (metric.family->type) {
- case METRIC_TYPE_GAUGE:
- case METRIC_TYPE_UNTYPED:
- if (!isnan(metric.value.gauge)) {
- BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge);
- have_values = true;
- }
- break;
- case METRIC_TYPE_COUNTER:
- BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
- have_values = true;
- break;
- default:
- WARNING("write_influxdb_udp plugin: "
- "unknown family type.");
- return -1;
- break;
- }
- }
-
- if (!have_values)
- return 0;
-
- BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
-
-#undef BUFFER_ADD_ESCAPE
-#undef BUFFER_ADD
-
- return offset;
-} /* int write_influxdb_point */
-
static int write_influxdb_udp_write(metric_family_t const *fam,
user_data_t __attribute__((unused)) *
user_data) {
return EINVAL;
char buffer[NET_DEFAULT_PACKET_SIZE];
- int buffer_len = NET_DEFAULT_PACKET_SIZE;
- int offset = 0;
+ const size_t buffer_len =
+ (wifxudp_config_packet_size < NET_DEFAULT_PACKET_SIZE
+ ? wifxudp_config_packet_size
+ : NET_DEFAULT_PACKET_SIZE);
+ strbuf_t sb = STRBUF_CREATE_FIXED(buffer, buffer_len);
- if (wifxudp_config_packet_size < buffer_len)
- buffer_len = wifxudp_config_packet_size;
-
- for (size_t i = 0; i < fam->metric.num; i++) {
+ for (size_t i = 0; i < fam->metric.num;) {
metric_t metric = fam->metric.ptr[i];
- int status =
- write_influxdb_point(buffer + offset, buffer_len - offset, metric);
- if (status < 0) { // error
+ const size_t pos = sb.pos;
+ int status = format_influxdb_point(&sb, metric, wifxudp_config_store_rates);
+ if (status == ENOSPC) {
+ fill_send_buffer(sb.ptr, pos);
+ strbuf_reset(&sb);
+ } else if (status != 0) { // error
ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
- return -1;
- } else if (status >= buffer_len - offset) { // full
- fill_send_buffer(buffer, offset);
- offset = 0;
- buffer[0] = 0;
+ return status;
} else {
- offset += status;
+ ++i;
}
}
- fill_send_buffer(buffer, offset);
+ fill_send_buffer(sb.ptr, sb.pos);
return 0;
} /* int write_influxdb_udp_write */