From: Victor Julien
Date: Sat, 14 May 2016 06:56:49 +0000 (+0200)
Subject: flow-manager: optimize hash walking
X-Git-Tag: suricata-3.2beta1~308
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=70c16f50e733f6f7cc40c1bc3465eb966e3be517;p=thirdparty%2Fsuricata.git

flow-manager: optimize hash walking

Until now the flow manager would walk the entire flow hash table on an
interval. It would thus touch all flows, leading to a lot of memory and
cache pressure. In scenarios where the number of tracked flows runs into
the hundreds of thousands and the memory used runs into many hundreds of
megabytes or even gigabytes, this led to serious performance degradation.

This patch introduces a new approach. A timestamp per flow bucket (hash
row) is maintained by the flow manager. It holds the timestamp of the
earliest possible timeout of a flow in the list. The hash walk skips
rows with timestamps beyond the current time.

As the timestamp depends on the flows in the hash row's list, and on the
'state' of each flow in the list, any addition of a flow or change of a
flow's state invalidates the timestamp. The flow manager then has to
walk the list again to set a new timestamp.

A utility function FlowUpdateState is introduced to change Flow states,
taking care of the bucket timestamp invalidation while at it.

Empty flow buckets use a special value so that we don't have to take the
flow bucket lock to find out the bucket is empty.

This patch also adds more performance counters:

flow_mgr.flows_checked        | Total | 929
flow_mgr.flows_notimeout      | Total | 391
flow_mgr.flows_timeout        | Total | 538
flow_mgr.flows_removed        | Total | 277
flow_mgr.flows_timeout_inuse  | Total | 261
flow_mgr.rows_checked         | Total | 1000000
flow_mgr.rows_skipped         | Total | 998835
flow_mgr.rows_empty           | Total | 290
flow_mgr.rows_maxlen          | Total | 2

flow_mgr.flows_checked: number of flows checked for timeout in the last pass
flow_mgr.flows_notimeout: number of flows out of flow_mgr.flows_checked
  that didn't time out
flow_mgr.flows_timeout: number of flows out of flow_mgr.flows_checked
  that did time out
flow_mgr.flows_removed: number of flows out of flow_mgr.flows_timeout
  that were actually removed
flow_mgr.flows_timeout_inuse: number of flows out of flow_mgr.flows_timeout
  that were still in use or needed work

(In the sample above the counters add up as expected: 391 + 538 = 929
flows checked, and 277 + 261 = 538 flows timed out.)

flow_mgr.rows_checked: hash table rows checked
flow_mgr.rows_skipped: hash table rows skipped because none of the flows
  would time out anyway

The counters below relate only to rows that were not skipped.

flow_mgr.rows_empty: empty hash rows
flow_mgr.rows_maxlen: max number of flows per hash row. Best kept low,
  so increase hash-size if needed.
flow_mgr.rows_busy: row skipped because it was locked by another thread
---

diff --git a/src/flow-hash.c b/src/flow-hash.c
index be4940d7d6..a330101b99 100644
--- a/src/flow-hash.c
+++ b/src/flow-hash.c
@@ -487,6 +487,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
         FlowInit(f, p);
         f->flow_hash = hash;
         f->fb = fb;
+        FlowUpdateState(f, FLOW_STATE_NEW);
 
         /* update the last seen timestamp of this flow */
         COPY_TIMESTAMP(&p->ts,&f->lastts);
@@ -523,6 +524,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
             FlowInit(f, p);
             f->flow_hash = hash;
             f->fb = fb;
+            FlowUpdateState(f, FLOW_STATE_NEW);
 
             /* update the last seen timestamp of this flow */
             COPY_TIMESTAMP(&p->ts,&f->lastts);
@@ -649,6 +651,7 @@ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
         f->hnext = NULL;
         f->hprev = NULL;
         f->fb = NULL;
+        SC_ATOMIC_SET(fb->next_ts, 0);
         FBLOCK_UNLOCK(fb);
 
         int state = SC_ATOMIC_GET(f->flow_state);
@@ -670,6 +673,8 @@
 
         FlowClearMemory(f, f->protomap);
 
+        FlowUpdateState(f, FLOW_STATE_NEW);
+
         FLOWLOCK_UNLOCK(f);
 
         (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));
diff --git a/src/flow-hash.h b/src/flow-hash.h
index e570b05551..18a8c324b5 100644
--- a/src/flow-hash.h
+++ b/src/flow-hash.h
@@ -48,6 +48,12 @@ typedef struct FlowBucket_ {
 #else
     #error Enable FBLOCK_SPIN or FBLOCK_MUTEX
 #endif
+    /** timestamp in seconds of the earliest possible moment a flow
+     *  will time out in this row. Set by the flow manager. Cleared
+     *  to 0 by workers, either when new flows are added or when a
+     *  flow state changes. The flow manager sets this to INT_MAX for
+     *  empty buckets. */
+    SC_ATOMIC_DECLARE(int32_t, next_ts);
 } __attribute__((aligned(CLS))) FlowBucket;
 
 #ifdef FBLOCK_SPIN
diff --git a/src/flow-manager.c b/src/flow-manager.c
index be58914a53..0deb0bc571 100644
--- a/src/flow-manager.c
+++ b/src/flow-manager.c
@@ -109,6 +109,18 @@ typedef struct FlowTimeoutCounters_ {
     uint32_t est;
     uint32_t clo;
     uint32_t tcp_reuse;
+
+    uint32_t flows_checked;
+    uint32_t flows_notimeout;
+    uint32_t flows_timeout;
+    uint32_t flows_timeout_inuse;
+    uint32_t flows_removed;
+
+    uint32_t rows_checked;
+    uint32_t rows_skipped;
+    uint32_t rows_empty;
+    uint32_t rows_busy;
+    uint32_t rows_maxlen;
 } FlowTimeoutCounters;
 
 /**
@@ -181,7 +193,7 @@ void FlowDisableFlowManagerThread(void)
  *
  *  \retval timeout timeout in seconds
  */
-static inline uint32_t FlowGetFlowTimeout(const Flow *f, int state)
+static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state)
 {
     uint32_t timeout;
     FlowProtoTimeoutPtr flow_timeouts = SC_ATOMIC_GET(flow_timeouts);
@@ -209,14 +221,18 @@ static inline uint32_t FlowGetFlowTimeout(const Flow *f, int state)
  *  \retval 0 not timed out
  *  \retval 1 timed out
  */
-static int FlowManagerFlowTimeout(const Flow *f, int state, struct timeval *ts)
+static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
 {
     /* set the timeout value according to the flow operating mode,
      * flow's state and protocol.*/
     uint32_t timeout = FlowGetFlowTimeout(f, state);
 
+    int32_t flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout);
+    if (*next_ts == 0 || flow_times_out_at < *next_ts)
+        *next_ts = flow_times_out_at;
+
     /* do the timeout check */
-    if ((int32_t)(f->lastts.tv_sec + timeout) >= ts->tv_sec) {
+    if (flow_times_out_at >= ts->tv_sec) {
         return 0;
     }
 
@@ -268,20 +284,26 @@ static int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts)
  *  \retval cnt timed out flows
  */
 static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
-        int emergency, FlowTimeoutCounters *counters)
+        int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
 {
     uint32_t cnt = 0;
+    uint32_t checked = 0;
 
     do {
+        checked++;
+
         /* check flow timeout based on lastts and state. Both can be
          * accessed w/o Flow lock as we do have the hash row lock (so flow
          * can't disappear) and flow_state is atomic. lastts can only
          * be modified when we have both the flow and hash row lock */
 
-        int state = SC_ATOMIC_GET(f->flow_state);
+        enum FlowState state = SC_ATOMIC_GET(f->flow_state);
 
         /* timeout logic goes here */
-        if (FlowManagerFlowTimeout(f, state, ts) == 0) {
+        if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
+
+            counters->flows_notimeout++;
+
             f = f->hprev;
             continue;
         }
@@ -294,6 +316,8 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
 
         Flow *next_flow = f->hprev;
 
+        counters->flows_timeout++;
+
         /* check if the flow is fully timed out and
          * ready to be discarded. */
         if (FlowManagerFlowTimedOut(f, ts) == 1) {
@@ -347,13 +371,19 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
                     counters->clo++;
                     break;
             }
+            counters->flows_removed++;
         } else {
+            counters->flows_timeout_inuse++;
             FLOWLOCK_UNLOCK(f);
         }
 
         f = next_flow;
     } while (f != NULL);
 
+    counters->flows_checked += checked;
+    if (checked > counters->rows_maxlen)
+        counters->rows_maxlen = checked;
+
     return cnt;
 }
 
@@ -382,20 +412,37 @@ static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
     for (idx = hash_min; idx < hash_max; idx++) {
         FlowBucket *fb = &flow_hash[idx];
 
+        counters->rows_checked++;
+
+        int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
+        if (check_ts > (int32_t)ts->tv_sec) {
+            counters->rows_skipped++;
+            continue;
+        }
+
         /* before grabbing the row lock, make sure we have at least
          * 9 packets in the pool */
         PacketPoolWaitForN(9);
 
-        if (FBLOCK_TRYLOCK(fb) != 0)
+        if (FBLOCK_TRYLOCK(fb) != 0) {
+            counters->rows_busy++;
             continue;
+        }
 
         /* flow hash bucket is now locked */
 
-        if (fb->tail == NULL)
+        if (fb->tail == NULL) {
+            SC_ATOMIC_SET(fb->next_ts, INT_MAX);
+            counters->rows_empty++;
             goto next;
+        }
+
+        int32_t next_ts = 0;
 
         /* we have a flow, or more than one */
-        cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters);
+        cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
+
+        SC_ATOMIC_SET(fb->next_ts, next_ts);
 
 next:
         FBLOCK_UNLOCK(fb);
@@ -502,6 +549,19 @@ typedef struct FlowManagerThreadData_ {
     uint16_t flow_emerg_mode_enter;
     uint16_t flow_emerg_mode_over;
     uint16_t flow_tcp_reuse;
+
+    uint16_t flow_mgr_flows_checked;
+    uint16_t flow_mgr_flows_notimeout;
+    uint16_t flow_mgr_flows_timeout;
+    uint16_t flow_mgr_flows_timeout_inuse;
+    uint16_t flow_mgr_flows_removed;
+
+    uint16_t flow_mgr_rows_checked;
+    uint16_t flow_mgr_rows_skipped;
+    uint16_t flow_mgr_rows_empty;
+    uint16_t flow_mgr_rows_busy;
+    uint16_t flow_mgr_rows_maxlen;
+
 } FlowManagerThreadData;
 
 static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
@@ -540,6 +600,18 @@ static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
     ftd->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
     ftd->flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", t);
 
+    ftd->flow_mgr_flows_checked = StatsRegisterCounter("flow_mgr.flows_checked", t);
+    ftd->flow_mgr_flows_notimeout = StatsRegisterCounter("flow_mgr.flows_notimeout", t);
+    ftd->flow_mgr_flows_timeout = StatsRegisterCounter("flow_mgr.flows_timeout", t);
StatsRegisterCounter("flow_mgr.flows_timeout", t); + ftd->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow_mgr.flows_timeout_inuse", t); + ftd->flow_mgr_flows_removed = StatsRegisterCounter("flow_mgr.flows_removed", t); + + ftd->flow_mgr_rows_checked = StatsRegisterCounter("flow_mgr.rows_checked", t); + ftd->flow_mgr_rows_skipped = StatsRegisterCounter("flow_mgr.rows_skipped", t); + ftd->flow_mgr_rows_empty = StatsRegisterCounter("flow_mgr.rows_empty", t); + ftd->flow_mgr_rows_busy = StatsRegisterCounter("flow_mgr.rows_busy", t); + ftd->flow_mgr_rows_maxlen = StatsRegisterCounter("flow_mgr.rows_maxlen", t); + PacketPoolInit(); return TM_ECODE_OK; } @@ -609,7 +681,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) FlowUpdateSpareFlows(); /* try to time out flows */ - FlowTimeoutCounters counters = { 0, 0, 0, 0, }; + FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,}; FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters); @@ -631,6 +703,18 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) StatsAddUI64(th_v, ftd->flow_mgr_cnt_est, (uint64_t)counters.est); StatsAddUI64(th_v, ftd->flow_tcp_reuse, (uint64_t)counters.tcp_reuse); + StatsSetUI64(th_v, ftd->flow_mgr_flows_checked, (uint64_t)counters.flows_checked); + StatsSetUI64(th_v, ftd->flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout); + StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout); + StatsSetUI64(th_v, ftd->flow_mgr_flows_removed, (uint64_t)counters.flows_removed); + StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse); + + StatsSetUI64(th_v, ftd->flow_mgr_rows_checked, (uint64_t)counters.rows_checked); + StatsSetUI64(th_v, ftd->flow_mgr_rows_skipped, (uint64_t)counters.rows_skipped); + StatsSetUI64(th_v, ftd->flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen); + StatsSetUI64(th_v, ftd->flow_mgr_rows_busy, (uint64_t)counters.rows_busy); + StatsSetUI64(th_v, ftd->flow_mgr_rows_empty, (uint64_t)counters.rows_empty); + uint32_t len = 0; FQLOCK_LOCK(&flow_spare_q); len = flow_spare_q.len; @@ -1046,8 +1130,9 @@ static int FlowMgrTest01 (void) f.proto = IPPROTO_TCP; + int32_t next_ts = 0; int state = SC_ATOMIC_GET(f.flow_state); - if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { + if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { FBLOCK_DESTROY(&fb); FLOW_DESTROY(&f); FlowQueueDestroy(&flow_spare_q); @@ -1105,8 +1190,9 @@ static int FlowMgrTest02 (void) f.fb = &fb; f.proto = IPPROTO_TCP; + int32_t next_ts = 0; int state = SC_ATOMIC_GET(f.flow_state); - if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { + if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { FBLOCK_DESTROY(&fb); FLOW_DESTROY(&f); FlowQueueDestroy(&flow_spare_q); @@ -1152,8 +1238,9 @@ static int FlowMgrTest03 (void) f.proto = IPPROTO_TCP; f.flags |= FLOW_EMERGENCY; + int next_ts = 0; int state = SC_ATOMIC_GET(f.flow_state); - if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { + if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { FBLOCK_DESTROY(&fb); FLOW_DESTROY(&f); FlowQueueDestroy(&flow_spare_q); @@ -1212,8 +1299,9 @@ static int FlowMgrTest04 (void) f.proto = IPPROTO_TCP; f.flags |= FLOW_EMERGENCY; + int next_ts = 0; int state = 
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1265,7 +1353,7 @@ static int FlowMgrTest05 (void)
     struct timeval ts;
     TimeGet(&ts);
 
     /* try to time out flows */
-    FlowTimeoutCounters counters = { 0, 0, 0, 0, };
+    FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,};
     FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
 
     if (flow_recycle_q.len > 0) {
diff --git a/src/flow.c b/src/flow.c
index ddd6428e63..e7671c59f8 100644
--- a/src/flow.c
+++ b/src/flow.c
@@ -251,7 +251,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p)
             p->flowflags |= FLOW_PKT_ESTABLISHED;
 
             if (f->proto != IPPROTO_TCP) {
-                SC_ATOMIC_SET(f->flow_state, FLOW_STATE_ESTABLISHED);
+                FlowUpdateState(f, FLOW_STATE_ESTABLISHED);
             }
         }
 
@@ -378,6 +378,7 @@ void FlowInitConfig(char quiet)
     uint32_t i = 0;
     for (i = 0; i < flow_config.hash_size; i++) {
         FBLOCK_INIT(&flow_hash[i]);
+        SC_ATOMIC_INIT(flow_hash[i].next_ts);
     }
     (void) SC_ATOMIC_ADD(flow_memuse, (flow_config.hash_size * sizeof(FlowBucket)));
 
@@ -460,6 +461,7 @@ void FlowShutdown(void)
         }
 
         FBLOCK_DESTROY(&flow_hash[u]);
+        SC_ATOMIC_DESTROY(flow_hash[u].next_ts);
     }
     SCFreeAligned(flow_hash);
     flow_hash = NULL;
@@ -787,6 +789,18 @@ uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags)
     return newflags;
 }
 
+void FlowUpdateState(Flow *f, enum FlowState s)
+{
+    /* set the state */
+    SC_ATOMIC_SET(f->flow_state, s);
+
+    if (f->fb) {
+        /* and reset the flow bucket's next_ts value so that the flow manager
+         * has to revisit this row */
+        SC_ATOMIC_SET(f->fb->next_ts, 0);
+    }
+}
+
 /************************************Unittests*******************************/
 
 #ifdef UNITTESTS
diff --git a/src/flow.h b/src/flow.h
index def9c515ef..df5d9a4ac4 100644
--- a/src/flow.h
+++ b/src/flow.h
@@ -421,7 +421,7 @@ typedef struct Flow_
     uint64_t tosrcbytecnt;
 } Flow;
 
-enum {
+enum FlowState {
     FLOW_STATE_NEW = 0,
     FLOW_STATE_ESTABLISHED,
     FLOW_STATE_CLOSED,
@@ -465,6 +465,10 @@ int FlowGetPacketDirection(const Flow *, const Packet *);
 
 void FlowCleanupAppLayer(Flow *);
 
+void FlowUpdateState(Flow *f, enum FlowState s);
+
+/** ----- Inline functions ----- */
+
 /** \brief Set the No Packet Inspection Flag without locking the flow.
  *
  *  \param f Flow to set the flag in
diff --git a/src/stream-tcp.c b/src/stream-tcp.c
index a0517f3a4b..470d4208f1 100644
--- a/src/stream-tcp.c
+++ b/src/stream-tcp.c
@@ -662,12 +662,12 @@ static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
         case TCP_FIN_WAIT2:
         case TCP_CLOSING:
         case TCP_CLOSE_WAIT:
-            SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_ESTABLISHED);
+            FlowUpdateState(p->flow, FLOW_STATE_ESTABLISHED);
            break;
         case TCP_LAST_ACK:
         case TCP_TIME_WAIT:
         case TCP_CLOSED:
-            SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_CLOSED);
+            FlowUpdateState(p->flow, FLOW_STATE_CLOSED);
             break;
     }
 }
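
For illustration only, not part of the patch: a minimal, self-contained C11 sketch of the
per-row earliest-timeout cache the commit describes. The toy_flow/toy_bucket types and the
scan_row/touch_flow functions are hypothetical stand-ins for Flow/FlowBucket,
FlowManagerHashRowTimeout and FlowUpdateState; the real code additionally takes per-row
locks and uses Suricata's SC_ATOMIC_* wrappers rather than C11 atomics.

#include <limits.h>
#include <stdatomic.h>
#include <stdio.h>

/* A toy flow: only the fields the skip logic cares about. */
struct toy_flow {
    struct toy_flow *next;  /* next flow in the hash row's list */
    int last_seen;          /* stands in for lastts.tv_sec */
    int timeout;            /* state-dependent timeout in seconds */
};

/* A toy bucket: next_ts caches the earliest moment any flow in the row
 * can time out. 0 means "unknown, must rescan"; INT_MAX means "empty". */
struct toy_bucket {
    struct toy_flow *head;
    atomic_int next_ts;
};

/* Manager-side scan of one row: recomputes next_ts as the minimum
 * timeout deadline over all flows, mirroring the *next_ts bookkeeping
 * in FlowManagerFlowTimeout. Returns the number of timed-out flows. */
static int scan_row(struct toy_bucket *b, int now)
{
    if (b->head == NULL) {
        atomic_store(&b->next_ts, INT_MAX); /* skip until a flow is added */
        return 0;
    }
    int next_ts = 0;
    int timed_out = 0;
    for (struct toy_flow *f = b->head; f != NULL; f = f->next) {
        int times_out_at = f->last_seen + f->timeout;
        if (next_ts == 0 || times_out_at < next_ts)
            next_ts = times_out_at;
        if (times_out_at < now)
            timed_out++;
    }
    atomic_store(&b->next_ts, next_ts);
    return timed_out;
}

/* Worker-side change: any event that can move a flow's deadline must
 * clear the cached value, as FlowUpdateState does for state changes,
 * or the manager would keep skipping the row. */
static void touch_flow(struct toy_bucket *b, struct toy_flow *f, int new_timeout)
{
    f->timeout = new_timeout;
    atomic_store(&b->next_ts, 0);
}

int main(void)
{
    struct toy_flow f = { .next = NULL, .last_seen = 100, .timeout = 600 };
    struct toy_bucket b = { .head = &f };
    atomic_init(&b.next_ts, 0);

    int now = 200;
    scan_row(&b, now);                   /* caches next_ts = 700 */

    /* the cheap check the manager does before taking the row lock */
    if (atomic_load(&b.next_ts) > now)
        printf("row skipped, nothing can time out before t=%d\n",
               atomic_load(&b.next_ts));

    touch_flow(&b, &f, 30);              /* state change -> next_ts = 0 */
    if (atomic_load(&b.next_ts) <= now)
        printf("row rescanned: %d flow(s) timed out\n", scan_row(&b, now));
    return 0;
}

Clearing the cache to 0 on every state change is deliberately coarse: it trades an
occasional unnecessary rescan of one row for never missing a timeout that a state
change has moved earlier, which is what makes skipping rows safe.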