#include <mutex>
#include <queue>
+#ifndef TOMS_TRACE_LOG
+#include <thread>
+#if 0
+#define TOMS_TRACE_LOG(msg) {std::cout << std::this_thread::get_id() << ":" << __FILE__ << ":" << __FUNCTION__ << ":" << __LINE__ << " " << msg << std::endl;}
+
+#else
+#define TOMS_TRACE_LOG(msg)
+#endif
+
+#endif
+
using namespace isc;
using namespace isc::asiolink;
using namespace isc::http;
/// after invocation. Defaults to false.
void closeCallback(const bool clear = false);
- /// @brief Pointer to the connection pool owning this connection.
+ /// @brief Fetches the current socket descriptor, if one.
///
- /// This is a weak pointer to avoid circular dependency between the
- /// Connection and ConnectionPool.
- boost::weak_ptr<ConnectionPool> conn_pool_;
-
- int getSocketFd() {
+ /// @return The socket descriptor or -1.
+ int getSocketFd() const {
int fd = -1;
if (tcp_socket_) {
return (fd);
}
+ /// @brief Pointer to the connection pool owning this connection.
+ ///
+ /// This is a weak pointer to avoid circular dependency between the
+ /// Connection and ConnectionPool.
+ boost::weak_ptr<ConnectionPool> conn_pool_;
+
/// @brief URL for this connection.
Url url_;
void addConnection(ConnectionPtr connection) {
if (full()) {
isc_throw(BadValue, "URL: " << url_.toText()
- << ", already at maximum connectsions: "
+ << ", already at maximum connections: "
<< max_connections_);
}
connections_.push_back(connection);
}
- /// @brief Closes a connection and removes it from the list. (Wonder
- /// if I should call this removeConnection?)
+ /// @brief Closes a connection and removes it from the list.
///
/// @param connection the connection to remove
void closeConnection(ConnectionPtr connection) {
return connections_.size();
}
- /// @brief Fetches the maxium number of connections.
+ /// @brief Fetches the maximum number of connections.
///
- /// @return the maxium number of connections.
+ /// @return the maxim number of connections.
size_t max_connections() const {
return max_connections_;
}
/// @brief URL supported by the list.
Url url_;
- /// @brief Maxium number of concurrent connections allowed in the list.
+ /// @brief Maximum number of concurrent connections allowed in the list.
size_t max_connections_;
/// @brief List of concurrent connections.
///
/// @param io_service Reference to the IO service to be used by the
/// connections.
- /// @param max_url_connections maxium number of concurrent
+ /// @param max_url_connections maximum number of concurrent
/// connections allowed per URL.
explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
: io_service_(io_service), conns_(), queue_(), mutex_(),
// Now, look for an idle connection.
ConnectionPtr connection = conns_it->second->getIdleConnection();
if (!connection) {
- TOMS_TRACE_LOG("no idle connections, don't dequeue");
- // @todo TKM think the question below through... you perf teseted it
- // with simple return.
- // We shouldn't be here w/o an idle connection? ... if this is called
- // terminate, then how can the instigating connection not be free?
+ TOMS_TRACE_LOG("*** No idle connections, don't dequeue?");
+ // @todo Resolve this, throw or just return, possibly log and return
+ //
+ // We shouldn't be in this function w/o an idle connection as it is called
+ // from by terminate() after completion of a transaction? It should not be
+ // possible for the connection that got us here to not be busy.
+ // Do we throw or just not dequeue ther request? It was TSAN tested and
+ // perf tested with just the return.
// isc_throw(Unexpected, "no idle connections for :" << url.toText());
// Let's leave it on the queue, nothing idle yet?
return;
void
Connection::terminateInternal(const boost::system::error_code& ec,
const std::string& parsing_error) {
- TOMS_TRACE_LOG(" on:" << getSocketFd())
+ TOMS_TRACE_LOG("terminate on: " << getSocketFd()
+ << ", isTransactionOngoing? " << isTransactionOngoing());
HttpResponsePtr response;
if (isTransactionOngoing()) {
// another transaction if there is at least one.
ConnectionPoolPtr conn_pool = conn_pool_.lock();
if (conn_pool) {
- TOMS_TRACE_LOG(" more work...");
+ TOMS_TRACE_LOG(" more work on? " << getSocketFd()
+ << ", isTransactionOngoing? " << isTransactionOngoing());
if (MultiThreadingMgr::instance().getMode()) {
UnlockGuard<std::mutex> lock(mutex_);
conn_pool->processNextRequest(url_);
/// - Creates a private IOService
/// - Creates a thread pool with the thread_pool_size threads
/// - Creates the connection pool passing the private IOService
- /// and the thread_pool_size as the maximum nubmer of connections
+ /// and the thread_pool_size as the maximum number of connections
/// per URL.
///
/// @param io_service IOService that will drive connection IO in single
/// server's response. The last argument specified in this call is the pointer
/// to the callback function, which should be launched when the response is
/// received, an error occurs or when a timeout in the transmission is
-/// signalled.
+/// signaled.
///
/// The HTTP client supports multiple simultaneous and persistent connections
/// with different destinations. The client determines if the connection is
/// request is queued in the FIFO queue. When the previous request completes,
/// the next request in the queue for the particular URL will be initiated.
///
+/// Furthermore, the class supports two modes of operation: single-threaded
+/// and multi-threaded mode. In single-threaded mode, all IO is driven by
+/// an external IOService passed into the class constructor, and ultimately
+/// only a single connection per URL can be open at any given time.
+///
+/// In multi-threaded mode, an internal thread pool, driven by a private
+/// IOService instance, is used to support multiple concurrent connections
+/// per URL. Currently the number of connections per URL is equal to the
+/// number of threads in the thread pool.
+///
/// The client tests the persistent connection for usability before sending
/// a request by trying to read from the socket (with message peeking). If
/// the socket is usable the client uses it to transmit the request.
/// @brief Destructor.
~HttpClient();
- /// @brief Queues new asynchronous HTTP request.
+ /// @brief Queues new asynchronous HTTP request for a given URL.
+ ///
+ /// The client maintains an internal connection pool which manages lists
+ /// of connections per URL. In single-threaded mode, each URL is limited
+ /// to a single /connection. In multi-threaded mode, each URL may have
+ /// more than one open connection per URL, enabling the client to carry
+ /// on multiple concurrent requests per URL.
+ ///
+ /// The client will search the pool for an open, idle connection for the
+ /// given URL. If there are no idle connections, the client will open
+ /// a new connection up to the maximum number of connections allowed by the
+ /// thread mode. If all possible connections are busy, the request is
+ /// pushed on to back of a URL-specific FIFO queue of pending requests.
+ ///
+ /// If however, there is an idle connection available than a new transaction
+ /// for the request will be initiated immediately upon that connection.
///
- /// The client creates one connection for the specified URL. If the
- /// connection with the particular destination already exists, it will be
- /// re-used for the new transaction scheduled with this call. If another
- /// transaction is still in progress, the new transaction is queued. The
- /// queued transactions are started in the FIFO order one after another. If
- /// the connection is idle or the connection doesn't exist, the new
- /// transaction is started immediately.
+ /// Note that when a connection completes a transaction, and its URL
+ /// queue is not empty, it will pop a pending request from the front of
+ /// the queue and begin a new transaction for that request. The net effect
+ /// is that requests are always pulled from the front of the queue unless
+ /// the queue is empty.
///
/// The existing connection is tested before it is used for the new
/// transaction by attempting to read (with message peeking) from
///
/// If message parsing was successful the second argument of the callback
/// contains a pointer to the parsed response (the same pointer as provided
- ///by the caller as the argument). If parsing was unsuccessful, the null
+ /// by the caller as the argument). If parsing was unsuccessful, the null
/// pointer is returned.
///
/// The default timeout for the transaction is set to 10 seconds
/// @brief Fetches a pointer to the internal IOService used to
/// drive the thread-pool in multi-threaded mode.
///
- /// @return pointer to the IOService instance, or an emtpy pointer
+ /// @return pointer to the IOService instance, or an empty pointer
/// in single-threaded mode.
const asiolink::IOServicePtr getMyIOService() const;
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
+
#include <asiolink/asio_wrapper.h>
#include <asiolink/interval_timer.h>
#include <cc/data.h>
#include <http/client.h>
-#include <http/http_types.h>
#include <http/listener.h>
-#include <http/listener_impl.h>
#include <http/post_request_json.h>
#include <http/response_creator.h>
#include <http/response_creator_factory.h>
#include <http/response_json.h>
-#include <http/tests/response_test.h>
#include <http/url.h>
#include <util/multi_threading_mgr.h>
#include <testutils/gtest_utils.h>
-#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/pointer_cast.hpp>
#include <gtest/gtest.h>
#include <functional>
-#include <list>
#include <sstream>
#include <string>
using namespace isc::asiolink;
using namespace isc::data;
using namespace isc::http;
-using namespace isc::http::test;
using namespace isc::util;
namespace ph = std::placeholders;
-namespace isc {
-data::ConstElementPtr
-http::HttpResponse::getJsonElement(const std::string& element_name) const {
- try {
- ConstElementPtr body = Element::fromJSON(getBody());
- if (body) {
- const std::map<std::string, ConstElementPtr>& map_value = body->mapValue();
- auto map_element = map_value.find(element_name);
- if (map_element != map_value.end()) {
- return (map_element->second);
- }
- }
-
- } catch (const std::exception& ex) {
- isc_throw(HttpResponseError, "unable to get JSON element "
- << element_name << ": " << ex.what());
- }
-
- return (ConstElementPtr());
-}
-}
-
-
namespace {
/// @brief IP address to which HTTP service is bound.
const std::string SERVER_ADDRESS = "127.0.0.1";
-/// @brief IPv6 address to whch HTTP service is bound.
+/// @brief IPv6 address to which HTTP service is bound.
const std::string IPV6_SERVER_ADDRESS = "::1";
/// @brief Port number to which HTTP service is bound.
const unsigned short SERVER_PORT = 18123;
-/// @brief Request Timeout used in most of the tests (ms).
-const long REQUEST_TIMEOUT = 10000;
-
-/// @brief Persistent connection idle timeout used in most of the tests (ms).
-const long IDLE_TIMEOUT = 10000;
-
-/// @brief Persistent connection idle timeout used in tests where idle connections
-/// are tested (ms).
-const long SHORT_IDLE_TIMEOUT = 200;
-
/// @brief Test timeout (ms).
const long TEST_TIMEOUT = 10000;
-/// @brief Test HTTP response.
-typedef TestHttpResponseBase<HttpResponseJson> Response;
-
-/// @brief Pointer to test HTTP response.
-typedef boost::shared_ptr<Response> ResponsePtr;
-
-/// @brief Generic test HTTP response.
-typedef TestHttpResponseBase<HttpResponse> GenericResponse;
-
-/// @brief Pointer to generic test HTTP response.
-typedef boost::shared_ptr<GenericResponse> GenericResponsePtr;
-
+/// @brief Container request/response pair handled by a given thread.
struct ClientRR {
std::string thread_id_;
- HttpRequestPtr request_;
- HttpResponsePtr response_;
+ PostHttpRequestJsonPtr request_;
+ HttpResponseJsonPtr response_;
};
+/// @brief Pointer to a ClientRR.
typedef boost::shared_ptr<ClientRR> ClientRRPtr;
-/// @todo Creator and Factory are currently with server_client_unittests.cc
/// @brief Implementation of the @ref HttpResponseCreator.
class TestHttpResponseCreator : public HttpResponseCreator {
public:
// The request hasn't been finalized so the request object
// doesn't contain any information about the HTTP version number
// used. But, the context should have this data (assuming the
- // HTTP version is parsed ok).
+ // HTTP version is parsed OK).
HttpVersion http_version(request->context()->http_version_major_,
request->context()->http_version_minor_);
// This will generate the response holding JSON content.
- ResponsePtr response(new Response(http_version, status_code));
+ HttpResponseJsonPtr response(new HttpResponseJson(http_version, status_code));
response->finalize();
return (response);
}
/// @brief Creates HTTP response.
///
- /// This method generates 3 types of responses:
- /// - response with a requested content type,
- /// - partial response with incomplete JSON body,
- /// - response with JSON body copied from the request.
- ///
- /// The first one is useful to test situations when received response can't
- /// be parsed because of the content type mismatch. The second one is useful
- /// to test request timeouts. The third type is used by most of the unit tests
- /// to test successful transactions.
+ /// This method generates a response with the JSON body copied
+ /// from the request.
///
/// @param request Pointer to the HTTP request.
/// @return Pointer to the generated HTTP OK response with no content.
// Request must always be JSON.
PostHttpRequestJsonPtr request_json =
boost::dynamic_pointer_cast<PostHttpRequestJson>(request);
- ConstElementPtr body;
- if (request_json) {
- body = request_json->getBodyAsJson();
- if (body) {
- // Check if the client requested one of the two first response
- // types.
- GenericResponsePtr response;
- ConstElementPtr content_type = body->get("requested-content-type");
- ConstElementPtr partial_response = body->get("partial-response");
- if (content_type || partial_response) {
- // The first two response types can only be generated using the
- // generic response as we have to explicitly modify some of the
- // values.
- response.reset(new GenericResponse(request->getHttpVersion(),
- HttpStatusCode::OK));
- HttpResponseContextPtr ctx = response->context();
-
- if (content_type) {
- // Provide requested content type.
- ctx->headers_.push_back(HttpHeaderContext("Content-Type",
- content_type->stringValue()));
- // It doesn't matter what body is there.
- ctx->body_ = "abcd";
- response->finalize();
-
- } else {
- // Generate JSON response.
- ctx->headers_.push_back(HttpHeaderContext("Content-Type",
- "application/json"));
- // The body lacks '}' so the client will be waiting for it and
- // eventually should time out.
- ctx->body_ = "{";
- response->finalize();
- // The auto generated Content-Length header would be based on the
- // body size (so set to 1 byte). We have to override it to
- // account for the missing '}' character.
- response->setContentLength(2);
- }
- return (response);
- }
- }
+
+ if (!request_json) {
+ return(createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
}
- // Third type of response is requested.
- ResponsePtr response(new Response(request->getHttpVersion(),
- HttpStatusCode::OK));
- // If body was included in the request. Let's copy it.
- if (body) {
- response->setBodyAsJson(body);
+ // Request must always contain a body.
+ ConstElementPtr body = request_json->getBodyAsJson();
+ if (!body) {
+ return(createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
}
+ HttpResponseJsonPtr response(new HttpResponseJson(request->getHttpVersion(),
+ HttpStatusCode::OK));
+
+ // Echo request body back in the response.
+ response->setBodyAsJson(body);
+
response->finalize();
return (response);
}
}
};
-/// @brief Test fixture class for testing HTTP client.
+/// @brief Test fixture class for testing multi-threaded HTTP client.
class MtHttpClientTest : public ::testing::Test {
public:
/// @brief Destructor.
~MtHttpClientTest() {
+ if (client_) {
+ client_->stop();
+ }
+
if (listener_) {
listener_->stop();
}
-
- MultiThreadingMgr::instance().setMode(false);
}
/// @brief Callback function invoke upon test timeout.
io_service_.stop();
}
- void runIOService() {
+ /// @brief Runs test's IOService until the desired number of have been carried out.
+ void runIOService() {
// Loop until the clients are done, an error occurs, or the time runs out.
while (clientRRs_.size() < num_requests_) {
// Always call restart() before we call run();
return (request);
}
- void startRequest() {
+ /// @brief Initiates a single HTTP request.
+ ///
+ /// Constructs an HTTP post whose body is a JSON map containing a
+ /// single integer element, "sequence".
+ ///
+ /// The request completion handler will block each requesting thread
+ /// until the number of in-progress threads reaches the number of
+ /// threads in the pool. At that point, the handler will unblock
+ /// until all threads have finished preparing the response and are
+ /// ready to return. The handler will notify all pending threads
+ /// and invoke stop() on the test's main IO service thread.
+ ///
+ /// @param sequence value for the integer element, "sequence",
+ /// to send in the request.
+ void startRequest(int sequence) {
// Create the URL on which the server can be reached.
std::stringstream ss;
ss << "http://" << SERVER_ADDRESS << ":" << SERVER_PORT;
Url url(ss.str());
// Initiate request to the server.
- PostHttpRequestJsonPtr request = createRequest("sequence", 1);
+ PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
- request, response_json,
- [this, request](const boost::system::error_code& ec,
- const HttpResponsePtr& response,
+ request_json, response_json,
+ [this, request_json, response_json](const boost::system::error_code& ec,
+ const HttpResponsePtr&/* response*/,
const std::string&) {
// Bail on an error.
ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
// Create the ClientRR.
ClientRRPtr clientRR(new ClientRR());
clientRR->thread_id_ = ss.str();
- clientRR->request_ = request;
- clientRR->response_ = response;
+ clientRR->request_ = request_json;
+ clientRR->response_ = response_json;
{
std::unique_lock<std::mutex> lck(mutex_);
// We're all done, notify the others and finish.
num_in_progress_ = 0;
cv_.notify_all();
- // Stop the test's IOservice.
+ // Stop the test's IOService.
io_service_.stop();
} else {
// I'm done but others aren't wait here.
}));
}
+ /// @brief Starts one or more HTTP requests via HttpClient to a test listener.
+ ///
+ /// This function command creates a HttpClient with the given number
+ /// of threads. It initiates then given number of HTTP requests. Each
+ /// request carries a single integer element, "sequence" in its body.
+ /// The response is expected to be this same element echoed back.
+ /// Then it iteratively runs the test's IOService until all
+ /// the requests have been responded to, an error occurs, or the
+ /// test times out.
+ ///
+ /// It requires that the number of requests, when greater than the
+ /// number of threads, be a multiple of the number of threads. The
+ /// requests completion handler is structured in such a way as to ensure
+ /// (we hope) that each client thread handles the same number of requests.
+ ///
+ /// @param num_threads - the number of threads the HttpClient
+ /// should use. A value of 0 puts the HttpClient in single-threaded mode.
+ /// @param num_requests - the number of requests that should be carried out.
+ /// Must be greater than 0. If it is greater than num_threads it must be a
+ /// multiple of num_threads.
+ ///
+ /// @param num_threads
+ /// @param num_requests
void threadRequestAndReceive(size_t num_threads, size_t num_requests) {
// First we makes sure the parameter rules apply.
ASSERT_TRUE((num_threads == 0) || (num_requests < num_threads)
|| (num_requests % num_threads == 0));
+ num_threads_ = num_threads;
+ num_requests_ = num_requests;
// Make a factory
factory_.reset(new TestHttpResponseCreatorFactory());
// Start the server.
ASSERT_NO_THROW(listener_->start());
- num_threads_ = num_threads;
- num_requests_ = num_requests;
-
// Create an MT client with num_threads
ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
ASSERT_TRUE(client_);
if (num_threads_ == 0) {
+ // If we single-threaded client should not have it's own IOService.
ASSERT_FALSE(client_->getMyIOService());
} else {
+ // If we multi-threaded client should have it's own IOService.
ASSERT_TRUE(client_->getMyIOService());
}
+
+ // Verify the pool size and number of threads are as expected.
ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
ASSERT_EQ(client_->getThreadCount(), num_threads);
// Start the requisite number of requests.
- for (auto i = 0; i < num_requests_; ++i) {
- startRequest();
+ for (int i = 0; i < num_requests_; ++i) {
+ startRequest(i + 1);
}
// Run test thread IOService. This drives the listener's IO.
ASSERT_NO_THROW(runIOService());
+ // We should have a response for each request.
ASSERT_EQ(clientRRs_.size(), num_requests_);
+
+ // Create a map to track number of responses for each client thread.
+ std::map<std::string, int> responses_per_thread;
+
+ // Get the stringified thread-id of the test's main thread.
+ std::stringstream ss;
+ ss << std::this_thread::get_id();
+ std::string main_thread_id = ss.str();
+
+ // Iterate over the client request/response pairs.
for (auto const& clientRR : clientRRs_ ) {
- HttpResponsePtr response = clientRR->response_;
- ASSERT_TRUE(response);
- ConstElementPtr sequence = response->getJsonElement("sequence");
+ // Make sure it's whole.
+ ASSERT_FALSE(clientRR->thread_id_.empty());
+ ASSERT_TRUE(clientRR->request_);
+ ASSERT_TRUE(clientRR->response_);
+
+ // Request should contain an integer sequence number.
+ int request_sequence;
+ ConstElementPtr sequence = clientRR->request_->getJsonElement("sequence");
+ ASSERT_TRUE(sequence);
+ ASSERT_NO_THROW(request_sequence = sequence->intValue());
+
+ // Response should contain an integer sequence number.
+ int response_sequence;
+ sequence = clientRR->response_->getJsonElement("sequence");
ASSERT_TRUE(sequence);
+ ASSERT_NO_THROW(response_sequence = sequence->intValue());
+
+ // Request and Response sequence numbers should match.
+ ASSERT_EQ(request_sequence, response_sequence);
+
+ if (num_threads_ == 0) {
+ // For ST mode thread id should always be the main thread.
+ ASSERT_EQ(clientRR->thread_id_, main_thread_id);
+ } else {
+ // For MT mode the thread id should never be the main thread.
+ ASSERT_NE(clientRR->thread_id_, main_thread_id);
+ }
+
+ // Bump the response count for the given client thread-id.
+ auto it = responses_per_thread.find(clientRR->thread_id_);
+ if (it != responses_per_thread.end()) {
+ responses_per_thread[clientRR->thread_id_] = it->second + 1;
+ } else {
+ responses_per_thread[clientRR->thread_id_] = 1;
+ }
+ }
+
+ // Make sure we have the expected number of responding threads.
+ if (num_threads_ == 0) {
+ ASSERT_EQ(responses_per_thread.size(), 1);
+ } else {
+ size_t expected_thread_count = (num_requests_ < num_threads_ ?
+ num_requests_ : num_threads_);
+ ASSERT_EQ(responses_per_thread.size(), expected_thread_count);
+ }
+
+ // Each thread-id ought to have received the same number of responses.
+ for (auto const& it : responses_per_thread) {
+ EXPECT_EQ(it.second, num_requests_ / responses_per_thread.size())
+ << "thread-id: " << it.first
+ << ", responses: " << it.second << std::endl;
}
ASSERT_NO_THROW(client_->stop());
// Create another multi-threaded instance.
ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 3)));
- // Make sure destruction doesn't throw. Note, if destuctor
- // doesn't stop the threads correctly the test will crash upon exit.
+ // Make sure destruction doesn't throw.
ASSERT_NO_THROW_LOG(client.reset());
}
+// Now we'll run some permutations of the number of client threads
+// and the number of client requests.
+
+// Single-threaded, three requests.
TEST_F(MtHttpClientTest, zeroByThree) {
- // Zero threads = ST mode.
- size_t num_threads = 0;
+ size_t num_threads = 0; // Zero threads = ST mode.
size_t num_requests = 3;
threadRequestAndReceive(num_threads, num_requests);
}
-
+// Multi-threaded with one thread, three requests.
TEST_F(MtHttpClientTest, oneByThree) {
size_t num_threads = 1;
size_t num_requests = 3;
threadRequestAndReceive(num_threads, num_requests);
}
+// Multi-threaded with threads, three requests.
TEST_F(MtHttpClientTest, threeByThree) {
size_t num_threads = 3;
size_t num_requests = 3;
threadRequestAndReceive(num_threads, num_requests);
}
+// Multi-threaded with threads, nine requests.
TEST_F(MtHttpClientTest, threeByNine) {
size_t num_threads = 3;
size_t num_requests = 9;
threadRequestAndReceive(num_threads, num_requests);
}
-}
+} // end of anonymous namespace