git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Support flow queue handler schedulers active_flows and active_packets. Support new yaml...
author Anoop Saldanha <poonaatsoc@gmail.com>
Wed, 11 Jan 2012 18:33:13 +0000 (00:03 +0530)
committer Victor Julien <victor@inliniac.net>
Mon, 19 Mar 2012 11:55:41 +0000 (12:55 +0100)
src/flow-util.h
src/runmode-pcap-file.c
src/suricata.c
src/tmqh-flow.c
src/tmqh-flow.h
suricata.yaml.in
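
In brief: autofp's flow queue handler gains two schedulers besides the existing round robin one. "active_packets" pins each new flow to the queue holding the fewest unprocessed packets, "active_flows" to the queue with the fewest live flows, and per-queue totals are logged at shutdown. A minimal configuration sketch (option name and values taken from the ConfGet()/strcmp() checks in tmqh-flow.c below; assumes the autofp custom runmode is selected):

    runmode: autofp
    autofp-scheduler: active_packets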

index 751687dd610b1543e203ab771fec14069ab6be7b..e90d9c44db33ef02671b445b7cbb509538dcd7df 100644 (file)
@@ -25,6 +25,7 @@
 #define __FLOW_UTIL_H__
 
 #include "detect-engine-state.h"
+#include "tmqh-flow.h"
 
 #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec)
 
         (f)->tag_list = NULL; \
         GenericVarFree((f)->flowvar); \
         (f)->flowvar = NULL; \
-        (f)->autofp_tmqh_flow_qid = -1; \
+        if ((f)->autofp_tmqh_flow_qid != -1) {  \
+            TmqhFlowUpdateActiveFlows((f));     \
+            (f)->autofp_tmqh_flow_qid = -1;     \
+        }                                       \
         RESET_COUNTERS((f)); \
     } while(0)
 
         DetectTagDataListFree((f)->tag_list); \
         GenericVarFree((f)->flowvar); \
         SCMutexDestroy(&(f)->de_state_m); \
+        if ((f)->autofp_tmqh_flow_qid != -1) {  \
+            TmqhFlowUpdateActiveFlows((f));     \
+        }                                       \
         (f)->tag_list = NULL; \
     } while(0)
 
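The macro changes above close the accounting loop for the new per-queue counters: a flow's queue id is assigned once, on its first packet, and TmqhFlowUpdateActiveFlows() (defined in tmqh-flow.h below) gives the counter back when the flow is recycled or destroyed. A sketch of the invariant, not part of the patch itself:

    /* first packet of a flow:  SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1)
     * flow recycle/destroy:    SC_ATOMIC_SUB(tmqh_flow_outctx->queues[qid].active_flows, 1)
     * => active_flows equals the number of live flows currently pinned to qid */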
index 27c8edd7fc5e39ba98fb8efb182ef3a930864639..0bfe01ec12d7c064315366d03f60c8f827461f31 100644 (file)
@@ -515,5 +515,6 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx)
         else
             cpu++;
     }
+
     return 0;
 }
index 40c5951282ea2c1aadbb22409cbc3530940af961..cf1636d286ce91d5b519b0027252457bd63c1fc8 100644 (file)
@@ -1902,5 +1902,8 @@ int main(int argc, char **argv)
 #endif /* OS_WIN32 */
 
     SC_ATOMIC_DESTROY(engine_stage);
+    if (strcasecmp(RunmodeGetActive(), "autofp") == 0)
+        TmqhFlowPrintStatistics();
+
     exit(engine_retval);
 }
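The strcasecmp() guard matters here because TmqhFlowPrintStatistics() dereferences tmqh_flow_outctx unconditionally, and that pointer is only set when the flow queue handler ctx is built (i.e. in autofp). A defensive alternative, as a sketch rather than what the patch does:

    /* safe in any runmode: only print if the flow handler ctx exists */
    if (tmqh_flow_outctx != NULL)
        TmqhFlowPrintStatistics();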
index 91d662016cf228d0d4ccaa95899224c8bb50eb2b..92cb70c5281aa4dd39269b5a168513c8db63329b 100644 (file)
 #include "decode.h"
 #include "threads.h"
 #include "threadvars.h"
+#include "tmqh-flow.h"
 
 #include "tm-queuehandlers.h"
 
+#include "conf.h"
 #include "util-unittest.h"
 
-typedef struct TmqhFlowMode_ {
-    PacketQueue *q;
-
-    SC_ATOMIC_DECLARE(uint64_t, active_flows);
-    SC_ATOMIC_DECLARE(uint64_t, total_packets);
-} TmqhFlowMode;
-
-/** \brief Ctx for the flow queue handler
- *  \param size number of queues to output to
- *  \param queues array of queue id's this flow handler outputs to */
-typedef struct TmqhFlowCtx_ {
-    uint16_t size;
-    uint16_t last;
-
-    TmqhFlowMode *queues;
-
-    uint16_t round_robin_idx;
-} TmqhFlowCtx;
-
 Packet *TmqhInputFlow(ThreadVars *t);
+void TmqhOutputFlowActiveFlows(ThreadVars *t, Packet *p);
+void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
 void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
 void *TmqhOutputFlowSetupCtx(char *queue_str);
 void TmqhFlowRegisterTests(void);
 
+TmqhFlowCtx *tmqh_flow_outctx = NULL;
+
 void TmqhFlowRegister (void) {
     tmqh_table[TMQH_FLOW].name = "flow";
     tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
-    tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
     tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
     tmqh_table[TMQH_FLOW].OutHandlerCtxFree = NULL;
     tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;
+
+    char *scheduler = NULL;
+    if (ConfGet("autofp-scheduler", &scheduler) == 1) {
+        if (strcmp(scheduler, "round_robin") == 0) {
+            SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+        } else if (strcmp(scheduler, "active_flows") == 0) {
+            SCLogInfo("AutoFP mode using \"Active Flows\" Q Handler");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActiveFlows;
+        } else if (strcmp(scheduler, "active_packets") == 0) {
+            SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+        } else {
+            SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" for "
+                       "autofp-scheduler in conf.  Killing engine.", scheduler);
+            exit(EXIT_FAILURE);
+        }
+    } else {
+        SCLogInfo("AutoFP mode using default \"Round Robin\" Q Handler");
+        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+    }
+
+    return;
 }
 
 /* same as 'simple' */
@@ -110,7 +119,7 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) {
     if (ctx->queues == NULL) {
         ctx->size = 1;
         ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
-        memset(ctx->queues, 0, ctx->size * sizeof(sizeof(TmqhFlowMode)));
+        memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
         if (ctx->queues == NULL) {
             return -1;
         }
@@ -120,9 +129,10 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) {
         if (ctx->queues == NULL) {
             return -1;
         }
-        memset(ctx->queues + (ctx->size - 1), 0, sizeof(sizeof(TmqhFlowMode)));
+        memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
     }
     ctx->queues[ctx->size - 1].q = &trans_q[id];
+    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].active_flows);
 
     return 0;
 }
@@ -168,6 +178,8 @@ void *TmqhOutputFlowSetupCtx(char *queue_str) {
         tstr = comma ? (comma + 1) : comma;
     } while (tstr != NULL);
 
+    tmqh_flow_outctx = ctx;
+
     SCFree(str);
     return (void *)ctx;
 error:
@@ -194,7 +206,54 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
         if (qid == -1) {
             p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++;
             ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
+            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+            ctx->queues[qid].total_flows++;
+        }
+        ctx->queues[qid].total_packets++;
+    } else {
+        qid = ctx->last++;
+
+        if (ctx->last == ctx->size)
+            ctx->last = 0;
+    }
+
+    PacketQueue *q = ctx->queues[qid].q;
+    SCMutexLock(&q->mutex_q);
+    PacketEnqueue(q, p);
+    SCCondSignal(&q->cond_q);
+    SCMutexUnlock(&q->mutex_q);
+}
+
+/** \brief select the output queue with the fewest unprocessed packets
+ *  \param tv thread vars
+ *  \param p packet
+ */
+void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
+{
+    int32_t qid = 0;
+
+    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+    /* if no flow we use the first queue,
+     * should be rare */
+    if (p->flow != NULL) {
+        qid = p->flow->autofp_tmqh_flow_qid;
+        if (qid == -1) {
+            uint16_t i = 0;
+            int lowest_id = 0;
+            TmqhFlowMode *queues = ctx->queues;
+            uint32_t lowest = queues[i].q->len;
+            for (i = 1; i < ctx->size; i++) {
+                if (queues[i].q->len < lowest) {
+                    lowest = queues[i].q->len;
+                    lowest_id = i;
+                }
+            }
+            p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
+            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+            queues[qid].total_flows++;
         }
+        ctx->queues[qid].total_packets++;
     } else {
         qid = ctx->last++;
 
@@ -209,6 +268,67 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
     SCMutexUnlock(&q->mutex_q);
 }
 
+/** \brief select the output queue with the fewest active flows
+ *  \param tv thread vars
+ *  \param p packet
+ */
+void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
+{
+    int32_t qid = 0;
+
+    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+    /* if no flow we use the first queue,
+     * should be rare */
+    if (p->flow != NULL) {
+        qid = p->flow->autofp_tmqh_flow_qid;
+        if (qid == -1) {
+            uint32_t i = 0;
+            int lowest_id = 0;
+            uint64_t lowest = ctx->queues[i].active_flows_sc_atomic__;
+            for (i = 1; i < ctx->size; i++) {
+                if (ctx->queues[i].active_flows_sc_atomic__ < lowest) {
+                    lowest = ctx->queues[i].active_flows_sc_atomic__;
+                    lowest_id = i;
+                }
+            }
+            p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
+            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+            ctx->queues[qid].total_flows++;
+        }
+        ctx->queues[qid].total_packets++;
+    } else {
+        qid = ctx->last++;
+
+        if (ctx->last == ctx->size)
+            ctx->last = 0;
+    }
+
+    PacketQueue *q = ctx->queues[qid].q;
+    SCMutexLock(&q->mutex_q);
+    PacketEnqueue(q, p);
+    SCCondSignal(&q->cond_q);
+    SCMutexUnlock(&q->mutex_q);
+}
+
+void TmqhFlowPrintStatistics(void)
+{
+    uint32_t i;
+
+    SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
+              tmqh_flow_outctx->size);
+    for (i = 0; i < tmqh_flow_outctx->size; i++) {
+        SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
+                  tmqh_flow_outctx->queues[i].total_packets);
+    }
+    for (i = 0; i < tmqh_flow_outctx->size; i++) {
+        SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
+                  tmqh_flow_outctx->queues[i].total_flows);
+    }
+
+    return;
+}
+
 #ifdef UNITTESTS
 
 #if 0
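
A note on the bare active_flows_sc_atomic__ reads in TmqhOutputFlowActiveFlows: SC_ATOMIC_DECLARE(uint64_t, active_flows) declares a member whose mangled name carries the _sc_atomic__ suffix, and the loop reads it directly. The wrapped form would look roughly like this sketch:

    uint64_t lowest = SC_ATOMIC_GET(ctx->queues[0].active_flows);

Either way, these reads (like the q->len reads in the active_packets handler) happen without the queue mutex held, so the "least loaded" pick is a best-effort snapshot, which is acceptable for a scheduling heuristic.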
index cdaacda624c60e3ed9d766289f27f96cb04bac82..84ea3bcdf7c2802f92467f3f51066338771d3381 100644 (file)
 #ifndef __TMQH_FLOW_H__
 #define __TMQH_FLOW_H__
 
+typedef struct TmqhFlowMode_ {
+    PacketQueue *q;
+
+    SC_ATOMIC_DECLARE(uint64_t, active_flows);
+    uint64_t total_packets;
+    uint64_t total_flows;
+} TmqhFlowMode;
+
+/** \brief Ctx for the flow queue handler
+ *  \param size number of queues to output to
+ *  \param queues array of queue id's this flow handler outputs to */
+typedef struct TmqhFlowCtx_ {
+    uint16_t size;
+    uint16_t last;
+
+    TmqhFlowMode *queues;
+
+    uint16_t round_robin_idx;
+} TmqhFlowCtx;
+
+extern TmqhFlowCtx *tmqh_flow_outctx;
+
+static inline void TmqhFlowUpdateActiveFlows(Flow *f)
+{
+    SC_ATOMIC_SUB(tmqh_flow_outctx->queues[f->autofp_tmqh_flow_qid].active_flows, 1);
+
+    return;
+}
+
 void TmqhFlowRegister (void);
 void TmqhFlowRegisterTests(void);
+void TmqhFlowPrintStatistics(void);
 
 #endif /* __TMQH_FLOW_H__ */
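
Note that tmqh-flow.h now relies on its includers for the types it uses: PacketQueue, the SC_ATOMIC_* macros and Flow must already be visible where it is pulled in (flow-util.h includes it after detect-engine-state.h, which chains those in). A self-contained version would add its own includes, roughly this sketch:

    #include "decode.h"      /* PacketQueue */
    #include "util-atomic.h" /* SC_ATOMIC_* macros */
    #include "flow.h"        /* Flow, for TmqhFlowUpdateActiveFlows() */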
index f16aac46146b5c06409e613db942012763e772a1..cde848a957578dd2fc2471b8154bb7a0801f1153 100644 (file)
 # to get the runmode custom modes that can be used here for a particular runmode.
 #runmode: auto
 
+# Specifies the kind of queue scheduler used by the flow-pinned autofp mode.
+# Supported schedulers are:
+# round_robin - Flows allotted to queues in a round robin fashion.
+# active_packets - Flows allotted to the queue with the least number of
+#                  unprocessed packets.
+# active_flows - Flows allotted to the queue with the least number of active flows.
+autofp-scheduler: round_robin
+
 # Default pid file.
 # Will use this file if no --pidfile in command options.
 #pid-file: /var/run/suricata.pid
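
With the scheduler configured, an autofp run now ends with the per-queue totals printed by TmqhFlowPrintStatistics(). Going by the format strings in tmqh-flow.c above, the shutdown log looks roughly like this (counts illustrative):

    AutoFP - Total flow handler queues - 2
    AutoFP - Total Packets - Queue 0 - 12504
    AutoFP - Total Packets - Queue 1 - 11893
    AutoFP - Total Flows - Queue 0 - 210
    AutoFP - Total Flows - Queue 1 - 198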