-/* Copyright (C) 2007-2021 Open Information Security Foundation
+/* Copyright (C) 2007-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
static const char * redis_lpush_cmd = "LPUSH";
static const char * redis_rpush_cmd = "RPUSH";
static const char * redis_publish_cmd = "PUBLISH";
+static const char *redis_xadd_cmd = "XADD";
static const char * redis_default_key = "suricata";
static const char * redis_default_server = "127.0.0.1";
+static const char *redis_default_format = "%s %s %s";
+static const char *redis_stream_format = "%s %s * eve %s";
static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
return -1;
}
- redisAsyncCommand(ctx->async,
- SCRedisAsyncCommandCallback,
- file_ctx,
- "%s %s %s",
- file_ctx->redis_setup.command,
- file_ctx->redis_setup.key,
+ redisAsyncCommand(ctx->async, SCRedisAsyncCommandCallback, file_ctx,
+ file_ctx->redis_setup.format, file_ctx->redis_setup.command, file_ctx->redis_setup.key,
string);
event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
/* synchronous mode */
if (file_ctx->redis_setup.batch_size) {
- redisAppendCommand(redis, "%s %s %s",
- file_ctx->redis_setup.command,
- file_ctx->redis_setup.key,
- string);
+ redisAppendCommand(redis, file_ctx->redis_setup.format, file_ctx->redis_setup.command,
+ file_ctx->redis_setup.key, string);
time_t now = time(NULL);
if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
redisReply *reply;
redis = ctx->sync;
if (redis) {
SCLogInfo("Reconnected to redis server");
- redisAppendCommand(redis, "%s %s %s",
- file_ctx->redis_setup.command,
- file_ctx->redis_setup.key,
+ redisAppendCommand(redis, file_ctx->redis_setup.format,
+ file_ctx->redis_setup.command, file_ctx->redis_setup.key,
string);
ctx->batch_count++;
return 0;
ctx->batch_count++;
}
} else {
- redisReply *reply = redisCommand(redis, "%s %s %s",
- file_ctx->redis_setup.command,
- file_ctx->redis_setup.key,
- string);
+ redisReply *reply = redisCommand(redis, file_ctx->redis_setup.format,
+ file_ctx->redis_setup.command, file_ctx->redis_setup.key, string);
/* We may lose the reply if disconnection happens*/
if (reply) {
switch (reply->type) {
SCLogDebug("Redis integer %lld", reply->integer);
ret = 0;
break;
+ case REDIS_REPLY_STRING:
+ SCLogDebug("Redis string %s", reply->str);
+ ret = 0;
+ break;
default:
SCLogError("Redis default triggered with %d", reply->type);
SCConfLogReopenSyncRedis(file_ctx);
log_ctx->redis_setup.batch_size = 0;
}
+ log_ctx->redis_setup.format = redis_default_format;
if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
log_ctx->redis_setup.command = redis_lpush_cmd;
} else if(!strcmp(redis_mode, "rpush")){
log_ctx->redis_setup.command = redis_rpush_cmd;
} else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
log_ctx->redis_setup.command = redis_publish_cmd;
+ } else if (!strcmp(redis_mode, "stream") || !strcmp(redis_mode, "xadd")) {
+ log_ctx->redis_setup.command = redis_xadd_cmd;
+ log_ctx->redis_setup.format = redis_stream_format;
} else {
- FatalError("Invalid redis mode");
+ FatalError("Invalid redis mode: %s", redis_mode);
}
/* store server params for reconnection */