From: Paul Date: Sat, 4 Sep 2021 15:39:23 +0000 (+0200) Subject: Implement support for multiple Server entries in write_influxdb_udp X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d64597cff6225b57c33d7f8dbc599fa549f75b3f;p=thirdparty%2Fcollectd.git Implement support for multiple Server entries in write_influxdb_udp -creates one socket for each Server row in the CFG: e.g. LoadPlugin write_influxdb_udp Server "some.host.tld" "8585" Server "some.host.tld" "8686" Server "192.0.2.1" "8001" Server "192.0.2.5" "8007" Server "2001:db8::19" "5000" Tested on 5.13 and 6.0 code-base. --- diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 0b1a038db..43855bdb4 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -11209,12 +11209,13 @@ traffic between collectd and the HTTP server. =head2 Plugin C -The write_influxdb_udp plugin sends data to a instance of InfluxDB using the +The write_influxdb_udp plugin sends data to instances of InfluxDB using the "Line Protocol". Each plugin is sent as a measurement with a time precision of miliseconds while plugin instance, type and type instance are sent as tags. - Server "influxdb.internal.tld" + Server "influxdb.fqdn" + Server "influxdb2.fqdn" TimePrecision "ms" StoreRates "yes" @@ -11223,11 +11224,13 @@ miliseconds while plugin instance, type and type instance are sent as tags. =item BServer> I [I]B> -The B statement sets the server to send datagrams to. +The B statement sets a server to send datagrams to. This statement can +appear multiple times, once for each unique destination to send to. The argument I may be a hostname, an IPv4 address or an IPv6 address. The optional second argument specifies a port number or a service name. If not -given, the default, B<8089>, is used. +given, the default, B<8089>, is used. The arguments I and I should +be enclosed in "quotes". =item B I|I|I diff --git a/src/write_influxdb_udp.c b/src/write_influxdb_udp.c index 6f63ced14..6b4ddf3b6 100644 --- a/src/write_influxdb_udp.c +++ b/src/write_influxdb_udp.c @@ -21,6 +21,8 @@ * Florian octo Forster * Aman Gupta * Carlos Peon Costa + * multiple Server directives by: + * Paul (systemcrash) **/ #include "collectd.h" @@ -60,6 +62,9 @@ typedef struct sockent { char *service; int interface; struct sockent_client client; + + pthread_mutex_t lock; + struct sockent *next; } sockent_t; #define NET_DEFAULT_PACKET_SIZE 1452 @@ -80,7 +85,7 @@ static size_t wifxudp_config_packet_size = NET_DEFAULT_PACKET_SIZE; static bool wifxudp_config_store_rates; static wifxudp_time_precision_t wifxudp_config_time_precision = MS; -static sockent_t *sending_socket; +static sockent_t *sending_sockets; /* Buffer in which to-be-sent network packets are constructed. */ static char *send_buffer; @@ -161,6 +166,8 @@ static sockent_t *sockent_create() { se->node = NULL; se->service = NULL; se->interface = 0; + se->next = NULL; + pthread_mutex_init(&se->lock, NULL); se->client.fd = -1; se->client.addr = NULL; @@ -284,14 +291,18 @@ static void free_sockent_client(struct sockent_client *sec) { } /* void free_sockent_client */ static void sockent_destroy(sockent_t *se) { + sockent_t *next; - if (se != NULL) { + while (se != NULL) { + next = se->next; sfree(se->node); sfree(se->service); + pthread_mutex_destroy(&se->lock); free_sockent_client(&se->client); sfree(se); + se = next; } } /* void sockent_destroy */ @@ -302,7 +313,8 @@ static void write_influxdb_udp_init_buffer(void) { send_buffer_last_update = 0; } /* write_influxdb_udp_init_buffer */ -static void write_influxdb_udp_send_buffer(const char *buffer, +static void write_influxdb_udp_send_buffer(sockent_t *sending_socket, + const char *buffer, size_t buffer_size) { while (42) { int status = sockent_client_connect(sending_socket); @@ -328,8 +340,16 @@ static void write_influxdb_udp_send_buffer(const char *buffer, } /* while (42) */ } /* void write_influxdb_udp_send_buffer */ +static void write_influxdb_send_buffers(char *buffer, size_t buffer_len) { + for (sockent_t *se = sending_sockets; se != NULL; se = se->next) { + pthread_mutex_lock(&se->lock); + write_influxdb_udp_send_buffer(se, buffer, buffer_len); + pthread_mutex_unlock(&se->lock); + } /* for (sending_sockets) */ +} + static void flush_buffer(void) { - write_influxdb_udp_send_buffer(send_buffer, (size_t)send_buffer_fill); + write_influxdb_send_buffers(send_buffer, (size_t)send_buffer_fill); write_influxdb_udp_init_buffer(); } @@ -556,6 +576,28 @@ static int wifxudp_config_set_buffer_size(const oconfig_item_t *ci) { return 0; } /* int wifxudp_config_set_buffer_size */ +/* Add a sockent to the global list of sockets */ +static int sockent_add(sockent_t *se) /* {{{ */ +{ + sockent_t *last_ptr; + + if (se == NULL) + return -1; + else { + if (sending_sockets == NULL) { + sending_sockets = se; + return 0; + } + last_ptr = sending_sockets; + } + + while (last_ptr->next != NULL) + last_ptr = last_ptr->next; + last_ptr->next = se; + + return 0; +} /* }}} int sockent_add */ + static int wifxudp_config_set_server(const oconfig_item_t *ci) { if ((ci->values_num < 1) || (ci->values_num > 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) || @@ -566,6 +608,9 @@ static int wifxudp_config_set_server(const oconfig_item_t *ci) { return -1; } + sockent_t *sending_socket; + int status; + sending_socket = sockent_create(); if (sending_socket == NULL) { ERROR("write_influxdb_udp plugin: sockent_create failed."); @@ -576,6 +621,14 @@ static int wifxudp_config_set_server(const oconfig_item_t *ci) { if (ci->values_num >= 2) sending_socket->service = strdup(ci->values[1].value.string); + status = sockent_add(sending_socket); + if (status != 0) { + ERROR("write_influxdb_udp plugin: wifxudp_config_set_server: sockent_add " + "failed."); + sockent_destroy(sending_socket); + return -1; + } + return 0; } /* int wifxudp_config_set_server */ @@ -632,10 +685,9 @@ static int write_influxdb_udp_shutdown(void) { sfree(send_buffer); - if (sending_socket != NULL) { - sockent_client_disconnect(sending_socket); - sockent_destroy(sending_socket); - } + for (sockent_t *se = sending_sockets; se != NULL; se = se->next) + sockent_client_disconnect(se); + sockent_destroy(sending_sockets); plugin_unregister_config("write_influxdb_udp"); plugin_unregister_init("write_influxdb_udp"); @@ -664,7 +716,7 @@ static int write_influxdb_udp_init(void) { write_influxdb_udp_init_buffer(); /* setup socket(s) and so on */ - if (sending_socket != NULL) { + if (sending_sockets != NULL) { plugin_register_write("write_influxdb_udp", write_influxdb_udp_write, /* user_data = */ NULL); }