/// This will return the control to the caller of the \c run() method.
void stop() { io_service_.stop();} ;
+ /// \brief Indicates if the IOService has been stopped.
+ ///
+ /// \return true if the IOService has been stopped, false otherwise.
+ bool stopped() const {
+ return (io_service_.stopped());
+ }
+
+ /// \brief Restarts the IOService in preparation for a subsequent \c run() invocation.
+ void restart() {
+ io_service_.reset();
+ }
+
/// \brief Removes IO service work object to let it finish running
/// when all handlers have been invoked.
void stopWork() {
io_impl_->stop();
}
+bool
+IOService::stopped() const {
+ return (io_impl_->stopped());
+}
+
+void
+IOService::restart() {
+ io_impl_->restart();
+}
+
void
IOService::stopWork() {
io_impl_->stopWork();
/// This will return the control to the caller of the \c run() method.
void stop();
+ /// \brief Indicates if the IOService has been stopped.
+ ///
+ /// \return true if the IOService has been stopped, false otherwise.
+ bool stopped() const;
+
+ /// \brief Restarts the IOService in preparation for a subsequent \c run() invocation.
+ void restart();
+
/// \brief Removes IO service work object to let it finish running
/// when all handlers have been invoked.
void stopWork();
HttpListener::RequestTimeout(TIMEOUT_AGENT_RECEIVE_COMMAND),
HttpListener::IdleTimeout(TIMEOUT_AGENT_IDLE_CONNECTION_TIMEOUT)));
- // Create a pool of threads, each calls run on our IOService_service instance.
- for (std::size_t i = 0; i < thread_pool_size_; ++i) {
- boost::shared_ptr<std::thread> thread(new std::thread(
- std::bind(&IOService::run, io_service_)));
- threads_.push_back(thread);
- }
+ // Create the thread pool.
+ threads_.reset(new HttpThreadPool(io_service_, thread_pool_size_, false));
// Instruct the HTTP listener to actually open socket, install
// callback and start listening.
}
}
+void
+CmdHttpListener::pause() {
+ if (threads_) {
+ threads_->pause();
+ }
+}
+
+void
+CmdHttpListener::resume() {
+ if (threads_) {
+ threads_->resume();
+ }
+}
+
void
CmdHttpListener::stop() {
// Nothing to do.
.arg(address_)
.arg(port_);
- // Stop the IOService first.
- io_service_->stop();
-
- // Stop the threads next.
- for (auto const& thread : threads_) {
- thread->join();
- }
-
- threads_.clear();
+ // Stop the thread pool.
+ threads_->stop();
// Get rid of the listener.
http_listener_.reset();
.arg(port_);
}
+HttpThreadPool::RunState
+CmdHttpListener::getRunState() const {
+ if (!threads_) {
+ isc_throw(InvalidOperation,
+ "CmdHttpListener::getRunState - no thread pool!");
+ }
+
+ return (threads_->getRunState());
+}
+
bool
CmdHttpListener::isListening() const {
// If we have a listener we're listening.
#include <asiolink/io_address.h>
#include <asiolink/io_service.h>
#include <http/listener.h>
+#include <http/http_thread_pool.h>
#include <thread>
#include <vector>
/// @brief Destructor
virtual ~CmdHttpListener();
- /// @brief Initiates the listener's worker thread.
+ /// @brief Starts running the listener's thread pool.
void start();
- /// @brief Stops the listener's worker thread.
+ /// @brief Pauses the listener's thread pool.
+ void pause();
+
+ /// @brief Resumes running the listener's thread pool.
+ void resume();
+
+ /// @brief Stops the listener's thread pool.
void stop();
+ http::HttpThreadPool::RunState getRunState() const;
+
/// @brief Checks if we are listening to the HTTP requests.
///
/// @return true if we are listening.
///
/// @return uint16_t containing the number of running threads.
uint16_t getThreadCount() {
- return (threads_.size());
+ if (!threads_) {
+ return (0);
+ }
+
+ return (threads_->getThreadCount());
}
private:
std::size_t thread_pool_size_;
/// @brief The pool of threads that do IO work.
- std::vector<boost::shared_ptr<std::thread> > threads_;
+ http::HttpThreadPoolPtr threads_;
};
/// @brief Defines a shared pointer to CmdHttpListener.
/// Starts test timer which detects timeouts, deregisters all commands
/// from CommandMgr, and enables multi-threading mode.
CmdHttpListenerTest()
- : io_service_(), test_timer_(io_service_), run_io_service_timer_(io_service_),
+ : listener_(), io_service_(), test_timer_(io_service_), run_io_service_timer_(io_service_),
clients_(), num_threads_(), num_clients_(), num_in_progress_(0), num_finished_(0),
- chunk_size_(0) {
+ chunk_size_(0), paused_(false), pause_cnt_(0) {
test_timer_.setup(std::bind(&CmdHttpListenerTest::timeoutHandler, this, true),
TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
///
/// Removes HTTP clients, unregisters commands, disables MT.
virtual ~CmdHttpListenerTest() {
+ // Wipe out the listener.
+ listener_.reset();
+
// Destroy all remaining clients.
for (auto const& client : clients_) {
client->close();
/// because the test clients stop the io_service when they're
/// through with a request.
///
- /// @param timeout Optional value specifying for how long the io service
- /// should be ran.
- void runIOService() {
+ /// @param num_pauses Desired number of times the listener should be
+ /// paused during the test.
+ void runIOService(size_t num_pauses = 0) {
+ // Create a timer to use for invoking resume after pause.
+ IntervalTimer pause_timer_(io_service_);
+ paused_ = false;
+
// Loop until the clients are done, an error occurs, or the time runs out.
- bool keep_going = true;
- while (keep_going) {
- // Always call reset() before we call run();
- io_service_.get_io_service().reset();
+ size_t num_done = 0;
+ while (num_done != clients_.size()) {
+ // Always call restart() before we call run();
+ io_service_.restart();
+
+ if (shouldPause(num_pauses, num_done)) {
+ // Pause the listener .
+ paused_ = true;
+ ++pause_cnt_;
+ listener_->pause();
+
+ // Set timer to resume listener.
+ pause_timer_.setup(
+ [this]() {
+ listener_->resume();
+ paused_ = false;
+ }, 10, IntervalTimer::ONE_SHOT);
+ }
// Run until a client stops the service.
io_service_.run();
// If all the clients are done receiving, the test is done.
- keep_going = false;
+ num_done = 0;
for (auto const& client : clients_) {
- if (!client->receiveDone()) {
- keep_going = true;
- break;
+ if (client->receiveDone()) {
+ ++num_done;
}
}
}
}
+ /// @brief Determines if the listner should be paused.
+ ///
+ /// @param num_pauses desired number of pauses
+ /// @param num_done number of clients that have completed their requests.
+ ///
+ /// @return True if the listener should be paused.
+ bool shouldPause(size_t num_pauses, size_t num_done) {
+ // True if the number of clients done is a mulitple of the number of pauses.
+ return (!paused_ && num_pauses && num_done && !(num_done % num_pauses));
+ }
+
/// @brief Create an HttpResponse from a response string.
///
/// @param response_str a string containing the whole HTTP
/// thread command. Each client is used to carry out a single thread
/// command request. Must be greater than 0 and a multiple of num_threads
/// if it is greater than num_threads.
- void threadListenAndRespond(size_t num_threads, size_t num_clients) {
+ /// @param num_pauses Desired number of times the listener should be
+ /// paused during the test.
+ void threadListenAndRespond(size_t num_threads, size_t num_clients,
+ size_t num_pauses = 0) {
// First we makes sure the parameter rules apply.
ASSERT_TRUE(num_threads > 0);
ASSERT_TRUE(num_clients > 0);
this, ph::_1, ph::_2));
// Create a listener with prescribed number of threads.
- CmdHttpListenerPtr listener;
- ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
+ ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
SERVER_PORT, num_threads)));
- ASSERT_TRUE(listener);
+ ASSERT_TRUE(listener_);
// Start it and verify it is listening.
- ASSERT_NO_THROW_LOG(listener->start());
- ASSERT_TRUE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), num_threads);
+ ASSERT_NO_THROW_LOG(listener_->start());
+ ASSERT_TRUE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), num_threads);
// Maps the number of clients served by a given thread-id.
std::map<std::string, int> clients_per_thread;
// Now we run the client-side IOService until all requests are done,
// errors occur or the test times out.
- ASSERT_NO_FATAL_FAILURE(runIOService());
+ ASSERT_NO_FATAL_FAILURE(runIOService(num_pauses));
// Stop the listener and then verify it has stopped.
- ASSERT_NO_THROW_LOG(listener->stop());
- ASSERT_FALSE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 0);
+ ASSERT_NO_THROW_LOG(listener_->stop());
+ ASSERT_FALSE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 0);
// Iterate over the clients, checking their outcomes.
size_t total_responses = 0;
<< "thread-id: " << it.first
<< ", clients: " << it.second << std::endl;
}
+
+ // We should have had the expected number of pauses.
+ if (!num_pauses) {
+ ASSERT_EQ(pause_cnt_, 0);
+ } else {
+ // We allow a range on pauses of +-1.
+ ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ &&
+ (pause_cnt_ <= (num_pauses + 1)))
+ << " num+_pauses: " << num_pauses
+ << ", pause_cnt_" << pause_cnt_;
+ }
}
+
+ /// @brief CmdHttpListener instance under test.
+ CmdHttpListenerPtr listener_;
+
/// @brief IO service used in drive the test and test clients.
IOService io_service_;
/// @brief Condition variable used to coordinate threads.
std::condition_variable cv_;
+
+ /// @brief Indicates if client threads are currently "paused".
+ bool paused_;
+
+ /// @brief Number of times client has been paused during the test.
+ size_t pause_cnt_;
};
/// Verifies the construction, starting, stopping, and destruction
TEST_F(CmdHttpListenerTest, basics) {
// Make sure multi-threading is off.
MultiThreadingMgr::instance().setMode(false);
- CmdHttpListenerPtr listener;
asiolink::IOAddress address(SERVER_ADDRESS);
uint16_t port = SERVER_PORT;
// Make sure we can create one.
- ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(address, port)));
- ASSERT_TRUE(listener);
+ ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(address, port)));
+ ASSERT_TRUE(listener_);
// Verify the getters do what we expect.
- EXPECT_EQ(listener->getAddress(), address);
- EXPECT_EQ(listener->getPort(), port);
- EXPECT_EQ(listener->getThreadPoolSize(), 1);
+ EXPECT_EQ(listener_->getAddress(), address);
+ EXPECT_EQ(listener_->getPort(), port);
+ EXPECT_EQ(listener_->getThreadPoolSize(), 1);
// It should not be listening and have no threads.
- EXPECT_FALSE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 0);
+ EXPECT_FALSE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 0);
// Verify that we cannot start it when multi-threading is disabled.
ASSERT_FALSE(MultiThreadingMgr::instance().getMode());
- ASSERT_THROW_MSG(listener->start(), InvalidOperation,
+ ASSERT_THROW_MSG(listener_->start(), InvalidOperation,
"CmdHttpListener cannot be started"
" when multi-threading is disabled");
// It should still not be listening and have no threads.
- EXPECT_FALSE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 0);
+ EXPECT_FALSE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 0);
// Enable multi-threading.
MultiThreadingMgr::instance().setMode(true);
// Make sure we can start it and it's listening with 1 thread.
- ASSERT_NO_THROW_LOG(listener->start());
- ASSERT_TRUE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 1);
+ ASSERT_NO_THROW_LOG(listener_->start());
+ ASSERT_TRUE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 1);
// Trying to start it again should fail.
- ASSERT_THROW_MSG(listener->start(), InvalidOperation,
+ ASSERT_THROW_MSG(listener_->start(), InvalidOperation,
"CmdHttpListener is already listening!");
// Stop it and verify we're no longer listening.
- ASSERT_NO_THROW_LOG(listener->stop());
- ASSERT_FALSE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 0);
+ ASSERT_NO_THROW_LOG(listener_->stop());
+ ASSERT_FALSE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 0);
// Make sure we can call stop again without problems.
- ASSERT_NO_THROW_LOG(listener->stop());
+ ASSERT_NO_THROW_LOG(listener_->stop());
// We should be able to restart it.
- ASSERT_NO_THROW_LOG(listener->start());
- ASSERT_TRUE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 1);
+ ASSERT_NO_THROW_LOG(listener_->start());
+ ASSERT_TRUE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 1);
// Destroying it should also stop it.
// If the test timeouts we know it didn't!
- ASSERT_NO_THROW_LOG(listener.reset());
+ ASSERT_NO_THROW_LOG(listener_.reset());
// Verify we can construct with more than one thread.
- ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(address, port, 4)));
- ASSERT_NO_THROW_LOG(listener->start());
- EXPECT_EQ(listener->getAddress(), address);
- EXPECT_EQ(listener->getPort(), port);
- EXPECT_EQ(listener->getThreadPoolSize(), 4);
- ASSERT_TRUE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 4);
+ ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(address, port, 4)));
+ ASSERT_NO_THROW_LOG(listener_->start());
+ EXPECT_EQ(listener_->getAddress(), address);
+ EXPECT_EQ(listener_->getPort(), port);
+ EXPECT_EQ(listener_->getThreadPoolSize(), 4);
+ ASSERT_TRUE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 4);
// Stop it and verify we're no longer listening.
- ASSERT_NO_THROW_LOG(listener->stop());
- ASSERT_FALSE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 0);
+ ASSERT_NO_THROW_LOG(listener_->stop());
+ ASSERT_FALSE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 0);
}
TEST_F(CmdHttpListenerTest, basicListenAndRespond) {
// Create a listener with 1 thread.
- CmdHttpListenerPtr listener;
- ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
+ ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
SERVER_PORT)));
- ASSERT_TRUE(listener);
+ ASSERT_TRUE(listener_);
// Start the listener and verify it's listening with 1 thread.
- ASSERT_NO_THROW_LOG(listener->start());
- ASSERT_TRUE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 1);
+ ASSERT_NO_THROW_LOG(listener_->start());
+ ASSERT_TRUE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 1);
// Now let's send a "foo" command. This should create a client, connect
// to our listener, post our request and retrieve our reply.
ASSERT_NO_THROW(startRequest("{\"command\": \"foo\"}"));
+ ++num_clients_;
ASSERT_EQ(1, clients_.size());
ASSERT_NO_THROW(runIOService());
TestHttpClientPtr client = clients_.front();
this, ph::_1, ph::_2));
// Try posting the foo command again.
ASSERT_NO_THROW(startRequest("{\"command\": \"foo\"}"));
+ ++num_clients_;
ASSERT_EQ(2, clients_.size());
ASSERT_NO_THROW(runIOService());
client = clients_.back();
EXPECT_EQ(hr->getBody(), "[ { \"arguments\": [ \"bar\" ], \"result\": 0 } ]");
// Make sure the listener is still listening.
- ASSERT_TRUE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 1);
+ ASSERT_TRUE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 1);
// Stop the listener then verify it has stopped.
- ASSERT_NO_THROW_LOG(listener->stop());
- ASSERT_FALSE(listener->isListening());
- EXPECT_EQ(listener->getThreadCount(), 0);
+ ASSERT_NO_THROW_LOG(listener_->stop());
+ ASSERT_FALSE(listener_->isListening());
+ EXPECT_EQ(listener_->getThreadCount(), 0);
}
// Now we'll run some permutations of the number of listener threads
threadListenAndRespond(num_threads, num_clients);
}
+// Pauses and resumes during work.
+TEST_F(CmdHttpListenerTest, pauseAndResume) {
+ size_t num_threads = 6;
+ size_t num_clients = 18;
+ size_t num_pauses = 3;
+ threadListenAndRespond(num_threads, num_clients, num_pauses);
+}
+
} // end of anonymous namespace
libkea_http_la_SOURCES += auth_messages.cc auth_messages.h
libkea_http_la_SOURCES += basic_auth_config.cc basic_auth_config.h
libkea_http_la_SOURCES += basic_auth.cc basic_auth.h
+libkea_http_la_SOURCES += http_thread_pool.cc http_thread_pool.h
libkea_http_la_CXXFLAGS = $(AM_CXXFLAGS)
libkea_http_la_CPPFLAGS = $(AM_CPPFLAGS)
http_message.h \
http_message_parser_base.h \
http_messages.h \
+ http_thread_pool.h \
http_types.h \
listener.h \
listener_impl.h \
#include <map>
#include <mutex>
#include <queue>
+#include <thread>
+
using namespace isc;
using namespace isc::asiolink;
///
/// @param io_service IOService that will drive connection IO in single
/// threaded mode. (Currently ignored in multi-threaded mode)
- ///
/// @param thread_pool_size maximum number of concurrent threads
/// Internally this also sets the maximum number concurrent connections
+ /// @param defer_thread_start if true, then the thread pool will be
+ /// created but started Applicable only when thread-pool-size is
+ /// greater than zero.
+ /// will not be startedfalse, then
/// per URL.
- HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0) :
- thread_pool_size_(thread_pool_size) {
+ HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0,
+ bool defer_thread_start = false)
+ : thread_pool_size_(thread_pool_size), threads_() {
if (thread_pool_size_ > 0) {
// Create our own private IOService.
thread_io_service_.reset(new IOService());
- // Create a pool of threads, each calls run on the same, private
- // io_service instance
- for (std::size_t i = 0; i < thread_pool_size_; ++i) {
- boost::shared_ptr<std::thread> thread(new std::thread(std::bind(&IOService::run,
- thread_io_service_)));
- threads_.push_back(thread);
- }
+ // Create the thread pool.
+ threads_.reset(new HttpThreadPool(thread_io_service_, thread_pool_size_,
+ defer_thread_start));
// Create the connection pool. Note that we use the thread_pool_size
// as the maximum connections per URL value.
conn_pool_.reset(new ConnectionPool(*thread_io_service_, thread_pool_size_));
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC, HTTP_CLIENT_MT_STARTED)
- .arg(threads_.size());
+ .arg(getThreadCount());
} else {
// Single-threaded mode: use the caller's IOService,
// one connection per URL.
stop();
}
- /// @brief Close all connections, and if multi-threaded, stop internal IOService
- /// and the thread pool.
+ /// @brief Close all connections, and if multi-threaded, stops the
+ /// thread pool.
void stop() {
// Close all the connections.
conn_pool_->closeAll();
- // Stop the multi-threaded service.
- if (thread_io_service_) {
- // Flush cancelled (and ready) handlers.
- thread_io_service_->poll();
+ if (threads_) {
+ threads_->stop();
+ }
+ }
- // Stop the private IOService.
- thread_io_service_->stop();
+ void pause() {
+ if (!threads_) {
+ isc_throw(InvalidOperation, "HttpClient::pause - no thread pool");
+ }
- // Shutdown the threads.
- for (auto const& thread : threads_) {
- thread->join();
- }
+ threads_->pause();
+ }
- threads_.clear();
+ /// @brief Pauses the thread pool's worker threads.
+ void resume() {
+ if (!threads_) {
+ isc_throw(InvalidOperation, "HttpClient::resume - no thread pool");
}
- // Get rid of the IOService.
- thread_io_service_.reset();
+ threads_->resume();
+ }
+
+ HttpThreadPool::RunState getRunState() const {
+ if (!threads_) {
+ isc_throw(InvalidOperation, "HttpClient::getRunState - no thread pool");
+ }
+
+ return (threads_->getRunState());
}
/// @brief Fetches the internal IOService used in multi-threaded mode.
///
/// @return the number of running threads.
uint16_t getThreadCount() {
- return (threads_.size());
+ if (!threads_) {
+ return (0);
+ }
+ return (threads_->getThreadCount());
}
/// @brief Holds a pointer to the connection pool.
ConnectionPoolPtr conn_pool_;
private:
+
/// @brief Maxim number of threads in the thread pool.
size_t thread_pool_size_;
- /// @brief Pool of threads used to service connections in multi-threaded
- /// mode.
- std::vector<boost::shared_ptr<std::thread> > threads_;
-
/// @brief Pointer to private IOService used in multi-threaded mode.
asiolink::IOServicePtr thread_io_service_;
+
+ /// @brief Pool of threads used to service connections in multi-threaded
+ /// mode.
+ HttpThreadPoolPtr threads_;
};
HttpClient::HttpClient(IOService& io_service, size_t thread_pool_size) {
impl_->stop();
}
+void
+HttpClient::pause() {
+ impl_->pause();
+}
+
+void
+HttpClient::resume() {
+ impl_->resume();
+}
+
const IOServicePtr
HttpClient::getThreadIOService() const {
return (impl_->getThreadIOService());
return (impl_->getThreadCount());
}
+HttpThreadPool::RunState
+HttpClient::getRunState() const {
+ return (impl_->getRunState());
+}
+
+
} // end of namespace isc::http
} // end of namespace isc
#include <http/url.h>
#include <http/request.h>
#include <http/response.h>
+#include <http/http_thread_pool.h>
#include <boost/shared_ptr.hpp>
#include <functional>
#include <string>
/// are returned via the 3rd parameter of the callback.
class HttpClient {
public:
-
/// @brief HTTP request/response timeout value.
struct RequestTimeout {
/// @brief Constructor.
/// IOService.
void stop();
+
/// @brief Closes a connection if it has an out-of-band socket event
///
/// If the client owns a connection using the given socket and that
/// @return the number of running threads.
uint16_t getThreadCount() const;
+ void pause();
+ void resume();
+ HttpThreadPool::RunState getRunState() const;
+
private:
/// @brief Pointer to the HTTP client implementation.
--- /dev/null
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/io_service.h>
+#include <asiolink/interval_timer.h>
+#include <exceptions/exceptions.h>
+#include <http/http_log.h>
+#include <http/http_messages.h>
+#include <http/http_thread_pool.h>
+#include <util/multi_threading_mgr.h>
+#include <util/unlock_guard.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <atomic>
+#include <functional>
+#include <iostream>
+#include <list>
+#include <mutex>
+#include <thread>
+
+using namespace isc;
+using namespace isc::asiolink;
+using namespace isc::http;
+using namespace isc::util;
+
+HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size,
+ bool defer_start /* = false */)
+ : pool_size_(pool_size), io_service_(io_service),
+ run_state_(RunState::STOPPED), mutex_(), cv_() {
+ if (!pool_size) {
+ isc_throw(BadValue, "HttpThreadPool::ctor pool_size must be > 0");
+ }
+
+ // If we weren't given an IOService, create our own.
+ if (!io_service_) {
+ io_service_.reset(new IOService());
+ }
+
+ // If we're not deferring the start, do it now.
+ if (!defer_start) {
+ start();
+ }
+}
+
+HttpThreadPool::~HttpThreadPool() {
+ if (getRunState() != RunState::STOPPED) {
+ // Stop if we aren't already stopped
+ stop();
+ }
+}
+
+void
+HttpThreadPool::start() {
+ if (getRunState() != RunState::STOPPED) {
+ isc_throw(InvalidOperation, "HttpThreadPool::start already started!");
+ }
+
+ // Set state to RUN.
+ setRunState(RunState::RUN);
+
+ // Prep IOservice for run() invocations.
+ io_service_->restart();
+
+ // Create a pool of threads, each calls run() on our
+ // io_service instance.
+ for (std::size_t i = 0; i < pool_size_; ++i) {
+ boost::shared_ptr<std::thread> thread(new std::thread(
+ [this]() {
+ bool done = false;
+ while (!done) {
+ switch (getRunState()) {
+ case RunState::RUN:
+ io_service_->run();
+ break;
+ case RunState::PAUSED:
+ {
+ // We need to stop and wait to be released. We don't care how
+ // we exit, we'll do whatever the current state dictates.
+ std::unique_lock<std::mutex> lck(mutex_);
+ static_cast<void>(cv_.wait_for(lck, std::chrono::milliseconds(25),
+ [&]() {
+ return (run_state_ != RunState::PAUSED);
+ }));
+
+ break;
+ }
+ case RunState::SHUTDOWN:
+ done = true;
+ break;
+ case RunState::STOPPED:
+ // This should never happen.
+ done = true;
+ break;
+ }
+ }
+ }));
+
+ threads_.push_back(thread);
+ }
+}
+
+void
+HttpThreadPool::stop() {
+ if (getRunState() == RunState::STOPPED) {
+ // Nothing to do.
+ return;
+ }
+
+ // Set the state to SHUTDOWN.
+ setRunState(RunState::SHUTDOWN);
+
+ // Stop our IOService.
+ if (!io_service_->stopped()) {
+ io_service_->stop();
+ }
+
+ // Shutdown the threads.
+ for (auto const& thread : threads_) {
+ thread->join();
+ }
+
+ // Empty the thread pool.
+ threads_.clear();
+
+ // Set the state to STOPPED.
+ setRunState(RunState::STOPPED);
+}
+
+void
+HttpThreadPool::pause() {
+ if (getRunState() != RunState::RUN) {
+ // Not running, can't pause.
+ return;
+ }
+
+ /// @todo TKM - Take this out
+ std::cout << "HttpThreadPool pausing" << std::endl;
+ setRunState(RunState::PAUSED);
+ io_service_->stop();
+}
+
+void
+HttpThreadPool::resume() {
+ if (getRunState() != RunState::PAUSED) {
+ // Not PAUSED, can't resume.
+ return;
+ }
+
+ /// @todo TKM - Take this out
+ std::cout << "HttpThreadPool resuming" << std::endl;
+ io_service_->restart();
+ setRunState(RunState::RUN);
+}
+
+HttpThreadPool::RunState
+HttpThreadPool::getRunState() {
+ std::lock_guard<std::mutex> lck(mutex_);
+ return (run_state_);
+}
+
+void
+HttpThreadPool::setRunState(RunState state) {
+ {
+ std::lock_guard<std::mutex> lck(mutex_);
+ run_state_ = state;
+ }
+ cv_.notify_all();
+}
+
+IOServicePtr
+HttpThreadPool::getIOService() const {
+ return (io_service_);
+}
+
+uint16_t
+HttpThreadPool::getPoolSize() const {
+ return (pool_size_);
+}
+
+uint16_t
+HttpThreadPool::getThreadCount() const {
+ return (threads_.size());
+}
--- /dev/null
+// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef HTTP_THREAD_POOL_H
+#define HTTP_THREAD_POOL_H
+
+#include <asiolink/io_service.h>
+#include <util/unlock_guard.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <condition_variable>
+#include <list>
+#include <mutex>
+#include <thread>
+
+namespace isc {
+namespace http {
+
+/// @brief Implements a pausable pool of IOService driven threads.
+class HttpThreadPool {
+public:
+ /// @brief Describes the possible operational state of the pool.
+ enum class RunState {
+ STOPPED, /// Pool is not operational.
+ RUN, /// Pool is populated with running threads.
+ PAUSED, /// Pool is populated with threads which are paused.
+ SHUTDOWN, /// Pool is transitioning from RUN or PAUSED to STOPPED.
+ };
+
+ /// @brief Constructor
+ ///
+ /// @param io_service IOService that will drive the pool's IO. If empty, it
+ /// create it's own instance.
+ /// @param pool_size Maximum number of threads in the pool. Currently the number
+ /// of threads is fixed at this value.
+ /// @param defer_start If true, creation of the threads is deferred until a subsequent
+ /// call to @ref start(). In this case the pool's operational state post construction
+ /// is STOPPED. If false, the construtor will invoke start() which will create the
+ /// threads, placing the pool in RUN state.
+ HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size, bool defer_start = false);
+
+ /// @brief Destructor
+ ///
+ /// Ensures the pool is stopped prior to destruction.
+ ~HttpThreadPool();
+
+ /// @brief Transitions the pool from STOPPED to RUN run state.
+ ///
+ /// It starts the pool by doing the following:
+ /// -# Sets state to RUN
+ /// -# Restarts the IOService preparing it thread invocations of
+ /// IOService::run()
+ /// -# Creates thread_pool_size_ threads, adding each to the pool.
+ ///
+ /// @throw InvalidOperation if called with the pool in any state other
+ /// than STOPPED.
+ void start();
+
+ /// @brief Tranisitions the pool to STOPPED state.
+ ///
+ /// It stops the pool by doing the following:
+ /// -# Sets the state to SHUTDOWN.
+ /// =# Stops the IOService.
+ /// =# Joins the pool threads.
+ /// -# Empties the pool.
+ /// -# Sets the state to STOPPED.
+ void stop();
+
+ /// @brief Transitions the pool from RUN to PAUSED state.
+ ///
+ /// If the state is any state other than RUN it simply returns,
+ /// otherwise it does the following:
+ ///
+ /// -# Sets the state to PAUSED.
+ /// -# Stops the IOService.
+ void pause();
+
+ /// @brief Transitions the pool from PAUSED to RUN state.
+ ///
+ /// If the state is any state other than PAUSED it simply returns,
+ /// otherwise it does the following:
+ ///
+ /// -# Restarts the IOService preparing it for thread invocations
+ /// of IOService::run()
+ /// -# Sets the state to RUN.
+ void resume();
+
+ /// @brief Thread-safe fetch of the pool's operational state.
+ ///
+ /// @return Pool run state.
+ RunState getRunState();
+
+private:
+ /// @brief Thread-safe set of the pool's operational state.
+ ///
+ /// @note This method does not validate the state change.
+ ///
+ /// @param state new state for the pool.
+ void setRunState(RunState state);
+
+public:
+ /// @brief Fetches the IOService that drives the pool.
+ ///
+ /// @return A pointer to the IOService.
+ asiolink::IOServicePtr getIOService() const;
+
+ /// @brief Fetches the maximum size of the thread pool.
+ ///
+ /// @return the maximum size of the thread pool.
+ uint16_t getPoolSize() const;
+
+ /// @brief Fetches the number of threads in the pool.
+ ///
+ /// @return the number of running threads.
+ uint16_t getThreadCount() const;
+
+private:
+ /// @brief Maxim number of threads in the thread pool.
+ size_t pool_size_;
+
+ /// @brief Pointer to private IOService used in multi-threaded mode.
+ asiolink::IOServicePtr io_service_;
+
+ /// @brief Tracks the operational state of the pool.
+ RunState run_state_;
+
+ /// @brief Mutex to protect the internal state.
+ std::mutex mutex_;
+
+ /// @brief Condition variable for synchronization.
+ std::condition_variable cv_;
+
+ /// @brief Pool of threads used to service connections in multi-threaded
+ /// mode.
+ std::list<boost::shared_ptr<std::thread> > threads_;
+};
+
+/// @brief Defines a pointer to a thread pool.
+typedef boost::shared_ptr<HttpThreadPool> HttpThreadPoolPtr;
+
+} // end of namespace isc::http
+} // end of namespace isc
+
+#endif
+
libhttp_unittests_SOURCES += url_unittests.cc
libhttp_unittests_SOURCES += test_http_client.h
libhttp_unittests_SOURCES += mt_client_unittests.cc
+libhttp_unittests_SOURCES += http_thread_pool_unittests.cc
libhttp_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
libhttp_unittests_CXXFLAGS = $(AM_CXXFLAGS)
MtHttpClientTest()
: io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
- expected_requests_(0), num_in_progress_(0), num_finished_(0) {
+ expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false),
+ pause_cnt_(0) {
test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true),
TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
MultiThreadingMgr::instance().setMode(true);
/// have been carried out or the test fails.
void runIOService() {
// Loop until the clients are done, an error occurs, or the time runs out.
- while (clientRRs_.size() < expected_requests_) {
+ while (getRRCount() < expected_requests_) {
// Always call reset() before we call run();
io_service_.get_io_service().reset();
// Wait here until we have as many in progress as we have threads.
{
- std::unique_lock<std::mutex> lck(mutex_);
+ std::unique_lock<std::mutex> lck(test_mutex_);
++num_in_progress_;
if (num_threads_ == 0 || num_in_progress_ == num_threads_) {
// Everybody has one, let's go.
num_finished_ = 0;
- cv_.notify_all();
+ test_cv_.notify_all();
} else {
// I'm ready but others aren't wait here.
- bool ret = cv_.wait_for(lck, std::chrono::seconds(10),
+ bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
[&]() { return (num_in_progress_ == num_threads_); });
if (!ret) {
ADD_FAILURE() << "clients failed to start work";
// Wait here until we have as many ready to finish as we have threads.
{
- std::unique_lock<std::mutex> lck(mutex_);
+ std::unique_lock<std::mutex> lck(test_mutex_);
++num_finished_;
clientRRs_.push_back(clientRR);
if (num_threads_ == 0 || num_finished_ == num_threads_) {
// We're all done, notify the others and finish.
num_in_progress_ = 0;
- cv_.notify_all();
+ test_cv_.notify_all();
// Stop the test's IOService.
io_service_.stop();
} else {
// I'm done but others aren't wait here.
- bool ret = cv_.wait_for(lck, std::chrono::seconds(10),
+ bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
[&]() { return (num_finished_ == num_threads_); });
if (!ret) {
ADD_FAILURE() << "clients failed to finish work";
}));
}
+ /// @brief Initiates a single HTTP request.
+ ///
+ /// Constructs an HTTP post whose body is a JSON map containing a
+ /// single integer element, "sequence".
+ ///
+ /// The request completion handler simply constructs the response,
+ /// and adds it the list of completed request/responses. If the
+ /// number of completed requests has reached the expected number
+ /// it stops the test IOService.
+ ///
+ /// @param sequence value for the integer element, "sequence",
+ /// to send in the request.
+ void startRequestSimple(int sequence, int port_offset = 0) {
+ // Create the URL on which the server can be reached.
+ std::stringstream ss;
+ ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset);
+ Url url(ss.str());
+
+ // Initiate request to the server.
+ PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
+ HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
+ ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
+ request_json, response_json,
+ [this, request_json, response_json](const boost::system::error_code& ec,
+ const HttpResponsePtr&,
+ const std::string&) {
+ // Bail on an error.
+ ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
+
+ // Get stringified thread-id.
+ std::stringstream ss;
+ ss << std::this_thread::get_id();
+
+ // Create the ClientRR.
+ ClientRRPtr clientRR(new ClientRR());
+ clientRR->thread_id_ = ss.str();
+ clientRR->request_ = request_json;
+ clientRR->response_ = response_json;
+
+ {
+ std::unique_lock<std::mutex> lck(test_mutex_);
+ clientRRs_.push_back(clientRR);
+ ++num_finished_;
+ if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) {
+ io_service_.stop();
+ }
+ }
+
+ }));
+ }
+
/// @brief Carries out HTTP requests via HttpClient to HTTP listener(s).
///
/// This function creates one HttpClient with the given number
///
/// Then it iteratively runs the test's IOService until all
/// the requests have been responded to, an error occurs, or the
- /// test times out.
+ /// test times out. During each pass through the run loop, the
+ /// a call to shouldPause() is made to determine if the client
+ /// thread pool should be paused. If so, the the pool is paused
+ /// and an timer begun which resumes the pool upon timeout.
///
/// Each request carries a single integer element, "sequence", which
/// uniquely identifies the request. Each response is expected to
/// A value of 0 puts the HttpClient in single-threaded mode.
/// @param num_batches number of batches of requests that should be
/// conducted.
- /// @param num_listeners number of HttpListeners to create.
- void threadRequestAndReceive(size_t num_threads, size_t num_batches, size_t num_listeners = 1) {
+ /// @param num_listeners number of HttpListeners to create. Defaults
+ /// to 1.
+ /// @param num_pauses number of times to pause and resume the client
+ /// during the test. Defaults to 0.
+ void threadRequestAndReceive(size_t num_threads, size_t num_batches,
+ size_t num_listeners = 1,
+ size_t num_pauses = 0) {
ASSERT_TRUE(num_batches);
ASSERT_TRUE(num_listeners);
num_threads_ = num_threads;
}
}
- // Run test thread IOService. This drives the listener's IO.
- ASSERT_NO_THROW(runIOService());
+ // Create a timer to use for invoking resume after pause.
+ IntervalTimer pause_timer_(io_service_);
+ paused_ = false;
+
+ // Loop until the clients are done, an error occurs, or the time runs out.
+ while (getRRCount() < expected_requests_) {
+ // Always call restart() before we call run();
+ io_service_.restart();
+
+ if (shouldPause(num_pauses)) {
+ // Pause client.
+ paused_ = true;
+ ++pause_cnt_;
+ client_->pause();
+
+ // Set timer to resume client.
+ pause_timer_.setup(
+ [this]() {
+ client_->resume();
+ paused_ = false;
+ }, 10, IntervalTimer::ONE_SHOT);
+ }
+
+ // Run until a client stops the service.
+ io_service_.run();
+ }
// Client should stop without issue.
ASSERT_NO_THROW(client_->stop());
}
// We should have a response for each request.
- ASSERT_EQ(clientRRs_.size(), expected_requests_);
+ ASSERT_EQ(getRRCount(), expected_requests_);
+
+ // We should have had the expected number of pauses.
+ if (!num_pauses) {
+ ASSERT_EQ(pause_cnt_, 0);
+ } else {
+ // We allow a range on pauses of +-1. Figuring
+ // out the exact intervals at which to pause was
+ // getting to be a pain. We don't really care as
+ // long as we're close. The primary thing is that
+ // we did in fact pause and resume.
+ ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ &&
+ (pause_cnt_ <= (num_pauses + 1)))
+ << " num+_pauses: " << num_pauses
+ << ", pause_cnt_" << pause_cnt_;
+ }
// Create a map to track number of responses for each client thread.
std::map<std::string, int> responses_per_thread;
}
}
+ /// @brief Indicates if the test should pause.
+ ///
+ /// Returns true if the number of completed requests
+ /// has reached or exceeded the next pause interval.
+ /// The pause interval is given by expected number of
+ /// requests divided by the desired number of pauses.
+ ///
+ /// @param num_pauses Desired number of pauses.
+ ///
+ /// @return True if the client should be paused.
+ bool shouldPause(size_t num_pauses) {
+ size_t rr_count = getRRCount();
+ if (paused_ || !num_pauses || !rr_count) {
+ return false;
+ }
+
+ size_t interval = expected_requests_ / num_pauses;
+ size_t next_stop = interval * (pause_cnt_ + 1);
+ return (rr_count >= next_stop);
+ }
+
+ /// @brief Verifies the client can be puased and shutdown while doing work.
+ ///
+ /// @param num_threads number of threads the HttpClient should use.
+ /// A value of 0 puts the HttpClient in single-threaded mode.
+ /// @param num_batches number of batches of requests that should be
+ /// conducted.
+ /// @param num_listeners number of HttpListeners to create. Defaults
+ /// to 1.
+ void workPauseShutdown(size_t num_threads, size_t num_batches,
+ size_t num_listeners = 1, bool pause_first = true) {
+ ASSERT_TRUE(num_batches);
+ ASSERT_TRUE(num_listeners);
+ num_threads_ = num_threads;
+ num_batches_ = num_batches;
+ num_listeners_ = num_listeners;
+
+ // Client in ST is, in effect, 1 thread.
+ size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_);
+
+ // Calculate the maximum requests that could complete.
+ size_t maximum_requests = (num_batches_ * num_listeners_ * effective_threads);
+
+ // Calculate the expected number of requests.
+ expected_requests_ = maximum_requests / 2;
+
+ for (auto i = 0; i < num_listeners_; ++i) {
+ // Make a factory
+ HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
+ factories_.push_back(factory);
+
+ // Need to create a Listener on
+ HttpListenerPtr listener(new HttpListener(io_service_,
+ IOAddress(SERVER_ADDRESS), (SERVER_PORT + i),
+ TlsContextPtr(), factory,
+ HttpListener::RequestTimeout(10000),
+ HttpListener::IdleTimeout(10000)));
+ listeners_.push_back(listener);
+
+ // Start the server.
+ ASSERT_NO_THROW(listener->start());
+ }
+
+ // Create an MT client with num_threads
+ ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
+ ASSERT_TRUE(client_);
+
+ if (num_threads_ == 0) {
+ // If we single-threaded client should not have it's own IOService.
+ ASSERT_FALSE(client_->getThreadIOService());
+ } else {
+ // If we multi-threaded client should have it's own IOService.
+ ASSERT_TRUE(client_->getThreadIOService());
+ }
+
+ // Verify the pool size and number of threads are as expected.
+ ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
+ ASSERT_EQ(client_->getThreadCount(), num_threads);
+
+ // Start the requisite number of requests:
+ // batch * listeners * threads.
+ int sequence = 0;
+ for (auto b = 0; b < num_batches; ++b) {
+ for (auto l = 0; l < num_listeners_; ++l) {
+ for (auto t = 0; t < effective_threads; ++t) {
+ startRequestSimple(++sequence, l);
+ }
+ }
+ }
+
+ // Loop until the 1/2 the reuests are done, an error occurs,
+ // or the time runs out.
+ size_t rr_count = 0;
+ while (rr_count < (expected_requests_)) {
+ // Always call reset() before we call run();
+ io_service_.get_io_service().reset();
+
+ // Run until a client stops the service.
+ io_service_.run();
+ rr_count = getRRCount();
+ }
+
+ if (pause_first) {
+ // Pause the client.
+ ASSERT_NO_THROW(client_->pause());
+ ASSERT_EQ(HttpThreadPool::RunState::PAUSED, client_->getRunState());
+ }
+
+ // We should have completed at least the expected number of requests
+ // but less than the maximum number of requests.
+ ASSERT_GE(getRRCount(), expected_requests_ );
+ ASSERT_LT(getRRCount(), maximum_requests);
+
+ // Client should stop without issue.
+ ASSERT_NO_THROW(client_->stop());
+
+ // Listeners should stop without issue.
+ for (const auto& listener : listeners_) {
+ ASSERT_NO_THROW(listener->stop());
+ }
+ }
+
+ /// @brief Fetch the number of completed requests.
+ ///
+ /// @return number of completed requests.
+ size_t getRRCount() {
+ std::unique_lock<std::mutex> lck(test_mutex_);
+ return(clientRRs_.size());
+ }
+
/// @brief IO service used in the tests.
IOService io_service_;
std::vector<ClientRRPtr> clientRRs_;
/// @brief Mutex for locking.
- std::mutex mutex_;
+ std::mutex test_mutex_;
/// @brief Condition variable used to make client threads wait
/// until number of in-progress requests reaches the number
/// of client requests.
- std::condition_variable cv_;
+ std::condition_variable test_cv_;
+
+ /// @brief Indicates if client threads are currently "paused".
+ bool paused_;
+
+ /// @brief Number of times client has been paused during the test.
+ size_t pause_cnt_;
};
// Verifies we can construct and destruct, in both single
ASSERT_NO_THROW_LOG(client->stop());
// Verify we're stopped.
- ASSERT_FALSE(client->getThreadIOService());
+ ASSERT_TRUE(client->getThreadIOService());
+ EXPECT_TRUE(client->getThreadIOService()->stopped());
ASSERT_EQ(client->getThreadPoolSize(), 3);
ASSERT_EQ(client->getThreadCount(), 0);
threadRequestAndReceive(num_threads, num_batches, num_listeners);
}
+// Verifies that we can cleanly work, pause, and resume repeatedly.
+TEST_F(MtHttpClientTest, workPauseResumee) {
+ size_t num_threads = 12;
+ size_t num_batches = 12;
+ size_t num_listeners = 12;
+ size_t num_pauses = 7;
+ threadRequestAndReceive(num_threads, num_batches, num_listeners, num_pauses);
+}
+
+// Verifies that we can cleanly pause and shutdown while doing
+// multi-threaded work.
+TEST_F(MtHttpClientTest, workPauseShutdown) {
+ size_t num_threads = 8;
+ size_t num_batches = 8;
+ size_t num_listeners = 8;
+ workPauseShutdown(num_threads, num_batches, num_listeners);
+}
+
} // end of anonymous namespace