]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
autofp: update queue handlers 2089/head
authorVictor Julien <victor@inliniac.net>
Tue, 17 May 2016 16:34:55 +0000 (18:34 +0200)
committerVictor Julien <victor@inliniac.net>
Fri, 20 May 2016 07:04:27 +0000 (09:04 +0200)
Now that the flow lookup is done in the worker threads the flow
queue handlers running after the capture thread(s) no longer have
access to the flow. This limits the options of how flow balancing
can be done.

This patch removes all code that is now useless. The only 2 methods
that still make sense are 'hash' and 'ippair'.

src/flow-hash.c
src/flow-util.h
src/flow.h
src/tmqh-flow.c
src/tmqh-flow.h

index c7418aa2949138a4fe367079349fb0402a69d9b6..466b246cc59b8037f958cec09692b65028ee3e65 100644 (file)
@@ -415,7 +415,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv,
     old_f->flags |= FLOW_TCP_REUSED;
     /* get some settings that we move over to the new flow */
     FlowThreadId thread_id = old_f->thread_id;
-    int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
 
     /* since fb lock is still held this flow won't be found until we are done */
     FLOWLOCK_UNLOCK(old_f);
@@ -439,9 +438,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv,
     f->fb = fb;
 
     f->thread_id = thread_id;
-    if (autofp_tmqh_flow_qid != -1) {
-        SC_ATOMIC_SET(f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
-    }
     return f;
 }
 
index ca6a49cccb291baf00af7bb348031748b7a09166..7ab08f6c4f0ab5bb712dcdf45ac4a1d127bd6723 100644 (file)
@@ -70,8 +70,6 @@
         (f)->hprev = NULL; \
         (f)->lnext = NULL; \
         (f)->lprev = NULL; \
-        SC_ATOMIC_INIT((f)->autofp_tmqh_flow_qid);  \
-        (void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1);  \
         RESET_COUNTERS((f)); \
     } while (0)
 
         (f)->sgh_toclient = NULL; \
         GenericVarFree((f)->flowvar); \
         (f)->flowvar = NULL; \
-        if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) {   \
-            (void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1);   \
-        }                                       \
         RESET_COUNTERS((f)); \
     } while(0)
 
             DetectEngineStateFlowFree((f)->de_state); \
         } \
         GenericVarFree((f)->flowvar); \
-        SC_ATOMIC_DESTROY((f)->autofp_tmqh_flow_qid);   \
     } while(0)
 
 /** \brief check if a memory alloc would fit in the memcap
index 95f3333a65dcb431e3b5e29b8f605b6b07baeedf..0400250e23627b3062b72faebbe55acc6d8321d8 100644 (file)
@@ -334,9 +334,6 @@ typedef struct Flow_
      */
     SC_ATOMIC_DECLARE(FlowRefCount, use_cnt);
 
-    /** flow queue id, used with autofp */
-    SC_ATOMIC_DECLARE(int16_t, autofp_tmqh_flow_qid);
-
     /** flow tenant id, used to setup flow timeout and stream pseudo
      *  packets with the correct tenant id set */
     uint32_t tenant_id;
index 881c5985cae8232b913212630ed7d5745ed6a577..480e909c32a0f9b68990f2f8c8f1822061a2a0a6 100644 (file)
@@ -42,8 +42,6 @@
 Packet *TmqhInputFlow(ThreadVars *t);
 void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
 void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
-void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
-void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
 void *TmqhOutputFlowSetupCtx(char *queue_str);
 void TmqhOutputFlowFreeCtx(void *ctx);
 void TmqhFlowRegisterTests(void);
@@ -59,10 +57,10 @@ void TmqhFlowRegister(void)
     char *scheduler = NULL;
     if (ConfGet("autofp-scheduler", &scheduler) == 1) {
         if (strcasecmp(scheduler, "round-robin") == 0) {
-            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+            SCLogNotice("using flow hash instead of round robin");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
         } else if (strcasecmp(scheduler, "active-packets") == 0) {
-            //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
-            SCLogNotice("FIXME: using flow hash instead of active packets");
+            SCLogNotice("using flow hash instead of active packets");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
         } else if (strcasecmp(scheduler, "hash") == 0) {
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
@@ -75,7 +73,6 @@ void TmqhFlowRegister(void)
             exit(EXIT_FAILURE);
         }
     } else {
-        //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
         tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
     }
 
@@ -88,8 +85,6 @@ void TmqhFlowPrintAutofpHandler(void)
     if (tmqh_table[TMQH_FLOW].OutHandler == (f))    \
         SCLogInfo("AutoFP mode using \"%s\" flow load balancer", (msg))
 
-    PRINT_IF_FUNC(TmqhOutputFlowRoundRobin, "Round Robin");
-    PRINT_IF_FUNC(TmqhOutputFlowActivePackets, "Active Packets");
     PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash");
     PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair");
 
@@ -153,8 +148,6 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
         memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
     }
     ctx->queues[ctx->size - 1].q = &trans_q[id];
-    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets);
-    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows);
 
     return 0;
 }
@@ -205,8 +198,6 @@ void *TmqhOutputFlowSetupCtx(char *queue_str)
         tstr = comma ? (comma + 1) : comma;
     } while (tstr != NULL);
 
-    SC_ATOMIC_INIT(ctx->round_robin_idx);
-
     SCFree(str);
     return (void *)ctx;
 
@@ -219,161 +210,16 @@ error:
 
 void TmqhOutputFlowFreeCtx(void *ctx)
 {
-    int i;
     TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
 
     SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
               fctx->size);
-    for (i = 0; i < fctx->size; i++) {
-        SCLogInfo("AutoFP - Queue %-2"PRIu32 " - pkts: %-12"PRIu64" flows: %-12"PRIu64, i,
-                SC_ATOMIC_GET(fctx->queues[i].total_packets),
-                SC_ATOMIC_GET(fctx->queues[i].total_flows));
-        SC_ATOMIC_DESTROY(fctx->queues[i].total_packets);
-        SC_ATOMIC_DESTROY(fctx->queues[i].total_flows);
-    }
-
     SCFree(fctx->queues);
     SCFree(fctx);
 
     return;
 }
 
-/**
- * \brief select the queue to output in a round robin fashion.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
-{
-    int16_t qid = 0;
-
-    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
-    /* if no flow we use the first queue,
-     * should be rare */
-    if (p->flow != NULL) {
-        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
-        if (qid == -1) {
-            qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1);
-            if (qid >= ctx->size) {
-                SC_ATOMIC_RESET(ctx->round_robin_idx);
-                qid = 0;
-            }
-            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
-            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
-        }
-    } else {
-        qid = ctx->last++;
-
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-
-    PacketQueue *q = ctx->queues[qid].q;
-    SCMutexLock(&q->mutex_q);
-    PacketEnqueue(q, p);
-    SCCondSignal(&q->cond_q);
-    SCMutexUnlock(&q->mutex_q);
-
-    return;
-}
-
-/**
- * \brief select the queue to output to based on queue lengths.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
-{
-    int16_t qid = 0;
-
-    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
-    /* if no flow we round robin the packets over the queues */
-    if (p->flow != NULL) {
-        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
-        if (qid == -1) {
-            int16_t i = 0;
-            int16_t lowest_id = 0;
-            TmqhFlowMode *queues = ctx->queues;
-            uint32_t lowest = queues[i].q->len;
-            for (i = 1; i < ctx->size; i++) {
-                if (queues[i].q->len < lowest) {
-                    lowest = queues[i].q->len;
-                    lowest_id = i;
-                }
-            }
-            qid = lowest_id;
-            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id);
-            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
-        }
-    } else {
-        qid = ctx->last++;
-
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-
-    PacketQueue *q = ctx->queues[qid].q;
-    SCMutexLock(&q->mutex_q);
-    PacketEnqueue(q, p);
-    SCCondSignal(&q->cond_q);
-    SCMutexUnlock(&q->mutex_q);
-
-    return;
-}
-
-/**
- * \brief select the queue to output based on address hash.
- *
- * \param tv thread vars.
- * \param p packet.
- */
-void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p)
-{
-    int16_t qid = 0;
-
-    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-
-    /* if no flow we use the first queue,
-     * should be rare */
-    if (p->flow != NULL) {
-        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
-        if (qid == -1) {
-#if __WORDSIZE == 64
-            uint64_t addr = (uint64_t)p->flow;
-#else
-            uint32_t addr = (uint32_t)p->flow;
-#endif
-            addr >>= 7;
-
-            /* we don't have to worry about possible overflow, since
-             * ctx->size will be less than 2 ** 15 for sure */
-            qid = addr % ctx->size;
-            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
-            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
-        }
-    } else {
-        qid = ctx->last++;
-
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-
-    PacketQueue *q = ctx->queues[qid].q;
-    SCMutexLock(&q->mutex_q);
-    PacketEnqueue(q, p);
-    SCCondSignal(&q->cond_q);
-    SCMutexUnlock(&q->mutex_q);
-
-    return;
-}
-
 void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
 {
     int16_t qid = 0;
@@ -389,7 +235,6 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
         if (ctx->last == ctx->size)
             ctx->last = 0;
     }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
 
     PacketQueue *q = ctx->queues[qid].q;
     SCMutexLock(&q->mutex_q);
@@ -399,6 +244,7 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
 
     return;
 }
+
 /**
  * \brief select the queue to output based on IP address pair.
  *
@@ -413,32 +259,17 @@ void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
 
     TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
 
-    /* if no flow we use the first queue,
-     * should be rare */
-    if (p->flow != NULL) {
-        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
-        if (qid == -1) {
-            if (p->src.family == AF_INET6) {
-                for (i = 0; i < 4; i++) {
-                    addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
-                }
-            } else {
-                addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
-            }
-
-            /* we don't have to worry about possible overflow, since
-             * ctx->size will be lesser than 2 ** 31 for sure */
-            qid = addr_hash % ctx->size;
-            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
-            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
+    if (p->src.family == AF_INET6) {
+        for (i = 0; i < 4; i++) {
+            addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
         }
     } else {
-        qid = ctx->last++;
-
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
+        addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
     }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
+
+    /* we don't have to worry about possible overflow, since
+     * ctx->size will be lesser than 2 ** 31 for sure */
+    qid = addr_hash % ctx->size;
 
     PacketQueue *q = ctx->queues[qid].q;
     SCMutexLock(&q->mutex_q);
index e710ffeca468f58aa08f8c030a73a8f363beaa8b..3cec7a165f5b8bdbf2aded9761e5b7d3f6071296 100644 (file)
@@ -26,8 +26,6 @@
 
 typedef struct TmqhFlowMode_ {
     PacketQueue *q;
-    SC_ATOMIC_DECLARE(uint64_t, total_packets);
-    SC_ATOMIC_DECLARE(uint64_t, total_flows);
 } TmqhFlowMode;
 
 /** \brief Ctx for the flow queue handler
@@ -38,8 +36,6 @@ typedef struct TmqhFlowCtx_ {
     uint16_t last;
 
     TmqhFlowMode *queues;
-
-    SC_ATOMIC_DECLARE(int16_t, round_robin_idx);
 } TmqhFlowCtx;
 
 void TmqhFlowRegister (void);