#include "utils/common/common.h"
#include "utils/format_graphite/format_graphite.h"
#include "utils/format_json/format_json.h"
+#include "utils_random.h"
#include <amqp.h>
#include <amqp_framing.h>
bool publish;
char *name;
- char *host;
+ char **hosts;
+ size_t hosts_count;
int port;
char *vhost;
char *user;
/*
* Global variables
*/
-static const char *def_host = "localhost";
static const char *def_vhost = "/";
static const char *def_user = "guest";
static const char *def_password = "guest";
camqp_close_connection(conf);
sfree(conf->name);
- sfree(conf->host);
+ strarray_free(conf->hosts, conf->hosts_count);
sfree(conf->vhost);
sfree(conf->user);
sfree(conf->password);
return ENOMEM;
}
+ char *host = conf->hosts[cdrand_u() % conf->hosts_count];
+ INFO("amqp plugin: Connecting to %s", host);
+
#ifdef HAVE_AMQP_TCP_SOCKET
#define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \
*/
}
}
- status = amqp_socket_open(socket, CONF(conf, host), conf->port);
+ status = amqp_socket_open(socket, host, conf->port);
if (status < 0) {
ERROR("amqp plugin: amqp_socket_open failed: %s",
amqp_error_string2(status));
#else /* HAVE_AMQP_TCP_SOCKET */
#define CLOSE_SOCKET() close(sockfd)
/* this interface is deprecated as of rabbitmq-c 0.4 */
- sockfd = amqp_open_socket(CONF(conf, host), conf->port);
+ sockfd = amqp_open_socket(host, conf->port);
if (sockfd < 0) {
status = (-1) * sockfd;
ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
INFO("amqp plugin: Successfully opened connection to vhost \"%s\" "
"on %s:%i.",
- CONF(conf, vhost), CONF(conf, host), conf->port);
+ CONF(conf, vhost), host, conf->port);
status = camqp_create_exchange(conf);
if (status != 0)
conf->publish = publish;
conf->name = NULL;
conf->format = CAMQP_FORMAT_COMMAND;
- conf->host = NULL;
+ conf->hosts = NULL;
+ conf->hosts_count = 0;
conf->port = 5672;
conf->vhost = NULL;
conf->user = NULL;
for (int i = 0; i < ci->children_num; i++) {
oconfig_item_t *child = ci->children + i;
- if (strcasecmp("Host", child->key) == 0)
- status = cf_util_get_string(child, &conf->host);
- else if (strcasecmp("Port", child->key) == 0) {
+ if (strcasecmp("Host", child->key) == 0) {
+ for (int j = 0; j < child->values_num; j++) {
+ if (child->values[j].type != OCONFIG_TYPE_STRING) {
+ status = EINVAL;
+ ERROR("amqp plugin: Host arguments must be strings");
+ break;
+ }
+
+ status = strarray_add(&conf->hosts, &conf->hosts_count,
+ child->values[j].value.string);
+ if (status) {
+ ERROR("amqp plugin: Host configuration failed: %d", status);
+ break;
+ }
+ }
+ } else if (strcasecmp("Port", child->key) == 0) {
status = cf_util_get_port_number(child);
if (status > 0) {
conf->port = status;
break;
} /* for (i = 0; i < ci->children_num; i++) */
+ if (status == 0 && conf->hosts_count == 0) {
+ status = strarray_add(&conf->hosts, &conf->hosts_count, "localhost");
+ if (status)
+ ERROR("amqp plugin: Host configuration failed: %d", status);
+ }
+
if ((status == 0) && (conf->exchange == NULL)) {
if (conf->exchange_type != NULL)
WARNING("amqp plugin: The option \"ExchangeType\" was given "