From: Eric Leblond Date: Mon, 25 May 2015 17:38:28 +0000 (+0200) Subject: util-logopenfile: reconnect handling X-Git-Tag: suricata-3.0RC1~49 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=594f62b52305258b6a39be76b7bdf049c74298d3;p=thirdparty%2Fsuricata.git util-logopenfile: reconnect handling This patch implements reconnection handling for the redis output. A reconnect limitation has been implemented with a limitation of one connection per second. --- diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 08a8f74be0..c8b335b263 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -331,7 +331,16 @@ int SCConfLogReopen(LogFileCtx *log_ctx) } -#if HAVE_LIBHIREDIS +#ifdef HAVE_LIBHIREDIS + +static void SCLogFileCloseRedis(LogFileCtx *log_ctx) +{ + if (log_ctx->redis) + redisFree(log_ctx->redis); + log_ctx->redis_setup.tried = 0; + SC_ATOMIC_SET(log_ctx->redis_setup.batch_count, 0); +} + int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx) { const char *redis_server = NULL; @@ -398,10 +407,50 @@ int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx) SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s", c->errstr); exit(EXIT_FAILURE); } + + /* store server params for reconnection */ + log_ctx->redis_setup.server = SCStrdup(redis_server); + if (!log_ctx->redis_setup.server) { + SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string"); + exit(EXIT_FAILURE); + } + log_ctx->redis_setup.port = atoi(redis_port); + log_ctx->redis_setup.tried = 0; + log_ctx->redis = c; + log_ctx->Close = SCLogFileCloseRedis; + return 0; } + +int SCConfLogReopenRedis(LogFileCtx *log_ctx) +{ + if (log_ctx->redis != NULL) { + redisFree(log_ctx->redis); + log_ctx->redis = NULL; + } + + /* only try to reconnect once per second */ + if (log_ctx->redis_setup.tried >= time(NULL)) { + return -1; + } + + redisContext *c = redisConnect(log_ctx->redis_setup.server, log_ctx->redis_setup.port); + if (c != NULL && c->err) { + if (log_ctx->redis_setup.tried == 0) { + SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s\n", c->errstr); + } + redisFree(c); + log_ctx->redis_setup.tried = time(NULL); + return -1; + } + log_ctx->redis = c; + log_ctx->redis_setup.tried = 0; + SC_ATOMIC_SET(log_ctx->redis_setup.batch_count, 0); + return 0; +} + #endif /** \brief LogFileNewCtx() Get a new LogFileCtx @@ -448,6 +497,9 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) #ifdef HAVE_LIBHIREDIS if (lf_ctx->type == LOGFILE_TYPE_REDIS && lf_ctx->redis) { redisFree(lf_ctx->redis); + SCFree(lf_ctx->redis_setup.server); + SCFree(lf_ctx->redis_setup.command); + SCFree(lf_ctx->redis_setup.key); } #endif @@ -481,6 +533,16 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s } #if HAVE_LIBHIREDIS else if (file_ctx->type == LOGFILE_TYPE_REDIS) { + if (file_ctx->redis == NULL) { + /* FIXME temporisation */ + SCConfLogReopenRedis(file_ctx); + if (file_ctx->redis == NULL) { + SCMutexUnlock(&file_ctx->fp_mutex); + return -1; + } else { + SCLogInfo("Reconnected to redis server"); + } + } /* FIXME go async here ? */ if (file_ctx->redis_setup.batch_size) { redisAppendCommand(file_ctx->redis, "%s %s %s", @@ -495,7 +557,30 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s freeReplyObject(reply); } else { /* FIXME treat error */ - SCLogInfo("Error when fetching reply"); + if (file_ctx->redis->err) { + SCLogInfo("Error when fetching reply: %s (%d)", + file_ctx->redis->errstr, + file_ctx->redis->err); + } + switch (file_ctx->redis->err) { + case REDIS_ERR_EOF: + case REDIS_ERR_IO: + SCLogInfo("Reopening connection to redis server"); + SCConfLogReopenRedis(file_ctx); + if (file_ctx->redis) { + SCLogInfo("Reconnected to redis server"); + SCMutexUnlock(&file_ctx->fp_mutex); + return 0; + } else { + SCLogInfo("Unable to reconnect to redis server"); + SCMutexUnlock(&file_ctx->fp_mutex); + return 0; + } + break; + default: + SCLogInfo("Unsupported error code %d", + file_ctx->redis->err); + } } } } else { @@ -510,6 +595,7 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s switch (reply->type) { case REDIS_REPLY_ERROR: SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str); + SCConfLogReopenRedis(file_ctx); break; case REDIS_REPLY_INTEGER: SCLogDebug("Redis integer %lld", reply->integer); @@ -517,6 +603,7 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s default: SCLogError(SC_ERR_INVALID_VALUE, "Redis default triggered with %d", reply->type); + SCConfLogReopenRedis(file_ctx); break; } freeReplyObject(reply); diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index 966fc7545c..d1efee9f67 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -56,6 +56,9 @@ typedef struct RedisSetup_ { char *sensor_name; int batch_size; SC_ATOMIC_DECLARE(int, batch_count); + char *server; + int port; + time_t tried; } RedisSetup; #endif