From 0ac94ef777d222bb530e6f314b1092b2b4b24744 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Mon, 21 Jul 2014 15:13:42 +0200 Subject: [PATCH] flow-recycler: support multiple instances Use new management API to run the flow recycler. Make number of threads configurable: flow: memcap: 64mb hash-size: 65536 prealloc: 10000 emergency-recovery: 30 managers: 2 recyclers: 2 This sets up 2 flow recyclers. --- src/flow-manager.c | 159 +++++++++++++++++++++++++++++---------------- src/flow-manager.h | 1 + src/suricata.c | 1 + suricata.yaml.in | 1 + 4 files changed, 107 insertions(+), 55 deletions(-) diff --git a/src/flow-manager.c b/src/flow-manager.c index c8f58938f8..45d5ecc334 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -74,6 +74,11 @@ static uint32_t flowmgr_number = 1; /* atomic counter for flow managers, to assign instance id */ SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt); +/* multi flow recycler support */ +static uint32_t flowrec_number = 1; +/* atomic counter for flow recyclers, to assign instance id */ +SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt); + SC_ATOMIC_EXTERN(unsigned int, flow_flags); /* 1 seconds */ @@ -654,52 +659,56 @@ void FlowManagerThreadSpawn() return; } +typedef struct FlowRecyclerThreadData_ { + void *output_thread_data; +} FlowRecyclerThreadData; + +static TmEcode FlowRecyclerThreadInit(ThreadVars *t, void *initdata, void **data) +{ + FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData)); + if (ftd == NULL) + return TM_ECODE_FAILED; + + if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) { + SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed"); + SCFree(ftd); + return TM_ECODE_FAILED; + } + SCLogDebug("output_thread_data %p", ftd->output_thread_data); + + *data = ftd; + return TM_ECODE_OK; +} + +static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data) +{ + FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data; + if (ftd->output_thread_data != NULL) + OutputFlowLogThreadDeinit(t, ftd->output_thread_data); + + SCFree(data); + return TM_ECODE_OK; +} + /** \brief Thread that manages timed out flows. * * \param td ThreadVars casted to void ptr */ -void *FlowRecyclerThread(void *td) +static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) { /* block usr2. usr2 to be handled by the main thread only */ UtilSignalBlock(SIGUSR2); - ThreadVars *th_v = (ThreadVars *)td; struct timeval ts; struct timespec cond_time; int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC; int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC; uint64_t recycled_cnt = 0; - void *output_thread_data = NULL; - - if (th_v->thread_setup_flags != 0) - TmThreadSetupOptions(th_v); + FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data; + BUG_ON(ftd == NULL); memset(&ts, 0, sizeof(ts)); - /* set the thread name */ - if (SCSetThreadName(th_v->name) < 0) { - SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name"); - } else { - SCLogDebug("%s started...", th_v->name); - } - - /* Set the threads capability */ - th_v->cap_flags = 0; - SCDropCaps(th_v); - - if (OutputFlowLogThreadInit(th_v, NULL, &output_thread_data) != TM_ECODE_OK) { - SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed"); - - /* failure */ - TmThreadsSetFlag(th_v, THV_RUNNING_DONE); - TmThreadWaitForFlag(th_v, THV_DEINIT); - TmThreadsSetFlag(th_v, THV_CLOSED); - pthread_exit((void *) 0); - return NULL; - } - SCLogDebug("output_thread_data %p", output_thread_data); - - TmThreadsSetFlag(th_v, THV_INIT_DONE); while (1) { if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { @@ -725,7 +734,7 @@ void *FlowRecyclerThread(void *td) while ((f = FlowDequeue(&flow_recycle_q)) != NULL) { FLOWLOCK_WRLOCK(f); - (void)OutputFlowLog(th_v, output_thread_data, f); + (void)OutputFlowLog(th_v, ftd->output_thread_data, f); FlowClearMemory (f, f->protomap); FLOWLOCK_UNLOCK(f); @@ -753,17 +762,9 @@ void *FlowRecyclerThread(void *td) SCPerfSyncCountersIfSignalled(th_v); } - if (output_thread_data != NULL) - OutputFlowLogThreadDeinit(th_v, output_thread_data); - SCLogInfo("%"PRIu64" flows processed", recycled_cnt); - TmThreadsSetFlag(th_v, THV_RUNNING_DONE); - TmThreadWaitForFlag(th_v, THV_DEINIT); - - TmThreadsSetFlag(th_v, THV_CLOSED); - pthread_exit((void *) 0); - return NULL; + return TM_ECODE_OK; } int FlowRecyclerReadyToShutdown(void) @@ -779,25 +780,44 @@ int FlowRecyclerReadyToShutdown(void) /** \brief spawn the flow recycler thread */ void FlowRecyclerThreadSpawn() { - ThreadVars *tv_flowmgr = NULL; + intmax_t setting = 1; + (void)ConfGetInt("flow.recyclers", &setting); + + if (setting < 1 || setting > 1024) { + SCLogError(SC_ERR_INVALID_ARGUMENTS, + "invalid flow.recyclers setting %"PRIdMAX, setting); + exit(EXIT_FAILURE); + } + flowrec_number = (uint32_t)setting; + + SCLogInfo("using %u flow recycler threads", flowrec_number); SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL); SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL); - tv_flowmgr = TmThreadCreateMgmtThread("FlowRecyclerThread", - FlowRecyclerThread, 0); - TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET); + uint32_t u; + for (u = 0; u < flowrec_number; u++) { + ThreadVars *tv_flowmgr = NULL; - if (tv_flowmgr == NULL) { - printf("ERROR: TmThreadsCreate failed\n"); - exit(1); - } - if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(1); - } + char name[32] = ""; + snprintf(name, sizeof(name), "FlowRecyclerThread%02u", u+1); + tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowRecyclerThread", + "FlowRecycler", 0); + BUG_ON(tv_flowmgr == NULL); + + TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET); + + if (tv_flowmgr == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(1); + } + if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(1); + } + } return; } @@ -820,27 +840,41 @@ void FlowKillFlowRecyclerThread(void) usleep(10); } while (FlowRecyclerReadyToShutdown() == 0); + /* wake up threads */ + uint32_t u; + for (u = 0; u < flowrec_number; u++) + SCCtrlCondSignal(&flow_recycler_ctrl_cond); + SCMutexLock(&tv_root_lock); - /* flow manager thread(s) is/are a part of mgmt threads */ + /* flow recycler thread(s) is/are a part of mgmt threads */ tv = tv_root[TVT_MGMT]; while (tv != NULL) { if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) { TmThreadsSetFlag(tv, THV_KILL); TmThreadsSetFlag(tv, THV_DEINIT); + cnt++; + } + tv = tv->next; + } - SCCtrlCondSignal(&flow_recycler_ctrl_cond); + /* wake up threads, another try */ + for (u = 0; u < flowrec_number; u++) + SCCtrlCondSignal(&flow_recycler_ctrl_cond); + tv = tv_root[TVT_MGMT]; + while (tv != NULL) { + if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) { /* be sure it has shut down */ while (!TmThreadsCheckFlag(tv, THV_CLOSED)) { usleep(100); } - cnt++; } tv = tv->next; } + /* not possible, unless someone decides to rename FlowManagerThread */ if (cnt == 0) { SCMutexUnlock(&tv_root_lock); @@ -848,6 +882,9 @@ void FlowKillFlowRecyclerThread(void) } SCMutexUnlock(&tv_root_lock); + + /* reset count, so we can kill and respawn (unix socket) */ + SC_ATOMIC_SET(flowrec_cnt, 0); return; } @@ -863,6 +900,18 @@ void TmModuleFlowManagerRegister (void) SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name); } +void TmModuleFlowRecyclerRegister (void) +{ + tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler"; + tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit; + tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit; +// tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests; + tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler; + tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0; + tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM; + SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name); +} + #ifdef UNITTESTS /** diff --git a/src/flow-manager.h b/src/flow-manager.h index 2999b4de4d..a2e6f6dc06 100644 --- a/src/flow-manager.h +++ b/src/flow-manager.h @@ -43,5 +43,6 @@ void FlowRecyclerThreadSpawn(void); void FlowKillFlowRecyclerThread(void); void TmModuleFlowManagerRegister (void); +void TmModuleFlowRecyclerRegister (void); #endif /* __FLOW_MANAGER_H__ */ diff --git a/src/suricata.c b/src/suricata.c index 0f33e10999..a804b54447 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -795,6 +795,7 @@ void RegisterAllModules() { /* managers */ TmModuleFlowManagerRegister(); + TmModuleFlowRecyclerRegister(); /* nfq */ TmModuleReceiveNFQRegister(); TmModuleVerdictNFQRegister(); diff --git a/suricata.yaml.in b/suricata.yaml.in index 682db04daf..fd985dc4af 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -624,6 +624,7 @@ flow: prealloc: 10000 emergency-recovery: 30 #managers: 1 # default to one flow manager + #recyclers: 1 # default to one flow recycler thread # This option controls the use of vlan ids in the flow (and defrag) # hashing. Normally this should be enabled, but in some (broken) -- 2.47.3