+++ /dev/null
-// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
-//
-// This Source Code Form is subject to the terms of the Mozilla Public
-// License, v. 2.0. If a copy of the MPL was not distributed with this
-// file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
-#include <config.h>
-
-#include <gtest/gtest.h>
-
-#include <util/lock_guard.h>
-#include <exceptions/exceptions.h>
-
-#include <condition_variable>
-#include <list>
-#include <mutex>
-#include <set>
-#include <thread>
-
-using namespace isc::util;
-using namespace std;
-
-namespace {
-
-/// @brief test mutex class used to check the internal state of a 'fictional'
-/// mutex so that the functionality of the LockGuard can be tested
-/// the test mutex can be recursive which means that a lock can be called on the
-/// same thread and not resulting in a dead lock
-class TestMutex {
-public:
- /// @brief Constructor
- ///
- /// @param recursive sets the mutex as recursive mutex
- TestMutex(bool recursive = false) : lock_(0), dead_lock_(false),
- lock_count_(0), unlock_count_(0), recursive_(recursive) {
- }
-
- /// @brief lock the mutex
- void lock() {
- lock_guard<mutex> lk(mutex_);
- if (lock_ >= 1) {
- // mutex is already locked
- if (!recursive_) {
- // lock on a non-recursive mutex resulting in a dead lock
- dead_lock_ = true;
- isc_throw(isc::InvalidOperation,
- "recursive lock on already locked mutex resulting in "
- "dead lock");
- } else {
- // lock on a recursive mutex
- if (this_thread::get_id() != id_) {
- // lock on a recursive mutex on a different thread resulting
- // in a dead lock
- dead_lock_ = true;
- isc_throw(isc::InvalidOperation,
- "recursive lock on a different thread on already "
- "locked mutex resulting in dead lock");
- }
- }
- }
- // increment the total number of locks
- lock_count_++;
- // increment the lock state
- lock_++;
- // save the thread id
- id_ = this_thread::get_id();
- }
-
- /// @brief unlock the mutex
- void unlock() {
- lock_guard<mutex> lk(mutex_);
- if (lock_ <= 0) {
- // unlock an unlocked mutex
- isc_throw(isc::InvalidOperation, "unlock on non locked mutex "
- "resulting in undefined behavior");
- }
- if (lock_ == 1) {
- // only one thread has the lock
- // self healing mutex reseting the dead lock flag
- dead_lock_ = false;
- // reset the thread id
- id_ = std::thread::id();
- }
- // increment the total number of unlocks
- unlock_count_++;
- // decrement the lock state
- lock_--;
- }
-
- /// @brief get the mutex lock state
- ///
- /// @return the mutex lock state
- int32_t getLock() {
- lock_guard<mutex> lk(mutex_);
- return lock_;
- }
-
- /// @brief get the mutex dead lock state
- ///
- /// @return the mutex dead lock state
- bool getDeadLock() {
- lock_guard<mutex> lk(mutex_);
- return dead_lock_;
- }
-
- /// @brief get the number of locks performed on mutex
- ///
- /// @return the mutex number of locks
- uint32_t getLockCount() {
- lock_guard<mutex> lk(mutex_);
- return lock_count_;
- }
-
- /// @brief get the number of unlocks performed on mutex
- ///
- /// @return the mutex number of unlocks
- uint32_t getUnlockCount() {
- lock_guard<mutex> lk(mutex_);
- return unlock_count_;
- }
-
- /// @brief test the internal state of the mutex
- ///
- /// @param expected_lock check equality of this value with lock state
- /// @param expected_lock_count check equality of this value with lock count
- /// @param expected_unlock_count check equality of this value with unlock count
- /// @param expected_dead_lock check equality of this value with dead lock state
- void testMutexState(int32_t expected_lock,
- uint32_t expected_lock_count,
- uint32_t expected_unlock_count,
- bool expected_dead_lock) {
- ASSERT_EQ(getLock(), expected_lock);
- ASSERT_EQ(getLockCount(), expected_lock_count);
- ASSERT_EQ(getUnlockCount(), expected_unlock_count);
- ASSERT_EQ(getDeadLock(), expected_dead_lock);
- }
-
-private:
- /// @brief internal lock state of the mutex
- int32_t lock_;
-
- /// @brief state which indicates that the mutex is in dead lock
- bool dead_lock_;
-
- /// @brief total number of locks performed on the mutex
- uint32_t lock_count_;
-
- /// @brief total number of unlocks performed on the mutex
- uint32_t unlock_count_;
-
- /// @brief flag to indicate if the mutex is recursive or not
- bool recursive_;
-
- /// @brief mutex used to keep the internal state consistent
- mutex mutex_;
-
- /// @brief the id of the thread holding the mutex
- std::thread::id id_;
-};
-
-/// @brief Test Fixture for testing isc::util::LockGuard
-class LockGuardTest : public ::testing::Test {
-};
-
-/// @brief test LockGuard functionality with non-recursive mutex, recursive mutex
-/// and null pointer
-TEST_F(LockGuardTest, testLock) {
- shared_ptr<TestMutex> test_mutex;
- // test non-recursive lock
- test_mutex = make_shared<TestMutex>();
- test_mutex->testMutexState(0, 0, 0, false);
- {
- // call LockGuard constructor which locks mutex
- LockGuard<TestMutex> lock(test_mutex.get());
- // expect lock 1 lock_count 1 unlock_count 0 dead_lock false
- test_mutex->testMutexState(1, 1, 0, false);
- {
- // call LockGuard constructor which locks mutex resulting in an
- // exception as the mutex is already locked (dead lock)
- EXPECT_THROW(LockGuard<TestMutex> lock(test_mutex.get()),
- isc::InvalidOperation);
- // expect lock 1 lock_count 1 unlock_count 0 dead_lock true
- // you should not be able to get here...using a real mutex
- test_mutex->testMutexState(1, 1, 0, true);
- }
- // expect lock 1 lock_count 1 unlock_count 0 dead_lock true
- // you should not be able to get here...using a real mutex
- test_mutex->testMutexState(1, 1, 0, true);
- }
- // expect lock 0 lock_count 1 unlock_count 1 dead_lock false
- // the implementation is self healing when completely unlocking the mutex
- test_mutex->testMutexState(0, 1, 1, false);
- // test recursive lock
- test_mutex = make_shared<TestMutex>(true);
- test_mutex->testMutexState(0, 0, 0, false);
- {
- // call LockGuard constructor which locks mutex
- LockGuard<TestMutex> lock(test_mutex.get());
- // expect lock 1 lock_count 1 unlock_count 0 dead_lock false
- test_mutex->testMutexState(1, 1, 0, false);
- {
- // call LockGuard constructor which locks mutex but does not block
- // as this is done on the same thread and the mutex is recursive
- EXPECT_NO_THROW(LockGuard<TestMutex> lock(test_mutex.get()));
- // expect lock 1 lock_count 2 unlock_count 1 dead_lock false
- // the destructor was already called in EXPECT_NO_THROW scope
- test_mutex->testMutexState(1, 2, 1, false);
- }
- // expect lock 1 lock_count 2 unlock_count 1 dead_lock false
- test_mutex->testMutexState(1, 2, 1, false);
- }
- // expect lock 0 lock_count 2 unlock_count 2 dead_lock false
- test_mutex->testMutexState(0, 2, 2, false);
- // test LockGuart with no mutex
- {
- LockGuard<TestMutex> lock(nullptr);
- {
- EXPECT_NO_THROW(LockGuard<TestMutex> lock(nullptr));
- }
- }
- }
-
-} // namespace
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
-#include <dhcpsrv/dhcpsrv_log.h>
-
#include <atomic>
#include <condition_variable>
#include <list>
#include <thread>
namespace isc {
-namespace dhcp {
+namespace util {
/// @brief Defines a thread pool which uses a thread pool queue for managing
/// work items. Each work item is a 'function' object.
/// @brief reset the thread pool stopping threads and clearing the internal
/// queue
void reset() {
- LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
-
stop();
queue_.clear();
-
- LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
}
/// @brief start all the threads
/// @param thread_count specifies the number of threads to be created and
/// started
void start(uint32_t thread_count) {
- LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
- .arg(thread_count);
if (!thread_count || running_) {
return;
}
for (uint32_t i = 0; i < thread_count; ++i) {
threads_.push_back(std::make_shared<std::thread>(&ThreadPool::run, this));
}
-
- LOG_INFO(dhcpsrv_logger, "Thread pool started");
}
/// @brief stop all the threads
void stop() {
- LOG_INFO(dhcpsrv_logger, "Thread pool stopping");
running_ = false;
queue_.disable();
for (auto thread : threads_) {
thread->join();
}
threads_.clear();
-
- LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
}
- /// @brief add a working item to the thread pool
+ /// @brief add a work item to the thread pool
///
- /// @param call_back the 'function' object to be added to the queue
+ /// @param item the 'function' object to be added to the queue
void add(WorkItem& item) {
queue_.push(item);
}
size_t size() {
return threads_.size();
}
+
private:
/// @brief Defines a generic thread pool queue.
///
/// @brief push work item to the queue
///
- /// Used to add work items in the queue.
+ /// Used to add work items to the queue.
/// This function adds an item to the queue and wakes up at least one thread
/// waiting on the queue.
///
/// @brief pop work item from the queue or block waiting
///
/// Used to retrieve and remove a work item from the queue
- /// If the queue is 'disabled', this function returns immediately
- /// (no element).
+ /// If the queue is 'disabled', this function returns immediately an empty
+ /// element.
/// If the queue is 'enabled', this function returns the first element in
/// the queue or blocks the calling thread if there are no work items
/// available.
///
- /// @param item the reference of the item removed from the queue, if any
- ///
- /// @return true if there was a work item removed from the queue, false
- /// otherwise
+ /// @return the first work item from the queue or an empty element.
Item pop() {
std::unique_lock<std::mutex> lock(mutex_);
while (enabled_) {
if (queue_.empty()) {
- // Wait for push or stop functions.
+ // Wait for push or disable functions.
cv_.wait(lock);
continue;
}
queue_ = std::queue<Item>();
}
- /// @brief start and enable the queue
+ /// @brief enable the queue
///
/// Sets the queue state to 'enabled'
void enable() {
enabled_ = true;
}
- /// brief stop and disable the queue
+ /// brief disable the queue
///
- /// Sets the queue state to 'disabled' and optionally removes all work items
+ /// Sets the queue state to 'disabled'
void disable() {
std::lock_guard<std::mutex> lock(mutex_);
enabled_ = false;
std::condition_variable cv_;
/// @brief the sate of the queue
- /// The 'enabled' state corresponds to false value
- /// The 'disabled' state corresponds to true value
+ /// The 'enabled' state corresponds to true value
+ /// The 'disabled' state corresponds to false value
std::atomic_bool enabled_;
};
/// @brief run function of each thread
void run() {
- std::thread::id th_id = std::this_thread::get_id();
- LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1")
- .arg(th_id);
-
while (running_) {
WorkItem item = queue_.pop();
if (item) {
- item();
+ try {
+ item();
+ } catch (...) {
+ }
}
}
-
- LOG_INFO(dhcpsrv_logger, "Thread pool thread ended. id: %1")
- .arg(th_id);
}
/// @brief list of worker threads
ThreadPoolQueue<WorkItem> queue_;
/// @brief state of the thread pool
- /// The 'run' state corresponds to false value
- /// The 'stop' state corresponds to true value
+ /// The 'running' state corresponds to true value
+ /// The 'not running' state corresponds to false value
std::atomic_bool running_;
};
-} // namespace dhcp
+} // namespace util
} // namespace isc
#endif // THREAD_POOL_H