HttpThreadPool state changes are now synchronous.
src/lib/config/cmd_http_listener.cc
Minor updates
src/lib/config/tests/cmd_http_listener_unittests.cc
Revamped to use simplified command handling for
pause/resume testing.
src/lib/http/client.*
Minor updates
src/lib/http/http_thread_pool.*
HttpThreadPool::setRunState() is now synchronous, returning
only when all threads in the pool are in appropriate state
or have been destroyed.
Did away with SHUTDOWN state.
Changed RUN to RUNNING
src/lib/http/tests/http_thread_pool_unittests.cc
Revamped testing.
src/lib/http/tests/mt_client_unittests.cc
Revamped to use request handler handling for pause/resume testing
void
CmdHttpListener::resume() {
if (threads_) {
- threads_->resume();
+ threads_->run();
}
}
.arg(port_);
}
-HttpThreadPool::RunState
+HttpThreadPool::RunState
CmdHttpListener::getRunState() const {
if (!threads_) {
isc_throw(InvalidOperation,
return (threads_->getRunState());
}
-bool
+bool
CmdHttpListener::isRunning() {
if (threads_) {
- return (threads_->getRunState() == HttpThreadPool::RunState::RUN);
+ return (threads_->getRunState() == HttpThreadPool::RunState::RUNNING);
}
return (false);
}
-bool
+bool
CmdHttpListener::isStopped() {
if (threads_) {
return (threads_->getRunState() == HttpThreadPool::RunState::STOPPED);
return (true);
}
-bool
+bool
CmdHttpListener::isPaused() {
if (threads_) {
return (threads_->getRunState() == HttpThreadPool::RunState::PAUSED);
///
/// @param num_pauses Desired number of times the listener should be
/// paused during the test.
- void runIOService(size_t num_pauses = 0) {
+ void runIOService(size_t request_limit = 0) {
// Create a timer to use for invoking resume after pause.
IntervalTimer pause_timer_(io_service_);
paused_ = false;
+ if (!request_limit) {
+ request_limit = clients_.size();
+ }
+
// Loop until the clients are done, an error occurs, or the time runs out.
size_t num_done = 0;
- while (num_done != clients_.size()) {
+ while (num_done != request_limit) {
// Always call restart() before we call run();
io_service_.restart();
- if (shouldPause(num_pauses, num_done)) {
- // Pause the listener .
- paused_ = true;
- ++pause_cnt_;
- listener_->pause();
-
- // Set timer to resume listener.
- pause_timer_.setup(
- [this]() {
- listener_->resume();
- paused_ = false;
- }, 10, IntervalTimer::ONE_SHOT);
- }
-
// Run until a client stops the service.
io_service_.run();
///
/// @return True if the listener should be paused.
bool shouldPause(size_t num_pauses, size_t num_done) {
- // True if the number of clients done is a multiple of the number of pauses.
- return (!paused_ && num_pauses && num_done && !(num_done % num_pauses));
+ size_t request_limit = (pause_cnt_ < num_pauses ?
+ (num_done + ((clients_.size() - num_done) / num_pauses))
+ : clients_.size());
+ return (request_limit);
}
/// @brief Create an HttpResponse from a response string.
///
/// @return Returns response with map of arguments containing
/// a string value 'thread-id': <thread id>
- ConstElementPtr threadCommandHandler(const std::string& /*command_name*/,
+ ConstElementPtr synchronizedCommandHandler(const std::string& /*command_name*/,
const ConstElementPtr& command_arguments) {
// If the number of in progress commands is less than the number
// of threads, then wait here until we're notified. Otherwise,
return (createAnswer(CONTROL_RESULT_SUCCESS, arguments));
}
+ /// @brief Handler for the 'thread' command.
+ ///
+ /// @param command_name Command name, i.e. 'thread'.
+ /// @param command_arguments Command arguments should contain
+ /// one string element, "client-ptr", whose value is the stringified
+ /// pointer to the client that issued the command.
+ ///
+ /// @return Returns response with map of arguments containing
+ /// a string value 'thread-id': <thread id>
+ ConstElementPtr simpleCommandHandler(const std::string& /*command_name*/,
+ const ConstElementPtr& command_arguments) {
+ // Create the map of response arguments.
+ ElementPtr arguments = Element::createMap();
+ // First we echo the client-ptr command argument.
+ ConstElementPtr client_ptr = command_arguments->get("client-ptr");
+ if (!client_ptr) {
+ return (createAnswer(CONTROL_RESULT_ERROR, "missing client-ptr"));
+ }
+
+ arguments->set("client-ptr", client_ptr);
+
+ // Now we add the thread-id.
+ std::stringstream ss;
+ ss << std::this_thread::get_id();
+ arguments->set("thread-id", Element::create(ss.str()));
+
+ // We're done, ship it!
+ return (createAnswer(CONTROL_RESULT_SUCCESS, arguments));
+ }
+
/// @brief Submits one or more thread commands to a CmdHttpListener.
///
/// This function command will creates a CmdHttpListener
/// thread command. Each client is used to carry out a single thread
/// command request. Must be greater than 0 and a multiple of num_threads
/// if it is greater than num_threads.
- /// @param num_pauses Desired number of times the listener should be
- /// paused during the test.
- void threadListenAndRespond(size_t num_threads, size_t num_clients,
- size_t num_pauses = 0) {
+ void threadListenAndRespond(size_t num_threads, size_t num_clients) {
// First we makes sure the parameter rules apply.
ASSERT_TRUE(num_threads > 0);
ASSERT_TRUE(num_clients > 0);
// Register the thread command handler.
CommandMgr::instance().registerCommand("thread",
std::bind(&CmdHttpListenerTest
- ::threadCommandHandler,
+ ::synchronizedCommandHandler,
this, ph::_1, ph::_2));
// Create a listener with prescribed number of threads.
// Now we run the client-side IOService until all requests are done,
// errors occur or the test times out.
- ASSERT_NO_FATAL_FAILURE(runIOService(num_pauses));
+ ASSERT_NO_FATAL_FAILURE(runIOService());
// Stop the listener and then verify it has stopped.
ASSERT_NO_THROW_LOG(listener_->stop());
<< "thread-id: " << it.first
<< ", clients: " << it.second << std::endl;
}
+ }
+
+ /// @brief Pauses and resumes a CmdHttpListener while it processes command
+ /// requests.
+ ///
+ /// This function command will creates a CmdHttpListener
+ /// with the given number of threads, initiates the given
+ /// number of clients, each requesting the "thread" command,
+ /// and then iteratively runs the test's IOService until all
+ /// the clients have received their responses or an error occurs.
+ /// It will pause and resume the listener at intervals governed
+ /// by the given number of pauses.
+ ///
+ /// @param num_threads - the number of threads the CmdHttpListener
+ /// should use. Must be greater than 0.
+ /// @param num_clients - the number of clients that should issue the
+ /// thread command. Each client is used to carry out a single thread
+ /// command request. Must be greater than 0.
+ /// @param num_pauses Desired number of times the listener should be
+ /// paused during the test. Must be greated than 0.
+ void workPauseAndResume(size_t num_threads, size_t num_clients,
+ size_t num_pauses) {
+ // First we makes sure the parameter rules apply.
+ ASSERT_TRUE(num_threads);
+ ASSERT_TRUE(num_clients);
+ ASSERT_TRUE(num_pauses);
+ num_threads_ = num_threads;
+ num_clients_ = num_clients;
+
+ // Register the thread command handler.
+ CommandMgr::instance().registerCommand("thread",
+ std::bind(&CmdHttpListenerTest
+ ::simpleCommandHandler,
+ this, ph::_1, ph::_2));
+
+ // Create a listener with prescribed number of threads.
+ ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
+ SERVER_PORT, num_threads)));
+ ASSERT_TRUE(listener_);
+
+ // Start it and verify it is running.
+ ASSERT_NO_THROW_LOG(listener_->start());
+ ASSERT_TRUE(listener_->isRunning());
+ EXPECT_EQ(listener_->getThreadCount(), num_threads);
+
+ // Maps the number of clients served by a given thread-id.
+ std::map<std::string, int> clients_per_thread;
+
+ // Initiate the prescribed number of command requests.
+ num_in_progress_ = 0;
+ for (auto i = 0; clients_.size() < num_clients; ++i) {
+ ASSERT_NO_THROW_LOG(startThreadCommand());
+ }
+
+ // Now we run the client-side IOService until all requests are done,
+ // errors occur or the test times out. We'll pause and resume the
+ // given the number of pauses
+ size_t num_done = 0;
+ size_t total_requests = clients_.size();
+ while (num_done < total_requests) {
+ // Calculate how many more requests to process before we pause again.
+ // We divide the number of oustanding requests by the number of pauses
+ // and stop after we've done at least that many more.
+ size_t request_limit = (pause_cnt_ < num_pauses ?
+ (num_done + ((total_requests - num_done) / num_pauses))
+ : total_requests);
+
+ // Run test IOService until we hit the limit.
+ runIOService(request_limit);
+
+ // If we've done all our pauses we should be through.
+ if (pause_cnt_ == num_pauses) {
+ break;
+ }
+
+ // Pause the client.
+ ASSERT_NO_THROW(listener_->pause());
+ ASSERT_TRUE(listener_->isPaused());
+ ++pause_cnt_;
+
+ // Check our progress.
+ num_done = 0;
+ for (auto const& client : clients_) {
+ if (client->receiveDone()) {
+ ++num_done;
+ }
+ }
+
+ // We should completed at least as many as our
+ // target limit.
+ ASSERT_GE(num_done, request_limit);
+
+ // Resume the listener.
+ ASSERT_NO_THROW(listener_->resume());
+ ASSERT_TRUE(listener_->isRunning());
+ }
+
+ // Stop the listener and then verify it has stopped.
+ ASSERT_NO_THROW_LOG(listener_->stop());
+ ASSERT_TRUE(listener_->isStopped());
+ EXPECT_EQ(listener_->getThreadCount(), 0);
+
+ // Iterate over the clients, checking their outcomes.
+ size_t total_responses = 0;
+ for (auto const& client : clients_) {
+ // Client should have completed its receive successfully.
+ ASSERT_TRUE(client->receiveDone());
+
+ // Client response should not be empty.
+ HttpResponsePtr hr;
+ std::string response_str = client->getResponse();
+ ASSERT_FALSE(response_str.empty());
+
+ // Parse the response into an HttpResponse.
+ ASSERT_NO_THROW_LOG(hr = parseResponse(client->getResponse()));
+
+ // Now we walk the element tree to get the response data. It should look
+ // this:
+ //
+ // [ {
+ // "arguments": { "client-ptr": "xxxxx",
+ // "thread-id": "zzzzz" },
+ // "result": 0
+ // } ]
+ //
+ // First we turn it into an Element tree.
+ std::string body_str = hr->getBody();
+ ConstElementPtr body;
+ ASSERT_NO_THROW_LOG(body = Element::fromJSON(hr->getBody()));
+
+ // Outermost is a list, since we're emulating agent responses.
+ ASSERT_EQ(body->getType(), Element::list);
+ ASSERT_EQ(body->size(), 1);
+
+ // Answer should be a map containing "arguments" and "results".
+ ConstElementPtr answer = body->get(0);
+ ASSERT_EQ(answer->getType(), Element::map);
+
+ // "result" should be 0.
+ ConstElementPtr result = answer->get("result");
+ ASSERT_TRUE(result);
+ ASSERT_EQ(result->getType(), Element::integer);
+ ASSERT_EQ(result->intValue(), 0);
+
+ // "arguments" is a map containing "client-ptr" and "thread-id".
+ ConstElementPtr arguments = answer->get("arguments");
+ ASSERT_TRUE(arguments);
+ ASSERT_EQ(arguments->getType(), Element::map);
+
+ // "client-ptr" is a string.
+ ConstElementPtr client_ptr = arguments->get("client-ptr");
+ ASSERT_TRUE(client_ptr);
+ ASSERT_EQ(client_ptr->getType(), Element::string);
+
+ // "thread-id" is a string.
+ ConstElementPtr thread_id = arguments->get("thread-id");
+ ASSERT_TRUE(thread_id);
+ ASSERT_EQ(thread_id->getType(), Element::string);
+ std::string thread_id_str = thread_id->stringValue();
+
+ // Make sure the response received was for this client.
+ std::stringstream ss;
+ ss << client;
+ ASSERT_EQ(client_ptr->stringValue(), ss.str());
+
+ ++total_responses;
+ }
+
+ // We should have responses for all our clients.
+ EXPECT_EQ(total_responses, num_clients);
// We should have had the expected number of pauses.
if (!num_pauses) {
EXPECT_EQ(listener_->getThreadCount(), 1);
ASSERT_TRUE(listener_->getThreadIOService());
EXPECT_FALSE(listener_->getThreadIOService()->stopped());
- EXPECT_EQ(listener_->getRunState(), HttpThreadPool::RunState::RUN);
+ EXPECT_EQ(listener_->getRunState(), HttpThreadPool::RunState::RUNNING);
// Trying to start it again should fail.
ASSERT_THROW_MSG(listener_->start(), InvalidOperation,
threadListenAndRespond(num_threads, num_clients);
}
-// Pauses and resumes during work.
+// Pauses and resumes the listener while it is processing
+// requests.
TEST_F(CmdHttpListenerTest, pauseAndResume) {
size_t num_threads = 6;
size_t num_clients = 18;
size_t num_pauses = 3;
- threadListenAndRespond(num_threads, num_clients, num_pauses);
+ workPauseAndResume(num_threads, num_clients, num_pauses);
}
} // end of anonymous namespace
isc_throw(InvalidOperation, "HttpClient::resume - no thread pool");
}
- // Resume the thread pool.
- threads_->resume();
+ // Resume running the thread pool.
+ threads_->run();
}
/// @brief Indicates if the thread pool processing is running.
///
- /// @return True if the thread pool exists and is in the RUN state,
+ /// @return True if the thread pool exists and is in the RUNNING state,
/// false otherwise.
bool isRunning() {
if (threads_) {
- return (threads_->getRunState() == HttpThreadPool::RunState::RUN);
+ return (threads_->getRunState() == HttpThreadPool::RunState::RUNNING);
}
return (false);
HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size,
bool defer_start /* = false */)
: pool_size_(pool_size), io_service_(io_service),
- run_state_(RunState::STOPPED), mutex_(), cv_() {
+ run_state_(RunState::STOPPED), mutex_(), thread_cv_(),
+ main_cv_(), paused_(0), running_(0), exited_(0) {
if (!pool_size) {
isc_throw(BadValue, "HttpThreadPool::ctor pool_size must be > 0");
}
// If we're not deferring the start, do it now.
if (!defer_start) {
- start();
+ run();
}
}
HttpThreadPool::~HttpThreadPool() {
- if (getRunState() != RunState::STOPPED) {
- // Stop if we aren't already stopped
- stop();
+ stop();
+}
+
+void
+HttpThreadPool::run() {
+ setRunState(RunState::RUNNING);
+}
+
+void
+HttpThreadPool::pause() {
+ setRunState(RunState::PAUSED);
+}
+
+void
+HttpThreadPool::stop() {
+ setRunState(RunState::STOPPED);
+}
+
+HttpThreadPool::RunState
+HttpThreadPool::getRunState() {
+ std::lock_guard<std::mutex> lck(mutex_);
+ return (run_state_);
+}
+
+bool
+HttpThreadPool::validateStateChange(RunState new_state) const {
+ bool is_valid = false;
+ switch(run_state_) {
+ case RunState::STOPPED:
+ is_valid = (new_state == RunState::RUNNING);
+ break;
+ case RunState::RUNNING:
+ is_valid = (new_state != RunState::RUNNING);
+ break;
+ case RunState::PAUSED:
+ is_valid = (new_state != RunState::PAUSED);
+ break;
}
+
+ return (is_valid);
}
void
-HttpThreadPool::start() {
- if (getRunState() != RunState::STOPPED) {
- isc_throw(InvalidOperation, "HttpThreadPool::start already started!");
+HttpThreadPool::setRunState (RunState new_state) {
+ std::unique_lock<std::mutex> main_lck(mutex_);
+
+ // Bail if the transition is invalid.
+ if (!validateStateChange(new_state)) {
+ return;
}
- // Set state to RUN.
- setRunState(RunState::RUN);
+ run_state_ = new_state;
+ // Notify threads of state change.
+ thread_cv_.notify_all();
- // Prep IOService for run() invocations.
- io_service_->restart();
+ switch(new_state) {
+ case RunState::RUNNING: {
+ // Restart the IOSerivce.
+ io_service_->restart();
- // Create a pool of threads, each calls run() on our
- // io_service instance.
- for (std::size_t i = 0; i < pool_size_; ++i) {
- boost::shared_ptr<std::thread> thread(new std::thread(
+ // While we have fewer threads than we should, make more.
+ while (threads_.size() < pool_size_) {
+ boost::shared_ptr<std::thread> thread(new std::thread(
[this]() {
bool done = false;
while (!done) {
switch (getRunState()) {
- case RunState::RUN:
- io_service_->run();
+ case RunState::RUNNING: {
+ {
+ std::unique_lock<std::mutex> lck(mutex_);
+ running_++;
+
+ // If We're all running notify main thread.
+ if (running_ == pool_size_) {
+ main_cv_.notify_all();
+ }
+ }
+
+ // Run the IOService.
+ io_service_->run();
+
+ {
+ std::unique_lock<std::mutex> lck(mutex_);
+ running_--;
+ }
+
break;
- case RunState::PAUSED:
- {
- // We need to stop and wait to be released.
+ }
+
+ case RunState::PAUSED: {
std::unique_lock<std::mutex> lck(mutex_);
- cv_.wait(lck,
+ paused_++;
+
+ // If we're all paused notify main.
+ if (paused_ == threads_.size()) {
+ main_cv_.notify_all();
+ }
+
+ // Wait here till I'm released.
+ thread_cv_.wait(lck,
[&]() {
return (run_state_ != RunState::PAUSED);
});
+
+ paused_--;
break;
}
- case RunState::SHUTDOWN:
- done = true;
- break;
- case RunState::STOPPED:
- // This should never happen.
+
+ case RunState::STOPPED: {
done = true;
break;
- }
+ }}
}
- }));
-
- threads_.push_back(thread);
- }
-}
-
-void
-HttpThreadPool::stop() {
- if (getRunState() == RunState::STOPPED) {
- // Nothing to do.
- return;
- }
- // Set the state to SHUTDOWN.
- setRunState(RunState::SHUTDOWN);
+ std::unique_lock<std::mutex> lck(mutex_);
+ exited_++;
+ if (exited_ == threads_.size()) {
+ main_cv_.notify_all();
+ }
+ }));
- // Stop our IOService.
- if (!io_service_->stopped()) {
- // Flush canceled (and ready) handlers.
- io_service_->poll();
+ // Add thread to the pool.
+ threads_.push_back(thread);
+ }
- // Stop the service
- io_service_->stop();
- }
+ // Main thread waits here until all threads are running.
+ main_cv_.wait(main_lck,
+ [&]() {
+ return (running_ == threads_.size());
+ });
- // Shutdown the threads.
- for (auto const& thread : threads_) {
- thread->join();
+ exited_ = 0;
+ break;
}
- // Empty the thread pool.
- threads_.clear();
+ case RunState::PAUSED: {
+ // Stop IOService.
+ if (!io_service_->stopped()) {
+ io_service_->poll();
+ io_service_->stop();
+ }
- // Set the state to STOPPED.
- setRunState(RunState::STOPPED);
-}
+ // Main thread waits here until all threads are paused.
+ main_cv_.wait(main_lck,
+ [&]() {
+ return (paused_ == threads_.size());
+ });
-void
-HttpThreadPool::pause() {
- if (getRunState() != RunState::RUN) {
- // Not running, can't pause.
- return;
+ break;
}
- setRunState(RunState::PAUSED);
- io_service_->stop();
-}
-
-void
-HttpThreadPool::resume() {
- if (getRunState() != RunState::PAUSED) {
- // Not paused, can't resume.
- return;
- }
-
- io_service_->restart();
- setRunState(RunState::RUN);
-}
-
-HttpThreadPool::RunState
-HttpThreadPool::getRunState() {
- std::lock_guard<std::mutex> lck(mutex_);
- return (run_state_);
-}
-
-void
-HttpThreadPool::setRunState(RunState state) {
- {
- std::lock_guard<std::mutex> lck(mutex_);
- run_state_ = state;
- }
- cv_.notify_all();
+ case RunState::STOPPED: {
+ // Stop IOService.
+ if (!io_service_->stopped()) {
+ io_service_->poll();
+ io_service_->stop();
+ }
+
+ // Main thread waits here until all threads have exited.
+ main_cv_.wait(main_lck,
+ [&]() {
+ return (exited_ == threads_.size());
+ });
+
+ for (auto const& thread : threads_) {
+ thread->join();
+ }
+
+ threads_.clear();
+ break;
+ }}
}
IOServicePtr
/// @brief Describes the possible operational state of the pool.
enum class RunState {
STOPPED, /// Pool is not operational.
- RUN, /// Pool is populated with running threads.
- PAUSED, /// Pool is populated with threads which are paused.
- SHUTDOWN, /// Pool is transitioning from RUN or PAUSED to STOPPED.
+ RUNNING, /// Pool is populated with running threads.
+ PAUSED, /// Pool is populated with threads that are paused.
};
/// @brief Constructor
///
/// @param io_service IOService that will drive the pool's IO. If empty, it
/// create it's own instance.
- /// @param pool_size Maximum number of threads in the pool. Currently the number
- /// of threads is fixed at this value.
- /// @param defer_start If true, creation of the threads is deferred until a subsequent
- /// call to @ref start(). In this case the pool's operational state post construction
- /// is STOPPED. If false, the constructor will invoke start() which will create the
- /// threads, placing the pool in RUN state.
- HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size, bool defer_start = false);
+ /// @param pool_size Maximum number of threads in the pool. Currently the
+ /// number of threads is fixed at this value.
+ /// @param defer_start If true, creation of the threads is deferred until
+ /// a subsequent call to @ref start(). In this case the pool's operational
+ /// state post construction is STOPPED. If false, the constructor will
+ /// invoke run() to tranistion the pool into the RUNNING state.
+ HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size,
+ bool defer_start = false);
/// @brief Destructor
///
/// Ensures the pool is stopped prior to destruction.
~HttpThreadPool();
- /// @brief Transitions the pool from STOPPED to RUN run state.
+ /// @brief Transitions the pool from STOPPED or PAUSED to RUNNING.
///
- /// It starts the pool by doing the following:
- /// -# Sets state to RUN
- /// -# Restarts the IOService preparing it thread invocations of
- /// IOService::run()
- /// -# Creates thread_pool_size_ threads, adding each to the pool.
+ /// When called from the STOPPED state, the pool threads are created
+ /// begin processing events.
+ /// When called from the PAUSED state, the pool threads are released
+ /// from PAUSED and resume processing event.s
+ /// Has no effect if the pool is already in the RUNNING state.
+ void run();
+
+ /// @brief Transitions the pool from RUNNING to PAUSED state.
///
- /// @throw InvalidOperation if called with the pool in any state other
- /// than STOPPED.
- void start();
-
- /// @brief Transitions the pool to STOPPED state.
- ///
- /// It stops the pool by doing the following:
- /// -# Sets the state to SHUTDOWN.
- /// =# Stops the IOService.
- /// =# Joins the pool threads.
- /// -# Empties the pool.
- /// -# Sets the state to STOPPED.
- void stop();
-
- /// @brief Transitions the pool from RUN to PAUSED state.
- ///
- /// If the state is any state other than RUN it simply returns,
- /// otherwise it does the following:
- ///
- /// -# Sets the state to PAUSED.
- /// -# Stops the IOService.
+ /// Pool threads suspend event processing and pause until they
+ /// are released to either resume running or stop.
+ /// Has no effect if the pool is already in the PAUSED or STOPPED
+ /// state.
void pause();
- /// @brief Transitions the pool from PAUSED to RUN state.
- ///
- /// If the state is any state other than PAUSED it simply returns,
- /// otherwise it does the following:
+ /// @brief Transitions the pool to from RUNNING OR PAUSED to STOPPED.
///
- /// -# Restarts the IOService preparing it for thread invocations
- /// of IOService::run()
- /// -# Sets the state to RUN.
- void resume();
+ /// Stops thread event processing and then destroys the pool's threads
+ /// Has no effect if the pool is already in the STOPPED state.
+ void stop();
/// @brief Thread-safe fetch of the pool's operational state.
///
RunState getRunState();
private:
- /// @brief Thread-safe set of the pool's operational state.
+ /// @brief Thread-safe change of the pool's operational state.
///
- /// @note This method does not validate the state change.
+ /// Transitions a pool from one state to another:
+ ///
+ /// When moving from STOPPED or PAUSED to RUNNING:
+ /// -# Sets state to RUNNING.
+ /// -# Notifies threads of state change.
+ /// -# Restarts the IOService.
+ /// -# Creates the threads if they do not yet exist (true only
+ /// when transitioning from STOPPED).
+ /// -# Waits until threads are running.
+ /// -# Returns to caller.
+ ///
+ /// When moving from RUNNING or PAUSED to STOPPED:
+ /// -# Sets state to STOPPED
+ /// -# Notifies threads of state change.
+ /// -# Polls the IOService to flush handlers.
+ /// -# Stops the IOService.
+ /// -# Waits until all threads have exited the work function.
+ /// -# Joins and destroys the threads.
+ /// -# Returns to caller.
+ ///
+ /// When moving from RUNNING to PAUSED:
+ /// -# Sets state to PAUSED
+ /// -# Notifies threads of state change.
+ /// -# Polls the IOService to flush handlers.
+ /// -# Stops the IOService.
+ /// -# Waits until all threads have paused.
+ /// -# Returns to caller.
///
/// @param state new state for the pool.
void setRunState(RunState state);
+ /// @brief Validates whether the pool can change to a given state.
+ ///
+ /// @param state new state for the pool.
+ /// @return true if the changs is valid, false otherwise.
+ /// @note Must be called from a thread-safe context.
+ bool validateStateChange(RunState state) const;
+
public:
+
/// @brief Fetches the IOService that drives the pool.
///
/// @return A pointer to the IOService.
/// @brief Mutex to protect the internal state.
std::mutex mutex_;
- /// @brief Condition variable for synchronization.
- std::condition_variable cv_;
+ /// @brief Condition variable used by threads for synchronization.
+ std::condition_variable thread_cv_;
+
+ /// @brief Condition variable used by main thread to wait on threads
+ /// state transitions.
+ std::condition_variable main_cv_;
+
+ /// @brief Number of threads currently paused.
+ size_t paused_;
+
+ /// @brief Number of threads currently running.
+ size_t running_;
+
+ /// @brief Number of threads that have exited the work funcion.
+ size_t exited_;
/// @brief Pool of threads used to service connections in multi-threaded
/// mode.
std::list<boost::shared_ptr<std::thread> > threads_;
+
};
/// @brief Defines a pointer to a thread pool.
#include <config.h>
#include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
#include <asiolink/io_service.h>
#include <exceptions/exceptions.h>
#include <http/http_thread_pool.h>
namespace {
+/// @brief Test timeout (ms).
+const long TEST_TIMEOUT = 10000;
+
+/// @brief Simple test fixture for testing HttpThreadPool.
class HttpThreadPoolTest : public ::testing::Test {
public:
- HttpThreadPoolTest() : io_service_(new IOService()) {
+ /// @brief Constructor.
+ HttpThreadPoolTest()
+ : io_service_(new IOService()) {
}
- ~HttpThreadPoolTest() {
+ /// @brief Destructor.
+ virtual ~HttpThreadPoolTest() {
io_service_->stop();
}
+ /// @brief IOService instance used by thread pools.
IOServicePtr io_service_;
};
"HttpThreadPool::ctor pool_size must be > 0");
}
+// Verifies that a pool can be created without starting it.
TEST_F(HttpThreadPoolTest, deferredStartConstruction) {
HttpThreadPoolPtr pool;
ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true)));
+ // State should be stopped.
// Pool size should be 3
- // State should be stopped.
// IOService should be there.
- // IOService is new, so it should not stopped,
+ // IOService is new, so it should not be stopped,
// No threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
EXPECT_EQ(pool->getPoolSize(), 3);
ASSERT_TRUE(pool->getIOService());
EXPECT_FALSE(pool->getIOService()->stopped());
- EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
EXPECT_EQ(pool->getThreadCount(), 0);
- // Stop should not throw.
- ASSERT_NO_THROW_LOG(pool->stop());
+ // Destructor should not throw.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
- // Nothing should have changed.
- EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
- EXPECT_FALSE(pool->getIOService()->stopped());
- EXPECT_EQ(pool->getThreadCount(), 0);
+// Verifies that a pool can be started within the constructor.
+TEST_F(HttpThreadPoolTest, startDuringConstruction) {
+ HttpThreadPoolPtr pool;
- // Start should not throw.
- ASSERT_NO_THROW_LOG(pool->start());
+ ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3)));
- // State should be RUN, IOService should not be stopped,
- // and there should be 3 threads in the pool.
+ // Pool size should be 3, state should be RUNNING, IOService should
+ // set but not stopped, and we should have 3 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
EXPECT_EQ(pool->getPoolSize(), 3);
- EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
+ ASSERT_TRUE(pool->getIOService());
EXPECT_FALSE(pool->getIOService()->stopped());
EXPECT_EQ(pool->getThreadCount(), 3);
- // Stopping should not throw.
- ASSERT_NO_THROW_LOG(pool->stop());
-
- // State should be stopped, IOService should be stopped, and
- // there should be no threads in the pool.
- EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
- EXPECT_TRUE(pool->getIOService()->stopped());
- EXPECT_EQ(pool->getThreadCount(), 0);
-
// Destructor should not throw.
ASSERT_NO_THROW_LOG(pool.reset());
}
-TEST_F(HttpThreadPoolTest, startDuringConstruction) {
+// Verifies that pool can move from STOPPED to RUNNING.
+TEST_F(HttpThreadPoolTest, stoppedToRunning) {
HttpThreadPoolPtr pool;
- ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3)));
+ // Create a stopped pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true)));
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
- // Pool size should be 3, state should be RUN, IOService should
- // set but not stopped, and we should have 3 threads in the pool.
- EXPECT_EQ(pool->getPoolSize(), 3);
- EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
- ASSERT_TRUE(pool->getIOService());
+ // Call run from STOPPED.
+ ASSERT_NO_THROW_LOG(pool->run());
+
+ // State should be RUNNING, IOService should not be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 3);
+
+ // Calling run again should be harmless.
+ ASSERT_NO_THROW_LOG(pool->run());
+
+ // State should be RUNNING, IOService should not be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
EXPECT_FALSE(pool->getIOService()->stopped());
EXPECT_EQ(pool->getThreadCount(), 3);
- // Starting again should throw.
- ASSERT_THROW_MSG(pool->start(), InvalidOperation,
- "HttpThreadPool::start already started!");
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from RUNNING to STOPPED.
+TEST_F(HttpThreadPoolTest, runningToStopped) {
+ HttpThreadPoolPtr pool;
+
+ // Create a running pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false)));
+ ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
- // Stop should not throw.
+ // Call stop.
ASSERT_NO_THROW_LOG(pool->stop());
- // Pool size should be 3, state should STOPPED, IOService should
- // be stopped, and there should be no threads in the pool.
- EXPECT_EQ(pool->getPoolSize(), 3);
- EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+ // State should be STOPPED, IOService should be stopped, we should
+ // have 0 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
EXPECT_TRUE(pool->getIOService()->stopped());
EXPECT_EQ(pool->getThreadCount(), 0);
- // Destructor should not throw.
+ // Calling stop again should be harmless.
+ ASSERT_NO_THROW_LOG(pool->stop());
+
+ // State should be STOPPED, IOService should be stopped, we should
+ // have 0 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+ EXPECT_TRUE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Destroying the pool should be fine.
ASSERT_NO_THROW_LOG(pool.reset());
}
-TEST_F(HttpThreadPoolTest, pauseAndResume) {
+// Verifies that pool can move from RUNNING to PAUSED.
+TEST_F(HttpThreadPoolTest, runningToPaused) {
HttpThreadPoolPtr pool;
- ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3)));
+ // Create a running pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false)));
+ ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
- // State should be RUN, IOService not stopped, 3 threads in the pool.
- EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
- EXPECT_FALSE(pool->getIOService()->stopped());
- EXPECT_EQ(pool->getThreadCount(), 3);
-
- // Pause should not throw.
+ // Call pause from RUNNING.
ASSERT_NO_THROW_LOG(pool->pause());
// State should be PAUSED, IOService should be stopped, we should
- // still have 3 threads in the pool.
- EXPECT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
+ // have 3 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
EXPECT_TRUE(pool->getIOService()->stopped());
EXPECT_EQ(pool->getThreadCount(), 3);
- // Pausing again should be harmless.
+ // Calling pause again should be harmless.
ASSERT_NO_THROW_LOG(pool->pause());
- // Nothing should have changed.
- EXPECT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
+ // State should be PAUSED, IOService should be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
EXPECT_TRUE(pool->getIOService()->stopped());
EXPECT_EQ(pool->getThreadCount(), 3);
- // Resume should not throw.
- ASSERT_NO_THROW_LOG(pool->resume());
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
- // State should be PAUSED, IOService should be stopped, we should
- // still have 3 threads in the pool.
- EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
- EXPECT_FALSE(pool->getIOService()->stopped());
- EXPECT_EQ(pool->getThreadCount(), 3);
+// Verifies that pool can move from PAUSED to RUNNING.
+TEST_F(HttpThreadPoolTest, pausedToRunning) {
+ HttpThreadPoolPtr pool;
+
+ // Create a running pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false)));
+ ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
- // Resuming again should be harmless.
- ASSERT_NO_THROW_LOG(pool->resume());
+ // Call pause from RUNNING.
+ ASSERT_NO_THROW_LOG(pool->pause());
+ ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
- // Nothing should have changed.
- EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState());
+ // Call run.
+ ASSERT_NO_THROW_LOG(pool->run());
+
+ // State should be RUNNING, IOService should not be stopped, we should
+ // have 3 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
EXPECT_FALSE(pool->getIOService()->stopped());
EXPECT_EQ(pool->getThreadCount(), 3);
- // Stop should not throw.
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that pool can move from PAUSED to STOPPED.
+TEST_F(HttpThreadPoolTest, pausedToStopped) {
+ HttpThreadPoolPtr pool;
+
+ // Create a running pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false)));
+ ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState());
+
+ // Call pause from RUNNING.
+ ASSERT_NO_THROW_LOG(pool->pause());
+ ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState());
+
+ // Call stop.
ASSERT_NO_THROW_LOG(pool->stop());
- // State should STOPPED, IOService should be stopped,
- // and there should be no threads in the pool.
- EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+ // State should be STOPPED, IOService should be stopped, we should
+ // have 0 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
EXPECT_TRUE(pool->getIOService()->stopped());
EXPECT_EQ(pool->getThreadCount(), 0);
- // Destructor should not throw.
+ // Destroying the pool should be fine.
+ ASSERT_NO_THROW_LOG(pool.reset());
+}
+
+// Verifies that attempting to pause a STOPPED pool has no effect.
+TEST_F(HttpThreadPoolTest, stoppedToPaused) {
+ HttpThreadPoolPtr pool;
+
+ // Create a stopped pool.
+ ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true)));
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+
+ // State should be STOPPED, IOService won't be stopped because it was
+ // never started. We should have 0 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Call pause from STOPPED.
+ ASSERT_NO_THROW_LOG(pool->pause());
+
+ // Should have no effect.
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+
+ // State should be STOPPED, IOService won't be stopped because it was
+ // never started. We should have 0 threads in the pool.
+ ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState());
+ EXPECT_FALSE(pool->getIOService()->stopped());
+ EXPECT_EQ(pool->getThreadCount(), 0);
+
+ // Destroying the pool should be fine.
ASSERT_NO_THROW_LOG(pool.reset());
}
: io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false),
- pause_cnt_(0), shutdown_(false) {
+ pause_cnt_(0) {
test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true),
TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
MultiThreadingMgr::instance().setMode(true);
~MtHttpClientTest() {
// Stop the client.
if (client_) {
- stopTestClient();
+ client_->stop();
}
// Stop all listeners.
/// @brief Runs the test's IOService until the desired number of requests
/// have been carried out or the test fails.
- void runIOService() {
- // Loop until the clients are done, an error occurs, or the time runs out.
- while (getRRCount() < expected_requests_) {
+ void runIOService(size_t request_limit) {
+ while (getRRCount() < request_limit) {
// Always call reset() before we call run();
- io_service_.get_io_service().reset();
+ io_service_.restart();
// Run until a client stops the service.
io_service_.run();
} else {
// I'm ready but others aren't wait here.
bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
- [&]() { return (num_in_progress_ == num_threads_ || shutdown_); });
+ [&]() { return (num_in_progress_ == num_threads_); });
if (!ret) {
ADD_FAILURE() << "clients failed to start work";
}
} else {
// I'm done but others aren't wait here.
bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
- [&]() { return (num_finished_ == num_threads_ || shutdown_); });
+ [&]() { return (num_finished_ == num_threads_); });
if (!ret) {
ADD_FAILURE() << "clients failed to finish work";
}
}));
}
+
+ /// @brief Initiates a single HTTP request.
+ ///
+ /// Constructs an HTTP post whose body is a JSON map containing a
+ /// single integer element, "sequence".
+ ///
+ /// The request completion handler simply constructs the response,
+ /// and adds it the list of completed request/responses. If the
+ /// number of completed requests has reached the expected number
+ /// it stops the test IOService.
+ ///
+ /// @param sequence value for the integer element, "sequence",
+ /// to send in the request.
+ void startRequestSimple(int sequence, int port_offset = 0) {
+ // Create the URL on which the server can be reached.
+ std::stringstream ss;
+ ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset);
+ Url url(ss.str());
+
+ // Initiate request to the server.
+ PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
+ HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
+ ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
+ request_json, response_json,
+ [this, request_json, response_json](const boost::system::error_code& ec,
+ const HttpResponsePtr&,
+ const std::string&) {
+ // Bail on an error.
+ ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
+
+ // Get stringified thread-id.
+ std::stringstream ss;
+ ss << std::this_thread::get_id();
+
+ // Create the ClientRR.
+ ClientRRPtr clientRR(new ClientRR());
+ clientRR->thread_id_ = ss.str();
+ clientRR->request_ = request_json;
+ clientRR->response_ = response_json;
+
+ {
+ std::unique_lock<std::mutex> lck(test_mutex_);
+ clientRRs_.push_back(clientRR);
+ ++num_finished_;
+ if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) {
+ io_service_.stop();
+ }
+ }
+
+ }));
+ }
+
/// @brief Carries out HTTP requests via HttpClient to HTTP listener(s).
///
/// This function creates one HttpClient with the given number
///
/// Then it iteratively runs the test's IOService until all
/// the requests have been responded to, an error occurs, or the
- /// test times out. During each pass through the run loop, the
- /// a call to shouldPause() is made to determine if the client
- /// thread pool should be paused. If so, the the pool is paused
- /// and an timer begun which resumes the pool upon timeout.
+ /// test times out.
///
/// Each request carries a single integer element, "sequence", which
/// uniquely identifies the request. Each response is expected to
/// conducted.
/// @param num_listeners number of HttpListeners to create. Defaults
/// to 1.
- /// @param num_pauses number of times to pause and resume the client
- /// during the test. Defaults to 0.
void threadRequestAndReceive(size_t num_threads, size_t num_batches,
- size_t num_listeners = 1,
- size_t num_pauses = 0) {
+ size_t num_listeners = 1) {
ASSERT_TRUE(num_batches);
ASSERT_TRUE(num_listeners);
num_threads_ = num_threads;
}
}
- // Create a timer to use for invoking resume after pause.
- IntervalTimer pause_timer_(io_service_);
- paused_ = false;
-
// Loop until the clients are done, an error occurs, or the time runs out.
while (getRRCount() < expected_requests_) {
// Always call restart() before we call run();
io_service_.restart();
- if (shouldPause(num_pauses)) {
- // Pause client.
- paused_ = true;
- ++pause_cnt_;
- client_->pause();
-
- // Set timer to resume client.
- pause_timer_.setup(
- [this]() {
- client_->resume();
- paused_ = false;
- }, 10, IntervalTimer::ONE_SHOT);
- }
-
// Run until a client stops the service.
io_service_.run();
}
// Client should stop without issue.
- stopTestClient();
+ ASSERT_NO_THROW(client_->stop());
// Listeners should stop without issue.
for (const auto& listener : listeners_) {
// We should have a response for each request.
ASSERT_EQ(getRRCount(), expected_requests_);
- // We should have had the expected number of pauses.
- if (!num_pauses) {
- ASSERT_EQ(pause_cnt_, 0);
- } else {
- // We allow a range on pauses of +-1. Figuring
- // out the exact intervals at which to pause was
- // getting to be a pain. We don't really care as
- // long as we're close. The primary thing is that
- // we did in fact pause and resume.
- ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ &&
- (pause_cnt_ <= (num_pauses + 1)))
- << " num+_pauses: " << num_pauses
- << ", pause_cnt_" << pause_cnt_;
- }
-
// Create a map to track number of responses for each client thread.
std::map<std::string, int> responses_per_thread;
}
}
- /// @brief Indicates if the test should pause.
- ///
- /// Returns true if the number of completed requests
- /// has reached or exceeded the next pause interval.
- /// The pause interval is given by expected number of
- /// requests divided by the desired number of pauses.
- ///
- /// @param num_pauses Desired number of pauses.
- ///
- /// @return True if the client should be paused.
- bool shouldPause(size_t num_pauses) {
- size_t rr_count = getRRCount();
- if (paused_ || !num_pauses || !rr_count) {
- return false;
- }
-
- size_t interval = expected_requests_ / num_pauses;
- size_t next_stop = interval * (pause_cnt_ + 1);
- return (rr_count >= next_stop);
- }
-
- /// @brief Stops the test client.
- ///
- /// Sets the shutdown flag and pings the test condition variable,
- /// and then stops the thread pool.
- void stopTestClient() {
- // Set shutdown_ flag and notify any handles that may be waiting.
- shutdown_ = true;
- test_cv_.notify_all();
-
- // Client should stop without issue.
- ASSERT_NO_THROW(client_->stop());
- }
-
- /// @brief Verifies the client can be paused and shutdown while doing work.
+ /// @brief Verifies the client can be paused and resumed repeatedly
+ /// while doing multi-threaded doing work.
///
/// @param num_threads number of threads the HttpClient should use.
- /// A value of 0 puts the HttpClient in single-threaded mode.
+ /// Must be greater than zero, this test does not make sense for a
+ /// single threaded client.
/// @param num_batches number of batches of requests that should be
/// conducted.
/// @param num_listeners number of HttpListeners to create. Defaults
/// to 1.
- void workPauseShutdown(size_t num_threads, size_t num_batches,
- size_t num_listeners = 1, bool pause_first = true) {
+ /// @param num_pauses number of pauses to conduct.
+ void workPauseResumeShutdown(size_t num_threads, size_t num_batches,
+ size_t num_listeners, size_t num_pauses) {
+ ASSERT_TRUE(num_threads);
ASSERT_TRUE(num_batches);
ASSERT_TRUE(num_listeners);
num_threads_ = num_threads;
num_batches_ = num_batches;
num_listeners_ = num_listeners;
- // Client in ST is, in effect, 1 thread.
- size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_);
-
- // Calculate the maximum requests that could complete.
- size_t maximum_requests = (num_batches_ * num_listeners_ * effective_threads);
-
- // Calculate the expected number of requests.
- expected_requests_ = maximum_requests / 2;
+ // Calculate the total expected number of requests.
+ size_t total_requests = (num_batches_ * num_listeners_ * num_threads_);
+ // Create the listeners.
for (auto i = 0; i < num_listeners_; ++i) {
// Make a factory
HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
ASSERT_NO_THROW(listener->start());
}
- // Create an MT client with num_threads
+ // Create an instant start, MT client with num_threads
ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
ASSERT_TRUE(client_);
- // Check convenience functions.
+ // Client shoudl be running. Check convenience functions.
ASSERT_TRUE(client_->isRunning());
ASSERT_FALSE(client_->isPaused());
ASSERT_FALSE(client_->isStopped());
- if (num_threads_ == 0) {
- // If we single-threaded client should not have it's own IOService.
- ASSERT_FALSE(client_->getThreadIOService());
- } else {
- // If we multi-threaded client should have it's own IOService.
- ASSERT_TRUE(client_->getThreadIOService());
- }
-
// Verify the pool size and number of threads are as expected.
ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
ASSERT_EQ(client_->getThreadCount(), num_threads);
int sequence = 0;
for (auto b = 0; b < num_batches; ++b) {
for (auto l = 0; l < num_listeners_; ++l) {
- for (auto t = 0; t < effective_threads; ++t) {
- startRequest(++sequence, l);
+ for (auto t = 0; t < num_threads_; ++t) {
+ startRequestSimple(++sequence, l);
}
}
}
- // Loop until the 1/2 the requests are done, an error occurs,
- // or the time runs out.
size_t rr_count = 0;
- while (rr_count < (expected_requests_)) {
- // Always call reset() before we call run();
- io_service_.get_io_service().reset();
+ while (rr_count < total_requests) {
+ size_t request_limit = (pause_cnt_ < num_pauses ?
+ (rr_count + ((total_requests - rr_count) / num_pauses))
+ : total_requests);
- // Run until a client stops the service.
- io_service_.run();
- rr_count = getRRCount();
- }
+ // Run test IOService until we hit the limit.
+ runIOService(request_limit);
+
+ // If we've done all our pauses we should be through.
+ if (pause_cnt_ == num_pauses) {
+ break;
+ }
- if (pause_first) {
// Pause the client.
ASSERT_NO_THROW(client_->pause());
- ASSERT_EQ(HttpThreadPool::RunState::PAUSED, client_->getRunState());
-
- // Check convenience functions.
- ASSERT_FALSE(client_->isRunning());
ASSERT_TRUE(client_->isPaused());
- ASSERT_FALSE(client_->isStopped());
- }
+ ++pause_cnt_;
- // We should have completed at least the expected number of requests
- // but less than the maximum number of requests.
- ASSERT_GE(getRRCount(), expected_requests_ );
- ASSERT_LT(getRRCount(), maximum_requests);
+ // Check our progress.
+ rr_count = getRRCount();
+ ASSERT_GE(rr_count, request_limit);
+
+ // Resume the client.
+ ASSERT_NO_THROW(client_->resume());
+ ASSERT_TRUE(client_->isRunning());
+ }
// Client should stop without issue.
- stopTestClient();
+ ASSERT_NO_THROW(client_->stop());
+ ASSERT_TRUE(client_->isStopped());
+
+ // We should have finished all our requests.
+ ASSERT_EQ(getRRCount(), total_requests);
+
+ // Stopping again should be harmless.
+ ASSERT_NO_THROW(client_->stop());
// Listeners should stop without issue.
for (const auto& listener : listeners_) {
ASSERT_NO_THROW(listener->stop());
}
+
+ // Destructor should work fine.
+ client_.reset();
}
/// @brief Fetch the number of completed requests.
///
/// @return number of completed requests.
size_t getRRCount() {
- std::unique_lock<std::mutex> lck(test_mutex_);
- return(clientRRs_.size());
+ std::lock_guard<std::mutex> lck(test_mutex_);
+ return (clientRRs_.size());
}
/// @brief IO service used in the tests.
ASSERT_TRUE(client->getThreadIOService());
ASSERT_EQ(client->getThreadPoolSize(), 3);
ASSERT_EQ(client->getThreadCount(), 3);
- ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+ ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
// Check convenience functions.
ASSERT_TRUE(client->isRunning());
// We should be able to start it.
ASSERT_NO_THROW(client->start());
- // Verify we have threads and run state is RUN.
+ // Verify we have threads and run state is RUNNING.
ASSERT_EQ(client->getThreadCount(), 3);
ASSERT_TRUE(client->getThreadIOService());
ASSERT_FALSE(client->getThreadIOService()->stopped());
- ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+ ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
// Check convenience functions.
ASSERT_TRUE(client->isRunning());
ASSERT_FALSE(client->isPaused());
ASSERT_FALSE(client->isStopped());
- // Cannot start it twice.
- ASSERT_THROW_MSG(client->start(), InvalidOperation,
- "HttpThreadPool::start already started!");
+ // Second call to start should be harmless.
+ ASSERT_NO_THROW_LOG(client->start());
// Verify we didn't break it.
ASSERT_EQ(client->getThreadCount(), 3);
- ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+ ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
// Make sure destruction doesn't throw.
ASSERT_NO_THROW_LOG(client.reset());
ASSERT_EQ(client->getThreadCount(), 3);
ASSERT_TRUE(client->getThreadIOService());
ASSERT_FALSE(client->getThreadIOService()->stopped());
- ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+ ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
// Stop should succeed.
ASSERT_NO_THROW_LOG(client->stop());
ASSERT_EQ(client->getThreadCount(), 3);
ASSERT_TRUE(client->getThreadIOService());
ASSERT_FALSE(client->getThreadIOService()->stopped());
- ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN);
+ ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING);
// Make sure destruction doesn't throw.
ASSERT_NO_THROW_LOG(client.reset());
threadRequestAndReceive(num_threads, num_batches, num_listeners);
}
-// Verifies that we can cleanly work, pause, and resume repeatedly.
-TEST_F(MtHttpClientTest, workPauseResume) {
- size_t num_threads = 12;
- size_t num_batches = 12;
- size_t num_listeners = 12;
- size_t num_pauses = 7;
- threadRequestAndReceive(num_threads, num_batches, num_listeners, num_pauses);
-}
-
-// Verifies that we can cleanly pause and shutdown while doing
+// Verifies that we can cleanly pause, resume, and shutdown while doing
// multi-threaded work.
-TEST_F(MtHttpClientTest, workPauseShutdown) {
- size_t num_threads = 8;
- size_t num_batches = 8;
- size_t num_listeners = 8;
- workPauseShutdown(num_threads, num_batches, num_listeners);
+TEST_F(MtHttpClientTest, workPauseResumeShutdown) {
+ size_t num_threads = 4;
+ size_t num_batches = 4;
+ size_t num_listeners = 4;
+ size_t num_pauses = 3;
+ workPauseResumeShutdown(num_threads, num_batches, num_listeners, num_pauses);
}
} // end of anonymous namespace