From: Victor Julien Date: Thu, 6 Apr 2023 14:06:34 +0000 (+0200) Subject: detect: only breakloop threads that are lagging X-Git-Tag: suricata-7.0.0-rc2~447 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8a968faa04443d31fed473cc4e358609fb925f25;p=thirdparty%2Fsuricata.git detect: only breakloop threads that are lagging Sleep after all threads have been checked. Bug: #5969. --- diff --git a/src/detect-engine.c b/src/detect-engine.c index 6aef3d4efe..953f99de3b 100644 --- a/src/detect-engine.c +++ b/src/detect-engine.c @@ -2112,40 +2112,6 @@ int DetectEngineInspectPktBufferGeneric( } } - -/* nudge capture loops to wake up */ -static void BreakCapture(void) -{ - SCMutexLock(&tv_root_lock); - for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) { - if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) { - continue; - } - /* find the correct slot */ - for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) { - if (suricata_ctl_flags != 0) { - SCMutexUnlock(&tv_root_lock); - return; - } - - TmModule *tm = TmModuleGetById(s->tm_id); - if (!(tm->flags & TM_FLAG_RECEIVE_TM)) { - continue; - } - - /* signal capture method that we need a packet. */ - TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT); - /* if the method supports it, BreakLoop. Otherwise we rely on - * the capture method's recv timeout */ - if (tm->PktAcqLoop && tm->PktAcqBreakLoop) { - tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data)); - } - break; - } - } - SCMutexUnlock(&tv_root_lock); -} - /** \internal * \brief inject a pseudo packet into each detect thread that doesn't use the * new det_ctx yet @@ -2271,21 +2237,27 @@ static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx) InjectPackets(detect_tvs, new_det_ctx, no_of_detect_tvs); + /* loop waiting for detect threads to switch to the new det_ctx. Try to + * wake up capture if needed (break loop). */ + uint32_t threads_done = 0; +retry: for (i = 0; i < no_of_detect_tvs; i++) { - int break_out = 0; + if (suricata_ctl_flags != 0) { + threads_done = no_of_detect_tvs; + break; + } usleep(1000); - while (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) { - if (suricata_ctl_flags != 0) { - break_out = 1; - break; - } - - BreakCapture(); - usleep(1000); + if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) == 1) { + SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]); + threads_done++; + } else if (detect_tvs[i]->break_loop) { + TmThreadsCaptureBreakLoop(detect_tvs[i]); } - if (break_out) - break; - SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]); + } + if (threads_done < no_of_detect_tvs) { + threads_done = 0; + SleepMsec(250); + goto retry; } /* this is to make sure that if someone initiated shutdown during a live diff --git a/src/tm-threads.c b/src/tm-threads.c index 69ca2e0402..e100aaa2c1 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -2272,23 +2272,6 @@ uint16_t TmThreadsGetWorkerThreadMax(void) return (uint16_t)thread_max; } -static inline void ThreadBreakLoop(ThreadVars *tv) -{ - if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) { - return; - } - /* find the correct slot */ - TmSlot *s = tv->tm_slots; - TmModule *tm = TmModuleGetById(s->tm_id); - if (tm->flags & TM_FLAG_RECEIVE_TM) { - /* if the method supports it, BreakLoop. Otherwise we rely on - * the capture method's recv timeout */ - if (tm->PktAcqLoop && tm->PktAcqBreakLoop) { - tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data)); - } - } -} - /** \brief inject a flow into a threads flow queue */ void TmThreadsInjectFlowById(Flow *f, const int id) @@ -2308,6 +2291,6 @@ void TmThreadsInjectFlowById(Flow *f, const int id) if (tv->inq != NULL) { SCCondSignal(&tv->inq->pq->cond_q); } else if (tv->break_loop) { - ThreadBreakLoop(tv); + TmThreadsCaptureBreakLoop(tv); } } diff --git a/src/tm-threads.h b/src/tm-threads.h index 6a5346be36..e547064bc4 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -253,6 +253,27 @@ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p) tv->tmqh_out(tv, p); } +static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv) +{ + if (unlikely(!tv->break_loop)) + return; + + if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) { + return; + } + /* find the correct slot */ + TmSlot *s = tv->tm_slots; + TmModule *tm = TmModuleGetById(s->tm_id); + if (tm->flags & TM_FLAG_RECEIVE_TM) { + /* if the method supports it, BreakLoop. Otherwise we rely on + * the capture method's recv timeout */ + if (tm->PktAcqLoop && tm->PktAcqBreakLoop) { + tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data)); + } + TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT); + } +} + void TmThreadsListThreads(void); int TmThreadsRegisterThread(ThreadVars *tv, const int type); void TmThreadsUnregisterThread(const int id);