]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
util-logopenfile: implement redis pipelining
authorEric Leblond <eric@regit.org>
Sun, 24 May 2015 19:52:56 +0000 (21:52 +0200)
committerVictor Julien <victor@inliniac.net>
Thu, 22 Oct 2015 08:01:05 +0000 (10:01 +0200)
This patch implements redis pipelining. This consist in contacting
the redis server every N events to minimize the number of TCP
exchange. This is optional and setup via the configuration file.

src/util-logopenfile.c
src/util-logopenfile.h
suricata.yaml.in

index 519e4f7735fb2077924d2d7e5a72b6a5214a5c5f..08a8f74be05f30887a123795a8b01bea3f76561b 100644 (file)
@@ -362,6 +362,24 @@ int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
         exit(EXIT_FAILURE);
     }
 
+    log_ctx->redis_setup.batch_size = 0;
+
+    ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
+    if (pipelining) {
+        int enabled = 0;
+        int ret;
+        intmax_t val;
+        ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
+        if (ret && enabled) {
+            ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
+            if (ret) {
+                log_ctx->redis_setup.batch_size = val;
+            } else {
+                log_ctx->redis_setup.batch_size = 10;
+            }
+        }
+    }
+
     if (!strcmp(redis_mode, "list")) {
         log_ctx->redis_setup.command = SCStrdup("LPUSH");
         if (!log_ctx->redis_setup.command) {
@@ -404,6 +422,10 @@ LogFileCtx *LogFileNewCtx(void)
     lf_ctx->Write = SCLogFileWrite;
     lf_ctx->Close = SCLogFileClose;
 
+#ifdef HAVE_LIBHIREDIS
+    SC_ATOMIC_INIT(lf_ctx->redis_setup.batch_count);
+#endif
+
     return lf_ctx;
 }
 
@@ -459,24 +481,46 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s
     }
 #if HAVE_LIBHIREDIS
     else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
-        /* FIXME go async here */
-        redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s",
-                                         file_ctx->redis_setup.command,
-                                         file_ctx->redis_setup.key,
-                                         string);
-        switch (reply->type) {
-            case REDIS_REPLY_ERROR:
-                SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
-                break;
-            case REDIS_REPLY_INTEGER:
-                SCLogDebug("Redis integer %lld", reply->integer);
-                break;
-            default:
-                SCLogError(SC_ERR_INVALID_VALUE,
-                           "Redis default triggered with %d", reply->type);
-                break;
+        /* FIXME go async here ? */
+        if (file_ctx->redis_setup.batch_size) {
+            redisAppendCommand(file_ctx->redis, "%s %s %s",
+                    file_ctx->redis_setup.command,
+                    file_ctx->redis_setup.key,
+                    string);
+            if (SC_ATOMIC_CAS(&file_ctx->redis_setup.batch_count, file_ctx->redis_setup.batch_size, 0)) {
+                redisReply *reply;
+                int i;
+                for(i = 0; i <= file_ctx->redis_setup.batch_size; i++) {
+                    if (redisGetReply(file_ctx->redis, (void **)&reply) == REDIS_OK) {
+                        freeReplyObject(reply);
+                    } else {
+                        /* FIXME treat error */
+                        SCLogInfo("Error when fetching reply");
+                    }
+                }
+            } else {
+                SC_ATOMIC_ADD(file_ctx->redis_setup.batch_count, 1);
+            }
+        } else {
+            redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s",
+                    file_ctx->redis_setup.command,
+                    file_ctx->redis_setup.key,
+                    string);
+
+            switch (reply->type) {
+                case REDIS_REPLY_ERROR:
+                    SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
+                    break;
+                case REDIS_REPLY_INTEGER:
+                    SCLogDebug("Redis integer %lld", reply->integer);
+                    break;
+                default:
+                    SCLogError(SC_ERR_INVALID_VALUE,
+                            "Redis default triggered with %d", reply->type);
+                    break;
+            }
+            freeReplyObject(reply);
         }
-        freeReplyObject(reply);
     }
 #endif
     SCMutexUnlock(&file_ctx->fp_mutex);
index 82ea4eefdd35605b62e6df81a231eb860d817303..966fc7545c2164ba1d8d1e9dc9072c226a52d465 100644 (file)
@@ -54,6 +54,8 @@ typedef struct RedisSetup_ {
     char *command;
     char *key;
     char *sensor_name;
+    int  batch_size;
+    SC_ATOMIC_DECLARE(int, batch_count);
 } RedisSetup;
 #endif
 
index 43d5e17a4c5542afbdf36a79ec6cfc18f7bb334b..e19b855b065c305175a530951224508623f8ae88 100644 (file)
@@ -109,6 +109,13 @@ outputs:
       #  port: 6379
       #  mode: list ## possible values: list (default), channel
       #  key: suricata ## key or channel to use (default to suricata)
+      # Redis pipelining set up. This will enable to only do a query every
+      # 'batch-size' events. This should lower the latency induced by network
+      # connection at the cost of some memory. There is no flushing implemented
+      # so this setting as to be reserved to high traffic suricata.
+      #  pipelining:
+      #    enabled: yes ## set enable to yes to enable query pipelining
+      #    batch-size: 10 ## number of entry to keep in buffer
       types:
         - alert:
             # payload: yes           # enable dumping payload in Base64