]> git.ipfire.org Git - people/ms/suricata.git/commitdiff
flow-recycler: support multiple instances
authorVictor Julien <victor@inliniac.net>
Mon, 21 Jul 2014 13:13:42 +0000 (15:13 +0200)
committerVictor Julien <victor@inliniac.net>
Mon, 28 Jul 2014 13:47:45 +0000 (15:47 +0200)
Use new management API to run the flow recycler.

Make number of threads configurable:

flow:
  memcap: 64mb
  hash-size: 65536
  prealloc: 10000
  emergency-recovery: 30
  managers: 2
  recyclers: 2

This sets up 2 flow recyclers.

src/flow-manager.c
src/flow-manager.h
src/suricata.c
suricata.yaml.in

index c8f58938f84fb35759f71a620a4886c2cbfbd9fa..45d5ecc334cfc3ffe97432790bbcf853531c914d 100644 (file)
@@ -74,6 +74,11 @@ static uint32_t flowmgr_number = 1;
 /* atomic counter for flow managers, to assign instance id */
 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
 
+/* multi flow recycler support */
+static uint32_t flowrec_number = 1;
+/* atomic counter for flow recyclers, to assign instance id */
+SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
+
 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
 
 /* 1 seconds */
@@ -654,52 +659,56 @@ void FlowManagerThreadSpawn()
     return;
 }
 
+typedef struct FlowRecyclerThreadData_ {
+    void *output_thread_data;
+} FlowRecyclerThreadData;
+
+static TmEcode FlowRecyclerThreadInit(ThreadVars *t, void *initdata, void **data)
+{
+    FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
+    if (ftd == NULL)
+        return TM_ECODE_FAILED;
+
+    if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
+        SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
+        SCFree(ftd);
+        return TM_ECODE_FAILED;
+    }
+    SCLogDebug("output_thread_data %p", ftd->output_thread_data);
+
+    *data = ftd;
+    return TM_ECODE_OK;
+}
+
+static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
+{
+    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
+    if (ftd->output_thread_data != NULL)
+        OutputFlowLogThreadDeinit(t, ftd->output_thread_data);
+
+    SCFree(data);
+    return TM_ECODE_OK;
+}
+
 /** \brief Thread that manages timed out flows.
  *
  *  \param td ThreadVars casted to void ptr
  */
-void *FlowRecyclerThread(void *td)
+static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
 {
     /* block usr2. usr2 to be handled by the main thread only */
     UtilSignalBlock(SIGUSR2);
 
-    ThreadVars *th_v = (ThreadVars *)td;
     struct timeval ts;
     struct timespec cond_time;
     int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
     int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
     uint64_t recycled_cnt = 0;
-    void *output_thread_data = NULL;
-
-    if (th_v->thread_setup_flags != 0)
-        TmThreadSetupOptions(th_v);
+    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
+    BUG_ON(ftd == NULL);
 
     memset(&ts, 0, sizeof(ts));
 
-    /* set the thread name */
-    if (SCSetThreadName(th_v->name) < 0) {
-        SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
-    } else {
-        SCLogDebug("%s started...", th_v->name);
-    }
-
-    /* Set the threads capability */
-    th_v->cap_flags = 0;
-    SCDropCaps(th_v);
-
-    if (OutputFlowLogThreadInit(th_v, NULL, &output_thread_data) != TM_ECODE_OK) {
-        SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
-
-        /* failure */
-        TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
-        TmThreadWaitForFlag(th_v, THV_DEINIT);
-        TmThreadsSetFlag(th_v, THV_CLOSED);
-        pthread_exit((void *) 0);
-        return NULL;
-    }
-    SCLogDebug("output_thread_data %p", output_thread_data);
-
-    TmThreadsSetFlag(th_v, THV_INIT_DONE);
     while (1)
     {
         if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
@@ -725,7 +734,7 @@ void *FlowRecyclerThread(void *td)
             while ((f = FlowDequeue(&flow_recycle_q)) != NULL) {
                 FLOWLOCK_WRLOCK(f);
 
-                (void)OutputFlowLog(th_v, output_thread_data, f);
+                (void)OutputFlowLog(th_v, ftd->output_thread_data, f);
 
                 FlowClearMemory (f, f->protomap);
                 FLOWLOCK_UNLOCK(f);
@@ -753,17 +762,9 @@ void *FlowRecyclerThread(void *td)
         SCPerfSyncCountersIfSignalled(th_v);
     }
 
-    if (output_thread_data != NULL)
-        OutputFlowLogThreadDeinit(th_v, output_thread_data);
-
     SCLogInfo("%"PRIu64" flows processed", recycled_cnt);
 
-    TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
-    TmThreadWaitForFlag(th_v, THV_DEINIT);
-
-    TmThreadsSetFlag(th_v, THV_CLOSED);
-    pthread_exit((void *) 0);
-    return NULL;
+    return TM_ECODE_OK;
 }
 
 int FlowRecyclerReadyToShutdown(void)
@@ -779,25 +780,44 @@ int FlowRecyclerReadyToShutdown(void)
 /** \brief spawn the flow recycler thread */
 void FlowRecyclerThreadSpawn()
 {
-    ThreadVars *tv_flowmgr = NULL;
+    intmax_t setting = 1;
+    (void)ConfGetInt("flow.recyclers", &setting);
+
+    if (setting < 1 || setting > 1024) {
+        SCLogError(SC_ERR_INVALID_ARGUMENTS,
+                "invalid flow.recyclers setting %"PRIdMAX, setting);
+        exit(EXIT_FAILURE);
+    }
+    flowrec_number = (uint32_t)setting;
+
+    SCLogInfo("using %u flow recycler threads", flowrec_number);
 
     SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
     SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
 
-    tv_flowmgr = TmThreadCreateMgmtThread("FlowRecyclerThread",
-                                          FlowRecyclerThread, 0);
 
-    TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+    uint32_t u;
+    for (u = 0; u < flowrec_number; u++) {
+        ThreadVars *tv_flowmgr = NULL;
 
-    if (tv_flowmgr == NULL) {
-        printf("ERROR: TmThreadsCreate failed\n");
-        exit(1);
-    }
-    if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
-        printf("ERROR: TmThreadSpawn failed\n");
-        exit(1);
-    }
+        char name[32] = "";
+        snprintf(name, sizeof(name), "FlowRecyclerThread%02u", u+1);
 
+        tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowRecyclerThread",
+                "FlowRecycler", 0);
+        BUG_ON(tv_flowmgr == NULL);
+
+        TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+
+        if (tv_flowmgr == NULL) {
+            printf("ERROR: TmThreadsCreate failed\n");
+            exit(1);
+        }
+        if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
+            printf("ERROR: TmThreadSpawn failed\n");
+            exit(1);
+        }
+    }
     return;
 }
 
@@ -820,27 +840,41 @@ void FlowKillFlowRecyclerThread(void)
         usleep(10);
     } while (FlowRecyclerReadyToShutdown() == 0);
 
+    /* wake up threads */
+    uint32_t u;
+    for (u = 0; u < flowrec_number; u++)
+        SCCtrlCondSignal(&flow_recycler_ctrl_cond);
+
     SCMutexLock(&tv_root_lock);
 
-    /* flow manager thread(s) is/are a part of mgmt threads */
+    /* flow recycler thread(s) is/are a part of mgmt threads */
     tv = tv_root[TVT_MGMT];
 
     while (tv != NULL) {
         if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
             TmThreadsSetFlag(tv, THV_KILL);
             TmThreadsSetFlag(tv, THV_DEINIT);
+            cnt++;
+        }
+        tv = tv->next;
+    }
 
-            SCCtrlCondSignal(&flow_recycler_ctrl_cond);
+    /* wake up threads, another try */
+    for (u = 0; u < flowrec_number; u++)
+        SCCtrlCondSignal(&flow_recycler_ctrl_cond);
 
+    tv = tv_root[TVT_MGMT];
+    while (tv != NULL) {
+        if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
             /* be sure it has shut down */
             while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
                 usleep(100);
             }
-            cnt++;
         }
         tv = tv->next;
     }
 
+
     /* not possible, unless someone decides to rename FlowManagerThread */
     if (cnt == 0) {
         SCMutexUnlock(&tv_root_lock);
@@ -848,6 +882,9 @@ void FlowKillFlowRecyclerThread(void)
     }
 
     SCMutexUnlock(&tv_root_lock);
+
+    /* reset count, so we can kill and respawn (unix socket) */
+    SC_ATOMIC_SET(flowrec_cnt, 0);
     return;
 }
 
@@ -863,6 +900,18 @@ void TmModuleFlowManagerRegister (void)
     SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
 }
 
+void TmModuleFlowRecyclerRegister (void)
+{
+    tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
+    tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
+    tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
+//    tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests;
+    tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
+    tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
+    tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
+    SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
+}
+
 #ifdef UNITTESTS
 
 /**
index 2999b4de4d74632f04d19d125ebbcc287cfdeb78..a2e6f6dc068d7a63c25c529b0a0c43afc2ccc314 100644 (file)
@@ -43,5 +43,6 @@ void FlowRecyclerThreadSpawn(void);
 void FlowKillFlowRecyclerThread(void);
 
 void TmModuleFlowManagerRegister (void);
+void TmModuleFlowRecyclerRegister (void);
 
 #endif /* __FLOW_MANAGER_H__ */
index 0f33e10999220dbefaa0a9c478e11d16f623c16c..a804b5444782f5a09cf2091358fc039b80de42c1 100644 (file)
@@ -795,6 +795,7 @@ void RegisterAllModules()
 {
     /* managers */
     TmModuleFlowManagerRegister();
+    TmModuleFlowRecyclerRegister();
     /* nfq */
     TmModuleReceiveNFQRegister();
     TmModuleVerdictNFQRegister();
index 682db04daff24d9f4dae74a5705c530706e9661e..fd985dc4af1e99e08a02f03a66c39da554962c36 100644 (file)
@@ -624,6 +624,7 @@ flow:
   prealloc: 10000
   emergency-recovery: 30
   #managers: 1 # default to one flow manager
+  #recyclers: 1 # default to one flow recycler thread
 
 # This option controls the use of vlan ids in the flow (and defrag)
 # hashing. Normally this should be enabled, but in some (broken)