From ce71bf1fffbeefb70b63efa176408a296dc57ccd Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Mon, 27 May 2019 15:46:18 +0200 Subject: [PATCH] capture: check for flow packets on capture timeout The capture threads can receive packets from the flow manager in their Threadvars::stream_pq packet queue. This mechanism makes sure the packets the flow manager injects into the engine are processed by the correct worker thread. If the capture thread(s) would not receive packets for a long time, the Threadvars::stream_pq would not be checked and processed. This could lead to packet pool depletion in the flow manager. It would also lead to flows not being timed out/logged until either packets started flowing again or until the engine was shut down. The scenario is more likely to happen in a test (e.g. replay) but could also delay logging on low traffic sensors. --- src/source-af-packet.c | 4 +-- src/source-netmap.c | 4 +-- src/source-nfq.c | 4 +-- src/source-pcap.c | 2 +- src/source-pfring.c | 2 +- src/tm-threads.h | 67 ++++++++++++++++++++++++++++++++++++------ 6 files changed, 66 insertions(+), 17 deletions(-) diff --git a/src/source-af-packet.c b/src/source-af-packet.c index e7b00afba5..21160bc651 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -1607,8 +1607,8 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) AFPDumpCounters(ptv); last_dump = current_time; } - /* poll timed out, lets see if we need to inject a fake packet */ - TmThreadsCaptureInjectPacket(tv, ptv->slot, NULL); + /* poll timed out, lets see handle our timeout path */ + TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL); } else if ((r < 0) && (errno != EINTR)) { SCLogError(SC_ERR_AFP_READ, "Error reading data from iface '%s': (%d) %s", diff --git a/src/source-netmap.c b/src/source-netmap.c index 75e23be7fd..210aabf3e3 100644 --- a/src/source-netmap.c +++ b/src/source-netmap.c @@ -639,8 +639,8 @@ static TmEcode ReceiveNetmapLoop(ThreadVars *tv, void *data, void *slot) NetmapDumpCounters(ntv); StatsSyncCountersIfSignalled(tv); - /* poll timed out, lets see if we need to inject a fake packet */ - TmThreadsCaptureInjectPacket(tv, ntv->slot, NULL); + /* poll timed out, lets handle the timeout */ + TmThreadsCaptureHandleTimeout(tv, ntv->slot, NULL); continue; } diff --git a/src/source-nfq.c b/src/source-nfq.c index aeb80f048e..cf72d19dfe 100644 --- a/src/source-nfq.c +++ b/src/source-nfq.c @@ -998,8 +998,8 @@ static void NFQRecvPkt(NFQQueueVars *t, NFQThreadVars *tv) if (flag) NFQVerdictCacheFlush(t); - /* inject a fake packet on timeout */ - TmThreadsCaptureInjectPacket(tv->tv, tv->slot, NULL); + /* handle timeout */ + TmThreadsCaptureHandleTimeout(tv->tv, tv->slot, NULL); } else { #ifdef COUNTERS NFQMutexLock(t); diff --git a/src/source-pcap.c b/src/source-pcap.c index 6ba556fc53..9c7388b964 100644 --- a/src/source-pcap.c +++ b/src/source-pcap.c @@ -288,7 +288,7 @@ TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot) SCLogError(SC_ERR_PCAP_DISPATCH, "Pcap callback PcapCallbackLoop failed"); SCReturnInt(TM_ECODE_FAILED); } else if (unlikely(r == 0)) { - TmThreadsCaptureInjectPacket(tv, ptv->slot, NULL); + TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL); } StatsSyncCountersIfSignalled(tv); diff --git a/src/source-pfring.c b/src/source-pfring.c index 06249dede7..70b5a71fd9 100644 --- a/src/source-pfring.c +++ b/src/source-pfring.c @@ -435,7 +435,7 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot) } /* pfring didn't use the packet yet */ - TmThreadsCaptureInjectPacket(tv, ptv->slot, p); + TmThreadsCaptureHandleTimeout(tv, ptv->slot, p); } else { SCLogError(SC_ERR_PF_RING_RECV,"pfring_recv error %" PRId32 "", r); diff --git a/src/tm-threads.h b/src/tm-threads.h index c8b6f47849..5acd9616f6 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -209,6 +209,43 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet return r; } +/** + * \brief Handle timeout from the capture layer. Checks + * post-pq which may have been filled by the flow + * manager. + */ +static inline TmEcode TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s) +{ + /* post process pq */ + for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { + if (slot->slot_post_pq.top != NULL) { + while (1) { + SCMutexLock(&slot->slot_post_pq.mutex_q); + Packet *extra_p = PacketDequeue(&slot->slot_post_pq); + SCMutexUnlock(&slot->slot_post_pq.mutex_q); + + if (extra_p == NULL) + break; + + if (slot->slot_next != NULL) { + TmEcode r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); + if (r == TM_ECODE_FAILED) { + SCMutexLock(&slot->slot_post_pq.mutex_q); + TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); + SCMutexUnlock(&slot->slot_post_pq.mutex_q); + + TmqhOutputPacketpool(tv, extra_p); + TmThreadsSetFlag(tv, THV_FAILED); + return TM_ECODE_FAILED; + } + } + tv->tmqh_out(tv, extra_p); + } + } + } + return TM_ECODE_OK; +} + /** \brief inject packet if THV_CAPTURE_INJECT_PKT is set * Allow caller to supply their own packet * @@ -216,19 +253,31 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet * to force a packet through the engine to complete a reload */ static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, TmSlot *slot, Packet *p) { - if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) { - TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT); - if (p == NULL) - p = PacketGetFromQueueOrAlloc(); - if (p != NULL) { - p->flags |= PKT_PSEUDO_STREAM_END; - if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) { - TmqhOutputPacketpool(tv, p); - } + TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT); + if (p == NULL) + p = PacketGetFromQueueOrAlloc(); + if (p != NULL) { + p->flags |= PKT_PSEUDO_STREAM_END; + if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) { + TmqhOutputPacketpool(tv, p); } } } +static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, TmSlot *slot, Packet *p) +{ + if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) { + TmThreadsCaptureInjectPacket(tv, slot, p); + } else { + TmThreadsSlotHandlePostPQs(tv, slot); + + /* packet could have been passed to us that we won't use + * return it to the pool. */ + if (p != NULL) + tv->tmqh_out(tv, p); + } +} + void TmThreadsListThreads(void); int TmThreadsRegisterThread(ThreadVars *tv, const int type); void TmThreadsUnregisterThread(const int id); -- 2.47.2