]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
[collectd 6] Port #3938 for collectd 6 (#3999)
authorMatwey V. Kornilov <matwey.kornilov@gmail.com>
Wed, 8 Jun 2022 09:56:42 +0000 (12:56 +0300)
committerGitHub <noreply@github.com>
Wed, 8 Jun 2022 09:56:42 +0000 (11:56 +0200)
* write_http: Make use of CURLOPT_POSTFIELDSIZE

CURLOPT_POSTFIELDSIZE allows to specify the data size, which is known in
advance and equals to cb->send_buffer_fill. When CURLOPT_POSTFIELDSIZE is not
set (or set to -1), then curl evaluates data size using strlen() function,
which have O(N) complexity, so we save a few CPU cycles here.

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
* write_influxdb_udp: Split formatting functions to format_influxdb

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
* write_http: Add influxdb format

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
* write_http: Enable using unix socket in libcurl

Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
Co-authored-by: Matthias Runge <mrunge@redhat.com>
Makefile.am
src/utils/format_influxdb/format_influxdb.c [new file with mode: 0644]
src/utils/format_influxdb/format_influxdb.h [new file with mode: 0644]
src/write_http.c
src/write_influxdb_udp.c

index 35a3678b3a031f871c0db47e9f00e0d66ddec080..7da61888698bba7684e622a50380f805b3c7438d 100644 (file)
@@ -130,6 +130,7 @@ noinst_LTLIBRARIES = \
        libavltree.la \
        libcmds.la \
        libcommon.la \
+       libformat_influxdb.la \
        libformat_graphite.la \
        libheap.la \
        libignorelist.la \
@@ -437,6 +438,10 @@ libplugin_mock_la_SOURCES = \
 libplugin_mock_la_CPPFLAGS = $(AM_CPPFLAGS) -DMOCK_TIME
 libplugin_mock_la_LIBADD = libcommon.la libignorelist.la libmetadata.la $(COMMON_LIBS)
 
+libformat_influxdb_la_SOURCES = \
+       src/utils/format_influxdb/format_influxdb.c \
+       src/utils/format_influxdb/format_influxdb.h
+
 libformat_graphite_la_SOURCES = \
        src/utils/format_graphite/format_graphite.c \
        src/utils/format_graphite/format_graphite.h
@@ -2263,7 +2268,7 @@ write_http_la_SOURCES = \
        src/utils/format_kairosdb/format_kairosdb.h
 write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS)
 write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-write_http_la_LIBADD = libcmds.la libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
+write_http_la_LIBADD = libcmds.la libformat_json.la libformat_influxdb.la $(BUILD_WITH_LIBCURL_LIBS)
 endif
 
 if BUILD_PLUGIN_WRITE_INFLUXDB_UDP
@@ -2271,8 +2276,9 @@ pkglib_LTLIBRARIES += write_influxdb_udp.la
 write_influxdb_udp_la_SOURCES = src/write_influxdb_udp.c
 write_influxdb_udp_la_CPPFLAGS = $(AM_CPPFLAGS)
 write_influxdb_udp_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+write_influxdb_udp_la_LIBADD = libformat_influxdb.la
 if BUILD_WITH_LIBSOCKET
-write_influxdb_udp_la_LIBADD = -lsocket
+write_influxdb_udp_la_LIBADD += -lsocket
 endif
 endif
 
diff --git a/src/utils/format_influxdb/format_influxdb.c b/src/utils/format_influxdb/format_influxdb.c
new file mode 100644 (file)
index 0000000..2e9588b
--- /dev/null
@@ -0,0 +1,103 @@
+/**
+ * collectd - src/utils_format_influxdb.c
+ * Copyright (C) 2007-2009  Florian octo Forster
+ * Copyright (C) 2009       Aman Gupta
+ * Copyright (C) 2019       Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Aman Gupta <aman at tmm1.net>
+ *   Carlos Peon Costa <carlospeon at gmail.com>
+ **/
+
+#include "collectd.h"
+
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils_cache.h"
+
+#include "utils/format_influxdb/format_influxdb.h"
+
+int format_influxdb_point(strbuf_t *sb, metric_t metric, bool store_rates) {
+  bool have_values = false;
+
+#define BUFFER_ADD_ESCAPE(...)                                                 \
+  do {                                                                         \
+    int status = strbuf_print_escaped(sb, __VA_ARGS__, "\\ ,=\"", '\\');       \
+    if (status != 0)                                                           \
+      return status;                                                           \
+  } while (0)
+
+#define BUFFER_ADD(...)                                                        \
+  do {                                                                         \
+    int status = strbuf_printf(sb, __VA_ARGS__);                               \
+    if (status != 0)                                                           \
+      return status;                                                           \
+  } while (0)
+
+  have_values = false;
+  BUFFER_ADD_ESCAPE(metric.family->name);
+  for (size_t j = 0; j < metric.label.num; j++) {
+    label_pair_t label = metric.label.ptr[j];
+    BUFFER_ADD(",");
+    BUFFER_ADD_ESCAPE(label.name);
+    BUFFER_ADD("=");
+    BUFFER_ADD_ESCAPE(label.value);
+  }
+  BUFFER_ADD(" ");
+
+  if (store_rates && (metric.family->type == METRIC_TYPE_COUNTER)) {
+    gauge_t rate;
+    if (uc_get_rate(&metric, &rate) != 0) {
+      WARNING("write_influxdb_udp plugin: "
+              "uc_get_rate failed.");
+      return EINVAL;
+    }
+    if (!isnan(rate)) {
+      BUFFER_ADD("value=" GAUGE_FORMAT, rate);
+      have_values = true;
+    }
+  } else {
+    switch (metric.family->type) {
+    case METRIC_TYPE_GAUGE:
+    case METRIC_TYPE_UNTYPED:
+      if (!isnan(metric.value.gauge)) {
+        BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge);
+        have_values = true;
+      }
+      break;
+    case METRIC_TYPE_COUNTER:
+      BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
+      have_values = true;
+      break;
+    default:
+      WARNING("write_influxdb_udp plugin: "
+              "unknown family type.");
+      return EINVAL;
+      break;
+    }
+  }
+
+  if (!have_values)
+    return 0;
+
+  BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
+
+#undef BUFFER_ADD_ESCAPE
+#undef BUFFER_ADD
+
+  return 0;
+} /* int write_influxdb_point */
diff --git a/src/utils/format_influxdb/format_influxdb.h b/src/utils/format_influxdb/format_influxdb.h
new file mode 100644 (file)
index 0000000..bd6b974
--- /dev/null
@@ -0,0 +1,35 @@
+/**
+ * collectd - src/utils_format_influxdb.h
+ * Copyright (C) 2007-2009  Florian octo Forster
+ * Copyright (C) 2009       Aman Gupta
+ * Copyright (C) 2019       Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Aman Gupta <aman at tmm1.net>
+ *   Carlos Peon Costa <carlospeon at gmail.com>
+ **/
+
+#ifndef UTILS_FORMAT_INFLUXDB_H
+#define UTILS_FORMAT_INFLUXDB_H 1
+
+#include "collectd.h"
+
+#include "plugin.h"
+
+int format_influxdb_point(strbuf_t *sb, metric_t metric, bool store_rates);
+
+#endif /* UTILS_FORMAT_INFLUXDB_H */
index 1986f68049271be356e5c8f927394d404a5d9f39..e31877c46208d13145347fff24758b4a24bca17f 100644 (file)
@@ -29,6 +29,7 @@
 #include "utils/cmds/putmetric.h"
 #include "utils/common/common.h"
 #include "utils/curl_stats/curl_stats.h"
+#include "utils/format_influxdb/format_influxdb.h"
 #include "utils/format_json/format_json.h"
 #include "utils/format_kairosdb/format_kairosdb.h"
 
@@ -69,6 +70,7 @@ struct wh_callback_s {
 #define WH_FORMAT_COMMAND 0
 #define WH_FORMAT_JSON 1
 #define WH_FORMAT_KAIROSDB 2
+#define WH_FORMAT_INFLUXDB 3
   int format;
   bool send_metrics;
   bool send_notifications;
@@ -88,6 +90,8 @@ struct wh_callback_s {
 
   int data_ttl;
   char *metrics_prefix;
+
+  char *unix_socket_path;
 };
 typedef struct wh_callback_s wh_callback_t;
 
@@ -133,10 +137,11 @@ static void wh_log_http_error(wh_callback_t *cb) {
 }
 
 /* must hold cb->curl_lock when calling */
-static int wh_post(wh_callback_t *cb, char const *data) {
+static int wh_post(wh_callback_t *cb, char const *data, long size) {
   pthread_mutex_lock(&cb->curl_lock);
 
   curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location);
+  curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDSIZE, size);
   curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data);
   curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, &wh_curl_write_callback);
   curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, (void *)cb);
@@ -245,6 +250,11 @@ static int wh_callback_init(wh_callback_t *cb) {
     if (cb->clientkeypass != NULL)
       curl_easy_setopt(cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass);
   }
+#ifdef CURL_VERSION_UNIX_SOCKETS
+  if (cb->unix_socket_path) {
+    curl_easy_setopt(cb->curl, CURLOPT_UNIX_SOCKET_PATH, cb->unix_socket_path);
+  }
+#endif // CURL_VERSION_UNIX_SOCKETS
 
   strbuf_reset(&cb->send_buffer);
 
@@ -282,6 +292,7 @@ static int wh_flush(cdtime_t timeout,
   }
 
   char const *json = strdup(cb->send_buffer.ptr);
+  const size_t size = cb->send_buffer.pos;
   strbuf_reset(&cb->send_buffer);
   cb->send_buffer_init_time = cdtime();
   pthread_mutex_unlock(&cb->send_buffer_lock);
@@ -290,7 +301,7 @@ static int wh_flush(cdtime_t timeout,
     return ENOMEM;
   }
 
-  return wh_post(cb, json);
+  return wh_post(cb, json, size);
 } /* int wh_flush */
 
 static void wh_callback_free(void *data) {
@@ -388,6 +399,25 @@ static int wh_write_kairosdb(metric_family_t const *fam, wh_callback_t *cb) {
   return 0;
 } /* int wh_write_kairosdb */
 
+static int wh_write_influxdb(metric_family_t const *fam, wh_callback_t *cb) {
+  pthread_mutex_lock(&cb->send_buffer_lock);
+
+  for (size_t i = 0; i < fam->metric.num; i++) {
+    metric_t metric = fam->metric.ptr[i];
+    int status =
+        format_influxdb_point(&cb->send_buffer, metric, cb->store_rates);
+    if (status != 0) {
+      pthread_mutex_unlock(&cb->send_buffer_lock);
+      ERROR("write_http plugin: format_influxdb_point failed: %s",
+            STRERROR(status));
+      return status;
+    }
+  }
+
+  pthread_mutex_unlock(&cb->send_buffer_lock);
+  return 0;
+} /* wh_write_influxdb */
+
 static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
   if ((fam == NULL) || (user_data == NULL)) {
     return EINVAL;
@@ -405,6 +435,9 @@ static int wh_write(metric_family_t const *fam, user_data_t *user_data) {
   case WH_FORMAT_KAIROSDB:
     status = wh_write_kairosdb(fam, cb);
     break;
+  case WH_FORMAT_INFLUXDB:
+    status = wh_write_influxdb(fam, cb);
+    break;
   default:
     status = wh_write_command(fam, cb);
     break;
@@ -436,7 +469,7 @@ static int wh_notify(notification_t const *n, user_data_t *ud) {
     return -1;
   }
 
-  status = wh_post(cb, alert);
+  status = wh_post(cb, alert, -1);
   pthread_mutex_unlock(&cb->send_buffer_lock);
 
   return status;
@@ -459,6 +492,8 @@ static int config_set_format(wh_callback_t *cb, oconfig_item_t *ci) {
     cb->format = WH_FORMAT_JSON;
   else if (strcasecmp("KAIROSDB", string) == 0)
     cb->format = WH_FORMAT_KAIROSDB;
+  else if (strcasecmp("INFLUXDB", string) == 0)
+    cb->format = WH_FORMAT_INFLUXDB;
   else {
     ERROR("write_http plugin: Invalid format string: %s", string);
     return -1;
@@ -508,6 +543,7 @@ static int wh_config_node(oconfig_item_t *ci) {
   cb->data_ttl = 0;
   cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX);
   cb->curl_stats = NULL;
+  cb->unix_socket_path = NULL;
 
   if (cb->metrics_prefix == NULL) {
     ERROR("write_http plugin: strdup failed.");
@@ -605,7 +641,13 @@ static int wh_config_node(oconfig_item_t *ci) {
       status = cf_util_get_int(child, &cb->data_ttl);
     else if (strcasecmp("Prefix", child->key) == 0)
       status = cf_util_get_string(child, &cb->metrics_prefix);
-    else {
+    else if (strcasecmp("UnixSocket", child->key) == 0) {
+#ifdef CURL_VERSION_UNIX_SOCKETS
+      status = cf_util_get_string(child, &cb->unix_socket_path);
+#else
+      WARNING("libcurl < 7.40.0, UnixSocket config is ignored");
+#endif // CURL_VERSION_UNIX_SOCKETS
+    } else {
       ERROR("write_http plugin: Invalid configuration "
             "option: %s.",
             child->key);
index 1019de08450a3ce86ea610d8bc61143d6e873b3f..4f9c9c4cd92d4f7b4ba45515ada47ec1f6a3d690 100644 (file)
 
 #include "plugin.h"
 #include "utils/common/common.h"
+#include "utils/strbuf/strbuf.h"
 #include "utils_cache.h"
 #include "utils_complain.h"
 
+#include "utils/format_influxdb/format_influxdb.h"
+
 #if HAVE_NETDB_H
 #include <netdb.h>
 #endif
@@ -339,121 +342,6 @@ static void fill_send_buffer(char const *buffer, size_t len) {
   pthread_mutex_unlock(&send_buffer_lock);
 }
 
-static int wifxudp_escape_string(char *buffer, size_t buffer_size,
-                                 const char *string) {
-
-  if ((buffer == NULL) || (string == NULL))
-    return -EINVAL;
-
-  int dst_pos = 0;
-
-#define BUFFER_ADD(c)                                                          \
-  do {                                                                         \
-    if (dst_pos >= (buffer_size - 1)) {                                        \
-      buffer[buffer_size - 1] = 0;                                             \
-      return dst_pos++;                                                        \
-    }                                                                          \
-    buffer[dst_pos] = (c);                                                     \
-    dst_pos++;                                                                 \
-  } while (0)
-
-  /* Escape special characters */
-  for (int src_pos = 0; string[src_pos] != 0; src_pos++) {
-    if ((string[src_pos] == '\\') || (string[src_pos] == ' ') ||
-        (string[src_pos] == ',') || (string[src_pos] == '=') ||
-        (string[src_pos] == '"')) {
-      BUFFER_ADD('\\');
-      BUFFER_ADD(string[src_pos]);
-    } else
-      BUFFER_ADD(string[src_pos]);
-  } /* for */
-  buffer[dst_pos] = 0;
-
-#undef BUFFER_ADD
-
-  return dst_pos;
-} /* int wifxudp_escape_string */
-
-static int write_influxdb_point(char *buffer, int buffer_len, metric_t metric) {
-  int offset = 0;
-  bool have_values = false;
-
-#define BUFFER_ADD_ESCAPE(...)                                                 \
-  do {                                                                         \
-    int status = wifxudp_escape_string(buffer + offset, buffer_len - offset,   \
-                                       __VA_ARGS__);                           \
-    if (status < 0)                                                            \
-      return status;                                                           \
-    offset += status;                                                          \
-    if (status >= (buffer_len - offset))                                       \
-      return offset;                                                           \
-  } while (0)
-
-#define BUFFER_ADD(...)                                                        \
-  do {                                                                         \
-    int status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__);  \
-    if (status < 0)                                                            \
-      return status;                                                           \
-    offset += status;                                                          \
-    if (status >= (buffer_len - offset))                                       \
-      return offset;                                                           \
-  } while (0)
-
-  have_values = false;
-  BUFFER_ADD_ESCAPE(metric.family->name);
-  for (size_t j = 0; j < metric.label.num; j++) {
-    label_pair_t label = metric.label.ptr[j];
-    BUFFER_ADD(",");
-    BUFFER_ADD_ESCAPE(label.name);
-    BUFFER_ADD("=");
-    BUFFER_ADD_ESCAPE(label.value);
-  }
-  BUFFER_ADD(" ");
-
-  if (wifxudp_config_store_rates &&
-      (metric.family->type == METRIC_TYPE_COUNTER)) {
-    gauge_t rate;
-    if (uc_get_rate(&metric, &rate) != 0) {
-      WARNING("write_influxdb_udp plugin: "
-              "uc_get_rate failed.");
-      return -1;
-    }
-    if (!isnan(rate)) {
-      BUFFER_ADD("value=" GAUGE_FORMAT, rate);
-      have_values = true;
-    }
-  } else {
-    switch (metric.family->type) {
-    case METRIC_TYPE_GAUGE:
-    case METRIC_TYPE_UNTYPED:
-      if (!isnan(metric.value.gauge)) {
-        BUFFER_ADD("value=" GAUGE_FORMAT, metric.value.gauge);
-        have_values = true;
-      }
-      break;
-    case METRIC_TYPE_COUNTER:
-      BUFFER_ADD("value=%" PRIi64 "i", metric.value.counter);
-      have_values = true;
-      break;
-    default:
-      WARNING("write_influxdb_udp plugin: "
-              "unknown family type.");
-      return -1;
-      break;
-    }
-  }
-
-  if (!have_values)
-    return 0;
-
-  BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(metric.time));
-
-#undef BUFFER_ADD_ESCAPE
-#undef BUFFER_ADD
-
-  return offset;
-} /* int write_influxdb_point */
-
 static int write_influxdb_udp_write(metric_family_t const *fam,
                                     user_data_t __attribute__((unused)) *
                                         user_data) {
@@ -461,28 +349,27 @@ static int write_influxdb_udp_write(metric_family_t const *fam,
     return EINVAL;
 
   char buffer[NET_DEFAULT_PACKET_SIZE];
-  int buffer_len = NET_DEFAULT_PACKET_SIZE;
-  int offset = 0;
+  const size_t buffer_len =
+      (wifxudp_config_packet_size < NET_DEFAULT_PACKET_SIZE
+           ? wifxudp_config_packet_size
+           : NET_DEFAULT_PACKET_SIZE);
+  strbuf_t sb = STRBUF_CREATE_FIXED(buffer, buffer_len);
 
-  if (wifxudp_config_packet_size < buffer_len)
-    buffer_len = wifxudp_config_packet_size;
-
-  for (size_t i = 0; i < fam->metric.num; i++) {
+  for (size_t i = 0; i < fam->metric.num;) {
     metric_t metric = fam->metric.ptr[i];
-    int status =
-        write_influxdb_point(buffer + offset, buffer_len - offset, metric);
-    if (status < 0) { // error
+    const size_t pos = sb.pos;
+    int status = format_influxdb_point(&sb, metric, wifxudp_config_store_rates);
+    if (status == ENOSPC) {
+      fill_send_buffer(sb.ptr, pos);
+      strbuf_reset(&sb);
+    } else if (status != 0) { // error
       ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
-      return -1;
-    } else if (status >= buffer_len - offset) { // full
-      fill_send_buffer(buffer, offset);
-      offset = 0;
-      buffer[0] = 0;
+      return status;
     } else {
-      offset += status;
+      ++i;
     }
   }
-  fill_send_buffer(buffer, offset);
+  fill_send_buffer(sb.ptr, sb.pos);
   return 0;
 } /* int write_influxdb_udp_write */