From: Märt Bakhoff Date: Fri, 13 Mar 2020 12:00:56 +0000 (+0200) Subject: amqp: allow multiple hosts for failover X-Git-Tag: collectd-5.12.0~42^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9e998293fc3140aca31994f4f2f3413bd51149f2;p=thirdparty%2Fcollectd.git amqp: allow multiple hosts for failover --- diff --git a/src/amqp.c b/src/amqp.c index b2312559e..fceb40b7a 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -33,6 +33,7 @@ #include "utils/common/common.h" #include "utils/format_graphite/format_graphite.h" #include "utils/format_json/format_json.h" +#include "utils_random.h" #include #include @@ -70,7 +71,8 @@ struct camqp_config_s { bool publish; char *name; - char *host; + char **hosts; + size_t hosts_count; int port; char *vhost; char *user; @@ -113,7 +115,6 @@ typedef struct camqp_config_s camqp_config_t; /* * Global variables */ -static const char *def_host = "localhost"; static const char *def_vhost = "/"; static const char *def_user = "guest"; static const char *def_password = "guest"; @@ -153,7 +154,7 @@ static void camqp_config_free(void *ptr) /* {{{ */ camqp_close_connection(conf); sfree(conf->name); - sfree(conf->host); + strarray_free(conf->hosts, conf->hosts_count); sfree(conf->vhost); sfree(conf->user); sfree(conf->password); @@ -438,6 +439,9 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ return ENOMEM; } + char *host = conf->hosts[cdrand_u() % conf->hosts_count]; + INFO("amqp plugin: Connecting to %s", host); + #ifdef HAVE_AMQP_TCP_SOCKET #define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \ */ @@ -487,7 +491,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ } } - status = amqp_socket_open(socket, CONF(conf, host), conf->port); + status = amqp_socket_open(socket, host, conf->port); if (status < 0) { ERROR("amqp plugin: amqp_socket_open failed: %s", amqp_error_string2(status)); @@ -498,7 +502,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ #else /* HAVE_AMQP_TCP_SOCKET */ #define CLOSE_SOCKET() close(sockfd) /* this interface is deprecated as of rabbitmq-c 0.4 */ - sockfd = amqp_open_socket(CONF(conf, host), conf->port); + sockfd = amqp_open_socket(host, conf->port); if (sockfd < 0) { status = (-1) * sockfd; ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status)); @@ -538,7 +542,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ INFO("amqp plugin: Successfully opened connection to vhost \"%s\" " "on %s:%i.", - CONF(conf, vhost), CONF(conf, host), conf->port); + CONF(conf, vhost), host, conf->port); status = camqp_create_exchange(conf); if (status != 0) @@ -887,7 +891,8 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ conf->publish = publish; conf->name = NULL; conf->format = CAMQP_FORMAT_COMMAND; - conf->host = NULL; + conf->hosts = NULL; + conf->hosts_count = 0; conf->port = 5672; conf->vhost = NULL; conf->user = NULL; @@ -929,9 +934,22 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; - if (strcasecmp("Host", child->key) == 0) - status = cf_util_get_string(child, &conf->host); - else if (strcasecmp("Port", child->key) == 0) { + if (strcasecmp("Host", child->key) == 0) { + for (int j = 0; j < child->values_num; j++) { + if (child->values[j].type != OCONFIG_TYPE_STRING) { + status = EINVAL; + ERROR("amqp plugin: Host arguments must be strings"); + break; + } + + status = strarray_add(&conf->hosts, &conf->hosts_count, + child->values[j].value.string); + if (status) { + ERROR("amqp plugin: Host configuration failed: %d", status); + break; + } + } + } else if (strcasecmp("Port", child->key) == 0) { status = cf_util_get_port_number(child); if (status > 0) { conf->port = status; @@ -1014,6 +1032,12 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ break; } /* for (i = 0; i < ci->children_num; i++) */ + if (status == 0 && conf->hosts_count == 0) { + status = strarray_add(&conf->hosts, &conf->hosts_count, "localhost"); + if (status) + ERROR("amqp plugin: Host configuration failed: %d", status); + } + if ((status == 0) && (conf->exchange == NULL)) { if (conf->exchange_type != NULL) WARNING("amqp plugin: The option \"ExchangeType\" was given " diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 09ea15fc2..a0c6f8857 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -540,6 +540,7 @@ B # Send values to an AMQP broker Host "localhost" +# Host "amqp1.example.com" "amqp2.example.com" "amqp3.example.com" Port "5672" VHost "/" User "guest" @@ -596,11 +597,14 @@ I blocks in the future. =over 4 -=item B I +=item B I [I ...] Hostname or IP-address of the AMQP broker. Defaults to the default behavior of the underlying communications library, I, which is "localhost". +If multiple hosts are specified, then a random one is chosen at each +(re)connection attempt. This is useful for failover with a clustered broker. + =item B I Service name or port number on which the AMQP broker accepts connections. This