From: Razvan Becheriu Date: Tue, 10 Sep 2019 15:14:54 +0000 (+0300) Subject: [#883, !506] added thread pool unit tests X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f3f216d4e0e003d45d331d59b31d3e65d6d057bd;p=thirdparty%2Fkea.git [#883, !506] added thread pool unit tests --- diff --git a/src/lib/dhcpsrv/tests/Makefile.am b/src/lib/dhcpsrv/tests/Makefile.am index d9cac94776..86bc606cf2 100644 --- a/src/lib/dhcpsrv/tests/Makefile.am +++ b/src/lib/dhcpsrv/tests/Makefile.am @@ -125,6 +125,7 @@ libdhcpsrv_unittests_SOURCES += srv_config_unittest.cc libdhcpsrv_unittests_SOURCES += subnet_unittest.cc libdhcpsrv_unittests_SOURCES += test_get_callout_handle.cc test_get_callout_handle.h libdhcpsrv_unittests_SOURCES += triplet_unittest.cc +libdhcpsrv_unittests_SOURCES += thread_pool_unittest.cc libdhcpsrv_unittests_SOURCES += test_utils.cc test_utils.h libdhcpsrv_unittests_SOURCES += timer_mgr_unittest.cc libdhcpsrv_unittests_SOURCES += network_state_unittest.cc diff --git a/src/lib/dhcpsrv/tests/thread_pool_unittest.cc b/src/lib/dhcpsrv/tests/thread_pool_unittest.cc new file mode 100644 index 0000000000..a918d3bf75 --- /dev/null +++ b/src/lib/dhcpsrv/tests/thread_pool_unittest.cc @@ -0,0 +1,187 @@ +// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include + +#include + +#include + +using namespace isc::dhcp; +using namespace std; + +namespace { + +/// @brief Test Fixture for testing isc::dhcp::ThreadPool +class ThreadPoolTest : public ::testing::Test { +public: + ThreadPoolTest() : thread_count_(0), count_(0), wait_(false) { + } + + /// @brief task function which registers the thread id and signals main + /// thread to wake + void runAndWait() { + run(); + unique_lock lk(wait_mutex_); + wait_cv_.wait(lk, [&]{ return (wait() == false); }); + } + + void run() { + // make sure this thread has started and it is accounted for + lock_guard lk(mutex_); + auto id = this_thread::get_id(); + ids_.emplace(id); + ++count_; + history_[id].push_back(count_); + // wake main thread if it is waiting for this thread to start + cv_.notify_all(); + } + + void reset(uint32_t thread_count) { + stopThreads(); + thread_count_ = thread_count; + count_ = 0; + wait_ = true; + history_.clear(); + } + + void waitThreads(uint32_t thread_count, bool start = false) { + // make sure we have all threads running when performing all the checks + unique_lock lck(mutex_); + if (start) { + startThreads(thread_count); + } + cv_.wait(lck, [&]{ return (count() == thread_count); }); + } + + void startThreads(uint32_t thread_count) { + for (uint32_t i = 0; i < thread_count; ++i) { + threads_.push_back(make_shared(&ThreadPoolTest::run, this)); + } + } + + void stopThreads() { + signalThreads(); + for (auto thread : threads_) { + thread->join(); + } + threads_.clear(); + } + + void signalThreads() { + lock_guard lk(wait_mutex_); + if (wait_) { + // wake all thread if waiting for main thread + wait_cv_.notify_all(); + } + wait_ = false; + } + + uint32_t count() { + return count_; + } + + bool wait() { + return wait_; + } + + void checkRunHistory(uint32_t items_count) { + uint32_t count = 0; + for (auto element : history_) { + count += element.second.size(); + } + ASSERT_EQ(count, items_count); + } + + uint32_t thread_count_; + + std::mutex mutex_; + + condition_variable cv_; + + std::mutex wait_mutex_; + + condition_variable wait_cv_; + + uint32_t count_; + + bool wait_; + + set ids_; + + map> history_; + + list> threads_; +}; + +/// @brief test ThreadPool functionality +TEST_F(ThreadPoolTest, testCreateAndDestroy) { + uint32_t items_count = 4; + reset(items_count); + ThreadPool::WorkItemCallBack call_back; + + ThreadPool thread_pool; + ASSERT_EQ(thread_pool.count(), 0); + + // create tasks which block thread pool threads until signaled by main + // to force all threads of the thread pool to run exactly one task + call_back = std::bind(&ThreadPoolTest::runAndWait, this); + + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + ASSERT_EQ(thread_pool.count(), items_count); + + thread_pool.create(items_count, false); + ASSERT_EQ(thread_pool.count(), 0); + + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + ASSERT_EQ(thread_pool.count(), items_count); + + thread_pool.destroy(); + ASSERT_EQ(thread_pool.count(), 0); + + thread_pool.create(items_count); + + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + + waitThreads(items_count); + ASSERT_EQ(thread_pool.count(), 0); + ASSERT_EQ(ids_.size(), items_count); + ASSERT_EQ(count(), items_count); + + checkRunHistory(items_count); + + signalThreads(); + + thread_pool.destroy(); + ASSERT_EQ(thread_pool.count(), 0); + + reset(items_count); + + // create tasks which do not block the thread pool threads so that several + // tasks can be run on the same thread and some of the threads never even + // having a chance to run + call_back = std::bind(&ThreadPoolTest::run, this); + + thread_pool.create(items_count); + + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + + waitThreads(items_count); + ASSERT_EQ(thread_pool.count(), 0); + ASSERT_EQ(count(), items_count); + + checkRunHistory(items_count); +} + +} // namespace diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc index e868bb3153..b465f6ce51 100644 --- a/src/lib/dhcpsrv/thread_pool.cc +++ b/src/lib/dhcpsrv/thread_pool.cc @@ -27,22 +27,156 @@ using namespace std; namespace isc { namespace dhcp { -ThreadPool::ThreadPool() : exit_(true) { +/// @brief Defines a generic thread pool queue. +/// +/// The main purpose is to safely manage thread pool tasks. +/// The thread pool queue can be 'disabled', which means that no items can be +/// added or removed from the queue, or 'enabled', which guarantees that +/// inserting or removing items are thread safe. +/// In 'disabled' state, all threads waiting on the queue are unlocked and all +/// operations are non blocking. +template +struct ThreadPoolQueue { + /// @brief Constructor + /// + /// Creates the thread pool queue in 'disabled' state + ThreadPoolQueue() : exit_(true) { + } + + /// @brief Destructor + /// + /// Destroys the thread pool queue + ~ThreadPoolQueue() { + stop(true); + } + + /// @brief push work item to the queue + /// + /// Used to add work items in the queue. + /// This function adds an item to the queue and wakes up at least one thread + /// waiting on the queue. + /// + /// @param item the new iten to be added to the queue + void push(WorkItem item) { + std::lock_guard lock(mutex_); + queue_.push(item); + // Notify pop function so that it can effectively remove a work item. + cv_.notify_all(); + } + + /// @brief pop work item from the queue or block waiting + /// + /// Used to retrieve and remove a work item from the queue + /// If the queue is 'disabled', this function returns immediately + /// (no element). + /// If the queue is 'enabled', this function returns the first element in + /// the queue or blocks the calling thread if there are no work items + /// available. + /// + /// @param item the reference of the item removed from the queue, if any + /// + /// @return true if there was a work item removed from the queue, false + /// otherwise + bool pop(WorkItem& item) { + std::unique_lock lock(mutex_); + while (!exit_) { + if (queue_.empty()) { + // Wait for push or stop functions. + cv_.wait(lock); + continue; + } + + item = queue_.front(); + queue_.pop(); + return true; + } + + return false; + } + + /// @brief count number of work items in the queue + /// + /// Returns the number of work items in the queue + /// + /// @return the number of work items + size_t count() { + std::lock_guard lock(mutex_); + return queue_.size(); + } + + /// @brief clear remove all work items + /// + /// Removes all queued work items + void clear() { + std::lock_guard lock(mutex_); + reset(); + } + + /// @brief start and enable the queue + /// + /// Sets the queue state to 'enabled' + void start() { + std::lock_guard lock(mutex_); + exit_ = false; + } + + /// brief stop and disable the queue + /// + /// Sets the queue state to 'disabled' and optionally removes all work items + /// + /// @param clear used to specify if the function should also clear the queue + void stop(bool clear = false) { + std::lock_guard lock(mutex_); + exit_ = true; + // Notify get() so that it can exit. + cv_.notify_all(); + if (clear) { + reset(); + } + } + +private: + /// @brief reset clears the queue removing all work items + /// + /// Must be called in a critical section (mutex locked scope). + void reset() { + queue_ = std::queue(); + } + + /// @brief underlying queue container + std::queue queue_; + + /// @brief mutex used for critical sections + std::mutex mutex_; + + /// @brief condition variable used to signal waiting threads + std::condition_variable cv_; + + /// @brief the sate of the queue + /// + /// The 'enabled' state corresponds to false value + /// The 'disabled' state corresponds to true value + std::atomic_bool exit_; +}; + +ThreadPool::ThreadPool() : + queue_(make_shared>()), exit_(true) { } ThreadPool::~ThreadPool() { destroy(); } -void ThreadPool::create(uint32_t worker_threads, bool run) { +void ThreadPool::create(uint32_t thread_count, bool run) { LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads") - .arg(worker_threads); - if (!worker_threads) { + .arg(thread_count); + + if (!thread_count) { return; } destroy(); if (run) { - start(worker_threads); + start(thread_count); } LOG_INFO(dhcpsrv_logger, "Thread pool created"); @@ -50,16 +184,17 @@ void ThreadPool::create(uint32_t worker_threads, bool run) { void ThreadPool::destroy() { LOG_INFO(dhcpsrv_logger, "Thread pool shutting down"); + stop(true); LOG_INFO(dhcpsrv_logger, "Thread pool shut down"); } -void ThreadPool::start(uint32_t worker_threads) { - queue_.start(); +void ThreadPool::start(uint32_t thread_count) { + queue_->start(); exit_ = false; - for (int i = 0; i < worker_threads; ++i) { - worker_threads_.push_back(make_shared(&ThreadPool::run, this)); + for (int i = 0; i < thread_count; ++i) { + threads_.push_back(make_shared(&ThreadPool::run, this)); } LOG_INFO(dhcpsrv_logger, "Thread pool started"); @@ -67,21 +202,21 @@ void ThreadPool::start(uint32_t worker_threads) { void ThreadPool::stop(bool clear) { exit_ = true; - queue_.stop(clear); - for (auto thread : worker_threads_) { + queue_->stop(clear); + for (auto thread : threads_) { thread->join(); } - worker_threads_.clear(); + threads_.clear(); LOG_INFO(dhcpsrv_logger, "Thread pool stopped"); } void ThreadPool::add(WorkItemCallBack call_back) { - queue_.push(call_back); + queue_->push(call_back); } size_t ThreadPool::count() { - return queue_.count(); + return queue_->count(); } void ThreadPool::run() { @@ -89,9 +224,9 @@ void ThreadPool::run() { LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id); while (!exit_) { - WorkItemCallBack work_item; - if (queue_.pop(work_item)) { - work_item(); + WorkItemCallBack item; + if (queue_->pop(item)) { + item(); } } diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/dhcpsrv/thread_pool.h index b5b5f2c268..7c701439cc 100644 --- a/src/lib/dhcpsrv/thread_pool.h +++ b/src/lib/dhcpsrv/thread_pool.h @@ -32,141 +32,8 @@ namespace isc { namespace dhcp { -/// @brief Defines a generic thread pool queue. -/// -/// The main purpose is to safely manage thread pool tasks. -/// The thread pool queue can be 'disabled', which means that no items can be -/// added or removed from the queue, or 'enabled', which guarantees that -/// inserting or removing items are thread safe. -/// In 'disabled' state, all threads waiting on the queue are unlocked and all -/// operations are non blocking. template -struct ThreadPoolQueue { - /// @brief Constructor - /// - /// Creates the thread pool queue in 'disabled' state - ThreadPoolQueue() : exit_(true) { - } - - /// @brief Destructor - /// - /// Destroys the thread pool queue - ~ThreadPoolQueue() { - stop(true); - } - - /// @brief push work item to the queue - /// - /// Used to add work items in the queue. - /// If the queue is 'disabled', this function returns immediately. - /// If the queue is 'enabled', this function adds an item to the queue and - /// wakes up at least one thread waiting on the queue. - /// - /// @param item the new iten to be added to the queue - void push(WorkItem item) { - std::lock_guard lock(mutex_); - if (exit_) { - return; - } - queue_.push(item); - // Notify pop function so that it can effectively remove a work item. - cv_.notify_all(); - } - - /// @brief pop work item from the queue or block waiting - /// - /// Used to retrieve and remove a work item from the queue - /// If the queue is 'disabled', this function returns immediately - /// (no element). - /// If the queue is 'enabled', this function returns the first element in - /// the queue or blocks the calling thread if there are no work items - /// available. - /// - /// @param item the reference of the item removed from the queue, if any - /// - /// @return true if there was a work item removed from the queue, false - /// otherwise - bool pop(WorkItem& item) { - std::unique_lock lock(mutex_); - while (!exit_) { - if (queue_.empty()) { - // Wait for push or stop functions. - cv_.wait(lock); - continue; - } - - item = queue_.front(); - queue_.pop(); - return true; - } - - return false; - } - - /// @brief count number of work items in the queue - /// - /// Returns the number of work items in the queue - /// - /// @return the number of work items - size_t count() { - std::lock_guard lock(mutex_); - return queue_.size(); - } - - /// @brief clear remove all work items - /// - /// Removes all queued work items - void clear() { - std::lock_guard lock(mutex_); - reset(); - } - - /// @brief start and enable the queue - /// - /// Sets the queue state to 'enabled' - void start() { - std::lock_guard lock(mutex_); - exit_ = false; - } - - /// brief stop and disable the queue - /// - /// Sets the queue state to 'disabled' and optionally removes all work items - /// - /// @param clear used to specify if the function should also clear the queue - void stop(bool clear = false) { - std::lock_guard lock(mutex_); - exit_ = true; - // Notify get() so that it can exit. - cv_.notify_all(); - if (clear) { - reset(); - } - } - -private: - /// @brief reset clears the queue removing all work items - /// - /// Must be called in a critical section (mutex locked scope). - void reset() { - queue_ = std::queue(); - } - - /// @brief underlying queue container - std::queue queue_; - - /// @brief mutex used for critical sections - std::mutex mutex_; - - /// @brief condition variable used to signal waiting threads - std::condition_variable cv_; - - /// @brief the sate of the queue - /// - /// The 'enabled' state corresponds to false value - /// The 'disabled' state corresponds to true value - std::atomic_bool exit_; -}; +struct ThreadPoolQueue; /// @brief Defines a thread pool which uses a thread pool queue for managing /// work items. Each work item is a 'function' object. @@ -215,10 +82,10 @@ private: void run(); /// @brief list of worker threads - std::list> worker_threads_; + std::list> threads_; /// @brief underlying work items queue - ThreadPoolQueue queue_; + std::shared_ptr> queue_; /// @brief state of the thread pool /// The 'run' state corresponds to false value diff --git a/src/lib/util/threads/tests/lock_guard_unittest.cc b/src/lib/util/threads/tests/lock_guard_unittest.cc index 5ba8bdebba..594d778fbd 100644 --- a/src/lib/util/threads/tests/lock_guard_unittest.cc +++ b/src/lib/util/threads/tests/lock_guard_unittest.cc @@ -34,8 +34,8 @@ public: /// @brief Constructor /// /// @param recursive sets the mutex as recursive mutex - TestMutex(bool recursive = false) : lock_(0), lock_count_(0), unlock_count_(0), - dead_lock_(false), recursive_(recursive) { + TestMutex(bool recursive = false) : lock_(0), dead_lock_(false), + lock_count_(0), unlock_count_(0), recursive_(recursive) { } /// @brief lock the mutex @@ -97,6 +97,14 @@ public: return lock_; } + /// @brief get the mutex dead lock state + /// + /// @return the mutex dead lock state + bool getDeadLock() { + lock_guard lk(mutex_); + return dead_lock_; + } + /// @brief get the number of locks performed on mutex /// /// @return the mutex number of locks @@ -113,14 +121,6 @@ public: return unlock_count_; } - /// @brief get the mutex dead lock state - /// - /// @return the mutex dead lock state - bool getDeadLock() { - lock_guard lk(mutex_); - return dead_lock_; - } - /// @brief test the internal state of the mutex /// /// @param expected_lock check equality of this value with lock state @@ -141,15 +141,15 @@ private: /// @brief internal lock state of the mutex int32_t lock_; + /// @brief state which indicated that the mutex in in dead lock + bool dead_lock_; + /// @brief total number of locks performed on the mutex uint32_t lock_count_; /// @brief total number of unlocks performed on the mutex uint32_t unlock_count_; - /// @brief state which indicated that the mutex in in dead lock - bool dead_lock_; - /// @brief flag to indicate if the mutex is recursive or not bool recursive_; @@ -160,56 +160,13 @@ private: std::thread::id id_; }; -/// @brief Test Fixture for testing isc:util::thread::LockGuard +/// @brief Test Fixture for testing isc::util::thread::LockGuard class LockGuardTest : public ::testing::Test { -public: - void run() { - { - // make sure this thread has started and it is accounted for - lock_guard lk(mutex_); - ids_.emplace(this_thread::get_id()); - ++count_; - // wake main thread if it is waiting for this thread to start - cv_.notify_all(); - } - } - - void reset() { - count_ = 0; - } - - uint32_t count() { - return count_; - } - - std::mutex mutex_; - - condition_variable cv_; - - uint32_t count_; - - set ids_; - - list> threads_; }; +/// @brief test LockGuard functionality with non-recursive mutex, recursive mutex +/// and null pointer TEST_F(LockGuardTest, testLock) { - reset(); - // make sure we have all threads running when performing all the checks - { - unique_lock lck(mutex_); - for (uint32_t i = 0; i < 2; ++i) { - threads_.push_back(make_shared(&LockGuardTest::run, this)); - } - cv_.wait(lck, [&]{ return count() == 2; }); - } - for (auto thread : threads_) { - thread->join(); - } - threads_.clear(); - ASSERT_EQ(count_, 2); - ASSERT_EQ(ids_.size(), 2); - shared_ptr test_mutex; // test non-recursive lock test_mutex = make_shared(); @@ -222,7 +179,8 @@ TEST_F(LockGuardTest, testLock) { { // call LockGuard constructor which locks mutex resulting in an // exception as the mutex is already locked (dead lock) - EXPECT_THROW(LockGuard lock(test_mutex.get()), isc::InvalidOperation); + EXPECT_THROW(LockGuard lock(test_mutex.get()), + isc::InvalidOperation); // expect lock 1 lock_count 1 unlock_count 0 dead_lock true // you should not be able to get here...using a real mutex test_mutex->testMutexState(1, 1, 0, true); @@ -263,4 +221,5 @@ TEST_F(LockGuardTest, testLock) { } } } -} + +} // namespace