]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#883, !506] added thread pool
authorRazvan Becheriu <razvan@isc.org>
Mon, 9 Sep 2019 08:53:03 +0000 (11:53 +0300)
committerRazvan Becheriu <razvan@isc.org>
Fri, 8 Nov 2019 13:52:29 +0000 (15:52 +0200)
src/lib/dhcpsrv/Makefile.am
src/lib/dhcpsrv/thread_pool.cc [new file with mode: 0644]
src/lib/dhcpsrv/thread_pool.h [new file with mode: 0644]

index d0076959dd2f9915f05a686d33dab6f5ef54b878..1d705cd5736cd2f3f1da39a4924c2ec1db570ac4 100644 (file)
@@ -118,6 +118,7 @@ libkea_dhcpsrv_la_SOURCES += lease_mgr_factory.cc lease_mgr_factory.h
 libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h
 libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h
 libkea_dhcpsrv_la_SOURCES += multi_threading_utils.h multi_threading_utils.cc
+libkea_dhcpsrv_la_SOURCES += thread_pool.h
 
 if HAVE_MYSQL
 libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h
diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc
new file mode 100644 (file)
index 0000000..e784935
--- /dev/null
@@ -0,0 +1,90 @@
+// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2017-2019 Deutsche Telekom AG.
+//
+// Authors: Andrei Pavel <andrei.pavel@qualitance.com>
+//          Cristian Secareanu <cristian.secareanu@qualitance.com>
+//          Razvan Becheriu <razvan.becheriu@qualitance.com>
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//           http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <cassert>
+#include <config.h>
+#include <dhcpsrv/dhcpsrv_log.h>
+#include <dhcpsrv/thread_pool.h>
+
+#include <functional>
+
+using namespace std;
+
+namespace isc {
+namespace dhcp {
+
+ThreadPool::ThreadPool() : exit_(true) {
+}
+
+ThreadPool::~ThreadPool() {
+    destroy();
+}
+
+void ThreadPool::create(uint32_t worker_threads) {
+    LOG_INFO(dhcpsrv_logger, "Starting packet thread pool with %1 worker threads")
+        .arg(worker_threads);
+    if (!worker_threads) {
+        return;
+    }
+    destroy();
+    queue_.create();
+    exit_ = false;
+    for (int i = 0; i < worker_threads; ++i) {
+        worker_threads_.push_back(make_shared<thread>(&ThreadPool::threadRun, this));
+    }
+
+    LOG_INFO(dhcpsrv_logger, "Packet thread pool started");
+}
+
+void ThreadPool::destroy() {
+    LOG_INFO(dhcpsrv_logger, "Shutting down packet thread pool");
+    exit_ = true;
+    queue_.destroy();
+    for (auto thread : worker_threads_) {
+        thread->join();
+    }
+    worker_threads_.clear();
+
+    LOG_INFO(dhcpsrv_logger, "Packet thread pool shut down");
+}
+
+void ThreadPool::add(WorkItemCallBack call_back) {
+    queue_.add(call_back);
+}
+
+size_t ThreadPool::count() {
+    return queue_.count();
+}
+
+void ThreadPool::threadRun() {
+    thread::id th_id = this_thread::get_id();
+    LOG_INFO(dhcpsrv_logger, "Packet thread pool new thread started. id: %1").arg(th_id);
+
+    while (!exit_) {
+        WorkItemCallBack work_item;
+        if (queue_.get(work_item)) {
+            work_item();
+        }
+    }
+
+    LOG_INFO(dhcpsrv_logger, "Packet thread pool thread ended. id: %1").arg(th_id);
+}
+
+}  // namespace dhcp
+}  // namespace isc
diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/dhcpsrv/thread_pool.h
new file mode 100644 (file)
index 0000000..6b73ca5
--- /dev/null
@@ -0,0 +1,136 @@
+// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2017-2019 Deutsche Telekom AG.
+//
+// Authors: Andrei Pavel <andrei.pavel@qualitance.com>
+//          Cristian Secareanu <cristian.secareanu@qualitance.com>
+//          Razvan Becheriu <razvan.becheriu@qualitance.com>
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//           http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef THREAD_POOL_H
+#define THREAD_POOL_H
+
+#include <util/threads/lock_guard.h>
+
+#include <boost/function.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <list>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+namespace isc {
+namespace dhcp {
+
+template <typename WorkItem>
+struct ThreadPoolQueue {
+    ThreadPoolQueue() : exit_(true) {
+    }
+
+    ~ThreadPoolQueue() {
+    }
+
+    void add(WorkItem item) {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        if (exit_) {
+            return;
+        }
+        queue_.push(item);
+        // Notify get() so that it can effectively get a work item.
+        cv_.notify_all();
+    }
+
+    bool get(WorkItem& item) {
+        std::unique_lock<std::mutex> lock(mutex_);
+
+        while (!exit_) {
+            if (queue_.empty()) {
+                // Wait for add() or destroy().
+                cv_.wait(lock);
+                continue;
+            }
+
+            item = queue_.front();
+            queue_.pop();
+            return true;
+        }
+
+        return false;
+    }
+
+    size_t count() {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        return queue_.size();
+    }
+
+    void removeAll() {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        removeAllUnsafe();
+    }
+
+    void create() {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        exit_ = false;
+    }
+
+    void destroy() {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        exit_ = true;
+        // Notify get() so that it can exit.
+        cv_.notify_all();
+        removeAllUnsafe();
+    }
+
+private:
+    /// @brief Has to be called in a mutex_-locked environment.
+    void removeAllUnsafe() {
+        while (queue_.size()) {
+            queue_.pop();
+        }
+    }
+
+    std::queue<WorkItem> queue_;
+    std::mutex mutex_;
+    std::condition_variable cv_;
+    std::atomic_bool exit_;
+};
+
+struct ThreadPool {
+    using WorkItemCallBack = std::function<void()>;
+
+    ThreadPool();
+    ~ThreadPool();
+
+    void create(uint32_t worker_threads);
+
+    void destroy();
+
+    void add(WorkItemCallBack call_back);
+
+    size_t count();
+
+private:
+    void threadRun();
+
+    std::list<std::shared_ptr<std::thread>> worker_threads_;
+    ThreadPoolQueue<WorkItemCallBack> queue_;
+    std::atomic_bool exit_;
+};
+
+}  // namespace dhcp
+}  // namespace isc
+
+#endif  // THREAD_POOL_H