From: Razvan Becheriu Date: Wed, 11 Sep 2019 08:35:25 +0000 (+0300) Subject: [#883, !506] updated unit tests X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f9a849a31a5a458af4ff9ef0be2472b6905d4a78;p=thirdparty%2Fkea.git [#883, !506] updated unit tests --- diff --git a/src/lib/dhcpsrv/tests/thread_pool_unittest.cc b/src/lib/dhcpsrv/tests/thread_pool_unittest.cc index a795f8d55c..616f718ea4 100644 --- a/src/lib/dhcpsrv/tests/thread_pool_unittest.cc +++ b/src/lib/dhcpsrv/tests/thread_pool_unittest.cc @@ -18,106 +18,193 @@ namespace { /// @brief Test Fixture for testing isc::dhcp::ThreadPool class ThreadPoolTest : public ::testing::Test { public: + /// @brief Constructor ThreadPoolTest() : thread_count_(0), count_(0), wait_(false) { } /// @brief task function which registers the thread id and signals main - /// thread to wake + /// thread to stop waiting and then waits from main thread to signal to exit void runAndWait() { + // run task run(); + // wait for main thread signal to exit unique_lock lk(wait_mutex_); wait_cv_.wait(lk, [&]{ return (wait() == false); }); } + /// @brief task function which registers the thread id and signals main + /// thread to stop waiting void run() { // make sure this thread has started and it is accounted for lock_guard lk(mutex_); auto id = this_thread::get_id(); + // register this thread as doing work on items ids_.emplace(id); + // finish task ++count_; + // register this task on the history of this thread history_[id].push_back(count_); - // wake main thread if it is waiting for this thread to start + // wake main thread if it is waiting for this thread to process cv_.notify_all(); } + /// @brief reset all counters and internal test state void reset(uint32_t thread_count) { + // stop test threads stopThreads(); + // reset the internal state of the test thread_count_ = thread_count; count_ = 0; wait_ = true; history_.clear(); } - void waitThreads(uint32_t thread_count, uint32_t items_count, bool start = false) { + /// @brief start test threads and block main thread until all tasks have + /// been processed + /// + /// @param thread_count number of threads to be started + /// @param items_count number of items to wait for being processed + /// @param start create processing threads + /// @param signal give main thread control over the threads exit + void waitTasks(uint32_t thread_count, uint32_t items_count, + bool start = false, bool signal = true) { // make sure we have all threads running when performing all the checks unique_lock lck(mutex_); if (start) { - startThreads(thread_count); + // start test threads if explicitly specified + startThreads(thread_count, signal); } + // wait for the threads to process all the items cv_.wait(lck, [&]{ return (count() == items_count); }); } - void startThreads(uint32_t thread_count) { + /// @brief start test threads + /// + /// @param thread_count number of threads to be started + /// @param signal give main thread control over the threads exit + void startThreads(uint32_t thread_count, bool signal = true) { + // set default task function to wait for main thread signal + auto runFunction = &ThreadPoolTest::runAndWait; + if (!signal) { + // set the task function to finish immediately + runFunction = &ThreadPoolTest::run; + } + // start test threads for (uint32_t i = 0; i < thread_count; ++i) { - threads_.push_back(make_shared(&ThreadPoolTest::run, this)); + threads_.push_back(make_shared(runFunction, this)); } } + /// @brief stop test threads void stopThreads() { + // signal threads that are waiting signalThreads(); + // wait for all test threads to exit for (auto thread : threads_) { thread->join(); } + // reset all threads threads_.clear(); } + /// @brief function used by main thread to unblock processing threads void signalThreads() { lock_guard lk(wait_mutex_); - if (wait_) { - // wake all thread if waiting for main thread - wait_cv_.notify_all(); - } + // clear the wait flag so that threads will no longer wait for the main + // thread signal wait_ = false; + // wake all thread if waiting for main thread + wait_cv_.notify_all(); } + /// @brief number of completed tasks + /// + /// @return the number of completed tasks uint32_t count() { return count_; } + /// @brief flag which indicates if working thread should wait for main + /// thread signal + /// + /// @return the wait flag bool wait() { return wait_; } + /// @brief check the total number of tasks that have been processed + /// Some of the tasks may have been run on the same thread and none may have + /// been processed by other threads void checkRunHistory(uint32_t items_count) { uint32_t count = 0; + // iterate over all threads history and count all the processed tasks for (auto element : history_) { count += element.second.size(); } ASSERT_EQ(count, items_count); } + /// @brief check the total number of threads that have processed tasks + void checkIds(uint32_t count) { + ASSERT_EQ(ids_.size(), count); + } + +private: + /// @brief thread count used by the test uint32_t thread_count_; + /// @brief mutex used to keep the internal state consistent std::mutex mutex_; + /// @brief condition variable used to signal main thread that all threads + /// have started processing condition_variable cv_; + /// @brief mutex used to keep the internal state consistent + /// related to the control of the main thread over the working threads exit std::mutex wait_mutex_; + /// @brief condition variable used to signal working threads to exit condition_variable wait_cv_; + /// @brief number of completed tasks uint32_t count_; + /// @brief flag which indicates if working thread should wait for main + /// thread signal bool wait_; + /// @brief the set of thread ids which have completed tasks set ids_; + /// @brief the list of completed tasks run by each thread map> history_; + /// @brief the list of test threads list> threads_; }; -/// @brief test ThreadPool functionality +/// @brief test ThreadPool add and count +TEST_F(ThreadPoolTest, testAddAndCount) { + uint32_t items_count; + ThreadPool::WorkItemCallBack call_back; + ThreadPool thread_pool; + ASSERT_EQ(thread_pool.count(), 0); + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 4; + + call_back = std::bind(&ThreadPoolTest::run, this); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + // the item count sould match + ASSERT_EQ(thread_pool.count(), items_count); +} + +/// @brief test ThreadPool create and destroy TEST_F(ThreadPoolTest, testCreateAndDestroy) { uint32_t items_count; uint32_t thread_count; @@ -127,47 +214,118 @@ TEST_F(ThreadPoolTest, testCreateAndDestroy) { items_count = 4; thread_count = 4; + // prepare setup reset(thread_count); // create tasks which block thread pool threads until signaled by main // to force all threads of the thread pool to run exactly one task call_back = std::bind(&ThreadPoolTest::runAndWait, this); + // add items to stopped thread pool for (uint32_t i = 0; i < items_count; ++i) { thread_pool.add(call_back); } + // the item count should match ASSERT_EQ(thread_pool.count(), items_count); + // calling create with false should not create threads and should remove all + // queued items thread_pool.create(thread_count, false); + // the item count sould be 0 ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + // add items to stopped thread pool for (uint32_t i = 0; i < items_count; ++i) { thread_pool.add(call_back); } + // the item count should match ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + // calling destroy should clear all threads and should remove all queued + // items thread_pool.destroy(); + // the item count sould be 0 ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + thread_pool.destroy(); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling create with false should not create threads and should remove all + // queued items + thread_pool.create(thread_count, false); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + // the item count should match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + // calling create with true should create threads and should remove all + // queued items thread_pool.create(thread_count); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // add items to running thread pool for (uint32_t i = 0; i < items_count; ++i) { thread_pool.add(call_back); } - waitThreads(thread_count, items_count); + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count sould be 0 ASSERT_EQ(thread_pool.count(), 0); - ASSERT_EQ(ids_.size(), items_count); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // as each thread pool thread is still waiting on main to unblock, each + // thread should have been registered in ids list + checkIds(items_count); + // all items should have been processed ASSERT_EQ(count(), items_count); + // check that the number of processed tasks matches the number of items checkRunHistory(items_count); + // signal thread pool tasks to continue signalThreads(); + // calling destroy should clear all threads and should remove all queued + // items + thread_pool.destroy(); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works thread_pool.destroy(); + // the item count sould be 0 ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); items_count = 64; + thread_count = 16; + // prepare setup reset(thread_count); // create tasks which do not block the thread pool threads so that several @@ -175,17 +333,192 @@ TEST_F(ThreadPoolTest, testCreateAndDestroy) { // having a chance to run call_back = std::bind(&ThreadPoolTest::run, this); + // calling create with true should create threads and should remove all + // queued items thread_pool.create(thread_count); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // add items to running thread pool for (uint32_t i = 0; i < items_count; ++i) { thread_pool.add(call_back); } - waitThreads(thread_count, items_count); + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count sould be 0 ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // all items should have been processed ASSERT_EQ(count(), items_count); + // check that the number of processed tasks matches the number of items checkRunHistory(items_count); + + // calling destroy should clear all threads and should remove all queued + // items + thread_pool.destroy(); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); +} + +/// @brief test ThreadPool start and stop +TEST_F(ThreadPoolTest, testStartAndStop) { + uint32_t items_count; + uint32_t thread_count; + ThreadPool::WorkItemCallBack call_back; + ThreadPool thread_pool; + ASSERT_EQ(thread_pool.count(), 0); + + items_count = 4; + thread_count = 4; + // prepare setup + reset(thread_count); + + // create tasks which block thread pool threads until signaled by main + // to force all threads of the thread pool to run exactly one task + call_back = std::bind(&ThreadPoolTest::runAndWait, this); + + // calling start should create the threads and should keep the queued items + thread_pool.start(thread_count); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // do it once again to check if it works + thread_pool.start(thread_count); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // calling stop with false should clear all threads and should keep queued + // items + thread_pool.stop(); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // do it once again to check if it works + thread_pool.stop(); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + // the item count sould match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling stop with false should clear all threads and should keep queued + // items + thread_pool.stop(); + // the item count sould match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling stop with true should clear all threads and should remove all + // queued items + thread_pool.stop(true); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + thread_pool.start(thread_count); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), thread_count); + + // add items to running thread pool + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // as each thread pool thread is still waiting on main to unblock, each + // thread should have been registered in ids list + checkIds(items_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // signal thread pool tasks to continue + signalThreads(); + + // calling stop with false should clear all threads and should keep queued + // items + thread_pool.stop(); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + items_count = 64; + thread_count = 16; + // prepare setup + reset(thread_count); + + // create tasks which do not block the thread pool threads so that several + // tasks can be run on the same thread and some of the threads never even + // having a chance to run + call_back = std::bind(&ThreadPoolTest::run, this); + + // add items to stopped thread pool + for (uint32_t i = 0; i < items_count; ++i) { + thread_pool.add(call_back); + } + // the item count sould match + ASSERT_EQ(thread_pool.count(), items_count); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); + + // calling start should create the threads and should keep the queued items + thread_pool.start(thread_count); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + + // wait for all items to be processed + waitTasks(thread_count, items_count); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should match + ASSERT_EQ(thread_pool.size(), thread_count); + // all items should have been processed + ASSERT_EQ(count(), items_count); + + // check that the number of processed tasks matches the number of items + checkRunHistory(items_count); + + // calling stop with false should clear all threads and should keep queued + // items + thread_pool.stop(); + // the item count sould be 0 + ASSERT_EQ(thread_pool.count(), 0); + // the thread count should be 0 + ASSERT_EQ(thread_pool.size(), 0); } } // namespace diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc index b465f6ce51..675c41f509 100644 --- a/src/lib/dhcpsrv/thread_pool.cc +++ b/src/lib/dhcpsrv/thread_pool.cc @@ -191,6 +191,9 @@ void ThreadPool::destroy() { } void ThreadPool::start(uint32_t thread_count) { + if (!thread_count || !exit_) { + return; + } queue_->start(); exit_ = false; for (int i = 0; i < thread_count; ++i) { @@ -219,6 +222,10 @@ size_t ThreadPool::count() { return queue_->count(); } +size_t ThreadPool::size() { + return threads_.size(); +} + void ThreadPool::run() { thread::id th_id = this_thread::get_id(); LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id); diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/dhcpsrv/thread_pool.h index 7c701439cc..4883da19db 100644 --- a/src/lib/dhcpsrv/thread_pool.h +++ b/src/lib/dhcpsrv/thread_pool.h @@ -77,6 +77,11 @@ struct ThreadPool { /// @return the number of work items in the queue size_t count(); + /// @brief size number of thread pool threads + /// + /// @return the number of threads + size_t size(); + private: /// @brief run function of each thread void run();