// Found it, look for an idle connection.
connection = destination->getIdleConnection();
} else {
- // Doesn't exist yet so it's a new destination/
+ // Doesn't exist yet so it's a new destination.
destination = addDestination(url);
}
/// @return The connection or an empty pointer if no matching
/// connection exists.
ConnectionPtr findBySocketFd(int socket_fd) {
- for (auto connection : connections_) {
+ for (auto const& connection : connections_) {
if (connection->isMySocket(socket_fd)) {
return (connection);
}
size_t max_connections_;
/// @brief List of concurrent connections.
- std::vector<ConnectionPtr> connections_;
+ std::list<ConnectionPtr> connections_;
/// @brief Holds the queue of request for this destination.
std::queue<RequestDescriptor> queue_;
///
/// @return pointer the desired destination, empty pointer
/// if the destination does not exist.
+ /// @note Must be called from within a thread-safe context.
DestinationPtr findDestination(const Url& url) const {
auto it = destinations_.find(url);
if (it != destinations_.end()) {
thread_io_service_->stop();
// Shutdown the threads.
- for(auto const& thread : threads_) {
+ for (auto const& thread : threads_) {
thread->join();
}
///
/// @return A pointer to the IOService, or an empty pointer when
/// in single-threaded mode.
- asiolink::IOServicePtr getThreadIOService() { return (thread_io_service_); };
+ asiolink::IOServicePtr getThreadIOService() {
+ return (thread_io_service_);
+ };
/// @brief Fetches the maximum size of the thread pool.
///
- /// @return unit16_t containing the maximum size of the thread pool.
+ /// @return the maximum size of the thread pool.
uint16_t getThreadPoolSize() {
return (thread_pool_size_);
}
/// @brief Fetches the number of threads in the pool.
///
- /// @return unit16_t containing the number of running threads.
+ /// @return the number of running threads.
uint16_t getThreadCount() {
return (threads_.size());
}
struct ClientRR {
/// @brief Thread id of the client thread handling the request as a string.
std::string thread_id_;
+
/// @brief HTTP request submitted by the client thread.
PostHttpRequestJsonPtr request_;
+
/// @brief HTTP response received by the client thread.
HttpResponseJsonPtr response_;
};
/// @brief Creates HTTP response.
///
/// @param request Pointer to the HTTP request.
+ /// @param status_code status code to include in the response.
+ ///
/// @return Pointer to the generated HTTP response.
virtual HttpResponsePtr
createStockHttpResponse(const HttpRequestPtr& request,
PostHttpRequestJsonPtr request_json =
boost::dynamic_pointer_cast<PostHttpRequestJson>(request);
if (!request_json) {
- return(createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
+ return (createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
}
// Extract the sequence from the request.
ConstElementPtr sequence = request_json->getJsonElement("sequence");
if (!sequence) {
- return(createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
+ return (createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
}
// Create the response.
/// @brief Constructor
///
- /// @param server_port listener port of the server.
+ /// @param server_port port upon with the server is listening. This
+ /// value will be included in responses such that each response
+ /// can be attributed to a specific server.
TestHttpResponseCreatorFactory(uint16_t server_port)
: server_port_(server_port) {};
/// @brief Constructor.
MtHttpClientTest()
: io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
- test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
- expected_requests_(0), num_in_progress_(0), num_finished_(0) {
+ test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
+ expected_requests_(0), num_in_progress_(0), num_finished_(0) {
test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true),
TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
MultiThreadingMgr::instance().setMode(true);
for (const auto& listener : listeners_) {
listener->stop();
}
+
+ MultiThreadingMgr::instance().setMode(false);
}
/// @brief Callback function to invoke upon test timeout.
ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
request_json, response_json,
[this, request_json, response_json](const boost::system::error_code& ec,
- const HttpResponsePtr&,
- const std::string&) {
+ const HttpResponsePtr&,
+ const std::string&) {
// Bail on an error.
ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
// Create the ClientRR.
ClientRRPtr clientRR(new ClientRR());
- clientRR->thread_id_ = ss.str();
+ clientRR->thread_id_ = ss.str();
clientRR->request_ = request_json;
clientRR->response_ = response_json;
size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_);
// Calculate the expected number of requests.
- expected_requests_ = (num_batches_ * num_listeners_ * effective_threads);
+ expected_requests_ = (num_batches_ * num_listeners_ * effective_threads);
for (auto i = 0; i < num_listeners_; ++i) {
// Make a factory
- HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT+i));
+ HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
factories_.push_back(factory);
// Need to create a Listener on
HttpListenerPtr listener(new HttpListener(io_service_,
- IOAddress(SERVER_ADDRESS), SERVER_PORT+i,
+ IOAddress(SERVER_ADDRESS), (SERVER_PORT + i),
TlsContextPtr(), factory,
HttpListener::RequestTimeout(10000),
HttpListener::IdleTimeout(10000)));
ASSERT_EQ(client_->getThreadCount(), num_threads);
// Start the requisite number of requests:
- // batch * listners * threads.
+ // batch * listeners * threads.
int sequence = 0;
for (auto b = 0; b < num_batches; ++b) {
for (auto l = 0; l < num_listeners_; ++l) {
}
// Make sure that all client threads received responses.
- ASSERT_EQ(responses_per_thread.size(), num_threads ? num_threads : 1);
+ ASSERT_EQ(responses_per_thread.size(), effective_threads);
// Make sure that each client thread received the same number of responses.
for (auto const& it : responses_per_thread) {
/// @brief Pointer to the response creator factory.
HttpResponseCreatorFactoryPtr factory_;
+ /// @brief List of listeners.
std::vector<HttpListenerPtr> listeners_;
+
+ /// @brief List of response factories.
std::vector<HttpResponseCreatorFactoryPtr> factories_;
/// @brief Asynchronous timer service to detect timeouts.
/// @brief Mutex for locking.
std::mutex mutex_;
- /// @brief Condition variable used make client threads wait
+ /// @brief Condition variable used to make client threads wait
/// until number of in-progress requests reaches the number
/// of client requests.
std::condition_variable cv_;
TEST_F(MtHttpClientTest, zeroByThreeByThree) {
size_t num_threads = 0; // Zero threads = ST mode.
size_t num_batches = 3;
- size_t num_listeners= 3;
+ size_t num_listeners = 3;
threadRequestAndReceive(num_threads, num_batches, num_listeners);
}