From: Victor Julien Date: Tue, 9 May 2023 12:56:33 +0000 (+0200) Subject: flowworker: rate limit flow queue processing X-Git-Tag: suricata-7.0.0-rc2~180 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=55e32b25447c1d941c6f02cf244d231a6d69526d;p=thirdparty%2Fsuricata.git flowworker: rate limit flow queue processing Until this patch the logic the flow worker flow house keeping used was: - at most 2 flows are handled per packet - pseudo packets could flush the entire queue This patch changes that. Pseudo packets are fairly common, and can lead to packet stalls / latency spikes if the number of flows in the queue is large. It does that by adding a new packet type only used at shutdown, which flushes out the queues completely. All other packets will now stick to the 2 flow rate limit. --- diff --git a/src/decode.c b/src/decode.c index 3bf8a98a5b..f524cad478 100644 --- a/src/decode.c +++ b/src/decode.c @@ -778,6 +778,9 @@ const char *PktSrcToString(enum PktSrcEnum pkt_src) case PKT_SRC_CAPTURE_TIMEOUT: pkt_src_str = "capture timeout flush"; break; + case PKT_SRC_SHUTDOWN_FLUSH: + pkt_src_str = "shutdown flush"; + break; } DEBUG_VALIDATE_BUG_ON(pkt_src_str == NULL); return pkt_src_str; diff --git a/src/decode.h b/src/decode.h index 39d9bf1355..ca7e85f81c 100644 --- a/src/decode.h +++ b/src/decode.h @@ -63,6 +63,7 @@ enum PktSrcEnum { PKT_SRC_DETECT_RELOAD_FLUSH, PKT_SRC_CAPTURE_TIMEOUT, PKT_SRC_DECODER_GENEVE, + PKT_SRC_SHUTDOWN_FLUSH, }; #include "source-nflog.h" diff --git a/src/flow-worker.c b/src/flow-worker.c index b6676e6aa7..3b122c7246 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -485,7 +485,7 @@ static inline void FlowWorkerProcessInjectedFlows( static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p) { uint32_t max_work = 2; - if (PKT_IS_PSEUDOPKT(p)) + if (p->pkt_src == PKT_SRC_SHUTDOWN_FLUSH) max_work = 0; FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED); diff --git a/src/tm-threads.c b/src/tm-threads.c index 8f5ad969c8..c3f73d6666 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1308,7 +1308,7 @@ again: Packet *p = PacketGetFromAlloc(); if (p != NULL) { p->flags |= PKT_PSEUDO_STREAM_END; - PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH); + PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); PacketQueue *q = tv->stream_pq; SCMutexLock(&q->mutex_q); PacketEnqueue(q, p); @@ -1397,7 +1397,7 @@ again: Packet *p = PacketGetFromAlloc(); if (p != NULL) { p->flags |= PKT_PSEUDO_STREAM_END; - PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH); + PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); PacketQueue *q = tv->stream_pq; SCMutexLock(&q->mutex_q); PacketEnqueue(q, p);