]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow: introduce FlowRecycler stub
authorVictor Julien <victor@inliniac.net>
Wed, 30 Apr 2014 11:37:30 +0000 (13:37 +0200)
committerVictor Julien <victor@inliniac.net>
Mon, 28 Jul 2014 13:47:44 +0000 (15:47 +0200)
FlowRecycler thread stub. Start/stop code.

src/flow-manager.c
src/flow-manager.h
src/suricata.c

index 71bf2e0a49369d3ecd31384a87360a28a93b9711..415106944e6c7776977c40913ec7939bd2263c8d 100644 (file)
@@ -591,6 +591,150 @@ void FlowManagerThreadSpawn()
     return;
 }
 
+/** \brief Thread that manages timed out flows.
+ *
+ *  \param td ThreadVars casted to void ptr
+ */
+void *FlowRecyclerThread(void *td)
+{
+    /* block usr2. usr2 to be handled by the main thread only */
+    UtilSignalBlock(SIGUSR2);
+
+    ThreadVars *th_v = (ThreadVars *)td;
+    struct timeval ts;
+    struct timespec cond_time;
+    int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
+    int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
+
+    if (th_v->thread_setup_flags != 0)
+        TmThreadSetupOptions(th_v);
+
+    memset(&ts, 0, sizeof(ts));
+
+    /* set the thread name */
+    if (SCSetThreadName(th_v->name) < 0) {
+        SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
+    } else {
+        SCLogDebug("%s started...", th_v->name);
+    }
+
+    /* Set the threads capability */
+    th_v->cap_flags = 0;
+    SCDropCaps(th_v);
+
+    TmThreadsSetFlag(th_v, THV_INIT_DONE);
+    while (1)
+    {
+        if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
+            TmThreadsSetFlag(th_v, THV_PAUSED);
+            TmThreadTestThreadUnPaused(th_v);
+            TmThreadsUnsetFlag(th_v, THV_PAUSED);
+        }
+
+        /* Get the time */
+        memset(&ts, 0, sizeof(ts));
+        TimeGet(&ts);
+        SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
+
+        uint32_t len = 0;
+        FQLOCK_LOCK(&flow_recycle_q);
+        len = flow_recycle_q.len;
+        FQLOCK_UNLOCK(&flow_recycle_q);
+
+        SCLogDebug("%u flows to recycle", len);
+
+        if (TmThreadsCheckFlag(th_v, THV_KILL)) {
+            SCPerfSyncCounters(th_v);
+            break;
+        }
+
+        cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
+        cond_time.tv_nsec = flow_update_delay_nsec;
+        SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
+        SCCtrlCondTimedwait(&flow_recycler_ctrl_cond,
+                &flow_recycler_ctrl_mutex, &cond_time);
+        SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
+
+        SCLogDebug("woke up...");
+
+        SCPerfSyncCountersIfSignalled(th_v);
+    }
+
+    TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
+    TmThreadWaitForFlag(th_v, THV_DEINIT);
+
+    TmThreadsSetFlag(th_v, THV_CLOSED);
+    pthread_exit((void *) 0);
+    return NULL;
+}
+
+/** \brief spawn the flow recycler thread */
+void FlowRecyclerThreadSpawn()
+{
+    ThreadVars *tv_flowmgr = NULL;
+
+    SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
+    SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
+
+    tv_flowmgr = TmThreadCreateMgmtThread("FlowRecyclerThread",
+                                          FlowRecyclerThread, 0);
+
+    TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
+
+    if (tv_flowmgr == NULL) {
+        printf("ERROR: TmThreadsCreate failed\n");
+        exit(1);
+    }
+    if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
+        printf("ERROR: TmThreadSpawn failed\n");
+        exit(1);
+    }
+
+    return;
+}
+
+/**
+ * \brief Used to kill flow recycler thread(s).
+ *
+ * \todo Kinda hackish since it uses the tv name to identify flow recycler
+ *       thread.  We need an all weather identification scheme.
+ */
+void FlowKillFlowRecyclerThread(void)
+{
+    ThreadVars *tv = NULL;
+    int cnt = 0;
+
+    SCCtrlCondSignal(&flow_recycler_ctrl_cond);
+
+    SCMutexLock(&tv_root_lock);
+
+    /* flow manager thread(s) is/are a part of mgmt threads */
+    tv = tv_root[TVT_MGMT];
+
+    while (tv != NULL) {
+        if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
+            TmThreadsSetFlag(tv, THV_KILL);
+            TmThreadsSetFlag(tv, THV_DEINIT);
+
+            /* be sure it has shut down */
+            while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
+                usleep(100);
+            }
+            cnt++;
+        }
+        tv = tv->next;
+    }
+
+    /* not possible, unless someone decides to rename FlowManagerThread */
+    if (cnt == 0) {
+        SCMutexUnlock(&tv_root_lock);
+        abort();
+    }
+
+    SCMutexUnlock(&tv_root_lock);
+    return;
+}
+
 #ifdef UNITTESTS
 
 /**
index 090e74fbc38ab01c27d3b9e955300d83faa77df2..3ad5c9b63adaef15fe96eb80cdc168d40c466878 100644 (file)
@@ -33,4 +33,13 @@ void FlowManagerThreadSpawn(void);
 void FlowKillFlowManagerThread(void);
 void FlowMgrRegisterTests (void);
 
+/** flow recycler scheduling condition */
+SCCtrlCondT flow_recycler_ctrl_cond;
+SCCtrlMutex flow_recycler_ctrl_mutex;
+#define FlowWakeupFlowRecyclerThread() \
+    SCCtrlCondSignal(&flow_recycler_ctrl_cond)
+
+void FlowRecyclerThreadSpawn(void);
+void FlowKillFlowRecyclerThread(void);
+
 #endif /* __FLOW_MANAGER_H__ */
index 5ec53a4366ece6eddd5441c10f1446b8aa83ab6c..85d0019f414b258cba952576a56f8f5c7d44d5ec 100644 (file)
@@ -2282,6 +2282,7 @@ int main(int argc, char **argv)
         }
         /* Spawn the flow manager thread */
         FlowManagerThreadSpawn();
+        FlowRecyclerThreadSpawn();
         StreamTcpInitConfig(STREAM_VERBOSE);
 
         SCPerfSpawnThreads();
@@ -2400,6 +2401,7 @@ int main(int argc, char **argv)
 
     if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
         SCPerfReleaseResources();
+        FlowKillFlowRecyclerThread();
         FlowShutdown();
         StreamTcpFreeConfig(STREAM_VERBOSE);
     }