Packet *TmqhInputFlow(ThreadVars *t);
void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
+void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
void *TmqhOutputFlowSetupCtx(char *queue_str);
} else if (strcasecmp(scheduler, "hash") == 0) {
SCLogInfo("AutoFP mode using \"Hash\" flow load balancer");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
+ } else if (strcasecmp(scheduler, "ippair") == 0) {
+ SCLogInfo("AutoFP mode using \"ippair\" flow load balancer");
+ tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair;
} else {
SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
"for autofp-scheduler in conf. Killing engine.",
return;
}
+/**
+ * \brief select the queue to output based on IP address pair.
+ *
+ * \param tv thread vars.
+ * \param p packet.
+ */
+void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
+{
+ int16_t qid = 0;
+ uint32_t addr_hash = 0;
+ int i;
+
+ TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+ /* if no flow we use the first queue,
+ * should be rare */
+ if (p->flow != NULL) {
+ qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
+ if (qid == -1) {
+ if (p->src.family == AF_INET6) {
+ for (i = 0; i < 4; i++) {
+ addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
+ }
+ } else {
+ addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
+ }
+
+ /* we don't have to worry about possible overflow, since
+ * ctx->size will be lesser than 2 ** 31 for sure */
+ qid = addr_hash % ctx->size;
+ (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
+ (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
+ }
+ } else {
+ qid = ctx->last++;
+
+ if (ctx->last == ctx->size)
+ ctx->last = 0;
+ }
+ (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
+
+ PacketQueue *q = ctx->queues[qid].q;
+ SCMutexLock(&q->mutex_q);
+ PacketEnqueue(q, p);
+ SCCondSignal(&q->cond_q);
+ SCMutexUnlock(&q->mutex_q);
+
+ return;
+}
+
#ifdef UNITTESTS
static int TmqhOutputFlowSetupCtxTest01(void)