}
}
- /// @brief Determines if the listener should be paused.
- ///
- /// @param num_pauses desired number of pauses
- /// @param num_done number of clients that have completed their requests.
- ///
- /// @return True if the listener should be paused.
- bool shouldPause(size_t num_pauses, size_t num_done) {
- size_t request_limit = (pause_cnt_ < num_pauses ?
- (num_done + ((clients_.size() - num_done) / num_pauses))
- : clients_.size());
- return (request_limit);
- }
-
/// @brief Create an HttpResponse from a response string.
///
/// @param response_str a string containing the whole HTTP
/// @brief Submits one or more thread commands to a CmdHttpListener.
///
- /// This function command will creates a CmdHttpListener
+ /// This function command will create a CmdHttpListener
/// with the given number of threads, initiates the given
/// number of clients, each requesting the "thread" command,
/// and then iteratively runs the test's IOService until all
/// @brief Pauses and resumes a CmdHttpListener while it processes command
/// requests.
///
- /// This function command will creates a CmdHttpListener
+ /// This function command will create a CmdHttpListener
/// with the given number of threads, initiates the given
/// number of clients, each requesting the "thread" command,
/// and then iteratively runs the test's IOService until all
// Create a listener with prescribed number of threads.
ASSERT_NO_THROW_LOG(listener_.reset(new CmdHttpListener(IOAddress(SERVER_ADDRESS),
- SERVER_PORT, num_threads)));
+ SERVER_PORT, num_threads)));
ASSERT_TRUE(listener_);
// Start it and verify it is running.
// Now we run the client-side IOService until all requests are done,
// errors occur or the test times out. We'll pause and resume the
- // given the number of pauses
+ // number of times given by num_pauses.
size_t num_done = 0;
size_t total_requests = clients_.size();
while (num_done < total_requests) {
// Calculate how many more requests to process before we pause again.
- // We divide the number of oustanding requests by the number of pauses
- // and stop after we've done at least that many more.
+ // We divide the number of outstanding requests by the number of pauses
+ // and stop after we've done at least that many more requests.
size_t request_limit = (pause_cnt_ < num_pauses ?
(num_done + ((total_requests - num_done) / num_pauses))
- : total_requests);
+ : total_requests);
// Run test IOService until we hit the limit.
runIOService(request_limit);
/// deferred until a subsequent call to @ref start(). In this case the
/// pool's operational state post-construction is STOPPED. Otherwise,
/// the thread pool threads will be created and started, with the post-
- /// construction state being RUN. Applicable only when thread-pool size
+ /// construction state being RUNNING. Applicable only when thread-pool size
/// is greater than zero.
HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0,
bool defer_thread_start = false)
/// deferred until a subsequent call to @ref start(). In this case the
/// pool's operational state post-construction is STOPPED. Otherwise,
/// the thread pool threads will be created and started, with the post-
- /// construction state being RUN. Applicable only when thread-pool size
+ /// construction state being RUNNING. Applicable only when thread-pool size
/// is greater than zero.
explicit HttpClient(asiolink::IOService& io_service, size_t thread_pool_size = 0,
bool defer_thread_start = false);
/// @brief Indicates if the thread pool processing is running.
///
- /// @return True if the thread pool exists and is in the RUN state,
+ /// @return True if the thread pool exists and is in the RUNNING state,
/// false otherwise.
bool isRunning();
switch(new_state) {
case RunState::RUNNING: {
- // Restart the IOSerivce.
+ // Restart the IOService.
io_service_->restart();
// While we have fewer threads than we should, make more.
while (threads_.size() < pool_size_) {
boost::shared_ptr<std::thread> thread(new std::thread(
- [this]() {
- bool done = false;
- while (!done) {
- switch (getRunState()) {
- case RunState::RUNNING: {
- {
- std::unique_lock<std::mutex> lck(mutex_);
- running_++;
-
- // If We're all running notify main thread.
- if (running_ == pool_size_) {
- main_cv_.notify_all();
- }
- }
-
- // Run the IOService.
- io_service_->run();
-
- {
- std::unique_lock<std::mutex> lck(mutex_);
- running_--;
- }
-
- break;
- }
-
- case RunState::PAUSED: {
- std::unique_lock<std::mutex> lck(mutex_);
- paused_++;
-
- // If we're all paused notify main.
- if (paused_ == threads_.size()) {
- main_cv_.notify_all();
- }
-
- // Wait here till I'm released.
- thread_cv_.wait(lck,
- [&]() {
- return (run_state_ != RunState::PAUSED);
- });
-
- paused_--;
- break;
- }
-
- case RunState::STOPPED: {
- done = true;
- break;
- }}
- }
-
- std::unique_lock<std::mutex> lck(mutex_);
- exited_++;
- if (exited_ == threads_.size()) {
- main_cv_.notify_all();
- }
- }));
+ std::bind(&HttpThreadPool::threadWork, this)));
// Add thread to the pool.
threads_.push_back(thread);
}}
}
+void
+HttpThreadPool::threadWork() {
+ bool done = false;
+ while (!done) {
+ switch (getRunState()) {
+ case RunState::RUNNING: {
+ {
+ std::unique_lock<std::mutex> lck(mutex_);
+ running_++;
+
+ // If We're all running notify main thread.
+ if (running_ == pool_size_) {
+ main_cv_.notify_all();
+ }
+ }
+
+ // Run the IOService.
+ io_service_->run();
+
+ {
+ std::unique_lock<std::mutex> lck(mutex_);
+ running_--;
+ }
+
+ break;
+ }
+
+ case RunState::PAUSED: {
+ std::unique_lock<std::mutex> lck(mutex_);
+ paused_++;
+
+ // If we're all paused notify main.
+ if (paused_ == threads_.size()) {
+ main_cv_.notify_all();
+ }
+
+ // Wait here till I'm released.
+ thread_cv_.wait(lck,
+ [&]() {
+ return (run_state_ != RunState::PAUSED);
+ });
+
+ paused_--;
+ break;
+ }
+
+ case RunState::STOPPED: {
+ done = true;
+ break;
+ }}
+ }
+
+ std::unique_lock<std::mutex> lck(mutex_);
+ exited_++;
+
+ // If we've all exited, notify main.
+ if (exited_ == threads_.size()) {
+ main_cv_.notify_all();
+ }
+}
+
IOServicePtr
HttpThreadPool::getIOService() const {
return (io_service_);
/// @param defer_start If true, creation of the threads is deferred until
/// a subsequent call to @ref start(). In this case the pool's operational
/// state post construction is STOPPED. If false, the constructor will
- /// invoke run() to tranistion the pool into the RUNNING state.
+ /// invoke run() to transition the pool into the RUNNING state.
HttpThreadPool(asiolink::IOServicePtr io_service, size_t pool_size,
bool defer_start = false);
RunState getRunState();
private:
- /// @brief Thread-safe change of the pool's operational state.
+ /// @brief Thread-safe change of the pool's run state.
///
- /// Transitions a pool from one state to another:
+ /// Transitions a pool from one run state to another:
///
/// When moving from STOPPED or PAUSED to RUNNING:
/// -# Sets state to RUNNING.
/// -# Creates the threads if they do not yet exist (true only
/// when transitioning from STOPPED).
/// -# Waits until threads are running.
+ /// -# Sets the count of exited threads to 0.
/// -# Returns to caller.
///
/// When moving from RUNNING or PAUSED to STOPPED:
/// @brief Validates whether the pool can change to a given state.
///
/// @param state new state for the pool.
- /// @return true if the changs is valid, false otherwise.
+ /// @return true if the Chang's is valid, false otherwise.
/// @note Must be called from a thread-safe context.
bool validateStateChange(RunState state) const;
+ /// @brief Work function executed by each thread in the pool.
+ ///
+ /// Implements the run state responsibilities for a given thread.
+ /// It executes a run loop until the pool is stopped. At the top
+ /// of each iteration of the loop the pool's run state is checked
+ /// and when it is:
+ ///
+ /// RUNNING:
+ /// -# The count of threads running is incremented.
+ /// -# If the count has reached the number of threads in pool the
+ /// main thread is notified.
+ /// -# IOService::run() is invoked.
+ /// -# When IOService::run() returns, the count of threads running
+ /// is decremented.
+ ///
+ /// PAUSED:
+ /// -# The count of threads paused is incremented.
+ /// -# If the count has reached the number of threads in pool the
+ /// main thread is notified.
+ /// -# Thread blocks until notified the pool's run state is no
+ /// longer PAUSED.
+ /// -# The count of threads paused is decremented.
+ ///
+ /// STOPPED:
+ /// -# The run loop is exited.
+ /// -# The count of threads exited is incremented.
+ /// -# If the count has reached the number of threads in pool the
+ /// main thread is notified.
+ /// -# function exits.
+ void threadWork();
+
public:
/// @brief Fetches the IOService that drives the pool.
/// @brief Number of threads currently running.
size_t running_;
- /// @brief Number of threads that have exited the work funcion.
+ /// @brief Number of threads that have exited the work function.
size_t exited_;
/// @brief Pool of threads used to service connections in multi-threaded