return m;
}
-//#define FM_PROFILE
-
static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows)
{
{
FlowManagerThreadData *ftd = thread_data;
struct timeval ts;
- uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
bool emerg = false;
bool prev_emerg = false;
uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
-/* VJ leaving disabled for now, as hosts are only used by tags and the numbers
- * are really low. Might confuse ppl
- uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
- uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v);
- uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
-*/
memset(&ts, 0, sizeof(ts));
-
const uint32_t min_timeout = FlowTimeoutsMin();
const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name,
ftd->instance, min_timeout, pass_in_sec);
-#ifdef FM_PROFILE
- struct timeval endts;
- struct timeval active;
- struct timeval paused;
- struct timeval sleeping;
- memset(&endts, 0, sizeof(endts));
- memset(&active, 0, sizeof(active));
- memset(&paused, 0, sizeof(paused));
- memset(&sleeping, 0, sizeof(sleeping));
-#endif
-
- struct timeval startts;
- memset(&startts, 0, sizeof(startts));
- gettimeofday(&startts, NULL);
-
uint32_t emerg_over_cnt = 0;
uint64_t next_run_ms = 0;
const uint32_t rows = ftd->max - ftd->min;
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
-#ifdef FM_PROFILE
- struct timeval pause_startts;
- memset(&pause_startts, 0, sizeof(pause_startts));
- gettimeofday(&pause_startts, NULL);
-#endif
TmThreadTestThreadUnPaused(th_v);
-#ifdef FM_PROFILE
- struct timeval pause_endts;
- memset(&pause_endts, 0, sizeof(pause_endts));
- gettimeofday(&pause_endts, NULL);
- struct timeval pause_time;
- memset(&pause_time, 0, sizeof(pause_time));
- timersub(&pause_endts, &pause_startts, &pause_time);
- timeradd(&paused, &pause_time, &paused);
-#endif
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
emerg = true;
}
-#ifdef FM_PROFILE
- struct timeval run_startts;
- memset(&run_startts, 0, sizeof(run_startts));
- gettimeofday(&run_startts, NULL);
-#endif
/* Get the time */
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
}
}
- /*
- StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
- uint32_t hosts_active = HostGetActiveCount();
- StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active);
- uint32_t hosts_spare = HostGetSpareCount();
- StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare);
- */
StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_clo, (uint64_t)counters.clo);
StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_new, (uint64_t)counters.new);
StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_est, (uint64_t)counters.est);
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
- //StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters.flows_aside);
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work, (uint64_t)counters.flows_aside_needs_work);
StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters.bypassed_bytes);
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
- // TODO AVG MAXLEN
- // TODO LOOKUP STEPS MAXLEN and AVG LEN
- /* Don't fear, FlowManagerThread is here...
- * clear emergency bit if we have at least xx flows pruned. */
+
uint32_t len = FlowSpareGetPoolSize();
StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
if (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec) {
if (ftd->instance == 0) {
DefragTimeoutHash(&ts);
- // uint32_t hosts_pruned =
HostTimeoutHash(&ts);
IPPairTimeoutHash(&ts);
HttpRangeContainersTimeoutHash(&ts);
}
}
-#ifdef FM_PROFILE
- struct timeval run_endts;
- memset(&run_endts, 0, sizeof(run_endts));
- gettimeofday(&run_endts, NULL);
- struct timeval run_time;
- memset(&run_time, 0, sizeof(run_time));
- timersub(&run_endts, &run_startts, &run_time);
- timeradd(&active, &run_time, &active);
-#endif
-
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
StatsSyncCounters(th_v);
break;
}
-#ifdef FM_PROFILE
- struct timeval sleep_startts;
- memset(&sleep_startts, 0, sizeof(sleep_startts));
- gettimeofday(&sleep_startts, NULL);
-#endif
-
if (emerg || !time_is_live) {
usleep(250);
} else {
SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
}
-#ifdef FM_PROFILE
- struct timeval sleep_endts;
- memset(&sleep_endts, 0, sizeof(sleep_endts));
- gettimeofday(&sleep_endts, NULL);
-
- struct timeval sleep_time;
- memset(&sleep_time, 0, sizeof(sleep_time));
- timersub(&sleep_endts, &sleep_startts, &sleep_time);
- timeradd(&sleeping, &sleep_time, &sleeping);
-#endif
SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
StatsSyncCountersIfSignalled(th_v);
}
- SCLogPerf("%" PRIu32 " new flows, %" PRIu32 " established flows were "
- "timed out, %"PRIu32" flows in closed state", new_cnt,
- established_cnt, closing_cnt);
-
-#ifdef FM_PROFILE
- gettimeofday(&endts, NULL);
- struct timeval total_run_time;
- timersub(&endts, &startts, &total_run_time);
-
- SCLogNotice("FM: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
- (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
- (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
- (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
- (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
-#endif
return TM_ECODE_OK;
}
memset(&ts, 0, sizeof(ts));
uint32_t fr_passes = 0;
-#ifdef FM_PROFILE
- struct timeval endts;
- struct timeval active;
- struct timeval paused;
- struct timeval sleeping;
- memset(&endts, 0, sizeof(endts));
- memset(&active, 0, sizeof(active));
- memset(&paused, 0, sizeof(paused));
- memset(&sleeping, 0, sizeof(sleeping));
-#endif
- struct timeval startts;
- memset(&startts, 0, sizeof(startts));
- gettimeofday(&startts, NULL);
-
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
-#ifdef FM_PROFILE
- struct timeval pause_startts;
- memset(&pause_startts, 0, sizeof(pause_startts));
- gettimeofday(&pause_startts, NULL);
-#endif
TmThreadTestThreadUnPaused(th_v);
-
-#ifdef FM_PROFILE
- struct timeval pause_endts;
- memset(&pause_endts, 0, sizeof(pause_endts));
- gettimeofday(&pause_endts, NULL);
-
- struct timeval pause_time;
- memset(&pause_time, 0, sizeof(pause_time));
- timersub(&pause_endts, &pause_startts, &pause_time);
- timeradd(&paused, &pause_time, &paused);
-#endif
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
fr_passes++;
-#ifdef FM_PROFILE
- struct timeval run_startts;
- memset(&run_startts, 0, sizeof(run_startts));
- gettimeofday(&run_startts, NULL);
-#endif
SC_ATOMIC_ADD(flowrec_busy,1);
FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);
}
SC_ATOMIC_SUB(flowrec_busy,1);
-#ifdef FM_PROFILE
- struct timeval run_endts;
- memset(&run_endts, 0, sizeof(run_endts));
- gettimeofday(&run_endts, NULL);
-
- struct timeval run_time;
- memset(&run_time, 0, sizeof(run_time));
- timersub(&run_endts, &run_startts, &run_time);
- timeradd(&active, &run_time, &active);
-#endif
-
if (bail) {
break;
}
-#ifdef FM_PROFILE
- struct timeval sleep_startts;
- memset(&sleep_startts, 0, sizeof(sleep_startts));
- gettimeofday(&sleep_startts, NULL);
-#endif
usleep(250);
-#ifdef FM_PROFILE
- struct timeval sleep_endts;
- memset(&sleep_endts, 0, sizeof(sleep_endts));
- gettimeofday(&sleep_endts, NULL);
- struct timeval sleep_time;
- memset(&sleep_time, 0, sizeof(sleep_time));
- timersub(&sleep_endts, &sleep_startts, &sleep_time);
- timeradd(&sleeping, &sleep_time, &sleeping);
-#endif
SCLogDebug("woke up...");
StatsSyncCountersIfSignalled(th_v);
}
StatsSyncCounters(th_v);
-#ifdef FM_PROFILE
- gettimeofday(&endts, NULL);
- struct timeval total_run_time;
- timersub(&endts, &startts, &total_run_time);
- SCLogNotice("FR: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
- (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
- (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
- (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
- (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
-
- SCLogNotice("FR passes %u passes/s %u", fr_passes,
- (uint32_t)fr_passes/((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
-#endif
SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
return TM_ECODE_OK;
}