tv->perf_public_ctx.perf_flag = 1;
if (tv->inq != NULL) {
- PacketQueue *q = &trans_q[tv->inq->id];
+ PacketQueue *q = tv->inq->pq;
SCCondSignal(&q->cond_q);
}
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
- PacketQueue *q = &trans_q[detect_tvs[i]->inq->id];
+ PacketQueue *q = detect_tvs[i]->inq->pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
q->id = tmq_id++;
q->is_packet_pool = (strcmp(q->name, "packetpool") == 0);
+ q->pq = &trans_q[q->id];
+
SCLogDebug("created queue \'%s\', %p", name, q);
return q;
{
for (int i = 0; i < tmq_id; i++) {
/* get a lock accessing the len */
- SCMutexLock(&trans_q[tmqs[i].id].mutex_q);
- printf("TmqDebugList: id %" PRIu32 ", name \'%s\', len %" PRIu32 "\n", tmqs[i].id, tmqs[i].name, trans_q[tmqs[i].id].len);
- SCMutexUnlock(&trans_q[tmqs[i].id].mutex_q);
+ SCMutexLock(&tmqs[i].pq->mutex_q);
+ printf("TmqDebugList: id %" PRIu32 ", name \'%s\', len %" PRIu32 "\n", tmqs[i].id, tmqs[i].name, tmqs[i].pq->len);
+ SCMutexUnlock(&tmqs[i].pq->mutex_q);
}
}
bool err = false;
for (int i = 0; i < tmq_id; i++) {
- SCMutexLock(&trans_q[tmqs[i].id].mutex_q);
+ SCMutexLock(&tmqs[i].pq->mutex_q);
if (tmqs[i].reader_cnt == 0) {
SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a reader (id %d, max %u)", tmqs[i].name, i, tmq_id);
err = true;
SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a writer (id %d, max %u)", tmqs[i].name, i, tmq_id);
err = true;
}
- SCMutexUnlock(&trans_q[tmqs[i].id].mutex_q);
+ SCMutexUnlock(&tmqs[i].pq->mutex_q);
if (err == true)
goto error;
#ifndef __TM_QUEUES_H__
#define __TM_QUEUES_H__
+#include "packet-queue.h"
+
typedef struct Tmq_ {
char *name;
bool is_packet_pool;
uint16_t id;
uint16_t reader_cnt;
uint16_t writer_cnt;
+ PacketQueue *pq;
} Tmq;
Tmq* TmqCreateQueue(const char *name);
/* if the flowworker module is the first, get the threads input queue */
if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
- tv->stream_pq = &trans_q[tv->inq->id];
+ tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
/* setup a queue */
/* if the flowworker module is the first, get the threads input queue */
if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
- tv->stream_pq = &trans_q[tv->inq->id];
+ tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
/* setup a queue */
/* if the flowworker module is the first, get the threads input queue */
if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
- tv->stream_pq = &trans_q[tv->inq->id];
+ tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = s;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
/* setup a queue */
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
- PacketQueue *q = &trans_q[tv->inq->id];
+ PacketQueue *q = tv->inq->pq;
SCMutexLock(&q->mutex_q);
uint32_t len = q->len;
SCMutexUnlock(&q->mutex_q);
}
if (tv->inq != NULL) {
for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
- SCCondSignal(&trans_q[tv->inq->id].cond_q);
+ SCCondSignal(&tv->inq->pq->cond_q);
}
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
}
if (tv->inq != NULL) {
for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
- SCCondSignal(&trans_q[tv->inq->id].cond_q);
+ SCCondSignal(&tv->inq->pq->cond_q);
}
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
}
* THV_KILL flag. */
if (tv->inq != NULL) {
for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
- SCCondSignal(&trans_q[tv->inq->id].cond_q);
+ SCCondSignal(&tv->inq->pq->cond_q);
}
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
}
const uint32_t flags = SC_ATOMIC_GET(tv->flags);
SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
- if (tv->inq && tv->stream_pq == &trans_q[tv->inq->id]) {
+ if (tv->inq && tv->stream_pq == tv->inq->pq) {
SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
} else if (tv->stream_pq_local != NULL) {
for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
/* wake up listening thread(s) if necessary */
if (tv->inq != NULL) {
- SCCondSignal(&trans_q[tv->inq->id].cond_q);
+ SCCondSignal(&tv->inq->pq->cond_q);
}
return 1;
}
/* same as 'simple' */
Packet *TmqhInputFlow(ThreadVars *tv)
{
- PacketQueue *q = &trans_q[tv->inq->id];
+ PacketQueue *q = tv->inq->pq;
StatsSyncCountersIfSignalled(tv);
}
tmq->writer_cnt++;
- uint16_t id = tmq->id;
-
if (ctx->queues == NULL) {
ctx->size = 1;
ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
}
- ctx->queues[ctx->size - 1].q = &trans_q[id];
+ ctx->queues[ctx->size - 1].q = tmq->pq;
return 0;
}
static int TmqhOutputFlowSetupCtxTest01(void)
{
- int retval = 0;
- Tmq *tmq = NULL;
- TmqhFlowCtx *fctx = NULL;
-
TmqResetQueues();
- tmq = TmqCreateQueue("queue1");
- if (tmq == NULL)
- goto end;
- tmq = TmqCreateQueue("queue2");
- if (tmq == NULL)
- goto end;
- tmq = TmqCreateQueue("another");
- if (tmq == NULL)
- goto end;
- tmq = TmqCreateQueue("yetanother");
- if (tmq == NULL)
- goto end;
+ Tmq *tmq1 = TmqCreateQueue("queue1");
+ FAIL_IF_NULL(tmq1);
+ Tmq *tmq2 = TmqCreateQueue("queue2");
+ FAIL_IF_NULL(tmq2);
+ Tmq *tmq3 = TmqCreateQueue("another");
+ FAIL_IF_NULL(tmq3);
+ Tmq *tmq4 = TmqCreateQueue("yetanother");
+ FAIL_IF_NULL(tmq4);
const char *str = "queue1,queue2,another,yetanother";
void *ctx = TmqhOutputFlowSetupCtx(str);
+ FAIL_IF_NULL(ctx);
- if (ctx == NULL)
- goto end;
-
- fctx = (TmqhFlowCtx *)ctx;
+ TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
- if (fctx->size != 4)
- goto end;
+ FAIL_IF_NOT(fctx->size == 4);
- if (fctx->queues == NULL)
- goto end;
+ FAIL_IF_NULL(fctx->queues);
- if (fctx->queues[0].q != &trans_q[0])
- goto end;
- if (fctx->queues[1].q != &trans_q[1])
- goto end;
- if (fctx->queues[2].q != &trans_q[2])
- goto end;
- if (fctx->queues[3].q != &trans_q[3])
- goto end;
+ FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
+ FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
+ FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
+ FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
- retval = 1;
-end:
- if (fctx != NULL)
- TmqhOutputFlowFreeCtx(fctx);
+ TmqhOutputFlowFreeCtx(fctx);
TmqResetQueues();
- return retval;
+ PASS;
}
static int TmqhOutputFlowSetupCtxTest02(void)
{
- int retval = 0;
- Tmq *tmq = NULL;
- TmqhFlowCtx *fctx = NULL;
-
TmqResetQueues();
- tmq = TmqCreateQueue("queue1");
- if (tmq == NULL)
- goto end;
- tmq = TmqCreateQueue("queue2");
- if (tmq == NULL)
- goto end;
- tmq = TmqCreateQueue("another");
- if (tmq == NULL)
- goto end;
- tmq = TmqCreateQueue("yetanother");
- if (tmq == NULL)
- goto end;
+ Tmq *tmq1 = TmqCreateQueue("queue1");
+ FAIL_IF_NULL(tmq1);
+ Tmq *tmq2 = TmqCreateQueue("queue2");
+ FAIL_IF_NULL(tmq2);
+ Tmq *tmq3 = TmqCreateQueue("another");
+ FAIL_IF_NULL(tmq3);
+ Tmq *tmq4 = TmqCreateQueue("yetanother");
+ FAIL_IF_NULL(tmq4);
const char *str = "queue1";
void *ctx = TmqhOutputFlowSetupCtx(str);
+ FAIL_IF_NULL(ctx);
- if (ctx == NULL)
- goto end;
-
- fctx = (TmqhFlowCtx *)ctx;
-
- if (fctx->size != 1)
- goto end;
+ TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
- if (fctx->queues == NULL)
- goto end;
+ FAIL_IF_NOT(fctx->size == 1);
- if (fctx->queues[0].q != &trans_q[0])
- goto end;
+ FAIL_IF_NULL(fctx->queues);
- retval = 1;
-end:
- if (fctx != NULL)
- TmqhOutputFlowFreeCtx(fctx);
+ FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
+ TmqhOutputFlowFreeCtx(fctx);
TmqResetQueues();
- return retval;
+
+ PASS;
}
static int TmqhOutputFlowSetupCtxTest03(void)
{
- int retval = 0;
- TmqhFlowCtx *fctx = NULL;
-
TmqResetQueues();
const char *str = "queue1,queue2,another,yetanother";
void *ctx = TmqhOutputFlowSetupCtx(str);
+ FAIL_IF_NULL(ctx);
- if (ctx == NULL)
- goto end;
+ TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
- fctx = (TmqhFlowCtx *)ctx;
+ FAIL_IF_NOT(fctx->size == 4);
- if (fctx->size != 4)
- goto end;
+ FAIL_IF_NULL(fctx->queues);
- if (fctx->queues == NULL)
- goto end;
+ Tmq *tmq1 = TmqGetQueueByName("queue1");
+ FAIL_IF_NULL(tmq1);
+ Tmq *tmq2 = TmqGetQueueByName("queue2");
+ FAIL_IF_NULL(tmq2);
+ Tmq *tmq3 = TmqGetQueueByName("another");
+ FAIL_IF_NULL(tmq3);
+ Tmq *tmq4 = TmqGetQueueByName("yetanother");
+ FAIL_IF_NULL(tmq4);
- if (fctx->queues[0].q != &trans_q[0])
- goto end;
- if (fctx->queues[1].q != &trans_q[1])
- goto end;
- if (fctx->queues[2].q != &trans_q[2])
- goto end;
- if (fctx->queues[3].q != &trans_q[3])
- goto end;
+ FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
+ FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
+ FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
+ FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
- retval = 1;
-end:
- if (fctx != NULL)
- TmqhOutputFlowFreeCtx(fctx);
+ TmqhOutputFlowFreeCtx(fctx);
TmqResetQueues();
- return retval;
+ PASS;
}
#endif /* UNITTESTS */
Packet *TmqhInputSimple(ThreadVars *t)
{
- PacketQueue *q = &trans_q[t->inq->id];
+ PacketQueue *q = t->inq->pq;
StatsSyncCountersIfSignalled(t);
}
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
- SCCondSignal(&trans_q[tv->inq->id].cond_q);
+ SCCondSignal(&tv->inq->pq->cond_q);
}
void TmqhOutputSimple(ThreadVars *t, Packet *p)
{
SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true":"false");
- PacketQueue *q = &trans_q[t->outq->id];
+ PacketQueue *q = t->outq->pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);