#include <dhcpsrv/thread_pool.h>
+#include <boost/function.hpp>
+
using namespace isc::dhcp;
using namespace std;
list<shared_ptr<std::thread>> threads_;
};
+typedef function<void()> CallBack;
+
/// @brief test ThreadPool add and count
TEST_F(ThreadPoolTest, testAddAndCount) {
uint32_t items_count;
- ThreadPool::WorkItemCallBack call_back;
- ThreadPool thread_pool;
+ CallBack call_back;
+ ThreadPool<CallBack> thread_pool;
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
for (uint32_t i = 0; i < items_count; ++i) {
thread_pool.add(call_back);
}
- // the item count should match
- ASSERT_EQ(thread_pool.count(), items_count);
-}
-
-/// @brief test ThreadPool create and destroy
-TEST_F(ThreadPoolTest, testCreateAndDestroy) {
- uint32_t items_count;
- uint32_t thread_count;
- ThreadPool::WorkItemCallBack call_back;
- ThreadPool thread_pool;
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
-
- items_count = 4;
- thread_count = 4;
- // prepare setup
- reset(thread_count);
-
- // create tasks which block thread pool threads until signaled by main
- // thread to force all threads of the thread pool to run exactly one task
- call_back = std::bind(&ThreadPoolTest::runAndWait, this);
-
- // add items to stopped thread pool
- for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
- }
- // the item count should match
- ASSERT_EQ(thread_pool.count(), items_count);
- // calling create with false should not create threads and should remove all
- // queued items
- thread_pool.create(thread_count, false);
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
-
- // add items to stopped thread pool
- for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
- }
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
-
- // calling destroy should clear all threads and should remove all queued
- // items
- thread_pool.destroy();
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
-
- // do it once again to check if it works
- thread_pool.destroy();
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
- // calling create with false should not create threads and should remove all
- // queued items
- thread_pool.create(thread_count, false);
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
-
- // add items to stopped thread pool
- for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
- }
- // the item count should match
- ASSERT_EQ(thread_pool.count(), items_count);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
-
- // calling create with true should create threads and should remove all
- // queued items
- thread_pool.create(thread_count);
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should match
- ASSERT_EQ(thread_pool.size(), thread_count);
-
- // add items to running thread pool
- for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
- }
-
- // wait for all items to be processed
- waitTasks(thread_count, items_count);
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should match
- ASSERT_EQ(thread_pool.size(), thread_count);
- // as each thread pool thread is still waiting on main to unblock, each
- // thread should have been registered in ids list
- checkIds(items_count);
- // all items should have been processed
- ASSERT_EQ(count(), items_count);
-
- // check that the number of processed tasks matches the number of items
- checkRunHistory(items_count);
-
- // signal thread pool tasks to continue
- signalThreads();
-
- // calling destroy should clear all threads and should remove all queued
- // items
- thread_pool.destroy();
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
-
- // do it once again to check if it works
- thread_pool.destroy();
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
-
- items_count = 64;
- thread_count = 16;
- // prepare setup
- reset(thread_count);
-
- // create tasks which do not block the thread pool threads so that several
- // tasks can be run on the same thread and some of the threads never even
- // having a chance to run
- call_back = std::bind(&ThreadPoolTest::run, this);
-
- // calling create with true should create threads and should remove all
- // queued items
- thread_pool.create(thread_count);
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should match
- ASSERT_EQ(thread_pool.size(), thread_count);
-
- // add items to running thread pool
- for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
- }
-
- // wait for all items to be processed
- waitTasks(thread_count, items_count);
- // the item count should be 0
- ASSERT_EQ(thread_pool.count(), 0);
- // the thread count should match
- ASSERT_EQ(thread_pool.size(), thread_count);
- // all items should have been processed
- ASSERT_EQ(count(), items_count);
-
- // check that the number of processed tasks matches the number of items
- checkRunHistory(items_count);
-
- // calling destroy should clear all threads and should remove all queued
- // items
- thread_pool.destroy();
+ // calling reset should clear all threads and should remove all queued items
+ thread_pool.reset();
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
TEST_F(ThreadPoolTest, testStartAndStop) {
uint32_t items_count;
uint32_t thread_count;
- ThreadPool::WorkItemCallBack call_back;
- ThreadPool thread_pool;
+ CallBack call_back;
+ ThreadPool<CallBack> thread_pool;
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
// the thread count should match
ASSERT_EQ(thread_pool.size(), thread_count);
- // calling stop with false should clear all threads and should keep queued
- // items
+ // calling stop should clear all threads and should keep queued items
thread_pool.stop();
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
for (uint32_t i = 0; i < items_count; ++i) {
thread_pool.add(call_back);
}
+
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
- // calling stop with false should clear all threads and should keep queued
- // items
+ // calling stop should clear all threads and should keep queued items
thread_pool.stop();
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
- // calling stop with true should clear all threads and should remove all
- // queued items
- thread_pool.stop(true);
+ // calling reset should clear all threads and should remove all queued items
+ thread_pool.reset();
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // do it once again to check if it works
+ thread_pool.reset();
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
// signal thread pool tasks to continue
signalThreads();
- // calling stop with false should clear all threads and should keep queued
- // items
+ // calling stop should clear all threads and should keep queued items
thread_pool.stop();
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
for (uint32_t i = 0; i < items_count; ++i) {
thread_pool.add(call_back);
}
+
// the item count should match
ASSERT_EQ(thread_pool.count(), items_count);
// the thread count should be 0
// check that the number of processed tasks matches the number of items
checkRunHistory(items_count);
- // calling stop with false should clear all threads and should keep queued
- // items
+ // calling stop should clear all threads and should keep queued items
thread_pool.stop();
// the item count should be 0
ASSERT_EQ(thread_pool.count(), 0);
+++ /dev/null
-// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
-#include <config.h>
-
-#include <dhcpsrv/dhcpsrv_log.h>
-#include <dhcpsrv/thread_pool.h>
-
-using namespace std;
-
-namespace isc {
-namespace dhcp {
-
-/// @brief Defines a generic thread pool queue.
-///
-/// The main purpose is to safely manage thread pool tasks.
-/// The thread pool queue can be 'disabled', which means that no items can be
-/// removed from the queue, or 'enabled', which guarantees that inserting or
-/// removing items are thread safe.
-/// In 'disabled' state, all threads waiting on the queue are unlocked and all
-/// operations are non blocking.
-template <typename WorkItem>
-struct ThreadPoolQueue {
- /// @brief Constructor
- ///
- /// Creates the thread pool queue in 'disabled' state
- ThreadPoolQueue() : exit_(true) {
- }
-
- /// @brief Destructor
- ///
- /// Destroys the thread pool queue
- ~ThreadPoolQueue() {
- stop(true);
- }
-
- /// @brief push work item to the queue
- ///
- /// Used to add work items in the queue.
- /// This function adds an item to the queue and wakes up at least one thread
- /// waiting on the queue.
- ///
- /// @param item the new item to be added to the queue
- void push(WorkItem& item) {
- if (item == nullptr) {
- return;
- }
- std::lock_guard<std::mutex> lock(mutex_);
- queue_.push(item);
- // Notify pop function so that it can effectively remove a work item.
- cv_.notify_all();
- }
-
- /// @brief pop work item from the queue or block waiting
- ///
- /// Used to retrieve and remove a work item from the queue
- /// If the queue is 'disabled', this function returns immediately
- /// (no element).
- /// If the queue is 'enabled', this function returns the first element in
- /// the queue or blocks the calling thread if there are no work items
- /// available.
- ///
- /// @param item the reference of the item removed from the queue, if any
- ///
- /// @return true if there was a work item removed from the queue, false
- /// otherwise
- WorkItem pop() {
- std::unique_lock<std::mutex> lock(mutex_);
- while (!exit_) {
- if (queue_.empty()) {
- // Wait for push or stop functions.
- cv_.wait(lock);
- continue;
- }
-
- WorkItem item = queue_.front();
- queue_.pop();
- return item;
- }
-
- return nullptr;
- }
-
- /// @brief count number of work items in the queue
- ///
- /// Returns the number of work items in the queue
- ///
- /// @return the number of work items
- size_t count() {
- std::lock_guard<std::mutex> lock(mutex_);
- return queue_.size();
- }
-
- /// @brief clear remove all work items
- ///
- /// Removes all queued work items
- void clear() {
- std::lock_guard<std::mutex> lock(mutex_);
- reset();
- }
-
- /// @brief start and enable the queue
- ///
- /// Sets the queue state to 'enabled'
- void start() {
- std::lock_guard<std::mutex> lock(mutex_);
- exit_ = false;
- }
-
- /// brief stop and disable the queue
- ///
- /// Sets the queue state to 'disabled' and optionally removes all work items
- ///
- /// @param clear used to specify if the function should also clear the queue
- void stop(bool clear = false) {
- std::lock_guard<std::mutex> lock(mutex_);
- exit_ = true;
- // Notify pop so that it can exit.
- cv_.notify_all();
- if (clear) {
- reset();
- }
- }
-
-private:
- /// @brief reset clears the queue removing all work items
- ///
- /// Must be called in a critical section (mutex locked scope).
- void reset() {
- queue_ = std::queue<WorkItem>();
- }
-
- /// @brief underlying queue container
- std::queue<WorkItem> queue_;
-
- /// @brief mutex used for critical sections
- std::mutex mutex_;
-
- /// @brief condition variable used to signal waiting threads
- std::condition_variable cv_;
-
- /// @brief the sate of the queue
- /// The 'enabled' state corresponds to false value
- /// The 'disabled' state corresponds to true value
- std::atomic_bool exit_;
-};
-
-ThreadPool::ThreadPool() :
- queue_(make_shared<ThreadPoolQueue<WorkItemCallBack>>()), exit_(true) {
-}
-
-ThreadPool::~ThreadPool() {
- destroy();
-}
-
-void ThreadPool::create(uint32_t thread_count, bool run) {
- LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
- .arg(thread_count);
-
- if (!thread_count) {
- return;
- }
- destroy();
- if (run) {
- start(thread_count);
- }
-
- LOG_INFO(dhcpsrv_logger, "Thread pool created");
-}
-
-void ThreadPool::destroy() {
- LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
-
- stop(true);
-
- LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
-}
-
-void ThreadPool::start(uint32_t thread_count) {
- if (!thread_count || !exit_) {
- return;
- }
- queue_->start();
- exit_ = false;
- for (int i = 0; i < thread_count; ++i) {
- threads_.push_back(make_shared<thread>(&ThreadPool::run, this));
- }
-
- LOG_INFO(dhcpsrv_logger, "Thread pool started");
-}
-
-void ThreadPool::stop(bool clear) {
- exit_ = true;
- queue_->stop(clear);
- for (auto thread : threads_) {
- thread->join();
- }
- threads_.clear();
-
- LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
-}
-
-void ThreadPool::add(WorkItemCallBack& call_back) {
- queue_->push(call_back);
-}
-
-size_t ThreadPool::count() {
- return queue_->count();
-}
-
-size_t ThreadPool::size() {
- return threads_.size();
-}
-
-void ThreadPool::run() {
- thread::id th_id = this_thread::get_id();
- LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id);
-
- while (!exit_) {
- WorkItemCallBack item = queue_->pop();
- if (item != nullptr) {
- item();
- }
- }
-
- LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1").arg(th_id);
-}
-
-} // namespace dhcp
-} // namespace isc
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
-#include <boost/function.hpp>
+#include <dhcpsrv/dhcpsrv_log.h>
#include <atomic>
#include <condition_variable>
namespace isc {
namespace dhcp {
-template <typename WorkItem>
-struct ThreadPoolQueue;
-
/// @brief Defines a thread pool which uses a thread pool queue for managing
/// work items. Each work item is a 'function' object.
+template <typename WorkItem>
struct ThreadPool {
- using WorkItemCallBack = std::function<void()>;
-
/// @brief Constructor
- ThreadPool();
+ ThreadPool() : running_(false) {
+ }
/// @brief Destructor
- ~ThreadPool();
+ ~ThreadPool() {
+ reset();
+ }
- /// @brief initialize the thread pool with specified number of treads and
- /// optionally starts all threads.
- ///
- /// @param worker_threads specifies the number of threads to be created
- /// @param run to indicate if the threads should be started immediately
- void create(uint32_t thread_count, bool run = true);
+ /// @brief reset the thread pool stopping threads and clearing the internal
+ /// queue
+ void reset() {
+ LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
- /// @brief de-initialize the thread pool stopping threads and clearing the
- /// internal queue
- void destroy();
+ stop();
+ queue_.clear();
+
+ LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
+ }
/// @brief start all the threads
///
- /// @param worker_threads specifies the number of threads to be created
- void start(uint32_t thread_count);
+ /// @param thread_count specifies the number of threads to be created and
+ /// started
+ void start(uint32_t thread_count) {
+ LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
+ .arg(thread_count);
+ if (!thread_count || running_) {
+ return;
+ }
+ queue_.enable();
+ running_ = true;
+ for (uint32_t i = 0; i < thread_count; ++i) {
+ threads_.push_back(std::make_shared<std::thread>(&ThreadPool::run, this));
+ }
+
+ LOG_INFO(dhcpsrv_logger, "Thread pool started");
+ }
/// @brief stop all the threads
- ///
- /// @param clear used to specify if the function should also clear the queue
- void stop(bool clear = false);
+ void stop() {
+ LOG_INFO(dhcpsrv_logger, "Thread pool stopping");
+ running_ = false;
+ queue_.disable();
+ for (auto thread : threads_) {
+ thread->join();
+ }
+ threads_.clear();
+
+ LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
+ }
/// @brief add a working item to the thread pool
///
/// @param call_back the 'function' object to be added to the queue
- void add(WorkItemCallBack& call_back);
+ void add(WorkItem& item) {
+ queue_.push(item);
+ }
/// @brief count number of work items in the queue
///
/// @return the number of work items in the queue
- size_t count();
+ size_t count() {
+ return queue_.count();
+ }
/// @brief size number of thread pool threads
///
/// @return the number of threads
- size_t size();
-
+ size_t size() {
+ return threads_.size();
+ }
private:
+ /// @brief Defines a generic thread pool queue.
+ ///
+ /// The main purpose is to safely manage thread pool tasks.
+ /// The thread pool queue can be 'disabled', which means that no items can be
+ /// removed from the queue, or 'enabled', which guarantees that inserting or
+ /// removing items are thread safe.
+ /// In 'disabled' state, all threads waiting on the queue are unlocked and all
+ /// operations are non blocking.
+ template <typename Item>
+ struct ThreadPoolQueue {
+ /// @brief Constructor
+ ///
+ /// Creates the thread pool queue in 'disabled' state
+ ThreadPoolQueue() : enabled_(false) {
+ }
+
+ /// @brief Destructor
+ ///
+ /// Destroys the thread pool queue
+ ~ThreadPoolQueue() {
+ disable();
+ clear();
+ }
+
+ /// @brief push work item to the queue
+ ///
+ /// Used to add work items in the queue.
+ /// This function adds an item to the queue and wakes up at least one thread
+ /// waiting on the queue.
+ ///
+ /// @param item the new item to be added to the queue
+ void push(Item& item) {
+ if (!item) {
+ return;
+ }
+ std::lock_guard<std::mutex> lock(mutex_);
+ queue_.push(item);
+ // Notify pop function so that it can effectively remove a work item.
+ cv_.notify_all();
+ }
+
+ /// @brief pop work item from the queue or block waiting
+ ///
+ /// Used to retrieve and remove a work item from the queue
+ /// If the queue is 'disabled', this function returns immediately
+ /// (no element).
+ /// If the queue is 'enabled', this function returns the first element in
+ /// the queue or blocks the calling thread if there are no work items
+ /// available.
+ ///
+ /// @param item the reference of the item removed from the queue, if any
+ ///
+ /// @return true if there was a work item removed from the queue, false
+ /// otherwise
+ Item pop() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (enabled_) {
+ if (queue_.empty()) {
+ // Wait for push or stop functions.
+ cv_.wait(lock);
+ continue;
+ }
+
+ Item item = queue_.front();
+ queue_.pop();
+ return item;
+ }
+
+ return Item();
+ }
+
+ /// @brief count number of work items in the queue
+ ///
+ /// Returns the number of work items in the queue
+ ///
+ /// @return the number of work items
+ size_t count() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return queue_.size();
+ }
+
+ /// @brief clear remove all work items
+ ///
+ /// Removes all queued work items
+ void clear() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ queue_ = std::queue<Item>();
+ }
+
+ /// @brief start and enable the queue
+ ///
+ /// Sets the queue state to 'enabled'
+ void enable() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ enabled_ = true;
+ }
+
+ /// brief stop and disable the queue
+ ///
+ /// Sets the queue state to 'disabled' and optionally removes all work items
+ void disable() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ enabled_ = false;
+ // Notify pop so that it can exit.
+ cv_.notify_all();
+ }
+
+ private:
+ /// @brief underlying queue container
+ std::queue<Item> queue_;
+
+ /// @brief mutex used for critical sections
+ std::mutex mutex_;
+
+ /// @brief condition variable used to signal waiting threads
+ std::condition_variable cv_;
+
+ /// @brief the sate of the queue
+ /// The 'enabled' state corresponds to false value
+ /// The 'disabled' state corresponds to true value
+ std::atomic_bool enabled_;
+ };
+
/// @brief run function of each thread
- void run();
+ void run() {
+ std::thread::id th_id = std::this_thread::get_id();
+ LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1")
+ .arg(th_id);
+
+ while (running_) {
+ WorkItem item = queue_.pop();
+ if (item) {
+ item();
+ }
+ }
+
+ LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1")
+ .arg(th_id);
+ }
/// @brief list of worker threads
std::list<std::shared_ptr<std::thread>> threads_;
/// @brief underlying work items queue
- std::shared_ptr<ThreadPoolQueue<WorkItemCallBack>> queue_;
+ ThreadPoolQueue<WorkItem> queue_;
/// @brief state of the thread pool
/// The 'run' state corresponds to false value
/// The 'stop' state corresponds to true value
- std::atomic_bool exit_;
+ std::atomic_bool running_;
};
} // namespace dhcp