#include "output-flow.h"
#include "util-validate.h"
+#include "runmode-unix-socket.h"
+
/* Run mode selected in suricata.c */
extern int run_mode;
return cnt;
}
-static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td,
- struct timeval *ts,
- const uint32_t hash_min, const uint32_t hash_max,
- FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks)
+/** \internal
+ * \brief handle timeout for a slice of hash rows.
+ *        If the slice wraps around the end of the hash, FlowTimeoutHash
+ *        is called twice: once for the tail and once for the head.
+ * \param rows number of hash rows to process in this work unit
+ * \param pos offset relative to hash_min of the next row to process;
+ *            updated for the next call */
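+/* Example (illustrative numbers): with hash_min=0, hash_max=1000, *pos=900
+ * and rows=200, the first slice covers rows [900, 1000), *pos wraps to 0,
+ * and the remaining 100 rows are handled by a second FlowTimeoutHash call
+ * over [0, 100). */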
+static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, struct timeval *ts,
+ const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
+ const uint32_t rows, uint32_t *pos)
{
- const uint32_t rows = hash_max - hash_min;
- const uint32_t chunk_size = rows / chunks;
-
- const uint32_t min = iter * chunk_size + hash_min;
- uint32_t max = min + chunk_size;
- /* we start at beginning of hash at next iteration so let's check
- * hash till the end */
- if (iter + 1 == chunks) {
- max = hash_max;
+ uint32_t start = 0;
+ uint32_t end = 0;
+ uint32_t cnt = 0;
+ uint32_t rows_left = rows;
+
+again:
+ start = hash_min + (*pos);
+ if (start >= hash_max) {
+ start = hash_min;
+ }
+ end = start + rows_left;
+ if (end > hash_max) {
+ end = hash_max;
+ }
+    /* store *pos relative to hash_min, so instances working on a slice with
+     * a non-zero hash_min resume at the correct row on the next call */
+    *pos = (end == hash_max) ? 0 : end - hash_min;
+ rows_left = rows_left - (end - start);
+
+ cnt += FlowTimeoutHash(td, ts, start, end, counters);
+ if (rows_left) {
+ goto again;
}
- const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters);
return cnt;
}
uint16_t flow_bypassed_cnt_clo;
uint16_t flow_bypassed_pkts;
uint16_t flow_bypassed_bytes;
+
+    /* memcap pressure in percent, and the highest pressure seen so far */
+    uint16_t memcap_pressure;
+    uint16_t memcap_pressure_max;
} FlowCounters;
typedef struct FlowManagerThreadData_ {
fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
+
+ fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
+ fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
}
static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
//#define FM_PROFILE
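+
+/** \internal
+ * \brief size the next work unit: the number of hash rows to scan and the
+ *        sleep time (in ms) that follows, based on the memcap pressure `mp`
+ *        (in percent). Higher pressure shortens both budgets so full passes
+ *        complete faster; in emergency mode the whole slice is scanned and
+ *        a short fixed sleep is used. */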
+static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
+ const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows)
+{
+ if (emergency) {
+ *wu_rows = rows;
+ *wu_sleep = 250;
+ return;
+ }
+
+    /* scale both the full pass budget and the work unit budget down as
+     * memcap pressure rises, speeding up cleanup when memory is tight */
+    uint32_t full_pass_in_ms = pass_in_sec * 1000;
+    float perc = MIN((((float)(100 - mp) / (float)100)), 1);
+    full_pass_in_ms *= perc;
+    full_pass_in_ms = MAX(full_pass_in_ms, 333);
+
+    /* aim for ~1 second work units when unconstrained */
+    uint32_t work_unit_ms = 999 * perc;
+    work_unit_ms = MAX(work_unit_ms, 250);
+
+    uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
+
+    uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass);
+    uint32_t rows_process_cost = rows_per_wu / 1000; // estimated ~1 usec per row
+
+    /* sleep for what remains of the work unit budget after the estimated
+     * row processing cost, but at least 10 ms */
+    int32_t sleep_per_wu = work_unit_ms - rows_process_cost;
+    sleep_per_wu = MAX(sleep_per_wu, 10);
+#if 0
+ float passes_sec = 1000.0/(float)full_pass_in_ms;
+ SCLogNotice("full_pass_in_ms %u perc %f rows %u "
+ "wus_per_full_pass %u rows_per_wu %u work_unit_ms %u sleep_per_wu %u => passes/s %f rows/s %u",
+ full_pass_in_ms, perc, rows,
+ wus_per_full_pass, rows_per_wu, work_unit_ms, (uint32_t)sleep_per_wu,
+ passes_sec, (uint32_t)((float)rows * passes_sec));
+#endif
+ *wu_sleep = sleep_per_wu;
+ *wu_rows = rows_per_wu;
+}
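+
+/* Illustrative sizing (assuming pass_in_sec=240 and rows=65536): at mp=0,
+ * perc=1, giving ~240 work units of ~273 rows with 999 ms sleeps, i.e. one
+ * full pass per ~4 minutes; at mp=75, perc=0.25, the sleep drops to 250 ms
+ * and a full pass completes in ~60 seconds. */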
+
/** \brief Thread that manages the flow table and times out flows.
*
* \param td ThreadVars casted to void ptr
bool emerg = false;
bool prev_emerg = false;
uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
- uint32_t flow_last_sec = 0;
/* VJ leaving disabled for now, as hosts are only used by tags and the numbers
* are really low. Might confuse ppl
uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
*/
memset(&ts, 0, sizeof(ts));
- uint32_t hash_passes = 0;
-#ifdef FM_PROFILE
- uint32_t hash_row_checks = 0;
- uint32_t hash_passes_chunks = 0;
-#endif
- uint32_t hash_full_passes = 0;
const uint32_t min_timeout = FlowTimeoutsMin();
const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
memset(&startts, 0, sizeof(startts));
gettimeofday(&startts, NULL);
- uint32_t hash_pass_iter = 0;
uint32_t emerg_over_cnt = 0;
uint64_t next_run_ms = 0;
+ const uint32_t rows = ftd->max - ftd->min;
+ uint32_t pos = 0;
+ uint32_t rows_per_wu = 0;
+ uint64_t sleep_per_wu = 0;
+
+ uint32_t mp = MemcapsGetPressure() * 100;
+ if (ftd->instance == 0) {
+ StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
+ StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
+ }
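+    /* initial work unit sizing; it is recomputed after every pass from the
+     * then-current memcap pressure */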
+ GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
while (1)
{
TimeGet(&ts);
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
- const uint32_t rt = (uint32_t)ts.tv_sec;
const bool emerge_p = (emerg && !prev_emerg);
if (emerge_p) {
next_run_ms = 0;
FlowSparePoolUpdate(sq_len);
}
}
- const uint32_t secs_passed = rt - flow_last_sec;
/* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
if (emerg) {
/* in emergency mode, do a full pass of the hash table */
FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters);
- hash_passes++;
- hash_full_passes++;
- hash_passes++;
-#ifdef FM_PROFILE
- hash_passes_chunks += 1;
- hash_row_checks += counters.rows_checked;
-#endif
StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
} else {
- /* non-emergency mode: scan part of the hash */
- const uint32_t chunks = MIN(secs_passed, pass_in_sec);
- for (uint32_t i = 0; i < chunks; i++) {
- FlowTimeoutHashInChunks(&ftd->timeout, &ts, ftd->min, ftd->max,
- &counters, hash_pass_iter, pass_in_sec);
- hash_pass_iter++;
- if (hash_pass_iter == pass_in_sec) {
- hash_pass_iter = 0;
- hash_full_passes++;
- StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
- }
+ SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
+ rows_per_wu);
+
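+            /* detect a full pass over this thread's hash slice: pos only
+             * moves back (wraps to the slice start) when the end of the
+             * slice was reached */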
+ const uint32_t ppos = pos;
+ FlowTimeoutHashInChunks(
+ &ftd->timeout, &ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
+ if (ppos > pos) {
+ StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
}
- hash_passes++;
-#ifdef FM_PROFILE
- hash_row_checks += counters.rows_checked;
- hash_passes_chunks += chunks;
-#endif
}
- flow_last_sec = rt;
    /*
    StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
    */
    /* clear emergency bit if we have at least xx flows pruned. */
uint32_t len = FlowSpareGetPoolSize();
StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
+
if (emerg == true) {
SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
"flow_spare_q status: %"PRIu32"%% flows at the queue",
len, flow_config.prealloc, len * 100 / flow_config.prealloc);
- /* only if we have pruned this "emergency_recovery" percentage
- * of flows, we will unset the emergency bit */
- if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
- emerg_over_cnt++;
- } else {
- emerg_over_cnt = 0;
+            /* only once we have pruned at least the "emergency_recovery"
+             * percentage of flows do we unset the emergency bit */
+ if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
+ emerg_over_cnt++;
+ } else {
+ emerg_over_cnt = 0;
+ }
+
+ if (emerg_over_cnt >= 30) {
+ SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
+ FlowTimeoutsReset();
+
+ emerg = false;
+ prev_emerg = false;
+ emerg_over_cnt = 0;
+ SCLogNotice("Flow emergency mode over, back to normal... unsetting"
+ " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
+ "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
+ "%% flows at the queue",
+ (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec,
+ len * 100 / flow_config.prealloc);
+
+ StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
+ }
}
- if (emerg_over_cnt >= 30) {
- SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
- FlowTimeoutsReset();
-
- emerg = false;
- prev_emerg = false;
- emerg_over_cnt = 0;
- hash_pass_iter = 0;
- SCLogNotice("Flow emergency mode over, back to normal... unsetting"
- " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
- "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
- "%% flows at the queue", (uintmax_t)ts.tv_sec,
- (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
-
- StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
+        /* update work unit sizing based on the current memcap pressure */
+ mp = MemcapsGetPressure() * 100;
+ if (ftd->instance == 0) {
+ StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
+ StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
- }
- next_run_ms = ts_ms + 667;
- if (emerg)
- next_run_ms = ts_ms + 250;
- }
- if (flow_last_sec == 0) {
- flow_last_sec = rt;
- }
+ GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
- if (ftd->instance == 0 &&
- (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) {
- DefragTimeoutHash(&ts);
- //uint32_t hosts_pruned =
- HostTimeoutHash(&ts);
- IPPairTimeoutHash(&ts);
- HttpRangeContainersTimeoutHash(&ts);
- other_last_sec = (uint32_t)ts.tv_sec;
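+        /* schedule the next work unit after this pass's computed sleep */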
+ next_run_ms = ts_ms + sleep_per_wu;
+ }
+ if (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec) {
+ if (ftd->instance == 0) {
+ DefragTimeoutHash(&ts);
+ // uint32_t hosts_pruned =
+ HostTimeoutHash(&ts);
+ IPPairTimeoutHash(&ts);
+ HttpRangeContainersTimeoutHash(&ts);
+ other_last_sec = (uint32_t)ts.tv_sec;
+ }
}
-
#ifdef FM_PROFILE
struct timeval run_endts;
established_cnt, closing_cnt);
#ifdef FM_PROFILE
- SCLogNotice("hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
- hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
- hash_full_passes, hash_row_checks,
- hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
-
gettimeofday(&endts, NULL);
struct timeval total_run_time;
timersub(&endts, &startts, &total_run_time);