From: Victor Julien Date: Fri, 20 Jun 2025 09:03:07 +0000 (+0200) Subject: threads: pktacq loop cleanup X-Git-Tag: suricata-7.0.11~12 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fe1f846e50dd588319bc620bc3d65360cfae48ab;p=thirdparty%2Fsuricata.git threads: pktacq loop cleanup Manual backport of relevant bits from: 35d7d77ddb05 ("threads: refactor TmThreadsSlotPktAcqLoop for user threads") --- diff --git a/src/tm-threads.c b/src/tm-threads.c index 74c2631551..c7b64a39be 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -202,6 +202,44 @@ static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s) return r; } +static bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv) +{ + TmSlot *s = tv->tm_slots; + bool rc = true; + + StatsSyncCounters(tv); + + TmThreadsSetFlag(tv, THV_FLOW_LOOP); + + /* process all pseudo packets the flow timeout may throw at us */ + TmThreadTimeoutLoop(tv, s); + + TmThreadsSetFlag(tv, THV_RUNNING_DONE); + TmThreadWaitForFlag(tv, THV_DEINIT); + + PacketPoolDestroy(); + + for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { + if (slot->SlotThreadExitPrintStats != NULL) { + slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data)); + } + + if (slot->SlotThreadDeinit != NULL) { + TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); + if (r != TM_ECODE_OK) { + TmThreadsSetFlag(tv, THV_CLOSED); + rc = true; + break; + } + } + } + + tv->stream_pq = NULL; + SCLogDebug("%s ending", tv->name); + TmThreadsSetFlag(tv, THV_CLOSED); + return rc; +} + /* pcap/nfq @@ -229,13 +267,9 @@ static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s) */ -static void *TmThreadsSlotPktAcqLoop(void *td) +static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv) { - ThreadVars *tv = (ThreadVars *)td; TmSlot *s = tv->tm_slots; - char run = 1; - TmEcode r = TM_ECODE_OK; - TmSlot *slot = NULL; SCSetThreadName(tv->name); @@ -247,21 +281,10 @@ static void *TmThreadsSlotPktAcqLoop(void *td) CaptureStatsSetup(tv); PacketPoolInit(); - /* check if we are setup properly */ - if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { - SCLogError("TmSlot or ThreadVars badly setup: s=%p," - " PktAcqLoop=%p, tmqh_in=%p," - " tmqh_out=%p", - s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit((void *) -1); - return NULL; - } - - for (slot = s; slot != NULL; slot = slot->slot_next) { + for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadInit != NULL) { void *slot_data = NULL; - r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); + TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); if (r != TM_ECODE_OK) { if (r == TM_ECODE_DONE) { EngineDone(); @@ -283,8 +306,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) tv->flow_queue = FlowQueueNew(); if (tv->flow_queue == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit((void *) -1); - return NULL; + goto error; } /* setup a queue */ } else if (slot->tm_id == TMM_FLOWWORKER) { @@ -298,8 +320,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) tv->flow_queue = FlowQueueNew(); if (tv->flow_queue == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit((void *) -1); - return NULL; + goto error; } } } @@ -308,6 +329,33 @@ static void *TmThreadsSlotPktAcqLoop(void *td) TmThreadsSetFlag(tv, THV_INIT_DONE); + return true; +error: + return false; +} + +static void *TmThreadsSlotPktAcqLoop(void *td) +{ + ThreadVars *tv = (ThreadVars *)td; + TmSlot *s = tv->tm_slots; + TmEcode r = TM_ECODE_OK; + + /* check if we are setup properly */ + if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { + SCLogError("TmSlot or ThreadVars badly setup: s=%p," + " PktAcqLoop=%p, tmqh_in=%p," + " tmqh_out=%p", + s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); + pthread_exit((void *)-1); + return NULL; + } + + if (!TmThreadsSlotPktAcqLoopInit(td)) { + goto error; + } + + char run = 1; while(run) { if (TmThreadsCheckFlag(tv, THV_PAUSE)) { TmThreadsSetFlag(tv, THV_PAUSED); @@ -328,41 +376,18 @@ static void *TmThreadsSlotPktAcqLoop(void *td) run = 0; } } - StatsSyncCounters(tv); - - TmThreadsSetFlag(tv, THV_FLOW_LOOP); - /* process all pseudo packets the flow timeout may throw at us */ - TmThreadTimeoutLoop(tv, s); - - TmThreadsSetFlag(tv, THV_RUNNING_DONE); - TmThreadWaitForFlag(tv, THV_DEINIT); - - PacketPoolDestroy(); - - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->SlotThreadExitPrintStats != NULL) { - slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data)); - } - - if (slot->SlotThreadDeinit != NULL) { - r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); - if (r != TM_ECODE_OK) { - TmThreadsSetFlag(tv, THV_CLOSED); - goto error; - } - } + if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) { + goto error; } - tv->stream_pq = NULL; SCLogDebug("%s ending", tv->name); - TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); return NULL; error: tv->stream_pq = NULL; - pthread_exit((void *) -1); + pthread_exit(NULL); return NULL; }