]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
threading: hide 'trans_q' from queue handlers
authorVictor Julien <victor@inliniac.net>
Wed, 13 Nov 2019 14:59:51 +0000 (15:59 +0100)
committerVictor Julien <victor@inliniac.net>
Fri, 7 Feb 2020 14:43:10 +0000 (15:43 +0100)
src/counters.c
src/detect-engine.c
src/tm-queues.c
src/tm-queues.h
src/tm-threads.c
src/tmqh-flow.c
src/tmqh-simple.c

index 7f863b75ef878858d98826fa8e33c9ef6cdf9258..91141da8435bbf635e12a07056bd70ea14ca820c 100644 (file)
@@ -499,7 +499,7 @@ static void *StatsWakeupThread(void *arg)
             tv->perf_public_ctx.perf_flag = 1;
 
             if (tv->inq != NULL) {
-                PacketQueue *q = &trans_q[tv->inq->id];
+                PacketQueue *q = tv->inq->pq;
                 SCCondSignal(&q->cond_q);
             }
 
index 39f73539d60f0ff3029a989870c78e1ff51690cc..969e070c932ceb71648d2e5543e7941c971cd6c3 100644 (file)
@@ -1731,7 +1731,7 @@ static void InjectPackets(ThreadVars **detect_tvs,
                 if (p != NULL) {
                     p->flags |= PKT_PSEUDO_STREAM_END;
                     PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
-                    PacketQueue *q = &trans_q[detect_tvs[i]->inq->id];
+                    PacketQueue *q = detect_tvs[i]->inq->pq;
                     SCMutexLock(&q->mutex_q);
                     PacketEnqueue(q, p);
                     SCCondSignal(&q->cond_q);
index 888a4bc0bd55b1e30f16e4ef35e0d174116bef1c..1d1e1ccff1869dc9210838471de2190a8bd1b2a8 100644 (file)
@@ -46,6 +46,8 @@ Tmq *TmqCreateQueue(const char *name)
     q->id = tmq_id++;
     q->is_packet_pool = (strcmp(q->name, "packetpool") == 0);
 
+    q->pq = &trans_q[q->id];
+
     SCLogDebug("created queue \'%s\', %p", name, q);
     return q;
 
@@ -67,9 +69,9 @@ void TmqDebugList(void)
 {
     for (int i = 0; i < tmq_id; i++) {
         /* get a lock accessing the len */
-        SCMutexLock(&trans_q[tmqs[i].id].mutex_q);
-        printf("TmqDebugList: id %" PRIu32 ", name \'%s\', len %" PRIu32 "\n", tmqs[i].id, tmqs[i].name, trans_q[tmqs[i].id].len);
-        SCMutexUnlock(&trans_q[tmqs[i].id].mutex_q);
+        SCMutexLock(&tmqs[i].pq->mutex_q);
+        printf("TmqDebugList: id %" PRIu32 ", name \'%s\', len %" PRIu32 "\n", tmqs[i].id, tmqs[i].name, tmqs[i].pq->len);
+        SCMutexUnlock(&tmqs[i].pq->mutex_q);
     }
 }
 
@@ -93,7 +95,7 @@ void TmValidateQueueState(void)
     bool err = false;
 
     for (int i = 0; i < tmq_id; i++) {
-        SCMutexLock(&trans_q[tmqs[i].id].mutex_q);
+        SCMutexLock(&tmqs[i].pq->mutex_q);
         if (tmqs[i].reader_cnt == 0) {
             SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a reader (id %d, max %u)", tmqs[i].name, i, tmq_id);
             err = true;
@@ -101,7 +103,7 @@ void TmValidateQueueState(void)
             SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a writer (id %d, max %u)", tmqs[i].name, i, tmq_id);
             err = true;
         }
-        SCMutexUnlock(&trans_q[tmqs[i].id].mutex_q);
+        SCMutexUnlock(&tmqs[i].pq->mutex_q);
 
         if (err == true)
             goto error;
index 5b72c3aa0ac535da64ee8710080c1209e3da160c..96bc22a63a50ead20885b5cad00dd23043a9b0f5 100644 (file)
 #ifndef __TM_QUEUES_H__
 #define __TM_QUEUES_H__
 
+#include "packet-queue.h"
+
 typedef struct Tmq_ {
     char *name;
     bool is_packet_pool;
     uint16_t id;
     uint16_t reader_cnt;
     uint16_t writer_cnt;
+    PacketQueue *pq;
 } Tmq;
 
 Tmq* TmqCreateQueue(const char *name);
index d7904fe5fdeeda2a77d97d732c6c58a136e21e3a..c772823190262c67b60746fbb7a0e61090c6fa01 100644 (file)
@@ -273,7 +273,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
 
         /* if the flowworker module is the first, get the threads input queue */
         if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
-            tv->stream_pq = &trans_q[tv->inq->id];
+            tv->stream_pq = tv->inq->pq;
             tv->tm_flowworker = slot;
             SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
         /* setup a queue */
@@ -398,7 +398,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td)
 
         /* if the flowworker module is the first, get the threads input queue */
         if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
-            tv->stream_pq = &trans_q[tv->inq->id];
+            tv->stream_pq = tv->inq->pq;
             tv->tm_flowworker = slot;
             SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
         /* setup a queue */
@@ -510,7 +510,7 @@ static void *TmThreadsSlotVar(void *td)
 
         /* if the flowworker module is the first, get the threads input queue */
         if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
-            tv->stream_pq = &trans_q[tv->inq->id];
+            tv->stream_pq = tv->inq->pq;
             tv->tm_flowworker = s;
             SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
         /* setup a queue */
@@ -1313,7 +1313,7 @@ static bool ThreadStillHasPackets(ThreadVars *tv)
         /* we wait till we dry out all the inq packets, before we
          * kill this thread.  Do note that you should have disabled
          * packet acquire by now using TmThreadDisableReceiveThreads()*/
-        PacketQueue *q = &trans_q[tv->inq->id];
+        PacketQueue *q = tv->inq->pq;
         SCMutexLock(&q->mutex_q);
         uint32_t len = q->len;
         SCMutexUnlock(&q->mutex_q);
@@ -1367,7 +1367,7 @@ static int TmThreadKillThread(ThreadVars *tv)
         }
         if (tv->inq != NULL) {
             for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
-                SCCondSignal(&trans_q[tv->inq->id].cond_q);
+                SCCondSignal(&tv->inq->pq->cond_q);
             }
             SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
         }
@@ -1503,7 +1503,7 @@ again:
 
             if (tv->inq != NULL) {
                 for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
-                    SCCondSignal(&trans_q[tv->inq->id].cond_q);
+                    SCCondSignal(&tv->inq->pq->cond_q);
                 }
                 SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
             }
@@ -1579,7 +1579,7 @@ again:
          * THV_KILL flag. */
         if (tv->inq != NULL) {
             for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
-                SCCondSignal(&trans_q[tv->inq->id].cond_q);
+                SCCondSignal(&tv->inq->pq->cond_q);
             }
             SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
         }
@@ -2075,7 +2075,7 @@ void TmThreadDumpThreads(void)
             const uint32_t flags = SC_ATOMIC_GET(tv->flags);
             SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
                     tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
-            if (tv->inq && tv->stream_pq == &trans_q[tv->inq->id]) {
+            if (tv->inq && tv->stream_pq == tv->inq->pq) {
                 SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
             } else if (tv->stream_pq_local != NULL) {
                 for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
@@ -2287,7 +2287,7 @@ int TmThreadsInjectPacketsById(Packet **packets, const int id)
 
     /* wake up listening thread(s) if necessary */
     if (tv->inq != NULL) {
-        SCCondSignal(&trans_q[tv->inq->id].cond_q);
+        SCCondSignal(&tv->inq->pq->cond_q);
     }
     return 1;
 }
index 6b5f323ed58ae2b1825921c05c075d21ebdaef48..4ddb4193db161317b637d69e4a3d01717279dca9 100644 (file)
@@ -94,7 +94,7 @@ void TmqhFlowPrintAutofpHandler(void)
 /* same as 'simple' */
 Packet *TmqhInputFlow(ThreadVars *tv)
 {
-    PacketQueue *q = &trans_q[tv->inq->id];
+    PacketQueue *q = tv->inq->pq;
 
     StatsSyncCountersIfSignalled(tv);
 
@@ -126,8 +126,6 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
     }
     tmq->writer_cnt++;
 
-    uint16_t id = tmq->id;
-
     if (ctx->queues == NULL) {
         ctx->size = 1;
         ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
@@ -147,7 +145,7 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
 
         memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
     }
-    ctx->queues[ctx->size - 1].q = &trans_q[id];
+    ctx->queues[ctx->size - 1].q = tmq->pq;
 
     return 0;
 }
@@ -284,138 +282,98 @@ void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
 
 static int TmqhOutputFlowSetupCtxTest01(void)
 {
-    int retval = 0;
-    Tmq *tmq = NULL;
-    TmqhFlowCtx *fctx = NULL;
-
     TmqResetQueues();
 
-    tmq = TmqCreateQueue("queue1");
-    if (tmq == NULL)
-        goto end;
-    tmq = TmqCreateQueue("queue2");
-    if (tmq == NULL)
-        goto end;
-    tmq = TmqCreateQueue("another");
-    if (tmq == NULL)
-        goto end;
-    tmq = TmqCreateQueue("yetanother");
-    if (tmq == NULL)
-        goto end;
+    Tmq *tmq1 = TmqCreateQueue("queue1");
+    FAIL_IF_NULL(tmq1);
+    Tmq *tmq2 = TmqCreateQueue("queue2");
+    FAIL_IF_NULL(tmq2);
+    Tmq *tmq3 = TmqCreateQueue("another");
+    FAIL_IF_NULL(tmq3);
+    Tmq *tmq4 = TmqCreateQueue("yetanother");
+    FAIL_IF_NULL(tmq4);
 
     const char *str = "queue1,queue2,another,yetanother";
     void *ctx = TmqhOutputFlowSetupCtx(str);
+    FAIL_IF_NULL(ctx);
 
-    if (ctx == NULL)
-        goto end;
-
-    fctx = (TmqhFlowCtx *)ctx;
+    TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
 
-    if (fctx->size != 4)
-        goto end;
+    FAIL_IF_NOT(fctx->size == 4);
 
-    if (fctx->queues == NULL)
-        goto end;
+    FAIL_IF_NULL(fctx->queues);
 
-    if (fctx->queues[0].q != &trans_q[0])
-        goto end;
-    if (fctx->queues[1].q != &trans_q[1])
-        goto end;
-    if (fctx->queues[2].q != &trans_q[2])
-        goto end;
-    if (fctx->queues[3].q != &trans_q[3])
-        goto end;
+    FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
+    FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
+    FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
+    FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
 
-    retval = 1;
-end:
-    if (fctx != NULL)
-        TmqhOutputFlowFreeCtx(fctx);
+    TmqhOutputFlowFreeCtx(fctx);
     TmqResetQueues();
-    return retval;
+    PASS;
 }
 
 static int TmqhOutputFlowSetupCtxTest02(void)
 {
-    int retval = 0;
-    Tmq *tmq = NULL;
-    TmqhFlowCtx *fctx = NULL;
-
     TmqResetQueues();
 
-    tmq = TmqCreateQueue("queue1");
-    if (tmq == NULL)
-        goto end;
-    tmq = TmqCreateQueue("queue2");
-    if (tmq == NULL)
-        goto end;
-    tmq = TmqCreateQueue("another");
-    if (tmq == NULL)
-        goto end;
-    tmq = TmqCreateQueue("yetanother");
-    if (tmq == NULL)
-        goto end;
+    Tmq *tmq1 = TmqCreateQueue("queue1");
+    FAIL_IF_NULL(tmq1);
+    Tmq *tmq2 = TmqCreateQueue("queue2");
+    FAIL_IF_NULL(tmq2);
+    Tmq *tmq3 = TmqCreateQueue("another");
+    FAIL_IF_NULL(tmq3);
+    Tmq *tmq4 = TmqCreateQueue("yetanother");
+    FAIL_IF_NULL(tmq4);
 
     const char *str = "queue1";
     void *ctx = TmqhOutputFlowSetupCtx(str);
+    FAIL_IF_NULL(ctx);
 
-    if (ctx == NULL)
-        goto end;
-
-    fctx = (TmqhFlowCtx *)ctx;
-
-    if (fctx->size != 1)
-        goto end;
+    TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
 
-    if (fctx->queues == NULL)
-        goto end;
+    FAIL_IF_NOT(fctx->size == 1);
 
-    if (fctx->queues[0].q != &trans_q[0])
-        goto end;
+    FAIL_IF_NULL(fctx->queues);
 
-    retval = 1;
-end:
-    if (fctx != NULL)
-        TmqhOutputFlowFreeCtx(fctx);
+    FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
+    TmqhOutputFlowFreeCtx(fctx);
     TmqResetQueues();
-    return retval;
+
+    PASS;
 }
 
 static int TmqhOutputFlowSetupCtxTest03(void)
 {
-    int retval = 0;
-    TmqhFlowCtx *fctx = NULL;
-
     TmqResetQueues();
 
     const char *str = "queue1,queue2,another,yetanother";
     void *ctx = TmqhOutputFlowSetupCtx(str);
+    FAIL_IF_NULL(ctx);
 
-    if (ctx == NULL)
-        goto end;
+    TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
 
-    fctx = (TmqhFlowCtx *)ctx;
+    FAIL_IF_NOT(fctx->size == 4);
 
-    if (fctx->size != 4)
-        goto end;
+    FAIL_IF_NULL(fctx->queues);
 
-    if (fctx->queues == NULL)
-        goto end;
+    Tmq *tmq1 = TmqGetQueueByName("queue1");
+    FAIL_IF_NULL(tmq1);
+    Tmq *tmq2 = TmqGetQueueByName("queue2");
+    FAIL_IF_NULL(tmq2);
+    Tmq *tmq3 = TmqGetQueueByName("another");
+    FAIL_IF_NULL(tmq3);
+    Tmq *tmq4 = TmqGetQueueByName("yetanother");
+    FAIL_IF_NULL(tmq4);
 
-    if (fctx->queues[0].q != &trans_q[0])
-        goto end;
-    if (fctx->queues[1].q != &trans_q[1])
-        goto end;
-    if (fctx->queues[2].q != &trans_q[2])
-        goto end;
-    if (fctx->queues[3].q != &trans_q[3])
-        goto end;
+    FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
+    FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
+    FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
+    FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
 
-    retval = 1;
-end:
-    if (fctx != NULL)
-        TmqhOutputFlowFreeCtx(fctx);
+    TmqhOutputFlowFreeCtx(fctx);
     TmqResetQueues();
-    return retval;
+    PASS;
 }
 
 #endif /* UNITTESTS */
index 255406476dac40ae6df67bc0c2526cac540b8824..2cc5ffbe7df77d74dca2c623dd9f49b562f84521 100644 (file)
@@ -46,7 +46,7 @@ void TmqhSimpleRegister (void)
 
 Packet *TmqhInputSimple(ThreadVars *t)
 {
-    PacketQueue *q = &trans_q[t->inq->id];
+    PacketQueue *q = t->inq->pq;
 
     StatsSyncCountersIfSignalled(t);
 
@@ -77,14 +77,14 @@ void TmqhInputSimpleShutdownHandler(ThreadVars *tv)
     }
 
     for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
-        SCCondSignal(&trans_q[tv->inq->id].cond_q);
+        SCCondSignal(&tv->inq->pq->cond_q);
 }
 
 void TmqhOutputSimple(ThreadVars *t, Packet *p)
 {
     SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true":"false");
 
-    PacketQueue *q = &trans_q[t->outq->id];
+    PacketQueue *q = t->outq->pq;
 
     SCMutexLock(&q->mutex_q);
     PacketEnqueue(q, p);