From: Razvan Becheriu Date: Mon, 18 Nov 2019 16:07:51 +0000 (+0200) Subject: [#883, !506] addressed comments - using smart pointer for WorkItem and using queue... X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0ff84399c4c174a5eb2b32f760f43a102386464d;p=thirdparty%2Fkea.git [#883, !506] addressed comments - using smart pointer for WorkItem and using queue container as template argument --- diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc index e3a99b721c..85a0340a5b 100644 --- a/src/lib/util/tests/thread_pool_unittest.cc +++ b/src/lib/util/tests/thread_pool_unittest.cc @@ -205,7 +205,7 @@ TEST_F(ThreadPoolTest, testAddAndCount) { // add items to stopped thread pool for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); + thread_pool.add(make_shared(call_back)); } // the item count should match @@ -269,7 +269,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) { // add items to stopped thread pool for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); + thread_pool.add(make_shared(call_back)); } // the item count should match @@ -307,7 +307,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) { // add items to running thread pool for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); + thread_pool.add(make_shared(call_back)); } // wait for all items to be processed @@ -347,7 +347,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) { // add items to stopped thread pool for (uint32_t i = 0; i < items_count; ++i) { - thread_pool.add(call_back); + thread_pool.add(make_shared(call_back)); } // the item count should match diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h index 613c845703..cde06b0c0b 100644 --- a/src/lib/util/thread_pool.h +++ b/src/lib/util/thread_pool.h @@ -7,6 +7,8 @@ #ifndef THREAD_POOL_H #define THREAD_POOL_H +#include + #include #include #include @@ -19,8 +21,13 @@ namespace util { /// @brief Defines a thread pool which uses a thread pool queue for managing /// work items. Each work item is a 'function' object. -template +/// +/// @tparam WorkItem a functor +/// @tparam Container a 'queue like' container +template >> struct ThreadPool { + typedef typename std::shared_ptr WorkItemPtr; + /// @brief Constructor ThreadPool() : running_(false) { } @@ -42,8 +49,11 @@ struct ThreadPool { /// @param thread_count specifies the number of threads to be created and /// started void start(uint32_t thread_count) { - if (!thread_count || running_) { - return; + if (!thread_count) { + isc_throw(InvalidParameter, "thread count is 0"); + } + if (running_) { + isc_throw(InvalidParameter, "thread pool already started"); } queue_.enable(); running_ = true; @@ -54,6 +64,9 @@ struct ThreadPool { /// @brief stop all the threads void stop() { + if (!running_) { + isc_throw(InvalidParameter, "thread pool already stopped"); + } running_ = false; queue_.disable(); for (auto thread : threads_) { @@ -65,7 +78,7 @@ struct ThreadPool { /// @brief add a work item to the thread pool /// /// @param item the 'function' object to be added to the queue - void add(WorkItem& item) { + void add(const WorkItemPtr& item) { queue_.push(item); } @@ -92,7 +105,10 @@ private: /// removing items are thread safe. /// In 'disabled' state, all threads waiting on the queue are unlocked and all /// operations are non blocking. - template + /// + /// @tparam Item a 'smart pointer' to a functor + /// @tparam QueueContainer a 'queue like' container + template > struct ThreadPoolQueue { /// @brief Constructor /// @@ -115,7 +131,7 @@ private: /// waiting on the queue. /// /// @param item the new item to be added to the queue - void push(Item& item) { + void push(const Item& item) { if (!item) { return; } @@ -143,12 +159,10 @@ private: cv_.wait(lock); continue; } - Item item = queue_.front(); queue_.pop(); return item; } - return Item(); } @@ -190,7 +204,7 @@ private: private: /// @brief underlying queue container - std::queue queue_; + QueueContainer queue_; /// @brief mutex used for critical sections std::mutex mutex_; @@ -201,16 +215,16 @@ private: /// @brief the sate of the queue /// The 'enabled' state corresponds to true value /// The 'disabled' state corresponds to false value - std::atomic_bool enabled_; + std::atomic enabled_; }; /// @brief run function of each thread void run() { while (running_) { - WorkItem item = queue_.pop(); + WorkItemPtr item = queue_.pop(); if (item) { try { - item(); + (*item)(); } catch (...) { } } @@ -218,15 +232,15 @@ private: } /// @brief list of worker threads - std::list> threads_; + std::vector> threads_; /// @brief underlying work items queue - ThreadPoolQueue queue_; + ThreadPoolQueue queue_; /// @brief state of the thread pool /// The 'running' state corresponds to true value /// The 'not running' state corresponds to false value - std::atomic_bool running_; + std::atomic running_; }; } // namespace util