From: fooinha Date: Thu, 23 Feb 2017 22:42:05 +0000 (+0000) Subject: eve: async mode for redis output X-Git-Tag: suricata-4.0.0-beta1~204 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a64e5e77c788b4fb05c1d67ed004cc67a725f117;p=thirdparty%2Fsuricata.git eve: async mode for redis output eve: detects libevent for async redis at configure eve: moves redis output code to new file - util-log-redis.{c,h} eve: redis ECHO and QUIT commands for async mode eve: redis output defaults if conf is missing --- diff --git a/.travis.yml b/.travis.yml index 0194a07921..c05265f1f7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,6 +29,8 @@ addons: - libnfnetlink0 - libhiredis-dev - libjansson-dev + - libevent-dev + - libevent-pthreads-2.0-5 # Now define the default set of packages which is those above, and # libjansson. packages: &packages diff --git a/configure.ac b/configure.ac index ef0aee1514..0752c6f1aa 100644 --- a/configure.ac +++ b/configure.ac @@ -1857,6 +1857,14 @@ LDFLAGS="${LDFLAGS} -pie" fi +#libevent includes and libraries + AC_ARG_WITH(libevent_includes, + [ --with-libevent-includes=DIR libevent include directory], + [with_libevent_includes="$withval"],[with_libevent_includes="no"]) + AC_ARG_WITH(libevent_libraries, + [ --with-libevent-libraries=DIR libevent library directory], + [with_libevent_libraries="$withval"],[with_libevent_libraries="no"]) + # libhiredis AC_ARG_ENABLE(hiredis, AS_HELP_STRING([--enable-hiredis],[Enable Redis support]), @@ -1869,6 +1877,7 @@ [ --with-libhiredis-libraries=DIR libhiredis library directory], [with_libhiredis_libraries="$withval"],[with_libhiredis_libraries="no"]) + enable_hiredis_async="no" if test "$enable_hiredis" = "yes"; then if test "$with_libhiredis_includes" != "no"; then CPPFLAGS="${CPPFLAGS} -I${with_libhiredis_includes}" @@ -1895,6 +1904,47 @@ if test "$HIREDIS" = "yes"; then AC_DEFINE([HAVE_LIBHIREDIS],[1],[libhiredis available]) enable_hiredis="yes" + # + # Check if async adapters and libevent is installed + # + AC_CHECK_HEADER("hiredis/adapters/libevent.h",HIREDIS_LIBEVENT_ADAPTER="yes",HIREDIS_LIBEVENT_ADAPTER="no") + if test "$HIREDIS_LIBEVENT_ADAPTER" = "yes"; then + #Look for libevent headers + if test "$with_libevent_includes" != "no"; then + CPPFLAGS="${CPPFLAGS} -I${with_libevent_includes}" + fi + AC_CHECK_HEADER("event.h",LIBEVENT="yes",LIBEVENT="no") + if test "$LIBEVENT" = "yes"; then + if test "$with_libevent_libraries" != "no"; then + LDFLAGS="${LDFLAGS} -L${with_libevent_libraries}" + fi + AC_CHECK_LIB(event, event_base_free,, HAVE_LIBEVENT="no") + AC_CHECK_LIB(event_pthreads, evthread_use_pthreads,, HAVE_LIBEVENT_PTHREADS="no") + fi + if test "$HAVE_LIBEVENT" = "no" -o test "$HAVE_LIBEVENT_PTHREADS" = "no" ; then + if test "$HAVE_LIBEVENT" = "no"; then + echo + echo " Async mode for redis output will not be available." + echo " To enable it install libevent" + echo + echo " Ubuntu: apt-get install libevent-dev" + echo " Fedora: dnf install event-devel" + echo " RHEL/CentOS: yum install event-devel" + echo + fi + if test "$HAVE_LIBEVENT_PTHREADS" = "no"; then + echo + echo " Async mode for redis output will not be available." + echo " To enable it install libevent with pthreads support" + echo + echo " Ubuntu: apt-get install libevent-pthreads-2.0-5" + echo + fi + else + AC_DEFINE([HAVE_LIBEVENT],[1],[libevent available]) + enable_hiredis_async="yes" + fi + fi fi fi @@ -2027,6 +2077,7 @@ SURICATA_BUILD_CONF="Suricata Configuration: libnspr support: ${enable_nspr} libjansson support: ${enable_jansson} hiredis support: ${enable_hiredis} + hiredis async with libevent: ${enable_hiredis_async} Prelude support: ${enable_prelude} PCRE jit: ${pcre_jit_available} LUA support: ${enable_lua} diff --git a/src/Makefile.am b/src/Makefile.am index de7006d05f..f5ecd6afa1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -386,6 +386,7 @@ util-ioctl.h util-ioctl.c \ util-ip.h util-ip.c \ util-logopenfile.h util-logopenfile.c \ util-logopenfile-tile.h util-logopenfile-tile.c \ +util-log-redis.h util-log-redis.c \ util-lua.c util-lua.h \ util-luajit.c util-luajit.h \ util-lua-common.c util-lua-common.h \ diff --git a/src/output-json.c b/src/output-json.c index 5a457cd504..7df5660a2c 100644 --- a/src/output-json.c +++ b/src/output-json.c @@ -56,6 +56,7 @@ #include "util-optimize.h" #include "util-buffer.h" #include "util-logopenfile.h" +#include "util-log-redis.h" #include "util-device.h" #include "flow-var.h" @@ -595,6 +596,7 @@ OutputCtx *OutputJsonInitCtx(ConfNode *conf) json_ctx->json_out = LOGFILE_TYPE_UNIX_STREAM; } else if (strcmp(output_s, "redis") == 0) { #ifdef HAVE_LIBHIREDIS + SCLogRedisInit(); json_ctx->json_out = LOGFILE_TYPE_REDIS; #else SCLogError(SC_ERR_INVALID_ARGUMENT, diff --git a/src/util-error.c b/src/util-error.c index 99b7e55cf7..29275f7d38 100644 --- a/src/util-error.c +++ b/src/util-error.c @@ -340,6 +340,7 @@ const char * SCErrorToString(SCError err) CASE_CODE (SC_WARN_CHMOD); CASE_CODE (SC_WARN_LOG_CF_TOO_MANY_NODES); CASE_CODE (SC_WARN_EVENT_DROPPED); + CASE_CODE (SC_ERR_NO_REDIS_ASYNC); } return "UNKNOWN_ERROR"; diff --git a/src/util-error.h b/src/util-error.h index dbafabe100..9dc3c79d5b 100644 --- a/src/util-error.h +++ b/src/util-error.h @@ -330,6 +330,7 @@ typedef enum { SC_WARN_CHMOD, SC_WARN_LOG_CF_TOO_MANY_NODES, SC_WARN_EVENT_DROPPED, + SC_ERR_NO_REDIS_ASYNC } SCError; const char *SCErrorToString(SCError); diff --git a/src/util-log-redis.c b/src/util-log-redis.c new file mode 100644 index 0000000000..50d33a90f3 --- /dev/null +++ b/src/util-log-redis.c @@ -0,0 +1,572 @@ +/* vi: set et ts=4: */ +/* Copyright (C) 2007-2016 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Paulo Pacheco + * + * File-like output for logging: redis + */ +#include "suricata-common.h" /* errno.h, string.h, etc. */ +#include "util-log-redis.h" +#include "util-logopenfile.h" + +#ifdef HAVE_LIBHIREDIS + +#ifdef HAVE_LIBEVENT_PTHREADS +#include +#endif /* HAVE_LIBEVENT_PTHREADS */ + +static const char * redis_push_cmd = "LPUSH"; +static const char * redis_publish_cmd = "PUBLISH"; +static const char * redis_default_key = "suricata"; +static const char * redis_default_server = "127.0.0.1"; + +static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx); +static void SCLogFileCloseRedis(LogFileCtx *log_ctx); + +/** + * \brief SCLogRedisInit() - Initializes global stuff before threads + */ +void SCLogRedisInit() +{ +#ifdef HAVE_LIBEVENT_PTHREADS + evthread_use_pthreads(); +#endif /* HAVE_LIBEVENT_PTHREADS */ +} + +/** \brief SCLogRedisContextAlloc() - Allocates and initalizes redis context + */ +static SCLogRedisContext * SCLogRedisContextAlloc() +{ + SCLogRedisContext* ctx = (SCLogRedisContext*) SCMalloc(sizeof(SCLogRedisContext)); + if (ctx == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis context"); + exit(EXIT_FAILURE); + } + ctx->sync = NULL; +#if HAVE_LIBEVENT + ctx->ev_base = NULL; + ctx->async = NULL; +#endif + ctx->batch_count = 0; + ctx->tried = 0; + + return ctx; +} + +#ifdef HAVE_LIBEVENT + +static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx); +#include + +/** \brief SCLogRedisAsyncContextAlloc() - Allocates and initalizes redis context with async + */ +static SCLogRedisContext * SCLogRedisContextAsyncAlloc() +{ + SCLogRedisContext* ctx = (SCLogRedisContext*) SCMalloc(sizeof(SCLogRedisContext)); + if (unlikely(ctx == NULL)) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis context"); + exit(EXIT_FAILURE); + } + + ctx->sync = NULL; + ctx->async = NULL; + ctx->ev_base = NULL; + ctx->connected = 0; + ctx->batch_count = 0; + ctx->tried = 0; + + return ctx; +} + +/** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens. + * \param ac redis async context + * \param r redis reply + * \param privvata opaque datq with pointer to LogFileCtx + */ +static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata) +{ + redisReply *reply = r; + LogFileCtx *log_ctx = privdata; + SCLogRedisContext *ctx = log_ctx->redis; + + if (reply == NULL) { + if (ctx->connected > 0) + SCLogInfo("Missing reply from redis, disconnected."); + ctx->connected = 0; + } else { + ctx->connected = 1; + event_base_loopbreak(ctx->ev_base); + } +} + +/** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply + * This is used to check if redis is connected. + * \param ac redis async context + * \param r redis reply + * \param privvata opaque datq with pointer to LogFileCtx + */ +static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata) +{ + redisReply *reply = r; + SCLogRedisContext * ctx = privdata; + + if (reply) { + if (ctx->connected == 0) { + SCLogNotice("Connected to Redis."); + ctx->connected = 1; + ctx->tried = 0; + } + } else { + ctx->connected = 0; + if (ctx->tried == 0) { + SCLogWarning(SC_ERR_SOCKET, "Failed to connect to Redis... (will keep trying)"); + } + ctx->tried = time(NULL); + } + event_base_loopbreak(ctx->ev_base); +} + +/** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply + * Emits and awaits response for an async ECHO command. + * It's used for check if redis is alive. + * \param ctx redis context + */ +static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx) +{ + redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata"); + event_base_dispatch(ctx->ev_base); +} + +/** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply + * This is used to terminate connection with redis. + * \param ac redis async context + * \param r redis reply + * \param privvata opaque datq with pointer to LogFileCtx + */ +static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata) +{ + SCLogInfo("Disconnecting from redis!"); +} + +/** \brief QUIT command + * Emits and awaits response for an async QUIT command. + * It's used to disconnect with redis + * \param ctx redis context + */ +static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx) +{ + if (ctx->connected) { + redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT"); + SCLogInfo("QUIT Command sent to redis. Connection will terminate!"); + } + + redisAsyncFree(ctx->async); + event_base_dispatch(ctx->ev_base); + ctx->async = NULL; + event_base_free(ctx->ev_base); + ctx->ev_base = NULL; + ctx->connected = 0; +} + +/** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging. + * \param log_ctx Log file context allocated by caller + */ +static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx) +{ + SCLogRedisContext * ctx = log_ctx->redis; + const char *redis_server = log_ctx->redis_setup.server; + int redis_port = log_ctx->redis_setup.port; + + /* only try to reconnect once per second */ + if (ctx->tried >= time(NULL)) { + return -1; + } + + ctx->async = redisAsyncConnect(redis_server, redis_port); + + if (ctx->ev_base != NULL) { + event_base_free(ctx->ev_base); + } + + if (ctx->async == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, "Error allocate redis async."); + ctx->tried = time(NULL); + return -1; + } + + if (ctx->async != NULL && ctx->async->err) { + SCLogError(SC_ERR_SOCKET, "Error setting to redis async: [%s].", ctx->async->errstr); + ctx->tried = time(NULL); + return -1; + } + + ctx->ev_base = event_base_new(); + + if (ctx->ev_base == NULL) { + ctx->tried = time(NULL); + redisAsyncFree(ctx->async); + ctx->async = NULL; + return -1; + } + + redisLibeventAttach(ctx->async, ctx->ev_base); + + log_ctx->redis = ctx; + log_ctx->Close = SCLogFileCloseRedis; + return 0; +} + + +/** \brief SCLogRedisWriteAsync() writes string to redis output in async mode + * \param file_ctx Log file context allocated by caller + * \param string Buffer to output + */ +static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len) +{ + SCLogRedisContext *ctx = file_ctx->redis; + + if (! ctx->connected) { + if (SCConfLogReopenAsyncRedis(file_ctx) == -1) { + return -1; + } + if (ctx->tried == 0) { + SCLogNotice("Trying to connect to Redis"); + } + SCLogAsyncRedisSendEcho(ctx); + } + + if (!ctx->connected) { + return -1; + } + + if (ctx->async == NULL) { + return -1; + } + + redisAsyncCommand(ctx->async, + SCRedisAsyncCommandCallback, + file_ctx, + "%s %s %s", + file_ctx->redis_setup.command, + file_ctx->redis_setup.key, + string); + + event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK); + + return 0; +} + +#endif// HAVE_LIBEVENT + +/** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging. + * \param log_ctx Log file context allocated by caller + */ +static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx) +{ + SCLogRedisContext * ctx = log_ctx->redis; + + /* only try to reconnect once per second */ + if (ctx->tried >= time(NULL)) { + return -1; + } + + const char *redis_server = log_ctx->redis_setup.server; + int redis_port = log_ctx->redis_setup.port; + + if (ctx->sync != NULL) { + redisFree(ctx->sync); + } + ctx->sync = redisConnect(redis_server, redis_port); + if (ctx->sync == NULL) { + SCLogError(SC_ERR_SOCKET, "Error connecting to redis server."); + ctx->tried = time(NULL); + return -1; + } + if (ctx->sync->err) { + SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: [%s].", ctx->sync->errstr); + redisFree(ctx->sync); + ctx->sync = NULL; + ctx->tried = time(NULL); + return -1; + } + SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server); + + log_ctx->redis = ctx; + log_ctx->Close = SCLogFileCloseRedis; + return 0; +} +/** \brief SCLogRedisWriteSync() writes string to redis output in sync mode + * \param file_ctx Log file context allocated by caller + * \param string Buffer to output + */ +static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string) +{ + SCLogRedisContext * ctx = file_ctx->redis; + int ret = -1; + redisContext *redis = ctx->sync; + if (redis == NULL) { + SCConfLogReopenSyncRedis(file_ctx); + redis = ctx->sync; + if (redis == NULL) { + SCLogDebug("Redis after re-open is not available."); + return -1; + } + } + + /* synchronous mode */ + if (file_ctx->redis_setup.batch_size) { + redisAppendCommand(redis, "%s %s %s", + file_ctx->redis_setup.command, + file_ctx->redis_setup.key, + string); + if (ctx->batch_count == file_ctx->redis_setup.batch_size) { + redisReply *reply; + int i; + ctx->batch_count = 0; + for (i = 0; i <= file_ctx->redis_setup.batch_size; i++) { + if (redisGetReply(redis, (void **)&reply) == REDIS_OK) { + freeReplyObject(reply); + ret = 0; + } else { + if (redis->err) { + SCLogInfo("Error when fetching reply: %s (%d)", + redis->errstr, + redis->err); + } + switch (redis->err) { + case REDIS_ERR_EOF: + case REDIS_ERR_IO: + SCLogInfo("Reopening connection to redis server"); + SCConfLogReopenSyncRedis(file_ctx); + redis = ctx->sync; + if (redis) { + SCLogInfo("Reconnected to redis server"); + } else { + SCLogInfo("Unable to reconnect to redis server"); + return -1; + } + break; + default: + SCLogWarning(SC_ERR_INVALID_VALUE, + "Unsupported error code %d", + redis->err); + return -1; + } + } + } + } else { + ctx->batch_count++; + } + } else { + redisReply *reply = redisCommand(redis, "%s %s %s", + file_ctx->redis_setup.command, + file_ctx->redis_setup.key, + string); + /* We may lose the reply if disconnection happens*/ + if (reply) { + switch (reply->type) { + case REDIS_REPLY_ERROR: + SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str); + SCConfLogReopenSyncRedis(file_ctx); + break; + case REDIS_REPLY_INTEGER: + SCLogDebug("Redis integer %lld", reply->integer); + ret = 0; + break; + default: + SCLogError(SC_ERR_INVALID_VALUE, + "Redis default triggered with %d", reply->type); + SCConfLogReopenSyncRedis(file_ctx); + break; + } + freeReplyObject(reply); + } else { + SCConfLogReopenSyncRedis(file_ctx); + } + } + return ret; +} + +/** + * \brief LogFileWriteRedis() writes log data to redis output. + * \param log_ctx Log file context allocated by caller + * \param string buffer with data to write + * \param string_len data length + * \retval 0 on sucess; + * \retval -1 on failure; + */ +int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len) +{ + LogFileCtx *file_ctx = lf_ctx; + if (file_ctx == NULL) { + return -1; + } + +#if HAVE_LIBEVENT + /* async mode on */ + if (file_ctx->redis_setup.is_async) { + return SCLogRedisWriteAsync(file_ctx, string, string_len); + } +#endif + /* sync mode */ + if (! file_ctx->redis_setup.is_async) { + return SCLogRedisWriteSync(file_ctx, string); + } + return -1; +} + +/** \brief configure and initializes redis output logging + * \param conf ConfNode structure for the output section in question + * \param log_ctx Log file context allocated by caller + * \retval 0 on success + */ +int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx) +{ + LogFileCtx *log_ctx = lf_ctx; + + const char *redis_port = NULL; + const char *redis_mode = NULL; + + int is_async = 0; + + if (redis_node) { + log_ctx->redis_setup.server = ConfNodeLookupChildValue(redis_node, "server"); + log_ctx->redis_setup.key = ConfNodeLookupChildValue(redis_node, "key"); + + redis_port = ConfNodeLookupChildValue(redis_node, "port"); + redis_mode = ConfNodeLookupChildValue(redis_node, "mode"); + + ConfGetChildValueBool(redis_node, "async", &is_async); + } + if (!log_ctx->redis_setup.server) { + log_ctx->redis_setup.server = redis_default_server; + SCLogInfo("Using default redis server (127.0.0.1)"); + } + if (!redis_port) + redis_port = "6379"; + if (!redis_mode) + redis_mode = "list"; + if (!log_ctx->redis_setup.key) { + log_ctx->redis_setup.key = redis_default_key; + } + +#ifndef HAVE_LIBEVENT + if (is_async) { + SCLogWarning(SC_ERR_NO_REDIS_ASYNC, "async option not available."); + } + is_async = 0; +#endif //ifndef HAVE_LIBEVENT + + log_ctx->redis_setup.is_async = is_async; + log_ctx->redis_setup.batch_size = 0; + if (redis_node) { + ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining"); + if (pipelining) { + int enabled = 0; + int ret; + intmax_t val; + ret = ConfGetChildValueBool(pipelining, "enabled", &enabled); + if (ret && enabled) { + ret = ConfGetChildValueInt(pipelining, "batch-size", &val); + if (ret) { + log_ctx->redis_setup.batch_size = val; + } else { + log_ctx->redis_setup.batch_size = 10; + } + } + } + } else { + log_ctx->redis_setup.batch_size = 0; + } + + if (!strcmp(redis_mode, "list")) { + log_ctx->redis_setup.command = redis_push_cmd; + } else { + log_ctx->redis_setup.command = redis_publish_cmd; + } + /* store server params for reconnection */ + if (!log_ctx->redis_setup.server) { + SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string"); + exit(EXIT_FAILURE); + } + log_ctx->redis_setup.port = atoi(redis_port); + log_ctx->Close = SCLogFileCloseRedis; + +#ifdef HAVE_LIBEVENT + if (is_async) { + log_ctx->redis = SCLogRedisContextAsyncAlloc(); + } +#endif /*HAVE_LIBEVENT*/ + if (! is_async) { + log_ctx->redis = SCLogRedisContextAlloc(); + SCConfLogReopenSyncRedis(log_ctx); + } + return 0; +} + +/** \brief SCLogFileCloseRedis() Closes redis log more + * \param log_ctx Log file context allocated by caller + */ +void SCLogFileCloseRedis(LogFileCtx *log_ctx) +{ + SCLogRedisContext * ctx = log_ctx->redis; + if (ctx == NULL) { + return; + } + /* asynchronous */ + if (log_ctx->redis_setup.is_async) { +#if HAVE_LIBEVENT == 1 + if (ctx->async) { + if (ctx->connected > 0) { + SCLogAsyncRedisSendQuit(ctx); + } + if (ctx->ev_base != NULL) { + event_base_free(ctx->ev_base); + ctx->ev_base = NULL; + } + } +#endif + } + + /* synchronous */ + if (!log_ctx->redis_setup.is_async) { + if (ctx->sync) { + redisReply *reply; + int i; + for (i = 0; i < ctx->batch_count; i++) { + redisGetReply(ctx->sync, (void **)&reply); + if (reply) { + freeReplyObject(reply); + } + } + redisFree(ctx->sync); + ctx->sync = NULL; + } + ctx->tried = 0; + ctx->batch_count = 0; + } + + if (ctx != NULL) { + SCFree(ctx); + } +} + +#endif //#ifdef HAVE_LIBHIREDIS diff --git a/src/util-log-redis.h b/src/util-log-redis.h new file mode 100644 index 0000000000..ff8bfc46f7 --- /dev/null +++ b/src/util-log-redis.h @@ -0,0 +1,65 @@ +/* Copyright (C) 2016 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Paulo Pacheco + */ + +#ifndef __UTIL_LOG_REDIS_H__ +#define __UTIL_LOG_REDIS_H__ + +#ifdef HAVE_LIBHIREDIS +#include + + +#ifdef HAVE_LIBEVENT +#include +#endif /* HAVE_LIBEVENT */ + +#include "conf.h" /* ConfNode */ + +enum RedisMode { REDIS_LIST, REDIS_CHANNEL }; + +typedef struct RedisSetup_ { + enum RedisMode mode; + const char *command; + const char *key; + const char *server; + int port; + int is_async; + int batch_size; +} RedisSetup; + +typedef struct SCLogRedisContext_ { + redisContext *sync; +#if HAVE_LIBEVENT + redisAsyncContext *async; + struct event_base *ev_base; + int connected; +#endif /* HAVE_LIBEVENT */ + time_t tried; + int batch_count; +} SCLogRedisContext; + +void SCLogRedisInit(); +int SCConfLogOpenRedis(ConfNode *, void *); +int LogFileWriteRedis(void *, const char *, size_t); + +#endif /* HAVE_LIBHIREDIS */ +#endif /* __UTIL_LOG_REDIS_H__ */ diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 376a77af29..b5863f84ea 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -35,8 +35,9 @@ #include "util-logopenfile.h" #include "util-logopenfile-tile.h" -const char * redis_push_cmd = "LPUSH"; -const char * redis_publish_cmd = "PUBLISH"; +#ifdef HAVE_LIBHIREDIS +#include "util-log-redis.h" +#endif /* HAVE_LIBHIREDIS */ /** \brief connect to the indicated local stream socket, logging any errors * \param path filesystem path to connect to @@ -550,138 +551,6 @@ int SCConfLogReopen(LogFileCtx *log_ctx) return 0; } - -#ifdef HAVE_LIBHIREDIS - -static void SCLogFileCloseRedis(LogFileCtx *log_ctx) -{ - if (log_ctx->redis) { - redisReply *reply; - int i; - for (i = 0; i < log_ctx->redis_setup.batch_count; i++) { - redisGetReply(log_ctx->redis, (void **)&reply); - if (reply) - freeReplyObject(reply); - } - redisFree(log_ctx->redis); - log_ctx->redis = NULL; - } - log_ctx->redis_setup.tried = 0; - log_ctx->redis_setup.batch_count = 0; -} - -int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx) -{ - const char *redis_server = NULL; - const char *redis_port = NULL; - const char *redis_mode = NULL; - const char *redis_key = NULL; - - if (redis_node) { - redis_server = ConfNodeLookupChildValue(redis_node, "server"); - redis_port = ConfNodeLookupChildValue(redis_node, "port"); - redis_mode = ConfNodeLookupChildValue(redis_node, "mode"); - redis_key = ConfNodeLookupChildValue(redis_node, "key"); - } - if (!redis_server) { - redis_server = "127.0.0.1"; - SCLogInfo("Using default redis server (127.0.0.1)"); - } - if (!redis_port) - redis_port = "6379"; - if (!redis_mode) - redis_mode = "list"; - if (!redis_key) - redis_key = "suricata"; - log_ctx->redis_setup.key = SCStrdup(redis_key); - - if (!log_ctx->redis_setup.key) { - SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key name"); - exit(EXIT_FAILURE); - } - - log_ctx->redis_setup.batch_size = 0; - - ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining"); - if (pipelining) { - int enabled = 0; - int ret; - intmax_t val; - ret = ConfGetChildValueBool(pipelining, "enabled", &enabled); - if (ret && enabled) { - ret = ConfGetChildValueInt(pipelining, "batch-size", &val); - if (ret) { - log_ctx->redis_setup.batch_size = val; - } else { - log_ctx->redis_setup.batch_size = 10; - } - } - } - - if (!strcmp(redis_mode, "list")) { - log_ctx->redis_setup.command = redis_push_cmd; - if (!log_ctx->redis_setup.command) { - SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key command"); - exit(EXIT_FAILURE); - } - } else { - log_ctx->redis_setup.command = redis_publish_cmd; - if (!log_ctx->redis_setup.command) { - SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key command"); - exit(EXIT_FAILURE); - } - } - redisContext *c = redisConnect(redis_server, atoi(redis_port)); - if (c != NULL && c->err) { - SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s", c->errstr); - exit(EXIT_FAILURE); - } - - /* store server params for reconnection */ - log_ctx->redis_setup.server = SCStrdup(redis_server); - if (!log_ctx->redis_setup.server) { - SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string"); - exit(EXIT_FAILURE); - } - log_ctx->redis_setup.port = atoi(redis_port); - log_ctx->redis_setup.tried = 0; - - log_ctx->redis = c; - - log_ctx->Close = SCLogFileCloseRedis; - - return 0; -} - -int SCConfLogReopenRedis(LogFileCtx *log_ctx) -{ - if (log_ctx->redis != NULL) { - redisFree(log_ctx->redis); - log_ctx->redis = NULL; - } - - /* only try to reconnect once per second */ - if (log_ctx->redis_setup.tried >= time(NULL)) { - return -1; - } - - redisContext *c = redisConnect(log_ctx->redis_setup.server, log_ctx->redis_setup.port); - if (c != NULL && c->err) { - if (log_ctx->redis_setup.tried == 0) { - SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s\n", c->errstr); - } - redisFree(c); - log_ctx->redis_setup.tried = time(NULL); - return -1; - } - log_ctx->redis = c; - log_ctx->redis_setup.tried = 0; - log_ctx->redis_setup.batch_count = 0; - return 0; -} - -#endif - /** \brief LogFileNewCtx() Get a new LogFileCtx * \retval LogFileCtx * pointer if succesful, NULL if error * */ @@ -700,10 +569,6 @@ LogFileCtx *LogFileNewCtx(void) lf_ctx->Write = SCLogFileWrite; lf_ctx->Close = SCLogFileClose; -#ifdef HAVE_LIBHIREDIS - lf_ctx->redis_setup.batch_count = 0; -#endif - return lf_ctx; } @@ -723,17 +588,6 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) SCMutexUnlock(&lf_ctx->fp_mutex); } -#ifdef HAVE_LIBHIREDIS - if (lf_ctx->type == LOGFILE_TYPE_REDIS) { - if (lf_ctx->redis) - redisFree(lf_ctx->redis); - if (lf_ctx->redis_setup.server) - SCFree(lf_ctx->redis_setup.server); - if (lf_ctx->redis_setup.key) - SCFree(lf_ctx->redis_setup.key); - } -#endif - SCMutexDestroy(&lf_ctx->fp_mutex); if (lf_ctx->prefix != NULL) { @@ -754,91 +608,6 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) SCReturnInt(1); } -#ifdef HAVE_LIBHIREDIS -static int LogFileWriteRedis(LogFileCtx *file_ctx, const char *string, size_t string_len) -{ - if (file_ctx->redis == NULL) { - SCConfLogReopenRedis(file_ctx); - if (file_ctx->redis == NULL) { - return -1; - } else { - SCLogInfo("Reconnected to redis server"); - } - } - /* TODO go async here ? */ - if (file_ctx->redis_setup.batch_size) { - redisAppendCommand(file_ctx->redis, "%s %s %s", - file_ctx->redis_setup.command, - file_ctx->redis_setup.key, - string); - if (file_ctx->redis_setup.batch_count == file_ctx->redis_setup.batch_size) { - redisReply *reply; - int i; - file_ctx->redis_setup.batch_count = 0; - for (i = 0; i <= file_ctx->redis_setup.batch_size; i++) { - if (redisGetReply(file_ctx->redis, (void **)&reply) == REDIS_OK) { - freeReplyObject(reply); - } else { - if (file_ctx->redis->err) { - SCLogInfo("Error when fetching reply: %s (%d)", - file_ctx->redis->errstr, - file_ctx->redis->err); - } - switch (file_ctx->redis->err) { - case REDIS_ERR_EOF: - case REDIS_ERR_IO: - SCLogInfo("Reopening connection to redis server"); - SCConfLogReopenRedis(file_ctx); - if (file_ctx->redis) { - SCLogInfo("Reconnected to redis server"); - return 0; - } else { - SCLogInfo("Unable to reconnect to redis server"); - return 0; - } - break; - default: - SCLogWarning(SC_ERR_INVALID_VALUE, - "Unsupported error code %d", - file_ctx->redis->err); - return 0; - } - } - } - } else { - file_ctx->redis_setup.batch_count++; - } - } else { - redisReply *reply = redisCommand(file_ctx->redis, "%s %s %b", - file_ctx->redis_setup.command, - file_ctx->redis_setup.key, - string, string_len); - - /* We may lose the reply if disconnection happens! */ - if (reply) { - switch (reply->type) { - case REDIS_REPLY_ERROR: - SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str); - SCConfLogReopenRedis(file_ctx); - break; - case REDIS_REPLY_INTEGER: - SCLogDebug("Redis integer %lld", reply->integer); - break; - default: - SCLogError(SC_ERR_INVALID_VALUE, - "Redis default triggered with %d", reply->type); - SCConfLogReopenRedis(file_ctx); - break; - } - freeReplyObject(reply); - } else { - SCConfLogReopenRedis(file_ctx); - } - } - return 0; -} -#endif - int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer) { if (file_ctx->type == LOGFILE_TYPE_SYSLOG) { diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index 857f8b821c..ae99ad8a56 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -29,8 +29,9 @@ #include "util-buffer.h" #ifdef HAVE_LIBHIREDIS -#include "hiredis/hiredis.h" -#endif +#include "util-log-redis.h" +#endif /* HAVE_LIBHIREDIS */ + typedef struct { uint16_t fileno; @@ -46,20 +47,6 @@ typedef struct SyslogSetup_ { int alert_syslog_level; } SyslogSetup; -#ifdef HAVE_LIBHIREDIS -enum RedisMode { REDIS_LIST, REDIS_CHANNEL }; - -typedef struct RedisSetup_ { - enum RedisMode mode; - const char *command; - char *key; - int batch_size; - int batch_count; - char *server; - int port; - time_t tried; -} RedisSetup; -#endif /** Global structure for Output Context */ typedef struct LogFileCtx_ { @@ -67,7 +54,7 @@ typedef struct LogFileCtx_ { FILE *fp; PcieFile *pcie_fp; #ifdef HAVE_LIBHIREDIS - redisContext *redis; + void *redis; #endif }; @@ -156,7 +143,6 @@ int LogFileFreeCtx(LogFileCtx *); int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer); int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int); -int SCConfLogOpenRedis(ConfNode *conf, LogFileCtx *log_ctx); int SCConfLogReopen(LogFileCtx *); #endif /* __UTIL_LOGOPENFILE_H__ */ diff --git a/suricata.yaml.in b/suricata.yaml.in index df5b146af5..8e5ae74136 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -148,6 +148,7 @@ outputs: #redis: # server: 127.0.0.1 # port: 6379 + # async: true ## if redis replies are read asynchronously # mode: list ## possible values: list (default), channel # key: suricata ## key or channel to use (default to suricata) # Redis pipelining set up. This will enable to only do a query every