{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = EveStreamLogger,
- .FlushFunc = NULL,
+ .FlushFunc = OutputJsonLogFlush,
.ConditionFunc = EveStreamLogCondition,
.ThreadInitFunc = EveStreamLogThreadInit,
.ThreadDeinitFunc = EveStreamLogThreadDeinit,
return TM_ECODE_OK;
}
+static int JsonAlertFlush(ThreadVars *tv, void *thread_data, const Packet *p)
+{
+ JsonAlertLogThread *aft = thread_data;
+ SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
+ OutputJsonFlush(aft->ctx);
+ return 0;
+}
+
static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAlertLogThread *aft = thread_data;
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonAlertLogger,
- .FlushFunc = NULL,
+ .FlushFunc = JsonAlertFlush,
.ConditionFunc = JsonAlertLogCondition,
.ThreadInitFunc = JsonAlertLogThreadInit,
.ThreadDeinitFunc = JsonAlertLogThreadDeinit,
return rc;
}
+static int JsonAnomalyFlush(ThreadVars *tv, void *thread_data, const Packet *p)
+{
+ JsonAnomalyLogThread *aft = thread_data;
+ SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
+ OutputJsonFlush(aft->ctx);
+ return 0;
+}
+
static int JsonAnomalyLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAnomalyLogThread *aft = thread_data;
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonAnomalyLogger,
- .FlushFunc = NULL,
+ .FlushFunc = JsonAnomalyFlush,
.ConditionFunc = JsonAnomalyLogCondition,
.ThreadInitFunc = JsonAnomalyLogThreadInit,
.ThreadDeinitFunc = JsonAnomalyLogThreadDeinit,
SCFree(output_ctx);
}
+int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p)
+{
+ OutputJsonThreadCtx *aft = thread_data;
+ LogFileCtx *file_ctx = aft->ctx->file_ctx;
+ SCLogDebug("%s flushing %s", tv->name, file_ctx->filename);
+ LogFileFlush(file_ctx);
+ return 0;
+}
+
OutputInitResult OutputJsonLogInitSub(ConfNode *conf, OutputCtx *parent_ctx)
{
OutputInitResult result = { NULL, false };
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonDropLogger,
- .FlushFunc = NULL,
+ .FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonDropLogCondition,
.ThreadInitFunc = JsonDropLogThreadInit,
.ThreadDeinitFunc = JsonDropLogThreadDeinit,
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonFrameLogger,
- .FlushFunc = NULL,
+ .FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonFrameLogCondition,
.ThreadInitFunc = JsonFrameLogThreadInit,
.ThreadDeinitFunc = JsonFrameLogThreadDeinit,
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonMetadataLogger,
- .FlushFunc = NULL,
+ .FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonMetadataLogCondition,
.ThreadInitFunc = JsonLogThreadInit,
.ThreadDeinitFunc = JsonLogThreadDeinit,
return 0;
}
+void OutputJsonFlush(OutputJsonThreadCtx *ctx)
+{
+ LogFileCtx *file_ctx = ctx->file_ctx;
+ LogFileFlush(file_ctx);
+}
+
void OutputJsonBuilderBuffer(
ThreadVars *tv, const Packet *p, Flow *f, JsonBuilder *js, OutputJsonThreadCtx *ctx)
{
void EveAddCommonOptions(const OutputJsonCommonSettings *cfg, const Packet *p, const Flow *f,
JsonBuilder *js, enum OutputJsonLogDirection dir);
+int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p);
void EveAddMetadata(const Packet *p, const Flow *f, JsonBuilder *js);
int OutputJSONMemBufferCallback(const char *str, size_t size, void *data);
OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx);
void FreeEveThreadCtx(OutputJsonThreadCtx *ctx);
void JSONFormatAndAddMACAddr(JsonBuilder *js, const char *key, const uint8_t *val, bool is_array);
+void OutputJsonFlush(OutputJsonThreadCtx *ctx);
#endif /* SURICATA_OUTPUT_JSON_H */
log_ctx->fp = SCLogOpenUnixSocketFp(log_ctx->filename, log_ctx->sock_type, 0);
if (log_ctx->fp) {
/* Connected at last (or reconnected) */
- SCLogNotice("Reconnected socket \"%s\"", log_ctx->filename);
+ SCLogDebug("Reconnected socket \"%s\"", log_ctx->filename);
} else if (disconnected) {
SCLogWarning("Reconnect failed: %s (will keep trying)", strerror(errno));
}
}
+/**
+ * \brief Flush a log file.
+ */
+static void SCLogFileFlushNoLock(LogFileCtx *log_ctx)
+{
+ log_ctx->bytes_since_last_flush = 0;
+ SCFflushUnlocked(log_ctx->fp);
+}
+
+static void SCLogFileFlush(LogFileCtx *log_ctx)
+{
+ OutputWriteLock(&log_ctx->fp_mutex);
+ SCLogFileFlushNoLock(log_ctx);
+ SCMutexUnlock(&log_ctx->fp_mutex);
+}
+
/**
* \brief Write buffer to log file.
* \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of
log_ctx->filename);
}
log_ctx->output_errors++;
- } else if (log_ctx->buffer_size) {
- SCFflushUnlocked(log_ctx->fp);
+ return ret;
+ }
+
+ log_ctx->bytes_since_last_flush += buffer_len;
+
+ if (log_ctx->buffer_size && log_ctx->bytes_since_last_flush >= log_ctx->buffer_size) {
+ SCLogDebug("%s: flushing %" PRIu64 " during write", log_ctx->filename,
+ log_ctx->bytes_since_last_flush);
+ SCLogFileFlushNoLock(log_ctx);
}
}
} else
#endif
{
-
- /* Check for rotation. */
- if (log_ctx->rotation_flag) {
- log_ctx->rotation_flag = 0;
- SCConfLogReopen(log_ctx);
- }
-
- if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) {
- time_t now = time(NULL);
- if (now >= log_ctx->rotate_time) {
- SCConfLogReopen(log_ctx);
- log_ctx->rotate_time = now + log_ctx->rotate_interval;
- }
- }
-
- if (log_ctx->fp) {
- clearerr(log_ctx->fp);
- if (1 != fwrite(buffer, buffer_len, 1, log_ctx->fp)) {
- /* Only the first error is logged */
- if (!log_ctx->output_errors) {
- SCLogError("%s error while writing to %s",
- ferror(log_ctx->fp) ? strerror(errno) : "unknown error",
- log_ctx->filename);
- }
- log_ctx->output_errors++;
- } else {
- fflush(log_ctx->fp);
- }
- }
+ ret = SCLogFileWriteNoLock(buffer, buffer_len, log_ctx);
}
SCMutexUnlock(&log_ctx->fp_mutex);
lf_ctx->Write = SCLogFileWrite;
lf_ctx->Close = SCLogFileClose;
+ lf_ctx->Flush = SCLogFileFlush;
return lf_ctx;
}
SCReturnInt(1);
}
+void LogFileFlush(LogFileCtx *file_ctx)
+{
+ SCLogDebug("%s: bytes-to-flush %ld", file_ctx->filename, file_ctx->bytes_since_last_flush);
+ file_ctx->Flush(file_ctx);
+}
+
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer)
{
if (file_ctx->type == LOGFILE_TYPE_FILE || file_ctx->type == LOGFILE_TYPE_UNIX_DGRAM ||
int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp);
void (*Close)(struct LogFileCtx_ *fp);
+ void (*Flush)(struct LogFileCtx_ *fp);
LogFileTypeCtx filetype;
uint64_t dropped;
uint64_t output_errors;
+
+ /* Track buffered content */
+ uint64_t bytes_since_last_flush;
} LogFileCtx;
/* Min time (msecs) before trying to reconnect a Unix domain socket */
LogFileCtx *LogFileNewCtx(void);
int LogFileFreeCtx(LogFileCtx *);
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
+void LogFileFlush(LogFileCtx *file_ctx);
LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *lf_ctx);
int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);