]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow: process evicted flows on low/no traffic
authorVictor Julien <victor@inliniac.net>
Fri, 1 Oct 2021 11:20:02 +0000 (13:20 +0200)
committerVictor Julien <victor@inliniac.net>
Mon, 4 Oct 2021 09:38:21 +0000 (11:38 +0200)
In a scenario where there was suddenly no more traffic flowing, flows
in a threads `flow_queue` would not be processed. The easiest way to
see this would be in a traffic replay scenario. After the replay is done
no more packets come in and these evicted flows got stuck.

In workers mode, the capture part handles timeout this was updated to
take the `ThreadVars::flow_queue` into account.

The autofp mode the logic that puts a flow into a threads `flow_queue`
would already wake a thread up, but the `flow_queue` was then ignored.
This has been updated to take the `flow_queue` into account.

In both cases a "capture timeout" packet is pushed through the pipeline
to "flush" the queues.

Bug: #4722.

src/tm-threads.c
src/tm-threads.h

index 0920c0bf852176d17c7e6bb948f7e7adb1068ba0..ed01e26a71973d611ec08c9584258152e17ef75b 100644 (file)
@@ -447,6 +447,17 @@ static void *TmThreadsSlotVar(void *td)
         /* input a packet */
         p = tv->tmqh_in(tv);
 
+        /* if we didn't get a packet see if we need to do some housekeeping */
+        if (unlikely(p == NULL)) {
+            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
+                p = PacketGetFromQueueOrAlloc();
+                if (p != NULL) {
+                    p->flags |= PKT_PSEUDO_STREAM_END;
+                    PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
+                }
+            }
+        }
+
         if (p != NULL) {
             /* run the thread module(s) */
             r = TmThreadsSlotVarRun(tv, p, s);
index 76e13d9795c931918762ddd067395248e7ddd609..91cf9bbb4714f187ca0cf3a37b2170f52b9a8625 100644 (file)
@@ -159,7 +159,7 @@ static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet
  *         manager.
  *  \param s pipeline to run on these packets.
  */
-static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv)
+static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
 {
     PacketQueue *pq = tv->stream_pq_local;
     if (pq && pq->len > 0) {
@@ -176,6 +176,9 @@ static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv)
             }
             tv->tmqh_out(tv, extra_p);
         }
+        return true;
+    } else {
+        return false;
     }
 }
 
@@ -221,18 +224,34 @@ static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
     }
 }
 
+/** \brief handle capture timeout
+ *  When a capture method times out we check for house keeping
+ *  tasks in the capture thread.
+ *
+ *  \param p packet. Capture method may have taken a packet from
+ *           the pool prior to the timing out call. We will then
+ *           use that packet. Otherwise we can get our own.
+ */
 static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
 {
     if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
-        TmThreadsCaptureInjectPacket(tv, p);
-    } else {
-        TmThreadsHandleInjectedPackets(tv);
+        TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
+        return;
 
-        /* packet could have been passed to us that we won't use
-         * return it to the pool. */
-        if (p != NULL)
-            tv->tmqh_out(tv, p);
+    } else {
+        if (TmThreadsHandleInjectedPackets(tv) == false) {
+            /* see if we have to do some house keeping */
+            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
+                TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
+                return;
+            }
+        }
     }
+
+    /* packet could have been passed to us that we won't use
+     * return it to the pool. */
+    if (p != NULL)
+        tv->tmqh_out(tv, p);
 }
 
 void TmThreadsListThreads(void);