From: Razvan Becheriu Date: Mon, 9 Sep 2019 14:35:27 +0000 (+0300) Subject: [#883, !506] added doxigen and cleaned up code X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=96900cfc64dbec9db19cdec8ddb7c9d8a23f88d4;p=thirdparty%2Fkea.git [#883, !506] added doxigen and cleaned up code --- diff --git a/src/lib/dhcpsrv/thread_pool.cc b/src/lib/dhcpsrv/thread_pool.cc index 2dbfd121b5..e868bb3153 100644 --- a/src/lib/dhcpsrv/thread_pool.cc +++ b/src/lib/dhcpsrv/thread_pool.cc @@ -17,13 +17,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include + #include #include -#include - using namespace std; namespace isc { @@ -36,49 +34,63 @@ ThreadPool::~ThreadPool() { destroy(); } -void ThreadPool::create(uint32_t worker_threads) { +void ThreadPool::create(uint32_t worker_threads, bool run) { LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads") .arg(worker_threads); if (!worker_threads) { return; } destroy(); - queue_.create(); + if (run) { + start(worker_threads); + } + + LOG_INFO(dhcpsrv_logger, "Thread pool created"); +} + +void ThreadPool::destroy() { + LOG_INFO(dhcpsrv_logger, "Thread pool shutting down"); + stop(true); + + LOG_INFO(dhcpsrv_logger, "Thread pool shut down"); +} + +void ThreadPool::start(uint32_t worker_threads) { + queue_.start(); exit_ = false; for (int i = 0; i < worker_threads; ++i) { - worker_threads_.push_back(make_shared(&ThreadPool::threadRun, this)); + worker_threads_.push_back(make_shared(&ThreadPool::run, this)); } LOG_INFO(dhcpsrv_logger, "Thread pool started"); } -void ThreadPool::destroy() { - LOG_INFO(dhcpsrv_logger, "Thread pool shutting down"); +void ThreadPool::stop(bool clear) { exit_ = true; - queue_.destroy(); + queue_.stop(clear); for (auto thread : worker_threads_) { thread->join(); } worker_threads_.clear(); - LOG_INFO(dhcpsrv_logger, "Thread pool shut down"); + LOG_INFO(dhcpsrv_logger, "Thread pool stopped"); } void ThreadPool::add(WorkItemCallBack call_back) { - queue_.add(call_back); + queue_.push(call_back); } size_t ThreadPool::count() { return queue_.count(); } -void ThreadPool::threadRun() { +void ThreadPool::run() { thread::id th_id = this_thread::get_id(); LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id); while (!exit_) { WorkItemCallBack work_item; - if (queue_.get(work_item)) { + if (queue_.pop(work_item)) { work_item(); } } diff --git a/src/lib/dhcpsrv/thread_pool.h b/src/lib/dhcpsrv/thread_pool.h index 826a962ec3..b5b5f2c268 100644 --- a/src/lib/dhcpsrv/thread_pool.h +++ b/src/lib/dhcpsrv/thread_pool.h @@ -24,7 +24,6 @@ #include #include -#include #include #include #include @@ -33,29 +32,65 @@ namespace isc { namespace dhcp { +/// @brief Defines a generic thread pool queue. +/// +/// The main purpose is to safely manage thread pool tasks. +/// The thread pool queue can be 'disabled', which means that no items can be +/// added or removed from the queue, or 'enabled', which guarantees that +/// inserting or removing items are thread safe. +/// In 'disabled' state, all threads waiting on the queue are unlocked and all +/// operations are non blocking. template struct ThreadPoolQueue { + /// @brief Constructor + /// + /// Creates the thread pool queue in 'disabled' state ThreadPoolQueue() : exit_(true) { } + /// @brief Destructor + /// + /// Destroys the thread pool queue ~ThreadPoolQueue() { + stop(true); } - void add(WorkItem item) { + /// @brief push work item to the queue + /// + /// Used to add work items in the queue. + /// If the queue is 'disabled', this function returns immediately. + /// If the queue is 'enabled', this function adds an item to the queue and + /// wakes up at least one thread waiting on the queue. + /// + /// @param item the new iten to be added to the queue + void push(WorkItem item) { std::lock_guard lock(mutex_); if (exit_) { return; } queue_.push(item); - // Notify get() so that it can effectively get a work item. + // Notify pop function so that it can effectively remove a work item. cv_.notify_all(); } - bool get(WorkItem& item) { + /// @brief pop work item from the queue or block waiting + /// + /// Used to retrieve and remove a work item from the queue + /// If the queue is 'disabled', this function returns immediately + /// (no element). + /// If the queue is 'enabled', this function returns the first element in + /// the queue or blocks the calling thread if there are no work items + /// available. + /// + /// @param item the reference of the item removed from the queue, if any + /// + /// @return true if there was a work item removed from the queue, false + /// otherwise + bool pop(WorkItem& item) { std::unique_lock lock(mutex_); while (!exit_) { if (queue_.empty()) { - // Wait for add() or destroy(). + // Wait for push or stop functions. cv_.wait(lock); continue; } @@ -68,62 +103,126 @@ struct ThreadPoolQueue { return false; } + /// @brief count number of work items in the queue + /// + /// Returns the number of work items in the queue + /// + /// @return the number of work items size_t count() { std::lock_guard lock(mutex_); return queue_.size(); } - void removeAll() { + /// @brief clear remove all work items + /// + /// Removes all queued work items + void clear() { std::lock_guard lock(mutex_); - removeAllUnsafe(); + reset(); } - void create() { + /// @brief start and enable the queue + /// + /// Sets the queue state to 'enabled' + void start() { std::lock_guard lock(mutex_); exit_ = false; } - void destroy() { + /// brief stop and disable the queue + /// + /// Sets the queue state to 'disabled' and optionally removes all work items + /// + /// @param clear used to specify if the function should also clear the queue + void stop(bool clear = false) { std::lock_guard lock(mutex_); exit_ = true; // Notify get() so that it can exit. cv_.notify_all(); - removeAllUnsafe(); + if (clear) { + reset(); + } } private: - /// @brief Has to be called in a mutex_-locked environment. - void removeAllUnsafe() { - while (queue_.size()) { - queue_.pop(); - } + /// @brief reset clears the queue removing all work items + /// + /// Must be called in a critical section (mutex locked scope). + void reset() { + queue_ = std::queue(); } + /// @brief underlying queue container std::queue queue_; + + /// @brief mutex used for critical sections std::mutex mutex_; + + /// @brief condition variable used to signal waiting threads std::condition_variable cv_; + + /// @brief the sate of the queue + /// + /// The 'enabled' state corresponds to false value + /// The 'disabled' state corresponds to true value std::atomic_bool exit_; }; +/// @brief Defines a thread pool which uses a thread pool queue for managing +/// work items. Each work item is a 'function' object. struct ThreadPool { using WorkItemCallBack = std::function; + /// @brief Constructor ThreadPool(); + + /// @brief Destructor ~ThreadPool(); - void create(uint32_t worker_threads); + /// @brief initialize the thread pool with specified number of treads and + /// optionally starts all threads. + /// + /// @param worker_threads specifies the number of threads to be created + /// @param run to indicate if the threads should be started immediately + void create(uint32_t thread_count, bool run = true); + /// @brief de-initialize the thread pool stopping threads and clearing the + /// internal queue void destroy(); + /// @brief start all the threads + /// + /// @param worker_threads specifies the number of threads to be created + void start(uint32_t thread_count); + + /// @brief stop all the threads + /// + /// @param clear used to specify if the function should also clear the queue + void stop(bool clear = false); + + /// @brief add a working item to the thread pool + /// + /// @param call_back the 'function' object to be added to the queue void add(WorkItemCallBack call_back); + /// @brief count number of work items in the queue + /// + /// @return the number of work items in the queue size_t count(); private: - void threadRun(); + /// @brief run function of each thread + void run(); + /// @brief list of worker threads std::list> worker_threads_; + + /// @brief underlying work items queue ThreadPoolQueue queue_; + + /// @brief state of the thread pool + /// The 'run' state corresponds to false value + /// The 'stop' state corresponds to true value std::atomic_bool exit_; };