checkState(thread_pool, 0, 0);
}
+/// @brief test ThreadPool pause and resume
+TEST_F(ThreadPoolTest, pauseAndResume) {
+ uint32_t items_count;
+ uint32_t thread_count;
+ CallBack call_back;
+ ThreadPool<CallBack> thread_pool;
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ items_count = 4;
+ thread_count = 4;
+ // prepare setup
+ reset(thread_count);
+
+ // create tasks which block thread pool threads until signaled by main
+ // thread to force all threads of the thread pool to run exactly one task
+ call_back = std::bind(&ThreadPoolTest::runAndWait, this);
+
+ // calling resume should do nothing if not started
+ EXPECT_NO_THROW(thread_pool.resume());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // do it once again to check if it works
+ EXPECT_NO_THROW(thread_pool.resume());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // calling pause should do nothing if not started
+ EXPECT_NO_THROW(thread_pool.pause());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // do it once again to check if it works
+ EXPECT_NO_THROW(thread_pool.pause());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // calling resume should do nothing if not started
+ EXPECT_NO_THROW(thread_pool.resume());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // add items to stopped thread pool
+ for (uint32_t i = 0; i < items_count; ++i) {
+ bool ret = true;
+ EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
+ EXPECT_TRUE(ret);
+ }
+
+ // the item count should match
+ ASSERT_EQ(thread_pool.count(), items_count);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // calling pause should do nothing if not started
+ EXPECT_NO_THROW(thread_pool.pause());
+ // the item count should match
+ ASSERT_EQ(thread_pool.count(), items_count);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // calling resume should do nothing if not started
+ EXPECT_NO_THROW(thread_pool.resume());
+ // the item count should match
+ ASSERT_EQ(thread_pool.count(), items_count);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // calling pause should do nothing if not started
+ EXPECT_NO_THROW(thread_pool.pause());
+ // the item count should match
+ ASSERT_EQ(thread_pool.count(), items_count);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // calling start should create the threads and should keep the queued items
+ EXPECT_NO_THROW(thread_pool.start(thread_count));
+ // the item count should match
+ ASSERT_EQ(thread_pool.count(), items_count);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // calling resume should resume threads
+ EXPECT_NO_THROW(thread_pool.resume());
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // wait for all items to be processed
+ waitTasks(thread_count, items_count);
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+ // as each thread pool thread is still waiting on main to unblock, each
+ // thread should have been registered in ids list
+ checkIds(items_count);
+ // all items should have been processed
+ ASSERT_EQ(count(), items_count);
+
+ // check that the number of processed tasks matches the number of items
+ checkRunHistory(items_count);
+
+ // signal thread pool tasks to continue
+ signalThreads();
+
+ // do it once again to check if it works
+ EXPECT_NO_THROW(thread_pool.resume());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // calling stop should clear all threads and should keep queued items
+ EXPECT_NO_THROW(thread_pool.stop());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ items_count = 64;
+ thread_count = 16;
+ // prepare setup
+ reset(thread_count);
+
+ // create tasks which do not block the thread pool threads so that several
+ // tasks can be run on the same thread and some of the threads never even
+ // having a chance to run
+ call_back = std::bind(&ThreadPoolTest::run, this);
+
+ // add items to stopped thread pool
+ for (uint32_t i = 0; i < items_count; ++i) {
+ bool ret = true;
+ EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
+ EXPECT_TRUE(ret);
+ }
+
+ // the item count should match
+ ASSERT_EQ(thread_pool.count(), items_count);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ // calling start should create the threads and should keep the queued items
+ EXPECT_NO_THROW(thread_pool.start(thread_count));
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // wait for all items to be processed
+ waitTasks(thread_count, items_count);
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+ // all items should have been processed
+ ASSERT_EQ(count(), items_count);
+
+ // check that the number of processed tasks matches the number of items
+ checkRunHistory(items_count);
+
+ // calling pause should pause threads
+ EXPECT_NO_THROW(thread_pool.pause());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // do it once again to check if it works
+ EXPECT_NO_THROW(thread_pool.pause());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // prepare setup
+ reset(thread_count);
+
+ // add items to paused thread pool
+ for (uint32_t i = 0; i < items_count; ++i) {
+ bool ret = true;
+ EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
+ EXPECT_TRUE(ret);
+ }
+
+ // the item count should match
+ ASSERT_EQ(thread_pool.count(), items_count);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // calling resume should resume threads
+ EXPECT_NO_THROW(thread_pool.resume());
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // wait for all items to be processed
+ waitTasks(thread_count, items_count);
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+ // all items should have been processed
+ ASSERT_EQ(count(), items_count);
+
+ // check that the number of processed tasks matches the number of items
+ checkRunHistory(items_count);
+
+ // calling stop should clear all threads and should keep queued items
+ EXPECT_NO_THROW(thread_pool.stop());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+>>>>>>> 0d7199fdd8 ([#1599] implemented pause and resume)
+}
+
/// @brief test ThreadPool max queue size
TEST_F(ThreadPoolTest, maxQueueSize) {
uint32_t items_count;
return (queue_.wait(seconds));
}
+ /// @brief pause threads
+ ///
+ /// Used to pause threads so that they stop processing tasks
+ void pause() {
+ queue_.pause();
+ }
+
+ /// @brief resume threads
+ ///
+ /// Used to resume threads so that they start processing tasks
+ void resume() {
+ queue_.resume();
+ }
+
/// @brief set maximum number of work items in the queue
///
/// @param max_queue_size the maximum size (0 means unlimited)
///
/// Creates the thread pool queue in 'disabled' state
ThreadPoolQueue()
- : enabled_(false), max_queue_size_(0), working_(0),
+ : enabled_(false), paused_(false), max_queue_size_(0), working_(0),
stat10(0.), stat100(0.), stat1000(0.) {
}
wait_cv_.notify_all();
}
cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
+ pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);});
++working_;
if (!enabled_) {
return (Item());
return (ret);
}
+ /// @brief pause threads
+ ///
+ /// Used to pause threads so that they stop processing tasks
+ void pause() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ paused_ = true;
+ }
+
+ /// @brief resume threads
+ ///
+ /// Used to resume threads so that they start processing tasks
+ void resume() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ paused_ = false;
+ pause_cv_.notify_all();
+ }
+
/// @brief get queue length statistic
///
/// @param which select the statistic (10, 100 or 1000)
/// @brief condition variable used to wait for all items to be processed
std::condition_variable wait_cv_;
+ /// @brief condition variable used to pause threads
+ std::condition_variable pause_cv_;
+
/// @brief the state of the queue
/// The 'enabled' state corresponds to true value
/// The 'disabled' state corresponds to false value
std::atomic<bool> enabled_;
+ /// @brief the pause state of the queue
+ /// The 'paused' state corresponds to true value
+ /// The 'resumed' state corresponds to false value
+ std::atomic<bool> paused_;
+
/// @brief maximum number of work items in the queue
/// (0 means unlimited)
size_t max_queue_size_;