namespace {
+/// @brief define CallBack type
+typedef function<void()> CallBack;
+
/// @brief Test Fixture for testing isc::dhcp::ThreadPool
class ThreadPoolTest : public ::testing::Test {
public:
ThreadPoolTest() : thread_count_(0), count_(0), wait_(false) {
}
+ /// @brief task function
+ void runTryStopAndWait(ThreadPool<CallBack> &thread_pool) {
+ EXPECT_THROW(thread_pool.stop(), InvalidOperation);
+ EXPECT_NO_THROW(runAndWait());
+ }
+
/// @brief task function which registers the thread id and signals main
/// thread to stop waiting and then waits for main thread to signal to exit
void runAndWait() {
list<boost::shared_ptr<std::thread>> threads_;
};
-/// @brief define CallBack type
-typedef function<void()> CallBack;
-
/// @brief test ThreadPool add and count
TEST_F(ThreadPoolTest, testAddAndCount) {
uint32_t items_count;
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
+
+ // create tasks which try to stop the thread pool and then block thread pool
+ // threads until signaled by main thread to force all threads of the thread
+ // pool to run exactly one task
+ call_back = std::bind(&ThreadPoolTest::runTryStopAndWait, this, thread_pool);
+
+ // calling start should create the threads and should keep the queued items
+ EXPECT_NO_THROW(thread_pool.start(thread_count));
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), thread_count);
+
+ // add items to running thread pool
+ for (uint32_t i = 0; i < items_count; ++i) {
+ EXPECT_NO_THROW(thread_pool.add(boost::make_shared<CallBack>(call_back)));
+ }
+
+ // wait for all items to be processed
+ waitTasks(thread_count, items_count);
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should match
+ ASSERT_EQ(thread_pool.size(), thread_count);
+ // as each thread pool thread is still waiting on main to unblock, each
+ // thread should have been registered in ids list
+ checkIds(items_count);
+ // all items should have been processed
+ ASSERT_EQ(count(), items_count);
+
+ // check that the number of processed tasks matches the number of items
+ checkRunHistory(items_count);
+
+ // signal thread pool tasks to continue
+ signalThreads();
+
+ // calling stop should clear all threads and should keep queued items
+ EXPECT_NO_THROW(thread_pool.stop());
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
}
} // namespace
/// @brief stop all the threads
void stopInternal() {
+ auto id = std::this_thread::get_id();
+ if (checkThreadId(id)) {
+ isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+ }
queue_.disable();
for (auto thread : threads_) {
thread->join();
threads_.clear();
}
+ /// @brief check specified thread id against own threads
+ ///
+ /// @return true if thread is owned, false otherwise
+ bool checkThreadId(std::thread::id id) {
+ for (auto thread : threads_) {
+ if (id == thread->get_id()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/// @brief Defines a generic thread pool queue.
///
/// The main purpose is to safely manage thread pool tasks.