]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
output/log: Add flushing infrastructure
authorJeff Lucovsky <jlucovsky@oisf.net>
Tue, 30 Apr 2024 14:44:54 +0000 (10:44 -0400)
committerVictor Julien <victor@inliniac.net>
Wed, 26 Feb 2025 09:30:41 +0000 (10:30 +0100)
Issue: 3449

Add flushing functions and infrastructure. This includes:
- Flushing functions for packet loggers
- Log file flushing support

src/output-eve-stream.c
src/output-json-alert.c
src/output-json-anomaly.c
src/output-json-common.c
src/output-json-drop.c
src/output-json-frame.c
src/output-json-metadata.c
src/output-json.c
src/output-json.h
src/util-logopenfile.c
src/util-logopenfile.h

index 15d262b435baf8f1365c23b7e3712a2d1dbb036a..3fcd12ea41df76c638800b5d75c24f1cbeb91390 100644 (file)
@@ -454,7 +454,7 @@ void EveStreamLogRegister(void)
 {
     OutputPacketLoggerFunctions output_logger_functions = {
         .LogFunc = EveStreamLogger,
-        .FlushFunc = NULL,
+        .FlushFunc = OutputJsonLogFlush,
         .ConditionFunc = EveStreamLogCondition,
         .ThreadInitFunc = EveStreamLogThreadInit,
         .ThreadDeinitFunc = EveStreamLogThreadDeinit,
index 419a1d2b42bd0a1d6b761e74a8c8b6493f789099..ed6066be082bc1e9d80f4312adaa1de209a6da0c 100644 (file)
@@ -823,6 +823,14 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const
     return TM_ECODE_OK;
 }
 
+static int JsonAlertFlush(ThreadVars *tv, void *thread_data, const Packet *p)
+{
+    JsonAlertLogThread *aft = thread_data;
+    SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
+    OutputJsonFlush(aft->ctx);
+    return 0;
+}
+
 static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p)
 {
     JsonAlertLogThread *aft = thread_data;
@@ -1067,7 +1075,7 @@ void JsonAlertLogRegister (void)
 {
     OutputPacketLoggerFunctions output_logger_functions = {
         .LogFunc = JsonAlertLogger,
-        .FlushFunc = NULL,
+        .FlushFunc = JsonAlertFlush,
         .ConditionFunc = JsonAlertLogCondition,
         .ThreadInitFunc = JsonAlertLogThreadInit,
         .ThreadDeinitFunc = JsonAlertLogThreadDeinit,
index cd9e5dc068cce16f1544c6496ea0f36996ea41ac..00c4cbd57085f98523c4d2e2b330f0c5745a067f 100644 (file)
@@ -272,6 +272,14 @@ static int AnomalyJson(ThreadVars *tv, JsonAnomalyLogThread *aft, const Packet *
     return rc;
 }
 
+static int JsonAnomalyFlush(ThreadVars *tv, void *thread_data, const Packet *p)
+{
+    JsonAnomalyLogThread *aft = thread_data;
+    SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
+    OutputJsonFlush(aft->ctx);
+    return 0;
+}
+
 static int JsonAnomalyLogger(ThreadVars *tv, void *thread_data, const Packet *p)
 {
     JsonAnomalyLogThread *aft = thread_data;
@@ -451,7 +459,7 @@ void JsonAnomalyLogRegister (void)
 {
     OutputPacketLoggerFunctions output_logger_functions = {
         .LogFunc = JsonAnomalyLogger,
-        .FlushFunc = NULL,
+        .FlushFunc = JsonAnomalyFlush,
         .ConditionFunc = JsonAnomalyLogCondition,
         .ThreadInitFunc = JsonAnomalyLogThreadInit,
         .ThreadDeinitFunc = JsonAnomalyLogThreadDeinit,
index 5723e05a386a42c5debf4a36eeec5849e1400be7..1ec08b69050e151051d783a9fb20edc353f9ae71 100644 (file)
@@ -70,6 +70,15 @@ static void OutputJsonLogDeInitCtxSub(OutputCtx *output_ctx)
     SCFree(output_ctx);
 }
 
+int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p)
+{
+    OutputJsonThreadCtx *aft = thread_data;
+    LogFileCtx *file_ctx = aft->ctx->file_ctx;
+    SCLogDebug("%s flushing %s", tv->name, file_ctx->filename);
+    LogFileFlush(file_ctx);
+    return 0;
+}
+
 OutputInitResult OutputJsonLogInitSub(ConfNode *conf, OutputCtx *parent_ctx)
 {
     OutputInitResult result = { NULL, false };
index 29ead13e0748b02660e8aced94f15f2494e8a2c5..79e663c437710f6ea8cf903ec9d23c378540b283 100644 (file)
@@ -392,7 +392,7 @@ void JsonDropLogRegister (void)
 {
     OutputPacketLoggerFunctions output_logger_functions = {
         .LogFunc = JsonDropLogger,
-        .FlushFunc = NULL,
+        .FlushFunc = OutputJsonLogFlush,
         .ConditionFunc = JsonDropLogCondition,
         .ThreadInitFunc = JsonDropLogThreadInit,
         .ThreadDeinitFunc = JsonDropLogThreadDeinit,
index dfd895b8ab6c7be31e1c82c19fc554d55a81ee0a..3ae80b820f669b6b32806533762b779a91580627 100644 (file)
@@ -562,7 +562,7 @@ void JsonFrameLogRegister(void)
 {
     OutputPacketLoggerFunctions output_logger_functions = {
         .LogFunc = JsonFrameLogger,
-        .FlushFunc = NULL,
+        .FlushFunc = OutputJsonLogFlush,
         .ConditionFunc = JsonFrameLogCondition,
         .ThreadInitFunc = JsonFrameLogThreadInit,
         .ThreadDeinitFunc = JsonFrameLogThreadDeinit,
index a87d735839d4be28a531ca330d011c787dc8b689..c930eaf177b82af407571d894af1c9bc01c145e6 100644 (file)
@@ -96,7 +96,7 @@ void JsonMetadataLogRegister (void)
 {
     OutputPacketLoggerFunctions output_logger_functions = {
         .LogFunc = JsonMetadataLogger,
-        .FlushFunc = NULL,
+        .FlushFunc = OutputJsonLogFlush,
         .ConditionFunc = JsonMetadataLogCondition,
         .ThreadInitFunc = JsonLogThreadInit,
         .ThreadDeinitFunc = JsonLogThreadDeinit,
index 37e0a87b6fe4cf470fb4ba3b394b434017e998f8..422191adbc3574fe5bc52f0b08fd46aaa7c27506 100644 (file)
@@ -956,6 +956,12 @@ int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer **buffer)
     return 0;
 }
 
+void OutputJsonFlush(OutputJsonThreadCtx *ctx)
+{
+    LogFileCtx *file_ctx = ctx->file_ctx;
+    LogFileFlush(file_ctx);
+}
+
 void OutputJsonBuilderBuffer(
         ThreadVars *tv, const Packet *p, Flow *f, JsonBuilder *js, OutputJsonThreadCtx *ctx)
 {
index b8c11778ed272c8c58e3e3209180c111b5353a61..82989a11563964f7352329b5358966101c0b6585 100644 (file)
@@ -114,6 +114,7 @@ TmEcode JsonLogThreadDeinit(ThreadVars *t, void *data);
 
 void EveAddCommonOptions(const OutputJsonCommonSettings *cfg, const Packet *p, const Flow *f,
         JsonBuilder *js, enum OutputJsonLogDirection dir);
+int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p);
 void EveAddMetadata(const Packet *p, const Flow *f, JsonBuilder *js);
 
 int OutputJSONMemBufferCallback(const char *str, size_t size, void *data);
@@ -121,5 +122,6 @@ int OutputJSONMemBufferCallback(const char *str, size_t size, void *data);
 OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx);
 void FreeEveThreadCtx(OutputJsonThreadCtx *ctx);
 void JSONFormatAndAddMACAddr(JsonBuilder *js, const char *key, const uint8_t *val, bool is_array);
+void OutputJsonFlush(OutputJsonThreadCtx *ctx);
 
 #endif /* SURICATA_OUTPUT_JSON_H */
index 7d4459ebc942581d71720753a31a6dd43f61ff3d..c27dcf8bab2c67fad369458f8dc8538d521480c5 100644 (file)
@@ -126,7 +126,7 @@ static int SCLogUnixSocketReconnect(LogFileCtx *log_ctx)
     log_ctx->fp = SCLogOpenUnixSocketFp(log_ctx->filename, log_ctx->sock_type, 0);
     if (log_ctx->fp) {
         /* Connected at last (or reconnected) */
-        SCLogNotice("Reconnected socket \"%s\"", log_ctx->filename);
+        SCLogDebug("Reconnected socket \"%s\"", log_ctx->filename);
     } else if (disconnected) {
         SCLogWarning("Reconnect failed: %s (will keep trying)", strerror(errno));
     }
@@ -189,6 +189,22 @@ static inline void OutputWriteLock(pthread_mutex_t *m)
 
 }
 
+/**
+ * \brief Flush a log file.
+ */
+static void SCLogFileFlushNoLock(LogFileCtx *log_ctx)
+{
+    log_ctx->bytes_since_last_flush = 0;
+    SCFflushUnlocked(log_ctx->fp);
+}
+
+static void SCLogFileFlush(LogFileCtx *log_ctx)
+{
+    OutputWriteLock(&log_ctx->fp_mutex);
+    SCLogFileFlushNoLock(log_ctx);
+    SCMutexUnlock(&log_ctx->fp_mutex);
+}
+
 /**
  * \brief Write buffer to log file.
  * \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of
@@ -224,8 +240,15 @@ static int SCLogFileWriteNoLock(const char *buffer, int buffer_len, LogFileCtx *
                         log_ctx->filename);
             }
             log_ctx->output_errors++;
-        } else if (log_ctx->buffer_size) {
-            SCFflushUnlocked(log_ctx->fp);
+            return ret;
+        }
+
+        log_ctx->bytes_since_last_flush += buffer_len;
+
+        if (log_ctx->buffer_size && log_ctx->bytes_since_last_flush >= log_ctx->buffer_size) {
+            SCLogDebug("%s: flushing %" PRIu64 " during write", log_ctx->filename,
+                    log_ctx->bytes_since_last_flush);
+            SCLogFileFlushNoLock(log_ctx);
         }
     }
 
@@ -248,35 +271,7 @@ static int SCLogFileWrite(const char *buffer, int buffer_len, LogFileCtx *log_ct
     } else
 #endif
     {
-
-        /* Check for rotation. */
-        if (log_ctx->rotation_flag) {
-            log_ctx->rotation_flag = 0;
-            SCConfLogReopen(log_ctx);
-        }
-
-        if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) {
-            time_t now = time(NULL);
-            if (now >= log_ctx->rotate_time) {
-                SCConfLogReopen(log_ctx);
-                log_ctx->rotate_time = now + log_ctx->rotate_interval;
-            }
-        }
-
-        if (log_ctx->fp) {
-            clearerr(log_ctx->fp);
-            if (1 != fwrite(buffer, buffer_len, 1, log_ctx->fp)) {
-                /* Only the first error is logged */
-                if (!log_ctx->output_errors) {
-                    SCLogError("%s error while writing to %s",
-                            ferror(log_ctx->fp) ? strerror(errno) : "unknown error",
-                            log_ctx->filename);
-                }
-                log_ctx->output_errors++;
-            } else {
-                fflush(log_ctx->fp);
-            }
-        }
+        ret = SCLogFileWriteNoLock(buffer, buffer_len, log_ctx);
     }
 
     SCMutexUnlock(&log_ctx->fp_mutex);
@@ -709,6 +704,7 @@ LogFileCtx *LogFileNewCtx(void)
 
     lf_ctx->Write = SCLogFileWrite;
     lf_ctx->Close = SCLogFileClose;
+    lf_ctx->Flush = SCLogFileFlush;
 
     return lf_ctx;
 }
@@ -977,6 +973,12 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
     SCReturnInt(1);
 }
 
+void LogFileFlush(LogFileCtx *file_ctx)
+{
+    SCLogDebug("%s: bytes-to-flush %ld", file_ctx->filename, file_ctx->bytes_since_last_flush);
+    file_ctx->Flush(file_ctx);
+}
+
 int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer)
 {
     if (file_ctx->type == LOGFILE_TYPE_FILE || file_ctx->type == LOGFILE_TYPE_UNIX_DGRAM ||
index efa8159686ad93b06df227014609aad938883f31..19c9e5e1c7179e904181741bc30d4e005de091ad 100644 (file)
@@ -86,6 +86,7 @@ typedef struct LogFileCtx_ {
 
     int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp);
     void (*Close)(struct LogFileCtx_ *fp);
+    void (*Flush)(struct LogFileCtx_ *fp);
 
     LogFileTypeCtx filetype;
 
@@ -159,6 +160,9 @@ typedef struct LogFileCtx_ {
     uint64_t dropped;
 
     uint64_t output_errors;
+
+    /* Track buffered content */
+    uint64_t bytes_since_last_flush;
 } LogFileCtx;
 
 /* Min time (msecs) before trying to reconnect a Unix domain socket */
@@ -173,6 +177,7 @@ typedef struct LogFileCtx_ {
 LogFileCtx *LogFileNewCtx(void);
 int LogFileFreeCtx(LogFileCtx *);
 int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
+void LogFileFlush(LogFileCtx *file_ctx);
 
 LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *lf_ctx);
 int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);