]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow-manager: support multiple instances
authorVictor Julien <victor@inliniac.net>
Sat, 12 Jul 2014 11:47:33 +0000 (13:47 +0200)
committerVictor Julien <victor@inliniac.net>
Mon, 28 Jul 2014 13:47:45 +0000 (15:47 +0200)
Use new management API to run the flow manager.

Support multiple flow managers, where each of them works with its
own part of the flow hash.

Make number of threads configurable:

flow:
  memcap: 64mb
  hash-size: 65536
  prealloc: 10000
  emergency-recovery: 30
  managers: 2

This sets up 2 flow managers.

Handle misc tasks only in instance 1: defrag hash timeout
handling, host hash timeout handling and flow spare queue updating
are done only from the first instance.

src/flow-manager.c
src/flow-manager.h
src/suricata.c
src/tm-modules.c
src/tm-threads-common.h
suricata.yaml.in

index 4dbd7e58bcfd590d07113c7032bfb3482a897204..c8f58938f84fb35759f71a620a4886c2cbfbd9fa 100644 (file)
 /* Run mode selected at suricata.c */
 extern int run_mode;
 
+/* multi flow manager support */
+static uint32_t flowmgr_number = 1;
+/* atomic counter for flow managers, to assign instance id */
+SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
+
 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
 
 /* 1 seconds */
@@ -96,7 +101,10 @@ void FlowKillFlowManagerThread(void)
     ThreadVars *tv = NULL;
     int cnt = 0;
 
-    SCCtrlCondSignal(&flow_manager_ctrl_cond);
+    /* wake up threads */
+    uint32_t u;
+    for (u = 0; u < flowmgr_number; u++)
+        SCCtrlCondSignal(&flow_manager_ctrl_cond);
 
     SCMutexLock(&tv_root_lock);
 
@@ -107,16 +115,27 @@ void FlowKillFlowManagerThread(void)
         if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
             TmThreadsSetFlag(tv, THV_KILL);
             TmThreadsSetFlag(tv, THV_DEINIT);
+            cnt++;
+        }
+        tv = tv->next;
+    }
+
+    /* wake up threads, another try */
+    for (u = 0; u < flowmgr_number; u++)
+        SCCtrlCondSignal(&flow_manager_ctrl_cond);
 
+    tv = tv_root[TVT_MGMT];
+    while (tv != NULL) {
+        if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
             /* be sure it has shut down */
             while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
                 usleep(100);
             }
-            cnt++;
         }
         tv = tv->next;
     }
 
+
     /* not possible, unless someone decides to rename FlowManagerThread */
     if (cnt == 0) {
         SCMutexUnlock(&tv_root_lock);
@@ -124,6 +143,9 @@ void FlowKillFlowManagerThread(void)
     }
 
     SCMutexUnlock(&tv_root_lock);
+
+    /* reset count, so we can kill and respawn (unix socket) */
+    SC_ATOMIC_SET(flowmgr_cnt, 0);
     return;
 }
 
@@ -324,11 +346,16 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
  *
  *  \param ts timestamp
  *  \param try_cnt number of flows to time out max (0 is unlimited)
+ *  \param hash_min min hash index to consider
+ *  \param hash_max max hash index to consider
  *  \param counters ptr to FlowTimeoutCounters structure
  *
  *  \retval cnt number of timed out flow
  */
-uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounters *counters) {
+static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
+        uint32_t hash_min, uint32_t hash_max,
+        FlowTimeoutCounters *counters)
+{
     uint32_t idx = 0;
     uint32_t cnt = 0;
     int emergency = 0;
@@ -336,7 +363,7 @@ uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounte
     if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
         emergency = 1;
 
-    for (idx = 0; idx < flow_config.hash_size; idx++) {
+    for (idx = hash_min; idx < hash_max; idx++) {
         FlowBucket *fb = &flow_hash[idx];
 
         if (FBLOCK_TRYLOCK(fb) != 0)
@@ -362,18 +389,86 @@ next:
 
 extern int g_detect_disabled;
 
+typedef struct FlowManagerThreadData_ {
+    uint32_t instance;
+    uint32_t min;
+    uint32_t max;
+
+    uint16_t flow_mgr_cnt_clo;
+    uint16_t flow_mgr_cnt_new;
+    uint16_t flow_mgr_cnt_est;
+    uint16_t flow_mgr_memuse;
+    uint16_t flow_mgr_spare;
+    uint16_t flow_emerg_mode_enter;
+    uint16_t flow_emerg_mode_over;
+} FlowManagerThreadData;
+
+static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
+{
+    FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
+    if (ftd == NULL)
+        return TM_ECODE_FAILED;
+
+    ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
+    SCLogDebug("flow manager instance %u", ftd->instance);
+
+    /* set the min and max value used for hash row walking
+     * each thread has its own section of the flow hash */
+    uint32_t range = flow_config.hash_size / flowmgr_number;
+    if (ftd->instance == 1)
+        ftd->max = range;
+    else if (ftd->instance == flowmgr_number) {
+        ftd->min = (range * (ftd->instance - 1));
+        ftd->max = flow_config.hash_size;
+    } else {
+        ftd->min = (range * (ftd->instance - 1));
+        ftd->max = (range * ftd->instance);
+    }
+    BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);
+
+    SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
+
+    /* pass thread data back to caller */
+    *data = ftd;
+
+    ftd->flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", t,
+            SC_PERF_TYPE_UINT64, "NULL");
+    ftd->flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", t,
+            SC_PERF_TYPE_UINT64, "NULL");
+    ftd->flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", t,
+            SC_PERF_TYPE_UINT64, "NULL");
+    ftd->flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", t,
+            SC_PERF_TYPE_UINT64, "NULL");
+    ftd->flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", t,
+            SC_PERF_TYPE_UINT64, "NULL");
+    ftd->flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", t,
+            SC_PERF_TYPE_UINT64, "NULL");
+    ftd->flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", t,
+            SC_PERF_TYPE_UINT64, "NULL");
+
+    PacketPoolInit();
+    return TM_ECODE_OK;
+}
+
+static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
+{
+    SCFree(data);
+    return TM_ECODE_OK;
+}
+
+
 /** \brief Thread that manages the flow table and times out flows.
  *
  *  \param td ThreadVars casted to void ptr
  *
  *  Keeps an eye on the spare list, alloc flows if needed...
  */
-void *FlowManagerThread(void *td)
+static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
 {
-    /* block usr1.  usr1 to be handled by the main thread only */
+    /* block usr2.  usr2 to be handled by the main thread only */
     UtilSignalBlock(SIGUSR2);
 
-    ThreadVars *th_v = (ThreadVars *)td;
+    FlowManagerThreadData *ftd = thread_data;
     struct timeval ts;
     uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
     int emerg = FALSE;
@@ -394,53 +489,10 @@ void *FlowManagerThread(void *td)
             SC_PERF_TYPE_Q_NORMAL,
             "NULL");
 */
-    uint16_t flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", th_v,
-            SC_PERF_TYPE_UINT64,
-            "NULL");
-    uint16_t flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", th_v,
-            SC_PERF_TYPE_UINT64,
-            "NULL");
-    uint16_t flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", th_v,
-            SC_PERF_TYPE_UINT64,
-            "NULL");
-    uint16_t flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", th_v,
-            SC_PERF_TYPE_UINT64,
-            "NULL");
-    uint16_t flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", th_v,
-            SC_PERF_TYPE_UINT64,
-            "NULL");
-    uint16_t flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", th_v,
-            SC_PERF_TYPE_UINT64,
-            "NULL");
-    uint16_t flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", th_v,
-            SC_PERF_TYPE_UINT64,
-            "NULL");
-
-    if (th_v->thread_setup_flags != 0)
-        TmThreadSetupOptions(th_v);
-
     memset(&ts, 0, sizeof(ts));
 
-    FlowForceReassemblySetup(g_detect_disabled);
-
-    /* set the thread name */
-    if (SCSetThreadName(th_v->name) < 0) {
-        SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
-    } else {
-        SCLogDebug("%s started...", th_v->name);
-    }
-
-    th_v->sc_perf_pca = SCPerfGetAllCountersArray(&th_v->sc_perf_pctx);
-    SCPerfAddToClubbedTMTable(th_v->name, &th_v->sc_perf_pctx);
-
-    /* Set the threads capability */
-    th_v->cap_flags = 0;
-    SCDropCaps(th_v);
-    PacketPoolInit();
-
     FlowHashDebugInit();
 
-    TmThreadsSetFlag(th_v, THV_INIT_DONE);
     while (1)
     {
         if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
@@ -457,7 +509,7 @@ void *FlowManagerThread(void *td)
 
                 SCLogDebug("Flow emergency mode entered...");
 
-                SCPerfCounterIncr(flow_emerg_mode_enter, th_v->sc_perf_pca);
+                SCPerfCounterIncr(ftd->flow_emerg_mode_enter, th_v->sc_perf_pca);
             }
         }
 
@@ -472,16 +524,19 @@ void *FlowManagerThread(void *td)
         }
 
         /* see if we still have enough spare flows */
-        FlowUpdateSpareFlows();
+        if (ftd->instance == 1)
+            FlowUpdateSpareFlows();
 
         /* try to time out flows */
         FlowTimeoutCounters counters = { 0, 0, 0, };
-        FlowTimeoutHash(&ts, 0 /* check all */, &counters);
+        FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
 
 
-        DefragTimeoutHash(&ts);
-        //uint32_t hosts_pruned =
-        HostTimeoutHash(&ts);
+        if (ftd->instance == 1) {
+            DefragTimeoutHash(&ts);
+            //uint32_t hosts_pruned =
+            HostTimeoutHash(&ts);
+        }
 /*
         SCPerfCounterAddUI64(flow_mgr_host_prune, th_v->sc_perf_pca, (uint64_t)hosts_pruned);
         uint32_t hosts_active = HostGetActiveCount();
@@ -489,17 +544,17 @@ void *FlowManagerThread(void *td)
         uint32_t hosts_spare = HostGetSpareCount();
         SCPerfCounterSetUI64(flow_mgr_host_spare, th_v->sc_perf_pca, (uint64_t)hosts_spare);
 */
-        SCPerfCounterAddUI64(flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
-        SCPerfCounterAddUI64(flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
-        SCPerfCounterAddUI64(flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
+        SCPerfCounterAddUI64(ftd->flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
+        SCPerfCounterAddUI64(ftd->flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
+        SCPerfCounterAddUI64(ftd->flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
         long long unsigned int flow_memuse = SC_ATOMIC_GET(flow_memuse);
-        SCPerfCounterSetUI64(flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
+        SCPerfCounterSetUI64(ftd->flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
 
         uint32_t len = 0;
         FQLOCK_LOCK(&flow_spare_q);
         len = flow_spare_q.len;
         FQLOCK_UNLOCK(&flow_spare_q);
-        SCPerfCounterSetUI64(flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
+        SCPerfCounterSetUI64(ftd->flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
 
         /* Don't fear, FlowManagerThread is here...
          * clear emergency bit if we have at least xx flows pruned. */
@@ -523,7 +578,7 @@ void *FlowManagerThread(void *td)
                           "%% flows at the queue", (uintmax_t)ts.tv_sec,
                           (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
 
-                SCPerfCounterIncr(flow_emerg_mode_over, th_v->sc_perf_pca);
+                SCPerfCounterIncr(ftd->flow_emerg_mode_over, th_v->sc_perf_pca);
             } else {
                 flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC;
                 flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC;
@@ -547,42 +602,55 @@ void *FlowManagerThread(void *td)
         SCPerfSyncCountersIfSignalled(th_v);
     }
 
-    TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
-    TmThreadWaitForFlag(th_v, THV_DEINIT);
-
     FlowHashDebugDeinit();
 
     SCLogInfo("%" PRIu32 " new flows, %" PRIu32 " established flows were "
               "timed out, %"PRIu32" flows in closed state", new_cnt,
               established_cnt, closing_cnt);
 
-    TmThreadsSetFlag(th_v, THV_CLOSED);
-    pthread_exit((void *) 0);
-    return NULL;
+    return TM_ECODE_OK;
 }
 
 /** \brief spawn the flow manager thread */
 void FlowManagerThreadSpawn()
 {
-    ThreadVars *tv_flowmgr = NULL;
+    intmax_t setting = 1;
+    (void)ConfGetInt("flow.managers", &setting);
+
+    if (setting < 1 || setting > 1024) {
+        SCLogError(SC_ERR_INVALID_ARGUMENTS,
+                "invalid flow.managers setting %"PRIdMAX, setting);
+        exit(EXIT_FAILURE);
+    }
+    flowmgr_number = (uint32_t)setting;
 
+    SCLogInfo("using %u flow manager threads", flowmgr_number);
+    FlowForceReassemblySetup(g_detect_disabled);
     SCCtrlCondInit(&flow_manager_ctrl_cond, NULL);
     SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL);
 
-    tv_flowmgr = TmThreadCreateMgmtThread("FlowManagerThread",
-                                          FlowManagerThread, 0);
+    uint32_t u;
+    for (u = 0; u < flowmgr_number; u++) {
+        ThreadVars *tv_flowmgr = NULL;
 
-    TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+        char name[32] = "";
+        snprintf(name, sizeof(name), "FlowManagerThread%02u", u+1);
 
-    if (tv_flowmgr == NULL) {
-        printf("ERROR: TmThreadsCreate failed\n");
-        exit(1);
-    }
-    if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
-        printf("ERROR: TmThreadSpawn failed\n");
-        exit(1);
-    }
+        tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowManagerThread",
+                "FlowManager", 0);
+        BUG_ON(tv_flowmgr == NULL);
 
+        TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+
+        if (tv_flowmgr == NULL) {
+            printf("ERROR: TmThreadsCreate failed\n");
+            exit(1);
+        }
+        if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
+            printf("ERROR: TmThreadSpawn failed\n");
+            exit(1);
+        }
+    }
     return;
 }
 
@@ -783,6 +851,18 @@ void FlowKillFlowRecyclerThread(void)
     return;
 }
 
+void TmModuleFlowManagerRegister (void)
+{
+    tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
+    tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
+    tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
+//    tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests;
+    tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
+    tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
+    tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
+    SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
+}
+
 #ifdef UNITTESTS
 
 /**
@@ -1033,7 +1113,7 @@ static int FlowMgrTest05 (void) {
     TimeGet(&ts);
     /* try to time out flows */
     FlowTimeoutCounters counters = { 0, 0, 0, };
-    FlowTimeoutHash(&ts, 0 /* check all */, &counters);
+    FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
 
     if (flow_recycle_q.len > 0) {
         result = 1;
index 3ad5c9b63adaef15fe96eb80cdc168d40c466878..2999b4de4d74632f04d19d125ebbcc287cfdeb78 100644 (file)
@@ -42,4 +42,6 @@ SCCtrlMutex flow_recycler_ctrl_mutex;
 void FlowRecyclerThreadSpawn(void);
 void FlowKillFlowRecyclerThread(void);
 
+void TmModuleFlowManagerRegister (void);
+
 #endif /* __FLOW_MANAGER_H__ */
index 9be6671abd6c5070252ed290326eb6fad974f7d2..0f33e10999220dbefaa0a9c478e11d16f623c16c 100644 (file)
@@ -793,6 +793,8 @@ int g_ut_covered;
 
 void RegisterAllModules()
 {
+    /* managers */
+    TmModuleFlowManagerRegister();
     /* nfq */
     TmModuleReceiveNFQRegister();
     TmModuleVerdictNFQRegister();
index 2b2d242b4332e739e019bc0e6e49c340d5b7490d..b52a0daa73fa4acb3c331593e0300db750218283 100644 (file)
@@ -255,6 +255,8 @@ const char * TmModuleTmmIdToString(TmmId id)
         CASE_CODE (TMM_JSONSSHLOG);
         CASE_CODE (TMM_JSONTLSLOG);
         CASE_CODE (TMM_OUTPUTJSON);
+        CASE_CODE (TMM_FLOWMANAGER);
+        CASE_CODE (TMM_FLOWRECYCLER);
 
         CASE_CODE (TMM_SIZE);
     }
index 7514cdfbf94df0eb822507d35254e714fbd78c56..de869747bad729bd9431ab4fab5939edf30642f0 100644 (file)
@@ -93,6 +93,10 @@ typedef enum {
     TMM_DECODENFLOG,
     TMM_JSONFLOWLOG,
     TMM_JSONNETFLOWLOG,
+
+    TMM_FLOWMANAGER,
+    TMM_FLOWRECYCLER,
+
     TMM_SIZE,
 } TmmId;
 
index 9405003eeb104221dcdbc2e43fa0174b1484d1b5..682db04daff24d9f4dae74a5705c530706e9661e 100644 (file)
@@ -623,6 +623,7 @@ flow:
   hash-size: 65536
   prealloc: 10000
   emergency-recovery: 30
+  #managers: 1 # default to one flow manager
 
 # This option controls the use of vlan ids in the flow (and defrag)
 # hashing. Normally this should be enabled, but in some (broken)