]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
autofp: add "ippair" scheduler
authorcdwakelin <cwakelin@emergingthreats.net>
Wed, 23 Mar 2016 17:13:55 +0000 (17:13 +0000)
committerVictor Julien <victor@inliniac.net>
Sat, 26 Mar 2016 10:04:31 +0000 (11:04 +0100)
Add "ippair" autofp scheduler to split traffic based on source and
destination IP only (not ports).

- This is useful when using the "xbits" feature to track events
  that occur between the same hosts but not necessarily the same
  flow (such as exploit kit landings/expoits/payloads)
- The disadvantage is that traffic may be balanced very unevenly
  between threads if some host pairs are much more frequently seen
  than others, so it may be only practicable for sandbox or pcap
  analysis
- not tested for IPv6

See https://redmine.openinfosecfoundation.org/issues/1661

src/tmqh-flow.c

index a1ec839bcfbec930ad6fffb9d0ced249a2de2d98..49679b8fb203d7da9eed79a9ec963f191294a162 100644 (file)
@@ -41,6 +41,7 @@
 
 Packet *TmqhInputFlow(ThreadVars *t);
 void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
+void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
 void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
 void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
 void *TmqhOutputFlowSetupCtx(char *queue_str);
@@ -66,6 +67,9 @@ void TmqhFlowRegister(void)
         } else if (strcasecmp(scheduler, "hash") == 0) {
             SCLogInfo("AutoFP mode using \"Hash\" flow load balancer");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
+        } else if (strcasecmp(scheduler, "ippair") == 0) {
+            SCLogInfo("AutoFP mode using \"ippair\" flow load balancer");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair;
         } else {
             SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
                        "for autofp-scheduler in conf.  Killing engine.",
@@ -359,6 +363,56 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
     return;
 }
 
+/**
+ * \brief select the queue to output based on IP address pair.
+ *
+ * \param tv thread vars.
+ * \param p packet.
+ */
+void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
+{
+    int16_t qid = 0;
+    uint32_t addr_hash = 0;
+    int i;
+
+    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+    /* if no flow we use the first queue,
+     * should be rare */
+    if (p->flow != NULL) {
+        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
+        if (qid == -1) {
+            if (p->src.family == AF_INET6) {
+                for (i = 0; i < 4; i++) {
+                    addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
+                }
+            } else {
+                addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
+            }
+
+            /* we don't have to worry about possible overflow, since
+             * ctx->size will be lesser than 2 ** 31 for sure */
+            qid = addr_hash % ctx->size;
+            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
+            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
+        }
+    } else {
+        qid = ctx->last++;
+
+        if (ctx->last == ctx->size)
+            ctx->last = 0;
+    }
+    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
+
+    PacketQueue *q = ctx->queues[qid].q;
+    SCMutexLock(&q->mutex_q);
+    PacketEnqueue(q, p);
+    SCCondSignal(&q->cond_q);
+    SCMutexUnlock(&q->mutex_q);
+
+    return;
+}
+
 #ifdef UNITTESTS
 
 static int TmqhOutputFlowSetupCtxTest01(void)