From: Thomas Markwalder Date: Thu, 13 May 2021 18:55:14 +0000 (-0400) Subject: [#1818] Reimplemented HttpThreadPool internals X-Git-Tag: Kea-1.9.8~85 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a2f4f6724c9e620cbd45cd3ab5c038cb5565ecab;p=thirdparty%2Fkea.git [#1818] Reimplemented HttpThreadPool internals HttpThreadPool state changes are now synchronous. src/lib/config/cmd_http_listener.cc Minor updates src/lib/config/tests/cmd_http_listener_unittests.cc Revamped to use simplified command handling for pause/resume testing. src/lib/http/client.* Minor updates src/lib/http/http_thread_pool.* HttpThreadPool::setRunState() is now synchronous, returning only when all threads in the pool are in appropriate state or have been destroyed. Did away with SHUTDOWN state. Changed RUN to RUNNING src/lib/http/tests/http_thread_pool_unittests.cc Revamped testing. src/lib/http/tests/mt_client_unittests.cc Revamped to use request handler handling for pause/resume testing --- diff --git a/src/lib/config/cmd_http_listener.cc b/src/lib/config/cmd_http_listener.cc index 0b2a5eba61..55cb2d856d 100644 --- a/src/lib/config/cmd_http_listener.cc +++ b/src/lib/config/cmd_http_listener.cc @@ -93,7 +93,7 @@ CmdHttpListener::pause() { void CmdHttpListener::resume() { if (threads_) { - threads_->resume(); + threads_->run(); } } @@ -122,7 +122,7 @@ CmdHttpListener::stop() { .arg(port_); } -HttpThreadPool::RunState +HttpThreadPool::RunState CmdHttpListener::getRunState() const { if (!threads_) { isc_throw(InvalidOperation, @@ -132,16 +132,16 @@ CmdHttpListener::getRunState() const { return (threads_->getRunState()); } -bool +bool CmdHttpListener::isRunning() { if (threads_) { - return (threads_->getRunState() == HttpThreadPool::RunState::RUN); + return (threads_->getRunState() == HttpThreadPool::RunState::RUNNING); } return (false); } -bool +bool CmdHttpListener::isStopped() { if (threads_) { return (threads_->getRunState() == HttpThreadPool::RunState::STOPPED); @@ -150,7 +150,7 @@ CmdHttpListener::isStopped() { return (true); } -bool +bool CmdHttpListener::isPaused() { if (threads_) { return (threads_->getRunState() == HttpThreadPool::RunState::PAUSED); diff --git a/src/lib/config/tests/cmd_http_listener_unittests.cc b/src/lib/config/tests/cmd_http_listener_unittests.cc index 8af2571057..f2e390fd01 100644 --- a/src/lib/config/tests/cmd_http_listener_unittests.cc +++ b/src/lib/config/tests/cmd_http_listener_unittests.cc @@ -180,31 +180,21 @@ public: /// /// @param num_pauses Desired number of times the listener should be /// paused during the test. - void runIOService(size_t num_pauses = 0) { + void runIOService(size_t request_limit = 0) { // Create a timer to use for invoking resume after pause. IntervalTimer pause_timer_(io_service_); paused_ = false; + if (!request_limit) { + request_limit = clients_.size(); + } + // Loop until the clients are done, an error occurs, or the time runs out. size_t num_done = 0; - while (num_done != clients_.size()) { + while (num_done != request_limit) { // Always call restart() before we call run(); io_service_.restart(); - if (shouldPause(num_pauses, num_done)) { - // Pause the listener . - paused_ = true; - ++pause_cnt_; - listener_->pause(); - - // Set timer to resume listener. - pause_timer_.setup( - [this]() { - listener_->resume(); - paused_ = false; - }, 10, IntervalTimer::ONE_SHOT); - } - // Run until a client stops the service. io_service_.run(); @@ -225,8 +215,10 @@ public: /// /// @return True if the listener should be paused. bool shouldPause(size_t num_pauses, size_t num_done) { - // True if the number of clients done is a multiple of the number of pauses. - return (!paused_ && num_pauses && num_done && !(num_done % num_pauses)); + size_t request_limit = (pause_cnt_ < num_pauses ? + (num_done + ((clients_.size() - num_done) / num_pauses)) + : clients_.size()); + return (request_limit); } /// @brief Create an HttpResponse from a response string. @@ -277,7 +269,7 @@ public: /// /// @return Returns response with map of arguments containing /// a string value 'thread-id': - ConstElementPtr threadCommandHandler(const std::string& /*command_name*/, + ConstElementPtr synchronizedCommandHandler(const std::string& /*command_name*/, const ConstElementPtr& command_arguments) { // If the number of in progress commands is less than the number // of threads, then wait here until we're notified. Otherwise, @@ -335,6 +327,36 @@ public: return (createAnswer(CONTROL_RESULT_SUCCESS, arguments)); } + /// @brief Handler for the 'thread' command. + /// + /// @param command_name Command name, i.e. 'thread'. + /// @param command_arguments Command arguments should contain + /// one string element, "client-ptr", whose value is the stringified + /// pointer to the client that issued the command. + /// + /// @return Returns response with map of arguments containing + /// a string value 'thread-id': + ConstElementPtr simpleCommandHandler(const std::string& /*command_name*/, + const ConstElementPtr& command_arguments) { + // Create the map of response arguments. + ElementPtr arguments = Element::createMap(); + // First we echo the client-ptr command argument. + ConstElementPtr client_ptr = command_arguments->get("client-ptr"); + if (!client_ptr) { + return (createAnswer(CONTROL_RESULT_ERROR, "missing client-ptr")); + } + + arguments->set("client-ptr", client_ptr); + + // Now we add the thread-id. + std::stringstream ss; + ss << std::this_thread::get_id(); + arguments->set("thread-id", Element::create(ss.str())); + + // We're done, ship it! + return (createAnswer(CONTROL_RESULT_SUCCESS, arguments)); + } + /// @brief Submits one or more thread commands to a CmdHttpListener. /// /// This function command will creates a CmdHttpListener @@ -354,10 +376,7 @@ public: /// thread command. Each client is used to carry out a single thread /// command request. Must be greater than 0 and a multiple of num_threads /// if it is greater than num_threads. - /// @param num_pauses Desired number of times the listener should be - /// paused during the test. - void threadListenAndRespond(size_t num_threads, size_t num_clients, - size_t num_pauses = 0) { + void threadListenAndRespond(size_t num_threads, size_t num_clients) { // First we makes sure the parameter rules apply. ASSERT_TRUE(num_threads > 0); ASSERT_TRUE(num_clients > 0); @@ -373,7 +392,7 @@ public: // Register the thread command handler. CommandMgr::instance().registerCommand("thread", std::bind(&CmdHttpListenerTest - ::threadCommandHandler, + ::synchronizedCommandHandler, this, ph::_1, ph::_2)); // Create a listener with prescribed number of threads. @@ -397,7 +416,7 @@ public: // Now we run the client-side IOService until all requests are done, // errors occur or the test times out. - ASSERT_NO_FATAL_FAILURE(runIOService(num_pauses)); + ASSERT_NO_FATAL_FAILURE(runIOService()); // Stop the listener and then verify it has stopped. ASSERT_NO_THROW_LOG(listener_->stop()); @@ -493,6 +512,176 @@ public: << "thread-id: " << it.first << ", clients: " << it.second << std::endl; } + } + + /// @brief Pauses and resumes a CmdHttpListener while it processes command + /// requests. + /// + /// This function command will creates a CmdHttpListener + /// with the given number of threads, initiates the given + /// number of clients, each requesting the "thread" command, + /// and then iteratively runs the test's IOService until all + /// the clients have received their responses or an error occurs. + /// It will pause and resume the listener at intervals governed + /// by the given number of pauses. + /// + /// @param num_threads - the number of threads the CmdHttpListener + /// should use. Must be greater than 0. + /// @param num_clients - the number of clients that should issue the + /// thread command. Each client is used to carry out a single thread + /// command request. Must be greater than 0. + /// @param num_pauses Desired number of times the listener should be + /// paused during the test. Must be greated than 0. + void workPauseAndResume(size_t num_threads, size_t num_clients, + size_t num_pauses) { + // First we makes sure the parameter rules apply. + ASSERT_TRUE(num_threads); + ASSERT_TRUE(num_clients); + ASSERT_TRUE(num_pauses); + num_threads_ = num_threads; + num_clients_ = num_clients; + + // Register the thread command handler. + CommandMgr::instance().registerCommand("thread", + std::bind(&CmdHttpListenerTest + ::simpleCommandHandler, + this, ph::_1, ph::_2)); + + // Create a listener with prescribed number of threads. + ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS), + SERVER_PORT, num_threads))); + ASSERT_TRUE(listener_); + + // Start it and verify it is running. + ASSERT_NO_THROW_LOG(listener_->start()); + ASSERT_TRUE(listener_->isRunning()); + EXPECT_EQ(listener_->getThreadCount(), num_threads); + + // Maps the number of clients served by a given thread-id. + std::map clients_per_thread; + + // Initiate the prescribed number of command requests. + num_in_progress_ = 0; + for (auto i = 0; clients_.size() < num_clients; ++i) { + ASSERT_NO_THROW_LOG(startThreadCommand()); + } + + // Now we run the client-side IOService until all requests are done, + // errors occur or the test times out. We'll pause and resume the + // given the number of pauses + size_t num_done = 0; + size_t total_requests = clients_.size(); + while (num_done < total_requests) { + // Calculate how many more requests to process before we pause again. + // We divide the number of oustanding requests by the number of pauses + // and stop after we've done at least that many more. + size_t request_limit = (pause_cnt_ < num_pauses ? + (num_done + ((total_requests - num_done) / num_pauses)) + : total_requests); + + // Run test IOService until we hit the limit. + runIOService(request_limit); + + // If we've done all our pauses we should be through. + if (pause_cnt_ == num_pauses) { + break; + } + + // Pause the client. + ASSERT_NO_THROW(listener_->pause()); + ASSERT_TRUE(listener_->isPaused()); + ++pause_cnt_; + + // Check our progress. + num_done = 0; + for (auto const& client : clients_) { + if (client->receiveDone()) { + ++num_done; + } + } + + // We should completed at least as many as our + // target limit. + ASSERT_GE(num_done, request_limit); + + // Resume the listener. + ASSERT_NO_THROW(listener_->resume()); + ASSERT_TRUE(listener_->isRunning()); + } + + // Stop the listener and then verify it has stopped. + ASSERT_NO_THROW_LOG(listener_->stop()); + ASSERT_TRUE(listener_->isStopped()); + EXPECT_EQ(listener_->getThreadCount(), 0); + + // Iterate over the clients, checking their outcomes. + size_t total_responses = 0; + for (auto const& client : clients_) { + // Client should have completed its receive successfully. + ASSERT_TRUE(client->receiveDone()); + + // Client response should not be empty. + HttpResponsePtr hr; + std::string response_str = client->getResponse(); + ASSERT_FALSE(response_str.empty()); + + // Parse the response into an HttpResponse. + ASSERT_NO_THROW_LOG(hr = parseResponse(client->getResponse())); + + // Now we walk the element tree to get the response data. It should look + // this: + // + // [ { + // "arguments": { "client-ptr": "xxxxx", + // "thread-id": "zzzzz" }, + // "result": 0 + // } ] + // + // First we turn it into an Element tree. + std::string body_str = hr->getBody(); + ConstElementPtr body; + ASSERT_NO_THROW_LOG(body = Element::fromJSON(hr->getBody())); + + // Outermost is a list, since we're emulating agent responses. + ASSERT_EQ(body->getType(), Element::list); + ASSERT_EQ(body->size(), 1); + + // Answer should be a map containing "arguments" and "results". + ConstElementPtr answer = body->get(0); + ASSERT_EQ(answer->getType(), Element::map); + + // "result" should be 0. + ConstElementPtr result = answer->get("result"); + ASSERT_TRUE(result); + ASSERT_EQ(result->getType(), Element::integer); + ASSERT_EQ(result->intValue(), 0); + + // "arguments" is a map containing "client-ptr" and "thread-id". + ConstElementPtr arguments = answer->get("arguments"); + ASSERT_TRUE(arguments); + ASSERT_EQ(arguments->getType(), Element::map); + + // "client-ptr" is a string. + ConstElementPtr client_ptr = arguments->get("client-ptr"); + ASSERT_TRUE(client_ptr); + ASSERT_EQ(client_ptr->getType(), Element::string); + + // "thread-id" is a string. + ConstElementPtr thread_id = arguments->get("thread-id"); + ASSERT_TRUE(thread_id); + ASSERT_EQ(thread_id->getType(), Element::string); + std::string thread_id_str = thread_id->stringValue(); + + // Make sure the response received was for this client. + std::stringstream ss; + ss << client; + ASSERT_EQ(client_ptr->stringValue(), ss.str()); + + ++total_responses; + } + + // We should have responses for all our clients. + EXPECT_EQ(total_responses, num_clients); // We should have had the expected number of pauses. if (!num_pauses) { @@ -599,7 +788,7 @@ TEST_F(CmdHttpListenerTest, basics) { EXPECT_EQ(listener_->getThreadCount(), 1); ASSERT_TRUE(listener_->getThreadIOService()); EXPECT_FALSE(listener_->getThreadIOService()->stopped()); - EXPECT_EQ(listener_->getRunState(), HttpThreadPool::RunState::RUN); + EXPECT_EQ(listener_->getRunState(), HttpThreadPool::RunState::RUNNING); // Trying to start it again should fail. ASSERT_THROW_MSG(listener_->start(), InvalidOperation, @@ -762,12 +951,13 @@ TEST_F(CmdHttpListenerTest, sixByEighteen) { threadListenAndRespond(num_threads, num_clients); } -// Pauses and resumes during work. +// Pauses and resumes the listener while it is processing +// requests. TEST_F(CmdHttpListenerTest, pauseAndResume) { size_t num_threads = 6; size_t num_clients = 18; size_t num_pauses = 3; - threadListenAndRespond(num_threads, num_clients, num_pauses); + workPauseAndResume(num_threads, num_clients, num_pauses); } } // end of anonymous namespace diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index 87ffb8898f..777355dcc4 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -1810,8 +1810,8 @@ public: isc_throw(InvalidOperation, "HttpClient::resume - no thread pool"); } - // Resume the thread pool. - threads_->resume(); + // Resume running the thread pool. + threads_->run(); } @@ -1840,11 +1840,11 @@ public: /// @brief Indicates if the thread pool processing is running. /// - /// @return True if the thread pool exists and is in the RUN state, + /// @return True if the thread pool exists and is in the RUNNING state, /// false otherwise. bool isRunning() { if (threads_) { - return (threads_->getRunState() == HttpThreadPool::RunState::RUN); + return (threads_->getRunState() == HttpThreadPool::RunState::RUNNING); } return (false); diff --git a/src/lib/http/http_thread_pool.cc b/src/lib/http/http_thread_pool.cc index d17c782f31..901a0aeb62 100644 --- a/src/lib/http/http_thread_pool.cc +++ b/src/lib/http/http_thread_pool.cc @@ -33,7 +33,8 @@ using namespace isc::util; HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size, bool defer_start /* = false */) : pool_size_(pool_size), io_service_(io_service), - run_state_(RunState::STOPPED), mutex_(), cv_() { + run_state_(RunState::STOPPED), mutex_(), thread_cv_(), + main_cv_(), paused_(0), running_(0), exited_(0) { if (!pool_size) { isc_throw(BadValue, "HttpThreadPool::ctor pool_size must be > 0"); } @@ -45,131 +46,182 @@ HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size, // If we're not deferring the start, do it now. if (!defer_start) { - start(); + run(); } } HttpThreadPool::~HttpThreadPool() { - if (getRunState() != RunState::STOPPED) { - // Stop if we aren't already stopped - stop(); + stop(); +} + +void +HttpThreadPool::run() { + setRunState(RunState::RUNNING); +} + +void +HttpThreadPool::pause() { + setRunState(RunState::PAUSED); +} + +void +HttpThreadPool::stop() { + setRunState(RunState::STOPPED); +} + +HttpThreadPool::RunState +HttpThreadPool::getRunState() { + std::lock_guard lck(mutex_); + return (run_state_); +} + +bool +HttpThreadPool::validateStateChange(RunState new_state) const { + bool is_valid = false; + switch(run_state_) { + case RunState::STOPPED: + is_valid = (new_state == RunState::RUNNING); + break; + case RunState::RUNNING: + is_valid = (new_state != RunState::RUNNING); + break; + case RunState::PAUSED: + is_valid = (new_state != RunState::PAUSED); + break; } + + return (is_valid); } void -HttpThreadPool::start() { - if (getRunState() != RunState::STOPPED) { - isc_throw(InvalidOperation, "HttpThreadPool::start already started!"); +HttpThreadPool::setRunState (RunState new_state) { + std::unique_lock main_lck(mutex_); + + // Bail if the transition is invalid. + if (!validateStateChange(new_state)) { + return; } - // Set state to RUN. - setRunState(RunState::RUN); + run_state_ = new_state; + // Notify threads of state change. + thread_cv_.notify_all(); - // Prep IOService for run() invocations. - io_service_->restart(); + switch(new_state) { + case RunState::RUNNING: { + // Restart the IOSerivce. + io_service_->restart(); - // Create a pool of threads, each calls run() on our - // io_service instance. - for (std::size_t i = 0; i < pool_size_; ++i) { - boost::shared_ptr thread(new std::thread( + // While we have fewer threads than we should, make more. + while (threads_.size() < pool_size_) { + boost::shared_ptr thread(new std::thread( [this]() { bool done = false; while (!done) { switch (getRunState()) { - case RunState::RUN: - io_service_->run(); + case RunState::RUNNING: { + { + std::unique_lock lck(mutex_); + running_++; + + // If We're all running notify main thread. + if (running_ == pool_size_) { + main_cv_.notify_all(); + } + } + + // Run the IOService. + io_service_->run(); + + { + std::unique_lock lck(mutex_); + running_--; + } + break; - case RunState::PAUSED: - { - // We need to stop and wait to be released. + } + + case RunState::PAUSED: { std::unique_lock lck(mutex_); - cv_.wait(lck, + paused_++; + + // If we're all paused notify main. + if (paused_ == threads_.size()) { + main_cv_.notify_all(); + } + + // Wait here till I'm released. + thread_cv_.wait(lck, [&]() { return (run_state_ != RunState::PAUSED); }); + + paused_--; break; } - case RunState::SHUTDOWN: - done = true; - break; - case RunState::STOPPED: - // This should never happen. + + case RunState::STOPPED: { done = true; break; - } + }} } - })); - - threads_.push_back(thread); - } -} - -void -HttpThreadPool::stop() { - if (getRunState() == RunState::STOPPED) { - // Nothing to do. - return; - } - // Set the state to SHUTDOWN. - setRunState(RunState::SHUTDOWN); + std::unique_lock lck(mutex_); + exited_++; + if (exited_ == threads_.size()) { + main_cv_.notify_all(); + } + })); - // Stop our IOService. - if (!io_service_->stopped()) { - // Flush canceled (and ready) handlers. - io_service_->poll(); + // Add thread to the pool. + threads_.push_back(thread); + } - // Stop the service - io_service_->stop(); - } + // Main thread waits here until all threads are running. + main_cv_.wait(main_lck, + [&]() { + return (running_ == threads_.size()); + }); - // Shutdown the threads. - for (auto const& thread : threads_) { - thread->join(); + exited_ = 0; + break; } - // Empty the thread pool. - threads_.clear(); + case RunState::PAUSED: { + // Stop IOService. + if (!io_service_->stopped()) { + io_service_->poll(); + io_service_->stop(); + } - // Set the state to STOPPED. - setRunState(RunState::STOPPED); -} + // Main thread waits here until all threads are paused. + main_cv_.wait(main_lck, + [&]() { + return (paused_ == threads_.size()); + }); -void -HttpThreadPool::pause() { - if (getRunState() != RunState::RUN) { - // Not running, can't pause. - return; + break; } - setRunState(RunState::PAUSED); - io_service_->stop(); -} - -void -HttpThreadPool::resume() { - if (getRunState() != RunState::PAUSED) { - // Not paused, can't resume. - return; - } - - io_service_->restart(); - setRunState(RunState::RUN); -} - -HttpThreadPool::RunState -HttpThreadPool::getRunState() { - std::lock_guard lck(mutex_); - return (run_state_); -} - -void -HttpThreadPool::setRunState(RunState state) { - { - std::lock_guard lck(mutex_); - run_state_ = state; - } - cv_.notify_all(); + case RunState::STOPPED: { + // Stop IOService. + if (!io_service_->stopped()) { + io_service_->poll(); + io_service_->stop(); + } + + // Main thread waits here until all threads have exited. + main_cv_.wait(main_lck, + [&]() { + return (exited_ == threads_.size()); + }); + + for (auto const& thread : threads_) { + thread->join(); + } + + threads_.clear(); + break; + }} } IOServicePtr diff --git a/src/lib/http/http_thread_pool.h b/src/lib/http/http_thread_pool.h index e937b2a4eb..d311468b17 100644 --- a/src/lib/http/http_thread_pool.h +++ b/src/lib/http/http_thread_pool.h @@ -26,68 +26,50 @@ public: /// @brief Describes the possible operational state of the pool. enum class RunState { STOPPED, /// Pool is not operational. - RUN, /// Pool is populated with running threads. - PAUSED, /// Pool is populated with threads which are paused. - SHUTDOWN, /// Pool is transitioning from RUN or PAUSED to STOPPED. + RUNNING, /// Pool is populated with running threads. + PAUSED, /// Pool is populated with threads that are paused. }; /// @brief Constructor /// /// @param io_service IOService that will drive the pool's IO. If empty, it /// create it's own instance. - /// @param pool_size Maximum number of threads in the pool. Currently the number - /// of threads is fixed at this value. - /// @param defer_start If true, creation of the threads is deferred until a subsequent - /// call to @ref start(). In this case the pool's operational state post construction - /// is STOPPED. If false, the constructor will invoke start() which will create the - /// threads, placing the pool in RUN state. - HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size, bool defer_start = false); + /// @param pool_size Maximum number of threads in the pool. Currently the + /// number of threads is fixed at this value. + /// @param defer_start If true, creation of the threads is deferred until + /// a subsequent call to @ref start(). In this case the pool's operational + /// state post construction is STOPPED. If false, the constructor will + /// invoke run() to tranistion the pool into the RUNNING state. + HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size, + bool defer_start = false); /// @brief Destructor /// /// Ensures the pool is stopped prior to destruction. ~HttpThreadPool(); - /// @brief Transitions the pool from STOPPED to RUN run state. + /// @brief Transitions the pool from STOPPED or PAUSED to RUNNING. /// - /// It starts the pool by doing the following: - /// -# Sets state to RUN - /// -# Restarts the IOService preparing it thread invocations of - /// IOService::run() - /// -# Creates thread_pool_size_ threads, adding each to the pool. + /// When called from the STOPPED state, the pool threads are created + /// begin processing events. + /// When called from the PAUSED state, the pool threads are released + /// from PAUSED and resume processing event.s + /// Has no effect if the pool is already in the RUNNING state. + void run(); + + /// @brief Transitions the pool from RUNNING to PAUSED state. /// - /// @throw InvalidOperation if called with the pool in any state other - /// than STOPPED. - void start(); - - /// @brief Transitions the pool to STOPPED state. - /// - /// It stops the pool by doing the following: - /// -# Sets the state to SHUTDOWN. - /// =# Stops the IOService. - /// =# Joins the pool threads. - /// -# Empties the pool. - /// -# Sets the state to STOPPED. - void stop(); - - /// @brief Transitions the pool from RUN to PAUSED state. - /// - /// If the state is any state other than RUN it simply returns, - /// otherwise it does the following: - /// - /// -# Sets the state to PAUSED. - /// -# Stops the IOService. + /// Pool threads suspend event processing and pause until they + /// are released to either resume running or stop. + /// Has no effect if the pool is already in the PAUSED or STOPPED + /// state. void pause(); - /// @brief Transitions the pool from PAUSED to RUN state. - /// - /// If the state is any state other than PAUSED it simply returns, - /// otherwise it does the following: + /// @brief Transitions the pool to from RUNNING OR PAUSED to STOPPED. /// - /// -# Restarts the IOService preparing it for thread invocations - /// of IOService::run() - /// -# Sets the state to RUN. - void resume(); + /// Stops thread event processing and then destroys the pool's threads + /// Has no effect if the pool is already in the STOPPED state. + void stop(); /// @brief Thread-safe fetch of the pool's operational state. /// @@ -95,14 +77,48 @@ public: RunState getRunState(); private: - /// @brief Thread-safe set of the pool's operational state. + /// @brief Thread-safe change of the pool's operational state. /// - /// @note This method does not validate the state change. + /// Transitions a pool from one state to another: + /// + /// When moving from STOPPED or PAUSED to RUNNING: + /// -# Sets state to RUNNING. + /// -# Notifies threads of state change. + /// -# Restarts the IOService. + /// -# Creates the threads if they do not yet exist (true only + /// when transitioning from STOPPED). + /// -# Waits until threads are running. + /// -# Returns to caller. + /// + /// When moving from RUNNING or PAUSED to STOPPED: + /// -# Sets state to STOPPED + /// -# Notifies threads of state change. + /// -# Polls the IOService to flush handlers. + /// -# Stops the IOService. + /// -# Waits until all threads have exited the work function. + /// -# Joins and destroys the threads. + /// -# Returns to caller. + /// + /// When moving from RUNNING to PAUSED: + /// -# Sets state to PAUSED + /// -# Notifies threads of state change. + /// -# Polls the IOService to flush handlers. + /// -# Stops the IOService. + /// -# Waits until all threads have paused. + /// -# Returns to caller. /// /// @param state new state for the pool. void setRunState(RunState state); + /// @brief Validates whether the pool can change to a given state. + /// + /// @param state new state for the pool. + /// @return true if the changs is valid, false otherwise. + /// @note Must be called from a thread-safe context. + bool validateStateChange(RunState state) const; + public: + /// @brief Fetches the IOService that drives the pool. /// /// @return A pointer to the IOService. @@ -131,12 +147,26 @@ private: /// @brief Mutex to protect the internal state. std::mutex mutex_; - /// @brief Condition variable for synchronization. - std::condition_variable cv_; + /// @brief Condition variable used by threads for synchronization. + std::condition_variable thread_cv_; + + /// @brief Condition variable used by main thread to wait on threads + /// state transitions. + std::condition_variable main_cv_; + + /// @brief Number of threads currently paused. + size_t paused_; + + /// @brief Number of threads currently running. + size_t running_; + + /// @brief Number of threads that have exited the work funcion. + size_t exited_; /// @brief Pool of threads used to service connections in multi-threaded /// mode. std::list > threads_; + }; /// @brief Defines a pointer to a thread pool. diff --git a/src/lib/http/tests/http_thread_pool_unittests.cc b/src/lib/http/tests/http_thread_pool_unittests.cc index 80bc392ccb..0f21d092ed 100644 --- a/src/lib/http/tests/http_thread_pool_unittests.cc +++ b/src/lib/http/tests/http_thread_pool_unittests.cc @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -20,15 +21,23 @@ using namespace isc::http; namespace { +/// @brief Test timeout (ms). +const long TEST_TIMEOUT = 10000; + +/// @brief Simple test fixture for testing HttpThreadPool. class HttpThreadPoolTest : public ::testing::Test { public: - HttpThreadPoolTest() : io_service_(new IOService()) { + /// @brief Constructor. + HttpThreadPoolTest() + : io_service_(new IOService()) { } - ~HttpThreadPoolTest() { + /// @brief Destructor. + virtual ~HttpThreadPoolTest() { io_service_->stop(); } + /// @brief IOService instance used by thread pools. IOServicePtr io_service_; }; @@ -41,138 +50,212 @@ TEST_F(HttpThreadPoolTest, invalidConstruction) { "HttpThreadPool::ctor pool_size must be > 0"); } +// Verifies that a pool can be created without starting it. TEST_F(HttpThreadPoolTest, deferredStartConstruction) { HttpThreadPoolPtr pool; ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true))); + // State should be stopped. // Pool size should be 3 - // State should be stopped. // IOService should be there. - // IOService is new, so it should not stopped, + // IOService is new, so it should not be stopped, // No threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); EXPECT_EQ(pool->getPoolSize(), 3); ASSERT_TRUE(pool->getIOService()); EXPECT_FALSE(pool->getIOService()->stopped()); - EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); EXPECT_EQ(pool->getThreadCount(), 0); - // Stop should not throw. - ASSERT_NO_THROW_LOG(pool->stop()); + // Destructor should not throw. + ASSERT_NO_THROW_LOG(pool.reset()); +} - // Nothing should have changed. - EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); - EXPECT_FALSE(pool->getIOService()->stopped()); - EXPECT_EQ(pool->getThreadCount(), 0); +// Verifies that a pool can be started within the constructor. +TEST_F(HttpThreadPoolTest, startDuringConstruction) { + HttpThreadPoolPtr pool; - // Start should not throw. - ASSERT_NO_THROW_LOG(pool->start()); + ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3))); - // State should be RUN, IOService should not be stopped, - // and there should be 3 threads in the pool. + // Pool size should be 3, state should be RUNNING, IOService should + // set but not stopped, and we should have 3 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState()); EXPECT_EQ(pool->getPoolSize(), 3); - EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState()); + ASSERT_TRUE(pool->getIOService()); EXPECT_FALSE(pool->getIOService()->stopped()); EXPECT_EQ(pool->getThreadCount(), 3); - // Stopping should not throw. - ASSERT_NO_THROW_LOG(pool->stop()); - - // State should be stopped, IOService should be stopped, and - // there should be no threads in the pool. - EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); - EXPECT_TRUE(pool->getIOService()->stopped()); - EXPECT_EQ(pool->getThreadCount(), 0); - // Destructor should not throw. ASSERT_NO_THROW_LOG(pool.reset()); } -TEST_F(HttpThreadPoolTest, startDuringConstruction) { +// Verifies that pool can move from STOPPED to RUNNING. +TEST_F(HttpThreadPoolTest, stoppedToRunning) { HttpThreadPoolPtr pool; - ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3))); + // Create a stopped pool. + ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true))); + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); - // Pool size should be 3, state should be RUN, IOService should - // set but not stopped, and we should have 3 threads in the pool. - EXPECT_EQ(pool->getPoolSize(), 3); - EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState()); - ASSERT_TRUE(pool->getIOService()); + // Call run from STOPPED. + ASSERT_NO_THROW_LOG(pool->run()); + + // State should be RUNNING, IOService should not be stopped, we should + // have 3 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 3); + + // Calling run again should be harmless. + ASSERT_NO_THROW_LOG(pool->run()); + + // State should be RUNNING, IOService should not be stopped, we should + // have 3 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState()); EXPECT_FALSE(pool->getIOService()->stopped()); EXPECT_EQ(pool->getThreadCount(), 3); - // Starting again should throw. - ASSERT_THROW_MSG(pool->start(), InvalidOperation, - "HttpThreadPool::start already started!"); + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that pool can move from RUNNING to STOPPED. +TEST_F(HttpThreadPoolTest, runningToStopped) { + HttpThreadPoolPtr pool; + + // Create a running pool. + ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false))); + ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState()); - // Stop should not throw. + // Call stop. ASSERT_NO_THROW_LOG(pool->stop()); - // Pool size should be 3, state should STOPPED, IOService should - // be stopped, and there should be no threads in the pool. - EXPECT_EQ(pool->getPoolSize(), 3); - EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); + // State should be STOPPED, IOService should be stopped, we should + // have 0 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); EXPECT_TRUE(pool->getIOService()->stopped()); EXPECT_EQ(pool->getThreadCount(), 0); - // Destructor should not throw. + // Calling stop again should be harmless. + ASSERT_NO_THROW_LOG(pool->stop()); + + // State should be STOPPED, IOService should be stopped, we should + // have 0 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); + EXPECT_TRUE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Destroying the pool should be fine. ASSERT_NO_THROW_LOG(pool.reset()); } -TEST_F(HttpThreadPoolTest, pauseAndResume) { +// Verifies that pool can move from RUNNING to PAUSED. +TEST_F(HttpThreadPoolTest, runningToPaused) { HttpThreadPoolPtr pool; - ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3))); + // Create a running pool. + ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false))); + ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState()); - // State should be RUN, IOService not stopped, 3 threads in the pool. - EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState()); - EXPECT_FALSE(pool->getIOService()->stopped()); - EXPECT_EQ(pool->getThreadCount(), 3); - - // Pause should not throw. + // Call pause from RUNNING. ASSERT_NO_THROW_LOG(pool->pause()); // State should be PAUSED, IOService should be stopped, we should - // still have 3 threads in the pool. - EXPECT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState()); + // have 3 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState()); EXPECT_TRUE(pool->getIOService()->stopped()); EXPECT_EQ(pool->getThreadCount(), 3); - // Pausing again should be harmless. + // Calling pause again should be harmless. ASSERT_NO_THROW_LOG(pool->pause()); - // Nothing should have changed. - EXPECT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState()); + // State should be PAUSED, IOService should be stopped, we should + // have 3 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState()); EXPECT_TRUE(pool->getIOService()->stopped()); EXPECT_EQ(pool->getThreadCount(), 3); - // Resume should not throw. - ASSERT_NO_THROW_LOG(pool->resume()); + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} - // State should be PAUSED, IOService should be stopped, we should - // still have 3 threads in the pool. - EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState()); - EXPECT_FALSE(pool->getIOService()->stopped()); - EXPECT_EQ(pool->getThreadCount(), 3); +// Verifies that pool can move from PAUSED to RUNNING. +TEST_F(HttpThreadPoolTest, pausedToRunning) { + HttpThreadPoolPtr pool; + + // Create a running pool. + ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false))); + ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState()); - // Resuming again should be harmless. - ASSERT_NO_THROW_LOG(pool->resume()); + // Call pause from RUNNING. + ASSERT_NO_THROW_LOG(pool->pause()); + ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState()); - // Nothing should have changed. - EXPECT_EQ(HttpThreadPool::RunState::RUN, pool->getRunState()); + // Call run. + ASSERT_NO_THROW_LOG(pool->run()); + + // State should be RUNNING, IOService should not be stopped, we should + // have 3 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState()); EXPECT_FALSE(pool->getIOService()->stopped()); EXPECT_EQ(pool->getThreadCount(), 3); - // Stop should not throw. + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that pool can move from PAUSED to STOPPED. +TEST_F(HttpThreadPoolTest, pausedToStopped) { + HttpThreadPoolPtr pool; + + // Create a running pool. + ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, false))); + ASSERT_EQ(HttpThreadPool::RunState::RUNNING, pool->getRunState()); + + // Call pause from RUNNING. + ASSERT_NO_THROW_LOG(pool->pause()); + ASSERT_EQ(HttpThreadPool::RunState::PAUSED, pool->getRunState()); + + // Call stop. ASSERT_NO_THROW_LOG(pool->stop()); - // State should STOPPED, IOService should be stopped, - // and there should be no threads in the pool. - EXPECT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); + // State should be STOPPED, IOService should be stopped, we should + // have 0 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); EXPECT_TRUE(pool->getIOService()->stopped()); EXPECT_EQ(pool->getThreadCount(), 0); - // Destructor should not throw. + // Destroying the pool should be fine. + ASSERT_NO_THROW_LOG(pool.reset()); +} + +// Verifies that attempting to pause a STOPPED pool has no effect. +TEST_F(HttpThreadPoolTest, stoppedToPaused) { + HttpThreadPoolPtr pool; + + // Create a stopped pool. + ASSERT_NO_THROW_LOG(pool.reset(new HttpThreadPool(io_service_, 3, true))); + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); + + // State should be STOPPED, IOService won't be stopped because it was + // never started. We should have 0 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Call pause from STOPPED. + ASSERT_NO_THROW_LOG(pool->pause()); + + // Should have no effect. + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); + + // State should be STOPPED, IOService won't be stopped because it was + // never started. We should have 0 threads in the pool. + ASSERT_EQ(HttpThreadPool::RunState::STOPPED, pool->getRunState()); + EXPECT_FALSE(pool->getIOService()->stopped()); + EXPECT_EQ(pool->getThreadCount(), 0); + + // Destroying the pool should be fine. ASSERT_NO_THROW_LOG(pool.reset()); } diff --git a/src/lib/http/tests/mt_client_unittests.cc b/src/lib/http/tests/mt_client_unittests.cc index de69cde013..ab313e81d1 100644 --- a/src/lib/http/tests/mt_client_unittests.cc +++ b/src/lib/http/tests/mt_client_unittests.cc @@ -199,7 +199,7 @@ public: : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(), test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0), expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false), - pause_cnt_(0), shutdown_(false) { + pause_cnt_(0) { test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true), TEST_TIMEOUT, IntervalTimer::ONE_SHOT); MultiThreadingMgr::instance().setMode(true); @@ -209,7 +209,7 @@ public: ~MtHttpClientTest() { // Stop the client. if (client_) { - stopTestClient(); + client_->stop(); } // Stop all listeners. @@ -234,11 +234,10 @@ public: /// @brief Runs the test's IOService until the desired number of requests /// have been carried out or the test fails. - void runIOService() { - // Loop until the clients are done, an error occurs, or the time runs out. - while (getRRCount() < expected_requests_) { + void runIOService(size_t request_limit) { + while (getRRCount() < request_limit) { // Always call reset() before we call run(); - io_service_.get_io_service().reset(); + io_service_.restart(); // Run until a client stops the service. io_service_.run(); @@ -314,7 +313,7 @@ public: } else { // I'm ready but others aren't wait here. bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10), - [&]() { return (num_in_progress_ == num_threads_ || shutdown_); }); + [&]() { return (num_in_progress_ == num_threads_); }); if (!ret) { ADD_FAILURE() << "clients failed to start work"; } @@ -345,7 +344,7 @@ public: } else { // I'm done but others aren't wait here. bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10), - [&]() { return (num_finished_ == num_threads_ || shutdown_); }); + [&]() { return (num_finished_ == num_threads_); }); if (!ret) { ADD_FAILURE() << "clients failed to finish work"; } @@ -354,6 +353,58 @@ public: })); } + + /// @brief Initiates a single HTTP request. + /// + /// Constructs an HTTP post whose body is a JSON map containing a + /// single integer element, "sequence". + /// + /// The request completion handler simply constructs the response, + /// and adds it the list of completed request/responses. If the + /// number of completed requests has reached the expected number + /// it stops the test IOService. + /// + /// @param sequence value for the integer element, "sequence", + /// to send in the request. + void startRequestSimple(int sequence, int port_offset = 0) { + // Create the URL on which the server can be reached. + std::stringstream ss; + ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset); + Url url(ss.str()); + + // Initiate request to the server. + PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence); + HttpResponseJsonPtr response_json = boost::make_shared(); + ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(), + request_json, response_json, + [this, request_json, response_json](const boost::system::error_code& ec, + const HttpResponsePtr&, + const std::string&) { + // Bail on an error. + ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec; + + // Get stringified thread-id. + std::stringstream ss; + ss << std::this_thread::get_id(); + + // Create the ClientRR. + ClientRRPtr clientRR(new ClientRR()); + clientRR->thread_id_ = ss.str(); + clientRR->request_ = request_json; + clientRR->response_ = response_json; + + { + std::unique_lock lck(test_mutex_); + clientRRs_.push_back(clientRR); + ++num_finished_; + if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) { + io_service_.stop(); + } + } + + })); + } + /// @brief Carries out HTTP requests via HttpClient to HTTP listener(s). /// /// This function creates one HttpClient with the given number @@ -363,10 +414,7 @@ public: /// /// Then it iteratively runs the test's IOService until all /// the requests have been responded to, an error occurs, or the - /// test times out. During each pass through the run loop, the - /// a call to shouldPause() is made to determine if the client - /// thread pool should be paused. If so, the the pool is paused - /// and an timer begun which resumes the pool upon timeout. + /// test times out. /// /// Each request carries a single integer element, "sequence", which /// uniquely identifies the request. Each response is expected to @@ -388,11 +436,8 @@ public: /// conducted. /// @param num_listeners number of HttpListeners to create. Defaults /// to 1. - /// @param num_pauses number of times to pause and resume the client - /// during the test. Defaults to 0. void threadRequestAndReceive(size_t num_threads, size_t num_batches, - size_t num_listeners = 1, - size_t num_pauses = 0) { + size_t num_listeners = 1) { ASSERT_TRUE(num_batches); ASSERT_TRUE(num_listeners); num_threads_ = num_threads; @@ -449,35 +494,17 @@ public: } } - // Create a timer to use for invoking resume after pause. - IntervalTimer pause_timer_(io_service_); - paused_ = false; - // Loop until the clients are done, an error occurs, or the time runs out. while (getRRCount() < expected_requests_) { // Always call restart() before we call run(); io_service_.restart(); - if (shouldPause(num_pauses)) { - // Pause client. - paused_ = true; - ++pause_cnt_; - client_->pause(); - - // Set timer to resume client. - pause_timer_.setup( - [this]() { - client_->resume(); - paused_ = false; - }, 10, IntervalTimer::ONE_SHOT); - } - // Run until a client stops the service. io_service_.run(); } // Client should stop without issue. - stopTestClient(); + ASSERT_NO_THROW(client_->stop()); // Listeners should stop without issue. for (const auto& listener : listeners_) { @@ -487,21 +514,6 @@ public: // We should have a response for each request. ASSERT_EQ(getRRCount(), expected_requests_); - // We should have had the expected number of pauses. - if (!num_pauses) { - ASSERT_EQ(pause_cnt_, 0); - } else { - // We allow a range on pauses of +-1. Figuring - // out the exact intervals at which to pause was - // getting to be a pain. We don't really care as - // long as we're close. The primary thing is that - // we did in fact pause and resume. - ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ && - (pause_cnt_ <= (num_pauses + 1))) - << " num+_pauses: " << num_pauses - << ", pause_cnt_" << pause_cnt_; - } - // Create a map to track number of responses for each client thread. std::map responses_per_thread; @@ -585,65 +597,30 @@ public: } } - /// @brief Indicates if the test should pause. - /// - /// Returns true if the number of completed requests - /// has reached or exceeded the next pause interval. - /// The pause interval is given by expected number of - /// requests divided by the desired number of pauses. - /// - /// @param num_pauses Desired number of pauses. - /// - /// @return True if the client should be paused. - bool shouldPause(size_t num_pauses) { - size_t rr_count = getRRCount(); - if (paused_ || !num_pauses || !rr_count) { - return false; - } - - size_t interval = expected_requests_ / num_pauses; - size_t next_stop = interval * (pause_cnt_ + 1); - return (rr_count >= next_stop); - } - - /// @brief Stops the test client. - /// - /// Sets the shutdown flag and pings the test condition variable, - /// and then stops the thread pool. - void stopTestClient() { - // Set shutdown_ flag and notify any handles that may be waiting. - shutdown_ = true; - test_cv_.notify_all(); - - // Client should stop without issue. - ASSERT_NO_THROW(client_->stop()); - } - - /// @brief Verifies the client can be paused and shutdown while doing work. + /// @brief Verifies the client can be paused and resumed repeatedly + /// while doing multi-threaded doing work. /// /// @param num_threads number of threads the HttpClient should use. - /// A value of 0 puts the HttpClient in single-threaded mode. + /// Must be greater than zero, this test does not make sense for a + /// single threaded client. /// @param num_batches number of batches of requests that should be /// conducted. /// @param num_listeners number of HttpListeners to create. Defaults /// to 1. - void workPauseShutdown(size_t num_threads, size_t num_batches, - size_t num_listeners = 1, bool pause_first = true) { + /// @param num_pauses number of pauses to conduct. + void workPauseResumeShutdown(size_t num_threads, size_t num_batches, + size_t num_listeners, size_t num_pauses) { + ASSERT_TRUE(num_threads); ASSERT_TRUE(num_batches); ASSERT_TRUE(num_listeners); num_threads_ = num_threads; num_batches_ = num_batches; num_listeners_ = num_listeners; - // Client in ST is, in effect, 1 thread. - size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_); - - // Calculate the maximum requests that could complete. - size_t maximum_requests = (num_batches_ * num_listeners_ * effective_threads); - - // Calculate the expected number of requests. - expected_requests_ = maximum_requests / 2; + // Calculate the total expected number of requests. + size_t total_requests = (num_batches_ * num_listeners_ * num_threads_); + // Create the listeners. for (auto i = 0; i < num_listeners_; ++i) { // Make a factory HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i)); @@ -661,23 +638,15 @@ public: ASSERT_NO_THROW(listener->start()); } - // Create an MT client with num_threads + // Create an instant start, MT client with num_threads ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads))); ASSERT_TRUE(client_); - // Check convenience functions. + // Client shoudl be running. Check convenience functions. ASSERT_TRUE(client_->isRunning()); ASSERT_FALSE(client_->isPaused()); ASSERT_FALSE(client_->isStopped()); - if (num_threads_ == 0) { - // If we single-threaded client should not have it's own IOService. - ASSERT_FALSE(client_->getThreadIOService()); - } else { - // If we multi-threaded client should have it's own IOService. - ASSERT_TRUE(client_->getThreadIOService()); - } - // Verify the pool size and number of threads are as expected. ASSERT_EQ(client_->getThreadPoolSize(), num_threads); ASSERT_EQ(client_->getThreadCount(), num_threads); @@ -687,55 +656,65 @@ public: int sequence = 0; for (auto b = 0; b < num_batches; ++b) { for (auto l = 0; l < num_listeners_; ++l) { - for (auto t = 0; t < effective_threads; ++t) { - startRequest(++sequence, l); + for (auto t = 0; t < num_threads_; ++t) { + startRequestSimple(++sequence, l); } } } - // Loop until the 1/2 the requests are done, an error occurs, - // or the time runs out. size_t rr_count = 0; - while (rr_count < (expected_requests_)) { - // Always call reset() before we call run(); - io_service_.get_io_service().reset(); + while (rr_count < total_requests) { + size_t request_limit = (pause_cnt_ < num_pauses ? + (rr_count + ((total_requests - rr_count) / num_pauses)) + : total_requests); - // Run until a client stops the service. - io_service_.run(); - rr_count = getRRCount(); - } + // Run test IOService until we hit the limit. + runIOService(request_limit); + + // If we've done all our pauses we should be through. + if (pause_cnt_ == num_pauses) { + break; + } - if (pause_first) { // Pause the client. ASSERT_NO_THROW(client_->pause()); - ASSERT_EQ(HttpThreadPool::RunState::PAUSED, client_->getRunState()); - - // Check convenience functions. - ASSERT_FALSE(client_->isRunning()); ASSERT_TRUE(client_->isPaused()); - ASSERT_FALSE(client_->isStopped()); - } + ++pause_cnt_; - // We should have completed at least the expected number of requests - // but less than the maximum number of requests. - ASSERT_GE(getRRCount(), expected_requests_ ); - ASSERT_LT(getRRCount(), maximum_requests); + // Check our progress. + rr_count = getRRCount(); + ASSERT_GE(rr_count, request_limit); + + // Resume the client. + ASSERT_NO_THROW(client_->resume()); + ASSERT_TRUE(client_->isRunning()); + } // Client should stop without issue. - stopTestClient(); + ASSERT_NO_THROW(client_->stop()); + ASSERT_TRUE(client_->isStopped()); + + // We should have finished all our requests. + ASSERT_EQ(getRRCount(), total_requests); + + // Stopping again should be harmless. + ASSERT_NO_THROW(client_->stop()); // Listeners should stop without issue. for (const auto& listener : listeners_) { ASSERT_NO_THROW(listener->stop()); } + + // Destructor should work fine. + client_.reset(); } /// @brief Fetch the number of completed requests. /// /// @return number of completed requests. size_t getRRCount() { - std::unique_lock lck(test_mutex_); - return(clientRRs_.size()); + std::lock_guard lck(test_mutex_); + return (clientRRs_.size()); } /// @brief IO service used in the tests. @@ -832,7 +811,7 @@ TEST_F(MtHttpClientTest, basics) { ASSERT_TRUE(client->getThreadIOService()); ASSERT_EQ(client->getThreadPoolSize(), 3); ASSERT_EQ(client->getThreadCount(), 3); - ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN); + ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING); // Check convenience functions. ASSERT_TRUE(client->isRunning()); @@ -890,24 +869,23 @@ TEST_F(MtHttpClientTest, deferredStart) { // We should be able to start it. ASSERT_NO_THROW(client->start()); - // Verify we have threads and run state is RUN. + // Verify we have threads and run state is RUNNING. ASSERT_EQ(client->getThreadCount(), 3); ASSERT_TRUE(client->getThreadIOService()); ASSERT_FALSE(client->getThreadIOService()->stopped()); - ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN); + ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING); // Check convenience functions. ASSERT_TRUE(client->isRunning()); ASSERT_FALSE(client->isPaused()); ASSERT_FALSE(client->isStopped()); - // Cannot start it twice. - ASSERT_THROW_MSG(client->start(), InvalidOperation, - "HttpThreadPool::start already started!"); + // Second call to start should be harmless. + ASSERT_NO_THROW_LOG(client->start()); // Verify we didn't break it. ASSERT_EQ(client->getThreadCount(), 3); - ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN); + ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING); // Make sure destruction doesn't throw. ASSERT_NO_THROW_LOG(client.reset()); @@ -927,7 +905,7 @@ TEST_F(MtHttpClientTest, restartAfterStop) { ASSERT_EQ(client->getThreadCount(), 3); ASSERT_TRUE(client->getThreadIOService()); ASSERT_FALSE(client->getThreadIOService()->stopped()); - ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN); + ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING); // Stop should succeed. ASSERT_NO_THROW_LOG(client->stop()); @@ -943,7 +921,7 @@ TEST_F(MtHttpClientTest, restartAfterStop) { ASSERT_EQ(client->getThreadCount(), 3); ASSERT_TRUE(client->getThreadIOService()); ASSERT_FALSE(client->getThreadIOService()->stopped()); - ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUN); + ASSERT_EQ(client->getRunState(), HttpThreadPool::RunState::RUNNING); // Make sure destruction doesn't throw. ASSERT_NO_THROW_LOG(client.reset()); @@ -1004,22 +982,14 @@ TEST_F(MtHttpClientTest, fourByFourByTwo) { threadRequestAndReceive(num_threads, num_batches, num_listeners); } -// Verifies that we can cleanly work, pause, and resume repeatedly. -TEST_F(MtHttpClientTest, workPauseResume) { - size_t num_threads = 12; - size_t num_batches = 12; - size_t num_listeners = 12; - size_t num_pauses = 7; - threadRequestAndReceive(num_threads, num_batches, num_listeners, num_pauses); -} - -// Verifies that we can cleanly pause and shutdown while doing +// Verifies that we can cleanly pause, resume, and shutdown while doing // multi-threaded work. -TEST_F(MtHttpClientTest, workPauseShutdown) { - size_t num_threads = 8; - size_t num_batches = 8; - size_t num_listeners = 8; - workPauseShutdown(num_threads, num_batches, num_listeners); +TEST_F(MtHttpClientTest, workPauseResumeShutdown) { + size_t num_threads = 4; + size_t num_batches = 4; + size_t num_listeners = 4; + size_t num_pauses = 3; + workPauseResumeShutdown(num_threads, num_batches, num_listeners, num_pauses); } } // end of anonymous namespace