From: Victor Julien Date: Sat, 9 Nov 2019 19:24:21 +0000 (+0100) Subject: threading: remove per slot post_pq X-Git-Tag: suricata-6.0.0-beta1~806 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=02004fa5475650677a546e84ad39f0f767c83fcf;p=thirdparty%2Fsuricata.git threading: remove per slot post_pq Use a single packet queue per thread for flow timeout packet injection. The per slot queue was unused except for this use case. Having a single queue makes the logic and implementation simpler. In case of 'autofp', the per thread packet queue will actually use the threads input queue. For workers/single a dedicated queue will be set up. Rename TmThreadsSlotHandlePostPQs to TmThreadsHandleInjectedPackets to reflect the changed logic. --- diff --git a/src/threadvars.h b/src/threadvars.h index 2fec2fa56b..e7b20f38b9 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -83,8 +83,10 @@ typedef struct ThreadVars_ { void *(*tm_func)(void *); struct TmSlot_ *tm_slots; - /** stream packet queue for flow time out injection */ + /** Stream packet queue for flow time out injection. Either a pointer to the + * workers input queue or to stream_pq_local */ struct PacketQueue_ *stream_pq; + struct PacketQueue_ *stream_pq_local; uint8_t thread_setup_flags; diff --git a/src/tm-threads.c b/src/tm-threads.c index 94e9bf310e..94e8f2a8f6 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -109,8 +109,6 @@ void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag) /** * \brief Separate run function so we can call it recursively. - * - * \note post_pq if only used for first slot */ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot) { @@ -123,13 +121,7 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot) /* handle error */ if (unlikely(r == TM_ECODE_FAILED)) { /* Encountered error. Return packets to packetpool and return */ - TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); - - SCMutexLock(&s->slot_post_pq.mutex_q); - TmqhReleasePacketsToPacketPool(&s->slot_post_pq); - SCMutexUnlock(&s->slot_post_pq.mutex_q); - - TmThreadsSetFlag(tv, THV_FAILED); + TmThreadsSlotProcessPktFail(tv, s, NULL); return TM_ECODE_FAILED; } @@ -143,14 +135,7 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot) if (s->slot_next != NULL) { r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); if (unlikely(r == TM_ECODE_FAILED)) { - TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); - - SCMutexLock(&s->slot_post_pq.mutex_q); - TmqhReleasePacketsToPacketPool(&s->slot_post_pq); - SCMutexUnlock(&s->slot_post_pq.mutex_q); - - TmqhOutputPacketpool(tv, extra_p); - TmThreadsSetFlag(tv, THV_FAILED); + TmThreadsSlotProcessPktFail(tv, s, extra_p); return TM_ECODE_FAILED; } } @@ -162,21 +147,17 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot) } /** \internal - * \brief check 'slot' pre_pq and post_pq at thread cleanup + * \brief check 'slot' pre_pq and thread cleanup * and dump detailed info about the state of the packets * and threads if in a unexpected state. */ static void CheckSlot(const TmSlot *slot) { - if (slot->slot_pre_pq.len || slot->slot_post_pq.len) { + if (slot->slot_pre_pq.len) { for (Packet *xp = slot->slot_pre_pq.top; xp != NULL; xp = xp->next) { SCLogNotice("pre_pq: slot id %u slot tm_id %u pre_pq.len %u packet src %s", slot->id, slot->tm_id, slot->slot_pre_pq.len, PktSrcToString(xp->pkt_src)); } - for (Packet *xp = slot->slot_post_pq.top; xp != NULL; xp = xp->next) { - SCLogNotice("post_pq: slot id %u slot tm_id %u post_pq.len %u packet src %s", - slot->id, slot->tm_id, slot->slot_post_pq.len, PktSrcToString(xp->pkt_src)); - } TmThreadDumpThreads(); abort(); } @@ -316,17 +297,19 @@ static void *TmThreadsSlotPktAcqLoop(void *td) } memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue)); SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL); - memset(&slot->slot_post_pq, 0, sizeof(PacketQueue)); - SCMutexInit(&slot->slot_post_pq.mutex_q, NULL); - - /* get the 'pre qeueue' from module before the stream module */ - if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) { - SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq); - tv->stream_pq = &slot->slot_post_pq; - /* if the stream module is the first, get the threads input queue */ - } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { + + /* if the flowworker module is the first, get the threads input queue */ + if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; - SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq); + SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); + /* setup a queue */ + } else if (slot->tm_id == TMM_FLOWWORKER) { + tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue)); + if (tv->stream_pq_local == NULL) + FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); + SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); + tv->stream_pq = tv->stream_pq_local; + SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } } @@ -440,17 +423,19 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td) } memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue)); SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL); - memset(&slot->slot_post_pq, 0, sizeof(PacketQueue)); - SCMutexInit(&slot->slot_post_pq.mutex_q, NULL); - - /* get the 'pre qeueue' from module before the stream module */ - if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) { - SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq); - tv->stream_pq = &slot->slot_post_pq; - /* if the stream module is the first, get the threads input queue */ - } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { + + /* if the flowworker module is the first, get the threads input queue */ + if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; - SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq); + SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); + /* setup a queue */ + } else if (slot->tm_id == TMM_FLOWWORKER) { + tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue)); + if (tv->stream_pq_local == NULL) + FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); + SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); + tv->stream_pq = tv->stream_pq_local; + SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } } @@ -509,10 +494,6 @@ error: } #endif -/** - * \todo Only the first "slot" currently makes the "post_pq" available - * to the thread module. - */ static void *TmThreadsSlotVar(void *td) { ThreadVars *tv = (ThreadVars *)td; @@ -553,20 +534,22 @@ static void *TmThreadsSlotVar(void *td) } memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); SCMutexInit(&s->slot_pre_pq.mutex_q, NULL); - memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); - SCMutexInit(&s->slot_post_pq.mutex_q, NULL); /* special case: we need to access the stream queue * from the flow timeout code */ - /* get the 'pre qeueue' from module before the stream module */ - if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) { - SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq); - tv->stream_pq = &s->slot_pre_pq; - /* if the stream module is the first, get the threads input queue */ - } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) { + /* if the flowworker module is the first, get the threads input queue */ + if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; - SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq); + SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); + /* setup a queue */ + } else if (s->tm_id == TMM_FLOWWORKER) { + tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue)); + if (tv->stream_pq_local == NULL) + FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); + SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); + tv->stream_pq = tv->stream_pq_local; + SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } } @@ -598,8 +581,8 @@ static void *TmThreadsSlotVar(void *td) /* output the packet */ tv->tmqh_out(tv, p); - /* now handle the post_pq packets */ - TmThreadsSlotHandlePostPQs(tv, s); + /* now handle the stream pq packets */ + TmThreadsHandleInjectedPackets(tv, s); } if (TmThreadsCheckFlag(tv, THV_KILL)) { @@ -674,7 +657,6 @@ static void *TmThreadsManagement(void *td) (void)SC_ATOMIC_SET(s->slot_data, slot_data); } memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); - memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); StatsSetupPrivate(tv); @@ -1771,6 +1753,12 @@ static void TmThreadFree(ThreadVars *tv) SCFree(tv->printable_name); } + if (tv->stream_pq_local) { + BUG_ON(tv->stream_pq_local->len); + SCMutexDestroy(&tv->stream_pq_local->mutex_q); + SCFree(tv->stream_pq_local); + } + s = (TmSlot *)tv->tm_slots; while (s) { ps = s; @@ -2137,19 +2125,12 @@ static void TmThreadDoDumpSlots(const ThreadVars *tv) { for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) { TmModule *m = TmModuleGetById(s->tm_id); - SCLogNotice("tv %p: -> slot %p id %d tm_id %d name %s %s", - tv, s, s->id, s->tm_id, m->name, (tv->type == 0 && tv->stream_pq == &s->slot_post_pq) ? "<==== stream_pq" : ""); - if (tv->type == 0 && tv->stream_pq == &s->slot_pre_pq) { - SCLogNotice("tv %p: -> slot %p/%d holds stream_pq %p IN PRE_PQ SUPER WEIRD", tv, s, s->id, tv->stream_pq); - } + SCLogNotice("tv %p: -> slot %p id %d tm_id %d name %s", + tv, s, s->id, s->tm_id, m->name); for (Packet *xp = s->slot_pre_pq.top; xp != NULL; xp = xp->next) { SCLogNotice("tv %p: ==> pre_pq: slot id %u slot tm_id %u pre_pq.len %u packet src %s", tv, s->id, s->tm_id, s->slot_pre_pq.len, PktSrcToString(xp->pkt_src)); } - for (Packet *xp = s->slot_post_pq.top; xp != NULL; xp = xp->next) { - SCLogNotice("tv %p: ==> post_pq: slot id %u slot tm_id %u post_pq.len %u packet src %s", - tv, s->id, s->tm_id, s->slot_post_pq.len, PktSrcToString(xp->pkt_src)); - } } } @@ -2164,6 +2145,11 @@ void TmThreadDumpThreads(void) tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq); if (tv->inq && tv->stream_pq == &trans_q[tv->inq->id]) { SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id); + } else if (tv->stream_pq_local != NULL) { + for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) { + SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s", + tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src)); + } } TmThreadDoDumpSlots(tv); tv = tv->next; diff --git a/src/tm-threads.h b/src/tm-threads.h index ca7541357c..16130b1753 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -71,11 +71,6 @@ typedef struct TmSlot_ { * The locks in the queue are NOT used */ PacketQueue slot_pre_pq; - /* queue filled by the SlotFunc with packets that will - * be processed futher _after_ the current packet. The - * locks in the queue are NOT used */ - PacketQueue slot_post_pq; - /* store the thread module id */ int tm_id; @@ -149,37 +144,38 @@ uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags); static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet *p) { - TmqhOutputPacketpool(tv, p); - for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); + if (p != NULL) { + TmqhOutputPacketpool(tv, p); + } + TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); + if (tv->stream_pq_local) { + SCMutexLock(&tv->stream_pq_local->mutex_q); + TmqhReleasePacketsToPacketPool(tv->stream_pq_local); + SCMutexUnlock(&tv->stream_pq_local->mutex_q); } TmThreadsSetFlag(tv, THV_FAILED); } /** * \brief Handle timeout from the capture layer. Checks - * post-pq which may have been filled by the flow + * stream_pq which may have been filled by the flow * manager. + * \param s pipeline to run on these packets. */ -static inline void TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s) +static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv, TmSlot *s) { - /* post process pq: only the first slot will possible have used it */ - if (s->slot_post_pq.top != NULL) { + PacketQueue *pq = tv->stream_pq_local; + if (pq && pq->len > 0) { while (1) { - SCMutexLock(&s->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&s->slot_post_pq); - SCMutexUnlock(&s->slot_post_pq.mutex_q); + SCMutexLock(&pq->mutex_q); + Packet *extra_p = PacketDequeue(pq); + SCMutexUnlock(&pq->mutex_q); if (extra_p == NULL) break; - - if (s->slot_next != NULL) { - TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); - if (r == TM_ECODE_FAILED) { - TmThreadsSlotProcessPktFail(tv, s, extra_p); - break; - } + TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s); + if (r == TM_ECODE_FAILED) { + TmThreadsSlotProcessPktFail(tv, s, extra_p); + break; } tv->tmqh_out(tv, extra_p); } @@ -204,7 +200,7 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet tv->tmqh_out(tv, p); - TmThreadsSlotHandlePostPQs(tv, s); + TmThreadsHandleInjectedPackets(tv, s); return TM_ECODE_OK; } @@ -233,7 +229,7 @@ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, TmSlot *slot, P if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) { TmThreadsCaptureInjectPacket(tv, slot, p); } else { - TmThreadsSlotHandlePostPQs(tv, slot); + TmThreadsHandleInjectedPackets(tv, slot); /* packet could have been passed to us that we won't use * return it to the pool. */