]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
eve: async mode for redis output
authorfooinha <fooinha@gmail.com>
Thu, 23 Feb 2017 22:42:05 +0000 (22:42 +0000)
committerVictor Julien <victor@inliniac.net>
Thu, 20 Apr 2017 07:33:53 +0000 (09:33 +0200)
eve: detects libevent for async redis at configure
eve: moves redis output code to new file - util-log-redis.{c,h}
eve: redis ECHO and QUIT commands for async mode
eve: redis output defaults if conf is missing

.travis.yml
configure.ac
src/Makefile.am
src/output-json.c
src/util-error.c
src/util-error.h
src/util-log-redis.c [new file with mode: 0644]
src/util-log-redis.h [new file with mode: 0644]
src/util-logopenfile.c
src/util-logopenfile.h
suricata.yaml.in

index 0194a079211b59b6681fa9ad15983da2f4fa022d..c05265f1f7d5c531108d048fda56658277dcd8ed 100644 (file)
@@ -29,6 +29,8 @@ addons:
       - libnfnetlink0
       - libhiredis-dev
       - libjansson-dev
+      - libevent-dev
+      - libevent-pthreads-2.0-5
     # Now define the default set of packages which is those above, and
     # libjansson.
     packages: &packages
index ef0aee1514d95934235e26c4dd15e9294ff1fbfe..0752c6f1aa5a5734a7404a66a50fe083cf013958 100644 (file)
         LDFLAGS="${LDFLAGS} -pie"
     fi
 
+#libevent includes and libraries
+    AC_ARG_WITH(libevent_includes,
+            [  --with-libevent-includes=DIR  libevent include directory],
+            [with_libevent_includes="$withval"],[with_libevent_includes="no"])
+    AC_ARG_WITH(libevent_libraries,
+            [  --with-libevent-libraries=DIR    libevent library directory],
+            [with_libevent_libraries="$withval"],[with_libevent_libraries="no"])
+
 # libhiredis
     AC_ARG_ENABLE(hiredis,
                AS_HELP_STRING([--enable-hiredis],[Enable Redis support]),
             [  --with-libhiredis-libraries=DIR    libhiredis library directory],
             [with_libhiredis_libraries="$withval"],[with_libhiredis_libraries="no"])
 
+    enable_hiredis_async="no"
     if test "$enable_hiredis" = "yes"; then
         if test "$with_libhiredis_includes" != "no"; then
             CPPFLAGS="${CPPFLAGS} -I${with_libhiredis_includes}"
         if test "$HIREDIS" = "yes"; then
             AC_DEFINE([HAVE_LIBHIREDIS],[1],[libhiredis available])
             enable_hiredis="yes"
+            #
+            # Check if async adapters and libevent is installed
+            #
+            AC_CHECK_HEADER("hiredis/adapters/libevent.h",HIREDIS_LIBEVENT_ADAPTER="yes",HIREDIS_LIBEVENT_ADAPTER="no")
+            if test "$HIREDIS_LIBEVENT_ADAPTER" = "yes"; then
+                #Look for libevent headers
+                if test "$with_libevent_includes" != "no"; then
+                    CPPFLAGS="${CPPFLAGS} -I${with_libevent_includes}"
+                fi
+                AC_CHECK_HEADER("event.h",LIBEVENT="yes",LIBEVENT="no")
+                if test "$LIBEVENT" = "yes"; then
+                    if test "$with_libevent_libraries" != "no"; then
+                        LDFLAGS="${LDFLAGS}  -L${with_libevent_libraries}"
+                    fi
+                    AC_CHECK_LIB(event, event_base_free,, HAVE_LIBEVENT="no")
+                    AC_CHECK_LIB(event_pthreads, evthread_use_pthreads,, HAVE_LIBEVENT_PTHREADS="no")
+                fi
+                if test "$HAVE_LIBEVENT" = "no" -o test "$HAVE_LIBEVENT_PTHREADS" = "no" ; then
+                    if test "$HAVE_LIBEVENT" = "no"; then
+                        echo
+                        echo "  Async mode for redis output will not be available."
+                        echo "  To enable it install libevent"
+                        echo
+                        echo "   Ubuntu: apt-get install libevent-dev"
+                        echo "   Fedora: dnf install event-devel"
+                        echo "   RHEL/CentOS: yum install event-devel"
+                        echo
+                   fi
+                   if test "$HAVE_LIBEVENT_PTHREADS" = "no"; then
+                        echo
+                        echo "  Async mode for redis output will not be available."
+                        echo "  To enable it install libevent with pthreads support"
+                        echo
+                        echo "   Ubuntu: apt-get install libevent-pthreads-2.0-5"
+                        echo
+                   fi
+                else
+                    AC_DEFINE([HAVE_LIBEVENT],[1],[libevent available])
+                    enable_hiredis_async="yes"
+                fi
+            fi
         fi
     fi
 
@@ -2027,6 +2077,7 @@ SURICATA_BUILD_CONF="Suricata Configuration:
   libnspr support:                         ${enable_nspr}
   libjansson support:                      ${enable_jansson}
   hiredis support:                         ${enable_hiredis}
+  hiredis async with libevent:             ${enable_hiredis_async}
   Prelude support:                         ${enable_prelude}
   PCRE jit:                                ${pcre_jit_available}
   LUA support:                             ${enable_lua}
index de7006d05f0ff8d0e92d256ab9dab2f42d3665e7..f5ecd6afa1df56e7985e77599466cae2ec6bbbb2 100644 (file)
@@ -386,6 +386,7 @@ util-ioctl.h util-ioctl.c \
 util-ip.h util-ip.c \
 util-logopenfile.h util-logopenfile.c \
 util-logopenfile-tile.h util-logopenfile-tile.c \
+util-log-redis.h util-log-redis.c \
 util-lua.c util-lua.h \
 util-luajit.c util-luajit.h \
 util-lua-common.c util-lua-common.h \
index 5a457cd5046ffc68ac43e29b4ea647e5b40eab4b..7df5660a2c5bcd4b84ee4f3a7d47117125b809ed 100644 (file)
@@ -56,6 +56,7 @@
 #include "util-optimize.h"
 #include "util-buffer.h"
 #include "util-logopenfile.h"
+#include "util-log-redis.h"
 #include "util-device.h"
 
 #include "flow-var.h"
@@ -595,6 +596,7 @@ OutputCtx *OutputJsonInitCtx(ConfNode *conf)
                 json_ctx->json_out = LOGFILE_TYPE_UNIX_STREAM;
             } else if (strcmp(output_s, "redis") == 0) {
 #ifdef HAVE_LIBHIREDIS
+                SCLogRedisInit();
                 json_ctx->json_out = LOGFILE_TYPE_REDIS;
 #else
                 SCLogError(SC_ERR_INVALID_ARGUMENT,
index 99b7e55cf7267c49c1f655cc69f4dd955fae3962..29275f7d382e2ce024086ea195deca1a95159452 100644 (file)
@@ -340,6 +340,7 @@ const char * SCErrorToString(SCError err)
         CASE_CODE (SC_WARN_CHMOD);
         CASE_CODE (SC_WARN_LOG_CF_TOO_MANY_NODES);
         CASE_CODE (SC_WARN_EVENT_DROPPED);
+        CASE_CODE (SC_ERR_NO_REDIS_ASYNC);
     }
 
     return "UNKNOWN_ERROR";
index dbafabe1009aad79e5b124c58adaf1d0098da29b..9dc3c79d5b8f01b9699e2503d70d93a0b568f5d6 100644 (file)
@@ -330,6 +330,7 @@ typedef enum {
     SC_WARN_CHMOD,
     SC_WARN_LOG_CF_TOO_MANY_NODES,
     SC_WARN_EVENT_DROPPED,
+    SC_ERR_NO_REDIS_ASYNC
 } SCError;
 
 const char *SCErrorToString(SCError);
diff --git a/src/util-log-redis.c b/src/util-log-redis.c
new file mode 100644 (file)
index 0000000..50d33a9
--- /dev/null
@@ -0,0 +1,572 @@
+/* vi: set et ts=4: */
+/* Copyright (C) 2007-2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Paulo Pacheco <fooinha@gmail.com>
+ *
+ * File-like output for logging:  redis
+ */
+#include "suricata-common.h" /* errno.h, string.h, etc. */
+#include "util-log-redis.h"
+#include "util-logopenfile.h"
+
+#ifdef HAVE_LIBHIREDIS
+
+#ifdef HAVE_LIBEVENT_PTHREADS
+#include <event2/thread.h>
+#endif /* HAVE_LIBEVENT_PTHREADS */
+
+static const char * redis_push_cmd = "LPUSH";
+static const char * redis_publish_cmd = "PUBLISH";
+static const char * redis_default_key = "suricata";
+static const char * redis_default_server = "127.0.0.1";
+
+static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
+static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
+
+/**
+ * \brief SCLogRedisInit() - Initializes global stuff before threads
+ */
+void SCLogRedisInit()
+{
+#ifdef HAVE_LIBEVENT_PTHREADS
+    evthread_use_pthreads();
+#endif /* HAVE_LIBEVENT_PTHREADS */
+}
+
+/** \brief SCLogRedisContextAlloc() - Allocates and initalizes redis context
+ */
+static SCLogRedisContext * SCLogRedisContextAlloc()
+{
+    SCLogRedisContext* ctx = (SCLogRedisContext*) SCMalloc(sizeof(SCLogRedisContext));
+    if (ctx == NULL) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis context");
+        exit(EXIT_FAILURE);
+    }
+    ctx->sync = NULL;
+#if HAVE_LIBEVENT
+    ctx->ev_base = NULL;
+    ctx->async   = NULL;
+#endif
+    ctx->batch_count = 0;
+    ctx->tried = 0;
+
+    return ctx;
+}
+
+#ifdef HAVE_LIBEVENT
+
+static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
+#include <hiredis/adapters/libevent.h>
+
+/** \brief SCLogRedisAsyncContextAlloc() - Allocates and initalizes redis context with async
+ */
+static SCLogRedisContext * SCLogRedisContextAsyncAlloc()
+{
+    SCLogRedisContext* ctx = (SCLogRedisContext*) SCMalloc(sizeof(SCLogRedisContext));
+    if (unlikely(ctx == NULL)) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis context");
+        exit(EXIT_FAILURE);
+    }
+
+    ctx->sync = NULL;
+    ctx->async   = NULL;
+    ctx->ev_base = NULL;
+    ctx->connected = 0;
+    ctx->batch_count = 0;
+    ctx->tried = 0;
+
+    return ctx;
+}
+
+/** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
+ *  \param ac redis async context
+ *  \param r redis reply
+ *  \param privvata opaque datq with pointer to LogFileCtx
+ */
+static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
+{
+    redisReply *reply = r;
+    LogFileCtx *log_ctx = privdata;
+    SCLogRedisContext *ctx = log_ctx->redis;
+
+    if (reply == NULL) {
+        if (ctx->connected > 0)
+            SCLogInfo("Missing reply from redis, disconnected.");
+        ctx->connected = 0;
+    } else {
+        ctx->connected = 1;
+        event_base_loopbreak(ctx->ev_base);
+    }
+}
+
+/** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
+ *         This is used to check if redis is connected.
+ *  \param ac redis async context
+ *  \param r redis reply
+ *  \param privvata opaque datq with pointer to LogFileCtx
+ */
+static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
+{
+    redisReply *reply = r;
+    SCLogRedisContext * ctx = privdata;
+
+    if (reply) {
+       if (ctx->connected == 0) {
+          SCLogNotice("Connected to Redis.");
+          ctx->connected = 1;
+          ctx->tried = 0;
+       }
+    } else {
+       ctx->connected = 0;
+       if (ctx->tried == 0) {
+          SCLogWarning(SC_ERR_SOCKET, "Failed to connect to Redis... (will keep trying)");
+       }
+       ctx->tried = time(NULL);
+    }
+    event_base_loopbreak(ctx->ev_base);
+}
+
+/** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
+ *         Emits and awaits response for an async ECHO command.
+ *         It's used for check if redis is alive.
+ *  \param ctx redis context
+ */
+static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
+{
+    redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
+    event_base_dispatch(ctx->ev_base);
+}
+
+/** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
+ *         This is used to terminate connection with redis.
+ *  \param ac redis async context
+ *  \param r redis reply
+ *  \param privvata opaque datq with pointer to LogFileCtx
+ */
+static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
+{
+    SCLogInfo("Disconnecting from redis!");
+}
+
+/** \brief QUIT command
+ *         Emits and awaits response for an async QUIT command.
+ *         It's used to disconnect with redis
+ *  \param ctx redis context
+ */
+static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
+{
+    if (ctx->connected) {
+        redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
+        SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
+    }
+
+    redisAsyncFree(ctx->async);
+    event_base_dispatch(ctx->ev_base);
+    ctx->async = NULL;
+    event_base_free(ctx->ev_base);
+    ctx->ev_base = NULL;
+    ctx->connected = 0;
+}
+
+/** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
+ *  \param log_ctx Log file context allocated by caller
+ */
+static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
+{
+    SCLogRedisContext * ctx = log_ctx->redis;
+    const char *redis_server = log_ctx->redis_setup.server;
+    int redis_port = log_ctx->redis_setup.port;
+
+    /* only try to reconnect once per second */
+    if (ctx->tried >= time(NULL)) {
+        return -1;
+    }
+
+    ctx->async = redisAsyncConnect(redis_server, redis_port);
+
+    if (ctx->ev_base != NULL) {
+        event_base_free(ctx->ev_base);
+    }
+
+    if (ctx->async == NULL) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Error allocate redis async.");
+        ctx->tried = time(NULL);
+        return -1;
+    }
+
+    if (ctx->async != NULL && ctx->async->err) {
+        SCLogError(SC_ERR_SOCKET, "Error setting to redis async: [%s].", ctx->async->errstr);
+        ctx->tried = time(NULL);
+        return -1;
+    }
+
+    ctx->ev_base = event_base_new();
+
+    if (ctx->ev_base == NULL) {
+        ctx->tried = time(NULL);
+        redisAsyncFree(ctx->async);
+        ctx->async = NULL;
+        return -1;
+    }
+
+    redisLibeventAttach(ctx->async, ctx->ev_base);
+
+    log_ctx->redis = ctx;
+    log_ctx->Close = SCLogFileCloseRedis;
+    return 0;
+}
+
+
+/** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
+ *  \param file_ctx Log file context allocated by caller
+ *  \param string Buffer to output
+ */
+static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
+{
+    SCLogRedisContext *ctx = file_ctx->redis;
+
+    if (! ctx->connected) {
+        if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
+            return -1;
+        }
+        if (ctx->tried == 0) {
+           SCLogNotice("Trying to connect to Redis");
+        }
+        SCLogAsyncRedisSendEcho(ctx);
+    }
+
+    if (!ctx->connected) {
+        return -1;
+    }
+
+    if (ctx->async == NULL) {
+        return -1;
+    }
+
+    redisAsyncCommand(ctx->async,
+            SCRedisAsyncCommandCallback,
+            file_ctx,
+            "%s %s %s",
+            file_ctx->redis_setup.command,
+            file_ctx->redis_setup.key,
+            string);
+
+    event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
+
+    return 0;
+}
+
+#endif// HAVE_LIBEVENT
+
+/** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
+ *  \param log_ctx Log file context allocated by caller
+ */
+static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
+{
+    SCLogRedisContext * ctx = log_ctx->redis;
+
+    /* only try to reconnect once per second */
+    if (ctx->tried >= time(NULL)) {
+        return -1;
+    }
+
+    const char *redis_server = log_ctx->redis_setup.server;
+    int redis_port = log_ctx->redis_setup.port;
+
+    if (ctx->sync != NULL)  {
+        redisFree(ctx->sync);
+    }
+    ctx->sync = redisConnect(redis_server, redis_port);
+    if (ctx->sync == NULL) {
+        SCLogError(SC_ERR_SOCKET, "Error connecting to redis server.");
+        ctx->tried = time(NULL);
+        return -1;
+    }
+    if (ctx->sync->err) {
+        SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: [%s].", ctx->sync->errstr);
+        redisFree(ctx->sync);
+        ctx->sync = NULL;
+        ctx->tried = time(NULL);
+        return -1;
+    }
+    SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
+
+    log_ctx->redis = ctx;
+    log_ctx->Close = SCLogFileCloseRedis;
+    return 0;
+}
+/** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
+ *  \param file_ctx Log file context allocated by caller
+ *  \param string Buffer to output
+ */
+static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
+{
+    SCLogRedisContext * ctx = file_ctx->redis;
+    int ret = -1;
+    redisContext *redis = ctx->sync;
+    if (redis == NULL) {
+        SCConfLogReopenSyncRedis(file_ctx);
+        redis = ctx->sync;
+        if (redis == NULL) {
+            SCLogDebug("Redis after re-open is not available.");
+            return -1;
+        }
+    }
+
+    /* synchronous mode */
+    if (file_ctx->redis_setup.batch_size) {
+        redisAppendCommand(redis, "%s %s %s",
+                file_ctx->redis_setup.command,
+                file_ctx->redis_setup.key,
+                string);
+        if (ctx->batch_count == file_ctx->redis_setup.batch_size) {
+            redisReply *reply;
+            int i;
+            ctx->batch_count = 0;
+            for (i = 0; i <= file_ctx->redis_setup.batch_size; i++) {
+                if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
+                    freeReplyObject(reply);
+                    ret = 0;
+                } else {
+                    if (redis->err) {
+                        SCLogInfo("Error when fetching reply: %s (%d)",
+                                redis->errstr,
+                                redis->err);
+                    }
+                    switch (redis->err) {
+                        case REDIS_ERR_EOF:
+                        case REDIS_ERR_IO:
+                            SCLogInfo("Reopening connection to redis server");
+                            SCConfLogReopenSyncRedis(file_ctx);
+                            redis = ctx->sync;
+                            if (redis) {
+                                SCLogInfo("Reconnected to redis server");
+                            } else {
+                                SCLogInfo("Unable to reconnect to redis server");
+                                return -1;
+                            }
+                            break;
+                        default:
+                            SCLogWarning(SC_ERR_INVALID_VALUE,
+                                    "Unsupported error code %d",
+                                    redis->err);
+                            return -1;
+                    }
+                }
+            }
+        } else {
+            ctx->batch_count++;
+        }
+    } else {
+        redisReply *reply = redisCommand(redis, "%s %s %s",
+                file_ctx->redis_setup.command,
+                file_ctx->redis_setup.key,
+                string);
+        /* We may lose the reply if disconnection happens*/
+        if (reply) {
+            switch (reply->type) {
+                case REDIS_REPLY_ERROR:
+                    SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str);
+                    SCConfLogReopenSyncRedis(file_ctx);
+                    break;
+                case REDIS_REPLY_INTEGER:
+                    SCLogDebug("Redis integer %lld", reply->integer);
+                    ret = 0;
+                    break;
+                default:
+                    SCLogError(SC_ERR_INVALID_VALUE,
+                            "Redis default triggered with %d", reply->type);
+                    SCConfLogReopenSyncRedis(file_ctx);
+                    break;
+            }
+            freeReplyObject(reply);
+        } else {
+            SCConfLogReopenSyncRedis(file_ctx);
+        }
+    }
+    return ret;
+}
+
+/**
+ * \brief LogFileWriteRedis() writes log data to redis output.
+ * \param log_ctx Log file context allocated by caller
+ * \param string buffer with data to write
+ * \param string_len data length
+ * \retval 0 on sucess;
+ * \retval -1 on failure;
+ */
+int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
+{
+    LogFileCtx *file_ctx = lf_ctx;
+    if (file_ctx == NULL) {
+        return -1;
+    }
+
+#if HAVE_LIBEVENT
+    /* async mode on */
+    if (file_ctx->redis_setup.is_async) {
+        return SCLogRedisWriteAsync(file_ctx, string, string_len);
+    }
+#endif
+    /* sync mode */
+    if (! file_ctx->redis_setup.is_async) {
+        return SCLogRedisWriteSync(file_ctx, string);
+    }
+    return -1;
+}
+
+/** \brief configure and initializes redis output logging
+ *  \param conf ConfNode structure for the output section in question
+ *  \param log_ctx Log file context allocated by caller
+ *  \retval 0 on success
+ */
+int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
+{
+    LogFileCtx *log_ctx = lf_ctx;
+
+    const char *redis_port = NULL;
+    const char *redis_mode = NULL;
+
+    int is_async = 0;
+
+    if (redis_node) {
+        log_ctx->redis_setup.server = ConfNodeLookupChildValue(redis_node, "server");
+        log_ctx->redis_setup.key =  ConfNodeLookupChildValue(redis_node, "key");
+
+        redis_port =  ConfNodeLookupChildValue(redis_node, "port");
+        redis_mode =  ConfNodeLookupChildValue(redis_node, "mode");
+
+        ConfGetChildValueBool(redis_node, "async", &is_async);
+    }
+    if (!log_ctx->redis_setup.server) {
+        log_ctx->redis_setup.server = redis_default_server;
+        SCLogInfo("Using default redis server (127.0.0.1)");
+    }
+    if (!redis_port)
+        redis_port = "6379";
+    if (!redis_mode)
+        redis_mode = "list";
+    if (!log_ctx->redis_setup.key) {
+        log_ctx->redis_setup.key = redis_default_key;
+    }
+
+#ifndef HAVE_LIBEVENT
+    if (is_async) {
+        SCLogWarning(SC_ERR_NO_REDIS_ASYNC, "async option not available.");
+    }
+    is_async = 0;
+#endif //ifndef HAVE_LIBEVENT
+
+    log_ctx->redis_setup.is_async = is_async;
+    log_ctx->redis_setup.batch_size = 0;
+    if (redis_node) {
+        ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
+        if (pipelining) {
+            int enabled = 0;
+            int ret;
+            intmax_t val;
+            ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
+            if (ret && enabled) {
+                ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
+                if (ret) {
+                    log_ctx->redis_setup.batch_size = val;
+                } else {
+                    log_ctx->redis_setup.batch_size = 10;
+                }
+            }
+        }
+    } else {
+        log_ctx->redis_setup.batch_size = 0;
+    }
+
+    if (!strcmp(redis_mode, "list")) {
+        log_ctx->redis_setup.command = redis_push_cmd;
+    } else {
+        log_ctx->redis_setup.command = redis_publish_cmd;
+    }
+    /* store server params for reconnection */
+    if (!log_ctx->redis_setup.server) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string");
+        exit(EXIT_FAILURE);
+    }
+    log_ctx->redis_setup.port = atoi(redis_port);
+    log_ctx->Close = SCLogFileCloseRedis;
+
+#ifdef HAVE_LIBEVENT
+    if (is_async) {
+        log_ctx->redis = SCLogRedisContextAsyncAlloc();
+    }
+#endif /*HAVE_LIBEVENT*/
+    if (! is_async) {
+        log_ctx->redis = SCLogRedisContextAlloc();
+        SCConfLogReopenSyncRedis(log_ctx);
+    }
+    return 0;
+}
+
+/** \brief SCLogFileCloseRedis() Closes redis log more
+ *  \param log_ctx Log file context allocated by caller
+ */
+void SCLogFileCloseRedis(LogFileCtx *log_ctx)
+{
+    SCLogRedisContext * ctx = log_ctx->redis;
+    if (ctx == NULL) {
+        return;
+    }
+    /* asynchronous */
+    if (log_ctx->redis_setup.is_async) {
+#if HAVE_LIBEVENT == 1
+        if (ctx->async) {
+            if (ctx->connected > 0) {
+                SCLogAsyncRedisSendQuit(ctx);
+            }
+            if (ctx->ev_base != NULL) {
+                event_base_free(ctx->ev_base);
+                ctx->ev_base = NULL;
+            }
+        }
+#endif
+    }
+
+    /* synchronous */
+    if (!log_ctx->redis_setup.is_async) {
+        if (ctx->sync) {
+            redisReply *reply;
+            int i;
+            for (i = 0; i < ctx->batch_count; i++) {
+                redisGetReply(ctx->sync, (void **)&reply);
+                if (reply) {
+                    freeReplyObject(reply);
+                }
+            }
+            redisFree(ctx->sync);
+            ctx->sync = NULL;
+        }
+        ctx->tried = 0;
+        ctx->batch_count = 0;
+    }
+
+    if (ctx != NULL) {
+        SCFree(ctx);
+    }
+}
+
+#endif //#ifdef HAVE_LIBHIREDIS
diff --git a/src/util-log-redis.h b/src/util-log-redis.h
new file mode 100644 (file)
index 0000000..ff8bfc4
--- /dev/null
@@ -0,0 +1,65 @@
+/* Copyright (C) 2016 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Paulo Pacheco <fooinha@gmail.com>
+ */
+
+#ifndef __UTIL_LOG_REDIS_H__
+#define __UTIL_LOG_REDIS_H__
+
+#ifdef HAVE_LIBHIREDIS
+#include <hiredis/hiredis.h>
+
+
+#ifdef HAVE_LIBEVENT
+#include <hiredis/async.h>
+#endif /* HAVE_LIBEVENT */
+
+#include "conf.h"            /* ConfNode   */
+
+enum RedisMode { REDIS_LIST, REDIS_CHANNEL };
+
+typedef struct RedisSetup_ {
+    enum RedisMode mode;
+    const char *command;
+    const char *key;
+    const char *server;
+    int  port;
+    int is_async;
+    int  batch_size;
+} RedisSetup;
+
+typedef struct SCLogRedisContext_ {
+    redisContext *sync;
+#if HAVE_LIBEVENT
+    redisAsyncContext *async;
+    struct event_base *ev_base;
+    int connected;
+#endif /* HAVE_LIBEVENT */
+    time_t tried;
+    int  batch_count;
+} SCLogRedisContext;
+
+void SCLogRedisInit();
+int SCConfLogOpenRedis(ConfNode *, void *);
+int LogFileWriteRedis(void *, const char *, size_t);
+
+#endif /* HAVE_LIBHIREDIS */
+#endif /* __UTIL_LOG_REDIS_H__ */
index 376a77af29eb74d968fe7da86aff25abda09c2fe..b5863f84ea43c8e23a24319790a0e1fed39277cb 100644 (file)
@@ -35,8 +35,9 @@
 #include "util-logopenfile.h"
 #include "util-logopenfile-tile.h"
 
-const char * redis_push_cmd = "LPUSH";
-const char * redis_publish_cmd = "PUBLISH";
+#ifdef HAVE_LIBHIREDIS
+#include "util-log-redis.h"
+#endif /* HAVE_LIBHIREDIS */
 
 /** \brief connect to the indicated local stream socket, logging any errors
  *  \param path filesystem path to connect to
@@ -550,138 +551,6 @@ int SCConfLogReopen(LogFileCtx *log_ctx)
     return 0;
 }
 
-
-#ifdef HAVE_LIBHIREDIS
-
-static void SCLogFileCloseRedis(LogFileCtx *log_ctx)
-{
-    if (log_ctx->redis) {
-        redisReply *reply;
-        int i;
-        for (i = 0; i < log_ctx->redis_setup.batch_count; i++) {
-            redisGetReply(log_ctx->redis, (void **)&reply);
-            if (reply)
-                freeReplyObject(reply);
-        }
-        redisFree(log_ctx->redis);
-        log_ctx->redis = NULL;
-    }
-    log_ctx->redis_setup.tried = 0;
-    log_ctx->redis_setup.batch_count = 0;
-}
-
-int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
-{
-    const char *redis_server = NULL;
-    const char *redis_port = NULL;
-    const char *redis_mode = NULL;
-    const char *redis_key = NULL;
-
-    if (redis_node) {
-        redis_server = ConfNodeLookupChildValue(redis_node, "server");
-        redis_port =  ConfNodeLookupChildValue(redis_node, "port");
-        redis_mode =  ConfNodeLookupChildValue(redis_node, "mode");
-        redis_key =  ConfNodeLookupChildValue(redis_node, "key");
-    }
-    if (!redis_server) {
-        redis_server = "127.0.0.1";
-        SCLogInfo("Using default redis server (127.0.0.1)");
-    }
-    if (!redis_port)
-        redis_port = "6379";
-    if (!redis_mode)
-        redis_mode = "list";
-    if (!redis_key)
-        redis_key = "suricata";
-    log_ctx->redis_setup.key = SCStrdup(redis_key);
-
-    if (!log_ctx->redis_setup.key) {
-        SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key name");
-        exit(EXIT_FAILURE);
-    }
-
-    log_ctx->redis_setup.batch_size = 0;
-
-    ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
-    if (pipelining) {
-        int enabled = 0;
-        int ret;
-        intmax_t val;
-        ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
-        if (ret && enabled) {
-            ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
-            if (ret) {
-                log_ctx->redis_setup.batch_size = val;
-            } else {
-                log_ctx->redis_setup.batch_size = 10;
-            }
-        }
-    }
-
-    if (!strcmp(redis_mode, "list")) {
-        log_ctx->redis_setup.command = redis_push_cmd;
-        if (!log_ctx->redis_setup.command) {
-            SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key command");
-            exit(EXIT_FAILURE);
-        }
-    } else {
-        log_ctx->redis_setup.command = redis_publish_cmd;
-        if (!log_ctx->redis_setup.command) {
-            SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key command");
-            exit(EXIT_FAILURE);
-        }
-    }
-    redisContext *c = redisConnect(redis_server, atoi(redis_port));
-    if (c != NULL && c->err) {
-        SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s", c->errstr);
-        exit(EXIT_FAILURE);
-    }
-
-    /* store server params for reconnection */
-    log_ctx->redis_setup.server = SCStrdup(redis_server);
-    if (!log_ctx->redis_setup.server) {
-        SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string");
-        exit(EXIT_FAILURE);
-    }
-    log_ctx->redis_setup.port = atoi(redis_port);
-    log_ctx->redis_setup.tried = 0;
-
-    log_ctx->redis = c;
-
-    log_ctx->Close = SCLogFileCloseRedis;
-
-    return 0;
-}
-
-int SCConfLogReopenRedis(LogFileCtx *log_ctx)
-{
-    if (log_ctx->redis != NULL) {
-        redisFree(log_ctx->redis);
-        log_ctx->redis = NULL;
-    }
-
-    /* only try to reconnect once per second */
-    if (log_ctx->redis_setup.tried >= time(NULL)) {
-        return -1;
-    }
-
-    redisContext *c = redisConnect(log_ctx->redis_setup.server, log_ctx->redis_setup.port);
-    if (c != NULL && c->err) {
-        if (log_ctx->redis_setup.tried == 0) {
-            SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s\n", c->errstr);
-        }
-        redisFree(c);
-        log_ctx->redis_setup.tried = time(NULL);
-        return -1;
-    }
-    log_ctx->redis = c;
-    log_ctx->redis_setup.tried = 0;
-    log_ctx->redis_setup.batch_count = 0;
-    return 0;
-}
-
-#endif
-
 /** \brief LogFileNewCtx() Get a new LogFileCtx
  *  \retval LogFileCtx * pointer if succesful, NULL if error
  *  */
@@ -700,10 +569,6 @@ LogFileCtx *LogFileNewCtx(void)
     lf_ctx->Write = SCLogFileWrite;
     lf_ctx->Close = SCLogFileClose;
 
-#ifdef HAVE_LIBHIREDIS
-    lf_ctx->redis_setup.batch_count = 0;
-#endif
-
     return lf_ctx;
 }
 
@@ -723,17 +588,6 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
         SCMutexUnlock(&lf_ctx->fp_mutex);
     }
 
-#ifdef HAVE_LIBHIREDIS
-    if (lf_ctx->type == LOGFILE_TYPE_REDIS) {
-        if (lf_ctx->redis)
-            redisFree(lf_ctx->redis);
-        if (lf_ctx->redis_setup.server)
-            SCFree(lf_ctx->redis_setup.server);
-        if (lf_ctx->redis_setup.key)
-            SCFree(lf_ctx->redis_setup.key);
-    }
-#endif
-
     SCMutexDestroy(&lf_ctx->fp_mutex);
 
     if (lf_ctx->prefix != NULL) {
@@ -754,91 +608,6 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
     SCReturnInt(1);
 }
 
-#ifdef HAVE_LIBHIREDIS
-static int  LogFileWriteRedis(LogFileCtx *file_ctx, const char *string, size_t string_len)
-{
-    if (file_ctx->redis == NULL) {
-        SCConfLogReopenRedis(file_ctx);
-        if (file_ctx->redis == NULL) {
-            return -1;
-        } else {
-            SCLogInfo("Reconnected to redis server");
-        }
-    }
-    /* TODO go async here ? */
-    if (file_ctx->redis_setup.batch_size) {
-        redisAppendCommand(file_ctx->redis, "%s %s %s",
-                file_ctx->redis_setup.command,
-                file_ctx->redis_setup.key,
-                string);
-        if (file_ctx->redis_setup.batch_count == file_ctx->redis_setup.batch_size) {
-            redisReply *reply;
-            int i;
-            file_ctx->redis_setup.batch_count = 0;
-            for (i = 0; i <= file_ctx->redis_setup.batch_size; i++) {
-                if (redisGetReply(file_ctx->redis, (void **)&reply) == REDIS_OK) {
-                    freeReplyObject(reply);
-                } else {
-                    if (file_ctx->redis->err) {
-                        SCLogInfo("Error when fetching reply: %s (%d)",
-                                file_ctx->redis->errstr,
-                                file_ctx->redis->err);
-                    }
-                    switch (file_ctx->redis->err) {
-                        case REDIS_ERR_EOF:
-                        case REDIS_ERR_IO:
-                            SCLogInfo("Reopening connection to redis server");
-                            SCConfLogReopenRedis(file_ctx);
-                            if (file_ctx->redis) {
-                                SCLogInfo("Reconnected to redis server");
-                                return 0;
-                            } else {
-                                SCLogInfo("Unable to reconnect to redis server");
-                                return 0;
-                            }
-                            break;
-                        default:
-                            SCLogWarning(SC_ERR_INVALID_VALUE,
-                                    "Unsupported error code %d",
-                                    file_ctx->redis->err);
-                            return 0;
-                    }
-                }
-            }
-        } else {
-            file_ctx->redis_setup.batch_count++;
-        }
-    } else {
-        redisReply *reply = redisCommand(file_ctx->redis, "%s %s %b",
-                file_ctx->redis_setup.command,
-                file_ctx->redis_setup.key,
-                string, string_len);
-
-        /*  We may lose the reply if disconnection happens! */
-        if (reply) {
-            switch (reply->type) {
-                case REDIS_REPLY_ERROR:
-                    SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str);
-                    SCConfLogReopenRedis(file_ctx);
-                    break;
-                case REDIS_REPLY_INTEGER:
-                    SCLogDebug("Redis integer %lld", reply->integer);
-                    break;
-                default:
-                    SCLogError(SC_ERR_INVALID_VALUE,
-                            "Redis default triggered with %d", reply->type);
-                    SCConfLogReopenRedis(file_ctx);
-                    break;
-            }
-            freeReplyObject(reply);
-        } else {
-            SCConfLogReopenRedis(file_ctx);
-        }
-    }
-    return 0;
-}
-#endif
-
 int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer)
 {
     if (file_ctx->type == LOGFILE_TYPE_SYSLOG) {
index 857f8b821c700eb18d6f65df08fedc57af92155a..ae99ad8a5617be86671967a561f2ab7e59606fdb 100644 (file)
@@ -29,8 +29,9 @@
 #include "util-buffer.h"
 
 #ifdef HAVE_LIBHIREDIS
-#include "hiredis/hiredis.h"
-#endif
+#include "util-log-redis.h"
+#endif /* HAVE_LIBHIREDIS */
+
 
 typedef struct {
     uint16_t fileno;
@@ -46,20 +47,6 @@ typedef struct SyslogSetup_ {
     int alert_syslog_level;
 } SyslogSetup;
 
-#ifdef HAVE_LIBHIREDIS
-enum RedisMode { REDIS_LIST, REDIS_CHANNEL };
-
-typedef struct RedisSetup_ {
-    enum RedisMode mode;
-    const char *command;
-    char *key;
-    int  batch_size;
-    int  batch_count;
-    char *server;
-    int  port;
-    time_t tried;
-} RedisSetup;
-#endif
 
 /** Global structure for Output Context */
 typedef struct LogFileCtx_ {
@@ -67,7 +54,7 @@ typedef struct LogFileCtx_ {
         FILE *fp;
         PcieFile *pcie_fp;
 #ifdef HAVE_LIBHIREDIS
-        redisContext *redis;
+        void *redis;
 #endif
     };
 
@@ -156,7 +143,6 @@ int LogFileFreeCtx(LogFileCtx *);
 int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
 
 int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
-int SCConfLogOpenRedis(ConfNode *conf, LogFileCtx *log_ctx);
 int SCConfLogReopen(LogFileCtx *);
 
 #endif /* __UTIL_LOGOPENFILE_H__ */
index df5b146af5d8eb1a360a0cf90be0b28c43083e10..8e5ae741364b920bab16eb9290550b9cdef58158 100644 (file)
@@ -148,6 +148,7 @@ outputs:
       #redis:
       #  server: 127.0.0.1
       #  port: 6379
+      #  async: true ## if redis replies are read asynchronously
       #  mode: list ## possible values: list (default), channel
       #  key: suricata ## key or channel to use (default to suricata)
       # Redis pipelining set up. This will enable to only do a query every