]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
detect: only breakloop threads that are lagging
authorVictor Julien <vjulien@oisf.net>
Thu, 6 Apr 2023 14:06:34 +0000 (16:06 +0200)
committerVictor Julien <vjulien@oisf.net>
Thu, 13 Apr 2023 05:34:48 +0000 (07:34 +0200)
Sleep after all threads have been checked.

Bug: #5969.
(cherry picked from commit 8a968faa04443d31fed473cc4e358609fb925f25)

src/detect-engine.c
src/tm-threads.c
src/tm-threads.h

index 3bb3c3307c177596156a3988ba381a21937c18a1..46c5b61b2465ec4dfc7efa90d02067efbf96da16 100644 (file)
@@ -1791,40 +1791,6 @@ int DetectEngineInspectPktBufferGeneric(
     }
 }
 
-
-/* nudge capture loops to wake up */
-static void BreakCapture(void)
-{
-    SCMutexLock(&tv_root_lock);
-    for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
-        if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
-            continue;
-        }
-        /* find the correct slot */
-        for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
-            if (suricata_ctl_flags != 0) {
-                SCMutexUnlock(&tv_root_lock);
-                return;
-            }
-
-            TmModule *tm = TmModuleGetById(s->tm_id);
-            if (!(tm->flags & TM_FLAG_RECEIVE_TM)) {
-                continue;
-            }
-
-            /* signal capture method that we need a packet. */
-            TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
-            /* if the method supports it, BreakLoop. Otherwise we rely on
-             * the capture method's recv timeout */
-            if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
-                tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
-            }
-            break;
-        }
-    }
-    SCMutexUnlock(&tv_root_lock);
-}
-
 /** \internal
  *  \brief inject a pseudo packet into each detect thread that doesn't use the
  *         new det_ctx yet
@@ -1950,21 +1916,27 @@ static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx)
 
     InjectPackets(detect_tvs, new_det_ctx, no_of_detect_tvs);
 
+    /* loop waiting for detect threads to switch to the new det_ctx. Try to
+     * wake up capture if needed (break loop). */
+    uint32_t threads_done = 0;
+retry:
     for (i = 0; i < no_of_detect_tvs; i++) {
-        int break_out = 0;
+        if (suricata_ctl_flags != 0) {
+            threads_done = no_of_detect_tvs;
+            break;
+        }
         usleep(1000);
-        while (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
-            if (suricata_ctl_flags != 0) {
-                break_out = 1;
-                break;
-            }
-
-            BreakCapture();
-            usleep(1000);
+        if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) == 1) {
+            SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
+            threads_done++;
+        } else if (detect_tvs[i]->break_loop) {
+            TmThreadsCaptureBreakLoop(detect_tvs[i]);
         }
-        if (break_out)
-            break;
-        SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
+    }
+    if (threads_done < no_of_detect_tvs) {
+        threads_done = 0;
+        SleepMsec(250);
+        goto retry;
     }
 
     /* this is to make sure that if someone initiated shutdown during a live
index 367c85ab9506452d56c4081910ae08d24f776f60..deabe7333b1cc6dd5f35a6e550a6ea162c932d08 100644 (file)
@@ -2307,23 +2307,6 @@ uint16_t TmThreadsGetWorkerThreadMax(void)
     return thread_max;
 }
 
-static inline void ThreadBreakLoop(ThreadVars *tv)
-{
-    if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
-        return;
-    }
-    /* find the correct slot */
-    TmSlot *s = tv->tm_slots;
-    TmModule *tm = TmModuleGetById(s->tm_id);
-    if (tm->flags & TM_FLAG_RECEIVE_TM) {
-        /* if the method supports it, BreakLoop. Otherwise we rely on
-         * the capture method's recv timeout */
-        if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
-            tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
-        }
-    }
-}
-
 /**
  *  \retval r 1 if packet was accepted, 0 otherwise
  *  \note if packet was not accepted, it's still the responsibility
@@ -2353,7 +2336,7 @@ int TmThreadsInjectPacketsById(Packet **packets, const int id)
     if (tv->inq != NULL) {
         SCCondSignal(&tv->inq->pq->cond_q);
     } else if (tv->break_loop) {
-        ThreadBreakLoop(tv);
+        TmThreadsCaptureBreakLoop(tv);
     }
     return 1;
 }
@@ -2377,6 +2360,6 @@ void TmThreadsInjectFlowById(Flow *f, const int id)
     if (tv->inq != NULL) {
         SCCondSignal(&tv->inq->pq->cond_q);
     } else if (tv->break_loop) {
-        ThreadBreakLoop(tv);
+        TmThreadsCaptureBreakLoop(tv);
     }
 }
index 91cf9bbb4714f187ca0cf3a37b2170f52b9a8625..a2b998f15d7221395d2cc020c9feffe7074cba51 100644 (file)
@@ -254,6 +254,27 @@ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
         tv->tmqh_out(tv, p);
 }
 
+static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
+{
+    if (unlikely(!tv->break_loop))
+        return;
+
+    if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
+        return;
+    }
+    /* find the correct slot */
+    TmSlot *s = tv->tm_slots;
+    TmModule *tm = TmModuleGetById(s->tm_id);
+    if (tm->flags & TM_FLAG_RECEIVE_TM) {
+        /* if the method supports it, BreakLoop. Otherwise we rely on
+         * the capture method's recv timeout */
+        if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
+            tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
+        }
+        TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
+    }
+}
+
 void TmThreadsListThreads(void);
 int TmThreadsRegisterThread(ThreadVars *tv, const int type);
 void TmThreadsUnregisterThread(const int id);