]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1818] Added Multithreading CS callbacks to HA
authorThomas Markwalder <tmark@isc.org>
Mon, 17 May 2021 17:39:28 +0000 (13:39 -0400)
committerThomas Markwalder <tmark@isc.org>
Wed, 19 May 2021 20:14:16 +0000 (16:14 -0400)
src/bin/dhcp4/dhcp4_srv.cc
    Dhcpv4Srv::run() - added CS prior to MultiThreadingMgr:apply()
    prior to exit

src/bin/dhcp6/dhcp6_srv.cc
    Dhcpv6Srv::run() - added CS prior to MultiThreadingMgr:apply()
    prior to exit

src/hooks/dhcp/high_availability/ha_service.*
    HAService::startClientAndListener() - added call to register
    CS callbacks

    HAService::stopClientAndListener() - added call to remove CS
    callbacks

src/hooks/dhcp/high_availability/tests/ha_mt_unittest.cc
    Revamped to test CS callbacks rather than pause/resume directly.

src/lib/util/Makefile.am
    Added named_callbacks.h/cc

src/lib/util/named_callback.cc
src/lib/util/named_callback.h
    - new files that implement an ordered list of named callbacks

src/lib/util/multi_threading_mgr.*
    MultiThreadingMgr::apply() - removes all CS callbacks when entering
    single-threaded mode

    MultiThreadingMgr::stopProcessing() - formerly stopPktProcessing, added
    call to invoke CS exit callbacks

    MultiThreadingMgr::startProcessing()  - formerly startPktProcessing,
    added call to invoke CS entry callbacks.

    MultiThreadingMgr::addCriticalSectionCallbacks() - new method for
    adding CS callbacks

    MultiThreadingMgr::removeCriticalSectionCallbacks() -  new method for
    removing CS callbacks

    MultiThreadingMgr::removeAllCriticalSectionCallbacks() - new method for
    all CS callbacks

src/lib/util/tests/Makefile.am
    Added named_callback_unittest.cc

src/lib/util/tests/named_callback_unittest.cc - tests for NamedCallback
classes

src/lib/util/tests/multi_threading_mgr_unittest.cc
    CriticalSectionCallbackTest - new test fixture
    TEST_F(CriticalSectionCallbackTest, basics) - new test

13 files changed:
src/bin/dhcp4/dhcp4_srv.cc
src/bin/dhcp6/dhcp6_srv.cc
src/hooks/dhcp/high_availability/ha_service.cc
src/hooks/dhcp/high_availability/ha_service.h
src/hooks/dhcp/high_availability/tests/ha_mt_unittest.cc
src/lib/util/Makefile.am
src/lib/util/multi_threading_mgr.cc
src/lib/util/multi_threading_mgr.h
src/lib/util/named_callback.cc [new file with mode: 0644]
src/lib/util/named_callback.h [new file with mode: 0644]
src/lib/util/tests/Makefile.am
src/lib/util/tests/multi_threading_mgr_unittest.cc
src/lib/util/tests/named_callback_unittest.cc [new file with mode: 0644]

index 2d04cdecf0ebd2e2ff8d0dd4a4e43db8989ef4db..041c3d03861191743aeed6dda33d172487cd1121 100644 (file)
@@ -960,6 +960,9 @@ Dhcpv4Srv::run() {
         }
     }
 
+    // Stop everything before we change into single-threaded mode.
+    MultiThreadingCriticalSection cs;
+
     // destroying the thread pool
     MultiThreadingMgr::instance().apply(false, 0, 0);
 
index 72f89540380f04899c33c02e73f5d799aad5f5f9..a5ede5680947ae0bd197ab184c936a7874ba8aa2 100644 (file)
@@ -539,6 +539,9 @@ int Dhcpv6Srv::run() {
         }
     }
 
+    // Stop everything before we change into single-threaded mode.
+    MultiThreadingCriticalSection cs;
+
     // destroying the thread pool
     MultiThreadingMgr::instance().apply(false, 0, 0);
 
index 55f46d1b75fe66fd8c272e84fd1ae42638394660..a7402d284de2f492ab28908b1a005448907de059 100644 (file)
@@ -2815,6 +2815,11 @@ HAService::getPendingRequestInternal(const QueryPtrType& query) {
 
 void
 HAService::startClientAndListener() {
+    // Add critical section callbacks.
+    MultiThreadingMgr::instance().addCriticalSectionCallbacks("HA_MT",
+        std::bind(&HAService::pauseClientAndListener, this),
+        std::bind(&HAService::resumeClientAndListener, this));
+
     if (client_) {
         client_->start();
     }
@@ -2848,6 +2853,9 @@ HAService::resumeClientAndListener() {
 
 void
 HAService::stopClientAndListener() {
+    // Remove critical section callbacks.
+    MultiThreadingMgr::instance().removeCriticalSectionCallbacks("HA_MT");
+
     if (client_) {
         client_->stop();
     }
index 9c430056e003a2bb252a373faedb6aa8fa7d6374..617f60dfedb1d2653dc826aebce0d937472fd472 100644 (file)
@@ -1006,25 +1006,31 @@ public:
     ///
     /// When HA+Mt is enabled it starts the client's thread pool
     /// and the dedicated listener thread pool, if the listener exists.
+    /// It registers pauseClientAndListener() and resumeClientAndListener()
+    /// as the MultiThreading critical section entry and exit callbacks,
+    /// respectively.
     void startClientAndListener();
 
     /// @brief Pauses client and(or) listener thread pool operations.
     ///
     /// Suspends the client and listener thread pool event processing.
     /// Has no effect in single-threaded mode or if thread pools are
-    /// not currently running.
+    /// not currently running.  Serves as the MultiThreading critical
+    /// section entry callback.
     void pauseClientAndListener();
 
     /// @brief Resumes client and(or) listener thread pool operations.
     ///
     /// Resumes the client and listener thread pool event processing.
     /// Has no effect in single-threaded mode or if thread pools are
-    /// not currently paused.
+    /// not currently paused. Serves as the MultiThreading critical
+    /// section exit callback.
     void resumeClientAndListener();
 
     /// @brief Stop the client and(or) listener instances.
     ///
-    /// Closes all connections and stops the thread pools for the client
+    /// It unregisters the MultiThreading critical section callbacks,
+    /// closes all connections, and the stops the thread pools for the client
     /// and listener, if they exist.
     void stopClientAndListener();
 
index 7c650ae118a34153e7cab35deba0bded7c983618..53f2c82d0ad396ddc7912a18ffafde24c7ec2a1c 100644 (file)
@@ -149,7 +149,9 @@ public:
     }
 };
 
-// Verifies HA+MT start, pause, resume, and stop.
+// Verifies HA+MT start, pause, resume, and stop. Note
+// that pause and resume are tested indirectly through
+// entry and exit of a critical section.
 TEST_F(HAMtServiceTest, multiThreadingBasics) {
 
     // Build the HA JSON configuration.
@@ -228,19 +230,22 @@ TEST_F(HAMtServiceTest, multiThreadingBasics) {
         EXPECT_EQ(service->listener_->getThreadPoolSize(), 3);
         EXPECT_EQ(service->listener_->getThreadCount(), 3);
 
-        // Pause client and listener.
-        ASSERT_NO_THROW_LOG(service->pauseClientAndListener());
+        {
+            // Entering a critical section should pause both client
+            // and listener.
+            MultiThreadingCriticalSection cs;
 
-        // Client should be paused.
-        ASSERT_TRUE(service->client_->isPaused());
-        EXPECT_TRUE(service->client_->getThreadIOService()->stopped());
+            // Client should be paused.
+            ASSERT_TRUE(service->client_->isPaused());
+            EXPECT_TRUE(service->client_->getThreadIOService()->stopped());
 
-        // Listener should be paused.
-        ASSERT_TRUE(service->listener_->isPaused());
-        EXPECT_TRUE(service->listener_->getThreadIOService()->stopped());
+            // Listener should be paused.
+            ASSERT_TRUE(service->listener_->isPaused());
+            EXPECT_TRUE(service->listener_->getThreadIOService()->stopped());
+        }
 
-        // Now resume client and listener.
-        ASSERT_NO_THROW_LOG(service->resumeClientAndListener());
+        // Exiting critical section should resume both cllent
+        // and listener.
 
         // Client should be running.
         ASSERT_TRUE(service->client_->isRunning());
index b5dbe823ed0ea692ef5d8737cb7e0a0273cfcd7a..85d43d75fdf11388e72a0f6f30099fe73a9dd646 100644 (file)
@@ -18,6 +18,7 @@ libkea_util_la_SOURCES += labeled_value.h labeled_value.cc
 libkea_util_la_SOURCES += memory_segment.h
 libkea_util_la_SOURCES += memory_segment_local.h memory_segment_local.cc
 libkea_util_la_SOURCES += multi_threading_mgr.h multi_threading_mgr.cc
+libkea_util_la_SOURCES += named_callback.h named_callback.cc
 libkea_util_la_SOURCES += optional.h
 libkea_util_la_SOURCES += pid_file.h pid_file.cc
 libkea_util_la_SOURCES += pointer_util.h
@@ -66,6 +67,7 @@ libkea_util_include_HEADERS = \
        memory_segment.h \
        memory_segment_local.h \
        multi_threading_mgr.h \
+       named_callback.h \
        optional.h \
        pid_file.h \
        pointer_util.h \
index 27f5e8e46df19e476a335428392ac816e597b48e..f7835810615d476d69eb09a959a32336fc31972f 100644 (file)
@@ -36,7 +36,7 @@ MultiThreadingMgr::setMode(bool enabled) {
 
 void
 MultiThreadingMgr::enterCriticalSection() {
-    stopPktProcessing();
+    stopProcessing();
     ++critical_section_count_;
 }
 
@@ -46,7 +46,7 @@ MultiThreadingMgr::exitCriticalSection() {
         isc_throw(InvalidOperation, "invalid negative value for override");
     }
     --critical_section_count_;
-    startPktProcessing();
+    startProcessing();
 }
 
 bool
@@ -110,6 +110,7 @@ MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_siz
             thread_pool_.start(thread_count);
         }
     } else {
+        removeAllCriticalSectionCallbacks();
         thread_pool_.reset();
         setMode(false);
         setThreadPoolSize(thread_count);
@@ -118,19 +119,53 @@ MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_siz
 }
 
 void
-MultiThreadingMgr::stopPktProcessing() {
-    if (getMode() && getThreadPoolSize() && !isInCriticalSection()) {
-        thread_pool_.stop();
+MultiThreadingMgr::stopProcessing() {
+    if (getMode() && !isInCriticalSection()) {
+        if (getThreadPoolSize()) {
+            thread_pool_.stop();
+        }
+
+        for (auto cb : critical_entry_cbs_.getCallbacks() ) {
+            // @todo need to think about what we do with exceptions
+            (cb.callback_)();
+        }
     }
 }
 
 void
-MultiThreadingMgr::startPktProcessing() {
-    if (getMode() && getThreadPoolSize() && !isInCriticalSection()) {
-        thread_pool_.start(getThreadPoolSize());
+MultiThreadingMgr::startProcessing() {
+    if (getMode() && !isInCriticalSection()) {
+        if (getThreadPoolSize()) {
+            thread_pool_.start(getThreadPoolSize());
+        }
+
+        for (auto cb : critical_exit_cbs_.getCallbacks() ) {
+            // @todo need to think about what we do with exceptions
+            (cb.callback_)();
+        }
     }
 }
 
+void
+MultiThreadingMgr::addCriticalSectionCallbacks(const std::string& name,
+                                               const NamedCallback::Callback& entry_cb,
+                                               const NamedCallback::Callback& exit_cb) {
+    critical_entry_cbs_.addCallback(name, entry_cb);
+    critical_exit_cbs_.addCallback(name, exit_cb);
+}
+
+void
+MultiThreadingMgr::removeCriticalSectionCallbacks(const std::string& name) {
+    critical_entry_cbs_.removeCallback(name);
+    critical_exit_cbs_.removeCallback(name);
+}
+
+void
+MultiThreadingMgr::removeAllCriticalSectionCallbacks() {
+    critical_entry_cbs_.removeAll();
+    critical_exit_cbs_.removeAll();
+}
+
 MultiThreadingCriticalSection::MultiThreadingCriticalSection() {
     MultiThreadingMgr::instance().enterCriticalSection();
 }
index d4d8b0d0fd974c49a9f4c70a2fb9e22a736a6dee..9ad4373f41e77afb57dc95b69d7296fa4c65ad23 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2019-2020 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2019-2021 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -8,6 +8,7 @@
 #define MULTI_THREADING_MGR_H
 
 #include <util/thread_pool.h>
+#include <util/named_callback.h>
 
 #include <boost/noncopyable.hpp>
 
@@ -129,6 +130,27 @@ public:
     /// configured, 0 for unlimited size
     void apply(bool enabled, uint32_t thread_count, uint32_t queue_size);
 
+    /// @brief Adds a pair of callbacks to the list of CriticalSection callbacks.
+    ///
+    /// @param name Name of the set of callbacks. This value is used by the
+    /// callback owner to remove or replace them.Duplicates are not allowed.
+    /// @param entry_cb Callback to invoke upon CriticalSection entry.
+    /// @param exit_cb Callback to invoke upon CriticalSection exit.
+    void addCriticalSectionCallbacks(const std::string& name,
+                                     const NamedCallback::Callback& entry_cb,
+                                     const NamedCallback::Callback& exit_cb);
+
+    /// @brief Removes the set of callbacks associated with a given name
+    /// from the list of CriticalSection callbacks.
+    ///
+    /// If the name is not found in the list, it simply returns.
+    ///
+    /// @param name Name of the set of callbacks to remove.
+    void removeCriticalSectionCallbacks(const std::string& name);
+
+    /// @brief Removes all callbacks in the list of CriticalSection callbacks.
+    void removeAllCriticalSectionCallbacks();
+
 protected:
 
     /// @brief Constructor.
@@ -139,15 +161,19 @@ protected:
 
 private:
 
-    /// @brief Class method stopping and joining all threads of the pool.
+    /// @brief Class method stops non-critical processing.
     ///
-    /// Stop the dhcp thread pool if running.
-    void stopPktProcessing();
+    /// Stops the DHCP thread pool if it's running and invokes
+    /// all CriticalSection entry callbacks.  Has no effect
+    /// in single-threaded mode.
+    void stopProcessing();
 
-    /// @brief Class method (re)starting threads of the pool.
+    /// @brief Class method (re)starts non-critical processing.
     ///
-    /// Start the dhcp thread pool according to current configuration.
-    void startPktProcessing();
+    /// Starts the DHCP thread pool according to current configuration,
+    /// and invokes all CriticalSection exit callbacks. Has no effect
+    /// in single-threaded mode.
+    void startProcessing();
 
     /// @brief The current multi-threading mode.
     ///
@@ -168,6 +194,12 @@ private:
 
     /// @brief Packet processing thread pool.
     ThreadPool<std::function<void()>> thread_pool_;
+
+    /// @brief List of callbacks to invoke upon CriticalSection entry.
+    NamedCallbackList critical_entry_cbs_;
+
+    /// @brief List of callbacks to invoke upon CriticalSection exit.
+    NamedCallbackList critical_exit_cbs_;
 };
 
 /// @note: everything here MUST be used ONLY from the main thread.
diff --git a/src/lib/util/named_callback.cc b/src/lib/util/named_callback.cc
new file mode 100644 (file)
index 0000000..9ad5d7f
--- /dev/null
@@ -0,0 +1,58 @@
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <exceptions/exceptions.h>
+#include <util/named_callback.h>
+
+#include <iostream>
+#include <string>
+
+namespace isc {
+namespace util {
+
+void
+NamedCallbackList::addCallback(const std::string& name, const NamedCallback::Callback& cb) {
+    if (!cb) {
+        isc_throw(BadValue, "NamedCallbackList - callback: " << name
+                      << ", cannot be empty");
+    }
+
+    for (auto const& callback : callbacks_) {
+        if (callback.name_ == name) {
+            isc_throw(BadValue, "NamedCallbackList - callback: " << name
+                      << ", already exists");
+        }
+    }
+
+    callbacks_.push_back(NamedCallback(name, cb));
+}
+
+void
+NamedCallbackList::removeCallback(const std::string& name) {
+    for (auto it = callbacks_.begin(); it != callbacks_.end(); ) {
+        if ((*it).name_ == name) {
+            it = callbacks_.erase(it);
+            break;
+        }
+
+        ++it;
+    }
+}
+
+void
+NamedCallbackList::removeAll() {
+    callbacks_.clear();
+}
+
+const std::list<NamedCallback>&
+NamedCallbackList::getCallbacks() {
+    return (callbacks_);
+}
+
+}  // namespace util
+}  // namespace isc
diff --git a/src/lib/util/named_callback.h b/src/lib/util/named_callback.h
new file mode 100644 (file)
index 0000000..ac98459
--- /dev/null
@@ -0,0 +1,79 @@
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef NAMED_CALLBACK_H
+#define NAMED_CALLBACK_H
+
+#include <exceptions/exceptions.h>
+#include <boost/make_shared.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <functional>
+#include <list>
+
+namespace isc {
+namespace util {
+
+/// @brief Associates a Callback with a name.
+struct NamedCallback {
+    /// @brief Defines a callback as a simple void() functor.
+    typedef std::function<void()> Callback;
+
+    /// @brief Constructor
+    ///
+    /// @param name Name by which the callback can be found.
+    /// @param cb Callback associated with name.
+    NamedCallback(const std::string& name, const Callback& cb)
+        : name_(name), callback_(cb) {
+    };
+
+    /// @Brief Name by which the callback can be found.
+    std::string name_;
+
+    /// @Brief Callback associated with name.
+    Callback callback_;
+};
+
+/// @brief Maintains list of unique NamedCallbacks.
+///
+/// The list emphasizes iteration order and speed over 
+/// retrieval by name. When iterating over the list of
+/// callbacks, they are returned in the order they were
+/// added, not by name.
+class NamedCallbackList {
+public:
+    /// @brief Constructor.
+    NamedCallbackList(){};
+
+    /// @brief Adds a callback to the list.
+    ///
+    /// @param name Name of the callback to add.
+    /// @param cb Callback to add.
+    ///
+    /// @throw BadValue if the name is already in the list.
+    void addCallback(const std::string& name, const NamedCallback::Callback& cb);
+
+    /// @brief Removes a callback from the list.
+    ///
+    /// @param name Name of the callback to remove.
+    /// If no such callback exists, it simply returns.
+    void removeCallback(const std::string& name);
+
+    /// @brief Removes all callbacks from the list.
+    void removeAll();
+
+    /// @brief Fetches the list of callbacks.
+    const std::list<NamedCallback>& getCallbacks();
+
+private:
+    /// @brief The list of callbacks.
+    std::list<NamedCallback>  callbacks_;
+};
+
+}  // namespace util
+}  // namespace isc
+
+#endif  // NAMED_CALLBACK_H
index d8354e967bb70f426eef03dfd2794606b91e8a57..dfde69ccfe82c2582ee30daab0d07e78b667d452 100644 (file)
@@ -42,6 +42,7 @@ run_unittests_SOURCES += memory_segment_local_unittest.cc
 run_unittests_SOURCES += memory_segment_common_unittest.h
 run_unittests_SOURCES += memory_segment_common_unittest.cc
 run_unittests_SOURCES += multi_threading_mgr_unittest.cc
+run_unittests_SOURCES += named_callback_unittest.cc
 run_unittests_SOURCES += optional_unittest.cc
 run_unittests_SOURCES += pid_file_unittest.cc
 run_unittests_SOURCES += qid_gen_unittest.cc
index 35e8fce0cf7f8dbd3cac89aed4c1a922ac599645..3ff29b3719750b26e7a3197713f080eda4280aa9 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2019-2020 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2019-2021 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -322,3 +322,148 @@ TEST(MultiThreadingMgrTest, criticalSection) {
     // apply multi-threading configuration with 0 threads
     MultiThreadingMgr::instance().apply(false, 0, 0);
 }
+
+/// @brief Test fixture for exercised CriticalSection callbacks.
+class CriticalSectionCallbackTest : public ::testing::Test {
+public:
+    /// @Brief Constructor.
+    CriticalSectionCallbackTest() {};
+
+    /// @Brief A callback that adds the value, 1, to invocations lists.
+    void one() {
+        invocations_.push_back(1);
+    }
+
+    /// @Brief A callback that adds the value, 2, to invocations lists.
+    void two() {
+        invocations_.push_back(2);
+    }
+
+    /// @Brief A callback that adds the value, 3, to invocations lists.
+    void three() {
+        invocations_.push_back(3);
+    }
+
+    /// @Brief A callback that adds the value, 4, to invocations lists.
+    void four() {
+        invocations_.push_back(4);
+    }
+
+    /// @brief Indicates whether or not the DHCP thread pool is running.
+    ///
+    /// @return True if the pool is running, false otherwise.
+    bool isThreadPoolRunning() {
+        return (MultiThreadingMgr::instance().getThreadPool().size());
+    }
+
+    /// @brief Checks callback invocations over a series of nested
+    /// CriticalSecitons.
+    ///
+    /// @param entries A vector of the invocation values that should
+    /// be present after entry into the outermost CriticalSection.  The
+    /// expected  values should be in the order the callbacks were
+    /// added to the  MultiThreadingMgr's list of callbacks.
+    /// @param exits A vector of the invocation values that should
+    /// be present after exiting the CriticalSection.  The expected
+    /// values should be in the order the callbacks were added to the
+    /// MultiThreadingMgr's list of callbacks.
+    void runCriticalSections(std::vector<int> entries, std::vector<int>exits) {
+        // Pool must be running.
+        ASSERT_TRUE(isThreadPoolRunning());
+
+        // Clear the invocations list.
+        invocations_.clear();
+
+        // Use scope to create nested CriticalSections.
+        {
+            // Enter a critical section.
+            MultiThreadingCriticalSection cs;
+
+            // Thread pool should be stopped.
+            ASSERT_FALSE(isThreadPoolRunning());
+
+            if (entries.size()) {
+                // We expect entry invocations.
+                ASSERT_TRUE(invocations_ ==  entries);
+            } else {
+                // We do not expect entry invocations.
+                ASSERT_FALSE(invocations_.size());
+            }
+
+            // Clear the invocations list.
+            invocations_.clear();
+
+            {
+                // Enter another CriticalSection.
+                MultiThreadingCriticalSection inner_cs;
+
+                // thread pool should still be stopped
+                ASSERT_FALSE(isThreadPoolRunning());
+
+                // We should not have had any callback invocations.
+                ASSERT_FALSE(invocations_.size());
+            }
+
+            // After exiting inner setion, the thread pool should
+            // still be stopped
+            ASSERT_FALSE(isThreadPoolRunning());
+
+            // We should not have had more callback invocations.
+            ASSERT_FALSE(invocations_.size());
+        }
+
+        // After exiting the outer setion, the thread pool should
+        // match the thread count.
+        ASSERT_TRUE(isThreadPoolRunning());
+
+        if (exits.size()) {
+            // We expect exit invocations.
+            ASSERT_TRUE(invocations_ ==  exits);
+        } else {
+            // We do not expect exit invocations.
+            ASSERT_FALSE(invocations_.size());
+        }
+    }
+
+    /// @Brief A list of values set by callback invocations.
+    std::vector<int> invocations_;
+};
+
+/// @brief Verifies that the critical section callbacks work.
+TEST_F(CriticalSectionCallbackTest, basics) {
+    // get the thread pool instance
+    auto& thread_pool = MultiThreadingMgr::instance().getThreadPool();
+    // thread pool should be stopped
+    EXPECT_EQ(thread_pool.size(), 0);
+
+    // Add two sets of CriticalSection call backs.
+    MultiThreadingMgr::instance().addCriticalSectionCallbacks("oneAndTwo",
+         std::bind(&CriticalSectionCallbackTest::one, this),
+         std::bind(&CriticalSectionCallbackTest::two, this));
+
+    MultiThreadingMgr::instance().addCriticalSectionCallbacks("threeAndFour",
+         std::bind(&CriticalSectionCallbackTest::three, this),
+         std::bind(&CriticalSectionCallbackTest::four, this));
+
+    // Apply multi-threading configuration with 16 threads and queue size 256.
+    MultiThreadingMgr::instance().apply(true, 16, 256);
+
+    // Make three passes over nested CriticalSections to ensure
+    // callbacks execute at the appropriate times and we can do
+    // so repeatedly.
+    for (int i = 0; i < 3; ++i) {
+        runCriticalSections({1,3}, {2,4});
+    }
+
+    // Now remove the first set of callbacks.
+    MultiThreadingMgr::instance().removeCriticalSectionCallbacks("oneAndTwo");
+
+    // Retest CriticalSections.
+    runCriticalSections({3}, {4});
+
+    // Now remove the remaining callbacks.
+    MultiThreadingMgr::instance().removeAllCriticalSectionCallbacks();
+
+    // Retest CriticalSections.
+    runCriticalSections({}, {});
+}
diff --git a/src/lib/util/tests/named_callback_unittest.cc b/src/lib/util/tests/named_callback_unittest.cc
new file mode 100644 (file)
index 0000000..7de9135
--- /dev/null
@@ -0,0 +1,95 @@
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <gtest/gtest.h>
+
+#include <exceptions/exceptions.h>
+#include <util/named_callback.h>
+#include <testutils/gtest_utils.h>
+
+using namespace isc;
+using namespace isc::util;
+using namespace std;
+
+namespace {
+
+class NamedCallbackListTest : public ::testing::Test {
+public:
+    NamedCallbackListTest() {};
+
+    void one() {
+        invocations_.push_back(1);
+    }
+
+    void two() {
+        invocations_.push_back(2);
+    }
+
+    void three() {
+        invocations_.push_back(3);
+    }
+
+    void runCallbacks() {
+        invocations_.clear();
+        for (auto cb : cbs_.getCallbacks() ) {
+            ASSERT_NO_THROW((cb.callback_)());
+        }
+    }
+
+    NamedCallbackList cbs_;
+    std::vector<int> invocations_;
+};
+
+TEST_F(NamedCallbackListTest, basics) {
+
+    ASSERT_NO_THROW(cbs_.addCallback("one",
+                                     std::bind(&NamedCallbackListTest::one, this)));
+    ASSERT_NO_THROW(cbs_.addCallback("two",
+                                     std::bind(&NamedCallbackListTest::two, this)));
+    ASSERT_NO_THROW(cbs_.addCallback("three",
+                                     std::bind(&NamedCallbackListTest::three, this)));
+
+    // Can't add an empty callback.
+    ASSERT_THROW_MSG(cbs_.addCallback("nadda", 0),
+                     BadValue, "NamedCallbackList - callback: nadda, cannot be empty");
+
+    // Can't add one twice.
+    ASSERT_THROW_MSG(cbs_.addCallback("one",
+                                      std::bind(&NamedCallbackListTest::one, this)),
+                     BadValue, "NamedCallbackList - callback: one, already exists");
+
+    for (auto cb : cbs_.getCallbacks() ) {
+        ASSERT_NO_THROW((cb.callback_)());
+    }
+
+    int i = 0;
+    for (auto invocation : invocations_) {
+        EXPECT_EQ(invocation, ++i);
+    }
+
+    runCallbacks();
+
+    EXPECT_EQ(3, invocations_.size());
+    EXPECT_EQ(1, invocations_[0]);
+    EXPECT_EQ(2, invocations_[1]);
+    EXPECT_EQ(3, invocations_[2]);
+
+    // Removing two shouldn't throw.
+    ASSERT_NO_THROW(cbs_.removeCallback("two"));
+
+    // Removing it again shouldn't throw.
+    ASSERT_NO_THROW(cbs_.removeCallback("two"));
+
+    runCallbacks();
+
+    EXPECT_EQ(2, invocations_.size());
+    EXPECT_EQ(1, invocations_[0]);
+    EXPECT_EQ(3, invocations_[1]);
+}
+
+}  // namespace