From: Victor Julien Date: Sun, 3 Nov 2019 15:02:21 +0000 (+0100) Subject: threading: optimize and unify post_pq checks X-Git-Tag: suricata-6.0.0-beta1~815 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b55f617c2fc8e6ef43a65ae04c67440acba0d767;p=thirdparty%2Fsuricata.git threading: optimize and unify post_pq checks TmThreadsSlotProcessPkt did not need to look all 'slots' as only the first slots post_pq can have been used. Unify post_pq cleanup handling. --- diff --git a/src/tm-threads.c b/src/tm-threads.c index 085617a443..5ad769f1b9 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -110,7 +110,7 @@ void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag) /** * \brief Separate run function so we can call it recursively. * - * \todo Deal with post_pq for slots beyond the first. + * \note post_pq if only used for first slot */ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot) { @@ -602,37 +602,9 @@ static void *TmThreadsSlotVar(void *td) /* output the packet */ tv->tmqh_out(tv, p); - } /* if (p != NULL) */ - - /* now handle the post_pq packets */ - TmSlot *slot; - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->slot_post_pq.top != NULL) { - while (1) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - - if (extra_p == NULL) - break; - - if (slot->slot_next != NULL) { - r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); - if (r == TM_ECODE_FAILED) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - - TmqhOutputPacketpool(tv, extra_p); - TmThreadsSetFlag(tv, THV_FAILED); - break; - } - } - /* output the packet */ - tv->tmqh_out(tv, extra_p); - } /* while */ - } /* if */ - } /* for */ + /* now handle the post_pq packets */ + TmThreadsSlotHandlePostPQs(tv, s); + } if (TmThreadsCheckFlag(tv, THV_KILL)) { run = 0; diff --git a/src/tm-threads.h b/src/tm-threads.h index 5f8ff1d13b..805dc4ca49 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -159,6 +159,34 @@ static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet TmThreadsSetFlag(tv, THV_FAILED); } +/** + * \brief Handle timeout from the capture layer. Checks + * post-pq which may have been filled by the flow + * manager. + */ +static inline void TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s) +{ + /* post process pq: only the first slot will possible have used it */ + if (s->slot_post_pq.top != NULL) { + while (1) { + SCMutexLock(&s->slot_post_pq.mutex_q); + Packet *extra_p = PacketDequeue(&s->slot_post_pq); + SCMutexUnlock(&s->slot_post_pq.mutex_q); + if (extra_p == NULL) + break; + + if (s->slot_next != NULL) { + TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); + if (r == TM_ECODE_FAILED) { + TmThreadsSlotProcessPktFail(tv, s, extra_p); + break; + } + } + tv->tmqh_out(tv, extra_p); + } + } +} + /** * \brief Process the rest of the functions (if any) and queue. */ @@ -177,65 +205,8 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet tv->tmqh_out(tv, p); - /* post process pq */ - for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->slot_post_pq.top != NULL) { - while (1) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - - if (extra_p == NULL) - break; - - if (slot->slot_next != NULL) { - r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); - if (r == TM_ECODE_FAILED) { - TmThreadsSlotProcessPktFail(tv, slot, extra_p); - break; - } - } - tv->tmqh_out(tv, extra_p); - } - } /* if (slot->slot_post_pq.top != NULL) */ - } - return TM_ECODE_OK; -} - -/** - * \brief Handle timeout from the capture layer. Checks - * post-pq which may have been filled by the flow - * manager. - */ -static inline TmEcode TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s) -{ - /* post process pq */ - for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->slot_post_pq.top != NULL) { - while (1) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - - if (extra_p == NULL) - break; + TmThreadsSlotHandlePostPQs(tv, s); - if (slot->slot_next != NULL) { - TmEcode r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); - if (r == TM_ECODE_FAILED) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - - TmqhOutputPacketpool(tv, extra_p); - TmThreadsSetFlag(tv, THV_FAILED); - return TM_ECODE_FAILED; - } - } - tv->tmqh_out(tv, extra_p); - } - } - } return TM_ECODE_OK; }