]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#2581] Create asiolink::IoServiceThreadPool
authorThomas Markwalder <tmark@isc.org>
Thu, 29 Sep 2022 14:44:08 +0000 (10:44 -0400)
committerThomas Markwalder <tmark@isc.org>
Thu, 29 Sep 2022 14:44:08 +0000 (10:44 -0400)
Create IoServiceThreadPool from HttpThreadPool

src/lib/asiolink/Makefile.am
    - added new files

src/lib/asiolink/io_service_thread_pool.cc
src/lib/asiolink/io_service_thread_pool.h
    - new files

src/lib/asiolink/tests/Makefile.am
    - added new file

src/lib/asiolink/tests/io_service_thread_pool.h
    - new file

src/lib/asiolink/Makefile.am
src/lib/asiolink/io_service_thread_pool.cc [new file with mode: 0644]
src/lib/asiolink/io_service_thread_pool.h [new file with mode: 0644]
src/lib/asiolink/tests/Makefile.am
src/lib/asiolink/tests/io_service_thread_pool_unittests.cc [new file with mode: 0644]

index 782358c1748a99a9d1d20c862911ed7d26760091..cf4a2e7bf5369eb9005ebb098264953ec740d773 100644 (file)
@@ -30,6 +30,7 @@ libkea_asiolink_la_SOURCES += io_endpoint.cc io_endpoint.h
 libkea_asiolink_la_SOURCES += io_error.h
 libkea_asiolink_la_SOURCES += io_service.h io_service.cc
 libkea_asiolink_la_SOURCES += io_service_signal.cc io_service_signal.h
+libkea_asiolink_la_SOURCES += io_service_thread_pool.cc io_service_thread_poo.h
 libkea_asiolink_la_SOURCES += io_socket.h io_socket.cc
 libkea_asiolink_la_SOURCES += openssl_tls.h
 libkea_asiolink_la_SOURCES += process_spawn.h process_spawn.cc
@@ -83,6 +84,7 @@ libkea_asiolink_include_HEADERS = \
        io_error.h \
        io_service.h \
        io_service_signal.h \
+       io_service_thread_pool.h \
        io_socket.h \
        openssl_tls.h \
        process_spawn.h \
diff --git a/src/lib/asiolink/io_service_thread_pool.cc b/src/lib/asiolink/io_service_thread_pool.cc
new file mode 100644 (file)
index 0000000..3f930a4
--- /dev/null
@@ -0,0 +1,276 @@
+// Copyright (C) 2022 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
+#include <asiolink/io_service.h>
+#include <asiolink/io_service_thread_pool.h>
+#include <exceptions/exceptions.h>
+#include <util/multi_threading_mgr.h>
+#include <util/unlock_guard.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <atomic>
+#include <functional>
+#include <iostream>
+#include <list>
+#include <mutex>
+#include <thread>
+
+using namespace isc;
+using namespace isc::asiolink;
+using namespace isc::util;
+
+IoServiceThreadPool::IoServiceThreadPool(IOServicePtr io_service, size_t pool_size,
+                               bool defer_start /* = false */)
+    : pool_size_(pool_size), io_service_(io_service),
+      run_state_(State::STOPPED), mutex_(), thread_cv_(),
+      main_cv_(), paused_(0), running_(0), exited_(0)  {
+    if (!pool_size) {
+        isc_throw(BadValue, "pool_size must be non 0");
+    }
+
+    // If we weren't given an IOService, create our own.
+    if (!io_service_) {
+        io_service_.reset(new IOService());
+    }
+
+    // If we're not deferring the start, do it now.
+    if (!defer_start) {
+        run();
+    }
+}
+
+IoServiceThreadPool::~IoServiceThreadPool() {
+    stop();
+}
+
+void
+IoServiceThreadPool::run() {
+    setState(State::RUNNING);
+}
+
+void
+IoServiceThreadPool::pause() {
+    setState(State::PAUSED);
+}
+
+void
+IoServiceThreadPool::stop() {
+    setState(State::STOPPED);
+}
+
+IoServiceThreadPool::State
+IoServiceThreadPool::getState() {
+    std::lock_guard<std::mutex> lck(mutex_);
+    return (run_state_);
+}
+
+bool
+IoServiceThreadPool::validateStateChange(State state) const {
+    switch (run_state_) {
+    case State::STOPPED:
+        return (state == State::RUNNING);
+    case State::RUNNING:
+        return (state != State::RUNNING);
+    case State::PAUSED:
+        return (state != State::PAUSED);
+    }
+    return (false);
+}
+
+std::string
+IoServiceThreadPool::stateToText(State state) {
+    switch (state) {
+    case State::STOPPED:
+        return (std::string("stopped"));
+    case State::RUNNING:
+        return (std::string("running"));
+    case State::PAUSED:
+        return (std::string("paused"));
+    }
+    return (std::string("unknown-state"));
+}
+
+void
+IoServiceThreadPool::checkPausePermissions() {
+    checkPermissions(State::PAUSED);
+}
+
+void
+IoServiceThreadPool::checkPermissions(State state) {
+    auto id = std::this_thread::get_id();
+    if (checkThreadId(id)) {
+        isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to "
+                  << IoServiceThreadPool::stateToText(state) << " performed by worker thread");
+    }
+}
+
+bool
+IoServiceThreadPool::checkThreadId(std::thread::id id) {
+    for (auto thread : threads_) {
+        if (id == thread->get_id()) {
+            return (true);
+        }
+    }
+    return (false);
+}
+
+void
+IoServiceThreadPool::setState(State state) {
+    checkPermissions(state);
+
+    std::unique_lock<std::mutex> main_lck(mutex_);
+
+    // Bail if the transition is invalid.
+    if (!validateStateChange(state)) {
+        return;
+    }
+
+    run_state_ = state;
+    // Notify threads of state change.
+    thread_cv_.notify_all();
+
+    switch (state) {
+    case State::RUNNING: {
+        // Restart the IOService.
+        io_service_->restart();
+
+        // While we have fewer threads than we should, make more.
+        while (threads_.size() < pool_size_) {
+            boost::shared_ptr<std::thread> thread(new std::thread(
+                std::bind(&IoServiceThreadPool::threadWork, this)));
+
+            // Add thread to the pool.
+            threads_.push_back(thread);
+        }
+
+        // Main thread waits here until all threads are running.
+        main_cv_.wait(main_lck,
+            [&]() {
+                return (running_ == threads_.size());
+            });
+
+        exited_ = 0;
+        break;
+    }
+
+    case State::PAUSED: {
+        // Stop IOService.
+        if (!io_service_->stopped()) {
+            io_service_->poll();
+            io_service_->stop();
+        }
+
+        // Main thread waits here until all threads are paused.
+        main_cv_.wait(main_lck,
+            [&]() {
+                return (paused_ == threads_.size());
+            });
+
+        break;
+    }
+
+    case State::STOPPED: {
+        // Stop IOService.
+        if (!io_service_->stopped()) {
+            io_service_->poll();
+            io_service_->stop();
+        }
+
+        // Main thread waits here until all threads have exited.
+        main_cv_.wait(main_lck,
+            [&]() {
+                return (exited_ == threads_.size());
+            });
+
+        for (auto const& thread : threads_) {
+            thread->join();
+        }
+
+        threads_.clear();
+        break;
+    }}
+}
+
+void
+IoServiceThreadPool::threadWork() {
+    bool done = false;
+    while (!done) {
+        switch (getState()) {
+        case State::RUNNING: {
+            {
+                std::unique_lock<std::mutex> lck(mutex_);
+                running_++;
+
+                // If We're all running notify main thread.
+                if (running_ == pool_size_) {
+                    main_cv_.notify_all();
+                }
+            }
+
+            // Run the IOService.
+            io_service_->run();
+
+            {
+                std::unique_lock<std::mutex> lck(mutex_);
+                running_--;
+            }
+
+            break;
+        }
+
+        case State::PAUSED: {
+            std::unique_lock<std::mutex> lck(mutex_);
+            paused_++;
+
+            // If we're all paused notify main.
+            if (paused_ == threads_.size()) {
+                main_cv_.notify_all();
+            }
+
+            // Wait here till I'm released.
+            thread_cv_.wait(lck,
+                [&]() {
+                    return (run_state_ != State::PAUSED);
+                });
+
+            paused_--;
+            break;
+        }
+
+        case State::STOPPED: {
+            done = true;
+            break;
+        }}
+    }
+
+    std::unique_lock<std::mutex> lck(mutex_);
+    exited_++;
+
+    // If we've all exited, notify main.
+    if (exited_ == threads_.size()) {
+        main_cv_.notify_all();
+    }
+}
+
+IOServicePtr
+IoServiceThreadPool::getIOService() const {
+    return (io_service_);
+}
+
+uint16_t
+IoServiceThreadPool::getPoolSize() const {
+    return (pool_size_);
+}
+
+uint16_t
+IoServiceThreadPool::getThreadCount() const {
+    return (threads_.size());
+}
diff --git a/src/lib/asiolink/io_service_thread_pool.h b/src/lib/asiolink/io_service_thread_pool.h
new file mode 100644 (file)
index 0000000..68c4e21
--- /dev/null
@@ -0,0 +1,265 @@
+// Copyright (C) 2022 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef IOSERVICE_THREAD_POOL_H
+#define IOSERVICE_THREAD_POOL_H
+
+#include <asiolink/io_service.h>
+#include <util/unlock_guard.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <condition_variable>
+#include <list>
+#include <mutex>
+#include <thread>
+
+namespace isc {
+namespace asiolink {
+
+/// @brief Implements a pausable pool of IOService driven threads.
+class IoServiceThreadPool {
+public:
+    /// @brief Describes the possible operational state of the thread pool.
+    enum class State {
+        STOPPED,    /// Pool is not operational.
+        RUNNING,    /// Pool is populated with running threads.
+        PAUSED,     /// Pool is populated with threads that are paused.
+    };
+
+    /// @brief Constructor
+    ///
+    /// @param io_service IOService that will drive the pool's IO. If empty, it
+    /// create its own instance.
+    /// @param pool_size Maximum number of threads in the pool.  Currently the
+    /// number of threads is fixed at this value.
+    /// @param defer_start If true, creation of the threads is deferred until
+    /// a subsequent call to @ref run().  In this case the pool's operational
+    /// state post construction is STOPPED.  If false, the constructor will
+    /// invoke run() to transition the pool into the RUNNING state.
+    IoServiceThreadPool(asiolink::IOServicePtr io_service, size_t pool_size,
+                   bool defer_start = false);
+
+    /// @brief Destructor
+    ///
+    /// Ensures the thread pool is stopped prior to destruction.
+    ~IoServiceThreadPool();
+
+    /// @brief Transitions the pool from STOPPED or PAUSED to RUNNING.
+    ///
+    /// When called from the STOPPED state, the pool threads are created and
+    /// begin processing events.
+    /// When called from the PAUSED state, the pool threads are released
+    /// from PAUSED and resume processing events.
+    /// Has no effect if the pool is already in the RUNNING state.
+    void run();
+
+    /// @brief Transitions the pool from RUNNING to PAUSED.
+    ///
+    /// Pool threads suspend event processing and pause until they
+    /// are released to either resume running or stop.
+    /// Has no effect if the pool is already in the PAUSED or STOPPED
+    /// state.
+    void pause();
+
+    /// @brief Transitions the pool from RUNNING or PAUSED to STOPPED.
+    ///
+    /// Stops thread event processing and then destroys the pool's threads
+    /// Has no effect if the pool is already in the STOPPED state.
+    void stop();
+
+    /// @brief Check if the thread pool is running.
+    ///
+    /// @return True if the thread pool is running, false otherwise.
+    bool isRunning() {
+        return (getState() == State::RUNNING);
+    }
+
+    /// @brief Check if the thread pool is paused.
+    ///
+    /// @return True if the thread pool is paused, false otherwise.
+    bool isPaused() {
+        return (getState() == State::PAUSED);
+    }
+
+    /// @brief Check if the thread pool is stopped.
+    ///
+    /// @return True if the thread pool is stopped, false otherwise.
+    bool isStopped() {
+        return (getState() == State::STOPPED);
+    }
+
+    /// @brief Check current thread permissions to transition to the new PAUSED
+    /// state.
+    ///
+    /// This function throws @ref MultiThreadingInvalidOperation if the calling
+    /// thread is one of the worker threads. This would prevent a dead-lock if
+    /// the calling thread would try to perform a thread pool state transition
+    /// to PAUSED state.
+    ///
+    /// @throw MultiThreadingInvalidOperation if the state transition is done on
+    /// any of the worker threads.
+    void checkPausePermissions();
+
+private:
+    /// @brief Check current thread permissions to transition to the new state.
+    ///
+    /// This function throws @ref MultiThreadingInvalidOperation if the calling
+    /// thread is one of the worker threads. This would prevent a dead-lock if
+    /// the calling thread would try to perform a thread pool state transition.
+    ///
+    /// @param state The new transition state for the pool.
+    /// @throw MultiThreadingInvalidOperation if the state transition is done on
+    /// any of the worker threads.
+    void checkPermissions(State state);
+
+    /// @brief Check specified thread id against own threads.
+    ///
+    /// @return true if thread is owned, false otherwise.
+    bool checkThreadId(std::thread::id id);
+
+    /// @brief Thread-safe change of the pool's run state.
+    ///
+    /// Transitions a pool from one run state to another:
+    ///
+    /// When moving from STOPPED or PAUSED to RUNNING:
+    /// -# Sets state to RUNNING.
+    /// -# Notifies threads of state change.
+    /// -# Restarts the IOService.
+    /// -# Creates the threads if they do not yet exist (true only
+    ///    when transitioning from STOPPED).
+    /// -# Waits until all threads are running.
+    /// -# Sets the count of exited threads to 0.
+    /// -# Returns to caller.
+    ///
+    /// When moving from RUNNING or PAUSED to STOPPED:
+    /// -# Sets state to STOPPED
+    /// -# Notifies threads of state change.
+    /// -# Polls the IOService to flush handlers.
+    /// -# Stops the IOService.
+    /// -# Waits until all threads have exited the work function.
+    /// -# Joins and destroys the threads.
+    /// -# Returns to caller.
+    ///
+    /// When moving from RUNNING to PAUSED:
+    /// -# Sets state to PAUSED
+    /// -# Notifies threads of state change.
+    /// -# Polls the IOService to flush handlers.
+    /// -# Stops the IOService.
+    /// -# Waits until all threads have paused.
+    /// -# Returns to caller.
+    ///
+    /// @param state The new transition state for the pool.
+    /// @throw MultiThreadingInvalidOperation if the state transition is done on
+    /// any of the worker threads.
+    void setState(State state);
+
+    /// @brief Thread-safe fetch of the pool's operational state.
+    ///
+    /// @return Thread pool state.
+    State getState();
+
+    /// @brief Validates whether the pool can change to a given state.
+    ///
+    /// @param state new state for the pool.
+    /// @return true if the change is valid, false otherwise.
+    /// @note Must be called from a thread-safe context.
+    bool validateStateChange(State state) const;
+
+    /// @brief Text representation of a given state.
+    ///
+    /// @param state The state for the pool.
+    /// @return The text representation of a given state.
+    static std::string stateToText(State state);
+
+    /// @brief Work function executed by each thread in the pool.
+    ///
+    /// Implements the run state responsibilities for a given thread.
+    /// It executes a run loop until the pool is stopped. At the top
+    /// of each iteration of the loop the pool's run state is checked
+    /// and when it is:
+    ///
+    /// RUNNING:
+    /// -# The count of threads running is incremented.
+    /// -# If the count has reached the number of threads in pool the
+    ///    main thread is notified.
+    /// -# IOService::run() is invoked.
+    /// -# When IOService::run() returns, the count of threads running
+    ///    is decremented.
+    ///
+    /// PAUSED:
+    /// -# The count of threads paused is incremented.
+    /// -# If the count has reached the number of threads in pool the
+    ///    main thread is notified.
+    /// -# Thread blocks until notified the pool's run state is no
+    ///    longer PAUSED.
+    /// -# The count of threads paused is decremented.
+    ///
+    /// STOPPED:
+    /// -# The run loop is exited.
+    /// -# The count of threads exited is incremented.
+    /// -# If the count has reached the number of threads in pool the
+    ///    main thread is notified.
+    /// -# The function exits.
+    void threadWork();
+
+public:
+    /// @brief Fetches the IOService that drives the pool.
+    ///
+    /// @return the pointer to the IOService.
+    asiolink::IOServicePtr getIOService() const;
+
+    /// @brief Fetches the maximum size of the thread pool.
+    ///
+    /// @return the maximum size of the thread pool.
+    uint16_t getPoolSize() const;
+
+    /// @brief Fetches the number of threads in the pool.
+    ///
+    /// @return the number of running threads.
+    uint16_t getThreadCount() const;
+
+private:
+    /// @brief Maximum number of threads in the thread pool.
+    size_t pool_size_;
+
+    /// @brief Pointer to private IOService used in multi-threaded mode.
+    asiolink::IOServicePtr io_service_;
+
+    /// @brief Tracks the operational state of the pool.
+    State run_state_;
+
+    /// @brief Mutex to protect the internal state.
+    std::mutex mutex_;
+
+    /// @brief Condition variable used by threads for synchronization.
+    std::condition_variable thread_cv_;
+
+    /// @brief Condition variable used by main thread to wait on threads
+    /// state transitions.
+    std::condition_variable main_cv_;
+
+    /// @brief Number of threads currently paused.
+    size_t paused_;
+
+    /// @brief Number of threads currently running.
+    size_t running_;
+
+    /// @brief Number of threads that have exited the work function.
+    size_t exited_;
+
+    /// @brief Pool of threads used to service connections in multi-threaded
+    /// mode.
+    std::list<boost::shared_ptr<std::thread> > threads_;
+};
+
+/// @brief Defines a pointer to a thread pool.
+typedef boost::shared_ptr<IoServiceThreadPool> IoServiceThreadPoolPtr;
+
+} // end of namespace isc::asiolink
+} // end of namespace isc
+
+#endif
index b4be99784fd225e7130d88997c7fa679563a372b..cad0d6d16010f0439a29802046f08febcd46de52 100644 (file)
@@ -35,6 +35,7 @@ run_unittests_SOURCES += udp_endpoint_unittest.cc
 run_unittests_SOURCES += udp_socket_unittest.cc
 run_unittests_SOURCES += io_service_unittest.cc
 run_unittests_SOURCES += io_service_signal_unittests.cc
+run_unittests_SOURCES += io_service_thread_pool_unittests.cc
 run_unittests_SOURCES += dummy_io_callback_unittest.cc
 run_unittests_SOURCES += tcp_acceptor_unittest.cc
 run_unittests_SOURCES += unix_domain_socket_unittest.cc
diff --git a/src/lib/asiolink/tests/io_service_thread_pool_unittests.cc b/src/lib/asiolink/tests/io_service_thread_pool_unittests.cc
new file mode 100644 (file)
index 0000000..496f3cf
--- /dev/null
@@ -0,0 +1,264 @@
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
+#include <asiolink/io_service.h>
+#include <asiolink/io_service_thread_pool.h>
+#include <exceptions/exceptions.h>
+#include <testutils/gtest_utils.h>
+
+#include <gtest/gtest.h>
+#include <string>
+
+using namespace isc;
+using namespace isc::asiolink;
+
+namespace {
+
+/// @brief Test timeout (ms).
+const long TEST_TIMEOUT = 10000;
+
+/// @brief Simple test fixture for testing IoServiceThreadPool.
+class IoServiceThreadPoolTest : public ::testing::Test {
+public:
+    /// @brief Constructor.
+    IoServiceThreadPoolTest()
+        : io_service_(new IOService()) {
+    }
+
+    /// @brief Destructor.
+    virtual ~IoServiceThreadPoolTest() {
+        io_service_->stop();
+    }
+
+    /// @brief IOService instance used by thread pools.
+    IOServicePtr io_service_;
+};
+
+// URL contains scheme and hostname.
+TEST_F(IoServiceThreadPoolTest, invalidConstruction) {
+    IoServiceThreadPoolPtr pool;
+
+    // Constructing with pool size of 0 should fail.
+    ASSERT_THROW_MSG(pool.reset(new IoServiceThreadPool(io_service_, 0)), BadValue,
+                     "pool_size must be non 0");
+}
+
+// Verifies that a pool can be created without starting it.
+TEST_F(IoServiceThreadPoolTest, deferredStartConstruction) {
+    IoServiceThreadPoolPtr pool;
+
+    ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true)));
+
+    // State should be stopped.
+    // Pool size should be 3.
+    // IOService should be there.
+    // IOService is new, so it should not be stopped.
+    // No threads in the pool.
+    ASSERT_TRUE(pool->isStopped());
+    EXPECT_EQ(pool->getPoolSize(), 3);
+    ASSERT_TRUE(pool->getIOService());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Destructor should not throw.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that a pool can be started within the constructor.
+TEST_F(IoServiceThreadPoolTest, startDuringConstruction) {
+    IoServiceThreadPoolPtr pool;
+
+    ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3)));
+
+    // State should be running.
+    // Pool size should be 3.
+    // IOService should be there.
+    // IOService is new, so it should not be stopped.
+    // Should have 3 threads in the pool.
+    ASSERT_TRUE(pool->isRunning());
+    EXPECT_EQ(pool->getPoolSize(), 3);
+    ASSERT_TRUE(pool->getIOService());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 3);
+
+    // Destructor should not throw.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from STOPPED to RUNNING.
+TEST_F(IoServiceThreadPoolTest, stoppedToRunning) {
+    IoServiceThreadPoolPtr pool;
+
+    // Create a stopped pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true)));
+    ASSERT_TRUE(pool->isStopped());
+
+    // Call run from STOPPED.
+    ASSERT_NO_THROW_LOG(pool->run());
+
+    // State should be RUNNING, IOService should not be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_TRUE(pool->isRunning());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 3);
+
+    // Calling run again should be harmless.
+    ASSERT_NO_THROW_LOG(pool->run());
+
+    // State should be RUNNING, IOService should not be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_TRUE(pool->isRunning());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 3);
+
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from RUNNING to STOPPED.
+TEST_F(IoServiceThreadPoolTest, runningToStopped) {
+    IoServiceThreadPoolPtr pool;
+
+    // Create a running pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false)));
+    ASSERT_TRUE(pool->isRunning());
+
+    // Call stop.
+    ASSERT_NO_THROW_LOG(pool->stop());
+
+    // State should be STOPPED, IOService should be stopped, we should
+    // have 0 threads in the pool.
+    ASSERT_TRUE(pool->isStopped());
+    EXPECT_TRUE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Calling stop again should be harmless.
+    ASSERT_NO_THROW_LOG(pool->stop());
+
+    // State should be STOPPED, IOService should be stopped, we should
+    // have 0 threads in the pool.
+    ASSERT_TRUE(pool->isStopped());
+    EXPECT_TRUE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from RUNNING to PAUSED.
+TEST_F(IoServiceThreadPoolTest, runningToPaused) {
+    IoServiceThreadPoolPtr pool;
+
+    // Create a running pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false)));
+    ASSERT_TRUE(pool->isRunning());
+
+    // Call pause from RUNNING.
+    ASSERT_NO_THROW_LOG(pool->pause());
+
+    // State should be PAUSED, IOService should be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_TRUE(pool->isPaused());
+    EXPECT_TRUE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 3);
+
+    // Calling pause again should be harmless.
+    ASSERT_NO_THROW_LOG(pool->pause());
+
+    // State should be PAUSED, IOService should be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_TRUE(pool->isPaused());
+    EXPECT_TRUE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 3);
+
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from PAUSED to RUNNING.
+TEST_F(IoServiceThreadPoolTest, pausedToRunning) {
+    IoServiceThreadPoolPtr pool;
+
+    // Create a running pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false)));
+    ASSERT_TRUE(pool->isRunning());
+
+    // Call pause from RUNNING.
+    ASSERT_NO_THROW_LOG(pool->pause());
+    ASSERT_TRUE(pool->isPaused());
+
+    // Call run.
+    ASSERT_NO_THROW_LOG(pool->run());
+
+    // State should be RUNNING, IOService should not be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_TRUE(pool->isRunning());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 3);
+
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from PAUSED to STOPPED.
+TEST_F(IoServiceThreadPoolTest, pausedToStopped) {
+    IoServiceThreadPoolPtr pool;
+
+    // Create a running pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, false)));
+    ASSERT_TRUE(pool->isRunning());
+
+    // Call pause from RUNNING.
+    ASSERT_NO_THROW_LOG(pool->pause());
+    ASSERT_TRUE(pool->isPaused());
+
+    // Call stop.
+    ASSERT_NO_THROW_LOG(pool->stop());
+
+    // State should be STOPPED, IOService should be stopped, we should
+    // have 0 threads in the pool.
+    ASSERT_TRUE(pool->isStopped());
+    EXPECT_TRUE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that attempting to pause a STOPPED pool has no effect.
+TEST_F(IoServiceThreadPoolTest, stoppedToPaused) {
+    IoServiceThreadPoolPtr pool;
+
+    // Create a stopped pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new IoServiceThreadPool(io_service_, 3, true)));
+    ASSERT_TRUE(pool->isStopped());
+
+    // State should be STOPPED, IOService won't be stopped because it was
+    // never started. We should have 0 threads in the pool.
+    ASSERT_TRUE(pool->isStopped());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Call pause from STOPPED.
+    ASSERT_NO_THROW_LOG(pool->pause());
+
+    // Should have no effect.
+    ASSERT_TRUE(pool->isStopped());
+
+    // State should be STOPPED, IOService won't be stopped because it was
+    // never started. We should have 0 threads in the pool.
+    ASSERT_TRUE(pool->isStopped());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+}