]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
main/flush: Support periodic flush logs
authorJeff Lucovsky <jlucovsky@oisf.net>
Sat, 7 Oct 2023 21:08:27 +0000 (17:08 -0400)
committerVictor Julien <victor@inliniac.net>
Wed, 26 Feb 2025 09:30:41 +0000 (10:30 +0100)
Issue: 3449

14 files changed:
src/Makefile.am
src/detect-engine.c
src/detect-engine.h
src/flow-worker.c
src/flow-worker.h
src/log-flush.c [new file with mode: 0644]
src/log-flush.h [new file with mode: 0644]
src/output.c
src/output.h
src/runmodes.c
src/runmodes.h
src/suricata.c
src/suricata.h
suricata.yaml.in

index c3b1e9d237f9bdad63c455cbb9a090279b038187..f5f8c8947abf5c6fb3e9fa099761d44f8f0501ea 100755 (executable)
@@ -344,6 +344,7 @@ noinst_HEADERS = \
        ippair-storage.h \
        ippair-timeout.h \
        log-cf-common.h \
+       log-flush.h \
        log-httplog.h \
        log-pcap.h \
        log-stats.h \
@@ -916,6 +917,7 @@ libsuricata_c_a_SOURCES = \
        ippair-storage.c \
        ippair-timeout.c \
        log-cf-common.c \
+       log-flush.c \
        log-httplog.c \
        log-pcap.c \
        log-stats.c \
index a89a5e04569047019a3da00a6c89e4e8e4257676..153d25d9d0c0ab7d33f1eb22568369723d37ab26 100644 (file)
@@ -2323,15 +2323,50 @@ int DetectEngineInspectPktBufferGeneric(
 }
 
 /** \internal
- *  \brief inject a pseudo packet into each detect thread that doesn't use the
- *         new det_ctx yet
+ *  \brief inject a pseudo packet into each detect thread
+ *         if the thread should flush its output logs.
  */
-static void InjectPackets(ThreadVars **detect_tvs,
-                          DetectEngineThreadCtx **new_det_ctx,
-                          int no_of_detect_tvs)
-{
-    /* inject a fake packet if the detect thread isn't using the new ctx yet,
-     * this speeds up the process */
+void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs)
+{
+    /* inject a fake packet if the detect thread that needs it. This function
+     * is called when a heartbeat log-flush request has been made
+     * and it should process a pseudo packet and flush its output logs
+     * to speed the process. */
+#if DEBUG
+    int count = 0;
+#endif
+    for (int i = 0; i < no_of_detect_tvs; i++) {
+        if (detect_tvs[i]) { // && detect_tvs[i]->inq != NULL) {
+            Packet *p = PacketGetFromAlloc();
+            if (p != NULL) {
+                SCLogDebug("Injecting pkt for tv %s[i=%d] %d", detect_tvs[i]->name, i, count++);
+                p->flags |= PKT_PSEUDO_STREAM_END;
+                p->flags |= PKT_PSEUDO_LOG_FLUSH;
+                PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
+                PacketQueue *q = detect_tvs[i]->stream_pq;
+                SCMutexLock(&q->mutex_q);
+                PacketEnqueue(q, p);
+                SCCondSignal(&q->cond_q);
+                SCMutexUnlock(&q->mutex_q);
+            }
+        }
+    }
+    SCLogDebug("leaving: thread notification count = %d", count);
+}
+
+/** \internal
+ *  \brief inject a pseudo packet into each detect thread
+ *      -that doesn't use the new det_ctx yet
+ *      -*or*, if the thread should flush its output logs.
+ */
+static void InjectPackets(
+        ThreadVars **detect_tvs, DetectEngineThreadCtx **new_det_ctx, int no_of_detect_tvs)
+{
+    /* inject a fake packet if the detect thread that needs it. This function
+     * is called if
+     *  - A thread isn't using a DE ctx and should
+     *  - Or, it should process a pseudo packet and flush its output logs.
+     * to speed the process. */
     for (int i = 0; i < no_of_detect_tvs; i++) {
         if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
             if (detect_tvs[i]->inq != NULL) {
index 4ccf9249d9c59b9cb9167dcfa607751c2821c788..b59b42888174ed9772650eb670c8a96fc44b0b2c 100644 (file)
@@ -212,4 +212,6 @@ void DetectEngineStateResetTxs(Flow *f);
 
 void DeStateRegisterTests(void);
 
+/* packet injection */
+void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs);
 #endif /* SURICATA_DETECT_ENGINE_H */
index 425b016b09f24cde19b7badc9e552a274a0b9f50..645bfd6a654a636f3ceb9429f018766888a6b984 100644 (file)
@@ -73,6 +73,8 @@ typedef struct FlowWorkerThreadData_ {
 
     SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
 
+    SC_ATOMIC_DECLARE(bool, flush_ack);
+
     void *output_thread; /* Output thread data. */
     void *output_thread_flow; /* Output thread data. */
 
@@ -554,6 +556,15 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
 
     SCLogDebug("packet %"PRIu64, p->pcap_cnt);
 
+    if ((PKT_IS_FLUSHPKT(p))) {
+        SCLogDebug("thread %s flushing", tv->printable_name);
+        OutputLoggerFlush(tv, p, fw->output_thread);
+        /* Ack if a flush was requested */
+        bool notset = false;
+        SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
+        return TM_ECODE_OK;
+    }
+
     /* handle Flow */
     if (p->flags & PKT_WANTS_FLOW) {
         FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
@@ -723,6 +734,23 @@ void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
     return SC_ATOMIC_GET(fw->detect_thread);
 }
 
+void *FlowWorkerGetThreadData(void *flow_worker)
+{
+    return (FlowWorkerThreadData *)flow_worker;
+}
+
+bool FlowWorkerGetFlushAck(void *flow_worker)
+{
+    FlowWorkerThreadData *fw = flow_worker;
+    return SC_ATOMIC_GET(fw->flush_ack) == true;
+}
+
+void FlowWorkerSetFlushAck(void *flow_worker)
+{
+    FlowWorkerThreadData *fw = flow_worker;
+    SC_ATOMIC_SET(fw->flush_ack, false);
+}
+
 const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
 {
     switch (fwi) {
index 951878eb062e7b7c6f612e060cf678a0f1f1ed2d..6bdea551935d26ba937bd35dcbc5169899118b1c 100644 (file)
@@ -32,6 +32,9 @@ const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi);
 
 void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
 void *FlowWorkerGetDetectCtxPtr(void *flow_worker);
+void *FlowWorkerGetThreadData(void *flow_worker);
+bool FlowWorkerGetFlushAck(void *flow_worker);
+void FlowWorkerSetFlushAck(void *flow_worker);
 
 void TmModuleFlowWorkerRegister (void);
 
diff --git a/src/log-flush.c b/src/log-flush.c
new file mode 100644 (file)
index 0000000..a8c94e5
--- /dev/null
@@ -0,0 +1,199 @@
+/* Copyright (C) 2024 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Jeff Lucovsky <jlucovsky@oisf.net>
+ */
+
+#include "suricata-common.h"
+#include "suricata.h"
+#include "detect.h"
+#include "detect-engine.h"
+#include "flow-worker.h"
+#include "log-flush.h"
+#include "tm-threads.h"
+#include "conf.h"
+#include "conf-yaml-loader.h"
+#include "util-privs.h"
+
+/**
+ * \brief Trigger detect threads to flush their output logs
+ *
+ * This function is intended to be called at regular intervals to force
+ * buffered log data to be persisted
+ */
+static void WorkerFlushLogs(void)
+{
+    SCEnter();
+
+    /* count detect threads in use */
+    uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_DETECT_TM);
+    /* can be zero in unix socket mode */
+    if (no_of_detect_tvs == 0) {
+        return;
+    }
+
+    /* prepare swap structures */
+    void *fw_threads[no_of_detect_tvs];
+    ThreadVars *detect_tvs[no_of_detect_tvs];
+    memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *)));
+    memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));
+
+    /* start by initiating the log flushes */
+
+    uint32_t i = 0;
+    SCMutexLock(&tv_root_lock);
+    /* get reference to tv's and setup fw_threads array */
+    for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
+        if ((tv->tmm_flags & TM_FLAG_DETECT_TM) == 0) {
+            continue;
+        }
+        for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
+            TmModule *tm = TmModuleGetById(s->tm_id);
+            if (!(tm->flags & TM_FLAG_DETECT_TM)) {
+                continue;
+            }
+
+            if (suricata_ctl_flags != 0) {
+                SCMutexUnlock(&tv_root_lock);
+                goto error;
+            }
+
+            fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data));
+            if (fw_threads[i]) {
+                FlowWorkerSetFlushAck(fw_threads[i]);
+                SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i);
+                detect_tvs[i] = tv;
+            }
+
+            i++;
+            break;
+        }
+    }
+    BUG_ON(i != no_of_detect_tvs);
+
+    SCMutexUnlock(&tv_root_lock);
+
+    SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs);
+    InjectPacketsForFlush(detect_tvs, no_of_detect_tvs);
+
+    uint32_t threads_done = 0;
+retry:
+    for (i = 0; i < no_of_detect_tvs; i++) {
+        if (suricata_ctl_flags != 0) {
+            threads_done = no_of_detect_tvs;
+            break;
+        }
+        usleep(1000);
+        if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) {
+            SCLogDebug("thread slot %d has ack'd flush request", i);
+            threads_done++;
+        } else if (detect_tvs[i]) {
+            SCLogDebug("thread slot %d not yet ack'd flush request", i);
+            TmThreadsCaptureBreakLoop(detect_tvs[i]);
+        }
+    }
+    if (threads_done < no_of_detect_tvs) {
+        threads_done = 0;
+        SleepMsec(250);
+        goto retry;
+    }
+
+error:
+    return;
+}
+
+static int OutputFlushInterval(void)
+{
+    intmax_t output_flush_interval = 0;
+    if (ConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) {
+        output_flush_interval = 0;
+    }
+    if (output_flush_interval < 0 || output_flush_interval > 60) {
+        SCLogConfig("flush_interval must be 0 or less than 60; using 0");
+        output_flush_interval = 0;
+    }
+
+    return (int)output_flush_interval;
+}
+
+static void *LogFlusherWakeupThread(void *arg)
+{
+    int output_flush_interval = OutputFlushInterval();
+    /* This was checked by the logic creating this thread */
+    BUG_ON(output_flush_interval == 0);
+
+    SCLogConfig("Using output-flush-interval of %d seconds", output_flush_interval);
+    /*
+     * Calculate the number of sleep intervals based on the output flush interval. This is necessary
+     * because this thread pauses a fixed amount of time to react to shutdown situations more
+     * quickly.
+     */
+    const int log_flush_sleep_time = 500; /* milliseconds */
+    const int flush_wait_count = (1000 * output_flush_interval) / log_flush_sleep_time;
+
+    ThreadVars *tv_local = (ThreadVars *)arg;
+    SCSetThreadName(tv_local->name);
+
+    if (tv_local->thread_setup_flags != 0)
+        TmThreadSetupOptions(tv_local);
+
+    /* Set the threads capability */
+    tv_local->cap_flags = 0;
+    SCDropCaps(tv_local);
+
+    TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
+
+    int wait_count = 0;
+    uint64_t worker_flush_count = 0;
+    bool run = TmThreadsWaitForUnpause(tv_local);
+    while (run) {
+        usleep(log_flush_sleep_time * 1000);
+
+        if (++wait_count == flush_wait_count) {
+            worker_flush_count++;
+            WorkerFlushLogs();
+            wait_count = 0;
+        }
+
+        if (TmThreadsCheckFlag(tv_local, THV_KILL)) {
+            break;
+        }
+    }
+
+    TmThreadsSetFlag(tv_local, THV_RUNNING_DONE);
+    TmThreadWaitForFlag(tv_local, THV_DEINIT);
+    TmThreadsSetFlag(tv_local, THV_CLOSED);
+    SCLogInfo("%s: initiated %" PRIu64 " flushes", tv_local->name, worker_flush_count);
+    return NULL;
+}
+
+void LogFlushThreads(void)
+{
+    if (0 == OutputFlushInterval()) {
+        SCLogConfig("log flusher thread not used with heartbeat.output-flush-interval of 0");
+        return;
+    }
+
+    ThreadVars *tv_log_flush =
+            TmThreadCreateMgmtThread(thread_name_heartbeat, LogFlusherWakeupThread, 1);
+    if (!tv_log_flush || (TmThreadSpawn(tv_log_flush) != 0)) {
+        FatalError("Unable to create and start log flush thread");
+    }
+}
diff --git a/src/log-flush.h b/src/log-flush.h
new file mode 100644 (file)
index 0000000..e201942
--- /dev/null
@@ -0,0 +1,26 @@
+/* Copyright (C) 2024 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Jeff Lucovsky <jlucovsky@oisf.net>
+ */
+#ifndef SURICATA_LOG_FLUSH_H__
+#define SURICATA_LOG_FLUSH_H__
+void LogFlushThreads(void);
+#endif /* SURICATA_LOG_FLUSH_H__ */
index 261fa9a5a78f39298fa0ffedecd8c6616042d1f6..845b2644093f0bd4ef693bab03fd26054d9a12b8 100644 (file)
@@ -708,6 +708,21 @@ void OutputNotifyFileRotation(void) {
     }
 }
 
+TmEcode OutputLoggerFlush(ThreadVars *tv, Packet *p, void *thread_data)
+{
+    LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
+    RootLogger *logger = TAILQ_FIRST(&active_loggers);
+    LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store);
+    while (logger && thread_store_node) {
+        if (logger->FlushFunc)
+            logger->FlushFunc(tv, p, thread_store_node->thread_data);
+
+        logger = TAILQ_NEXT(logger, entries);
+        thread_store_node = TAILQ_NEXT(thread_store_node, entries);
+    }
+    return TM_ECODE_OK;
+}
+
 TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
 {
     LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
index 728a2310e617b059acb9f9e137db73ae486563fe..bc8086fc4a821d16d9d9e24afa9620e07bd9aad2 100644 (file)
@@ -164,6 +164,7 @@ void OutputRegisterRootLogger(ThreadInitFunc ThreadInit, ThreadDeinitFunc Thread
 void TmModuleLoggerRegister(void);
 
 TmEcode OutputLoggerLog(ThreadVars *, Packet *, void *);
+TmEcode OutputLoggerFlush(ThreadVars *, Packet *, void *);
 TmEcode OutputLoggerThreadInit(ThreadVars *, const void *, void **);
 TmEcode OutputLoggerThreadDeinit(ThreadVars *, void *);
 void OutputLoggerExitPrintStats(ThreadVars *, void *);
index dd322cf7daf2a9393c9a0e27b41be1ae3ffd1837..87f1e5cef49e2bbefcd7732280d49616af5e5995 100644 (file)
@@ -28,6 +28,7 @@
 #include "util-debug.h"
 #include "util-affinity.h"
 #include "conf.h"
+#include "log-flush.h"
 #include "runmodes.h"
 #include "runmode-af-packet.h"
 #include "runmode-af-xdp.h"
@@ -72,6 +73,7 @@ const char *thread_name_unix_socket = "US";
 const char *thread_name_detect_loader = "DL";
 const char *thread_name_counter_stats = "CS";
 const char *thread_name_counter_wakeup = "CW";
+const char *thread_name_heartbeat = "HB";
 
 /**
  * \brief Holds description for a runmode.
@@ -436,6 +438,7 @@ void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_p
             BypassedFlowManagerThreadSpawn();
         }
         StatsSpawnThreads();
+        LogFlushThreads();
         TmThreadsSealThreads();
     }
 }
index cce5fcbbaa425716a105aab78ee3401f2be4c5d5..18fd5886d3e607da6553a8b60696d370a4bd2150 100644 (file)
@@ -73,6 +73,7 @@ extern const char *thread_name_unix_socket;
 extern const char *thread_name_detect_loader;
 extern const char *thread_name_counter_stats;
 extern const char *thread_name_counter_wakeup;
+extern const char *thread_name_heartbeat;
 
 char *RunmodeGetActive(void);
 const char *RunModeGetMainMode(void);
index 2c9fbe04dd76a78ebc274af7e707117f9843fed8..ab8f48d5ba78c84daba4a6f7989d693dc1acd33c 100644 (file)
@@ -3021,7 +3021,7 @@ void SuricataPostInit(void)
 #if defined(HAVE_SYS_RESOURCE_H)
 #ifdef linux
         if (geteuid() == 0) {
-            SCLogWarning("setrlimit has no effet when running as root.");
+            SCLogWarning("setrlimit has no effect when running as root.");
         }
 #endif
         struct rlimit r = { 0, 0 };
index 70209f393e9af2fa20c4f8ba3b2e271310d288dc..2448cb9f739d93a92a33f619e87d77a3b1df702f 100644 (file)
@@ -152,6 +152,7 @@ typedef struct SCInstance_ {
     int offline;
     int verbose;
     int checksum_validation;
+    int output_flush_interval;
 
     struct timeval start_time;
 
index b2c69f9ec6d06eca1caa9f038bca0d837d83b11d..dd376fe59913ae4dff9ad4624f5276358aa1b005 100644 (file)
@@ -572,19 +572,20 @@ outputs:
       scripts:
       #   - script1.lua
 
-# Logging configuration.  This is not about logging IDS alerts/events, but
-# output about what Suricata is doing, like startup messages, errors, etc.
-logging:
-  # The flush-interval governs how often Suricata will instruct the detection
-  # threads to flush their EVE output. Specify the value in seconds [1-60]
+heartbeat:
+  # The output-flush-interval value governs how often Suricata will instruct the
+  # detection threads to flush their EVE output. Specify the value in seconds [1-60]
   # and Suricata will initiate EVE log output flushes at that interval. A value
   # of 0 means no EVE log output flushes are initiated. When the EVE output
   # buffer-size value is non-zero, some EVE output that was written may remain
-  # buffered. The flush-interval governs how much buffered data exists.
+  # buffered. The output-flush-interval governs how much buffered data exists.
   #
   # The default value is: 0 (never instruct detection threads to flush output)
-  #flush-interval: 0
+  #output-flush-interval: 0
 
+# Logging configuration.  This is not about logging IDS alerts/events, but
+# output about what Suricata is doing, like startup messages, errors, etc.
+logging:
   # The default log level: can be overridden in an output section.
   # Note that debug level logging will only be emitted if Suricata was
   # compiled with the --enable-debug configure option.