]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1219] Made pending_requests_ thread safe
authorFrancis Dupont <fdupont@isc.org>
Fri, 8 May 2020 23:53:17 +0000 (01:53 +0200)
committerFrancis Dupont <fdupont@isc.org>
Sat, 16 May 2020 13:01:53 +0000 (15:01 +0200)
src/hooks/dhcp/high_availability/ha_service.cc
src/hooks/dhcp/high_availability/ha_service.h
src/hooks/dhcp/high_availability/tests/ha_service_unittest.cc

index 9f475ecc1692384d4a36565c12befe054e1416e4..72c669fc813605f498cfa98d8d7ea709a16cd5e5 100644 (file)
@@ -19,6 +19,7 @@
 #include <http/date_time.h>
 #include <http/response_json.h>
 #include <http/post_request_json.h>
+#include <util/multi_threading_mgr.h>
 #include <util/stopwatch.h>
 #include <boost/pointer_cast.hpp>
 #include <boost/bind.hpp>
@@ -51,7 +52,7 @@ HAService::HAService(const IOServicePtr& io_service, const NetworkStatePtr& netw
                      const HAConfigPtr& config, const HAServerType& server_type)
     : io_service_(io_service), network_state_(network_state), config_(config),
       server_type_(server_type), client_(*io_service), communication_state_(),
-      query_filter_(config), pending_requests_() {
+      query_filter_(config), pending_requests_mutex_(), pending_requests_() {
 
     if (server_type == HAServerType::DHCPv4) {
         communication_state_.reset(new CommunicationState4(io_service_, config));
@@ -913,6 +914,61 @@ HAService::asyncSendLeaseUpdates(const dhcp::Pkt6Ptr& query,
     return (sent_num);
 }
 
+template<typename QueryPtrType>
+bool
+HAService::leaseUpdateComplete(QueryPtrType& query,
+                               const ParkingLotHandlePtr& parking_lot) {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lock(pending_requests_mutex_);
+        return (leaseUpdateCompleteInternal(query, parking_lot));
+    } else {
+        return (leaseUpdateCompleteInternal(query, parking_lot));
+    }
+}
+
+template<typename QueryPtrType>
+bool
+HAService::leaseUpdateCompleteInternal(QueryPtrType& query,
+                                       const ParkingLotHandlePtr& parking_lot) {
+    auto it = pending_requests_.find(query);
+
+    // If there are no more pending requests for this query, let's unpark
+    // the DHCP packet.
+    if (it == pending_requests_.end() || (--pending_requests_[query] <= 0)) {
+        parking_lot->unpark(query);
+
+        // If we have unparked the packet we can clear pending requests for
+        // this query.
+        if (it != pending_requests_.end()) {
+            pending_requests_.erase(it);
+        }
+        return (true);
+    }
+    return (false);
+}
+
+template<typename QueryPtrType>
+void
+HAService::updatePendingRequest(QueryPtrType& query) {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lock(pending_requests_mutex_);
+        updatePendingRequestInternal(query);
+    } else {
+        updatePendingRequestInternal(query);
+    }
+}
+
+template<typename QueryPtrType>
+void
+HAService::updatePendingRequestInternal(QueryPtrType& query) {
+    if (pending_requests_.count(query) == 0) {
+        pending_requests_[query] = 1;
+
+    } else {
+        ++pending_requests_[query];
+    }
+}
+
 template<typename QueryPtrType>
 void
 HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
@@ -1020,19 +1076,7 @@ HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
                 return;
             }
 
-            auto it = pending_requests_.find(query);
-
-            // If there are no more pending requests for this query, let's unpark
-            // the DHCP packet.
-            if (it == pending_requests_.end() || (--pending_requests_[query] <= 0)) {
-                parking_lot->unpark(query);
-
-                // If we have unparked the packet we can clear pending requests for
-                // this query.
-                if (it != pending_requests_.end()) {
-                    pending_requests_.erase(it);
-                }
-
+            if (leaseUpdateComplete(query, parking_lot)) {
                 // If we have finished sending the lease updates we need to run the
                 // state machine until the state machine finds that additional events
                 // are required, such as next heartbeat or a lease update. The runModel()
@@ -1052,12 +1096,7 @@ HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
     // a backup increase the number of pending requests.
     if (config_->amWaitingBackupAck() || (config->getRole() != HAConfig::PeerConfig::BACKUP)) {
         // Request scheduled, so update the request counters for the query.
-        if (pending_requests_.count(query) == 0) {
-            pending_requests_[query] = 1;
-
-        } else {
-            ++pending_requests_[query];
-        }
+        updatePendingRequest(query);
     }
 }
 
@@ -2200,5 +2239,15 @@ HAService::clientCloseHandler(int tcp_native_fd) {
     }
 };
 
+size_t
+HAService::pendingRequestSize() {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lock(pending_requests_mutex_);
+        return (pending_requests_.size());
+    } else {
+        return (pending_requests_.size());
+    }
+}
+
 } // end of namespace isc::ha
 } // end of namespace isc
index 8f438920eab700e27f4422dd69f474cb3fb99dc0..448b11962e6eafdd3fe1bb3a791d46a64b588201 100644 (file)
@@ -25,6 +25,7 @@
 #include <boost/shared_ptr.hpp>
 #include <functional>
 #include <map>
+#include <mutex>
 #include <vector>
 
 namespace isc {
@@ -376,7 +377,7 @@ private:
     /// This is a generic implementation of the public @c inScope method
     /// variants.
     ///
-    /// @tparam type of the pointer to the DHCP query.
+    /// @tparam QueryPtrType type of the pointer to the DHCP query.
     /// @param [out] query6 pointer to the DHCP query received. A client class
     /// will be appended to this query instance, appropriate for the server to
     /// process this query, e.g. "HA_server1" if the "server1" should process
@@ -917,6 +918,72 @@ protected:
     /// @brief Selects queries to be processed/dropped.
     QueryFilter query_filter_;
 
+    /// @brief Handle last pending request for this query.
+    ///
+    /// Search if there are pending requests for this query:
+    ///  - if there are decrement the count
+    ///  - if there were at least two return false
+    ///  - if there was none or one unpark the query
+    ///  - if there was one remove the query from the map
+    ///  - return true
+    ///
+    /// @tparam QueryPtrType Type of the pointer to the DHCP client's message,
+    /// i.e. Pkt4Ptr or Pkt6Ptr.
+    /// @param query Pointer to the DHCP client's query.
+    /// @param [out] parking_lot Parking lot where the query is parked.
+    /// This method uses this handle to unpark the packet when all asynchronous
+    /// requests have been completed.
+    /// @return True when all lease updates are complete, false otherwise.
+    template<typename QueryPtrType>
+    bool leaseUpdateComplete(QueryPtrType& query,
+                             const hooks::ParkingLotHandlePtr& parking_lot);
+
+    /// @brief Update pending request counter for this query.
+    ///
+    /// @tparam QueryPtrType Type of the pointer to the DHCP client's message,
+    /// i.e. Pkt4Ptr or Pkt6Ptr.
+    /// @param query Pointer to the DHCP client's query.
+    template<typename QueryPtrType>
+    void updatePendingRequest(QueryPtrType& query);
+
+protected:
+    /// @brief Get the number of entries in the pending request map.
+    /// @note Currently for testing purposes only.
+    /// @return Number of entries in the pending request map.
+    size_t pendingRequestSize();
+
+private:
+    /// @brief Handle last pending request for this query.
+    ///
+    /// Search if there are pending requests for this query:
+    ///  - if there are decrement the count
+    ///  - if there were at least two return false
+    ///  - if there was none or one unpark the query
+    ///  - if there was one remove the query from the map
+    ///  - return true
+    ///
+    /// @tparam QueryPtrType Type of the pointer to the DHCP client's message,
+    /// i.e. Pkt4Ptr or Pkt6Ptr.
+    /// @param query Pointer to the DHCP client's query.
+    /// @param [out] parking_lot Parking lot where the query is parked.
+    /// This method uses this handle to unpark the packet when all asynchronous
+    /// requests have been completed.
+    /// @return True when all lease updates are complete, false otherwise.
+    template<typename QueryPtrType>
+    bool leaseUpdateCompleteInternal(QueryPtrType& query,
+                                     const hooks::ParkingLotHandlePtr& parking_lot);
+
+    /// @brief Update pending request counter for this query.
+    ///
+    /// @tparam QueryPtrType Type of the pointer to the DHCP client's message,
+    /// i.e. Pkt4Ptr or Pkt6Ptr.
+    /// @param query Pointer to the DHCP client's query.
+    template<typename QueryPtrType>
+    void updatePendingRequestInternal(QueryPtrType& query);
+
+    /// @brief Mutex to protect the pending_requests_ map.
+    std::mutex pending_requests_mutex_;
+
     /// @brief Map holding a number of scheduled requests for a given packet.
     ///
     /// A single callout may send multiple requests at the same time, e.g.
index 74750902c356506c36a857659a19f34e6ed302b3..1d39a83c6f4fc1187c152b753d17cebb60986013 100644 (file)
@@ -189,11 +189,11 @@ public:
     using HAService::transition;
     using HAService::verboseTransition;
     using HAService::shouldSendLeaseUpdates;
+    using HAService::pendingRequestSize;
     using HAService::network_state_;
     using HAService::config_;
     using HAService::communication_state_;
     using HAService::query_filter_;
-    using HAService::pending_requests_;
 };
 
 /// @brief Pointer to the @c TestHAService.
@@ -738,7 +738,7 @@ public:
         // Actually perform the lease updates.
         ASSERT_NO_THROW(runIOService(TEST_TIMEOUT, [&service]() {
             // Finish running IO service when there are no more pending requests.
-            return (service.pending_requests_.empty());
+            return (service.pendingRequestSize() == 0);
         }));
 
         // Only if we wait for lease updates to complete it makes senst to test
@@ -842,7 +842,7 @@ public:
         // Actually perform the lease updates.
         ASSERT_NO_THROW(runIOService(TEST_TIMEOUT, [&service]() {
             // Finish running IO service when there are no more pending requests.
-            return (service.pending_requests_.empty());
+            return (service.pendingRequestSize() == 0);
         }));
 
         // Only if we wait for lease updates to complete it makes senst to test