]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#3142] fixed race between start stop and wait
authorRazvan Becheriu <razvan@isc.org>
Wed, 8 Nov 2023 09:31:07 +0000 (11:31 +0200)
committerRazvan Becheriu <razvan@isc.org>
Tue, 14 Nov 2023 17:20:17 +0000 (17:20 +0000)
src/lib/dhcp/tests/libdhcp++_unittest.cc
src/lib/util/tests/multi_threading_mgr_unittest.cc
src/lib/util/tests/thread_pool_unittest.cc
src/lib/util/thread_pool.h

index 7c05b9785965a95d9b7dc058b738ace31a9b17b5..3c6262c0635c1ff6ed92758f6a1496fa14d887c3 100644 (file)
@@ -1121,7 +1121,7 @@ TEST_F(LibDhcpTest, splitOptionNoBufferMultiThreading) {
         boost::shared_ptr<CallBack> call_back = boost::make_shared<CallBack>(work);
         tp.add(call_back);
     }
-    ASSERT_TRUE(tp.wait(10));
+    ASSERT_TRUE(tp.wait(30));
 }
 
 // This test verifies that split options works if there is only one byte
@@ -1177,7 +1177,7 @@ TEST_F(LibDhcpTest, splitOptionOneByteLeftBufferMultiThreading) {
         boost::shared_ptr<CallBack> call_back = boost::make_shared<CallBack>(work);
         tp.add(call_back);
     }
-    ASSERT_TRUE(tp.wait(10));
+    ASSERT_TRUE(tp.wait(30));
 }
 
 // This test verifies that split options for v4 is working correctly.
@@ -1293,7 +1293,7 @@ TEST_F(LibDhcpTest, splitOptionWithSuboptionAtLimitMultiThreading) {
         boost::shared_ptr<CallBack> call_back = boost::make_shared<CallBack>(work);
         tp.add(call_back);
     }
-    ASSERT_TRUE(tp.wait(10));
+    ASSERT_TRUE(tp.wait(30));
 }
 
 // This test verifies that split options for v4 is working correctly.
@@ -1347,7 +1347,7 @@ TEST_F(LibDhcpTest, splitLongOptionMultiThreading) {
         boost::shared_ptr<CallBack> call_back = boost::make_shared<CallBack>(work);
         tp.add(call_back);
     }
-    ASSERT_TRUE(tp.wait(10));
+    ASSERT_TRUE(tp.wait(30));
 }
 
 // This test verifies that split options for v4 is working correctly even if
@@ -1433,7 +1433,7 @@ TEST_F(LibDhcpTest, splitOptionWithSuboptionWhichOverflowMultiThreading) {
         boost::shared_ptr<CallBack> call_back = boost::make_shared<CallBack>(work);
         tp.add(call_back);
     }
-    ASSERT_TRUE(tp.wait(10));
+    ASSERT_TRUE(tp.wait(30));
 }
 
 // This test verifies that split options for v4 is working correctly.
@@ -1531,7 +1531,7 @@ TEST_F(LibDhcpTest, splitLongOptionWithLongSuboptionMultiThreading) {
         boost::shared_ptr<CallBack> call_back = boost::make_shared<CallBack>(work);
         tp.add(call_back);
     }
-    ASSERT_TRUE(tp.wait(10));
+    ASSERT_TRUE(tp.wait(30));
 }
 
 // This test verifies that fuse options for v4 is working correctly.
index 4d3ae33a27c653922a6eaeabba6d9c5ebe63269b..9c9828a23398887b32f580253db42d7017692be4 100644 (file)
@@ -17,12 +17,31 @@ using namespace isc;
 
 /// @brief Fixture used to reset multi-threading before and after each test.
 struct MultiThreadingMgrTest : ::testing::Test {
+    /// @brief Constructor.
     MultiThreadingMgrTest() {
         MultiThreadingMgr::instance().apply(false, 0, 0);
     }
+
+    /// @brief Destructor.
     ~MultiThreadingMgrTest() {
         MultiThreadingMgr::instance().apply(false, 0, 0);
     }
+
+    /// @brief Check thread pool state.
+    ///
+    /// @param mode The multi-threading mode.
+    /// @param size The thread pool size.
+    /// @param count The thread queue size.
+    /// @param running The running threads count.
+    /// @param in_cs Flag which indicates if running inside critical section.
+    void checkState(bool mode, size_t size, size_t count, size_t running,
+                    bool in_cs = false) {
+        EXPECT_EQ(MultiThreadingMgr::instance().getMode(), mode);
+        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), size);
+        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), count);
+        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPool().size(), running);
+        EXPECT_EQ(MultiThreadingMgr::instance().isInCriticalSection(), in_cs);
+    }
 };
 
 /// @brief Verifies that the default mode is false (MT disabled).
@@ -88,64 +107,24 @@ TEST_F(MultiThreadingMgrTest, detectThreadCount) {
 
 /// @brief Verifies that apply settings works.
 TEST_F(MultiThreadingMgrTest, applyConfig) {
-    // get the thread pool
-    auto& thread_pool = MultiThreadingMgr::instance().getThreadPool();
-    // MT should be disabled
-    EXPECT_FALSE(MultiThreadingMgr::instance().getMode());
-    // default thread count is 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-    // thread pool should be stopped
-    EXPECT_EQ(thread_pool.size(), 0);
+    checkState(false, 0, 0, 0);
     // enable MT with 16 threads and queue size 256
     EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 16, 256));
-    // MT should be enabled
-    EXPECT_TRUE(MultiThreadingMgr::instance().getMode());
-    // thread count should be 16
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16);
-    // queue size should be 256
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
-    // thread pool should be started
-    EXPECT_EQ(thread_pool.size(), 16);
+    checkState(true, 16, 256, 16);
     // disable MT
     EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(false, 16, 256));
-    // MT should be disabled
-    EXPECT_FALSE(MultiThreadingMgr::instance().getMode());
-    // thread count should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-    // queue size should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
-    // thread pool should be stopped
-    EXPECT_EQ(thread_pool.size(), 0);
+    checkState(false, 0, 0, 0);
     // enable MT with auto scaling
     EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 0, 0));
-    // MT should be enabled
-    EXPECT_TRUE(MultiThreadingMgr::instance().getMode());
-    // thread count should be detected automatically
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), MultiThreadingMgr::detectThreadCount());
-    // thread pool should be started
-    EXPECT_EQ(thread_pool.size(), MultiThreadingMgr::detectThreadCount());
+    checkState(true, MultiThreadingMgr::detectThreadCount(), 0, MultiThreadingMgr::detectThreadCount());
     // disable MT
     EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(false, 0, 0));
-    // MT should be disabled
-    EXPECT_FALSE(MultiThreadingMgr::instance().getMode());
-    // thread count should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-    // thread pool should be stopped
-    EXPECT_EQ(thread_pool.size(), 0);
+    checkState(false, 0, 0, 0);
 }
 
 /// @brief Verifies that the critical section flag works.
 TEST_F(MultiThreadingMgrTest, criticalSectionFlag) {
-    // get the thread pool
-    auto& thread_pool = MultiThreadingMgr::instance().getThreadPool();
-    // MT should be disabled
-    EXPECT_FALSE(MultiThreadingMgr::instance().getMode());
-    // critical section should be disabled
-    EXPECT_FALSE(MultiThreadingMgr::instance().isInCriticalSection());
-    // thread count should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-    // thread pool should be stopped
-    EXPECT_EQ(thread_pool.size(), 0);
+    checkState(false, 0, 0, 0);
     // exit critical section
     EXPECT_THROW(MultiThreadingMgr::instance().exitCriticalSection(), InvalidOperation);
     // critical section should be disabled
@@ -156,14 +135,7 @@ TEST_F(MultiThreadingMgrTest, criticalSectionFlag) {
     EXPECT_TRUE(MultiThreadingMgr::instance().isInCriticalSection());
     // enable MT with 16 threads and queue size 256
     EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 16, 256));
-    // MT should be enabled
-    EXPECT_TRUE(MultiThreadingMgr::instance().getMode());
-    // thread count should be 16
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16);
-    // queue size should be 256
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
-    // thread pool should be stopped
-    EXPECT_EQ(thread_pool.size(), 0);
+    checkState(true, 16, 256, 0, true);
     // exit critical section
     EXPECT_NO_THROW(MultiThreadingMgr::instance().exitCriticalSection());
     // critical section should be disabled
@@ -174,164 +146,69 @@ TEST_F(MultiThreadingMgrTest, criticalSectionFlag) {
     EXPECT_FALSE(MultiThreadingMgr::instance().isInCriticalSection());
     // disable MT
     EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(false, 0, 0));
-    // MT should be disabled
-    EXPECT_FALSE(MultiThreadingMgr::instance().getMode());
-    // thread count should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-    // queue size should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
-    // thread pool should be stopped
-    EXPECT_EQ(thread_pool.size(), 0);
+    checkState(false, 0, 0, 0);
 }
 
 /// @brief Verifies that the critical section works.
 TEST_F(MultiThreadingMgrTest, criticalSection) {
-    // get the thread pool instance
-    auto& thread_pool = MultiThreadingMgr::instance().getThreadPool();
-    // thread pool should be stopped
-    EXPECT_EQ(thread_pool.size(), 0);
+    checkState(false, 0, 0, 0);
     // apply multi-threading configuration with 16 threads and queue size 256
     MultiThreadingMgr::instance().apply(true, 16, 256);
-    // thread count should match
-    EXPECT_EQ(thread_pool.size(), 16);
-    // thread count should be 16
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16);
-    // queue size should be 256
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
+    checkState(true, 16, 256, 16);
     // use scope to test constructor and destructor
     {
         MultiThreadingCriticalSection cs;
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 16
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16);
-        // queue size should be 256
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
+        checkState(true, 16, 256, 0, true);
         // use scope to test constructor and destructor
         {
             MultiThreadingCriticalSection inner_cs;
-            // thread pool should be stopped
-            EXPECT_EQ(thread_pool.size(), 0);
-            // thread count should be 16
-            EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16);
-            // queue size should be 256
-            EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
+            checkState(true, 16, 256, 0, true);
         }
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 16
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16);
-        // queue size should be 256
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
+        checkState(true, 16, 256, 0, true);
     }
-    // thread count should match
-    EXPECT_EQ(thread_pool.size(), 16);
-    // thread count should be 16
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16);
-    // queue size should be 256
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
+    checkState(true, 16, 256, 16);
     // use scope to test constructor and destructor
     {
         MultiThreadingCriticalSection cs;
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 16
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 16);
-        // queue size should be 256
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
+        checkState(true, 16, 256, 0, true);
         // apply multi-threading configuration with 64 threads and queue size 4
         MultiThreadingMgr::instance().apply(true, 64, 4);
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 64
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64);
-        // queue size should be 4
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 4);
+        checkState(true, 64, 4, 0, true);
     }
-    // thread count should match
-    EXPECT_EQ(thread_pool.size(), 64);
-    // thread count should be 64
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64);
-    // queue size should be 4
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 4);
+    checkState(true, 64, 4, 64);
     // use scope to test constructor and destructor
     {
         MultiThreadingCriticalSection cs;
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 64
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64);
-        // queue size should be 4
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 4);
+        checkState(true, 64, 4, 0, true);
         // apply multi-threading configuration with 0 threads
         MultiThreadingMgr::instance().apply(false, 64, 256);
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 0
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-        // queue size should be 0
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
+        checkState(false, 0, 0, 0, true);
     }
-    // thread count should match
-    EXPECT_EQ(thread_pool.size(), 0);
-    // thread count should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-    // queue size should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
+    checkState(false, 0, 0, 0);
     // use scope to test constructor and destructor
     {
         MultiThreadingCriticalSection cs;
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 0
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-        // queue size should be 0
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
+        checkState(false, 0, 0, 0, true);
         // use scope to test constructor and destructor
         {
             MultiThreadingCriticalSection inner_cs;
-            // thread pool should be stopped
-            EXPECT_EQ(thread_pool.size(), 0);
-            // thread count should be 0
-            EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-            // queue size should be 0
-            EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
+            checkState(false, 0, 0, 0, true);
         }
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 0
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-        // queue size should be 0
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
+        checkState(false, 0, 0, 0, true);
     }
-    // thread count should match
-    EXPECT_EQ(thread_pool.size(), 0);
-    // thread count should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 0);
-    // queue size should be 0
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 0);
+    checkState(false, 0, 0, 0);
     // use scope to test constructor and destructor
     {
         MultiThreadingCriticalSection cs;
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
+        checkState(false, 0, 0, 0, true);
         // apply multi-threading configuration with 64 threads
         MultiThreadingMgr::instance().apply(true, 64, 256);
-        // thread pool should be stopped
-        EXPECT_EQ(thread_pool.size(), 0);
-        // thread count should be 64
-        EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64);
-        // queue size should be 256
-        EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
+        checkState(true, 64, 256, 0, true);
     }
-    // thread count should match
-    EXPECT_EQ(thread_pool.size(), 64);
-    // thread count should be 64
-    EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), 64);
-    // queue size should be 256
-    EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), 256);
+    checkState(true, 64, 256, 64);
     // apply multi-threading configuration with 0 threads
     MultiThreadingMgr::instance().apply(false, 0, 0);
+    checkState(false, 0, 0, 0);
 }
 
 /// @brief Checks that the lock works only when multi-threading is enabled and
index 9c636c9e85f42b2d8507a1c67f3a96603f0549aa..43becc3955aca005082e2f2d252e409309e85f58 100644 (file)
@@ -59,6 +59,8 @@ public:
 
     /// @brief task function which tries to stop the thread pool and then calls
     /// @ref runAndWait
+    ///
+    /// @param thread_pool the thread pool
     void runStopResetAndWait(ThreadPool<CallBack>* thread_pool) {
         EXPECT_THROW(thread_pool->stop(), MultiThreadingInvalidOperation);
         EXPECT_THROW(thread_pool->reset(), MultiThreadingInvalidOperation);
@@ -74,6 +76,8 @@ public:
     }
 
     /// @brief reset all counters and internal test state
+    ///
+    /// @param thread_count number of threads
     void reset(uint32_t thread_count) {
         // stop test threads
         stopThreads();
@@ -163,6 +167,8 @@ public:
     /// @brief check the total number of tasks that have been processed
     /// Some of the tasks may have been run on the same thread and none may have
     /// been processed by other threads
+    ///
+    /// @param items_count the number of tasks
     void checkRunHistory(uint32_t items_count) {
         uint32_t count = 0;
         // iterate over all threads history and count all the processed tasks
@@ -173,10 +179,24 @@ public:
     }
 
     /// @brief check the total number of threads that have processed tasks
+    ///
+    /// @param number of threads processing tasks
     void checkIds(uint32_t count) {
         ASSERT_EQ(ids_.size(), count);
     }
 
+    /// @brief check the thread pool state
+    ///
+    /// @param thread_pool the thread pool
+    /// @param count the number of tasks
+    /// @param size the thread pool size
+    void checkState(ThreadPool<CallBack>& thread_pool, uint32_t count, uint32_t size) {
+        // the item count should match
+        ASSERT_EQ(thread_pool.count(), count);
+        // the thread count should match
+        ASSERT_EQ(thread_pool.size(), size);
+    }
+
 private:
     /// @brief thread count used by the test
     uint32_t thread_count_;
@@ -217,10 +237,7 @@ TEST_F(ThreadPoolTest, addAndCount) {
     uint32_t items_count;
     CallBack call_back;
     ThreadPool<CallBack> thread_pool;
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     items_count = 4;
 
@@ -238,10 +255,7 @@ TEST_F(ThreadPoolTest, addAndCount) {
 
     // calling reset should clear all threads and should remove all queued items
     EXPECT_NO_THROW(thread_pool.reset());
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 }
 
 /// @brief test ThreadPool start and stop
@@ -250,10 +264,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
     uint32_t thread_count;
     CallBack call_back;
     ThreadPool<CallBack> thread_pool;
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     items_count = 4;
     thread_count = 4;
@@ -268,31 +279,19 @@ TEST_F(ThreadPoolTest, startAndStop) {
     EXPECT_THROW(thread_pool.start(0), InvalidParameter);
     // calling start should create the threads and should keep the queued items
     EXPECT_NO_THROW(thread_pool.start(thread_count));
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
 
     // do it once again to check if it works
     EXPECT_THROW(thread_pool.start(thread_count), InvalidOperation);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
 
     // calling stop should clear all threads and should keep queued items
     EXPECT_NO_THROW(thread_pool.stop());
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     // do it once again to check if it works
     EXPECT_THROW(thread_pool.stop(), InvalidOperation);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     // add items to stopped thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
@@ -301,38 +300,23 @@ TEST_F(ThreadPoolTest, startAndStop) {
         EXPECT_TRUE(ret);
     }
 
-    // the item count should match
-    ASSERT_EQ(thread_pool.count(), items_count);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, items_count, 0);
 
     // calling stop should clear all threads and should keep queued items
     EXPECT_THROW(thread_pool.stop(), InvalidOperation);
-    // the item count should match
-    ASSERT_EQ(thread_pool.count(), items_count);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, items_count, 0);
 
     // calling reset should clear all threads and should remove all queued items
     EXPECT_NO_THROW(thread_pool.reset());
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     // do it once again to check if it works
     EXPECT_NO_THROW(thread_pool.reset());
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     // calling start should create the threads and should keep the queued items
     EXPECT_NO_THROW(thread_pool.start(thread_count));
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
 
     // add items to running thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
@@ -343,10 +327,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
 
     // wait for all items to be processed
     waitTasks(thread_count, items_count);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
     // as each thread pool thread is still waiting on main to unblock, each
     // thread should have been registered in ids list
     checkIds(items_count);
@@ -361,10 +342,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
 
     // calling stop should clear all threads and should keep queued items
     EXPECT_NO_THROW(thread_pool.stop());
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     items_count = 64;
     thread_count = 16;
@@ -383,10 +361,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
         EXPECT_TRUE(ret);
     }
 
-    // the item count should match
-    ASSERT_EQ(thread_pool.count(), items_count);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, items_count, 0);
 
     // calling start should create the threads and should keep the queued items
     EXPECT_NO_THROW(thread_pool.start(thread_count));
@@ -395,10 +370,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
 
     // wait for all items to be processed
     waitTasks(thread_count, items_count);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
     // all items should have been processed
     ASSERT_EQ(count(), items_count);
 
@@ -407,10 +379,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
 
     // calling stop should clear all threads and should keep queued items
     EXPECT_NO_THROW(thread_pool.stop());
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     items_count = 16;
     thread_count = 16;
@@ -424,10 +393,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
 
     // calling start should create the threads and should keep the queued items
     EXPECT_NO_THROW(thread_pool.start(thread_count));
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
 
     // add items to running thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
@@ -438,10 +404,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
 
     // wait for all items to be processed
     waitTasks(thread_count, items_count);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
     // as each thread pool thread is still waiting on main to unblock, each
     // thread should have been registered in ids list
     checkIds(items_count);
@@ -456,10 +419,7 @@ TEST_F(ThreadPoolTest, startAndStop) {
 
     // calling stop should clear all threads and should keep queued items
     EXPECT_NO_THROW(thread_pool.stop());
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     /// statistics
     std::cout << "stat10: " << thread_pool.getQueueStat(10) << std::endl;
@@ -473,10 +433,15 @@ TEST_F(ThreadPoolTest, wait) {
     uint32_t thread_count;
     CallBack call_back;
     ThreadPool<CallBack> thread_pool;
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
+
+    // calling wait should do nothing if not started
+    EXPECT_NO_THROW(thread_pool.wait());
+    checkState(thread_pool, 0, 0);
+
+    // do it once again to check if it works
+    EXPECT_NO_THROW(thread_pool.wait());
+    checkState(thread_pool, 0, 0);
 
     items_count = 16;
     thread_count = 16;
@@ -494,10 +459,7 @@ TEST_F(ThreadPoolTest, wait) {
         EXPECT_TRUE(ret);
     }
 
-    // the item count should match
-    ASSERT_EQ(thread_pool.count(), items_count);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, items_count, 0);
 
     // calling start should create the threads and should keep the queued items
     EXPECT_NO_THROW(thread_pool.start(thread_count));
@@ -506,10 +468,7 @@ TEST_F(ThreadPoolTest, wait) {
 
     // wait for all items to be processed
     waitTasks(thread_count, items_count);
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
     // as each thread pool thread is still waiting on main to unblock, each
     // thread should have been registered in ids list
     checkIds(items_count);
@@ -527,10 +486,7 @@ TEST_F(ThreadPoolTest, wait) {
 
     // calling stop should clear all threads and should keep queued items
     EXPECT_NO_THROW(thread_pool.stop());
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, 0, 0);
 
     items_count = 64;
     thread_count = 16;
@@ -549,10 +505,7 @@ TEST_F(ThreadPoolTest, wait) {
         EXPECT_TRUE(ret);
     }
 
-    // the item count should match
-    ASSERT_EQ(thread_pool.count(), items_count);
-    // the thread count should be 0
-    ASSERT_EQ(thread_pool.size(), 0);
+    checkState(thread_pool, items_count, 0);
 
     // calling start should create the threads and should keep the queued items
     EXPECT_NO_THROW(thread_pool.start(thread_count));
@@ -561,15 +514,43 @@ TEST_F(ThreadPoolTest, wait) {
 
     // wait for all items to be processed
     thread_pool.wait();
-    // the item count should be 0
-    ASSERT_EQ(thread_pool.count(), 0);
-    // the thread count should match
-    ASSERT_EQ(thread_pool.size(), thread_count);
+    checkState(thread_pool, 0, thread_count);
     // all items should have been processed
     ASSERT_EQ(count(), items_count);
 
     // check that the number of processed tasks matches the number of items
     checkRunHistory(items_count);
+
+    // calling stop should clear all threads and should keep queued items
+    EXPECT_NO_THROW(thread_pool.stop());
+    checkState(thread_pool, 0, 0);
+
+    items_count = 16;
+    thread_count = 16;
+    // prepare setup
+    reset(thread_count);
+
+    // add items to stopped thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        bool ret = true;
+        EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
+        EXPECT_TRUE(ret);
+    }
+
+    checkState(thread_pool, items_count, 0);
+
+    // calling start should create the threads and should keep the queued items
+    EXPECT_NO_THROW(thread_pool.start(thread_count));
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // calling stop should clear all threads and should keep queued items
+    EXPECT_NO_THROW(thread_pool.stop());
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+    // wait for all items to be processed
+    ASSERT_TRUE(thread_pool.wait(1));
+    checkState(thread_pool, 0, 0);
 }
 
 /// @brief test ThreadPool max queue size
index fdfce0f71bb3538a1acebfe620548243d79a841d..eec91c7b97eef9bc55bda5b3453dfbff31c9684e 100644 (file)
@@ -417,15 +417,13 @@ private:
         void clear() {
             std::lock_guard<std::mutex> lock(mutex_);
             queue_ = QueueContainer();
-            working_ = 0;
-            wait_cv_.notify_all();
         }
 
         /// @brief enable the queue
         ///
         /// Sets the queue state to 'enabled'
         ///
-        /// @param number of working threads
+        /// @param thread_count number of working threads
         void enable(uint32_t thread_count) {
             std::lock_guard<std::mutex> lock(mutex_);
             enabled_ = true;
@@ -466,7 +464,7 @@ private:
         /// @brief condition variable used to wait for all items to be processed
         std::condition_variable wait_cv_;
 
-        /// @brief the sate of the queue
+        /// @brief the state of the queue
         /// The 'enabled' state corresponds to true value
         /// The 'disabled' state corresponds to false value
         std::atomic<bool> enabled_;
@@ -490,7 +488,7 @@ private:
 
     /// @brief run function of each thread
     void run() {
-        while (queue_.enabled()) {
+        for (bool work = true; work; work = queue_.enabled()) {
             WorkItemPtr item = queue_.pop();
             if (item) {
                 try {