]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
amqp: allow multiple hosts for failover
authorMärt Bakhoff <mbakhoff@perforce.com>
Fri, 13 Mar 2020 12:00:56 +0000 (14:00 +0200)
committerMärt Bakhoff <mbakhoff@perforce.com>
Fri, 13 Mar 2020 13:16:07 +0000 (15:16 +0200)
src/amqp.c
src/collectd.conf.pod

index b2312559e291f707ba0063b1e52d5c72eff0edbf..fceb40b7a9e2dafa24bf3cc0a8a10a9c7dc65044 100644 (file)
@@ -33,6 +33,7 @@
 #include "utils/common/common.h"
 #include "utils/format_graphite/format_graphite.h"
 #include "utils/format_json/format_json.h"
+#include "utils_random.h"
 
 #include <amqp.h>
 #include <amqp_framing.h>
@@ -70,7 +71,8 @@ struct camqp_config_s {
   bool publish;
   char *name;
 
-  char *host;
+  char **hosts;
+  size_t hosts_count;
   int port;
   char *vhost;
   char *user;
@@ -113,7 +115,6 @@ typedef struct camqp_config_s camqp_config_t;
 /*
  * Global variables
  */
-static const char *def_host = "localhost";
 static const char *def_vhost = "/";
 static const char *def_user = "guest";
 static const char *def_password = "guest";
@@ -153,7 +154,7 @@ static void camqp_config_free(void *ptr) /* {{{ */
   camqp_close_connection(conf);
 
   sfree(conf->name);
-  sfree(conf->host);
+  strarray_free(conf->hosts, conf->hosts_count);
   sfree(conf->vhost);
   sfree(conf->user);
   sfree(conf->password);
@@ -438,6 +439,9 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
     return ENOMEM;
   }
 
+  char *host = conf->hosts[cdrand_u() % conf->hosts_count];
+  INFO("amqp plugin: Connecting to %s", host);
+
 #ifdef HAVE_AMQP_TCP_SOCKET
 #define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us   \
                         */
@@ -487,7 +491,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
     }
   }
 
-  status = amqp_socket_open(socket, CONF(conf, host), conf->port);
+  status = amqp_socket_open(socket, host, conf->port);
   if (status < 0) {
     ERROR("amqp plugin: amqp_socket_open failed: %s",
           amqp_error_string2(status));
@@ -498,7 +502,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
 #else /* HAVE_AMQP_TCP_SOCKET */
 #define CLOSE_SOCKET() close(sockfd)
   /* this interface is deprecated as of rabbitmq-c 0.4 */
-  sockfd = amqp_open_socket(CONF(conf, host), conf->port);
+  sockfd = amqp_open_socket(host, conf->port);
   if (sockfd < 0) {
     status = (-1) * sockfd;
     ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
@@ -538,7 +542,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
 
   INFO("amqp plugin: Successfully opened connection to vhost \"%s\" "
        "on %s:%i.",
-       CONF(conf, vhost), CONF(conf, host), conf->port);
+       CONF(conf, vhost), host, conf->port);
 
   status = camqp_create_exchange(conf);
   if (status != 0)
@@ -887,7 +891,8 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
   conf->publish = publish;
   conf->name = NULL;
   conf->format = CAMQP_FORMAT_COMMAND;
-  conf->host = NULL;
+  conf->hosts = NULL;
+  conf->hosts_count = 0;
   conf->port = 5672;
   conf->vhost = NULL;
   conf->user = NULL;
@@ -929,9 +934,22 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
 
-    if (strcasecmp("Host", child->key) == 0)
-      status = cf_util_get_string(child, &conf->host);
-    else if (strcasecmp("Port", child->key) == 0) {
+    if (strcasecmp("Host", child->key) == 0) {
+      for (int j = 0; j < child->values_num; j++) {
+        if (child->values[j].type != OCONFIG_TYPE_STRING) {
+          status = EINVAL;
+          ERROR("amqp plugin: Host arguments must be strings");
+          break;
+        }
+
+        status = strarray_add(&conf->hosts, &conf->hosts_count,
+                              child->values[j].value.string);
+        if (status) {
+          ERROR("amqp plugin: Host configuration failed: %d", status);
+          break;
+        }
+      }
+    } else if (strcasecmp("Port", child->key) == 0) {
       status = cf_util_get_port_number(child);
       if (status > 0) {
         conf->port = status;
@@ -1014,6 +1032,12 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
       break;
   } /* for (i = 0; i < ci->children_num; i++) */
 
+  if (status == 0 && conf->hosts_count == 0) {
+    status = strarray_add(&conf->hosts, &conf->hosts_count, "localhost");
+    if (status)
+      ERROR("amqp plugin: Host configuration failed: %d", status);
+  }
+
   if ((status == 0) && (conf->exchange == NULL)) {
     if (conf->exchange_type != NULL)
       WARNING("amqp plugin: The option \"ExchangeType\" was given "
index 09ea15fc2b2e3c3cb0ec5d84999d9f98a8a2adf1..a0c6f8857aa160b2441cdb2358d530f0a90f08f2 100644 (file)
@@ -540,6 +540,7 @@ B<Synopsis:>
    # Send values to an AMQP broker
    <Publish "some_name">
      Host "localhost"
+#    Host "amqp1.example.com" "amqp2.example.com" "amqp3.example.com"
      Port "5672"
      VHost "/"
      User "guest"
@@ -596,11 +597,14 @@ I<Publish> blocks in the future.
 
 =over 4
 
-=item B<Host> I<Host>
+=item B<Host> I<Host> [I<Host> ...]
 
 Hostname or IP-address of the AMQP broker. Defaults to the default behavior of
 the underlying communications library, I<rabbitmq-c>, which is "localhost".
 
+If multiple hosts are specified, then a random one is chosen at each
+(re)connection attempt. This is useful for failover with a clustered broker.
+
 =item B<Port> I<Port>
 
 Service name or port number on which the AMQP broker accepts connections. This