libdhcpsrv_unittests_SOURCES += subnet_unittest.cc
libdhcpsrv_unittests_SOURCES += test_get_callout_handle.cc test_get_callout_handle.h
libdhcpsrv_unittests_SOURCES += triplet_unittest.cc
+libdhcpsrv_unittests_SOURCES += thread_pool_unittest.cc
libdhcpsrv_unittests_SOURCES += test_utils.cc test_utils.h
libdhcpsrv_unittests_SOURCES += timer_mgr_unittest.cc
libdhcpsrv_unittests_SOURCES += network_state_unittest.cc
--- /dev/null
+// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <gtest/gtest.h>
+
+#include <dhcpsrv/thread_pool.h>
+
+using namespace isc::dhcp;
+using namespace std;
+
+namespace {
+
+/// @brief Test Fixture for testing isc::dhcp::ThreadPool
+class ThreadPoolTest : public ::testing::Test {
+public:
+ ThreadPoolTest() : thread_count_(0), count_(0), wait_(false) {
+ }
+
+ /// @brief task function which registers the thread id and signals main
+ /// thread to wake
+ void runAndWait() {
+ run();
+ unique_lock<mutex> lk(wait_mutex_);
+ wait_cv_.wait(lk, [&]{ return (wait() == false); });
+ }
+
+ void run() {
+ // make sure this thread has started and it is accounted for
+ lock_guard<mutex> lk(mutex_);
+ auto id = this_thread::get_id();
+ ids_.emplace(id);
+ ++count_;
+ history_[id].push_back(count_);
+ // wake main thread if it is waiting for this thread to start
+ cv_.notify_all();
+ }
+
+ void reset(uint32_t thread_count) {
+ stopThreads();
+ thread_count_ = thread_count;
+ count_ = 0;
+ wait_ = true;
+ history_.clear();
+ }
+
+ void waitThreads(uint32_t thread_count, bool start = false) {
+ // make sure we have all threads running when performing all the checks
+ unique_lock<mutex> lck(mutex_);
+ if (start) {
+ startThreads(thread_count);
+ }
+ cv_.wait(lck, [&]{ return (count() == thread_count); });
+ }
+
+ void startThreads(uint32_t thread_count) {
+ for (uint32_t i = 0; i < thread_count; ++i) {
+ threads_.push_back(make_shared<std::thread>(&ThreadPoolTest::run, this));
+ }
+ }
+
+ void stopThreads() {
+ signalThreads();
+ for (auto thread : threads_) {
+ thread->join();
+ }
+ threads_.clear();
+ }
+
+ void signalThreads() {
+ lock_guard<mutex> lk(wait_mutex_);
+ if (wait_) {
+ // wake all thread if waiting for main thread
+ wait_cv_.notify_all();
+ }
+ wait_ = false;
+ }
+
+ uint32_t count() {
+ return count_;
+ }
+
+ bool wait() {
+ return wait_;
+ }
+
+ void checkRunHistory(uint32_t items_count) {
+ uint32_t count = 0;
+ for (auto element : history_) {
+ count += element.second.size();
+ }
+ ASSERT_EQ(count, items_count);
+ }
+
+ uint32_t thread_count_;
+
+ std::mutex mutex_;
+
+ condition_variable cv_;
+
+ std::mutex wait_mutex_;
+
+ condition_variable wait_cv_;
+
+ uint32_t count_;
+
+ bool wait_;
+
+ set<std::thread::id> ids_;
+
+ map<std::thread::id, list<uint32_t>> history_;
+
+ list<shared_ptr<std::thread>> threads_;
+};
+
+/// @brief test ThreadPool functionality
+TEST_F(ThreadPoolTest, testCreateAndDestroy) {
+ uint32_t items_count = 4;
+ reset(items_count);
+ ThreadPool::WorkItemCallBack call_back;
+
+ ThreadPool thread_pool;
+ ASSERT_EQ(thread_pool.count(), 0);
+
+ // create tasks which block thread pool threads until signaled by main
+ // to force all threads of the thread pool to run exactly one task
+ call_back = std::bind(&ThreadPoolTest::runAndWait, this);
+
+ for (uint32_t i = 0; i < items_count; ++i) {
+ thread_pool.add(call_back);
+ }
+ ASSERT_EQ(thread_pool.count(), items_count);
+
+ thread_pool.create(items_count, false);
+ ASSERT_EQ(thread_pool.count(), 0);
+
+ for (uint32_t i = 0; i < items_count; ++i) {
+ thread_pool.add(call_back);
+ }
+ ASSERT_EQ(thread_pool.count(), items_count);
+
+ thread_pool.destroy();
+ ASSERT_EQ(thread_pool.count(), 0);
+
+ thread_pool.create(items_count);
+
+ for (uint32_t i = 0; i < items_count; ++i) {
+ thread_pool.add(call_back);
+ }
+
+ waitThreads(items_count);
+ ASSERT_EQ(thread_pool.count(), 0);
+ ASSERT_EQ(ids_.size(), items_count);
+ ASSERT_EQ(count(), items_count);
+
+ checkRunHistory(items_count);
+
+ signalThreads();
+
+ thread_pool.destroy();
+ ASSERT_EQ(thread_pool.count(), 0);
+
+ reset(items_count);
+
+ // create tasks which do not block the thread pool threads so that several
+ // tasks can be run on the same thread and some of the threads never even
+ // having a chance to run
+ call_back = std::bind(&ThreadPoolTest::run, this);
+
+ thread_pool.create(items_count);
+
+ for (uint32_t i = 0; i < items_count; ++i) {
+ thread_pool.add(call_back);
+ }
+
+ waitThreads(items_count);
+ ASSERT_EQ(thread_pool.count(), 0);
+ ASSERT_EQ(count(), items_count);
+
+ checkRunHistory(items_count);
+}
+
+} // namespace
namespace isc {
namespace dhcp {
-ThreadPool::ThreadPool() : exit_(true) {
+/// @brief Defines a generic thread pool queue.
+///
+/// The main purpose is to safely manage thread pool tasks.
+/// The thread pool queue can be 'disabled', which means that no items can be
+/// added or removed from the queue, or 'enabled', which guarantees that
+/// inserting or removing items are thread safe.
+/// In 'disabled' state, all threads waiting on the queue are unlocked and all
+/// operations are non blocking.
+template <typename WorkItem>
+struct ThreadPoolQueue {
+ /// @brief Constructor
+ ///
+ /// Creates the thread pool queue in 'disabled' state
+ ThreadPoolQueue() : exit_(true) {
+ }
+
+ /// @brief Destructor
+ ///
+ /// Destroys the thread pool queue
+ ~ThreadPoolQueue() {
+ stop(true);
+ }
+
+ /// @brief push work item to the queue
+ ///
+ /// Used to add work items in the queue.
+ /// This function adds an item to the queue and wakes up at least one thread
+ /// waiting on the queue.
+ ///
+ /// @param item the new iten to be added to the queue
+ void push(WorkItem item) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ queue_.push(item);
+ // Notify pop function so that it can effectively remove a work item.
+ cv_.notify_all();
+ }
+
+ /// @brief pop work item from the queue or block waiting
+ ///
+ /// Used to retrieve and remove a work item from the queue
+ /// If the queue is 'disabled', this function returns immediately
+ /// (no element).
+ /// If the queue is 'enabled', this function returns the first element in
+ /// the queue or blocks the calling thread if there are no work items
+ /// available.
+ ///
+ /// @param item the reference of the item removed from the queue, if any
+ ///
+ /// @return true if there was a work item removed from the queue, false
+ /// otherwise
+ bool pop(WorkItem& item) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (!exit_) {
+ if (queue_.empty()) {
+ // Wait for push or stop functions.
+ cv_.wait(lock);
+ continue;
+ }
+
+ item = queue_.front();
+ queue_.pop();
+ return true;
+ }
+
+ return false;
+ }
+
+ /// @brief count number of work items in the queue
+ ///
+ /// Returns the number of work items in the queue
+ ///
+ /// @return the number of work items
+ size_t count() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return queue_.size();
+ }
+
+ /// @brief clear remove all work items
+ ///
+ /// Removes all queued work items
+ void clear() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ reset();
+ }
+
+ /// @brief start and enable the queue
+ ///
+ /// Sets the queue state to 'enabled'
+ void start() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ exit_ = false;
+ }
+
+ /// brief stop and disable the queue
+ ///
+ /// Sets the queue state to 'disabled' and optionally removes all work items
+ ///
+ /// @param clear used to specify if the function should also clear the queue
+ void stop(bool clear = false) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ exit_ = true;
+ // Notify get() so that it can exit.
+ cv_.notify_all();
+ if (clear) {
+ reset();
+ }
+ }
+
+private:
+ /// @brief reset clears the queue removing all work items
+ ///
+ /// Must be called in a critical section (mutex locked scope).
+ void reset() {
+ queue_ = std::queue<WorkItem>();
+ }
+
+ /// @brief underlying queue container
+ std::queue<WorkItem> queue_;
+
+ /// @brief mutex used for critical sections
+ std::mutex mutex_;
+
+ /// @brief condition variable used to signal waiting threads
+ std::condition_variable cv_;
+
+ /// @brief the sate of the queue
+ ///
+ /// The 'enabled' state corresponds to false value
+ /// The 'disabled' state corresponds to true value
+ std::atomic_bool exit_;
+};
+
+ThreadPool::ThreadPool() :
+ queue_(make_shared<ThreadPoolQueue<WorkItemCallBack>>()), exit_(true) {
}
ThreadPool::~ThreadPool() {
destroy();
}
-void ThreadPool::create(uint32_t worker_threads, bool run) {
+void ThreadPool::create(uint32_t thread_count, bool run) {
LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
- .arg(worker_threads);
- if (!worker_threads) {
+ .arg(thread_count);
+
+ if (!thread_count) {
return;
}
destroy();
if (run) {
- start(worker_threads);
+ start(thread_count);
}
LOG_INFO(dhcpsrv_logger, "Thread pool created");
void ThreadPool::destroy() {
LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
+
stop(true);
LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
}
-void ThreadPool::start(uint32_t worker_threads) {
- queue_.start();
+void ThreadPool::start(uint32_t thread_count) {
+ queue_->start();
exit_ = false;
- for (int i = 0; i < worker_threads; ++i) {
- worker_threads_.push_back(make_shared<thread>(&ThreadPool::run, this));
+ for (int i = 0; i < thread_count; ++i) {
+ threads_.push_back(make_shared<thread>(&ThreadPool::run, this));
}
LOG_INFO(dhcpsrv_logger, "Thread pool started");
void ThreadPool::stop(bool clear) {
exit_ = true;
- queue_.stop(clear);
- for (auto thread : worker_threads_) {
+ queue_->stop(clear);
+ for (auto thread : threads_) {
thread->join();
}
- worker_threads_.clear();
+ threads_.clear();
LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
}
void ThreadPool::add(WorkItemCallBack call_back) {
- queue_.push(call_back);
+ queue_->push(call_back);
}
size_t ThreadPool::count() {
- return queue_.count();
+ return queue_->count();
}
void ThreadPool::run() {
LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id);
while (!exit_) {
- WorkItemCallBack work_item;
- if (queue_.pop(work_item)) {
- work_item();
+ WorkItemCallBack item;
+ if (queue_->pop(item)) {
+ item();
}
}
namespace isc {
namespace dhcp {
-/// @brief Defines a generic thread pool queue.
-///
-/// The main purpose is to safely manage thread pool tasks.
-/// The thread pool queue can be 'disabled', which means that no items can be
-/// added or removed from the queue, or 'enabled', which guarantees that
-/// inserting or removing items are thread safe.
-/// In 'disabled' state, all threads waiting on the queue are unlocked and all
-/// operations are non blocking.
template <typename WorkItem>
-struct ThreadPoolQueue {
- /// @brief Constructor
- ///
- /// Creates the thread pool queue in 'disabled' state
- ThreadPoolQueue() : exit_(true) {
- }
-
- /// @brief Destructor
- ///
- /// Destroys the thread pool queue
- ~ThreadPoolQueue() {
- stop(true);
- }
-
- /// @brief push work item to the queue
- ///
- /// Used to add work items in the queue.
- /// If the queue is 'disabled', this function returns immediately.
- /// If the queue is 'enabled', this function adds an item to the queue and
- /// wakes up at least one thread waiting on the queue.
- ///
- /// @param item the new iten to be added to the queue
- void push(WorkItem item) {
- std::lock_guard<std::mutex> lock(mutex_);
- if (exit_) {
- return;
- }
- queue_.push(item);
- // Notify pop function so that it can effectively remove a work item.
- cv_.notify_all();
- }
-
- /// @brief pop work item from the queue or block waiting
- ///
- /// Used to retrieve and remove a work item from the queue
- /// If the queue is 'disabled', this function returns immediately
- /// (no element).
- /// If the queue is 'enabled', this function returns the first element in
- /// the queue or blocks the calling thread if there are no work items
- /// available.
- ///
- /// @param item the reference of the item removed from the queue, if any
- ///
- /// @return true if there was a work item removed from the queue, false
- /// otherwise
- bool pop(WorkItem& item) {
- std::unique_lock<std::mutex> lock(mutex_);
- while (!exit_) {
- if (queue_.empty()) {
- // Wait for push or stop functions.
- cv_.wait(lock);
- continue;
- }
-
- item = queue_.front();
- queue_.pop();
- return true;
- }
-
- return false;
- }
-
- /// @brief count number of work items in the queue
- ///
- /// Returns the number of work items in the queue
- ///
- /// @return the number of work items
- size_t count() {
- std::lock_guard<std::mutex> lock(mutex_);
- return queue_.size();
- }
-
- /// @brief clear remove all work items
- ///
- /// Removes all queued work items
- void clear() {
- std::lock_guard<std::mutex> lock(mutex_);
- reset();
- }
-
- /// @brief start and enable the queue
- ///
- /// Sets the queue state to 'enabled'
- void start() {
- std::lock_guard<std::mutex> lock(mutex_);
- exit_ = false;
- }
-
- /// brief stop and disable the queue
- ///
- /// Sets the queue state to 'disabled' and optionally removes all work items
- ///
- /// @param clear used to specify if the function should also clear the queue
- void stop(bool clear = false) {
- std::lock_guard<std::mutex> lock(mutex_);
- exit_ = true;
- // Notify get() so that it can exit.
- cv_.notify_all();
- if (clear) {
- reset();
- }
- }
-
-private:
- /// @brief reset clears the queue removing all work items
- ///
- /// Must be called in a critical section (mutex locked scope).
- void reset() {
- queue_ = std::queue<WorkItem>();
- }
-
- /// @brief underlying queue container
- std::queue<WorkItem> queue_;
-
- /// @brief mutex used for critical sections
- std::mutex mutex_;
-
- /// @brief condition variable used to signal waiting threads
- std::condition_variable cv_;
-
- /// @brief the sate of the queue
- ///
- /// The 'enabled' state corresponds to false value
- /// The 'disabled' state corresponds to true value
- std::atomic_bool exit_;
-};
+struct ThreadPoolQueue;
/// @brief Defines a thread pool which uses a thread pool queue for managing
/// work items. Each work item is a 'function' object.
void run();
/// @brief list of worker threads
- std::list<std::shared_ptr<std::thread>> worker_threads_;
+ std::list<std::shared_ptr<std::thread>> threads_;
/// @brief underlying work items queue
- ThreadPoolQueue<WorkItemCallBack> queue_;
+ std::shared_ptr<ThreadPoolQueue<WorkItemCallBack>> queue_;
/// @brief state of the thread pool
/// The 'run' state corresponds to false value
/// @brief Constructor
///
/// @param recursive sets the mutex as recursive mutex
- TestMutex(bool recursive = false) : lock_(0), lock_count_(0), unlock_count_(0),
- dead_lock_(false), recursive_(recursive) {
+ TestMutex(bool recursive = false) : lock_(0), dead_lock_(false),
+ lock_count_(0), unlock_count_(0), recursive_(recursive) {
}
/// @brief lock the mutex
return lock_;
}
+ /// @brief get the mutex dead lock state
+ ///
+ /// @return the mutex dead lock state
+ bool getDeadLock() {
+ lock_guard<mutex> lk(mutex_);
+ return dead_lock_;
+ }
+
/// @brief get the number of locks performed on mutex
///
/// @return the mutex number of locks
return unlock_count_;
}
- /// @brief get the mutex dead lock state
- ///
- /// @return the mutex dead lock state
- bool getDeadLock() {
- lock_guard<mutex> lk(mutex_);
- return dead_lock_;
- }
-
/// @brief test the internal state of the mutex
///
/// @param expected_lock check equality of this value with lock state
/// @brief internal lock state of the mutex
int32_t lock_;
+ /// @brief state which indicated that the mutex in in dead lock
+ bool dead_lock_;
+
/// @brief total number of locks performed on the mutex
uint32_t lock_count_;
/// @brief total number of unlocks performed on the mutex
uint32_t unlock_count_;
- /// @brief state which indicated that the mutex in in dead lock
- bool dead_lock_;
-
/// @brief flag to indicate if the mutex is recursive or not
bool recursive_;
std::thread::id id_;
};
-/// @brief Test Fixture for testing isc:util::thread::LockGuard
+/// @brief Test Fixture for testing isc::util::thread::LockGuard
class LockGuardTest : public ::testing::Test {
-public:
- void run() {
- {
- // make sure this thread has started and it is accounted for
- lock_guard<mutex> lk(mutex_);
- ids_.emplace(this_thread::get_id());
- ++count_;
- // wake main thread if it is waiting for this thread to start
- cv_.notify_all();
- }
- }
-
- void reset() {
- count_ = 0;
- }
-
- uint32_t count() {
- return count_;
- }
-
- std::mutex mutex_;
-
- condition_variable cv_;
-
- uint32_t count_;
-
- set<std::thread::id> ids_;
-
- list<shared_ptr<std::thread>> threads_;
};
+/// @brief test LockGuard functionality with non-recursive mutex, recursive mutex
+/// and null pointer
TEST_F(LockGuardTest, testLock) {
- reset();
- // make sure we have all threads running when performing all the checks
- {
- unique_lock<mutex> lck(mutex_);
- for (uint32_t i = 0; i < 2; ++i) {
- threads_.push_back(make_shared<std::thread>(&LockGuardTest::run, this));
- }
- cv_.wait(lck, [&]{ return count() == 2; });
- }
- for (auto thread : threads_) {
- thread->join();
- }
- threads_.clear();
- ASSERT_EQ(count_, 2);
- ASSERT_EQ(ids_.size(), 2);
-
shared_ptr<TestMutex> test_mutex;
// test non-recursive lock
test_mutex = make_shared<TestMutex>();
{
// call LockGuard constructor which locks mutex resulting in an
// exception as the mutex is already locked (dead lock)
- EXPECT_THROW(LockGuard<TestMutex> lock(test_mutex.get()), isc::InvalidOperation);
+ EXPECT_THROW(LockGuard<TestMutex> lock(test_mutex.get()),
+ isc::InvalidOperation);
// expect lock 1 lock_count 1 unlock_count 0 dead_lock true
// you should not be able to get here...using a real mutex
test_mutex->testMutexState(1, 1, 0, true);
}
}
}
-}
+
+} // namespace