]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow/worker: process injected flows more gradually
authorVictor Julien <vjulien@oisf.net>
Mon, 26 Sep 2022 07:54:37 +0000 (09:54 +0200)
committerVictor Julien <vjulien@oisf.net>
Sat, 1 Oct 2022 18:27:38 +0000 (20:27 +0200)
Worker threads are responsible for final processing of timed out flows.
These are selected by the Flow Manager and inserted into a per thread
queue. The Flow Worker then checks this queue after each packet. Due to
the burstiness of this process, the packet threads would sometimes process
a lot of these flows in the context of a single packet, leading to spike
in latency which might cause packet loss.

This patch changes the behavior to only process at max 2 flows per packet.
This way added processing cost is amortized over many packets.

src/flow-worker.c

index 5a4bc810004c28dded4793b0e6eceb1f1766c73a..a14b4c35233c80ff4e437c292404378665cc3d89 100644 (file)
@@ -168,11 +168,12 @@ static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *d
     return 1;
 }
 
+/** \param[in] max_work Max flows to process. 0 if unlimited. */
 static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
         void *detect_thread, // TODO proper type?
-        FlowTimeoutCounters *counters,
-        FlowQueuePrivate *fq)
+        FlowTimeoutCounters *counters, FlowQueuePrivate *fq, const uint32_t max_work)
 {
+    uint32_t i = 0;
     Flow *f;
     while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
         FLOWLOCK_WRLOCK(f);
@@ -204,11 +205,15 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
 
         FlowClearMemory (f, f->protomap);
         FLOWLOCK_UNLOCK(f);
+
         if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
             FlowSparePoolReturnFlow(f);
         } else {
             FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
         }
+
+        if (max_work != 0 && ++i == max_work)
+            break;
     }
 }
 
@@ -459,9 +464,8 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
         if (p->pkt_src == PKT_SRC_WIRE)
             StatsSetUI64(tv, fw->cnt.flows_injected_max, (uint64_t)injected.len);
 
-        FlowTimeoutCounters counters = { 0, 0, };
-        CheckWorkQueue(tv, fw, detect_thread, &counters, &injected);
-        UpdateCounters(tv, fw, &counters);
+        /* move to local queue so we can process over the course of multiple packets */
+        FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected);
     }
     FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
 }
@@ -472,12 +476,16 @@ static inline void FlowWorkerProcessInjectedFlows(ThreadVars *tv,
 static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv,
         FlowWorkerThreadData *fw, Packet *p, void *detect_thread)
 {
+    uint32_t max_work = 2;
+    if (PKT_IS_PSEUDOPKT(p))
+        max_work = 0;
+
     FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
     if (fw->fls.work_queue.len) {
         StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len);
 
         FlowTimeoutCounters counters = { 0, 0, };
-        CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue);
+        CheckWorkQueue(tv, fw, detect_thread, &counters, &fw->fls.work_queue, max_work);
         UpdateCounters(tv, fw, &counters);
     }
     FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
@@ -592,7 +600,7 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
 
 housekeeping:
 
-    /* take injected flows and process them */
+    /* take injected flows and add them to our local queue */
     FlowWorkerProcessInjectedFlows(tv, fw, p, detect_thread);
 
     /* process local work queue */