From: Victor Julien Date: Wed, 13 Nov 2019 14:59:51 +0000 (+0100) Subject: threading: hide 'trans_q' from queue handlers X-Git-Tag: suricata-6.0.0-beta1~782 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=550cfdd98d595ead90811be4491ca2b82db7d1f5;p=thirdparty%2Fsuricata.git threading: hide 'trans_q' from queue handlers --- diff --git a/src/counters.c b/src/counters.c index 7f863b75ef..91141da843 100644 --- a/src/counters.c +++ b/src/counters.c @@ -499,7 +499,7 @@ static void *StatsWakeupThread(void *arg) tv->perf_public_ctx.perf_flag = 1; if (tv->inq != NULL) { - PacketQueue *q = &trans_q[tv->inq->id]; + PacketQueue *q = tv->inq->pq; SCCondSignal(&q->cond_q); } diff --git a/src/detect-engine.c b/src/detect-engine.c index 39f73539d6..969e070c93 100644 --- a/src/detect-engine.c +++ b/src/detect-engine.c @@ -1731,7 +1731,7 @@ static void InjectPackets(ThreadVars **detect_tvs, if (p != NULL) { p->flags |= PKT_PSEUDO_STREAM_END; PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH); - PacketQueue *q = &trans_q[detect_tvs[i]->inq->id]; + PacketQueue *q = detect_tvs[i]->inq->pq; SCMutexLock(&q->mutex_q); PacketEnqueue(q, p); SCCondSignal(&q->cond_q); diff --git a/src/tm-queues.c b/src/tm-queues.c index 888a4bc0bd..1d1e1ccff1 100644 --- a/src/tm-queues.c +++ b/src/tm-queues.c @@ -46,6 +46,8 @@ Tmq *TmqCreateQueue(const char *name) q->id = tmq_id++; q->is_packet_pool = (strcmp(q->name, "packetpool") == 0); + q->pq = &trans_q[q->id]; + SCLogDebug("created queue \'%s\', %p", name, q); return q; @@ -67,9 +69,9 @@ void TmqDebugList(void) { for (int i = 0; i < tmq_id; i++) { /* get a lock accessing the len */ - SCMutexLock(&trans_q[tmqs[i].id].mutex_q); - printf("TmqDebugList: id %" PRIu32 ", name \'%s\', len %" PRIu32 "\n", tmqs[i].id, tmqs[i].name, trans_q[tmqs[i].id].len); - SCMutexUnlock(&trans_q[tmqs[i].id].mutex_q); + SCMutexLock(&tmqs[i].pq->mutex_q); + printf("TmqDebugList: id %" PRIu32 ", name \'%s\', len %" PRIu32 "\n", tmqs[i].id, tmqs[i].name, tmqs[i].pq->len); + SCMutexUnlock(&tmqs[i].pq->mutex_q); } } @@ -93,7 +95,7 @@ void TmValidateQueueState(void) bool err = false; for (int i = 0; i < tmq_id; i++) { - SCMutexLock(&trans_q[tmqs[i].id].mutex_q); + SCMutexLock(&tmqs[i].pq->mutex_q); if (tmqs[i].reader_cnt == 0) { SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a reader (id %d, max %u)", tmqs[i].name, i, tmq_id); err = true; @@ -101,7 +103,7 @@ void TmValidateQueueState(void) SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a writer (id %d, max %u)", tmqs[i].name, i, tmq_id); err = true; } - SCMutexUnlock(&trans_q[tmqs[i].id].mutex_q); + SCMutexUnlock(&tmqs[i].pq->mutex_q); if (err == true) goto error; diff --git a/src/tm-queues.h b/src/tm-queues.h index 5b72c3aa0a..96bc22a63a 100644 --- a/src/tm-queues.h +++ b/src/tm-queues.h @@ -24,12 +24,15 @@ #ifndef __TM_QUEUES_H__ #define __TM_QUEUES_H__ +#include "packet-queue.h" + typedef struct Tmq_ { char *name; bool is_packet_pool; uint16_t id; uint16_t reader_cnt; uint16_t writer_cnt; + PacketQueue *pq; } Tmq; Tmq* TmqCreateQueue(const char *name); diff --git a/src/tm-threads.c b/src/tm-threads.c index d7904fe5fd..c772823190 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -273,7 +273,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) /* if the flowworker module is the first, get the threads input queue */ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { - tv->stream_pq = &trans_q[tv->inq->id]; + tv->stream_pq = tv->inq->pq; tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ @@ -398,7 +398,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td) /* if the flowworker module is the first, get the threads input queue */ if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { - tv->stream_pq = &trans_q[tv->inq->id]; + tv->stream_pq = tv->inq->pq; tv->tm_flowworker = slot; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ @@ -510,7 +510,7 @@ static void *TmThreadsSlotVar(void *td) /* if the flowworker module is the first, get the threads input queue */ if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) { - tv->stream_pq = &trans_q[tv->inq->id]; + tv->stream_pq = tv->inq->pq; tv->tm_flowworker = s; SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); /* setup a queue */ @@ -1313,7 +1313,7 @@ static bool ThreadStillHasPackets(ThreadVars *tv) /* we wait till we dry out all the inq packets, before we * kill this thread. Do note that you should have disabled * packet acquire by now using TmThreadDisableReceiveThreads()*/ - PacketQueue *q = &trans_q[tv->inq->id]; + PacketQueue *q = tv->inq->pq; SCMutexLock(&q->mutex_q); uint32_t len = q->len; SCMutexUnlock(&q->mutex_q); @@ -1367,7 +1367,7 @@ static int TmThreadKillThread(ThreadVars *tv) } if (tv->inq != NULL) { for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { - SCCondSignal(&trans_q[tv->inq->id].cond_q); + SCCondSignal(&tv->inq->pq->cond_q); } SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); } @@ -1503,7 +1503,7 @@ again: if (tv->inq != NULL) { for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { - SCCondSignal(&trans_q[tv->inq->id].cond_q); + SCCondSignal(&tv->inq->pq->cond_q); } SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); } @@ -1579,7 +1579,7 @@ again: * THV_KILL flag. */ if (tv->inq != NULL) { for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { - SCCondSignal(&trans_q[tv->inq->id].cond_q); + SCCondSignal(&tv->inq->pq->cond_q); } SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); } @@ -2075,7 +2075,7 @@ void TmThreadDumpThreads(void) const uint32_t flags = SC_ATOMIC_GET(tv->flags); SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p", tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq); - if (tv->inq && tv->stream_pq == &trans_q[tv->inq->id]) { + if (tv->inq && tv->stream_pq == tv->inq->pq) { SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id); } else if (tv->stream_pq_local != NULL) { for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) { @@ -2287,7 +2287,7 @@ int TmThreadsInjectPacketsById(Packet **packets, const int id) /* wake up listening thread(s) if necessary */ if (tv->inq != NULL) { - SCCondSignal(&trans_q[tv->inq->id].cond_q); + SCCondSignal(&tv->inq->pq->cond_q); } return 1; } diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c index 6b5f323ed5..4ddb4193db 100644 --- a/src/tmqh-flow.c +++ b/src/tmqh-flow.c @@ -94,7 +94,7 @@ void TmqhFlowPrintAutofpHandler(void) /* same as 'simple' */ Packet *TmqhInputFlow(ThreadVars *tv) { - PacketQueue *q = &trans_q[tv->inq->id]; + PacketQueue *q = tv->inq->pq; StatsSyncCountersIfSignalled(tv); @@ -126,8 +126,6 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) } tmq->writer_cnt++; - uint16_t id = tmq->id; - if (ctx->queues == NULL) { ctx->size = 1; ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode)); @@ -147,7 +145,7 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode)); } - ctx->queues[ctx->size - 1].q = &trans_q[id]; + ctx->queues[ctx->size - 1].q = tmq->pq; return 0; } @@ -284,138 +282,98 @@ void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p) static int TmqhOutputFlowSetupCtxTest01(void) { - int retval = 0; - Tmq *tmq = NULL; - TmqhFlowCtx *fctx = NULL; - TmqResetQueues(); - tmq = TmqCreateQueue("queue1"); - if (tmq == NULL) - goto end; - tmq = TmqCreateQueue("queue2"); - if (tmq == NULL) - goto end; - tmq = TmqCreateQueue("another"); - if (tmq == NULL) - goto end; - tmq = TmqCreateQueue("yetanother"); - if (tmq == NULL) - goto end; + Tmq *tmq1 = TmqCreateQueue("queue1"); + FAIL_IF_NULL(tmq1); + Tmq *tmq2 = TmqCreateQueue("queue2"); + FAIL_IF_NULL(tmq2); + Tmq *tmq3 = TmqCreateQueue("another"); + FAIL_IF_NULL(tmq3); + Tmq *tmq4 = TmqCreateQueue("yetanother"); + FAIL_IF_NULL(tmq4); const char *str = "queue1,queue2,another,yetanother"; void *ctx = TmqhOutputFlowSetupCtx(str); + FAIL_IF_NULL(ctx); - if (ctx == NULL) - goto end; - - fctx = (TmqhFlowCtx *)ctx; + TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx; - if (fctx->size != 4) - goto end; + FAIL_IF_NOT(fctx->size == 4); - if (fctx->queues == NULL) - goto end; + FAIL_IF_NULL(fctx->queues); - if (fctx->queues[0].q != &trans_q[0]) - goto end; - if (fctx->queues[1].q != &trans_q[1]) - goto end; - if (fctx->queues[2].q != &trans_q[2]) - goto end; - if (fctx->queues[3].q != &trans_q[3]) - goto end; + FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq); + FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq); + FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq); + FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq); - retval = 1; -end: - if (fctx != NULL) - TmqhOutputFlowFreeCtx(fctx); + TmqhOutputFlowFreeCtx(fctx); TmqResetQueues(); - return retval; + PASS; } static int TmqhOutputFlowSetupCtxTest02(void) { - int retval = 0; - Tmq *tmq = NULL; - TmqhFlowCtx *fctx = NULL; - TmqResetQueues(); - tmq = TmqCreateQueue("queue1"); - if (tmq == NULL) - goto end; - tmq = TmqCreateQueue("queue2"); - if (tmq == NULL) - goto end; - tmq = TmqCreateQueue("another"); - if (tmq == NULL) - goto end; - tmq = TmqCreateQueue("yetanother"); - if (tmq == NULL) - goto end; + Tmq *tmq1 = TmqCreateQueue("queue1"); + FAIL_IF_NULL(tmq1); + Tmq *tmq2 = TmqCreateQueue("queue2"); + FAIL_IF_NULL(tmq2); + Tmq *tmq3 = TmqCreateQueue("another"); + FAIL_IF_NULL(tmq3); + Tmq *tmq4 = TmqCreateQueue("yetanother"); + FAIL_IF_NULL(tmq4); const char *str = "queue1"; void *ctx = TmqhOutputFlowSetupCtx(str); + FAIL_IF_NULL(ctx); - if (ctx == NULL) - goto end; - - fctx = (TmqhFlowCtx *)ctx; - - if (fctx->size != 1) - goto end; + TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx; - if (fctx->queues == NULL) - goto end; + FAIL_IF_NOT(fctx->size == 1); - if (fctx->queues[0].q != &trans_q[0]) - goto end; + FAIL_IF_NULL(fctx->queues); - retval = 1; -end: - if (fctx != NULL) - TmqhOutputFlowFreeCtx(fctx); + FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq); + TmqhOutputFlowFreeCtx(fctx); TmqResetQueues(); - return retval; + + PASS; } static int TmqhOutputFlowSetupCtxTest03(void) { - int retval = 0; - TmqhFlowCtx *fctx = NULL; - TmqResetQueues(); const char *str = "queue1,queue2,another,yetanother"; void *ctx = TmqhOutputFlowSetupCtx(str); + FAIL_IF_NULL(ctx); - if (ctx == NULL) - goto end; + TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx; - fctx = (TmqhFlowCtx *)ctx; + FAIL_IF_NOT(fctx->size == 4); - if (fctx->size != 4) - goto end; + FAIL_IF_NULL(fctx->queues); - if (fctx->queues == NULL) - goto end; + Tmq *tmq1 = TmqGetQueueByName("queue1"); + FAIL_IF_NULL(tmq1); + Tmq *tmq2 = TmqGetQueueByName("queue2"); + FAIL_IF_NULL(tmq2); + Tmq *tmq3 = TmqGetQueueByName("another"); + FAIL_IF_NULL(tmq3); + Tmq *tmq4 = TmqGetQueueByName("yetanother"); + FAIL_IF_NULL(tmq4); - if (fctx->queues[0].q != &trans_q[0]) - goto end; - if (fctx->queues[1].q != &trans_q[1]) - goto end; - if (fctx->queues[2].q != &trans_q[2]) - goto end; - if (fctx->queues[3].q != &trans_q[3]) - goto end; + FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq); + FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq); + FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq); + FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq); - retval = 1; -end: - if (fctx != NULL) - TmqhOutputFlowFreeCtx(fctx); + TmqhOutputFlowFreeCtx(fctx); TmqResetQueues(); - return retval; + PASS; } #endif /* UNITTESTS */ diff --git a/src/tmqh-simple.c b/src/tmqh-simple.c index 255406476d..2cc5ffbe7d 100644 --- a/src/tmqh-simple.c +++ b/src/tmqh-simple.c @@ -46,7 +46,7 @@ void TmqhSimpleRegister (void) Packet *TmqhInputSimple(ThreadVars *t) { - PacketQueue *q = &trans_q[t->inq->id]; + PacketQueue *q = t->inq->pq; StatsSyncCountersIfSignalled(t); @@ -77,14 +77,14 @@ void TmqhInputSimpleShutdownHandler(ThreadVars *tv) } for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) - SCCondSignal(&trans_q[tv->inq->id].cond_q); + SCCondSignal(&tv->inq->pq->cond_q); } void TmqhOutputSimple(ThreadVars *t, Packet *p) { SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true":"false"); - PacketQueue *q = &trans_q[t->outq->id]; + PacketQueue *q = t->outq->pq; SCMutexLock(&q->mutex_q); PacketEnqueue(q, p);