]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
redis: implement XADD stream support
authorSascha Steinbiss <satta@debian.org>
Tue, 11 Jun 2024 11:14:00 +0000 (13:14 +0200)
committerVictor Julien <victor@inliniac.net>
Thu, 24 Oct 2024 07:35:23 +0000 (09:35 +0200)
Ticket: #7082

doc/userguide/output/eve/eve-json-output.rst
doc/userguide/partials/eve-log.yaml
src/util-log-redis.c
src/util-log-redis.h
suricata.yaml.in

index 7fc40783c2f2e4a27d8614c46270c156665d7e5d..26a010a7a960fd1009d25e712d63ad03f8f943c2 100644 (file)
@@ -41,10 +41,11 @@ Output types::
       #  server: 127.0.0.1
       #  port: 6379
       #  async: true ## if redis replies are read asynchronously
-      #  mode: list ## possible values: list|lpush (default), rpush, channel|publish
+      #  mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream
       #             ## lpush and rpush are using a Redis list. "list" is an alias for lpush
       #             ## publish is using a Redis channel. "channel" is an alias for publish
-      #  key: suricata ## key or channel to use (default to suricata)
+      #             ## xadd is using a Redis stream. "stream" is an alias for xadd
+      #  key: suricata ## string denoting the key/channel/stream to use (default to suricata)
       # Redis pipelining set up. This will enable to only do a query every
       # 'batch-size' events. This should lower the latency induced by network
       # connection at the cost of some memory. There is no flushing implemented
index 05faf209d4b53caf507c5428afc298c22efbe143..2030a5b68f1683172c7587e86fb3e24c05c03b2a 100644 (file)
@@ -18,10 +18,11 @@ outputs:
       #  server: 127.0.0.1
       #  port: 6379
       #  async: true ## if redis replies are read asynchronously
-      #  mode: list ## possible values: list|lpush (default), rpush, channel|publish
+      #  mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream
       #             ## lpush and rpush are using a Redis list. "list" is an alias for lpush
       #             ## publish is using a Redis channel. "channel" is an alias for publish
-      #  key: suricata ## key or channel to use (default to suricata)
+      #             ## xadd is using a Redis stream. "stream" is an alias for xadd
+      #  key: suricata ## string denoting the key/channel/stream to use (default to suricata)
       # Redis pipelining set up. This will enable to only do a query every
       # 'batch-size' events. This should lower the latency induced by network
       # connection at the cost of some memory. There is no flushing implemented
index 5f590d2c69339387399a3009d8f32d6cc91f77b3..ad48ab68ef43a7b34b282987a0171a4495547bf0 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2021 Open Information Security Foundation
+/* Copyright (C) 2007-2024 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
 static const char * redis_lpush_cmd = "LPUSH";
 static const char * redis_rpush_cmd = "RPUSH";
 static const char * redis_publish_cmd = "PUBLISH";
+static const char *redis_xadd_cmd = "XADD";
 static const char * redis_default_key = "suricata";
 static const char * redis_default_server = "127.0.0.1";
+static const char *redis_default_format = "%s %s %s";
+static const char *redis_stream_format = "%s %s * eve %s";
 
 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
 static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
@@ -268,12 +271,8 @@ static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t
         return -1;
     }
 
-    redisAsyncCommand(ctx->async,
-            SCRedisAsyncCommandCallback,
-            file_ctx,
-            "%s %s %s",
-            file_ctx->redis_setup.command,
-            file_ctx->redis_setup.key,
+    redisAsyncCommand(ctx->async, SCRedisAsyncCommandCallback, file_ctx,
+            file_ctx->redis_setup.format, file_ctx->redis_setup.command, file_ctx->redis_setup.key,
             string);
 
     event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
@@ -345,10 +344,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
 
     /* synchronous mode */
     if (file_ctx->redis_setup.batch_size) {
-        redisAppendCommand(redis, "%s %s %s",
-                file_ctx->redis_setup.command,
-                file_ctx->redis_setup.key,
-                string);
+        redisAppendCommand(redis, file_ctx->redis_setup.format, file_ctx->redis_setup.command,
+                file_ctx->redis_setup.key, string);
         time_t now = time(NULL);
         if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
             redisReply *reply;
@@ -374,9 +371,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
                             redis = ctx->sync;
                             if (redis) {
                                 SCLogInfo("Reconnected to redis server");
-                                redisAppendCommand(redis, "%s %s %s",
-                                        file_ctx->redis_setup.command,
-                                        file_ctx->redis_setup.key,
+                                redisAppendCommand(redis, file_ctx->redis_setup.format,
+                                        file_ctx->redis_setup.command, file_ctx->redis_setup.key,
                                         string);
                                 ctx->batch_count++;
                                 return 0;
@@ -395,10 +391,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
             ctx->batch_count++;
         }
     } else {
-        redisReply *reply = redisCommand(redis, "%s %s %s",
-                file_ctx->redis_setup.command,
-                file_ctx->redis_setup.key,
-                string);
+        redisReply *reply = redisCommand(redis, file_ctx->redis_setup.format,
+                file_ctx->redis_setup.command, file_ctx->redis_setup.key, string);
         /* We may lose the reply if disconnection happens*/
         if (reply) {
             switch (reply->type) {
@@ -410,6 +404,10 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
                     SCLogDebug("Redis integer %lld", reply->integer);
                     ret = 0;
                     break;
+                case REDIS_REPLY_STRING:
+                    SCLogDebug("Redis string %s", reply->str);
+                    ret = 0;
+                    break;
                 default:
                     SCLogError("Redis default triggered with %d", reply->type);
                     SCConfLogReopenSyncRedis(file_ctx);
@@ -519,14 +517,18 @@ int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
         log_ctx->redis_setup.batch_size = 0;
     }
 
+    log_ctx->redis_setup.format = redis_default_format;
     if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
         log_ctx->redis_setup.command = redis_lpush_cmd;
     } else if(!strcmp(redis_mode, "rpush")){
         log_ctx->redis_setup.command = redis_rpush_cmd;
     } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
         log_ctx->redis_setup.command = redis_publish_cmd;
+    } else if (!strcmp(redis_mode, "stream") || !strcmp(redis_mode, "xadd")) {
+        log_ctx->redis_setup.command = redis_xadd_cmd;
+        log_ctx->redis_setup.format = redis_stream_format;
     } else {
-        FatalError("Invalid redis mode");
+        FatalError("Invalid redis mode: %s", redis_mode);
     }
 
     /* store server params for reconnection */
index f53669a195b3a95f25bc1e3c825800b820d4f01c..f6d069555e588bd2a7f39c72777f42e9d89e4792 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2016 Open Information Security Foundation
+/* Copyright (C) 2016-2024 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -38,6 +38,7 @@ enum RedisMode { REDIS_LIST, REDIS_CHANNEL };
 
 typedef struct RedisSetup_ {
     enum RedisMode mode;
+    const char *format;
     const char *command;
     const char *key;
     const char *server;
index 7bf4165c362a7cfd4ff602a6d5b4021f0b596ede..6b87db93b01cecf47f1dc8ce0631857696ea4b89 100644 (file)
@@ -112,10 +112,11 @@ outputs:
       #  server: 127.0.0.1
       #  port: 6379
       #  async: true ## if redis replies are read asynchronously
-      #  mode: list ## possible values: list|lpush (default), rpush, channel|publish
+      #  mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream
       #             ## lpush and rpush are using a Redis list. "list" is an alias for lpush
       #             ## publish is using a Redis channel. "channel" is an alias for publish
-      #  key: suricata ## key or channel to use (default to suricata)
+      #             ## xadd is using a Redis stream. "stream" is an alias for xadd
+      #  key: suricata ## string denoting the key/channel/stream to use (default to suricata)
       # Redis pipelining set up. This will enable to only do a query every
       # 'batch-size' events. This should lower the latency induced by network
       # connection at the cost of some memory. There is no flushing implemented