]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
Some improvements for InfluxDB support (#3938)
authorMatwey V. Kornilov <matwey.kornilov@gmail.com>
Thu, 28 Apr 2022 16:29:17 +0000 (19:29 +0300)
committerGitHub <noreply@github.com>
Thu, 28 Apr 2022 16:29:17 +0000 (18:29 +0200)
* write_http: Make use of CURLOPT_POSTFIELDSIZE

CURLOPT_POSTFIELDSIZE allows to specify the data size, which is known in
advance and equals to cb->send_buffer_fill. When CURLOPT_POSTFIELDSIZE is not
set (or set to -1), then curl evaluates data size using strlen() function,
which have O(N) complexity, so we save a few CPU cycles here.

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
* write_influxdb_udp: Split formatting functions to format_influxdb

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
* write_http: Add influxdb format

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
* write_http: Enable using unix socket in libcurl

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
* meta_data: Introduce basic iterator interface

Currently, meta_data supports only the key lookup over forward list data
structure, so iterating over the list would take O(N^2).

Here we introduce meta_data_iter() and meta_data_iter_next() functions dealing
with opaque iterator type.

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
* format_influxdb: Support serializing meta_data

collectd 6.0 supports serializing series labels as influxdb tags. Here we
backport this feature serializing string-values meta data keys as influxdb
tags.

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
Makefile.am
src/utils/format_influxdb/format_influxdb.c [new file with mode: 0644]
src/utils/format_influxdb/format_influxdb.h [new file with mode: 0644]
src/utils/metadata/meta_data.c
src/utils/metadata/meta_data.h
src/write_http.c
src/write_influxdb_udp.c

index 4f0320751a03e93a084abff68e8d1a881ea319e1..a7b5de974207d985b2f56fcbdc73f990d1f5a7ea 100644 (file)
@@ -133,6 +133,7 @@ noinst_LTLIBRARIES = \
        libavltree.la \
        libcmds.la \
        libcommon.la \
+       libformat_influxdb.la \
        libformat_graphite.la \
        libformat_json.la \
        libheap.la \
@@ -428,6 +429,10 @@ libplugin_mock_la_SOURCES = \
 libplugin_mock_la_CPPFLAGS = $(AM_CPPFLAGS) -DMOCK_TIME
 libplugin_mock_la_LIBADD = libcommon.la libignorelist.la $(COMMON_LIBS)
 
+libformat_influxdb_la_SOURCES = \
+       src/utils/format_influxdb/format_influxdb.c \
+       src/utils/format_influxdb/format_influxdb.h
+
 libformat_graphite_la_SOURCES = \
        src/utils/format_graphite/format_graphite.c \
        src/utils/format_graphite/format_graphite.h
@@ -2264,7 +2269,7 @@ write_http_la_SOURCES = \
        src/utils/format_kairosdb/format_kairosdb.h
 write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS)
 write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-write_http_la_LIBADD = libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
+write_http_la_LIBADD = libformat_influxdb.la libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
 endif
 
 if BUILD_PLUGIN_WRITE_INFLUXDB_UDP
@@ -2272,8 +2277,9 @@ pkglib_LTLIBRARIES += write_influxdb_udp.la
 write_influxdb_udp_la_SOURCES = src/write_influxdb_udp.c
 write_influxdb_udp_la_CPPFLAGS = $(AM_CPPFLAGS)
 write_influxdb_udp_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+write_influxdb_udp_la_LIBADD = libformat_influxdb.la
 if BUILD_WITH_LIBSOCKET
-write_influxdb_udp_la_LIBADD = -lsocket
+write_influxdb_udp_la_LIBADD += -lsocket
 endif
 endif
 
diff --git a/src/utils/format_influxdb/format_influxdb.c b/src/utils/format_influxdb/format_influxdb.c
new file mode 100644 (file)
index 0000000..b216509
--- /dev/null
@@ -0,0 +1,211 @@
+/**
+ * collectd - src/utils_format_influxdb.c
+ * Copyright (C) 2007-2009  Florian octo Forster
+ * Copyright (C) 2009       Aman Gupta
+ * Copyright (C) 2019       Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Aman Gupta <aman at tmm1.net>
+ *   Carlos Peon Costa <carlospeon at gmail.com>
+ *   multiple Server directives by:
+ *   Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com>
+ **/
+
+#include "collectd.h"
+
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/metadata/meta_data.h"
+#include "utils_cache.h"
+
+#include "utils/format_influxdb/format_influxdb.h"
+
+static int format_influxdb_escape_string(char *buffer, size_t buffer_size,
+                                         const char *string) {
+
+  if ((buffer == NULL) || (string == NULL))
+    return -EINVAL;
+
+  if (buffer_size < 3)
+    return -ENOMEM;
+
+  int dst_pos = 0;
+
+#define BUFFER_ADD(c)                                                          \
+  do {                                                                         \
+    if (dst_pos >= (buffer_size - 1)) {                                        \
+      buffer[buffer_size - 1] = '\0';                                          \
+      return -ENOMEM;                                                          \
+    }                                                                          \
+    buffer[dst_pos] = (c);                                                     \
+    dst_pos++;                                                                 \
+  } while (0)
+
+  /* Escape special characters */
+  for (int src_pos = 0; string[src_pos] != 0; src_pos++) {
+    if ((string[src_pos] == '\\') || (string[src_pos] == ' ') ||
+        (string[src_pos] == ',') || (string[src_pos] == '=') ||
+        (string[src_pos] == '"')) {
+      BUFFER_ADD('\\');
+      BUFFER_ADD(string[src_pos]);
+    } else
+      BUFFER_ADD(string[src_pos]);
+  } /* for */
+  buffer[dst_pos] = 0;
+
+#undef BUFFER_ADD
+
+  return dst_pos;
+} /* int format_influxdb_escape_string */
+
+int format_influxdb_value_list(
+    char *buffer, int buffer_len, const data_set_t *ds, const value_list_t *vl,
+    bool store_rates, format_influxdb_time_precision_t time_precision) {
+  int status;
+  int offset = 0;
+  gauge_t *rates = NULL;
+  bool have_values = false;
+
+  assert(0 == strcmp(ds->type, vl->type));
+
+#define BUFFER_ADD_ESCAPE(...)                                                 \
+  do {                                                                         \
+    status = format_influxdb_escape_string(buffer + offset,                    \
+                                           buffer_len - offset, __VA_ARGS__);  \
+    if (status < 0)                                                            \
+      return status;                                                           \
+    offset += status;                                                          \
+  } while (0)
+
+#define BUFFER_ADD(...)                                                        \
+  do {                                                                         \
+    status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__);      \
+    if ((status < 0) || (status >= (buffer_len - offset))) {                   \
+      sfree(rates);                                                            \
+      return -ENOMEM;                                                          \
+    }                                                                          \
+    offset += status;                                                          \
+  } while (0)
+
+  BUFFER_ADD_ESCAPE(vl->plugin);
+  BUFFER_ADD(",host=");
+  BUFFER_ADD_ESCAPE(vl->host);
+  if (strcmp(vl->plugin_instance, "") != 0) {
+    BUFFER_ADD(",instance=");
+    BUFFER_ADD_ESCAPE(vl->plugin_instance);
+  }
+  if (strcmp(vl->type, "") != 0) {
+    BUFFER_ADD(",type=");
+    BUFFER_ADD_ESCAPE(vl->type);
+  }
+  if (strcmp(vl->type_instance, "") != 0) {
+    BUFFER_ADD(",type_instance=");
+    BUFFER_ADD_ESCAPE(vl->type_instance);
+  }
+  if (vl->meta) {
+    for (meta_entry_t *it = meta_data_iter(vl->meta); it != NULL;
+         it = meta_data_iter_next(it)) {
+      const char *key = meta_data_iter_key(it);
+      char *value;
+
+      if (meta_data_iter_type(it) != MD_TYPE_STRING ||
+          meta_data_iter_get_string(vl->meta, it, &value) != 0)
+        continue;
+
+      BUFFER_ADD(",");
+      BUFFER_ADD_ESCAPE(key);
+      BUFFER_ADD("=");
+      BUFFER_ADD_ESCAPE(value);
+      free(value);
+    }
+  }
+
+  BUFFER_ADD(" ");
+  for (size_t i = 0; i < ds->ds_num; i++) {
+    if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
+        (ds->ds[i].type != DS_TYPE_GAUGE) &&
+        (ds->ds[i].type != DS_TYPE_DERIVE) &&
+        (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
+      sfree(rates);
+      return -EINVAL;
+    }
+
+    if (ds->ds[i].type == DS_TYPE_GAUGE) {
+      if (isnan(vl->values[i].gauge))
+        continue;
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge);
+      have_values = true;
+    } else if (store_rates) {
+      if (rates == NULL)
+        rates = uc_get_rate(ds, vl);
+      if (rates == NULL) {
+        WARNING("format_influxdb: "
+                "uc_get_rate failed.");
+        return -EINVAL;
+      }
+      if (isnan(rates[i]))
+        continue;
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]);
+      have_values = true;
+    } else if (ds->ds[i].type == DS_TYPE_COUNTER) {
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name,
+                 (uint64_t)vl->values[i].counter);
+      have_values = true;
+    } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive);
+      have_values = true;
+    } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) {
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute);
+      have_values = true;
+    }
+
+  } /* for ds->ds_num */
+  sfree(rates);
+
+  if (!have_values)
+    return 0;
+
+  uint64_t influxdb_time = 0;
+  switch (time_precision) {
+  case NS:
+    influxdb_time = CDTIME_T_TO_NS(vl->time);
+    break;
+  case US:
+    influxdb_time = CDTIME_T_TO_US(vl->time);
+    break;
+  case MS:
+    influxdb_time = CDTIME_T_TO_MS(vl->time);
+    break;
+  }
+
+  BUFFER_ADD(" %" PRIu64 "\n", influxdb_time);
+
+#undef BUFFER_ADD_ESCAPE
+#undef BUFFER_ADD
+
+  return offset;
+} /* int format_influxdb_value_list */
diff --git a/src/utils/format_influxdb/format_influxdb.h b/src/utils/format_influxdb/format_influxdb.h
new file mode 100644 (file)
index 0000000..53f5e88
--- /dev/null
@@ -0,0 +1,46 @@
+/**
+ * collectd - src/utils_format_influxdb.h
+ * Copyright (C) 2007-2009  Florian octo Forster
+ * Copyright (C) 2009       Aman Gupta
+ * Copyright (C) 2019       Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Aman Gupta <aman at tmm1.net>
+ *   Carlos Peon Costa <carlospeon at gmail.com>
+ *   multiple Server directives by:
+ *   Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com>
+ **/
+
+#ifndef UTILS_FORMAT_INFLUXDB_H
+#define UTILS_FORMAT_INFLUXDB_H 1
+
+#include "collectd.h"
+
+#include "plugin.h"
+
+typedef enum {
+  NS,
+  US,
+  MS,
+} format_influxdb_time_precision_t;
+
+int format_influxdb_value_list(char *buffer, int buffer_len,
+                               const data_set_t *ds, const value_list_t *vl,
+                               bool store_rates,
+                               format_influxdb_time_precision_t time_precision);
+
+#endif /* UTILS_FORMAT_INFLUXDB_H */
index 963aebbe9d55baca3911fb5b1c81d21395aeceb2..e23a30045c58e0739c102f883ff078e9761ca2f3 100644 (file)
@@ -44,8 +44,6 @@ union meta_value_u {
 };
 typedef union meta_value_u meta_value_t;
 
-struct meta_entry_s;
-typedef struct meta_entry_s meta_entry_t;
 struct meta_entry_s {
   char *key;
   meta_value_t value;
@@ -543,10 +541,29 @@ int meta_data_add_boolean(meta_data_t *md, /* {{{ */
 /*
  * Get functions
  */
+int _meta_data_get_string(meta_data_t *md, meta_entry_t *e, char **value) {
+  char *temp;
+
+  if (e->type != MD_TYPE_STRING) {
+    ERROR("meta_data_get_string: Type mismatch for key `%s'", e->key);
+    return -ENOENT;
+  }
+
+  temp = md_strdup(e->value.mv_string);
+  if (temp == NULL) {
+    ERROR("meta_data_get_string: md_strdup failed.");
+    return -ENOMEM;
+  }
+
+  *value = temp;
+
+  return 0;
+}
+
 int meta_data_get_string(meta_data_t *md, /* {{{ */
                          const char *key, char **value) {
   meta_entry_t *e;
-  char *temp;
+  int res = 0;
 
   if ((md == NULL) || (key == NULL) || (value == NULL))
     return -EINVAL;
@@ -559,24 +576,10 @@ int meta_data_get_string(meta_data_t *md, /* {{{ */
     return -ENOENT;
   }
 
-  if (e->type != MD_TYPE_STRING) {
-    ERROR("meta_data_get_string: Type mismatch for key `%s'", e->key);
-    pthread_mutex_unlock(&md->lock);
-    return -ENOENT;
-  }
-
-  temp = md_strdup(e->value.mv_string);
-  if (temp == NULL) {
-    pthread_mutex_unlock(&md->lock);
-    ERROR("meta_data_get_string: md_strdup failed.");
-    return -ENOMEM;
-  }
+  res = _meta_data_get_string(md, e, value);
 
   pthread_mutex_unlock(&md->lock);
-
-  *value = temp;
-
-  return 0;
+  return res;
 } /* }}} int meta_data_get_string */
 
 int meta_data_get_signed_int(meta_data_t *md, /* {{{ */
@@ -745,3 +748,26 @@ int meta_data_as_string(meta_data_t *md, /* {{{ */
 
   return 0;
 } /* }}} int meta_data_as_string */
+
+meta_entry_t *meta_data_iter(meta_data_t *md) { return md->head; }
+
+meta_entry_t *meta_data_iter_next(meta_entry_t *iter) { return iter->next; }
+
+int meta_data_iter_type(meta_entry_t *iter) { return iter->type; }
+
+const char *meta_data_iter_key(meta_entry_t *iter) { return iter->key; }
+
+int meta_data_iter_get_string(meta_data_t *md, meta_entry_t *iter,
+                              char **value) {
+  int res = 0;
+
+  if ((md == NULL) || (iter == NULL) || (value == NULL))
+    return -EINVAL;
+
+  pthread_mutex_lock(&md->lock);
+
+  res = _meta_data_get_string(md, iter, value);
+
+  pthread_mutex_unlock(&md->lock);
+  return res;
+}
index 203b146078a02042f6c41b86cf65c866b1fc8b29..ccb58817d7234b5d6ed91ead672696bd1f9a764d 100644 (file)
@@ -41,6 +41,9 @@
 struct meta_data_s;
 typedef struct meta_data_s meta_data_t;
 
+struct meta_entry_s;
+typedef struct meta_entry_s meta_entry_t;
+
 meta_data_t *meta_data_create(void);
 meta_data_t *meta_data_clone(meta_data_t *orig);
 int meta_data_clone_merge(meta_data_t **dest, meta_data_t *orig);
@@ -68,4 +71,11 @@ int meta_data_get_boolean(meta_data_t *md, const char *key, bool *value);
 /* Returns the value as a string, regardless of the type. */
 int meta_data_as_string(meta_data_t *md, const char *key, char **value);
 
+meta_entry_t *meta_data_iter(meta_data_t *md);
+meta_entry_t *meta_data_iter_next(meta_entry_t *iter);
+int meta_data_iter_type(meta_entry_t *iter);
+const char *meta_data_iter_key(meta_entry_t *iter);
+int meta_data_iter_get_string(meta_data_t *md, meta_entry_t *iter,
+                              char **value);
+
 #endif /* META_DATA_H */
index 6da06b42b21fc05c500ab7ee3989eb002dd3c27f..3ca55661d2530f07079304ac039fc1d3107eb01c 100644 (file)
@@ -28,6 +28,7 @@
 #include "plugin.h"
 #include "utils/common/common.h"
 #include "utils/curl_stats/curl_stats.h"
+#include "utils/format_influxdb/format_influxdb.h"
 #include "utils/format_json/format_json.h"
 #include "utils/format_kairosdb/format_kairosdb.h"
 
@@ -72,6 +73,7 @@ struct wh_callback_s {
 #define WH_FORMAT_COMMAND 0
 #define WH_FORMAT_JSON 1
 #define WH_FORMAT_KAIROSDB 2
+#define WH_FORMAT_INFLUXDB 3
   int format;
   bool send_metrics;
   bool send_notifications;
@@ -94,6 +96,8 @@ struct wh_callback_s {
 
   int data_ttl;
   char *metrics_prefix;
+
+  char *unix_socket_path;
 };
 typedef struct wh_callback_s wh_callback_t;
 
@@ -161,11 +165,13 @@ static void wh_reset_buffer(wh_callback_t *cb) /* {{{ */
 } /* }}} wh_reset_buffer */
 
 /* must hold cb->send_lock when calling */
-static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */
+static int wh_post_nolock(wh_callback_t *cb, char const *data,
+                          long size) /* {{{ */
 {
   int status = 0;
 
   curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location);
+  curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDSIZE, size);
   curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data);
   curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, &wh_curl_write_callback);
   curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, (void *)cb);
@@ -275,6 +281,11 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */
     if (cb->clientkeypass != NULL)
       curl_easy_setopt(cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass);
   }
+#ifdef CURL_VERSION_UNIX_SOCKETS
+  if (cb->unix_socket_path) {
+    curl_easy_setopt(cb->curl, CURLOPT_UNIX_SOCKET_PATH, cb->unix_socket_path);
+  }
+#endif // CURL_VERSION_UNIX_SOCKETS
 
   wh_reset_buffer(cb);
 
@@ -304,7 +315,7 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */
       return 0;
     }
 
-    status = wh_post_nolock(cb, cb->send_buffer);
+    status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill);
     wh_reset_buffer(cb);
   } else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) {
     if (cb->send_buffer_fill <= 2) {
@@ -321,7 +332,15 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */
       return status;
     }
 
-    status = wh_post_nolock(cb, cb->send_buffer);
+    status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill);
+    wh_reset_buffer(cb);
+  } else if (cb->format == WH_FORMAT_INFLUXDB) {
+    if (cb->send_buffer_fill == 0) {
+      cb->send_buffer_init_time = cdtime();
+      return 0;
+    }
+
+    status = wh_post_nolock(cb, cb->send_buffer, cb->send_buffer_fill);
     wh_reset_buffer(cb);
   } else {
     ERROR("write_http: wh_flush_nolock: "
@@ -573,6 +592,47 @@ static int wh_write_kairosdb(const data_set_t *ds,
   return 0;
 } /* }}} int wh_write_kairosdb */
 
+static int wh_write_influxdb(const data_set_t *ds,
+                             const value_list_t *vl, /* {{{ */
+                             wh_callback_t *cb) {
+  int status;
+
+  pthread_mutex_lock(&cb->send_lock);
+  if (wh_callback_init(cb) != 0) {
+    ERROR("write_http plugin: wh_callback_init failed.");
+    pthread_mutex_unlock(&cb->send_lock);
+    return -1;
+  }
+
+  status = format_influxdb_value_list(cb->send_buffer + cb->send_buffer_fill,
+                                      cb->send_buffer_free, ds, vl,
+                                      cb->store_rates, NS);
+  if (status == -ENOMEM) {
+    status = wh_flush_nolock(/* timeout = */ 0, cb);
+    if (status != 0) {
+      wh_reset_buffer(cb);
+      pthread_mutex_unlock(&cb->send_lock);
+      return status;
+    }
+
+    status = format_influxdb_value_list(cb->send_buffer + cb->send_buffer_fill,
+                                        cb->send_buffer_free, ds, vl,
+                                        cb->store_rates, NS);
+  }
+  if (status < 0) {
+    pthread_mutex_unlock(&cb->send_lock);
+    return status;
+  }
+
+  cb->send_buffer_fill += status;
+  cb->send_buffer_free -= status;
+
+  /* Check if we have enough space for this command. */
+  pthread_mutex_unlock(&cb->send_lock);
+
+  return 0;
+} /* }}} int wh_write_influxdb */
+
 static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
                     user_data_t *user_data) {
   wh_callback_t *cb;
@@ -591,6 +651,9 @@ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
   case WH_FORMAT_KAIROSDB:
     status = wh_write_kairosdb(ds, vl, cb);
     break;
+  case WH_FORMAT_INFLUXDB:
+    status = wh_write_influxdb(ds, vl, cb);
+    break;
   default:
     status = wh_write_command(ds, vl, cb);
     break;
@@ -623,7 +686,7 @@ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */
     return -1;
   }
 
-  status = wh_post_nolock(cb, alert);
+  status = wh_post_nolock(cb, alert, -1);
   pthread_mutex_unlock(&cb->send_lock);
 
   return status;
@@ -647,6 +710,8 @@ static int config_set_format(wh_callback_t *cb, /* {{{ */
     cb->format = WH_FORMAT_JSON;
   else if (strcasecmp("KAIROSDB", string) == 0)
     cb->format = WH_FORMAT_KAIROSDB;
+  else if (strcasecmp("INFLUXDB", string) == 0)
+    cb->format = WH_FORMAT_INFLUXDB;
   else {
     ERROR("write_http plugin: Invalid format string: %s", string);
     return -1;
@@ -698,6 +763,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */
   cb->data_ttl = 0;
   cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX);
   cb->curl_stats = NULL;
+  cb->unix_socket_path = NULL;
 
   if (cb->metrics_prefix == NULL) {
     ERROR("write_http plugin: strdup failed.");
@@ -821,6 +887,12 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_int(child, &cb->data_ttl);
     } else if (strcasecmp("Prefix", child->key) == 0) {
       status = cf_util_get_string(child, &cb->metrics_prefix);
+    } else if (strcasecmp("UnixSocket", child->key) == 0) {
+#ifdef CURL_VERSION_UNIX_SOCKETS
+      status = cf_util_get_string(child, &cb->unix_socket_path);
+#else
+      WARNING("libcurl < 7.40.0, UnixSocket config is ignored");
+#endif // CURL_VERSION_UNIX_SOCKETS
     } else {
       ERROR("write_http plugin: Invalid configuration "
             "option: %s.",
index 6b4ddf3b6220333ebe90992af7c8ecdca0fa39ac..7c85a28e2a3de2142d276ac1e7f7e31578efb096 100644 (file)
@@ -32,6 +32,8 @@
 #include "utils_cache.h"
 #include "utils_complain.h"
 
+#include "utils/format_influxdb/format_influxdb.h"
+
 #if HAVE_NETDB_H
 #include <netdb.h>
 #endif
@@ -70,12 +72,6 @@ typedef struct sockent {
 #define NET_DEFAULT_PACKET_SIZE 1452
 #define NET_DEFAULT_PORT "8089"
 
-typedef enum {
-  NS,
-  US,
-  MS,
-} wifxudp_time_precision_t;
-
 /*
  * Private variables
  */
@@ -83,7 +79,7 @@ typedef enum {
 static int wifxudp_config_ttl;
 static size_t wifxudp_config_packet_size = NET_DEFAULT_PACKET_SIZE;
 static bool wifxudp_config_store_rates;
-static wifxudp_time_precision_t wifxudp_config_time_precision = MS;
+static format_influxdb_time_precision_t wifxudp_config_time_precision = MS;
 
 static sockent_t *sending_sockets;
 
@@ -353,170 +349,14 @@ static void flush_buffer(void) {
   write_influxdb_udp_init_buffer();
 }
 
-static int wifxudp_escape_string(char *buffer, size_t buffer_size,
-                                 const char *string) {
-
-  if ((buffer == NULL) || (string == NULL))
-    return -EINVAL;
-
-  if (buffer_size < 3)
-    return -ENOMEM;
-
-  int dst_pos = 0;
-
-#define BUFFER_ADD(c)                                                          \
-  do {                                                                         \
-    if (dst_pos >= (buffer_size - 1)) {                                        \
-      buffer[buffer_size - 1] = '\0';                                          \
-      return -ENOMEM;                                                          \
-    }                                                                          \
-    buffer[dst_pos] = (c);                                                     \
-    dst_pos++;                                                                 \
-  } while (0)
-
-  /* Escape special characters */
-  for (int src_pos = 0; string[src_pos] != 0; src_pos++) {
-    if ((string[src_pos] == '\\') || (string[src_pos] == ' ') ||
-        (string[src_pos] == ',') || (string[src_pos] == '=') ||
-        (string[src_pos] == '"')) {
-      BUFFER_ADD('\\');
-      BUFFER_ADD(string[src_pos]);
-    } else
-      BUFFER_ADD(string[src_pos]);
-  } /* for */
-  buffer[dst_pos] = 0;
-
-#undef BUFFER_ADD
-
-  return dst_pos;
-} /* int wifxudp_escape_string */
-
-static int write_influxdb_point(char *buffer, int buffer_len,
-                                const data_set_t *ds, const value_list_t *vl) {
-  int status;
-  int offset = 0;
-  gauge_t *rates = NULL;
-  bool have_values = false;
-
-  assert(0 == strcmp(ds->type, vl->type));
-
-#define BUFFER_ADD_ESCAPE(...)                                                 \
-  do {                                                                         \
-    status = wifxudp_escape_string(buffer + offset, buffer_len - offset,       \
-                                   __VA_ARGS__);                               \
-    if (status < 0)                                                            \
-      return -1;                                                               \
-    offset += status;                                                          \
-  } while (0)
-
-#define BUFFER_ADD(...)                                                        \
-  do {                                                                         \
-    status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__);      \
-    if ((status < 0) || (status >= (buffer_len - offset))) {                   \
-      sfree(rates);                                                            \
-      return -1;                                                               \
-    }                                                                          \
-    offset += status;                                                          \
-  } while (0)
-
-  BUFFER_ADD_ESCAPE(vl->plugin);
-  BUFFER_ADD(",host=");
-  BUFFER_ADD_ESCAPE(vl->host);
-  if (strcmp(vl->plugin_instance, "") != 0) {
-    BUFFER_ADD(",instance=");
-    BUFFER_ADD_ESCAPE(vl->plugin_instance);
-  }
-  if (strcmp(vl->type, "") != 0) {
-    BUFFER_ADD(",type=");
-    BUFFER_ADD_ESCAPE(vl->type);
-  }
-  if (strcmp(vl->type_instance, "") != 0) {
-    BUFFER_ADD(",type_instance=");
-    BUFFER_ADD_ESCAPE(vl->type_instance);
-  }
-
-  BUFFER_ADD(" ");
-  for (size_t i = 0; i < ds->ds_num; i++) {
-    if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
-        (ds->ds[i].type != DS_TYPE_GAUGE) &&
-        (ds->ds[i].type != DS_TYPE_DERIVE) &&
-        (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
-      sfree(rates);
-      return -1;
-    }
-
-    if (ds->ds[i].type == DS_TYPE_GAUGE) {
-      if (isnan(vl->values[i].gauge))
-        continue;
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge);
-      have_values = true;
-    } else if (wifxudp_config_store_rates) {
-      if (rates == NULL)
-        rates = uc_get_rate(ds, vl);
-      if (rates == NULL) {
-        WARNING("write_influxdb_udp plugin: "
-                "uc_get_rate failed.");
-        return -1;
-      }
-      if (isnan(rates[i]))
-        continue;
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]);
-      have_values = true;
-    } else if (ds->ds[i].type == DS_TYPE_COUNTER) {
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name,
-                 (uint64_t)vl->values[i].counter);
-      have_values = true;
-    } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive);
-      have_values = true;
-    } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) {
-      if (have_values)
-        BUFFER_ADD(",");
-      BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute);
-      have_values = true;
-    }
-
-  } /* for ds->ds_num */
-  sfree(rates);
-
-  if (!have_values)
-    return 0;
-
-  uint64_t influxdb_time = 0;
-  switch (wifxudp_config_time_precision) {
-  case NS:
-    influxdb_time = CDTIME_T_TO_NS(vl->time);
-    break;
-  case US:
-    influxdb_time = CDTIME_T_TO_US(vl->time);
-    break;
-  case MS:
-    influxdb_time = CDTIME_T_TO_MS(vl->time);
-    break;
-  }
-
-  BUFFER_ADD(" %" PRIu64 "\n", influxdb_time);
-
-#undef BUFFER_ADD_ESCAPE
-#undef BUFFER_ADD
-
-  return offset;
-} /* int write_influxdb_point */
-
 static int
 write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl,
                          user_data_t __attribute__((unused)) * user_data) {
   char buffer[NET_DEFAULT_PACKET_SIZE];
 
-  int status = write_influxdb_point(buffer, NET_DEFAULT_PACKET_SIZE, ds, vl);
+  int status = format_influxdb_value_list(buffer, NET_DEFAULT_PACKET_SIZE, ds,
+                                          vl, wifxudp_config_store_rates,
+                                          wifxudp_config_time_precision);
 
   if (status < 0) {
     ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");