]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1818] Reimplemented HttpThreadPool internals
authorThomas Markwalder <tmark@isc.org>
Thu, 13 May 2021 18:55:14 +0000 (14:55 -0400)
committerThomas Markwalder <tmark@isc.org>
Mon, 17 May 2021 14:56:50 +0000 (10:56 -0400)
HttpThreadPool state changes are now synchronous.

src/lib/config/cmd_http_listener.cc
    Minor updates

src/lib/config/tests/cmd_http_listener_unittests.cc
    Revamped to use simplified command handling for
    pause/resume testing.

src/lib/http/client.*
    Minor updates

src/lib/http/http_thread_pool.*
    HttpThreadPool::setRunState() is now synchronous, returning
    only when all threads in the pool are in appropriate state
    or have been destroyed.

    Did away with SHUTDOWN state.
    Changed RUN to RUNNING

src/lib/http/tests/http_thread_pool_unittests.cc
    Revamped testing.

src/lib/http/tests/mt_client_unittests.cc
    Revamped to use request handler handling for pause/resume testing

src/lib/config/cmd_http_listener.cc
src/lib/config/tests/cmd_http_listener_unittests.cc
src/lib/http/client.cc
src/lib/http/http_thread_pool.cc
src/lib/http/http_thread_pool.h
src/lib/http/tests/http_thread_pool_unittests.cc
src/lib/http/tests/mt_client_unittests.cc

index 0b2a5eba611a4f548a698a41b9d432715e83298f..55cb2d856dda8c42f28303f9238616fd853f3a80 100644 (file)
@@ -93,7 +93,7 @@ CmdHttpListener::pause() {
 void
 CmdHttpListener::resume() {
     if (threads_) {
-        threads_->resume();
+        threads_->run();
     }
 }
 
@@ -122,7 +122,7 @@ CmdHttpListener::stop() {
               .arg(port_);
 }
 
-HttpThreadPool::RunState 
+HttpThreadPool::RunState
 CmdHttpListener::getRunState() const {
     if (!threads_) {
         isc_throw(InvalidOperation,
@@ -132,16 +132,16 @@ CmdHttpListener::getRunState() const {
     return (threads_->getRunState());
 }
 
-bool 
+bool
 CmdHttpListener::isRunning() {
     if (threads_) {
-        return (threads_->getRunState() == HttpThreadPool::RunState::RUN);
+        return (threads_->getRunState() == HttpThreadPool::RunState::RUNNING);
     }
 
     return (false);
 }
 
-bool 
+bool
 CmdHttpListener::isStopped() {
     if (threads_) {
         return (threads_->getRunState() == HttpThreadPool::RunState::STOPPED);
@@ -150,7 +150,7 @@ CmdHttpListener::isStopped() {
     return (true);
 }
 
-bool 
+bool
 CmdHttpListener::isPaused() {
     if (threads_) {
         return (threads_->getRunState() == HttpThreadPool::RunState::PAUSED);
index 8af2571057453226efcae55072e2b0b71fbb4243..f2e390fd0195ab026f5ce38e621328a7f675f7fa 100644 (file)
@@ -180,31 +180,21 @@ public:
     ///
     /// @param num_pauses Desired number of times the listener should be
     /// paused during the test.
-    void runIOService(size_t num_pauses = 0) {
+    void runIOService(size_t request_limit = 0) {
         // Create a timer to use for invoking resume after pause.
         IntervalTimer pause_timer_(io_service_);
         paused_ = false;
 
+        if (!request_limit) {
+            request_limit = clients_.size();
+        }
+
         // Loop until the clients are done, an error occurs, or the time runs out.
         size_t num_done = 0;
-        while (num_done != clients_.size()) {
+        while (num_done != request_limit) {
             // Always call restart() before we call run();
             io_service_.restart();
 
-            if (shouldPause(num_pauses, num_done)) {
-                // Pause the listener .
-                paused_ = true;
-                ++pause_cnt_;
-                listener_->pause();
-
-                // Set timer to resume listener.
-                pause_timer_.setup(
-                    [this]() {
-                        listener_->resume();
-                        paused_ = false;
-                    }, 10, IntervalTimer::ONE_SHOT);
-            }
-
             // Run until a client stops the service.
             io_service_.run();
 
@@ -225,8 +215,10 @@ public:
     ///
     /// @return True if the listener should be paused.
     bool shouldPause(size_t num_pauses, size_t num_done) {
-        // True if the number of clients done is a multiple of the number of pauses.
-        return (!paused_ && num_pauses && num_done && !(num_done % num_pauses));
+        size_t request_limit = (pause_cnt_ < num_pauses ?
+                               (num_done + ((clients_.size() - num_done) / num_pauses))
+                                    : clients_.size());
+        return (request_limit);
     }
 
     /// @brief Create an HttpResponse from a response string.
@@ -277,7 +269,7 @@ public:
     ///
     /// @return Returns response with map of arguments containing
     /// a string value 'thread-id': <thread id>
-    ConstElementPtr threadCommandHandler(const std::string& /*command_name*/,
+    ConstElementPtr synchronizedCommandHandler(const std::string& /*command_name*/,
                                          const ConstElementPtr& command_arguments) {
         // If the number of in progress commands is less than the number
         // of threads, then wait here until we're notified.  Otherwise,
@@ -335,6 +327,36 @@ public:
         return (createAnswer(CONTROL_RESULT_SUCCESS, arguments));
     }
 
+    /// @brief Handler for the 'thread' command.
+    ///
+    /// @param command_name Command name, i.e. 'thread'.
+    /// @param command_arguments Command arguments should contain
+    /// one string element, "client-ptr", whose value is the stringified
+    /// pointer to the client that issued the command.
+    ///
+    /// @return Returns response with map of arguments containing
+    /// a string value 'thread-id': <thread id>
+    ConstElementPtr simpleCommandHandler(const std::string& /*command_name*/,
+                                         const ConstElementPtr& command_arguments) {
+        // Create the map of response arguments.
+        ElementPtr arguments = Element::createMap();
+        // First we echo the client-ptr command argument.
+        ConstElementPtr client_ptr = command_arguments->get("client-ptr");
+        if (!client_ptr) {
+            return (createAnswer(CONTROL_RESULT_ERROR, "missing client-ptr"));
+        }
+
+        arguments->set("client-ptr", client_ptr);
+
+        // Now we add the thread-id.
+        std::stringstream ss;
+        ss << std::this_thread::get_id();
+        arguments->set("thread-id", Element::create(ss.str()));
+
+        // We're done, ship it!
+        return (createAnswer(CONTROL_RESULT_SUCCESS, arguments));
+    }
+
     /// @brief Submits one or more thread commands to a CmdHttpListener.
     ///
     /// This function command will creates a CmdHttpListener
@@ -354,10 +376,7 @@ public:
     /// thread command.  Each client is used to carry out a single thread
     /// command request.  Must be greater than 0 and a multiple of num_threads
     /// if it is greater than num_threads.
-    /// @param num_pauses Desired number of times the listener should be
-    /// paused during the test.
-    void threadListenAndRespond(size_t num_threads, size_t num_clients,
-                                size_t num_pauses = 0) {
+    void threadListenAndRespond(size_t num_threads, size_t num_clients) {
         // First we makes sure the parameter rules apply.
         ASSERT_TRUE(num_threads > 0);
         ASSERT_TRUE(num_clients > 0);
@@ -373,7 +392,7 @@ public:
         // Register the thread command handler.
         CommandMgr::instance().registerCommand("thread",
                                                std::bind(&CmdHttpListenerTest
-                                                         ::threadCommandHandler,
+                                                         ::synchronizedCommandHandler,
                                                          this, ph::_1, ph::_2));
 
         // Create a listener with prescribed number of threads.
@@ -397,7 +416,7 @@ public:
 
         // Now we run the client-side IOService until all requests are done,
         // errors occur or the test times out.
-        ASSERT_NO_FATAL_FAILURE(runIOService(num_pauses));
+        ASSERT_NO_FATAL_FAILURE(runIOService());
 
         // Stop the listener and then verify it has stopped.
         ASSERT_NO_THROW_LOG(listener_->stop());
@@ -493,6 +512,176 @@ public:
                       << "thread-id: " << it.first
                       << ", clients: " << it.second << std::endl;
         }
+    }
+
+    /// @brief Pauses and resumes a CmdHttpListener while it processes command
+    /// requests.
+    ///
+    /// This function command will creates a CmdHttpListener
+    /// with the given number of threads, initiates the given
+    /// number of clients, each requesting the "thread" command,
+    /// and then iteratively runs the test's IOService until all
+    /// the clients have received their responses or an error occurs.
+    /// It will pause and resume the listener at intervals governed
+    /// by the given number of pauses.
+    ///
+    /// @param num_threads - the number of threads the CmdHttpListener
+    /// should use. Must be greater than 0.
+    /// @param num_clients - the number of clients that should issue the
+    /// thread command.  Each client is used to carry out a single thread
+    /// command request.  Must be greater than 0.
+    /// @param num_pauses Desired number of times the listener should be
+    /// paused during the test. Must be greated than 0.
+    void workPauseAndResume(size_t num_threads, size_t num_clients,
+                            size_t num_pauses) {
+        // First we makes sure the parameter rules apply.
+        ASSERT_TRUE(num_threads);
+        ASSERT_TRUE(num_clients);
+        ASSERT_TRUE(num_pauses);
+        num_threads_ = num_threads;
+        num_clients_ = num_clients;
+
+        // Register the thread command handler.
+        CommandMgr::instance().registerCommand("thread",
+                                               std::bind(&CmdHttpListenerTest
+                                                         ::simpleCommandHandler,
+                                                         this, ph::_1, ph::_2));
+
+        // Create a listener with prescribed number of threads.
+        ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
+                                                               SERVER_PORT, num_threads)));
+        ASSERT_TRUE(listener_);
+
+        // Start it and verify it is running.
+        ASSERT_NO_THROW_LOG(listener_->start());
+        ASSERT_TRUE(listener_->isRunning());
+        EXPECT_EQ(listener_->getThreadCount(), num_threads);
+
+        // Maps the number of clients served by a given thread-id.
+        std::map<std::string, int> clients_per_thread;
+
+        // Initiate the prescribed number of command requests.
+        num_in_progress_ = 0;
+        for (auto i = 0; clients_.size() < num_clients; ++i) {
+            ASSERT_NO_THROW_LOG(startThreadCommand());
+        }
+
+        // Now we run the client-side IOService until all requests are done,
+        // errors occur or the test times out.  We'll pause and resume the
+        // given the number of pauses
+        size_t num_done = 0;
+        size_t total_requests = clients_.size();
+        while (num_done < total_requests) {
+            // Calculate how many more requests to process before we pause again.
+            // We divide the number of oustanding requests by the number of pauses
+            // and stop after we've done at least that many more.
+            size_t request_limit = (pause_cnt_ < num_pauses ?
+                                    (num_done + ((total_requests - num_done) / num_pauses))
+                                    : total_requests);
+
+            // Run test IOService until we hit the limit.
+            runIOService(request_limit);
+
+            // If we've done all our pauses we should be through.
+            if (pause_cnt_ == num_pauses) {
+                break;
+            }
+
+            // Pause the client.
+            ASSERT_NO_THROW(listener_->pause());
+            ASSERT_TRUE(listener_->isPaused());
+            ++pause_cnt_;
+
+            // Check our progress.
+            num_done = 0;
+            for (auto const& client : clients_) {
+                if (client->receiveDone()) {
+                    ++num_done;
+                }
+            }
+
+            // We should completed at least as many as our
+            // target limit.
+            ASSERT_GE(num_done, request_limit);
+
+            // Resume the listener.
+            ASSERT_NO_THROW(listener_->resume());
+            ASSERT_TRUE(listener_->isRunning());
+        }
+
+        // Stop the listener and then verify it has stopped.
+        ASSERT_NO_THROW_LOG(listener_->stop());
+        ASSERT_TRUE(listener_->isStopped());
+        EXPECT_EQ(listener_->getThreadCount(), 0);
+
+        // Iterate over the clients, checking their outcomes.
+        size_t total_responses = 0;
+        for (auto const& client : clients_) {
+            // Client should have completed its receive successfully.
+            ASSERT_TRUE(client->receiveDone());
+
+            // Client response should not be empty.
+            HttpResponsePtr hr;
+            std::string response_str = client->getResponse();
+            ASSERT_FALSE(response_str.empty());
+
+            // Parse the response into an HttpResponse.
+            ASSERT_NO_THROW_LOG(hr = parseResponse(client->getResponse()));
+
+            // Now we walk the element tree to get the response data.  It should look
+            // this:
+            //
+            //  [ {
+            //       "arguments": { "client-ptr": "xxxxx",
+            //                      "thread-id": "zzzzz" },
+            //       "result": 0
+            //  } ]
+            //
+            // First we turn it into an Element tree.
+            std::string body_str = hr->getBody();
+            ConstElementPtr body;
+            ASSERT_NO_THROW_LOG(body = Element::fromJSON(hr->getBody()));
+
+            // Outermost is a list, since we're emulating agent responses.
+            ASSERT_EQ(body->getType(), Element::list);
+            ASSERT_EQ(body->size(), 1);
+
+            // Answer should be a map containing "arguments" and "results".
+            ConstElementPtr answer = body->get(0);
+            ASSERT_EQ(answer->getType(), Element::map);
+
+            // "result" should be 0.
+            ConstElementPtr result = answer->get("result");
+            ASSERT_TRUE(result);
+            ASSERT_EQ(result->getType(), Element::integer);
+            ASSERT_EQ(result->intValue(), 0);
+
+            // "arguments" is a map containing "client-ptr" and "thread-id".
+            ConstElementPtr arguments = answer->get("arguments");
+            ASSERT_TRUE(arguments);
+            ASSERT_EQ(arguments->getType(), Element::map);
+
+            // "client-ptr" is a string.
+            ConstElementPtr client_ptr = arguments->get("client-ptr");
+            ASSERT_TRUE(client_ptr);
+            ASSERT_EQ(client_ptr->getType(), Element::string);
+
+            // "thread-id" is a string.
+            ConstElementPtr thread_id = arguments->get("thread-id");
+            ASSERT_TRUE(thread_id);
+            ASSERT_EQ(thread_id->getType(), Element::string);
+            std::string thread_id_str = thread_id->stringValue();
+
+            // Make sure the response received was for this client.
+            std::stringstream ss;
+            ss << client;
+            ASSERT_EQ(client_ptr->stringValue(), ss.str());
+
+            ++total_responses;
+        }
+
+        // We should have responses for all our clients.
+        EXPECT_EQ(total_responses, num_clients);
 
         // We should have had the expected number of pauses.
         if (!num_pauses) {
@@ -599,7 +788,7 @@ TEST_F(CmdHttpListenerTest, basics) {
     EXPECT_EQ(listener_->getThreadCount(), 1);
     ASSERT_TRUE(listener_->getThreadIOService());
     EXPECT_FALSE(listener_->getThreadIOService()->stopped());
-    EXPECT_EQ(listener_->getRunState(),  HttpThreadPool::RunState::RUN);
+    EXPECT_EQ(listener_->getRunState(),  HttpThreadPool::RunState::RUNNING);
 
     // Trying to start it again should fail.
     ASSERT_THROW_MSG(listener_->start(), InvalidOperation,
@@ -762,12 +951,13 @@ TEST_F(CmdHttpListenerTest, sixByEighteen) {
     threadListenAndRespond(num_threads, num_clients);
 }
 
-// Pauses and resumes during work.
+// Pauses and resumes the listener while it is processing
+// requests.
 TEST_F(CmdHttpListenerTest, pauseAndResume) {
     size_t num_threads = 6;
     size_t num_clients = 18;
     size_t num_pauses = 3;
-    threadListenAndRespond(num_threads, num_clients, num_pauses);
+    workPauseAndResume(num_threads, num_clients, num_pauses);
 }
 
 } // end of anonymous namespace
index 87ffb8898fe5885116ef271d1ba7469801528a2c..777355dcc4780dac7681e5a475d3ce19489b2726 100644 (file)
@@ -1810,8 +1810,8 @@ public:
             isc_throw(InvalidOperation, "HttpClient::resume - no thread pool");
         }
 
-        // Resume the thread pool.
-        threads_->resume();
+        // Resume running the thread pool.
+        threads_->run();
     }
 
 
@@ -1840,11 +1840,11 @@ public:
 
     /// @brief Indicates if the thread pool processing is running.
     ///
-    /// @return True if the thread pool exists and is in the RUN state,
+    /// @return True if the thread pool exists and is in the RUNNING state,
     /// false otherwise.
     bool isRunning() {
         if (threads_) {
-            return (threads_->getRunState() == HttpThreadPool::RunState::RUN);
+            return (threads_->getRunState() == HttpThreadPool::RunState::RUNNING);
         }
 
         return (false);
index d17c782f313c2f485c97395ba98eeeb9400cfb5b..901a0aeb62d12090f4402b3a9cb550c660e8e250 100644 (file)
@@ -33,7 +33,8 @@ using namespace isc::util;
 HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size,
                                bool defer_start /* = false */)
     : pool_size_(pool_size), io_service_(io_service),
-      run_state_(RunState::STOPPED), mutex_(), cv_()  {
+      run_state_(RunState::STOPPED), mutex_(), thread_cv_(),
+      main_cv_(), paused_(0), running_(0), exited_(0)  {
     if (!pool_size) {
         isc_throw(BadValue, "HttpThreadPool::ctor pool_size must be > 0");
     }
@@ -45,131 +46,182 @@ HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size,
 
     // If we're not deferring the start, do it now.
     if (!defer_start) {
-        start();
+        run();
     }
 }
 
 HttpThreadPool::~HttpThreadPool() {
-    if (getRunState() != RunState::STOPPED) {
-        // Stop if we aren't already stopped
-        stop();
+    stop();
+}
+
+void
+HttpThreadPool::run() {
+    setRunState(RunState::RUNNING);
+}
+
+void
+HttpThreadPool::pause() {
+    setRunState(RunState::PAUSED);
+}
+
+void
+HttpThreadPool::stop() {
+    setRunState(RunState::STOPPED);
+}
+
+HttpThreadPool::RunState
+HttpThreadPool::getRunState() {
+    std::lock_guard<std::mutex> lck(mutex_);
+    return (run_state_);
+}
+
+bool
+HttpThreadPool::validateStateChange(RunState new_state) const {
+    bool is_valid = false;
+    switch(run_state_) {
+    case RunState::STOPPED:
+        is_valid = (new_state == RunState::RUNNING);
+        break;
+    case RunState::RUNNING:
+        is_valid = (new_state != RunState::RUNNING);
+        break;
+    case RunState::PAUSED:
+        is_valid = (new_state != RunState::PAUSED);
+        break;
     }
+
+    return (is_valid);
 }
 
 void
-HttpThreadPool::start() {
-    if (getRunState() != RunState::STOPPED) {
-        isc_throw(InvalidOperation, "HttpThreadPool::start already started!");
+HttpThreadPool::setRunState (RunState new_state) {
+    std::unique_lock<std::mutex> main_lck(mutex_);
+
+    // Bail if the transition is invalid.
+    if (!validateStateChange(new_state)) {
+        return;
     }
 
-    // Set state to RUN.
-    setRunState(RunState::RUN);
+    run_state_ = new_state;
+    // Notify threads of state change.
+    thread_cv_.notify_all();
 
-    // Prep IOService for run() invocations.
-    io_service_->restart();
+    switch(new_state) {
+    case RunState::RUNNING: {
+        // Restart the IOSerivce.
+        io_service_->restart();
 
-    // Create a pool of threads, each calls run() on our
-    // io_service instance.
-    for (std::size_t i = 0; i < pool_size_; ++i) {
-        boost::shared_ptr<std::thread> thread(new std::thread(
+        // While we have fewer threads than we should, make more.
+        while (threads_.size() < pool_size_) {
+            boost::shared_ptr<std::thread> thread(new std::thread(
             [this]() {
                 bool done = false;
                 while (!done) {
                     switch (getRunState()) {
-                    case RunState::RUN:
-                        io_service_->run();
+                    case RunState::RUNNING: {
+                        {
+                            std::unique_lock<std::mutex> lck(mutex_);
+                            running_++;
+
+                            // If We're all running notify main thread.
+                            if (running_ == pool_size_) {
+                                main_cv_.notify_all();
+                            }
+                         }
+
+                         // Run the IOService.
+                         io_service_->run();
+
+                         {
+                            std::unique_lock<std::mutex> lck(mutex_);
+                            running_--;
+                         }
+
                         break;
-                    case RunState::PAUSED:
-                    {
-                        // We need to stop and wait to be released.
+                    }
+
+                    case RunState::PAUSED: {
                         std::unique_lock<std::mutex> lck(mutex_);
-                        cv_.wait(lck,
+                        paused_++;
+
+                        // If we're all paused notify main.
+                        if (paused_ == threads_.size()) {
+                            main_cv_.notify_all();
+                        }
+
+                        // Wait here till I'm released.
+                        thread_cv_.wait(lck,
                             [&]() {
                                 return (run_state_ != RunState::PAUSED);
                             });
+
+                        paused_--;
                         break;
                     }
-                    case RunState::SHUTDOWN:
-                        done = true;
-                        break;
-                    case RunState::STOPPED:
-                        // This should never happen.
+
+                    case RunState::STOPPED: {
                         done = true;
                         break;
-                    }
+                    }}
                 }
-            }));
-
-        threads_.push_back(thread);
-    }
-}
-
-void
-HttpThreadPool::stop() {
-    if (getRunState() == RunState::STOPPED) {
-        // Nothing to do.
-        return;
-    }
 
-    // Set the state to SHUTDOWN.
-    setRunState(RunState::SHUTDOWN);
+                std::unique_lock<std::mutex> lck(mutex_);
+                exited_++;
+                if (exited_ == threads_.size()) {
+                    main_cv_.notify_all();
+                }
+            }));
 
-    // Stop our IOService.
-    if (!io_service_->stopped()) {
-        // Flush canceled (and ready) handlers.
-        io_service_->poll();
+            // Add thread to the pool.
+            threads_.push_back(thread);
+        }
 
-        // Stop the service
-        io_service_->stop();
-    }
+        // Main thread waits here until all threads are running.
+        main_cv_.wait(main_lck,
+            [&]() {
+                return (running_ == threads_.size());
+            });
 
-    // Shutdown the threads.
-    for (auto const& thread : threads_) {
-        thread->join();
+        exited_ = 0;
+        break;
     }
 
-    // Empty the thread pool.
-    threads_.clear();
+    case RunState::PAUSED: {
+        // Stop IOService.
+        if (!io_service_->stopped()) {
+            io_service_->poll();
+            io_service_->stop();
+        }
 
-    // Set the state to STOPPED.
-    setRunState(RunState::STOPPED);
-}
+        // Main thread waits here until all threads are paused.
+        main_cv_.wait(main_lck,
+            [&]() {
+                return (paused_ == threads_.size());
+            });
 
-void
-HttpThreadPool::pause() {
-    if (getRunState() !=  RunState::RUN) {
-        // Not running, can't pause.
-        return;
+        break;
     }
 
-    setRunState(RunState::PAUSED);
-    io_service_->stop();
-}
-
-void
-HttpThreadPool::resume() {
-    if (getRunState() !=  RunState::PAUSED) {
-        // Not paused, can't resume.
-        return;
-    }
-
-    io_service_->restart();
-    setRunState(RunState::RUN);
-}
-
-HttpThreadPool::RunState
-HttpThreadPool::getRunState() {
-    std::lock_guard<std::mutex> lck(mutex_);
-    return (run_state_);
-}
-
-void
-HttpThreadPool::setRunState(RunState state) {
-    {
-        std::lock_guard<std::mutex> lck(mutex_);
-        run_state_ = state;
-    }
-    cv_.notify_all();
+    case RunState::STOPPED: {
+        // Stop IOService.
+        if (!io_service_->stopped()) {
+            io_service_->poll();
+            io_service_->stop();
+        }
+
+        // Main thread waits here until all threads have exited.
+        main_cv_.wait(main_lck,
+            [&]() {
+                return (exited_ == threads_.size());
+            });
+
+        for (auto const& thread : threads_) {
+            thread->join();
+        }
+
+        threads_.clear();
+        break;
+    }}
 }
 
 IOServicePtr
index e937b2a4ebbc2d91c9951827334437f3b893e0e8..d311468b173796a29d9f94094e7c4a4a084c76d5 100644 (file)
@@ -26,68 +26,50 @@ public:
     /// @brief Describes the possible operational state of the pool.
     enum class RunState {
         STOPPED,    /// Pool is not operational.
-        RUN,        /// Pool is populated with running threads.
-        PAUSED,     /// Pool is populated with threads which are paused.
-        SHUTDOWN,   /// Pool is transitioning from RUN or PAUSED to STOPPED.
+        RUNNING,    /// Pool is populated with running threads.
+        PAUSED,     /// Pool is populated with threads that are paused.
     };
 
     /// @brief Constructor
     ///
     /// @param io_service IOService that will drive the pool's IO. If empty, it
     /// create it's own instance.
-    /// @param pool_size Maximum number of threads in the pool.  Currently the number
-    /// of threads is fixed at this value.
-    /// @param defer_start If true, creation of the threads is deferred until a subsequent
-    /// call to @ref start().  In this case the pool's operational state post construction
-    /// is STOPPED.  If false, the constructor will invoke start() which will create the
-    /// threads, placing the pool in RUN state.
-    HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size, bool defer_start = false);
+    /// @param pool_size Maximum number of threads in the pool.  Currently the
+    /// number of threads is fixed at this value.
+    /// @param defer_start If true, creation of the threads is deferred until
+    /// a subsequent call to @ref start().  In this case the pool's operational
+    /// state post construction is STOPPED.  If false, the constructor will
+    /// invoke run() to tranistion the pool into the RUNNING state.
+    HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size,
+                   bool defer_start = false);
 
     /// @brief Destructor
     ///
     /// Ensures the pool is stopped prior to destruction.
     ~HttpThreadPool();
 
-    /// @brief Transitions the pool from STOPPED to RUN run state.
+    /// @brief Transitions the pool from STOPPED or PAUSED to RUNNING.
     ///
-    /// It starts the pool by doing the following:
-    /// -# Sets state to RUN
-    /// -# Restarts the IOService preparing it thread invocations of
-    ///    IOService::run()
-    /// -# Creates thread_pool_size_ threads, adding each to the pool.
+    /// When called from the STOPPED state, the pool threads are created
+    /// begin processing events.
+    /// When called from the PAUSED state, the pool threads are released
+    /// from PAUSED and resume processing event.s
+    /// Has no effect if the pool is already in the RUNNING state.
+    void run();
+
+    /// @brief Transitions the pool from RUNNING to PAUSED state.
     ///
-    /// @throw InvalidOperation if called with the pool in any state other
-    /// than STOPPED.
-    void start();
-
-    /// @brief Transitions the pool to STOPPED state.
-    ///
-    /// It stops the pool by doing the following:
-    /// -# Sets the state to SHUTDOWN.
-    /// =# Stops the IOService.
-    /// =# Joins the pool threads.
-    /// -# Empties the pool.
-    /// -# Sets the state to STOPPED.
-    void stop();
-
-    /// @brief Transitions the pool from RUN to PAUSED state.
-    ///
-    /// If the state is any state other than RUN it simply returns,
-    /// otherwise it does the following:
-    ///
-    /// -# Sets the state to PAUSED.
-    /// -# Stops the IOService.
+    /// Pool threads suspend event processing and pause until they
+    /// are released to either resume running or stop.
+    /// Has no effect if the pool is already in the PAUSED or STOPPED
+    /// state.
     void pause();
 
-    /// @brief Transitions the pool from PAUSED to RUN state.
-    ///
-    /// If the state is any state other than PAUSED it simply returns,
-    /// otherwise it does the following:
+    /// @brief Transitions the pool to from RUNNING OR PAUSED to STOPPED.
     ///
-    /// -# Restarts the IOService preparing it for thread invocations
-    ///    of IOService::run()
-    /// -# Sets the state to RUN.
-    void resume();
+    /// Stops thread event processing and then destroys the pool's threads
+    /// Has no effect if the pool is already in the STOPPED state.
+    void stop();
 
     /// @brief Thread-safe fetch of the pool's operational state.
     ///
@@ -95,14 +77,48 @@ public:
     RunState getRunState();
 
 private:
-    /// @brief Thread-safe set of the pool's operational state.
+    /// @brief Thread-safe change of the pool's operational state.
     ///
-    /// @note This method does not validate the state change.
+    /// Transitions a pool from one state to another:
+    ///
+    /// When moving from STOPPED or PAUSED to RUNNING:
+    /// -# Sets state to RUNNING.
+    /// -# Notifies threads of state change.
+    /// -# Restarts the IOService.
+    /// -# Creates the threads if they do not yet exist (true only
+    /// when transitioning from STOPPED).
+    /// -# Waits until threads are running.
+    /// -# Returns to caller.
+    ///
+    /// When moving from RUNNING or PAUSED to STOPPED:
+    /// -# Sets state to STOPPED
+    /// -# Notifies threads of state change.
+    /// -# Polls the IOService to flush handlers.
+    /// -# Stops the IOService.
+    /// -# Waits until all threads have exited the work function.
+    /// -# Joins and destroys the threads.
+    /// -# Returns to caller.
+    ///
+    /// When moving from RUNNING to PAUSED:
+    /// -# Sets state to PAUSED
+    /// -# Notifies threads of state change.
+    /// -# Polls the IOService to flush handlers.
+    /// -# Stops the IOService.
+    /// -# Waits until all threads have paused.
+    /// -# Returns to caller.
     ///
     /// @param state new state for the pool.
     void setRunState(RunState state);
 
+    /// @brief Validates whether the pool can change to a given state.
+    ///
+    /// @param state new state for the pool.
+    /// @return true if the changs is valid, false otherwise.
+    /// @note Must be called from a thread-safe context.
+    bool validateStateChange(RunState state) const;
+
 public:
+
     /// @brief Fetches the IOService that drives the pool.
     ///
     /// @return A pointer to the IOService.
@@ -131,12 +147,26 @@ private:
     /// @brief Mutex to protect the internal state.
     std::mutex mutex_;
 
-    /// @brief Condition variable for synchronization.
-    std::condition_variable cv_;
+    /// @brief Condition variable used by threads for synchronization.
+    std::condition_variable thread_cv_;
+
+    /// @brief Condition variable used by main thread to wait on threads
+    /// state transitions.
+    std::condition_variable main_cv_;
+
+    /// @brief Number of threads currently paused.
+    size_t paused_;
+
+    /// @brief Number of threads currently running.
+    size_t running_;
+
+    /// @brief Number of threads that have exited the work funcion.
+    size_t exited_;
 
     /// @brief Pool of threads used to service connections in multi-threaded
     /// mode.
     std::list<boost::shared_ptr<std::thread> > threads_;
+
 };
 
 /// @brief Defines a pointer to a thread pool.
index 80bc392ccbd78ed804ccf709b46f91423d135593..0f21d092edd7bd6901573013156abf3f5ec2af05 100644 (file)
@@ -6,6 +6,7 @@
 
 #include <config.h>
 #include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
 #include <asiolink/io_service.h>
 #include <exceptions/exceptions.h>
 #include <http/http_thread_pool.h>
@@ -20,15 +21,23 @@ using namespace isc::http;
 
 namespace {
 
+/// @brief Test timeout (ms).
+const long TEST_TIMEOUT = 10000;
+
+/// @brief Simple test fixture for testing HttpThreadPool.
 class HttpThreadPoolTest : public ::testing::Test {
 public:
-    HttpThreadPoolTest() : io_service_(new IOService()) {
+    /// @brief Constructor.
+    HttpThreadPoolTest()
+        : io_service_(new IOService()) {
     }
 
-    ~HttpThreadPoolTest() {
+    /// @brief Destructor.
+    virtual ~HttpThreadPoolTest() {
         io_service_->stop();
     }
 
+    /// @brief IOService instance used by thread pools.
     IOServicePtr io_service_;
 };
 
@@ -41,138 +50,212 @@ TEST_F(HttpThreadPoolTest, invalidConstruction) {
                      "HttpThreadPool::ctor pool_size must be > 0");
 }
 
+// Verifies that a pool can be created without starting it.
 TEST_F(HttpThreadPoolTest, deferredStartConstruction) {
     HttpThreadPoolPtr pool;
 
     ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true)));
 
+    // State should be stopped.
     // Pool size should be 3
-    // State should be stopped. 
     // IOService should be there.
-    // IOService is new, so it should not stopped,
+    // IOService is new, so it should not be stopped,
     // No threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
     EXPECT_EQ(pool->getPoolSize(), 3);
     ASSERT_TRUE(pool->getIOService());
     EXPECT_FALSE(pool->getIOService()->stopped());
-    EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
     EXPECT_EQ(pool->getThreadCount(), 0);
 
-    // Stop should not throw.
-    ASSERT_NO_THROW_LOG(pool->stop());
+    // Destructor should not throw.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
 
-    // Nothing should have changed.
-    EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
-    EXPECT_FALSE(pool->getIOService()->stopped());
-    EXPECT_EQ(pool->getThreadCount(), 0);
+// Verifies that a pool can be started within the constructor.
+TEST_F(HttpThreadPoolTest, startDuringConstruction) {
+    HttpThreadPoolPtr pool;
 
-    // Start should not throw.
-    ASSERT_NO_THROW_LOG(pool->start());
+    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3)));
 
-    // State should be RUN, IOService should not be stopped, 
-    // and there should be 3 threads in the pool.
+    // Pool size should be 3, state should be RUNNING, IOService should
+    // set but not stopped, and we should have 3 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
     EXPECT_EQ(pool->getPoolSize(), 3);
-    EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
+    ASSERT_TRUE(pool->getIOService());
     EXPECT_FALSE(pool->getIOService()->stopped());
     EXPECT_EQ(pool->getThreadCount(), 3);
 
-    // Stopping should not throw.
-    ASSERT_NO_THROW_LOG(pool->stop());
-
-    // State should be stopped, IOService should be stopped, and 
-    // there should be no threads in the pool.
-    EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
-    EXPECT_TRUE(pool->getIOService()->stopped());
-    EXPECT_EQ(pool->getThreadCount(), 0);
-
     // Destructor should not throw.
     ASSERT_NO_THROW_LOG(pool.reset());
 }
 
-TEST_F(HttpThreadPoolTest, startDuringConstruction) {
+// Verifies that pool can move from STOPPED to RUNNING.
+TEST_F(HttpThreadPoolTest, stoppedToRunning) {
     HttpThreadPoolPtr pool;
 
-    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3)));
+    // Create a stopped pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true)));
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
 
-    // Pool size should be 3, state should be RUN, IOService should
-    // set but not stopped, and we should have 3 threads in the pool.
-    EXPECT_EQ(pool->getPoolSize(), 3);
-    EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
-    ASSERT_TRUE(pool->getIOService());
+    // Call run from STOPPED.
+    ASSERT_NO_THROW_LOG(pool->run());
+
+    // State should be RUNNING, IOService should not be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 3);
+
+    // Calling run again should be harmless.
+    ASSERT_NO_THROW_LOG(pool->run());
+
+    // State should be RUNNING, IOService should not be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
     EXPECT_FALSE(pool->getIOService()->stopped());
     EXPECT_EQ(pool->getThreadCount(), 3);
 
-    // Starting again should throw.
-    ASSERT_THROW_MSG(pool->start(), InvalidOperation,
-                     "HttpThreadPool::start already started!");
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from RUNNING to STOPPED.
+TEST_F(HttpThreadPoolTest, runningToStopped) {
+    HttpThreadPoolPtr pool;
+
+    // Create a running pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false)));
+    ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
 
-    // Stop should not throw.
+    // Call stop.
     ASSERT_NO_THROW_LOG(pool->stop());
 
-    // Pool size should be 3, state should STOPPED, IOService should
-    // be stopped, and there should be no threads in the pool.
-    EXPECT_EQ(pool->getPoolSize(), 3);
-    EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+    // State should be STOPPED, IOService should be stopped, we should
+    // have 0 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
     EXPECT_TRUE(pool->getIOService()->stopped());
     EXPECT_EQ(pool->getThreadCount(), 0);
 
-    // Destructor should not throw.
+    // Calling stop again should be harmless.
+    ASSERT_NO_THROW_LOG(pool->stop());
+
+    // State should be STOPPED, IOService should be stopped, we should
+    // have 0 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+    EXPECT_TRUE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Destroying the pool should be fine.
     ASSERT_NO_THROW_LOG(pool.reset());
 }
 
-TEST_F(HttpThreadPoolTest, pauseAndResume) {
+// Verifies that pool can move from RUNNING to PAUSED.
+TEST_F(HttpThreadPoolTest, runningToPaused) {
     HttpThreadPoolPtr pool;
 
-    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3)));
+    // Create a running pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false)));
+    ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
 
-    // State should be RUN, IOService not stopped, 3 threads in the pool.
-    EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
-    EXPECT_FALSE(pool->getIOService()->stopped());
-    EXPECT_EQ(pool->getThreadCount(), 3);
-
-    // Pause should not throw.
+    // Call pause from RUNNING.
     ASSERT_NO_THROW_LOG(pool->pause());
 
     // State should be PAUSED, IOService should be stopped, we should
-    // still have 3 threads in the pool.
-    EXPECT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
+    // have 3 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
     EXPECT_TRUE(pool->getIOService()->stopped());
     EXPECT_EQ(pool->getThreadCount(), 3);
 
-    // Pausing again should be harmless.
+    // Calling pause again should be harmless.
     ASSERT_NO_THROW_LOG(pool->pause());
 
-    // Nothing should have changed.
-    EXPECT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
+    // State should be PAUSED, IOService should be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
     EXPECT_TRUE(pool->getIOService()->stopped());
     EXPECT_EQ(pool->getThreadCount(), 3);
 
-    // Resume should not throw.
-    ASSERT_NO_THROW_LOG(pool->resume());
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
 
-    // State should be PAUSED, IOService should be stopped, we should
-    // still have 3 threads in the pool.
-    EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
-    EXPECT_FALSE(pool->getIOService()->stopped());
-    EXPECT_EQ(pool->getThreadCount(), 3);
+// Verifies that pool can move from PAUSED to RUNNING.
+TEST_F(HttpThreadPoolTest, pausedToRunning) {
+    HttpThreadPoolPtr pool;
+
+    // Create a running pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false)));
+    ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
 
-    // Resuming again should be harmless.
-    ASSERT_NO_THROW_LOG(pool->resume());
+    // Call pause from RUNNING.
+    ASSERT_NO_THROW_LOG(pool->pause());
+    ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
 
-    // Nothing should have changed.
-    EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
+    // Call run.
+    ASSERT_NO_THROW_LOG(pool->run());
+
+    // State should be RUNNING, IOService should not be stopped, we should
+    // have 3 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
     EXPECT_FALSE(pool->getIOService()->stopped());
     EXPECT_EQ(pool->getThreadCount(), 3);
 
-    // Stop should not throw.
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from PAUSED to STOPPED.
+TEST_F(HttpThreadPoolTest, pausedToStopped) {
+    HttpThreadPoolPtr pool;
+
+    // Create a running pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false)));
+    ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
+
+    // Call pause from RUNNING.
+    ASSERT_NO_THROW_LOG(pool->pause());
+    ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
+
+    // Call stop.
     ASSERT_NO_THROW_LOG(pool->stop());
 
-    // State should STOPPED, IOService should be stopped,
-    // and there should be no threads in the pool.
-    EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+    // State should be STOPPED, IOService should be stopped, we should
+    // have 0 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
     EXPECT_TRUE(pool->getIOService()->stopped());
     EXPECT_EQ(pool->getThreadCount(), 0);
 
-    // Destructor should not throw.
+    // Destroying the pool should be fine.
+    ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that attempting to pause a STOPPED pool has no effect.
+TEST_F(HttpThreadPoolTest, stoppedToPaused) {
+    HttpThreadPoolPtr pool;
+
+    // Create a stopped pool.
+    ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true)));
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+
+    // State should be STOPPED, IOService won't be stopped because it was
+    // never started. We should have 0 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Call pause from STOPPED.
+    ASSERT_NO_THROW_LOG(pool->pause());
+
+    // Should have no effect.
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+
+    // State should be STOPPED, IOService won't be stopped because it was
+    // never started. We should have 0 threads in the pool.
+    ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+    EXPECT_FALSE(pool->getIOService()->stopped());
+    EXPECT_EQ(pool->getThreadCount(), 0);
+
+    // Destroying the pool should be fine.
     ASSERT_NO_THROW_LOG(pool.reset());
 }
 
index de69cde0137eac3c389ff0b394c65ecc964f1810..ab313e81d13af5ea9da8a7896a45f04f03f711d9 100644 (file)
@@ -199,7 +199,7 @@ public:
         : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
           test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
           expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false),
-          pause_cnt_(0), shutdown_(false) {
+          pause_cnt_(0) {
         test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true),
                           TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
         MultiThreadingMgr::instance().setMode(true);
@@ -209,7 +209,7 @@ public:
     ~MtHttpClientTest() {
         // Stop the client.
         if (client_) {
-            stopTestClient();
+            client_->stop();
         }
 
         // Stop all listeners.
@@ -234,11 +234,10 @@ public:
 
     /// @brief Runs the test's IOService until the desired number of requests
     /// have been carried out or the test fails.
-    void runIOService() {
-        // Loop until the clients are done, an error occurs, or the time runs out.
-        while (getRRCount() < expected_requests_) {
+    void runIOService(size_t request_limit) {
+        while (getRRCount() < request_limit) {
             // Always call reset() before we call run();
-            io_service_.get_io_service().reset();
+            io_service_.restart();
 
             // Run until a client stops the service.
             io_service_.run();
@@ -314,7 +313,7 @@ public:
                 } else {
                     // I'm ready but others aren't wait here.
                     bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
-                                            [&]() { return (num_in_progress_ == num_threads_ || shutdown_); });
+                                            [&]() { return (num_in_progress_ == num_threads_); });
                     if (!ret) {
                         ADD_FAILURE() << "clients failed to start work";
                     }
@@ -345,7 +344,7 @@ public:
                 } else {
                     // I'm done but others aren't wait here.
                     bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
-                                            [&]() { return (num_finished_ == num_threads_ || shutdown_); });
+                                            [&]() { return (num_finished_ == num_threads_); });
                     if (!ret) {
                         ADD_FAILURE() << "clients failed to finish work";
                     }
@@ -354,6 +353,58 @@ public:
         }));
     }
 
+
+    /// @brief Initiates a single HTTP request.
+    ///
+    /// Constructs an HTTP post whose body is a JSON map containing a
+    /// single integer element, "sequence".
+    ///
+    /// The request completion handler simply constructs the response,
+    /// and adds it the list of completed request/responses. If the
+    /// number of completed requests has reached the expected number
+    /// it stops the test IOService.
+    ///
+    /// @param sequence value for the integer element, "sequence",
+    /// to send in the request.
+    void startRequestSimple(int sequence, int port_offset = 0) {
+        // Create the URL on which the server can be reached.
+        std::stringstream ss;
+        ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset);
+        Url url(ss.str());
+
+        // Initiate request to the server.
+        PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
+        HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
+        ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
+                                                  request_json, response_json,
+            [this, request_json, response_json](const boost::system::error_code& ec,
+                                                const HttpResponsePtr&,
+                                                const std::string&) {
+            // Bail on an error.
+            ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
+
+            // Get stringified thread-id.
+            std::stringstream ss;
+            ss << std::this_thread::get_id();
+
+            // Create the ClientRR.
+            ClientRRPtr clientRR(new ClientRR());
+            clientRR->thread_id_ = ss.str();
+            clientRR->request_ = request_json;
+            clientRR->response_ = response_json;
+
+            {
+                std::unique_lock<std::mutex> lck(test_mutex_);
+                clientRRs_.push_back(clientRR);
+                ++num_finished_;
+                if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) {
+                    io_service_.stop();
+                }
+            }
+
+        }));
+    }
+
     /// @brief Carries out HTTP requests via HttpClient to HTTP listener(s).
     ///
     /// This function creates one HttpClient with the given number
@@ -363,10 +414,7 @@ public:
     ///
     /// Then it iteratively runs the test's IOService until all
     /// the requests have been responded to, an error occurs, or the
-    /// test times out.  During each pass through the run loop, the
-    /// a call to shouldPause() is made to determine if the client
-    /// thread pool should be paused.  If so, the the pool is paused
-    /// and an timer begun which resumes the pool upon timeout.
+    /// test times out.
     ///
     /// Each request carries a single integer element, "sequence", which
     /// uniquely identifies the request. Each response is expected to
@@ -388,11 +436,8 @@ public:
     /// conducted.
     /// @param num_listeners number of HttpListeners to create. Defaults
     /// to 1.
-    /// @param num_pauses number of times to pause and resume the client
-    /// during the test.  Defaults to 0.
     void threadRequestAndReceive(size_t num_threads, size_t num_batches,
-                                 size_t num_listeners = 1,
-                                 size_t num_pauses = 0) {
+                                 size_t num_listeners = 1) {
         ASSERT_TRUE(num_batches);
         ASSERT_TRUE(num_listeners);
         num_threads_ = num_threads;
@@ -449,35 +494,17 @@ public:
             }
         }
 
-        // Create a timer to use for invoking resume after pause.
-        IntervalTimer pause_timer_(io_service_);
-        paused_ = false;
-
         // Loop until the clients are done, an error occurs, or the time runs out.
         while (getRRCount() < expected_requests_) {
             // Always call restart() before we call run();
             io_service_.restart();
 
-            if (shouldPause(num_pauses)) {
-                // Pause client.
-                paused_ = true;
-                ++pause_cnt_;
-                client_->pause();
-
-                // Set timer to resume client.
-                pause_timer_.setup(
-                    [this]() {
-                            client_->resume();
-                            paused_ = false;
-                    }, 10, IntervalTimer::ONE_SHOT);
-            }
-
             // Run until a client stops the service.
             io_service_.run();
         }
 
         // Client should stop without issue.
-        stopTestClient();
+        ASSERT_NO_THROW(client_->stop());
 
         // Listeners should stop without issue.
         for (const auto& listener : listeners_) {
@@ -487,21 +514,6 @@ public:
         // We should have a response for each request.
         ASSERT_EQ(getRRCount(), expected_requests_);
 
-        // We should have had the expected number of pauses.
-        if (!num_pauses) {
-            ASSERT_EQ(pause_cnt_, 0);
-        } else {
-            // We allow a range on pauses of +-1.  Figuring
-            // out the exact intervals at which to pause was
-            // getting to be a pain.  We don't really care as
-            // long as we're close.  The primary thing is that
-            // we did in fact pause and resume.
-            ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ &&
-                        (pause_cnt_ <= (num_pauses + 1)))
-                        << " num+_pauses: " << num_pauses
-                        << ", pause_cnt_" << pause_cnt_;
-        }
-
         // Create a map to track number of responses for each client thread.
         std::map<std::string, int> responses_per_thread;
 
@@ -585,65 +597,30 @@ public:
         }
     }
 
-    /// @brief Indicates if the test should pause.
-    ///
-    /// Returns true if the number of completed requests
-    /// has reached or exceeded the next pause interval.
-    /// The pause interval is given by expected number of
-    /// requests divided by the desired number of pauses.
-    ///
-    /// @param num_pauses Desired number of pauses.
-    ///
-    /// @return True if the client should be paused.
-    bool shouldPause(size_t num_pauses) {
-        size_t rr_count = getRRCount();
-        if (paused_ || !num_pauses || !rr_count) {
-            return false;
-        }
-
-        size_t interval = expected_requests_ / num_pauses;
-        size_t next_stop = interval * (pause_cnt_ + 1);
-        return (rr_count >= next_stop);
-    }
-
-    /// @brief Stops the test client.
-    ///
-    /// Sets the shutdown flag and pings the test condition variable,
-    /// and then stops the thread pool.
-    void stopTestClient() {
-        // Set shutdown_ flag and notify any handles that may be waiting.
-        shutdown_ = true;
-        test_cv_.notify_all();
-
-        // Client should stop without issue.
-        ASSERT_NO_THROW(client_->stop());
-    }
-
-    /// @brief Verifies the client can be paused and shutdown while doing work.
+    /// @brief Verifies the client can be paused and resumed repeatedly
+    /// while doing multi-threaded doing work.
     ///
     /// @param num_threads number of threads the HttpClient should use.
-    /// A value of 0 puts the HttpClient in single-threaded mode.
+    /// Must be greater than zero, this test does not make sense for a
+    /// single threaded client.
     /// @param num_batches number of batches of requests that should be
     /// conducted.
     /// @param num_listeners number of HttpListeners to create. Defaults
     /// to 1.
-    void workPauseShutdown(size_t num_threads, size_t num_batches,
-                           size_t num_listeners = 1, bool pause_first = true) {
+    /// @param num_pauses number of pauses to conduct.
+    void workPauseResumeShutdown(size_t num_threads, size_t num_batches,
+                                 size_t num_listeners, size_t num_pauses) {
+        ASSERT_TRUE(num_threads);
         ASSERT_TRUE(num_batches);
         ASSERT_TRUE(num_listeners);
         num_threads_ = num_threads;
         num_batches_ = num_batches;
         num_listeners_ = num_listeners;
 
-        // Client in ST is, in effect, 1 thread.
-        size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_);
-
-        // Calculate the maximum requests that could complete.
-        size_t maximum_requests = (num_batches_ * num_listeners_ * effective_threads);
-
-        // Calculate the expected number of requests.
-        expected_requests_ = maximum_requests / 2;
+        // Calculate the total expected number of requests.
+        size_t total_requests = (num_batches_ * num_listeners_ * num_threads_);
 
+        // Create the listeners.
         for (auto i = 0; i < num_listeners_; ++i) {
             // Make a factory
             HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
@@ -661,23 +638,15 @@ public:
             ASSERT_NO_THROW(listener->start());
         }
 
-        // Create an MT client with num_threads
+        // Create an instant start, MT client with num_threads
         ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
         ASSERT_TRUE(client_);
 
-        // Check convenience functions.
+        // Client shoudl be running. Check convenience functions.
         ASSERT_TRUE(client_->isRunning());
         ASSERT_FALSE(client_->isPaused());
         ASSERT_FALSE(client_->isStopped());
 
-        if (num_threads_ == 0) {
-            // If we single-threaded client should not have it's own IOService.
-            ASSERT_FALSE(client_->getThreadIOService());
-        } else {
-            // If we multi-threaded client should have it's own IOService.
-            ASSERT_TRUE(client_->getThreadIOService());
-        }
-
         // Verify the pool size and number of threads are as expected.
         ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
         ASSERT_EQ(client_->getThreadCount(), num_threads);
@@ -687,55 +656,65 @@ public:
         int sequence = 0;
         for (auto b = 0; b < num_batches; ++b) {
             for (auto l = 0; l < num_listeners_; ++l) {
-                for (auto t = 0; t < effective_threads; ++t) {
-                    startRequest(++sequence, l);
+                for (auto t = 0; t < num_threads_; ++t) {
+                    startRequestSimple(++sequence, l);
                 }
             }
         }
 
-        // Loop until the 1/2 the requests are done, an error occurs,
-        // or the time runs out.
         size_t rr_count = 0;
-        while (rr_count < (expected_requests_)) {
-            // Always call reset() before we call run();
-            io_service_.get_io_service().reset();
+        while (rr_count < total_requests) {
+            size_t request_limit = (pause_cnt_ < num_pauses ?
+                                    (rr_count + ((total_requests - rr_count) / num_pauses))
+                                    : total_requests);
 
-            // Run until a client stops the service.
-            io_service_.run();
-            rr_count = getRRCount();
-        }
+            // Run test IOService until we hit the limit.
+            runIOService(request_limit);
+
+            // If we've done all our pauses we should be through.
+            if (pause_cnt_ == num_pauses) {
+                break;
+            }
 
-        if (pause_first) {
             // Pause the client.
             ASSERT_NO_THROW(client_->pause());
-            ASSERT_EQ(HttpThreadPool::RunState::PAUSED, client_->getRunState());
-
-            // Check convenience functions.
-            ASSERT_FALSE(client_->isRunning());
             ASSERT_TRUE(client_->isPaused());
-            ASSERT_FALSE(client_->isStopped());
-        }
+            ++pause_cnt_;
 
-        // We should have completed at least the expected number of requests
-        // but less than the maximum number of requests.
-        ASSERT_GE(getRRCount(), expected_requests_ );
-        ASSERT_LT(getRRCount(), maximum_requests);
+            // Check our progress.
+            rr_count = getRRCount();
+            ASSERT_GE(rr_count, request_limit);
+
+            // Resume the client.
+            ASSERT_NO_THROW(client_->resume());
+            ASSERT_TRUE(client_->isRunning());
+        }
 
         // Client should stop without issue.
-        stopTestClient();
+        ASSERT_NO_THROW(client_->stop());
+        ASSERT_TRUE(client_->isStopped());
+
+        // We should have finished all our requests.
+        ASSERT_EQ(getRRCount(), total_requests);
+
+        // Stopping again should be harmless.
+        ASSERT_NO_THROW(client_->stop());
 
         // Listeners should stop without issue.
         for (const auto& listener : listeners_) {
             ASSERT_NO_THROW(listener->stop());
         }
+
+        // Destructor should work fine.
+        client_.reset();
     }
 
     /// @brief Fetch the number of completed requests.
     ///
     /// @return number of completed requests.
     size_t getRRCount() {
-        std::unique_lock<std::mutex> lck(test_mutex_);
-        return(clientRRs_.size());
+        std::lock_guard<std::mutex> lck(test_mutex_);
+        return (clientRRs_.size());
     }
 
     /// @brief IO service used in the tests.
@@ -832,7 +811,7 @@ TEST_F(MtHttpClientTest, basics) {
     ASSERT_TRUE(client->getThreadIOService());
     ASSERT_EQ(client->getThreadPoolSize(), 3);
     ASSERT_EQ(client->getThreadCount(), 3);
-    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
 
     // Check convenience functions.
     ASSERT_TRUE(client->isRunning());
@@ -890,24 +869,23 @@ TEST_F(MtHttpClientTest, deferredStart) {
     // We should be able to start it.
     ASSERT_NO_THROW(client->start());
 
-    // Verify we have threads and run state is RUN.
+    // Verify we have threads and run state is RUNNING.
     ASSERT_EQ(client->getThreadCount(), 3);
     ASSERT_TRUE(client->getThreadIOService());
     ASSERT_FALSE(client->getThreadIOService()->stopped());
-    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
 
     // Check convenience functions.
     ASSERT_TRUE(client->isRunning());
     ASSERT_FALSE(client->isPaused());
     ASSERT_FALSE(client->isStopped());
 
-    // Cannot start it twice.
-    ASSERT_THROW_MSG(client->start(), InvalidOperation,
-                     "HttpThreadPool::start already started!");
+    // Second call to start should be harmless.
+    ASSERT_NO_THROW_LOG(client->start());
 
     // Verify we didn't break it.
     ASSERT_EQ(client->getThreadCount(), 3);
-    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
 
     // Make sure destruction doesn't throw.
     ASSERT_NO_THROW_LOG(client.reset());
@@ -927,7 +905,7 @@ TEST_F(MtHttpClientTest, restartAfterStop) {
     ASSERT_EQ(client->getThreadCount(), 3);
     ASSERT_TRUE(client->getThreadIOService());
     ASSERT_FALSE(client->getThreadIOService()->stopped());
-    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
 
     // Stop should succeed.
     ASSERT_NO_THROW_LOG(client->stop());
@@ -943,7 +921,7 @@ TEST_F(MtHttpClientTest, restartAfterStop) {
     ASSERT_EQ(client->getThreadCount(), 3);
     ASSERT_TRUE(client->getThreadIOService());
     ASSERT_FALSE(client->getThreadIOService()->stopped());
-    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+    ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
 
     // Make sure destruction doesn't throw.
     ASSERT_NO_THROW_LOG(client.reset());
@@ -1004,22 +982,14 @@ TEST_F(MtHttpClientTest, fourByFourByTwo) {
     threadRequestAndReceive(num_threads, num_batches, num_listeners);
 }
 
-// Verifies that we can cleanly work, pause, and resume repeatedly.
-TEST_F(MtHttpClientTest, workPauseResume) {
-    size_t num_threads = 12;
-    size_t num_batches = 12;
-    size_t num_listeners = 12;
-    size_t num_pauses = 7;
-    threadRequestAndReceive(num_threads, num_batches, num_listeners, num_pauses);
-}
-
-// Verifies that we can cleanly pause and shutdown while doing
+// Verifies that we can cleanly pause, resume, and shutdown while doing
 // multi-threaded work.
-TEST_F(MtHttpClientTest, workPauseShutdown) {
-    size_t num_threads = 8;
-    size_t num_batches = 8;
-    size_t num_listeners = 8;
-    workPauseShutdown(num_threads, num_batches, num_listeners);
+TEST_F(MtHttpClientTest, workPauseResumeShutdown) {
+    size_t num_threads = 4;
+    size_t num_batches = 4;
+    size_t num_listeners = 4;
+    size_t num_pauses = 3;
+    workPauseResumeShutdown(num_threads, num_batches, num_listeners, num_pauses);
 }
 
 } // end of anonymous namespace