]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threads: fix autofp shutdown race condition
authorVictor Julien <vjulien@oisf.net>
Wed, 30 Apr 2025 08:20:10 +0000 (10:20 +0200)
committerVictor Julien <victor@inliniac.net>
Thu, 26 Jun 2025 21:19:59 +0000 (23:19 +0200)
Sometimes a single flow pcap would log 2 flows. It turns out FlowWorkToDoCleanup
ran before all the packet threads had processed their "wire" packets. It then
removed a flow that a wire packet would still have needed, leading to the worker
thread creating a new flow for it.

This could happen due to the logic in TmThreadDisableReceiveThreads which calls
TmThreadDrainPacketThreads to made sure it only returns when all autofp-workers
have processed all the packets the autofp-capture thread fed to them.

However, the way it checked this is by checking the size of the autofp-worker's
input queue. If 0, it assumes it is done.

What this missed, is that a worker thread could have just taken the last packet
from the input queue, but it is not yet done processing it. If then the
FlowWorkToDoCleanup is ran as well, it would race the worker thread to the flow
handling logic. When it won, the flow was evicted and the packet thread
created a new flow.

This patch improves the shutdown logic to force the worker threads to
enter a "flow loop" (THV_FLOW_LOOP) state before moving on to the
FlowWorkToDoCleanup step. This makes sure that any in progress packets
in the worker threads have been processed.

Bug: #7681.
(cherry picked from commit 12f8f03532e57fe8168c40d6a14b34b8954bd3e5)

src/suricata.c
src/threadvars.h
src/tm-threads.c
src/tm-threads.h

index 75c4b6ac49dcb0c647260ef2daeeedff727c2ee2..0afb332d61f2012fcb8cefcc1645f243d22319e9 100644 (file)
@@ -2275,9 +2275,14 @@ void PostRunDeinit(const int runmode, struct timeval *start_time)
     /* handle graceful shutdown of the flow engine, it's helper
      * threads and the packet threads */
     FlowDisableFlowManagerThread();
+    /* disable capture */
     TmThreadDisableReceiveThreads();
+    /* tell packet threads to enter flow timeout loop */
+    TmThreadDisablePacketThreads(THV_REQ_FLOW_LOOP, THV_FLOW_LOOP);
+    /* run cleanup on the flow hash */
     FlowForceReassembly();
-    TmThreadDisablePacketThreads();
+    /* gracefully shut down packet threads */
+    TmThreadDisablePacketThreads(THV_KILL, THV_RUNNING_DONE);
     SCPrintElapsedTime(start_time);
     FlowDisableFlowRecyclerThread();
 
index 3610e858adb5fbd51eaf177f8c057864b701a2ad..66983d49e6fe2debcd671c97c601197111c54d8f 100644 (file)
@@ -44,7 +44,7 @@ struct TmSlot_;
 #define THV_DEINIT              BIT_U32(7)
 #define THV_RUNNING_DONE        BIT_U32(8)  /** thread has completed running and is entering
                                          * the de-init phase */
-#define THV_KILL_PKTACQ         BIT_U32(9)  /**< flag thread to stop packet acq */
+#define THV_REQ_FLOW_LOOP       BIT_U32(9)  /**< request thread to enter flow timeout loop */
 #define THV_FLOW_LOOP           BIT_U32(10) /**< thread is in flow shutdown loop */
 
 /** signal thread's capture method to create a fake packet to force through
index c7b64a39be0425c1c3bf31c44a6f41fb1f481714..1885b16cbe9904f38be81754eacca10cbc397653 100644 (file)
@@ -369,7 +369,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
             TmThreadsSetFlag(tv, THV_FAILED);
             run = 0;
         }
-        if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
+        if (TmThreadsCheckFlag(tv, THV_REQ_FLOW_LOOP) || suricata_ctl_flags) {
             run = 0;
         }
         if (r == TM_ECODE_DONE) {
@@ -507,37 +507,16 @@ static void *TmThreadsSlotVar(void *td)
             TmThreadsHandleInjectedPackets(tv);
         }
 
-        if (TmThreadsCheckFlag(tv, THV_KILL)) {
+        if (TmThreadsCheckFlag(tv, THV_REQ_FLOW_LOOP)) {
             run = 0;
         }
-    } /* while (run) */
-    StatsSyncCounters(tv);
-
-    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
-    TmThreadWaitForFlag(tv, THV_DEINIT);
-
-    PacketPoolDestroy();
-
-    s = (TmSlot *)tv->tm_slots;
-
-    for ( ; s != NULL; s = s->slot_next) {
-        if (s->SlotThreadExitPrintStats != NULL) {
-            s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
-        }
-
-        if (s->SlotThreadDeinit != NULL) {
-            r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
-            if (r != TM_ECODE_OK) {
-                TmThreadsSetFlag(tv, THV_CLOSED);
-                goto error;
-            }
-        }
     }
+    if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) {
+        goto error;
+    }
+    StatsSyncCounters(tv);
 
-    SCLogDebug("%s ending", tv->name);
-    tv->stream_pq = NULL;
-    TmThreadsSetFlag(tv, THV_CLOSED);
-    pthread_exit((void *) 0);
+    pthread_exit(NULL);
     return NULL;
 
 error:
@@ -1451,7 +1430,7 @@ again:
             if (tm && tm->PktAcqBreakLoop != NULL) {
                 tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
             }
-            TmThreadsSetFlag(tv, THV_KILL_PKTACQ);
+            TmThreadsSetFlag(tv, THV_REQ_FLOW_LOOP);
 
             if (tv->inq != NULL) {
                 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
@@ -1505,8 +1484,17 @@ static void TmThreadDebugValidateNoMorePackets(void)
 
 /**
  * \brief Disable all packet threads
+ * \param set flag to set
+ * \param check flag to check
+ *
+ * Support 2 stages in shutting down the packet threads:
+ * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
+ * 2. set THV_KILL and wait for THV_RUNNING_DONE
+ *
+ * During step 1 the main loop is exited, and the flow loop logic is entered.
+ * During step 2, the flow loop logic is done and the thread closes.
  */
-void TmThreadDisablePacketThreads(void)
+void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check)
 {
     struct timeval start_ts;
     struct timeval cur_ts;
@@ -1530,7 +1518,7 @@ again:
     /* loop through the packet threads and kill them */
     SCMutexLock(&tv_root_lock);
     for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
-        TmThreadsSetFlag(tv, THV_KILL);
+        TmThreadsSetFlag(tv, set);
 
         /* separate worker threads (autofp) will still wait at their
          * input queues. So nudge them here so they will observe the
@@ -1544,7 +1532,8 @@ again:
             SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
         }
 
-        while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+        /* wait for it to reach the expected state */
+        while (!TmThreadsCheckFlag(tv, check)) {
             SCMutexUnlock(&tv_root_lock);
 
             SleepMsec(1);
index 4ca55f9bc72cdec4ed4623fc6ac1bea2e3d3d38e..7698c3519aba903e219076e2502aecbd08aa8dc0 100644 (file)
@@ -121,7 +121,7 @@ void TmThreadWaitForFlag(ThreadVars *, uint32_t);
 
 TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
 
-void TmThreadDisablePacketThreads(void);
+void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check);
 void TmThreadDisableReceiveThreads(void);
 
 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);