From: Victor Julien Date: Tue, 9 May 2023 13:54:02 +0000 (+0200) Subject: threading: wait for flow housekeeping at shutdown X-Git-Tag: suricata-7.0.0-rc2~178 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fa3f16ec757affee44439744ef10aa42e57dd750;p=thirdparty%2Fsuricata.git threading: wait for flow housekeeping at shutdown Flow house keeping can accumulate work that wasn't taken into account during shutdown. This could lead to flows still in the flowworker thread context when being it was freed, leading to missed work and memory leaks. This patch adds a new way of checking if a thread module is still busy. Bug: #6062. --- diff --git a/src/flow-worker.c b/src/flow-worker.c index aad47df982..9ecfe65f29 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -660,11 +660,32 @@ static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data) OutputLoggerExitPrintStats(tv, fw->output_thread); } +static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker) +{ + FlowWorkerThreadData *fw = flow_worker; + if (fw->pq.len) + return true; + if (fw->fls.work_queue.len) + return true; + + if (tv->flow_queue) { + FQLOCK_LOCK(tv->flow_queue); + bool fq_done = (tv->flow_queue->qlen == 0); + FQLOCK_UNLOCK(tv->flow_queue); + if (!fq_done) { + return true; + } + } + + return false; +} + void TmModuleFlowWorkerRegister (void) { tmm_modules[TMM_FLOWWORKER].name = "FlowWorker"; tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit; tmm_modules[TMM_FLOWWORKER].Func = FlowWorker; + tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy; tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit; tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats; tmm_modules[TMM_FLOWWORKER].cap_flags = 0; diff --git a/src/tm-modules.h b/src/tm-modules.h index 3e77db637f..4642ff46a6 100644 --- a/src/tm-modules.h +++ b/src/tm-modules.h @@ -56,6 +56,12 @@ typedef struct TmModule_ { /** terminates the capture loop in PktAcqLoop */ TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *); + /** does a thread still have tasks to complete before it can be killed? + * \retval bool + * \param tv threadvars + * \param thread_data thread module thread data (e.g. FlowWorkerThreadData for FlowWorker) */ + bool (*ThreadBusy)(ThreadVars *tv, void *thread_data); + TmEcode (*Management)(ThreadVars *, void *); /** global Init/DeInit */ diff --git a/src/tm-threads.c b/src/tm-threads.c index c3f73d6666..99808ad2ca 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1263,6 +1263,18 @@ static int TmThreadKillThread(ThreadVars *tv) return 1; } +static bool ThreadBusy(ThreadVars *tv) +{ + for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) { + TmModule *tm = TmModuleGetById(s->tm_id); + if (tm && tm->ThreadBusy != NULL) { + if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data))) + return true; + } + } + return false; +} + /** \internal * * \brief make sure that all packet threads are done processing their @@ -1298,28 +1310,23 @@ again: SleepMsec(1); goto again; } - if (tv->flow_queue) { - FQLOCK_LOCK(tv->flow_queue); - bool fq_done = (tv->flow_queue->qlen == 0); - FQLOCK_UNLOCK(tv->flow_queue); - if (!fq_done) { - SCMutexUnlock(&tv_root_lock); - - Packet *p = PacketGetFromAlloc(); - if (p != NULL) { - p->flags |= PKT_PSEUDO_STREAM_END; - PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); - PacketQueue *q = tv->stream_pq; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - } + if (ThreadBusy(tv)) { + SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - SleepMsec(1); - goto again; + Packet *p = PacketGetFromAlloc(); + if (p != NULL) { + p->flags |= PKT_PSEUDO_STREAM_END; + PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); + PacketQueue *q = tv->stream_pq; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); } + + /* don't sleep while holding a lock */ + SleepMsec(1); + goto again; } tv = tv->next; } @@ -1387,28 +1394,23 @@ again: goto again; } - if (tv->flow_queue) { - FQLOCK_LOCK(tv->flow_queue); - bool fq_done = (tv->flow_queue->qlen == 0); - FQLOCK_UNLOCK(tv->flow_queue); - if (!fq_done) { - SCMutexUnlock(&tv_root_lock); - - Packet *p = PacketGetFromAlloc(); - if (p != NULL) { - p->flags |= PKT_PSEUDO_STREAM_END; - PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); - PacketQueue *q = tv->stream_pq; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - } + if (ThreadBusy(tv)) { + SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - SleepMsec(1); - goto again; + Packet *p = PacketGetFromAlloc(); + if (p != NULL) { + p->flags |= PKT_PSEUDO_STREAM_END; + PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH); + PacketQueue *q = tv->stream_pq; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); } + + /* don't sleep while holding a lock */ + SleepMsec(1); + goto again; } /* we found a receive TV. Send it a KILL_PKTACQ signal. */