From: Victor Julien Date: Mon, 11 Nov 2019 10:19:44 +0000 (+0100) Subject: threading: use tm_flowworker for pseudo packets X-Git-Tag: suricata-6.0.0-beta1~804 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=49599dfe8949bf71186aa0205f680ccac00eab42;p=thirdparty%2Fsuricata.git threading: use tm_flowworker for pseudo packets Pseudo packets don't need to be processed by the decoding layer. --- diff --git a/src/source-af-packet.c b/src/source-af-packet.c index ba2a251604..3bc03fe57c 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -1617,7 +1617,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) last_dump = current_time; } /* poll timed out, lets see handle our timeout path */ - TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL); + TmThreadsCaptureHandleTimeout(tv, NULL); } else if ((r < 0) && (errno != EINTR)) { SCLogError(SC_ERR_AFP_READ, "Error reading data from iface '%s': (%d) %s", diff --git a/src/source-netmap.c b/src/source-netmap.c index 579ad1295b..210c1ee81c 100644 --- a/src/source-netmap.c +++ b/src/source-netmap.c @@ -640,7 +640,7 @@ static TmEcode ReceiveNetmapLoop(ThreadVars *tv, void *data, void *slot) StatsSyncCountersIfSignalled(tv); /* poll timed out, lets handle the timeout */ - TmThreadsCaptureHandleTimeout(tv, ntv->slot, NULL); + TmThreadsCaptureHandleTimeout(tv, NULL); continue; } diff --git a/src/source-nfq.c b/src/source-nfq.c index d5859be2f8..b5a143decc 100644 --- a/src/source-nfq.c +++ b/src/source-nfq.c @@ -980,7 +980,7 @@ static void NFQRecvPkt(NFQQueueVars *t, NFQThreadVars *tv) NFQVerdictCacheFlush(t); /* handle timeout */ - TmThreadsCaptureHandleTimeout(tv->tv, tv->slot, NULL); + TmThreadsCaptureHandleTimeout(tv->tv, NULL); } else { #ifdef COUNTERS NFQMutexLock(t); diff --git a/src/source-pcap.c b/src/source-pcap.c index 9ec461508c..a7f9d38ee5 100644 --- a/src/source-pcap.c +++ b/src/source-pcap.c @@ -266,7 +266,7 @@ TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot) if (r == PCAP_ERROR_BREAK && ptv->cb_result == TM_ECODE_FAILED) { SCReturnInt(TM_ECODE_FAILED); } - TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL); + TmThreadsCaptureHandleTimeout(tv, NULL); } else if (unlikely(r < 0)) { int dbreak = 0; SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s", diff --git a/src/source-pfring.c b/src/source-pfring.c index 82a0bb85ec..2f5487b45f 100644 --- a/src/source-pfring.c +++ b/src/source-pfring.c @@ -434,7 +434,7 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot) } /* pfring didn't use the packet yet */ - TmThreadsCaptureHandleTimeout(tv, ptv->slot, p); + TmThreadsCaptureHandleTimeout(tv, p); } else { SCLogError(SC_ERR_PF_RING_RECV,"pfring_recv error %" PRId32 "", r); diff --git a/src/tm-threads.c b/src/tm-threads.c index b592950a9b..19016e3642 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -588,7 +588,7 @@ static void *TmThreadsSlotVar(void *td) tv->tmqh_out(tv, p); /* now handle the stream pq packets */ - TmThreadsHandleInjectedPackets(tv, s); + TmThreadsHandleInjectedPackets(tv); } if (TmThreadsCheckFlag(tv, THV_KILL)) { diff --git a/src/tm-threads.h b/src/tm-threads.h index 16130b1753..6ce9c5e07b 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -162,7 +162,7 @@ static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet * manager. * \param s pipeline to run on these packets. */ -static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv, TmSlot *s) +static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv) { PacketQueue *pq = tv->stream_pq_local; if (pq && pq->len > 0) { @@ -172,9 +172,9 @@ static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv, TmSlot *s) SCMutexUnlock(&pq->mutex_q); if (extra_p == NULL) break; - TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s); + TmEcode r = TmThreadsSlotVarRun(tv, extra_p, tv->tm_flowworker); if (r == TM_ECODE_FAILED) { - TmThreadsSlotProcessPktFail(tv, s, extra_p); + TmThreadsSlotProcessPktFail(tv, tv->tm_flowworker, extra_p); break; } tv->tmqh_out(tv, extra_p); @@ -200,7 +200,7 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet tv->tmqh_out(tv, p); - TmThreadsHandleInjectedPackets(tv, s); + TmThreadsHandleInjectedPackets(tv); return TM_ECODE_OK; } @@ -210,7 +210,7 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet * * Meant for detect reload process that interupts an sleeping capture thread * to force a packet through the engine to complete a reload */ -static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, TmSlot *slot, Packet *p) +static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p) { TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT); if (p == NULL) @@ -218,18 +218,18 @@ static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, TmSlot *slot, Pa if (p != NULL) { p->flags |= PKT_PSEUDO_STREAM_END; PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT); - if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) { + if (TmThreadsSlotProcessPkt(tv, tv->tm_flowworker, p) != TM_ECODE_OK) { TmqhOutputPacketpool(tv, p); } } } -static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, TmSlot *slot, Packet *p) +static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p) { if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) { - TmThreadsCaptureInjectPacket(tv, slot, p); + TmThreadsCaptureInjectPacket(tv, p); } else { - TmThreadsHandleInjectedPackets(tv, slot); + TmThreadsHandleInjectedPackets(tv); /* packet could have been passed to us that we won't use * return it to the pool. */