]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
capture: check for flow packets on capture timeout
authorVictor Julien <victor@inliniac.net>
Mon, 27 May 2019 13:46:18 +0000 (15:46 +0200)
committerVictor Julien <victor@inliniac.net>
Wed, 29 May 2019 13:34:36 +0000 (15:34 +0200)
The capture threads can receive packets from the flow manager in their
Threadvars::stream_pq packet queue. This mechanism makes sure the packets
the flow manager injects into the engine are processed by the correct
worker thread.

If the capture thread(s) would not receive packets for a long time, the
Threadvars::stream_pq would not be checked and processed. This could
lead to packet pool depletion in the flow manager. It would also lead
to flows not being timed out/logged until either packets started flowing
again or until the engine was shut down.

The scenario is more likely to happen in a test (e.g. replay) but could
also delay logging on low traffic sensors.

src/source-af-packet.c
src/source-netmap.c
src/source-nfq.c
src/source-pcap.c
src/source-pfring.c
src/tm-threads.h

index e7b00afba52f8c92a4b73f4e2b56091cc6dd5a6c..21160bc651e76c1267d13b04e2cb763eba266149 100644 (file)
@@ -1607,8 +1607,8 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
                 AFPDumpCounters(ptv);
                 last_dump = current_time;
             }
-            /* poll timed out, lets see if we need to inject a fake packet  */
-            TmThreadsCaptureInjectPacket(tv, ptv->slot, NULL);
+            /* poll timed out, lets see handle our timeout path */
+            TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL);
 
         } else if ((r < 0) && (errno != EINTR)) {
             SCLogError(SC_ERR_AFP_READ, "Error reading data from iface '%s': (%d) %s",
index 75e23be7fd8587c8d72600bfff4da6d8fbd4bb32..210aabf3e356f9907153733a632c789dd176604a 100644 (file)
@@ -639,8 +639,8 @@ static TmEcode ReceiveNetmapLoop(ThreadVars *tv, void *data, void *slot)
             NetmapDumpCounters(ntv);
             StatsSyncCountersIfSignalled(tv);
 
-            /* poll timed out, lets see if we need to inject a fake packet  */
-            TmThreadsCaptureInjectPacket(tv, ntv->slot, NULL);
+            /* poll timed out, lets handle the timeout */
+            TmThreadsCaptureHandleTimeout(tv, ntv->slot, NULL);
             continue;
         }
 
index aeb80f048e406b05ec55b6dee87bd6517762c850..cf72d19dfe9d07e49c4bc90fd6fbab932ee7ea9a 100644 (file)
@@ -998,8 +998,8 @@ static void NFQRecvPkt(NFQQueueVars *t, NFQThreadVars *tv)
             if (flag)
                 NFQVerdictCacheFlush(t);
 
-            /* inject a fake packet on timeout */
-            TmThreadsCaptureInjectPacket(tv->tv, tv->slot, NULL);
+            /* handle timeout */
+            TmThreadsCaptureHandleTimeout(tv->tv, tv->slot, NULL);
         } else {
 #ifdef COUNTERS
             NFQMutexLock(t);
index 6ba556fc53f8dca078815663d9c2ed4c873799c8..9c7388b964f0f0428e38724f10a64672f5ebe1c2 100644 (file)
@@ -288,7 +288,7 @@ TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot)
             SCLogError(SC_ERR_PCAP_DISPATCH, "Pcap callback PcapCallbackLoop failed");
             SCReturnInt(TM_ECODE_FAILED);
         } else if (unlikely(r == 0)) {
-            TmThreadsCaptureInjectPacket(tv, ptv->slot, NULL);
+            TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL);
         }
 
         StatsSyncCountersIfSignalled(tv);
index 06249dede7a5edc76ef613ed9f71637addd1e417..70b5a71fd9c492cdce33e2f92a6ccfd9a912d344 100644 (file)
@@ -435,7 +435,7 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot)
             }
 
             /* pfring didn't use the packet yet */
-            TmThreadsCaptureInjectPacket(tv, ptv->slot, p);
+            TmThreadsCaptureHandleTimeout(tv, ptv->slot, p);
 
         } else {
             SCLogError(SC_ERR_PF_RING_RECV,"pfring_recv error  %" PRId32 "", r);
index c8b6f4784926ba5735e0c8e4fb2d5c8d3bd8022f..5acd9616f65b7240a99d75ebd1fbb3c4475dc178 100644 (file)
@@ -209,6 +209,43 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
     return r;
 }
 
+/**
+ *  \brief Handle timeout from the capture layer. Checks
+ *         post-pq which may have been filled by the flow
+ *         manager.
+ */
+static inline TmEcode TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
+{
+    /* post process pq */
+    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
+        if (slot->slot_post_pq.top != NULL) {
+            while (1) {
+                SCMutexLock(&slot->slot_post_pq.mutex_q);
+                Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
+                SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+
+                if (extra_p == NULL)
+                    break;
+
+                if (slot->slot_next != NULL) {
+                    TmEcode r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
+                    if (r == TM_ECODE_FAILED) {
+                        SCMutexLock(&slot->slot_post_pq.mutex_q);
+                        TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
+                        SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+
+                        TmqhOutputPacketpool(tv, extra_p);
+                        TmThreadsSetFlag(tv, THV_FAILED);
+                        return TM_ECODE_FAILED;
+                    }
+                }
+                tv->tmqh_out(tv, extra_p);
+            }
+        }
+    }
+    return TM_ECODE_OK;
+}
+
 /** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
  *  Allow caller to supply their own packet
  *
@@ -216,19 +253,31 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
  *  to force a packet through the engine to complete a reload */
 static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, TmSlot *slot, Packet *p)
 {
-    if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
-        TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
-        if (p == NULL)
-            p = PacketGetFromQueueOrAlloc();
-        if (p != NULL) {
-            p->flags |= PKT_PSEUDO_STREAM_END;
-            if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) {
-                TmqhOutputPacketpool(tv, p);
-            }
+    TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
+    if (p == NULL)
+        p = PacketGetFromQueueOrAlloc();
+    if (p != NULL) {
+        p->flags |= PKT_PSEUDO_STREAM_END;
+        if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) {
+            TmqhOutputPacketpool(tv, p);
         }
     }
 }
 
+static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, TmSlot *slot, Packet *p)
+{
+    if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
+        TmThreadsCaptureInjectPacket(tv, slot, p);
+    } else {
+        TmThreadsSlotHandlePostPQs(tv, slot);
+
+        /* packet could have been passed to us that we won't use
+         * return it to the pool. */
+        if (p != NULL)
+            tv->tmqh_out(tv, p);
+    }
+}
+
 void TmThreadsListThreads(void);
 int TmThreadsRegisterThread(ThreadVars *tv, const int type);
 void TmThreadsUnregisterThread(const int id);