From: Victor Julien Date: Thu, 6 Apr 2023 13:43:41 +0000 (+0200) Subject: flow/worker: refresh detect thread during housekeeping X-Git-Tag: suricata-6.0.11~9 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6658300c84fe5c1a883667e6e9d87637ed40da01;p=thirdparty%2Fsuricata.git flow/worker: refresh detect thread during housekeeping During housekeeping multiple flows are processed. If a rule reload happens at that time, we need to use the new detect thread as soon as possible. Bug: #5969. (cherry picked from commit 5e4cf182abf72ab33e56d4e17c261c025b4ce766) --- diff --git a/src/flow-worker.c b/src/flow-worker.c index 3b7c69be80..ab8a41c586 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -161,9 +161,7 @@ static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *d return 1; } -static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, - void *detect_thread, // TODO proper type? - FlowTimeoutCounters *counters, +static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters, FlowQueuePrivate *fq) { Flow *f; @@ -174,6 +172,8 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, if (f->proto == IPPROTO_TCP) { if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) && !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1 && f->ffr != 0) { + /* read detect thread in case we're doing a reload */ + void *detect_thread = SC_ATOMIC_GET(fw->detect_thread); int cnt = FlowFinish(tv, f, fw, detect_thread); counters->flows_aside_pkt_inject += cnt; counters->flows_aside_needs_work++; @@ -447,8 +447,8 @@ static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadDat /** \internal * \brief process flows injected into our queue by other threads */ -static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv, - FlowWorkerThreadData *fw, Packet *p, void *detect_thread) +static inline void FlowWorkerProcessInjectedFlows( + ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p) { /* take injected flows and append to our work queue */ FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED); @@ -459,7 +459,7 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv, StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len); FlowTimeoutCounters counters = { 0, 0, }; - CheckWorkQueue(tv, fw, detect_thread, &counters, &injected); + CheckWorkQueue(tv, fw, &counters, &injected); UpdateCounters(tv, fw, &counters); } FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED); @@ -468,15 +468,14 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv, /** \internal * \brief process flows set aside locally during flow lookup */ -static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, - FlowWorkerThreadData *fw, Packet *p, void *detect_thread) +static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p) { FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED); if (fw->fls.work_queue.len) { StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len); FlowTimeoutCounters counters = { 0, 0, }; - CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue); + CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue); UpdateCounters(tv, fw, &counters); } FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED); @@ -588,10 +587,10 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) housekeeping: /* take injected flows and process them */ - FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread); + FlowWorkerProcessInjectedFlows(tv, fw, p); /* process local work queue */ - FlowWorkerProcessLocalFlows(tv, fw, p, detect_thread); + FlowWorkerProcessLocalFlows(tv, fw, p); return TM_ECODE_OK; }