]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1732] HttpClient ST/MT modes fully function with unit tests
authorThomas Markwalder <tmark@isc.org>
Thu, 25 Mar 2021 17:45:59 +0000 (13:45 -0400)
committerThomas Markwalder <tmark@isc.org>
Thu, 8 Apr 2021 12:59:48 +0000 (08:59 -0400)
HttpClient now supports both single and multi threaded modes.
MT mode is not currently used anywhere other than unit tests.

src/lib/http/client.*
    Added commentary, spell-check, cleanup

src/lib/http/http_log.h
    removed TOMS_TRACE_LOG

src/lib/http/tests/mt_client_unittests.cc
    Expanded testing
    Clean up

src/lib/http/client.cc
src/lib/http/client.h
src/lib/http/http_log.h
src/lib/http/tests/mt_client_unittests.cc

index 4521ea42c6ec29972c4d25dfc067688e34ed12ee..b0baf631ebac89a45b6c3011d63ac0ad4795e180 100644 (file)
 #include <mutex>
 #include <queue>
 
+#ifndef TOMS_TRACE_LOG
+#include <thread>
+#if 0
+#define TOMS_TRACE_LOG(msg) {std::cout << std::this_thread::get_id() << ":" << __FILE__ << ":" << __FUNCTION__ << ":" << __LINE__ << " " << msg << std::endl;}
+
+#else
+#define TOMS_TRACE_LOG(msg)
+#endif
+
+#endif
+
 using namespace isc;
 using namespace isc::asiolink;
 using namespace isc::http;
@@ -371,13 +382,10 @@ private:
     /// after invocation. Defaults to false.
     void closeCallback(const bool clear = false);
 
-    /// @brief Pointer to the connection pool owning this connection.
+    /// @brief Fetches the current socket descriptor, if one.
     ///
-    /// This is a weak pointer to avoid circular dependency between the
-    /// Connection and ConnectionPool.
-    boost::weak_ptr<ConnectionPool> conn_pool_;
-
-    int getSocketFd() {
+    /// @return The socket descriptor or -1.
+    int getSocketFd() const {
         int fd = -1;
 
         if (tcp_socket_) {
@@ -389,6 +397,12 @@ private:
         return (fd);
     }
 
+    /// @brief Pointer to the connection pool owning this connection.
+    ///
+    /// This is a weak pointer to avoid circular dependency between the
+    /// Connection and ConnectionPool.
+    boost::weak_ptr<ConnectionPool> conn_pool_;
+
     /// @brief URL for this connection.
     Url url_;
 
@@ -459,15 +473,14 @@ public:
     void addConnection(ConnectionPtr connection) {
         if (full()) {
             isc_throw(BadValue, "URL: " << url_.toText()
-                      << ", already at maximum connectsions: "
+                      << ", already at maximum connections: "
                       << max_connections_);
         }
 
         connections_.push_back(connection);
     }
 
-    /// @brief Closes a connection and removes it from the list. (Wonder
-    /// if I should call this removeConnection?)
+    /// @brief Closes a connection and removes it from the list. 
     ///
     /// @param connection the connection to remove
     void closeConnection(ConnectionPtr connection) {
@@ -543,9 +556,9 @@ public:
         return connections_.size();
     }
 
-    /// @brief Fetches the maxium number of connections.
+    /// @brief Fetches the maximum number of connections.
     ///
-    /// @return the maxium number of connections.
+    /// @return the maxim number of connections.
     size_t max_connections() const {
         return max_connections_;
     }
@@ -561,7 +574,7 @@ private:
     /// @brief URL supported by the list.
     Url url_;
 
-    /// @brief Maxium number of concurrent connections allowed in the list.
+    /// @brief Maximum number of concurrent connections allowed in the list.
     size_t max_connections_;
 
     /// @brief List of concurrent connections.
@@ -585,7 +598,7 @@ public:
     ///
     /// @param io_service Reference to the IO service to be used by the
     /// connections.
-    /// @param max_url_connections maxium number of concurrent
+    /// @param max_url_connections maximum number of concurrent
     /// connections allowed per URL.
     explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
         : io_service_(io_service), conns_(), queue_(), mutex_(),
@@ -722,11 +735,14 @@ private:
                 // Now, look for an idle connection.
                 ConnectionPtr connection = conns_it->second->getIdleConnection();
                 if (!connection) {
-                    TOMS_TRACE_LOG("no idle connections, don't dequeue");
-                    // @todo TKM think the question below through... you perf teseted it
-                    // with simple return.
-                    // We shouldn't be here w/o an idle connection? ... if this is called
-                    // terminate, then how can the instigating connection not be free?
+                    TOMS_TRACE_LOG("*** No idle connections, don't dequeue?");
+                    // @todo Resolve this,  throw or just return, possibly log and return
+                    //
+                    // We shouldn't be in this function w/o an idle connection as it is called
+                    // from by terminate() after completion of a transaction? It should not be
+                    // possible for the connection that got us here to not be busy.
+                    // Do we throw or just not dequeue ther request?  It was TSAN tested and
+                    // perf tested with just the return.
                     // isc_throw(Unexpected, "no idle connections for :" << url.toText());
                     // Let's leave it on the queue, nothing idle yet?
                     return;
@@ -1206,7 +1222,8 @@ Connection::terminate(const boost::system::error_code& ec,
 void
 Connection::terminateInternal(const boost::system::error_code& ec,
                               const std::string& parsing_error) {
-    TOMS_TRACE_LOG(" on:" << getSocketFd())
+    TOMS_TRACE_LOG("terminate on: " << getSocketFd() 
+                    << ", isTransactionOngoing? " << isTransactionOngoing());
     HttpResponsePtr response;
 
     if (isTransactionOngoing()) {
@@ -1279,7 +1296,8 @@ Connection::terminateInternal(const boost::system::error_code& ec,
     // another transaction if there is at least one.
     ConnectionPoolPtr conn_pool = conn_pool_.lock();
     if (conn_pool) {
-        TOMS_TRACE_LOG(" more work...");
+        TOMS_TRACE_LOG(" more work on? " << getSocketFd() 
+                    << ", isTransactionOngoing? " << isTransactionOngoing());
         if (MultiThreadingMgr::instance().getMode()) {
             UnlockGuard<std::mutex> lock(mutex_);
             conn_pool->processNextRequest(url_);
@@ -1603,7 +1621,7 @@ public:
     /// - Creates a private IOService
     /// - Creates a thread pool with the thread_pool_size threads
     /// - Creates the connection pool passing the private IOService
-    /// and the thread_pool_size as the maximum nubmer of connections
+    /// and the thread_pool_size as the maximum number of connections
     /// per URL.
     ///
     /// @param io_service IOService that will drive connection IO in single
index a73a7448dec4584fde75f38ad3c743d70aa466a2..6825d567690375a7ecd43c269edf2b958f7258f1 100644 (file)
@@ -45,7 +45,7 @@ class HttpClientImpl;
 /// server's response. The last argument specified in this call is the pointer
 /// to the callback function, which should be launched when the response is
 /// received, an error occurs or when a timeout in the transmission is
-/// signalled.
+/// signaled.
 ///
 /// The HTTP client supports multiple simultaneous and persistent connections
 /// with different destinations. The client determines if the connection is
@@ -58,6 +58,16 @@ class HttpClientImpl;
 /// request is queued in the FIFO queue. When the previous request completes,
 /// the next request in the queue for the particular URL will be initiated.
 ///
+/// Furthermore, the class supports two modes of operation: single-threaded
+/// and multi-threaded mode.  In single-threaded mode, all IO is driven by
+/// an external IOService passed into the class constructor, and ultimately
+/// only a single connection per URL can be open at any given time. 
+///
+/// In multi-threaded mode, an internal thread pool, driven by a private
+/// IOService instance, is used to support multiple concurrent connections
+/// per URL. Currently the number of connections per URL is equal to the
+/// number of threads in the thread pool.
+///
 /// The client tests the persistent connection for usability before sending
 /// a request by trying to read from the socket (with message peeking). If
 /// the socket is usable the client uses it to transmit the request.
@@ -133,15 +143,28 @@ public:
     /// @brief Destructor.
     ~HttpClient();
 
-    /// @brief Queues new asynchronous HTTP request.
+    /// @brief Queues new asynchronous HTTP request for a given URL.
+    ///
+    /// The client maintains an internal connection pool which manages lists 
+    /// of connections per URL. In single-threaded mode, each URL is limited 
+    /// to a single /connection.  In multi-threaded mode, each URL may have 
+    /// more than one open connection per URL, enabling the client to carry
+    /// on multiple concurrent requests per URL.
+    ///
+    /// The client will search the pool for an open, idle connection for the
+    /// given URL.  If there are no idle connections, the client will open
+    /// a new connection up to the maximum number of connections allowed by the
+    /// thread mode.  If all possible connections are busy, the request is 
+    /// pushed on to back of a URL-specific FIFO queue of pending requests.
+    ///
+    /// If however, there is an idle connection available than a new transaction
+    /// for the request will be initiated immediately upon that connection.
     ///
-    /// The client creates one connection for the specified URL. If the
-    /// connection with the particular destination already exists, it will be
-    /// re-used for the new transaction scheduled with this call. If another
-    /// transaction is still in progress, the new transaction is queued. The
-    /// queued transactions are started in the FIFO order one after another. If
-    /// the connection is idle or the connection doesn't exist, the new
-    /// transaction is started immediately.
+    /// Note that when a connection completes a transaction, and its URL
+    /// queue is not empty, it will pop a pending request from the front of 
+    /// the queue and begin a new transaction for that request. The net effect
+    /// is that requests are always pulled from the front of the queue unless
+    /// the queue is empty. 
     ///
     /// The existing connection is tested before it is used for the new
     /// transaction by attempting to read (with message peeking) from
@@ -178,7 +201,7 @@ public:
     ///
     /// If message parsing was successful the second argument of the callback
     /// contains a pointer to the parsed response (the same pointer as provided
-    ///by the caller as the argument). If parsing was unsuccessful, the null
+    /// by the caller as the argument). If parsing was unsuccessful, the null
     /// pointer is returned.
     ///
     /// The default timeout for the transaction is set to 10 seconds
@@ -238,7 +261,7 @@ public:
     /// @brief Fetches a pointer to the internal IOService used to
     /// drive the thread-pool in multi-threaded mode.
     ///
-    /// @return pointer to the IOService instance, or an emtpy pointer
+    /// @return pointer to the IOService instance, or an empty pointer
     /// in single-threaded mode.
     const asiolink::IOServicePtr getMyIOService() const;
 
index 468e87b26e9c821ebe8c574e89029cc13ebbb655..498c6162b9ae4549d2b0bc3ce3721aa3fce8eb25 100644 (file)
@@ -20,13 +20,4 @@ extern isc::log::Logger http_logger;
 } // namespace http
 } // namespace isc
 
-#ifndef TOMS_TRACE_LOG
-#include <thread>
-
-#define TOMS_TRACE_LOG(msg) {std::cout << std::this_thread::get_id() << ":" << __FILE__ << ":" << __FUNCTION__ << ":" << __LINE__ << " " << msg << std::endl;}
-
-#define TOMS_LOG(msg)
-
-#endif
-
 #endif // HTTP_LOG_H
index 69295ddfb484a1c2657dee27b0b4c0333c3dbf4c..fc2ad5a48735e7394e6e6e9305ce5de15304ad49 100644 (file)
@@ -5,29 +5,25 @@
 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 #include <config.h>
+
 #include <asiolink/asio_wrapper.h>
 #include <asiolink/interval_timer.h>
 #include <cc/data.h>
 #include <http/client.h>
-#include <http/http_types.h>
 #include <http/listener.h>
-#include <http/listener_impl.h>
 #include <http/post_request_json.h>
 #include <http/response_creator.h>
 #include <http/response_creator_factory.h>
 #include <http/response_json.h>
-#include <http/tests/response_test.h>
 #include <http/url.h>
 #include <util/multi_threading_mgr.h>
 #include <testutils/gtest_utils.h>
 
-#include <boost/asio/buffer.hpp>
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/pointer_cast.hpp>
 #include <gtest/gtest.h>
 
 #include <functional>
-#include <list>
 #include <sstream>
 #include <string>
 
@@ -35,78 +31,33 @@ using namespace isc;
 using namespace isc::asiolink;
 using namespace isc::data;
 using namespace isc::http;
-using namespace isc::http::test;
 using namespace isc::util;
 namespace ph = std::placeholders;
 
-namespace isc {
-data::ConstElementPtr 
-http::HttpResponse::getJsonElement(const std::string& element_name) const {
-    try {
-        ConstElementPtr body = Element::fromJSON(getBody());
-        if (body) {
-            const std::map<std::string, ConstElementPtr>& map_value = body->mapValue();
-            auto map_element = map_value.find(element_name);
-            if (map_element != map_value.end()) {
-                return (map_element->second);
-            }
-        }
-
-    } catch (const std::exception& ex) {
-        isc_throw(HttpResponseError, "unable to get JSON element "
-                  << element_name << ": " << ex.what());
-    }
-
-    return (ConstElementPtr());
-}
-}
-
-
 namespace {
 
 /// @brief IP address to which HTTP service is bound.
 const std::string SERVER_ADDRESS = "127.0.0.1";
 
-/// @brief IPv6 address to whch HTTP service is bound.
+/// @brief IPv6 address to which HTTP service is bound.
 const std::string IPV6_SERVER_ADDRESS = "::1";
 
 /// @brief Port number to which HTTP service is bound.
 const unsigned short SERVER_PORT = 18123;
 
-/// @brief Request Timeout used in most of the tests (ms).
-const long REQUEST_TIMEOUT = 10000;
-
-/// @brief Persistent connection idle timeout used in most of the tests (ms).
-const long IDLE_TIMEOUT = 10000;
-
-/// @brief Persistent connection idle timeout used in tests where idle connections
-/// are tested (ms).
-const long SHORT_IDLE_TIMEOUT = 200;
-
 /// @brief Test timeout (ms).
 const long TEST_TIMEOUT = 10000;
 
-/// @brief Test HTTP response.
-typedef TestHttpResponseBase<HttpResponseJson> Response;
-
-/// @brief Pointer to test HTTP response.
-typedef boost::shared_ptr<Response> ResponsePtr;
-
-/// @brief Generic test HTTP response.
-typedef TestHttpResponseBase<HttpResponse> GenericResponse;
-
-/// @brief Pointer to generic test HTTP response.
-typedef boost::shared_ptr<GenericResponse> GenericResponsePtr;
-
+/// @brief Container request/response pair handled by a given thread.
 struct ClientRR {
     std::string thread_id_;
-    HttpRequestPtr request_;
-    HttpResponsePtr response_;
+    PostHttpRequestJsonPtr request_;
+    HttpResponseJsonPtr response_;
 };
 
+/// @brief Pointer to a ClientRR.
 typedef boost::shared_ptr<ClientRR> ClientRRPtr;
 
-/// @todo  Creator and Factory are currently with server_client_unittests.cc
 /// @brief Implementation of the @ref HttpResponseCreator.
 class TestHttpResponseCreator : public HttpResponseCreator {
 public:
@@ -131,26 +82,19 @@ private:
         // The request hasn't been finalized so the request object
         // doesn't contain any information about the HTTP version number
         // used. But, the context should have this data (assuming the
-        // HTTP version is parsed ok).
+        // HTTP version is parsed OK).
         HttpVersion http_version(request->context()->http_version_major_,
                                  request->context()->http_version_minor_);
         // This will generate the response holding JSON content.
-        ResponsePtr response(new Response(http_version, status_code));
+        HttpResponseJsonPtr response(new HttpResponseJson(http_version, status_code));
         response->finalize();
         return (response);
     }
 
     /// @brief Creates HTTP response.
     ///
-    /// This method generates 3 types of responses:
-    /// - response with a requested content type,
-    /// - partial response with incomplete JSON body,
-    /// - response with JSON body copied from the request.
-    ///
-    /// The first one is useful to test situations when received response can't
-    /// be parsed because of the content type mismatch. The second one is useful
-    /// to test request timeouts. The third type is used by most of the unit tests
-    /// to test successful transactions.
+    /// This method generates a response with the JSON body copied 
+    /// from the request.
     ///
     /// @param request Pointer to the HTTP request.
     /// @return Pointer to the generated HTTP OK response with no content.
@@ -159,57 +103,23 @@ private:
         // Request must always be JSON.
         PostHttpRequestJsonPtr request_json =
             boost::dynamic_pointer_cast<PostHttpRequestJson>(request);
-        ConstElementPtr body;
-        if (request_json) {
-            body = request_json->getBodyAsJson();
-            if (body) {
-                // Check if the client requested one of the two first response
-                // types.
-                GenericResponsePtr response;
-                ConstElementPtr content_type = body->get("requested-content-type");
-                ConstElementPtr partial_response = body->get("partial-response");
-                if (content_type || partial_response) {
-                    // The first two response types can only be generated using the
-                    // generic response as we have to explicitly modify some of the
-                    // values.
-                    response.reset(new GenericResponse(request->getHttpVersion(),
-                                                       HttpStatusCode::OK));
-                    HttpResponseContextPtr ctx = response->context();
-
-                    if (content_type) {
-                        // Provide requested content type.
-                        ctx->headers_.push_back(HttpHeaderContext("Content-Type",
-                                                                  content_type->stringValue()));
-                        // It doesn't matter what body is there.
-                        ctx->body_ = "abcd";
-                        response->finalize();
-
-                    } else {
-                        // Generate JSON response.
-                        ctx->headers_.push_back(HttpHeaderContext("Content-Type",
-                                                                  "application/json"));
-                        // The body lacks '}' so the client will be waiting for it and
-                        // eventually should time out.
-                        ctx->body_ = "{";
-                        response->finalize();
-                        // The auto generated Content-Length header would be based on the
-                        // body size (so set to 1 byte). We have to override it to
-                        // account for the missing '}' character.
-                        response->setContentLength(2);
-                    }
-                    return (response);
-                }
-            }
+
+        if (!request_json) {
+            return(createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
         }
 
-        // Third type of response is requested.
-        ResponsePtr response(new Response(request->getHttpVersion(),
-                                          HttpStatusCode::OK));
-        // If body was included in the request. Let's copy it.
-        if (body) {
-            response->setBodyAsJson(body);
+        // Request must always contain a body.
+        ConstElementPtr body = request_json->getBodyAsJson();
+        if (!body) {
+            return(createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
         }
 
+        HttpResponseJsonPtr response(new HttpResponseJson(request->getHttpVersion(),
+                                                          HttpStatusCode::OK));
+
+        // Echo request body back in the response.
+        response->setBodyAsJson(body);
+
         response->finalize();
         return (response);
     }
@@ -228,7 +138,7 @@ public:
     }
 };
 
-/// @brief Test fixture class for testing HTTP client.
+/// @brief Test fixture class for testing multi-threaded HTTP client.
 class MtHttpClientTest : public ::testing::Test {
 public:
 
@@ -243,11 +153,13 @@ public:
 
     /// @brief Destructor.
     ~MtHttpClientTest() {
+        if (client_) {
+            client_->stop();
+        }
+
         if (listener_) {
             listener_->stop();
         }
-
-        MultiThreadingMgr::instance().setMode(false);
     }
 
     /// @brief Callback function invoke upon test timeout.
@@ -262,7 +174,8 @@ public:
         io_service_.stop();
     }
 
-   void runIOService() {
+    /// @brief Runs test's IOService until the desired number of have been carried out.
+    void runIOService() {
         // Loop until the clients are done, an error occurs, or the time runs out.
         while (clientRRs_.size() < num_requests_) {
             // Always call restart() before we call run();
@@ -301,21 +214,35 @@ public:
         return (request);
     }
 
-    void startRequest() {
+    /// @brief Initiates a single HTTP request.
+    ///
+    /// Constructs an HTTP post whose body is a JSON map containing a
+    /// single integer element, "sequence".
+    ///
+    /// The request completion handler will block each requesting thread
+    /// until the number of in-progress threads reaches the number of
+    /// threads in the pool.  At that point, the handler will unblock
+    /// until all threads have finished preparing the response and are
+    /// ready to return.   The handler will notify all pending threads
+    /// and invoke stop() on the test's main IO service thread. 
+    ///
+    /// @param sequence value for the integer element, "sequence",
+    /// to send in the request.
+    void startRequest(int sequence) {
         // Create the URL on which the server can be reached.
         std::stringstream ss;
         ss << "http://" << SERVER_ADDRESS << ":" << SERVER_PORT;
         Url url(ss.str());
 
         // Initiate request to the server.
-        PostHttpRequestJsonPtr request = createRequest("sequence", 1);
+        PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
 
         HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
 
         ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
-                                                  request, response_json,
-            [this, request](const boost::system::error_code& ec,
-                   const HttpResponsePtr& response,
+                                                  request_json, response_json,
+            [this, request_json, response_json](const boost::system::error_code& ec,
+                   const HttpResponsePtr&/* response*/,
                    const std::string&) {
             // Bail on an error.
             ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
@@ -342,8 +269,8 @@ public:
             // Create the ClientRR.
             ClientRRPtr clientRR(new ClientRR());
             clientRR->thread_id_ =  ss.str();
-            clientRR->request_ = request;
-            clientRR->response_ = response;
+            clientRR->request_ = request_json;
+            clientRR->response_ = response_json;
 
             {
                 std::unique_lock<std::mutex> lck(mutex_);
@@ -353,7 +280,7 @@ public:
                     // We're all done, notify the others and finish.
                     num_in_progress_ = 0;
                     cv_.notify_all();
-                    // Stop the test's IOservice.
+                    // Stop the test's IOService.
                     io_service_.stop();
                 } else {
                     // I'm done but others aren't wait here.
@@ -367,10 +294,35 @@ public:
         }));
     }
 
+    /// @brief Starts one or more HTTP requests via HttpClient to a test listener.
+    ///
+    /// This function command creates a HttpClient with the given number
+    /// of threads. It initiates then given number of HTTP requests. Each 
+    /// request carries a single integer element, "sequence" in its body.
+    /// The response is expected to be this same element echoed back.
+    /// Then it iteratively runs the test's IOService until all
+    /// the requests have been responded to, an error occurs, or the
+    /// test times out.
+    ///
+    /// It requires that the number of requests, when greater than the
+    /// number of threads, be a multiple of the number of threads.  The
+    /// requests completion handler is structured in such a way as to ensure
+    /// (we hope) that each client thread handles the same number of requests.
+    ///
+    /// @param num_threads - the number of threads the HttpClient
+    /// should use. A value of 0 puts the HttpClient in single-threaded mode.
+    /// @param num_requests - the number of requests that should be carried out.
+    /// Must be greater than 0. If it is greater than num_threads it must be a
+    /// multiple of num_threads.
+    ///
+    /// @param num_threads
+    /// @param num_requests
     void threadRequestAndReceive(size_t num_threads, size_t num_requests) {
         // First we makes sure the parameter rules apply.
         ASSERT_TRUE((num_threads == 0) || (num_requests < num_threads) 
                     || (num_requests % num_threads == 0));
+        num_threads_ = num_threads;
+        num_requests_ = num_requests;
 
         // Make a factory
         factory_.reset(new TestHttpResponseCreatorFactory());
@@ -385,35 +337,94 @@ public:
         // Start the server.
         ASSERT_NO_THROW(listener_->start());
 
-        num_threads_ = num_threads;
-        num_requests_ = num_requests;
-
         // Create an MT client with num_threads
         ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
         ASSERT_TRUE(client_);
 
         if (num_threads_ == 0) {
+            // If we single-threaded client should not have it's own IOService.
             ASSERT_FALSE(client_->getMyIOService());
         } else {
+            // If we multi-threaded client should have it's own IOService.
             ASSERT_TRUE(client_->getMyIOService());
         }
+
+        // Verify the pool size and number of threads are as expected.
         ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
         ASSERT_EQ(client_->getThreadCount(), num_threads);
 
         // Start the requisite number of requests.
-        for (auto i = 0; i < num_requests_; ++i) {
-            startRequest();
+        for (int i = 0; i < num_requests_; ++i) {
+            startRequest(i + 1);
         }
 
         // Run test thread IOService.  This drives the listener's IO.
         ASSERT_NO_THROW(runIOService());
 
+        // We should have a response for each request.
         ASSERT_EQ(clientRRs_.size(), num_requests_);
+
+        // Create a map to track number of responses for each client thread.
+        std::map<std::string, int> responses_per_thread;
+
+        // Get the stringified thread-id of the test's main thread.
+        std::stringstream ss;
+        ss << std::this_thread::get_id();
+        std::string main_thread_id = ss.str();
+
+        // Iterate over the client request/response pairs.
         for (auto const& clientRR : clientRRs_ ) {
-            HttpResponsePtr response = clientRR->response_;
-            ASSERT_TRUE(response);
-            ConstElementPtr sequence = response->getJsonElement("sequence");
+            // Make sure it's whole.
+            ASSERT_FALSE(clientRR->thread_id_.empty());
+            ASSERT_TRUE(clientRR->request_);
+            ASSERT_TRUE(clientRR->response_);
+
+            // Request should contain an integer sequence number.
+            int request_sequence;
+            ConstElementPtr sequence = clientRR->request_->getJsonElement("sequence");
+            ASSERT_TRUE(sequence);
+            ASSERT_NO_THROW(request_sequence = sequence->intValue());
+
+            // Response should contain an integer sequence number.
+            int response_sequence;
+            sequence = clientRR->response_->getJsonElement("sequence");
             ASSERT_TRUE(sequence);
+            ASSERT_NO_THROW(response_sequence = sequence->intValue());
+
+            // Request and Response sequence numbers should match.
+            ASSERT_EQ(request_sequence, response_sequence);
+
+            if (num_threads_ == 0) {
+                // For ST mode thread id should always be the main thread.
+                ASSERT_EQ(clientRR->thread_id_, main_thread_id);
+            } else {
+                // For MT mode the thread id should never be the main thread.
+                ASSERT_NE(clientRR->thread_id_, main_thread_id);
+            }
+
+            // Bump the response count for the given client thread-id.
+            auto it = responses_per_thread.find(clientRR->thread_id_);
+            if (it != responses_per_thread.end()) {
+                responses_per_thread[clientRR->thread_id_] = it->second + 1;
+            } else {
+                responses_per_thread[clientRR->thread_id_] = 1;
+            }
+        }
+
+        // Make sure we have the expected number of responding threads.
+        if (num_threads_ == 0) {
+            ASSERT_EQ(responses_per_thread.size(), 1);
+        } else {
+            size_t expected_thread_count = (num_requests_ < num_threads_ ?
+                                            num_requests_ : num_threads_);
+            ASSERT_EQ(responses_per_thread.size(), expected_thread_count);
+        }
+
+        // Each thread-id ought to have received the same number of responses.
+        for (auto const& it : responses_per_thread) {
+            EXPECT_EQ(it.second, num_requests_ / responses_per_thread.size())
+                      << "thread-id: " << it.first
+                      << ", responses: " << it.second << std::endl;
         }
 
         ASSERT_NO_THROW(client_->stop());
@@ -513,35 +524,39 @@ TEST_F(MtHttpClientTest, basics) {
     // Create another multi-threaded instance.
     ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 3)));
 
-    // Make sure destruction doesn't throw. Note, if destuctor
-    // doesn't stop the threads correctly the test will crash upon exit.
+    // Make sure destruction doesn't throw.
     ASSERT_NO_THROW_LOG(client.reset());
 }
 
+// Now we'll run some permutations of the number of client threads
+// and the number of client requests.
+
+// Single-threaded, three requests.
 TEST_F(MtHttpClientTest, zeroByThree) {
-    // Zero threads = ST mode.
-    size_t num_threads = 0; 
+    size_t num_threads = 0; // Zero threads = ST mode.
     size_t num_requests = 3;
     threadRequestAndReceive(num_threads, num_requests);
 }
 
-
+// Multi-threaded with one thread, three requests.
 TEST_F(MtHttpClientTest, oneByThree) {
     size_t num_threads = 1;
     size_t num_requests = 3;
     threadRequestAndReceive(num_threads, num_requests);
 }
 
+// Multi-threaded with threads, three requests.
 TEST_F(MtHttpClientTest, threeByThree) {
     size_t num_threads = 3;
     size_t num_requests = 3;
     threadRequestAndReceive(num_threads, num_requests);
 }
 
+// Multi-threaded with threads, nine requests.
 TEST_F(MtHttpClientTest, threeByNine) {
     size_t num_threads = 3;
     size_t num_requests = 9;
     threadRequestAndReceive(num_threads, num_requests);
 }
 
-}
+} // end of anonymous namespace