From: Razvan Becheriu Date: Fri, 8 Nov 2019 14:28:18 +0000 (+0200) Subject: [#883, !506] moved thread pool in util lib X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a30931b1d53ce106904075556ef84bdfae87775f;p=thirdparty%2Fkea.git [#883, !506] moved thread pool in util lib --- diff --git a/src/lib/dhcpsrv/Makefile.am b/src/lib/dhcpsrv/Makefile.am index 1d705cd573..d0076959dd 100644 --- a/src/lib/dhcpsrv/Makefile.am +++ b/src/lib/dhcpsrv/Makefile.am @@ -118,7 +118,6 @@ libkea_dhcpsrv_la_SOURCES += lease_mgr_factory.cc lease_mgr_factory.h libkea_dhcpsrv_la_SOURCES += memfile_lease_mgr.cc memfile_lease_mgr.h libkea_dhcpsrv_la_SOURCES += memfile_lease_storage.h libkea_dhcpsrv_la_SOURCES += multi_threading_utils.h multi_threading_utils.cc -libkea_dhcpsrv_la_SOURCES += thread_pool.h if HAVE_MYSQL libkea_dhcpsrv_la_SOURCES += mysql_lease_mgr.cc mysql_lease_mgr.h diff --git a/src/lib/dhcpsrv/tests/Makefile.am b/src/lib/dhcpsrv/tests/Makefile.am index 86bc606cf2..d9cac94776 100644 --- a/src/lib/dhcpsrv/tests/Makefile.am +++ b/src/lib/dhcpsrv/tests/Makefile.am @@ -125,7 +125,6 @@ libdhcpsrv_unittests_SOURCES += srv_config_unittest.cc libdhcpsrv_unittests_SOURCES += subnet_unittest.cc libdhcpsrv_unittests_SOURCES += test_get_callout_handle.cc test_get_callout_handle.h libdhcpsrv_unittests_SOURCES += triplet_unittest.cc -libdhcpsrv_unittests_SOURCES += thread_pool_unittest.cc libdhcpsrv_unittests_SOURCES += test_utils.cc test_utils.h libdhcpsrv_unittests_SOURCES += timer_mgr_unittest.cc libdhcpsrv_unittests_SOURCES += network_state_unittest.cc diff --git a/src/lib/util/Makefile.am b/src/lib/util/Makefile.am index 1ec700bc96..6744363064 100644 --- a/src/lib/util/Makefile.am +++ b/src/lib/util/Makefile.am @@ -13,7 +13,6 @@ libkea_util_la_SOURCES += csv_file.h csv_file.cc libkea_util_la_SOURCES += doubles.h libkea_util_la_SOURCES += filename.h filename.cc libkea_util_la_SOURCES += hash.h -libkea_util_la_SOURCES += lock_guard.h libkea_util_la_SOURCES += labeled_value.h labeled_value.cc libkea_util_la_SOURCES += memory_segment.h libkea_util_la_SOURCES += memory_segment_local.h memory_segment_local.cc @@ -29,6 +28,7 @@ libkea_util_la_SOURCES += state_model.cc state_model.h libkea_util_la_SOURCES += stopwatch.cc stopwatch.h libkea_util_la_SOURCES += stopwatch_impl.cc stopwatch_impl.h libkea_util_la_SOURCES += strutil.h strutil.cc +libkea_util_la_SOURCES += thread_pool.h libkea_util_la_SOURCES += time_utilities.h time_utilities.cc libkea_util_la_SOURCES += versioned_csv_file.h versioned_csv_file.cc libkea_util_la_SOURCES += watch_socket.cc watch_socket.h diff --git a/src/lib/util/lock_guard.h b/src/lib/util/lock_guard.h deleted file mode 100644 index de919ab5f7..0000000000 --- a/src/lib/util/lock_guard.h +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC") -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -#ifndef LOCK_GUARD_H -#define LOCK_GUARD_H - -#include - -namespace isc { -namespace util { - -template -class LockGuard { -public: - LockGuard(Lock* lock) : lk_(lock) { - if (lk_) { - lk_->lock(); - } - } - - ~LockGuard() { - if (lk_) { - lk_->unlock(); - } - } - - LockGuard(const LockGuard&) = delete; - LockGuard& operator=(const LockGuard&) = delete; - - LockGuard(LockGuard&&) = delete; - LockGuard& operator=(LockGuard&&) = delete; - -private: - Lock* lk_; -}; - -} // namespace util -} // namespace isc - -#endif // LOCK_GUARD_H diff --git a/src/lib/util/tests/Makefile.am b/src/lib/util/tests/Makefile.am index 62ce742a3d..02750dd921 100644 --- a/src/lib/util/tests/Makefile.am +++ b/src/lib/util/tests/Makefile.am @@ -41,7 +41,6 @@ run_unittests_SOURCES += hash_unittest.cc run_unittests_SOURCES += hex_unittest.cc run_unittests_SOURCES += io_utilities_unittest.cc run_unittests_SOURCES += labeled_value_unittest.cc -run_unittests_SOURCES += lock_guard_unittest.cc run_unittests_SOURCES += memory_segment_local_unittest.cc run_unittests_SOURCES += memory_segment_common_unittest.h run_unittests_SOURCES += memory_segment_common_unittest.cc @@ -54,6 +53,7 @@ run_unittests_SOURCES += random_number_generator_unittest.cc run_unittests_SOURCES += staged_value_unittest.cc run_unittests_SOURCES += state_model_unittest.cc run_unittests_SOURCES += strutil_unittest.cc +run_unittests_SOURCES += thread_pool_unittest.cc run_unittests_SOURCES += time_utilities_unittest.cc run_unittests_SOURCES += range_utilities_unittest.cc run_unittests_SOURCES += signal_set_unittest.cc diff --git a/src/lib/util/tests/lock_guard_unittest.cc b/src/lib/util/tests/lock_guard_unittest.cc deleted file mode 100644 index 425a5b440b..0000000000 --- a/src/lib/util/tests/lock_guard_unittest.cc +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC") -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -#include - -#include - -#include -#include - -#include -#include -#include -#include -#include - -using namespace isc::util; -using namespace std; - -namespace { - -/// @brief test mutex class used to check the internal state of a 'fictional' -/// mutex so that the functionality of the LockGuard can be tested -/// the test mutex can be recursive which means that a lock can be called on the -/// same thread and not resulting in a dead lock -class TestMutex { -public: - /// @brief Constructor - /// - /// @param recursive sets the mutex as recursive mutex - TestMutex(bool recursive = false) : lock_(0), dead_lock_(false), - lock_count_(0), unlock_count_(0), recursive_(recursive) { - } - - /// @brief lock the mutex - void lock() { - lock_guard lk(mutex_); - if (lock_ >= 1) { - // mutex is already locked - if (!recursive_) { - // lock on a non-recursive mutex resulting in a dead lock - dead_lock_ = true; - isc_throw(isc::InvalidOperation, - "recursive lock on already locked mutex resulting in " - "dead lock"); - } else { - // lock on a recursive mutex - if (this_thread::get_id() != id_) { - // lock on a recursive mutex on a different thread resulting - // in a dead lock - dead_lock_ = true; - isc_throw(isc::InvalidOperation, - "recursive lock on a different thread on already " - "locked mutex resulting in dead lock"); - } - } - } - // increment the total number of locks - lock_count_++; - // increment the lock state - lock_++; - // save the thread id - id_ = this_thread::get_id(); - } - - /// @brief unlock the mutex - void unlock() { - lock_guard lk(mutex_); - if (lock_ <= 0) { - // unlock an unlocked mutex - isc_throw(isc::InvalidOperation, "unlock on non locked mutex " - "resulting in undefined behavior"); - } - if (lock_ == 1) { - // only one thread has the lock - // self healing mutex reseting the dead lock flag - dead_lock_ = false; - // reset the thread id - id_ = std::thread::id(); - } - // increment the total number of unlocks - unlock_count_++; - // decrement the lock state - lock_--; - } - - /// @brief get the mutex lock state - /// - /// @return the mutex lock state - int32_t getLock() { - lock_guard lk(mutex_); - return lock_; - } - - /// @brief get the mutex dead lock state - /// - /// @return the mutex dead lock state - bool getDeadLock() { - lock_guard lk(mutex_); - return dead_lock_; - } - - /// @brief get the number of locks performed on mutex - /// - /// @return the mutex number of locks - uint32_t getLockCount() { - lock_guard lk(mutex_); - return lock_count_; - } - - /// @brief get the number of unlocks performed on mutex - /// - /// @return the mutex number of unlocks - uint32_t getUnlockCount() { - lock_guard lk(mutex_); - return unlock_count_; - } - - /// @brief test the internal state of the mutex - /// - /// @param expected_lock check equality of this value with lock state - /// @param expected_lock_count check equality of this value with lock count - /// @param expected_unlock_count check equality of this value with unlock count - /// @param expected_dead_lock check equality of this value with dead lock state - void testMutexState(int32_t expected_lock, - uint32_t expected_lock_count, - uint32_t expected_unlock_count, - bool expected_dead_lock) { - ASSERT_EQ(getLock(), expected_lock); - ASSERT_EQ(getLockCount(), expected_lock_count); - ASSERT_EQ(getUnlockCount(), expected_unlock_count); - ASSERT_EQ(getDeadLock(), expected_dead_lock); - } - -private: - /// @brief internal lock state of the mutex - int32_t lock_; - - /// @brief state which indicates that the mutex is in dead lock - bool dead_lock_; - - /// @brief total number of locks performed on the mutex - uint32_t lock_count_; - - /// @brief total number of unlocks performed on the mutex - uint32_t unlock_count_; - - /// @brief flag to indicate if the mutex is recursive or not - bool recursive_; - - /// @brief mutex used to keep the internal state consistent - mutex mutex_; - - /// @brief the id of the thread holding the mutex - std::thread::id id_; -}; - -/// @brief Test Fixture for testing isc::util::LockGuard -class LockGuardTest : public ::testing::Test { -}; - -/// @brief test LockGuard functionality with non-recursive mutex, recursive mutex -/// and null pointer -TEST_F(LockGuardTest, testLock) { - shared_ptr test_mutex; - // test non-recursive lock - test_mutex = make_shared(); - test_mutex->testMutexState(0, 0, 0, false); - { - // call LockGuard constructor which locks mutex - LockGuard lock(test_mutex.get()); - // expect lock 1 lock_count 1 unlock_count 0 dead_lock false - test_mutex->testMutexState(1, 1, 0, false); - { - // call LockGuard constructor which locks mutex resulting in an - // exception as the mutex is already locked (dead lock) - EXPECT_THROW(LockGuard lock(test_mutex.get()), - isc::InvalidOperation); - // expect lock 1 lock_count 1 unlock_count 0 dead_lock true - // you should not be able to get here...using a real mutex - test_mutex->testMutexState(1, 1, 0, true); - } - // expect lock 1 lock_count 1 unlock_count 0 dead_lock true - // you should not be able to get here...using a real mutex - test_mutex->testMutexState(1, 1, 0, true); - } - // expect lock 0 lock_count 1 unlock_count 1 dead_lock false - // the implementation is self healing when completely unlocking the mutex - test_mutex->testMutexState(0, 1, 1, false); - // test recursive lock - test_mutex = make_shared(true); - test_mutex->testMutexState(0, 0, 0, false); - { - // call LockGuard constructor which locks mutex - LockGuard lock(test_mutex.get()); - // expect lock 1 lock_count 1 unlock_count 0 dead_lock false - test_mutex->testMutexState(1, 1, 0, false); - { - // call LockGuard constructor which locks mutex but does not block - // as this is done on the same thread and the mutex is recursive - EXPECT_NO_THROW(LockGuard lock(test_mutex.get())); - // expect lock 1 lock_count 2 unlock_count 1 dead_lock false - // the destructor was already called in EXPECT_NO_THROW scope - test_mutex->testMutexState(1, 2, 1, false); - } - // expect lock 1 lock_count 2 unlock_count 1 dead_lock false - test_mutex->testMutexState(1, 2, 1, false); - } - // expect lock 0 lock_count 2 unlock_count 2 dead_lock false - test_mutex->testMutexState(0, 2, 2, false); - // test LockGuart with no mutex - { - LockGuard lock(nullptr); - { - EXPECT_NO_THROW(LockGuard lock(nullptr)); - } - } - } - -} // namespace diff --git a/src/lib/dhcpsrv/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc similarity index 99% rename from src/lib/dhcpsrv/tests/thread_pool_unittest.cc rename to src/lib/util/tests/thread_pool_unittest.cc index 218b785476..e3a99b721c 100644 --- a/src/lib/dhcpsrv/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -8,11 +8,11 @@ #include -#include +#include #include -using namespace isc::dhcp; +using namespace isc::util; using namespace std; namespace { @@ -186,6 +186,7 @@ private: list> threads_; }; +/// @brief define CallBack type typedef function CallBack; /// @brief test ThreadPool add and count diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/util/thread_pool.h similarity index 78% rename from src/lib/dhcpsrv/thread_pool.h rename to src/lib/util/thread_pool.h index 1432d4175b..613c845703 100644 --- a/src/lib/dhcpsrv/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -7,8 +7,6 @@ #ifndef THREAD_POOL_H #define THREAD_POOL_H -#include - #include #include #include @@ -17,7 +15,7 @@ #include namespace isc { -namespace dhcp { +namespace util { /// @brief Defines a thread pool which uses a thread pool queue for managing /// work items. Each work item is a 'function' object. @@ -35,12 +33,8 @@ struct ThreadPool { /// @brief reset the thread pool stopping threads and clearing the internal /// queue void reset() { - LOG_INFO(dhcpsrv_logger, "Thread pool shutting down"); - stop(); queue_.clear(); - - LOG_INFO(dhcpsrv_logger, "Thread pool shut down"); } /// @brief start all the threads @@ -48,8 +42,6 @@ struct ThreadPool { /// @param thread_count specifies the number of threads to be created and /// started void start(uint32_t thread_count) { - LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads") - .arg(thread_count); if (!thread_count || running_) { return; } @@ -58,26 +50,21 @@ struct ThreadPool { for (uint32_t i = 0; i < thread_count; ++i) { threads_.push_back(std::make_shared(&ThreadPool::run, this)); } - - LOG_INFO(dhcpsrv_logger, "Thread pool started"); } /// @brief stop all the threads void stop() { - LOG_INFO(dhcpsrv_logger, "Thread pool stopping"); running_ = false; queue_.disable(); for (auto thread : threads_) { thread->join(); } threads_.clear(); - - LOG_INFO(dhcpsrv_logger, "Thread pool stopped"); } - /// @brief add a working item to the thread pool + /// @brief add a work item to the thread pool /// - /// @param call_back the 'function' object to be added to the queue + /// @param item the 'function' object to be added to the queue void add(WorkItem& item) { queue_.push(item); } @@ -95,6 +82,7 @@ struct ThreadPool { size_t size() { return threads_.size(); } + private: /// @brief Defines a generic thread pool queue. /// @@ -122,7 +110,7 @@ private: /// @brief push work item to the queue /// - /// Used to add work items in the queue. + /// Used to add work items to the queue. /// This function adds an item to the queue and wakes up at least one thread /// waiting on the queue. /// @@ -140,21 +128,18 @@ private: /// @brief pop work item from the queue or block waiting /// /// Used to retrieve and remove a work item from the queue - /// If the queue is 'disabled', this function returns immediately - /// (no element). + /// If the queue is 'disabled', this function returns immediately an empty + /// element. /// If the queue is 'enabled', this function returns the first element in /// the queue or blocks the calling thread if there are no work items /// available. /// - /// @param item the reference of the item removed from the queue, if any - /// - /// @return true if there was a work item removed from the queue, false - /// otherwise + /// @return the first work item from the queue or an empty element. Item pop() { std::unique_lock lock(mutex_); while (enabled_) { if (queue_.empty()) { - // Wait for push or stop functions. + // Wait for push or disable functions. cv_.wait(lock); continue; } @@ -185,7 +170,7 @@ private: queue_ = std::queue(); } - /// @brief start and enable the queue + /// @brief enable the queue /// /// Sets the queue state to 'enabled' void enable() { @@ -193,9 +178,9 @@ private: enabled_ = true; } - /// brief stop and disable the queue + /// brief disable the queue /// - /// Sets the queue state to 'disabled' and optionally removes all work items + /// Sets the queue state to 'disabled' void disable() { std::lock_guard lock(mutex_); enabled_ = false; @@ -214,26 +199,22 @@ private: std::condition_variable cv_; /// @brief the sate of the queue - /// The 'enabled' state corresponds to false value - /// The 'disabled' state corresponds to true value + /// The 'enabled' state corresponds to true value + /// The 'disabled' state corresponds to false value std::atomic_bool enabled_; }; /// @brief run function of each thread void run() { - std::thread::id th_id = std::this_thread::get_id(); - LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1") - .arg(th_id); - while (running_) { WorkItem item = queue_.pop(); if (item) { - item(); + try { + item(); + } catch (...) { + } } } - - LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1") - .arg(th_id); } /// @brief list of worker threads @@ -243,12 +224,12 @@ private: ThreadPoolQueue queue_; /// @brief state of the thread pool - /// The 'run' state corresponds to false value - /// The 'stop' state corresponds to true value + /// The 'running' state corresponds to true value + /// The 'not running' state corresponds to false value std::atomic_bool running_; }; -} // namespace dhcp +} // namespace util } // namespace isc #endif // THREAD_POOL_H