From: Victor Julien Date: Mon, 11 Nov 2019 10:11:55 +0000 (+0100) Subject: threading: add shortcut to flowworker X-Git-Tag: suricata-6.0.0-beta1~805 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9df8e1c98451ddd689e4ad274729deceebcb5c16;p=thirdparty%2Fsuricata.git threading: add shortcut to flowworker --- diff --git a/src/threadvars.h b/src/threadvars.h index e7b20f38b9..0b0a7acf06 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -82,6 +82,10 @@ typedef struct ThreadVars_ { /** slot functions */ void *(*tm_func)(void *); struct TmSlot_ *tm_slots; + /** pointer to the flowworker in the pipeline. Used as starting point + * for injected packets. Can be NULL if the flowworker is not part + * of this thread. */ + struct TmSlot_ *tm_flowworker; /** Stream packet queue for flow time out injection. Either a pointer to the * workers input queue or to stream_pq_local */ diff --git a/src/tm-threads.c b/src/tm-threads.c index 94e8f2a8f6..b592950a9b 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -301,6 +301,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) /* if the flowworker module is the first, get the threads input queue */ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; + tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ } else if (slot->tm_id == TMM_FLOWWORKER) { @@ -309,6 +310,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); tv->stream_pq = tv->stream_pq_local; + tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } } @@ -427,6 +429,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td) /* if the flowworker module is the first, get the threads input queue */ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; + tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ } else if (slot->tm_id == TMM_FLOWWORKER) { @@ -435,6 +438,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td) FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); tv->stream_pq = tv->stream_pq_local; + tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } } @@ -541,6 +545,7 @@ static void *TmThreadsSlotVar(void *td) /* if the flowworker module is the first, get the threads input queue */ if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) { tv->stream_pq = &trans_q[tv->inq->id]; + tv->tm_flowworker = s; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ } else if (s->tm_id == TMM_FLOWWORKER) { @@ -549,6 +554,7 @@ static void *TmThreadsSlotVar(void *td) FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue"); SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); tv->stream_pq = tv->stream_pq_local; + tv->tm_flowworker = s; SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); } }