]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threads: fix autofp shutdown race condition
authorVictor Julien <vjulien@oisf.net>
Wed, 30 Apr 2025 08:20:10 +0000 (10:20 +0200)
committerVictor Julien <victor@inliniac.net>
Wed, 30 Apr 2025 20:22:23 +0000 (22:22 +0200)
Sometimes a single flow pcap would log 2 flows. It turns out FlowWorkToDoCleanup
ran before all the packet threads had processed their "wire" packets. It then
removed a flow that a wire packet would still have needed, leading to the worker
thread creating a new flow for it.

This could happen due to the logic in TmThreadDisableReceiveThreads which calls
TmThreadDrainPacketThreads to made sure it only returns when all autofp-workers
have processed all the packets the autofp-capture thread fed to them.

However, the way it checked this is by checking the size of the autofp-worker's
input queue. If 0, it assumes it is done.

What this missed, is that a worker thread could have just taken the last packet
from the input queue, but it is not yet done processing it. If then the
FlowWorkToDoCleanup is ran as well, it would race the worker thread to the flow
handling logic. When it won, the flow was evicted and the packet thread
created a new flow.

This patch improves the shutdown logic to force the worker threads to
enter a "flow loop" (THV_FLOW_LOOP) state before moving on to the
FlowWorkToDoCleanup step. This makes sure that any in progress packets
in the worker threads have been processed.

Bug: #7681.

src/suricata.c
src/threadvars.h
src/tm-threads.c
src/tm-threads.h

index 6313b01babcc02f134b3b080902354bede664f4c..db3373def00547e20919a8138bd0d2a68e2745a8 100644 (file)
@@ -2283,9 +2283,14 @@ void PostRunDeinit(const int runmode, struct timeval *start_time)
     /* handle graceful shutdown of the flow engine, it's helper
      * threads and the packet threads */
     FlowDisableFlowManagerThread();
+    /* disable capture */
     TmThreadDisableReceiveThreads();
+    /* tell packet threads to enter flow timeout loop */
+    TmThreadDisablePacketThreads(THV_REQ_FLOW_LOOP, THV_FLOW_LOOP);
+    /* run cleanup on the flow hash */
     FlowWorkToDoCleanup();
-    TmThreadDisablePacketThreads();
+    /* gracefully shut down packet threads */
+    TmThreadDisablePacketThreads(THV_KILL, THV_RUNNING_DONE);
     SCPrintElapsedTime(start_time);
     FlowDisableFlowRecyclerThread();
 
index b7b39ad49130dd0cf4e02ac3a4708828bb8a16bc..d645d0645ec75b011424942c2fbddbd5c612acb2 100644 (file)
@@ -45,7 +45,7 @@ struct TmSlot_;
 #define THV_DEINIT              BIT_U32(7)
 #define THV_RUNNING_DONE        BIT_U32(8)  /** thread has completed running and is entering
                                          * the de-init phase */
-#define THV_KILL_PKTACQ         BIT_U32(9)  /**< flag thread to stop packet acq */
+#define THV_REQ_FLOW_LOOP       BIT_U32(9)  /**< request thread to enter flow timeout loop */
 #define THV_FLOW_LOOP           BIT_U32(10) /**< thread is in flow shutdown loop */
 
 /** signal thread's capture method to create a fake packet to force through
index d8246cb12b21d31a47b20ca4b9e039ae74ff8de8..ee4a3e750f3e254c3d9a8d65f7745e7a20131573 100644 (file)
@@ -337,7 +337,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
             TmThreadsSetFlag(tv, THV_FAILED);
             run = false;
         }
-        if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
+        if (TmThreadsCheckFlag(tv, THV_REQ_FLOW_LOOP) || suricata_ctl_flags) {
             run = false;
         }
         if (r == TM_ECODE_DONE) {
@@ -517,37 +517,16 @@ static void *TmThreadsSlotVar(void *td)
             TmThreadsHandleInjectedPackets(tv);
         }
 
-        if (TmThreadsCheckFlag(tv, THV_KILL)) {
+        if (TmThreadsCheckFlag(tv, THV_REQ_FLOW_LOOP)) {
             run = false;
         }
-    } /* while (run) */
-    StatsSyncCounters(tv);
-
-    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
-    TmThreadWaitForFlag(tv, THV_DEINIT);
-
-    PacketPoolDestroy();
-
-    s = (TmSlot *)tv->tm_slots;
-
-    for ( ; s != NULL; s = s->slot_next) {
-        if (s->SlotThreadExitPrintStats != NULL) {
-            s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
-        }
-
-        if (s->SlotThreadDeinit != NULL) {
-            r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
-            if (r != TM_ECODE_OK) {
-                TmThreadsSetFlag(tv, THV_CLOSED);
-                goto error;
-            }
-        }
     }
+    if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) {
+        goto error;
+    }
+    StatsSyncCounters(tv);
 
-    SCLogDebug("%s ending", tv->name);
-    tv->stream_pq = NULL;
-    TmThreadsSetFlag(tv, THV_CLOSED);
-    pthread_exit((void *) 0);
+    pthread_exit(NULL);
     return NULL;
 
 error:
@@ -1462,7 +1441,7 @@ again:
             if (tm && tm->PktAcqBreakLoop != NULL) {
                 tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
             }
-            TmThreadsSetFlag(tv, THV_KILL_PKTACQ);
+            TmThreadsSetFlag(tv, THV_REQ_FLOW_LOOP);
 
             if (tv->inq != NULL) {
                 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
@@ -1515,8 +1494,17 @@ static void TmThreadDebugValidateNoMorePackets(void)
 
 /**
  * \brief Disable all packet threads
+ * \param set flag to set
+ * \param check flag to check
+ *
+ * Support 2 stages in shutting down the packet threads:
+ * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
+ * 2. set THV_KILL and wait for THV_RUNNING_DONE
+ *
+ * During step 1 the main loop is exited, and the flow loop logic is entered.
+ * During step 2, the flow loop logic is done and the thread closes.
  */
-void TmThreadDisablePacketThreads(void)
+void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check)
 {
     struct timeval start_ts;
     struct timeval cur_ts;
@@ -1540,7 +1528,7 @@ again:
     /* loop through the packet threads and kill them */
     SCMutexLock(&tv_root_lock);
     for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
-        TmThreadsSetFlag(tv, THV_KILL);
+        TmThreadsSetFlag(tv, set);
 
         /* separate worker threads (autofp) will still wait at their
          * input queues. So nudge them here so they will observe the
@@ -1554,7 +1542,8 @@ again:
             SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
         }
 
-        while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
+        /* wait for it to reach the expected state */
+        while (!TmThreadsCheckFlag(tv, check)) {
             SCMutexUnlock(&tv_root_lock);
 
             SleepMsec(1);
index d4c8e898a50e3114dc87d2a0925e1cdabfa2477a..83bc826826081aa857994993650be8b1bc3cafe1 100644 (file)
@@ -122,7 +122,7 @@ void TmThreadWaitForFlag(ThreadVars *, uint32_t);
 
 TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
 
-void TmThreadDisablePacketThreads(void);
+void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check);
 void TmThreadDisableReceiveThreads(void);
 
 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);