]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threading: remove per slot post_pq
authorVictor Julien <victor@inliniac.net>
Sat, 9 Nov 2019 19:24:21 +0000 (20:24 +0100)
committerVictor Julien <victor@inliniac.net>
Fri, 7 Feb 2020 14:43:10 +0000 (15:43 +0100)
Use a single packet queue per thread for flow timeout packet
injection. The per slot queue was unused except for this use
case. Having a single queue makes the logic and implementation
simpler.

In case of 'autofp', the per thread packet queue will actually
use the threads input queue. For workers/single a dedicated
queue will be set up.

Rename TmThreadsSlotHandlePostPQs to TmThreadsHandleInjectedPackets
to reflect the changed logic.

src/threadvars.h
src/tm-threads.c
src/tm-threads.h

index 2fec2fa56b5e798488362f6b50d0b73f4cb2bd8b..e7b20f38b9347f182e2cce21a25c79b67af062f0 100644 (file)
@@ -83,8 +83,10 @@ typedef struct ThreadVars_ {
     void *(*tm_func)(void *);
     struct TmSlot_ *tm_slots;
 
-    /** stream packet queue for flow time out injection */
+    /** Stream packet queue for flow time out injection. Either a pointer to the
+     *  workers input queue or to stream_pq_local */
     struct PacketQueue_ *stream_pq;
+    struct PacketQueue_ *stream_pq_local;
 
     uint8_t thread_setup_flags;
 
index 94e9bf310eac491a7df0dfad009ca27864465f71..94e8f2a8f6104517c9b2615f763d7f218b3f5246 100644 (file)
@@ -109,8 +109,6 @@ void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
 
 /**
  * \brief Separate run function so we can call it recursively.
- *
- * \note post_pq if only used for first slot
  */
 TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
 {
@@ -123,13 +121,7 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
         /* handle error */
         if (unlikely(r == TM_ECODE_FAILED)) {
             /* Encountered error.  Return packets to packetpool and return */
-            TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
-
-            SCMutexLock(&s->slot_post_pq.mutex_q);
-            TmqhReleasePacketsToPacketPool(&s->slot_post_pq);
-            SCMutexUnlock(&s->slot_post_pq.mutex_q);
-
-            TmThreadsSetFlag(tv, THV_FAILED);
+            TmThreadsSlotProcessPktFail(tv, s, NULL);
             return TM_ECODE_FAILED;
         }
 
@@ -143,14 +135,7 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
             if (s->slot_next != NULL) {
                 r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
                 if (unlikely(r == TM_ECODE_FAILED)) {
-                    TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
-
-                    SCMutexLock(&s->slot_post_pq.mutex_q);
-                    TmqhReleasePacketsToPacketPool(&s->slot_post_pq);
-                    SCMutexUnlock(&s->slot_post_pq.mutex_q);
-
-                    TmqhOutputPacketpool(tv, extra_p);
-                    TmThreadsSetFlag(tv, THV_FAILED);
+                    TmThreadsSlotProcessPktFail(tv, s, extra_p);
                     return TM_ECODE_FAILED;
                 }
             }
@@ -162,21 +147,17 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
 }
 
 /** \internal
- *  \brief check 'slot' pre_pq and post_pq at thread cleanup
+ *  \brief check 'slot' pre_pq and thread cleanup
  *         and dump detailed info about the state of the packets
  *         and threads if in a unexpected state.
  */
 static void CheckSlot(const TmSlot *slot)
 {
-    if (slot->slot_pre_pq.len || slot->slot_post_pq.len) {
+    if (slot->slot_pre_pq.len) {
         for (Packet *xp = slot->slot_pre_pq.top; xp != NULL; xp = xp->next) {
             SCLogNotice("pre_pq: slot id %u slot tm_id %u pre_pq.len %u packet src %s",
                     slot->id, slot->tm_id, slot->slot_pre_pq.len, PktSrcToString(xp->pkt_src));
         }
-        for (Packet *xp = slot->slot_post_pq.top; xp != NULL; xp = xp->next) {
-            SCLogNotice("post_pq: slot id %u slot tm_id %u post_pq.len %u packet src %s",
-                    slot->id, slot->tm_id, slot->slot_post_pq.len, PktSrcToString(xp->pkt_src));
-        }
         TmThreadDumpThreads();
         abort();
     }
@@ -316,17 +297,19 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
         }
         memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
         SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
-        memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
-        SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
-
-        /* get the 'pre qeueue' from module before the stream module */
-        if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
-            SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
-            tv->stream_pq = &slot->slot_post_pq;
-        /* if the stream module is the first, get the threads input queue */
-        } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
+
+        /* if the flowworker module is the first, get the threads input queue */
+        if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
-            SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
+            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
+        /* setup a queue */
+        } else if (slot->tm_id == TMM_FLOWWORKER) {
+            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
+            if (tv->stream_pq_local == NULL)
+                FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
+            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
+            tv->stream_pq = tv->stream_pq_local;
+            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
         }
     }
 
@@ -440,17 +423,19 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td)
         }
         memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
         SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
-        memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
-        SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
-
-        /* get the 'pre qeueue' from module before the stream module */
-        if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
-            SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
-            tv->stream_pq = &slot->slot_post_pq;
-        /* if the stream module is the first, get the threads input queue */
-        } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
+
+        /* if the flowworker module is the first, get the threads input queue */
+        if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
-            SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
+            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
+        /* setup a queue */
+        } else if (slot->tm_id == TMM_FLOWWORKER) {
+            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
+            if (tv->stream_pq_local == NULL)
+                FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
+            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
+            tv->stream_pq = tv->stream_pq_local;
+            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
         }
     }
 
@@ -509,10 +494,6 @@ error:
 }
 #endif
 
-/**
- * \todo Only the first "slot" currently makes the "post_pq" available
- *       to the thread module.
- */
 static void *TmThreadsSlotVar(void *td)
 {
     ThreadVars *tv = (ThreadVars *)td;
@@ -553,20 +534,22 @@ static void *TmThreadsSlotVar(void *td)
         }
         memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
         SCMutexInit(&s->slot_pre_pq.mutex_q, NULL);
-        memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
-        SCMutexInit(&s->slot_post_pq.mutex_q, NULL);
 
         /* special case: we need to access the stream queue
          * from the flow timeout code */
 
-        /* get the 'pre qeueue' from module before the stream module */
-        if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) {
-            SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
-            tv->stream_pq = &s->slot_pre_pq;
-        /* if the stream module is the first, get the threads input queue */
-        } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
+        /* if the flowworker module is the first, get the threads input queue */
+        if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
-            SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
+            SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
+        /* setup a queue */
+        } else if (s->tm_id == TMM_FLOWWORKER) {
+            tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
+            if (tv->stream_pq_local == NULL)
+                FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
+            SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
+            tv->stream_pq = tv->stream_pq_local;
+            SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
         }
     }
 
@@ -598,8 +581,8 @@ static void *TmThreadsSlotVar(void *td)
             /* output the packet */
             tv->tmqh_out(tv, p);
 
-            /* now handle the post_pq packets */
-            TmThreadsSlotHandlePostPQs(tv, s);
+            /* now handle the stream pq packets */
+            TmThreadsHandleInjectedPackets(tv, s);
         }
 
         if (TmThreadsCheckFlag(tv, THV_KILL)) {
@@ -674,7 +657,6 @@ static void *TmThreadsManagement(void *td)
         (void)SC_ATOMIC_SET(s->slot_data, slot_data);
     }
     memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
-    memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
 
     StatsSetupPrivate(tv);
 
@@ -1771,6 +1753,12 @@ static void TmThreadFree(ThreadVars *tv)
         SCFree(tv->printable_name);
     }
 
+    if (tv->stream_pq_local) {
+        BUG_ON(tv->stream_pq_local->len);
+        SCMutexDestroy(&tv->stream_pq_local->mutex_q);
+        SCFree(tv->stream_pq_local);
+    }
+
     s = (TmSlot *)tv->tm_slots;
     while (s) {
         ps = s;
@@ -2137,19 +2125,12 @@ static void TmThreadDoDumpSlots(const ThreadVars *tv)
 {
     for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
         TmModule *m = TmModuleGetById(s->tm_id);
-        SCLogNotice("tv %p: -> slot %p id %d tm_id %d name %s %s",
-            tv, s, s->id, s->tm_id, m->name, (tv->type == 0 && tv->stream_pq == &s->slot_post_pq) ? "<==== stream_pq" : "");
-        if (tv->type == 0 && tv->stream_pq == &s->slot_pre_pq) {
-            SCLogNotice("tv %p: -> slot %p/%d holds stream_pq %p IN PRE_PQ SUPER WEIRD", tv, s, s->id, tv->stream_pq);
-        }
+        SCLogNotice("tv %p: -> slot %p id %d tm_id %d name %s",
+            tv, s, s->id, s->tm_id, m->name);
         for (Packet *xp = s->slot_pre_pq.top; xp != NULL; xp = xp->next) {
             SCLogNotice("tv %p: ==> pre_pq: slot id %u slot tm_id %u pre_pq.len %u packet src %s",
                     tv, s->id, s->tm_id, s->slot_pre_pq.len, PktSrcToString(xp->pkt_src));
         }
-        for (Packet *xp = s->slot_post_pq.top; xp != NULL; xp = xp->next) {
-            SCLogNotice("tv %p: ==> post_pq: slot id %u slot tm_id %u post_pq.len %u packet src %s",
-                    tv, s->id, s->tm_id, s->slot_post_pq.len, PktSrcToString(xp->pkt_src));
-        }
     }
 }
 
@@ -2164,6 +2145,11 @@ void TmThreadDumpThreads(void)
                     tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
             if (tv->inq && tv->stream_pq == &trans_q[tv->inq->id]) {
                 SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
+            } else if (tv->stream_pq_local != NULL) {
+                for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
+                    SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
+                            tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
+                }
             }
             TmThreadDoDumpSlots(tv);
             tv = tv->next;
index ca7541357cd57b7efa312b22259969caeee9a857..16130b1753583650b74570507c486de8c81a5bf2 100644 (file)
@@ -71,11 +71,6 @@ typedef struct TmSlot_ {
      * The locks in the queue are NOT used */
     PacketQueue slot_pre_pq;
 
-    /* queue filled by the SlotFunc with packets that will
-     * be processed futher _after_ the current packet. The
-     * locks in the queue are NOT used */
-    PacketQueue slot_post_pq;
-
     /* store the thread module id */
     int tm_id;
 
@@ -149,37 +144,38 @@ uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
 
 static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet *p)
 {
-    TmqhOutputPacketpool(tv, p);
-    for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
-        SCMutexLock(&slot->slot_post_pq.mutex_q);
-        TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
-        SCMutexUnlock(&slot->slot_post_pq.mutex_q);
+    if (p != NULL) {
+        TmqhOutputPacketpool(tv, p);
+    }
+    TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
+    if (tv->stream_pq_local) {
+        SCMutexLock(&tv->stream_pq_local->mutex_q);
+        TmqhReleasePacketsToPacketPool(tv->stream_pq_local);
+        SCMutexUnlock(&tv->stream_pq_local->mutex_q);
     }
     TmThreadsSetFlag(tv, THV_FAILED);
 }
 
 /**
  *  \brief Handle timeout from the capture layer. Checks
- *         post-pq which may have been filled by the flow
+ *         stream_pq which may have been filled by the flow
  *         manager.
+ *  \param s pipeline to run on these packets.
  */
-static inline void TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
+static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv, TmSlot *s)
 {
-    /* post process pq: only the first slot will possible have used it */
-    if (s->slot_post_pq.top != NULL) {
+    PacketQueue *pq = tv->stream_pq_local;
+    if (pq && pq->len > 0) {
         while (1) {
-            SCMutexLock(&s->slot_post_pq.mutex_q);
-            Packet *extra_p = PacketDequeue(&s->slot_post_pq);
-            SCMutexUnlock(&s->slot_post_pq.mutex_q);
+            SCMutexLock(&pq->mutex_q);
+            Packet *extra_p = PacketDequeue(pq);
+            SCMutexUnlock(&pq->mutex_q);
             if (extra_p == NULL)
                 break;
-
-            if (s->slot_next != NULL) {
-                TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
-                if (r == TM_ECODE_FAILED) {
-                    TmThreadsSlotProcessPktFail(tv, s, extra_p);
-                    break;
-                }
+            TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s);
+            if (r == TM_ECODE_FAILED) {
+                TmThreadsSlotProcessPktFail(tv, s, extra_p);
+                break;
             }
             tv->tmqh_out(tv, extra_p);
         }
@@ -204,7 +200,7 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
 
     tv->tmqh_out(tv, p);
 
-    TmThreadsSlotHandlePostPQs(tv, s);
+    TmThreadsHandleInjectedPackets(tv, s);
 
     return TM_ECODE_OK;
 }
@@ -233,7 +229,7 @@ static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, TmSlot *slot, P
     if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
         TmThreadsCaptureInjectPacket(tv, slot, p);
     } else {
-        TmThreadsSlotHandlePostPQs(tv, slot);
+        TmThreadsHandleInjectedPackets(tv, slot);
 
         /* packet could have been passed to us that we won't use
          * return it to the pool. */