]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
New plugin to send values to InfluxDB using line protocol via udp. 3162/head
authorCarlos Peon Costa <carlospeon@gmail.com>
Tue, 21 May 2019 13:20:27 +0000 (15:20 +0200)
committerCarlos Peón Costa <carlospec@inditex.com>
Thu, 27 Feb 2020 09:23:33 +0000 (10:23 +0100)
Each plugin is sent as a measurement with a time precision of miliseconds
while plugin instance, type and type instance are sent as tags.

Makefile.am
configure.ac
src/collectd.conf.in
src/collectd.conf.pod
src/write_influxdb_udp.c [new file with mode: 0644]

index 0b34088776e6dba1ca83ad01b6847ced60101d21..4d43fde9f4392ada981bf72710d6f2e2428a958f 100644 (file)
@@ -2093,6 +2093,16 @@ write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 write_http_la_LIBADD = libformat_json.la $(BUILD_WITH_LIBCURL_LIBS)
 endif
 
+if BUILD_PLUGIN_WRITE_INFLUXDB_UDP
+pkglib_LTLIBRARIES += write_influxdb_udp.la
+write_influxdb_udp_la_SOURCES = src/write_influxdb_udp.c
+write_influxdb_udp_la_CPPFLAGS = $(AM_CPPFLAGS)
+write_influxdb_udp_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+if BUILD_WITH_LIBSOCKET
+write_influxdb_udp_la_LIBADD = -lsocket
+endif
+endif
+
 if BUILD_PLUGIN_WRITE_KAFKA
 pkglib_LTLIBRARIES += write_kafka.la
 write_kafka_la_SOURCES = src/write_kafka.c
index 6e886b94cda1ae466fdc4b8a4e5972ff96725ee5..74c97b85a7bde9ba1dbc6464b53897d046102378 100644 (file)
@@ -7009,6 +7009,7 @@ AC_PLUGIN([vserver],             [$plugin_vserver],           [Linux VServer sta
 AC_PLUGIN([wireless],            [$plugin_wireless],          [Wireless statistics])
 AC_PLUGIN([write_graphite],      [yes],                       [Graphite / Carbon output plugin])
 AC_PLUGIN([write_http],          [$with_libcurl],             [HTTP output plugin])
+AC_PLUGIN([write_influxdb_udp],  [yes],                       [Influxdb udp output plugin])
 AC_PLUGIN([write_kafka],         [$with_librdkafka],          [Kafka output plugin])
 AC_PLUGIN([write_log],           [yes],                       [Log output plugin])
 AC_PLUGIN([write_mongodb],       [$with_libmongoc],           [MongoDB output plugin])
@@ -7446,6 +7447,7 @@ AC_MSG_RESULT([    vserver . . . . . . . $enable_vserver])
 AC_MSG_RESULT([    wireless  . . . . . . $enable_wireless])
 AC_MSG_RESULT([    write_graphite  . . . $enable_write_graphite])
 AC_MSG_RESULT([    write_http  . . . . . $enable_write_http])
+AC_MSG_RESULT([    write_influxdb_udp. . $enable_write_influxdb_udp])
 AC_MSG_RESULT([    write_kafka . . . . . $enable_write_kafka])
 AC_MSG_RESULT([    write_log . . . . . . $enable_write_log])
 AC_MSG_RESULT([    write_mongodb . . . . $enable_write_mongodb])
index f6a62060b1473b267e6697194617c9f34a95905b..31580cbf7c52d1ff9b5d65f265a643426ab20ee9 100644 (file)
 #@BUILD_PLUGIN_WIRELESS_TRUE@LoadPlugin wireless
 #@BUILD_PLUGIN_WRITE_GRAPHITE_TRUE@LoadPlugin write_graphite
 #@BUILD_PLUGIN_WRITE_HTTP_TRUE@LoadPlugin write_http
+#@BUILD_PLUGIN_WRITE_INFLUXDB_UDP_TRUE@LoadPlugin write_influxdb_udp
 #@BUILD_PLUGIN_WRITE_KAFKA_TRUE@LoadPlugin write_kafka
 #@BUILD_PLUGIN_WRITE_LOG_TRUE@LoadPlugin write_log
 #@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb
 #      </Node>
 #</Plugin>
 
+#<Plugin write_influxdb_udp>
+#  Server "localhost"
+#  StoreRates true
+#  MaxPacketSize 32768
+#  TimeToLive 128
+#</Plugin>
+
 #<Plugin write_kafka>
 #  Property "metadata.broker.list" "localhost:9092"
 #  <Topic "collectd">
index 1e94a8c9da36e40fd4a32aed1641a3209460b860..2d4c8d0fc3bb0744bcd6b486918853bb78468e89 100644 (file)
@@ -10539,6 +10539,48 @@ slightly below this interval, which you can estimate by monitoring the network
 traffic between collectd and the HTTP server.
 
 
+=back
+
+=head2 Plugin C<write_influxdb_udp>
+
+The write_influxdb_udp plugin sends data to a instance of InfluxDB using the
+"Line Protocol". Each plugin is sent as a measurement with a time precision of
+miliseconds while plugin instance, type and type instance are sent as tags.
+
+ <Plugin "write_influxdb_udp">
+   Server "influxdb.internal.tld"
+   StoreRates "yes"
+ </Plugin>
+
+=over 4
+
+=item B<E<lt>Server> I<Host> [I<Port>]B<E<gt>>
+
+The B<Server> statement sets the server to send datagrams to.
+
+The argument I<Host> may be a hostname, an IPv4 address or an IPv6 address. The
+optional second argument specifies a port number or a service name. If not
+given, the default, B<8089>, is used.
+
+=item B<TimeToLive> I<1-255>
+
+Set the time-to-live of sent packets. This applies to all, unicast and
+multicast, and IPv4 and IPv6 packets. The default is to not change this value.
+That means that multicast packets will be sent with a TTL of C<1> (one) on most
+operating systems.
+
+=item B<MaxPacketSize> I<1024-65535>
+
+Set the maximum size for datagrams received over the network. Packets larger
+than this will be truncated. Defaults to 1452E<nbsp>bytes, which is the maximum
+payload size that can be transmitted in one Ethernet frame using IPv6E<nbsp>/
+UDP.
+
+=item B<StoreRates> B<true|false>
+
+If set to B<true>, convert absolute, counter and derive values to rates. If set
+to B<false> (the default) absolute, counter and derive values are sent as is.
+
 =back
 
 =head2 Plugin C<write_kafka>
diff --git a/src/write_influxdb_udp.c b/src/write_influxdb_udp.c
new file mode 100644 (file)
index 0000000..8e3e90e
--- /dev/null
@@ -0,0 +1,670 @@
+/**
+ * collectd - src/write_influxdb_udp.c
+ * Copyright (C) 2007-2009  Florian octo Forster
+ * Copyright (C) 2009       Aman Gupta
+ * Copyright (C) 2019       Carlos Peon Costa
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Aman Gupta <aman at tmm1.net>
+ *   Carlos Peon Costa <carlospeon at gmail.com>
+ **/
+
+#include "collectd.h"
+
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils_cache.h"
+#include "utils_complain.h"
+
+#if HAVE_NETDB_H
+#include <netdb.h>
+#endif
+#if HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#endif
+#if HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+#if HAVE_POLL_H
+#include <poll.h>
+#endif
+#if HAVE_NET_IF_H
+#include <net/if.h>
+#endif
+
+struct sockent_client {
+  int fd;
+  struct sockaddr_storage *addr;
+  socklen_t addrlen;
+  cdtime_t next_resolve_reconnect;
+  cdtime_t resolve_interval;
+  struct sockaddr_storage *bind_addr;
+};
+
+typedef struct sockent {
+  char *node;
+  char *service;
+  int interface;
+  struct sockent_client client;
+} sockent_t;
+
+#define NET_DEFAULT_PACKET_SIZE 1452
+#define NET_DEFAULT_PORT "8089"
+
+/*
+ * Private variables
+ */
+
+static int wifxudp_config_ttl;
+static size_t wifxudp_config_packet_size = NET_DEFAULT_PACKET_SIZE;
+static bool wifxudp_config_store_rates;
+
+static sockent_t *sending_socket;
+
+/* Buffer in which to-be-sent network packets are constructed. */
+static char *send_buffer;
+static char *send_buffer_ptr;
+static int send_buffer_fill;
+static cdtime_t send_buffer_last_update;
+static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static int listen_loop;
+
+static int set_ttl(const sockent_t *se, const struct addrinfo *ai) {
+
+  if ((wifxudp_config_ttl < 1) || (wifxudp_config_ttl > 255))
+    return -1;
+
+  if (ai->ai_family == AF_INET) {
+    struct sockaddr_in *addr = (struct sockaddr_in *)ai->ai_addr;
+    int optname;
+
+    if (IN_MULTICAST(ntohl(addr->sin_addr.s_addr)))
+      optname = IP_MULTICAST_TTL;
+    else
+      optname = IP_TTL;
+
+    if (setsockopt(se->client.fd, IPPROTO_IP, optname, &wifxudp_config_ttl,
+                   sizeof(wifxudp_config_ttl)) != 0) {
+      ERROR("write_influxdb_udp plugin: setsockopt (ipv4-ttl): %s", STRERRNO);
+      return -1;
+    }
+  } else if (ai->ai_family == AF_INET6) {
+    /* Useful example:
+     * http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
+    struct sockaddr_in6 *addr = (struct sockaddr_in6 *)ai->ai_addr;
+    int optname;
+
+    if (IN6_IS_ADDR_MULTICAST(&addr->sin6_addr))
+      optname = IPV6_MULTICAST_HOPS;
+    else
+      optname = IPV6_UNICAST_HOPS;
+
+    if (setsockopt(se->client.fd, IPPROTO_IPV6, optname, &wifxudp_config_ttl,
+                   sizeof(wifxudp_config_ttl)) != 0) {
+      ERROR("write_influxdb_udp plugin: setsockopt(ipv6-ttl): %s", STRERRNO);
+      return -1;
+    }
+  }
+
+  return 0;
+} /* int set_ttl */
+
+static int bind_socket_to_addr(sockent_t *se, const struct addrinfo *ai) {
+
+  if (se->client.bind_addr == NULL)
+    return 0;
+
+  char pbuffer[64];
+
+  if (ai->ai_family == AF_INET) {
+    struct sockaddr_in *addr = (struct sockaddr_in *)(se->client.bind_addr);
+    inet_ntop(AF_INET, &(addr->sin_addr), pbuffer, 64);
+
+    if (bind(se->client.fd, (struct sockaddr *)addr, sizeof(*addr)) == -1)
+      return -1;
+  } else if (ai->ai_family == AF_INET6) {
+    struct sockaddr_in6 *addr = (struct sockaddr_in6 *)(se->client.bind_addr);
+    inet_ntop(AF_INET6, &(addr->sin6_addr), pbuffer, 64);
+
+    if (bind(se->client.fd, (struct sockaddr *)addr, sizeof(*addr)) == -1)
+      return -1;
+  }
+
+  return 0;
+} /* int bind_socket_to_addr */
+
+static sockent_t *sockent_create() {
+  sockent_t *se = calloc(1, sizeof(*se));
+  if (se == NULL)
+    return NULL;
+
+  se->node = NULL;
+  se->service = NULL;
+  se->interface = 0;
+
+  se->client.fd = -1;
+  se->client.addr = NULL;
+  se->client.bind_addr = NULL;
+  se->client.resolve_interval = 0;
+  se->client.next_resolve_reconnect = 0;
+
+  return se;
+} /* sockent_t *sockent_create */
+
+static int sockent_client_disconnect(sockent_t *se) {
+
+  if (se == NULL)
+    return EINVAL;
+
+  struct sockent_client *client = &se->client;
+  if (client->fd >= 0) /* connected */
+  {
+    close(client->fd);
+    client->fd = -1;
+  }
+
+  sfree(client->addr);
+  client->addrlen = 0;
+
+  return 0;
+} /* int sockent_client_disconnect */
+
+static int sockent_client_connect(sockent_t *se) {
+  struct addrinfo *ai_list;
+  static c_complain_t complaint = C_COMPLAIN_INIT_STATIC;
+  bool reconnect = false;
+
+  if (se == NULL)
+    return EINVAL;
+
+  struct sockent_client *client = &se->client;
+
+  cdtime_t now = cdtime();
+  if (client->resolve_interval != 0 && client->next_resolve_reconnect < now) {
+    DEBUG("write_influxdb_udp plugin: "
+          "Reconnecting socket, resolve_interval = %lf, "
+          "next_resolve_reconnect = %lf",
+          CDTIME_T_TO_DOUBLE(client->resolve_interval),
+          CDTIME_T_TO_DOUBLE(client->next_resolve_reconnect));
+    reconnect = true;
+  }
+
+  if (client->fd >= 0 && !reconnect) /* already connected and not stale*/
+    return 0;
+
+  struct addrinfo ai_hints = {.ai_family = AF_UNSPEC,
+                              .ai_flags = AI_ADDRCONFIG,
+                              .ai_protocol = IPPROTO_UDP,
+                              .ai_socktype = SOCK_DGRAM};
+
+  int status = getaddrinfo(
+      se->node, (se->service != NULL) ? se->service : NET_DEFAULT_PORT,
+      &ai_hints, &ai_list);
+  if (status != 0) {
+    c_complain(LOG_ERR, &complaint,
+               "write_influxdb_udp plugin: getaddrinfo (%s, %s) failed: %s",
+               (se->node == NULL) ? "(null)" : se->node,
+               (se->service == NULL) ? "(null)" : se->service,
+               gai_strerror(status));
+    return -1;
+  } else {
+    c_release(LOG_NOTICE, &complaint,
+              "write_influxdb_udp plugin: Successfully resolved \"%s\".",
+              se->node);
+  }
+
+  for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL;
+       ai_ptr = ai_ptr->ai_next) {
+    if (client->fd >= 0) /* when we reconnect */
+      sockent_client_disconnect(se);
+
+    client->fd =
+        socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (client->fd < 0) {
+      ERROR("write_influxdb_udp plugin: socket(2) failed: %s", STRERRNO);
+      continue;
+    }
+
+    client->addr = calloc(1, sizeof(*client->addr));
+    if (client->addr == NULL) {
+      ERROR("write_influxdb_udp plugin: calloc failed.");
+      close(client->fd);
+      client->fd = -1;
+      continue;
+    }
+
+    assert(sizeof(*client->addr) >= ai_ptr->ai_addrlen);
+    memcpy(client->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    client->addrlen = ai_ptr->ai_addrlen;
+
+    set_ttl(se, ai_ptr);
+    bind_socket_to_addr(se, ai_ptr);
+
+    /* We don't open more than one write-socket per
+     * node/service pair.. */
+    break;
+  }
+
+  freeaddrinfo(ai_list);
+  if (client->fd < 0)
+    return -1;
+
+  if (client->resolve_interval > 0)
+    client->next_resolve_reconnect = now + client->resolve_interval;
+  return 0;
+} /* int sockent_client_connect */
+
+static void free_sockent_client(struct sockent_client *sec) {
+  if (sec->fd >= 0) {
+    close(sec->fd);
+    sec->fd = -1;
+  }
+  sfree(sec->addr);
+  sfree(sec->bind_addr);
+} /* void free_sockent_client */
+
+static void sockent_destroy(sockent_t *se) {
+
+  if (se != NULL) {
+    sfree(se->node);
+    sfree(se->service);
+
+    free_sockent_client(&se->client);
+
+    sfree(se);
+  }
+} /* void sockent_destroy */
+
+static void write_influxdb_udp_init_buffer(void) {
+  memset(send_buffer, 0, wifxudp_config_packet_size);
+  send_buffer_ptr = send_buffer;
+  send_buffer_fill = 0;
+  send_buffer_last_update = 0;
+} /* write_influxdb_udp_init_buffer */
+
+static void write_influxdb_udp_send_buffer(const char *buffer,
+                                           size_t buffer_size) {
+  while (42) {
+    int status = sockent_client_connect(sending_socket);
+    if (status != 0)
+      return;
+
+    status =
+        sendto(sending_socket->client.fd, buffer, buffer_size,
+               /* flags = */ 0, (struct sockaddr *)sending_socket->client.addr,
+               sending_socket->client.addrlen);
+    if (status < 0) {
+      if ((errno == EINTR) || (errno == EAGAIN))
+        continue;
+
+      ERROR("write_influxdb_udp plugin: "
+            "sendto failed: %s. Closing sending socket.",
+            STRERRNO);
+      sockent_client_disconnect(sending_socket);
+      return;
+    }
+
+    break;
+  } /* while (42) */
+} /* void write_influxdb_udp_send_buffer */
+
+static void flush_buffer(void) {
+  write_influxdb_udp_send_buffer(send_buffer, (size_t)send_buffer_fill);
+  write_influxdb_udp_init_buffer();
+}
+
+static int wifxudp_escape_string(char *buffer, size_t buffer_size,
+                                 const char *string) {
+
+  if ((buffer == NULL) || (string == NULL))
+    return -EINVAL;
+
+  if (buffer_size < 3)
+    return -ENOMEM;
+
+  int dst_pos = 0;
+
+#define BUFFER_ADD(c)                                                          \
+  do {                                                                         \
+    if (dst_pos >= (buffer_size - 1)) {                                        \
+      buffer[buffer_size - 1] = '\0';                                          \
+      return -ENOMEM;                                                          \
+    }                                                                          \
+    buffer[dst_pos] = (c);                                                     \
+    dst_pos++;                                                                 \
+  } while (0)
+
+  /* Escape special characters */
+  for (int src_pos = 0; string[src_pos] != 0; src_pos++) {
+    if ((string[src_pos] == '\\') || (string[src_pos] == ' ') ||
+        (string[src_pos] == ',') || (string[src_pos] == '=') ||
+        (string[src_pos] == '"')) {
+      BUFFER_ADD('\\');
+      BUFFER_ADD(string[src_pos]);
+    } else
+      BUFFER_ADD(string[src_pos]);
+  } /* for */
+  buffer[dst_pos] = 0;
+
+#undef BUFFER_ADD
+
+  return dst_pos;
+} /* int wifxudp_escape_string */
+
+static int write_influxdb_point(char *buffer, int buffer_len,
+                                const data_set_t *ds, const value_list_t *vl) {
+  int status;
+  int offset = 0;
+  gauge_t *rates = NULL;
+  bool have_values = false;
+
+  assert(0 == strcmp(ds->type, vl->type));
+
+#define BUFFER_ADD_ESCAPE(...)                                                 \
+  do {                                                                         \
+    status = wifxudp_escape_string(buffer + offset, buffer_len - offset,       \
+                                   __VA_ARGS__);                               \
+    if (status < 0)                                                            \
+      return -1;                                                               \
+    offset += status;                                                          \
+  } while (0)
+
+#define BUFFER_ADD(...)                                                        \
+  do {                                                                         \
+    status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__);      \
+    if ((status < 0) || (status >= (buffer_len - offset))) {                   \
+      sfree(rates);                                                            \
+      return -1;                                                               \
+    }                                                                          \
+    offset += status;                                                          \
+  } while (0)
+
+  BUFFER_ADD_ESCAPE(vl->plugin);
+  BUFFER_ADD(",host=");
+  BUFFER_ADD_ESCAPE(vl->host);
+  if (strcmp(vl->plugin_instance, "") != 0) {
+    BUFFER_ADD(",instance=");
+    BUFFER_ADD_ESCAPE(vl->plugin_instance);
+  }
+  if (strcmp(vl->type, "") != 0) {
+    BUFFER_ADD(",type=");
+    BUFFER_ADD_ESCAPE(vl->type);
+  }
+  if (strcmp(vl->type_instance, "") != 0) {
+    BUFFER_ADD(",type_instance=");
+    BUFFER_ADD_ESCAPE(vl->type_instance);
+  }
+
+  BUFFER_ADD(" ");
+  for (size_t i = 0; i < ds->ds_num; i++) {
+    if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
+        (ds->ds[i].type != DS_TYPE_GAUGE) &&
+        (ds->ds[i].type != DS_TYPE_DERIVE) &&
+        (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
+      sfree(rates);
+      return -1;
+    }
+
+    if (ds->ds[i].type == DS_TYPE_GAUGE) {
+      if (isnan(vl->values[i].gauge))
+        continue;
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge);
+      have_values = true;
+    } else if (wifxudp_config_store_rates) {
+      if (rates == NULL)
+        rates = uc_get_rate(ds, vl);
+      if (rates == NULL) {
+        WARNING("write_influxdb_udp plugin: "
+                "uc_get_rate failed.");
+        return -1;
+      }
+      if (isnan(rates[i]))
+        continue;
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]);
+      have_values = true;
+    } else if (ds->ds[i].type == DS_TYPE_COUNTER) {
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name,
+                 (uint64_t)vl->values[i].counter);
+      have_values = true;
+    } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive);
+      have_values = true;
+    } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) {
+      if (have_values)
+        BUFFER_ADD(",");
+      BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute);
+      have_values = true;
+    }
+
+  } /* for ds->ds_num */
+  sfree(rates);
+
+  if (!have_values)
+    return 0;
+
+  BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(vl->time));
+
+#undef BUFFER_ADD_ESCAPE
+#undef BUFFER_ADD
+
+  return offset;
+} /* int write_influxdb_point */
+
+static int
+write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl,
+                         user_data_t __attribute__((unused)) * user_data) {
+
+  /* listen_loop is set to non-zero in the shutdown callback, which is
+   * guaranteed to be called *after* all the write threads have been shut
+   * down. */
+  assert(listen_loop == 0);
+
+  pthread_mutex_lock(&send_buffer_lock);
+
+  int status = write_influxdb_point(
+      send_buffer_ptr, wifxudp_config_packet_size - send_buffer_fill, ds, vl);
+
+  if (status < 0) {
+    flush_buffer();
+    status = write_influxdb_point(
+        send_buffer_ptr, wifxudp_config_packet_size - send_buffer_fill, ds, vl);
+  }
+  if (status < 0) {
+    ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
+    pthread_mutex_unlock(&send_buffer_lock);
+    return -1;
+  }
+  if (status == 0) {
+    /* no real values to send (nan) */
+    pthread_mutex_unlock(&send_buffer_lock);
+    return 0;
+  }
+
+  send_buffer_fill += status;
+  send_buffer_ptr += status;
+  send_buffer_last_update = cdtime();
+
+  if (wifxudp_config_packet_size - send_buffer_fill < 120)
+    /* No room for a new point of average size in buffer,
+       the probability of fail for the new point is bigger than
+       the probability of success */
+    flush_buffer();
+
+  pthread_mutex_unlock(&send_buffer_lock);
+  return 0;
+} /* int write_influxdb_udp_write */
+
+static int wifxudp_config_set_ttl(const oconfig_item_t *ci) {
+  int tmp = 0;
+
+  if (cf_util_get_int(ci, &tmp) != 0)
+    return -1;
+  else if ((tmp > 0) && (tmp <= 255))
+    wifxudp_config_ttl = tmp;
+  else {
+    WARNING("write_influxdb_udp plugin: "
+            "The `TimeToLive' must be between 1 and 255.");
+    return -1;
+  }
+
+  return 0;
+} /* int wifxudp_config_set_ttl */
+
+static int wifxudp_config_set_buffer_size(const oconfig_item_t *ci) {
+  int tmp = 0;
+
+  if (cf_util_get_int(ci, &tmp) != 0)
+    return -1;
+  else if ((tmp >= 1024) && (tmp <= 65535))
+    wifxudp_config_packet_size = tmp;
+  else {
+    WARNING("write_influxdb_udp plugin: "
+            "The `MaxPacketSize' must be between 1024 and 65535.");
+    return -1;
+  }
+
+  return 0;
+} /* int wifxudp_config_set_buffer_size */
+
+static int wifxudp_config_set_server(const oconfig_item_t *ci) {
+  if ((ci->values_num < 1) || (ci->values_num > 2) ||
+      (ci->values[0].type != OCONFIG_TYPE_STRING) ||
+      ((ci->values_num > 1) && (ci->values[1].type != OCONFIG_TYPE_STRING))) {
+    ERROR("write_influxdb_udp plugin: The `%s' config option needs "
+          "one or two string arguments.",
+          ci->key);
+    return -1;
+  }
+
+  sending_socket = sockent_create();
+  if (sending_socket == NULL) {
+    ERROR("write_influxdb_udp plugin: sockent_create failed.");
+    return -1;
+  }
+
+  sending_socket->node = strdup(ci->values[0].value.string);
+  if (ci->values_num >= 2)
+    sending_socket->service = strdup(ci->values[1].value.string);
+
+  return 0;
+} /* int wifxudp_config_set_server */
+
+static int write_influxdb_udp_config(oconfig_item_t *ci) {
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Server", child->key) == 0)
+      wifxudp_config_set_server(child);
+    else if (strcasecmp("TimeToLive", child->key) == 0) {
+      wifxudp_config_set_ttl(child);
+    } else if (strcasecmp("MaxPacketSize", child->key) == 0)
+      wifxudp_config_set_buffer_size(child);
+    else if (strcasecmp("StoreRates", child->key) == 0)
+      cf_util_get_boolean(child, &wifxudp_config_store_rates);
+    else {
+      WARNING("write_influxdb_udp plugin: "
+              "Option `%s' is not allowed here.",
+              child->key);
+    }
+  }
+
+  return 0;
+} /* int write_influxdb_udp_config */
+
+static int write_influxdb_udp_shutdown(void) {
+  if (send_buffer_fill > 0)
+    flush_buffer();
+
+  sfree(send_buffer);
+
+  if (sending_socket != NULL) {
+    sockent_client_disconnect(sending_socket);
+    sockent_destroy(sending_socket);
+  }
+
+  plugin_unregister_config("write_influxdb_udp");
+  plugin_unregister_init("write_influxdb_udp");
+  plugin_unregister_write("write_influxdb_udp");
+  plugin_unregister_shutdown("write_influxdb_udp");
+
+  return 0;
+} /* int write_influxdb_udp_shutdown */
+
+static int write_influxdb_udp_init(void) {
+  static bool have_init;
+
+  /* Check if we were already initialized. If so, just return - there's
+   * nothing more to do (for now, that is). */
+  if (have_init)
+    return 0;
+  have_init = true;
+
+  plugin_register_shutdown("write_influxdb_udp", write_influxdb_udp_shutdown);
+
+  send_buffer = malloc(wifxudp_config_packet_size);
+  if (send_buffer == NULL) {
+    ERROR("write_influxdb_udp plugin: malloc failed.");
+    return -1;
+  }
+  write_influxdb_udp_init_buffer();
+
+  /* setup socket(s) and so on */
+  if (sending_socket != NULL) {
+    plugin_register_write("write_influxdb_udp", write_influxdb_udp_write,
+                          /* user_data = */ NULL);
+  }
+
+  return 0;
+} /* int write_influxdb_udp_init */
+
+static int write_influxdb_udp_flush(cdtime_t timeout,
+                                    __attribute__((unused))
+                                    const char *identifier,
+                                    __attribute__((unused))
+                                    user_data_t *user_data) {
+  pthread_mutex_lock(&send_buffer_lock);
+
+  if (send_buffer_fill > 0) {
+    if (timeout > 0) {
+      cdtime_t now = cdtime();
+      if ((send_buffer_last_update + timeout) > now) {
+        pthread_mutex_unlock(&send_buffer_lock);
+        return 0;
+      }
+    }
+    flush_buffer();
+  }
+  pthread_mutex_unlock(&send_buffer_lock);
+
+  return 0;
+} /* int write_influxdb_udp_flush */
+
+void module_register(void) {
+  plugin_register_complex_config("write_influxdb_udp",
+                                 write_influxdb_udp_config);
+  plugin_register_init("write_influxdb_udp", write_influxdb_udp_init);
+  plugin_register_flush("write_influxdb_udp", write_influxdb_udp_flush, NULL);
+} /* void module_register */