From: Razvan Becheriu Date: Wed, 25 Aug 2021 06:36:24 +0000 (+0300) Subject: [#1599] added pause with wait for threads to stop X-Git-Tag: Kea-2.5.4~39 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e2d76b8cc51ee7b0f56d3ce45b3dafcec84a453f;p=thirdparty%2Fkea.git [#1599] added pause with wait for threads to stop --- diff --git a/src/lib/util/multi_threading_mgr.cc b/src/lib/util/multi_threading_mgr.cc index 311b0ab492..a48a1b14ee 100644 --- a/src/lib/util/multi_threading_mgr.cc +++ b/src/lib/util/multi_threading_mgr.cc @@ -45,7 +45,11 @@ MultiThreadingMgr::enterCriticalSection() { ++critical_section_count_; if (getMode() && !inside) { if (getThreadPoolSize()) { - thread_pool_.stop(); + // We simply pause without waiting for all tasks to complete. + // We could also call pause(false) which does not wait for + // threads to stop and wait() so that all tasks are complete + // and threads are stopped. + thread_pool_.pause(); } // Now it is safe to call callbacks which can also create other CSs. callEntryCallbacks(); @@ -67,7 +71,7 @@ MultiThreadingMgr::exitCriticalSection() { --critical_section_count_; if (getMode() && !isInCriticalSection()) { if (getThreadPoolSize()) { - thread_pool_.start(getThreadPoolSize()); + thread_pool_.resume(); } // Now it is safe to call callbacks which can also create other CSs. callExitCallbacks(); diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 207c265d57..28e66c09d9 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -143,8 +143,10 @@ struct ThreadPool { /// @brief pause threads /// /// Used to pause threads so that they stop processing tasks - void pause() { - queue_.pause(); + /// + /// @param wait the flag indicating if should wait for threads to pause. + void pause(bool wait = true) { + queue_.pause(wait); } /// @brief resume threads @@ -364,10 +366,15 @@ private: Item pop() { std::unique_lock lock(mutex_); --working_; - // Wait for push or disable functions. + // Signal thread waiting for threads to pause. + if (working_ == 0 && paused_) { + wait_threads_cv_.notify_all(); + } + // Signal thread waiting for tasks to finish. if (working_ == 0 && queue_.empty()) { wait_cv_.notify_all(); } + // Wait for push or disable functions. cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());}); pause_cv_.wait(lock, [&]() {return (!enabled_ || !paused_);}); ++working_; @@ -421,9 +428,15 @@ private: /// @brief pause threads /// /// Used to pause threads so that they stop processing tasks - void pause() { + /// + /// @param wait the flag indicating if should wait for threads to pause. + void pause(bool wait) { std::unique_lock lock(mutex_); paused_ = true; + if (wait) { + // Wait for working threads to finish. + wait_threads_cv_.wait(lock, [&]() {return (working_ == 0);}); + } } /// @brief resume threads @@ -505,6 +518,9 @@ private: /// @brief condition variable used to wait for all items to be processed std::condition_variable wait_cv_; + /// @brief condition variable used to wait for all threads to be paused + std::condition_variable wait_threads_cv_; + /// @brief condition variable used to pause threads std::condition_variable pause_cv_;