]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#883, !506] moved thread pool in util lib
authorRazvan Becheriu <razvan@isc.org>
Fri, 8 Nov 2019 14:28:18 +0000 (16:28 +0200)
committerRazvan Becheriu <razvan@isc.org>
Fri, 8 Nov 2019 14:28:18 +0000 (16:28 +0200)
src/lib/dhcpsrv/Makefile.am
src/lib/dhcpsrv/tests/Makefile.am
src/lib/util/Makefile.am
src/lib/util/lock_guard.h [deleted file]
src/lib/util/tests/Makefile.am
src/lib/util/tests/lock_guard_unittest.cc [deleted file]
src/lib/util/tests/thread_pool_unittest.cc [moved from src/lib/dhcpsrv/tests/thread_pool_unittest.cc with 99% similarity]
src/lib/util/thread_pool.h [moved from src/lib/dhcpsrv/thread_pool.h with 78% similarity]

index 1d705cd5736cd2f3f1da39a4924c2ec1db570ac4..d0076959dd2f9915f05a686d33dab6f5ef54b878 100644 (file)
@@ -118,7 +118,6 @@ libkea_dhcpsrv_la_SOURCES += lease_mgr_factory.cc lease_mgr_factory.h
 libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h
 libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h
 libkea_dhcpsrv_la_SOURCES += multi_threading_utils.h multi_threading_utils.cc
-libkea_dhcpsrv_la_SOURCES += thread_pool.h
 
 if HAVE_MYSQL
 libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h
index 86bc606cf217f589540e89e9bd221ddf11967847..d9cac94776fa096b9da89fee85c20e181f6ae346 100644 (file)
@@ -125,7 +125,6 @@ libdhcpsrv_unittests_SOURCES += srv_config_unittest.cc
 libdhcpsrv_unittests_SOURCES += subnet_unittest.cc
 libdhcpsrv_unittests_SOURCES += test_get_callout_handle.cc test_get_callout_handle.h
 libdhcpsrv_unittests_SOURCES += triplet_unittest.cc
-libdhcpsrv_unittests_SOURCES += thread_pool_unittest.cc
 libdhcpsrv_unittests_SOURCES += test_utils.cc test_utils.h
 libdhcpsrv_unittests_SOURCES += timer_mgr_unittest.cc
 libdhcpsrv_unittests_SOURCES += network_state_unittest.cc
index 1ec700bc96c7d489db44b4e6f5965b73e1222db1..6744363064f62dd108d6c53cceb193d9e7f8b834 100644 (file)
@@ -13,7 +13,6 @@ libkea_util_la_SOURCES += csv_file.h csv_file.cc
 libkea_util_la_SOURCES += doubles.h
 libkea_util_la_SOURCES += filename.h filename.cc
 libkea_util_la_SOURCES += hash.h
-libkea_util_la_SOURCES += lock_guard.h
 libkea_util_la_SOURCES += labeled_value.h labeled_value.cc
 libkea_util_la_SOURCES += memory_segment.h
 libkea_util_la_SOURCES += memory_segment_local.h memory_segment_local.cc
@@ -29,6 +28,7 @@ libkea_util_la_SOURCES += state_model.cc state_model.h
 libkea_util_la_SOURCES += stopwatch.cc stopwatch.h
 libkea_util_la_SOURCES += stopwatch_impl.cc stopwatch_impl.h
 libkea_util_la_SOURCES += strutil.h strutil.cc
+libkea_util_la_SOURCES += thread_pool.h
 libkea_util_la_SOURCES += time_utilities.h time_utilities.cc
 libkea_util_la_SOURCES += versioned_csv_file.h versioned_csv_file.cc
 libkea_util_la_SOURCES += watch_socket.cc watch_socket.h
diff --git a/src/lib/util/lock_guard.h b/src/lib/util/lock_guard.h
deleted file mode 100644 (file)
index de919ab..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
-#ifndef LOCK_GUARD_H
-#define LOCK_GUARD_H
-
-#include <memory>
-
-namespace isc {
-namespace util {
-
-template <typename Lock>
-class LockGuard {
-public:
-    LockGuard(Lock* lock) : lk_(lock) {
-        if (lk_) {
-            lk_->lock();
-        }
-    }
-
-    ~LockGuard() {
-        if (lk_) {
-            lk_->unlock();
-        }
-    }
-
-    LockGuard(const LockGuard&) = delete;
-    LockGuard& operator=(const LockGuard&) = delete;
-
-    LockGuard(LockGuard&&) = delete;
-    LockGuard& operator=(LockGuard&&) = delete;
-
-private:
-    Lock* lk_;
-};
-
-}  // namespace util
-}  // namespace isc
-
-#endif  // LOCK_GUARD_H
index 62ce742a3d1ac0c7acfd0a08ac6e036589eb60f2..02750dd92148ba4fe0d0f607a2b1053eafb759d6 100644 (file)
@@ -41,7 +41,6 @@ run_unittests_SOURCES += hash_unittest.cc
 run_unittests_SOURCES += hex_unittest.cc
 run_unittests_SOURCES += io_utilities_unittest.cc
 run_unittests_SOURCES += labeled_value_unittest.cc
-run_unittests_SOURCES += lock_guard_unittest.cc
 run_unittests_SOURCES += memory_segment_local_unittest.cc
 run_unittests_SOURCES += memory_segment_common_unittest.h
 run_unittests_SOURCES += memory_segment_common_unittest.cc
@@ -54,6 +53,7 @@ run_unittests_SOURCES += random_number_generator_unittest.cc
 run_unittests_SOURCES += staged_value_unittest.cc
 run_unittests_SOURCES += state_model_unittest.cc
 run_unittests_SOURCES += strutil_unittest.cc
+run_unittests_SOURCES += thread_pool_unittest.cc
 run_unittests_SOURCES += time_utilities_unittest.cc
 run_unittests_SOURCES += range_utilities_unittest.cc
 run_unittests_SOURCES += signal_set_unittest.cc
diff --git a/src/lib/util/tests/lock_guard_unittest.cc b/src/lib/util/tests/lock_guard_unittest.cc
deleted file mode 100644 (file)
index 425a5b4..0000000
+++ /dev/null
@@ -1,223 +0,0 @@
-// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
-#include <config.h>
-
-#include <gtest/gtest.h>
-
-#include <util/lock_guard.h>
-#include <exceptions/exceptions.h>
-
-#include <condition_variable>
-#include <list>
-#include <mutex>
-#include <set>
-#include <thread>
-
-using namespace isc::util;
-using namespace std;
-
-namespace {
-
-/// @brief test mutex class used to check the internal state of a 'fictional'
-/// mutex so that the functionality of the LockGuard can be tested
-/// the test mutex can be recursive which means that a lock can be called on the
-/// same thread and not resulting in a dead lock
-class TestMutex {
-public:
-    /// @brief Constructor
-    ///
-    /// @param recursive sets the mutex as recursive mutex
-    TestMutex(bool recursive = false) : lock_(0), dead_lock_(false),
-         lock_count_(0), unlock_count_(0), recursive_(recursive) {
-    }
-
-    /// @brief lock the mutex
-    void lock() {
-        lock_guard<mutex> lk(mutex_);
-        if (lock_ >= 1) {
-            // mutex is already locked
-            if (!recursive_) {
-                // lock on a non-recursive mutex resulting in a dead lock
-                dead_lock_ = true;
-                isc_throw(isc::InvalidOperation,
-                          "recursive lock on already locked mutex resulting in "
-                          "dead lock");
-            } else {
-                // lock on a recursive mutex
-                if (this_thread::get_id() != id_) {
-                    // lock on a recursive mutex on a different thread resulting
-                    // in a dead lock
-                    dead_lock_ = true;
-                    isc_throw(isc::InvalidOperation,
-                              "recursive lock on a different thread on already "
-                              "locked mutex resulting in dead lock");
-                }
-            }
-        }
-        // increment the total number of locks
-        lock_count_++;
-        // increment the lock state
-        lock_++;
-        // save the thread id
-        id_ = this_thread::get_id();
-    }
-
-    /// @brief unlock the mutex
-    void unlock() {
-        lock_guard<mutex> lk(mutex_);
-        if (lock_ <= 0) {
-            // unlock an unlocked mutex
-            isc_throw(isc::InvalidOperation, "unlock on non locked mutex "
-                      "resulting in undefined behavior");
-        }
-        if (lock_ == 1) {
-            // only one thread has the lock
-            // self healing mutex reseting the dead lock flag
-            dead_lock_ = false;
-            // reset the thread id
-            id_ = std::thread::id();
-        }
-        // increment the total number of unlocks
-        unlock_count_++;
-        // decrement the lock state
-        lock_--;
-    }
-
-    /// @brief get the mutex lock state
-    ///
-    /// @return the mutex lock state
-    int32_t getLock() {
-        lock_guard<mutex> lk(mutex_);
-        return lock_;
-    }
-
-    /// @brief get the mutex dead lock state
-    ///
-    /// @return the mutex dead lock state
-    bool getDeadLock() {
-        lock_guard<mutex> lk(mutex_);
-        return dead_lock_;
-    }
-
-    /// @brief get the number of locks performed on mutex
-    ///
-    /// @return the mutex number of locks
-    uint32_t getLockCount() {
-        lock_guard<mutex> lk(mutex_);
-        return lock_count_;
-    }
-
-    /// @brief get the number of unlocks performed on mutex
-    ///
-    /// @return the mutex number of unlocks
-    uint32_t getUnlockCount() {
-        lock_guard<mutex> lk(mutex_);
-        return unlock_count_;
-    }
-
-    /// @brief test the internal state of the mutex
-    ///
-    /// @param expected_lock check equality of this value with lock state
-    /// @param expected_lock_count check equality of this value with lock count
-    /// @param expected_unlock_count check equality of this value with unlock count
-    /// @param expected_dead_lock check equality of this value with dead lock state
-    void testMutexState(int32_t expected_lock,
-                        uint32_t expected_lock_count,
-                        uint32_t expected_unlock_count,
-                        bool expected_dead_lock) {
-        ASSERT_EQ(getLock(), expected_lock);
-        ASSERT_EQ(getLockCount(), expected_lock_count);
-        ASSERT_EQ(getUnlockCount(), expected_unlock_count);
-        ASSERT_EQ(getDeadLock(), expected_dead_lock);
-    }
-
-private:
-    /// @brief internal lock state of the mutex
-    int32_t lock_;
-
-    /// @brief state which indicates that the mutex is in dead lock
-    bool dead_lock_;
-
-    /// @brief total number of locks performed on the mutex
-    uint32_t lock_count_;
-
-    /// @brief total number of unlocks performed on the mutex
-    uint32_t unlock_count_;
-
-    /// @brief flag to indicate if the mutex is recursive or not
-    bool recursive_;
-
-    /// @brief mutex used to keep the internal state consistent
-    mutex mutex_;
-
-    /// @brief the id of the thread holding the mutex
-    std::thread::id id_;
-};
-
-/// @brief Test Fixture for testing isc::util::LockGuard
-class LockGuardTest : public ::testing::Test {
-};
-
-/// @brief test LockGuard functionality with non-recursive mutex, recursive mutex
-/// and null pointer
-TEST_F(LockGuardTest, testLock) {
-    shared_ptr<TestMutex> test_mutex;
-    // test non-recursive lock
-    test_mutex = make_shared<TestMutex>();
-    test_mutex->testMutexState(0, 0, 0, false);
-    {
-        // call LockGuard constructor which locks mutex
-        LockGuard<TestMutex> lock(test_mutex.get());
-        // expect lock 1 lock_count 1 unlock_count 0 dead_lock false
-        test_mutex->testMutexState(1, 1, 0, false);
-        {
-            // call LockGuard constructor which locks mutex resulting in an
-            // exception as the mutex is already locked (dead lock)
-            EXPECT_THROW(LockGuard<TestMutex> lock(test_mutex.get()),
-                         isc::InvalidOperation);
-            // expect lock 1 lock_count 1 unlock_count 0 dead_lock true
-            // you should not be able to get here...using a real mutex
-            test_mutex->testMutexState(1, 1, 0, true);
-        }
-        // expect lock 1 lock_count 1 unlock_count 0 dead_lock true
-        // you should not be able to get here...using a real mutex
-        test_mutex->testMutexState(1, 1, 0, true);
-    }
-    // expect lock 0 lock_count 1 unlock_count 1 dead_lock false
-    // the implementation is self healing when completely unlocking the mutex
-    test_mutex->testMutexState(0, 1, 1, false);
-    // test recursive lock
-    test_mutex = make_shared<TestMutex>(true);
-    test_mutex->testMutexState(0, 0, 0, false);
-    {
-        // call LockGuard constructor which locks mutex
-        LockGuard<TestMutex> lock(test_mutex.get());
-        // expect lock 1 lock_count 1 unlock_count 0 dead_lock false
-        test_mutex->testMutexState(1, 1, 0, false);
-        {
-            // call LockGuard constructor which locks mutex but does not block
-            // as this is done on the same thread and the mutex is recursive
-            EXPECT_NO_THROW(LockGuard<TestMutex> lock(test_mutex.get()));
-            // expect lock 1 lock_count 2 unlock_count 1 dead_lock false
-            // the destructor was already called in EXPECT_NO_THROW scope
-            test_mutex->testMutexState(1, 2, 1, false);
-        }
-        // expect lock 1 lock_count 2 unlock_count 1 dead_lock false
-        test_mutex->testMutexState(1, 2, 1, false);
-    }
-    // expect lock 0 lock_count 2 unlock_count 2 dead_lock false
-    test_mutex->testMutexState(0, 2, 2, false);
-    // test LockGuart with no mutex
-    {
-        LockGuard<TestMutex> lock(nullptr);
-        {
-            EXPECT_NO_THROW(LockGuard<TestMutex> lock(nullptr));
-        }
-    }
-    }
-
-}  // namespace
similarity index 99%
rename from src/lib/dhcpsrv/tests/thread_pool_unittest.cc
rename to src/lib/util/tests/thread_pool_unittest.cc
index 218b78547695992ef80317380a8e3d5ddfe5872a..e3a99b721c0a011ae08311bd8cf5b86407492fee 100644 (file)
@@ -8,11 +8,11 @@
 
 #include <gtest/gtest.h>
 
-#include <dhcpsrv/thread_pool.h>
+#include <util/thread_pool.h>
 
 #include <boost/function.hpp>
 
-using namespace isc::dhcp;
+using namespace isc::util;
 using namespace std;
 
 namespace {
@@ -186,6 +186,7 @@ private:
     list<shared_ptr<std::thread>> threads_;
 };
 
+/// @brief define CallBack type
 typedef function<void()> CallBack;
 
 /// @brief test ThreadPool add and count
similarity index 78%
rename from src/lib/dhcpsrv/thread_pool.h
rename to src/lib/util/thread_pool.h
index 1432d4175bf05080804af89d165954611733e732..613c84570355a347849c12ae20111774877fe971 100644 (file)
@@ -7,8 +7,6 @@
 #ifndef THREAD_POOL_H
 #define THREAD_POOL_H
 
-#include <dhcpsrv/dhcpsrv_log.h>
-
 #include <atomic>
 #include <condition_variable>
 #include <list>
@@ -17,7 +15,7 @@
 #include <thread>
 
 namespace isc {
-namespace dhcp {
+namespace util {
 
 /// @brief Defines a thread pool which uses a thread pool queue for managing
 /// work items. Each work item is a 'function' object.
@@ -35,12 +33,8 @@ struct ThreadPool {
     /// @brief reset the thread pool stopping threads and clearing the internal
     /// queue
     void reset() {
-        LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
-
         stop();
         queue_.clear();
-
-        LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
     }
 
     /// @brief start all the threads
@@ -48,8 +42,6 @@ struct ThreadPool {
     /// @param thread_count specifies the number of threads to be created and
     /// started
     void start(uint32_t thread_count) {
-        LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
-            .arg(thread_count);
         if (!thread_count || running_) {
             return;
         }
@@ -58,26 +50,21 @@ struct ThreadPool {
         for (uint32_t i = 0; i < thread_count; ++i) {
             threads_.push_back(std::make_shared<std::thread>(&ThreadPool::run, this));
         }
-
-        LOG_INFO(dhcpsrv_logger, "Thread pool started");
     }
 
     /// @brief stop all the threads
     void stop() {
-        LOG_INFO(dhcpsrv_logger, "Thread pool stopping");
         running_ = false;
         queue_.disable();
         for (auto thread : threads_) {
             thread->join();
         }
         threads_.clear();
-
-        LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
     }
 
-    /// @brief add a working item to the thread pool
+    /// @brief add a work item to the thread pool
     ///
-    /// @param call_back the 'function' object to be added to the queue
+    /// @param item the 'function' object to be added to the queue
     void add(WorkItem& item) {
         queue_.push(item);
     }
@@ -95,6 +82,7 @@ struct ThreadPool {
     size_t size() {
         return threads_.size();
     }
+
 private:
     /// @brief Defines a generic thread pool queue.
     ///
@@ -122,7 +110,7 @@ private:
 
         /// @brief push work item to the queue
         ///
-        /// Used to add work items in the queue.
+        /// Used to add work items to the queue.
         /// This function adds an item to the queue and wakes up at least one thread
         /// waiting on the queue.
         ///
@@ -140,21 +128,18 @@ private:
         /// @brief pop work item from the queue or block waiting
         ///
         /// Used to retrieve and remove a work item from the queue
-        /// If the queue is 'disabled', this function returns immediately
-        /// (no element).
+        /// If the queue is 'disabled', this function returns immediately an empty
+        /// element.
         /// If the queue is 'enabled', this function returns the first element in
         /// the queue or blocks the calling thread if there are no work items
         /// available.
         ///
-        /// @param item the reference of the item removed from the queue, if any
-        ///
-        /// @return true if there was a work item removed from the queue, false
-        /// otherwise
+        /// @return the first work item from the queue or an empty element.
         Item pop() {
             std::unique_lock<std::mutex> lock(mutex_);
             while (enabled_) {
                 if (queue_.empty()) {
-                    // Wait for push or stop functions.
+                    // Wait for push or disable functions.
                     cv_.wait(lock);
                     continue;
                 }
@@ -185,7 +170,7 @@ private:
             queue_ = std::queue<Item>();
         }
 
-        /// @brief start and enable the queue
+        /// @brief enable the queue
         ///
         /// Sets the queue state to 'enabled'
         void enable() {
@@ -193,9 +178,9 @@ private:
             enabled_ = true;
         }
 
-        /// brief stop and disable the queue
+        /// brief disable the queue
         ///
-        /// Sets the queue state to 'disabled' and optionally removes all work items
+        /// Sets the queue state to 'disabled'
         void disable() {
             std::lock_guard<std::mutex> lock(mutex_);
             enabled_ = false;
@@ -214,26 +199,22 @@ private:
         std::condition_variable cv_;
 
         /// @brief the sate of the queue
-        /// The 'enabled' state corresponds to false value
-        /// The 'disabled' state corresponds to true value
+        /// The 'enabled' state corresponds to true value
+        /// The 'disabled' state corresponds to false value
         std::atomic_bool enabled_;
     };
 
     /// @brief run function of each thread
     void run() {
-        std::thread::id th_id = std::this_thread::get_id();
-        LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1")
-            .arg(th_id);
-
         while (running_) {
             WorkItem item = queue_.pop();
             if (item) {
-                item();
+                try {
+                    item();
+                } catch (...) {
+                }
             }
         }
-
-        LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1")
-            .arg(th_id);
     }
 
     /// @brief list of worker threads
@@ -243,12 +224,12 @@ private:
     ThreadPoolQueue<WorkItem> queue_;
 
     /// @brief state of the thread pool
-    /// The 'run' state corresponds to false value
-    /// The 'stop' state corresponds to true value
+    /// The 'running' state corresponds to true value
+    /// The 'not running' state corresponds to false value
     std::atomic_bool running_;
 };
 
-}  // namespace dhcp
+}  // namespace util
 }  // namespace isc
 
 #endif  // THREAD_POOL_H