From: Eric Leblond Date: Sun, 24 May 2015 19:52:56 +0000 (+0200) Subject: util-logopenfile: implement redis pipelining X-Git-Tag: suricata-3.0RC1~50 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b834e2d19ae66b0ea0798e26818f5b100c2dc7d5;p=thirdparty%2Fsuricata.git util-logopenfile: implement redis pipelining This patch implements redis pipelining. This consist in contacting the redis server every N events to minimize the number of TCP exchange. This is optional and setup via the configuration file. --- diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 519e4f7735..08a8f74be0 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -362,6 +362,24 @@ int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx) exit(EXIT_FAILURE); } + log_ctx->redis_setup.batch_size = 0; + + ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining"); + if (pipelining) { + int enabled = 0; + int ret; + intmax_t val; + ret = ConfGetChildValueBool(pipelining, "enabled", &enabled); + if (ret && enabled) { + ret = ConfGetChildValueInt(pipelining, "batch-size", &val); + if (ret) { + log_ctx->redis_setup.batch_size = val; + } else { + log_ctx->redis_setup.batch_size = 10; + } + } + } + if (!strcmp(redis_mode, "list")) { log_ctx->redis_setup.command = SCStrdup("LPUSH"); if (!log_ctx->redis_setup.command) { @@ -404,6 +422,10 @@ LogFileCtx *LogFileNewCtx(void) lf_ctx->Write = SCLogFileWrite; lf_ctx->Close = SCLogFileClose; +#ifdef HAVE_LIBHIREDIS + SC_ATOMIC_INIT(lf_ctx->redis_setup.batch_count); +#endif + return lf_ctx; } @@ -459,24 +481,46 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer, char *string, size_t s } #if HAVE_LIBHIREDIS else if (file_ctx->type == LOGFILE_TYPE_REDIS) { - /* FIXME go async here */ - redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s", - file_ctx->redis_setup.command, - file_ctx->redis_setup.key, - string); - switch (reply->type) { - case REDIS_REPLY_ERROR: - SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str); - break; - case REDIS_REPLY_INTEGER: - SCLogDebug("Redis integer %lld", reply->integer); - break; - default: - SCLogError(SC_ERR_INVALID_VALUE, - "Redis default triggered with %d", reply->type); - break; + /* FIXME go async here ? */ + if (file_ctx->redis_setup.batch_size) { + redisAppendCommand(file_ctx->redis, "%s %s %s", + file_ctx->redis_setup.command, + file_ctx->redis_setup.key, + string); + if (SC_ATOMIC_CAS(&file_ctx->redis_setup.batch_count, file_ctx->redis_setup.batch_size, 0)) { + redisReply *reply; + int i; + for(i = 0; i <= file_ctx->redis_setup.batch_size; i++) { + if (redisGetReply(file_ctx->redis, (void **)&reply) == REDIS_OK) { + freeReplyObject(reply); + } else { + /* FIXME treat error */ + SCLogInfo("Error when fetching reply"); + } + } + } else { + SC_ATOMIC_ADD(file_ctx->redis_setup.batch_count, 1); + } + } else { + redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s", + file_ctx->redis_setup.command, + file_ctx->redis_setup.key, + string); + + switch (reply->type) { + case REDIS_REPLY_ERROR: + SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str); + break; + case REDIS_REPLY_INTEGER: + SCLogDebug("Redis integer %lld", reply->integer); + break; + default: + SCLogError(SC_ERR_INVALID_VALUE, + "Redis default triggered with %d", reply->type); + break; + } + freeReplyObject(reply); } - freeReplyObject(reply); } #endif SCMutexUnlock(&file_ctx->fp_mutex); diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index 82ea4eefdd..966fc7545c 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -54,6 +54,8 @@ typedef struct RedisSetup_ { char *command; char *key; char *sensor_name; + int batch_size; + SC_ATOMIC_DECLARE(int, batch_count); } RedisSetup; #endif diff --git a/suricata.yaml.in b/suricata.yaml.in index 43d5e17a4c..e19b855b06 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -109,6 +109,13 @@ outputs: # port: 6379 # mode: list ## possible values: list (default), channel # key: suricata ## key or channel to use (default to suricata) + # Redis pipelining set up. This will enable to only do a query every + # 'batch-size' events. This should lower the latency induced by network + # connection at the cost of some memory. There is no flushing implemented + # so this setting as to be reserved to high traffic suricata. + # pipelining: + # enabled: yes ## set enable to yes to enable query pipelining + # batch-size: 10 ## number of entry to keep in buffer types: - alert: # payload: yes # enable dumping payload in Base64