From: Razvan Becheriu Date: Wed, 9 Dec 2020 10:07:29 +0000 (+0200) Subject: [#991] added thread pool wait function X-Git-Tag: Kea-1.9.3~58 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a823c4d7733a4b669f03fe23cd929c283b13d5ff;p=thirdparty%2Fkea.git [#991] added thread pool wait function --- diff --git a/src/bin/dhcp4/tests/client_handler_unittest.cc b/src/bin/dhcp4/tests/client_handler_unittest.cc index 51721f34af..7edebfce76 100644 --- a/src/bin/dhcp4/tests/client_handler_unittest.cc +++ b/src/bin/dhcp4/tests/client_handler_unittest.cc @@ -85,9 +85,7 @@ public: /// @brief Waits for pending continuations. void waitForThreads() { - while (MultiThreadingMgr::instance().getThreadPool().count() > 0) { - usleep(100); - } + MultiThreadingMgr::instance().getThreadPool().wait(); } /// @brief Set called1_ to true. diff --git a/src/bin/dhcp4/tests/dhcp4_client.cc b/src/bin/dhcp4/tests/dhcp4_client.cc index a92d9e75f2..e26a9312da 100644 --- a/src/bin/dhcp4/tests/dhcp4_client.cc +++ b/src/bin/dhcp4/tests/dhcp4_client.cc @@ -563,11 +563,8 @@ Dhcp4Client::sendMsg(const Pkt4Ptr& msg) { // Suppress errors, as the DHCPv4 server does. } - // make sure the server processed all packets in MT. - while (isc::util::MultiThreadingMgr::instance().getThreadPool().count()) { - usleep(100); - } - isc::util::MultiThreadingCriticalSection cs; + // Make sure the server processed all packets in MT. + isc::util::MultiThreadingMgr::instance().getThreadPool().wait(); } void diff --git a/src/bin/dhcp4/tests/dhcp4_test_utils.h b/src/bin/dhcp4/tests/dhcp4_test_utils.h index 533f0a181c..15a6caa588 100644 --- a/src/bin/dhcp4/tests/dhcp4_test_utils.h +++ b/src/bin/dhcp4/tests/dhcp4_test_utils.h @@ -149,13 +149,9 @@ public: return (pkt); } - // make sure the server processed all packets in MT. - while (isc::util::MultiThreadingMgr::instance().getThreadPool().count()) { - usleep(100); - } - { - isc::util::MultiThreadingCriticalSection cs; - } + // Make sure the server processed all packets in MT. + isc::util::MultiThreadingMgr::instance().getThreadPool().wait(); + // If not, just trigger shutdown and return immediately shutdown(); return (Pkt4Ptr()); diff --git a/src/bin/dhcp6/tests/client_handler_unittest.cc b/src/bin/dhcp6/tests/client_handler_unittest.cc index 3a7a9669f9..16619ff905 100644 --- a/src/bin/dhcp6/tests/client_handler_unittest.cc +++ b/src/bin/dhcp6/tests/client_handler_unittest.cc @@ -70,9 +70,7 @@ public: /// @brief Waits for pending continuations. void waitForThreads() { - while (MultiThreadingMgr::instance().getThreadPool().count() > 0) { - usleep(100); - } + MultiThreadingMgr::instance().getThreadPool().wait(); } /// @brief Set called1_ to true. diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 8cf9f1c0d9..71ce8ed73d 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -110,6 +110,18 @@ struct ThreadPool { return (queue_.count()); } + /// @brief wait for current items to be processed + /// + /// Used to block the calling thread until all items in the queue have + /// been processed + void wait() { + auto id = std::this_thread::get_id(); + if (checkThreadId(id)) { + isc_throw(InvalidOperation, "thread pool stop called by owned thread"); + } + queue_.wait(); + } + /// @brief set maximum number of work items in the queue /// /// @param max_queue_size the maximum size (0 means unlimited) @@ -146,7 +158,7 @@ private: /// @param thread_count specifies the number of threads to be created and /// started void startInternal(uint32_t thread_count) { - queue_.enable(); + queue_.enable(thread_count); for (uint32_t i = 0; i < thread_count; ++i) { threads_.push_back(boost::make_shared(&ThreadPool::run, this)); } @@ -194,7 +206,7 @@ private: /// /// Creates the thread pool queue in 'disabled' state ThreadPoolQueue() - : enabled_(false), max_queue_size_(0), + : enabled_(false), max_queue_size_(0), working_(0), stat10(0.), stat100(0.), stat1000(0.) { } @@ -290,11 +302,16 @@ private: /// @return the first work item from the queue or an empty element. Item pop() { std::unique_lock lock(mutex_); + --working_; // Wait for push or disable functions. + if (working_ == 0 && queue_.empty()) { + wait_cv_.notify_all(); + } cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());}); if (!enabled_) { return (Item()); } + ++working_; size_t length = queue_.size(); stat10 = stat10 * CEXP10 + (1 - CEXP10) * length; stat100 = stat100 * CEXP100 + (1 - CEXP100) * length; @@ -314,7 +331,17 @@ private: return (queue_.size()); } - /// @brief get queue length statistic + /// @brief wait for current items to be processed + /// + /// Used to block the calling thread until all items in the queue have + /// been processed + void wait() { + std::unique_lock lock(mutex_); + // Wait for any item or for working threads to finish. + wait_cv_.wait(lock, [&]() {return (working_ == 0 && queue_.empty());}); + } + + /// @brief get queue length statistic /// /// @param which select the statistic (10, 100 or 1000) /// @return the queue length statistic @@ -345,9 +372,12 @@ private: /// @brief enable the queue /// /// Sets the queue state to 'enabled' - void enable() { + /// + /// @param number of working threads + void enable(uint32_t thread_count) { std::lock_guard lock(mutex_); enabled_ = true; + working_ = thread_count; } /// @brief disable the queue @@ -381,6 +411,9 @@ private: /// @brief condition variable used to signal waiting threads std::condition_variable cv_; + /// @brief condition variable used to wait for all items to be processed + std::condition_variable wait_cv_; + /// @brief the sate of the queue /// The 'enabled' state corresponds to true value /// The 'disabled' state corresponds to false value @@ -390,6 +423,9 @@ private: /// (0 means unlimited) size_t max_queue_size_; + /// @brief number of threads currently doing work + uint32_t working_; + /// @brief queue length statistic for 10 packets double stat10;