]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#2043] safety measures for illegal opetations on thread pool done by owned threads
authorRazvan Becheriu <razvan@isc.org>
Mon, 23 Aug 2021 13:16:57 +0000 (16:16 +0300)
committerRazvan Becheriu <razvan@isc.org>
Tue, 24 Aug 2021 18:52:09 +0000 (21:52 +0300)
12 files changed:
src/hooks/dhcp/high_availability/ha_messages.cc
src/hooks/dhcp/high_availability/ha_messages.h
src/hooks/dhcp/high_availability/ha_messages.mes
src/hooks/dhcp/high_availability/ha_service.cc
src/lib/config/tests/cmd_http_listener_unittests.cc
src/lib/exceptions/exceptions.h
src/lib/http/http_thread_pool.cc
src/lib/http/http_thread_pool.h
src/lib/http/tests/client_mt_unittests.cc
src/lib/util/multi_threading_mgr.cc
src/lib/util/tests/thread_pool_unittest.cc
src/lib/util/thread_pool.h

index c360fd781d41cab824418c97c5170b4dc06ebdc0..4a8ebe1ce7ec9f3cda6d91825dc472c958d1e0b8 100644 (file)
@@ -85,6 +85,7 @@ extern const isc::log::MessageID HA_MAINTENANCE_STARTED_IN_PARTNER_DOWN = "HA_MA
 extern const isc::log::MessageID HA_MAINTENANCE_START_HANDLER_FAILED = "HA_MAINTENANCE_START_HANDLER_FAILED";
 extern const isc::log::MessageID HA_MISSING_CONFIGURATION = "HA_MISSING_CONFIGURATION";
 extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_FAILED = "HA_PAUSE_CLIENT_LISTENER_FAILED";
+extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_ILLEGAL = "HA_PAUSE_CLIENT_LISTENER_ILLEGAL";
 extern const isc::log::MessageID HA_RESET_COMMUNICATIONS_FAILED = "HA_RESET_COMMUNICATIONS_FAILED";
 extern const isc::log::MessageID HA_RESET_FAILED = "HA_RESET_FAILED";
 extern const isc::log::MessageID HA_RESET_HANDLER_FAILED = "HA_RESET_HANDLER_FAILED";
@@ -186,6 +187,7 @@ const char* values[] = {
     "HA_MAINTENANCE_START_HANDLER_FAILED", "ha-maintenance-start command failed: %1",
     "HA_MISSING_CONFIGURATION", "high-availability parameter not specified for High Availability hooks library",
     "HA_PAUSE_CLIENT_LISTENER_FAILED", "Pausing multi-threaded HTTP processing failed: %1",
+    "HA_PAUSE_CLIENT_LISTENER_ILLEGAL", "Pausing multi-threaded HTTP processing failed: %1",
     "HA_RESET_COMMUNICATIONS_FAILED", "failed to send ha-reset command to %1: %2",
     "HA_RESET_FAILED", "failed to reset HA state machine of %1: %2",
     "HA_RESET_HANDLER_FAILED", "ha-reset command failed: %1",
index 78ae6fb2ed99a7b9ee15cad4043f6e995ca007b3..942eb0fc4c942e107fec25dc876717fc1d7be13c 100644 (file)
@@ -86,6 +86,7 @@ extern const isc::log::MessageID HA_MAINTENANCE_STARTED_IN_PARTNER_DOWN;
 extern const isc::log::MessageID HA_MAINTENANCE_START_HANDLER_FAILED;
 extern const isc::log::MessageID HA_MISSING_CONFIGURATION;
 extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_FAILED;
+extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_ILLEGAL;
 extern const isc::log::MessageID HA_RESET_COMMUNICATIONS_FAILED;
 extern const isc::log::MessageID HA_RESET_FAILED;
 extern const isc::log::MessageID HA_RESET_HANDLER_FAILED;
index 513c410dd3832525e76abcaa9cddb98372f212f4..7712c42567bfd0fad35d56fbe40dada629766629 100644 (file)
@@ -486,6 +486,12 @@ This error message is emitted when attempting to pause HA's HTTP client and
 listener threads. This error is highly unlikely and indicates a programmatic
 issue that should be reported as a defect.
 
+% HA_PAUSE_CLIENT_LISTENER_ILLEGAL Pausing multi-threaded HTTP processing failed: %1
+This error message is emitted when attempting to pause HA's HTTP client and
+listener threads from owned thread. This error indicates that a command run on
+the listener threads is trying to use a critical section which would result in
+a dead-lock.
+
 % HA_RESET_COMMUNICATIONS_FAILED failed to send ha-reset command to %1: %2
 This warning message indicates a problem with communication with a HA peer
 while sending the ha-reset command. The first argument specifies a remote
index 387d31b88a733d162fcd03274bf668d939709dc9..9e3042360c25a3c897c8d6e5e8e14b76fa68e81a 100644 (file)
@@ -1268,9 +1268,9 @@ HAService::updatePendingRequestInternal(QueryPtrType& query) {
 template<typename QueryPtrType>
 void
 HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
-                          const HAConfig::PeerConfigPtr& config,
-                          const ConstElementPtr& command,
-                          const ParkingLotHandlePtr& parking_lot) {
+                                const HAConfig::PeerConfigPtr& config,
+                                const ConstElementPtr& command,
+                                const ParkingLotHandlePtr& parking_lot) {
     // Create HTTP/1.1 request including our command.
     PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
         (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
@@ -2834,14 +2834,25 @@ HAService::pauseClientAndListener() {
     // Since we're used as CS callback we need to suppress
     // any exceptions, unlikely though they may be.
     try {
+        // The listener is the only one handling commands, so if any command
+        // uses @ref MultiThreadingCriticalSection, it should throw first.
+        // In this situation there is no need to resume the client's
+        // @ref HttpThreadPool because it does not get paused in the first place.
+        if (listener_) {
+            listener_->pause();
+        }
+
         if (client_) {
             client_->pause();
         }
 
-        if (listener_) {
-            listener_->pause();
-        }
-    } catch (std::exception& ex) {
+    } catch (const isc::MultiThreadingInvalidOperation& ex) {
+        LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_ILLEGAL)
+                  .arg(ex.what());
+        // The exception needs to be propagated to the caller of the
+        // @ref MultiThreadingCriticalSection constructor.
+        throw;
+    } catch (const std::exception& ex) {
         LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_FAILED)
                   .arg(ex.what());
     }
@@ -2870,13 +2881,13 @@ HAService::stopClientAndListener() {
     // Remove critical section callbacks.
     MultiThreadingMgr::instance().removeCriticalSectionCallbacks("HA_MT");
 
-    if (client_) {
-        client_->stop();
-    }
-
     if (listener_) {
         listener_->stop();
     }
+
+    if (client_) {
+        client_->stop();
+    }
 }
 
 // Explicit instantiations.
index 32b4b5ecafc28f675eeade4985f2a86266257ae3..55b2fbf25c77c5cdcd19d702da5e5d8beb034337 100644 (file)
@@ -241,6 +241,10 @@ public:
                                       const ConstElementPtr& /*command_arguments*/) {
         ElementPtr arguments = Element::createList();
         arguments->add(Element::create("bar"));
+        EXPECT_THROW(listener_->start(), InvalidOperation);
+        EXPECT_THROW(listener_->pause(), MultiThreadingInvalidOperation);
+        EXPECT_THROW(listener_->resume(), MultiThreadingInvalidOperation);
+        EXPECT_THROW(listener_->stop(), MultiThreadingInvalidOperation);
         return (createAnswer(CONTROL_RESULT_SUCCESS, arguments));
     }
 
index f5368c84587c80add646e22bb66e8bccf4d22bc1..47adf96d9d0c2ee59fa20d67e10265e60bb32b0f 100644 (file)
@@ -178,6 +178,14 @@ public:
         isc::Exception(file, line, what) {}
 };
 
+/// \brief Exception thrown when an owned thread is trying to stop or pause the
+/// respective thread pool (which would result in a dead-lock).
+class MultiThreadingInvalidOperation : public Exception {
+public:
+    MultiThreadingInvalidOperation(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) {};
+};
+
 ///
 /// A shortcut macro to insert known values into exception arguments.
 ///
index 7e649f5f39b56ed22a2ec396f4f22d98aa81436e..dfa995d67713eae937f856c3de80b40e6c52e3ec 100644 (file)
@@ -85,12 +85,40 @@ HttpThreadPool::validateStateChange(State new_state) const {
     case State::PAUSED:
         return (new_state != State::PAUSED);
     }
+    return (false);
+}
 
+std::string
+HttpThreadPool::stateToText(State state) {
+    switch (state) {
+    case State::STOPPED:
+        return (std::string("stopped"));
+    case State::RUNNING:
+        return (std::string("running"));
+    case State::PAUSED:
+        return (std::string("paused"));
+    }
+    return (std::string("unknown-state"));
+}
+
+bool
+HttpThreadPool::checkThreadId(std::thread::id id) {
+    for (auto thread : threads_) {
+        if (id == thread->get_id()) {
+            return (true);
+        }
+    }
     return (false);
 }
 
 void
 HttpThreadPool::setState(State new_state) {
+    auto id = std::this_thread::get_id();
+    if (checkThreadId(id)) {
+        isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to "
+                  << HttpThreadPool::stateToText(new_state) << " performed by owned thread");
+    }
+
     std::unique_lock<std::mutex> main_lck(mutex_);
 
     // Bail if the transition is invalid.
index dddb43be333d9cce2f4b69d5648e8f541fdedcd3..1096e778263057cac40bfde029cca369e5f84fa9 100644 (file)
@@ -93,6 +93,11 @@ public:
     }
 
 private:
+    /// @brief Check specified thread id against own threads.
+    ///
+    /// @return true if thread is owned, false otherwise.
+    bool checkThreadId(std::thread::id id);
+
     /// @brief Thread-safe change of the pool's run state.
     ///
     /// Transitions a pool from one run state to another:
@@ -139,6 +144,12 @@ private:
     /// @note Must be called from a thread-safe context.
     bool validateStateChange(State state) const;
 
+    /// @brief Text representation of a given state.
+    ///
+    /// @param state The state for the pool.
+    /// @return The text representation of a given state.
+    static std::string stateToText(State state);
+
     /// @brief Work function executed by each thread in the pool.
     ///
     /// Implements the run state responsibilities for a given thread.
index df09d1ad3e4beb5b24942ec5e1ba6bd7164443cf..d5d8cfc99639cee936edc1769679b6fe904283fd 100644 (file)
@@ -191,22 +191,22 @@ public:
 };
 
 /// @brief Test fixture class for testing threading modes of HTTP client.
-class MtHttpClientTest : public ::testing::Test {
+class MultiThreadingHttpClientTest : public ::testing::Test {
 public:
 
     /// @brief Constructor.
-    MtHttpClientTest()
+    MultiThreadingHttpClientTest()
         : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
           test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
           expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false),
           pause_cnt_(0) {
-        test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true),
+        test_timer_.setup(std::bind(&MultiThreadingHttpClientTest::timeoutHandler, this, true),
                           TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
         MultiThreadingMgr::instance().setMode(true);
     }
 
     /// @brief Destructor.
-    ~MtHttpClientTest() {
+    ~MultiThreadingHttpClientTest() {
         // Stop the client.
         if (client_) {
             client_->stop();
@@ -811,7 +811,7 @@ public:
 
 // Verifies we can construct and destruct, in both single
 // and multi-threaded modes.
-TEST_F(MtHttpClientTest, basics) {
+TEST_F(MultiThreadingHttpClientTest, basics) {
     MultiThreadingMgr::instance().setMode(false);
     HttpClientPtr client;
 
@@ -879,7 +879,7 @@ TEST_F(MtHttpClientTest, basics) {
 }
 
 // Verifies we can construct with deferred start.
-TEST_F(MtHttpClientTest, deferredStart) {
+TEST_F(MultiThreadingHttpClientTest, deferredStart) {
     MultiThreadingMgr::instance().setMode(true);
     HttpClientPtr client;
     size_t thread_pool_size = 3;
@@ -923,7 +923,7 @@ TEST_F(MtHttpClientTest, deferredStart) {
 }
 
 // Verifies we can restart after stop.
-TEST_F(MtHttpClientTest, restartAfterStop) {
+TEST_F(MultiThreadingHttpClientTest, restartAfterStop) {
     MultiThreadingMgr::instance().setMode(true);
     HttpClientPtr client;
     size_t thread_pool_size = 3;
@@ -964,14 +964,14 @@ TEST_F(MtHttpClientTest, restartAfterStop) {
 // requests, and listeners.
 
 // Single-threaded, three batches, one listener.
-TEST_F(MtHttpClientTest, zeroByThreeByOne) {
+TEST_F(MultiThreadingHttpClientTest, zeroByThreeByOne) {
     size_t num_threads = 0; // Zero threads = ST mode.
     size_t num_batches = 3;
     threadRequestAndReceive(num_threads, num_batches);
 }
 
 // Single-threaded, three batches, three listeners.
-TEST_F(MtHttpClientTest, zeroByThreeByThree) {
+TEST_F(MultiThreadingHttpClientTest, zeroByThreeByThree) {
     size_t num_threads = 0; // Zero threads = ST mode.
     size_t num_batches = 3;
     size_t num_listeners = 3;
@@ -979,28 +979,28 @@ TEST_F(MtHttpClientTest, zeroByThreeByThree) {
 }
 
 // Multi-threaded with one thread, three batches, one listener
-TEST_F(MtHttpClientTest, oneByThreeByOne) {
+TEST_F(MultiThreadingHttpClientTest, oneByThreeByOne) {
     size_t num_threads = 1;
     size_t num_batches = 3;
     threadRequestAndReceive(num_threads, num_batches);
 }
 
 // Multi-threaded with three threads, three batches, one listener
-TEST_F(MtHttpClientTest, threeByThreeByOne) {
+TEST_F(MultiThreadingHttpClientTest, threeByThreeByOne) {
     size_t num_threads = 3;
     size_t num_batches = 3;
     threadRequestAndReceive(num_threads, num_batches);
 }
 
 // Multi-threaded with three threads, nine batches, one listener
-TEST_F(MtHttpClientTest, threeByNineByOne) {
+TEST_F(MultiThreadingHttpClientTest, threeByNineByOne) {
     size_t num_threads = 3;
     size_t num_batches = 9;
     threadRequestAndReceive(num_threads, num_batches);
 }
 
 // Multi-threaded with two threads, four batches, two listeners
-TEST_F(MtHttpClientTest, twoByFourByTwo) {
+TEST_F(MultiThreadingHttpClientTest, twoByFourByTwo) {
     size_t num_threads = 2;
     size_t num_batches = 4;
     size_t num_listeners = 2;
@@ -1008,7 +1008,7 @@ TEST_F(MtHttpClientTest, twoByFourByTwo) {
 }
 
 // Multi-threaded with four threads, four batches, two listeners
-TEST_F(MtHttpClientTest, fourByFourByTwo) {
+TEST_F(MultiThreadingHttpClientTest, fourByFourByTwo) {
     size_t num_threads = 4;
     size_t num_batches = 4;
     size_t num_listeners = 2;
@@ -1017,7 +1017,7 @@ TEST_F(MtHttpClientTest, fourByFourByTwo) {
 
 // Verifies that we can cleanly pause, resume, and shutdown while doing
 // multi-threaded work.
-TEST_F(MtHttpClientTest, workPauseResumeShutdown) {
+TEST_F(MultiThreadingHttpClientTest, workPauseResumeShutdown) {
     size_t num_threads = 4;
     size_t num_batches = 4;
     size_t num_listeners = 4;
index b077d55ec2f2a8d20976c652b76d78b6a787d0ed..9093873158c1ab6d363419c79e5229b01068559f 100644 (file)
@@ -129,19 +129,30 @@ MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_siz
 void
 MultiThreadingMgr::stopProcessing() {
     if (getMode() && !isInCriticalSectionInternal()) {
-        if (getThreadPoolSize()) {
-            thread_pool_.stop();
-        }
-
+        // First call the registered callback for entering the critical section
+        // so that if any exception is thrown, there is no need to stop and then
+        // start the service threads.
         for (const auto& cb : cs_callbacks_.getCallbackPairs()) {
             try {
                 (cb.entry_cb_)();
+            } catch (const isc::MultiThreadingInvalidOperation& ex) {
+                // If any registered callback throws, the exception needs to be
+                // propagated to the caller of the
+                // @ref MultiThreadingCriticalSection constructor.
+                // Because this function is called by the
+                // @ref MultiThreadingCriticalSection constructor, throwing here
+                // is safe.
+                throw;
             } catch (...) {
                 // We can't log it and throwing could be chaos.
                 // We'll swallow it and tell people their callbacks
                 // must be exception-proof
             }
         }
+
+        if (getThreadPoolSize()) {
+            thread_pool_.stop();
+        }
     }
 }
 
@@ -159,6 +170,9 @@ MultiThreadingMgr::startProcessing() {
                 // We can't log it and throwing could be chaos.
                 // We'll swallow it and tell people their callbacks
                 // must be exception-proof
+                // Because this function is called by the
+                // @ref MultiThreadingCriticalSection destructor, throwing here
+                // is not safe and will cause the process to crash.
             }
         }
     }
index d9095d30432d10a257db391eda9331045eff049d..9c636c9e85f42b2d8507a1c67f3a96603f0549aa 100644 (file)
@@ -60,10 +60,10 @@ public:
     /// @brief task function which tries to stop the thread pool and then calls
     /// @ref runAndWait
     void runStopResetAndWait(ThreadPool<CallBack>* thread_pool) {
-        EXPECT_THROW(thread_pool->stop(), InvalidOperation);
-        EXPECT_THROW(thread_pool->reset(), InvalidOperation);
-        EXPECT_THROW(thread_pool->wait(), InvalidOperation);
-        EXPECT_THROW(thread_pool->wait(0), InvalidOperation);
+        EXPECT_THROW(thread_pool->stop(), MultiThreadingInvalidOperation);
+        EXPECT_THROW(thread_pool->reset(), MultiThreadingInvalidOperation);
+        EXPECT_THROW(thread_pool->wait(), MultiThreadingInvalidOperation);
+        EXPECT_THROW(thread_pool->wait(0), MultiThreadingInvalidOperation);
         sigset_t nsset;
         pthread_sigmask(SIG_SETMASK, 0, &nsset);
         EXPECT_EQ(1, sigismember(&nsset, SIGCHLD));
index 7313ea73f0e1ab6fc4f3ae2d692dc7f9a1d5d5bf..0136a957926732a1629b2d285aee4b6960313c06 100644 (file)
@@ -120,7 +120,7 @@ struct ThreadPool {
     void wait() {
         auto id = std::this_thread::get_id();
         if (checkThreadId(id)) {
-            isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+            isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread");
         }
         queue_.wait();
     }
@@ -135,7 +135,7 @@ struct ThreadPool {
     bool wait(uint32_t seconds) {
         auto id = std::this_thread::get_id();
         if (checkThreadId(id)) {
-            isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+            isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread");
         }
         return (queue_.wait(seconds));
     }
@@ -203,7 +203,7 @@ private:
     void stopInternal() {
         auto id = std::this_thread::get_id();
         if (checkThreadId(id)) {
-            isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+            isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread");
         }
         queue_.disable();
         for (auto thread : threads_) {