]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1818] Pausable HttpThreadPool initial implemenation
authorThomas Markwalder <tmark@isc.org>
Tue, 4 May 2021 14:29:40 +0000 (10:29 -0400)
committerThomas Markwalder <tmark@isc.org>
Mon, 17 May 2021 14:56:49 +0000 (10:56 -0400)
src/lib/asiolink/io_service.*
    IOServcie::stopped()
    IOServcie::restart() - new wrapper methods

src/lib/http/http_thread_pool.*
    New files that implement HttpThreadPool class

src/lib/config/cmd_http_listener.*
    CmdHttpListner - retooled to use HttpThreadPool
    Added pause() and resume() methods

src/lib/config/tests/cmd_http_listener_unittests.cc
    Retooled and added unit tests for pause/resume

src/lib/http/Makefile.am
    Added http_thread_pool.*

src/lib/http/client.cc
    CmdHttpListner - retooled to use HttpThreadPool
    Added pause() and resume() methods

src/lib/http/tests/Makefile.am
    Added http_thread_pool_unittests.cc

src/lib/http/tests/mt_client_unittests.cc
    Retooled and added unit tests for pause/resume

12 files changed:
src/lib/asiolink/io_service.cc
src/lib/asiolink/io_service.h
src/lib/config/cmd_http_listener.cc
src/lib/config/cmd_http_listener.h
src/lib/config/tests/cmd_http_listener_unittests.cc
src/lib/http/Makefile.am
src/lib/http/client.cc
src/lib/http/client.h
src/lib/http/http_thread_pool.cc [new file with mode: 0644]
src/lib/http/http_thread_pool.h [new file with mode: 0644]
src/lib/http/tests/Makefile.am
src/lib/http/tests/mt_client_unittests.cc

index 0574c5f866ca84501de76f7d064c10f6cb90e4aa..927a1b1ba90968bdb72df0aafa3cb53654d25d88 100644 (file)
@@ -84,6 +84,18 @@ public:
     /// This will return the control to the caller of the \c run() method.
     void stop() { io_service_.stop();} ;
 
+    /// \brief Indicates if the IOService has been stopped.
+    ///
+    /// \return true if the IOService has been stopped, false otherwise.
+    bool stopped() const {
+        return (io_service_.stopped());
+    }
+
+    /// \brief Restarts the IOService in preparation for a subsequent \c run() invocation.
+    void restart() {
+        io_service_.reset();
+    }
+
     /// \brief Removes IO service work object to let it finish running
     /// when all handlers have been invoked.
     void stopWork() {
@@ -136,6 +148,16 @@ IOService::stop() {
     io_impl_->stop();
 }
 
+bool
+IOService::stopped() const {
+    return (io_impl_->stopped());
+}
+
+void
+IOService::restart() {
+    io_impl_->restart();
+}
+
 void
 IOService::stopWork() {
     io_impl_->stopWork();
index eeb668e16b11b9300f28d20e6944ae0442f88bd8..4bcedf1cb762061ce4bb10b6f3f28838924ffc0f 100644 (file)
@@ -71,6 +71,14 @@ public:
     /// This will return the control to the caller of the \c run() method.
     void stop();
 
+    /// \brief Indicates if the IOService has been stopped.
+    ///
+    /// \return true if the IOService has been stopped, false otherwise.
+    bool stopped() const;
+
+    /// \brief Restarts the IOService in preparation for a subsequent \c run() invocation.
+    void restart();
+
     /// \brief Removes IO service work object to let it finish running
     /// when all handlers have been invoked.
     void stopWork();
index f3ff7bd2a86295be8de4f36bceca2c4fac236af0..b850e7aa2eb1524dbdd9aad21397dfc1c0bc6810 100644 (file)
@@ -66,12 +66,8 @@ CmdHttpListener::start() {
                                               HttpListener::RequestTimeout(TIMEOUT_AGENT_RECEIVE_COMMAND),
                                               HttpListener::IdleTimeout(TIMEOUT_AGENT_IDLE_CONNECTION_TIMEOUT)));
 
-        // Create a pool of threads, each calls run on our IOService_service instance.
-        for (std::size_t i = 0; i < thread_pool_size_; ++i) {
-            boost::shared_ptr<std::thread> thread(new std::thread(
-                std::bind(&IOService::run, io_service_)));
-            threads_.push_back(thread);
-        }
+        // Create the thread pool.
+        threads_.reset(new HttpThreadPool(io_service_, thread_pool_size_, false));
 
         // Instruct the HTTP listener to actually open socket, install
         // callback and start listening.
@@ -87,6 +83,20 @@ CmdHttpListener::start() {
     }
 }
 
+void
+CmdHttpListener::pause() {
+    if (threads_) {
+        threads_->pause();
+    }
+}
+
+void
+CmdHttpListener::resume() {
+    if (threads_) {
+        threads_->resume();
+    }
+}
+
 void
 CmdHttpListener::stop() {
     // Nothing to do.
@@ -98,15 +108,8 @@ CmdHttpListener::stop() {
               .arg(address_)
               .arg(port_);
 
-    // Stop the IOService first.
-    io_service_->stop();
-
-    // Stop the threads next.
-    for (auto const& thread : threads_) {
-        thread->join();
-    }
-
-    threads_.clear();
+    // Stop the thread pool.
+    threads_->stop();
 
     // Get rid of the listener.
     http_listener_.reset();
@@ -119,6 +122,16 @@ CmdHttpListener::stop() {
               .arg(port_);
 }
 
+HttpThreadPool::RunState 
+CmdHttpListener::getRunState() const {
+    if (!threads_) {
+        isc_throw(InvalidOperation,
+                  "CmdHttpListener::getRunState - no thread pool!");
+    }
+
+    return (threads_->getRunState());
+}
+
 bool
 CmdHttpListener::isListening() const {
     // If we have a listener we're listening.
index 3929471662af8a0b7f841e4dbb3a17f8df1eb3b3..e7fd4428a36256fe2b96447235c91a3f8bcff8a6 100644 (file)
@@ -10,6 +10,7 @@
 #include <asiolink/io_address.h>
 #include <asiolink/io_service.h>
 #include <http/listener.h>
+#include <http/http_thread_pool.h>
 #include <thread>
 #include <vector>
 
@@ -37,12 +38,20 @@ public:
     /// @brief Destructor
     virtual ~CmdHttpListener();
 
-    /// @brief Initiates the listener's worker thread.
+    /// @brief Starts running the listener's thread pool.
     void start();
 
-    /// @brief Stops the listener's worker thread.
+    /// @brief Pauses the listener's thread pool. 
+    void pause();
+
+    /// @brief Resumes running the listener's thread pool. 
+    void resume();
+
+    /// @brief Stops the listener's thread pool.
     void stop();
 
+    http::HttpThreadPool::RunState getRunState() const;
+
     /// @brief Checks if we are listening to the HTTP requests.
     ///
     /// @return true if we are listening.
@@ -73,7 +82,11 @@ public:
     ///
     /// @return uint16_t containing the number of running threads.
     uint16_t getThreadCount() {
-        return (threads_.size());
+        if (!threads_) {
+            return (0);
+        }
+
+        return (threads_->getThreadCount());
     }
 
 private:
@@ -93,7 +106,7 @@ private:
     std::size_t thread_pool_size_;
 
     /// @brief The pool of threads that do IO work.
-    std::vector<boost::shared_ptr<std::thread> > threads_;
+    http::HttpThreadPoolPtr threads_;
 };
 
 /// @brief Defines a shared pointer to CmdHttpListener.
index 3d0f1e57f474585c7e4364c66e432da6cf5fef31..7f2bb8c141d3775900e89ae734874e97095a516b 100644 (file)
@@ -52,9 +52,9 @@ public:
     /// Starts test timer which detects timeouts, deregisters all commands
     /// from CommandMgr, and enables multi-threading mode.
     CmdHttpListenerTest()
-        : io_service_(), test_timer_(io_service_), run_io_service_timer_(io_service_),
+        : listener_(), io_service_(), test_timer_(io_service_), run_io_service_timer_(io_service_),
         clients_(), num_threads_(), num_clients_(), num_in_progress_(0), num_finished_(0),
-        chunk_size_(0) {
+        chunk_size_(0), paused_(false), pause_cnt_(0) {
         test_timer_.setup(std::bind(&CmdHttpListenerTest::timeoutHandler, this, true),
                           TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
 
@@ -69,6 +69,9 @@ public:
     ///
     /// Removes HTTP clients, unregisters commands, disables MT.
     virtual ~CmdHttpListenerTest() {
+        // Wipe out the listener.
+        listener_.reset();
+
         // Destroy all remaining clients.
         for (auto const& client : clients_) {
             client->close();
@@ -175,29 +178,57 @@ public:
     /// because the test clients stop the io_service when they're
     /// through with a request.
     ///
-    /// @param timeout Optional value specifying for how long the io service
-    /// should be ran.
-    void runIOService() {
+    /// @param num_pauses Desired number of times the listener should be
+    /// paused during the test.
+    void runIOService(size_t num_pauses = 0) {
+        // Create a timer to use for invoking resume after pause.
+        IntervalTimer pause_timer_(io_service_);
+        paused_ = false;
+
         // Loop until the clients are done, an error occurs, or the time runs out.
-        bool keep_going = true;
-        while (keep_going) {
-            // Always call reset() before we call run();
-            io_service_.get_io_service().reset();
+        size_t num_done = 0;
+        while (num_done != clients_.size()) {
+            // Always call restart() before we call run();
+            io_service_.restart();
+
+            if (shouldPause(num_pauses, num_done)) {
+                // Pause the listener .
+                paused_ = true;
+                ++pause_cnt_;
+                listener_->pause();
+
+                // Set timer to resume listener.
+                pause_timer_.setup(
+                    [this]() {
+                        listener_->resume();
+                        paused_ = false;
+                    }, 10, IntervalTimer::ONE_SHOT);
+            }
 
             // Run until a client stops the service.
             io_service_.run();
 
             // If all the clients are done receiving, the test is done.
-            keep_going = false;
+            num_done = 0;
             for (auto const& client : clients_) {
-                if (!client->receiveDone()) {
-                    keep_going = true;
-                    break;
+                if (client->receiveDone()) {
+                    ++num_done;
                 }
             }
         }
     }
 
+    /// @brief Determines if the listner should be paused.
+    ///
+    /// @param num_pauses desired number of pauses
+    /// @param num_done number of clients that have completed their requests.
+    ///
+    /// @return True if the listener should be paused.
+    bool shouldPause(size_t num_pauses, size_t num_done) {
+        // True if the number of clients done is a mulitple of the number of pauses.
+        return (!paused_ && num_pauses && num_done && !(num_done % num_pauses));
+    }
+
     /// @brief Create an HttpResponse from a response string.
     ///
     /// @param response_str a string containing the whole HTTP
@@ -323,7 +354,10 @@ public:
     /// thread command.  Each client is used to carry out a single thread
     /// command request.  Must be greater than 0 and a multiple of num_threads
     /// if it is greater than num_threads.
-    void threadListenAndRespond(size_t num_threads, size_t num_clients) {
+    /// @param num_pauses Desired number of times the listener should be
+    /// paused during the test.
+    void threadListenAndRespond(size_t num_threads, size_t num_clients,
+                                size_t num_pauses = 0) {
         // First we makes sure the parameter rules apply.
         ASSERT_TRUE(num_threads > 0);
         ASSERT_TRUE(num_clients > 0);
@@ -343,15 +377,14 @@ public:
                                                          this, ph::_1, ph::_2));
 
         // Create a listener with prescribed number of threads.
-        CmdHttpListenerPtr listener;
-        ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
+        ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
                                                                SERVER_PORT, num_threads)));
-        ASSERT_TRUE(listener);
+        ASSERT_TRUE(listener_);
 
         // Start it and verify it is listening.
-        ASSERT_NO_THROW_LOG(listener->start());
-        ASSERT_TRUE(listener->isListening());
-        EXPECT_EQ(listener->getThreadCount(), num_threads);
+        ASSERT_NO_THROW_LOG(listener_->start());
+        ASSERT_TRUE(listener_->isListening());
+        EXPECT_EQ(listener_->getThreadCount(), num_threads);
 
         // Maps the number of clients served by a given thread-id.
         std::map<std::string, int> clients_per_thread;
@@ -364,12 +397,12 @@ public:
 
         // Now we run the client-side IOService until all requests are done,
         // errors occur or the test times out.
-        ASSERT_NO_FATAL_FAILURE(runIOService());
+        ASSERT_NO_FATAL_FAILURE(runIOService(num_pauses));
 
         // Stop the listener and then verify it has stopped.
-        ASSERT_NO_THROW_LOG(listener->stop());
-        ASSERT_FALSE(listener->isListening());
-        EXPECT_EQ(listener->getThreadCount(), 0);
+        ASSERT_NO_THROW_LOG(listener_->stop());
+        ASSERT_FALSE(listener_->isListening());
+        EXPECT_EQ(listener_->getThreadCount(), 0);
 
         // Iterate over the clients, checking their outcomes.
         size_t total_responses = 0;
@@ -460,8 +493,23 @@ public:
                       << "thread-id: " << it.first
                       << ", clients: " << it.second << std::endl;
         }
+
+        // We should have had the expected number of pauses.
+        if (!num_pauses) {
+            ASSERT_EQ(pause_cnt_, 0);
+        } else {
+            // We allow a range on pauses of +-1.
+            ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ &&
+                        (pause_cnt_ <= (num_pauses + 1)))
+                        << " num+_pauses: " << num_pauses
+                        << ", pause_cnt_" << pause_cnt_;
+        }
     }
 
+
+    /// @brief CmdHttpListener instance under test.
+    CmdHttpListenerPtr listener_;
+
     /// @brief IO service used in drive the test and test clients.
     IOService io_service_;
 
@@ -499,6 +547,12 @@ public:
 
     /// @brief Condition variable used to coordinate threads.
     std::condition_variable cv_;
+
+    /// @brief Indicates if client threads are currently "paused".
+    bool paused_;
+
+    /// @brief Number of times client has been paused during the test.
+    size_t pause_cnt_;
 };
 
 /// Verifies the construction, starting, stopping, and destruction
@@ -506,75 +560,74 @@ public:
 TEST_F(CmdHttpListenerTest, basics) {
     // Make sure multi-threading is off.
     MultiThreadingMgr::instance().setMode(false);
-    CmdHttpListenerPtr listener;
     asiolink::IOAddress address(SERVER_ADDRESS);
     uint16_t port = SERVER_PORT;
 
     // Make sure we can create one.
-    ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(address, port)));
-    ASSERT_TRUE(listener);
+    ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(address, port)));
+    ASSERT_TRUE(listener_);
 
     // Verify the getters do what we expect.
-    EXPECT_EQ(listener->getAddress(), address);
-    EXPECT_EQ(listener->getPort(), port);
-    EXPECT_EQ(listener->getThreadPoolSize(), 1);
+    EXPECT_EQ(listener_->getAddress(), address);
+    EXPECT_EQ(listener_->getPort(), port);
+    EXPECT_EQ(listener_->getThreadPoolSize(), 1);
 
     // It should not be listening and have no threads.
-    EXPECT_FALSE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 0);
+    EXPECT_FALSE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 0);
 
     // Verify that we cannot start it when multi-threading is disabled.
     ASSERT_FALSE(MultiThreadingMgr::instance().getMode());
-    ASSERT_THROW_MSG(listener->start(), InvalidOperation,
+    ASSERT_THROW_MSG(listener_->start(), InvalidOperation,
                      "CmdHttpListener cannot be started"
                      " when multi-threading is disabled");
 
     // It should still not be listening and have no threads.
-    EXPECT_FALSE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 0);
+    EXPECT_FALSE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 0);
 
     // Enable multi-threading.
     MultiThreadingMgr::instance().setMode(true);
 
     // Make sure we can start it and it's listening with 1 thread.
-    ASSERT_NO_THROW_LOG(listener->start());
-    ASSERT_TRUE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 1);
+    ASSERT_NO_THROW_LOG(listener_->start());
+    ASSERT_TRUE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 1);
 
     // Trying to start it again should fail.
-    ASSERT_THROW_MSG(listener->start(), InvalidOperation,
+    ASSERT_THROW_MSG(listener_->start(), InvalidOperation,
                      "CmdHttpListener is already listening!");
 
     // Stop it and verify we're no longer listening.
-    ASSERT_NO_THROW_LOG(listener->stop());
-    ASSERT_FALSE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 0);
+    ASSERT_NO_THROW_LOG(listener_->stop());
+    ASSERT_FALSE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 0);
 
     // Make sure we can call stop again without problems.
-    ASSERT_NO_THROW_LOG(listener->stop());
+    ASSERT_NO_THROW_LOG(listener_->stop());
 
     // We should be able to restart it.
-    ASSERT_NO_THROW_LOG(listener->start());
-    ASSERT_TRUE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 1);
+    ASSERT_NO_THROW_LOG(listener_->start());
+    ASSERT_TRUE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 1);
 
     // Destroying it should also stop it.
     // If the test timeouts we know it didn't!
-    ASSERT_NO_THROW_LOG(listener.reset());
+    ASSERT_NO_THROW_LOG(listener_.reset());
 
     // Verify we can construct with more than one thread.
-    ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(address, port, 4)));
-    ASSERT_NO_THROW_LOG(listener->start());
-    EXPECT_EQ(listener->getAddress(), address);
-    EXPECT_EQ(listener->getPort(), port);
-    EXPECT_EQ(listener->getThreadPoolSize(), 4);
-    ASSERT_TRUE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 4);
+    ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(address, port, 4)));
+    ASSERT_NO_THROW_LOG(listener_->start());
+    EXPECT_EQ(listener_->getAddress(), address);
+    EXPECT_EQ(listener_->getPort(), port);
+    EXPECT_EQ(listener_->getThreadPoolSize(), 4);
+    ASSERT_TRUE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 4);
 
     // Stop it and verify we're no longer listening.
-    ASSERT_NO_THROW_LOG(listener->stop());
-    ASSERT_FALSE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 0);
+    ASSERT_NO_THROW_LOG(listener_->stop());
+    ASSERT_FALSE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 0);
 }
 
 
@@ -583,19 +636,19 @@ TEST_F(CmdHttpListenerTest, basics) {
 TEST_F(CmdHttpListenerTest, basicListenAndRespond) {
 
     // Create a listener with 1 thread.
-    CmdHttpListenerPtr listener;
-    ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
+    ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
                                                            SERVER_PORT)));
-    ASSERT_TRUE(listener);
+    ASSERT_TRUE(listener_);
 
     // Start the listener and verify it's listening with 1 thread.
-    ASSERT_NO_THROW_LOG(listener->start());
-    ASSERT_TRUE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 1);
+    ASSERT_NO_THROW_LOG(listener_->start());
+    ASSERT_TRUE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 1);
 
     // Now let's send a "foo" command.  This should create a client, connect
     // to our listener, post our request and retrieve our reply.
     ASSERT_NO_THROW(startRequest("{\"command\": \"foo\"}"));
+    ++num_clients_;
     ASSERT_EQ(1, clients_.size());
     ASSERT_NO_THROW(runIOService());
     TestHttpClientPtr client = clients_.front();
@@ -614,6 +667,7 @@ TEST_F(CmdHttpListenerTest, basicListenAndRespond) {
                                                       this, ph::_1, ph::_2));
     // Try posting the foo command again.
     ASSERT_NO_THROW(startRequest("{\"command\": \"foo\"}"));
+    ++num_clients_;
     ASSERT_EQ(2, clients_.size());
     ASSERT_NO_THROW(runIOService());
     client = clients_.back();
@@ -626,13 +680,13 @@ TEST_F(CmdHttpListenerTest, basicListenAndRespond) {
     EXPECT_EQ(hr->getBody(), "[ { \"arguments\": [ \"bar\" ], \"result\": 0 } ]");
 
     // Make sure the listener is still listening.
-    ASSERT_TRUE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 1);
+    ASSERT_TRUE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 1);
 
     // Stop the listener then verify it has stopped.
-    ASSERT_NO_THROW_LOG(listener->stop());
-    ASSERT_FALSE(listener->isListening());
-    EXPECT_EQ(listener->getThreadCount(), 0);
+    ASSERT_NO_THROW_LOG(listener_->stop());
+    ASSERT_FALSE(listener_->isListening());
+    EXPECT_EQ(listener_->getThreadCount(), 0);
 }
 
 // Now we'll run some permutations of the number of listener threads
@@ -680,4 +734,12 @@ TEST_F(CmdHttpListenerTest, sixByEighteen) {
     threadListenAndRespond(num_threads, num_clients);
 }
 
+// Pauses and resumes during work.
+TEST_F(CmdHttpListenerTest, pauseAndResume) {
+    size_t num_threads = 6;
+    size_t num_clients = 18;
+    size_t num_pauses = 3;
+    threadListenAndRespond(num_threads, num_clients, num_pauses);
+}
+
 } // end of anonymous namespace
index b2646e706583d73c138290e30f91470c540f98e7..6101a0c84f35712d9aa0cbddc587b3cc659a7bf9 100644 (file)
@@ -43,6 +43,7 @@ libkea_http_la_SOURCES += auth_log.cc auth_log.h
 libkea_http_la_SOURCES += auth_messages.cc auth_messages.h
 libkea_http_la_SOURCES += basic_auth_config.cc basic_auth_config.h
 libkea_http_la_SOURCES += basic_auth.cc basic_auth.h
+libkea_http_la_SOURCES += http_thread_pool.cc http_thread_pool.h
 
 libkea_http_la_CXXFLAGS = $(AM_CXXFLAGS)
 libkea_http_la_CPPFLAGS = $(AM_CPPFLAGS)
@@ -114,6 +115,7 @@ libkea_http_include_HEADERS = \
        http_message.h \
        http_message_parser_base.h \
        http_messages.h \
+       http_thread_pool.h \
        http_types.h \
        listener.h \
        listener_impl.h \
index 30c82d184a9f792b8147429ee16061352ca0fb8e..07dc4f5f5fc4bd514cba2d286e052979462a412c 100644 (file)
@@ -27,6 +27,8 @@
 #include <map>
 #include <mutex>
 #include <queue>
+#include <thread>
+
 
 using namespace isc;
 using namespace isc::asiolink;
@@ -1736,30 +1738,30 @@ public:
     ///
     /// @param io_service IOService that will drive connection IO in single
     /// threaded mode.  (Currently ignored in multi-threaded mode)
-    ///
     /// @param thread_pool_size maximum number of concurrent threads
     /// Internally this also sets the maximum number concurrent connections
+    /// @param defer_thread_start if true, then the thread pool will be
+    /// created but started  Applicable only when thread-pool-size is
+    /// greater than zero. 
+    /// will not be startedfalse, then 
     /// per URL.
-    HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0) :
-        thread_pool_size_(thread_pool_size) {
+    HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0,
+                   bool defer_thread_start = false)
+        : thread_pool_size_(thread_pool_size), threads_() {
         if (thread_pool_size_ > 0) {
             // Create our own private IOService.
             thread_io_service_.reset(new IOService());
 
-            // Create a pool of threads, each calls run on the same, private
-            // io_service instance
-            for (std::size_t i = 0; i < thread_pool_size_; ++i) {
-                boost::shared_ptr<std::thread> thread(new std::thread(std::bind(&IOService::run,
-                                                                      thread_io_service_)));
-                threads_.push_back(thread);
-            }
+            // Create the thread pool. 
+            threads_.reset(new HttpThreadPool(thread_io_service_, thread_pool_size_,
+                                              defer_thread_start));
 
             // Create the connection pool. Note that we use the thread_pool_size
             // as the maximum connections per URL value.
             conn_pool_.reset(new ConnectionPool(*thread_io_service_, thread_pool_size_));
 
             LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC, HTTP_CLIENT_MT_STARTED)
-                     .arg(threads_.size());
+                     .arg(getThreadCount());
         } else {
             // Single-threaded mode: use the caller's IOService,
             // one connection per URL.
@@ -1774,30 +1776,40 @@ public:
         stop();
     }
 
-    /// @brief Close all connections, and if multi-threaded, stop internal IOService
-    /// and the thread pool.
+    /// @brief Close all connections, and if multi-threaded, stops the
+    /// thread pool.
     void stop() {
         // Close all the connections.
         conn_pool_->closeAll();
 
-        // Stop the multi-threaded service.
-        if (thread_io_service_) {
-            // Flush cancelled (and ready) handlers.
-            thread_io_service_->poll();
+        if (threads_) {
+            threads_->stop();
+        }
+    }
 
-            // Stop the private IOService.
-            thread_io_service_->stop();
+    void pause() {
+        if (!threads_) {
+            isc_throw(InvalidOperation, "HttpClient::pause - no thread pool");
+        }
 
-            // Shutdown the threads.
-            for (auto const& thread : threads_) {
-                thread->join();
-            }
+        threads_->pause();
+    }
 
-            threads_.clear();
+    /// @brief Pauses the thread pool's worker threads.
+    void resume() {
+        if (!threads_) {
+            isc_throw(InvalidOperation, "HttpClient::resume - no thread pool");
         }
 
-        // Get rid of the IOService.
-        thread_io_service_.reset();
+        threads_->resume();
+    }
+
+    HttpThreadPool::RunState getRunState() const {
+        if (!threads_) {
+            isc_throw(InvalidOperation, "HttpClient::getRunState - no thread pool");
+        }
+
+        return (threads_->getRunState());
     }
 
     /// @brief Fetches the internal IOService used in multi-threaded mode.
@@ -1819,22 +1831,26 @@ public:
     ///
     /// @return the number of running threads.
     uint16_t getThreadCount() {
-        return (threads_.size());
+        if (!threads_) {
+            return (0);
+        }
+        return (threads_->getThreadCount());
     }
 
     /// @brief Holds a pointer to the connection pool.
     ConnectionPoolPtr conn_pool_;
 
 private:
+
     /// @brief Maxim number of threads in the thread pool.
     size_t thread_pool_size_;
 
-    /// @brief Pool of threads used to service connections in multi-threaded
-    /// mode.
-    std::vector<boost::shared_ptr<std::thread> > threads_;
-
     /// @brief Pointer to private IOService used in multi-threaded mode.
     asiolink::IOServicePtr thread_io_service_;
+
+    /// @brief Pool of threads used to service connections in multi-threaded
+    /// mode.
+    HttpThreadPoolPtr threads_;
 };
 
 HttpClient::HttpClient(IOService& io_service, size_t thread_pool_size) {
@@ -1898,6 +1914,16 @@ HttpClient::stop() {
     impl_->stop();
 }
 
+void
+HttpClient::pause() {
+    impl_->pause();
+}
+
+void
+HttpClient::resume() {
+    impl_->resume();
+}
+
 const IOServicePtr
 HttpClient::getThreadIOService() const {
     return (impl_->getThreadIOService());
@@ -1913,5 +1939,11 @@ HttpClient::getThreadCount() const {
     return (impl_->getThreadCount());
 }
 
+HttpThreadPool::RunState
+HttpClient::getRunState() const {
+    return (impl_->getRunState());
+}
+
+
 } // end of namespace isc::http
 } // end of namespace isc
index 4fd3c319e9f4cc6432e0288a117eb10321701e7f..6241238ece544b37ac0a5d0e501cf726a2c8781e 100644 (file)
@@ -13,6 +13,7 @@
 #include <http/url.h>
 #include <http/request.h>
 #include <http/response.h>
+#include <http/http_thread_pool.h>
 #include <boost/shared_ptr.hpp>
 #include <functional>
 #include <string>
@@ -85,7 +86,6 @@ class HttpClientImpl;
 /// are returned via the 3rd parameter of the callback.
 class HttpClient {
 public:
-
     /// @brief HTTP request/response timeout value.
     struct RequestTimeout {
         /// @brief Constructor.
@@ -249,6 +249,7 @@ public:
     /// IOService.
     void stop();
 
+
     /// @brief Closes a connection if it has an out-of-band socket event
     ///
     /// If the  client owns a connection using the given socket and that
@@ -280,6 +281,10 @@ public:
     /// @return the number of running threads.
     uint16_t getThreadCount() const;
 
+    void pause();
+    void resume();
+    HttpThreadPool::RunState getRunState() const;
+
 private:
 
     /// @brief Pointer to the HTTP client implementation.
diff --git a/src/lib/http/http_thread_pool.cc b/src/lib/http/http_thread_pool.cc
new file mode 100644 (file)
index 0000000..642ac83
--- /dev/null
@@ -0,0 +1,190 @@
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/io_service.h>
+#include <asiolink/interval_timer.h>
+#include <exceptions/exceptions.h>
+#include <http/http_log.h>
+#include <http/http_messages.h>
+#include <http/http_thread_pool.h>
+#include <util/multi_threading_mgr.h>
+#include <util/unlock_guard.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <atomic>
+#include <functional>
+#include <iostream>
+#include <list>
+#include <mutex>
+#include <thread>
+
+using namespace isc;
+using namespace isc::asiolink;
+using namespace isc::http;
+using namespace isc::util;
+
+HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size,
+                               bool defer_start /* = false */)
+    : pool_size_(pool_size), io_service_(io_service),
+      run_state_(RunState::STOPPED), mutex_(), cv_()  {
+    if (!pool_size) {
+        isc_throw(BadValue, "HttpThreadPool::ctor pool_size must be > 0");
+    }
+
+    // If we weren't given an IOService, create our own.
+    if (!io_service_) {
+        io_service_.reset(new IOService());
+    }
+
+    // If we're not deferring the start, do it now.
+    if (!defer_start) {
+        start();
+    }
+}
+
+HttpThreadPool::~HttpThreadPool() {
+    if (getRunState() != RunState::STOPPED) {
+        // Stop if we aren't already stopped
+        stop();
+    }
+}
+
+void
+HttpThreadPool::start() {
+    if (getRunState() != RunState::STOPPED) {
+        isc_throw(InvalidOperation, "HttpThreadPool::start already started!");
+    }
+
+    // Set state to RUN.
+    setRunState(RunState::RUN);
+
+    // Prep IOservice for run() invocations.
+    io_service_->restart();
+
+    // Create a pool of threads, each calls run() on our
+    // io_service instance.
+    for (std::size_t i = 0; i < pool_size_; ++i) {
+        boost::shared_ptr<std::thread> thread(new std::thread(
+            [this]() {
+                bool done = false;
+                while (!done) {
+                    switch (getRunState()) {
+                    case RunState::RUN:
+                        io_service_->run();
+                        break;
+                    case RunState::PAUSED:
+                    {
+                        // We need to stop and wait to be released. We don't care how 
+                        // we exit, we'll do whatever the current state dictates.
+                        std::unique_lock<std::mutex> lck(mutex_);
+                        static_cast<void>(cv_.wait_for(lck, std::chrono::milliseconds(25),
+                            [&]() {
+                                return (run_state_ != RunState::PAUSED);
+                            }));
+
+                        break;
+                    }
+                    case RunState::SHUTDOWN:
+                        done = true;
+                        break;
+                    case RunState::STOPPED:
+                        // This should never happen.
+                        done = true;
+                        break;
+                    }
+                }
+            }));
+
+        threads_.push_back(thread);
+    }
+}
+
+void
+HttpThreadPool::stop() {
+    if (getRunState() == RunState::STOPPED) {
+        // Nothing to do.
+        return;
+    }
+
+    // Set the state to SHUTDOWN.
+    setRunState(RunState::SHUTDOWN);
+
+    // Stop our IOService.
+    if (!io_service_->stopped()) {
+        io_service_->stop();
+    }
+
+    // Shutdown the threads.
+    for (auto const& thread : threads_) {
+        thread->join();
+    }
+
+    // Empty the thread pool.
+    threads_.clear();
+
+    // Set the state to STOPPED.
+    setRunState(RunState::STOPPED);
+}
+
+void
+HttpThreadPool::pause() {
+    if (getRunState() !=  RunState::RUN) {
+        // Not running, can't pause.
+        return;
+    }
+
+    /// @todo TKM - Take this out
+    std::cout << "HttpThreadPool pausing" << std::endl;
+    setRunState(RunState::PAUSED);
+    io_service_->stop();
+}
+
+void
+HttpThreadPool::resume() {
+    if (getRunState() !=  RunState::PAUSED) {
+        // Not PAUSED, can't resume.
+        return;
+    }
+
+    /// @todo TKM - Take this out
+    std::cout << "HttpThreadPool resuming" << std::endl;
+    io_service_->restart();
+    setRunState(RunState::RUN);
+}
+
+HttpThreadPool::RunState
+HttpThreadPool::getRunState() {
+    std::lock_guard<std::mutex> lck(mutex_);
+    return (run_state_);
+}
+
+void
+HttpThreadPool::setRunState(RunState state) {
+    {
+        std::lock_guard<std::mutex> lck(mutex_);
+        run_state_ = state;
+    }
+    cv_.notify_all();
+}
+
+IOServicePtr
+HttpThreadPool::getIOService() const {
+    return (io_service_);
+}
+
+uint16_t
+HttpThreadPool::getPoolSize() const {
+    return (pool_size_);
+}
+
+uint16_t
+HttpThreadPool::getThreadCount() const {
+    return (threads_.size());
+}
diff --git a/src/lib/http/http_thread_pool.h b/src/lib/http/http_thread_pool.h
new file mode 100644 (file)
index 0000000..ed0c2d8
--- /dev/null
@@ -0,0 +1,149 @@
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef HTTP_THREAD_POOL_H
+#define HTTP_THREAD_POOL_H
+
+#include <asiolink/io_service.h>
+#include <util/unlock_guard.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <condition_variable>
+#include <list>
+#include <mutex>
+#include <thread>
+
+namespace isc {
+namespace http {
+
+/// @brief Implements a pausable pool of IOService driven threads.
+class HttpThreadPool {
+public:
+    /// @brief Describes the possible operational state of the pool.
+    enum class RunState {
+        STOPPED,    /// Pool is not operational.
+        RUN,        /// Pool is populated with running threads.
+        PAUSED,     /// Pool is populated with threads which are paused.
+        SHUTDOWN,   /// Pool is transitioning from RUN or PAUSED to STOPPED.
+    };
+
+    /// @brief Constructor
+    ///
+    /// @param io_service IOService that will drive the pool's IO. If empty, it
+    /// create it's own instance.
+    /// @param pool_size Maximum number of threads in the pool.  Currently the number
+    /// of threads is fixed at this value.
+    /// @param defer_start If true, creation of the threads is deferred until a subsequent
+    /// call to @ref start().  In this case the pool's operational state post construction
+    /// is STOPPED.  If false, the construtor will invoke start() which will create the
+    /// threads, placing the pool in RUN state.
+    HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size, bool defer_start = false);
+
+    /// @brief Destructor
+    ///
+    /// Ensures the pool is stopped prior to destruction.
+    ~HttpThreadPool();
+
+    /// @brief Transitions the pool from STOPPED to RUN run state.
+    ///
+    /// It starts the pool by doing the following:
+    /// -# Sets state to RUN
+    /// -# Restarts the IOService preparing it thread invocations of
+    ///    IOService::run()
+    /// -# Creates thread_pool_size_ threads, adding each to the pool.
+    ///
+    /// @throw InvalidOperation if called with the pool in any state other
+    /// than STOPPED.
+    void start();
+
+    /// @brief Tranisitions the pool to STOPPED state.
+    ///
+    /// It stops the pool by doing the following:
+    /// -# Sets the state to SHUTDOWN.
+    /// =# Stops the IOService.
+    /// =# Joins the pool threads.
+    /// -# Empties the pool.
+    /// -# Sets the state to STOPPED.
+    void stop();
+
+    /// @brief Transitions the pool from RUN to PAUSED state.
+    ///
+    /// If the state is any state other than RUN it simply returns,
+    /// otherwise it does the following:
+    ///
+    /// -# Sets the state to PAUSED.
+    /// -# Stops the IOService.
+    void pause();
+
+    /// @brief Transitions the pool from PAUSED to RUN state.
+    ///
+    /// If the state is any state other than PAUSED it simply returns,
+    /// otherwise it does the following:
+    ///
+    /// -# Restarts the IOService preparing it for thread invocations
+    ///    of IOService::run()
+    /// -# Sets the state to RUN.
+    void resume();
+
+    /// @brief Thread-safe fetch of the pool's operational state.
+    ///
+    /// @return Pool run state.
+    RunState getRunState();
+
+private:
+    /// @brief Thread-safe set of the pool's operational state.
+    ///
+    /// @note This method does not validate the state change.
+    ///
+    /// @param state new state for the pool.
+    void setRunState(RunState state);
+
+public:
+    /// @brief Fetches the IOService that drives the pool.
+    ///
+    /// @return A pointer to the IOService.
+    asiolink::IOServicePtr getIOService() const;
+
+    /// @brief Fetches the maximum size of the thread pool.
+    ///
+    /// @return the maximum size of the thread pool.
+    uint16_t getPoolSize() const;
+
+    /// @brief Fetches the number of threads in the pool.
+    ///
+    /// @return the number of running threads.
+    uint16_t getThreadCount() const;
+
+private:
+    /// @brief Maxim number of threads in the thread pool.
+    size_t pool_size_;
+
+    /// @brief Pointer to private IOService used in multi-threaded mode.
+    asiolink::IOServicePtr io_service_;
+
+    /// @brief Tracks the operational state of the pool.
+    RunState run_state_;
+
+    /// @brief Mutex to protect the internal state.
+    std::mutex mutex_;
+
+    /// @brief Condition variable for synchronization.
+    std::condition_variable cv_;
+
+    /// @brief Pool of threads used to service connections in multi-threaded
+    /// mode.
+    std::list<boost::shared_ptr<std::thread> > threads_;
+};
+
+/// @brief Defines a pointer to a thread pool.
+typedef boost::shared_ptr<HttpThreadPool> HttpThreadPoolPtr;
+
+} // end of namespace isc::http
+} // end of namespace isc
+
+#endif
+
index 5dc64f32e868f8039d1098b3d7a8028ec74dd080..8fbc3397e7654c7ef6658d5b1a33737524637db7 100644 (file)
@@ -49,6 +49,7 @@ endif
 libhttp_unittests_SOURCES += url_unittests.cc
 libhttp_unittests_SOURCES += test_http_client.h
 libhttp_unittests_SOURCES += mt_client_unittests.cc
+libhttp_unittests_SOURCES += http_thread_pool_unittests.cc
 
 libhttp_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
 libhttp_unittests_CXXFLAGS = $(AM_CXXFLAGS)
index cab50bd7643928ad7adb24bef93030cbfd75bddd..81ed232a35fe486ad69cda06c8af8cbbe10fcdff 100644 (file)
@@ -198,7 +198,8 @@ public:
     MtHttpClientTest()
         : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
           test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
-          expected_requests_(0), num_in_progress_(0), num_finished_(0) {
+          expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false),
+          pause_cnt_(0) {
         test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true),
                           TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
         MultiThreadingMgr::instance().setMode(true);
@@ -235,7 +236,7 @@ public:
     /// have been carried out or the test fails.
     void runIOService() {
         // Loop until the clients are done, an error occurs, or the time runs out.
-        while (clientRRs_.size() < expected_requests_) {
+        while (getRRCount() < expected_requests_) {
             // Always call reset() before we call run();
             io_service_.get_io_service().reset();
 
@@ -304,15 +305,15 @@ public:
 
             // Wait here until we have as many in progress as we have threads.
             {
-                std::unique_lock<std::mutex> lck(mutex_);
+                std::unique_lock<std::mutex> lck(test_mutex_);
                 ++num_in_progress_;
                 if (num_threads_ == 0 || num_in_progress_ == num_threads_) {
                     // Everybody has one, let's go.
                     num_finished_ = 0;
-                    cv_.notify_all();
+                    test_cv_.notify_all();
                 } else {
                     // I'm ready but others aren't wait here.
-                    bool ret = cv_.wait_for(lck, std::chrono::seconds(10),
+                    bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
                                             [&]() { return (num_in_progress_ == num_threads_); });
                     if (!ret) {
                         ADD_FAILURE() << "clients failed to start work";
@@ -332,18 +333,18 @@ public:
 
             // Wait here until we have as many ready to finish as we have threads.
             {
-                std::unique_lock<std::mutex> lck(mutex_);
+                std::unique_lock<std::mutex> lck(test_mutex_);
                 ++num_finished_;
                 clientRRs_.push_back(clientRR);
                 if (num_threads_ == 0 || num_finished_ == num_threads_) {
                     // We're all done, notify the others and finish.
                     num_in_progress_ = 0;
-                    cv_.notify_all();
+                    test_cv_.notify_all();
                     // Stop the test's IOService.
                     io_service_.stop();
                 } else {
                     // I'm done but others aren't wait here.
-                    bool ret = cv_.wait_for(lck, std::chrono::seconds(10),
+                    bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
                                             [&]() { return (num_finished_ == num_threads_); });
                     if (!ret) {
                         ADD_FAILURE() << "clients failed to finish work";
@@ -353,6 +354,57 @@ public:
         }));
     }
 
+    /// @brief Initiates a single HTTP request.
+    ///
+    /// Constructs an HTTP post whose body is a JSON map containing a
+    /// single integer element, "sequence".
+    ///
+    /// The request completion handler simply constructs the response,
+    /// and adds it the list of completed request/responses. If the
+    /// number of completed requests has reached the expected number
+    /// it stops the test IOService.
+    ///
+    /// @param sequence value for the integer element, "sequence",
+    /// to send in the request.
+    void startRequestSimple(int sequence, int port_offset = 0) {
+        // Create the URL on which the server can be reached.
+        std::stringstream ss;
+        ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset);
+        Url url(ss.str());
+
+        // Initiate request to the server.
+        PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
+        HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
+        ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
+                                                  request_json, response_json,
+            [this, request_json, response_json](const boost::system::error_code& ec,
+                                                const HttpResponsePtr&,
+                                                const std::string&) {
+            // Bail on an error.
+            ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
+
+            // Get stringified thread-id.
+            std::stringstream ss;
+            ss << std::this_thread::get_id();
+
+            // Create the ClientRR.
+            ClientRRPtr clientRR(new ClientRR());
+            clientRR->thread_id_ = ss.str();
+            clientRR->request_ = request_json;
+            clientRR->response_ = response_json;
+
+            {
+                std::unique_lock<std::mutex> lck(test_mutex_);
+                clientRRs_.push_back(clientRR);
+                ++num_finished_;
+                if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) {
+                    io_service_.stop();
+                }
+            }
+
+        }));
+    }
+
     /// @brief Carries out HTTP requests via HttpClient to HTTP listener(s).
     ///
     /// This function creates one HttpClient with the given number
@@ -362,7 +414,10 @@ public:
     ///
     /// Then it iteratively runs the test's IOService until all
     /// the requests have been responded to, an error occurs, or the
-    /// test times out.
+    /// test times out.  During each pass through the run loop, the
+    /// a call to shouldPause() is made to determine if the client
+    /// thread pool should be paused.  If so, the the pool is paused
+    /// and an timer begun which resumes the pool upon timeout.
     ///
     /// Each request carries a single integer element, "sequence", which
     /// uniquely identifies the request. Each response is expected to
@@ -382,8 +437,13 @@ public:
     /// A value of 0 puts the HttpClient in single-threaded mode.
     /// @param num_batches number of batches of requests that should be
     /// conducted.
-    /// @param num_listeners number of HttpListeners to create.
-    void threadRequestAndReceive(size_t num_threads, size_t num_batches, size_t num_listeners = 1) {
+    /// @param num_listeners number of HttpListeners to create. Defaults
+    /// to 1.
+    /// @param num_pauses number of times to pause and resume the client
+    /// during the test.  Defaults to 0.
+    void threadRequestAndReceive(size_t num_threads, size_t num_batches,
+                                 size_t num_listeners = 1,
+                                 size_t num_pauses = 0) {
         ASSERT_TRUE(num_batches);
         ASSERT_TRUE(num_listeners);
         num_threads_ = num_threads;
@@ -440,8 +500,32 @@ public:
             }
         }
 
-        // Run test thread IOService. This drives the listener's IO.
-        ASSERT_NO_THROW(runIOService());
+        // Create a timer to use for invoking resume after pause.
+        IntervalTimer pause_timer_(io_service_);
+        paused_ = false;
+
+        // Loop until the clients are done, an error occurs, or the time runs out.
+        while (getRRCount() < expected_requests_) {
+            // Always call restart() before we call run();
+            io_service_.restart();
+
+            if (shouldPause(num_pauses)) {
+                // Pause client.
+                paused_ = true;
+                ++pause_cnt_;
+                client_->pause();
+
+                // Set timer to resume client.
+                pause_timer_.setup(
+                    [this]() {
+                            client_->resume();
+                            paused_ = false;
+                    }, 10, IntervalTimer::ONE_SHOT);
+            }
+
+            // Run until a client stops the service.
+            io_service_.run();
+        }
 
         // Client should stop without issue.
         ASSERT_NO_THROW(client_->stop());
@@ -452,7 +536,22 @@ public:
         }
 
         // We should have a response for each request.
-        ASSERT_EQ(clientRRs_.size(), expected_requests_);
+        ASSERT_EQ(getRRCount(), expected_requests_);
+
+        // We should have had the expected number of pauses.
+        if (!num_pauses) {
+            ASSERT_EQ(pause_cnt_, 0);
+        } else {
+            // We allow a range on pauses of +-1.  Figuring
+            // out the exact intervals at which to pause was
+            // getting to be a pain.  We don't really care as
+            // long as we're close.  The primary thing is that
+            // we did in fact pause and resume.
+            ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ &&
+                        (pause_cnt_ <= (num_pauses + 1)))
+                        << " num+_pauses: " << num_pauses
+                        << ", pause_cnt_" << pause_cnt_;
+        }
 
         // Create a map to track number of responses for each client thread.
         std::map<std::string, int> responses_per_thread;
@@ -537,6 +636,136 @@ public:
         }
     }
 
+    /// @brief Indicates if the test should pause.
+    ///
+    /// Returns true if the number of completed requests
+    /// has reached or exceeded the next pause interval.
+    /// The pause interval is given by expected number of
+    /// requests divided by the desired number of pauses.
+    ///
+    /// @param num_pauses Desired number of pauses.
+    ///
+    /// @return True if the client should be paused.
+    bool shouldPause(size_t num_pauses) {
+        size_t rr_count = getRRCount();
+        if (paused_ || !num_pauses || !rr_count) {
+            return false;
+        }
+
+        size_t interval = expected_requests_ / num_pauses;
+        size_t next_stop = interval * (pause_cnt_ + 1);
+        return (rr_count >= next_stop);
+    }
+
+    /// @brief Verifies the client can be puased and shutdown while doing work.
+    ///
+    /// @param num_threads number of threads the HttpClient should use.
+    /// A value of 0 puts the HttpClient in single-threaded mode.
+    /// @param num_batches number of batches of requests that should be
+    /// conducted.
+    /// @param num_listeners number of HttpListeners to create. Defaults
+    /// to 1.
+    void workPauseShutdown(size_t num_threads, size_t num_batches,
+                           size_t num_listeners = 1, bool pause_first = true) {
+        ASSERT_TRUE(num_batches);
+        ASSERT_TRUE(num_listeners);
+        num_threads_ = num_threads;
+        num_batches_ = num_batches;
+        num_listeners_ = num_listeners;
+
+        // Client in ST is, in effect, 1 thread.
+        size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_);
+
+        // Calculate the maximum requests that could complete.
+        size_t maximum_requests = (num_batches_ * num_listeners_ * effective_threads);
+
+        // Calculate the expected number of requests.
+        expected_requests_ = maximum_requests / 2;
+
+        for (auto i = 0; i < num_listeners_; ++i) {
+            // Make a factory
+            HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
+            factories_.push_back(factory);
+
+            // Need to create a Listener on
+            HttpListenerPtr listener(new HttpListener(io_service_,
+                                                      IOAddress(SERVER_ADDRESS), (SERVER_PORT + i),
+                                                      TlsContextPtr(), factory,
+                                                      HttpListener::RequestTimeout(10000),
+                                                      HttpListener::IdleTimeout(10000)));
+            listeners_.push_back(listener);
+
+            // Start the server.
+            ASSERT_NO_THROW(listener->start());
+        }
+
+        // Create an MT client with num_threads
+        ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
+        ASSERT_TRUE(client_);
+
+        if (num_threads_ == 0) {
+            // If we single-threaded client should not have it's own IOService.
+            ASSERT_FALSE(client_->getThreadIOService());
+        } else {
+            // If we multi-threaded client should have it's own IOService.
+            ASSERT_TRUE(client_->getThreadIOService());
+        }
+
+        // Verify the pool size and number of threads are as expected.
+        ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
+        ASSERT_EQ(client_->getThreadCount(), num_threads);
+
+        // Start the requisite number of requests:
+        //   batch * listeners * threads.
+        int sequence = 0;
+        for (auto b = 0; b < num_batches; ++b) {
+            for (auto l = 0; l < num_listeners_; ++l) {
+                for (auto t = 0; t < effective_threads; ++t) {
+                    startRequestSimple(++sequence, l);
+                }
+            }
+        }
+
+        // Loop until the 1/2 the reuests are done, an error occurs,
+        // or the time runs out.
+        size_t rr_count = 0;
+        while (rr_count < (expected_requests_)) {
+            // Always call reset() before we call run();
+            io_service_.get_io_service().reset();
+
+            // Run until a client stops the service.
+            io_service_.run();
+            rr_count = getRRCount();
+        }
+
+        if (pause_first) {
+            // Pause the client.
+            ASSERT_NO_THROW(client_->pause());
+            ASSERT_EQ(HttpThreadPool::RunState::PAUSED, client_->getRunState());
+        }
+
+        // We should have completed at least the expected number of requests
+        // but less than the maximum number of requests.
+        ASSERT_GE(getRRCount(), expected_requests_ );
+        ASSERT_LT(getRRCount(), maximum_requests);
+
+        // Client should stop without issue.
+        ASSERT_NO_THROW(client_->stop());
+
+        // Listeners should stop without issue.
+        for (const auto& listener : listeners_) {
+            ASSERT_NO_THROW(listener->stop());
+        }
+    }
+
+    /// @brief Fetch the number of completed requests.
+    ///
+    /// @return number of completed requests.
+    size_t getRRCount() {
+        std::unique_lock<std::mutex> lck(test_mutex_);
+        return(clientRRs_.size());
+    }
+
     /// @brief IO service used in the tests.
     IOService io_service_;
 
@@ -580,12 +809,18 @@ public:
     std::vector<ClientRRPtr> clientRRs_;
 
     /// @brief Mutex for locking.
-    std::mutex mutex_;
+    std::mutex test_mutex_;
 
     /// @brief Condition variable used to make client threads wait
     /// until number of in-progress requests reaches the number
     /// of client requests.
-    std::condition_variable cv_;
+    std::condition_variable test_cv_;
+
+    /// @brief Indicates if client threads are currently "paused".
+    bool paused_;
+
+    /// @brief Number of times client has been paused during the test.
+    size_t pause_cnt_;
 };
 
 // Verifies we can construct and destruct, in both single
@@ -628,7 +863,8 @@ TEST_F(MtHttpClientTest, basics) {
     ASSERT_NO_THROW_LOG(client->stop());
 
     // Verify we're stopped.
-    ASSERT_FALSE(client->getThreadIOService());
+    ASSERT_TRUE(client->getThreadIOService());
+    EXPECT_TRUE(client->getThreadIOService()->stopped());
     ASSERT_EQ(client->getThreadPoolSize(), 3);
     ASSERT_EQ(client->getThreadCount(), 0);
 
@@ -700,4 +936,22 @@ TEST_F(MtHttpClientTest, fourByFourByTwo) {
     threadRequestAndReceive(num_threads, num_batches, num_listeners);
 }
 
+// Verifies that we can cleanly work, pause, and resume repeatedly.
+TEST_F(MtHttpClientTest, workPauseResumee) {
+    size_t num_threads = 12;
+    size_t num_batches = 12;
+    size_t num_listeners = 12;
+    size_t num_pauses = 7;
+    threadRequestAndReceive(num_threads, num_batches, num_listeners, num_pauses);
+}
+
+// Verifies that we can cleanly pause and shutdown while doing
+// multi-threaded work.
+TEST_F(MtHttpClientTest, workPauseShutdown) {
+    size_t num_threads = 8;
+    size_t num_batches = 8;
+    size_t num_listeners = 8;
+    workPauseShutdown(num_threads, num_batches, num_listeners);
+}
+
 } // end of anonymous namespace