]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow: process evicted flows on low/no traffic
authorVictor Julien <victor@inliniac.net>
Fri, 1 Oct 2021 11:20:02 +0000 (13:20 +0200)
committerVictor Julien <victor@inliniac.net>
Tue, 5 Oct 2021 09:04:34 +0000 (11:04 +0200)
In a scenario where there was suddenly no more traffic flowing, flows
in a threads `flow_queue` would not be processed. The easiest way to
see this would be in a traffic replay scenario. After the replay is done
no more packets come in and these evicted flows got stuck.

In workers mode, the capture part handles timeout this was updated to
take the `ThreadVars::flow_queue` into account.

The autofp mode the logic that puts a flow into a threads `flow_queue`
would already wake a thread up, but the `flow_queue` was then ignored.
This has been updated to take the `flow_queue` into account.

In both cases a "capture timeout" packet is pushed through the pipeline
to "flush" the queues.

Bug: #4722.
(cherry picked from commit b788d3345cd4e4c467672bb6bfb90d2b8620e068)

src/tm-threads.c
src/tm-threads.h

index d9c5160784adce90c4c06aed7cd1a1d6b906d441..21b40bad6ebac40b48e330979d04aadd948278b0 100644 (file)
@@ -447,6 +447,17 @@ static void *TmThreadsSlotVar(void *td)
         /* input a packet */
         p = tv->tmqh_in(tv);
 
+        /* if we didn't get a packet see if we need to do some housekeeping */
+        if (unlikely(p == NULL)) {
+            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
+                p = PacketGetFromQueueOrAlloc();
+                if (p != NULL) {
+                    p->flags |= PKT_PSEUDO_STREAM_END;
+                    PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
+                }
+            }
+        }
+
         if (p != NULL) {
             /* run the thread module(s) */
             r = TmThreadsSlotVarRun(tv, p, s);
index 76e13d9795c931918762ddd067395248e7ddd609..91cf9bbb4714f187ca0cf3a37b2170f52b9a8625 100644 (file)
@@ -159,7 +159,7 @@ static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet
  *         manager.
  *  \param s pipeline to run on these packets.
  */
-static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv)
+static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
 {
     PacketQueue *pq = tv->stream_pq_local;
     if (pq && pq->len > 0) {
@@ -176,6 +176,9 @@ static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv)
             }
             tv->tmqh_out(tv, extra_p);
         }
+        return true;
+    } else {
+        return false;
     }
 }
 
@@ -221,18 +224,34 @@ static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
     }
 }
 
+/** \brief handle capture timeout
+ *  When a capture method times out we check for house keeping
+ *  tasks in the capture thread.
+ *
+ *  \param p packet. Capture method may have taken a packet from
+ *           the pool prior to the timing out call. We will then
+ *           use that packet. Otherwise we can get our own.
+ */
 static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
 {
     if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
-        TmThreadsCaptureInjectPacket(tv, p);
-    } else {
-        TmThreadsHandleInjectedPackets(tv);
+        TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
+        return;
 
-        /* packet could have been passed to us that we won't use
-         * return it to the pool. */
-        if (p != NULL)
-            tv->tmqh_out(tv, p);
+    } else {
+        if (TmThreadsHandleInjectedPackets(tv) == false) {
+            /* see if we have to do some house keeping */
+            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
+                TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
+                return;
+            }
+        }
     }
+
+    /* packet could have been passed to us that we won't use
+     * return it to the pool. */
+    if (p != NULL)
+        tv->tmqh_out(tv, p);
 }
 
 void TmThreadsListThreads(void);