]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
added thread pool
authorRazvan Becheriu <razvan@isc.org>
Mon, 25 Mar 2019 13:31:07 +0000 (15:31 +0200)
committerRazvan Becheriu <razvan@isc.org>
Fri, 12 Apr 2019 12:05:14 +0000 (15:05 +0300)
src/lib/dhcpsrv/thread_pool.cc [new file with mode: 0644]
src/lib/dhcpsrv/thread_pool.h [new file with mode: 0644]

diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc
new file mode 100644 (file)
index 0000000..191cc33
--- /dev/null
@@ -0,0 +1,71 @@
+#include <cassert>
+#include <config.h>
+#include <dhcpsrv/dhcpsrv_log.h>
+#include <dhcpsrv/thread_pool.h>
+
+#include <functional>
+
+using namespace std;
+
+namespace isc {
+namespace dhcp {
+
+ThreadPool::ThreadPool() : exit_(true) {
+}
+
+ThreadPool::~ThreadPool() {
+    destroy();
+}
+
+void ThreadPool::create(uint32_t worker_threads) {
+    LOG_INFO(dhcpsrv_logger, "Starting packet thread pool with %1 worker threads")
+        .arg(worker_threads);
+    if (!worker_threads) {
+        return;
+    }
+    destroy();
+    queue_.create();
+    exit_ = false;
+    for (int i = 0; i < worker_threads; ++i) {
+        worker_threads_.push_back(make_shared<thread>(&ThreadPool::threadRun, this));
+    }
+
+    LOG_INFO(dhcpsrv_logger, "Packet thread pool started");
+}
+
+void ThreadPool::destroy() {
+    LOG_INFO(dhcpsrv_logger, "Shutting down packet thread pool");
+    exit_ = true;
+    queue_.destroy();
+    for (auto thread : worker_threads_) {
+        thread->join();
+    }
+    worker_threads_.clear();
+
+    LOG_INFO(dhcpsrv_logger, "Packet thread pool shut down");
+}
+
+void ThreadPool::add(WorkItemCallBack call_back) {
+    queue_.add(call_back);
+}
+
+size_t ThreadPool::count() {
+    return queue_.count();
+}
+
+void ThreadPool::threadRun() {
+    thread::id th_id = this_thread::get_id();
+    LOG_INFO(dhcpsrv_logger, "Packet thread pool new thread started. id: %1").arg(th_id);
+
+    while (!exit_) {
+        WorkItemCallBack work_item;
+        if (queue_.get(work_item)) {
+            work_item();
+        }
+    }
+
+    LOG_INFO(dhcpsrv_logger, "Packet thread pool thread ended. id: %1").arg(th_id);
+}
+
+}  // namespace dhcp
+}  // namespace isc
diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/dhcpsrv/thread_pool.h
new file mode 100644 (file)
index 0000000..c04bf43
--- /dev/null
@@ -0,0 +1,117 @@
+#ifndef THREAD_POOL_H
+#define THREAD_POOL_H
+
+#include <util/threads/lock_guard.h>
+
+#include <boost/function.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <list>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+namespace isc {
+namespace dhcp {
+
+template <typename WorkItem>
+struct ThreadPoolQueue {
+    ThreadPoolQueue() : exit_(true) {
+    }
+
+    ~ThreadPoolQueue() {
+    }
+
+    void add(WorkItem item) {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        if (exit_) {
+            return;
+        }
+        queue_.push(item);
+        // Notify get() so that it can effectively get a work item.
+        cv_.notify_all();
+    }
+
+    bool get(WorkItem& item) {
+        std::unique_lock<std::mutex> lock(mutex_);
+
+        while (!exit_) {
+            if (queue_.empty()) {
+                // Wait for add() or destroy().
+                cv_.wait(lock);
+                continue;
+            }
+
+            item = queue_.front();
+            queue_.pop();
+            return true;
+        }
+
+        return false;
+    }
+
+    size_t count() {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        return queue_.size();
+    }
+
+    void removeAll() {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        removeAllUnsafe();
+    }
+
+    void create() {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        exit_ = false;
+    }
+
+    void destroy() {
+        isc::util::thread::LockGuard<std::mutex> lock(&mutex_);
+        exit_ = true;
+        // Notify get() so that it can exit.
+        cv_.notify_all();
+        removeAllUnsafe();
+    }
+
+private:
+    /// @brief Has to be called in a mutex_-locked environment.
+    void removeAllUnsafe() {
+        while (queue_.size()) {
+            queue_.pop();
+        }
+    }
+
+    std::queue<WorkItem> queue_;
+    std::mutex mutex_;
+    std::condition_variable cv_;
+    std::atomic_bool exit_;
+};
+
+struct ThreadPool {
+    using WorkItemCallBack = std::function<void()>;
+
+    ThreadPool();
+    ~ThreadPool();
+
+    void create(uint32_t worker_threads);
+
+    void destroy();
+
+    void add(WorkItemCallBack call_back);
+
+    size_t count();
+
+private:
+    void threadRun();
+
+    std::list<std::shared_ptr<std::thread>> worker_threads_;
+    ThreadPoolQueue<WorkItemCallBack> queue_;
+    std::atomic_bool exit_;
+};
+
+}  // namespace dhcp
+}  // namespace isc
+
+#endif  // THREAD_POOL_H