]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#883, !506] changed thread pool interface
authorRazvan Becheriu <razvan@isc.org>
Fri, 8 Nov 2019 13:49:12 +0000 (15:49 +0200)
committerRazvan Becheriu <razvan@isc.org>
Fri, 8 Nov 2019 13:52:29 +0000 (15:52 +0200)
src/lib/dhcpsrv/tests/thread_pool_unittest.cc
src/lib/dhcpsrv/thread_pool.cc [deleted file]
src/lib/dhcpsrv/thread_pool.h

index ff19be24074d4236c6e2ce31355525460783741c..218b78547695992ef80317380a8e3d5ddfe5872a 100644 (file)
@@ -10,6 +10,8 @@
 
 #include <dhcpsrv/thread_pool.h>
 
+#include <boost/function.hpp>
+
 using namespace isc::dhcp;
 using namespace std;
 
@@ -184,11 +186,13 @@ private:
     list<shared_ptr<std::thread>> threads_;
 };
 
+typedef function<void()> CallBack;
+
 /// @brief test ThreadPool add and count
 TEST_F(ThreadPoolTest, testAddAndCount) {
     uint32_t items_count;
-    ThreadPool::WorkItemCallBack call_back;
-    ThreadPool thread_pool;
+    CallBack call_back;
+    ThreadPool<CallBack> thread_pool;
     // the item count should be 0
     ASSERT_EQ(thread_pool.count(), 0);
     // the thread count should be 0
@@ -202,170 +206,12 @@ TEST_F(ThreadPoolTest, testAddAndCount) {
     for (uint32_t i = 0; i < items_count; ++i) {
         thread_pool.add(call_back);
     }
-    // the item count should match
-    ASSERT_EQ(thread_pool.count(), items_count);
-}
-
-/// @brief test ThreadPool create and destroy
-TEST_F(ThreadPoolTest, testCreateAndDestroy) {
-    uint32_t items_count;
-    uint32_t thread_count;
-    ThreadPool::WorkItemCallBack call_back;
-    ThreadPool thread_pool;
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
-
-    items_count = 4;
-    thread_count = 4;
-    // prepare setup
-    reset(thread_count);
-
-    // create tasks which block thread pool threads until signaled by main
-    // thread to force all threads of the thread pool to run exactly one task
-    call_back = std::bind(&ThreadPoolTest::runAndWait, this);
-
-    // add items to stopped thread pool
-    for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
-    }
-    // the item count should match
-    ASSERT_EQ(thread_pool.count(), items_count);
 
-    // calling create with false should not create threads and should remove all
-    // queued items
-    thread_pool.create(thread_count, false);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
-
-    // add items to stopped thread pool
-    for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
-    }
     // the item count should match
     ASSERT_EQ(thread_pool.count(), items_count);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
-
-    // calling destroy should clear all threads and should remove all queued
-    // items
-    thread_pool.destroy();
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
-
-    // do it once again to check if it works
-    thread_pool.destroy();
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
 
-    // calling create with false should not create threads and should remove all
-    // queued items
-    thread_pool.create(thread_count, false);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
-
-    // add items to stopped thread pool
-    for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
-    }
-    // the item count should match
-    ASSERT_EQ(thread_pool.count(), items_count);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
-
-    // calling create with true should create threads and should remove all
-    // queued items
-    thread_pool.create(thread_count);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
-
-    // add items to running thread pool
-    for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
-    }
-
-    // wait for all items to be processed
-    waitTasks(thread_count, items_count);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
-    // as each thread pool thread is still waiting on main to unblock, each
-    // thread should have been registered in ids list
-    checkIds(items_count);
-    // all items should have been processed
-    ASSERT_EQ(count(), items_count);
-
-    // check that the number of processed tasks matches the number of items
-    checkRunHistory(items_count);
-
-    // signal thread pool tasks to continue
-    signalThreads();
-
-    // calling destroy should clear all threads and should remove all queued
-    // items
-    thread_pool.destroy();
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
-
-    // do it once again to check if it works
-    thread_pool.destroy();
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
-
-    items_count = 64;
-    thread_count = 16;
-    // prepare setup
-    reset(thread_count);
-
-    // create tasks which do not block the thread pool threads so that several
-    // tasks can be run on the same thread and some of the threads never even
-    // having a chance to run
-    call_back = std::bind(&ThreadPoolTest::run, this);
-
-    // calling create with true should create threads and should remove all
-    // queued items
-    thread_pool.create(thread_count);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
-
-    // add items to running thread pool
-    for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
-    }
-
-    // wait for all items to be processed
-    waitTasks(thread_count, items_count);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
-    // all items should have been processed
-    ASSERT_EQ(count(), items_count);
-
-    // check that the number of processed tasks matches the number of items
-    checkRunHistory(items_count);
-
-    // calling destroy should clear all threads and should remove all queued
-    // items
-    thread_pool.destroy();
+    // calling reset should clear all threads and should remove all queued items
+    thread_pool.reset();
     // the item count should be 0
     ASSERT_EQ(thread_pool.count(), 0);
     // the thread count should be 0
@@ -376,8 +222,8 @@ TEST_F(ThreadPoolTest, testCreateAndDestroy) {
 TEST_F(ThreadPoolTest, testStartAndStop) {
     uint32_t items_count;
     uint32_t thread_count;
-    ThreadPool::WorkItemCallBack call_back;
-    ThreadPool thread_pool;
+    CallBack call_back;
+    ThreadPool<CallBack> thread_pool;
     // the item count should be 0
     ASSERT_EQ(thread_pool.count(), 0);
     // the thread count should be 0
@@ -406,8 +252,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
     // the thread count should match
     ASSERT_EQ(thread_pool.size(), thread_count);
 
-    // calling stop with false should clear all threads and should keep queued
-    // items
+    // calling stop should clear all threads and should keep queued items
     thread_pool.stop();
     // the item count should be 0
     ASSERT_EQ(thread_pool.count(), 0);
@@ -425,22 +270,28 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
     for (uint32_t i = 0; i < items_count; ++i) {
         thread_pool.add(call_back);
     }
+
     // the item count should match
     ASSERT_EQ(thread_pool.count(), items_count);
     // the thread count should be 0
     ASSERT_EQ(thread_pool.size(), 0);
 
-    // calling stop with false should clear all threads and should keep queued
-    // items
+    // calling stop should clear all threads and should keep queued items
     thread_pool.stop();
     // the item count should match
     ASSERT_EQ(thread_pool.count(), items_count);
     // the thread count should be 0
     ASSERT_EQ(thread_pool.size(), 0);
 
-    // calling stop with true should clear all threads and should remove all
-    // queued items
-    thread_pool.stop(true);
+    // calling reset should clear all threads and should remove all queued items
+    thread_pool.reset();
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // do it once again to check if it works
+    thread_pool.reset();
     // the item count should be 0
     ASSERT_EQ(thread_pool.count(), 0);
     // the thread count should be 0
@@ -476,8 +327,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
     // signal thread pool tasks to continue
     signalThreads();
 
-    // calling stop with false should clear all threads and should keep queued
-    // items
+    // calling stop should clear all threads and should keep queued items
     thread_pool.stop();
     // the item count should be 0
     ASSERT_EQ(thread_pool.count(), 0);
@@ -498,6 +348,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
     for (uint32_t i = 0; i < items_count; ++i) {
         thread_pool.add(call_back);
     }
+
     // the item count should match
     ASSERT_EQ(thread_pool.count(), items_count);
     // the thread count should be 0
@@ -520,8 +371,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
     // check that the number of processed tasks matches the number of items
     checkRunHistory(items_count);
 
-    // calling stop with false should clear all threads and should keep queued
-    // items
+    // calling stop should clear all threads and should keep queued items
     thread_pool.stop();
     // the item count should be 0
     ASSERT_EQ(thread_pool.count(), 0);
diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc
deleted file mode 100644 (file)
index 7b2c5bd..0000000
+++ /dev/null
@@ -1,233 +0,0 @@
-// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
-#include <config.h>
-
-#include <dhcpsrv/dhcpsrv_log.h>
-#include <dhcpsrv/thread_pool.h>
-
-using namespace std;
-
-namespace isc {
-namespace dhcp {
-
-/// @brief Defines a generic thread pool queue.
-///
-/// The main purpose is to safely manage thread pool tasks.
-/// The thread pool queue can be 'disabled', which means that no items can be
-/// removed from the queue, or 'enabled', which guarantees that inserting or
-/// removing items are thread safe.
-/// In 'disabled' state, all threads waiting on the queue are unlocked and all
-/// operations are non blocking.
-template <typename WorkItem>
-struct ThreadPoolQueue {
-    /// @brief Constructor
-    ///
-    /// Creates the thread pool queue in 'disabled' state
-    ThreadPoolQueue() : exit_(true) {
-    }
-
-    /// @brief Destructor
-    ///
-    /// Destroys the thread pool queue
-    ~ThreadPoolQueue() {
-        stop(true);
-    }
-
-    /// @brief push work item to the queue
-    ///
-    /// Used to add work items in the queue.
-    /// This function adds an item to the queue and wakes up at least one thread
-    /// waiting on the queue.
-    ///
-    /// @param item the new item to be added to the queue
-    void push(WorkItem& item) {
-        if (item == nullptr) {
-            return;
-        }
-        std::lock_guard<std::mutex> lock(mutex_);
-        queue_.push(item);
-        // Notify pop function so that it can effectively remove a work item.
-        cv_.notify_all();
-    }
-
-    /// @brief pop work item from the queue or block waiting
-    ///
-    /// Used to retrieve and remove a work item from the queue
-    /// If the queue is 'disabled', this function returns immediately
-    /// (no element).
-    /// If the queue is 'enabled', this function returns the first element in
-    /// the queue or blocks the calling thread if there are no work items
-    /// available.
-    ///
-    /// @param item the reference of the item removed from the queue, if any
-    ///
-    /// @return true if there was a work item removed from the queue, false
-    /// otherwise
-    WorkItem pop() {
-        std::unique_lock<std::mutex> lock(mutex_);
-        while (!exit_) {
-            if (queue_.empty()) {
-                // Wait for push or stop functions.
-                cv_.wait(lock);
-                continue;
-            }
-
-            WorkItem item = queue_.front();
-            queue_.pop();
-            return item;
-        }
-
-        return nullptr;
-    }
-
-    /// @brief count number of work items in the queue
-    ///
-    /// Returns the number of work items in the queue
-    ///
-    /// @return the number of work items
-    size_t count() {
-        std::lock_guard<std::mutex> lock(mutex_);
-        return queue_.size();
-    }
-
-    /// @brief clear remove all work items
-    ///
-    /// Removes all queued work items
-    void clear() {
-        std::lock_guard<std::mutex> lock(mutex_);
-        reset();
-    }
-
-    /// @brief start and enable the queue
-    ///
-    /// Sets the queue state to 'enabled'
-    void start() {
-        std::lock_guard<std::mutex> lock(mutex_);
-        exit_ = false;
-    }
-
-    /// brief stop and disable the queue
-    ///
-    /// Sets the queue state to 'disabled' and optionally removes all work items
-    ///
-    /// @param clear used to specify if the function should also clear the queue
-    void stop(bool clear = false) {
-        std::lock_guard<std::mutex> lock(mutex_);
-        exit_ = true;
-        // Notify pop so that it can exit.
-        cv_.notify_all();
-        if (clear) {
-            reset();
-        }
-    }
-
-private:
-    /// @brief reset clears the queue removing all work items
-    ///
-    /// Must be called in a critical section (mutex locked scope).
-    void reset() {
-        queue_ = std::queue<WorkItem>();
-    }
-
-    /// @brief underlying queue container
-    std::queue<WorkItem> queue_;
-
-    /// @brief mutex used for critical sections
-    std::mutex mutex_;
-
-    /// @brief condition variable used to signal waiting threads
-    std::condition_variable cv_;
-
-    /// @brief the sate of the queue
-    /// The 'enabled' state corresponds to false value
-    /// The 'disabled' state corresponds to true value
-    std::atomic_bool exit_;
-};
-
-ThreadPool::ThreadPool() :
-    queue_(make_shared<ThreadPoolQueue<WorkItemCallBack>>()), exit_(true) {
-}
-
-ThreadPool::~ThreadPool() {
-    destroy();
-}
-
-void ThreadPool::create(uint32_t thread_count, bool run) {
-    LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
-        .arg(thread_count);
-
-    if (!thread_count) {
-        return;
-    }
-    destroy();
-    if (run) {
-        start(thread_count);
-    }
-
-    LOG_INFO(dhcpsrv_logger, "Thread pool created");
-}
-
-void ThreadPool::destroy() {
-    LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
-
-    stop(true);
-
-    LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
-}
-
-void ThreadPool::start(uint32_t thread_count) {
-    if (!thread_count || !exit_) {
-        return;
-    }
-    queue_->start();
-    exit_ = false;
-    for (int i = 0; i < thread_count; ++i) {
-        threads_.push_back(make_shared<thread>(&ThreadPool::run, this));
-    }
-
-    LOG_INFO(dhcpsrv_logger, "Thread pool started");
-}
-
-void ThreadPool::stop(bool clear) {
-    exit_ = true;
-    queue_->stop(clear);
-    for (auto thread : threads_) {
-        thread->join();
-    }
-    threads_.clear();
-
-    LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
-}
-
-void ThreadPool::add(WorkItemCallBack& call_back) {
-    queue_->push(call_back);
-}
-
-size_t ThreadPool::count() {
-    return queue_->count();
-}
-
-size_t ThreadPool::size() {
-    return threads_.size();
-}
-
-void ThreadPool::run() {
-    thread::id th_id = this_thread::get_id();
-    LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id);
-
-    while (!exit_) {
-        WorkItemCallBack item = queue_->pop();
-        if (item != nullptr) {
-            item();
-        }
-    }
-
-    LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1").arg(th_id);
-}
-
-}  // namespace dhcp
-}  // namespace isc
index 4e23707e889a450eba3e7468dfdc23c847eb9db0..1432d4175bf05080804af89d165954611733e732 100644 (file)
@@ -7,7 +7,7 @@
 #ifndef THREAD_POOL_H
 #define THREAD_POOL_H
 
-#include <boost/function.hpp>
+#include <dhcpsrv/dhcpsrv_log.h>
 
 #include <atomic>
 #include <condition_variable>
 namespace isc {
 namespace dhcp {
 
-template <typename WorkItem>
-struct ThreadPoolQueue;
-
 /// @brief Defines a thread pool which uses a thread pool queue for managing
 /// work items. Each work item is a 'function' object.
+template <typename WorkItem>
 struct ThreadPool {
-    using WorkItemCallBack = std::function<void()>;
-
     /// @brief Constructor
-    ThreadPool();
+    ThreadPool() : running_(false) {
+    }
 
     /// @brief Destructor
-    ~ThreadPool();
+    ~ThreadPool() {
+        reset();
+    }
 
-    /// @brief initialize the thread pool with specified number of treads and
-    /// optionally starts all threads.
-    ///
-    /// @param worker_threads specifies the number of threads to be created
-    /// @param run to indicate if the threads should be started immediately
-    void create(uint32_t thread_count, bool run = true);
+    /// @brief reset the thread pool stopping threads and clearing the internal
+    /// queue
+    void reset() {
+        LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
 
-    /// @brief de-initialize the thread pool stopping threads and clearing the
-    /// internal queue
-    void destroy();
+        stop();
+        queue_.clear();
+
+        LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
+    }
 
     /// @brief start all the threads
     ///
-    /// @param worker_threads specifies the number of threads to be created
-    void start(uint32_t thread_count);
+    /// @param thread_count specifies the number of threads to be created and
+    /// started
+    void start(uint32_t thread_count) {
+        LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
+            .arg(thread_count);
+        if (!thread_count || running_) {
+            return;
+        }
+        queue_.enable();
+        running_ = true;
+        for (uint32_t i = 0; i < thread_count; ++i) {
+            threads_.push_back(std::make_shared<std::thread>(&ThreadPool::run, this));
+        }
+
+        LOG_INFO(dhcpsrv_logger, "Thread pool started");
+    }
 
     /// @brief stop all the threads
-    ///
-    /// @param clear used to specify if the function should also clear the queue
-    void stop(bool clear = false);
+    void stop() {
+        LOG_INFO(dhcpsrv_logger, "Thread pool stopping");
+        running_ = false;
+        queue_.disable();
+        for (auto thread : threads_) {
+            thread->join();
+        }
+        threads_.clear();
+
+        LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
+    }
 
     /// @brief add a working item to the thread pool
     ///
     /// @param call_back the 'function' object to be added to the queue
-    void add(WorkItemCallBack& call_back);
+    void add(WorkItem& item) {
+        queue_.push(item);
+    }
 
     /// @brief count number of work items in the queue
     ///
     /// @return the number of work items in the queue
-    size_t count();
+    size_t count() {
+        return queue_.count();
+    }
 
     /// @brief size number of thread pool threads
     ///
     /// @return the number of threads
-    size_t size();
-
+    size_t size() {
+        return threads_.size();
+    }
 private:
+    /// @brief Defines a generic thread pool queue.
+    ///
+    /// The main purpose is to safely manage thread pool tasks.
+    /// The thread pool queue can be 'disabled', which means that no items can be
+    /// removed from the queue, or 'enabled', which guarantees that inserting or
+    /// removing items are thread safe.
+    /// In 'disabled' state, all threads waiting on the queue are unlocked and all
+    /// operations are non blocking.
+    template <typename Item>
+    struct ThreadPoolQueue {
+        /// @brief Constructor
+        ///
+        /// Creates the thread pool queue in 'disabled' state
+        ThreadPoolQueue() : enabled_(false) {
+        }
+
+        /// @brief Destructor
+        ///
+        /// Destroys the thread pool queue
+        ~ThreadPoolQueue() {
+            disable();
+            clear();
+        }
+
+        /// @brief push work item to the queue
+        ///
+        /// Used to add work items in the queue.
+        /// This function adds an item to the queue and wakes up at least one thread
+        /// waiting on the queue.
+        ///
+        /// @param item the new item to be added to the queue
+        void push(Item& item) {
+            if (!item) {
+                return;
+            }
+            std::lock_guard<std::mutex> lock(mutex_);
+            queue_.push(item);
+            // Notify pop function so that it can effectively remove a work item.
+            cv_.notify_all();
+        }
+
+        /// @brief pop work item from the queue or block waiting
+        ///
+        /// Used to retrieve and remove a work item from the queue
+        /// If the queue is 'disabled', this function returns immediately
+        /// (no element).
+        /// If the queue is 'enabled', this function returns the first element in
+        /// the queue or blocks the calling thread if there are no work items
+        /// available.
+        ///
+        /// @param item the reference of the item removed from the queue, if any
+        ///
+        /// @return true if there was a work item removed from the queue, false
+        /// otherwise
+        Item pop() {
+            std::unique_lock<std::mutex> lock(mutex_);
+            while (enabled_) {
+                if (queue_.empty()) {
+                    // Wait for push or stop functions.
+                    cv_.wait(lock);
+                    continue;
+                }
+
+                Item item = queue_.front();
+                queue_.pop();
+                return item;
+            }
+
+            return Item();
+        }
+
+        /// @brief count number of work items in the queue
+        ///
+        /// Returns the number of work items in the queue
+        ///
+        /// @return the number of work items
+        size_t count() {
+            std::lock_guard<std::mutex> lock(mutex_);
+            return queue_.size();
+        }
+
+        /// @brief clear remove all work items
+        ///
+        /// Removes all queued work items
+        void clear() {
+            std::lock_guard<std::mutex> lock(mutex_);
+            queue_ = std::queue<Item>();
+        }
+
+        /// @brief start and enable the queue
+        ///
+        /// Sets the queue state to 'enabled'
+        void enable() {
+            std::lock_guard<std::mutex> lock(mutex_);
+            enabled_ = true;
+        }
+
+        /// brief stop and disable the queue
+        ///
+        /// Sets the queue state to 'disabled' and optionally removes all work items
+        void disable() {
+            std::lock_guard<std::mutex> lock(mutex_);
+            enabled_ = false;
+            // Notify pop so that it can exit.
+            cv_.notify_all();
+        }
+
+    private:
+        /// @brief underlying queue container
+        std::queue<Item> queue_;
+
+        /// @brief mutex used for critical sections
+        std::mutex mutex_;
+
+        /// @brief condition variable used to signal waiting threads
+        std::condition_variable cv_;
+
+        /// @brief the sate of the queue
+        /// The 'enabled' state corresponds to false value
+        /// The 'disabled' state corresponds to true value
+        std::atomic_bool enabled_;
+    };
+
     /// @brief run function of each thread
-    void run();
+    void run() {
+        std::thread::id th_id = std::this_thread::get_id();
+        LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1")
+            .arg(th_id);
+
+        while (running_) {
+            WorkItem item = queue_.pop();
+            if (item) {
+                item();
+            }
+        }
+
+        LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1")
+            .arg(th_id);
+    }
 
     /// @brief list of worker threads
     std::list<std::shared_ptr<std::thread>> threads_;
 
     /// @brief underlying work items queue
-    std::shared_ptr<ThreadPoolQueue<WorkItemCallBack>> queue_;
+    ThreadPoolQueue<WorkItem> queue_;
 
     /// @brief state of the thread pool
     /// The 'run' state corresponds to false value
     /// The 'stop' state corresponds to true value
-    std::atomic_bool exit_;
+    std::atomic_bool running_;
 };
 
 }  // namespace dhcp