ippair-storage.h \
ippair-timeout.h \
log-cf-common.h \
+ log-flush.h \
log-httplog.h \
log-pcap.h \
log-stats.h \
ippair-storage.c \
ippair-timeout.c \
log-cf-common.c \
+ log-flush.c \
log-httplog.c \
log-pcap.c \
log-stats.c \
}
/** \internal
- * \brief inject a pseudo packet into each detect thread that doesn't use the
- * new det_ctx yet
+ * \brief inject a pseudo packet into each detect thread
+ * if the thread should flush its output logs.
*/
-static void InjectPackets(ThreadVars **detect_tvs,
- DetectEngineThreadCtx **new_det_ctx,
- int no_of_detect_tvs)
-{
- /* inject a fake packet if the detect thread isn't using the new ctx yet,
- * this speeds up the process */
+void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs)
+{
+ /* inject a fake packet if the detect thread that needs it. This function
+ * is called when a heartbeat log-flush request has been made
+ * and it should process a pseudo packet and flush its output logs
+ * to speed the process. */
+#if DEBUG
+ int count = 0;
+#endif
+ for (int i = 0; i < no_of_detect_tvs; i++) {
+ if (detect_tvs[i]) { // && detect_tvs[i]->inq != NULL) {
+ Packet *p = PacketGetFromAlloc();
+ if (p != NULL) {
+ SCLogDebug("Injecting pkt for tv %s[i=%d] %d", detect_tvs[i]->name, i, count++);
+ p->flags |= PKT_PSEUDO_STREAM_END;
+ p->flags |= PKT_PSEUDO_LOG_FLUSH;
+ PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
+ PacketQueue *q = detect_tvs[i]->stream_pq;
+ SCMutexLock(&q->mutex_q);
+ PacketEnqueue(q, p);
+ SCCondSignal(&q->cond_q);
+ SCMutexUnlock(&q->mutex_q);
+ }
+ }
+ }
+ SCLogDebug("leaving: thread notification count = %d", count);
+}
+
+/** \internal
+ * \brief inject a pseudo packet into each detect thread
+ * -that doesn't use the new det_ctx yet
+ * -*or*, if the thread should flush its output logs.
+ */
+static void InjectPackets(
+ ThreadVars **detect_tvs, DetectEngineThreadCtx **new_det_ctx, int no_of_detect_tvs)
+{
+ /* inject a fake packet if the detect thread that needs it. This function
+ * is called if
+ * - A thread isn't using a DE ctx and should
+ * - Or, it should process a pseudo packet and flush its output logs.
+ * to speed the process. */
for (int i = 0; i < no_of_detect_tvs; i++) {
if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
if (detect_tvs[i]->inq != NULL) {
void DeStateRegisterTests(void);
+/* packet injection */
+void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs);
#endif /* SURICATA_DETECT_ENGINE_H */
SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
+ SC_ATOMIC_DECLARE(bool, flush_ack);
+
void *output_thread; /* Output thread data. */
void *output_thread_flow; /* Output thread data. */
SCLogDebug("packet %"PRIu64, p->pcap_cnt);
+ if ((PKT_IS_FLUSHPKT(p))) {
+ SCLogDebug("thread %s flushing", tv->printable_name);
+ OutputLoggerFlush(tv, p, fw->output_thread);
+ /* Ack if a flush was requested */
+ bool notset = false;
+ SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
+ return TM_ECODE_OK;
+ }
+
/* handle Flow */
if (p->flags & PKT_WANTS_FLOW) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
return SC_ATOMIC_GET(fw->detect_thread);
}
+void *FlowWorkerGetThreadData(void *flow_worker)
+{
+ return (FlowWorkerThreadData *)flow_worker;
+}
+
+bool FlowWorkerGetFlushAck(void *flow_worker)
+{
+ FlowWorkerThreadData *fw = flow_worker;
+ return SC_ATOMIC_GET(fw->flush_ack) == true;
+}
+
+void FlowWorkerSetFlushAck(void *flow_worker)
+{
+ FlowWorkerThreadData *fw = flow_worker;
+ SC_ATOMIC_SET(fw->flush_ack, false);
+}
+
const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
{
switch (fwi) {
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
void *FlowWorkerGetDetectCtxPtr(void *flow_worker);
+void *FlowWorkerGetThreadData(void *flow_worker);
+bool FlowWorkerGetFlushAck(void *flow_worker);
+void FlowWorkerSetFlushAck(void *flow_worker);
void TmModuleFlowWorkerRegister (void);
--- /dev/null
+/* Copyright (C) 2024 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Jeff Lucovsky <jlucovsky@oisf.net>
+ */
+
+#include "suricata-common.h"
+#include "suricata.h"
+#include "detect.h"
+#include "detect-engine.h"
+#include "flow-worker.h"
+#include "log-flush.h"
+#include "tm-threads.h"
+#include "conf.h"
+#include "conf-yaml-loader.h"
+#include "util-privs.h"
+
+/**
+ * \brief Trigger detect threads to flush their output logs
+ *
+ * This function is intended to be called at regular intervals to force
+ * buffered log data to be persisted
+ */
+static void WorkerFlushLogs(void)
+{
+ SCEnter();
+
+ /* count detect threads in use */
+ uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_DETECT_TM);
+ /* can be zero in unix socket mode */
+ if (no_of_detect_tvs == 0) {
+ return;
+ }
+
+ /* prepare swap structures */
+ void *fw_threads[no_of_detect_tvs];
+ ThreadVars *detect_tvs[no_of_detect_tvs];
+ memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *)));
+ memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));
+
+ /* start by initiating the log flushes */
+
+ uint32_t i = 0;
+ SCMutexLock(&tv_root_lock);
+ /* get reference to tv's and setup fw_threads array */
+ for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
+ if ((tv->tmm_flags & TM_FLAG_DETECT_TM) == 0) {
+ continue;
+ }
+ for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
+ TmModule *tm = TmModuleGetById(s->tm_id);
+ if (!(tm->flags & TM_FLAG_DETECT_TM)) {
+ continue;
+ }
+
+ if (suricata_ctl_flags != 0) {
+ SCMutexUnlock(&tv_root_lock);
+ goto error;
+ }
+
+ fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data));
+ if (fw_threads[i]) {
+ FlowWorkerSetFlushAck(fw_threads[i]);
+ SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i);
+ detect_tvs[i] = tv;
+ }
+
+ i++;
+ break;
+ }
+ }
+ BUG_ON(i != no_of_detect_tvs);
+
+ SCMutexUnlock(&tv_root_lock);
+
+ SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs);
+ InjectPacketsForFlush(detect_tvs, no_of_detect_tvs);
+
+ uint32_t threads_done = 0;
+retry:
+ for (i = 0; i < no_of_detect_tvs; i++) {
+ if (suricata_ctl_flags != 0) {
+ threads_done = no_of_detect_tvs;
+ break;
+ }
+ usleep(1000);
+ if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) {
+ SCLogDebug("thread slot %d has ack'd flush request", i);
+ threads_done++;
+ } else if (detect_tvs[i]) {
+ SCLogDebug("thread slot %d not yet ack'd flush request", i);
+ TmThreadsCaptureBreakLoop(detect_tvs[i]);
+ }
+ }
+ if (threads_done < no_of_detect_tvs) {
+ threads_done = 0;
+ SleepMsec(250);
+ goto retry;
+ }
+
+error:
+ return;
+}
+
+static int OutputFlushInterval(void)
+{
+ intmax_t output_flush_interval = 0;
+ if (ConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) {
+ output_flush_interval = 0;
+ }
+ if (output_flush_interval < 0 || output_flush_interval > 60) {
+ SCLogConfig("flush_interval must be 0 or less than 60; using 0");
+ output_flush_interval = 0;
+ }
+
+ return (int)output_flush_interval;
+}
+
+static void *LogFlusherWakeupThread(void *arg)
+{
+ int output_flush_interval = OutputFlushInterval();
+ /* This was checked by the logic creating this thread */
+ BUG_ON(output_flush_interval == 0);
+
+ SCLogConfig("Using output-flush-interval of %d seconds", output_flush_interval);
+ /*
+ * Calculate the number of sleep intervals based on the output flush interval. This is necessary
+ * because this thread pauses a fixed amount of time to react to shutdown situations more
+ * quickly.
+ */
+ const int log_flush_sleep_time = 500; /* milliseconds */
+ const int flush_wait_count = (1000 * output_flush_interval) / log_flush_sleep_time;
+
+ ThreadVars *tv_local = (ThreadVars *)arg;
+ SCSetThreadName(tv_local->name);
+
+ if (tv_local->thread_setup_flags != 0)
+ TmThreadSetupOptions(tv_local);
+
+ /* Set the threads capability */
+ tv_local->cap_flags = 0;
+ SCDropCaps(tv_local);
+
+ TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
+
+ int wait_count = 0;
+ uint64_t worker_flush_count = 0;
+ bool run = TmThreadsWaitForUnpause(tv_local);
+ while (run) {
+ usleep(log_flush_sleep_time * 1000);
+
+ if (++wait_count == flush_wait_count) {
+ worker_flush_count++;
+ WorkerFlushLogs();
+ wait_count = 0;
+ }
+
+ if (TmThreadsCheckFlag(tv_local, THV_KILL)) {
+ break;
+ }
+ }
+
+ TmThreadsSetFlag(tv_local, THV_RUNNING_DONE);
+ TmThreadWaitForFlag(tv_local, THV_DEINIT);
+ TmThreadsSetFlag(tv_local, THV_CLOSED);
+ SCLogInfo("%s: initiated %" PRIu64 " flushes", tv_local->name, worker_flush_count);
+ return NULL;
+}
+
+void LogFlushThreads(void)
+{
+ if (0 == OutputFlushInterval()) {
+ SCLogConfig("log flusher thread not used with heartbeat.output-flush-interval of 0");
+ return;
+ }
+
+ ThreadVars *tv_log_flush =
+ TmThreadCreateMgmtThread(thread_name_heartbeat, LogFlusherWakeupThread, 1);
+ if (!tv_log_flush || (TmThreadSpawn(tv_log_flush) != 0)) {
+ FatalError("Unable to create and start log flush thread");
+ }
+}
--- /dev/null
+/* Copyright (C) 2024 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Jeff Lucovsky <jlucovsky@oisf.net>
+ */
+#ifndef SURICATA_LOG_FLUSH_H__
+#define SURICATA_LOG_FLUSH_H__
+void LogFlushThreads(void);
+#endif /* SURICATA_LOG_FLUSH_H__ */
}
}
+TmEcode OutputLoggerFlush(ThreadVars *tv, Packet *p, void *thread_data)
+{
+ LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
+ RootLogger *logger = TAILQ_FIRST(&active_loggers);
+ LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store);
+ while (logger && thread_store_node) {
+ if (logger->FlushFunc)
+ logger->FlushFunc(tv, p, thread_store_node->thread_data);
+
+ logger = TAILQ_NEXT(logger, entries);
+ thread_store_node = TAILQ_NEXT(thread_store_node, entries);
+ }
+ return TM_ECODE_OK;
+}
+
TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
{
LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
void TmModuleLoggerRegister(void);
TmEcode OutputLoggerLog(ThreadVars *, Packet *, void *);
+TmEcode OutputLoggerFlush(ThreadVars *, Packet *, void *);
TmEcode OutputLoggerThreadInit(ThreadVars *, const void *, void **);
TmEcode OutputLoggerThreadDeinit(ThreadVars *, void *);
void OutputLoggerExitPrintStats(ThreadVars *, void *);
#include "util-debug.h"
#include "util-affinity.h"
#include "conf.h"
+#include "log-flush.h"
#include "runmodes.h"
#include "runmode-af-packet.h"
#include "runmode-af-xdp.h"
const char *thread_name_detect_loader = "DL";
const char *thread_name_counter_stats = "CS";
const char *thread_name_counter_wakeup = "CW";
+const char *thread_name_heartbeat = "HB";
/**
* \brief Holds description for a runmode.
BypassedFlowManagerThreadSpawn();
}
StatsSpawnThreads();
+ LogFlushThreads();
TmThreadsSealThreads();
}
}
extern const char *thread_name_detect_loader;
extern const char *thread_name_counter_stats;
extern const char *thread_name_counter_wakeup;
+extern const char *thread_name_heartbeat;
char *RunmodeGetActive(void);
const char *RunModeGetMainMode(void);
#if defined(HAVE_SYS_RESOURCE_H)
#ifdef linux
if (geteuid() == 0) {
- SCLogWarning("setrlimit has no effet when running as root.");
+ SCLogWarning("setrlimit has no effect when running as root.");
}
#endif
struct rlimit r = { 0, 0 };
int offline;
int verbose;
int checksum_validation;
+ int output_flush_interval;
struct timeval start_time;
scripts:
# - script1.lua
-# Logging configuration. This is not about logging IDS alerts/events, but
-# output about what Suricata is doing, like startup messages, errors, etc.
-logging:
- # The flush-interval governs how often Suricata will instruct the detection
- # threads to flush their EVE output. Specify the value in seconds [1-60]
+heartbeat:
+ # The output-flush-interval value governs how often Suricata will instruct the
+ # detection threads to flush their EVE output. Specify the value in seconds [1-60]
# and Suricata will initiate EVE log output flushes at that interval. A value
# of 0 means no EVE log output flushes are initiated. When the EVE output
# buffer-size value is non-zero, some EVE output that was written may remain
- # buffered. The flush-interval governs how much buffered data exists.
+ # buffered. The output-flush-interval governs how much buffered data exists.
#
# The default value is: 0 (never instruct detection threads to flush output)
- #flush-interval: 0
+ #output-flush-interval: 0
+# Logging configuration. This is not about logging IDS alerts/events, but
+# output about what Suricata is doing, like startup messages, errors, etc.
+logging:
# The default log level: can be overridden in an output section.
# Note that debug level logging will only be emitted if Suricata was
# compiled with the --enable-debug configure option.