From: Razvan Becheriu Date: Mon, 25 Mar 2019 13:31:07 +0000 (+0200) Subject: added thread pool X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=96cef34660ab203075ea014b93458c44660bee80;p=thirdparty%2Fkea.git added thread pool --- diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc new file mode 100644 index 0000000000..191cc330b4 --- /dev/null +++ b/src/lib/dhcpsrv/thread_pool.cc @@ -0,0 +1,71 @@ +#include +#include +#include +#include + +#include + +using namespace std; + +namespace isc { +namespace dhcp { + +ThreadPool::ThreadPool() : exit_(true) { +} + +ThreadPool::~ThreadPool() { + destroy(); +} + +void ThreadPool::create(uint32_t worker_threads) { + LOG_INFO(dhcpsrv_logger, "Starting packet thread pool with %1 worker threads") + .arg(worker_threads); + if (!worker_threads) { + return; + } + destroy(); + queue_.create(); + exit_ = false; + for (int i = 0; i < worker_threads; ++i) { + worker_threads_.push_back(make_shared(&ThreadPool::threadRun, this)); + } + + LOG_INFO(dhcpsrv_logger, "Packet thread pool started"); +} + +void ThreadPool::destroy() { + LOG_INFO(dhcpsrv_logger, "Shutting down packet thread pool"); + exit_ = true; + queue_.destroy(); + for (auto thread : worker_threads_) { + thread->join(); + } + worker_threads_.clear(); + + LOG_INFO(dhcpsrv_logger, "Packet thread pool shut down"); +} + +void ThreadPool::add(WorkItemCallBack call_back) { + queue_.add(call_back); +} + +size_t ThreadPool::count() { + return queue_.count(); +} + +void ThreadPool::threadRun() { + thread::id th_id = this_thread::get_id(); + LOG_INFO(dhcpsrv_logger, "Packet thread pool new thread started. id: %1").arg(th_id); + + while (!exit_) { + WorkItemCallBack work_item; + if (queue_.get(work_item)) { + work_item(); + } + } + + LOG_INFO(dhcpsrv_logger, "Packet thread pool thread ended. id: %1").arg(th_id); +} + +} // namespace dhcp +} // namespace isc diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/dhcpsrv/thread_pool.h new file mode 100644 index 0000000000..c04bf4383a --- /dev/null +++ b/src/lib/dhcpsrv/thread_pool.h @@ -0,0 +1,117 @@ +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace isc { +namespace dhcp { + +template +struct ThreadPoolQueue { + ThreadPoolQueue() : exit_(true) { + } + + ~ThreadPoolQueue() { + } + + void add(WorkItem item) { + isc::util::thread::LockGuard lock(&mutex_); + if (exit_) { + return; + } + queue_.push(item); + // Notify get() so that it can effectively get a work item. + cv_.notify_all(); + } + + bool get(WorkItem& item) { + std::unique_lock lock(mutex_); + + while (!exit_) { + if (queue_.empty()) { + // Wait for add() or destroy(). + cv_.wait(lock); + continue; + } + + item = queue_.front(); + queue_.pop(); + return true; + } + + return false; + } + + size_t count() { + isc::util::thread::LockGuard lock(&mutex_); + return queue_.size(); + } + + void removeAll() { + isc::util::thread::LockGuard lock(&mutex_); + removeAllUnsafe(); + } + + void create() { + isc::util::thread::LockGuard lock(&mutex_); + exit_ = false; + } + + void destroy() { + isc::util::thread::LockGuard lock(&mutex_); + exit_ = true; + // Notify get() so that it can exit. + cv_.notify_all(); + removeAllUnsafe(); + } + +private: + /// @brief Has to be called in a mutex_-locked environment. + void removeAllUnsafe() { + while (queue_.size()) { + queue_.pop(); + } + } + + std::queue queue_; + std::mutex mutex_; + std::condition_variable cv_; + std::atomic_bool exit_; +}; + +struct ThreadPool { + using WorkItemCallBack = std::function; + + ThreadPool(); + ~ThreadPool(); + + void create(uint32_t worker_threads); + + void destroy(); + + void add(WorkItemCallBack call_back); + + size_t count(); + +private: + void threadRun(); + + std::list> worker_threads_; + ThreadPoolQueue queue_; + std::atomic_bool exit_; +}; + +} // namespace dhcp +} // namespace isc + +#endif // THREAD_POOL_H