From: Anoop Saldanha Date: Tue, 27 Dec 2011 12:26:13 +0000 (+0530) Subject: support for custom flow qhandlers - round robin support added X-Git-Tag: suricata-1.3beta1~78 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e252048900911693fc60b25574181582fce78c75;p=thirdparty%2Fsuricata.git support for custom flow qhandlers - round robin support added --- diff --git a/src/flow-util.h b/src/flow-util.h index 103642afdc..751687dd61 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -61,6 +61,7 @@ (f)->hprev = NULL; \ (f)->lnext = NULL; \ (f)->lprev = NULL; \ + (f)->autofp_tmqh_flow_qid = -1; \ RESET_COUNTERS((f)); \ } while (0) @@ -91,6 +92,7 @@ (f)->tag_list = NULL; \ GenericVarFree((f)->flowvar); \ (f)->flowvar = NULL; \ + (f)->autofp_tmqh_flow_qid = -1; \ RESET_COUNTERS((f)); \ } while(0) diff --git a/src/flow.h b/src/flow.h index 5557509a26..fe95cd6e28 100644 --- a/src/flow.h +++ b/src/flow.h @@ -311,6 +311,8 @@ typedef struct Flow_ uint32_t tosrcpktcnt; uint64_t bytecnt; #endif + + int32_t autofp_tmqh_flow_qid; } Flow; enum { diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c index 1bf208d6d7..91d662016c 100644 --- a/src/tmqh-flow.c +++ b/src/tmqh-flow.c @@ -39,24 +39,34 @@ #include "util-unittest.h" +typedef struct TmqhFlowMode_ { + PacketQueue *q; + + SC_ATOMIC_DECLARE(uint64_t, active_flows); + SC_ATOMIC_DECLARE(uint64_t, total_packets); +} TmqhFlowMode; + /** \brief Ctx for the flow queue handler * \param size number of queues to output to * \param queues array of queue id's this flow handler outputs to */ typedef struct TmqhFlowCtx_ { uint16_t size; - uint16_t *queues; uint16_t last; + + TmqhFlowMode *queues; + + uint16_t round_robin_idx; } TmqhFlowCtx; Packet *TmqhInputFlow(ThreadVars *t); -void TmqhOutputFlow(ThreadVars *t, Packet *p); +void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p); void *TmqhOutputFlowSetupCtx(char *queue_str); void TmqhFlowRegisterTests(void); void TmqhFlowRegister (void) { tmqh_table[TMQH_FLOW].name = "flow"; tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow; - tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlow; + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin; tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx; tmqh_table[TMQH_FLOW].OutHandlerCtxFree = NULL; tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests; @@ -96,23 +106,24 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) { tmq->writer_cnt++; uint16_t id = tmq->id; - //printf("StoreQueueId: id %u\n", id); if (ctx->queues == NULL) { ctx->size = 1; - ctx->queues = SCMalloc(ctx->size * sizeof(uint16_t)); + ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode)); + memset(ctx->queues, 0, ctx->size * sizeof(sizeof(TmqhFlowMode))); + if (ctx->queues == NULL) { + return -1; + } } else { ctx->size++; - ctx->queues = SCRealloc(ctx->queues, ctx->size * sizeof(uint16_t)); + ctx->queues = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode)); if (ctx->queues == NULL) { return -1; } + memset(ctx->queues + (ctx->size - 1), 0, sizeof(sizeof(TmqhFlowMode))); } - if (ctx->queues == NULL) { - return -1; - } + ctx->queues[ctx->size - 1].q = &trans_q[id]; - ctx->queues[ctx->size - 1] = id; return 0; } @@ -170,37 +181,28 @@ error: * \param tv thread vars * \param p packet */ -void TmqhOutputFlow(ThreadVars *tv, Packet *p) +void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p) { - uint16_t qid = 0; + int32_t qid = 0; TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - if (ctx == NULL) { - abort(); - } /* if no flow we use the first queue, * should be rare */ if (p->flow != NULL) { -#if __WORDSIZE == 64 - uint64_t addr = (uint64_t)p->flow; -#else - uint32_t addr = (uint32_t)p->flow; -#endif - addr >>= 7; - - uint16_t idx = addr % ctx->size; - qid = ctx->queues[idx]; + qid = p->flow->autofp_tmqh_flow_qid; + if (qid == -1) { + p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++; + ctx->round_robin_idx = ctx->round_robin_idx % ctx->size; + } } else { - ctx->last++; + qid = ctx->last++; if (ctx->last == ctx->size) ctx->last = 0; - - qid = ctx->queues[ctx->last]; } - PacketQueue *q = &trans_q[qid]; + PacketQueue *q = ctx->queues[qid].q; SCMutexLock(&q->mutex_q); PacketEnqueue(q, p); SCCondSignal(&q->cond_q); @@ -208,6 +210,8 @@ void TmqhOutputFlow(ThreadVars *tv, Packet *p) } #ifdef UNITTESTS + +#if 0 static int TmqhOutputFlowSetupCtxTest01(void) { int retval = 0; Tmq *tmq = NULL; @@ -331,14 +335,17 @@ end: TmqResetQueues(); return retval; } +#endif #endif /* UNITTESTS */ void TmqhFlowRegisterTests(void) { #ifdef UNITTESTS +#if 0 UtRegisterTest("TmqhOutputFlowSetupCtxTest01", TmqhOutputFlowSetupCtxTest01, 1); UtRegisterTest("TmqhOutputFlowSetupCtxTest02", TmqhOutputFlowSetupCtxTest02, 1); UtRegisterTest("TmqhOutputFlowSetupCtxTest03", TmqhOutputFlowSetupCtxTest03, 1); #endif +#endif }