exit(EXIT_FAILURE);
}
+ log_ctx->redis_setup.batch_size = 0;
+
+ ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
+ if (pipelining) {
+ int enabled = 0;
+ int ret;
+ intmax_t val;
+ ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
+ if (ret && enabled) {
+ ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
+ if (ret) {
+ log_ctx->redis_setup.batch_size = val;
+ } else {
+ log_ctx->redis_setup.batch_size = 10;
+ }
+ }
+ }
+
if (!strcmp(redis_mode, "list")) {
log_ctx->redis_setup.command = SCStrdup("LPUSH");
if (!log_ctx->redis_setup.command) {
lf_ctx->Write = SCLogFileWrite;
lf_ctx->Close = SCLogFileClose;
+#ifdef HAVE_LIBHIREDIS
+ SC_ATOMIC_INIT(lf_ctx->redis_setup.batch_count);
+#endif
+
return lf_ctx;
}
}
#if HAVE_LIBHIREDIS
else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
- /* FIXME go async here */
- redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s",
- file_ctx->redis_setup.command,
- file_ctx->redis_setup.key,
- string);
- switch (reply->type) {
- case REDIS_REPLY_ERROR:
- SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
- break;
- case REDIS_REPLY_INTEGER:
- SCLogDebug("Redis integer %lld", reply->integer);
- break;
- default:
- SCLogError(SC_ERR_INVALID_VALUE,
- "Redis default triggered with %d", reply->type);
- break;
+ /* FIXME go async here ? */
+ if (file_ctx->redis_setup.batch_size) {
+ redisAppendCommand(file_ctx->redis, "%s %s %s",
+ file_ctx->redis_setup.command,
+ file_ctx->redis_setup.key,
+ string);
+ if (SC_ATOMIC_CAS(&file_ctx->redis_setup.batch_count, file_ctx->redis_setup.batch_size, 0)) {
+ redisReply *reply;
+ int i;
+ for(i = 0; i <= file_ctx->redis_setup.batch_size; i++) {
+ if (redisGetReply(file_ctx->redis, (void **)&reply) == REDIS_OK) {
+ freeReplyObject(reply);
+ } else {
+ /* FIXME treat error */
+ SCLogInfo("Error when fetching reply");
+ }
+ }
+ } else {
+ SC_ATOMIC_ADD(file_ctx->redis_setup.batch_count, 1);
+ }
+ } else {
+ redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s",
+ file_ctx->redis_setup.command,
+ file_ctx->redis_setup.key,
+ string);
+
+ switch (reply->type) {
+ case REDIS_REPLY_ERROR:
+ SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
+ break;
+ case REDIS_REPLY_INTEGER:
+ SCLogDebug("Redis integer %lld", reply->integer);
+ break;
+ default:
+ SCLogError(SC_ERR_INVALID_VALUE,
+ "Redis default triggered with %d", reply->type);
+ break;
+ }
+ freeReplyObject(reply);
}
- freeReplyObject(reply);
}
#endif
SCMutexUnlock(&file_ctx->fp_mutex);