}
}
+ // Stop everything before we change into single-threaded mode.
+ MultiThreadingCriticalSection cs;
+
// destroying the thread pool
MultiThreadingMgr::instance().apply(false, 0, 0);
}
}
+ // Stop everything before we change into single-threaded mode.
+ MultiThreadingCriticalSection cs;
+
// destroying the thread pool
MultiThreadingMgr::instance().apply(false, 0, 0);
void
HAService::startClientAndListener() {
+ // Add critical section callbacks.
+ MultiThreadingMgr::instance().addCriticalSectionCallbacks("HA_MT",
+ std::bind(&HAService::pauseClientAndListener, this),
+ std::bind(&HAService::resumeClientAndListener, this));
+
if (client_) {
client_->start();
}
void
HAService::stopClientAndListener() {
+ // Remove critical section callbacks.
+ MultiThreadingMgr::instance().removeCriticalSectionCallbacks("HA_MT");
+
if (client_) {
client_->stop();
}
///
/// When HA+Mt is enabled it starts the client's thread pool
/// and the dedicated listener thread pool, if the listener exists.
+ /// It registers pauseClientAndListener() and resumeClientAndListener()
+ /// as the MultiThreading critical section entry and exit callbacks,
+ /// respectively.
void startClientAndListener();
/// @brief Pauses client and(or) listener thread pool operations.
///
/// Suspends the client and listener thread pool event processing.
/// Has no effect in single-threaded mode or if thread pools are
- /// not currently running.
+ /// not currently running. Serves as the MultiThreading critical
+ /// section entry callback.
void pauseClientAndListener();
/// @brief Resumes client and(or) listener thread pool operations.
///
/// Resumes the client and listener thread pool event processing.
/// Has no effect in single-threaded mode or if thread pools are
- /// not currently paused.
+ /// not currently paused. Serves as the MultiThreading critical
+ /// section exit callback.
void resumeClientAndListener();
/// @brief Stop the client and(or) listener instances.
///
- /// Closes all connections and stops the thread pools for the client
+ /// It unregisters the MultiThreading critical section callbacks,
+ /// closes all connections, and the stops the thread pools for the client
/// and listener, if they exist.
void stopClientAndListener();
}
};
-// Verifies HA+MT start, pause, resume, and stop.
+// Verifies HA+MT start, pause, resume, and stop. Note
+// that pause and resume are tested indirectly through
+// entry and exit of a critical section.
TEST_F(HAMtServiceTest, multiThreadingBasics) {
// Build the HA JSON configuration.
EXPECT_EQ(service->listener_->getThreadPoolSize(), 3);
EXPECT_EQ(service->listener_->getThreadCount(), 3);
- // Pause client and listener.
- ASSERT_NO_THROW_LOG(service->pauseClientAndListener());
+ {
+ // Entering a critical section should pause both client
+ // and listener.
+ MultiThreadingCriticalSection cs;
- // Client should be paused.
- ASSERT_TRUE(service->client_->isPaused());
- EXPECT_TRUE(service->client_->getThreadIOService()->stopped());
+ // Client should be paused.
+ ASSERT_TRUE(service->client_->isPaused());
+ EXPECT_TRUE(service->client_->getThreadIOService()->stopped());
- // Listener should be paused.
- ASSERT_TRUE(service->listener_->isPaused());
- EXPECT_TRUE(service->listener_->getThreadIOService()->stopped());
+ // Listener should be paused.
+ ASSERT_TRUE(service->listener_->isPaused());
+ EXPECT_TRUE(service->listener_->getThreadIOService()->stopped());
+ }
- // Now resume client and listener.
- ASSERT_NO_THROW_LOG(service->resumeClientAndListener());
+ // Exiting critical section should resume both cllent
+ // and listener.
// Client should be running.
ASSERT_TRUE(service->client_->isRunning());
libkea_util_la_SOURCES += memory_segment.h
libkea_util_la_SOURCES += memory_segment_local.h memory_segment_local.cc
libkea_util_la_SOURCES += multi_threading_mgr.h multi_threading_mgr.cc
+libkea_util_la_SOURCES += named_callback.h named_callback.cc
libkea_util_la_SOURCES += optional.h
libkea_util_la_SOURCES += pid_file.h pid_file.cc
libkea_util_la_SOURCES += pointer_util.h
memory_segment.h \
memory_segment_local.h \
multi_threading_mgr.h \
+ named_callback.h \
optional.h \
pid_file.h \
pointer_util.h \
void
MultiThreadingMgr::enterCriticalSection() {
- stopPktProcessing();
+ stopProcessing();
++critical_section_count_;
}
isc_throw(InvalidOperation, "invalid negative value for override");
}
--critical_section_count_;
- startPktProcessing();
+ startProcessing();
}
bool
thread_pool_.start(thread_count);
}
} else {
+ removeAllCriticalSectionCallbacks();
thread_pool_.reset();
setMode(false);
setThreadPoolSize(thread_count);
}
void
-MultiThreadingMgr::stopPktProcessing() {
- if (getMode() && getThreadPoolSize() && !isInCriticalSection()) {
- thread_pool_.stop();
+MultiThreadingMgr::stopProcessing() {
+ if (getMode() && !isInCriticalSection()) {
+ if (getThreadPoolSize()) {
+ thread_pool_.stop();
+ }
+
+ for (auto cb : critical_entry_cbs_.getCallbacks() ) {
+ // @todo need to think about what we do with exceptions
+ (cb.callback_)();
+ }
}
}
void
-MultiThreadingMgr::startPktProcessing() {
- if (getMode() && getThreadPoolSize() && !isInCriticalSection()) {
- thread_pool_.start(getThreadPoolSize());
+MultiThreadingMgr::startProcessing() {
+ if (getMode() && !isInCriticalSection()) {
+ if (getThreadPoolSize()) {
+ thread_pool_.start(getThreadPoolSize());
+ }
+
+ for (auto cb : critical_exit_cbs_.getCallbacks() ) {
+ // @todo need to think about what we do with exceptions
+ (cb.callback_)();
+ }
}
}
+void
+MultiThreadingMgr::addCriticalSectionCallbacks(const std::string& name,
+ const NamedCallback::Callback& entry_cb,
+ const NamedCallback::Callback& exit_cb) {
+ critical_entry_cbs_.addCallback(name, entry_cb);
+ critical_exit_cbs_.addCallback(name, exit_cb);
+}
+
+void
+MultiThreadingMgr::removeCriticalSectionCallbacks(const std::string& name) {
+ critical_entry_cbs_.removeCallback(name);
+ critical_exit_cbs_.removeCallback(name);
+}
+
+void
+MultiThreadingMgr::removeAllCriticalSectionCallbacks() {
+ critical_entry_cbs_.removeAll();
+ critical_exit_cbs_.removeAll();
+}
+
MultiThreadingCriticalSection::MultiThreadingCriticalSection() {
MultiThreadingMgr::instance().enterCriticalSection();
}
-// Copyright (C) 2019-2020 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2019-2021 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
#define MULTI_THREADING_MGR_H
#include <util/thread_pool.h>
+#include <util/named_callback.h>
#include <boost/noncopyable.hpp>
/// configured, 0 for unlimited size
void apply(bool enabled, uint32_t thread_count, uint32_t queue_size);
+ /// @brief Adds a pair of callbacks to the list of CriticalSection callbacks.
+ ///
+ /// @param name Name of the set of callbacks. This value is used by the
+ /// callback owner to remove or replace them.Duplicates are not allowed.
+ /// @param entry_cb Callback to invoke upon CriticalSection entry.
+ /// @param exit_cb Callback to invoke upon CriticalSection exit.
+ void addCriticalSectionCallbacks(const std::string& name,
+ const NamedCallback::Callback& entry_cb,
+ const NamedCallback::Callback& exit_cb);
+
+ /// @brief Removes the set of callbacks associated with a given name
+ /// from the list of CriticalSection callbacks.
+ ///
+ /// If the name is not found in the list, it simply returns.
+ ///
+ /// @param name Name of the set of callbacks to remove.
+ void removeCriticalSectionCallbacks(const std::string& name);
+
+ /// @brief Removes all callbacks in the list of CriticalSection callbacks.
+ void removeAllCriticalSectionCallbacks();
+
protected:
/// @brief Constructor.
private:
- /// @brief Class method stopping and joining all threads of the pool.
+ /// @brief Class method stops non-critical processing.
///
- /// Stop the dhcp thread pool if running.
- void stopPktProcessing();
+ /// Stops the DHCP thread pool if it's running and invokes
+ /// all CriticalSection entry callbacks. Has no effect
+ /// in single-threaded mode.
+ void stopProcessing();
- /// @brief Class method (re)starting threads of the pool.
+ /// @brief Class method (re)starts non-critical processing.
///
- /// Start the dhcp thread pool according to current configuration.
- void startPktProcessing();
+ /// Starts the DHCP thread pool according to current configuration,
+ /// and invokes all CriticalSection exit callbacks. Has no effect
+ /// in single-threaded mode.
+ void startProcessing();
/// @brief The current multi-threading mode.
///
/// @brief Packet processing thread pool.
ThreadPool<std::function<void()>> thread_pool_;
+
+ /// @brief List of callbacks to invoke upon CriticalSection entry.
+ NamedCallbackList critical_entry_cbs_;
+
+ /// @brief List of callbacks to invoke upon CriticalSection exit.
+ NamedCallbackList critical_exit_cbs_;
};
/// @note: everything here MUST be used ONLY from the main thread.
--- /dev/null
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <exceptions/exceptions.h>
+#include <util/named_callback.h>
+
+#include <iostream>
+#include <string>
+
+namespace isc {
+namespace util {
+
+void
+NamedCallbackList::addCallback(const std::string& name, const NamedCallback::Callback& cb) {
+ if (!cb) {
+ isc_throw(BadValue, "NamedCallbackList - callback: " << name
+ << ", cannot be empty");
+ }
+
+ for (auto const& callback : callbacks_) {
+ if (callback.name_ == name) {
+ isc_throw(BadValue, "NamedCallbackList - callback: " << name
+ << ", already exists");
+ }
+ }
+
+ callbacks_.push_back(NamedCallback(name, cb));
+}
+
+void
+NamedCallbackList::removeCallback(const std::string& name) {
+ for (auto it = callbacks_.begin(); it != callbacks_.end(); ) {
+ if ((*it).name_ == name) {
+ it = callbacks_.erase(it);
+ break;
+ }
+
+ ++it;
+ }
+}
+
+void
+NamedCallbackList::removeAll() {
+ callbacks_.clear();
+}
+
+const std::list<NamedCallback>&
+NamedCallbackList::getCallbacks() {
+ return (callbacks_);
+}
+
+} // namespace util
+} // namespace isc
--- /dev/null
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef NAMED_CALLBACK_H
+#define NAMED_CALLBACK_H
+
+#include <exceptions/exceptions.h>
+#include <boost/make_shared.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <functional>
+#include <list>
+
+namespace isc {
+namespace util {
+
+/// @brief Associates a Callback with a name.
+struct NamedCallback {
+ /// @brief Defines a callback as a simple void() functor.
+ typedef std::function<void()> Callback;
+
+ /// @brief Constructor
+ ///
+ /// @param name Name by which the callback can be found.
+ /// @param cb Callback associated with name.
+ NamedCallback(const std::string& name, const Callback& cb)
+ : name_(name), callback_(cb) {
+ };
+
+ /// @Brief Name by which the callback can be found.
+ std::string name_;
+
+ /// @Brief Callback associated with name.
+ Callback callback_;
+};
+
+/// @brief Maintains list of unique NamedCallbacks.
+///
+/// The list emphasizes iteration order and speed over
+/// retrieval by name. When iterating over the list of
+/// callbacks, they are returned in the order they were
+/// added, not by name.
+class NamedCallbackList {
+public:
+ /// @brief Constructor.
+ NamedCallbackList(){};
+
+ /// @brief Adds a callback to the list.
+ ///
+ /// @param name Name of the callback to add.
+ /// @param cb Callback to add.
+ ///
+ /// @throw BadValue if the name is already in the list.
+ void addCallback(const std::string& name, const NamedCallback::Callback& cb);
+
+ /// @brief Removes a callback from the list.
+ ///
+ /// @param name Name of the callback to remove.
+ /// If no such callback exists, it simply returns.
+ void removeCallback(const std::string& name);
+
+ /// @brief Removes all callbacks from the list.
+ void removeAll();
+
+ /// @brief Fetches the list of callbacks.
+ const std::list<NamedCallback>& getCallbacks();
+
+private:
+ /// @brief The list of callbacks.
+ std::list<NamedCallback> callbacks_;
+};
+
+} // namespace util
+} // namespace isc
+
+#endif // NAMED_CALLBACK_H
run_unittests_SOURCES += memory_segment_common_unittest.h
run_unittests_SOURCES += memory_segment_common_unittest.cc
run_unittests_SOURCES += multi_threading_mgr_unittest.cc
+run_unittests_SOURCES += named_callback_unittest.cc
run_unittests_SOURCES += optional_unittest.cc
run_unittests_SOURCES += pid_file_unittest.cc
run_unittests_SOURCES += qid_gen_unittest.cc
-// Copyright (C) 2019-2020 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2019-2021 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// apply multi-threading configuration with 0 threads
MultiThreadingMgr::instance().apply(false, 0, 0);
}
+
+/// @brief Test fixture for exercised CriticalSection callbacks.
+class CriticalSectionCallbackTest : public ::testing::Test {
+public:
+ /// @Brief Constructor.
+ CriticalSectionCallbackTest() {};
+
+ /// @Brief A callback that adds the value, 1, to invocations lists.
+ void one() {
+ invocations_.push_back(1);
+ }
+
+ /// @Brief A callback that adds the value, 2, to invocations lists.
+ void two() {
+ invocations_.push_back(2);
+ }
+
+ /// @Brief A callback that adds the value, 3, to invocations lists.
+ void three() {
+ invocations_.push_back(3);
+ }
+
+ /// @Brief A callback that adds the value, 4, to invocations lists.
+ void four() {
+ invocations_.push_back(4);
+ }
+
+ /// @brief Indicates whether or not the DHCP thread pool is running.
+ ///
+ /// @return True if the pool is running, false otherwise.
+ bool isThreadPoolRunning() {
+ return (MultiThreadingMgr::instance().getThreadPool().size());
+ }
+
+ /// @brief Checks callback invocations over a series of nested
+ /// CriticalSecitons.
+ ///
+ /// @param entries A vector of the invocation values that should
+ /// be present after entry into the outermost CriticalSection. The
+ /// expected values should be in the order the callbacks were
+ /// added to the MultiThreadingMgr's list of callbacks.
+ /// @param exits A vector of the invocation values that should
+ /// be present after exiting the CriticalSection. The expected
+ /// values should be in the order the callbacks were added to the
+ /// MultiThreadingMgr's list of callbacks.
+ void runCriticalSections(std::vector<int> entries, std::vector<int>exits) {
+ // Pool must be running.
+ ASSERT_TRUE(isThreadPoolRunning());
+
+ // Clear the invocations list.
+ invocations_.clear();
+
+ // Use scope to create nested CriticalSections.
+ {
+ // Enter a critical section.
+ MultiThreadingCriticalSection cs;
+
+ // Thread pool should be stopped.
+ ASSERT_FALSE(isThreadPoolRunning());
+
+ if (entries.size()) {
+ // We expect entry invocations.
+ ASSERT_TRUE(invocations_ == entries);
+ } else {
+ // We do not expect entry invocations.
+ ASSERT_FALSE(invocations_.size());
+ }
+
+ // Clear the invocations list.
+ invocations_.clear();
+
+ {
+ // Enter another CriticalSection.
+ MultiThreadingCriticalSection inner_cs;
+
+ // thread pool should still be stopped
+ ASSERT_FALSE(isThreadPoolRunning());
+
+ // We should not have had any callback invocations.
+ ASSERT_FALSE(invocations_.size());
+ }
+
+ // After exiting inner setion, the thread pool should
+ // still be stopped
+ ASSERT_FALSE(isThreadPoolRunning());
+
+ // We should not have had more callback invocations.
+ ASSERT_FALSE(invocations_.size());
+ }
+
+ // After exiting the outer setion, the thread pool should
+ // match the thread count.
+ ASSERT_TRUE(isThreadPoolRunning());
+
+ if (exits.size()) {
+ // We expect exit invocations.
+ ASSERT_TRUE(invocations_ == exits);
+ } else {
+ // We do not expect exit invocations.
+ ASSERT_FALSE(invocations_.size());
+ }
+ }
+
+ /// @Brief A list of values set by callback invocations.
+ std::vector<int> invocations_;
+};
+
+/// @brief Verifies that the critical section callbacks work.
+TEST_F(CriticalSectionCallbackTest, basics) {
+ // get the thread pool instance
+ auto& thread_pool = MultiThreadingMgr::instance().getThreadPool();
+ // thread pool should be stopped
+ EXPECT_EQ(thread_pool.size(), 0);
+
+ // Add two sets of CriticalSection call backs.
+ MultiThreadingMgr::instance().addCriticalSectionCallbacks("oneAndTwo",
+ std::bind(&CriticalSectionCallbackTest::one, this),
+ std::bind(&CriticalSectionCallbackTest::two, this));
+
+ MultiThreadingMgr::instance().addCriticalSectionCallbacks("threeAndFour",
+ std::bind(&CriticalSectionCallbackTest::three, this),
+ std::bind(&CriticalSectionCallbackTest::four, this));
+
+ // Apply multi-threading configuration with 16 threads and queue size 256.
+ MultiThreadingMgr::instance().apply(true, 16, 256);
+
+ // Make three passes over nested CriticalSections to ensure
+ // callbacks execute at the appropriate times and we can do
+ // so repeatedly.
+ for (int i = 0; i < 3; ++i) {
+ runCriticalSections({1,3}, {2,4});
+ }
+
+ // Now remove the first set of callbacks.
+ MultiThreadingMgr::instance().removeCriticalSectionCallbacks("oneAndTwo");
+
+ // Retest CriticalSections.
+ runCriticalSections({3}, {4});
+
+ // Now remove the remaining callbacks.
+ MultiThreadingMgr::instance().removeAllCriticalSectionCallbacks();
+
+ // Retest CriticalSections.
+ runCriticalSections({}, {});
+}
--- /dev/null
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <gtest/gtest.h>
+
+#include <exceptions/exceptions.h>
+#include <util/named_callback.h>
+#include <testutils/gtest_utils.h>
+
+using namespace isc;
+using namespace isc::util;
+using namespace std;
+
+namespace {
+
+class NamedCallbackListTest : public ::testing::Test {
+public:
+ NamedCallbackListTest() {};
+
+ void one() {
+ invocations_.push_back(1);
+ }
+
+ void two() {
+ invocations_.push_back(2);
+ }
+
+ void three() {
+ invocations_.push_back(3);
+ }
+
+ void runCallbacks() {
+ invocations_.clear();
+ for (auto cb : cbs_.getCallbacks() ) {
+ ASSERT_NO_THROW((cb.callback_)());
+ }
+ }
+
+ NamedCallbackList cbs_;
+ std::vector<int> invocations_;
+};
+
+TEST_F(NamedCallbackListTest, basics) {
+
+ ASSERT_NO_THROW(cbs_.addCallback("one",
+ std::bind(&NamedCallbackListTest::one, this)));
+ ASSERT_NO_THROW(cbs_.addCallback("two",
+ std::bind(&NamedCallbackListTest::two, this)));
+ ASSERT_NO_THROW(cbs_.addCallback("three",
+ std::bind(&NamedCallbackListTest::three, this)));
+
+ // Can't add an empty callback.
+ ASSERT_THROW_MSG(cbs_.addCallback("nadda", 0),
+ BadValue, "NamedCallbackList - callback: nadda, cannot be empty");
+
+ // Can't add one twice.
+ ASSERT_THROW_MSG(cbs_.addCallback("one",
+ std::bind(&NamedCallbackListTest::one, this)),
+ BadValue, "NamedCallbackList - callback: one, already exists");
+
+ for (auto cb : cbs_.getCallbacks() ) {
+ ASSERT_NO_THROW((cb.callback_)());
+ }
+
+ int i = 0;
+ for (auto invocation : invocations_) {
+ EXPECT_EQ(invocation, ++i);
+ }
+
+ runCallbacks();
+
+ EXPECT_EQ(3, invocations_.size());
+ EXPECT_EQ(1, invocations_[0]);
+ EXPECT_EQ(2, invocations_[1]);
+ EXPECT_EQ(3, invocations_[2]);
+
+ // Removing two shouldn't throw.
+ ASSERT_NO_THROW(cbs_.removeCallback("two"));
+
+ // Removing it again shouldn't throw.
+ ASSERT_NO_THROW(cbs_.removeCallback("two"));
+
+ runCallbacks();
+
+ EXPECT_EQ(2, invocations_.size());
+ EXPECT_EQ(1, invocations_[0]);
+ EXPECT_EQ(3, invocations_[1]);
+}
+
+} // namespace