]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#883, !506] updated unit tests
authorRazvan Becheriu <razvan@isc.org>
Wed, 11 Sep 2019 08:35:25 +0000 (11:35 +0300)
committerRazvan Becheriu <razvan@isc.org>
Fri, 8 Nov 2019 13:52:29 +0000 (15:52 +0200)
src/lib/dhcpsrv/tests/thread_pool_unittest.cc
src/lib/dhcpsrv/thread_pool.cc
src/lib/dhcpsrv/thread_pool.h

index a795f8d55c82675323f2d621708dfcd6add6aece..616f718ea436fc28eb44b009c98852db0b4dc5f5 100644 (file)
@@ -18,106 +18,193 @@ namespace {
 /// @brief Test Fixture for testing isc::dhcp::ThreadPool
 class ThreadPoolTest : public ::testing::Test {
 public:
+    /// @brief Constructor
     ThreadPoolTest() : thread_count_(0), count_(0), wait_(false) {
     }
 
     /// @brief task function which registers the thread id and signals main
-    /// thread to wake
+    /// thread to stop waiting and then waits from main thread to signal to exit
     void runAndWait() {
+        // run task
         run();
+        // wait for main thread signal to exit
         unique_lock<mutex> lk(wait_mutex_);
         wait_cv_.wait(lk, [&]{ return (wait() == false); });
     }
 
+    /// @brief task function which registers the thread id and signals main
+    /// thread to stop waiting
     void run() {
         // make sure this thread has started and it is accounted for
         lock_guard<mutex> lk(mutex_);
         auto id = this_thread::get_id();
+        // register this thread as doing work on items
         ids_.emplace(id);
+        // finish task
         ++count_;
+        // register this task on the history of this thread
         history_[id].push_back(count_);
-        // wake main thread if it is waiting for this thread to start
+        // wake main thread if it is waiting for this thread to process
         cv_.notify_all();
     }
 
+    /// @brief reset all counters and internal test state
     void reset(uint32_t thread_count) {
+        // stop test threads
         stopThreads();
+        // reset the internal state of the test
         thread_count_ = thread_count;
         count_ = 0;
         wait_ = true;
         history_.clear();
     }
 
-    void waitThreads(uint32_t thread_count, uint32_t items_count, bool start = false) {
+    /// @brief start test threads and block main thread until all tasks have
+    /// been processed
+    ///
+    /// @param thread_count number of threads to be started
+    /// @param items_count number of items to wait for being processed
+    /// @param start create processing threads
+    /// @param signal give main thread control over the threads exit
+    void waitTasks(uint32_t thread_count, uint32_t items_count,
+                   bool start = false, bool signal = true) {
         // make sure we have all threads running when performing all the checks
         unique_lock<mutex> lck(mutex_);
         if (start) {
-            startThreads(thread_count);
+            // start test threads if explicitly specified
+            startThreads(thread_count, signal);
         }
+        // wait for the threads to process all the items
         cv_.wait(lck, [&]{ return (count() == items_count); });
     }
 
-    void startThreads(uint32_t thread_count) {
+    /// @brief start test threads
+    ///
+    /// @param thread_count number of threads to be started
+    /// @param signal give main thread control over the threads exit
+    void startThreads(uint32_t thread_count, bool signal = true) {
+        // set default task function to wait for main thread signal
+        auto runFunction = &ThreadPoolTest::runAndWait;
+        if (!signal) {
+            // set the task function to finish immediately
+            runFunction = &ThreadPoolTest::run;
+        }
+        // start test threads
         for (uint32_t i = 0; i < thread_count; ++i) {
-            threads_.push_back(make_shared<std::thread>(&ThreadPoolTest::run, this));
+            threads_.push_back(make_shared<std::thread>(runFunction, this));
         }
     }
 
+    /// @brief stop test threads
     void stopThreads() {
+        // signal threads that are waiting
         signalThreads();
+        // wait for all test threads to exit
         for (auto thread : threads_) {
             thread->join();
         }
+        // reset all threads
         threads_.clear();
     }
 
+    /// @brief function used by main thread to unblock processing threads
     void signalThreads() {
         lock_guard<mutex> lk(wait_mutex_);
-        if (wait_) {
-            // wake all thread if waiting for main thread
-            wait_cv_.notify_all();
-        }
+        // clear the wait flag so that threads will no longer wait for the main
+        // thread signal
         wait_ = false;
+        // wake all thread if waiting for main thread
+        wait_cv_.notify_all();
     }
 
+    /// @brief number of completed tasks
+    ///
+    /// @return the number of completed tasks
     uint32_t count() {
         return count_;
     }
 
+    /// @brief flag which indicates if working thread should wait for main
+    /// thread signal
+    ///
+    /// @return the wait flag
     bool wait() {
         return wait_;
     }
 
+    /// @brief check the total number of tasks that have been processed
+    /// Some of the tasks may have been run on the same thread and none may have
+    /// been processed by other threads
     void checkRunHistory(uint32_t items_count) {
         uint32_t count = 0;
+        // iterate over all threads history and count all the processed tasks
         for (auto element : history_) {
             count += element.second.size();
         }
         ASSERT_EQ(count, items_count);
     }
 
+    /// @brief check the total number of threads that have processed tasks
+    void checkIds(uint32_t count) {
+        ASSERT_EQ(ids_.size(), count);
+    }
+
+private:
+    /// @brief thread count used by the test
     uint32_t thread_count_;
 
+    /// @brief mutex used to keep the internal state consistent
     std::mutex mutex_;
 
+    /// @brief condition variable used to signal main thread that all threads
+    /// have started processing
     condition_variable cv_;
 
+    /// @brief mutex used to keep the internal state consistent
+    /// related to the control of the main thread over the working threads exit
     std::mutex wait_mutex_;
 
+    /// @brief condition variable used to signal working threads to exit
     condition_variable wait_cv_;
 
+    /// @brief number of completed tasks
     uint32_t count_;
 
+    /// @brief flag which indicates if working thread should wait for main
+    /// thread signal
     bool wait_;
 
+    /// @brief the set of thread ids which have completed tasks
     set<std::thread::id> ids_;
 
+    /// @brief the list of completed tasks run by each thread
     map<std::thread::id, list<uint32_t>> history_;
 
+    /// @brief the list of test threads
     list<shared_ptr<std::thread>> threads_;
 };
 
-/// @brief test ThreadPool functionality
+/// @brief test ThreadPool add and count
+TEST_F(ThreadPoolTest, testAddAndCount) {
+    uint32_t items_count;
+    ThreadPool::WorkItemCallBack call_back;
+    ThreadPool thread_pool;
+    ASSERT_EQ(thread_pool.count(), 0);
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    items_count = 4;
+
+    call_back = std::bind(&ThreadPoolTest::run, this);
+
+    // add items to stopped thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+    // the item count sould match
+    ASSERT_EQ(thread_pool.count(), items_count);
+}
+
+/// @brief test ThreadPool create and destroy
 TEST_F(ThreadPoolTest, testCreateAndDestroy) {
     uint32_t items_count;
     uint32_t thread_count;
@@ -127,47 +214,118 @@ TEST_F(ThreadPoolTest, testCreateAndDestroy) {
 
     items_count = 4;
     thread_count = 4;
+    // prepare setup
     reset(thread_count);
 
     // create tasks which block thread pool threads until signaled by main
     // to force all threads of the thread pool to run exactly one task
     call_back = std::bind(&ThreadPoolTest::runAndWait, this);
 
+    // add items to stopped thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
         thread_pool.add(call_back);
     }
+    // the item count should match
     ASSERT_EQ(thread_pool.count(), items_count);
 
+    // calling create with false should not create threads and should remove all
+    // queued items
     thread_pool.create(thread_count, false);
+    // the item count sould be 0
     ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
 
+    // add items to stopped thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
         thread_pool.add(call_back);
     }
+    // the item count should match
     ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
 
+    // calling destroy should clear all threads and should remove all queued
+    // items
     thread_pool.destroy();
+    // the item count sould be 0
     ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // do it once again to check if it works
+    thread_pool.destroy();
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling create with false should not create threads and should remove all
+    // queued items
+    thread_pool.create(thread_count, false);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // add items to stopped thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
 
+    // calling create with true should create threads and should remove all
+    // queued items
     thread_pool.create(thread_count);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
 
+    // add items to running thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
         thread_pool.add(call_back);
     }
 
-    waitThreads(thread_count, items_count);
+    // wait for all items to be processed
+    waitTasks(thread_count, items_count);
+    // the item count sould be 0
     ASSERT_EQ(thread_pool.count(), 0);
-    ASSERT_EQ(ids_.size(), items_count);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+    // as each thread pool thread is still waiting on main to unblock, each
+    // thread should have been registered in ids list
+    checkIds(items_count);
+    // all items should have been processed
     ASSERT_EQ(count(), items_count);
 
+    // check that the number of processed tasks matches the number of items
     checkRunHistory(items_count);
 
+    // signal thread pool tasks to continue
     signalThreads();
 
+    // calling destroy should clear all threads and should remove all queued
+    // items
+    thread_pool.destroy();
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // do it once again to check if it works
     thread_pool.destroy();
+    // the item count sould be 0
     ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
 
     items_count = 64;
+    thread_count = 16;
+    // prepare setup
     reset(thread_count);
 
     // create tasks which do not block the thread pool threads so that several
@@ -175,17 +333,192 @@ TEST_F(ThreadPoolTest, testCreateAndDestroy) {
     // having a chance to run
     call_back = std::bind(&ThreadPoolTest::run, this);
 
+    // calling create with true should create threads and should remove all
+    // queued items
     thread_pool.create(thread_count);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
 
+    // add items to running thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
         thread_pool.add(call_back);
     }
 
-    waitThreads(thread_count, items_count);
+    // wait for all items to be processed
+    waitTasks(thread_count, items_count);
+    // the item count sould be 0
     ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+    // all items should have been processed
     ASSERT_EQ(count(), items_count);
 
+    // check that the number of processed tasks matches the number of items
     checkRunHistory(items_count);
+
+    // calling destroy should clear all threads and should remove all queued
+    // items
+    thread_pool.destroy();
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+}
+
+/// @brief test ThreadPool start and stop
+TEST_F(ThreadPoolTest, testStartAndStop) {
+    uint32_t items_count;
+    uint32_t thread_count;
+    ThreadPool::WorkItemCallBack call_back;
+    ThreadPool thread_pool;
+    ASSERT_EQ(thread_pool.count(), 0);
+
+    items_count = 4;
+    thread_count = 4;
+    // prepare setup
+    reset(thread_count);
+
+    // create tasks which block thread pool threads until signaled by main
+    // to force all threads of the thread pool to run exactly one task
+    call_back = std::bind(&ThreadPoolTest::runAndWait, this);
+
+    // calling start should create the threads and should keep the queued items
+    thread_pool.start(thread_count);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // do it once again to check if it works
+    thread_pool.start(thread_count);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // calling stop with false should clear all threads and should keep queued
+    // items
+    thread_pool.stop();
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // do it once again to check if it works
+    thread_pool.stop();
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // add items to stopped thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+    // the item count sould match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling stop with false should clear all threads and should keep queued
+    // items
+    thread_pool.stop();
+    // the item count sould match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling stop with true should clear all threads and should remove all
+    // queued items
+    thread_pool.stop(true);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling start should create the threads and should keep the queued items
+    thread_pool.start(thread_count);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // add items to running thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+
+    // wait for all items to be processed
+    waitTasks(thread_count, items_count);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+    // as each thread pool thread is still waiting on main to unblock, each
+    // thread should have been registered in ids list
+    checkIds(items_count);
+    // all items should have been processed
+    ASSERT_EQ(count(), items_count);
+
+    // check that the number of processed tasks matches the number of items
+    checkRunHistory(items_count);
+
+    // signal thread pool tasks to continue
+    signalThreads();
+
+    // calling stop with false should clear all threads and should keep queued
+    // items
+    thread_pool.stop();
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    items_count = 64;
+    thread_count = 16;
+    // prepare setup
+    reset(thread_count);
+
+    // create tasks which do not block the thread pool threads so that several
+    // tasks can be run on the same thread and some of the threads never even
+    // having a chance to run
+    call_back = std::bind(&ThreadPoolTest::run, this);
+
+    // add items to stopped thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+    // the item count sould match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling start should create the threads and should keep the queued items
+    thread_pool.start(thread_count);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // wait for all items to be processed
+    waitTasks(thread_count, items_count);
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+    // all items should have been processed
+    ASSERT_EQ(count(), items_count);
+
+    // check that the number of processed tasks matches the number of items
+    checkRunHistory(items_count);
+
+    // calling stop with false should clear all threads and should keep queued
+    // items
+    thread_pool.stop();
+    // the item count sould be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
 }
 
 }  // namespace
index b465f6ce51c5fa8796ea23c8b65b9f39db791b52..675c41f5091c71b97b6a00a89021b6055d68b583 100644 (file)
@@ -191,6 +191,9 @@ void ThreadPool::destroy() {
 }
 
 void ThreadPool::start(uint32_t thread_count) {
+    if (!thread_count || !exit_) {
+        return;
+    }
     queue_->start();
     exit_ = false;
     for (int i = 0; i < thread_count; ++i) {
@@ -219,6 +222,10 @@ size_t ThreadPool::count() {
     return queue_->count();
 }
 
+size_t ThreadPool::size() {
+    return threads_.size();
+}
+
 void ThreadPool::run() {
     thread::id th_id = this_thread::get_id();
     LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id);
index 7c701439cc18b6538961c0c29c8fb239f4dace5f..4883da19db6929fcce42a8a848e2741198278106 100644 (file)
@@ -77,6 +77,11 @@ struct ThreadPool {
     /// @return the number of work items in the queue
     size_t count();
 
+    /// @brief size number of thread pool threads
+    ///
+    /// @return the number of threads
+    size_t size();
+
 private:
     /// @brief run function of each thread
     void run();