Packet *TmqhInputFlow(ThreadVars *t);
void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
-void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
-void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
void *TmqhOutputFlowSetupCtx(char *queue_str);
void TmqhOutputFlowFreeCtx(void *ctx);
void TmqhFlowRegisterTests(void);
char *scheduler = NULL;
if (ConfGet("autofp-scheduler", &scheduler) == 1) {
if (strcasecmp(scheduler, "round-robin") == 0) {
- tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+ SCLogNotice("using flow hash instead of round robin");
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "active-packets") == 0) {
- //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
- SCLogNotice("FIXME: using flow hash instead of active packets");
+ SCLogNotice("using flow hash instead of active packets");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "hash") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
exit(EXIT_FAILURE);
}
} else {
- //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
}
if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
SCLogInfo("AutoFP mode using \"%s\" flow load balancer", (msg))
- PRINT_IF_FUNC(TmqhOutputFlowRoundRobin, "Round Robin");
- PRINT_IF_FUNC(TmqhOutputFlowActivePackets, "Active Packets");
PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash");
PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair");
memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
}
ctx->queues[ctx->size - 1].q = &trans_q[id];
- SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets);
- SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows);
return 0;
}
tstr = comma ? (comma + 1) : comma;
} while (tstr != NULL);
- SC_ATOMIC_INIT(ctx->round_robin_idx);
-
SCFree(str);
return (void *)ctx;
void TmqhOutputFlowFreeCtx(void *ctx)
{
- int i;
TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
fctx->size);
- for (i = 0; i < fctx->size; i++) {
- SCLogInfo("AutoFP - Queue %-2"PRIu32 " - pkts: %-12"PRIu64" flows: %-12"PRIu64, i,
- SC_ATOMIC_GET(fctx->queues[i].total_packets),
- SC_ATOMIC_GET(fctx->queues[i].total_flows));
- SC_ATOMIC_DESTROY(fctx->queues[i].total_packets);
- SC_ATOMIC_DESTROY(fctx->queues[i].total_flows);
- }
-
SCFree(fctx->queues);
SCFree(fctx);
return;
}
-/**
- * \brief select the queue to output in a round robin fashion.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
-{
- int16_t qid = 0;
-
- TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
- /* if no flow we use the first queue,
- * should be rare */
- if (p->flow != NULL) {
- qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
- if (qid == -1) {
- qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1);
- if (qid >= ctx->size) {
- SC_ATOMIC_RESET(ctx->round_robin_idx);
- qid = 0;
- }
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
- (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
- }
- } else {
- qid = ctx->last++;
-
- if (ctx->last == ctx->size)
- ctx->last = 0;
- }
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-
- PacketQueue *q = ctx->queues[qid].q;
- SCMutexLock(&q->mutex_q);
- PacketEnqueue(q, p);
- SCCondSignal(&q->cond_q);
- SCMutexUnlock(&q->mutex_q);
-
- return;
-}
-
-/**
- * \brief select the queue to output to based on queue lengths.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
-{
- int16_t qid = 0;
-
- TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
- /* if no flow we round robin the packets over the queues */
- if (p->flow != NULL) {
- qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
- if (qid == -1) {
- int16_t i = 0;
- int16_t lowest_id = 0;
- TmqhFlowMode *queues = ctx->queues;
- uint32_t lowest = queues[i].q->len;
- for (i = 1; i < ctx->size; i++) {
- if (queues[i].q->len < lowest) {
- lowest = queues[i].q->len;
- lowest_id = i;
- }
- }
- qid = lowest_id;
- (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id);
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
- }
- } else {
- qid = ctx->last++;
-
- if (ctx->last == ctx->size)
- ctx->last = 0;
- }
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-
- PacketQueue *q = ctx->queues[qid].q;
- SCMutexLock(&q->mutex_q);
- PacketEnqueue(q, p);
- SCCondSignal(&q->cond_q);
- SCMutexUnlock(&q->mutex_q);
-
- return;
-}
-
-/**
- * \brief select the queue to output based on address hash.
- *
- * \param tv thread vars.
- * \param p packet.
- */
-void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p)
-{
- int16_t qid = 0;
-
- TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
- /* if no flow we use the first queue,
- * should be rare */
- if (p->flow != NULL) {
- qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
- if (qid == -1) {
-#if __WORDSIZE == 64
- uint64_t addr = (uint64_t)p->flow;
-#else
- uint32_t addr = (uint32_t)p->flow;
-#endif
- addr >>= 7;
-
- /* we don't have to worry about possible overflow, since
- * ctx->size will be less than 2 ** 15 for sure */
- qid = addr % ctx->size;
- (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
- }
- } else {
- qid = ctx->last++;
-
- if (ctx->last == ctx->size)
- ctx->last = 0;
- }
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-
- PacketQueue *q = ctx->queues[qid].q;
- SCMutexLock(&q->mutex_q);
- PacketEnqueue(q, p);
- SCCondSignal(&q->cond_q);
- SCMutexUnlock(&q->mutex_q);
-
- return;
-}
-
void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
{
int16_t qid = 0;
if (ctx->last == ctx->size)
ctx->last = 0;
}
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
return;
}
+
/**
* \brief select the queue to output based on IP address pair.
*
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
- /* if no flow we use the first queue,
- * should be rare */
- if (p->flow != NULL) {
- qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
- if (qid == -1) {
- if (p->src.family == AF_INET6) {
- for (i = 0; i < 4; i++) {
- addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
- }
- } else {
- addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
- }
-
- /* we don't have to worry about possible overflow, since
- * ctx->size will be lesser than 2 ** 31 for sure */
- qid = addr_hash % ctx->size;
- (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
+ if (p->src.family == AF_INET6) {
+ for (i = 0; i < 4; i++) {
+ addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
}
} else {
- qid = ctx->last++;
-
- if (ctx->last == ctx->size)
- ctx->last = 0;
+ addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
}
- (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
+
+ /* we don't have to worry about possible overflow, since
+ * ctx->size will be lesser than 2 ** 31 for sure */
+ qid = addr_hash % ctx->size;
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);