From: Razvan Becheriu Date: Mon, 9 Sep 2019 08:53:03 +0000 (+0300) Subject: [#883, !506] added thread pool X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=189ff88848c2c734278f7d7451c7a7ef295d62ae;p=thirdparty%2Fkea.git [#883, !506] added thread pool --- diff --git a/src/lib/dhcpsrv/Makefile.am b/src/lib/dhcpsrv/Makefile.am index d0076959dd..1d705cd573 100644 --- a/src/lib/dhcpsrv/Makefile.am +++ b/src/lib/dhcpsrv/Makefile.am @@ -118,6 +118,7 @@ libkea_dhcpsrv_la_SOURCES += lease_mgr_factory.cc lease_mgr_factory.h libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h libkea_dhcpsrv_la_SOURCES += multi_threading_utils.h multi_threading_utils.cc +libkea_dhcpsrv_la_SOURCES += thread_pool.h if HAVE_MYSQL libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc new file mode 100644 index 0000000000..e7849356d3 --- /dev/null +++ b/src/lib/dhcpsrv/thread_pool.cc @@ -0,0 +1,90 @@ +// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC") +// Copyright (C) 2017-2019 Deutsche Telekom AG. +// +// Authors: Andrei Pavel +// Cristian Secareanu +// Razvan Becheriu +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include + +using namespace std; + +namespace isc { +namespace dhcp { + +ThreadPool::ThreadPool() : exit_(true) { +} + +ThreadPool::~ThreadPool() { + destroy(); +} + +void ThreadPool::create(uint32_t worker_threads) { + LOG_INFO(dhcpsrv_logger, "Starting packet thread pool with %1 worker threads") + .arg(worker_threads); + if (!worker_threads) { + return; + } + destroy(); + queue_.create(); + exit_ = false; + for (int i = 0; i < worker_threads; ++i) { + worker_threads_.push_back(make_shared(&ThreadPool::threadRun, this)); + } + + LOG_INFO(dhcpsrv_logger, "Packet thread pool started"); +} + +void ThreadPool::destroy() { + LOG_INFO(dhcpsrv_logger, "Shutting down packet thread pool"); + exit_ = true; + queue_.destroy(); + for (auto thread : worker_threads_) { + thread->join(); + } + worker_threads_.clear(); + + LOG_INFO(dhcpsrv_logger, "Packet thread pool shut down"); +} + +void ThreadPool::add(WorkItemCallBack call_back) { + queue_.add(call_back); +} + +size_t ThreadPool::count() { + return queue_.count(); +} + +void ThreadPool::threadRun() { + thread::id th_id = this_thread::get_id(); + LOG_INFO(dhcpsrv_logger, "Packet thread pool new thread started. id: %1").arg(th_id); + + while (!exit_) { + WorkItemCallBack work_item; + if (queue_.get(work_item)) { + work_item(); + } + } + + LOG_INFO(dhcpsrv_logger, "Packet thread pool thread ended. id: %1").arg(th_id); +} + +} // namespace dhcp +} // namespace isc diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/dhcpsrv/thread_pool.h new file mode 100644 index 0000000000..6b73ca5537 --- /dev/null +++ b/src/lib/dhcpsrv/thread_pool.h @@ -0,0 +1,136 @@ +// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC") +// Copyright (C) 2017-2019 Deutsche Telekom AG. +// +// Authors: Andrei Pavel +// Cristian Secareanu +// Razvan Becheriu +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace isc { +namespace dhcp { + +template +struct ThreadPoolQueue { + ThreadPoolQueue() : exit_(true) { + } + + ~ThreadPoolQueue() { + } + + void add(WorkItem item) { + isc::util::thread::LockGuard lock(&mutex_); + if (exit_) { + return; + } + queue_.push(item); + // Notify get() so that it can effectively get a work item. + cv_.notify_all(); + } + + bool get(WorkItem& item) { + std::unique_lock lock(mutex_); + + while (!exit_) { + if (queue_.empty()) { + // Wait for add() or destroy(). + cv_.wait(lock); + continue; + } + + item = queue_.front(); + queue_.pop(); + return true; + } + + return false; + } + + size_t count() { + isc::util::thread::LockGuard lock(&mutex_); + return queue_.size(); + } + + void removeAll() { + isc::util::thread::LockGuard lock(&mutex_); + removeAllUnsafe(); + } + + void create() { + isc::util::thread::LockGuard lock(&mutex_); + exit_ = false; + } + + void destroy() { + isc::util::thread::LockGuard lock(&mutex_); + exit_ = true; + // Notify get() so that it can exit. + cv_.notify_all(); + removeAllUnsafe(); + } + +private: + /// @brief Has to be called in a mutex_-locked environment. + void removeAllUnsafe() { + while (queue_.size()) { + queue_.pop(); + } + } + + std::queue queue_; + std::mutex mutex_; + std::condition_variable cv_; + std::atomic_bool exit_; +}; + +struct ThreadPool { + using WorkItemCallBack = std::function; + + ThreadPool(); + ~ThreadPool(); + + void create(uint32_t worker_threads); + + void destroy(); + + void add(WorkItemCallBack call_back); + + size_t count(); + +private: + void threadRun(); + + std::list> worker_threads_; + ThreadPoolQueue queue_; + std::atomic_bool exit_; +}; + +} // namespace dhcp +} // namespace isc + +#endif // THREAD_POOL_H