]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
source: add THV_RUNNING flag to notify of running state
authorRichard McConnell <Richard_McConnell@rapid7.com>
Tue, 4 Oct 2022 16:13:01 +0000 (17:13 +0100)
committerVictor Julien <vjulien@oisf.net>
Thu, 27 Oct 2022 07:54:18 +0000 (09:54 +0200)
Each module (thread) updates its status to indicate running.
Main thread awaits for all threads to be in a running state
before continuing the initialisation process

Implements feature 5384
(https://redmine.openinfosecfoundation.org/issues/5384)

19 files changed:
src/counters.c
src/flow-manager.c
src/source-af-packet.c
src/source-erf-dag.c
src/source-erf-file.c
src/source-ipfw.c
src/source-napatech.c
src/source-netmap.c
src/source-nflog.c
src/source-nfq.c
src/source-pcap-file.c
src/source-pcap.c
src/source-pfring.c
src/source-windivert.c
src/suricata.c
src/threadvars.h
src/tm-threads.c
src/tm-threads.h
src/unix-manager.c

index d323bb2d5e511cfae48b620754dc8e6120b313ab..18d7e195093262f7ed2c2998fec0f880ac078cd6 100644 (file)
@@ -411,7 +411,7 @@ static void *StatsMgmtThread(void *arg)
     }
     SCLogDebug("stats_thread_data %p", &stats_thread_data);
 
-    TmThreadsSetFlag(tv_local, THV_INIT_DONE);
+    TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
     while (1) {
         if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
             TmThreadsSetFlag(tv_local, THV_PAUSED);
@@ -480,7 +480,8 @@ static void *StatsWakeupThread(void *arg)
         return NULL;
     }
 
-    TmThreadsSetFlag(tv_local, THV_INIT_DONE);
+    TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
+
     while (1) {
         if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
             TmThreadsSetFlag(tv_local, THV_PAUSED);
index b26542706b74b3cc5d92c0204d9fdaba1a3c18cb..298c8be64aa6d5335a3571e7f659cb1044c7facb 100644 (file)
@@ -799,6 +799,8 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
     GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
     StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
 
+    TmThreadsSetFlag(th_v, THV_RUNNING);
+
     while (1)
     {
         if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
@@ -1063,6 +1065,8 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
     struct timeval ts;
     memset(&ts, 0, sizeof(ts));
 
+    TmThreadsSetFlag(th_v, THV_RUNNING);
+
     while (1)
     {
         if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
index 0340d505c61a3a85ce50e4228b392f88be817a4e..c94b4a6b83a353ca1cc8ab2607aba93be03dc2f1 100644 (file)
@@ -560,7 +560,6 @@ static void AFPPeersListReachedInc(void)
         return;
 
     if ((SC_ATOMIC_ADD(peerslist.reached, 1) + 1) == peerslist.turn) {
-        SCLogInfo("All AFP capture threads are running.");
         (void)SC_ATOMIC_SET(peerslist.reached, 0);
         /* Set turn to 0 to skip syncrhonization when ReceiveAFPLoop is
          * restarted.
@@ -1339,6 +1338,10 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
     fds.fd = ptv->socket;
     fds.events = POLLIN;
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while (1) {
         /* Start by checking the state of our interface */
         if (unlikely(ptv->afp_state == AFP_STATE_DOWN)) {
index d0925cc7a2e88aefe010152aa1d30ddf7096a245..3f6365a8508c87a91c6a3c17f9fb65469a8d3f47 100644 (file)
@@ -338,6 +338,10 @@ ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
 
     dtv->slot = s->slot_next;
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while (1) {
         if (suricata_ctl_flags & SURICATA_STOP) {
             SCReturnInt(TM_ECODE_OK);
index bfeae1fbff063bd4483c567a4cb3ded65dc1e7a9..eaa750dbc84dc3c18db538b81205154b87efea2f 100644 (file)
@@ -116,6 +116,10 @@ TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
 
     etv->slot = ((TmSlot *)slot)->slot_next;
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while (1) {
         if (suricata_ctl_flags & SURICATA_STOP) {
             SCReturnInt(TM_ECODE_OK);
index f92d37fcba374b5b92d339993fc7d3244a17d59d..92c96caa337f81cb54367bc03d8a53392716318f 100644 (file)
@@ -240,6 +240,11 @@ TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)
 
     SCLogInfo("Thread '%s' will run on port %d (item %d)",
               tv->name, nq->port_num, ptv->ipfw_index);
+
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while (1) {
         if (unlikely(suricata_ctl_flags != 0)) {
             SCReturnInt(TM_ECODE_OK);
index 3a9160ec015d17ed611084488bc0441b61278cd0..8d2ae69e2c618fb4713d5fa3461caeee1d876df9 100644 (file)
@@ -911,6 +911,10 @@ TmEcode NapatechPacketLoop(ThreadVars *tv, void *data, void *slot)
     TmSlot *s = (TmSlot *) slot;
     ntv->slot = s->slot_next;
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while (!(suricata_ctl_flags & SURICATA_STOP)) {
         /* make sure we have at least one packet in the packet pool, to prevent
          * us from alloc'ing packets at line rate */
index 603c58b77fe23150a43c57d61d0512c10c8a0b3c..54e79b93cfa4048edf6eeb3183ab0ffc82e6cc28 100644 (file)
@@ -787,6 +787,11 @@ static TmEcode ReceiveNetmapLoop(ThreadVars *tv, void *data, void *slot)
     fds.events = POLLIN;
 
     SCLogDebug("thread %s polling on %d", tv->name, fds.fd);
+
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     for(;;) {
         if (unlikely(suricata_ctl_flags != 0)) {
             break;
index 2d83bc432b5a9ab0d34e353929f3ba56b95e7f44..e9d035e145e1eb94563a970d24bd3445b933a4da 100644 (file)
@@ -430,6 +430,10 @@ TmEcode ReceiveNFLOGLoop(ThreadVars *tv, void *data, void *slot)
         SCReturnInt(TM_ECODE_FAILED);
     }
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while (1) {
         if (suricata_ctl_flags != 0)
             break;
index 0e8b1bdce63d9dff454037517e04c20eaa2d5242..d1a916f976daf162ce8b2b7db61ba0666e233a81 100644 (file)
@@ -1009,6 +1009,10 @@ TmEcode ReceiveNFQLoop(ThreadVars *tv, void *data, void *slot)
 
     ntv->slot = ((TmSlot *) slot)->slot_next;
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while(1) {
         if (unlikely(suricata_ctl_flags != 0)) {
             NFQDestroyQueue(nq);
index eb2683378951cd0d3622f77bcdc836c89cda4e0a..417196556d6e38cf78a28571b86d7b505e85809d 100644 (file)
@@ -171,6 +171,10 @@ TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot)
     ptv->shared.slot = s->slot_next;
     ptv->shared.cb_result = TM_ECODE_OK;
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     if(ptv->is_directory == 0) {
         SCLogInfo("Starting file run for %s", ptv->behavior.file->filename);
         status = PcapFileDispatch(ptv->behavior.file);
index 5e7ebc5f8874967fe54525ca2fc2102bffcfe939..8df56da60c232d2decc286d3b48533c97681ae63 100644 (file)
@@ -312,6 +312,10 @@ static TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot)
     ptv->slot = s->slot_next;
     ptv->cb_result = TM_ECODE_OK;
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while (1) {
         if (suricata_ctl_flags & SURICATA_STOP) {
             SCReturnInt(TM_ECODE_OK);
index c75b275c9e79a4c8c2bbf16efb754358bfa1d679..acd3ab8ef894cdd12a1e3dedfa7eac467538e4fc 100644 (file)
@@ -361,6 +361,10 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot)
         SCReturnInt(TM_ECODE_FAILED);
     }
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while(1) {
         if (suricata_ctl_flags & SURICATA_STOP) {
             SCReturnInt(TM_ECODE_OK);
index 4e7b7ac255825ffc323c9c02c16709c01533f10c..1cade18373b94f0388a8dfb17c35a787b65dca13 100644 (file)
@@ -410,6 +410,10 @@ TmEcode ReceiveWinDivertLoop(ThreadVars *tv, void *data, void *slot)
     WinDivertThreadVars *wd_tv = (WinDivertThreadVars *)data;
     wd_tv->slot = ((TmSlot *)slot)->slot_next;
 
+    // Indicate that the thread is actually running its application level code (i.e., it can poll
+    // packets)
+    TmThreadsSetFlag(tv, THV_RUNNING);
+
     while (true) {
         if (suricata_ctl_flags & SURICATA_STOP) {
             SCReturnInt(TM_ECODE_OK);
index 81c2e6802d7e3bfea8c7e3ac83c8d0531fd19ae9..335d6e301f542b14c86212e61012834e967a3c0c 100644 (file)
 #include <signal.h>
 #endif
 
+#if HAVE_LIBSYSTEMD
+#include <systemd/sd-daemon.h>
+#endif
+
 #include "suricata.h"
 
 #include "conf.h"
@@ -393,6 +397,23 @@ static void GlobalsDestroy(SCInstance *suri)
     suri->pid_filename = NULL;
 }
 
+/**
+ * \brief Used to send OS specific notification of running threads
+ *
+ * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
+ */
+static void OnNotifyRunning(void)
+{
+#if HAVE_LIBSYSTEMD
+    if (sd_notify(0, "READY=1") < 0) {
+        SCLogWarning(SC_ERR_SYSCALL, "failed to notify systemd");
+        /* Please refer to:
+         * https://www.freedesktop.org/software/systemd/man/sd_notify.html#Return%20Value
+         * for discussion on why failure should not be considered an error */
+    }
+#endif
+}
+
 /** \brief make sure threads can stop the engine by calling this
  *  function. Purpose: pcap file mode needs to be able to tell the
  *  engine the file eof is reached. */
@@ -2888,6 +2909,14 @@ int SuricataMain(int argc, char **argv)
     /* Un-pause all the paused threads */
     TmThreadContinueThreads();
 
+    /* Must ensure all threads are fully operational before continuing with init process */
+    if (TmThreadWaitOnThreadRunning() != TM_ECODE_OK) {
+        exit(EXIT_FAILURE);
+    }
+
+    /* Print notice and send OS specific notification of threads in running state */
+    OnNotifyRunning();
+
     PostRunStartedDetectSetup(&suricata);
 
     SCPledge();
index 1d866073c9f7210be775514c4c0c34209000e923..5c2efaf3b072a98fc4e705f01cd5684516e86424 100644 (file)
@@ -52,6 +52,7 @@ struct TmSlot_;
  *  rule reloads even if no packets are read by the capture method. */
 #define THV_CAPTURE_INJECT_PKT  BIT_U32(11)
 #define THV_DEAD                BIT_U32(12) /**< thread has been joined with pthread_join() */
+#define THV_RUNNING             BIT_U32(13) /**< thread is running */
 
 /** \brief Per thread variable structure */
 typedef struct ThreadVars_ {
index 4ae4aade2b1ee35f23975a989f7dacd245ddc153..7a0f1958263377392eb053e285a4a3bc5466c788 100644 (file)
@@ -428,7 +428,11 @@ static void *TmThreadsSlotVar(void *td)
 
     StatsSetupPrivate(tv);
 
-    TmThreadsSetFlag(tv, THV_INIT_DONE);
+    // Each 'worker' thread uses this func to process/decode the packet read.
+    // Each decode method is different to receive methods in that they do not
+    // enter infinite loops. They use this as the core loop. As a result, at this
+    // point the worker threads can be considered both initialized and running.
+    TmThreadsSetFlag(tv, THV_INIT_DONE | THV_RUNNING);
 
     s = (TmSlot *)tv->tm_slots;
 
@@ -1033,7 +1037,6 @@ ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
         tv->id = TmThreadsRegisterThread(tv, tv->type);
     }
 
-
     return tv;
 }
 
@@ -1773,10 +1776,113 @@ void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
 void TmThreadContinue(ThreadVars *tv)
 {
     TmThreadsUnsetFlag(tv, THV_PAUSE);
-
     return;
 }
 
+/**
+ * \brief Waits for all threads to be in a running state
+ *
+ * \retval TM_ECODE_OK if all are running or error if a thread failed
+ */
+TmEcode TmThreadWaitOnThreadRunning(void)
+{
+    uint16_t RX_num = 0;
+    uint16_t W_num = 0;
+    uint16_t FM_num = 0;
+    uint16_t FR_num = 0;
+    uint16_t TX_num = 0;
+
+    struct timeval start_ts;
+    struct timeval cur_ts;
+    gettimeofday(&start_ts, NULL);
+
+again:
+    SCMutexLock(&tv_root_lock);
+    for (int i = 0; i < TVT_MAX; i++) {
+        ThreadVars *tv = tv_root[i];
+        while (tv != NULL) {
+            if (TmThreadsCheckFlag(tv, (THV_FAILED | THV_CLOSED | THV_DEAD))) {
+                SCMutexUnlock(&tv_root_lock);
+
+                SCLogError(SC_ERR_THREAD_INIT,
+                        "thread \"%s\" failed to "
+                        "start: flags %04x",
+                        tv->name, SC_ATOMIC_GET(tv->flags));
+                return TM_ECODE_FAILED;
+            }
+
+            if (!(TmThreadsCheckFlag(tv, THV_RUNNING | THV_RUNNING_DONE))) {
+                SCMutexUnlock(&tv_root_lock);
+
+                /* 60 seconds provided for the thread to transition from
+                 * THV_INIT_DONE to THV_RUNNING */
+                gettimeofday(&cur_ts, NULL);
+                if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
+                    SCLogError(SC_ERR_THREAD_INIT,
+                            "thread \"%s\" failed to "
+                            "start in time: flags %04x",
+                            tv->name, SC_ATOMIC_GET(tv->flags));
+                    return TM_ECODE_FAILED;
+                }
+
+                /* sleep a little to give the thread some
+                 * time to start running */
+                SleepUsec(100);
+                goto again;
+            }
+
+            if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
+                RX_num++;
+            else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
+                W_num++;
+            else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
+                TX_num++;
+            else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
+                FM_num++;
+            else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
+                FR_num++;
+
+            tv = tv->next;
+        }
+    }
+    SCMutexUnlock(&tv_root_lock);
+
+    /* Construct a welcome string displaying
+     * initialized thread types and counts */
+    uint16_t app_len = 32;
+    uint16_t buf_len = 256;
+
+    char append_str[app_len];
+    char thread_counts[buf_len];
+
+    strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
+    if (RX_num > 0) {
+        snprintf(append_str, app_len, "RX: %u ", RX_num);
+        strlcat(thread_counts, append_str, buf_len);
+    }
+    if (W_num > 0) {
+        snprintf(append_str, app_len, "W: %u ", W_num);
+        strlcat(thread_counts, append_str, buf_len);
+    }
+    if (TX_num > 0) {
+        snprintf(append_str, app_len, "TX: %u ", TX_num);
+        strlcat(thread_counts, append_str, buf_len);
+    }
+    if (FM_num > 0) {
+        snprintf(append_str, app_len, "FM: %u ", FM_num);
+        strlcat(thread_counts, append_str, buf_len);
+    }
+    if (FR_num > 0) {
+        snprintf(append_str, app_len, "FR: %u ", FR_num);
+        strlcat(thread_counts, append_str, buf_len);
+    }
+    snprintf(append_str, app_len, "  Engine started.");
+    strlcat(thread_counts, append_str, buf_len);
+    SCLogNotice("%s", thread_counts);
+
+    return TM_ECODE_OK;
+}
+
 /**
  * \brief Unpauses all threads present in tv_root
  */
@@ -1823,12 +1929,6 @@ void TmThreadCheckThreadState(void)
  */
 TmEcode TmThreadWaitOnThreadInit(void)
 {
-    uint16_t RX_num = 0;
-    uint16_t W_num = 0;
-    uint16_t FM_num = 0;
-    uint16_t FR_num = 0;
-    uint16_t TX_num = 0;
-
     struct timeval start_ts;
     struct timeval cur_ts;
     gettimeofday(&start_ts, NULL);
@@ -1877,55 +1977,11 @@ again:
                 return TM_ECODE_FAILED;
             }
 
-            if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
-                RX_num++;
-            else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
-                W_num++;
-            else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
-                TX_num++;
-            else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
-                FM_num++;
-            else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
-                FR_num++;
-
             tv = tv->next;
         }
     }
     SCMutexUnlock(&tv_root_lock);
 
-    /* Construct a welcome string displaying
-     * initialized thread types and counts */
-    uint16_t app_len = 32;
-    uint16_t buf_len = 256;
-
-    char append_str[app_len];
-    char thread_counts[buf_len];
-
-    strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
-    if (RX_num > 0) {
-        snprintf(append_str, app_len, "RX: %u ", RX_num);
-        strlcat(thread_counts, append_str, buf_len);
-    }
-    if (W_num > 0) {
-        snprintf(append_str, app_len, "W: %u ", W_num);
-        strlcat(thread_counts, append_str, buf_len);
-    }
-    if (TX_num > 0) {
-        snprintf(append_str, app_len, "TX: %u ", TX_num);
-        strlcat(thread_counts, append_str, buf_len);
-    }
-    if (FM_num > 0) {
-        snprintf(append_str, app_len, "FM: %u ", FM_num);
-        strlcat(thread_counts, append_str, buf_len);
-    }
-    if (FR_num > 0) {
-        snprintf(append_str, app_len, "FR: %u ", FR_num);
-        strlcat(thread_counts, append_str, buf_len);
-    }
-    snprintf(append_str, app_len, "  Engine started.");
-    strlcat(thread_counts, append_str, buf_len);
-    SCLogNotice("%s", thread_counts);
-
     return TM_ECODE_OK;
 }
 
index 624930561177ff6cab26eb587162d759d4466392..2ba6ddf3064d18e24b02f65232336d50e74e83ff 100644 (file)
@@ -123,6 +123,8 @@ void TmThreadDisableReceiveThreads(void);
 
 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
 
+TmEcode TmThreadWaitOnThreadRunning(void);
+
 static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
 {
     while (1) {
index 95a0a8c14c2026b1f7be91929ca493e03a5d96c0..09488a2bbfaec4761f408d965061aa8912e3e733 100644 (file)
@@ -1130,7 +1130,8 @@ static TmEcode UnixManager(ThreadVars *th_v, void *thread_data)
     th_v->cap_flags = 0;
     SCDropCaps(th_v);
 
-    TmThreadsSetFlag(th_v, THV_INIT_DONE);
+    TmThreadsSetFlag(th_v, THV_INIT_DONE | THV_RUNNING);
+
     while (1) {
         ret = UnixMain(&command);
         if (ret == 0) {