From: Razvan Becheriu Date: Fri, 8 Nov 2019 13:49:12 +0000 (+0200) Subject: [#883, !506] changed thread pool interface X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a5655410156923ccc8efa5bac7bcd8fbe71c3b81;p=thirdparty%2Fkea.git [#883, !506] changed thread pool interface --- diff --git a/src/lib/dhcpsrv/tests/thread_pool_unittest.cc b/src/lib/dhcpsrv/tests/thread_pool_unittest.cc index ff19be2407..218b785476 100644 --- a/src/lib/dhcpsrv/tests/thread_pool_unittest.cc +++ b/src/lib/dhcpsrv/tests/thread_pool_unittest.cc @@ -10,6 +10,8 @@ #include +#include + using namespace isc::dhcp; using namespace std; @@ -184,11 +186,13 @@ private: list> threads_; }; +typedef function CallBack; + /// @brief test ThreadPool add and count TEST_F(ThreadPoolTest, testAddAndCount) { uint32_t items_count; - ThreadPool::WorkItemCallBack call_back; - ThreadPool thread_pool; + CallBack call_back; + ThreadPool thread_pool; // the item count should be 0 ASSERT_EQ(thread_pool.count(), 0); // the thread count should be 0 @@ -202,170 +206,12 @@ TEST_F(ThreadPoolTest, testAddAndCount) { for (uint32_t i = 0; i < items_count; ++i) { thread_pool.add(call_back); } - // the item count should match - ASSERT_EQ(thread_pool.count(), items_count); -} - -/// @brief test ThreadPool create and destroy -TEST_F(ThreadPoolTest, testCreateAndDestroy) { - uint32_t items_count; - uint32_t thread_count; - ThreadPool::WorkItemCallBack call_back; - ThreadPool thread_pool; - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - - items_count = 4; - thread_count = 4; - // prepare setup - reset(thread_count); - - // create tasks which block thread pool threads until signaled by main - // thread to force all threads of the thread pool to run exactly one task - call_back = std::bind(&ThreadPoolTest::runAndWait, this); - - // add items to stopped thread pool - for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); - } - // the item count should match - ASSERT_EQ(thread_pool.count(), items_count); - // calling create with false should not create threads and should remove all - // queued items - thread_pool.create(thread_count, false); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - - // add items to stopped thread pool - for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); - } // the item count should match ASSERT_EQ(thread_pool.count(), items_count); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - - // calling destroy should clear all threads and should remove all queued - // items - thread_pool.destroy(); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - - // do it once again to check if it works - thread_pool.destroy(); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - // calling create with false should not create threads and should remove all - // queued items - thread_pool.create(thread_count, false); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - - // add items to stopped thread pool - for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); - } - // the item count should match - ASSERT_EQ(thread_pool.count(), items_count); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - - // calling create with true should create threads and should remove all - // queued items - thread_pool.create(thread_count); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should match - ASSERT_EQ(thread_pool.size(), thread_count); - - // add items to running thread pool - for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); - } - - // wait for all items to be processed - waitTasks(thread_count, items_count); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should match - ASSERT_EQ(thread_pool.size(), thread_count); - // as each thread pool thread is still waiting on main to unblock, each - // thread should have been registered in ids list - checkIds(items_count); - // all items should have been processed - ASSERT_EQ(count(), items_count); - - // check that the number of processed tasks matches the number of items - checkRunHistory(items_count); - - // signal thread pool tasks to continue - signalThreads(); - - // calling destroy should clear all threads and should remove all queued - // items - thread_pool.destroy(); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - - // do it once again to check if it works - thread_pool.destroy(); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should be 0 - ASSERT_EQ(thread_pool.size(), 0); - - items_count = 64; - thread_count = 16; - // prepare setup - reset(thread_count); - - // create tasks which do not block the thread pool threads so that several - // tasks can be run on the same thread and some of the threads never even - // having a chance to run - call_back = std::bind(&ThreadPoolTest::run, this); - - // calling create with true should create threads and should remove all - // queued items - thread_pool.create(thread_count); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should match - ASSERT_EQ(thread_pool.size(), thread_count); - - // add items to running thread pool - for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); - } - - // wait for all items to be processed - waitTasks(thread_count, items_count); - // the item count should be 0 - ASSERT_EQ(thread_pool.count(), 0); - // the thread count should match - ASSERT_EQ(thread_pool.size(), thread_count); - // all items should have been processed - ASSERT_EQ(count(), items_count); - - // check that the number of processed tasks matches the number of items - checkRunHistory(items_count); - - // calling destroy should clear all threads and should remove all queued - // items - thread_pool.destroy(); + // calling reset should clear all threads and should remove all queued items + thread_pool.reset(); // the item count should be 0 ASSERT_EQ(thread_pool.count(), 0); // the thread count should be 0 @@ -376,8 +222,8 @@ TEST_F(ThreadPoolTest, testCreateAndDestroy) { TEST_F(ThreadPoolTest, testStartAndStop) { uint32_t items_count; uint32_t thread_count; - ThreadPool::WorkItemCallBack call_back; - ThreadPool thread_pool; + CallBack call_back; + ThreadPool thread_pool; // the item count should be 0 ASSERT_EQ(thread_pool.count(), 0); // the thread count should be 0 @@ -406,8 +252,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) { // the thread count should match ASSERT_EQ(thread_pool.size(), thread_count); - // calling stop with false should clear all threads and should keep queued - // items + // calling stop should clear all threads and should keep queued items thread_pool.stop(); // the item count should be 0 ASSERT_EQ(thread_pool.count(), 0); @@ -425,22 +270,28 @@ TEST_F(ThreadPoolTest, testStartAndStop) { for (uint32_t i = 0; i < items_count; ++i) { thread_pool.add(call_back); } + // the item count should match ASSERT_EQ(thread_pool.count(), items_count); // the thread count should be 0 ASSERT_EQ(thread_pool.size(), 0); - // calling stop with false should clear all threads and should keep queued - // items + // calling stop should clear all threads and should keep queued items thread_pool.stop(); // the item count should match ASSERT_EQ(thread_pool.count(), items_count); // the thread count should be 0 ASSERT_EQ(thread_pool.size(), 0); - // calling stop with true should clear all threads and should remove all - // queued items - thread_pool.stop(true); + // calling reset should clear all threads and should remove all queued items + thread_pool.reset(); + // the item count should be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + thread_pool.reset(); // the item count should be 0 ASSERT_EQ(thread_pool.count(), 0); // the thread count should be 0 @@ -476,8 +327,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) { // signal thread pool tasks to continue signalThreads(); - // calling stop with false should clear all threads and should keep queued - // items + // calling stop should clear all threads and should keep queued items thread_pool.stop(); // the item count should be 0 ASSERT_EQ(thread_pool.count(), 0); @@ -498,6 +348,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) { for (uint32_t i = 0; i < items_count; ++i) { thread_pool.add(call_back); } + // the item count should match ASSERT_EQ(thread_pool.count(), items_count); // the thread count should be 0 @@ -520,8 +371,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) { // check that the number of processed tasks matches the number of items checkRunHistory(items_count); - // calling stop with false should clear all threads and should keep queued - // items + // calling stop should clear all threads and should keep queued items thread_pool.stop(); // the item count should be 0 ASSERT_EQ(thread_pool.count(), 0); diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc deleted file mode 100644 index 7b2c5bd121..0000000000 --- a/src/lib/dhcpsrv/thread_pool.cc +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC") -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -#include - -#include -#include - -using namespace std; - -namespace isc { -namespace dhcp { - -/// @brief Defines a generic thread pool queue. -/// -/// The main purpose is to safely manage thread pool tasks. -/// The thread pool queue can be 'disabled', which means that no items can be -/// removed from the queue, or 'enabled', which guarantees that inserting or -/// removing items are thread safe. -/// In 'disabled' state, all threads waiting on the queue are unlocked and all -/// operations are non blocking. -template -struct ThreadPoolQueue { - /// @brief Constructor - /// - /// Creates the thread pool queue in 'disabled' state - ThreadPoolQueue() : exit_(true) { - } - - /// @brief Destructor - /// - /// Destroys the thread pool queue - ~ThreadPoolQueue() { - stop(true); - } - - /// @brief push work item to the queue - /// - /// Used to add work items in the queue. - /// This function adds an item to the queue and wakes up at least one thread - /// waiting on the queue. - /// - /// @param item the new item to be added to the queue - void push(WorkItem& item) { - if (item == nullptr) { - return; - } - std::lock_guard lock(mutex_); - queue_.push(item); - // Notify pop function so that it can effectively remove a work item. - cv_.notify_all(); - } - - /// @brief pop work item from the queue or block waiting - /// - /// Used to retrieve and remove a work item from the queue - /// If the queue is 'disabled', this function returns immediately - /// (no element). - /// If the queue is 'enabled', this function returns the first element in - /// the queue or blocks the calling thread if there are no work items - /// available. - /// - /// @param item the reference of the item removed from the queue, if any - /// - /// @return true if there was a work item removed from the queue, false - /// otherwise - WorkItem pop() { - std::unique_lock lock(mutex_); - while (!exit_) { - if (queue_.empty()) { - // Wait for push or stop functions. - cv_.wait(lock); - continue; - } - - WorkItem item = queue_.front(); - queue_.pop(); - return item; - } - - return nullptr; - } - - /// @brief count number of work items in the queue - /// - /// Returns the number of work items in the queue - /// - /// @return the number of work items - size_t count() { - std::lock_guard lock(mutex_); - return queue_.size(); - } - - /// @brief clear remove all work items - /// - /// Removes all queued work items - void clear() { - std::lock_guard lock(mutex_); - reset(); - } - - /// @brief start and enable the queue - /// - /// Sets the queue state to 'enabled' - void start() { - std::lock_guard lock(mutex_); - exit_ = false; - } - - /// brief stop and disable the queue - /// - /// Sets the queue state to 'disabled' and optionally removes all work items - /// - /// @param clear used to specify if the function should also clear the queue - void stop(bool clear = false) { - std::lock_guard lock(mutex_); - exit_ = true; - // Notify pop so that it can exit. - cv_.notify_all(); - if (clear) { - reset(); - } - } - -private: - /// @brief reset clears the queue removing all work items - /// - /// Must be called in a critical section (mutex locked scope). - void reset() { - queue_ = std::queue(); - } - - /// @brief underlying queue container - std::queue queue_; - - /// @brief mutex used for critical sections - std::mutex mutex_; - - /// @brief condition variable used to signal waiting threads - std::condition_variable cv_; - - /// @brief the sate of the queue - /// The 'enabled' state corresponds to false value - /// The 'disabled' state corresponds to true value - std::atomic_bool exit_; -}; - -ThreadPool::ThreadPool() : - queue_(make_shared>()), exit_(true) { -} - -ThreadPool::~ThreadPool() { - destroy(); -} - -void ThreadPool::create(uint32_t thread_count, bool run) { - LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads") - .arg(thread_count); - - if (!thread_count) { - return; - } - destroy(); - if (run) { - start(thread_count); - } - - LOG_INFO(dhcpsrv_logger, "Thread pool created"); -} - -void ThreadPool::destroy() { - LOG_INFO(dhcpsrv_logger, "Thread pool shutting down"); - - stop(true); - - LOG_INFO(dhcpsrv_logger, "Thread pool shut down"); -} - -void ThreadPool::start(uint32_t thread_count) { - if (!thread_count || !exit_) { - return; - } - queue_->start(); - exit_ = false; - for (int i = 0; i < thread_count; ++i) { - threads_.push_back(make_shared(&ThreadPool::run, this)); - } - - LOG_INFO(dhcpsrv_logger, "Thread pool started"); -} - -void ThreadPool::stop(bool clear) { - exit_ = true; - queue_->stop(clear); - for (auto thread : threads_) { - thread->join(); - } - threads_.clear(); - - LOG_INFO(dhcpsrv_logger, "Thread pool stopped"); -} - -void ThreadPool::add(WorkItemCallBack& call_back) { - queue_->push(call_back); -} - -size_t ThreadPool::count() { - return queue_->count(); -} - -size_t ThreadPool::size() { - return threads_.size(); -} - -void ThreadPool::run() { - thread::id th_id = this_thread::get_id(); - LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id); - - while (!exit_) { - WorkItemCallBack item = queue_->pop(); - if (item != nullptr) { - item(); - } - } - - LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1").arg(th_id); -} - -} // namespace dhcp -} // namespace isc diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/dhcpsrv/thread_pool.h index 4e23707e88..1432d4175b 100644 --- a/src/lib/dhcpsrv/thread_pool.h +++ b/src/lib/dhcpsrv/thread_pool.h @@ -7,7 +7,7 @@ #ifndef THREAD_POOL_H #define THREAD_POOL_H -#include +#include #include #include @@ -19,70 +19,233 @@ namespace isc { namespace dhcp { -template -struct ThreadPoolQueue; - /// @brief Defines a thread pool which uses a thread pool queue for managing /// work items. Each work item is a 'function' object. +template struct ThreadPool { - using WorkItemCallBack = std::function; - /// @brief Constructor - ThreadPool(); + ThreadPool() : running_(false) { + } /// @brief Destructor - ~ThreadPool(); + ~ThreadPool() { + reset(); + } - /// @brief initialize the thread pool with specified number of treads and - /// optionally starts all threads. - /// - /// @param worker_threads specifies the number of threads to be created - /// @param run to indicate if the threads should be started immediately - void create(uint32_t thread_count, bool run = true); + /// @brief reset the thread pool stopping threads and clearing the internal + /// queue + void reset() { + LOG_INFO(dhcpsrv_logger, "Thread pool shutting down"); - /// @brief de-initialize the thread pool stopping threads and clearing the - /// internal queue - void destroy(); + stop(); + queue_.clear(); + + LOG_INFO(dhcpsrv_logger, "Thread pool shut down"); + } /// @brief start all the threads /// - /// @param worker_threads specifies the number of threads to be created - void start(uint32_t thread_count); + /// @param thread_count specifies the number of threads to be created and + /// started + void start(uint32_t thread_count) { + LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads") + .arg(thread_count); + if (!thread_count || running_) { + return; + } + queue_.enable(); + running_ = true; + for (uint32_t i = 0; i < thread_count; ++i) { + threads_.push_back(std::make_shared(&ThreadPool::run, this)); + } + + LOG_INFO(dhcpsrv_logger, "Thread pool started"); + } /// @brief stop all the threads - /// - /// @param clear used to specify if the function should also clear the queue - void stop(bool clear = false); + void stop() { + LOG_INFO(dhcpsrv_logger, "Thread pool stopping"); + running_ = false; + queue_.disable(); + for (auto thread : threads_) { + thread->join(); + } + threads_.clear(); + + LOG_INFO(dhcpsrv_logger, "Thread pool stopped"); + } /// @brief add a working item to the thread pool /// /// @param call_back the 'function' object to be added to the queue - void add(WorkItemCallBack& call_back); + void add(WorkItem& item) { + queue_.push(item); + } /// @brief count number of work items in the queue /// /// @return the number of work items in the queue - size_t count(); + size_t count() { + return queue_.count(); + } /// @brief size number of thread pool threads /// /// @return the number of threads - size_t size(); - + size_t size() { + return threads_.size(); + } private: + /// @brief Defines a generic thread pool queue. + /// + /// The main purpose is to safely manage thread pool tasks. + /// The thread pool queue can be 'disabled', which means that no items can be + /// removed from the queue, or 'enabled', which guarantees that inserting or + /// removing items are thread safe. + /// In 'disabled' state, all threads waiting on the queue are unlocked and all + /// operations are non blocking. + template + struct ThreadPoolQueue { + /// @brief Constructor + /// + /// Creates the thread pool queue in 'disabled' state + ThreadPoolQueue() : enabled_(false) { + } + + /// @brief Destructor + /// + /// Destroys the thread pool queue + ~ThreadPoolQueue() { + disable(); + clear(); + } + + /// @brief push work item to the queue + /// + /// Used to add work items in the queue. + /// This function adds an item to the queue and wakes up at least one thread + /// waiting on the queue. + /// + /// @param item the new item to be added to the queue + void push(Item& item) { + if (!item) { + return; + } + std::lock_guard lock(mutex_); + queue_.push(item); + // Notify pop function so that it can effectively remove a work item. + cv_.notify_all(); + } + + /// @brief pop work item from the queue or block waiting + /// + /// Used to retrieve and remove a work item from the queue + /// If the queue is 'disabled', this function returns immediately + /// (no element). + /// If the queue is 'enabled', this function returns the first element in + /// the queue or blocks the calling thread if there are no work items + /// available. + /// + /// @param item the reference of the item removed from the queue, if any + /// + /// @return true if there was a work item removed from the queue, false + /// otherwise + Item pop() { + std::unique_lock lock(mutex_); + while (enabled_) { + if (queue_.empty()) { + // Wait for push or stop functions. + cv_.wait(lock); + continue; + } + + Item item = queue_.front(); + queue_.pop(); + return item; + } + + return Item(); + } + + /// @brief count number of work items in the queue + /// + /// Returns the number of work items in the queue + /// + /// @return the number of work items + size_t count() { + std::lock_guard lock(mutex_); + return queue_.size(); + } + + /// @brief clear remove all work items + /// + /// Removes all queued work items + void clear() { + std::lock_guard lock(mutex_); + queue_ = std::queue(); + } + + /// @brief start and enable the queue + /// + /// Sets the queue state to 'enabled' + void enable() { + std::lock_guard lock(mutex_); + enabled_ = true; + } + + /// brief stop and disable the queue + /// + /// Sets the queue state to 'disabled' and optionally removes all work items + void disable() { + std::lock_guard lock(mutex_); + enabled_ = false; + // Notify pop so that it can exit. + cv_.notify_all(); + } + + private: + /// @brief underlying queue container + std::queue queue_; + + /// @brief mutex used for critical sections + std::mutex mutex_; + + /// @brief condition variable used to signal waiting threads + std::condition_variable cv_; + + /// @brief the sate of the queue + /// The 'enabled' state corresponds to false value + /// The 'disabled' state corresponds to true value + std::atomic_bool enabled_; + }; + /// @brief run function of each thread - void run(); + void run() { + std::thread::id th_id = std::this_thread::get_id(); + LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1") + .arg(th_id); + + while (running_) { + WorkItem item = queue_.pop(); + if (item) { + item(); + } + } + + LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1") + .arg(th_id); + } /// @brief list of worker threads std::list> threads_; /// @brief underlying work items queue - std::shared_ptr> queue_; + ThreadPoolQueue queue_; /// @brief state of the thread pool /// The 'run' state corresponds to false value /// The 'stop' state corresponds to true value - std::atomic_bool exit_; + std::atomic_bool running_; }; } // namespace dhcp