]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#883, !506] addressed comments - using smart pointer for WorkItem and using queue...
authorRazvan Becheriu <razvan@isc.org>
Mon, 18 Nov 2019 16:07:51 +0000 (18:07 +0200)
committerRazvan Becheriu <razvan@isc.org>
Mon, 18 Nov 2019 16:39:36 +0000 (18:39 +0200)
src/lib/util/tests/thread_pool_unittest.cc
src/lib/util/thread_pool.h

index e3a99b721c0a011ae08311bd8cf5b86407492fee..85a0340a5b8f0800dd5d4c24f6f25707a93f37a7 100644 (file)
@@ -205,7 +205,7 @@ TEST_F(ThreadPoolTest, testAddAndCount) {
 
     // add items to stopped thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
+        thread_pool.add(make_shared<CallBack>(call_back));
     }
 
     // the item count should match
@@ -269,7 +269,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
 
     // add items to stopped thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
+        thread_pool.add(make_shared<CallBack>(call_back));
     }
 
     // the item count should match
@@ -307,7 +307,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
 
     // add items to running thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
+        thread_pool.add(make_shared<CallBack>(call_back));
     }
 
     // wait for all items to be processed
@@ -347,7 +347,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
 
     // add items to stopped thread pool
     for (uint32_t i = 0; i < items_count; ++i) {
-        thread_pool.add(call_back);
+        thread_pool.add(make_shared<CallBack>(call_back));
     }
 
     // the item count should match
index 613c84570355a347849c12ae20111774877fe971..cde06b0c0bd0fed5c5d85fcbd88760e45437be27 100644 (file)
@@ -7,6 +7,8 @@
 #ifndef THREAD_POOL_H
 #define THREAD_POOL_H
 
+#include <exceptions/exceptions.h>
+
 #include <atomic>
 #include <condition_variable>
 #include <list>
@@ -19,8 +21,13 @@ namespace util {
 
 /// @brief Defines a thread pool which uses a thread pool queue for managing
 /// work items. Each work item is a 'function' object.
-template <typename WorkItem>
+///
+/// @tparam WorkItem a functor
+/// @tparam Container a 'queue like' container
+template <typename WorkItem, typename Container = std::queue<std::shared_ptr<WorkItem>>>
 struct ThreadPool {
+    typedef typename std::shared_ptr<WorkItem> WorkItemPtr;
+
     /// @brief Constructor
     ThreadPool() : running_(false) {
     }
@@ -42,8 +49,11 @@ struct ThreadPool {
     /// @param thread_count specifies the number of threads to be created and
     /// started
     void start(uint32_t thread_count) {
-        if (!thread_count || running_) {
-            return;
+        if (!thread_count) {
+            isc_throw(InvalidParameter, "thread count is 0");
+        }
+        if (running_) {
+            isc_throw(InvalidParameter, "thread pool already started");
         }
         queue_.enable();
         running_ = true;
@@ -54,6 +64,9 @@ struct ThreadPool {
 
     /// @brief stop all the threads
     void stop() {
+        if (!running_) {
+            isc_throw(InvalidParameter, "thread pool already stopped");
+        }
         running_ = false;
         queue_.disable();
         for (auto thread : threads_) {
@@ -65,7 +78,7 @@ struct ThreadPool {
     /// @brief add a work item to the thread pool
     ///
     /// @param item the 'function' object to be added to the queue
-    void add(WorkItem& item) {
+    void add(const WorkItemPtr& item) {
         queue_.push(item);
     }
 
@@ -92,7 +105,10 @@ private:
     /// removing items are thread safe.
     /// In 'disabled' state, all threads waiting on the queue are unlocked and all
     /// operations are non blocking.
-    template <typename Item>
+    ///
+    /// @tparam Item a 'smart pointer' to a functor
+    /// @tparam QueueContainer a 'queue like' container
+    template <typename Item, typename QueueContainer = std::queue<Item>>
     struct ThreadPoolQueue {
         /// @brief Constructor
         ///
@@ -115,7 +131,7 @@ private:
         /// waiting on the queue.
         ///
         /// @param item the new item to be added to the queue
-        void push(Item& item) {
+        void push(const Item& item) {
             if (!item) {
                 return;
             }
@@ -143,12 +159,10 @@ private:
                     cv_.wait(lock);
                     continue;
                 }
-
                 Item item = queue_.front();
                 queue_.pop();
                 return item;
             }
-
             return Item();
         }
 
@@ -190,7 +204,7 @@ private:
 
     private:
         /// @brief underlying queue container
-        std::queue<Item> queue_;
+        QueueContainer queue_;
 
         /// @brief mutex used for critical sections
         std::mutex mutex_;
@@ -201,16 +215,16 @@ private:
         /// @brief the sate of the queue
         /// The 'enabled' state corresponds to true value
         /// The 'disabled' state corresponds to false value
-        std::atomic_bool enabled_;
+        std::atomic<bool> enabled_;
     };
 
     /// @brief run function of each thread
     void run() {
         while (running_) {
-            WorkItem item = queue_.pop();
+            WorkItemPtr item = queue_.pop();
             if (item) {
                 try {
-                    item();
+                    (*item)();
                 } catch (...) {
                 }
             }
@@ -218,15 +232,15 @@ private:
     }
 
     /// @brief list of worker threads
-    std::list<std::shared_ptr<std::thread>> threads_;
+    std::vector<std::shared_ptr<std::thread>> threads_;
 
     /// @brief underlying work items queue
-    ThreadPoolQueue<WorkItem> queue_;
+    ThreadPoolQueue<WorkItemPtr, Container> queue_;
 
     /// @brief state of the thread pool
     /// The 'running' state corresponds to true value
     /// The 'not running' state corresponds to false value
-    std::atomic_bool running_;
+    std::atomic<bool> running_;
 };
 
 }  // namespace util