return TM_ECODE_OK;
}
-static uint32_t FlowTimeoutsMin(void)
-{
- FlowProtoTimeoutPtr t = SC_ATOMIC_GET(flow_timeouts);
- uint32_t m = -1;
- for (unsigned int i = 0; i < FLOW_PROTO_MAX; i++) {
- m = MIN(m, t[i].new_timeout);
- m = MIN(m, t[i].est_timeout);
-
- if (i == FLOW_PROTO_TCP) {
- m = MIN(m, t[i].closed_timeout);
- }
- if (i == FLOW_PROTO_TCP || i == FLOW_PROTO_UDP) {
- m = MIN(m, t[i].bypassed_timeout);
- }
- }
- return m;
-}
-
-static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
- const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
+/** \internal
+ * \brief calculate number of rows to scan and how much time to sleep
+ * based on the busy score `mp` (0 idle, 100 max busy).
+ *
+ * We try to to make sure we scan the hash once a second. The number size
+ * of the slice of the hash scanned is determined by our busy score 'mp'.
+ * We sleep for the remainder of the second after processing the slice,
+ * or at least an approximation of it.
+ * A minimum busy score of 10 is assumed to avoid a longer than 10 second
+ * full hash pass. This is to avoid burstiness in scanning when there is
+ * a rapid increase of the busy score, which could lead to the flow manager
+ * suddenly scanning a much larger slice of the hash leading to a burst
+ * in scan/eviction work.
+ */
+static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
+ uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
if (emergency) {
*wu_rows = rows;
*wu_sleep = 250;
return;
}
-
- uint32_t full_pass_in_ms = pass_in_sec * 1000;
- const float perc = MIN((((float)(100 - mp) / (float)100)), 1.0);
- full_pass_in_ms *= perc;
- full_pass_in_ms = MAX(full_pass_in_ms, 333);
-
- uint32_t work_unit_ms = 999 * perc;
- work_unit_ms = MAX(work_unit_ms, 250);
-
- const uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
-
- const uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass);
- const uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row
-
- int32_t sleep_per_wu = work_unit_ms - rows_process_cost;
- sleep_per_wu = MAX(sleep_per_wu, 10);
-
- const float passes_sec = 1000.0 / (float)full_pass_in_ms;
-
- *wu_sleep = sleep_per_wu;
- *wu_rows = rows_per_wu;
- *rows_sec = (uint32_t)((float)rows * passes_sec);
+ /* minimum busy score is 10 */
+ const uint32_t emp = MAX(mp, 10);
+ const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
+ /* calc how much time we estimate the work will take, in ms. We assume
+ * each row takes an average of 1usec. Maxing out at 1sec. */
+ const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
+ /* calc how much time we need to sleep to get to the per second cadence
+ * but sleeping for at least 250ms. */
+ const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
+ SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
+ sleep_per_unit);
+
+ *wu_sleep = sleep_per_unit;
+ *wu_rows = rows_per_sec;
+ *rows_sec = rows_per_sec;
}
/** \brief Thread that manages the flow table and times out flows.
{
FlowManagerThreadData *ftd = thread_data;
const uint32_t rows = ftd->max - ftd->min;
- const uint32_t min_timeout = FlowTimeoutsMin();
- const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
const bool time_is_live = TimeModeIsLive();
uint32_t emerg_over_cnt = 0;
return TM_ECODE_OK;
}
- SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name,
- ftd->instance, min_timeout, pass_in_sec);
-
uint32_t mp = MemcapsGetPressure() * 100;
if (ftd->instance == 0) {
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
- GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
+ GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
while (1)
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
- GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
+ GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
if (pmp != mp) {
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
}