uint32_t est;
uint32_t clo;
uint32_t tcp_reuse;
+
+ uint32_t flows_checked; /**< flows inspected during this pass */
+ uint32_t flows_notimeout; /**< flows not yet timed out */
+ uint32_t flows_timeout; /**< flows that reached their timeout */
+ uint32_t flows_timeout_inuse; /**< timed out flows still in use, left in place */
+ uint32_t flows_removed; /**< flows removed from the hash */
+
+ uint32_t rows_checked; /**< hash rows visited */
+ uint32_t rows_skipped; /**< rows skipped via the next_ts check */
+ uint32_t rows_empty; /**< rows that held no flows */
+ uint32_t rows_busy; /**< rows whose trylock failed */
+ uint32_t rows_maxlen; /**< longest row walked this pass */
} FlowTimeoutCounters;
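With ten new members, the positional initializers in the hunks below have to spell out fourteen zeros. A minimal sketch of the less brittle C99 alternative, equivalent zero-fill assuming nothing beyond the struct above:

    FlowTimeoutCounters counters = { 0 }; /* all remaining members are zero-initialized */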
/**
 * \brief get the timeout value for a flow, based on its state and protocol
 *
 * \retval timeout timeout in seconds
 */
-static inline uint32_t FlowGetFlowTimeout(const Flow *f, int state)
+static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state)
{
uint32_t timeout;
FlowProtoTimeoutPtr flow_timeouts = SC_ATOMIC_GET(flow_timeouts);
* \retval 0 not timed out
* \retval 1 timed out
*/
-static int FlowManagerFlowTimeout(const Flow *f, int state, struct timeval *ts)
+static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
{
/* set the timeout value according to the flow operating mode,
* flow's state and protocol.*/
uint32_t timeout = FlowGetFlowTimeout(f, state);
+ int32_t flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout);
+ if (*next_ts == 0 || flow_times_out_at < *next_ts)
+ *next_ts = flow_times_out_at;
+
/* do the timeout check */
- if ((int32_t)(f->lastts.tv_sec + timeout) >= ts->tv_sec) {
+ if (flow_times_out_at >= ts->tv_sec) {
return 0;
}
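The two added lines above implement a running minimum: each flow proposes the absolute second at which it would time out, and *next_ts keeps the earliest one, with 0 doubling as "nothing recorded yet". A standalone sketch of the pattern (the function name is illustrative only):

    static inline void NextTimeoutRollup(int32_t *next_ts, int32_t flow_times_out_at)
    {
        /* 0 means no deadline has been recorded for this row yet */
        if (*next_ts == 0 || flow_times_out_at < *next_ts)
            *next_ts = flow_times_out_at;
    }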
* \retval cnt number of timed out flows
*/
static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
- int emergency, FlowTimeoutCounters *counters)
+ int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
{
uint32_t cnt = 0;
+ uint32_t checked = 0;
do {
+ checked++;
+
/* check flow timeout based on lastts and state. Both can be
* accessed w/o Flow lock as we do have the hash row lock (so flow
* can't disappear) and flow_state is atomic. lastts can only
* be modified when we have both the flow and hash row lock */
- int state = SC_ATOMIC_GET(f->flow_state);
+ enum FlowState state = SC_ATOMIC_GET(f->flow_state);
/* timeout logic goes here */
- if (FlowManagerFlowTimeout(f, state, ts) == 0) {
+ if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
+
+ counters->flows_notimeout++;
+
f = f->hprev;
continue;
}
Flow *next_flow = f->hprev;
+ counters->flows_timeout++;
+
/* check if the flow is fully timed out and
* ready to be discarded. */
if (FlowManagerFlowTimedOut(f, ts) == 1) {
counters->clo++;
break;
}
+ counters->flows_removed++;
} else {
+ counters->flows_timeout_inuse++;
FLOWLOCK_UNLOCK(f);
}
f = next_flow;
} while (f != NULL);
+ counters->flows_checked += checked;
+ if (checked > counters->rows_maxlen)
+ counters->rows_maxlen = checked;
+
return cnt;
}
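Note that the per-row checked count feeds two different aggregations: a running total (flows_checked) and a per-pass high-water mark (rows_maxlen). Isolated for clarity (the helper name is hypothetical):

    static void RowWalkStats(FlowTimeoutCounters *c, uint32_t checked)
    {
        c->flows_checked += checked;  /* total flows inspected this pass */
        if (checked > c->rows_maxlen)
            c->rows_maxlen = checked; /* longest hash row walked so far */
    }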
for (idx = hash_min; idx < hash_max; idx++) {
FlowBucket *fb = &flow_hash[idx];
+ counters->rows_checked++;
+
+ int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
+ if (check_ts > (int32_t)ts->tv_sec) {
+ counters->rows_skipped++;
+ continue;
+ }
+
/* before grabbing the row lock, make sure we have at least
* 9 packets in the pool */
PacketPoolWaitForN(9);
- if (FBLOCK_TRYLOCK(fb) != 0)
+ if (FBLOCK_TRYLOCK(fb) != 0) {
+ counters->rows_busy++;
continue;
+ }
/* flow hash bucket is now locked */
- if (fb->tail == NULL)
+ if (fb->tail == NULL) {
+ SC_ATOMIC_SET(fb->next_ts, INT_MAX);
+ counters->rows_empty++;
goto next;
+ }
+
+ int32_t next_ts = 0;
/* we have a flow, or more than one */
- cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters);
+ cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
+
+ SC_ATOMIC_SET(fb->next_ts, next_ts);
next:
FBLOCK_UNLOCK(fb);
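The per-bucket next_ts is what makes whole rows skippable without taking the row lock. Assuming the FlowBucket member is declared with SC_ATOMIC_DECLARE(int32_t, next_ts), as the SC_ATOMIC_GET/SET calls imply, its lifecycle in this loop is: an empty row is set to INT_MAX so it is skipped until a flow is inserted; a populated row is set to the earliest timeout found by the walk; and a value of 0 (the initial/reset state) always compares as expired, forcing a check. A sketch of the skip test in isolation:

    #include <limits.h>
    #include <stdint.h>

    /* Illustrative row-skip check; the real code reads next_ts atomically. */
    static int RowCanBeSkipped(int32_t next_ts, int32_t now_sec)
    {
        /* INT_MAX: row is empty; 0: unknown, must be checked */
        return next_ts > now_sec;
    }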
uint16_t flow_emerg_mode_enter;
uint16_t flow_emerg_mode_over;
uint16_t flow_tcp_reuse;
+
+ uint16_t flow_mgr_flows_checked;
+ uint16_t flow_mgr_flows_notimeout;
+ uint16_t flow_mgr_flows_timeout;
+ uint16_t flow_mgr_flows_timeout_inuse;
+ uint16_t flow_mgr_flows_removed;
+
+ uint16_t flow_mgr_rows_checked;
+ uint16_t flow_mgr_rows_skipped;
+ uint16_t flow_mgr_rows_empty;
+ uint16_t flow_mgr_rows_busy;
+ uint16_t flow_mgr_rows_maxlen;
+
} FlowManagerThreadData;
static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
ftd->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
ftd->flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", t);
+ ftd->flow_mgr_flows_checked = StatsRegisterCounter("flow_mgr.flows_checked", t);
+ ftd->flow_mgr_flows_notimeout = StatsRegisterCounter("flow_mgr.flows_notimeout", t);
+ ftd->flow_mgr_flows_timeout = StatsRegisterCounter("flow_mgr.flows_timeout", t);
+ ftd->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow_mgr.flows_timeout_inuse", t);
+ ftd->flow_mgr_flows_removed = StatsRegisterCounter("flow_mgr.flows_removed", t);
+
+ ftd->flow_mgr_rows_checked = StatsRegisterCounter("flow_mgr.rows_checked", t);
+ ftd->flow_mgr_rows_skipped = StatsRegisterCounter("flow_mgr.rows_skipped", t);
+ ftd->flow_mgr_rows_empty = StatsRegisterCounter("flow_mgr.rows_empty", t);
+ ftd->flow_mgr_rows_busy = StatsRegisterCounter("flow_mgr.rows_busy", t);
+ ftd->flow_mgr_rows_maxlen = StatsRegisterCounter("flow_mgr.rows_maxlen", t);
+
PacketPoolInit();
return TM_ECODE_OK;
}
FlowUpdateSpareFlows();
/* try to time out flows */
- FlowTimeoutCounters counters = { 0, 0, 0, 0, };
+ FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
StatsAddUI64(th_v, ftd->flow_mgr_cnt_est, (uint64_t)counters.est);
StatsAddUI64(th_v, ftd->flow_tcp_reuse, (uint64_t)counters.tcp_reuse);
+ StatsSetUI64(th_v, ftd->flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
+ StatsSetUI64(th_v, ftd->flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
+ StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
+ StatsSetUI64(th_v, ftd->flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
+ StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
+
+ StatsSetUI64(th_v, ftd->flow_mgr_rows_checked, (uint64_t)counters.rows_checked);
+ StatsSetUI64(th_v, ftd->flow_mgr_rows_skipped, (uint64_t)counters.rows_skipped);
+ StatsSetUI64(th_v, ftd->flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
+ StatsSetUI64(th_v, ftd->flow_mgr_rows_busy, (uint64_t)counters.rows_busy);
+ StatsSetUI64(th_v, ftd->flow_mgr_rows_empty, (uint64_t)counters.rows_empty);
+
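Note the two update semantics in play: the pre-existing flow.* counters accumulate across runs via StatsAddUI64, while the new flow_mgr.* values are overwritten each run via StatsSetUI64, so they read as a snapshot of the most recent timeout pass rather than as lifetime totals. Schematically:

    StatsAddUI64(th_v, id, n); /* counter: lifetime total grows by n */
    StatsSetUI64(th_v, id, n); /* gauge: replaced with n every pass */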
uint32_t len = 0;
FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
f.proto = IPPROTO_TCP;
+ int32_t next_ts = 0;
int state = SC_ATOMIC_GET(f.flow_state);
- if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+ if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
f.fb = &fb;
f.proto = IPPROTO_TCP;
+ int32_t next_ts = 0;
int state = SC_ATOMIC_GET(f.flow_state);
- if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+ if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
f.proto = IPPROTO_TCP;
f.flags |= FLOW_EMERGENCY;
+ int32_t next_ts = 0;
int state = SC_ATOMIC_GET(f.flow_state);
- if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+ if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
f.proto = IPPROTO_TCP;
f.flags |= FLOW_EMERGENCY;
+ int32_t next_ts = 0;
int state = SC_ATOMIC_GET(f.flow_state);
- if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+ if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
struct timeval ts;
TimeGet(&ts);
/* try to time out flows */
- FlowTimeoutCounters counters = { 0, 0, 0, 0, };
+ FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
if (flow_recycle_q.len > 0) {