LDFLAGS="${LDFLAGS} -pie"
fi
+#libevent includes and libraries
+ AC_ARG_WITH(libevent_includes,
+ [ --with-libevent-includes=DIR libevent include directory],
+ [with_libevent_includes="$withval"],[with_libevent_includes="no"])
+ AC_ARG_WITH(libevent_libraries,
+ [ --with-libevent-libraries=DIR libevent library directory],
+ [with_libevent_libraries="$withval"],[with_libevent_libraries="no"])
+
# libhiredis
AC_ARG_ENABLE(hiredis,
AS_HELP_STRING([--enable-hiredis],[Enable Redis support]),
[ --with-libhiredis-libraries=DIR libhiredis library directory],
[with_libhiredis_libraries="$withval"],[with_libhiredis_libraries="no"])
+ enable_hiredis_async="no"
if test "$enable_hiredis" = "yes"; then
if test "$with_libhiredis_includes" != "no"; then
CPPFLAGS="${CPPFLAGS} -I${with_libhiredis_includes}"
if test "$HIREDIS" = "yes"; then
AC_DEFINE([HAVE_LIBHIREDIS],[1],[libhiredis available])
enable_hiredis="yes"
+ #
+ # Check if async adapters and libevent is installed
+ #
+ AC_CHECK_HEADER("hiredis/adapters/libevent.h",HIREDIS_LIBEVENT_ADAPTER="yes",HIREDIS_LIBEVENT_ADAPTER="no")
+ if test "$HIREDIS_LIBEVENT_ADAPTER" = "yes"; then
+ #Look for libevent headers
+ if test "$with_libevent_includes" != "no"; then
+ CPPFLAGS="${CPPFLAGS} -I${with_libevent_includes}"
+ fi
+ AC_CHECK_HEADER("event.h",LIBEVENT="yes",LIBEVENT="no")
+ if test "$LIBEVENT" = "yes"; then
+ if test "$with_libevent_libraries" != "no"; then
+ LDFLAGS="${LDFLAGS} -L${with_libevent_libraries}"
+ fi
+ AC_CHECK_LIB(event, event_base_free,, HAVE_LIBEVENT="no")
+ AC_CHECK_LIB(event_pthreads, evthread_use_pthreads,, HAVE_LIBEVENT_PTHREADS="no")
+ fi
+ if test "$HAVE_LIBEVENT" = "no" -o test "$HAVE_LIBEVENT_PTHREADS" = "no" ; then
+ if test "$HAVE_LIBEVENT" = "no"; then
+ echo
+ echo " Async mode for redis output will not be available."
+ echo " To enable it install libevent"
+ echo
+ echo " Ubuntu: apt-get install libevent-dev"
+ echo " Fedora: dnf install event-devel"
+ echo " RHEL/CentOS: yum install event-devel"
+ echo
+ fi
+ if test "$HAVE_LIBEVENT_PTHREADS" = "no"; then
+ echo
+ echo " Async mode for redis output will not be available."
+ echo " To enable it install libevent with pthreads support"
+ echo
+ echo " Ubuntu: apt-get install libevent-pthreads-2.0-5"
+ echo
+ fi
+ else
+ AC_DEFINE([HAVE_LIBEVENT],[1],[libevent available])
+ enable_hiredis_async="yes"
+ fi
+ fi
fi
fi
libnspr support: ${enable_nspr}
libjansson support: ${enable_jansson}
hiredis support: ${enable_hiredis}
+ hiredis async with libevent: ${enable_hiredis_async}
Prelude support: ${enable_prelude}
PCRE jit: ${pcre_jit_available}
LUA support: ${enable_lua}
--- /dev/null
+/* vi: set et ts=4: */
+/* Copyright (C) 2007-2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Paulo Pacheco <fooinha@gmail.com>
+ *
+ * File-like output for logging: redis
+ */
+#include "suricata-common.h" /* errno.h, string.h, etc. */
+#include "util-log-redis.h"
+#include "util-logopenfile.h"
+
+#ifdef HAVE_LIBHIREDIS
+
+#ifdef HAVE_LIBEVENT_PTHREADS
+#include <event2/thread.h>
+#endif /* HAVE_LIBEVENT_PTHREADS */
+
+static const char * redis_push_cmd = "LPUSH";
+static const char * redis_publish_cmd = "PUBLISH";
+static const char * redis_default_key = "suricata";
+static const char * redis_default_server = "127.0.0.1";
+
+static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
+static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
+
+/**
+ * \brief SCLogRedisInit() - Initializes global stuff before threads
+ */
+void SCLogRedisInit()
+{
+#ifdef HAVE_LIBEVENT_PTHREADS
+ evthread_use_pthreads();
+#endif /* HAVE_LIBEVENT_PTHREADS */
+}
+
+/** \brief SCLogRedisContextAlloc() - Allocates and initalizes redis context
+ */
+static SCLogRedisContext * SCLogRedisContextAlloc()
+{
+ SCLogRedisContext* ctx = (SCLogRedisContext*) SCMalloc(sizeof(SCLogRedisContext));
+ if (ctx == NULL) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis context");
+ exit(EXIT_FAILURE);
+ }
+ ctx->sync = NULL;
+#if HAVE_LIBEVENT
+ ctx->ev_base = NULL;
+ ctx->async = NULL;
+#endif
+ ctx->batch_count = 0;
+ ctx->tried = 0;
+
+ return ctx;
+}
+
+#ifdef HAVE_LIBEVENT
+
+static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
+#include <hiredis/adapters/libevent.h>
+
+/** \brief SCLogRedisAsyncContextAlloc() - Allocates and initalizes redis context with async
+ */
+static SCLogRedisContext * SCLogRedisContextAsyncAlloc()
+{
+ SCLogRedisContext* ctx = (SCLogRedisContext*) SCMalloc(sizeof(SCLogRedisContext));
+ if (unlikely(ctx == NULL)) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis context");
+ exit(EXIT_FAILURE);
+ }
+
+ ctx->sync = NULL;
+ ctx->async = NULL;
+ ctx->ev_base = NULL;
+ ctx->connected = 0;
+ ctx->batch_count = 0;
+ ctx->tried = 0;
+
+ return ctx;
+}
+
+/** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
+ * \param ac redis async context
+ * \param r redis reply
+ * \param privvata opaque datq with pointer to LogFileCtx
+ */
+static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
+{
+ redisReply *reply = r;
+ LogFileCtx *log_ctx = privdata;
+ SCLogRedisContext *ctx = log_ctx->redis;
+
+ if (reply == NULL) {
+ if (ctx->connected > 0)
+ SCLogInfo("Missing reply from redis, disconnected.");
+ ctx->connected = 0;
+ } else {
+ ctx->connected = 1;
+ event_base_loopbreak(ctx->ev_base);
+ }
+}
+
+/** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
+ * This is used to check if redis is connected.
+ * \param ac redis async context
+ * \param r redis reply
+ * \param privvata opaque datq with pointer to LogFileCtx
+ */
+static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
+{
+ redisReply *reply = r;
+ SCLogRedisContext * ctx = privdata;
+
+ if (reply) {
+ if (ctx->connected == 0) {
+ SCLogNotice("Connected to Redis.");
+ ctx->connected = 1;
+ ctx->tried = 0;
+ }
+ } else {
+ ctx->connected = 0;
+ if (ctx->tried == 0) {
+ SCLogWarning(SC_ERR_SOCKET, "Failed to connect to Redis... (will keep trying)");
+ }
+ ctx->tried = time(NULL);
+ }
+ event_base_loopbreak(ctx->ev_base);
+}
+
+/** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
+ * Emits and awaits response for an async ECHO command.
+ * It's used for check if redis is alive.
+ * \param ctx redis context
+ */
+static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
+{
+ redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
+ event_base_dispatch(ctx->ev_base);
+}
+
+/** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
+ * This is used to terminate connection with redis.
+ * \param ac redis async context
+ * \param r redis reply
+ * \param privvata opaque datq with pointer to LogFileCtx
+ */
+static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
+{
+ SCLogInfo("Disconnecting from redis!");
+}
+
+/** \brief QUIT command
+ * Emits and awaits response for an async QUIT command.
+ * It's used to disconnect with redis
+ * \param ctx redis context
+ */
+static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
+{
+ if (ctx->connected) {
+ redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
+ SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
+ }
+
+ redisAsyncFree(ctx->async);
+ event_base_dispatch(ctx->ev_base);
+ ctx->async = NULL;
+ event_base_free(ctx->ev_base);
+ ctx->ev_base = NULL;
+ ctx->connected = 0;
+}
+
+/** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
+ * \param log_ctx Log file context allocated by caller
+ */
+static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
+{
+ SCLogRedisContext * ctx = log_ctx->redis;
+ const char *redis_server = log_ctx->redis_setup.server;
+ int redis_port = log_ctx->redis_setup.port;
+
+ /* only try to reconnect once per second */
+ if (ctx->tried >= time(NULL)) {
+ return -1;
+ }
+
+ ctx->async = redisAsyncConnect(redis_server, redis_port);
+
+ if (ctx->ev_base != NULL) {
+ event_base_free(ctx->ev_base);
+ }
+
+ if (ctx->async == NULL) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Error allocate redis async.");
+ ctx->tried = time(NULL);
+ return -1;
+ }
+
+ if (ctx->async != NULL && ctx->async->err) {
+ SCLogError(SC_ERR_SOCKET, "Error setting to redis async: [%s].", ctx->async->errstr);
+ ctx->tried = time(NULL);
+ return -1;
+ }
+
+ ctx->ev_base = event_base_new();
+
+ if (ctx->ev_base == NULL) {
+ ctx->tried = time(NULL);
+ redisAsyncFree(ctx->async);
+ ctx->async = NULL;
+ return -1;
+ }
+
+ redisLibeventAttach(ctx->async, ctx->ev_base);
+
+ log_ctx->redis = ctx;
+ log_ctx->Close = SCLogFileCloseRedis;
+ return 0;
+}
+
+
+/** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
+ * \param file_ctx Log file context allocated by caller
+ * \param string Buffer to output
+ */
+static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
+{
+ SCLogRedisContext *ctx = file_ctx->redis;
+
+ if (! ctx->connected) {
+ if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
+ return -1;
+ }
+ if (ctx->tried == 0) {
+ SCLogNotice("Trying to connect to Redis");
+ }
+ SCLogAsyncRedisSendEcho(ctx);
+ }
+
+ if (!ctx->connected) {
+ return -1;
+ }
+
+ if (ctx->async == NULL) {
+ return -1;
+ }
+
+ redisAsyncCommand(ctx->async,
+ SCRedisAsyncCommandCallback,
+ file_ctx,
+ "%s %s %s",
+ file_ctx->redis_setup.command,
+ file_ctx->redis_setup.key,
+ string);
+
+ event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
+
+ return 0;
+}
+
+#endif// HAVE_LIBEVENT
+
+/** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
+ * \param log_ctx Log file context allocated by caller
+ */
+static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
+{
+ SCLogRedisContext * ctx = log_ctx->redis;
+
+ /* only try to reconnect once per second */
+ if (ctx->tried >= time(NULL)) {
+ return -1;
+ }
+
+ const char *redis_server = log_ctx->redis_setup.server;
+ int redis_port = log_ctx->redis_setup.port;
+
+ if (ctx->sync != NULL) {
+ redisFree(ctx->sync);
+ }
+ ctx->sync = redisConnect(redis_server, redis_port);
+ if (ctx->sync == NULL) {
+ SCLogError(SC_ERR_SOCKET, "Error connecting to redis server.");
+ ctx->tried = time(NULL);
+ return -1;
+ }
+ if (ctx->sync->err) {
+ SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: [%s].", ctx->sync->errstr);
+ redisFree(ctx->sync);
+ ctx->sync = NULL;
+ ctx->tried = time(NULL);
+ return -1;
+ }
+ SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
+
+ log_ctx->redis = ctx;
+ log_ctx->Close = SCLogFileCloseRedis;
+ return 0;
+}
+/** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
+ * \param file_ctx Log file context allocated by caller
+ * \param string Buffer to output
+ */
+static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
+{
+ SCLogRedisContext * ctx = file_ctx->redis;
+ int ret = -1;
+ redisContext *redis = ctx->sync;
+ if (redis == NULL) {
+ SCConfLogReopenSyncRedis(file_ctx);
+ redis = ctx->sync;
+ if (redis == NULL) {
+ SCLogDebug("Redis after re-open is not available.");
+ return -1;
+ }
+ }
+
+ /* synchronous mode */
+ if (file_ctx->redis_setup.batch_size) {
+ redisAppendCommand(redis, "%s %s %s",
+ file_ctx->redis_setup.command,
+ file_ctx->redis_setup.key,
+ string);
+ if (ctx->batch_count == file_ctx->redis_setup.batch_size) {
+ redisReply *reply;
+ int i;
+ ctx->batch_count = 0;
+ for (i = 0; i <= file_ctx->redis_setup.batch_size; i++) {
+ if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
+ freeReplyObject(reply);
+ ret = 0;
+ } else {
+ if (redis->err) {
+ SCLogInfo("Error when fetching reply: %s (%d)",
+ redis->errstr,
+ redis->err);
+ }
+ switch (redis->err) {
+ case REDIS_ERR_EOF:
+ case REDIS_ERR_IO:
+ SCLogInfo("Reopening connection to redis server");
+ SCConfLogReopenSyncRedis(file_ctx);
+ redis = ctx->sync;
+ if (redis) {
+ SCLogInfo("Reconnected to redis server");
+ } else {
+ SCLogInfo("Unable to reconnect to redis server");
+ return -1;
+ }
+ break;
+ default:
+ SCLogWarning(SC_ERR_INVALID_VALUE,
+ "Unsupported error code %d",
+ redis->err);
+ return -1;
+ }
+ }
+ }
+ } else {
+ ctx->batch_count++;
+ }
+ } else {
+ redisReply *reply = redisCommand(redis, "%s %s %s",
+ file_ctx->redis_setup.command,
+ file_ctx->redis_setup.key,
+ string);
+ /* We may lose the reply if disconnection happens*/
+ if (reply) {
+ switch (reply->type) {
+ case REDIS_REPLY_ERROR:
+ SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str);
+ SCConfLogReopenSyncRedis(file_ctx);
+ break;
+ case REDIS_REPLY_INTEGER:
+ SCLogDebug("Redis integer %lld", reply->integer);
+ ret = 0;
+ break;
+ default:
+ SCLogError(SC_ERR_INVALID_VALUE,
+ "Redis default triggered with %d", reply->type);
+ SCConfLogReopenSyncRedis(file_ctx);
+ break;
+ }
+ freeReplyObject(reply);
+ } else {
+ SCConfLogReopenSyncRedis(file_ctx);
+ }
+ }
+ return ret;
+}
+
+/**
+ * \brief LogFileWriteRedis() writes log data to redis output.
+ * \param log_ctx Log file context allocated by caller
+ * \param string buffer with data to write
+ * \param string_len data length
+ * \retval 0 on sucess;
+ * \retval -1 on failure;
+ */
+int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
+{
+ LogFileCtx *file_ctx = lf_ctx;
+ if (file_ctx == NULL) {
+ return -1;
+ }
+
+#if HAVE_LIBEVENT
+ /* async mode on */
+ if (file_ctx->redis_setup.is_async) {
+ return SCLogRedisWriteAsync(file_ctx, string, string_len);
+ }
+#endif
+ /* sync mode */
+ if (! file_ctx->redis_setup.is_async) {
+ return SCLogRedisWriteSync(file_ctx, string);
+ }
+ return -1;
+}
+
+/** \brief configure and initializes redis output logging
+ * \param conf ConfNode structure for the output section in question
+ * \param log_ctx Log file context allocated by caller
+ * \retval 0 on success
+ */
+int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
+{
+ LogFileCtx *log_ctx = lf_ctx;
+
+ const char *redis_port = NULL;
+ const char *redis_mode = NULL;
+
+ int is_async = 0;
+
+ if (redis_node) {
+ log_ctx->redis_setup.server = ConfNodeLookupChildValue(redis_node, "server");
+ log_ctx->redis_setup.key = ConfNodeLookupChildValue(redis_node, "key");
+
+ redis_port = ConfNodeLookupChildValue(redis_node, "port");
+ redis_mode = ConfNodeLookupChildValue(redis_node, "mode");
+
+ ConfGetChildValueBool(redis_node, "async", &is_async);
+ }
+ if (!log_ctx->redis_setup.server) {
+ log_ctx->redis_setup.server = redis_default_server;
+ SCLogInfo("Using default redis server (127.0.0.1)");
+ }
+ if (!redis_port)
+ redis_port = "6379";
+ if (!redis_mode)
+ redis_mode = "list";
+ if (!log_ctx->redis_setup.key) {
+ log_ctx->redis_setup.key = redis_default_key;
+ }
+
+#ifndef HAVE_LIBEVENT
+ if (is_async) {
+ SCLogWarning(SC_ERR_NO_REDIS_ASYNC, "async option not available.");
+ }
+ is_async = 0;
+#endif //ifndef HAVE_LIBEVENT
+
+ log_ctx->redis_setup.is_async = is_async;
+ log_ctx->redis_setup.batch_size = 0;
+ if (redis_node) {
+ ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
+ if (pipelining) {
+ int enabled = 0;
+ int ret;
+ intmax_t val;
+ ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
+ if (ret && enabled) {
+ ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
+ if (ret) {
+ log_ctx->redis_setup.batch_size = val;
+ } else {
+ log_ctx->redis_setup.batch_size = 10;
+ }
+ }
+ }
+ } else {
+ log_ctx->redis_setup.batch_size = 0;
+ }
+
+ if (!strcmp(redis_mode, "list")) {
+ log_ctx->redis_setup.command = redis_push_cmd;
+ } else {
+ log_ctx->redis_setup.command = redis_publish_cmd;
+ }
+ /* store server params for reconnection */
+ if (!log_ctx->redis_setup.server) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string");
+ exit(EXIT_FAILURE);
+ }
+ log_ctx->redis_setup.port = atoi(redis_port);
+ log_ctx->Close = SCLogFileCloseRedis;
+
+#ifdef HAVE_LIBEVENT
+ if (is_async) {
+ log_ctx->redis = SCLogRedisContextAsyncAlloc();
+ }
+#endif /*HAVE_LIBEVENT*/
+ if (! is_async) {
+ log_ctx->redis = SCLogRedisContextAlloc();
+ SCConfLogReopenSyncRedis(log_ctx);
+ }
+ return 0;
+}
+
+/** \brief SCLogFileCloseRedis() Closes redis log more
+ * \param log_ctx Log file context allocated by caller
+ */
+void SCLogFileCloseRedis(LogFileCtx *log_ctx)
+{
+ SCLogRedisContext * ctx = log_ctx->redis;
+ if (ctx == NULL) {
+ return;
+ }
+ /* asynchronous */
+ if (log_ctx->redis_setup.is_async) {
+#if HAVE_LIBEVENT == 1
+ if (ctx->async) {
+ if (ctx->connected > 0) {
+ SCLogAsyncRedisSendQuit(ctx);
+ }
+ if (ctx->ev_base != NULL) {
+ event_base_free(ctx->ev_base);
+ ctx->ev_base = NULL;
+ }
+ }
+#endif
+ }
+
+ /* synchronous */
+ if (!log_ctx->redis_setup.is_async) {
+ if (ctx->sync) {
+ redisReply *reply;
+ int i;
+ for (i = 0; i < ctx->batch_count; i++) {
+ redisGetReply(ctx->sync, (void **)&reply);
+ if (reply) {
+ freeReplyObject(reply);
+ }
+ }
+ redisFree(ctx->sync);
+ ctx->sync = NULL;
+ }
+ ctx->tried = 0;
+ ctx->batch_count = 0;
+ }
+
+ if (ctx != NULL) {
+ SCFree(ctx);
+ }
+}
+
+#endif //#ifdef HAVE_LIBHIREDIS
#include "util-logopenfile.h"
#include "util-logopenfile-tile.h"
-const char * redis_push_cmd = "LPUSH";
-const char * redis_publish_cmd = "PUBLISH";
+#ifdef HAVE_LIBHIREDIS
+#include "util-log-redis.h"
+#endif /* HAVE_LIBHIREDIS */
/** \brief connect to the indicated local stream socket, logging any errors
* \param path filesystem path to connect to
return 0;
}
-
-#ifdef HAVE_LIBHIREDIS
-
-static void SCLogFileCloseRedis(LogFileCtx *log_ctx)
-{
- if (log_ctx->redis) {
- redisReply *reply;
- int i;
- for (i = 0; i < log_ctx->redis_setup.batch_count; i++) {
- redisGetReply(log_ctx->redis, (void **)&reply);
- if (reply)
- freeReplyObject(reply);
- }
- redisFree(log_ctx->redis);
- log_ctx->redis = NULL;
- }
- log_ctx->redis_setup.tried = 0;
- log_ctx->redis_setup.batch_count = 0;
-}
-
-int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
-{
- const char *redis_server = NULL;
- const char *redis_port = NULL;
- const char *redis_mode = NULL;
- const char *redis_key = NULL;
-
- if (redis_node) {
- redis_server = ConfNodeLookupChildValue(redis_node, "server");
- redis_port = ConfNodeLookupChildValue(redis_node, "port");
- redis_mode = ConfNodeLookupChildValue(redis_node, "mode");
- redis_key = ConfNodeLookupChildValue(redis_node, "key");
- }
- if (!redis_server) {
- redis_server = "127.0.0.1";
- SCLogInfo("Using default redis server (127.0.0.1)");
- }
- if (!redis_port)
- redis_port = "6379";
- if (!redis_mode)
- redis_mode = "list";
- if (!redis_key)
- redis_key = "suricata";
- log_ctx->redis_setup.key = SCStrdup(redis_key);
-
- if (!log_ctx->redis_setup.key) {
- SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key name");
- exit(EXIT_FAILURE);
- }
-
- log_ctx->redis_setup.batch_size = 0;
-
- ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
- if (pipelining) {
- int enabled = 0;
- int ret;
- intmax_t val;
- ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
- if (ret && enabled) {
- ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
- if (ret) {
- log_ctx->redis_setup.batch_size = val;
- } else {
- log_ctx->redis_setup.batch_size = 10;
- }
- }
- }
-
- if (!strcmp(redis_mode, "list")) {
- log_ctx->redis_setup.command = redis_push_cmd;
- if (!log_ctx->redis_setup.command) {
- SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key command");
- exit(EXIT_FAILURE);
- }
- } else {
- log_ctx->redis_setup.command = redis_publish_cmd;
- if (!log_ctx->redis_setup.command) {
- SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key command");
- exit(EXIT_FAILURE);
- }
- }
- redisContext *c = redisConnect(redis_server, atoi(redis_port));
- if (c != NULL && c->err) {
- SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s", c->errstr);
- exit(EXIT_FAILURE);
- }
-
- /* store server params for reconnection */
- log_ctx->redis_setup.server = SCStrdup(redis_server);
- if (!log_ctx->redis_setup.server) {
- SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string");
- exit(EXIT_FAILURE);
- }
- log_ctx->redis_setup.port = atoi(redis_port);
- log_ctx->redis_setup.tried = 0;
-
- log_ctx->redis = c;
-
- log_ctx->Close = SCLogFileCloseRedis;
-
- return 0;
-}
-
-int SCConfLogReopenRedis(LogFileCtx *log_ctx)
-{
- if (log_ctx->redis != NULL) {
- redisFree(log_ctx->redis);
- log_ctx->redis = NULL;
- }
-
- /* only try to reconnect once per second */
- if (log_ctx->redis_setup.tried >= time(NULL)) {
- return -1;
- }
-
- redisContext *c = redisConnect(log_ctx->redis_setup.server, log_ctx->redis_setup.port);
- if (c != NULL && c->err) {
- if (log_ctx->redis_setup.tried == 0) {
- SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s\n", c->errstr);
- }
- redisFree(c);
- log_ctx->redis_setup.tried = time(NULL);
- return -1;
- }
- log_ctx->redis = c;
- log_ctx->redis_setup.tried = 0;
- log_ctx->redis_setup.batch_count = 0;
- return 0;
-}
-
-#endif
-
/** \brief LogFileNewCtx() Get a new LogFileCtx
* \retval LogFileCtx * pointer if succesful, NULL if error
* */
lf_ctx->Write = SCLogFileWrite;
lf_ctx->Close = SCLogFileClose;
-#ifdef HAVE_LIBHIREDIS
- lf_ctx->redis_setup.batch_count = 0;
-#endif
-
return lf_ctx;
}
SCMutexUnlock(&lf_ctx->fp_mutex);
}
-#ifdef HAVE_LIBHIREDIS
- if (lf_ctx->type == LOGFILE_TYPE_REDIS) {
- if (lf_ctx->redis)
- redisFree(lf_ctx->redis);
- if (lf_ctx->redis_setup.server)
- SCFree(lf_ctx->redis_setup.server);
- if (lf_ctx->redis_setup.key)
- SCFree(lf_ctx->redis_setup.key);
- }
-#endif
-
SCMutexDestroy(&lf_ctx->fp_mutex);
if (lf_ctx->prefix != NULL) {
SCReturnInt(1);
}
-#ifdef HAVE_LIBHIREDIS
-static int LogFileWriteRedis(LogFileCtx *file_ctx, const char *string, size_t string_len)
-{
- if (file_ctx->redis == NULL) {
- SCConfLogReopenRedis(file_ctx);
- if (file_ctx->redis == NULL) {
- return -1;
- } else {
- SCLogInfo("Reconnected to redis server");
- }
- }
- /* TODO go async here ? */
- if (file_ctx->redis_setup.batch_size) {
- redisAppendCommand(file_ctx->redis, "%s %s %s",
- file_ctx->redis_setup.command,
- file_ctx->redis_setup.key,
- string);
- if (file_ctx->redis_setup.batch_count == file_ctx->redis_setup.batch_size) {
- redisReply *reply;
- int i;
- file_ctx->redis_setup.batch_count = 0;
- for (i = 0; i <= file_ctx->redis_setup.batch_size; i++) {
- if (redisGetReply(file_ctx->redis, (void **)&reply) == REDIS_OK) {
- freeReplyObject(reply);
- } else {
- if (file_ctx->redis->err) {
- SCLogInfo("Error when fetching reply: %s (%d)",
- file_ctx->redis->errstr,
- file_ctx->redis->err);
- }
- switch (file_ctx->redis->err) {
- case REDIS_ERR_EOF:
- case REDIS_ERR_IO:
- SCLogInfo("Reopening connection to redis server");
- SCConfLogReopenRedis(file_ctx);
- if (file_ctx->redis) {
- SCLogInfo("Reconnected to redis server");
- return 0;
- } else {
- SCLogInfo("Unable to reconnect to redis server");
- return 0;
- }
- break;
- default:
- SCLogWarning(SC_ERR_INVALID_VALUE,
- "Unsupported error code %d",
- file_ctx->redis->err);
- return 0;
- }
- }
- }
- } else {
- file_ctx->redis_setup.batch_count++;
- }
- } else {
- redisReply *reply = redisCommand(file_ctx->redis, "%s %s %b",
- file_ctx->redis_setup.command,
- file_ctx->redis_setup.key,
- string, string_len);
-
- /* We may lose the reply if disconnection happens! */
- if (reply) {
- switch (reply->type) {
- case REDIS_REPLY_ERROR:
- SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str);
- SCConfLogReopenRedis(file_ctx);
- break;
- case REDIS_REPLY_INTEGER:
- SCLogDebug("Redis integer %lld", reply->integer);
- break;
- default:
- SCLogError(SC_ERR_INVALID_VALUE,
- "Redis default triggered with %d", reply->type);
- SCConfLogReopenRedis(file_ctx);
- break;
- }
- freeReplyObject(reply);
- } else {
- SCConfLogReopenRedis(file_ctx);
- }
- }
- return 0;
-}
-#endif
-
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer)
{
if (file_ctx->type == LOGFILE_TYPE_SYSLOG) {