]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
Adapt flow tmqh counters to be atomic vars. Remove support for active flows q handler...
authorAnoop Saldanha <poonaatsoc@gmail.com>
Mon, 12 Mar 2012 14:18:49 +0000 (19:48 +0530)
committerVictor Julien <victor@inliniac.net>
Mon, 19 Mar 2012 11:55:41 +0000 (12:55 +0100)
src/flow-util.h
src/flow.h
src/suricata.c
src/threadvars.h
src/tm-threads.c
src/tmqh-flow.c
src/tmqh-flow.h
src/util-atomic.h
suricata.yaml.in

index e90d9c44db33ef02671b445b7cbb509538dcd7df..57bcbf7e94112b7fbcf37f03a1f3f05125a3a685 100644 (file)
@@ -62,7 +62,7 @@
         (f)->hprev = NULL; \
         (f)->lnext = NULL; \
         (f)->lprev = NULL; \
-        (f)->autofp_tmqh_flow_qid = -1; \
+        SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1);  \
         RESET_COUNTERS((f)); \
     } while (0)
 
@@ -93,9 +93,8 @@
         (f)->tag_list = NULL; \
         GenericVarFree((f)->flowvar); \
         (f)->flowvar = NULL; \
-        if ((f)->autofp_tmqh_flow_qid != -1) {  \
-            TmqhFlowUpdateActiveFlows((f));     \
-            (f)->autofp_tmqh_flow_qid = -1;     \
+        if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) {   \
+            SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1);   \
         }                                       \
         RESET_COUNTERS((f)); \
     } while(0)
         DetectTagDataListFree((f)->tag_list); \
         GenericVarFree((f)->flowvar); \
         SCMutexDestroy(&(f)->de_state_m); \
-        if ((f)->autofp_tmqh_flow_qid != -1) {  \
-            TmqhFlowUpdateActiveFlows((f));     \
+        if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) {   \
+            SC_ATOMIC_DESTROY((f)->autofp_tmqh_flow_qid);   \
         }                                       \
         (f)->tag_list = NULL; \
     } while(0)
index fe95cd6e280c80dafe87c179b6ef2159352e959d..c5a2af899353f89fc061a61e55dedd1db75d2b1d 100644 (file)
@@ -312,7 +312,7 @@ typedef struct Flow_
     uint64_t bytecnt;
 #endif
 
-    int32_t autofp_tmqh_flow_qid;
+    SC_ATOMIC_DECLARE(int, autofp_tmqh_flow_qid);
 } Flow;
 
 enum {
index cf1636d286ce91d5b519b0027252457bd63c1fc8..766951ce1582b9ee4564cca6f9c8d0bdfa2b8b87 100644 (file)
@@ -1902,8 +1902,6 @@ int main(int argc, char **argv)
 #endif /* OS_WIN32 */
 
     SC_ATOMIC_DESTROY(engine_stage);
-    if (strcasecmp(RunmodeGetActive(), "autofp") == 0)
-        TmqhFlowPrintStatistics();
 
     exit(engine_retval);
 }
index 9f81c347ce9ed3a87e9e53281ad64079dd977759..c8f760e5589f26c6c5dc49a8808fa3ea131a8a7e 100644 (file)
@@ -73,6 +73,7 @@ typedef struct ThreadVars_ {
     Tmq *inq;
     Tmq *outq;
     void *outctx;
+    char *outqh_name;
 
     /** queue handlers */
     struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);
index c5eb4c9664fc9bafbcbab659250eff0d3795d400..9625f1a78b84d49353da233f92a1bd6150216b80 100644 (file)
@@ -1192,6 +1192,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
             goto error;
 
         tv->tmqh_out = tmqh->OutHandler;
+        tv->outqh_name = tmqh->name;
 
         if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
             SCLogDebug("outq_name \"%s\"", outq_name);
@@ -1430,6 +1431,16 @@ void TmThreadKillThread(ThreadVars *tv)
         SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
     }
 
+    if (tv->outctx != NULL) {
+        Tmqh *tmqh = TmqhGetQueueHandlerByName(tv->outqh_name);
+        if (tmqh == NULL)
+            BUG_ON(1);
+
+        if (tmqh->OutHandlerCtxFree != NULL) {
+            tmqh->OutHandlerCtxFree(tv->outctx);
+        }
+    }
+
     if (tv->cond != NULL ) {
         int cnt = 0;
         while (1) {
index 038a8fd8d39fc9951a7d85bbf054d6f955e3b881..ec2a634d44a02e10e3ff54469ddece03e945bd1a 100644 (file)
@@ -62,9 +62,6 @@ void TmqhFlowRegister(void)
         if (strcmp(scheduler, "round_robin") == 0) {
             SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
-        } else if (strcmp(scheduler, "active_flows") == 0) {
-            SCLogInfo("AutoFP mode using \"Active Flows\" Q Handler");
-            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActiveFlows;
         } else if (strcmp(scheduler, "active_packets") == 0) {
             SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
@@ -75,8 +72,8 @@ void TmqhFlowRegister(void)
             exit(EXIT_FAILURE);
         }
     } else {
-        SCLogInfo("AutoFP mode using default \"Round Robin\" Q Handler");
-        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+        SCLogInfo("AutoFP mode using default \"Active Packets\" Q Handler");
+        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
     }
 
     return;
@@ -134,7 +131,8 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
         memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
     }
     ctx->queues[ctx->size - 1].q = &trans_q[id];
-    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].active_flows);
+    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets);
+    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows);
 
     return 0;
 }
@@ -185,6 +183,8 @@ void *TmqhOutputFlowSetupCtx(char *queue_str)
 
     tmqh_flow_outctx = ctx;
 
+    SC_ATOMIC_INIT(ctx->round_robin_idx);
+
     SCFree(str);
     return (void *)ctx;
 
@@ -200,8 +200,15 @@ void TmqhOutputFlowFreeCtx(void *ctx)
     int i;
     TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
 
+    SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
+              tmqh_flow_outctx->size);
     for (i = 0; i < fctx->size; i++) {
-        SC_ATOMIC_DESTROY(fctx->queues[i].active_flows);
+        SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
+                  SC_ATOMIC_GET(fctx->queues[i].total_packets));
+        SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
+                  SC_ATOMIC_GET(fctx->queues[i].total_flows));
+        SC_ATOMIC_DESTROY(fctx->queues[i].total_packets);
+        SC_ATOMIC_DESTROY(fctx->queues[i].total_flows);
     }
 
     SCFree(fctx->queues);
@@ -226,20 +233,23 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
     /* if no flow we use the first queue,
      * should be rare */
     if (p->flow != NULL) {
-        qid = p->flow->autofp_tmqh_flow_qid;
+        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
         if (qid == -1) {
-            p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++;
-            ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
-            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
-            ctx->queues[qid].total_flows++;
+            qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1);
+            if (qid >= ctx->size) {
+                SC_ATOMIC_RESET(ctx->round_robin_idx);
+                qid = 0;
+            }
+            SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
+            SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
         }
-        ctx->queues[qid].total_packets++;
     } else {
         qid = ctx->last++;
 
         if (ctx->last == ctx->size)
             ctx->last = 0;
     }
+    SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
 
     PacketQueue *q = ctx->queues[qid].q;
     SCMutexLock(&q->mutex_q);
@@ -265,7 +275,7 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
     /* if no flow we use the first queue,
      * should be rare */
     if (p->flow != NULL) {
-        qid = p->flow->autofp_tmqh_flow_qid;
+        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
         if (qid == -1) {
             uint16_t i = 0;
             int lowest_id = 0;
@@ -277,17 +287,17 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
                     lowest_id = i;
                 }
             }
-            p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
-            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
-            queues[qid].total_flows++;
+            qid = lowest_id;
+            SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id);
+            SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
         }
-        ctx->queues[qid].total_packets++;
     } else {
         qid = ctx->last++;
 
         if (ctx->last == ctx->size)
             ctx->last = 0;
     }
+    SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
 
     PacketQueue *q = ctx->queues[qid].q;
     SCMutexLock(&q->mutex_q);
@@ -298,76 +308,6 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
     return;
 }
 
-/**
- * \brief select the queue to output to based on active flows.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
-{
-    int32_t qid = 0;
-
-    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
-    /* if no flow we use the first queue,
-     * should be rare */
-    if (p->flow != NULL) {
-        qid = p->flow->autofp_tmqh_flow_qid;
-        if (qid == -1) {
-            uint32_t i = 0;
-            int lowest_id = 0;
-            uint32_t lowest = ctx->queues[i].active_flows_sc_atomic__;
-            for (i = 1; i < ctx->size; i++) {
-                if (ctx->queues[i].active_flows_sc_atomic__ < lowest) {
-                    lowest = ctx->queues[i].active_flows_sc_atomic__;
-                    lowest_id = i;
-                }
-            }
-            p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
-            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
-            ctx->queues[qid].total_flows++;
-        }
-        ctx->queues[qid].total_packets++;
-    } else {
-        qid = ctx->last++;
-
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-
-    PacketQueue *q = ctx->queues[qid].q;
-    SCMutexLock(&q->mutex_q);
-    PacketEnqueue(q, p);
-    SCCondSignal(&q->cond_q);
-    SCMutexUnlock(&q->mutex_q);
-
-    return;
-}
-
-/**
- * \brief Prints flow q handler statistics.
- *
- *        Requires engine to be dead when this function's called.
- */
-void TmqhFlowPrintStatistics(void)
-{
-    uint32_t i;
-
-    SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
-              tmqh_flow_outctx->size);
-    for (i = 0; i < tmqh_flow_outctx->size; i++) {
-        SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i,
-                  tmqh_flow_outctx->queues[i].total_packets);
-    }
-    for (i = 0; i < tmqh_flow_outctx->size; i++) {
-        SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i,
-                  tmqh_flow_outctx->queues[i].total_flows);
-    }
-
-    return;
-}
-
 #ifdef UNITTESTS
 
 static int TmqhOutputFlowSetupCtxTest01(void)
index 84ea3bcdf7c2802f92467f3f51066338771d3381..adcc70eb84493d6490252e71fe682b6dda2102cf 100644 (file)
@@ -27,9 +27,8 @@
 typedef struct TmqhFlowMode_ {
     PacketQueue *q;
 
-    SC_ATOMIC_DECLARE(uint64_t, active_flows);
-    uint64_t total_packets;
-    uint64_t total_flows;
+    SC_ATOMIC_DECLARE(uint64_t, total_packets);
+    SC_ATOMIC_DECLARE(uint64_t, total_flows);
 } TmqhFlowMode;
 
 /** \brief Ctx for the flow queue handler
@@ -41,20 +40,10 @@ typedef struct TmqhFlowCtx_ {
 
     TmqhFlowMode *queues;
 
-    uint16_t round_robin_idx;
+    SC_ATOMIC_DECLARE(uint16_t, round_robin_idx);
 } TmqhFlowCtx;
 
-extern TmqhFlowCtx *tmqh_flow_outctx;
-
-static inline void TmqhFlowUpdateActiveFlows(Flow *f)
-{
-    SC_ATOMIC_SUB(tmqh_flow_outctx->queues[f->autofp_tmqh_flow_qid].active_flows, 1);
-
-    return;
-}
-
 void TmqhFlowRegister (void);
 void TmqhFlowRegisterTests(void);
-void TmqhFlowPrintStatistics(void);
 
 #endif /* __TMQH_FLOW_H__ */
index d48ce6c36dc636a5abf17be7d74c5b74bfaa77a9..adf4194189645f22a2da11f390241a99554dc1d9 100644 (file)
     var; \
 })
 
+/**
+ *  \brief Set the value for the atomic variable.
+ *
+ *  \retval var value
+ */
+#define SC_ATOMIC_SET(name, val) ({       \
+    typeof(name ## _sc_atomic__) var; \
+    do { \
+        SCSpinLock(&(name ## _sc_lock__)); \
+        var = (name ## _sc_atomic__) = val; \
+        SCSpinUnlock(&(name ## _sc_lock__)); \
+    } while (0); \
+    var; \
+})
+
 /**
  *  \brief atomic Compare and Switch
  *
 #define SC_ATOMIC_GET(name) \
     (name ## _sc_atomic__)
 
+/**
+ *  \brief Set the value for the atomic variable.
+ *
+ *  \retval var value
+ */
+#define SC_ATOMIC_SET(name, val) ({       \
+    while (SC_ATOMIC_CAS(&name, SC_ATOMIC_GET(name), val) == 0) \
+        ;                                                       \
+        })
+
 #endif /* !no atomic operations */
 #endif /* __UTIL_ATOMIC_H__ */
 
index cde848a957578dd2fc2471b8154bb7a0801f1153..b9a9e91a459e1d53ae2efde828d4e765375f323a 100644 (file)
@@ -24,8 +24,7 @@
 # round_robin - Flow alloted to queue in a round robin fashion.
 # active-packets - Flow alloted to queue that has the least no of
 #                  unprocessed packets.
-# active-flows - Flow alloted to queue that has least no of active flows.
-autofp-scheduler: round-robin
+#autofp-scheduler: active_packets
 
 # Default pid file.
 # Will use this file if no --pidfile in command options.