From: Anoop Saldanha Date: Thu, 12 Jan 2012 11:09:47 +0000 (+0530) Subject: neaten flow q handler code X-Git-Tag: suricata-1.3beta1~75 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d01589c9d8f47d6915565011ba6af59c02462fcd;p=thirdparty%2Fsuricata.git neaten flow q handler code --- diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c index f17237c0c5..7cd1072c2d 100644 --- a/src/tmqh-flow.c +++ b/src/tmqh-flow.c @@ -19,14 +19,12 @@ * \file * * \author Victor Julien + * \author Anoop Saldanha * * Simple output queue handler that makes sure all packets of the same flow - * are sent to the same queue. This is done by simply hashing the flow's - * memory address as thats readable from a packet without the need to lock - * the flow itself. - * - * \todo we can also think about a queue handler that takes queue load into - * account. + * are sent to the same queue. We support different kind of q handlers. Have + * a look at "autofp-scheduler" conf to further undertsand the various q + * handlers we provide. */ #include "suricata.h" @@ -50,7 +48,8 @@ void TmqhFlowRegisterTests(void); TmqhFlowCtx *tmqh_flow_outctx = NULL; -void TmqhFlowRegister (void) { +void TmqhFlowRegister(void) +{ tmqh_table[TMQH_FLOW].name = "flow"; tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow; tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx; @@ -69,8 +68,9 @@ void TmqhFlowRegister (void) { SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler"); tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; } else { - SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" for " - "autofp-scheduler in conf. Killing engine.", scheduler); + SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" " + "for autofp-scheduler in conf. Killing engine.", + scheduler); exit(EXIT_FAILURE); } } else { @@ -105,7 +105,8 @@ Packet *TmqhInputFlow(ThreadVars *tv) } } -static int StoreQueueId(TmqhFlowCtx *ctx, char *name) { +static int StoreQueueId(TmqhFlowCtx *ctx, char *name) +{ Tmq *tmq = TmqGetQueueByName(name); if (tmq == NULL) { tmq = TmqCreateQueue(SCStrdup(name)); @@ -137,15 +138,18 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) { return 0; } -/** \brief setup the queue handlers ctx +/** + * \brief setup the queue handlers ctx * - * Parses a comma separated string "queuename1,queuename2,etc" - * and sets the ctx up to devide flows over these queue's. + * Parses a comma separated string "queuename1,queuename2,etc" + * and sets the ctx up to devide flows over these queue's. * - * \param queue_str comma separated string with output queue names - * \retval ctx queues handlers ctx or NULL in error + * \param queue_str comma separated string with output queue names + * + * \retval ctx queues handlers ctx or NULL in error */ -void *TmqhOutputFlowSetupCtx(char *queue_str) { +void *TmqhOutputFlowSetupCtx(char *queue_str) +{ if (queue_str == NULL || strlen(queue_str) == 0) return NULL; @@ -182,6 +186,7 @@ void *TmqhOutputFlowSetupCtx(char *queue_str) { SCFree(str); return (void *)ctx; + error: SCFree(ctx); if (str != NULL) @@ -189,9 +194,11 @@ error: return NULL; } -/** \brief select the queue to output to based on flow - * \param tv thread vars - * \param p packet +/** + * \brief select the queue to output in a round robin fashion. + * + * \param tv thread vars + * \param p packet */ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p) { @@ -222,11 +229,15 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p) PacketEnqueue(q, p); SCCondSignal(&q->cond_q); SCMutexUnlock(&q->mutex_q); + + return; } -/** \brief select the queue to output to based on flow - * \param tv thread vars - * \param p packet +/** + * \brief select the queue to output to based on queue lengths. + * + * \param tv thread vars + * \param p packet */ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p) { @@ -266,11 +277,15 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p) PacketEnqueue(q, p); SCCondSignal(&q->cond_q); SCMutexUnlock(&q->mutex_q); + + return; } -/** \brief select the queue to output to based on flow - * \param tv thread vars - * \param p packet +/** + * \brief select the queue to output to based on active flows. + * + * \param tv thread vars + * \param p packet */ void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p) { @@ -309,8 +324,15 @@ void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p) PacketEnqueue(q, p); SCCondSignal(&q->cond_q); SCMutexUnlock(&q->mutex_q); + + return; } +/** + * \brief Prints flow q handler statistics. + * + * Requires engine to be dead when this function's called. + */ void TmqhFlowPrintStatistics(void) { uint32_t i; @@ -331,7 +353,8 @@ void TmqhFlowPrintStatistics(void) #ifdef UNITTESTS -static int TmqhOutputFlowSetupCtxTest01(void) { +static int TmqhOutputFlowSetupCtxTest01(void) +{ int retval = 0; Tmq *tmq = NULL; @@ -379,7 +402,8 @@ end: return retval; } -static int TmqhOutputFlowSetupCtxTest02(void) { +static int TmqhOutputFlowSetupCtxTest02(void) +{ int retval = 0; Tmq *tmq = NULL; @@ -421,7 +445,8 @@ end: return retval; } -static int TmqhOutputFlowSetupCtxTest03(void) { +static int TmqhOutputFlowSetupCtxTest03(void) +{ int retval = 0; TmqResetQueues(); @@ -457,11 +482,13 @@ end: #endif /* UNITTESTS */ -void TmqhFlowRegisterTests(void) { +void TmqhFlowRegisterTests(void) +{ #ifdef UNITTESTS UtRegisterTest("TmqhOutputFlowSetupCtxTest01", TmqhOutputFlowSetupCtxTest01, 1); UtRegisterTest("TmqhOutputFlowSetupCtxTest02", TmqhOutputFlowSetupCtxTest02, 1); UtRegisterTest("TmqhOutputFlowSetupCtxTest03", TmqhOutputFlowSetupCtxTest03, 1); #endif -} + return; +}