]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#991] added thread pool wait function
authorRazvan Becheriu <razvan@isc.org>
Wed, 9 Dec 2020 10:07:29 +0000 (12:07 +0200)
committerRazvan Becheriu <razvan@isc.org>
Thu, 10 Dec 2020 19:37:50 +0000 (19:37 +0000)
src/bin/dhcp4/tests/client_handler_unittest.cc
src/bin/dhcp4/tests/dhcp4_client.cc
src/bin/dhcp4/tests/dhcp4_test_utils.h
src/bin/dhcp6/tests/client_handler_unittest.cc
src/lib/util/thread_pool.h

index 51721f34afc69777a6d2f841796dd10ec37d9eb2..7edebfce765ce0993b5f4c35e60b725e269eeaa0 100644 (file)
@@ -85,9 +85,7 @@ public:
 
     /// @brief Waits for pending continuations.
     void waitForThreads() {
-        while (MultiThreadingMgr::instance().getThreadPool().count() > 0) {
-            usleep(100);
-        }
+        MultiThreadingMgr::instance().getThreadPool().wait();
     }
 
     /// @brief Set called1_ to true.
index a92d9e75f27386cec34a4eec70584f9d64f6032e..e26a9312daaca2913eb5c9bfe6f4b2c1d78afb2f 100644 (file)
@@ -563,11 +563,8 @@ Dhcp4Client::sendMsg(const Pkt4Ptr& msg) {
         // Suppress errors, as the DHCPv4 server does.
     }
 
-    // make sure the server processed all packets in MT.
-    while (isc::util::MultiThreadingMgr::instance().getThreadPool().count()) {
-        usleep(100);
-    }
-    isc::util::MultiThreadingCriticalSection cs;
+    // Make sure the server processed all packets in MT.
+    isc::util::MultiThreadingMgr::instance().getThreadPool().wait();
 }
 
 void
index 533f0a181cba2c74b3b7fe268b7d25d9fa13c602..15a6caa588d400c6c9d528abedeb45be530d8a10 100644 (file)
@@ -149,13 +149,9 @@ public:
             return (pkt);
         }
 
-        // make sure the server processed all packets in MT.
-        while (isc::util::MultiThreadingMgr::instance().getThreadPool().count()) {
-            usleep(100);
-        }
-        {
-            isc::util::MultiThreadingCriticalSection cs;
-        }
+        // Make sure the server processed all packets in MT.
+        isc::util::MultiThreadingMgr::instance().getThreadPool().wait();
+
         // If not, just trigger shutdown and return immediately
         shutdown();
         return (Pkt4Ptr());
index 3a7a9669f9405f9d8aefcc8448cc4eaf4bdb991d..16619ff905fe1f8c8268871dfd964c1c6a532c42 100644 (file)
@@ -70,9 +70,7 @@ public:
 
     /// @brief Waits for pending continuations.
     void waitForThreads() {
-        while (MultiThreadingMgr::instance().getThreadPool().count() > 0) {
-            usleep(100);
-        }
+        MultiThreadingMgr::instance().getThreadPool().wait();
     }
 
     /// @brief Set called1_ to true.
index 8cf9f1c0d92653c3dfda406a9b4db4744ef38a18..71ce8ed73dcc4c8a9cbc7e23141de88ea655ae95 100644 (file)
@@ -110,6 +110,18 @@ struct ThreadPool {
         return (queue_.count());
     }
 
+    /// @brief wait for current items to be processed
+    ///
+    /// Used to block the calling thread until all items in the queue have
+    /// been processed
+    void wait() {
+        auto id = std::this_thread::get_id();
+        if (checkThreadId(id)) {
+            isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+        }
+        queue_.wait();
+    }
+
     /// @brief set maximum number of work items in the queue
     ///
     /// @param max_queue_size the maximum size (0 means unlimited)
@@ -146,7 +158,7 @@ private:
     /// @param thread_count specifies the number of threads to be created and
     /// started
     void startInternal(uint32_t thread_count) {
-        queue_.enable();
+        queue_.enable(thread_count);
         for (uint32_t i = 0; i < thread_count; ++i) {
             threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run, this));
         }
@@ -194,7 +206,7 @@ private:
         ///
         /// Creates the thread pool queue in 'disabled' state
         ThreadPoolQueue()
-            : enabled_(false), max_queue_size_(0),
+            : enabled_(false), max_queue_size_(0), working_(0),
               stat10(0.), stat100(0.), stat1000(0.) {
         }
 
@@ -290,11 +302,16 @@ private:
         /// @return the first work item from the queue or an empty element.
         Item pop() {
             std::unique_lock<std::mutex> lock(mutex_);
+            --working_;
             // Wait for push or disable functions.
+            if (working_ == 0 && queue_.empty()) {
+                wait_cv_.notify_all();
+            }
             cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
             if (!enabled_) {
                 return (Item());
             }
+            ++working_;
             size_t length = queue_.size();
             stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
             stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
@@ -314,7 +331,17 @@ private:
             return (queue_.size());
         }
 
-        /// @brief  get queue length statistic
+        /// @brief wait for current items to be processed
+        ///
+        /// Used to block the calling thread until all items in the queue have
+        /// been processed
+        void wait() {
+            std::unique_lock<std::mutex> lock(mutex_);
+            // Wait for any item or for working threads to finish.
+            wait_cv_.wait(lock, [&]() {return (working_ == 0 && queue_.empty());});
+        }
+
+        /// @brief get queue length statistic
         ///
         /// @param which select the statistic (10, 100 or 1000)
         /// @return the queue length statistic
@@ -345,9 +372,12 @@ private:
         /// @brief enable the queue
         ///
         /// Sets the queue state to 'enabled'
-        void enable() {
+        ///
+        /// @param number of working threads
+        void enable(uint32_t thread_count) {
             std::lock_guard<std::mutex> lock(mutex_);
             enabled_ = true;
+            working_ = thread_count;
         }
 
         /// @brief disable the queue
@@ -381,6 +411,9 @@ private:
         /// @brief condition variable used to signal waiting threads
         std::condition_variable cv_;
 
+        /// @brief condition variable used to wait for all items to be processed
+        std::condition_variable wait_cv_;
+
         /// @brief the sate of the queue
         /// The 'enabled' state corresponds to true value
         /// The 'disabled' state corresponds to false value
@@ -390,6 +423,9 @@ private:
         /// (0 means unlimited)
         size_t max_queue_size_;
 
+        /// @brief number of threads currently doing work
+        uint32_t working_;
+
         /// @brief queue length statistic for 10 packets
         double stat10;