]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1147] Checkpoint: implemented queue of one for v6 client
authorFrancis Dupont <fdupont@isc.org>
Sat, 9 May 2020 16:04:23 +0000 (18:04 +0200)
committerFrancis Dupont <fdupont@isc.org>
Tue, 26 May 2020 09:51:57 +0000 (11:51 +0200)
src/bin/dhcp6/client_handler.cc
src/bin/dhcp6/client_handler.h
src/bin/dhcp6/tests/client_handler_unittest.cc
src/lib/util/tests/thread_pool_unittest.cc
src/lib/util/thread_pool.h

index 9cbd5782bc3fccd8c7b633d0783c4228f24ef8e0..e2eba6f62e72242082f76ab80836dba35247acec 100644 (file)
 #include <dhcp6/dhcp6_log.h>
 #include <exceptions/exceptions.h>
 #include <stats/stats_mgr.h>
+#include <util/multi_threading_mgr.h>
 
 using namespace std;
+using namespace isc::util;
 
 namespace isc {
 namespace dhcp {
@@ -71,10 +73,22 @@ ClientHandler::unLock() {
     }
     // Assume erase will never fail so not checking its result.
     clients_.erase(locked_->getDuid());
+    if (!client_ || !client_->cont_) {
+        return;
+    }
+    // Try to process next query. As the caller holds the mutex of
+    // the handler class the continuation will be resumed after.
+    MultiThreadingMgr& mt_mgr = MultiThreadingMgr::instance();
+    if (mt_mgr.getMode()) {
+        if (!mt_mgr.getThreadPool().addFront(client_->cont_)) {
+            LOG_DEBUG(dhcp6_logger, DBG_DHCP6_BASIC, DHCP6_PACKET_QUEUE_FULL);
+        }
+    }
+    client_->cont_.reset();
 }
 
 bool
-ClientHandler::tryLock(Pkt6Ptr query) {
+ClientHandler::tryLock(Pkt6Ptr query, ContinuationPtr cont) {
     if (!query) {
         isc_throw(InvalidParameter, "null query in ClientHandler::tryLock");
     }
@@ -102,7 +116,25 @@ ClientHandler::tryLock(Pkt6Ptr query) {
             return (false);
         }
     }
-    // This query is a duplicate: currently it is simply dropped.
+    // This query can be a duplicate so put the continuation.
+    if (cont) {
+        Pkt6Ptr next_query = holder->next_query_;
+        holder->next_query_ = query;
+        holder->cont_ = cont;
+        if (next_query) {
+            // Logging a warning as it is supposed to be a rare event
+            // with well behaving clients...
+            LOG_WARN(bad_packet6_logger, DHCP6_PACKET_DROP_DUPLICATE)
+                .arg(next_query->toText())
+                .arg(this_thread::get_id())
+                .arg(query->toText())
+                .arg(this_thread::get_id());
+            stats::StatsMgr::instance().addValue("pkt6-receive-drop",
+                                                 static_cast<int64_t>(1));
+        }
+        return (false);
+    }
+
     // Logging a warning as it is supposed to be a rare event
     // with well behaving clients...
     LOG_WARN(bad_packet6_logger, DHCP6_PACKET_DROP_DUPLICATE)
index cb3119652466ac709fa958326908353545487c4e..01f0dd113088c95de7f9e2813c888691d1ef3c1b 100644 (file)
@@ -13,6 +13,7 @@
 #include <boost/multi_index/hashed_index.hpp>
 #include <boost/multi_index/member.hpp>
 #include <boost/shared_ptr.hpp>
+#include <functional>
 #include <mutex>
 #include <thread>
 
@@ -23,6 +24,19 @@ namespace dhcp {
 class ClientHandler : public boost::noncopyable {
 public:
 
+    /// @brief Define the type of packet processing continuation.
+    typedef std::function<void()> Continuation;
+
+    /// @brief Define the type of shared pointers to continuations.
+    typedef boost::shared_ptr<Continuation> ContinuationPtr;
+
+    /// @brief Continuation factory.
+    ///
+    /// @param cont Continuation rvalue.
+    static ContinuationPtr makeContinuation(Continuation&& cont) {
+        return (boost::make_shared<Continuation>(cont));
+    }
+
     /// @brief Constructor.
     ClientHandler();
 
@@ -33,10 +47,15 @@ public:
 
     /// @brief Tries to acquires a client.
     ///
+    /// Lookup the client:
+    ///  - if not found insert the client in the clients map and return true
+    ///  - if found put the continuation in the holder and return false
+    ///
     /// @param query The query from the client.
+    /// @param cont The continuation in the case the client was held.
     /// @return true if the client was acquired, false if there is already
     /// a query from the same client.
-    bool tryLock(Pkt6Ptr query);
+    bool tryLock(Pkt6Ptr query, ContinuationPtr cont = ContinuationPtr());
 
 private:
 
@@ -58,6 +77,12 @@ private:
 
         /// @brief The ID of the thread processing the query.
         std::thread::id thread_;
+
+        /// @brief The next query.
+        Pkt6Ptr next_query_;
+
+        /// @brief The continuation to process next query for the client.
+        ContinuationPtr cont_;
     };
 
     /// @brief The type of shared pointers to clients.
@@ -87,6 +112,9 @@ private:
 
     /// @brief Release a client.
     ///
+    /// If the client has a continuation, push it at front of the thread
+    /// packet queue.
+    ///
     /// The mutex must be held by the caller.
     void unLock();
 
index 469e5c6508c24f795102da5490c2134e990bbc0c..8f6f1f36ec1dda364b6e70c34eeecb255e44e93b 100644 (file)
@@ -9,6 +9,8 @@
 #include <dhcp6/client_handler.h>
 #include <dhcp6/tests/dhcp6_test_utils.h>
 #include <stats/stats_mgr.h>
+#include <util/multi_threading_mgr.h>
+#include <unistd.h>
 
 using namespace isc;
 using namespace isc::dhcp;
@@ -25,7 +27,8 @@ public:
     /// @brief Constructor.
     ///
     /// Creates the pkt6-receive-drop statistic.
-    ClientHandleTest() {
+    ClientHandleTest() : called1_(false), called2_(false), called3_(false) {
+        MultiThreadingMgr::instance().apply(false, 0, 0);
         StatsMgr::instance().setValue("pkt6-receive-drop", static_cast<int64_t>(0));
     }
 
@@ -33,6 +36,7 @@ public:
     ///
     /// Removes statistics.
     ~ClientHandleTest() {
+        MultiThreadingMgr::instance().apply(false, 0, 0);
         StatsMgr::instance().removeAll();
     }
 
@@ -63,6 +67,38 @@ public:
             EXPECT_EQ(0, obs->getInteger().first);
         }
     }
+
+    /// @brief Waits for pending continuations.
+    void waitForThreads() {
+        while (MultiThreadingMgr::instance().getThreadPool().count() > 0) {
+            usleep(100);
+        }
+    }
+
+    /// @brief Set called1_ to true.
+    void setCalled1() {
+        called1_ = true;
+    }
+
+    /// @brief Set called2_ to true.
+    void setCalled2() {
+        called2_ = true;
+    }
+
+    /// @brief Set called3_ to true.
+    void setCalled3() {
+        called3_ = true;
+    }
+
+    /// @brief The called flag number 1.
+    bool called1_;
+
+    /// @brief The called flag number 2.
+    bool called2_;
+
+    /// @brief The called flag number 3.
+    bool called3_;
+
 };
 
 // Verifies behavior with empty block.
@@ -73,6 +109,7 @@ TEST_F(ClientHandleTest, empty) {
     } catch (const std::exception& ex) {
         ADD_FAILURE() << "unexpected exception: " << ex.what();
     }
+    checkStat(false);
 }
 
 // Verifies behavior with one query.
@@ -283,4 +320,179 @@ TEST_F(ClientHandleTest, doubleTryLock) {
 // Cannot verifies that empty client ID fails because getClientId() handles
 // this condition and replaces it by no client ID.
 
+// Verifies behavior with two queries for the same client and multi-threading.
+TEST_F(ClientHandleTest, serializeTwoQueries) {
+    // Get two queries.
+    Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
+    Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
+    OptionPtr client_id = generateClientId();
+    // Same client ID: same client.
+    sol->addOption(client_id);
+    req->addOption(client_id);
+
+    // Start multi-threading.
+    EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
+
+    try {
+        // Get a client handler.
+        ClientHandler client_handler;
+
+        // Create a continuation.
+        ClientHandler::ContinuationPtr cont1 =
+            ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled1, this));
+
+        // Try to lock it with the solicit.
+        bool duplicate = false;
+        EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol, cont1));
+
+        // Should return false (no duplicate).
+        EXPECT_FALSE(duplicate);
+
+        // Get a second client handler.
+        ClientHandler client_handler2;
+
+        // Create a continuation.
+        ClientHandler::ContinuationPtr cont2 =
+            ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled2, this));
+
+        // Try to lock it with a request.
+        EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req, cont2));
+
+        // Should return false (multi-threading enforces serialization).
+        EXPECT_FALSE(duplicate);
+    } catch (const std::exception& ex) {
+        ADD_FAILURE() << "unexpected exception: " << ex.what();
+    }
+
+    // Give the second continuation a chance.
+    waitForThreads();
+
+    // Force multi-threading to stop;
+    MultiThreadingCriticalSection cs;
+
+    checkStat(false);
+    EXPECT_FALSE(called1_);
+    EXPECT_TRUE(called2_);
+}
+
+// Verifies behavior with two queries for the same client and multi-threading.
+// Continuations are required for serialization.
+TEST_F(ClientHandleTest, serializeNoCont) {
+    // Get two queries.
+    Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
+    Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
+    OptionPtr client_id = generateClientId();
+    // Same client ID: same client.
+    sol->addOption(client_id);
+    req->addOption(client_id);
+
+    // Start multi-threading.
+    EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
+
+    try {
+        // Get a client handler.
+        ClientHandler client_handler;
+
+        // Try to lock it with the solicit.
+        bool duplicate = false;
+        EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol));
+
+        // Should return false (no duplicate).
+        EXPECT_FALSE(duplicate);
+
+        // Get a second client handler.
+        ClientHandler client_handler2;
+
+        // Try to lock it with a request.
+        EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req));
+
+        // Should return true (duplicate without continuation).
+        EXPECT_TRUE(duplicate);
+    } catch (const std::exception& ex) {
+        ADD_FAILURE() << "unexpected exception: " << ex.what();
+    }
+
+    // Give the second continuation a chance even there is none...
+    waitForThreads();
+
+    // Force multi-threading to stop;
+    MultiThreadingCriticalSection cs;
+
+    checkStat(true);
+}
+
+// Verifies behavior with three queries for the same client and
+// multi-threading: currently we accept only two queries,
+// a third one replaces second so we get the first (oldest) query and
+// the last (newest) query when the client is busy.
+TEST_F(ClientHandleTest, serializeThreeQueries) {
+    // Get two queries.
+    Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
+    Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
+    Pkt6Ptr ren(new Pkt6(DHCPV6_RENEW, 3456));
+    OptionPtr client_id = generateClientId();
+    // Same client ID: same client.
+    sol->addOption(client_id);
+    req->addOption(client_id);
+    ren->addOption(client_id);
+
+    // Start multi-threading.
+    EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
+
+    try {
+        // Get a client handler.
+        ClientHandler client_handler;
+
+        // Create a continuation.
+        ClientHandler::ContinuationPtr cont1 =
+            ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled1, this));
+
+        // Try to lock it with the solicit.
+        bool duplicate = false;
+        EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol, cont1));
+
+        // Should return false (no duplicate).
+        EXPECT_FALSE(duplicate);
+
+        // Get a second client handler.
+        ClientHandler client_handler2;
+
+        // Create a continuation.
+        ClientHandler::ContinuationPtr cont2 =
+            ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled2, this));
+
+        // Try to lock it with a request.
+        EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req, cont2));
+
+        // Should return false (multi-threading enforces serialization).
+        EXPECT_FALSE(duplicate);
+
+        // Get a third client handler.
+        ClientHandler client_handler3;
+
+        // Create a continuation.
+        ClientHandler::ContinuationPtr cont3 =
+            ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled3, this));
+
+        // Try to lock it with a renew.
+        EXPECT_NO_THROW(duplicate = client_handler3.tryLock(ren, cont3));
+
+        // Should return false (multi-threading enforces serialization).
+        EXPECT_FALSE(duplicate);
+    } catch (const std::exception& ex) {
+        ADD_FAILURE() << "unexpected exception: " << ex.what();
+    }
+
+    // Give the second continuation a chance.
+    waitForThreads();
+
+    // Force multi-threading to stop;
+    MultiThreadingCriticalSection cs;
+
+    checkStat(true);
+    EXPECT_FALSE(called1_);
+    EXPECT_FALSE(called2_);
+    EXPECT_TRUE(called3_);
+}
+
 } // end of anonymous namespace
index c2902bc0f7c1510dd93846d232e835b74fcb18a6..41daecc8559efaedb4586e0fdb4d777bee0c321a 100644 (file)
@@ -205,7 +205,7 @@ private:
 };
 
 /// @brief test ThreadPool add and count
-TEST_F(ThreadPoolTest, testAddAndCount) {
+TEST_F(ThreadPoolTest, addAndCount) {
     uint32_t items_count;
     CallBack call_back;
     ThreadPool<CallBack> thread_pool;
@@ -237,7 +237,7 @@ TEST_F(ThreadPoolTest, testAddAndCount) {
 }
 
 /// @brief test ThreadPool start and stop
-TEST_F(ThreadPoolTest, testStartAndStop) {
+TEST_F(ThreadPoolTest, startAndStop) {
     uint32_t items_count;
     uint32_t thread_count;
     CallBack call_back;
@@ -455,7 +455,7 @@ TEST_F(ThreadPoolTest, testStartAndStop) {
 }
 
 /// @brief test ThreadPool max queue size
-TEST_F(ThreadPoolTest, testMaxQueueSize) {
+TEST_F(ThreadPoolTest, maxQueueSize) {
     uint32_t items_count;
     CallBack call_back;
     ThreadPool<CallBack> thread_pool;
@@ -491,4 +491,41 @@ TEST_F(ThreadPoolTest, testMaxQueueSize) {
     EXPECT_EQ(thread_pool.count(), max_queue_size);
 }
 
+/// @brief test ThreadPool add front.
+TEST_F(ThreadPoolTest, addFront) {
+    uint32_t items_count;
+    CallBack call_back;
+    ThreadPool<CallBack> thread_pool;
+    // the item count should be 0
+    ASSERT_EQ(thread_pool.count(), 0);
+    // the thread count should be 0
+    ASSERT_EQ(thread_pool.size(), 0);
+
+    items_count = 20;
+
+    call_back = std::bind(&ThreadPoolTest::run, this);
+
+    // add items to stopped thread pool
+    bool ret = true;
+    for (uint32_t i = 0; i < items_count; ++i) {
+        EXPECT_NO_THROW(ret = thread_pool.addFront(boost::make_shared<CallBack>(call_back)));
+        EXPECT_TRUE(ret);
+    }
+
+    // the item count should match
+    ASSERT_EQ(thread_pool.count(), items_count);
+
+    // change the max count
+    ASSERT_EQ(thread_pool.getMaxQueueSize(), 0);
+    size_t max_queue_size = 10;
+    thread_pool.setMaxQueueSize(max_queue_size);
+    EXPECT_EQ(thread_pool.getMaxQueueSize(), max_queue_size);
+
+    // adding an item at front should change nothing queue
+    EXPECT_EQ(thread_pool.count(), items_count);
+    EXPECT_NO_THROW(ret = thread_pool.addFront(boost::make_shared<CallBack>(call_back)));
+    EXPECT_FALSE(ret);
+    EXPECT_EQ(thread_pool.count(), items_count);
+}
+
 }  // namespace
index ccad7bdf378596861a81f76fc79a16d324a55a98..fdc29ee7fedcceda231bb7f1f9eccd683de7c4b6 100644 (file)
@@ -26,7 +26,7 @@ namespace util {
 ///
 /// @tparam WorkItem a functor
 /// @tparam Container a 'queue like' container
-template <typename WorkItem, typename Container = std::queue<boost::shared_ptr<WorkItem>>>
+template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
 struct ThreadPool {
     typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
 
@@ -81,7 +81,15 @@ struct ThreadPool {
     /// @return false if the queue was full and oldest item(s) was dropped,
     /// true otherwise.
     bool add(const WorkItemPtr& item) {
-        return (queue_.push(item));
+        return (queue_.push_back(item));
+    }
+
+    /// @brief add a work item to the thread pool at front
+    ///
+    /// @param item the 'functor' object to be added to the queue
+    /// @return false if the queue was full, true otherwise.
+    bool addFront(const WorkItemPtr& item) {
+        return (queue_.push_front(item));
     }
 
     /// @brief count number of work items in the queue
@@ -203,7 +211,7 @@ private:
         /// @param item the new item to be added to the queue
         /// @return false if the queue was full and oldest item(s) dropped,
         /// true otherwise
-        bool push(const Item& item) {
+        bool push_back(const Item& item) {
             bool ret = true;
             if (!item) {
                 return (ret);
@@ -212,17 +220,41 @@ private:
                 std::lock_guard<std::mutex> lock(mutex_);
                 if (max_queue_size_ != 0) {
                     while (queue_.size() >= max_queue_size_) {
-                        queue_.pop();
+                        queue_.pop_front();
                         ret = false;
                     }
                 }
-                queue_.push(item);
+                queue_.push_back(item);
             }
             // Notify pop function so that it can effectively remove a work item.
             cv_.notify_one();
             return (ret);
         }
 
+        /// @brief push work item to the queue at front.
+        ///
+        /// Used to add work items to the queue at front.
+        /// When the queue is full the item is not added.
+        ///
+        /// @param item the new item to be added to the queue
+        /// @return false if the queue was full, true otherwise
+        bool push_front(const Item& item) {
+            if (!item) {
+                return (true);
+            }
+            {
+                std::lock_guard<std::mutex> lock(mutex_);
+                if ((max_queue_size_ != 0) &&
+                    (queue_.size() >= max_queue_size_)) {
+                        return (false);
+                }
+                queue_.push_front(item);
+            }
+            // Notify pop function so that it can effectively remove a work item.
+            cv_.notify_one();
+            return (true);
+        }
+
         /// @brief pop work item from the queue or block waiting
         ///
         /// Used to retrieve and remove a work item from the queue
@@ -241,7 +273,7 @@ private:
                 return (Item());
             }
             Item item = queue_.front();
-            queue_.pop();
+            queue_.pop_front();
             return (item);
         }