From: Thomas Markwalder Date: Thu, 29 Sep 2022 14:44:08 +0000 (-0400) Subject: [#2581] Create asiolink::IoServiceThreadPool X-Git-Tag: Kea-2.3.2~111 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8ccbeb8c3b3d15e7adbb0ea8f17de65042999bc9;p=thirdparty%2Fkea.git [#2581] Create asiolink::IoServiceThreadPool Create IoServiceThreadPool from HttpThreadPool src/lib/asiolink/Makefile.am - added new files src/lib/asiolink/io_service_thread_pool.cc src/lib/asiolink/io_service_thread_pool.h - new files src/lib/asiolink/tests/Makefile.am - added new file src/lib/asiolink/tests/io_service_thread_pool.h - new file --- diff --git a/src/lib/asiolink/Makefile.am b/src/lib/asiolink/Makefile.am index 782358c174..cf4a2e7bf5 100644 --- a/src/lib/asiolink/Makefile.am +++ b/src/lib/asiolink/Makefile.am @@ -30,6 +30,7 @@ libkea_asiolink_la_SOURCES += io_endpoint.cc io_endpoint.h libkea_asiolink_la_SOURCES += io_error.h libkea_asiolink_la_SOURCES += io_service.h io_service.cc libkea_asiolink_la_SOURCES += io_service_signal.cc io_service_signal.h +libkea_asiolink_la_SOURCES += io_service_thread_pool.cc io_service_thread_poo.h libkea_asiolink_la_SOURCES += io_socket.h io_socket.cc libkea_asiolink_la_SOURCES += openssl_tls.h libkea_asiolink_la_SOURCES += process_spawn.h process_spawn.cc @@ -83,6 +84,7 @@ libkea_asiolink_include_HEADERS = \ io_error.h \ io_service.h \ io_service_signal.h \ + io_service_thread_pool.h \ io_socket.h \ openssl_tls.h \ process_spawn.h \ diff --git a/src/lib/asiolink/io_service_thread_pool.cc b/src/lib/asiolink/io_service_thread_pool.cc new file mode 100644 index 0000000000..3f930a46b4 --- /dev/null +++ b/src/lib/asiolink/io_service_thread_pool.cc @@ -0,0 +1,276 @@ +// Copyright (C) 2022 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +using namespace isc; +using namespace isc::asiolink; +using namespace isc::util; + +IoServiceThreadPool::IoServiceThreadPool(IOServicePtr io_service, size_t pool_size, + bool defer_start /* = false */) + : pool_size_(pool_size), io_service_(io_service), + run_state_(State::STOPPED), mutex_(), thread_cv_(), + main_cv_(), paused_(0), running_(0), exited_(0) { + if (!pool_size) { + isc_throw(BadValue, "pool_size must be non 0"); + } + + // If we weren't given an IOService, create our own. + if (!io_service_) { + io_service_.reset(new IOService()); + } + + // If we're not deferring the start, do it now. + if (!defer_start) { + run(); + } +} + +IoServiceThreadPool::~IoServiceThreadPool() { + stop(); +} + +void +IoServiceThreadPool::run() { + setState(State::RUNNING); +} + +void +IoServiceThreadPool::pause() { + setState(State::PAUSED); +} + +void +IoServiceThreadPool::stop() { + setState(State::STOPPED); +} + +IoServiceThreadPool::State +IoServiceThreadPool::getState() { + std::lock_guard lck(mutex_); + return (run_state_); +} + +bool +IoServiceThreadPool::validateStateChange(State state) const { + switch (run_state_) { + case State::STOPPED: + return (state == State::RUNNING); + case State::RUNNING: + return (state != State::RUNNING); + case State::PAUSED: + return (state != State::PAUSED); + } + return (false); +} + +std::string +IoServiceThreadPool::stateToText(State state) { + switch (state) { + case State::STOPPED: + return (std::string("stopped")); + case State::RUNNING: + return (std::string("running")); + case State::PAUSED: + return (std::string("paused")); + } + return (std::string("unknown-state")); +} + +void +IoServiceThreadPool::checkPausePermissions() { + checkPermissions(State::PAUSED); +} + +void +IoServiceThreadPool::checkPermissions(State state) { + auto id = std::this_thread::get_id(); + if (checkThreadId(id)) { + isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to " + << IoServiceThreadPool::stateToText(state) << " performed by worker thread"); + } +} + +bool +IoServiceThreadPool::checkThreadId(std::thread::id id) { + for (auto thread : threads_) { + if (id == thread->get_id()) { + return (true); + } + } + return (false); +} + +void +IoServiceThreadPool::setState(State state) { + checkPermissions(state); + + std::unique_lock main_lck(mutex_); + + // Bail if the transition is invalid. + if (!validateStateChange(state)) { + return; + } + + run_state_ = state; + // Notify threads of state change. + thread_cv_.notify_all(); + + switch (state) { + case State::RUNNING: { + // Restart the IOService. + io_service_->restart(); + + // While we have fewer threads than we should, make more. + while (threads_.size() < pool_size_) { + boost::shared_ptr thread(new std::thread( + std::bind(&IoServiceThreadPool::threadWork, this))); + + // Add thread to the pool. + threads_.push_back(thread); + } + + // Main thread waits here until all threads are running. + main_cv_.wait(main_lck, + [&]() { + return (running_ == threads_.size()); + }); + + exited_ = 0; + break; + } + + case State::PAUSED: { + // Stop IOService. + if (!io_service_->stopped()) { + io_service_->poll(); + io_service_->stop(); + } + + // Main thread waits here until all threads are paused. + main_cv_.wait(main_lck, + [&]() { + return (paused_ == threads_.size()); + }); + + break; + } + + case State::STOPPED: { + // Stop IOService. + if (!io_service_->stopped()) { + io_service_->poll(); + io_service_->stop(); + } + + // Main thread waits here until all threads have exited. + main_cv_.wait(main_lck, + [&]() { + return (exited_ == threads_.size()); + }); + + for (auto const& thread : threads_) { + thread->join(); + } + + threads_.clear(); + break; + }} +} + +void +IoServiceThreadPool::threadWork() { + bool done = false; + while (!done) { + switch (getState()) { + case State::RUNNING: { + { + std::unique_lock lck(mutex_); + running_++; + + // If We're all running notify main thread. + if (running_ == pool_size_) { + main_cv_.notify_all(); + } + } + + // Run the IOService. + io_service_->run(); + + { + std::unique_lock lck(mutex_); + running_--; + } + + break; + } + + case State::PAUSED: { + std::unique_lock lck(mutex_); + paused_++; + + // If we're all paused notify main. + if (paused_ == threads_.size()) { + main_cv_.notify_all(); + } + + // Wait here till I'm released. + thread_cv_.wait(lck, + [&]() { + return (run_state_ != State::PAUSED); + }); + + paused_--; + break; + } + + case State::STOPPED: { + done = true; + break; + }} + } + + std::unique_lock lck(mutex_); + exited_++; + + // If we've all exited, notify main. + if (exited_ == threads_.size()) { + main_cv_.notify_all(); + } +} + +IOServicePtr +IoServiceThreadPool::getIOService() const { + return (io_service_); +} + +uint16_t +IoServiceThreadPool::getPoolSize() const { + return (pool_size_); +} + +uint16_t +IoServiceThreadPool::getThreadCount() const { + return (threads_.size()); +} diff --git a/src/lib/asiolink/io_service_thread_pool.h b/src/lib/asiolink/io_service_thread_pool.h new file mode 100644 index 0000000000..68c4e21b96 --- /dev/null +++ b/src/lib/asiolink/io_service_thread_pool.h @@ -0,0 +1,265 @@ +// Copyright (C) 2022 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#ifndef IOSERVICE_THREAD_POOL_H +#define IOSERVICE_THREAD_POOL_H + +#include +#include + +#include + +#include +#include +#include +#include + +namespace isc { +namespace asiolink { + +/// @brief Implements a pausable pool of IOService driven threads. +class IoServiceThreadPool { +public: + /// @brief Describes the possible operational state of the thread pool. + enum class State { + STOPPED, /// Pool is not operational. + RUNNING, /// Pool is populated with running threads. + PAUSED, /// Pool is populated with threads that are paused. + }; + + /// @brief Constructor + /// + /// @param io_service IOService that will drive the pool's IO. If empty, it + /// create its own instance. + /// @param pool_size Maximum number of threads in the pool. Currently the + /// number of threads is fixed at this value. + /// @param defer_start If true, creation of the threads is deferred until + /// a subsequent call to @ref run(). In this case the pool's operational + /// state post construction is STOPPED. If false, the constructor will + /// invoke run() to transition the pool into the RUNNING state. + IoServiceThreadPool(asiolink::IOServicePtr io_service, size_t pool_size, + bool defer_start = false); + + /// @brief Destructor + /// + /// Ensures the thread pool is stopped prior to destruction. + ~IoServiceThreadPool(); + + /// @brief Transitions the pool from STOPPED or PAUSED to RUNNING. + /// + /// When called from the STOPPED state, the pool threads are created and + /// begin processing events. + /// When called from the PAUSED state, the pool threads are released + /// from PAUSED and resume processing events. + /// Has no effect if the pool is already in the RUNNING state. + void run(); + + /// @brief Transitions the pool from RUNNING to PAUSED. + /// + /// Pool threads suspend event processing and pause until they + /// are released to either resume running or stop. + /// Has no effect if the pool is already in the PAUSED or STOPPED + /// state. + void pause(); + + /// @brief Transitions the pool from RUNNING or PAUSED to STOPPED. + /// + /// Stops thread event processing and then destroys the pool's threads + /// Has no effect if the pool is already in the STOPPED state. + void stop(); + + /// @brief Check if the thread pool is running. + /// + /// @return True if the thread pool is running, false otherwise. + bool isRunning() { + return (getState() == State::RUNNING); + } + + /// @brief Check if the thread pool is paused. + /// + /// @return True if the thread pool is paused, false otherwise. + bool isPaused() { + return (getState() == State::PAUSED); + } + + /// @brief Check if the thread pool is stopped. + /// + /// @return True if the thread pool is stopped, false otherwise. + bool isStopped() { + return (getState() == State::STOPPED); + } + + /// @brief Check current thread permissions to transition to the new PAUSED + /// state. + /// + /// This function throws @ref MultiThreadingInvalidOperation if the calling + /// thread is one of the worker threads. This would prevent a dead-lock if + /// the calling thread would try to perform a thread pool state transition + /// to PAUSED state. + /// + /// @throw MultiThreadingInvalidOperation if the state transition is done on + /// any of the worker threads. + void checkPausePermissions(); + +private: + /// @brief Check current thread permissions to transition to the new state. + /// + /// This function throws @ref MultiThreadingInvalidOperation if the calling + /// thread is one of the worker threads. This would prevent a dead-lock if + /// the calling thread would try to perform a thread pool state transition. + /// + /// @param state The new transition state for the pool. + /// @throw MultiThreadingInvalidOperation if the state transition is done on + /// any of the worker threads. + void checkPermissions(State state); + + /// @brief Check specified thread id against own threads. + /// + /// @return true if thread is owned, false otherwise. + bool checkThreadId(std::thread::id id); + + /// @brief Thread-safe change of the pool's run state. + /// + /// Transitions a pool from one run state to another: + /// + /// When moving from STOPPED or PAUSED to RUNNING: + /// -# Sets state to RUNNING. + /// -# Notifies threads of state change. + /// -# Restarts the IOService. + /// -# Creates the threads if they do not yet exist (true only + /// when transitioning from STOPPED). + /// -# Waits until all threads are running. + /// -# Sets the count of exited threads to 0. + /// -# Returns to caller. + /// + /// When moving from RUNNING or PAUSED to STOPPED: + /// -# Sets state to STOPPED + /// -# Notifies threads of state change. + /// -# Polls the IOService to flush handlers. + /// -# Stops the IOService. + /// -# Waits until all threads have exited the work function. + /// -# Joins and destroys the threads. + /// -# Returns to caller. + /// + /// When moving from RUNNING to PAUSED: + /// -# Sets state to PAUSED + /// -# Notifies threads of state change. + /// -# Polls the IOService to flush handlers. + /// -# Stops the IOService. + /// -# Waits until all threads have paused. + /// -# Returns to caller. + /// + /// @param state The new transition state for the pool. + /// @throw MultiThreadingInvalidOperation if the state transition is done on + /// any of the worker threads. + void setState(State state); + + /// @brief Thread-safe fetch of the pool's operational state. + /// + /// @return Thread pool state. + State getState(); + + /// @brief Validates whether the pool can change to a given state. + /// + /// @param state new state for the pool. + /// @return true if the change is valid, false otherwise. + /// @note Must be called from a thread-safe context. + bool validateStateChange(State state) const; + + /// @brief Text representation of a given state. + /// + /// @param state The state for the pool. + /// @return The text representation of a given state. + static std::string stateToText(State state); + + /// @brief Work function executed by each thread in the pool. + /// + /// Implements the run state responsibilities for a given thread. + /// It executes a run loop until the pool is stopped. At the top + /// of each iteration of the loop the pool's run state is checked + /// and when it is: + /// + /// RUNNING: + /// -# The count of threads running is incremented. + /// -# If the count has reached the number of threads in pool the + /// main thread is notified. + /// -# IOService::run() is invoked. + /// -# When IOService::run() returns, the count of threads running + /// is decremented. + /// + /// PAUSED: + /// -# The count of threads paused is incremented. + /// -# If the count has reached the number of threads in pool the + /// main thread is notified. + /// -# Thread blocks until notified the pool's run state is no + /// longer PAUSED. + /// -# The count of threads paused is decremented. + /// + /// STOPPED: + /// -# The run loop is exited. + /// -# The count of threads exited is incremented. + /// -# If the count has reached the number of threads in pool the + /// main thread is notified. + /// -# The function exits. + void threadWork(); + +public: + /// @brief Fetches the IOService that drives the pool. + /// + /// @return the pointer to the IOService. + asiolink::IOServicePtr getIOService() const; + + /// @brief Fetches the maximum size of the thread pool. + /// + /// @return the maximum size of the thread pool. + uint16_t getPoolSize() const; + + /// @brief Fetches the number of threads in the pool. + /// + /// @return the number of running threads. + uint16_t getThreadCount() const; + +private: + /// @brief Maximum number of threads in the thread pool. + size_t pool_size_; + + /// @brief Pointer to private IOService used in multi-threaded mode. + asiolink::IOServicePtr io_service_; + + /// @brief Tracks the operational state of the pool. + State run_state_; + + /// @brief Mutex to protect the internal state. + std::mutex mutex_; + + /// @brief Condition variable used by threads for synchronization. + std::condition_variable thread_cv_; + + /// @brief Condition variable used by main thread to wait on threads + /// state transitions. + std::condition_variable main_cv_; + + /// @brief Number of threads currently paused. + size_t paused_; + + /// @brief Number of threads currently running. + size_t running_; + + /// @brief Number of threads that have exited the work function. + size_t exited_; + + /// @brief Pool of threads used to service connections in multi-threaded + /// mode. + std::list > threads_; +}; + +/// @brief Defines a pointer to a thread pool. +typedef boost::shared_ptr IoServiceThreadPoolPtr; + +} // end of namespace isc::asiolink +} // end of namespace isc + +#endif diff --git a/src/lib/asiolink/tests/Makefile.am b/src/lib/asiolink/tests/Makefile.am index b4be99784f..cad0d6d160 100644 --- a/src/lib/asiolink/tests/Makefile.am +++ b/src/lib/asiolink/tests/Makefile.am @@ -35,6 +35,7 @@ run_unittests_SOURCES += udp_endpoint_unittest.cc run_unittests_SOURCES += udp_socket_unittest.cc run_unittests_SOURCES += io_service_unittest.cc run_unittests_SOURCES += io_service_signal_unittests.cc +run_unittests_SOURCES += io_service_thread_pool_unittests.cc run_unittests_SOURCES += dummy_io_callback_unittest.cc run_unittests_SOURCES += tcp_acceptor_unittest.cc run_unittests_SOURCES += unix_domain_socket_unittest.cc diff --git a/src/lib/asiolink/tests/io_service_thread_pool_unittests.cc b/src/lib/asiolink/tests/io_service_thread_pool_unittests.cc new file mode 100644 index 0000000000..496f3cfe87 --- /dev/null +++ b/src/lib/asiolink/tests/io_service_thread_pool_unittests.cc @@ -0,0 +1,264 @@ +// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace isc; +using namespace isc::asiolink; + +namespace { + +/// @brief Test timeout (ms). +const long TEST_TIMEOUT = 10000; + +/// @brief Simple test fixture for testing IoServiceThreadPool. +class IoServiceThreadPoolTest : public ::testing::Test { +public: + /// @brief Constructor. + IoServiceThreadPoolTest() + : io_service_(new IOService()) { + } + + /// @brief Destructor. + virtual ~IoServiceThreadPoolTest() { + io_service_->stop(); + } + + /// @brief IOService instance used by thread pools. + IOServicePtr io_service_; +}; + +// URL contains scheme and hostname. +TEST_F(IoServiceThreadPoolTest, invalidConstruction) { + IoServiceThreadPoolPtr pool; + + // Constructing with pool size of 0 should fail. + ASSERT_THROW_MSG(pool.reset(new IoServiceThreadPool(io_service_, 0)), BadValue, + "pool_size must be non 0"); +} + +// Verifies that a pool can be created without starting it. +TEST_F(IoServiceThreadPoolTest, deferredStartConstruction) { + IoServiceThreadPoolPtr pool; + + ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true))); + + // State should be stopped. + // Pool size should be 3. + // IOService should be there. + // IOService is new, so it should not be stopped. + // No threads in the pool. + ASSERT_TRUE(pool->isStopped()); + EXPECT_EQ(pool->getPoolSize(), 3); + ASSERT_TRUE(pool->getIOService()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Destructor should not throw. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that a pool can be started within the constructor. +TEST_F(IoServiceThreadPoolTest, startDuringConstruction) { + IoServiceThreadPoolPtr pool; + + ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3))); + + // State should be running. + // Pool size should be 3. + // IOService should be there. + // IOService is new, so it should not be stopped. + // Should have 3 threads in the pool. + ASSERT_TRUE(pool->isRunning()); + EXPECT_EQ(pool->getPoolSize(), 3); + ASSERT_TRUE(pool->getIOService()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 3); + + // Destructor should not throw. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that pool can move from STOPPED to RUNNING. +TEST_F(IoServiceThreadPoolTest, stoppedToRunning) { + IoServiceThreadPoolPtr pool; + + // Create a stopped pool. + ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true))); + ASSERT_TRUE(pool->isStopped()); + + // Call run from STOPPED. + ASSERT_NO_THROW_LOG(pool->run()); + + // State should be RUNNING, IOService should not be stopped, we should + // have 3 threads in the pool. + ASSERT_TRUE(pool->isRunning()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 3); + + // Calling run again should be harmless. + ASSERT_NO_THROW_LOG(pool->run()); + + // State should be RUNNING, IOService should not be stopped, we should + // have 3 threads in the pool. + ASSERT_TRUE(pool->isRunning()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 3); + + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that pool can move from RUNNING to STOPPED. +TEST_F(IoServiceThreadPoolTest, runningToStopped) { + IoServiceThreadPoolPtr pool; + + // Create a running pool. + ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false))); + ASSERT_TRUE(pool->isRunning()); + + // Call stop. + ASSERT_NO_THROW_LOG(pool->stop()); + + // State should be STOPPED, IOService should be stopped, we should + // have 0 threads in the pool. + ASSERT_TRUE(pool->isStopped()); + EXPECT_TRUE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Calling stop again should be harmless. + ASSERT_NO_THROW_LOG(pool->stop()); + + // State should be STOPPED, IOService should be stopped, we should + // have 0 threads in the pool. + ASSERT_TRUE(pool->isStopped()); + EXPECT_TRUE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that pool can move from RUNNING to PAUSED. +TEST_F(IoServiceThreadPoolTest, runningToPaused) { + IoServiceThreadPoolPtr pool; + + // Create a running pool. + ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false))); + ASSERT_TRUE(pool->isRunning()); + + // Call pause from RUNNING. + ASSERT_NO_THROW_LOG(pool->pause()); + + // State should be PAUSED, IOService should be stopped, we should + // have 3 threads in the pool. + ASSERT_TRUE(pool->isPaused()); + EXPECT_TRUE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 3); + + // Calling pause again should be harmless. + ASSERT_NO_THROW_LOG(pool->pause()); + + // State should be PAUSED, IOService should be stopped, we should + // have 3 threads in the pool. + ASSERT_TRUE(pool->isPaused()); + EXPECT_TRUE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 3); + + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that pool can move from PAUSED to RUNNING. +TEST_F(IoServiceThreadPoolTest, pausedToRunning) { + IoServiceThreadPoolPtr pool; + + // Create a running pool. + ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false))); + ASSERT_TRUE(pool->isRunning()); + + // Call pause from RUNNING. + ASSERT_NO_THROW_LOG(pool->pause()); + ASSERT_TRUE(pool->isPaused()); + + // Call run. + ASSERT_NO_THROW_LOG(pool->run()); + + // State should be RUNNING, IOService should not be stopped, we should + // have 3 threads in the pool. + ASSERT_TRUE(pool->isRunning()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 3); + + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that pool can move from PAUSED to STOPPED. +TEST_F(IoServiceThreadPoolTest, pausedToStopped) { + IoServiceThreadPoolPtr pool; + + // Create a running pool. + ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false))); + ASSERT_TRUE(pool->isRunning()); + + // Call pause from RUNNING. + ASSERT_NO_THROW_LOG(pool->pause()); + ASSERT_TRUE(pool->isPaused()); + + // Call stop. + ASSERT_NO_THROW_LOG(pool->stop()); + + // State should be STOPPED, IOService should be stopped, we should + // have 0 threads in the pool. + ASSERT_TRUE(pool->isStopped()); + EXPECT_TRUE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that attempting to pause a STOPPED pool has no effect. +TEST_F(IoServiceThreadPoolTest, stoppedToPaused) { + IoServiceThreadPoolPtr pool; + + // Create a stopped pool. + ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true))); + ASSERT_TRUE(pool->isStopped()); + + // State should be STOPPED, IOService won't be stopped because it was + // never started. We should have 0 threads in the pool. + ASSERT_TRUE(pool->isStopped()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Call pause from STOPPED. + ASSERT_NO_THROW_LOG(pool->pause()); + + // Should have no effect. + ASSERT_TRUE(pool->isStopped()); + + // State should be STOPPED, IOService won't be stopped because it was + // never started. We should have 0 threads in the pool. + ASSERT_TRUE(pool->isStopped()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +}