if (strcmp(scheduler, "round_robin") == 0) {
SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
- } else if (strcmp(scheduler, "active_flows") == 0) {
- SCLogInfo("AutoFP mode using \"Active Flows\" Q Handler");
- tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActiveFlows;
} else if (strcmp(scheduler, "active_packets") == 0) {
SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
exit(EXIT_FAILURE);
}
} else {
- SCLogInfo("AutoFP mode using default \"Round Robin\" Q Handler");
- tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+ SCLogInfo("AutoFP mode using default \"Active Packets\" Q Handler");
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
}
return;
memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
}
ctx->queues[ctx->size - 1].q = &trans_q[id];
- SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].active_flows);
+ SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets);
+ SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows);
return 0;
}
tmqh_flow_outctx = ctx;
+ SC_ATOMIC_INIT(ctx->round_robin_idx);
+
SCFree(str);
return (void *)ctx;
int i;
TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
+ SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
+ tmqh_flow_outctx->size);
for (i = 0; i < fctx->size; i++) {
- SC_ATOMIC_DESTROY(fctx->queues[i].active_flows);
+ SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
+ SC_ATOMIC_GET(fctx->queues[i].total_packets));
+ SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
+ SC_ATOMIC_GET(fctx->queues[i].total_flows));
+ SC_ATOMIC_DESTROY(fctx->queues[i].total_packets);
+ SC_ATOMIC_DESTROY(fctx->queues[i].total_flows);
}
SCFree(fctx->queues);
/* if no flow we use the first queue,
* should be rare */
if (p->flow != NULL) {
- qid = p->flow->autofp_tmqh_flow_qid;
+ qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
if (qid == -1) {
- p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++;
- ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
- SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
- ctx->queues[qid].total_flows++;
+ qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1);
+ if (qid >= ctx->size) {
+ SC_ATOMIC_RESET(ctx->round_robin_idx);
+ qid = 0;
+ }
+ SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
+ SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
}
- ctx->queues[qid].total_packets++;
} else {
qid = ctx->last++;
if (ctx->last == ctx->size)
ctx->last = 0;
}
+ SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
/* if no flow we use the first queue,
* should be rare */
if (p->flow != NULL) {
- qid = p->flow->autofp_tmqh_flow_qid;
+ qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
if (qid == -1) {
uint16_t i = 0;
int lowest_id = 0;
lowest_id = i;
}
}
- p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
- SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
- queues[qid].total_flows++;
+ qid = lowest_id;
+ SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id);
+ SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
}
- ctx->queues[qid].total_packets++;
} else {
qid = ctx->last++;
if (ctx->last == ctx->size)
ctx->last = 0;
}
+ SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
return;
}
-/**
- * \brief select the queue to output to based on active flows.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
-{
- int32_t qid = 0;
-
- TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
- /* if no flow we use the first queue,
- * should be rare */
- if (p->flow != NULL) {
- qid = p->flow->autofp_tmqh_flow_qid;
- if (qid == -1) {
- uint32_t i = 0;
- int lowest_id = 0;
- uint32_t lowest = ctx->queues[i].active_flows_sc_atomic__;
- for (i = 1; i < ctx->size; i++) {
- if (ctx->queues[i].active_flows_sc_atomic__ < lowest) {
- lowest = ctx->queues[i].active_flows_sc_atomic__;
- lowest_id = i;
- }
- }
- p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
- SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
- ctx->queues[qid].total_flows++;
- }
- ctx->queues[qid].total_packets++;
- } else {
- qid = ctx->last++;
-
- if (ctx->last == ctx->size)
- ctx->last = 0;
- }
-
- PacketQueue *q = ctx->queues[qid].q;
- SCMutexLock(&q->mutex_q);
- PacketEnqueue(q, p);
- SCCondSignal(&q->cond_q);
- SCMutexUnlock(&q->mutex_q);
-
- return;
-}
-
-/**
- * \brief Prints flow q handler statistics.
- *
- * Requires engine to be dead when this function's called.
- */
-void TmqhFlowPrintStatistics(void)
-{
- uint32_t i;
-
- SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
- tmqh_flow_outctx->size);
- for (i = 0; i < tmqh_flow_outctx->size; i++) {
- SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
- tmqh_flow_outctx->queues[i].total_packets);
- }
- for (i = 0; i < tmqh_flow_outctx->size; i++) {
- SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
- tmqh_flow_outctx->queues[i].total_flows);
- }
-
- return;
-}
-
#ifdef UNITTESTS
static int TmqhOutputFlowSetupCtxTest01(void)