]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1599] added pause with wait for threads to stop
authorRazvan Becheriu <razvan@isc.org>
Wed, 25 Aug 2021 06:36:24 +0000 (09:36 +0300)
committerRazvan Becheriu <razvan@isc.org>
Wed, 15 Nov 2023 06:36:55 +0000 (08:36 +0200)
src/lib/util/multi_threading_mgr.cc
src/lib/util/thread_pool.h

index 311b0ab49235687d1c74187240d0c245b56c7560..a48a1b14eed83747af9b5a7bdd6a97c0cdf18d27 100644 (file)
@@ -45,7 +45,11 @@ MultiThreadingMgr::enterCriticalSection() {
     ++critical_section_count_;
     if (getMode() && !inside) {
         if (getThreadPoolSize()) {
-            thread_pool_.stop();
+            // We simply pause without waiting for all tasks to complete.
+            // We could also call pause(false) which does not wait for
+            // threads to stop and wait() so that all tasks are complete
+            // and threads are stopped.
+            thread_pool_.pause();
         }
         // Now it is safe to call callbacks which can also create other CSs.
         callEntryCallbacks();
@@ -67,7 +71,7 @@ MultiThreadingMgr::exitCriticalSection() {
     --critical_section_count_;
     if (getMode() && !isInCriticalSection()) {
         if (getThreadPoolSize()) {
-            thread_pool_.start(getThreadPoolSize());
+            thread_pool_.resume();
         }
         // Now it is safe to call callbacks which can also create other CSs.
         callExitCallbacks();
index 207c265d57e7408ca1367038733d095349ce2bcd..28e66c09d9ccbf8c89bfc7a3658b799da0275ab7 100644 (file)
@@ -143,8 +143,10 @@ struct ThreadPool {
     /// @brief pause threads
     ///
     /// Used to pause threads so that they stop processing tasks
-    void pause() {
-        queue_.pause();
+    ///
+    /// @param wait the flag indicating if should wait for threads to pause.
+    void pause(bool wait = true) {
+        queue_.pause(wait);
     }
 
     /// @brief resume threads
@@ -364,10 +366,15 @@ private:
         Item pop() {
             std::unique_lock<std::mutex> lock(mutex_);
             --working_;
-            // Wait for push or disable functions.
+            // Signal thread waiting for threads to pause.
+            if (working_ == 0 && paused_) {
+                wait_threads_cv_.notify_all();
+            }
+            // Signal thread waiting for tasks to finish.
             if (working_ == 0 && queue_.empty()) {
                 wait_cv_.notify_all();
             }
+            // Wait for push or disable functions.
             cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
             pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);});
             ++working_;
@@ -421,9 +428,15 @@ private:
         /// @brief pause threads
         ///
         /// Used to pause threads so that they stop processing tasks
-        void pause() {
+        ///
+        /// @param wait the flag indicating if should wait for threads to pause.
+        void pause(bool wait) {
             std::unique_lock<std::mutex> lock(mutex_);
             paused_ = true;
+            if (wait) {
+                // Wait for working threads to finish.
+                wait_threads_cv_.wait(lock, [&]() {return (working_ == 0);});
+            }
         }
 
         /// @brief resume threads
@@ -505,6 +518,9 @@ private:
         /// @brief condition variable used to wait for all items to be processed
         std::condition_variable wait_cv_;
 
+        /// @brief condition variable used to wait for all threads to be paused
+        std::condition_variable wait_threads_cv_;
+
         /// @brief condition variable used to pause threads
         std::condition_variable pause_cv_;