]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
Implement support for multiple Server entries in write_influxdb_udp
authorPaul <itsascambutmailmeanyway@gmail.com>
Sat, 4 Sep 2021 15:39:23 +0000 (17:39 +0200)
committerMatthias Runge <mrunge@matthias-runge.de>
Thu, 9 Sep 2021 05:45:50 +0000 (07:45 +0200)
-creates one socket for each Server row in the CFG: e.g.

LoadPlugin write_influxdb_udp
<Plugin write_influxdb_udp>
  Server "some.host.tld" "8585"
  Server "some.host.tld" "8686"
  Server "192.0.2.1" "8001"
  Server "192.0.2.5" "8007"
  Server "2001:db8::19" "5000"
</Plugin>

Tested on 5.13 and 6.0 code-base.

src/collectd.conf.pod
src/write_influxdb_udp.c

index 0b1a038db5585e05203f5756585700080c508eb2..43855bdb4cfa37bbcbe517379fa29c396e4e3c6b 100644 (file)
@@ -11209,12 +11209,13 @@ traffic between collectd and the HTTP server.
 
 =head2 Plugin C<write_influxdb_udp>
 
-The write_influxdb_udp plugin sends data to a instance of InfluxDB using the
+The write_influxdb_udp plugin sends data to instances of InfluxDB using the
 "Line Protocol". Each plugin is sent as a measurement with a time precision of
 miliseconds while plugin instance, type and type instance are sent as tags.
 
  <Plugin "write_influxdb_udp">
-   Server "influxdb.internal.tld"
+   Server "influxdb.fqdn"
+   Server "influxdb2.fqdn"
    TimePrecision "ms"
    StoreRates "yes"
  </Plugin>
@@ -11223,11 +11224,13 @@ miliseconds while plugin instance, type and type instance are sent as tags.
 
 =item B<E<lt>Server> I<Host> [I<Port>]B<E<gt>>
 
-The B<Server> statement sets the server to send datagrams to.
+The B<Server> statement sets a server to send datagrams to. This statement can
+appear multiple times, once for each unique destination to send to.
 
 The argument I<Host> may be a hostname, an IPv4 address or an IPv6 address. The
 optional second argument specifies a port number or a service name. If not
-given, the default, B<8089>, is used.
+given, the default, B<8089>, is used. The arguments I<Host> and I<Port> should
+be enclosed in "quotes".
 
 =item B<TimePrecision> I<ms>|I<us>|I<ns>
 
index 6f63ced14c98583be143eb6b54ac89c798d704ee..6b4ddf3b6220333ebe90992af7c8ecdca0fa39ac 100644 (file)
@@ -21,6 +21,8 @@
  *   Florian octo Forster <octo at collectd.org>
  *   Aman Gupta <aman at tmm1.net>
  *   Carlos Peon Costa <carlospeon at gmail.com>
+ *   multiple Server directives by:
+ *   Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com>
  **/
 
 #include "collectd.h"
@@ -60,6 +62,9 @@ typedef struct sockent {
   char *service;
   int interface;
   struct sockent_client client;
+
+  pthread_mutex_t lock;
+  struct sockent *next;
 } sockent_t;
 
 #define NET_DEFAULT_PACKET_SIZE 1452
@@ -80,7 +85,7 @@ static size_t wifxudp_config_packet_size = NET_DEFAULT_PACKET_SIZE;
 static bool wifxudp_config_store_rates;
 static wifxudp_time_precision_t wifxudp_config_time_precision = MS;
 
-static sockent_t *sending_socket;
+static sockent_t *sending_sockets;
 
 /* Buffer in which to-be-sent network packets are constructed. */
 static char *send_buffer;
@@ -161,6 +166,8 @@ static sockent_t *sockent_create() {
   se->node = NULL;
   se->service = NULL;
   se->interface = 0;
+  se->next = NULL;
+  pthread_mutex_init(&se->lock, NULL);
 
   se->client.fd = -1;
   se->client.addr = NULL;
@@ -284,14 +291,18 @@ static void free_sockent_client(struct sockent_client *sec) {
 } /* void free_sockent_client */
 
 static void sockent_destroy(sockent_t *se) {
+  sockent_t *next;
 
-  if (se != NULL) {
+  while (se != NULL) {
+    next = se->next;
     sfree(se->node);
     sfree(se->service);
 
+    pthread_mutex_destroy(&se->lock);
     free_sockent_client(&se->client);
 
     sfree(se);
+    se = next;
   }
 } /* void sockent_destroy */
 
@@ -302,7 +313,8 @@ static void write_influxdb_udp_init_buffer(void) {
   send_buffer_last_update = 0;
 } /* write_influxdb_udp_init_buffer */
 
-static void write_influxdb_udp_send_buffer(const char *buffer,
+static void write_influxdb_udp_send_buffer(sockent_t *sending_socket,
+                                           const char *buffer,
                                            size_t buffer_size) {
   while (42) {
     int status = sockent_client_connect(sending_socket);
@@ -328,8 +340,16 @@ static void write_influxdb_udp_send_buffer(const char *buffer,
   } /* while (42) */
 } /* void write_influxdb_udp_send_buffer */
 
+static void write_influxdb_send_buffers(char *buffer, size_t buffer_len) {
+  for (sockent_t *se = sending_sockets; se != NULL; se = se->next) {
+    pthread_mutex_lock(&se->lock);
+    write_influxdb_udp_send_buffer(se, buffer, buffer_len);
+    pthread_mutex_unlock(&se->lock);
+  } /* for (sending_sockets) */
+}
+
 static void flush_buffer(void) {
-  write_influxdb_udp_send_buffer(send_buffer, (size_t)send_buffer_fill);
+  write_influxdb_send_buffers(send_buffer, (size_t)send_buffer_fill);
   write_influxdb_udp_init_buffer();
 }
 
@@ -556,6 +576,28 @@ static int wifxudp_config_set_buffer_size(const oconfig_item_t *ci) {
   return 0;
 } /* int wifxudp_config_set_buffer_size */
 
+/* Add a sockent to the global list of sockets */
+static int sockent_add(sockent_t *se) /* {{{ */
+{
+  sockent_t *last_ptr;
+
+  if (se == NULL)
+    return -1;
+  else {
+    if (sending_sockets == NULL) {
+      sending_sockets = se;
+      return 0;
+    }
+    last_ptr = sending_sockets;
+  }
+
+  while (last_ptr->next != NULL)
+    last_ptr = last_ptr->next;
+  last_ptr->next = se;
+
+  return 0;
+} /* }}} int sockent_add */
+
 static int wifxudp_config_set_server(const oconfig_item_t *ci) {
   if ((ci->values_num < 1) || (ci->values_num > 2) ||
       (ci->values[0].type != OCONFIG_TYPE_STRING) ||
@@ -566,6 +608,9 @@ static int wifxudp_config_set_server(const oconfig_item_t *ci) {
     return -1;
   }
 
+  sockent_t *sending_socket;
+  int status;
+
   sending_socket = sockent_create();
   if (sending_socket == NULL) {
     ERROR("write_influxdb_udp plugin: sockent_create failed.");
@@ -576,6 +621,14 @@ static int wifxudp_config_set_server(const oconfig_item_t *ci) {
   if (ci->values_num >= 2)
     sending_socket->service = strdup(ci->values[1].value.string);
 
+  status = sockent_add(sending_socket);
+  if (status != 0) {
+    ERROR("write_influxdb_udp plugin: wifxudp_config_set_server: sockent_add "
+          "failed.");
+    sockent_destroy(sending_socket);
+    return -1;
+  }
+
   return 0;
 } /* int wifxudp_config_set_server */
 
@@ -632,10 +685,9 @@ static int write_influxdb_udp_shutdown(void) {
 
   sfree(send_buffer);
 
-  if (sending_socket != NULL) {
-    sockent_client_disconnect(sending_socket);
-    sockent_destroy(sending_socket);
-  }
+  for (sockent_t *se = sending_sockets; se != NULL; se = se->next)
+    sockent_client_disconnect(se);
+  sockent_destroy(sending_sockets);
 
   plugin_unregister_config("write_influxdb_udp");
   plugin_unregister_init("write_influxdb_udp");
@@ -664,7 +716,7 @@ static int write_influxdb_udp_init(void) {
   write_influxdb_udp_init_buffer();
 
   /* setup socket(s) and so on */
-  if (sending_socket != NULL) {
+  if (sending_sockets != NULL) {
     plugin_register_write("write_influxdb_udp", write_influxdb_udp_write,
                           /* user_data = */ NULL);
   }