From: Victor Julien Date: Sat, 12 Jul 2014 11:47:33 +0000 (+0200) Subject: flow-manager: support multiple instances X-Git-Tag: suricata-2.1beta1~39 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=e0841218f0b609cbd91b8e758c1c196f5d3a2bc5;p=thirdparty%2Fsuricata.git flow-manager: support multiple instances Use new management API to run the flow manager. Support multiple flow managers, where each of them works with it's own part of the flow hash. Make number of threads configurable: flow: memcap: 64mb hash-size: 65536 prealloc: 10000 emergency-recovery: 30 managers: 2 This sets up 2 flow managers. Handle misc tasks only in instance 1: Handle defrag hash timeout handing, host hash timeout handling and flow spare queue updating only from the first instance. --- diff --git a/src/flow-manager.c b/src/flow-manager.c index 4dbd7e58bc..c8f58938f8 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -69,6 +69,11 @@ /* Run mode selected at suricata.c */ extern int run_mode; +/* multi flow mananger support */ +static uint32_t flowmgr_number = 1; +/* atomic counter for flow managers, to assign instance id */ +SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt); + SC_ATOMIC_EXTERN(unsigned int, flow_flags); /* 1 seconds */ @@ -96,7 +101,10 @@ void FlowKillFlowManagerThread(void) ThreadVars *tv = NULL; int cnt = 0; - SCCtrlCondSignal(&flow_manager_ctrl_cond); + /* wake up threads */ + uint32_t u; + for (u = 0; u < flowmgr_number; u++) + SCCtrlCondSignal(&flow_manager_ctrl_cond); SCMutexLock(&tv_root_lock); @@ -107,16 +115,27 @@ void FlowKillFlowManagerThread(void) if (strcasecmp(tv->name, "FlowManagerThread") == 0) { TmThreadsSetFlag(tv, THV_KILL); TmThreadsSetFlag(tv, THV_DEINIT); + cnt++; + } + tv = tv->next; + } + + /* wake up threads, another try */ + for (u = 0; u < flowmgr_number; u++) + SCCtrlCondSignal(&flow_manager_ctrl_cond); + tv = tv_root[TVT_MGMT]; + while (tv != NULL) { + if (strcasecmp(tv->name, "FlowManagerThread") == 0) { /* be sure it has shut down */ while (!TmThreadsCheckFlag(tv, THV_CLOSED)) { usleep(100); } - cnt++; } tv = tv->next; } + /* not possible, unless someone decides to rename FlowManagerThread */ if (cnt == 0) { SCMutexUnlock(&tv_root_lock); @@ -124,6 +143,9 @@ void FlowKillFlowManagerThread(void) } SCMutexUnlock(&tv_root_lock); + + /* reset count, so we can kill and respawn (unix socket) */ + SC_ATOMIC_SET(flowmgr_cnt, 0); return; } @@ -324,11 +346,16 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts, * * \param ts timestamp * \param try_cnt number of flows to time out max (0 is unlimited) + * \param hash_min min hash index to consider + * \param hash_max max hash index to consider * \param counters ptr to FlowTimeoutCounters structure * * \retval cnt number of timed out flow */ -uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounters *counters) { +static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, + uint32_t hash_min, uint32_t hash_max, + FlowTimeoutCounters *counters) +{ uint32_t idx = 0; uint32_t cnt = 0; int emergency = 0; @@ -336,7 +363,7 @@ uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounte if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) emergency = 1; - for (idx = 0; idx < flow_config.hash_size; idx++) { + for (idx = hash_min; idx < hash_max; idx++) { FlowBucket *fb = &flow_hash[idx]; if (FBLOCK_TRYLOCK(fb) != 0) @@ -362,18 +389,86 @@ next: extern int g_detect_disabled; +typedef struct FlowManagerThreadData_ { + uint32_t instance; + uint32_t min; + uint32_t max; + + uint16_t flow_mgr_cnt_clo; + uint16_t flow_mgr_cnt_new; + uint16_t flow_mgr_cnt_est; + uint16_t flow_mgr_memuse; + uint16_t flow_mgr_spare; + uint16_t flow_emerg_mode_enter; + uint16_t flow_emerg_mode_over; +} FlowManagerThreadData; + +static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data) +{ + FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData)); + if (ftd == NULL) + return TM_ECODE_FAILED; + + ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1); + SCLogDebug("flow manager instance %u", ftd->instance); + + /* set the min and max value used for hash row walking + * each thread has it's own section of the flow hash */ + uint32_t range = flow_config.hash_size / flowmgr_number; + if (ftd->instance == 1) + ftd->max = range; + else if (ftd->instance == flowmgr_number) { + ftd->min = (range * (ftd->instance - 1)); + ftd->max = flow_config.hash_size; + } else { + ftd->min = (range * (ftd->instance - 1)); + ftd->max = (range * ftd->instance); + } + BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size); + + SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max); + + /* pass thread data back to caller */ + *data = ftd; + + ftd->flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", t, + SC_PERF_TYPE_UINT64, "NULL"); + ftd->flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", t, + SC_PERF_TYPE_UINT64, "NULL"); + ftd->flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", t, + SC_PERF_TYPE_UINT64, "NULL"); + ftd->flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", t, + SC_PERF_TYPE_UINT64, "NULL"); + ftd->flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", t, + SC_PERF_TYPE_UINT64, "NULL"); + ftd->flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", t, + SC_PERF_TYPE_UINT64, "NULL"); + ftd->flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", t, + SC_PERF_TYPE_UINT64, "NULL"); + + PacketPoolInit(); + return TM_ECODE_OK; +} + +static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data) +{ + SCFree(data); + return TM_ECODE_OK; +} + + /** \brief Thread that manages the flow table and times out flows. * * \param td ThreadVars casted to void ptr * * Keeps an eye on the spare list, alloc flows if needed... */ -void *FlowManagerThread(void *td) +static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) { - /* block usr1. usr1 to be handled by the main thread only */ + /* block usr2. usr1 to be handled by the main thread only */ UtilSignalBlock(SIGUSR2); - ThreadVars *th_v = (ThreadVars *)td; + FlowManagerThreadData *ftd = thread_data; struct timeval ts; uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0; int emerg = FALSE; @@ -394,53 +489,10 @@ void *FlowManagerThread(void *td) SC_PERF_TYPE_Q_NORMAL, "NULL"); */ - uint16_t flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", th_v, - SC_PERF_TYPE_UINT64, - "NULL"); - uint16_t flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", th_v, - SC_PERF_TYPE_UINT64, - "NULL"); - uint16_t flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", th_v, - SC_PERF_TYPE_UINT64, - "NULL"); - uint16_t flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", th_v, - SC_PERF_TYPE_UINT64, - "NULL"); - uint16_t flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", th_v, - SC_PERF_TYPE_UINT64, - "NULL"); - uint16_t flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", th_v, - SC_PERF_TYPE_UINT64, - "NULL"); - uint16_t flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", th_v, - SC_PERF_TYPE_UINT64, - "NULL"); - - if (th_v->thread_setup_flags != 0) - TmThreadSetupOptions(th_v); - memset(&ts, 0, sizeof(ts)); - FlowForceReassemblySetup(g_detect_disabled); - - /* set the thread name */ - if (SCSetThreadName(th_v->name) < 0) { - SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name"); - } else { - SCLogDebug("%s started...", th_v->name); - } - - th_v->sc_perf_pca = SCPerfGetAllCountersArray(&th_v->sc_perf_pctx); - SCPerfAddToClubbedTMTable(th_v->name, &th_v->sc_perf_pctx); - - /* Set the threads capability */ - th_v->cap_flags = 0; - SCDropCaps(th_v); - PacketPoolInit(); - FlowHashDebugInit(); - TmThreadsSetFlag(th_v, THV_INIT_DONE); while (1) { if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { @@ -457,7 +509,7 @@ void *FlowManagerThread(void *td) SCLogDebug("Flow emergency mode entered..."); - SCPerfCounterIncr(flow_emerg_mode_enter, th_v->sc_perf_pca); + SCPerfCounterIncr(ftd->flow_emerg_mode_enter, th_v->sc_perf_pca); } } @@ -472,16 +524,19 @@ void *FlowManagerThread(void *td) } /* see if we still have enough spare flows */ - FlowUpdateSpareFlows(); + if (ftd->instance == 1) + FlowUpdateSpareFlows(); /* try to time out flows */ FlowTimeoutCounters counters = { 0, 0, 0, }; - FlowTimeoutHash(&ts, 0 /* check all */, &counters); + FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters); - DefragTimeoutHash(&ts); - //uint32_t hosts_pruned = - HostTimeoutHash(&ts); + if (ftd->instance == 1) { + DefragTimeoutHash(&ts); + //uint32_t hosts_pruned = + HostTimeoutHash(&ts); + } /* SCPerfCounterAddUI64(flow_mgr_host_prune, th_v->sc_perf_pca, (uint64_t)hosts_pruned); uint32_t hosts_active = HostGetActiveCount(); @@ -489,17 +544,17 @@ void *FlowManagerThread(void *td) uint32_t hosts_spare = HostGetSpareCount(); SCPerfCounterSetUI64(flow_mgr_host_spare, th_v->sc_perf_pca, (uint64_t)hosts_spare); */ - SCPerfCounterAddUI64(flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo); - SCPerfCounterAddUI64(flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new); - SCPerfCounterAddUI64(flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est); + SCPerfCounterAddUI64(ftd->flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo); + SCPerfCounterAddUI64(ftd->flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new); + SCPerfCounterAddUI64(ftd->flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est); long long unsigned int flow_memuse = SC_ATOMIC_GET(flow_memuse); - SCPerfCounterSetUI64(flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse); + SCPerfCounterSetUI64(ftd->flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse); uint32_t len = 0; FQLOCK_LOCK(&flow_spare_q); len = flow_spare_q.len; FQLOCK_UNLOCK(&flow_spare_q); - SCPerfCounterSetUI64(flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len); + SCPerfCounterSetUI64(ftd->flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len); /* Don't fear, FlowManagerThread is here... * clear emergency bit if we have at least xx flows pruned. */ @@ -523,7 +578,7 @@ void *FlowManagerThread(void *td) "%% flows at the queue", (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc); - SCPerfCounterIncr(flow_emerg_mode_over, th_v->sc_perf_pca); + SCPerfCounterIncr(ftd->flow_emerg_mode_over, th_v->sc_perf_pca); } else { flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC; flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC; @@ -547,42 +602,55 @@ void *FlowManagerThread(void *td) SCPerfSyncCountersIfSignalled(th_v); } - TmThreadsSetFlag(th_v, THV_RUNNING_DONE); - TmThreadWaitForFlag(th_v, THV_DEINIT); - FlowHashDebugDeinit(); SCLogInfo("%" PRIu32 " new flows, %" PRIu32 " established flows were " "timed out, %"PRIu32" flows in closed state", new_cnt, established_cnt, closing_cnt); - TmThreadsSetFlag(th_v, THV_CLOSED); - pthread_exit((void *) 0); - return NULL; + return TM_ECODE_OK; } /** \brief spawn the flow manager thread */ void FlowManagerThreadSpawn() { - ThreadVars *tv_flowmgr = NULL; + intmax_t setting = 1; + (void)ConfGetInt("flow.managers", &setting); + + if (setting < 1 || setting > 1024) { + SCLogError(SC_ERR_INVALID_ARGUMENTS, + "invalid flow.managers setting %"PRIdMAX, setting); + exit(EXIT_FAILURE); + } + flowmgr_number = (uint32_t)setting; + SCLogInfo("using %u flow manager threads", flowmgr_number); + FlowForceReassemblySetup(g_detect_disabled); SCCtrlCondInit(&flow_manager_ctrl_cond, NULL); SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL); - tv_flowmgr = TmThreadCreateMgmtThread("FlowManagerThread", - FlowManagerThread, 0); + uint32_t u; + for (u = 0; u < flowmgr_number; u++) { + ThreadVars *tv_flowmgr = NULL; - TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET); + char name[32] = ""; + snprintf(name, sizeof(name), "FlowManagerThread%02u", u+1); - if (tv_flowmgr == NULL) { - printf("ERROR: TmThreadsCreate failed\n"); - exit(1); - } - if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(1); - } + tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowManagerThread", + "FlowManager", 0); + BUG_ON(tv_flowmgr == NULL); + TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET); + + if (tv_flowmgr == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(1); + } + if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(1); + } + } return; } @@ -783,6 +851,18 @@ void FlowKillFlowRecyclerThread(void) return; } +void TmModuleFlowManagerRegister (void) +{ + tmm_modules[TMM_FLOWMANAGER].name = "FlowManager"; + tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit; + tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit; +// tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests; + tmm_modules[TMM_FLOWMANAGER].Management = FlowManager; + tmm_modules[TMM_FLOWMANAGER].cap_flags = 0; + tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM; + SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name); +} + #ifdef UNITTESTS /** @@ -1033,7 +1113,7 @@ static int FlowMgrTest05 (void) { TimeGet(&ts); /* try to time out flows */ FlowTimeoutCounters counters = { 0, 0, 0, }; - FlowTimeoutHash(&ts, 0 /* check all */, &counters); + FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters); if (flow_recycle_q.len > 0) { result = 1; diff --git a/src/flow-manager.h b/src/flow-manager.h index 3ad5c9b63a..2999b4de4d 100644 --- a/src/flow-manager.h +++ b/src/flow-manager.h @@ -42,4 +42,6 @@ SCCtrlMutex flow_recycler_ctrl_mutex; void FlowRecyclerThreadSpawn(void); void FlowKillFlowRecyclerThread(void); +void TmModuleFlowManagerRegister (void); + #endif /* __FLOW_MANAGER_H__ */ diff --git a/src/suricata.c b/src/suricata.c index 9be6671abd..0f33e10999 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -793,6 +793,8 @@ int g_ut_covered; void RegisterAllModules() { + /* managers */ + TmModuleFlowManagerRegister(); /* nfq */ TmModuleReceiveNFQRegister(); TmModuleVerdictNFQRegister(); diff --git a/src/tm-modules.c b/src/tm-modules.c index 2b2d242b43..b52a0daa73 100644 --- a/src/tm-modules.c +++ b/src/tm-modules.c @@ -255,6 +255,8 @@ const char * TmModuleTmmIdToString(TmmId id) CASE_CODE (TMM_JSONSSHLOG); CASE_CODE (TMM_JSONTLSLOG); CASE_CODE (TMM_OUTPUTJSON); + CASE_CODE (TMM_FLOWMANAGER); + CASE_CODE (TMM_FLOWRECYCLER); CASE_CODE (TMM_SIZE); } diff --git a/src/tm-threads-common.h b/src/tm-threads-common.h index 7514cdfbf9..de869747ba 100644 --- a/src/tm-threads-common.h +++ b/src/tm-threads-common.h @@ -93,6 +93,10 @@ typedef enum { TMM_DECODENFLOG, TMM_JSONFLOWLOG, TMM_JSONNETFLOWLOG, + + TMM_FLOWMANAGER, + TMM_FLOWRECYCLER, + TMM_SIZE, } TmmId; diff --git a/suricata.yaml.in b/suricata.yaml.in index 9405003eeb..682db04daf 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -623,6 +623,7 @@ flow: hash-size: 65536 prealloc: 10000 emergency-recovery: 30 + #managers: 1 # default to one flow manager # This option controls the use of vlan ids in the flow (and defrag) # hashing. Normally this should be enabled, but in some (broken)