SAVE_CPPFLAGS="$CPPFLAGS"
CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
- AC_CHECK_HEADERS([amqp.h],
- [with_librabbitmq="yes"],
- [with_librabbitmq="no (amqp.h not found)"]
- )
+ with_librabbitmq="no (amqp.h and rabbitmq-c/amqp.h not found)"
+ AC_CHECK_HEADERS([rabbitmq-c/amqp.h], [with_librabbitmq="yes"], [])
+ AC_CHECK_HEADERS([amqp.h], [with_librabbitmq="yes"], [])
+
+ AC_CHECK_HEADERS([rabbitmq-c/framing.h rabbitmq-c/ssl_socket.h rabbitmq-c/tcp_socket.h \
+ amqp_framing.h amqp_ssl_socket.h amqp_tcp_socket.h])
CPPFLAGS="$SAVE_CPPFLAGS"
fi
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
- #include <amqp.h>
+ #if HAVE_RABBITMQ_C_AMQP_H
+ # include <rabbitmq-c/amqp.h>
+ #else
+ # include <amqp.h>
+ #endif
]]
)
CPPFLAGS="$SAVE_CPPFLAGS"
LDFLAGS="$SAVE_LDFLAGS"
fi
-if test "x$with_librabbitmq" = "xyes"; then
- SAVE_CPPFLAGS="$CPPFLAGS"
- SAVE_LDFLAGS="$LDFLAGS"
- SAVE_LIBS="$LIBS"
- CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
- LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
- LIBS="-lrabbitmq"
-
- AC_CHECK_HEADERS([amqp_tcp_socket.h amqp_socket.h])
- AC_CHECK_FUNC([amqp_tcp_socket_new],
- [
- AC_DEFINE([HAVE_AMQP_TCP_SOCKET], [1],
- [Define if librabbitmq provides the new TCP socket interface.])
- ]
- )
-
- AC_CHECK_DECLS([amqp_socket_close],
- [],
- [],
- [[
- #include <amqp.h>
- #ifdef HAVE_AMQP_TCP_SOCKET_H
- # include <amqp_tcp_socket.h>
- #endif
- #ifdef HAVE_AMQP_SOCKET_H
- # include <amqp_socket.h>
- #endif
- ]]
- )
-
- CPPFLAGS="$SAVE_CPPFLAGS"
- LDFLAGS="$SAVE_LDFLAGS"
- LIBS="$SAVE_LIBS"
-fi
-
if test "x$with_librabbitmq" = "xyes"; then
BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags"
BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags"
#include "utils/format_json/format_json.h"
#include "utils_random.h"
-#include <rabbitmq-c/amqp.h>
-#include <rabbitmq-c/amqp_framing.h>
-
-#ifdef HAVE_AMQP_TCP_SOCKET_H
-#include <rabbitmq-c/amqp_ssl_socket.h>
-#include <rabbitmq-c/amqp_tcp_socket.h>
-#endif
-#ifdef HAVE_AMQP_SOCKET_H
-#include <rabbitmq-c/amqp_socket.h>
-#endif
-#ifdef HAVE_AMQP_TCP_SOCKET
-#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
-/* rabbitmq-c does not currently ship amqp_socket.h
- * and, thus, does not define this function. */
-int amqp_socket_close(amqp_socket_t *);
-#endif
+#if HAVE_RABBITMQ_C_AMQP_H
+# include <rabbitmq-c/amqp.h>
+# if HAVE_RABBITMQ_C_FRAMING_H
+# include <rabbitmq-c/framing.h>
+# endif
+# if HAVE_RABBITMQ_C_TCP_SOCKET_H
+# include <rabbitmq-c/tcp_socket.h>
+# endif
+# if HAVE_RABBITMQ_C_SSL_SOCKET_H
+# include <rabbitmq-c/ssl_socket.h>
+# endif
+#elif HAVE_AMQP_H
+# include <amqp.h>
+# if HAVE_AMQP_FRAMING_H
+# include <amqp_framing.h>
+# endif
+# if HAVE_AMQP_TCP_SOCKET_H
+# include <amqp_tcp_socket.h>
+# endif
+# if HAVE_AMQP_SSL_SOCKET_H
+# include <amqp_ssl_socket.h>
+# endif
#endif
/* Defines for the delivery mode. I have no idea why they're not defined by the
{
static time_t last_connect_time;
- amqp_rpc_reply_t reply;
- int status;
-#ifdef HAVE_AMQP_TCP_SOCKET
- amqp_socket_t *socket;
-#else
- int sockfd;
-#endif
-
if (conf->connection != NULL)
return 0;
char *host = conf->hosts[cdrand_u() % conf->hosts_count];
INFO("amqp plugin: Connecting to %s", host);
-#ifdef HAVE_AMQP_TCP_SOCKET
-#define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \
- */
-
+ amqp_socket_t *socket = NULL;
if (conf->tls_enabled) {
socket = amqp_ssl_socket_new(conf->connection);
- if (!socket) {
+ if (socket == NULL) {
ERROR("amqp plugin: amqp_ssl_socket_new failed.");
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
#endif
if (conf->tls_cacert) {
- status = amqp_ssl_socket_set_cacert(socket, conf->tls_cacert);
+ int status = amqp_ssl_socket_set_cacert(socket, conf->tls_cacert);
if (status < 0) {
ERROR("amqp plugin: amqp_ssl_socket_set_cacert failed: %s",
amqp_error_string2(status));
}
}
if (conf->tls_client_cert && conf->tls_client_key) {
- status = amqp_ssl_socket_set_key(socket, conf->tls_client_cert,
+ int status = amqp_ssl_socket_set_key(socket, conf->tls_client_cert,
conf->tls_client_key);
if (status < 0) {
ERROR("amqp plugin: amqp_ssl_socket_set_key failed: %s",
}
}
- status = amqp_socket_open(socket, host, conf->port);
+ int status = amqp_socket_open(socket, host, conf->port);
if (status < 0) {
ERROR("amqp plugin: amqp_socket_open failed: %s",
amqp_error_string2(status));
conf->connection = NULL;
return status;
}
-#else /* HAVE_AMQP_TCP_SOCKET */
-#define CLOSE_SOCKET() close(sockfd)
- /* this interface is deprecated as of rabbitmq-c 0.4 */
- sockfd = amqp_open_socket(host, conf->port);
- if (sockfd < 0) {
- status = (-1) * sockfd;
- ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
- amqp_destroy_connection(conf->connection);
- conf->connection = NULL;
- return status;
- }
- amqp_set_sockfd(conf->connection, sockfd);
-#endif
- reply = amqp_login(conf->connection, CONF(conf, vhost),
+ amqp_rpc_reply_t reply = amqp_login(conf->connection, CONF(conf, vhost),
/* channel max = */ 0,
/* frame max = */ 131072,
/* heartbeat = */ 0,
ERROR("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
CONF(conf, vhost), CONF(conf, user));
amqp_destroy_connection(conf->connection);
- CLOSE_SOCKET();
conf->connection = NULL;
return 1;
}
ERROR("amqp plugin: amqp_channel_open failed.");
amqp_connection_close(conf->connection, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conf->connection);
- CLOSE_SOCKET();
conf->connection = NULL;
return 1;
}