}
-#if HAVE_LIBHIREDIS
+#ifdef HAVE_LIBHIREDIS
+
+static void SCLogFileCloseRedis(LogFileCtx *log_ctx)
+{
+ if (log_ctx->redis)
+ redisFree(log_ctx->redis);
+ log_ctx->redis_setup.tried = 0;
+ SC_ATOMIC_SET(log_ctx->redis_setup.batch_count, 0);
+}
+
int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx)
{
const char *redis_server = NULL;
SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s", c->errstr);
exit(EXIT_FAILURE);
}
+
+ /* store server params for reconnection */
+ log_ctx->redis_setup.server = SCStrdup(redis_server);
+ if (!log_ctx->redis_setup.server) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string");
+ exit(EXIT_FAILURE);
+ }
+ log_ctx->redis_setup.port = atoi(redis_port);
+ log_ctx->redis_setup.tried = 0;
+
log_ctx->redis = c;
+ log_ctx->Close = SCLogFileCloseRedis;
+
return 0;
}
+
+int SCConfLogReopenRedis(LogFileCtx *log_ctx)
+{
+ if (log_ctx->redis != NULL) {
+ redisFree(log_ctx->redis);
+ log_ctx->redis = NULL;
+ }
+
+ /* only try to reconnect once per second */
+ if (log_ctx->redis_setup.tried >= time(NULL)) {
+ return -1;
+ }
+
+ redisContext *c = redisConnect(log_ctx->redis_setup.server, log_ctx->redis_setup.port);
+ if (c != NULL && c->err) {
+ if (log_ctx->redis_setup.tried == 0) {
+ SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s\n", c->errstr);
+ }
+ redisFree(c);
+ log_ctx->redis_setup.tried = time(NULL);
+ return -1;
+ }
+ log_ctx->redis = c;
+ log_ctx->redis_setup.tried = 0;
+ SC_ATOMIC_SET(log_ctx->redis_setup.batch_count, 0);
+ return 0;
+}
+
#endif
/** \brief LogFileNewCtx() Get a new LogFileCtx
#ifdef HAVE_LIBHIREDIS
if (lf_ctx->type == LOGFILE_TYPE_REDIS && lf_ctx->redis) {
redisFree(lf_ctx->redis);
+ SCFree(lf_ctx->redis_setup.server);
+ SCFree(lf_ctx->redis_setup.command);
+ SCFree(lf_ctx->redis_setup.key);
}
#endif
}
#if HAVE_LIBHIREDIS
else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
+ if (file_ctx->redis == NULL) {
+ /* FIXME temporisation */
+ SCConfLogReopenRedis(file_ctx);
+ if (file_ctx->redis == NULL) {
+ SCMutexUnlock(&file_ctx->fp_mutex);
+ return -1;
+ } else {
+ SCLogInfo("Reconnected to redis server");
+ }
+ }
/* FIXME go async here ? */
if (file_ctx->redis_setup.batch_size) {
redisAppendCommand(file_ctx->redis, "%s %s %s",
freeReplyObject(reply);
} else {
/* FIXME treat error */
- SCLogInfo("Error when fetching reply");
+ if (file_ctx->redis->err) {
+ SCLogInfo("Error when fetching reply: %s (%d)",
+ file_ctx->redis->errstr,
+ file_ctx->redis->err);
+ }
+ switch (file_ctx->redis->err) {
+ case REDIS_ERR_EOF:
+ case REDIS_ERR_IO:
+ SCLogInfo("Reopening connection to redis server");
+ SCConfLogReopenRedis(file_ctx);
+ if (file_ctx->redis) {
+ SCLogInfo("Reconnected to redis server");
+ SCMutexUnlock(&file_ctx->fp_mutex);
+ return 0;
+ } else {
+ SCLogInfo("Unable to reconnect to redis server");
+ SCMutexUnlock(&file_ctx->fp_mutex);
+ return 0;
+ }
+ break;
+ default:
+ SCLogInfo("Unsupported error code %d",
+ file_ctx->redis->err);
+ }
}
}
} else {
switch (reply->type) {
case REDIS_REPLY_ERROR:
SCLogWarning(SC_WARN_NO_UNITTESTS, "Redis error: %s", reply->str);
+ SCConfLogReopenRedis(file_ctx);
break;
case REDIS_REPLY_INTEGER:
SCLogDebug("Redis integer %lld", reply->integer);
default:
SCLogError(SC_ERR_INVALID_VALUE,
"Redis default triggered with %d", reply->type);
+ SCConfLogReopenRedis(file_ctx);
break;
}
freeReplyObject(reply);