]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threading: add shortcut to flowworker
authorVictor Julien <victor@inliniac.net>
Mon, 11 Nov 2019 10:11:55 +0000 (11:11 +0100)
committerVictor Julien <victor@inliniac.net>
Fri, 7 Feb 2020 14:43:10 +0000 (15:43 +0100)
src/threadvars.h
src/tm-threads.c

index e7b20f38b9347f182e2cce21a25c79b67af062f0..0b0a7acf0600a76a1a783249aad60754173ea90c 100644 (file)
@@ -82,6 +82,10 @@ typedef struct ThreadVars_ {
     /** slot functions */
     void *(*tm_func)(void *);
     struct TmSlot_ *tm_slots;
+    /** pointer to the flowworker in the pipeline. Used as starting point
+     *  for injected packets. Can be NULL if the flowworker is not part
+     *  of this thread. */
+    struct TmSlot_ *tm_flowworker;
 
     /** Stream packet queue for flow time out injection. Either a pointer to the
      *  workers input queue or to stream_pq_local */
index 94e8f2a8f6104517c9b2615f763d7f218b3f5246..b592950a9b73520918dac9b4ec37b07ed522dbc4 100644 (file)
@@ -301,6 +301,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
         /* if the flowworker module is the first, get the threads input queue */
         if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
+            tv->tm_flowworker = slot;
             SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
         /* setup a queue */
         } else if (slot->tm_id == TMM_FLOWWORKER) {
@@ -309,6 +310,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
                 FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
             SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
             tv->stream_pq = tv->stream_pq_local;
+            tv->tm_flowworker = slot;
             SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
         }
     }
@@ -427,6 +429,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td)
         /* if the flowworker module is the first, get the threads input queue */
         if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
+            tv->tm_flowworker = slot;
             SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
         /* setup a queue */
         } else if (slot->tm_id == TMM_FLOWWORKER) {
@@ -435,6 +438,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td)
                 FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
             SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
             tv->stream_pq = tv->stream_pq_local;
+            tv->tm_flowworker = slot;
             SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
         }
     }
@@ -541,6 +545,7 @@ static void *TmThreadsSlotVar(void *td)
         /* if the flowworker module is the first, get the threads input queue */
         if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
             tv->stream_pq = &trans_q[tv->inq->id];
+            tv->tm_flowworker = s;
             SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
         /* setup a queue */
         } else if (s->tm_id == TMM_FLOWWORKER) {
@@ -549,6 +554,7 @@ static void *TmThreadsSlotVar(void *td)
                 FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
             SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
             tv->stream_pq = tv->stream_pq_local;
+            tv->tm_flowworker = s;
             SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
         }
     }