]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
util-logopenfile: reconnect handling
authorEric Leblond <eric@regit.org>
Mon, 25 May 2015 17:38:28 +0000 (19:38 +0200)
committerVictor Julien <victor@inliniac.net>
Thu, 22 Oct 2015 08:01:05 +0000 (10:01 +0200)
This patch implements reconnection handling for the redis output.
A reconnect limitation has been implemented with a limitation of
one connection per second.

src/util-logopenfile.c
src/util-logopenfile.h

index 08a8f74be05f30887a123795a8b01bea3f76561b..c8b335b263e8af31ebb356cc786cf729ddd7c4ec 100644 (file)
@@ -331,7 +331,16 @@ int SCConfLogReopen(LogFileCtx *log_ctx)
 }
 
 
-#if HAVE_LIBHIREDIS
+#ifdef HAVE_LIBHIREDIS
+
+static void SCLogFileCloseRedis(LogFileCtx *log_ctx)
+{
+    if (log_ctx->redis)
+        redisFree(log_ctx->redis);
+    log_ctx->redis_setup.tried = 0;
+    SC_ATOMIC_SET(log_ctx->redis_setup.batch_count, 0);
+}
+
 int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
 {
     const char *redis_server = NULL;
@@ -398,10 +407,50 @@ int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
         SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s", c->errstr);
         exit(EXIT_FAILURE);
     }
+
+    /* store server params for reconnection */
+    log_ctx->redis_setup.server = SCStrdup(redis_server);
+    if (!log_ctx->redis_setup.server) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string");
+        exit(EXIT_FAILURE);
+    }
+    log_ctx->redis_setup.port = atoi(redis_port);
+    log_ctx->redis_setup.tried = 0;
+
     log_ctx->redis = c;
 
+    log_ctx->Close = SCLogFileCloseRedis;
+
     return 0;
 }
+
+int SCConfLogReopenRedis(LogFileCtx *log_ctx)
+{
+    if (log_ctx->redis != NULL) {
+        redisFree(log_ctx->redis);
+        log_ctx->redis = NULL;
+    }
+
+    /* only try to reconnect once per second */
+    if (log_ctx->redis_setup.tried >= time(NULL)) {
+        return -1;
+    }
+
+    redisContext *c = redisConnect(log_ctx->redis_setup.server, log_ctx->redis_setup.port);
+    if (c != NULL && c->err) {
+        if (log_ctx->redis_setup.tried == 0) {
+            SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s\n", c->errstr);
+        }
+        redisFree(c);
+        log_ctx->redis_setup.tried = time(NULL);
+        return -1;
+    }
+    log_ctx->redis = c;
+    log_ctx->redis_setup.tried = 0;
+    SC_ATOMIC_SET(log_ctx->redis_setup.batch_count, 0);
+    return 0;
+}
+
 #endif
 
 /** \brief LogFileNewCtx() Get a new LogFileCtx
@@ -448,6 +497,9 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
 #ifdef HAVE_LIBHIREDIS
     if (lf_ctx->type == LOGFILE_TYPE_REDIS && lf_ctx->redis) {
         redisFree(lf_ctx->redis);
+        SCFree(lf_ctx->redis_setup.server);
+        SCFree(lf_ctx->redis_setup.command);
+        SCFree(lf_ctx->redis_setup.key);
     }
 #endif
 
@@ -481,6 +533,16 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
     }
 #if HAVE_LIBHIREDIS
     else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
+        if (file_ctx->redis == NULL) {
+            /* FIXME temporisation */
+            SCConfLogReopenRedis(file_ctx);
+            if (file_ctx->redis == NULL) {
+                SCMutexUnlock(&file_ctx->fp_mutex);
+                return -1;
+            } else {
+                SCLogInfo("Reconnected to redis server");
+            }
+        }
         /* FIXME go async here ? */
         if (file_ctx->redis_setup.batch_size) {
             redisAppendCommand(file_ctx->redis, "%s %s %s",
@@ -495,7 +557,30 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
                         freeReplyObject(reply);
                     } else {
                         /* FIXME treat error */
-                        SCLogInfo("Error when fetching reply");
+                        if (file_ctx->redis->err) {
+                            SCLogInfo("Error when fetching reply: %s (%d)",
+                                      file_ctx->redis->errstr,
+                                      file_ctx->redis->err);
+                        }
+                        switch (file_ctx->redis->err) {
+                            case REDIS_ERR_EOF:
+                            case REDIS_ERR_IO:
+                                SCLogInfo("Reopening connection to redis server");
+                                SCConfLogReopenRedis(file_ctx);
+                                if (file_ctx->redis) {
+                                    SCLogInfo("Reconnected to redis server");
+                                    SCMutexUnlock(&file_ctx->fp_mutex);
+                                    return 0;
+                                } else {
+                                    SCLogInfo("Unable to reconnect to redis server");
+                                    SCMutexUnlock(&file_ctx->fp_mutex);
+                                    return 0;
+                                }
+                                break;
+                            default:
+                                SCLogInfo("Unsupported error code %d",
+                                          file_ctx->redis->err);
+                        }
                     }
                 }
             } else {
@@ -510,6 +595,7 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
             switch (reply->type) {
                 case REDIS_REPLY_ERROR:
                     SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
+                    SCConfLogReopenRedis(file_ctx);
                     break;
                 case REDIS_REPLY_INTEGER:
                     SCLogDebug("Redis integer %lld", reply->integer);
@@ -517,6 +603,7 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
                 default:
                     SCLogError(SC_ERR_INVALID_VALUE,
                             "Redis default triggered with %d", reply->type);
+                    SCConfLogReopenRedis(file_ctx);
                     break;
             }
             freeReplyObject(reply);
index 966fc7545c2164ba1d8d1e9dc9072c226a52d465..d1efee9f6776b0774c557c4af414602d9bd59c6c 100644 (file)
@@ -56,6 +56,9 @@ typedef struct RedisSetup_ {
     char *sensor_name;
     int  batch_size;
     SC_ATOMIC_DECLARE(int, batch_count);
+    char *server;
+    int  port;
+    time_t tried;
 } RedisSetup;
 #endif