]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#3142] fixed deadlock
authorRazvan Becheriu <razvan@isc.org>
Sat, 18 Nov 2023 16:43:36 +0000 (18:43 +0200)
committerRazvan Becheriu <razvan@isc.org>
Mon, 20 Nov 2023 05:39:03 +0000 (07:39 +0200)
src/lib/util/tests/thread_pool_unittest.cc
src/lib/util/thread_pool.h

index 9c367c914c9f2124140b8f0775acdb8804023e34..4fa88d9265463c1d7a6b8c878d7ec0778d63cd1d 100644 (file)
@@ -526,7 +526,7 @@ TEST_F(ThreadPoolTest, wait) {
     checkState(thread_pool, 0, 0);
 
     items_count = 16;
-    thread_count = 16;
+    thread_count = 256;
     // prepare setup
     reset(thread_count);
 
@@ -546,8 +546,8 @@ TEST_F(ThreadPoolTest, wait) {
 
     // calling stop should clear all threads and should keep queued items
     EXPECT_NO_THROW(thread_pool.stop());
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
+
     // wait for all items to be processed
     ASSERT_TRUE(thread_pool.wait(1));
     checkState(thread_pool, 0, 0);
index 7a090a60a803b80ff328c250c251d8804fe31ed5..6807f530ab811256dc4183dbd593aec1d124bd5f 100644 (file)
@@ -278,7 +278,7 @@ private:
         /// Creates the thread pool queue in 'disabled' state
         ThreadPoolQueue()
             : enabled_(false), paused_(false), max_queue_size_(0), working_(0),
-              stat10(0.), stat100(0.), stat1000(0.) {
+              unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) {
         }
 
         /// @brief Destructor
@@ -289,6 +289,20 @@ private:
             clear();
         }
 
+        /// @brief register thread so that it can be taken into account
+        void registerThread() {
+            std::lock_guard<std::mutex> lock(mutex_);
+            ++working_;
+            --unavailable_;
+        }
+
+        /// @brief unregister thread so that it can be ignored
+        void unregisterThread() {
+            std::lock_guard<std::mutex> lock(mutex_);
+            --working_;
+            ++unavailable_;
+        }
+
         /// @brief set maximum number of work items in the queue
         ///
         /// @return the maximum size (0 means unlimited)
@@ -377,7 +391,7 @@ private:
             std::unique_lock<std::mutex> lock(mutex_);
             --working_;
             // Signal thread waiting for threads to pause.
-            if (working_ == 0 && paused_) {
+            if (paused_ && working_ == 0 && unavailable_ == 0) {
                 wait_threads_cv_.notify_all();
             }
             // Signal thread waiting for tasks to finish.
@@ -386,10 +400,10 @@ private:
             }
             // Wait for push or disable functions.
             cv_.wait(lock, [&]() {return (!enabled_ || (!queue_.empty() && !paused_));});
+            ++working_;
             if (!enabled_) {
                 return (Item());
             }
-            ++working_;
             size_t length = queue_.size();
             stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
             stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
@@ -444,7 +458,7 @@ private:
             paused_ = true;
             if (wait) {
                 // Wait for working threads to finish.
-                wait_threads_cv_.wait(lock, [&]() {return (working_ == 0);});
+                wait_threads_cv_.wait(lock, [&]() {return (working_ == 0 && unavailable_ == 0);});
             }
         }
 
@@ -493,7 +507,7 @@ private:
         void enable(uint32_t thread_count) {
             std::lock_guard<std::mutex> lock(mutex_);
             enabled_ = true;
-            working_ = thread_count;
+            unavailable_ = thread_count;
         }
 
         /// @brief disable the queue
@@ -562,6 +576,9 @@ private:
         /// @brief number of threads currently doing work
         uint32_t working_;
 
+        /// @brief number of threads not running
+        uint32_t unavailable_;
+
         /// @brief queue length statistic for 10 packets
         double stat10;
 
@@ -574,6 +591,7 @@ private:
 
     /// @brief run function of each thread
     void run() {
+        queue_.registerThread();
         for (bool work = true; work; work = queue_.enabled()) {
             WorkItemPtr item = queue_.pop();
             if (item) {
@@ -584,6 +602,7 @@ private:
                 }
             }
         }
+        queue_.unregisterThread();
     }
 
     /// @brief list of worker threads