// See the License for the specific language governing permissions and
// limitations under the License.
-#include <cassert>
#include <config.h>
+
#include <dhcpsrv/dhcpsrv_log.h>
#include <dhcpsrv/thread_pool.h>
-#include <functional>
-
using namespace std;
namespace isc {
destroy();
}
-void ThreadPool::create(uint32_t worker_threads) {
+void ThreadPool::create(uint32_t worker_threads, bool run) {
LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
.arg(worker_threads);
if (!worker_threads) {
return;
}
destroy();
- queue_.create();
+ if (run) {
+ start(worker_threads);
+ }
+
+ LOG_INFO(dhcpsrv_logger, "Thread pool created");
+}
+
+void ThreadPool::destroy() {
+ LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
+ stop(true);
+
+ LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
+}
+
+void ThreadPool::start(uint32_t worker_threads) {
+ queue_.start();
exit_ = false;
for (int i = 0; i < worker_threads; ++i) {
- worker_threads_.push_back(make_shared<thread>(&ThreadPool::threadRun, this));
+ worker_threads_.push_back(make_shared<thread>(&ThreadPool::run, this));
}
LOG_INFO(dhcpsrv_logger, "Thread pool started");
}
-void ThreadPool::destroy() {
- LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
+void ThreadPool::stop(bool clear) {
exit_ = true;
- queue_.destroy();
+ queue_.stop(clear);
for (auto thread : worker_threads_) {
thread->join();
}
worker_threads_.clear();
- LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
+ LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
}
void ThreadPool::add(WorkItemCallBack call_back) {
- queue_.add(call_back);
+ queue_.push(call_back);
}
size_t ThreadPool::count() {
return queue_.count();
}
-void ThreadPool::threadRun() {
+void ThreadPool::run() {
thread::id th_id = this_thread::get_id();
LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id);
while (!exit_) {
WorkItemCallBack work_item;
- if (queue_.get(work_item)) {
+ if (queue_.pop(work_item)) {
work_item();
}
}
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
-#include <util/threads/lock_guard.h>
-
#include <boost/function.hpp>
#include <atomic>
#include <condition_variable>
-#include <cstdint>
#include <list>
#include <mutex>
#include <queue>
namespace isc {
namespace dhcp {
+/// @brief Defines a generic thread pool queue.
+///
+/// The main purpose is to safely manage thread pool tasks.
+/// The thread pool queue can be 'disabled', which means that no items can be
+/// added or removed from the queue, or 'enabled', which guarantees that
+/// inserting or removing items are thread safe.
+/// In 'disabled' state, all threads waiting on the queue are unlocked and all
+/// operations are non blocking.
template <typename WorkItem>
struct ThreadPoolQueue {
+ /// @brief Constructor
+ ///
+ /// Creates the thread pool queue in 'disabled' state
ThreadPoolQueue() : exit_(true) {
}
+ /// @brief Destructor
+ ///
+ /// Destroys the thread pool queue
~ThreadPoolQueue() {
+ stop(true);
}
- void add(WorkItem item) {
+ /// @brief push work item to the queue
+ ///
+ /// Used to add work items in the queue.
+ /// If the queue is 'disabled', this function returns immediately.
+ /// If the queue is 'enabled', this function adds an item to the queue and
+ /// wakes up at least one thread waiting on the queue.
+ ///
+ /// @param item the new iten to be added to the queue
+ void push(WorkItem item) {
std::lock_guard<std::mutex> lock(mutex_);
if (exit_) {
return;
}
queue_.push(item);
- // Notify get() so that it can effectively get a work item.
+ // Notify pop function so that it can effectively remove a work item.
cv_.notify_all();
}
- bool get(WorkItem& item) {
+ /// @brief pop work item from the queue or block waiting
+ ///
+ /// Used to retrieve and remove a work item from the queue
+ /// If the queue is 'disabled', this function returns immediately
+ /// (no element).
+ /// If the queue is 'enabled', this function returns the first element in
+ /// the queue or blocks the calling thread if there are no work items
+ /// available.
+ ///
+ /// @param item the reference of the item removed from the queue, if any
+ ///
+ /// @return true if there was a work item removed from the queue, false
+ /// otherwise
+ bool pop(WorkItem& item) {
std::unique_lock<std::mutex> lock(mutex_);
while (!exit_) {
if (queue_.empty()) {
- // Wait for add() or destroy().
+ // Wait for push or stop functions.
cv_.wait(lock);
continue;
}
return false;
}
+ /// @brief count number of work items in the queue
+ ///
+ /// Returns the number of work items in the queue
+ ///
+ /// @return the number of work items
size_t count() {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
- void removeAll() {
+ /// @brief clear remove all work items
+ ///
+ /// Removes all queued work items
+ void clear() {
std::lock_guard<std::mutex> lock(mutex_);
- removeAllUnsafe();
+ reset();
}
- void create() {
+ /// @brief start and enable the queue
+ ///
+ /// Sets the queue state to 'enabled'
+ void start() {
std::lock_guard<std::mutex> lock(mutex_);
exit_ = false;
}
- void destroy() {
+ /// brief stop and disable the queue
+ ///
+ /// Sets the queue state to 'disabled' and optionally removes all work items
+ ///
+ /// @param clear used to specify if the function should also clear the queue
+ void stop(bool clear = false) {
std::lock_guard<std::mutex> lock(mutex_);
exit_ = true;
// Notify get() so that it can exit.
cv_.notify_all();
- removeAllUnsafe();
+ if (clear) {
+ reset();
+ }
}
private:
- /// @brief Has to be called in a mutex_-locked environment.
- void removeAllUnsafe() {
- while (queue_.size()) {
- queue_.pop();
- }
+ /// @brief reset clears the queue removing all work items
+ ///
+ /// Must be called in a critical section (mutex locked scope).
+ void reset() {
+ queue_ = std::queue<WorkItem>();
}
+ /// @brief underlying queue container
std::queue<WorkItem> queue_;
+
+ /// @brief mutex used for critical sections
std::mutex mutex_;
+
+ /// @brief condition variable used to signal waiting threads
std::condition_variable cv_;
+
+ /// @brief the sate of the queue
+ ///
+ /// The 'enabled' state corresponds to false value
+ /// The 'disabled' state corresponds to true value
std::atomic_bool exit_;
};
+/// @brief Defines a thread pool which uses a thread pool queue for managing
+/// work items. Each work item is a 'function' object.
struct ThreadPool {
using WorkItemCallBack = std::function<void()>;
+ /// @brief Constructor
ThreadPool();
+
+ /// @brief Destructor
~ThreadPool();
- void create(uint32_t worker_threads);
+ /// @brief initialize the thread pool with specified number of treads and
+ /// optionally starts all threads.
+ ///
+ /// @param worker_threads specifies the number of threads to be created
+ /// @param run to indicate if the threads should be started immediately
+ void create(uint32_t thread_count, bool run = true);
+ /// @brief de-initialize the thread pool stopping threads and clearing the
+ /// internal queue
void destroy();
+ /// @brief start all the threads
+ ///
+ /// @param worker_threads specifies the number of threads to be created
+ void start(uint32_t thread_count);
+
+ /// @brief stop all the threads
+ ///
+ /// @param clear used to specify if the function should also clear the queue
+ void stop(bool clear = false);
+
+ /// @brief add a working item to the thread pool
+ ///
+ /// @param call_back the 'function' object to be added to the queue
void add(WorkItemCallBack call_back);
+ /// @brief count number of work items in the queue
+ ///
+ /// @return the number of work items in the queue
size_t count();
private:
- void threadRun();
+ /// @brief run function of each thread
+ void run();
+ /// @brief list of worker threads
std::list<std::shared_ptr<std::thread>> worker_threads_;
+
+ /// @brief underlying work items queue
ThreadPoolQueue<WorkItemCallBack> queue_;
+
+ /// @brief state of the thread pool
+ /// The 'run' state corresponds to false value
+ /// The 'stop' state corresponds to true value
std::atomic_bool exit_;
};