extern const isc::log::MessageID HA_MAINTENANCE_START_HANDLER_FAILED = "HA_MAINTENANCE_START_HANDLER_FAILED";
extern const isc::log::MessageID HA_MISSING_CONFIGURATION = "HA_MISSING_CONFIGURATION";
extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_FAILED = "HA_PAUSE_CLIENT_LISTENER_FAILED";
+extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_ILLEGAL = "HA_PAUSE_CLIENT_LISTENER_ILLEGAL";
extern const isc::log::MessageID HA_RESET_COMMUNICATIONS_FAILED = "HA_RESET_COMMUNICATIONS_FAILED";
extern const isc::log::MessageID HA_RESET_FAILED = "HA_RESET_FAILED";
extern const isc::log::MessageID HA_RESET_HANDLER_FAILED = "HA_RESET_HANDLER_FAILED";
"HA_MAINTENANCE_START_HANDLER_FAILED", "ha-maintenance-start command failed: %1",
"HA_MISSING_CONFIGURATION", "high-availability parameter not specified for High Availability hooks library",
"HA_PAUSE_CLIENT_LISTENER_FAILED", "Pausing multi-threaded HTTP processing failed: %1",
+ "HA_PAUSE_CLIENT_LISTENER_ILLEGAL", "Pausing multi-threaded HTTP processing failed: %1",
"HA_RESET_COMMUNICATIONS_FAILED", "failed to send ha-reset command to %1: %2",
"HA_RESET_FAILED", "failed to reset HA state machine of %1: %2",
"HA_RESET_HANDLER_FAILED", "ha-reset command failed: %1",
extern const isc::log::MessageID HA_MAINTENANCE_START_HANDLER_FAILED;
extern const isc::log::MessageID HA_MISSING_CONFIGURATION;
extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_FAILED;
+extern const isc::log::MessageID HA_PAUSE_CLIENT_LISTENER_ILLEGAL;
extern const isc::log::MessageID HA_RESET_COMMUNICATIONS_FAILED;
extern const isc::log::MessageID HA_RESET_FAILED;
extern const isc::log::MessageID HA_RESET_HANDLER_FAILED;
listener threads. This error is highly unlikely and indicates a programmatic
issue that should be reported as a defect.
+% HA_PAUSE_CLIENT_LISTENER_ILLEGAL Pausing multi-threaded HTTP processing failed: %1
+This error message is emitted when attempting to pause HA's HTTP client and
+listener threads from owned thread. This error indicates that a command run on
+the listener threads is trying to use a critical section which would result in
+a dead-lock.
+
% HA_RESET_COMMUNICATIONS_FAILED failed to send ha-reset command to %1: %2
This warning message indicates a problem with communication with a HA peer
while sending the ha-reset command. The first argument specifies a remote
template<typename QueryPtrType>
void
HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
- const HAConfig::PeerConfigPtr& config,
- const ConstElementPtr& command,
- const ParkingLotHandlePtr& parking_lot) {
+ const HAConfig::PeerConfigPtr& config,
+ const ConstElementPtr& command,
+ const ParkingLotHandlePtr& parking_lot) {
// Create HTTP/1.1 request including our command.
PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
(HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
// Since we're used as CS callback we need to suppress
// any exceptions, unlikely though they may be.
try {
+ // The listener is the only one handling commands, so if any command
+ // uses @ref MultiThreadingCriticalSection, it should throw first.
+ // In this situation there is no need to resume the client's
+ // @ref HttpThreadPool because it does not get paused in the first place.
+ if (listener_) {
+ listener_->pause();
+ }
+
if (client_) {
client_->pause();
}
- if (listener_) {
- listener_->pause();
- }
- } catch (std::exception& ex) {
+ } catch (const isc::MultiThreadingInvalidOperation& ex) {
+ LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_ILLEGAL)
+ .arg(ex.what());
+ // The exception needs to be propagated to the caller of the
+ // @ref MultiThreadingCriticalSection constructor.
+ throw;
+ } catch (const std::exception& ex) {
LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_FAILED)
.arg(ex.what());
}
// Remove critical section callbacks.
MultiThreadingMgr::instance().removeCriticalSectionCallbacks("HA_MT");
- if (client_) {
- client_->stop();
- }
-
if (listener_) {
listener_->stop();
}
+
+ if (client_) {
+ client_->stop();
+ }
}
// Explicit instantiations.
const ConstElementPtr& /*command_arguments*/) {
ElementPtr arguments = Element::createList();
arguments->add(Element::create("bar"));
+ EXPECT_THROW(listener_->start(), InvalidOperation);
+ EXPECT_THROW(listener_->pause(), MultiThreadingInvalidOperation);
+ EXPECT_THROW(listener_->resume(), MultiThreadingInvalidOperation);
+ EXPECT_THROW(listener_->stop(), MultiThreadingInvalidOperation);
return (createAnswer(CONTROL_RESULT_SUCCESS, arguments));
}
isc::Exception(file, line, what) {}
};
+/// \brief Exception thrown when an owned thread is trying to stop or pause the
+/// respective thread pool (which would result in a dead-lock).
+class MultiThreadingInvalidOperation : public Exception {
+public:
+ MultiThreadingInvalidOperation(const char* file, size_t line, const char* what) :
+ isc::Exception(file, line, what) {};
+};
+
///
/// A shortcut macro to insert known values into exception arguments.
///
case State::PAUSED:
return (new_state != State::PAUSED);
}
+ return (false);
+}
+std::string
+HttpThreadPool::stateToText(State state) {
+ switch (state) {
+ case State::STOPPED:
+ return (std::string("stopped"));
+ case State::RUNNING:
+ return (std::string("running"));
+ case State::PAUSED:
+ return (std::string("paused"));
+ }
+ return (std::string("unknown-state"));
+}
+
+bool
+HttpThreadPool::checkThreadId(std::thread::id id) {
+ for (auto thread : threads_) {
+ if (id == thread->get_id()) {
+ return (true);
+ }
+ }
return (false);
}
void
HttpThreadPool::setState(State new_state) {
+ auto id = std::this_thread::get_id();
+ if (checkThreadId(id)) {
+ isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to "
+ << HttpThreadPool::stateToText(new_state) << " performed by owned thread");
+ }
+
std::unique_lock<std::mutex> main_lck(mutex_);
// Bail if the transition is invalid.
}
private:
+ /// @brief Check specified thread id against own threads.
+ ///
+ /// @return true if thread is owned, false otherwise.
+ bool checkThreadId(std::thread::id id);
+
/// @brief Thread-safe change of the pool's run state.
///
/// Transitions a pool from one run state to another:
/// @note Must be called from a thread-safe context.
bool validateStateChange(State state) const;
+ /// @brief Text representation of a given state.
+ ///
+ /// @param state The state for the pool.
+ /// @return The text representation of a given state.
+ static std::string stateToText(State state);
+
/// @brief Work function executed by each thread in the pool.
///
/// Implements the run state responsibilities for a given thread.
};
/// @brief Test fixture class for testing threading modes of HTTP client.
-class MtHttpClientTest : public ::testing::Test {
+class MultiThreadingHttpClientTest : public ::testing::Test {
public:
/// @brief Constructor.
- MtHttpClientTest()
+ MultiThreadingHttpClientTest()
: io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false),
pause_cnt_(0) {
- test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true),
+ test_timer_.setup(std::bind(&MultiThreadingHttpClientTest::timeoutHandler, this, true),
TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
MultiThreadingMgr::instance().setMode(true);
}
/// @brief Destructor.
- ~MtHttpClientTest() {
+ ~MultiThreadingHttpClientTest() {
// Stop the client.
if (client_) {
client_->stop();
// Verifies we can construct and destruct, in both single
// and multi-threaded modes.
-TEST_F(MtHttpClientTest, basics) {
+TEST_F(MultiThreadingHttpClientTest, basics) {
MultiThreadingMgr::instance().setMode(false);
HttpClientPtr client;
}
// Verifies we can construct with deferred start.
-TEST_F(MtHttpClientTest, deferredStart) {
+TEST_F(MultiThreadingHttpClientTest, deferredStart) {
MultiThreadingMgr::instance().setMode(true);
HttpClientPtr client;
size_t thread_pool_size = 3;
}
// Verifies we can restart after stop.
-TEST_F(MtHttpClientTest, restartAfterStop) {
+TEST_F(MultiThreadingHttpClientTest, restartAfterStop) {
MultiThreadingMgr::instance().setMode(true);
HttpClientPtr client;
size_t thread_pool_size = 3;
// requests, and listeners.
// Single-threaded, three batches, one listener.
-TEST_F(MtHttpClientTest, zeroByThreeByOne) {
+TEST_F(MultiThreadingHttpClientTest, zeroByThreeByOne) {
size_t num_threads = 0; // Zero threads = ST mode.
size_t num_batches = 3;
threadRequestAndReceive(num_threads, num_batches);
}
// Single-threaded, three batches, three listeners.
-TEST_F(MtHttpClientTest, zeroByThreeByThree) {
+TEST_F(MultiThreadingHttpClientTest, zeroByThreeByThree) {
size_t num_threads = 0; // Zero threads = ST mode.
size_t num_batches = 3;
size_t num_listeners = 3;
}
// Multi-threaded with one thread, three batches, one listener
-TEST_F(MtHttpClientTest, oneByThreeByOne) {
+TEST_F(MultiThreadingHttpClientTest, oneByThreeByOne) {
size_t num_threads = 1;
size_t num_batches = 3;
threadRequestAndReceive(num_threads, num_batches);
}
// Multi-threaded with three threads, three batches, one listener
-TEST_F(MtHttpClientTest, threeByThreeByOne) {
+TEST_F(MultiThreadingHttpClientTest, threeByThreeByOne) {
size_t num_threads = 3;
size_t num_batches = 3;
threadRequestAndReceive(num_threads, num_batches);
}
// Multi-threaded with three threads, nine batches, one listener
-TEST_F(MtHttpClientTest, threeByNineByOne) {
+TEST_F(MultiThreadingHttpClientTest, threeByNineByOne) {
size_t num_threads = 3;
size_t num_batches = 9;
threadRequestAndReceive(num_threads, num_batches);
}
// Multi-threaded with two threads, four batches, two listeners
-TEST_F(MtHttpClientTest, twoByFourByTwo) {
+TEST_F(MultiThreadingHttpClientTest, twoByFourByTwo) {
size_t num_threads = 2;
size_t num_batches = 4;
size_t num_listeners = 2;
}
// Multi-threaded with four threads, four batches, two listeners
-TEST_F(MtHttpClientTest, fourByFourByTwo) {
+TEST_F(MultiThreadingHttpClientTest, fourByFourByTwo) {
size_t num_threads = 4;
size_t num_batches = 4;
size_t num_listeners = 2;
// Verifies that we can cleanly pause, resume, and shutdown while doing
// multi-threaded work.
-TEST_F(MtHttpClientTest, workPauseResumeShutdown) {
+TEST_F(MultiThreadingHttpClientTest, workPauseResumeShutdown) {
size_t num_threads = 4;
size_t num_batches = 4;
size_t num_listeners = 4;
void
MultiThreadingMgr::stopProcessing() {
if (getMode() && !isInCriticalSectionInternal()) {
- if (getThreadPoolSize()) {
- thread_pool_.stop();
- }
-
+ // First call the registered callback for entering the critical section
+ // so that if any exception is thrown, there is no need to stop and then
+ // start the service threads.
for (const auto& cb : cs_callbacks_.getCallbackPairs()) {
try {
(cb.entry_cb_)();
+ } catch (const isc::MultiThreadingInvalidOperation& ex) {
+ // If any registered callback throws, the exception needs to be
+ // propagated to the caller of the
+ // @ref MultiThreadingCriticalSection constructor.
+ // Because this function is called by the
+ // @ref MultiThreadingCriticalSection constructor, throwing here
+ // is safe.
+ throw;
} catch (...) {
// We can't log it and throwing could be chaos.
// We'll swallow it and tell people their callbacks
// must be exception-proof
}
}
+
+ if (getThreadPoolSize()) {
+ thread_pool_.stop();
+ }
}
}
// We can't log it and throwing could be chaos.
// We'll swallow it and tell people their callbacks
// must be exception-proof
+ // Because this function is called by the
+ // @ref MultiThreadingCriticalSection destructor, throwing here
+ // is not safe and will cause the process to crash.
}
}
}
/// @brief task function which tries to stop the thread pool and then calls
/// @ref runAndWait
void runStopResetAndWait(ThreadPool<CallBack>* thread_pool) {
- EXPECT_THROW(thread_pool->stop(), InvalidOperation);
- EXPECT_THROW(thread_pool->reset(), InvalidOperation);
- EXPECT_THROW(thread_pool->wait(), InvalidOperation);
- EXPECT_THROW(thread_pool->wait(0), InvalidOperation);
+ EXPECT_THROW(thread_pool->stop(), MultiThreadingInvalidOperation);
+ EXPECT_THROW(thread_pool->reset(), MultiThreadingInvalidOperation);
+ EXPECT_THROW(thread_pool->wait(), MultiThreadingInvalidOperation);
+ EXPECT_THROW(thread_pool->wait(0), MultiThreadingInvalidOperation);
sigset_t nsset;
pthread_sigmask(SIG_SETMASK, 0, &nsset);
EXPECT_EQ(1, sigismember(&nsset, SIGCHLD));
void wait() {
auto id = std::this_thread::get_id();
if (checkThreadId(id)) {
- isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+ isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread");
}
queue_.wait();
}
bool wait(uint32_t seconds) {
auto id = std::this_thread::get_id();
if (checkThreadId(id)) {
- isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+ isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread");
}
return (queue_.wait(seconds));
}
void stopInternal() {
auto id = std::this_thread::get_id();
if (checkThreadId(id)) {
- isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+ isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by owned thread");
}
queue_.disable();
for (auto thread : threads_) {