/* atomic counter for flow managers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
+/* multi flow recycler support */
+static uint32_t flowrec_number = 1;
+/* atomic counter for flow recyclers, to assign instance id */
+SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
+
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
/* 1 seconds */
return;
}
+typedef struct FlowRecyclerThreadData_ {
+ void *output_thread_data;
+} FlowRecyclerThreadData;
+
+static TmEcode FlowRecyclerThreadInit(ThreadVars *t, void *initdata, void **data)
+{
+ FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
+ if (ftd == NULL)
+ return TM_ECODE_FAILED;
+
+ if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
+ SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
+ SCFree(ftd);
+ return TM_ECODE_FAILED;
+ }
+ SCLogDebug("output_thread_data %p", ftd->output_thread_data);
+
+ *data = ftd;
+ return TM_ECODE_OK;
+}
+
+static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
+{
+ FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
+ if (ftd->output_thread_data != NULL)
+ OutputFlowLogThreadDeinit(t, ftd->output_thread_data);
+
+ SCFree(data);
+ return TM_ECODE_OK;
+}
+
/** \brief Thread that manages timed out flows.
*
* \param td ThreadVars casted to void ptr
*/
-void *FlowRecyclerThread(void *td)
+static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{
/* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
- ThreadVars *th_v = (ThreadVars *)td;
struct timeval ts;
struct timespec cond_time;
int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
uint64_t recycled_cnt = 0;
- void *output_thread_data = NULL;
-
- if (th_v->thread_setup_flags != 0)
- TmThreadSetupOptions(th_v);
+ FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
+ BUG_ON(ftd == NULL);
memset(&ts, 0, sizeof(ts));
- /* set the thread name */
- if (SCSetThreadName(th_v->name) < 0) {
- SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
- } else {
- SCLogDebug("%s started...", th_v->name);
- }
-
- /* Set the threads capability */
- th_v->cap_flags = 0;
- SCDropCaps(th_v);
-
- if (OutputFlowLogThreadInit(th_v, NULL, &output_thread_data) != TM_ECODE_OK) {
- SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
-
- /* failure */
- TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
- TmThreadWaitForFlag(th_v, THV_DEINIT);
- TmThreadsSetFlag(th_v, THV_CLOSED);
- pthread_exit((void *) 0);
- return NULL;
- }
- SCLogDebug("output_thread_data %p", output_thread_data);
-
- TmThreadsSetFlag(th_v, THV_INIT_DONE);
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
while ((f = FlowDequeue(&flow_recycle_q)) != NULL) {
FLOWLOCK_WRLOCK(f);
- (void)OutputFlowLog(th_v, output_thread_data, f);
+ (void)OutputFlowLog(th_v, ftd->output_thread_data, f);
FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f);
SCPerfSyncCountersIfSignalled(th_v);
}
- if (output_thread_data != NULL)
- OutputFlowLogThreadDeinit(th_v, output_thread_data);
-
SCLogInfo("%"PRIu64" flows processed", recycled_cnt);
- TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
- TmThreadWaitForFlag(th_v, THV_DEINIT);
-
- TmThreadsSetFlag(th_v, THV_CLOSED);
- pthread_exit((void *) 0);
- return NULL;
+ return TM_ECODE_OK;
}
int FlowRecyclerReadyToShutdown(void)
/** \brief spawn the flow recycler thread */
void FlowRecyclerThreadSpawn()
{
- ThreadVars *tv_flowmgr = NULL;
+ intmax_t setting = 1;
+ (void)ConfGetInt("flow.recyclers", &setting);
+
+ if (setting < 1 || setting > 1024) {
+ SCLogError(SC_ERR_INVALID_ARGUMENTS,
+ "invalid flow.recyclers setting %"PRIdMAX, setting);
+ exit(EXIT_FAILURE);
+ }
+ flowrec_number = (uint32_t)setting;
+
+ SCLogInfo("using %u flow recycler threads", flowrec_number);
SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
- tv_flowmgr = TmThreadCreateMgmtThread("FlowRecyclerThread",
- FlowRecyclerThread, 0);
- TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+ uint32_t u;
+ for (u = 0; u < flowrec_number; u++) {
+ ThreadVars *tv_flowmgr = NULL;
- if (tv_flowmgr == NULL) {
- printf("ERROR: TmThreadsCreate failed\n");
- exit(1);
- }
- if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
- printf("ERROR: TmThreadSpawn failed\n");
- exit(1);
- }
+ char name[32] = "";
+ snprintf(name, sizeof(name), "FlowRecyclerThread%02u", u+1);
+ tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowRecyclerThread",
+ "FlowRecycler", 0);
+ BUG_ON(tv_flowmgr == NULL);
+
+ TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+
+ if (tv_flowmgr == NULL) {
+ printf("ERROR: TmThreadsCreate failed\n");
+ exit(1);
+ }
+ if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
+ printf("ERROR: TmThreadSpawn failed\n");
+ exit(1);
+ }
+ }
return;
}
usleep(10);
} while (FlowRecyclerReadyToShutdown() == 0);
+ /* wake up threads */
+ uint32_t u;
+ for (u = 0; u < flowrec_number; u++)
+ SCCtrlCondSignal(&flow_recycler_ctrl_cond);
+
SCMutexLock(&tv_root_lock);
- /* flow manager thread(s) is/are a part of mgmt threads */
+ /* flow recycler thread(s) is/are a part of mgmt threads */
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
+ cnt++;
+ }
+ tv = tv->next;
+ }
- SCCtrlCondSignal(&flow_recycler_ctrl_cond);
+ /* wake up threads, another try */
+ for (u = 0; u < flowrec_number; u++)
+ SCCtrlCondSignal(&flow_recycler_ctrl_cond);
+ tv = tv_root[TVT_MGMT];
+ while (tv != NULL) {
+ if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
- cnt++;
}
tv = tv->next;
}
+
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
}
SCMutexUnlock(&tv_root_lock);
+
+ /* reset count, so we can kill and respawn (unix socket) */
+ SC_ATOMIC_SET(flowrec_cnt, 0);
return;
}
SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
}
+void TmModuleFlowRecyclerRegister (void)
+{
+ tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
+ tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
+ tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
+// tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests;
+ tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
+ tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
+ tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
+ SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
+}
+
#ifdef UNITTESTS
/**