From: Razvan Becheriu Date: Mon, 23 Aug 2021 13:16:57 +0000 (+0300) Subject: [#2043] safety measures for illegal opetations on thread pool done by owned threads X-Git-Tag: Kea-1.9.11~28 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b7a6377dcf4376fdda08eeec7e0fe4a6993fb2ce;p=thirdparty%2Fkea.git [#2043] safety measures for illegal opetations on thread pool done by owned threads --- diff --git a/src/hooks/dhcp/high_availability/ha_messages.cc b/src/hooks/dhcp/high_availability/ha_messages.cc index c360fd781d..4a8ebe1ce7 100644 --- a/src/hooks/dhcp/high_availability/ha_messages.cc +++ b/src/hooks/dhcp/high_availability/ha_messages.cc @@ -85,6 +85,7 @@ extern const isc::log::MessageID HA_MAINTENANCE_STARTED_IN_PARTNER_DOWN = "HA_MA extern const isc::log::MessageID HA_MAINTENANCE_START_HANDLER_FAILED = "HA_MAINTENANCE_START_HANDLER_FAILED"; extern const isc::log::MessageID HA_MISSING_CONFIGURATION = "HA_MISSING_CONFIGURATION"; extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_FAILED = "HA_PAUSE_CLIENT_LISTENER_FAILED"; +extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_ILLEGAL = "HA_PAUSE_CLIENT_LISTENER_ILLEGAL"; extern const isc::log::MessageID HA_RESET_COMMUNICATIONS_FAILED = "HA_RESET_COMMUNICATIONS_FAILED"; extern const isc::log::MessageID HA_RESET_FAILED = "HA_RESET_FAILED"; extern const isc::log::MessageID HA_RESET_HANDLER_FAILED = "HA_RESET_HANDLER_FAILED"; @@ -186,6 +187,7 @@ const char* values[] = { "HA_MAINTENANCE_START_HANDLER_FAILED", "ha-maintenance-start command failed: %1", "HA_MISSING_CONFIGURATION", "high-availability parameter not specified for High Availability hooks library", "HA_PAUSE_CLIENT_LISTENER_FAILED", "Pausing multi-threaded HTTP processing failed: %1", + "HA_PAUSE_CLIENT_LISTENER_ILLEGAL", "Pausing multi-threaded HTTP processing failed: %1", "HA_RESET_COMMUNICATIONS_FAILED", "failed to send ha-reset command to %1: %2", "HA_RESET_FAILED", "failed to reset HA state machine of %1: %2", "HA_RESET_HANDLER_FAILED", "ha-reset command failed: %1", diff --git a/src/hooks/dhcp/high_availability/ha_messages.h b/src/hooks/dhcp/high_availability/ha_messages.h index 78ae6fb2ed..942eb0fc4c 100644 --- a/src/hooks/dhcp/high_availability/ha_messages.h +++ b/src/hooks/dhcp/high_availability/ha_messages.h @@ -86,6 +86,7 @@ extern const isc::log::MessageID HA_MAINTENANCE_STARTED_IN_PARTNER_DOWN; extern const isc::log::MessageID HA_MAINTENANCE_START_HANDLER_FAILED; extern const isc::log::MessageID HA_MISSING_CONFIGURATION; extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_FAILED; +extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_ILLEGAL; extern const isc::log::MessageID HA_RESET_COMMUNICATIONS_FAILED; extern const isc::log::MessageID HA_RESET_FAILED; extern const isc::log::MessageID HA_RESET_HANDLER_FAILED; diff --git a/src/hooks/dhcp/high_availability/ha_messages.mes b/src/hooks/dhcp/high_availability/ha_messages.mes index 513c410dd3..7712c42567 100644 --- a/src/hooks/dhcp/high_availability/ha_messages.mes +++ b/src/hooks/dhcp/high_availability/ha_messages.mes @@ -486,6 +486,12 @@ This error message is emitted when attempting to pause HA's HTTP client and listener threads. This error is highly unlikely and indicates a programmatic issue that should be reported as a defect. +% HA_PAUSE_CLIENT_LISTENER_ILLEGAL Pausing multi-threaded HTTP processing failed: %1 +This error message is emitted when attempting to pause HA's HTTP client and +listener threads from owned thread. This error indicates that a command run on +the listener threads is trying to use a critical section which would result in +a dead-lock. + % HA_RESET_COMMUNICATIONS_FAILED failed to send ha-reset command to %1: %2 This warning message indicates a problem with communication with a HA peer while sending the ha-reset command. The first argument specifies a remote diff --git a/src/hooks/dhcp/high_availability/ha_service.cc b/src/hooks/dhcp/high_availability/ha_service.cc index 387d31b88a..9e3042360c 100644 --- a/src/hooks/dhcp/high_availability/ha_service.cc +++ b/src/hooks/dhcp/high_availability/ha_service.cc @@ -1268,9 +1268,9 @@ HAService::updatePendingRequestInternal(QueryPtrType& query) { template void HAService::asyncSendLeaseUpdate(const QueryPtrType& query, - const HAConfig::PeerConfigPtr& config, - const ConstElementPtr& command, - const ParkingLotHandlePtr& parking_lot) { + const HAConfig::PeerConfigPtr& config, + const ConstElementPtr& command, + const ParkingLotHandlePtr& parking_lot) { // Create HTTP/1.1 request including our command. PostHttpRequestJsonPtr request = boost::make_shared (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(), @@ -2834,14 +2834,25 @@ HAService::pauseClientAndListener() { // Since we're used as CS callback we need to suppress // any exceptions, unlikely though they may be. try { + // The listener is the only one handling commands, so if any command + // uses @ref MultiThreadingCriticalSection, it should throw first. + // In this situation there is no need to resume the client's + // @ref HttpThreadPool because it does not get paused in the first place. + if (listener_) { + listener_->pause(); + } + if (client_) { client_->pause(); } - if (listener_) { - listener_->pause(); - } - } catch (std::exception& ex) { + } catch (const isc::MultiThreadingInvalidOperation& ex) { + LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_ILLEGAL) + .arg(ex.what()); + // The exception needs to be propagated to the caller of the + // @ref MultiThreadingCriticalSection constructor. + throw; + } catch (const std::exception& ex) { LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_FAILED) .arg(ex.what()); } @@ -2870,13 +2881,13 @@ HAService::stopClientAndListener() { // Remove critical section callbacks. MultiThreadingMgr::instance().removeCriticalSectionCallbacks("HA_MT"); - if (client_) { - client_->stop(); - } - if (listener_) { listener_->stop(); } + + if (client_) { + client_->stop(); + } } // Explicit instantiations. diff --git a/src/lib/config/tests/cmd_http_listener_unittests.cc b/src/lib/config/tests/cmd_http_listener_unittests.cc index 32b4b5ecaf..55b2fbf25c 100644 --- a/src/lib/config/tests/cmd_http_listener_unittests.cc +++ b/src/lib/config/tests/cmd_http_listener_unittests.cc @@ -241,6 +241,10 @@ public: const ConstElementPtr& /*command_arguments*/) { ElementPtr arguments = Element::createList(); arguments->add(Element::create("bar")); + EXPECT_THROW(listener_->start(), InvalidOperation); + EXPECT_THROW(listener_->pause(), MultiThreadingInvalidOperation); + EXPECT_THROW(listener_->resume(), MultiThreadingInvalidOperation); + EXPECT_THROW(listener_->stop(), MultiThreadingInvalidOperation); return (createAnswer(CONTROL_RESULT_SUCCESS, arguments)); } diff --git a/src/lib/exceptions/exceptions.h b/src/lib/exceptions/exceptions.h index f5368c8458..47adf96d9d 100644 --- a/src/lib/exceptions/exceptions.h +++ b/src/lib/exceptions/exceptions.h @@ -178,6 +178,14 @@ public: isc::Exception(file, line, what) {} }; +/// \brief Exception thrown when an owned thread is trying to stop or pause the +/// respective thread pool (which would result in a dead-lock). +class MultiThreadingInvalidOperation : public Exception { +public: + MultiThreadingInvalidOperation(const char* file, size_t line, const char* what) : + isc::Exception(file, line, what) {}; +}; + /// /// A shortcut macro to insert known values into exception arguments. /// diff --git a/src/lib/http/http_thread_pool.cc b/src/lib/http/http_thread_pool.cc index 7e649f5f39..dfa995d677 100644 --- a/src/lib/http/http_thread_pool.cc +++ b/src/lib/http/http_thread_pool.cc @@ -85,12 +85,40 @@ HttpThreadPool::validateStateChange(State new_state) const { case State::PAUSED: return (new_state != State::PAUSED); } + return (false); +} +std::string +HttpThreadPool::stateToText(State state) { + switch (state) { + case State::STOPPED: + return (std::string("stopped")); + case State::RUNNING: + return (std::string("running")); + case State::PAUSED: + return (std::string("paused")); + } + return (std::string("unknown-state")); +} + +bool +HttpThreadPool::checkThreadId(std::thread::id id) { + for (auto thread : threads_) { + if (id == thread->get_id()) { + return (true); + } + } return (false); } void HttpThreadPool::setState(State new_state) { + auto id = std::this_thread::get_id(); + if (checkThreadId(id)) { + isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to " + << HttpThreadPool::stateToText(new_state) << " performed by owned thread"); + } + std::unique_lock main_lck(mutex_); // Bail if the transition is invalid. diff --git a/src/lib/http/http_thread_pool.h b/src/lib/http/http_thread_pool.h index dddb43be33..1096e77826 100644 --- a/src/lib/http/http_thread_pool.h +++ b/src/lib/http/http_thread_pool.h @@ -93,6 +93,11 @@ public: } private: + /// @brief Check specified thread id against own threads. + /// + /// @return true if thread is owned, false otherwise. + bool checkThreadId(std::thread::id id); + /// @brief Thread-safe change of the pool's run state. /// /// Transitions a pool from one run state to another: @@ -139,6 +144,12 @@ private: /// @note Must be called from a thread-safe context. bool validateStateChange(State state) const; + /// @brief Text representation of a given state. + /// + /// @param state The state for the pool. + /// @return The text representation of a given state. + static std::string stateToText(State state); + /// @brief Work function executed by each thread in the pool. /// /// Implements the run state responsibilities for a given thread. diff --git a/src/lib/http/tests/client_mt_unittests.cc b/src/lib/http/tests/client_mt_unittests.cc index df09d1ad3e..d5d8cfc996 100644 --- a/src/lib/http/tests/client_mt_unittests.cc +++ b/src/lib/http/tests/client_mt_unittests.cc @@ -191,22 +191,22 @@ public: }; /// @brief Test fixture class for testing threading modes of HTTP client. -class MtHttpClientTest : public ::testing::Test { +class MultiThreadingHttpClientTest : public ::testing::Test { public: /// @brief Constructor. - MtHttpClientTest() + MultiThreadingHttpClientTest() : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(), test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0), expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false), pause_cnt_(0) { - test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true), + test_timer_.setup(std::bind(&MultiThreadingHttpClientTest::timeoutHandler, this, true), TEST_TIMEOUT, IntervalTimer::ONE_SHOT); MultiThreadingMgr::instance().setMode(true); } /// @brief Destructor. - ~MtHttpClientTest() { + ~MultiThreadingHttpClientTest() { // Stop the client. if (client_) { client_->stop(); @@ -811,7 +811,7 @@ public: // Verifies we can construct and destruct, in both single // and multi-threaded modes. -TEST_F(MtHttpClientTest, basics) { +TEST_F(MultiThreadingHttpClientTest, basics) { MultiThreadingMgr::instance().setMode(false); HttpClientPtr client; @@ -879,7 +879,7 @@ TEST_F(MtHttpClientTest, basics) { } // Verifies we can construct with deferred start. -TEST_F(MtHttpClientTest, deferredStart) { +TEST_F(MultiThreadingHttpClientTest, deferredStart) { MultiThreadingMgr::instance().setMode(true); HttpClientPtr client; size_t thread_pool_size = 3; @@ -923,7 +923,7 @@ TEST_F(MtHttpClientTest, deferredStart) { } // Verifies we can restart after stop. -TEST_F(MtHttpClientTest, restartAfterStop) { +TEST_F(MultiThreadingHttpClientTest, restartAfterStop) { MultiThreadingMgr::instance().setMode(true); HttpClientPtr client; size_t thread_pool_size = 3; @@ -964,14 +964,14 @@ TEST_F(MtHttpClientTest, restartAfterStop) { // requests, and listeners. // Single-threaded, three batches, one listener. -TEST_F(MtHttpClientTest, zeroByThreeByOne) { +TEST_F(MultiThreadingHttpClientTest, zeroByThreeByOne) { size_t num_threads = 0; // Zero threads = ST mode. size_t num_batches = 3; threadRequestAndReceive(num_threads, num_batches); } // Single-threaded, three batches, three listeners. -TEST_F(MtHttpClientTest, zeroByThreeByThree) { +TEST_F(MultiThreadingHttpClientTest, zeroByThreeByThree) { size_t num_threads = 0; // Zero threads = ST mode. size_t num_batches = 3; size_t num_listeners = 3; @@ -979,28 +979,28 @@ TEST_F(MtHttpClientTest, zeroByThreeByThree) { } // Multi-threaded with one thread, three batches, one listener -TEST_F(MtHttpClientTest, oneByThreeByOne) { +TEST_F(MultiThreadingHttpClientTest, oneByThreeByOne) { size_t num_threads = 1; size_t num_batches = 3; threadRequestAndReceive(num_threads, num_batches); } // Multi-threaded with three threads, three batches, one listener -TEST_F(MtHttpClientTest, threeByThreeByOne) { +TEST_F(MultiThreadingHttpClientTest, threeByThreeByOne) { size_t num_threads = 3; size_t num_batches = 3; threadRequestAndReceive(num_threads, num_batches); } // Multi-threaded with three threads, nine batches, one listener -TEST_F(MtHttpClientTest, threeByNineByOne) { +TEST_F(MultiThreadingHttpClientTest, threeByNineByOne) { size_t num_threads = 3; size_t num_batches = 9; threadRequestAndReceive(num_threads, num_batches); } // Multi-threaded with two threads, four batches, two listeners -TEST_F(MtHttpClientTest, twoByFourByTwo) { +TEST_F(MultiThreadingHttpClientTest, twoByFourByTwo) { size_t num_threads = 2; size_t num_batches = 4; size_t num_listeners = 2; @@ -1008,7 +1008,7 @@ TEST_F(MtHttpClientTest, twoByFourByTwo) { } // Multi-threaded with four threads, four batches, two listeners -TEST_F(MtHttpClientTest, fourByFourByTwo) { +TEST_F(MultiThreadingHttpClientTest, fourByFourByTwo) { size_t num_threads = 4; size_t num_batches = 4; size_t num_listeners = 2; @@ -1017,7 +1017,7 @@ TEST_F(MtHttpClientTest, fourByFourByTwo) { // Verifies that we can cleanly pause, resume, and shutdown while doing // multi-threaded work. -TEST_F(MtHttpClientTest, workPauseResumeShutdown) { +TEST_F(MultiThreadingHttpClientTest, workPauseResumeShutdown) { size_t num_threads = 4; size_t num_batches = 4; size_t num_listeners = 4; diff --git a/src/lib/util/multi_threading_mgr.cc b/src/lib/util/multi_threading_mgr.cc index b077d55ec2..9093873158 100644 --- a/src/lib/util/multi_threading_mgr.cc +++ b/src/lib/util/multi_threading_mgr.cc @@ -129,19 +129,30 @@ MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_siz void MultiThreadingMgr::stopProcessing() { if (getMode() && !isInCriticalSectionInternal()) { - if (getThreadPoolSize()) { - thread_pool_.stop(); - } - + // First call the registered callback for entering the critical section + // so that if any exception is thrown, there is no need to stop and then + // start the service threads. for (const auto& cb : cs_callbacks_.getCallbackPairs()) { try { (cb.entry_cb_)(); + } catch (const isc::MultiThreadingInvalidOperation& ex) { + // If any registered callback throws, the exception needs to be + // propagated to the caller of the + // @ref MultiThreadingCriticalSection constructor. + // Because this function is called by the + // @ref MultiThreadingCriticalSection constructor, throwing here + // is safe. + throw; } catch (...) { // We can't log it and throwing could be chaos. // We'll swallow it and tell people their callbacks // must be exception-proof } } + + if (getThreadPoolSize()) { + thread_pool_.stop(); + } } } @@ -159,6 +170,9 @@ MultiThreadingMgr::startProcessing() { // We can't log it and throwing could be chaos. // We'll swallow it and tell people their callbacks // must be exception-proof + // Because this function is called by the + // @ref MultiThreadingCriticalSection destructor, throwing here + // is not safe and will cause the process to crash. } } } diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index d9095d3043..9c636c9e85 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -60,10 +60,10 @@ public: /// @brief task function which tries to stop the thread pool and then calls /// @ref runAndWait void runStopResetAndWait(ThreadPool* thread_pool) { - EXPECT_THROW(thread_pool->stop(), InvalidOperation); - EXPECT_THROW(thread_pool->reset(), InvalidOperation); - EXPECT_THROW(thread_pool->wait(), InvalidOperation); - EXPECT_THROW(thread_pool->wait(0), InvalidOperation); + EXPECT_THROW(thread_pool->stop(), MultiThreadingInvalidOperation); + EXPECT_THROW(thread_pool->reset(), MultiThreadingInvalidOperation); + EXPECT_THROW(thread_pool->wait(), MultiThreadingInvalidOperation); + EXPECT_THROW(thread_pool->wait(0), MultiThreadingInvalidOperation); sigset_t nsset; pthread_sigmask(SIG_SETMASK, 0, &nsset); EXPECT_EQ(1, sigismember(&nsset, SIGCHLD)); diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 7313ea73f0..0136a95792 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -120,7 +120,7 @@ struct ThreadPool { void wait() { auto id = std::this_thread::get_id(); if (checkThreadId(id)) { - isc_throw(InvalidOperation, "thread pool stop called by owned thread"); + isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread"); } queue_.wait(); } @@ -135,7 +135,7 @@ struct ThreadPool { bool wait(uint32_t seconds) { auto id = std::this_thread::get_id(); if (checkThreadId(id)) { - isc_throw(InvalidOperation, "thread pool stop called by owned thread"); + isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread"); } return (queue_.wait(seconds)); } @@ -203,7 +203,7 @@ private: void stopInternal() { auto id = std::this_thread::get_id(); if (checkThreadId(id)) { - isc_throw(InvalidOperation, "thread pool stop called by owned thread"); + isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread"); } queue_.disable(); for (auto thread : threads_) {