libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h
libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h
libkea_dhcpsrv_la_SOURCES += multi_threading_utils.h multi_threading_utils.cc
+libkea_dhcpsrv_la_SOURCES += thread_pool.h
if HAVE_MYSQL
libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h
--- /dev/null
+// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2017-2019 Deutsche Telekom AG.
+//
+// Authors: Andrei Pavel <andrei.pavel@qualitance.com>
+// Cristian Secareanu <cristian.secareanu@qualitance.com>
+// Razvan Becheriu <razvan.becheriu@qualitance.com>
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <cassert>
+#include <config.h>
+#include <dhcpsrv/dhcpsrv_log.h>
+#include <dhcpsrv/thread_pool.h>
+
+#include <functional>
+
+using namespace std;
+
+namespace isc {
+namespace dhcp {
+
+ThreadPool::ThreadPool() : exit_(true) {
+}
+
+ThreadPool::~ThreadPool() {
+ destroy();
+}
+
+void ThreadPool::create(uint32_t worker_threads) {
+ LOG_INFO(dhcpsrv_logger, "Starting packet thread pool with %1 worker threads")
+ .arg(worker_threads);
+ if (!worker_threads) {
+ return;
+ }
+ destroy();
+ queue_.create();
+ exit_ = false;
+ for (int i = 0; i < worker_threads; ++i) {
+ worker_threads_.push_back(make_shared<thread>(&ThreadPool::threadRun, this));
+ }
+
+ LOG_INFO(dhcpsrv_logger, "Packet thread pool started");
+}
+
+void ThreadPool::destroy() {
+ LOG_INFO(dhcpsrv_logger, "Shutting down packet thread pool");
+ exit_ = true;
+ queue_.destroy();
+ for (auto thread : worker_threads_) {
+ thread->join();
+ }
+ worker_threads_.clear();
+
+ LOG_INFO(dhcpsrv_logger, "Packet thread pool shut down");
+}
+
+void ThreadPool::add(WorkItemCallBack call_back) {
+ queue_.add(call_back);
+}
+
+size_t ThreadPool::count() {
+ return queue_.count();
+}
+
+void ThreadPool::threadRun() {
+ thread::id th_id = this_thread::get_id();
+ LOG_INFO(dhcpsrv_logger, "Packet thread pool new thread started. id: %1").arg(th_id);
+
+ while (!exit_) {
+ WorkItemCallBack work_item;
+ if (queue_.get(work_item)) {
+ work_item();
+ }
+ }
+
+ LOG_INFO(dhcpsrv_logger, "Packet thread pool thread ended. id: %1").arg(th_id);
+}
+
+} // namespace dhcp
+} // namespace isc
--- /dev/null
+// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2017-2019 Deutsche Telekom AG.
+//
+// Authors: Andrei Pavel <andrei.pavel@qualitance.com>
+// Cristian Secareanu <cristian.secareanu@qualitance.com>
+// Razvan Becheriu <razvan.becheriu@qualitance.com>
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef THREAD_POOL_H
+#define THREAD_POOL_H
+
+#include <util/threads/lock_guard.h>
+
+#include <boost/function.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <list>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+namespace isc {
+namespace dhcp {
+
+template <typename WorkItem>
+struct ThreadPoolQueue {
+ ThreadPoolQueue() : exit_(true) {
+ }
+
+ ~ThreadPoolQueue() {
+ }
+
+ void add(WorkItem item) {
+ isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+ if (exit_) {
+ return;
+ }
+ queue_.push(item);
+ // Notify get() so that it can effectively get a work item.
+ cv_.notify_all();
+ }
+
+ bool get(WorkItem& item) {
+ std::unique_lock<std::mutex> lock(mutex_);
+
+ while (!exit_) {
+ if (queue_.empty()) {
+ // Wait for add() or destroy().
+ cv_.wait(lock);
+ continue;
+ }
+
+ item = queue_.front();
+ queue_.pop();
+ return true;
+ }
+
+ return false;
+ }
+
+ size_t count() {
+ isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+ return queue_.size();
+ }
+
+ void removeAll() {
+ isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+ removeAllUnsafe();
+ }
+
+ void create() {
+ isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+ exit_ = false;
+ }
+
+ void destroy() {
+ isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+ exit_ = true;
+ // Notify get() so that it can exit.
+ cv_.notify_all();
+ removeAllUnsafe();
+ }
+
+private:
+ /// @brief Has to be called in a mutex_-locked environment.
+ void removeAllUnsafe() {
+ while (queue_.size()) {
+ queue_.pop();
+ }
+ }
+
+ std::queue<WorkItem> queue_;
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::atomic_bool exit_;
+};
+
+struct ThreadPool {
+ using WorkItemCallBack = std::function<void()>;
+
+ ThreadPool();
+ ~ThreadPool();
+
+ void create(uint32_t worker_threads);
+
+ void destroy();
+
+ void add(WorkItemCallBack call_back);
+
+ size_t count();
+
+private:
+ void threadRun();
+
+ std::list<std::shared_ptr<std::thread>> worker_threads_;
+ ThreadPoolQueue<WorkItemCallBack> queue_;
+ std::atomic_bool exit_;
+};
+
+} // namespace dhcp
+} // namespace isc
+
+#endif // THREAD_POOL_H