libkea_asiolink_la_SOURCES += io_error.h
libkea_asiolink_la_SOURCES += io_service.h io_service.cc
libkea_asiolink_la_SOURCES += io_service_signal.cc io_service_signal.h
+libkea_asiolink_la_SOURCES += io_service_thread_pool.cc io_service_thread_poo.h
libkea_asiolink_la_SOURCES += io_socket.h io_socket.cc
libkea_asiolink_la_SOURCES += openssl_tls.h
libkea_asiolink_la_SOURCES += process_spawn.h process_spawn.cc
io_error.h \
io_service.h \
io_service_signal.h \
+ io_service_thread_pool.h \
io_socket.h \
openssl_tls.h \
process_spawn.h \
--- /dev/null
+// Copyright (C) 2022 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
+#include <asiolink/io_service.h>
+#include <asiolink/io_service_thread_pool.h>
+#include <exceptions/exceptions.h>
+#include <util/multi_threading_mgr.h>
+#include <util/unlock_guard.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <atomic>
+#include <functional>
+#include <iostream>
+#include <list>
+#include <mutex>
+#include <thread>
+
+using namespace isc;
+using namespace isc::asiolink;
+using namespace isc::util;
+
+IoServiceThreadPool::IoServiceThreadPool(IOServicePtr io_service, size_t pool_size,
+ bool defer_start /* = false */)
+ : pool_size_(pool_size), io_service_(io_service),
+ run_state_(State::STOPPED), mutex_(), thread_cv_(),
+ main_cv_(), paused_(0), running_(0), exited_(0) {
+ if (!pool_size) {
+ isc_throw(BadValue, "pool_size must be non 0");
+ }
+
+ // If we weren't given an IOService, create our own.
+ if (!io_service_) {
+ io_service_.reset(new IOService());
+ }
+
+ // If we're not deferring the start, do it now.
+ if (!defer_start) {
+ run();
+ }
+}
+
+IoServiceThreadPool::~IoServiceThreadPool() {
+ stop();
+}
+
+void
+IoServiceThreadPool::run() {
+ setState(State::RUNNING);
+}
+
+void
+IoServiceThreadPool::pause() {
+ setState(State::PAUSED);
+}
+
+void
+IoServiceThreadPool::stop() {
+ setState(State::STOPPED);
+}
+
+IoServiceThreadPool::State
+IoServiceThreadPool::getState() {
+ std::lock_guard<std::mutex> lck(mutex_);
+ return (run_state_);
+}
+
+bool
+IoServiceThreadPool::validateStateChange(State state) const {
+ switch (run_state_) {
+ case State::STOPPED:
+ return (state == State::RUNNING);
+ case State::RUNNING:
+ return (state != State::RUNNING);
+ case State::PAUSED:
+ return (state != State::PAUSED);
+ }
+ return (false);
+}
+
+std::string
+IoServiceThreadPool::stateToText(State state) {
+ switch (state) {
+ case State::STOPPED:
+ return (std::string("stopped"));
+ case State::RUNNING:
+ return (std::string("running"));
+ case State::PAUSED:
+ return (std::string("paused"));
+ }
+ return (std::string("unknown-state"));
+}
+
+void
+IoServiceThreadPool::checkPausePermissions() {
+ checkPermissions(State::PAUSED);
+}
+
+void
+IoServiceThreadPool::checkPermissions(State state) {
+ auto id = std::this_thread::get_id();
+ if (checkThreadId(id)) {
+ isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to "
+ << IoServiceThreadPool::stateToText(state) << " performed by worker thread");
+ }
+}
+
+bool
+IoServiceThreadPool::checkThreadId(std::thread::id id) {
+ for (auto thread : threads_) {
+ if (id == thread->get_id()) {
+ return (true);
+ }
+ }
+ return (false);
+}
+
+void
+IoServiceThreadPool::setState(State state) {
+ checkPermissions(state);
+
+ std::unique_lock<std::mutex> main_lck(mutex_);
+
+ // Bail if the transition is invalid.
+ if (!validateStateChange(state)) {
+ return;
+ }
+
+ run_state_ = state;
+ // Notify threads of state change.
+ thread_cv_.notify_all();
+
+ switch (state) {
+ case State::RUNNING: {
+ // Restart the IOService.
+ io_service_->restart();
+
+ // While we have fewer threads than we should, make more.
+ while (threads_.size() < pool_size_) {
+ boost::shared_ptr<std::thread> thread(new std::thread(
+ std::bind(&IoServiceThreadPool::threadWork, this)));
+
+ // Add thread to the pool.
+ threads_.push_back(thread);
+ }
+
+ // Main thread waits here until all threads are running.
+ main_cv_.wait(main_lck,
+ [&]() {
+ return (running_ == threads_.size());
+ });
+
+ exited_ = 0;
+ break;
+ }
+
+ case State::PAUSED: {
+ // Stop IOService.
+ if (!io_service_->stopped()) {
+ io_service_->poll();
+ io_service_->stop();
+ }
+
+ // Main thread waits here until all threads are paused.
+ main_cv_.wait(main_lck,
+ [&]() {
+ return (paused_ == threads_.size());
+ });
+
+ break;
+ }
+
+ case State::STOPPED: {
+ // Stop IOService.
+ if (!io_service_->stopped()) {
+ io_service_->poll();
+ io_service_->stop();
+ }
+
+ // Main thread waits here until all threads have exited.
+ main_cv_.wait(main_lck,
+ [&]() {
+ return (exited_ == threads_.size());
+ });
+
+ for (auto const& thread : threads_) {
+ thread->join();
+ }
+
+ threads_.clear();
+ break;
+ }}
+}
+
+void
+IoServiceThreadPool::threadWork() {
+ bool done = false;
+ while (!done) {
+ switch (getState()) {
+ case State::RUNNING: {
+ {
+ std::unique_lock<std::mutex> lck(mutex_);
+ running_++;
+
+ // If We're all running notify main thread.
+ if (running_ == pool_size_) {
+ main_cv_.notify_all();
+ }
+ }
+
+ // Run the IOService.
+ io_service_->run();
+
+ {
+ std::unique_lock<std::mutex> lck(mutex_);
+ running_--;
+ }
+
+ break;
+ }
+
+ case State::PAUSED: {
+ std::unique_lock<std::mutex> lck(mutex_);
+ paused_++;
+
+ // If we're all paused notify main.
+ if (paused_ == threads_.size()) {
+ main_cv_.notify_all();
+ }
+
+ // Wait here till I'm released.
+ thread_cv_.wait(lck,
+ [&]() {
+ return (run_state_ != State::PAUSED);
+ });
+
+ paused_--;
+ break;
+ }
+
+ case State::STOPPED: {
+ done = true;
+ break;
+ }}
+ }
+
+ std::unique_lock<std::mutex> lck(mutex_);
+ exited_++;
+
+ // If we've all exited, notify main.
+ if (exited_ == threads_.size()) {
+ main_cv_.notify_all();
+ }
+}
+
+IOServicePtr
+IoServiceThreadPool::getIOService() const {
+ return (io_service_);
+}
+
+uint16_t
+IoServiceThreadPool::getPoolSize() const {
+ return (pool_size_);
+}
+
+uint16_t
+IoServiceThreadPool::getThreadCount() const {
+ return (threads_.size());
+}
--- /dev/null
+// Copyright (C) 2022 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef IOSERVICE_THREAD_POOL_H
+#define IOSERVICE_THREAD_POOL_H
+
+#include <asiolink/io_service.h>
+#include <util/unlock_guard.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <condition_variable>
+#include <list>
+#include <mutex>
+#include <thread>
+
+namespace isc {
+namespace asiolink {
+
+/// @brief Implements a pausable pool of IOService driven threads.
+class IoServiceThreadPool {
+public:
+ /// @brief Describes the possible operational state of the thread pool.
+ enum class State {
+ STOPPED, /// Pool is not operational.
+ RUNNING, /// Pool is populated with running threads.
+ PAUSED, /// Pool is populated with threads that are paused.
+ };
+
+ /// @brief Constructor
+ ///
+ /// @param io_service IOService that will drive the pool's IO. If empty, it
+ /// create its own instance.
+ /// @param pool_size Maximum number of threads in the pool. Currently the
+ /// number of threads is fixed at this value.
+ /// @param defer_start If true, creation of the threads is deferred until
+ /// a subsequent call to @ref run(). In this case the pool's operational
+ /// state post construction is STOPPED. If false, the constructor will
+ /// invoke run() to transition the pool into the RUNNING state.
+ IoServiceThreadPool(asiolink::IOServicePtr io_service, size_t pool_size,
+ bool defer_start = false);
+
+ /// @brief Destructor
+ ///
+ /// Ensures the thread pool is stopped prior to destruction.
+ ~IoServiceThreadPool();
+
+ /// @brief Transitions the pool from STOPPED or PAUSED to RUNNING.
+ ///
+ /// When called from the STOPPED state, the pool threads are created and
+ /// begin processing events.
+ /// When called from the PAUSED state, the pool threads are released
+ /// from PAUSED and resume processing events.
+ /// Has no effect if the pool is already in the RUNNING state.
+ void run();
+
+ /// @brief Transitions the pool from RUNNING to PAUSED.
+ ///
+ /// Pool threads suspend event processing and pause until they
+ /// are released to either resume running or stop.
+ /// Has no effect if the pool is already in the PAUSED or STOPPED
+ /// state.
+ void pause();
+
+ /// @brief Transitions the pool from RUNNING or PAUSED to STOPPED.
+ ///
+ /// Stops thread event processing and then destroys the pool's threads
+ /// Has no effect if the pool is already in the STOPPED state.
+ void stop();
+
+ /// @brief Check if the thread pool is running.
+ ///
+ /// @return True if the thread pool is running, false otherwise.
+ bool isRunning() {
+ return (getState() == State::RUNNING);
+ }
+
+ /// @brief Check if the thread pool is paused.
+ ///
+ /// @return True if the thread pool is paused, false otherwise.
+ bool isPaused() {
+ return (getState() == State::PAUSED);
+ }
+
+ /// @brief Check if the thread pool is stopped.
+ ///
+ /// @return True if the thread pool is stopped, false otherwise.
+ bool isStopped() {
+ return (getState() == State::STOPPED);
+ }
+
+ /// @brief Check current thread permissions to transition to the new PAUSED
+ /// state.
+ ///
+ /// This function throws @ref MultiThreadingInvalidOperation if the calling
+ /// thread is one of the worker threads. This would prevent a dead-lock if
+ /// the calling thread would try to perform a thread pool state transition
+ /// to PAUSED state.
+ ///
+ /// @throw MultiThreadingInvalidOperation if the state transition is done on
+ /// any of the worker threads.
+ void checkPausePermissions();
+
+private:
+ /// @brief Check current thread permissions to transition to the new state.
+ ///
+ /// This function throws @ref MultiThreadingInvalidOperation if the calling
+ /// thread is one of the worker threads. This would prevent a dead-lock if
+ /// the calling thread would try to perform a thread pool state transition.
+ ///
+ /// @param state The new transition state for the pool.
+ /// @throw MultiThreadingInvalidOperation if the state transition is done on
+ /// any of the worker threads.
+ void checkPermissions(State state);
+
+ /// @brief Check specified thread id against own threads.
+ ///
+ /// @return true if thread is owned, false otherwise.
+ bool checkThreadId(std::thread::id id);
+
+ /// @brief Thread-safe change of the pool's run state.
+ ///
+ /// Transitions a pool from one run state to another:
+ ///
+ /// When moving from STOPPED or PAUSED to RUNNING:
+ /// -# Sets state to RUNNING.
+ /// -# Notifies threads of state change.
+ /// -# Restarts the IOService.
+ /// -# Creates the threads if they do not yet exist (true only
+ /// when transitioning from STOPPED).
+ /// -# Waits until all threads are running.
+ /// -# Sets the count of exited threads to 0.
+ /// -# Returns to caller.
+ ///
+ /// When moving from RUNNING or PAUSED to STOPPED:
+ /// -# Sets state to STOPPED
+ /// -# Notifies threads of state change.
+ /// -# Polls the IOService to flush handlers.
+ /// -# Stops the IOService.
+ /// -# Waits until all threads have exited the work function.
+ /// -# Joins and destroys the threads.
+ /// -# Returns to caller.
+ ///
+ /// When moving from RUNNING to PAUSED:
+ /// -# Sets state to PAUSED
+ /// -# Notifies threads of state change.
+ /// -# Polls the IOService to flush handlers.
+ /// -# Stops the IOService.
+ /// -# Waits until all threads have paused.
+ /// -# Returns to caller.
+ ///
+ /// @param state The new transition state for the pool.
+ /// @throw MultiThreadingInvalidOperation if the state transition is done on
+ /// any of the worker threads.
+ void setState(State state);
+
+ /// @brief Thread-safe fetch of the pool's operational state.
+ ///
+ /// @return Thread pool state.
+ State getState();
+
+ /// @brief Validates whether the pool can change to a given state.
+ ///
+ /// @param state new state for the pool.
+ /// @return true if the change is valid, false otherwise.
+ /// @note Must be called from a thread-safe context.
+ bool validateStateChange(State state) const;
+
+ /// @brief Text representation of a given state.
+ ///
+ /// @param state The state for the pool.
+ /// @return The text representation of a given state.
+ static std::string stateToText(State state);
+
+ /// @brief Work function executed by each thread in the pool.
+ ///
+ /// Implements the run state responsibilities for a given thread.
+ /// It executes a run loop until the pool is stopped. At the top
+ /// of each iteration of the loop the pool's run state is checked
+ /// and when it is:
+ ///
+ /// RUNNING:
+ /// -# The count of threads running is incremented.
+ /// -# If the count has reached the number of threads in pool the
+ /// main thread is notified.
+ /// -# IOService::run() is invoked.
+ /// -# When IOService::run() returns, the count of threads running
+ /// is decremented.
+ ///
+ /// PAUSED:
+ /// -# The count of threads paused is incremented.
+ /// -# If the count has reached the number of threads in pool the
+ /// main thread is notified.
+ /// -# Thread blocks until notified the pool's run state is no
+ /// longer PAUSED.
+ /// -# The count of threads paused is decremented.
+ ///
+ /// STOPPED:
+ /// -# The run loop is exited.
+ /// -# The count of threads exited is incremented.
+ /// -# If the count has reached the number of threads in pool the
+ /// main thread is notified.
+ /// -# The function exits.
+ void threadWork();
+
+public:
+ /// @brief Fetches the IOService that drives the pool.
+ ///
+ /// @return the pointer to the IOService.
+ asiolink::IOServicePtr getIOService() const;
+
+ /// @brief Fetches the maximum size of the thread pool.
+ ///
+ /// @return the maximum size of the thread pool.
+ uint16_t getPoolSize() const;
+
+ /// @brief Fetches the number of threads in the pool.
+ ///
+ /// @return the number of running threads.
+ uint16_t getThreadCount() const;
+
+private:
+ /// @brief Maximum number of threads in the thread pool.
+ size_t pool_size_;
+
+ /// @brief Pointer to private IOService used in multi-threaded mode.
+ asiolink::IOServicePtr io_service_;
+
+ /// @brief Tracks the operational state of the pool.
+ State run_state_;
+
+ /// @brief Mutex to protect the internal state.
+ std::mutex mutex_;
+
+ /// @brief Condition variable used by threads for synchronization.
+ std::condition_variable thread_cv_;
+
+ /// @brief Condition variable used by main thread to wait on threads
+ /// state transitions.
+ std::condition_variable main_cv_;
+
+ /// @brief Number of threads currently paused.
+ size_t paused_;
+
+ /// @brief Number of threads currently running.
+ size_t running_;
+
+ /// @brief Number of threads that have exited the work function.
+ size_t exited_;
+
+ /// @brief Pool of threads used to service connections in multi-threaded
+ /// mode.
+ std::list<boost::shared_ptr<std::thread> > threads_;
+};
+
+/// @brief Defines a pointer to a thread pool.
+typedef boost::shared_ptr<IoServiceThreadPool> IoServiceThreadPoolPtr;
+
+} // end of namespace isc::asiolink
+} // end of namespace isc
+
+#endif
run_unittests_SOURCES += udp_socket_unittest.cc
run_unittests_SOURCES += io_service_unittest.cc
run_unittests_SOURCES += io_service_signal_unittests.cc
+run_unittests_SOURCES += io_service_thread_pool_unittests.cc
run_unittests_SOURCES += dummy_io_callback_unittest.cc
run_unittests_SOURCES += tcp_acceptor_unittest.cc
run_unittests_SOURCES += unix_domain_socket_unittest.cc
--- /dev/null
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
+#include <asiolink/io_service.h>
+#include <asiolink/io_service_thread_pool.h>
+#include <exceptions/exceptions.h>
+#include <testutils/gtest_utils.h>
+
+#include <gtest/gtest.h>
+#include <string>
+
+using namespace isc;
+using namespace isc::asiolink;
+
+namespace {
+
+/// @brief Test timeout (ms).
+const long TEST_TIMEOUT = 10000;
+
+/// @brief Simple test fixture for testing IoServiceThreadPool.
+class IoServiceThreadPoolTest : public ::testing::Test {
+public:
+ /// @brief Constructor.
+ IoServiceThreadPoolTest()
+ : io_service_(new IOService()) {
+ }
+
+ /// @brief Destructor.
+ virtual ~IoServiceThreadPoolTest() {
+ io_service_->stop();
+ }
+
+ /// @brief IOService instance used by thread pools.
+ IOServicePtr io_service_;
+};
+
+// URL contains scheme and hostname.
+TEST_F(IoServiceThreadPoolTest, invalidConstruction) {
+ IoServiceThreadPoolPtr pool;
+
+ // Constructing with pool size of 0 should fail.
+ ASSERT_THROW_MSG(pool.reset(new IoServiceThreadPool(io_service_, 0)), BadValue,
+ "pool_size must be non 0");
+}
+
+// Verifies that a pool can be created without starting it.
+TEST_F(IoServiceThreadPoolTest, deferredStartConstruction) {
+ IoServiceThreadPoolPtr pool;
+
+ ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true)));
+
+ // State should be stopped.
+ // Pool size should be 3.
+ // IOService should be there.
+ // IOService is new, so it should not be stopped.
+ // No threads in the pool.
+ ASSERT_TRUE(pool->isStopped());
+ EXPECT_EQ(pool->getPoolSize(), 3);
+ ASSERT_TRUE(pool->getIOService());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Destructor should not throw.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that a pool can be started within the constructor.
+TEST_F(IoServiceThreadPoolTest, startDuringConstruction) {
+ IoServiceThreadPoolPtr pool;
+
+ ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3)));
+
+ // State should be running.
+ // Pool size should be 3.
+ // IOService should be there.
+ // IOService is new, so it should not be stopped.
+ // Should have 3 threads in the pool.
+ ASSERT_TRUE(pool->isRunning());
+ EXPECT_EQ(pool->getPoolSize(), 3);
+ ASSERT_TRUE(pool->getIOService());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 3);
+
+ // Destructor should not throw.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from STOPPED to RUNNING.
+TEST_F(IoServiceThreadPoolTest, stoppedToRunning) {
+ IoServiceThreadPoolPtr pool;
+
+ // Create a stopped pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true)));
+ ASSERT_TRUE(pool->isStopped());
+
+ // Call run from STOPPED.
+ ASSERT_NO_THROW_LOG(pool->run());
+
+ // State should be RUNNING, IOService should not be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_TRUE(pool->isRunning());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 3);
+
+ // Calling run again should be harmless.
+ ASSERT_NO_THROW_LOG(pool->run());
+
+ // State should be RUNNING, IOService should not be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_TRUE(pool->isRunning());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 3);
+
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from RUNNING to STOPPED.
+TEST_F(IoServiceThreadPoolTest, runningToStopped) {
+ IoServiceThreadPoolPtr pool;
+
+ // Create a running pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false)));
+ ASSERT_TRUE(pool->isRunning());
+
+ // Call stop.
+ ASSERT_NO_THROW_LOG(pool->stop());
+
+ // State should be STOPPED, IOService should be stopped, we should
+ // have 0 threads in the pool.
+ ASSERT_TRUE(pool->isStopped());
+ EXPECT_TRUE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Calling stop again should be harmless.
+ ASSERT_NO_THROW_LOG(pool->stop());
+
+ // State should be STOPPED, IOService should be stopped, we should
+ // have 0 threads in the pool.
+ ASSERT_TRUE(pool->isStopped());
+ EXPECT_TRUE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from RUNNING to PAUSED.
+TEST_F(IoServiceThreadPoolTest, runningToPaused) {
+ IoServiceThreadPoolPtr pool;
+
+ // Create a running pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false)));
+ ASSERT_TRUE(pool->isRunning());
+
+ // Call pause from RUNNING.
+ ASSERT_NO_THROW_LOG(pool->pause());
+
+ // State should be PAUSED, IOService should be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_TRUE(pool->isPaused());
+ EXPECT_TRUE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 3);
+
+ // Calling pause again should be harmless.
+ ASSERT_NO_THROW_LOG(pool->pause());
+
+ // State should be PAUSED, IOService should be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_TRUE(pool->isPaused());
+ EXPECT_TRUE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 3);
+
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from PAUSED to RUNNING.
+TEST_F(IoServiceThreadPoolTest, pausedToRunning) {
+ IoServiceThreadPoolPtr pool;
+
+ // Create a running pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false)));
+ ASSERT_TRUE(pool->isRunning());
+
+ // Call pause from RUNNING.
+ ASSERT_NO_THROW_LOG(pool->pause());
+ ASSERT_TRUE(pool->isPaused());
+
+ // Call run.
+ ASSERT_NO_THROW_LOG(pool->run());
+
+ // State should be RUNNING, IOService should not be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_TRUE(pool->isRunning());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 3);
+
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from PAUSED to STOPPED.
+TEST_F(IoServiceThreadPoolTest, pausedToStopped) {
+ IoServiceThreadPoolPtr pool;
+
+ // Create a running pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false)));
+ ASSERT_TRUE(pool->isRunning());
+
+ // Call pause from RUNNING.
+ ASSERT_NO_THROW_LOG(pool->pause());
+ ASSERT_TRUE(pool->isPaused());
+
+ // Call stop.
+ ASSERT_NO_THROW_LOG(pool->stop());
+
+ // State should be STOPPED, IOService should be stopped, we should
+ // have 0 threads in the pool.
+ ASSERT_TRUE(pool->isStopped());
+ EXPECT_TRUE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that attempting to pause a STOPPED pool has no effect.
+TEST_F(IoServiceThreadPoolTest, stoppedToPaused) {
+ IoServiceThreadPoolPtr pool;
+
+ // Create a stopped pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true)));
+ ASSERT_TRUE(pool->isStopped());
+
+ // State should be STOPPED, IOService won't be stopped because it was
+ // never started. We should have 0 threads in the pool.
+ ASSERT_TRUE(pool->isStopped());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Call pause from STOPPED.
+ ASSERT_NO_THROW_LOG(pool->pause());
+
+ // Should have no effect.
+ ASSERT_TRUE(pool->isStopped());
+
+ // State should be STOPPED, IOService won't be stopped because it was
+ // never started. We should have 0 threads in the pool.
+ ASSERT_TRUE(pool->isStopped());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+}