From: Thomas Markwalder Date: Tue, 4 May 2021 14:29:40 +0000 (-0400) Subject: [#1818] Pausable HttpThreadPool initial implemenation X-Git-Tag: Kea-1.9.8~93 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1e683dde86c61de35373ac05795d4624dbd875da;p=thirdparty%2Fkea.git [#1818] Pausable HttpThreadPool initial implemenation src/lib/asiolink/io_service.* IOServcie::stopped() IOServcie::restart() - new wrapper methods src/lib/http/http_thread_pool.* New files that implement HttpThreadPool class src/lib/config/cmd_http_listener.* CmdHttpListner - retooled to use HttpThreadPool Added pause() and resume() methods src/lib/config/tests/cmd_http_listener_unittests.cc Retooled and added unit tests for pause/resume src/lib/http/Makefile.am Added http_thread_pool.* src/lib/http/client.cc CmdHttpListner - retooled to use HttpThreadPool Added pause() and resume() methods src/lib/http/tests/Makefile.am Added http_thread_pool_unittests.cc src/lib/http/tests/mt_client_unittests.cc Retooled and added unit tests for pause/resume --- diff --git a/src/lib/asiolink/io_service.cc b/src/lib/asiolink/io_service.cc index 0574c5f866..927a1b1ba9 100644 --- a/src/lib/asiolink/io_service.cc +++ b/src/lib/asiolink/io_service.cc @@ -84,6 +84,18 @@ public: /// This will return the control to the caller of the \c run() method. void stop() { io_service_.stop();} ; + /// \brief Indicates if the IOService has been stopped. + /// + /// \return true if the IOService has been stopped, false otherwise. + bool stopped() const { + return (io_service_.stopped()); + } + + /// \brief Restarts the IOService in preparation for a subsequent \c run() invocation. + void restart() { + io_service_.reset(); + } + /// \brief Removes IO service work object to let it finish running /// when all handlers have been invoked. void stopWork() { @@ -136,6 +148,16 @@ IOService::stop() { io_impl_->stop(); } +bool +IOService::stopped() const { + return (io_impl_->stopped()); +} + +void +IOService::restart() { + io_impl_->restart(); +} + void IOService::stopWork() { io_impl_->stopWork(); diff --git a/src/lib/asiolink/io_service.h b/src/lib/asiolink/io_service.h index eeb668e16b..4bcedf1cb7 100644 --- a/src/lib/asiolink/io_service.h +++ b/src/lib/asiolink/io_service.h @@ -71,6 +71,14 @@ public: /// This will return the control to the caller of the \c run() method. void stop(); + /// \brief Indicates if the IOService has been stopped. + /// + /// \return true if the IOService has been stopped, false otherwise. + bool stopped() const; + + /// \brief Restarts the IOService in preparation for a subsequent \c run() invocation. + void restart(); + /// \brief Removes IO service work object to let it finish running /// when all handlers have been invoked. void stopWork(); diff --git a/src/lib/config/cmd_http_listener.cc b/src/lib/config/cmd_http_listener.cc index f3ff7bd2a8..b850e7aa2e 100644 --- a/src/lib/config/cmd_http_listener.cc +++ b/src/lib/config/cmd_http_listener.cc @@ -66,12 +66,8 @@ CmdHttpListener::start() { HttpListener::RequestTimeout(TIMEOUT_AGENT_RECEIVE_COMMAND), HttpListener::IdleTimeout(TIMEOUT_AGENT_IDLE_CONNECTION_TIMEOUT))); - // Create a pool of threads, each calls run on our IOService_service instance. - for (std::size_t i = 0; i < thread_pool_size_; ++i) { - boost::shared_ptr thread(new std::thread( - std::bind(&IOService::run, io_service_))); - threads_.push_back(thread); - } + // Create the thread pool. + threads_.reset(new HttpThreadPool(io_service_, thread_pool_size_, false)); // Instruct the HTTP listener to actually open socket, install // callback and start listening. @@ -87,6 +83,20 @@ CmdHttpListener::start() { } } +void +CmdHttpListener::pause() { + if (threads_) { + threads_->pause(); + } +} + +void +CmdHttpListener::resume() { + if (threads_) { + threads_->resume(); + } +} + void CmdHttpListener::stop() { // Nothing to do. @@ -98,15 +108,8 @@ CmdHttpListener::stop() { .arg(address_) .arg(port_); - // Stop the IOService first. - io_service_->stop(); - - // Stop the threads next. - for (auto const& thread : threads_) { - thread->join(); - } - - threads_.clear(); + // Stop the thread pool. + threads_->stop(); // Get rid of the listener. http_listener_.reset(); @@ -119,6 +122,16 @@ CmdHttpListener::stop() { .arg(port_); } +HttpThreadPool::RunState +CmdHttpListener::getRunState() const { + if (!threads_) { + isc_throw(InvalidOperation, + "CmdHttpListener::getRunState - no thread pool!"); + } + + return (threads_->getRunState()); +} + bool CmdHttpListener::isListening() const { // If we have a listener we're listening. diff --git a/src/lib/config/cmd_http_listener.h b/src/lib/config/cmd_http_listener.h index 3929471662..e7fd4428a3 100644 --- a/src/lib/config/cmd_http_listener.h +++ b/src/lib/config/cmd_http_listener.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -37,12 +38,20 @@ public: /// @brief Destructor virtual ~CmdHttpListener(); - /// @brief Initiates the listener's worker thread. + /// @brief Starts running the listener's thread pool. void start(); - /// @brief Stops the listener's worker thread. + /// @brief Pauses the listener's thread pool. + void pause(); + + /// @brief Resumes running the listener's thread pool. + void resume(); + + /// @brief Stops the listener's thread pool. void stop(); + http::HttpThreadPool::RunState getRunState() const; + /// @brief Checks if we are listening to the HTTP requests. /// /// @return true if we are listening. @@ -73,7 +82,11 @@ public: /// /// @return uint16_t containing the number of running threads. uint16_t getThreadCount() { - return (threads_.size()); + if (!threads_) { + return (0); + } + + return (threads_->getThreadCount()); } private: @@ -93,7 +106,7 @@ private: std::size_t thread_pool_size_; /// @brief The pool of threads that do IO work. - std::vector > threads_; + http::HttpThreadPoolPtr threads_; }; /// @brief Defines a shared pointer to CmdHttpListener. diff --git a/src/lib/config/tests/cmd_http_listener_unittests.cc b/src/lib/config/tests/cmd_http_listener_unittests.cc index 3d0f1e57f4..7f2bb8c141 100644 --- a/src/lib/config/tests/cmd_http_listener_unittests.cc +++ b/src/lib/config/tests/cmd_http_listener_unittests.cc @@ -52,9 +52,9 @@ public: /// Starts test timer which detects timeouts, deregisters all commands /// from CommandMgr, and enables multi-threading mode. CmdHttpListenerTest() - : io_service_(), test_timer_(io_service_), run_io_service_timer_(io_service_), + : listener_(), io_service_(), test_timer_(io_service_), run_io_service_timer_(io_service_), clients_(), num_threads_(), num_clients_(), num_in_progress_(0), num_finished_(0), - chunk_size_(0) { + chunk_size_(0), paused_(false), pause_cnt_(0) { test_timer_.setup(std::bind(&CmdHttpListenerTest::timeoutHandler, this, true), TEST_TIMEOUT, IntervalTimer::ONE_SHOT); @@ -69,6 +69,9 @@ public: /// /// Removes HTTP clients, unregisters commands, disables MT. virtual ~CmdHttpListenerTest() { + // Wipe out the listener. + listener_.reset(); + // Destroy all remaining clients. for (auto const& client : clients_) { client->close(); @@ -175,29 +178,57 @@ public: /// because the test clients stop the io_service when they're /// through with a request. /// - /// @param timeout Optional value specifying for how long the io service - /// should be ran. - void runIOService() { + /// @param num_pauses Desired number of times the listener should be + /// paused during the test. + void runIOService(size_t num_pauses = 0) { + // Create a timer to use for invoking resume after pause. + IntervalTimer pause_timer_(io_service_); + paused_ = false; + // Loop until the clients are done, an error occurs, or the time runs out. - bool keep_going = true; - while (keep_going) { - // Always call reset() before we call run(); - io_service_.get_io_service().reset(); + size_t num_done = 0; + while (num_done != clients_.size()) { + // Always call restart() before we call run(); + io_service_.restart(); + + if (shouldPause(num_pauses, num_done)) { + // Pause the listener . + paused_ = true; + ++pause_cnt_; + listener_->pause(); + + // Set timer to resume listener. + pause_timer_.setup( + [this]() { + listener_->resume(); + paused_ = false; + }, 10, IntervalTimer::ONE_SHOT); + } // Run until a client stops the service. io_service_.run(); // If all the clients are done receiving, the test is done. - keep_going = false; + num_done = 0; for (auto const& client : clients_) { - if (!client->receiveDone()) { - keep_going = true; - break; + if (client->receiveDone()) { + ++num_done; } } } } + /// @brief Determines if the listner should be paused. + /// + /// @param num_pauses desired number of pauses + /// @param num_done number of clients that have completed their requests. + /// + /// @return True if the listener should be paused. + bool shouldPause(size_t num_pauses, size_t num_done) { + // True if the number of clients done is a mulitple of the number of pauses. + return (!paused_ && num_pauses && num_done && !(num_done % num_pauses)); + } + /// @brief Create an HttpResponse from a response string. /// /// @param response_str a string containing the whole HTTP @@ -323,7 +354,10 @@ public: /// thread command. Each client is used to carry out a single thread /// command request. Must be greater than 0 and a multiple of num_threads /// if it is greater than num_threads. - void threadListenAndRespond(size_t num_threads, size_t num_clients) { + /// @param num_pauses Desired number of times the listener should be + /// paused during the test. + void threadListenAndRespond(size_t num_threads, size_t num_clients, + size_t num_pauses = 0) { // First we makes sure the parameter rules apply. ASSERT_TRUE(num_threads > 0); ASSERT_TRUE(num_clients > 0); @@ -343,15 +377,14 @@ public: this, ph::_1, ph::_2)); // Create a listener with prescribed number of threads. - CmdHttpListenerPtr listener; - ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS), + ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS), SERVER_PORT, num_threads))); - ASSERT_TRUE(listener); + ASSERT_TRUE(listener_); // Start it and verify it is listening. - ASSERT_NO_THROW_LOG(listener->start()); - ASSERT_TRUE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), num_threads); + ASSERT_NO_THROW_LOG(listener_->start()); + ASSERT_TRUE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), num_threads); // Maps the number of clients served by a given thread-id. std::map clients_per_thread; @@ -364,12 +397,12 @@ public: // Now we run the client-side IOService until all requests are done, // errors occur or the test times out. - ASSERT_NO_FATAL_FAILURE(runIOService()); + ASSERT_NO_FATAL_FAILURE(runIOService(num_pauses)); // Stop the listener and then verify it has stopped. - ASSERT_NO_THROW_LOG(listener->stop()); - ASSERT_FALSE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 0); + ASSERT_NO_THROW_LOG(listener_->stop()); + ASSERT_FALSE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 0); // Iterate over the clients, checking their outcomes. size_t total_responses = 0; @@ -460,8 +493,23 @@ public: << "thread-id: " << it.first << ", clients: " << it.second << std::endl; } + + // We should have had the expected number of pauses. + if (!num_pauses) { + ASSERT_EQ(pause_cnt_, 0); + } else { + // We allow a range on pauses of +-1. + ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ && + (pause_cnt_ <= (num_pauses + 1))) + << " num+_pauses: " << num_pauses + << ", pause_cnt_" << pause_cnt_; + } } + + /// @brief CmdHttpListener instance under test. + CmdHttpListenerPtr listener_; + /// @brief IO service used in drive the test and test clients. IOService io_service_; @@ -499,6 +547,12 @@ public: /// @brief Condition variable used to coordinate threads. std::condition_variable cv_; + + /// @brief Indicates if client threads are currently "paused". + bool paused_; + + /// @brief Number of times client has been paused during the test. + size_t pause_cnt_; }; /// Verifies the construction, starting, stopping, and destruction @@ -506,75 +560,74 @@ public: TEST_F(CmdHttpListenerTest, basics) { // Make sure multi-threading is off. MultiThreadingMgr::instance().setMode(false); - CmdHttpListenerPtr listener; asiolink::IOAddress address(SERVER_ADDRESS); uint16_t port = SERVER_PORT; // Make sure we can create one. - ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(address, port))); - ASSERT_TRUE(listener); + ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(address, port))); + ASSERT_TRUE(listener_); // Verify the getters do what we expect. - EXPECT_EQ(listener->getAddress(), address); - EXPECT_EQ(listener->getPort(), port); - EXPECT_EQ(listener->getThreadPoolSize(), 1); + EXPECT_EQ(listener_->getAddress(), address); + EXPECT_EQ(listener_->getPort(), port); + EXPECT_EQ(listener_->getThreadPoolSize(), 1); // It should not be listening and have no threads. - EXPECT_FALSE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 0); + EXPECT_FALSE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 0); // Verify that we cannot start it when multi-threading is disabled. ASSERT_FALSE(MultiThreadingMgr::instance().getMode()); - ASSERT_THROW_MSG(listener->start(), InvalidOperation, + ASSERT_THROW_MSG(listener_->start(), InvalidOperation, "CmdHttpListener cannot be started" " when multi-threading is disabled"); // It should still not be listening and have no threads. - EXPECT_FALSE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 0); + EXPECT_FALSE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 0); // Enable multi-threading. MultiThreadingMgr::instance().setMode(true); // Make sure we can start it and it's listening with 1 thread. - ASSERT_NO_THROW_LOG(listener->start()); - ASSERT_TRUE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 1); + ASSERT_NO_THROW_LOG(listener_->start()); + ASSERT_TRUE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 1); // Trying to start it again should fail. - ASSERT_THROW_MSG(listener->start(), InvalidOperation, + ASSERT_THROW_MSG(listener_->start(), InvalidOperation, "CmdHttpListener is already listening!"); // Stop it and verify we're no longer listening. - ASSERT_NO_THROW_LOG(listener->stop()); - ASSERT_FALSE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 0); + ASSERT_NO_THROW_LOG(listener_->stop()); + ASSERT_FALSE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 0); // Make sure we can call stop again without problems. - ASSERT_NO_THROW_LOG(listener->stop()); + ASSERT_NO_THROW_LOG(listener_->stop()); // We should be able to restart it. - ASSERT_NO_THROW_LOG(listener->start()); - ASSERT_TRUE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 1); + ASSERT_NO_THROW_LOG(listener_->start()); + ASSERT_TRUE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 1); // Destroying it should also stop it. // If the test timeouts we know it didn't! - ASSERT_NO_THROW_LOG(listener.reset()); + ASSERT_NO_THROW_LOG(listener_.reset()); // Verify we can construct with more than one thread. - ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(address, port, 4))); - ASSERT_NO_THROW_LOG(listener->start()); - EXPECT_EQ(listener->getAddress(), address); - EXPECT_EQ(listener->getPort(), port); - EXPECT_EQ(listener->getThreadPoolSize(), 4); - ASSERT_TRUE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 4); + ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(address, port, 4))); + ASSERT_NO_THROW_LOG(listener_->start()); + EXPECT_EQ(listener_->getAddress(), address); + EXPECT_EQ(listener_->getPort(), port); + EXPECT_EQ(listener_->getThreadPoolSize(), 4); + ASSERT_TRUE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 4); // Stop it and verify we're no longer listening. - ASSERT_NO_THROW_LOG(listener->stop()); - ASSERT_FALSE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 0); + ASSERT_NO_THROW_LOG(listener_->stop()); + ASSERT_FALSE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 0); } @@ -583,19 +636,19 @@ TEST_F(CmdHttpListenerTest, basics) { TEST_F(CmdHttpListenerTest, basicListenAndRespond) { // Create a listener with 1 thread. - CmdHttpListenerPtr listener; - ASSERT_NO_THROW_LOG(listener.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS), + ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS), SERVER_PORT))); - ASSERT_TRUE(listener); + ASSERT_TRUE(listener_); // Start the listener and verify it's listening with 1 thread. - ASSERT_NO_THROW_LOG(listener->start()); - ASSERT_TRUE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 1); + ASSERT_NO_THROW_LOG(listener_->start()); + ASSERT_TRUE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 1); // Now let's send a "foo" command. This should create a client, connect // to our listener, post our request and retrieve our reply. ASSERT_NO_THROW(startRequest("{\"command\": \"foo\"}")); + ++num_clients_; ASSERT_EQ(1, clients_.size()); ASSERT_NO_THROW(runIOService()); TestHttpClientPtr client = clients_.front(); @@ -614,6 +667,7 @@ TEST_F(CmdHttpListenerTest, basicListenAndRespond) { this, ph::_1, ph::_2)); // Try posting the foo command again. ASSERT_NO_THROW(startRequest("{\"command\": \"foo\"}")); + ++num_clients_; ASSERT_EQ(2, clients_.size()); ASSERT_NO_THROW(runIOService()); client = clients_.back(); @@ -626,13 +680,13 @@ TEST_F(CmdHttpListenerTest, basicListenAndRespond) { EXPECT_EQ(hr->getBody(), "[ { \"arguments\": [ \"bar\" ], \"result\": 0 } ]"); // Make sure the listener is still listening. - ASSERT_TRUE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 1); + ASSERT_TRUE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 1); // Stop the listener then verify it has stopped. - ASSERT_NO_THROW_LOG(listener->stop()); - ASSERT_FALSE(listener->isListening()); - EXPECT_EQ(listener->getThreadCount(), 0); + ASSERT_NO_THROW_LOG(listener_->stop()); + ASSERT_FALSE(listener_->isListening()); + EXPECT_EQ(listener_->getThreadCount(), 0); } // Now we'll run some permutations of the number of listener threads @@ -680,4 +734,12 @@ TEST_F(CmdHttpListenerTest, sixByEighteen) { threadListenAndRespond(num_threads, num_clients); } +// Pauses and resumes during work. +TEST_F(CmdHttpListenerTest, pauseAndResume) { + size_t num_threads = 6; + size_t num_clients = 18; + size_t num_pauses = 3; + threadListenAndRespond(num_threads, num_clients, num_pauses); +} + } // end of anonymous namespace diff --git a/src/lib/http/Makefile.am b/src/lib/http/Makefile.am index b2646e7065..6101a0c84f 100644 --- a/src/lib/http/Makefile.am +++ b/src/lib/http/Makefile.am @@ -43,6 +43,7 @@ libkea_http_la_SOURCES += auth_log.cc auth_log.h libkea_http_la_SOURCES += auth_messages.cc auth_messages.h libkea_http_la_SOURCES += basic_auth_config.cc basic_auth_config.h libkea_http_la_SOURCES += basic_auth.cc basic_auth.h +libkea_http_la_SOURCES += http_thread_pool.cc http_thread_pool.h libkea_http_la_CXXFLAGS = $(AM_CXXFLAGS) libkea_http_la_CPPFLAGS = $(AM_CPPFLAGS) @@ -114,6 +115,7 @@ libkea_http_include_HEADERS = \ http_message.h \ http_message_parser_base.h \ http_messages.h \ + http_thread_pool.h \ http_types.h \ listener.h \ listener_impl.h \ diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index 30c82d184a..07dc4f5f5f 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -27,6 +27,8 @@ #include #include #include +#include + using namespace isc; using namespace isc::asiolink; @@ -1736,30 +1738,30 @@ public: /// /// @param io_service IOService that will drive connection IO in single /// threaded mode. (Currently ignored in multi-threaded mode) - /// /// @param thread_pool_size maximum number of concurrent threads /// Internally this also sets the maximum number concurrent connections + /// @param defer_thread_start if true, then the thread pool will be + /// created but started Applicable only when thread-pool-size is + /// greater than zero. + /// will not be startedfalse, then /// per URL. - HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0) : - thread_pool_size_(thread_pool_size) { + HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0, + bool defer_thread_start = false) + : thread_pool_size_(thread_pool_size), threads_() { if (thread_pool_size_ > 0) { // Create our own private IOService. thread_io_service_.reset(new IOService()); - // Create a pool of threads, each calls run on the same, private - // io_service instance - for (std::size_t i = 0; i < thread_pool_size_; ++i) { - boost::shared_ptr thread(new std::thread(std::bind(&IOService::run, - thread_io_service_))); - threads_.push_back(thread); - } + // Create the thread pool. + threads_.reset(new HttpThreadPool(thread_io_service_, thread_pool_size_, + defer_thread_start)); // Create the connection pool. Note that we use the thread_pool_size // as the maximum connections per URL value. conn_pool_.reset(new ConnectionPool(*thread_io_service_, thread_pool_size_)); LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC, HTTP_CLIENT_MT_STARTED) - .arg(threads_.size()); + .arg(getThreadCount()); } else { // Single-threaded mode: use the caller's IOService, // one connection per URL. @@ -1774,30 +1776,40 @@ public: stop(); } - /// @brief Close all connections, and if multi-threaded, stop internal IOService - /// and the thread pool. + /// @brief Close all connections, and if multi-threaded, stops the + /// thread pool. void stop() { // Close all the connections. conn_pool_->closeAll(); - // Stop the multi-threaded service. - if (thread_io_service_) { - // Flush cancelled (and ready) handlers. - thread_io_service_->poll(); + if (threads_) { + threads_->stop(); + } + } - // Stop the private IOService. - thread_io_service_->stop(); + void pause() { + if (!threads_) { + isc_throw(InvalidOperation, "HttpClient::pause - no thread pool"); + } - // Shutdown the threads. - for (auto const& thread : threads_) { - thread->join(); - } + threads_->pause(); + } - threads_.clear(); + /// @brief Pauses the thread pool's worker threads. + void resume() { + if (!threads_) { + isc_throw(InvalidOperation, "HttpClient::resume - no thread pool"); } - // Get rid of the IOService. - thread_io_service_.reset(); + threads_->resume(); + } + + HttpThreadPool::RunState getRunState() const { + if (!threads_) { + isc_throw(InvalidOperation, "HttpClient::getRunState - no thread pool"); + } + + return (threads_->getRunState()); } /// @brief Fetches the internal IOService used in multi-threaded mode. @@ -1819,22 +1831,26 @@ public: /// /// @return the number of running threads. uint16_t getThreadCount() { - return (threads_.size()); + if (!threads_) { + return (0); + } + return (threads_->getThreadCount()); } /// @brief Holds a pointer to the connection pool. ConnectionPoolPtr conn_pool_; private: + /// @brief Maxim number of threads in the thread pool. size_t thread_pool_size_; - /// @brief Pool of threads used to service connections in multi-threaded - /// mode. - std::vector > threads_; - /// @brief Pointer to private IOService used in multi-threaded mode. asiolink::IOServicePtr thread_io_service_; + + /// @brief Pool of threads used to service connections in multi-threaded + /// mode. + HttpThreadPoolPtr threads_; }; HttpClient::HttpClient(IOService& io_service, size_t thread_pool_size) { @@ -1898,6 +1914,16 @@ HttpClient::stop() { impl_->stop(); } +void +HttpClient::pause() { + impl_->pause(); +} + +void +HttpClient::resume() { + impl_->resume(); +} + const IOServicePtr HttpClient::getThreadIOService() const { return (impl_->getThreadIOService()); @@ -1913,5 +1939,11 @@ HttpClient::getThreadCount() const { return (impl_->getThreadCount()); } +HttpThreadPool::RunState +HttpClient::getRunState() const { + return (impl_->getRunState()); +} + + } // end of namespace isc::http } // end of namespace isc diff --git a/src/lib/http/client.h b/src/lib/http/client.h index 4fd3c319e9..6241238ece 100644 --- a/src/lib/http/client.h +++ b/src/lib/http/client.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -85,7 +86,6 @@ class HttpClientImpl; /// are returned via the 3rd parameter of the callback. class HttpClient { public: - /// @brief HTTP request/response timeout value. struct RequestTimeout { /// @brief Constructor. @@ -249,6 +249,7 @@ public: /// IOService. void stop(); + /// @brief Closes a connection if it has an out-of-band socket event /// /// If the client owns a connection using the given socket and that @@ -280,6 +281,10 @@ public: /// @return the number of running threads. uint16_t getThreadCount() const; + void pause(); + void resume(); + HttpThreadPool::RunState getRunState() const; + private: /// @brief Pointer to the HTTP client implementation. diff --git a/src/lib/http/http_thread_pool.cc b/src/lib/http/http_thread_pool.cc new file mode 100644 index 0000000000..642ac83f3f --- /dev/null +++ b/src/lib/http/http_thread_pool.cc @@ -0,0 +1,190 @@ +// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +using namespace isc; +using namespace isc::asiolink; +using namespace isc::http; +using namespace isc::util; + +HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size, + bool defer_start /* = false */) + : pool_size_(pool_size), io_service_(io_service), + run_state_(RunState::STOPPED), mutex_(), cv_() { + if (!pool_size) { + isc_throw(BadValue, "HttpThreadPool::ctor pool_size must be > 0"); + } + + // If we weren't given an IOService, create our own. + if (!io_service_) { + io_service_.reset(new IOService()); + } + + // If we're not deferring the start, do it now. + if (!defer_start) { + start(); + } +} + +HttpThreadPool::~HttpThreadPool() { + if (getRunState() != RunState::STOPPED) { + // Stop if we aren't already stopped + stop(); + } +} + +void +HttpThreadPool::start() { + if (getRunState() != RunState::STOPPED) { + isc_throw(InvalidOperation, "HttpThreadPool::start already started!"); + } + + // Set state to RUN. + setRunState(RunState::RUN); + + // Prep IOservice for run() invocations. + io_service_->restart(); + + // Create a pool of threads, each calls run() on our + // io_service instance. + for (std::size_t i = 0; i < pool_size_; ++i) { + boost::shared_ptr thread(new std::thread( + [this]() { + bool done = false; + while (!done) { + switch (getRunState()) { + case RunState::RUN: + io_service_->run(); + break; + case RunState::PAUSED: + { + // We need to stop and wait to be released. We don't care how + // we exit, we'll do whatever the current state dictates. + std::unique_lock lck(mutex_); + static_cast(cv_.wait_for(lck, std::chrono::milliseconds(25), + [&]() { + return (run_state_ != RunState::PAUSED); + })); + + break; + } + case RunState::SHUTDOWN: + done = true; + break; + case RunState::STOPPED: + // This should never happen. + done = true; + break; + } + } + })); + + threads_.push_back(thread); + } +} + +void +HttpThreadPool::stop() { + if (getRunState() == RunState::STOPPED) { + // Nothing to do. + return; + } + + // Set the state to SHUTDOWN. + setRunState(RunState::SHUTDOWN); + + // Stop our IOService. + if (!io_service_->stopped()) { + io_service_->stop(); + } + + // Shutdown the threads. + for (auto const& thread : threads_) { + thread->join(); + } + + // Empty the thread pool. + threads_.clear(); + + // Set the state to STOPPED. + setRunState(RunState::STOPPED); +} + +void +HttpThreadPool::pause() { + if (getRunState() != RunState::RUN) { + // Not running, can't pause. + return; + } + + /// @todo TKM - Take this out + std::cout << "HttpThreadPool pausing" << std::endl; + setRunState(RunState::PAUSED); + io_service_->stop(); +} + +void +HttpThreadPool::resume() { + if (getRunState() != RunState::PAUSED) { + // Not PAUSED, can't resume. + return; + } + + /// @todo TKM - Take this out + std::cout << "HttpThreadPool resuming" << std::endl; + io_service_->restart(); + setRunState(RunState::RUN); +} + +HttpThreadPool::RunState +HttpThreadPool::getRunState() { + std::lock_guard lck(mutex_); + return (run_state_); +} + +void +HttpThreadPool::setRunState(RunState state) { + { + std::lock_guard lck(mutex_); + run_state_ = state; + } + cv_.notify_all(); +} + +IOServicePtr +HttpThreadPool::getIOService() const { + return (io_service_); +} + +uint16_t +HttpThreadPool::getPoolSize() const { + return (pool_size_); +} + +uint16_t +HttpThreadPool::getThreadCount() const { + return (threads_.size()); +} diff --git a/src/lib/http/http_thread_pool.h b/src/lib/http/http_thread_pool.h new file mode 100644 index 0000000000..ed0c2d8a3e --- /dev/null +++ b/src/lib/http/http_thread_pool.h @@ -0,0 +1,149 @@ +// Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#ifndef HTTP_THREAD_POOL_H +#define HTTP_THREAD_POOL_H + +#include +#include + +#include + +#include +#include +#include +#include + +namespace isc { +namespace http { + +/// @brief Implements a pausable pool of IOService driven threads. +class HttpThreadPool { +public: + /// @brief Describes the possible operational state of the pool. + enum class RunState { + STOPPED, /// Pool is not operational. + RUN, /// Pool is populated with running threads. + PAUSED, /// Pool is populated with threads which are paused. + SHUTDOWN, /// Pool is transitioning from RUN or PAUSED to STOPPED. + }; + + /// @brief Constructor + /// + /// @param io_service IOService that will drive the pool's IO. If empty, it + /// create it's own instance. + /// @param pool_size Maximum number of threads in the pool. Currently the number + /// of threads is fixed at this value. + /// @param defer_start If true, creation of the threads is deferred until a subsequent + /// call to @ref start(). In this case the pool's operational state post construction + /// is STOPPED. If false, the construtor will invoke start() which will create the + /// threads, placing the pool in RUN state. + HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size, bool defer_start = false); + + /// @brief Destructor + /// + /// Ensures the pool is stopped prior to destruction. + ~HttpThreadPool(); + + /// @brief Transitions the pool from STOPPED to RUN run state. + /// + /// It starts the pool by doing the following: + /// -# Sets state to RUN + /// -# Restarts the IOService preparing it thread invocations of + /// IOService::run() + /// -# Creates thread_pool_size_ threads, adding each to the pool. + /// + /// @throw InvalidOperation if called with the pool in any state other + /// than STOPPED. + void start(); + + /// @brief Tranisitions the pool to STOPPED state. + /// + /// It stops the pool by doing the following: + /// -# Sets the state to SHUTDOWN. + /// =# Stops the IOService. + /// =# Joins the pool threads. + /// -# Empties the pool. + /// -# Sets the state to STOPPED. + void stop(); + + /// @brief Transitions the pool from RUN to PAUSED state. + /// + /// If the state is any state other than RUN it simply returns, + /// otherwise it does the following: + /// + /// -# Sets the state to PAUSED. + /// -# Stops the IOService. + void pause(); + + /// @brief Transitions the pool from PAUSED to RUN state. + /// + /// If the state is any state other than PAUSED it simply returns, + /// otherwise it does the following: + /// + /// -# Restarts the IOService preparing it for thread invocations + /// of IOService::run() + /// -# Sets the state to RUN. + void resume(); + + /// @brief Thread-safe fetch of the pool's operational state. + /// + /// @return Pool run state. + RunState getRunState(); + +private: + /// @brief Thread-safe set of the pool's operational state. + /// + /// @note This method does not validate the state change. + /// + /// @param state new state for the pool. + void setRunState(RunState state); + +public: + /// @brief Fetches the IOService that drives the pool. + /// + /// @return A pointer to the IOService. + asiolink::IOServicePtr getIOService() const; + + /// @brief Fetches the maximum size of the thread pool. + /// + /// @return the maximum size of the thread pool. + uint16_t getPoolSize() const; + + /// @brief Fetches the number of threads in the pool. + /// + /// @return the number of running threads. + uint16_t getThreadCount() const; + +private: + /// @brief Maxim number of threads in the thread pool. + size_t pool_size_; + + /// @brief Pointer to private IOService used in multi-threaded mode. + asiolink::IOServicePtr io_service_; + + /// @brief Tracks the operational state of the pool. + RunState run_state_; + + /// @brief Mutex to protect the internal state. + std::mutex mutex_; + + /// @brief Condition variable for synchronization. + std::condition_variable cv_; + + /// @brief Pool of threads used to service connections in multi-threaded + /// mode. + std::list > threads_; +}; + +/// @brief Defines a pointer to a thread pool. +typedef boost::shared_ptr HttpThreadPoolPtr; + +} // end of namespace isc::http +} // end of namespace isc + +#endif + diff --git a/src/lib/http/tests/Makefile.am b/src/lib/http/tests/Makefile.am index 5dc64f32e8..8fbc3397e7 100644 --- a/src/lib/http/tests/Makefile.am +++ b/src/lib/http/tests/Makefile.am @@ -49,6 +49,7 @@ endif libhttp_unittests_SOURCES += url_unittests.cc libhttp_unittests_SOURCES += test_http_client.h libhttp_unittests_SOURCES += mt_client_unittests.cc +libhttp_unittests_SOURCES += http_thread_pool_unittests.cc libhttp_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES) libhttp_unittests_CXXFLAGS = $(AM_CXXFLAGS) diff --git a/src/lib/http/tests/mt_client_unittests.cc b/src/lib/http/tests/mt_client_unittests.cc index cab50bd764..81ed232a35 100644 --- a/src/lib/http/tests/mt_client_unittests.cc +++ b/src/lib/http/tests/mt_client_unittests.cc @@ -198,7 +198,8 @@ public: MtHttpClientTest() : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(), test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0), - expected_requests_(0), num_in_progress_(0), num_finished_(0) { + expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false), + pause_cnt_(0) { test_timer_.setup(std::bind(&MtHttpClientTest::timeoutHandler, this, true), TEST_TIMEOUT, IntervalTimer::ONE_SHOT); MultiThreadingMgr::instance().setMode(true); @@ -235,7 +236,7 @@ public: /// have been carried out or the test fails. void runIOService() { // Loop until the clients are done, an error occurs, or the time runs out. - while (clientRRs_.size() < expected_requests_) { + while (getRRCount() < expected_requests_) { // Always call reset() before we call run(); io_service_.get_io_service().reset(); @@ -304,15 +305,15 @@ public: // Wait here until we have as many in progress as we have threads. { - std::unique_lock lck(mutex_); + std::unique_lock lck(test_mutex_); ++num_in_progress_; if (num_threads_ == 0 || num_in_progress_ == num_threads_) { // Everybody has one, let's go. num_finished_ = 0; - cv_.notify_all(); + test_cv_.notify_all(); } else { // I'm ready but others aren't wait here. - bool ret = cv_.wait_for(lck, std::chrono::seconds(10), + bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10), [&]() { return (num_in_progress_ == num_threads_); }); if (!ret) { ADD_FAILURE() << "clients failed to start work"; @@ -332,18 +333,18 @@ public: // Wait here until we have as many ready to finish as we have threads. { - std::unique_lock lck(mutex_); + std::unique_lock lck(test_mutex_); ++num_finished_; clientRRs_.push_back(clientRR); if (num_threads_ == 0 || num_finished_ == num_threads_) { // We're all done, notify the others and finish. num_in_progress_ = 0; - cv_.notify_all(); + test_cv_.notify_all(); // Stop the test's IOService. io_service_.stop(); } else { // I'm done but others aren't wait here. - bool ret = cv_.wait_for(lck, std::chrono::seconds(10), + bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10), [&]() { return (num_finished_ == num_threads_); }); if (!ret) { ADD_FAILURE() << "clients failed to finish work"; @@ -353,6 +354,57 @@ public: })); } + /// @brief Initiates a single HTTP request. + /// + /// Constructs an HTTP post whose body is a JSON map containing a + /// single integer element, "sequence". + /// + /// The request completion handler simply constructs the response, + /// and adds it the list of completed request/responses. If the + /// number of completed requests has reached the expected number + /// it stops the test IOService. + /// + /// @param sequence value for the integer element, "sequence", + /// to send in the request. + void startRequestSimple(int sequence, int port_offset = 0) { + // Create the URL on which the server can be reached. + std::stringstream ss; + ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset); + Url url(ss.str()); + + // Initiate request to the server. + PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence); + HttpResponseJsonPtr response_json = boost::make_shared(); + ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(), + request_json, response_json, + [this, request_json, response_json](const boost::system::error_code& ec, + const HttpResponsePtr&, + const std::string&) { + // Bail on an error. + ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec; + + // Get stringified thread-id. + std::stringstream ss; + ss << std::this_thread::get_id(); + + // Create the ClientRR. + ClientRRPtr clientRR(new ClientRR()); + clientRR->thread_id_ = ss.str(); + clientRR->request_ = request_json; + clientRR->response_ = response_json; + + { + std::unique_lock lck(test_mutex_); + clientRRs_.push_back(clientRR); + ++num_finished_; + if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) { + io_service_.stop(); + } + } + + })); + } + /// @brief Carries out HTTP requests via HttpClient to HTTP listener(s). /// /// This function creates one HttpClient with the given number @@ -362,7 +414,10 @@ public: /// /// Then it iteratively runs the test's IOService until all /// the requests have been responded to, an error occurs, or the - /// test times out. + /// test times out. During each pass through the run loop, the + /// a call to shouldPause() is made to determine if the client + /// thread pool should be paused. If so, the the pool is paused + /// and an timer begun which resumes the pool upon timeout. /// /// Each request carries a single integer element, "sequence", which /// uniquely identifies the request. Each response is expected to @@ -382,8 +437,13 @@ public: /// A value of 0 puts the HttpClient in single-threaded mode. /// @param num_batches number of batches of requests that should be /// conducted. - /// @param num_listeners number of HttpListeners to create. - void threadRequestAndReceive(size_t num_threads, size_t num_batches, size_t num_listeners = 1) { + /// @param num_listeners number of HttpListeners to create. Defaults + /// to 1. + /// @param num_pauses number of times to pause and resume the client + /// during the test. Defaults to 0. + void threadRequestAndReceive(size_t num_threads, size_t num_batches, + size_t num_listeners = 1, + size_t num_pauses = 0) { ASSERT_TRUE(num_batches); ASSERT_TRUE(num_listeners); num_threads_ = num_threads; @@ -440,8 +500,32 @@ public: } } - // Run test thread IOService. This drives the listener's IO. - ASSERT_NO_THROW(runIOService()); + // Create a timer to use for invoking resume after pause. + IntervalTimer pause_timer_(io_service_); + paused_ = false; + + // Loop until the clients are done, an error occurs, or the time runs out. + while (getRRCount() < expected_requests_) { + // Always call restart() before we call run(); + io_service_.restart(); + + if (shouldPause(num_pauses)) { + // Pause client. + paused_ = true; + ++pause_cnt_; + client_->pause(); + + // Set timer to resume client. + pause_timer_.setup( + [this]() { + client_->resume(); + paused_ = false; + }, 10, IntervalTimer::ONE_SHOT); + } + + // Run until a client stops the service. + io_service_.run(); + } // Client should stop without issue. ASSERT_NO_THROW(client_->stop()); @@ -452,7 +536,22 @@ public: } // We should have a response for each request. - ASSERT_EQ(clientRRs_.size(), expected_requests_); + ASSERT_EQ(getRRCount(), expected_requests_); + + // We should have had the expected number of pauses. + if (!num_pauses) { + ASSERT_EQ(pause_cnt_, 0); + } else { + // We allow a range on pauses of +-1. Figuring + // out the exact intervals at which to pause was + // getting to be a pain. We don't really care as + // long as we're close. The primary thing is that + // we did in fact pause and resume. + ASSERT_TRUE((num_pauses - 1) <= pause_cnt_ && + (pause_cnt_ <= (num_pauses + 1))) + << " num+_pauses: " << num_pauses + << ", pause_cnt_" << pause_cnt_; + } // Create a map to track number of responses for each client thread. std::map responses_per_thread; @@ -537,6 +636,136 @@ public: } } + /// @brief Indicates if the test should pause. + /// + /// Returns true if the number of completed requests + /// has reached or exceeded the next pause interval. + /// The pause interval is given by expected number of + /// requests divided by the desired number of pauses. + /// + /// @param num_pauses Desired number of pauses. + /// + /// @return True if the client should be paused. + bool shouldPause(size_t num_pauses) { + size_t rr_count = getRRCount(); + if (paused_ || !num_pauses || !rr_count) { + return false; + } + + size_t interval = expected_requests_ / num_pauses; + size_t next_stop = interval * (pause_cnt_ + 1); + return (rr_count >= next_stop); + } + + /// @brief Verifies the client can be puased and shutdown while doing work. + /// + /// @param num_threads number of threads the HttpClient should use. + /// A value of 0 puts the HttpClient in single-threaded mode. + /// @param num_batches number of batches of requests that should be + /// conducted. + /// @param num_listeners number of HttpListeners to create. Defaults + /// to 1. + void workPauseShutdown(size_t num_threads, size_t num_batches, + size_t num_listeners = 1, bool pause_first = true) { + ASSERT_TRUE(num_batches); + ASSERT_TRUE(num_listeners); + num_threads_ = num_threads; + num_batches_ = num_batches; + num_listeners_ = num_listeners; + + // Client in ST is, in effect, 1 thread. + size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_); + + // Calculate the maximum requests that could complete. + size_t maximum_requests = (num_batches_ * num_listeners_ * effective_threads); + + // Calculate the expected number of requests. + expected_requests_ = maximum_requests / 2; + + for (auto i = 0; i < num_listeners_; ++i) { + // Make a factory + HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i)); + factories_.push_back(factory); + + // Need to create a Listener on + HttpListenerPtr listener(new HttpListener(io_service_, + IOAddress(SERVER_ADDRESS), (SERVER_PORT + i), + TlsContextPtr(), factory, + HttpListener::RequestTimeout(10000), + HttpListener::IdleTimeout(10000))); + listeners_.push_back(listener); + + // Start the server. + ASSERT_NO_THROW(listener->start()); + } + + // Create an MT client with num_threads + ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads))); + ASSERT_TRUE(client_); + + if (num_threads_ == 0) { + // If we single-threaded client should not have it's own IOService. + ASSERT_FALSE(client_->getThreadIOService()); + } else { + // If we multi-threaded client should have it's own IOService. + ASSERT_TRUE(client_->getThreadIOService()); + } + + // Verify the pool size and number of threads are as expected. + ASSERT_EQ(client_->getThreadPoolSize(), num_threads); + ASSERT_EQ(client_->getThreadCount(), num_threads); + + // Start the requisite number of requests: + // batch * listeners * threads. + int sequence = 0; + for (auto b = 0; b < num_batches; ++b) { + for (auto l = 0; l < num_listeners_; ++l) { + for (auto t = 0; t < effective_threads; ++t) { + startRequestSimple(++sequence, l); + } + } + } + + // Loop until the 1/2 the reuests are done, an error occurs, + // or the time runs out. + size_t rr_count = 0; + while (rr_count < (expected_requests_)) { + // Always call reset() before we call run(); + io_service_.get_io_service().reset(); + + // Run until a client stops the service. + io_service_.run(); + rr_count = getRRCount(); + } + + if (pause_first) { + // Pause the client. + ASSERT_NO_THROW(client_->pause()); + ASSERT_EQ(HttpThreadPool::RunState::PAUSED, client_->getRunState()); + } + + // We should have completed at least the expected number of requests + // but less than the maximum number of requests. + ASSERT_GE(getRRCount(), expected_requests_ ); + ASSERT_LT(getRRCount(), maximum_requests); + + // Client should stop without issue. + ASSERT_NO_THROW(client_->stop()); + + // Listeners should stop without issue. + for (const auto& listener : listeners_) { + ASSERT_NO_THROW(listener->stop()); + } + } + + /// @brief Fetch the number of completed requests. + /// + /// @return number of completed requests. + size_t getRRCount() { + std::unique_lock lck(test_mutex_); + return(clientRRs_.size()); + } + /// @brief IO service used in the tests. IOService io_service_; @@ -580,12 +809,18 @@ public: std::vector clientRRs_; /// @brief Mutex for locking. - std::mutex mutex_; + std::mutex test_mutex_; /// @brief Condition variable used to make client threads wait /// until number of in-progress requests reaches the number /// of client requests. - std::condition_variable cv_; + std::condition_variable test_cv_; + + /// @brief Indicates if client threads are currently "paused". + bool paused_; + + /// @brief Number of times client has been paused during the test. + size_t pause_cnt_; }; // Verifies we can construct and destruct, in both single @@ -628,7 +863,8 @@ TEST_F(MtHttpClientTest, basics) { ASSERT_NO_THROW_LOG(client->stop()); // Verify we're stopped. - ASSERT_FALSE(client->getThreadIOService()); + ASSERT_TRUE(client->getThreadIOService()); + EXPECT_TRUE(client->getThreadIOService()->stopped()); ASSERT_EQ(client->getThreadPoolSize(), 3); ASSERT_EQ(client->getThreadCount(), 0); @@ -700,4 +936,22 @@ TEST_F(MtHttpClientTest, fourByFourByTwo) { threadRequestAndReceive(num_threads, num_batches, num_listeners); } +// Verifies that we can cleanly work, pause, and resume repeatedly. +TEST_F(MtHttpClientTest, workPauseResumee) { + size_t num_threads = 12; + size_t num_batches = 12; + size_t num_listeners = 12; + size_t num_pauses = 7; + threadRequestAndReceive(num_threads, num_batches, num_listeners, num_pauses); +} + +// Verifies that we can cleanly pause and shutdown while doing +// multi-threaded work. +TEST_F(MtHttpClientTest, workPauseShutdown) { + size_t num_threads = 8; + size_t num_batches = 8; + size_t num_listeners = 8; + workPauseShutdown(num_threads, num_batches, num_listeners); +} + } // end of anonymous namespace