/* Run mode selected at suricata.c */
extern int run_mode;
+/* multi flow mananger support */
+static uint32_t flowmgr_number = 1;
+/* atomic counter for flow managers, to assign instance id */
+SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
+
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
/* 1 seconds */
ThreadVars *tv = NULL;
int cnt = 0;
- SCCtrlCondSignal(&flow_manager_ctrl_cond);
+ /* wake up threads */
+ uint32_t u;
+ for (u = 0; u < flowmgr_number; u++)
+ SCCtrlCondSignal(&flow_manager_ctrl_cond);
SCMutexLock(&tv_root_lock);
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
+ cnt++;
+ }
+ tv = tv->next;
+ }
+
+ /* wake up threads, another try */
+ for (u = 0; u < flowmgr_number; u++)
+ SCCtrlCondSignal(&flow_manager_ctrl_cond);
+ tv = tv_root[TVT_MGMT];
+ while (tv != NULL) {
+ if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
- cnt++;
}
tv = tv->next;
}
+
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
}
SCMutexUnlock(&tv_root_lock);
+
+ /* reset count, so we can kill and respawn (unix socket) */
+ SC_ATOMIC_SET(flowmgr_cnt, 0);
return;
}
*
* \param ts timestamp
* \param try_cnt number of flows to time out max (0 is unlimited)
+ * \param hash_min min hash index to consider
+ * \param hash_max max hash index to consider
* \param counters ptr to FlowTimeoutCounters structure
*
* \retval cnt number of timed out flow
*/
-uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounters *counters) {
+static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
+ uint32_t hash_min, uint32_t hash_max,
+ FlowTimeoutCounters *counters)
+{
uint32_t idx = 0;
uint32_t cnt = 0;
int emergency = 0;
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
emergency = 1;
- for (idx = 0; idx < flow_config.hash_size; idx++) {
+ for (idx = hash_min; idx < hash_max; idx++) {
FlowBucket *fb = &flow_hash[idx];
if (FBLOCK_TRYLOCK(fb) != 0)
extern int g_detect_disabled;
+typedef struct FlowManagerThreadData_ {
+ uint32_t instance;
+ uint32_t min;
+ uint32_t max;
+
+ uint16_t flow_mgr_cnt_clo;
+ uint16_t flow_mgr_cnt_new;
+ uint16_t flow_mgr_cnt_est;
+ uint16_t flow_mgr_memuse;
+ uint16_t flow_mgr_spare;
+ uint16_t flow_emerg_mode_enter;
+ uint16_t flow_emerg_mode_over;
+} FlowManagerThreadData;
+
+static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
+{
+ FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
+ if (ftd == NULL)
+ return TM_ECODE_FAILED;
+
+ ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
+ SCLogDebug("flow manager instance %u", ftd->instance);
+
+ /* set the min and max value used for hash row walking
+ * each thread has it's own section of the flow hash */
+ uint32_t range = flow_config.hash_size / flowmgr_number;
+ if (ftd->instance == 1)
+ ftd->max = range;
+ else if (ftd->instance == flowmgr_number) {
+ ftd->min = (range * (ftd->instance - 1));
+ ftd->max = flow_config.hash_size;
+ } else {
+ ftd->min = (range * (ftd->instance - 1));
+ ftd->max = (range * ftd->instance);
+ }
+ BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);
+
+ SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
+
+ /* pass thread data back to caller */
+ *data = ftd;
+
+ ftd->flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", t,
+ SC_PERF_TYPE_UINT64, "NULL");
+ ftd->flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", t,
+ SC_PERF_TYPE_UINT64, "NULL");
+ ftd->flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", t,
+ SC_PERF_TYPE_UINT64, "NULL");
+ ftd->flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", t,
+ SC_PERF_TYPE_UINT64, "NULL");
+ ftd->flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", t,
+ SC_PERF_TYPE_UINT64, "NULL");
+ ftd->flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", t,
+ SC_PERF_TYPE_UINT64, "NULL");
+ ftd->flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", t,
+ SC_PERF_TYPE_UINT64, "NULL");
+
+ PacketPoolInit();
+ return TM_ECODE_OK;
+}
+
+static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
+{
+ SCFree(data);
+ return TM_ECODE_OK;
+}
+
+
/** \brief Thread that manages the flow table and times out flows.
*
* \param td ThreadVars casted to void ptr
*
* Keeps an eye on the spare list, alloc flows if needed...
*/
-void *FlowManagerThread(void *td)
+static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
- /* block usr1. usr1 to be handled by the main thread only */
+ /* block usr2. usr1 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
- ThreadVars *th_v = (ThreadVars *)td;
+ FlowManagerThreadData *ftd = thread_data;
struct timeval ts;
uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
int emerg = FALSE;
SC_PERF_TYPE_Q_NORMAL,
"NULL");
*/
- uint16_t flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", th_v,
- SC_PERF_TYPE_UINT64,
- "NULL");
- uint16_t flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", th_v,
- SC_PERF_TYPE_UINT64,
- "NULL");
- uint16_t flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", th_v,
- SC_PERF_TYPE_UINT64,
- "NULL");
- uint16_t flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", th_v,
- SC_PERF_TYPE_UINT64,
- "NULL");
- uint16_t flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", th_v,
- SC_PERF_TYPE_UINT64,
- "NULL");
- uint16_t flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", th_v,
- SC_PERF_TYPE_UINT64,
- "NULL");
- uint16_t flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", th_v,
- SC_PERF_TYPE_UINT64,
- "NULL");
-
- if (th_v->thread_setup_flags != 0)
- TmThreadSetupOptions(th_v);
-
memset(&ts, 0, sizeof(ts));
- FlowForceReassemblySetup(g_detect_disabled);
-
- /* set the thread name */
- if (SCSetThreadName(th_v->name) < 0) {
- SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
- } else {
- SCLogDebug("%s started...", th_v->name);
- }
-
- th_v->sc_perf_pca = SCPerfGetAllCountersArray(&th_v->sc_perf_pctx);
- SCPerfAddToClubbedTMTable(th_v->name, &th_v->sc_perf_pctx);
-
- /* Set the threads capability */
- th_v->cap_flags = 0;
- SCDropCaps(th_v);
- PacketPoolInit();
-
FlowHashDebugInit();
- TmThreadsSetFlag(th_v, THV_INIT_DONE);
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
SCLogDebug("Flow emergency mode entered...");
- SCPerfCounterIncr(flow_emerg_mode_enter, th_v->sc_perf_pca);
+ SCPerfCounterIncr(ftd->flow_emerg_mode_enter, th_v->sc_perf_pca);
}
}
}
/* see if we still have enough spare flows */
- FlowUpdateSpareFlows();
+ if (ftd->instance == 1)
+ FlowUpdateSpareFlows();
/* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, };
- FlowTimeoutHash(&ts, 0 /* check all */, &counters);
+ FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
- DefragTimeoutHash(&ts);
- //uint32_t hosts_pruned =
- HostTimeoutHash(&ts);
+ if (ftd->instance == 1) {
+ DefragTimeoutHash(&ts);
+ //uint32_t hosts_pruned =
+ HostTimeoutHash(&ts);
+ }
/*
SCPerfCounterAddUI64(flow_mgr_host_prune, th_v->sc_perf_pca, (uint64_t)hosts_pruned);
uint32_t hosts_active = HostGetActiveCount();
uint32_t hosts_spare = HostGetSpareCount();
SCPerfCounterSetUI64(flow_mgr_host_spare, th_v->sc_perf_pca, (uint64_t)hosts_spare);
*/
- SCPerfCounterAddUI64(flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
- SCPerfCounterAddUI64(flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
- SCPerfCounterAddUI64(flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
+ SCPerfCounterAddUI64(ftd->flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
+ SCPerfCounterAddUI64(ftd->flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
+ SCPerfCounterAddUI64(ftd->flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
long long unsigned int flow_memuse = SC_ATOMIC_GET(flow_memuse);
- SCPerfCounterSetUI64(flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
+ SCPerfCounterSetUI64(ftd->flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
uint32_t len = 0;
FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
FQLOCK_UNLOCK(&flow_spare_q);
- SCPerfCounterSetUI64(flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
+ SCPerfCounterSetUI64(ftd->flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
/* Don't fear, FlowManagerThread is here...
* clear emergency bit if we have at least xx flows pruned. */
"%% flows at the queue", (uintmax_t)ts.tv_sec,
(uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
- SCPerfCounterIncr(flow_emerg_mode_over, th_v->sc_perf_pca);
+ SCPerfCounterIncr(ftd->flow_emerg_mode_over, th_v->sc_perf_pca);
} else {
flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC;
flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC;
SCPerfSyncCountersIfSignalled(th_v);
}
- TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
- TmThreadWaitForFlag(th_v, THV_DEINIT);
-
FlowHashDebugDeinit();
SCLogInfo("%" PRIu32 " new flows, %" PRIu32 " established flows were "
"timed out, %"PRIu32" flows in closed state", new_cnt,
established_cnt, closing_cnt);
- TmThreadsSetFlag(th_v, THV_CLOSED);
- pthread_exit((void *) 0);
- return NULL;
+ return TM_ECODE_OK;
}
/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn()
{
- ThreadVars *tv_flowmgr = NULL;
+ intmax_t setting = 1;
+ (void)ConfGetInt("flow.managers", &setting);
+
+ if (setting < 1 || setting > 1024) {
+ SCLogError(SC_ERR_INVALID_ARGUMENTS,
+ "invalid flow.managers setting %"PRIdMAX, setting);
+ exit(EXIT_FAILURE);
+ }
+ flowmgr_number = (uint32_t)setting;
+ SCLogInfo("using %u flow manager threads", flowmgr_number);
+ FlowForceReassemblySetup(g_detect_disabled);
SCCtrlCondInit(&flow_manager_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL);
- tv_flowmgr = TmThreadCreateMgmtThread("FlowManagerThread",
- FlowManagerThread, 0);
+ uint32_t u;
+ for (u = 0; u < flowmgr_number; u++) {
+ ThreadVars *tv_flowmgr = NULL;
- TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+ char name[32] = "";
+ snprintf(name, sizeof(name), "FlowManagerThread%02u", u+1);
- if (tv_flowmgr == NULL) {
- printf("ERROR: TmThreadsCreate failed\n");
- exit(1);
- }
- if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
- exit(1);
- }
+ tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowManagerThread",
+ "FlowManager", 0);
+ BUG_ON(tv_flowmgr == NULL);
+ TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+
+ if (tv_flowmgr == NULL) {
+ printf("ERROR: TmThreadsCreate failed\n");
+ exit(1);
+ }
+ if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
+ printf("ERROR: TmThreadSpawn failed\n");
+ exit(1);
+ }
+ }
return;
}
return;
}
+void TmModuleFlowManagerRegister (void)
+{
+ tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
+ tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
+ tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
+// tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests;
+ tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
+ tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
+ tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
+ SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
+}
+
#ifdef UNITTESTS
/**
TimeGet(&ts);
/* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, };
- FlowTimeoutHash(&ts, 0 /* check all */, &counters);
+ FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
if (flow_recycle_q.len > 0) {
result = 1;