]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#883, !506] added doxigen and cleaned up code
authorRazvan Becheriu <razvan@isc.org>
Mon, 9 Sep 2019 14:35:27 +0000 (17:35 +0300)
committerRazvan Becheriu <razvan@isc.org>
Wed, 6 Nov 2019 17:32:51 +0000 (19:32 +0200)
src/lib/dhcpsrv/thread_pool.cc
src/lib/dhcpsrv/thread_pool.h

index 2dbfd121b5645b31dde003d57b081796bc88960e..e868bb3153a335321f1d990bfea89d8989178c03 100644 (file)
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <cassert>
 #include <config.h>
+
 #include <dhcpsrv/dhcpsrv_log.h>
 #include <dhcpsrv/thread_pool.h>
 
-#include <functional>
-
 using namespace std;
 
 namespace isc {
@@ -36,49 +34,63 @@ ThreadPool::~ThreadPool() {
     destroy();
 }
 
-void ThreadPool::create(uint32_t worker_threads) {
+void ThreadPool::create(uint32_t worker_threads, bool run) {
     LOG_INFO(dhcpsrv_logger, "Thread pool starting with %1 worker threads")
         .arg(worker_threads);
     if (!worker_threads) {
         return;
     }
     destroy();
-    queue_.create();
+    if (run) {
+        start(worker_threads);
+    }
+
+    LOG_INFO(dhcpsrv_logger, "Thread pool created");
+}
+
+void ThreadPool::destroy() {
+    LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
+    stop(true);
+
+    LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
+}
+
+void ThreadPool::start(uint32_t worker_threads) {
+    queue_.start();
     exit_ = false;
     for (int i = 0; i < worker_threads; ++i) {
-        worker_threads_.push_back(make_shared<thread>(&ThreadPool::threadRun, this));
+        worker_threads_.push_back(make_shared<thread>(&ThreadPool::run, this));
     }
 
     LOG_INFO(dhcpsrv_logger, "Thread pool started");
 }
 
-void ThreadPool::destroy() {
-    LOG_INFO(dhcpsrv_logger, "Thread pool shutting down");
+void ThreadPool::stop(bool clear) {
     exit_ = true;
-    queue_.destroy();
+    queue_.stop(clear);
     for (auto thread : worker_threads_) {
         thread->join();
     }
     worker_threads_.clear();
 
-    LOG_INFO(dhcpsrv_logger, "Thread pool shut down");
+    LOG_INFO(dhcpsrv_logger, "Thread pool stopped");
 }
 
 void ThreadPool::add(WorkItemCallBack call_back) {
-    queue_.add(call_back);
+    queue_.push(call_back);
 }
 
 size_t ThreadPool::count() {
     return queue_.count();
 }
 
-void ThreadPool::threadRun() {
+void ThreadPool::run() {
     thread::id th_id = this_thread::get_id();
     LOG_INFO(dhcpsrv_logger, "Thread pool thread started. id: %1").arg(th_id);
 
     while (!exit_) {
         WorkItemCallBack work_item;
-        if (queue_.get(work_item)) {
+        if (queue_.pop(work_item)) {
             work_item();
         }
     }
index 826a962ec3f42369afc1604090b18de903a6438b..b5b5f2c268bd20a2668363ea12b2c4b7e7bdab67 100644 (file)
@@ -24,7 +24,6 @@
 
 #include <atomic>
 #include <condition_variable>
-#include <cstdint>
 #include <list>
 #include <mutex>
 #include <queue>
 namespace isc {
 namespace dhcp {
 
+/// @brief Defines a generic thread pool queue.
+///
+/// The main purpose is to safely manage thread pool tasks.
+/// The thread pool queue can be 'disabled', which means that no items can be
+/// added or removed from the queue, or 'enabled', which guarantees that
+/// inserting or removing items are thread safe.
+/// In 'disabled' state, all threads waiting on the queue are unlocked and all
+/// operations are non blocking.
 template <typename WorkItem>
 struct ThreadPoolQueue {
+    /// @brief Constructor
+    ///
+    /// Creates the thread pool queue in 'disabled' state
     ThreadPoolQueue() : exit_(true) {
     }
 
+    /// @brief Destructor
+    ///
+    /// Destroys the thread pool queue
     ~ThreadPoolQueue() {
+        stop(true);
     }
 
-    void add(WorkItem item) {
+    /// @brief push work item to the queue
+    ///
+    /// Used to add work items in the queue.
+    /// If the queue is 'disabled', this function returns immediately.
+    /// If the queue is 'enabled', this function adds an item to the queue and
+    /// wakes up at least one thread waiting on the queue.
+    ///
+    /// @param item the new iten to be added to the queue
+    void push(WorkItem item) {
         std::lock_guard<std::mutex> lock(mutex_);
         if (exit_) {
             return;
         }
         queue_.push(item);
-        // Notify get() so that it can effectively get a work item.
+        // Notify pop function so that it can effectively remove a work item.
         cv_.notify_all();
     }
 
-    bool get(WorkItem& item) {
+    /// @brief pop work item from the queue or block waiting
+    ///
+    /// Used to retrieve and remove a work item from the queue
+    /// If the queue is 'disabled', this function returns immediately
+    /// (no element).
+    /// If the queue is 'enabled', this function returns the first element in
+    /// the queue or blocks the calling thread if there are no work items
+    /// available.
+    ///
+    /// @param item the reference of the item removed from the queue, if any
+    ///
+    /// @return true if there was a work item removed from the queue, false
+    /// otherwise
+    bool pop(WorkItem& item) {
         std::unique_lock<std::mutex> lock(mutex_);
         while (!exit_) {
             if (queue_.empty()) {
-                // Wait for add() or destroy().
+                // Wait for push or stop functions.
                 cv_.wait(lock);
                 continue;
             }
@@ -68,62 +103,126 @@ struct ThreadPoolQueue {
         return false;
     }
 
+    /// @brief count number of work items in the queue
+    ///
+    /// Returns the number of work items in the queue
+    ///
+    /// @return the number of work items
     size_t count() {
         std::lock_guard<std::mutex> lock(mutex_);
         return queue_.size();
     }
 
-    void removeAll() {
+    /// @brief clear remove all work items
+    ///
+    /// Removes all queued work items
+    void clear() {
         std::lock_guard<std::mutex> lock(mutex_);
-        removeAllUnsafe();
+        reset();
     }
 
-    void create() {
+    /// @brief start and enable the queue
+    ///
+    /// Sets the queue state to 'enabled'
+    void start() {
         std::lock_guard<std::mutex> lock(mutex_);
         exit_ = false;
     }
 
-    void destroy() {
+    /// brief stop and disable the queue
+    ///
+    /// Sets the queue state to 'disabled' and optionally removes all work items
+    ///
+    /// @param clear used to specify if the function should also clear the queue
+    void stop(bool clear = false) {
         std::lock_guard<std::mutex> lock(mutex_);
         exit_ = true;
         // Notify get() so that it can exit.
         cv_.notify_all();
-        removeAllUnsafe();
+        if (clear) {
+            reset();
+        }
     }
 
 private:
-    /// @brief Has to be called in a mutex_-locked environment.
-    void removeAllUnsafe() {
-        while (queue_.size()) {
-            queue_.pop();
-        }
+    /// @brief reset clears the queue removing all work items
+    ///
+    /// Must be called in a critical section (mutex locked scope).
+    void reset() {
+        queue_ = std::queue<WorkItem>();
     }
 
+    /// @brief underlying queue container
     std::queue<WorkItem> queue_;
+
+    /// @brief mutex used for critical sections
     std::mutex mutex_;
+
+    /// @brief condition variable used to signal waiting threads
     std::condition_variable cv_;
+
+    /// @brief the sate of the queue
+    ///
+    /// The 'enabled' state corresponds to false value
+    /// The 'disabled' state corresponds to true value
     std::atomic_bool exit_;
 };
 
+/// @brief Defines a thread pool which uses a thread pool queue for managing
+/// work items. Each work item is a 'function' object.
 struct ThreadPool {
     using WorkItemCallBack = std::function<void()>;
 
+    /// @brief Constructor
     ThreadPool();
+
+    /// @brief Destructor
     ~ThreadPool();
 
-    void create(uint32_t worker_threads);
+    /// @brief initialize the thread pool with specified number of treads and
+    /// optionally starts all threads.
+    ///
+    /// @param worker_threads specifies the number of threads to be created
+    /// @param run to indicate if the threads should be started immediately
+    void create(uint32_t thread_count, bool run = true);
 
+    /// @brief de-initialize the thread pool stopping threads and clearing the
+    /// internal queue
     void destroy();
 
+    /// @brief start all the threads
+    ///
+    /// @param worker_threads specifies the number of threads to be created
+    void start(uint32_t thread_count);
+
+    /// @brief stop all the threads
+    ///
+    /// @param clear used to specify if the function should also clear the queue
+    void stop(bool clear = false);
+
+    /// @brief add a working item to the thread pool
+    ///
+    /// @param call_back the 'function' object to be added to the queue
     void add(WorkItemCallBack call_back);
 
+    /// @brief count number of work items in the queue
+    ///
+    /// @return the number of work items in the queue
     size_t count();
 
 private:
-    void threadRun();
+    /// @brief run function of each thread
+    void run();
 
+    /// @brief list of worker threads
     std::list<std::shared_ptr<std::thread>> worker_threads_;
+
+    /// @brief underlying work items queue
     ThreadPoolQueue<WorkItemCallBack> queue_;
+
+    /// @brief state of the thread pool
+    /// The 'run' state corresponds to false value
+    /// The 'stop' state corresponds to true value
     std::atomic_bool exit_;
 };