// add items to stopped thread pool
for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
+ thread_pool.add(make_shared<CallBack>(call_back));
}
// the item count should match
// add items to stopped thread pool
for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
+ thread_pool.add(make_shared<CallBack>(call_back));
}
// the item count should match
// add items to running thread pool
for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
+ thread_pool.add(make_shared<CallBack>(call_back));
}
// wait for all items to be processed
// add items to stopped thread pool
for (uint32_t i = 0; i < items_count; ++i) {
- thread_pool.add(call_back);
+ thread_pool.add(make_shared<CallBack>(call_back));
}
// the item count should match
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
+#include <exceptions/exceptions.h>
+
#include <atomic>
#include <condition_variable>
#include <list>
/// @brief Defines a thread pool which uses a thread pool queue for managing
/// work items. Each work item is a 'function' object.
-template <typename WorkItem>
+///
+/// @tparam WorkItem a functor
+/// @tparam Container a 'queue like' container
+template <typename WorkItem, typename Container = std::queue<std::shared_ptr<WorkItem>>>
struct ThreadPool {
+ typedef typename std::shared_ptr<WorkItem> WorkItemPtr;
+
/// @brief Constructor
ThreadPool() : running_(false) {
}
/// @param thread_count specifies the number of threads to be created and
/// started
void start(uint32_t thread_count) {
- if (!thread_count || running_) {
- return;
+ if (!thread_count) {
+ isc_throw(InvalidParameter, "thread count is 0");
+ }
+ if (running_) {
+ isc_throw(InvalidParameter, "thread pool already started");
}
queue_.enable();
running_ = true;
/// @brief stop all the threads
void stop() {
+ if (!running_) {
+ isc_throw(InvalidParameter, "thread pool already stopped");
+ }
running_ = false;
queue_.disable();
for (auto thread : threads_) {
/// @brief add a work item to the thread pool
///
/// @param item the 'function' object to be added to the queue
- void add(WorkItem& item) {
+ void add(const WorkItemPtr& item) {
queue_.push(item);
}
/// removing items are thread safe.
/// In 'disabled' state, all threads waiting on the queue are unlocked and all
/// operations are non blocking.
- template <typename Item>
+ ///
+ /// @tparam Item a 'smart pointer' to a functor
+ /// @tparam QueueContainer a 'queue like' container
+ template <typename Item, typename QueueContainer = std::queue<Item>>
struct ThreadPoolQueue {
/// @brief Constructor
///
/// waiting on the queue.
///
/// @param item the new item to be added to the queue
- void push(Item& item) {
+ void push(const Item& item) {
if (!item) {
return;
}
cv_.wait(lock);
continue;
}
-
Item item = queue_.front();
queue_.pop();
return item;
}
-
return Item();
}
private:
/// @brief underlying queue container
- std::queue<Item> queue_;
+ QueueContainer queue_;
/// @brief mutex used for critical sections
std::mutex mutex_;
/// @brief the sate of the queue
/// The 'enabled' state corresponds to true value
/// The 'disabled' state corresponds to false value
- std::atomic_bool enabled_;
+ std::atomic<bool> enabled_;
};
/// @brief run function of each thread
void run() {
while (running_) {
- WorkItem item = queue_.pop();
+ WorkItemPtr item = queue_.pop();
if (item) {
try {
- item();
+ (*item)();
} catch (...) {
}
}
}
/// @brief list of worker threads
- std::list<std::shared_ptr<std::thread>> threads_;
+ std::vector<std::shared_ptr<std::thread>> threads_;
/// @brief underlying work items queue
- ThreadPoolQueue<WorkItem> queue_;
+ ThreadPoolQueue<WorkItemPtr, Container> queue_;
/// @brief state of the thread pool
/// The 'running' state corresponds to true value
/// The 'not running' state corresponds to false value
- std::atomic_bool running_;
+ std::atomic<bool> running_;
};
} // namespace util