]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#883, !506] added thread pool unit tests
authorRazvan Becheriu <razvan@isc.org>
Tue, 10 Sep 2019 15:14:54 +0000 (18:14 +0300)
committerRazvan Becheriu <razvan@isc.org>
Wed, 6 Nov 2019 17:32:51 +0000 (19:32 +0200)
src/lib/dhcpsrv/tests/Makefile.am
src/lib/dhcpsrv/tests/thread_pool_unittest.cc [new file with mode: 0644]
src/lib/dhcpsrv/thread_pool.cc
src/lib/dhcpsrv/thread_pool.h
src/lib/util/threads/tests/lock_guard_unittest.cc

index d9cac94776fa096b9da89fee85c20e181f6ae346..86bc606cf217f589540e89e9bd221ddf11967847 100644 (file)
@@ -125,6 +125,7 @@ libdhcpsrv_unittests_SOURCES += srv_config_unittest.cc
 libdhcpsrv_unittests_SOURCES += subnet_unittest.cc
 libdhcpsrv_unittests_SOURCES += test_get_callout_handle.cc test_get_callout_handle.h
 libdhcpsrv_unittests_SOURCES += triplet_unittest.cc
+libdhcpsrv_unittests_SOURCES += thread_pool_unittest.cc
 libdhcpsrv_unittests_SOURCES += test_utils.cc test_utils.h
 libdhcpsrv_unittests_SOURCES += timer_mgr_unittest.cc
 libdhcpsrv_unittests_SOURCES += network_state_unittest.cc
diff --git a/src/lib/dhcpsrv/tests/thread_pool_unittest.cc b/src/lib/dhcpsrv/tests/thread_pool_unittest.cc
new file mode 100644 (file)
index 0000000..a918d3b
--- /dev/null
@@ -0,0 +1,187 @@
+// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <gtest/gtest.h>
+
+#include <dhcpsrv/thread_pool.h>
+
+using namespace isc::dhcp;
+using namespace std;
+
+namespace {
+
+/// @brief Test Fixture for testing isc::dhcp::ThreadPool
+class ThreadPoolTest : public ::testing::Test {
+public:
+    ThreadPoolTest() : thread_count_(0), count_(0), wait_(false) {
+    }
+
+    /// @brief task function which registers the thread id and signals main
+    /// thread to wake
+    void runAndWait() {
+        run();
+        unique_lock<mutex> lk(wait_mutex_);
+        wait_cv_.wait(lk, [&]{ return (wait() == false); });
+    }
+
+    void run() {
+        // make sure this thread has started and it is accounted for
+        lock_guard<mutex> lk(mutex_);
+        auto id = this_thread::get_id();
+        ids_.emplace(id);
+        ++count_;
+        history_[id].push_back(count_);
+        // wake main thread if it is waiting for this thread to start
+        cv_.notify_all();
+    }
+
+    void reset(uint32_t thread_count) {
+        stopThreads();
+        thread_count_ = thread_count;
+        count_ = 0;
+        wait_ = true;
+        history_.clear();
+    }
+
+    void waitThreads(uint32_t thread_count, bool start = false) {
+        // make sure we have all threads running when performing all the checks
+        unique_lock<mutex> lck(mutex_);
+        if (start) {
+            startThreads(thread_count);
+        }
+        cv_.wait(lck, [&]{ return (count() == thread_count); });
+    }
+
+    void startThreads(uint32_t thread_count) {
+        for (uint32_t i = 0; i < thread_count; ++i) {
+            threads_.push_back(make_shared<std::thread>(&ThreadPoolTest::run, this));
+        }
+    }
+
+    void stopThreads() {
+        signalThreads();
+        for (auto thread : threads_) {
+            thread->join();
+        }
+        threads_.clear();
+    }
+
+    void signalThreads() {
+        lock_guard<mutex> lk(wait_mutex_);
+        if (wait_) {
+            // wake all thread if waiting for main thread
+            wait_cv_.notify_all();
+        }
+        wait_ = false;
+    }
+
+    uint32_t count() {
+        return count_;
+    }
+
+    bool wait() {
+        return wait_;
+    }
+
+    void checkRunHistory(uint32_t items_count) {
+        uint32_t count = 0;
+        for (auto element : history_) {
+            count += element.second.size();
+        }
+        ASSERT_EQ(count, items_count);
+    }
+
+    uint32_t thread_count_;
+
+    std::mutex mutex_;
+
+    condition_variable cv_;
+
+    std::mutex wait_mutex_;
+
+    condition_variable wait_cv_;
+
+    uint32_t count_;
+
+    bool wait_;
+
+    set<std::thread::id> ids_;
+
+    map<std::thread::id, list<uint32_t>> history_;
+
+    list<shared_ptr<std::thread>> threads_;
+};
+
+/// @brief test ThreadPool functionality
+TEST_F(ThreadPoolTest, testCreateAndDestroy) {
+    uint32_t items_count = 4;
+    reset(items_count);
+    ThreadPool::WorkItemCallBack call_back;
+
+    ThreadPool thread_pool;
+    ASSERT_EQ(thread_pool.count(), 0);
+
+    // create tasks which block thread pool threads until signaled by main
+    // to force all threads of the thread pool to run exactly one task
+    call_back = std::bind(&ThreadPoolTest::runAndWait, this);
+
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+    ASSERT_EQ(thread_pool.count(), items_count);
+
+    thread_pool.create(items_count, false);
+    ASSERT_EQ(thread_pool.count(), 0);
+
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+    ASSERT_EQ(thread_pool.count(), items_count);
+
+    thread_pool.destroy();
+    ASSERT_EQ(thread_pool.count(), 0);
+
+    thread_pool.create(items_count);
+
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+
+    waitThreads(items_count);
+    ASSERT_EQ(thread_pool.count(), 0);
+    ASSERT_EQ(ids_.size(), items_count);
+    ASSERT_EQ(count(), items_count);
+
+    checkRunHistory(items_count);
+
+    signalThreads();
+
+    thread_pool.destroy();
+    ASSERT_EQ(thread_pool.count(), 0);
+
+    reset(items_count);
+
+    // create tasks which do not block the thread pool threads so that several
+    // tasks can be run on the same thread and some of the threads never even
+    // having a chance to run
+    call_back = std::bind(&ThreadPoolTest::run, this);
+
+    thread_pool.create(items_count);
+
+    for (uint32_t i = 0; i < items_count; ++i) {
+        thread_pool.add(call_back);
+    }
+
+    waitThreads(items_count);
+    ASSERT_EQ(thread_pool.count(), 0);
+    ASSERT_EQ(count(), items_count);
+
+    checkRunHistory(items_count);
+}
+
+}  // namespace
index e868bb3153a335321f1d990bfea89d8989178c03..b465f6ce51c5fa8796ea23c8b65b9f39db791b52 100644 (file)
@@ -27,22 +27,156 @@ using namespace std;
 namespace isc {
 namespace dhcp {
 
-ThreadPool::ThreadPool() : exit_(true) {
+/// @brief Defines a generic thread pool queue.
+///
+/// The main purpose is to safely manage thread pool tasks.
+/// The thread pool queue can be 'disabled', which means that no items can be
+/// added or removed from the queue, or 'enabled', which guarantees that
+/// inserting or removing items are thread safe.
+/// In 'disabled' state, all threads waiting on the queue are unlocked and all
+/// operations are non blocking.
+template <typename WorkItem>
+struct ThreadPoolQueue {
+    /// @brief Constructor
+    ///
+    /// Creates the thread pool queue in 'disabled' state
+    ThreadPoolQueue() : exit_(true) {
+    }
+
+    /// @brief Destructor
+    ///
+    /// Destroys the thread pool queue
+    ~ThreadPoolQueue() {
+        stop(true);
+    }
+
+    /// @brief push work item to the queue
+    ///
+    /// Used to add work items in the queue.
+    /// This function adds an item to the queue and wakes up at least one thread
+    /// waiting on the queue.
+    ///
+    /// @param item the new iten to be added to the queue
+    void push(WorkItem item) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        queue_.push(item);
+        // Notify pop function so that it can effectively remove a work item.
+        cv_.notify_all();
+    }
+
+    /// @brief pop work item from the queue or block waiting
+    ///
+    /// Used to retrieve and remove a work item from the queue
+    /// If the queue is 'disabled', this function returns immediately
+    /// (no element).
+    /// If the queue is 'enabled', this function returns the first element in
+    /// the queue or blocks the calling thread if there are no work items
+    /// available.
+    ///
+    /// @param item the reference of the item removed from the queue, if any
+    ///
+    /// @return true if there was a work item removed from the queue, false
+    /// otherwise
+    bool pop(WorkItem& item) {
+        std::unique_lock<std::mutex> lock(mutex_);
+        while (!exit_) {
+            if (queue_.empty()) {
+                // Wait for push or stop functions.
+                cv_.wait(lock);
+                continue;
+            }
+
+            item = queue_.front();
+            queue_.pop();
+            return true;
+        }
+
+        return false;
+    }
+
+    /// @brief count number of work items in the queue
+    ///
+    /// Returns the number of work items in the queue
+    ///
+    /// @return the number of work items
+    size_t count() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return queue_.size();
+    }
+
+    /// @brief clear remove all work items
+    ///
+    /// Removes all queued work items
+    void clear() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        reset();
+    }
+
+    /// @brief start and enable the queue
+    ///
+    /// Sets the queue state to 'enabled'
+    void start() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        exit_ = false;
+    }
+
+    /// brief stop and disable the queue
+    ///
+    /// Sets the queue state to 'disabled' and optionally removes all work items
+    ///
+    /// @param clear used to specify if the function should also clear the queue
+    void stop(bool clear = false) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        exit_ = true;
+        // Notify get() so that it can exit.
+        cv_.notify_all();
+        if (clear) {
+            reset();
+        }
+    }
+
+private:
+    /// @brief reset clears the queue removing all work items
+    ///
+    /// Must be called in a critical section (mutex locked scope).
+    void reset() {
+        queue_ = std::queue<WorkItem>();
+    }
+
+    /// @brief underlying queue container
+    std::queue<WorkItem> queue_;
+
+    /// @brief mutex used for critical sections
+    std::mutex mutex_;
+
+    /// @brief condition variable used to signal waiting threads
+    std::condition_variable cv_;
+
+    /// @brief the sate of the queue
+    ///
+    /// The 'enabled' state corresponds to false value
+    /// The 'disabled' state corresponds to true value
+    std::atomic_bool exit_;
+};
+
+ThreadPool::ThreadPool() :
+    queue_(make_shared<ThreadPoolQueue<WorkItemCallBack>>()), exit_(true) {
 }
 
 ThreadPool::~ThreadPool() {
     destroy();
 }
 
-void ThreadPool::create(uint32_t worker_threads, bool run) {
+void ThreadPool::create(uint32_t thread_count, bool run) {
     LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
-        .arg(worker_threads);
-    if (!worker_threads) {
+        .arg(thread_count);
+
+    if (!thread_count) {
         return;
     }
     destroy();
     if (run) {
-        start(worker_threads);
+        start(thread_count);
     }
 
     LOG_INFO(dhcpsrv_logger, "Thread pool created");
@@ -50,16 +184,17 @@ void ThreadPool::create(uint32_t worker_threads, bool run) {
 
 void ThreadPool::destroy() {
     LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
+
     stop(true);
 
     LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
 }
 
-void ThreadPool::start(uint32_t worker_threads) {
-    queue_.start();
+void ThreadPool::start(uint32_t thread_count) {
+    queue_->start();
     exit_ = false;
-    for (int i = 0; i < worker_threads; ++i) {
-        worker_threads_.push_back(make_shared<thread>(&ThreadPool::run, this));
+    for (int i = 0; i < thread_count; ++i) {
+        threads_.push_back(make_shared<thread>(&ThreadPool::run, this));
     }
 
     LOG_INFO(dhcpsrv_logger, "Thread pool started");
@@ -67,21 +202,21 @@ void ThreadPool::start(uint32_t worker_threads) {
 
 void ThreadPool::stop(bool clear) {
     exit_ = true;
-    queue_.stop(clear);
-    for (auto thread : worker_threads_) {
+    queue_->stop(clear);
+    for (auto thread : threads_) {
         thread->join();
     }
-    worker_threads_.clear();
+    threads_.clear();
 
     LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
 }
 
 void ThreadPool::add(WorkItemCallBack call_back) {
-    queue_.push(call_back);
+    queue_->push(call_back);
 }
 
 size_t ThreadPool::count() {
-    return queue_.count();
+    return queue_->count();
 }
 
 void ThreadPool::run() {
@@ -89,9 +224,9 @@ void ThreadPool::run() {
     LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id);
 
     while (!exit_) {
-        WorkItemCallBack work_item;
-        if (queue_.pop(work_item)) {
-            work_item();
+        WorkItemCallBack item;
+        if (queue_->pop(item)) {
+            item();
         }
     }
 
index b5b5f2c268bd20a2668363ea12b2c4b7e7bdab67..7c701439cc18b6538961c0c29c8fb239f4dace5f 100644 (file)
 namespace isc {
 namespace dhcp {
 
-/// @brief Defines a generic thread pool queue.
-///
-/// The main purpose is to safely manage thread pool tasks.
-/// The thread pool queue can be 'disabled', which means that no items can be
-/// added or removed from the queue, or 'enabled', which guarantees that
-/// inserting or removing items are thread safe.
-/// In 'disabled' state, all threads waiting on the queue are unlocked and all
-/// operations are non blocking.
 template <typename WorkItem>
-struct ThreadPoolQueue {
-    /// @brief Constructor
-    ///
-    /// Creates the thread pool queue in 'disabled' state
-    ThreadPoolQueue() : exit_(true) {
-    }
-
-    /// @brief Destructor
-    ///
-    /// Destroys the thread pool queue
-    ~ThreadPoolQueue() {
-        stop(true);
-    }
-
-    /// @brief push work item to the queue
-    ///
-    /// Used to add work items in the queue.
-    /// If the queue is 'disabled', this function returns immediately.
-    /// If the queue is 'enabled', this function adds an item to the queue and
-    /// wakes up at least one thread waiting on the queue.
-    ///
-    /// @param item the new iten to be added to the queue
-    void push(WorkItem item) {
-        std::lock_guard<std::mutex> lock(mutex_);
-        if (exit_) {
-            return;
-        }
-        queue_.push(item);
-        // Notify pop function so that it can effectively remove a work item.
-        cv_.notify_all();
-    }
-
-    /// @brief pop work item from the queue or block waiting
-    ///
-    /// Used to retrieve and remove a work item from the queue
-    /// If the queue is 'disabled', this function returns immediately
-    /// (no element).
-    /// If the queue is 'enabled', this function returns the first element in
-    /// the queue or blocks the calling thread if there are no work items
-    /// available.
-    ///
-    /// @param item the reference of the item removed from the queue, if any
-    ///
-    /// @return true if there was a work item removed from the queue, false
-    /// otherwise
-    bool pop(WorkItem& item) {
-        std::unique_lock<std::mutex> lock(mutex_);
-        while (!exit_) {
-            if (queue_.empty()) {
-                // Wait for push or stop functions.
-                cv_.wait(lock);
-                continue;
-            }
-
-            item = queue_.front();
-            queue_.pop();
-            return true;
-        }
-
-        return false;
-    }
-
-    /// @brief count number of work items in the queue
-    ///
-    /// Returns the number of work items in the queue
-    ///
-    /// @return the number of work items
-    size_t count() {
-        std::lock_guard<std::mutex> lock(mutex_);
-        return queue_.size();
-    }
-
-    /// @brief clear remove all work items
-    ///
-    /// Removes all queued work items
-    void clear() {
-        std::lock_guard<std::mutex> lock(mutex_);
-        reset();
-    }
-
-    /// @brief start and enable the queue
-    ///
-    /// Sets the queue state to 'enabled'
-    void start() {
-        std::lock_guard<std::mutex> lock(mutex_);
-        exit_ = false;
-    }
-
-    /// brief stop and disable the queue
-    ///
-    /// Sets the queue state to 'disabled' and optionally removes all work items
-    ///
-    /// @param clear used to specify if the function should also clear the queue
-    void stop(bool clear = false) {
-        std::lock_guard<std::mutex> lock(mutex_);
-        exit_ = true;
-        // Notify get() so that it can exit.
-        cv_.notify_all();
-        if (clear) {
-            reset();
-        }
-    }
-
-private:
-    /// @brief reset clears the queue removing all work items
-    ///
-    /// Must be called in a critical section (mutex locked scope).
-    void reset() {
-        queue_ = std::queue<WorkItem>();
-    }
-
-    /// @brief underlying queue container
-    std::queue<WorkItem> queue_;
-
-    /// @brief mutex used for critical sections
-    std::mutex mutex_;
-
-    /// @brief condition variable used to signal waiting threads
-    std::condition_variable cv_;
-
-    /// @brief the sate of the queue
-    ///
-    /// The 'enabled' state corresponds to false value
-    /// The 'disabled' state corresponds to true value
-    std::atomic_bool exit_;
-};
+struct ThreadPoolQueue;
 
 /// @brief Defines a thread pool which uses a thread pool queue for managing
 /// work items. Each work item is a 'function' object.
@@ -215,10 +82,10 @@ private:
     void run();
 
     /// @brief list of worker threads
-    std::list<std::shared_ptr<std::thread>> worker_threads_;
+    std::list<std::shared_ptr<std::thread>> threads_;
 
     /// @brief underlying work items queue
-    ThreadPoolQueue<WorkItemCallBack> queue_;
+    std::shared_ptr<ThreadPoolQueue<WorkItemCallBack>> queue_;
 
     /// @brief state of the thread pool
     /// The 'run' state corresponds to false value
index 5ba8bdebba2c1a4dd5bffdd9fd8d8673e3316af6..594d778fbd6f5aaf535cbe3cb9f928282ec54acb 100644 (file)
@@ -34,8 +34,8 @@ public:
     /// @brief Constructor
     ///
     /// @param recursive sets the mutex as recursive mutex
-    TestMutex(bool recursive = false) : lock_(0), lock_count_(0), unlock_count_(0),
-        dead_lock_(false), recursive_(recursive) {
+    TestMutex(bool recursive = false) : lock_(0), dead_lock_(false),
+         lock_count_(0), unlock_count_(0), recursive_(recursive) {
     }
 
     /// @brief lock the mutex
@@ -97,6 +97,14 @@ public:
         return lock_;
     }
 
+    /// @brief get the mutex dead lock state
+    ///
+    /// @return the mutex dead lock state
+    bool getDeadLock() {
+        lock_guard<mutex> lk(mutex_);
+        return dead_lock_;
+    }
+
     /// @brief get the number of locks performed on mutex
     ///
     /// @return the mutex number of locks
@@ -113,14 +121,6 @@ public:
         return unlock_count_;
     }
 
-    /// @brief get the mutex dead lock state
-    ///
-    /// @return the mutex dead lock state
-    bool getDeadLock() {
-        lock_guard<mutex> lk(mutex_);
-        return dead_lock_;
-    }
-
     /// @brief test the internal state of the mutex
     ///
     /// @param expected_lock check equality of this value with lock state
@@ -141,15 +141,15 @@ private:
     /// @brief internal lock state of the mutex
     int32_t lock_;
 
+    /// @brief state which indicated that the mutex in in dead lock
+    bool dead_lock_;
+
     /// @brief total number of locks performed on the mutex
     uint32_t lock_count_;
 
     /// @brief total number of unlocks performed on the mutex
     uint32_t unlock_count_;
 
-    /// @brief state which indicated that the mutex in in dead lock
-    bool dead_lock_;
-
     /// @brief flag to indicate if the mutex is recursive or not
     bool recursive_;
 
@@ -160,56 +160,13 @@ private:
     std::thread::id id_;
 };
 
-/// @brief Test Fixture for testing isc:util::thread::LockGuard
+/// @brief Test Fixture for testing isc::util::thread::LockGuard
 class LockGuardTest : public ::testing::Test {
-public:
-    void run() {
-        {
-            // make sure this thread has started and it is accounted for
-            lock_guard<mutex> lk(mutex_);
-            ids_.emplace(this_thread::get_id());
-            ++count_;
-            // wake main thread if it is waiting for this thread to start
-            cv_.notify_all();
-        }
-    }
-
-    void reset() {
-        count_ = 0;
-    }
-
-    uint32_t count() {
-        return count_;
-    }
-
-    std::mutex mutex_;
-
-    condition_variable cv_;
-
-    uint32_t count_;
-
-    set<std::thread::id> ids_;
-
-    list<shared_ptr<std::thread>> threads_;
 };
 
+/// @brief test LockGuard functionality with non-recursive mutex, recursive mutex
+/// and null pointer
 TEST_F(LockGuardTest, testLock) {
-    reset();
-    // make sure we have all threads running when performing all the checks
-    {
-        unique_lock<mutex> lck(mutex_);
-        for (uint32_t i = 0; i < 2; ++i) {
-            threads_.push_back(make_shared<std::thread>(&LockGuardTest::run, this));
-        }
-        cv_.wait(lck, [&]{ return count() == 2; });
-    }
-    for (auto thread : threads_) {
-        thread->join();
-    }
-    threads_.clear();
-    ASSERT_EQ(count_, 2);
-    ASSERT_EQ(ids_.size(), 2);
-
     shared_ptr<TestMutex> test_mutex;
     // test non-recursive lock
     test_mutex = make_shared<TestMutex>();
@@ -222,7 +179,8 @@ TEST_F(LockGuardTest, testLock) {
         {
             // call LockGuard constructor which locks mutex resulting in an
             // exception as the mutex is already locked (dead lock)
-            EXPECT_THROW(LockGuard<TestMutex> lock(test_mutex.get()), isc::InvalidOperation);
+            EXPECT_THROW(LockGuard<TestMutex> lock(test_mutex.get()),
+                         isc::InvalidOperation);
             // expect lock 1 lock_count 1 unlock_count 0 dead_lock true
             // you should not be able to get here...using a real mutex
             test_mutex->testMutexState(1, 1, 0, true);
@@ -263,4 +221,5 @@ TEST_F(LockGuardTest, testLock) {
         }
     }
     }
-}
+
+}  // namespace