]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow-manager: optimize hash walking 2266/head
authorVictor Julien <victor@inliniac.net>
Sat, 14 May 2016 06:56:49 +0000 (08:56 +0200)
committerVictor Julien <victor@inliniac.net>
Thu, 22 Sep 2016 11:36:28 +0000 (13:36 +0200)
Until now the flow manager would walk the entire flow hash table on an
interval. It would thus touch all flows, leading to a lot of memory
and cache pressure. In scenario's where the number of tracked flows run
into the hundreds on thousands, and the memory used can run into many
hundreds of megabytes or even gigabytes, this would lead to serious
performance degradation.

This patch introduces a new approach. A timestamp per flow bucket
(hash row) is maintained by the flow manager. It holds the timestamp
of the earliest possible timeout of a flow in the list. The hash walk
skips rows with timestamps beyond the current time.

As the timestamp depends on the flows in the hash row's list, and on
the 'state' of each flow in the list, any addition of a flow or
changing of a flow's state invalidates the timestamp. The flow manager
then has to walk the list again to set a new timestamp.

A utility function FlowUpdateState is introduced to change Flow states,
taking care of the bucket timestamp invalidation while at it.

Empty flow buckets use a special value so that we don't have to take
the flow bucket lock to find out the bucket is empty.

This patch also adds more performance counters:

flow_mgr.flows_checked         | Total    | 929
flow_mgr.flows_notimeout       | Total    | 391
flow_mgr.flows_timeout         | Total    | 538
flow_mgr.flows_removed         | Total    | 277
flow_mgr.flows_timeout_inuse   | Total    | 261
flow_mgr.rows_checked          | Total    | 1000000
flow_mgr.rows_skipped          | Total    | 998835
flow_mgr.rows_empty            | Total    | 290
flow_mgr.rows_maxlen           | Total    | 2

flow_mgr.flows_checked: number of flows checked for timeout in the
                        last pass
flow_mgr.flows_notimeout: number of flows out of flow_mgr.flows_checked
                        that didn't time out
flow_mgr.flows_timeout: number of out of flow_mgr.flows_checked that
                        did reach the time out
flow_mgr.flows_removed: number of flows out of flow_mgr.flows_timeout
                        that were really removed
flow_mgr.flows_timeout_inuse: number of flows out of flow_mgr.flows_timeout
                        that were still in use or needed work

flow_mgr.rows_checked: hash table rows checked
flow_mgr.rows_skipped: hash table rows skipped because non of the flows
                        would time out anyway

The counters below are only relating to rows that were not skipped.

flow_mgr.rows_empty:   empty hash rows
flow_mgr.rows_maxlen:  max number of flows per hash row. Best to keep low,
                        so increase hash-size if needed.
flow_mgr.rows_busy:    row skipped because it was locked by another thread

src/flow-hash.c
src/flow-hash.h
src/flow-manager.c
src/flow.c
src/flow.h
src/stream-tcp.c

index be4940d7d60948cb92b0355a226f8c74f9d51141..a330101b991db7692806dcac0e46ce181e566d31 100644 (file)
@@ -487,6 +487,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
         FlowInit(f, p);
         f->flow_hash = hash;
         f->fb = fb;
+        FlowUpdateState(f, FLOW_STATE_NEW);
 
         /* update the last seen timestamp of this flow */
         COPY_TIMESTAMP(&p->ts,&f->lastts);
@@ -523,6 +524,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
                 FlowInit(f, p);
                 f->flow_hash = hash;
                 f->fb = fb;
+                FlowUpdateState(f, FLOW_STATE_NEW);
 
                 /* update the last seen timestamp of this flow */
                 COPY_TIMESTAMP(&p->ts,&f->lastts);
@@ -649,6 +651,7 @@ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
         f->hnext = NULL;
         f->hprev = NULL;
         f->fb = NULL;
+        SC_ATOMIC_SET(fb->next_ts, 0);
         FBLOCK_UNLOCK(fb);
 
         int state = SC_ATOMIC_GET(f->flow_state);
@@ -670,6 +673,8 @@ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
 
         FlowClearMemory(f, f->protomap);
 
+        FlowUpdateState(f, FLOW_STATE_NEW);
+
         FLOWLOCK_UNLOCK(f);
 
         (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));
index e570b055513cef6b46df5e97a17b137f2c17576a..18a8c324b5a0426cca2178b4af32a084407cef90 100644 (file)
@@ -48,6 +48,12 @@ typedef struct FlowBucket_ {
 #else
     #error Enable FBLOCK_SPIN or FBLOCK_MUTEX
 #endif
+    /** timestamp in seconds of the earliest possible moment a flow
+     *  will time out in this row. Set by the flow manager. Cleared
+     *  to 0 by workers, either when new flows are added or when a
+     *  flow state changes. The flow manager sets this to INT_MAX for
+     *  empty buckets. */
+    SC_ATOMIC_DECLARE(int32_t, next_ts);
 } __attribute__((aligned(CLS))) FlowBucket;
 
 #ifdef FBLOCK_SPIN
index be58914a53c0c7f900e9354ad64379690a5f2702..0deb0bc571cabd1699e7d3061337b13d0b87fcf7 100644 (file)
@@ -109,6 +109,18 @@ typedef struct FlowTimeoutCounters_ {
     uint32_t est;
     uint32_t clo;
     uint32_t tcp_reuse;
+
+    uint32_t flows_checked;
+    uint32_t flows_notimeout;
+    uint32_t flows_timeout;
+    uint32_t flows_timeout_inuse;
+    uint32_t flows_removed;
+
+    uint32_t rows_checked;
+    uint32_t rows_skipped;
+    uint32_t rows_empty;
+    uint32_t rows_busy;
+    uint32_t rows_maxlen;
 } FlowTimeoutCounters;
 
 /**
@@ -181,7 +193,7 @@ void FlowDisableFlowManagerThread(void)
  *
  *  \retval timeout timeout in seconds
  */
-static inline uint32_t FlowGetFlowTimeout(const Flow *f, int state)
+static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state)
 {
     uint32_t timeout;
     FlowProtoTimeoutPtr flow_timeouts = SC_ATOMIC_GET(flow_timeouts);
@@ -209,14 +221,18 @@ static inline uint32_t FlowGetFlowTimeout(const Flow *f, int state)
  *  \retval 0 not timed out
  *  \retval 1 timed out
  */
-static int FlowManagerFlowTimeout(const Flow *f, int state, struct timeval *ts)
+static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
 {
     /* set the timeout value according to the flow operating mode,
      * flow's state and protocol.*/
     uint32_t timeout = FlowGetFlowTimeout(f, state);
 
+    int32_t flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout);
+    if (*next_ts == 0 || flow_times_out_at < *next_ts)
+        *next_ts = flow_times_out_at;
+
     /* do the timeout check */
-    if ((int32_t)(f->lastts.tv_sec + timeout) >= ts->tv_sec) {
+    if (flow_times_out_at >= ts->tv_sec) {
         return 0;
     }
 
@@ -268,20 +284,26 @@ static int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts)
  *  \retval cnt timed out flows
  */
 static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
-        int emergency, FlowTimeoutCounters *counters)
+        int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
 {
     uint32_t cnt = 0;
+    uint32_t checked = 0;
 
     do {
+        checked++;
+
         /* check flow timeout based on lastts and state. Both can be
          * accessed w/o Flow lock as we do have the hash row lock (so flow
          * can't disappear) and flow_state is atomic. lastts can only
          * be modified when we have both the flow and hash row lock */
 
-        int state = SC_ATOMIC_GET(f->flow_state);
+        enum FlowState state = SC_ATOMIC_GET(f->flow_state);
 
         /* timeout logic goes here */
-        if (FlowManagerFlowTimeout(f, state, ts) == 0) {
+        if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
+
+            counters->flows_notimeout++;
+
             f = f->hprev;
             continue;
         }
@@ -294,6 +316,8 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
 
         Flow *next_flow = f->hprev;
 
+        counters->flows_timeout++;
+
         /* check if the flow is fully timed out and
          * ready to be discarded. */
         if (FlowManagerFlowTimedOut(f, ts) == 1) {
@@ -347,13 +371,19 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
                     counters->clo++;
                     break;
             }
+            counters->flows_removed++;
         } else {
+            counters->flows_timeout_inuse++;
             FLOWLOCK_UNLOCK(f);
         }
 
         f = next_flow;
     } while (f != NULL);
 
+    counters->flows_checked += checked;
+    if (checked > counters->rows_maxlen)
+        counters->rows_maxlen = checked;
+
     return cnt;
 }
 
@@ -382,20 +412,37 @@ static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
     for (idx = hash_min; idx < hash_max; idx++) {
         FlowBucket *fb = &flow_hash[idx];
 
+        counters->rows_checked++;
+
+        int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
+        if (check_ts > (int32_t)ts->tv_sec) {
+            counters->rows_skipped++;
+            continue;
+        }
+
         /* before grabbing the row lock, make sure we have at least
          * 9 packets in the pool */
         PacketPoolWaitForN(9);
 
-        if (FBLOCK_TRYLOCK(fb) != 0)
+        if (FBLOCK_TRYLOCK(fb) != 0) {
+            counters->rows_busy++;
             continue;
+        }
 
         /* flow hash bucket is now locked */
 
-        if (fb->tail == NULL)
+        if (fb->tail == NULL) {
+            SC_ATOMIC_SET(fb->next_ts, INT_MAX);
+            counters->rows_empty++;
             goto next;
+        }
+
+        int32_t next_ts = 0;
 
         /* we have a flow, or more than one */
-        cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters);
+        cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
+
+        SC_ATOMIC_SET(fb->next_ts, next_ts);
 
 next:
         FBLOCK_UNLOCK(fb);
@@ -502,6 +549,19 @@ typedef struct FlowManagerThreadData_ {
     uint16_t flow_emerg_mode_enter;
     uint16_t flow_emerg_mode_over;
     uint16_t flow_tcp_reuse;
+
+    uint16_t flow_mgr_flows_checked;
+    uint16_t flow_mgr_flows_notimeout;
+    uint16_t flow_mgr_flows_timeout;
+    uint16_t flow_mgr_flows_timeout_inuse;
+    uint16_t flow_mgr_flows_removed;
+
+    uint16_t flow_mgr_rows_checked;
+    uint16_t flow_mgr_rows_skipped;
+    uint16_t flow_mgr_rows_empty;
+    uint16_t flow_mgr_rows_busy;
+    uint16_t flow_mgr_rows_maxlen;
+
 } FlowManagerThreadData;
 
 static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
@@ -540,6 +600,18 @@ static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
     ftd->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
     ftd->flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", t);
 
+    ftd->flow_mgr_flows_checked = StatsRegisterCounter("flow_mgr.flows_checked", t);
+    ftd->flow_mgr_flows_notimeout = StatsRegisterCounter("flow_mgr.flows_notimeout", t);
+    ftd->flow_mgr_flows_timeout = StatsRegisterCounter("flow_mgr.flows_timeout", t);
+    ftd->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow_mgr.flows_timeout_inuse", t);
+    ftd->flow_mgr_flows_removed = StatsRegisterCounter("flow_mgr.flows_removed", t);
+
+    ftd->flow_mgr_rows_checked = StatsRegisterCounter("flow_mgr.rows_checked", t);
+    ftd->flow_mgr_rows_skipped = StatsRegisterCounter("flow_mgr.rows_skipped", t);
+    ftd->flow_mgr_rows_empty = StatsRegisterCounter("flow_mgr.rows_empty", t);
+    ftd->flow_mgr_rows_busy = StatsRegisterCounter("flow_mgr.rows_busy", t);
+    ftd->flow_mgr_rows_maxlen = StatsRegisterCounter("flow_mgr.rows_maxlen", t);
+
     PacketPoolInit();
     return TM_ECODE_OK;
 }
@@ -609,7 +681,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
             FlowUpdateSpareFlows();
 
         /* try to time out flows */
-        FlowTimeoutCounters counters = { 0, 0, 0, 0, };
+        FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,};
         FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
 
 
@@ -631,6 +703,18 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
         StatsAddUI64(th_v, ftd->flow_mgr_cnt_est, (uint64_t)counters.est);
         StatsAddUI64(th_v, ftd->flow_tcp_reuse, (uint64_t)counters.tcp_reuse);
 
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
+
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_checked, (uint64_t)counters.rows_checked);
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_skipped, (uint64_t)counters.rows_skipped);
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_busy, (uint64_t)counters.rows_busy);
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_empty, (uint64_t)counters.rows_empty);
+
         uint32_t len = 0;
         FQLOCK_LOCK(&flow_spare_q);
         len = flow_spare_q.len;
@@ -1046,8 +1130,9 @@ static int FlowMgrTest01 (void)
 
     f.proto = IPPROTO_TCP;
 
+    int32_t next_ts = 0;
     int state = SC_ATOMIC_GET(f.flow_state);
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1105,8 +1190,9 @@ static int FlowMgrTest02 (void)
     f.fb = &fb;
     f.proto = IPPROTO_TCP;
 
+    int32_t next_ts = 0;
     int state = SC_ATOMIC_GET(f.flow_state);
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1152,8 +1238,9 @@ static int FlowMgrTest03 (void)
     f.proto = IPPROTO_TCP;
     f.flags |= FLOW_EMERGENCY;
 
+    int next_ts = 0;
     int state = SC_ATOMIC_GET(f.flow_state);
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1212,8 +1299,9 @@ static int FlowMgrTest04 (void)
     f.proto = IPPROTO_TCP;
     f.flags |= FLOW_EMERGENCY;
 
+    int next_ts = 0;
     int state = SC_ATOMIC_GET(f.flow_state);
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1265,7 +1353,7 @@ static int FlowMgrTest05 (void)
     struct timeval ts;
     TimeGet(&ts);
     /* try to time out flows */
-    FlowTimeoutCounters counters = { 0, 0, 0, 0, };
+    FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,};
     FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
 
     if (flow_recycle_q.len > 0) {
index ddd6428e63ef8cb9cc12e0a8072dd38e875db330..e7671c59f89eed3bee522ee77ca7ce3ba10fcfe5 100644 (file)
@@ -251,7 +251,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p)
         p->flowflags |= FLOW_PKT_ESTABLISHED;
 
         if (f->proto != IPPROTO_TCP) {
-            SC_ATOMIC_SET(f->flow_state, FLOW_STATE_ESTABLISHED);
+            FlowUpdateState(f, FLOW_STATE_ESTABLISHED);
         }
     }
 
@@ -378,6 +378,7 @@ void FlowInitConfig(char quiet)
     uint32_t i = 0;
     for (i = 0; i < flow_config.hash_size; i++) {
         FBLOCK_INIT(&flow_hash[i]);
+        SC_ATOMIC_INIT(flow_hash[i].next_ts);
     }
     (void) SC_ATOMIC_ADD(flow_memuse, (flow_config.hash_size * sizeof(FlowBucket)));
 
@@ -460,6 +461,7 @@ void FlowShutdown(void)
             }
 
             FBLOCK_DESTROY(&flow_hash[u]);
+            SC_ATOMIC_DESTROY(flow_hash[u].next_ts);
         }
         SCFreeAligned(flow_hash);
         flow_hash = NULL;
@@ -787,6 +789,18 @@ uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags)
     return newflags;
 }
 
+void FlowUpdateState(Flow *f, enum FlowState s)
+{
+    /* set the state */
+    SC_ATOMIC_SET(f->flow_state, s);
+
+    if (f->fb) {
+        /* and reset the flow buckup next_ts value so that the flow manager
+         * has to revisit this row */
+        SC_ATOMIC_SET(f->fb->next_ts, 0);
+    }
+}
+
 /************************************Unittests*******************************/
 
 #ifdef UNITTESTS
index def9c515efbd950acef3387c3bf13cbe3cd6a59a..df5d9a4ac4d718ba0644797e7db1f5cbdb1f9d7d 100644 (file)
@@ -421,7 +421,7 @@ typedef struct Flow_
     uint64_t tosrcbytecnt;
 } Flow;
 
-enum {
+enum FlowState {
     FLOW_STATE_NEW = 0,
     FLOW_STATE_ESTABLISHED,
     FLOW_STATE_CLOSED,
@@ -465,6 +465,10 @@ int FlowGetPacketDirection(const Flow *, const Packet *);
 
 void FlowCleanupAppLayer(Flow *);
 
+void FlowUpdateState(Flow *f, enum FlowState s);
+
+/** ----- Inline functions ----- */
+
 /** \brief Set the No Packet Inspection Flag without locking the flow.
  *
  * \param f Flow to set the flag in
index a0517f3a4b32e110dfa29c03dc59a08b593291ec..470d4208f112e59a99d00bf9ebf7b6274a3d97d3 100644 (file)
@@ -662,12 +662,12 @@ static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
         case TCP_FIN_WAIT2:
         case TCP_CLOSING:
         case TCP_CLOSE_WAIT:
-            SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_ESTABLISHED);
+            FlowUpdateState(p->flow, FLOW_STATE_ESTABLISHED);
             break;
         case TCP_LAST_ACK:
         case TCP_TIME_WAIT:
         case TCP_CLOSED:
-            SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_CLOSED);
+            FlowUpdateState(p->flow, FLOW_STATE_CLOSED);
             break;
     }
 }