]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flowworker: rate limit flow queue processing
authorVictor Julien <vjulien@oisf.net>
Tue, 9 May 2023 12:56:33 +0000 (14:56 +0200)
committerVictor Julien <vjulien@oisf.net>
Wed, 10 May 2023 08:59:17 +0000 (10:59 +0200)
Until this patch the logic the flow worker flow house keeping used was:
- at most 2 flows are handled per packet
- pseudo packets could flush the entire queue

This patch changes that. Pseudo packets are fairly common, and can lead
to packet stalls / latency spikes if the number of flows in the queue
is large.

It does that by adding a new packet type only used at shutdown, which
flushes out the queues completely. All other packets will now stick
to the 2 flow rate limit.

src/decode.c
src/decode.h
src/flow-worker.c
src/tm-threads.c

index 3bf8a98a5b86fc35a34ed8c42aa207ee96c35837..f524cad4785bfd61fc153052c7a8e278de43ed7a 100644 (file)
@@ -778,6 +778,9 @@ const char *PktSrcToString(enum PktSrcEnum pkt_src)
         case PKT_SRC_CAPTURE_TIMEOUT:
             pkt_src_str = "capture timeout flush";
             break;
+        case PKT_SRC_SHUTDOWN_FLUSH:
+            pkt_src_str = "shutdown flush";
+            break;
     }
     DEBUG_VALIDATE_BUG_ON(pkt_src_str == NULL);
     return pkt_src_str;
index 39d9bf1355d00cfd483a861b92f3a14fa390fcdd..ca7e85f81cbd5d729b6f0adfda5ce3c350cc8866 100644 (file)
@@ -63,6 +63,7 @@ enum PktSrcEnum {
     PKT_SRC_DETECT_RELOAD_FLUSH,
     PKT_SRC_CAPTURE_TIMEOUT,
     PKT_SRC_DECODER_GENEVE,
+    PKT_SRC_SHUTDOWN_FLUSH,
 };
 
 #include "source-nflog.h"
index b6676e6aa7e62f3a7c292569d2413342829a40ae..3b122c7246bb3159db47307979b29e8e550b85ce 100644 (file)
@@ -485,7 +485,7 @@ static inline void FlowWorkerProcessInjectedFlows(
 static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
 {
     uint32_t max_work = 2;
-    if (PKT_IS_PSEUDOPKT(p))
+    if (p->pkt_src == PKT_SRC_SHUTDOWN_FLUSH)
         max_work = 0;
 
     FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
index 8f5ad969c8e1404f0fda482856b5b39b40a0510c..c3f73d666626bb3d83935259594fd60692409cba 100644 (file)
@@ -1308,7 +1308,7 @@ again:
                 Packet *p = PacketGetFromAlloc();
                 if (p != NULL) {
                     p->flags |= PKT_PSEUDO_STREAM_END;
-                    PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
+                    PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
                     PacketQueue *q = tv->stream_pq;
                     SCMutexLock(&q->mutex_q);
                     PacketEnqueue(q, p);
@@ -1397,7 +1397,7 @@ again:
                     Packet *p = PacketGetFromAlloc();
                     if (p != NULL) {
                         p->flags |= PKT_PSEUDO_STREAM_END;
-                        PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
+                        PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
                         PacketQueue *q = tv->stream_pq;
                         SCMutexLock(&q->mutex_q);
                         PacketEnqueue(q, p);