]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threads: refactor TmThreadsSlotPktAcqLoop for user threads
authorJason Ish <jason.ish@oisf.net>
Fri, 23 Aug 2024 18:49:20 +0000 (12:49 -0600)
committerVictor Julien <victor@inliniac.net>
Tue, 1 Apr 2025 08:17:05 +0000 (10:17 +0200)
Refactor TmThreadsSlotPktAcqLoop for user provided thread by breaking
out the init and finish code into their own functions.

For user provided threads, Suricata should not "drive" the thread, but
the setup and finish code is the same.

The finish function is exported so it can be called by the user
application when its receive loop or equivalent is done.

Also remove obsolete comment.

Ticket: #7240

src/runmode-lib.c
src/tm-threads.c
src/tm-threads.h

index b5643d99c8337a0f4d155d8370058619bfca490d..2699d47e44c5dbe08acb28b1a5ebf0ffcc7e84c2 100644 (file)
@@ -108,37 +108,5 @@ int RunModeSpawnWorker(void *td)
 /** \brief destroy a worker thread */
 void RunModeDestroyWorker(void *td)
 {
-    ThreadVars *tv = (ThreadVars *)td;
-    TmSlot *s = tv->tm_slots;
-    TmEcode r;
-    TmSlot *slot = NULL;
-
-    StatsSyncCounters(tv);
-
-    TmThreadsSetFlag(tv, THV_FLOW_LOOP);
-
-    /* process all pseudo packets the flow timeout may throw at us */
-    TmThreadTimeoutLoop(tv, s);
-
-    TmThreadsSetFlag(tv, THV_RUNNING_DONE);
-    TmThreadWaitForFlag(tv, THV_DEINIT);
-
-    PacketPoolDestroy();
-
-    for (slot = s; slot != NULL; slot = slot->slot_next) {
-        if (slot->SlotThreadExitPrintStats != NULL) {
-            slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
-        }
-
-        if (slot->SlotThreadDeinit != NULL) {
-            r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
-            if (r != TM_ECODE_OK) {
-                break;
-            }
-        }
-    }
-
-    tv->stream_pq = NULL;
-    SCLogDebug("%s ending", tv->name);
-    TmThreadsSetFlag(tv, THV_CLOSED);
+    SCTmThreadsSlotPktAcqLoopFinish((ThreadVars *)td);
 }
index a1d74a80fbf270a9c6bdc5162f7538ea8a2bfa53..19aa6eff3d5b3b5aff8ce1e27ae7f70920315190 100644 (file)
@@ -204,37 +204,9 @@ int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
     return r;
 }
 
-/*
-
-    pcap/nfq
-
-    pkt read
-        callback
-            process_pkt
-
-    pkt read
-        process_pkt
-
-    slot:
-        setup
-
-        pkt_ack_loop(tv, slot_data)
-
-        deinit
-
-    process_pkt:
-        while(s)
-            run s;
-        queue;
-
- */
-
-static void *TmThreadsSlotPktAcqLoop(void *td)
+static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
 {
-    ThreadVars *tv = (ThreadVars *)td;
     TmSlot *s = tv->tm_slots;
-    TmEcode r = TM_ECODE_OK;
-    TmSlot *slot = NULL;
 
     SCSetThreadName(tv->name);
 
@@ -244,21 +216,10 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
     CaptureStatsSetup(tv);
     PacketPoolInit();
 
-    /* check if we are setup properly */
-    if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
-        SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
-                   " PktAcqLoop=%p, tmqh_in=%p,"
-                   " tmqh_out=%p",
-                s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
-        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
-        pthread_exit(NULL);
-        return NULL;
-    }
-
-    for (slot = s; slot != NULL; slot = slot->slot_next) {
+    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
         if (slot->SlotThreadInit != NULL) {
             void *slot_data = NULL;
-            r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
+            TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
             if (r != TM_ECODE_OK) {
                 if (r == TM_ECODE_DONE) {
                     EngineDone();
@@ -280,8 +241,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
             tv->flow_queue = FlowQueueNew();
             if (tv->flow_queue == NULL) {
                 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
-                pthread_exit(NULL);
-                return NULL;
+                goto error;
             }
         /* setup a queue */
         } else if (slot->tm_id == TMM_FLOWWORKER) {
@@ -295,8 +255,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
             tv->flow_queue = FlowQueueNew();
             if (tv->flow_queue == NULL) {
                 TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
-                pthread_exit(NULL);
-                return NULL;
+                goto error;
             }
         }
     }
@@ -304,22 +263,18 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
     StatsSetupPrivate(tv);
 
     TmThreadsSetFlag(tv, THV_INIT_DONE);
-    bool run = TmThreadsWaitForUnpause(tv);
 
-    while (run) {
-        r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
+    return true;
+
+error:
+    return false;
+}
+
+bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv)
+{
+    TmSlot *s = tv->tm_slots;
+    bool rc = true;
 
-        if (r == TM_ECODE_FAILED) {
-            TmThreadsSetFlag(tv, THV_FAILED);
-            run = false;
-        }
-        if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
-            run = false;
-        }
-        if (r == TM_ECODE_DONE) {
-            run = false;
-        }
-    }
     StatsSyncCounters(tv);
 
     TmThreadsSetFlag(tv, THV_FLOW_LOOP);
@@ -332,28 +287,72 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
 
     PacketPoolDestroy();
 
-    for (slot = s; slot != NULL; slot = slot->slot_next) {
+    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
         if (slot->SlotThreadExitPrintStats != NULL) {
             slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
         }
 
         if (slot->SlotThreadDeinit != NULL) {
-            r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
+            TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
             if (r != TM_ECODE_OK) {
                 TmThreadsSetFlag(tv, THV_CLOSED);
-                goto error;
+                rc = false;
+                break;
             }
         }
     }
 
     tv->stream_pq = NULL;
-    SCLogDebug("%s ending", tv->name);
     TmThreadsSetFlag(tv, THV_CLOSED);
+    return rc;
+}
+
+static void *TmThreadsSlotPktAcqLoop(void *td)
+{
+    ThreadVars *tv = (ThreadVars *)td;
+    TmSlot *s = tv->tm_slots;
+    TmEcode r = TM_ECODE_OK;
+
+    /* check if we are setup properly */
+    if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
+        SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
+                   " PktAcqLoop=%p, tmqh_in=%p,"
+                   " tmqh_out=%p",
+                s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
+        TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
+        pthread_exit(NULL);
+        return NULL;
+    }
+
+    if (!TmThreadsSlotPktAcqLoopInit(td)) {
+        goto error;
+    }
+
+    bool run = TmThreadsWaitForUnpause(tv);
+
+    while (run) {
+        r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
+
+        if (r == TM_ECODE_FAILED) {
+            TmThreadsSetFlag(tv, THV_FAILED);
+            run = false;
+        }
+        if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
+            run = false;
+        }
+        if (r == TM_ECODE_DONE) {
+            run = false;
+        }
+    }
+    if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) {
+        goto error;
+    }
+
+    SCLogDebug("%s ending", tv->name);
     pthread_exit((void *) 0);
     return NULL;
 
 error:
-    tv->stream_pq = NULL;
     pthread_exit(NULL);
     return NULL;
 }
@@ -383,19 +382,6 @@ static void *TmThreadsLib(void *td)
 {
     ThreadVars *tv = (ThreadVars *)td;
     TmSlot *s = tv->tm_slots;
-    TmEcode r = TM_ECODE_OK;
-    TmSlot *slot = NULL;
-
-    /* Set the thread name */
-    SCSetThreadName(tv->name);
-
-    if (tv->thread_setup_flags != 0)
-        TmThreadSetupOptions(tv);
-
-    /* Drop the capabilities for this thread */
-    SCDropCaps(tv);
-
-    PacketPoolInit();
 
     /* check if we are setup properly */
     if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
@@ -406,52 +392,9 @@ static void *TmThreadsLib(void *td)
         return NULL;
     }
 
-    for (slot = s; slot != NULL; slot = slot->slot_next) {
-        if (slot->SlotThreadInit != NULL) {
-            void *slot_data = NULL;
-            r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
-            if (r != TM_ECODE_OK) {
-                if (r == TM_ECODE_DONE) {
-                    EngineDone();
-                    TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
-                    goto error;
-                } else {
-                    TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
-                    goto error;
-                }
-            }
-            (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
-        }
-
-        /* if the flowworker module is the first, get the threads input queue */
-        if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
-            tv->stream_pq = tv->inq->pq;
-            tv->tm_flowworker = slot;
-            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
-            tv->flow_queue = FlowQueueNew();
-            if (tv->flow_queue == NULL) {
-                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
-                return NULL;
-            }
-            /* setup a queue */
-        } else if (slot->tm_id == TMM_FLOWWORKER) {
-            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
-            if (tv->stream_pq_local == NULL)
-                FatalError("failed to alloc PacketQueue");
-            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
-            tv->stream_pq = tv->stream_pq_local;
-            tv->tm_flowworker = slot;
-            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
-            tv->flow_queue = FlowQueueNew();
-            if (tv->flow_queue == NULL) {
-                TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
-                return NULL;
-            }
-        }
+    if (!TmThreadsSlotPktAcqLoopInit(tv)) {
+        goto error;
     }
-    StatsSetupPrivate(tv);
-
-    TmThreadsSetFlag(tv, THV_INIT_DONE);
 
     TmThreadsWaitForUnpause(tv);
 
index 13ce78d51dd2c3f148d1f2d68de74e176bb41754..d4c8e898a50e3114dc87d2a0925e1cdabfa2477a 100644 (file)
@@ -291,6 +291,7 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts);
 SCTime_t TmThreadsGetThreadTime(const int idx);
 uint16_t TmThreadsGetWorkerThreadMax(void);
 bool TmThreadsTimeSubsysIsReady(void);
+bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv);
 
 /** \brief Wait for a thread to become unpaused.
  *