From 73c7550455f33f4615f08760e4528732cce19c07 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Fri, 1 Oct 2021 13:20:02 +0200 Subject: [PATCH] flow: process evicted flows on low/no traffic In a scenario where there was suddenly no more traffic flowing, flows in a threads `flow_queue` would not be processed. The easiest way to see this would be in a traffic replay scenario. After the replay is done no more packets come in and these evicted flows got stuck. In workers mode, the capture part handles timeout this was updated to take the `ThreadVars::flow_queue` into account. The autofp mode the logic that puts a flow into a threads `flow_queue` would already wake a thread up, but the `flow_queue` was then ignored. This has been updated to take the `flow_queue` into account. In both cases a "capture timeout" packet is pushed through the pipeline to "flush" the queues. Bug: #4722. (cherry picked from commit b788d3345cd4e4c467672bb6bfb90d2b8620e068) --- src/tm-threads.c | 11 +++++++++++ src/tm-threads.h | 35 +++++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/src/tm-threads.c b/src/tm-threads.c index d9c5160784..21b40bad6e 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -447,6 +447,17 @@ static void *TmThreadsSlotVar(void *td) /* input a packet */ p = tv->tmqh_in(tv); + /* if we didn't get a packet see if we need to do some housekeeping */ + if (unlikely(p == NULL)) { + if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) { + p = PacketGetFromQueueOrAlloc(); + if (p != NULL) { + p->flags |= PKT_PSEUDO_STREAM_END; + PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT); + } + } + } + if (p != NULL) { /* run the thread module(s) */ r = TmThreadsSlotVarRun(tv, p, s); diff --git a/src/tm-threads.h b/src/tm-threads.h index 76e13d9795..91cf9bbb47 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -159,7 +159,7 @@ static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet * manager. * \param s pipeline to run on these packets. */ -static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv) +static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv) { PacketQueue *pq = tv->stream_pq_local; if (pq && pq->len > 0) { @@ -176,6 +176,9 @@ static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv) } tv->tmqh_out(tv, extra_p); } + return true; + } else { + return false; } } @@ -221,18 +224,34 @@ static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p) } } +/** \brief handle capture timeout + * When a capture method times out we check for house keeping + * tasks in the capture thread. + * + * \param p packet. Capture method may have taken a packet from + * the pool prior to the timing out call. We will then + * use that packet. Otherwise we can get our own. + */ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p) { if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) { - TmThreadsCaptureInjectPacket(tv, p); - } else { - TmThreadsHandleInjectedPackets(tv); + TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */ + return; - /* packet could have been passed to us that we won't use - * return it to the pool. */ - if (p != NULL) - tv->tmqh_out(tv, p); + } else { + if (TmThreadsHandleInjectedPackets(tv) == false) { + /* see if we have to do some house keeping */ + if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) { + TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */ + return; + } + } } + + /* packet could have been passed to us that we won't use + * return it to the pool. */ + if (p != NULL) + tv->tmqh_out(tv, p); } void TmThreadsListThreads(void); -- 2.47.2