]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow/worker: refresh detect thread during housekeeping
authorVictor Julien <vjulien@oisf.net>
Thu, 6 Apr 2023 13:43:41 +0000 (15:43 +0200)
committerVictor Julien <vjulien@oisf.net>
Thu, 13 Apr 2023 05:34:43 +0000 (07:34 +0200)
During housekeeping multiple flows are processed. If a rule reload happens
at that time, we need to use the new detect thread as soon as possible.

Bug: #5969.
(cherry picked from commit 5e4cf182abf72ab33e56d4e17c261c025b4ce766)

src/flow-worker.c

index 3b7c69be8030797bf1bffa375944397dd6b7ccf0..ab8a41c586c09abef1506412cfa605f73c591e6f 100644 (file)
@@ -161,9 +161,7 @@ static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *d
     return 1;
 }
 
-static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
-        void *detect_thread, // TODO proper type?
-        FlowTimeoutCounters *counters,
+static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
         FlowQueuePrivate *fq)
 {
     Flow *f;
@@ -174,6 +172,8 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
         if (f->proto == IPPROTO_TCP) {
             if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) && !FlowIsBypassed(f) &&
                     FlowForceReassemblyNeedReassembly(f) == 1 && f->ffr != 0) {
+                /* read detect thread in case we're doing a reload */
+                void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
                 int cnt = FlowFinish(tv, f, fw, detect_thread);
                 counters->flows_aside_pkt_inject += cnt;
                 counters->flows_aside_needs_work++;
@@ -447,8 +447,8 @@ static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadDat
 /** \internal
  *  \brief process flows injected into our queue by other threads
  */
-static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
-        FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
+static inline void FlowWorkerProcessInjectedFlows(
+        ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
 {
     /* take injected flows and append to our work queue */
     FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
@@ -459,7 +459,7 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
         StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len);
 
         FlowTimeoutCounters counters = { 0, 0, };
-        CheckWorkQueue(tv, fw, detect_thread, &counters, &injected);
+        CheckWorkQueue(tv, fw, &counters, &injected);
         UpdateCounters(tv, fw, &counters);
     }
     FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
@@ -468,15 +468,14 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
 /** \internal
  *  \brief process flows set aside locally during flow lookup
  */
-static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv,
-        FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
+static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
 {
     FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
     if (fw->fls.work_queue.len) {
         StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len);
 
         FlowTimeoutCounters counters = { 0, 0, };
-        CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue);
+        CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue);
         UpdateCounters(tv, fw, &counters);
     }
     FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
@@ -588,10 +587,10 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
 housekeeping:
 
     /* take injected flows and process them */
-    FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread);
+    FlowWorkerProcessInjectedFlows(tv, fw, p);
 
     /* process local work queue */
-    FlowWorkerProcessLocalFlows(tv, fw, p, detect_thread);
+    FlowWorkerProcessLocalFlows(tv, fw, p);
 
     return TM_ECODE_OK;
 }