]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
flow/recycler: bring back pthread_cond_t sleep
authorVictor Julien <victor@inliniac.net>
Thu, 11 Nov 2021 07:34:43 +0000 (08:34 +0100)
committerVictor Julien <vjulien@oisf.net>
Mon, 5 Sep 2022 09:59:45 +0000 (11:59 +0200)
Bug #4379.

(cherry picked from commit f271fb457522d77a1befeb1d097c125afcbdeeb9)

src/flow-manager.c
src/flow-manager.h

index d003e36076516873923cb8b8a952909945f73e4c..71e15828dbfb9c13059258fcbcaeb02faa408381 100644 (file)
@@ -90,6 +90,8 @@ SC_ATOMIC_EXTERN(unsigned int, flow_flags);
 
 SCCtrlCondT flow_manager_ctrl_cond;
 SCCtrlMutex flow_manager_ctrl_mutex;
+SCCtrlCondT flow_recycler_ctrl_cond;
+SCCtrlMutex flow_recycler_ctrl_mutex;
 
 void FlowTimeoutsInit(void)
 {
@@ -292,11 +294,13 @@ static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCount
         FlowQueuePrivateAppendFlow(&recycle, f);
         if (recycle.len == 100) {
             FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
+            FlowWakeupFlowRecyclerThread();
         }
         cnt++;
     }
     if (recycle.len) {
         FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
+        FlowWakeupFlowRecyclerThread();
     }
     return cnt;
 }
@@ -566,9 +570,11 @@ static uint32_t FlowCleanupHash(void)
         FBLOCK_UNLOCK(fb);
         if (local_queue.len >= 25) {
             FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
+            FlowWakeupFlowRecyclerThread();
         }
     }
     FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
+    FlowWakeupFlowRecyclerThread();
 
     return cnt;
 }
@@ -1093,11 +1099,11 @@ static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
  */
 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
 {
-    struct timeval ts;
-    uint64_t recycled_cnt = 0;
     FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
     BUG_ON(ftd == NULL);
-
+    const bool time_is_live = TimeModeIsLive();
+    uint64_t recycled_cnt = 0;
+    struct timeval ts;
     memset(&ts, 0, sizeof(ts));
     uint32_t fr_passes = 0;
 
@@ -1176,21 +1182,30 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
             break;
         }
 
-#ifdef FM_PROFILE
-        struct timeval sleep_startts;
-        memset(&sleep_startts, 0, sizeof(sleep_startts));
-        gettimeofday(&sleep_startts, NULL);
-#endif
-        usleep(250);
-#ifdef FM_PROFILE
-        struct timeval sleep_endts;
-        memset(&sleep_endts, 0, sizeof(sleep_endts));
-        gettimeofday(&sleep_endts, NULL);
-        struct timeval sleep_time;
-        memset(&sleep_time, 0, sizeof(sleep_time));
-        timersub(&sleep_endts, &sleep_startts, &sleep_time);
-        timeradd(&sleeping, &sleep_time, &sleeping);
-#endif
+        const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
+        if (emerg || !time_is_live) {
+            usleep(250);
+        } else {
+            struct timeval cond_tv;
+            gettimeofday(&cond_tv, NULL);
+            cond_tv.tv_sec += 1;
+            struct timespec cond_time = FROM_TIMEVAL(cond_tv);
+            SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
+            while (1) {
+                int rc = SCCtrlCondTimedwait(
+                        &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
+                if (rc == ETIMEDOUT || rc < 0) {
+                    break;
+                }
+                if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
+                    break;
+                }
+                if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
+                    break;
+                }
+            }
+            SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
+        }
 
         SCLogDebug("woke up...");
 
@@ -1239,6 +1254,9 @@ void FlowRecyclerThreadSpawn()
     }
     flowrec_number = (uint32_t)setting;
 
+    SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
+    SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
+
     SCLogConfig("using %u flow recycler threads", flowrec_number);
 
     for (uint32_t u = 0; u < flowrec_number; u++) {
@@ -1280,6 +1298,7 @@ void FlowDisableFlowRecyclerThread(void)
 
     /* make sure all flows are processed */
     do {
+        FlowWakeupFlowRecyclerThread();
         usleep(10);
     } while (FlowRecyclerReadyToShutdown() == false);
 
@@ -1313,6 +1332,7 @@ again:
         {
             if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                 SCMutexUnlock(&tv_root_lock);
+                FlowWakeupFlowRecyclerThread();
                 /* sleep outside lock */
                 SleepMsec(1);
                 goto again;
index 25ac22dc4e86a13f76b0f43f81f5598508f8a891..157358d170f59dca64d62eadff845a6475103150 100644 (file)
@@ -28,6 +28,9 @@
 extern SCCtrlCondT flow_manager_ctrl_cond;
 extern SCCtrlMutex flow_manager_ctrl_mutex;
 #define FlowWakeupFlowManagerThread() SCCtrlCondSignal(&flow_manager_ctrl_cond)
+extern SCCtrlCondT flow_recycler_ctrl_cond;
+extern SCCtrlMutex flow_recycler_ctrl_mutex;
+#define FlowWakeupFlowRecyclerThread() SCCtrlCondSignal(&flow_recycler_ctrl_cond)
 
 #define FlowTimeoutsReset() FlowTimeoutsInit()
 void FlowTimeoutsInit(void);