]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threading: wait for flow housekeeping at shutdown
authorVictor Julien <vjulien@oisf.net>
Tue, 9 May 2023 13:54:02 +0000 (15:54 +0200)
committerVictor Julien <vjulien@oisf.net>
Wed, 10 May 2023 08:59:31 +0000 (10:59 +0200)
Flow house keeping can accumulate work that wasn't taken into account
during shutdown. This could lead to flows still in the flowworker
thread context when being it was freed, leading to missed work and
memory leaks.

This patch adds a new way of checking if a thread module is still
busy.

Bug: #6062.

src/flow-worker.c
src/tm-modules.h
src/tm-threads.c

index aad47df9821ae2a2661c74ad17d97dc08be97600..9ecfe65f299ad46e24c7ee4197ef9693e302788c 100644 (file)
@@ -660,11 +660,32 @@ static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data)
     OutputLoggerExitPrintStats(tv, fw->output_thread);
 }
 
+static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
+{
+    FlowWorkerThreadData *fw = flow_worker;
+    if (fw->pq.len)
+        return true;
+    if (fw->fls.work_queue.len)
+        return true;
+
+    if (tv->flow_queue) {
+        FQLOCK_LOCK(tv->flow_queue);
+        bool fq_done = (tv->flow_queue->qlen == 0);
+        FQLOCK_UNLOCK(tv->flow_queue);
+        if (!fq_done) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 void TmModuleFlowWorkerRegister (void)
 {
     tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
     tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
     tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
+    tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
     tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
     tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
     tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
index 3e77db637fcf3b9c10f70a4be50d5265aef34bc2..4642ff46a6ca4afd618eb761b438e891287055c4 100644 (file)
@@ -56,6 +56,12 @@ typedef struct TmModule_ {
     /** terminates the capture loop in PktAcqLoop */
     TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *);
 
+    /** does a thread still have tasks to complete before it can be killed?
+     *  \retval bool
+     *  \param tv threadvars
+     *  \param thread_data thread module thread data (e.g. FlowWorkerThreadData for FlowWorker) */
+    bool (*ThreadBusy)(ThreadVars *tv, void *thread_data);
+
     TmEcode (*Management)(ThreadVars *, void *);
 
     /** global Init/DeInit */
index c3f73d666626bb3d83935259594fd60692409cba..99808ad2ca59257da3cf922cddf388d96d87e9a0 100644 (file)
@@ -1263,6 +1263,18 @@ static int TmThreadKillThread(ThreadVars *tv)
     return 1;
 }
 
+static bool ThreadBusy(ThreadVars *tv)
+{
+    for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
+        TmModule *tm = TmModuleGetById(s->tm_id);
+        if (tm && tm->ThreadBusy != NULL) {
+            if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
+                return true;
+        }
+    }
+    return false;
+}
+
 /** \internal
  *
  *  \brief make sure that all packet threads are done processing their
@@ -1298,28 +1310,23 @@ again:
             SleepMsec(1);
             goto again;
         }
-        if (tv->flow_queue) {
-            FQLOCK_LOCK(tv->flow_queue);
-            bool fq_done = (tv->flow_queue->qlen == 0);
-            FQLOCK_UNLOCK(tv->flow_queue);
-            if (!fq_done) {
-                SCMutexUnlock(&tv_root_lock);
-
-                Packet *p = PacketGetFromAlloc();
-                if (p != NULL) {
-                    p->flags |= PKT_PSEUDO_STREAM_END;
-                    PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
-                    PacketQueue *q = tv->stream_pq;
-                    SCMutexLock(&q->mutex_q);
-                    PacketEnqueue(q, p);
-                    SCCondSignal(&q->cond_q);
-                    SCMutexUnlock(&q->mutex_q);
-                }
+        if (ThreadBusy(tv)) {
+            SCMutexUnlock(&tv_root_lock);
 
-                /* don't sleep while holding a lock */
-                SleepMsec(1);
-                goto again;
+            Packet *p = PacketGetFromAlloc();
+            if (p != NULL) {
+                p->flags |= PKT_PSEUDO_STREAM_END;
+                PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
+                PacketQueue *q = tv->stream_pq;
+                SCMutexLock(&q->mutex_q);
+                PacketEnqueue(q, p);
+                SCCondSignal(&q->cond_q);
+                SCMutexUnlock(&q->mutex_q);
             }
+
+            /* don't sleep while holding a lock */
+            SleepMsec(1);
+            goto again;
         }
         tv = tv->next;
     }
@@ -1387,28 +1394,23 @@ again:
                 goto again;
             }
 
-            if (tv->flow_queue) {
-                FQLOCK_LOCK(tv->flow_queue);
-                bool fq_done = (tv->flow_queue->qlen == 0);
-                FQLOCK_UNLOCK(tv->flow_queue);
-                if (!fq_done) {
-                    SCMutexUnlock(&tv_root_lock);
-
-                    Packet *p = PacketGetFromAlloc();
-                    if (p != NULL) {
-                        p->flags |= PKT_PSEUDO_STREAM_END;
-                        PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
-                        PacketQueue *q = tv->stream_pq;
-                        SCMutexLock(&q->mutex_q);
-                        PacketEnqueue(q, p);
-                        SCCondSignal(&q->cond_q);
-                        SCMutexUnlock(&q->mutex_q);
-                    }
+            if (ThreadBusy(tv)) {
+                SCMutexUnlock(&tv_root_lock);
 
-                    /* don't sleep while holding a lock */
-                    SleepMsec(1);
-                    goto again;
+                Packet *p = PacketGetFromAlloc();
+                if (p != NULL) {
+                    p->flags |= PKT_PSEUDO_STREAM_END;
+                    PKT_SET_SRC(p, PKT_SRC_SHUTDOWN_FLUSH);
+                    PacketQueue *q = tv->stream_pq;
+                    SCMutexLock(&q->mutex_q);
+                    PacketEnqueue(q, p);
+                    SCCondSignal(&q->cond_q);
+                    SCMutexUnlock(&q->mutex_q);
                 }
+
+                /* don't sleep while holding a lock */
+                SleepMsec(1);
+                goto again;
             }
 
             /* we found a receive TV. Send it a KILL_PKTACQ signal. */