]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1599] implemented pause and resume
authorRazvan Becheriu <razvan@isc.org>
Fri, 11 Dec 2020 10:52:03 +0000 (12:52 +0200)
committerRazvan Becheriu <razvan@isc.org>
Wed, 15 Nov 2023 06:36:55 +0000 (08:36 +0200)
src/lib/util/tests/thread_pool_unittest.cc
src/lib/util/thread_pool.h

index 43becc3955aca005082e2f2d252e409309e85f58..439f7a3ed3cb34b01c6e35e96bb29ee40dbec658 100644 (file)
@@ -553,6 +553,232 @@ TEST_F(ThreadPoolTest, wait) {
     checkState(thread_pool, 0, 0);
 }
 
+/// @brief test ThreadPool pause and resume
+TEST_F(ThreadPoolTest, pauseAndResume) {
+    uint32_t items_count;
+    uint32_t thread_count;
+    CallBack call_back;
+    ThreadPool<CallBack> thread_pool;
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    items_count = 4;
+    thread_count = 4;
+    // prepare setup
+    reset(thread_count);
+
+    // create tasks which block thread pool threads until signaled by main
+    // thread to force all threads of the thread pool to run exactly one task
+    call_back = std::bind(&ThreadPoolTest::runAndWait, this);
+
+    // calling resume should do nothing if not started
+    EXPECT_NO_THROW(thread_pool.resume());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // do it once again to check if it works
+    EXPECT_NO_THROW(thread_pool.resume());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling pause should do nothing if not started
+    EXPECT_NO_THROW(thread_pool.pause());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // do it once again to check if it works
+    EXPECT_NO_THROW(thread_pool.pause());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling resume should do nothing if not started
+    EXPECT_NO_THROW(thread_pool.resume());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // add items to stopped thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        bool ret = true;
+        EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
+        EXPECT_TRUE(ret);
+    }
+
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling pause should do nothing if not started
+    EXPECT_NO_THROW(thread_pool.pause());
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling resume should do nothing if not started
+    EXPECT_NO_THROW(thread_pool.resume());
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling pause should do nothing if not started
+    EXPECT_NO_THROW(thread_pool.pause());
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling start should create the threads and should keep the queued items
+    EXPECT_NO_THROW(thread_pool.start(thread_count));
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // calling resume should resume threads
+    EXPECT_NO_THROW(thread_pool.resume());
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // wait for all items to be processed
+    waitTasks(thread_count, items_count);
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+    // as each thread pool thread is still waiting on main to unblock, each
+    // thread should have been registered in ids list
+    checkIds(items_count);
+    // all items should have been processed
+    ASSERT_EQ(count(), items_count);
+
+    // check that the number of processed tasks matches the number of items
+    checkRunHistory(items_count);
+
+    // signal thread pool tasks to continue
+    signalThreads();
+
+    // do it once again to check if it works
+    EXPECT_NO_THROW(thread_pool.resume());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // calling stop should clear all threads and should keep queued items
+    EXPECT_NO_THROW(thread_pool.stop());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    items_count = 64;
+    thread_count = 16;
+    // prepare setup
+    reset(thread_count);
+
+    // create tasks which do not block the thread pool threads so that several
+    // tasks can be run on the same thread and some of the threads never even
+    // having a chance to run
+    call_back = std::bind(&ThreadPoolTest::run, this);
+
+    // add items to stopped thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        bool ret = true;
+        EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
+        EXPECT_TRUE(ret);
+    }
+
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    // calling start should create the threads and should keep the queued items
+    EXPECT_NO_THROW(thread_pool.start(thread_count));
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // wait for all items to be processed
+    waitTasks(thread_count, items_count);
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+    // all items should have been processed
+    ASSERT_EQ(count(), items_count);
+
+    // check that the number of processed tasks matches the number of items
+    checkRunHistory(items_count);
+
+    // calling pause should pause threads
+    EXPECT_NO_THROW(thread_pool.pause());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // do it once again to check if it works
+    EXPECT_NO_THROW(thread_pool.pause());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // prepare setup
+    reset(thread_count);
+
+    // add items to paused thread pool
+    for (uint32_t i = 0; i < items_count; ++i) {
+        bool ret = true;
+        EXPECT_NO_THROW(ret = thread_pool.add(boost::make_shared<CallBack>(call_back)));
+        EXPECT_TRUE(ret);
+    }
+
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // calling resume should resume threads
+    EXPECT_NO_THROW(thread_pool.resume());
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+
+    // wait for all items to be processed
+    waitTasks(thread_count, items_count);
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should match
+    ASSERT_EQ(thread_pool.size(), thread_count);
+    // all items should have been processed
+    ASSERT_EQ(count(), items_count);
+
+    // check that the number of processed tasks matches the number of items
+    checkRunHistory(items_count);
+
+    // calling stop should clear all threads and should keep queued items
+    EXPECT_NO_THROW(thread_pool.stop());
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+>>>>>>> 0d7199fdd8 ([#1599] implemented pause and resume)
+}
+
 /// @brief test ThreadPool max queue size
 TEST_F(ThreadPoolTest, maxQueueSize) {
     uint32_t items_count;
index 726c1c61bf44170ae9e79ee250de56a9a4b003c8..207c265d57e7408ca1367038733d095349ce2bcd 100644 (file)
@@ -140,6 +140,20 @@ struct ThreadPool {
         return (queue_.wait(seconds));
     }
 
+    /// @brief pause threads
+    ///
+    /// Used to pause threads so that they stop processing tasks
+    void pause() {
+        queue_.pause();
+    }
+
+    /// @brief resume threads
+    ///
+    /// Used to resume threads so that they start processing tasks
+    void resume() {
+        queue_.resume();
+    }
+
     /// @brief set maximum number of work items in the queue
     ///
     /// @param max_queue_size the maximum size (0 means unlimited)
@@ -241,7 +255,7 @@ private:
         ///
         /// Creates the thread pool queue in 'disabled' state
         ThreadPoolQueue()
-            : enabled_(false), max_queue_size_(0), working_(0),
+            : enabled_(false), paused_(false), max_queue_size_(0), working_(0),
               stat10(0.), stat100(0.), stat1000(0.) {
         }
 
@@ -355,6 +369,7 @@ private:
                 wait_cv_.notify_all();
             }
             cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
+            pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);});
             ++working_;
             if (!enabled_) {
                 return (Item());
@@ -403,6 +418,23 @@ private:
             return (ret);
         }
 
+        /// @brief pause threads
+        ///
+        /// Used to pause threads so that they stop processing tasks
+        void pause() {
+            std::unique_lock<std::mutex> lock(mutex_);
+            paused_ = true;
+        }
+
+        /// @brief resume threads
+        ///
+        /// Used to resume threads so that they start processing tasks
+        void resume() {
+            std::unique_lock<std::mutex> lock(mutex_);
+            paused_ = false;
+            pause_cv_.notify_all();
+        }
+
         /// @brief get queue length statistic
         ///
         /// @param which select the statistic (10, 100 or 1000)
@@ -473,11 +505,19 @@ private:
         /// @brief condition variable used to wait for all items to be processed
         std::condition_variable wait_cv_;
 
+        /// @brief condition variable used to pause threads
+        std::condition_variable pause_cv_;
+
         /// @brief the state of the queue
         /// The 'enabled' state corresponds to true value
         /// The 'disabled' state corresponds to false value
         std::atomic<bool> enabled_;
 
+        /// @brief the pause state of the queue
+        /// The 'paused' state corresponds to true value
+        /// The 'resumed' state corresponds to false value
+        std::atomic<bool> paused_;
+
         /// @brief maximum number of work items in the queue
         /// (0 means unlimited)
         size_t max_queue_size_;