#include "decode.h"
#include "threads.h"
#include "threadvars.h"
+#include "tmqh-flow.h"
#include "tm-queuehandlers.h"
+#include "conf.h"
#include "util-unittest.h"
-typedef struct TmqhFlowMode_ {
- PacketQueue *q;
-
- SC_ATOMIC_DECLARE(uint64_t, active_flows);
- SC_ATOMIC_DECLARE(uint64_t, total_packets);
-} TmqhFlowMode;
-
-/** \brief Ctx for the flow queue handler
- * \param size number of queues to output to
- * \param queues array of queue id's this flow handler outputs to */
-typedef struct TmqhFlowCtx_ {
- uint16_t size;
- uint16_t last;
-
- TmqhFlowMode *queues;
-
- uint16_t round_robin_idx;
-} TmqhFlowCtx;
-
Packet *TmqhInputFlow(ThreadVars *t);
+void TmqhOutputFlowActiveFlows(ThreadVars *t, Packet *p);
+void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
void *TmqhOutputFlowSetupCtx(char *queue_str);
void TmqhFlowRegisterTests(void);
+TmqhFlowCtx *tmqh_flow_outctx = NULL;
+
void TmqhFlowRegister (void) {
tmqh_table[TMQH_FLOW].name = "flow";
tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
- tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
tmqh_table[TMQH_FLOW].OutHandlerCtxFree = NULL;
tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;
+
+ char *scheduler = NULL;
+ if (ConfGet("autofp-scheduler", &scheduler) == 1) {
+ if (strcmp(scheduler, "round_robin") == 0) {
+ SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+ } else if (strcmp(scheduler, "active_flows") == 0) {
+ SCLogInfo("AutoFP mode using \"Active Flows\" Q Handler");
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActiveFlows;
+ } else if (strcmp(scheduler, "active_packets") == 0) {
+ SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+ } else {
+ SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" for "
+ "autofp-scheduler in conf. Killing engine.", scheduler);
+ exit(EXIT_FAILURE);
+ }
+ } else {
+ SCLogInfo("AutoFP mode using default \"Round Robin\" Q Handler");
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+ }
+
+ return;
}
/* same as 'simple' */
if (ctx->queues == NULL) {
ctx->size = 1;
ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
- memset(ctx->queues, 0, ctx->size * sizeof(sizeof(TmqhFlowMode)));
+ memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
if (ctx->queues == NULL) {
return -1;
}
if (ctx->queues == NULL) {
return -1;
}
- memset(ctx->queues + (ctx->size - 1), 0, sizeof(sizeof(TmqhFlowMode)));
+ memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
}
ctx->queues[ctx->size - 1].q = &trans_q[id];
+ SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].active_flows);
return 0;
}
tstr = comma ? (comma + 1) : comma;
} while (tstr != NULL);
+ tmqh_flow_outctx = ctx;
+
SCFree(str);
return (void *)ctx;
error:
if (qid == -1) {
p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++;
ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
+ SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+ ctx->queues[qid].total_flows++;
+ }
+ ctx->queues[qid].total_packets++;
+ } else {
+ qid = ctx->last++;
+
+ if (ctx->last == ctx->size)
+ ctx->last = 0;
+ }
+
+ PacketQueue *q = ctx->queues[qid].q;
+ SCMutexLock(&q->mutex_q);
+ PacketEnqueue(q, p);
+ SCCondSignal(&q->cond_q);
+ SCMutexUnlock(&q->mutex_q);
+}
+
+/** \brief select the queue to output to based on flow
+ * \param tv thread vars
+ * \param p packet
+ */
+void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
+{
+ int32_t qid = 0;
+
+ TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+ /* if no flow we use the first queue,
+ * should be rare */
+ if (p->flow != NULL) {
+ qid = p->flow->autofp_tmqh_flow_qid;
+ if (qid == -1) {
+ uint16_t i = 0;
+ int lowest_id = 0;
+ TmqhFlowMode *queues = ctx->queues;
+ uint32_t lowest = queues[i].q->len;
+ for (i = 1; i < ctx->size; i++) {
+ if (queues[i].q->len < lowest) {
+ lowest = queues[i].q->len;
+ lowest_id = i;
+ }
+ }
+ p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
+ SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+ queues[qid].total_flows++;
}
+ ctx->queues[qid].total_packets++;
} else {
qid = ctx->last++;
SCMutexUnlock(&q->mutex_q);
}
+/** \brief select the queue to output to based on flow
+ * \param tv thread vars
+ * \param p packet
+ */
+void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
+{
+ int32_t qid = 0;
+
+ TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+ /* if no flow we use the first queue,
+ * should be rare */
+ if (p->flow != NULL) {
+ qid = p->flow->autofp_tmqh_flow_qid;
+ if (qid == -1) {
+ uint32_t i = 0;
+ int lowest_id = 0;
+ uint32_t lowest = ctx->queues[i].active_flows_sc_atomic__;
+ for (i = 1; i < ctx->size; i++) {
+ if (ctx->queues[i].active_flows_sc_atomic__ < lowest) {
+ lowest = ctx->queues[i].active_flows_sc_atomic__;
+ lowest_id = i;
+ }
+ }
+ p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
+ SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+ ctx->queues[qid].total_flows++;
+ }
+ ctx->queues[qid].total_packets++;
+ } else {
+ qid = ctx->last++;
+
+ if (ctx->last == ctx->size)
+ ctx->last = 0;
+ }
+
+ PacketQueue *q = ctx->queues[qid].q;
+ SCMutexLock(&q->mutex_q);
+ PacketEnqueue(q, p);
+ SCCondSignal(&q->cond_q);
+ SCMutexUnlock(&q->mutex_q);
+}
+
+void TmqhFlowPrintStatistics(void)
+{
+ uint32_t i;
+
+ SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
+ tmqh_flow_outctx->size);
+ for (i = 0; i < tmqh_flow_outctx->size; i++) {
+ SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
+ tmqh_flow_outctx->queues[i].total_packets);
+ }
+ for (i = 0; i < tmqh_flow_outctx->size; i++) {
+ SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
+ tmqh_flow_outctx->queues[i].total_flows);
+ }
+
+ return;
+}
+
#ifdef UNITTESTS
#if 0