]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1239] communication state is now thread safe
authorRazvan Becheriu <razvan@isc.org>
Thu, 4 Jun 2020 21:04:59 +0000 (00:04 +0300)
committerRazvan Becheriu <razvan@isc.org>
Tue, 16 Jun 2020 09:02:52 +0000 (09:02 +0000)
src/hooks/dhcp/high_availability/communication_state.cc
src/hooks/dhcp/high_availability/communication_state.h
src/lib/http/client.cc

index 488849390243461fa9fd4a5ba30d16f224ee2a5d..9bac62e578916a3ef893de814981fb800523999c 100644 (file)
@@ -364,6 +364,16 @@ CommunicationState4::CommunicationState4(const IOServicePtr& io_service,
 
 void
 CommunicationState4::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        analyzeMessageInternal(message);
+    } else {
+        analyzeMessageInternal(message);
+    }
+}
+
+void
+CommunicationState4::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) {
     // The DHCP message must successfully cast to a Pkt4 object.
     Pkt4Ptr msg = boost::dynamic_pointer_cast<Pkt4>(message);
     if (!msg) {
@@ -432,34 +442,74 @@ CommunicationState4::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message)
     // Only log the first time we detect a client is unacked.
     if (log_unacked) {
         unsigned unacked_left = 0;
-        if (config_->getMaxUnackedClients() > getUnackedClientsCount()) {
-            unacked_left = config_->getMaxUnackedClients() > getUnackedClientsCount();
+        if (config_->getMaxUnackedClients() > getUnackedClientsCountInternal()) {
+            unacked_left = config_->getMaxUnackedClients() - getUnackedClientsCountInternal();
         }
         LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT4_UNACKED)
             .arg(message->getLabel())
-            .arg(getUnackedClientsCount())
+            .arg(getUnackedClientsCountInternal())
             .arg(unacked_left);
     }
 }
 
 bool
 CommunicationState4::failureDetected() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (failureDetectedInternal());
+    } else {
+        return (failureDetectedInternal());
+    }
+}
+
+bool
+CommunicationState4::failureDetectedInternal() const {
     return ((config_->getMaxUnackedClients() == 0) ||
             (getUnackedClientsCount() > config_->getMaxUnackedClients()));
 }
 
 size_t
 CommunicationState4::getConnectingClientsCount() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (getConnectingClientsCountInternal());
+    } else {
+        return (getConnectingClientsCountInternal());
+    }
+}
+
+size_t
+CommunicationState4::getConnectingClientsCountInternal() const {
     return (connecting_clients_.size());
 }
 
 size_t
 CommunicationState4::getUnackedClientsCount() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (getUnackedClientsCountInternal());
+    } else {
+        return (getUnackedClientsCountInternal());
+    }
+}
+
+size_t
+CommunicationState4::getUnackedClientsCountInternal() const {
     return (connecting_clients_.get<1>().count(true));
 }
 
 void
 CommunicationState4::clearConnectingClients() {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        clearConnectingClientsInternal();
+    } else {
+        clearConnectingClientsInternal();
+    }
+}
+
+void
+CommunicationState4::clearConnectingClientsInternal() {
     connecting_clients_.clear();
 }
 
@@ -470,6 +520,16 @@ CommunicationState6::CommunicationState6(const IOServicePtr& io_service,
 
 void
 CommunicationState6::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        analyzeMessageInternal(message);
+    } else {
+        analyzeMessageInternal(message);
+    }
+}
+
+void
+CommunicationState6::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) {
     // The DHCP message must successfully cast to a Pkt6 object.
     Pkt6Ptr msg = boost::dynamic_pointer_cast<Pkt6>(message);
     if (!msg) {
@@ -526,34 +586,74 @@ CommunicationState6::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message)
     // Only log the first time we detect a client is unacked.
     if (log_unacked) {
         unsigned unacked_left = 0;
-        if (config_->getMaxUnackedClients() > getUnackedClientsCount()) {
-            unacked_left = config_->getMaxUnackedClients() > getUnackedClientsCount();
+        if (config_->getMaxUnackedClients() > getUnackedClientsCountInternal()) {
+            unacked_left = config_->getMaxUnackedClients() - getUnackedClientsCountInternal();
         }
         LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT6_UNACKED)
             .arg(message->getLabel())
-            .arg(getUnackedClientsCount())
+            .arg(getUnackedClientsCountInternal())
             .arg(unacked_left);
     }
 }
 
 bool
 CommunicationState6::failureDetected() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (failureDetectedInternal());
+    } else {
+        return (failureDetectedInternal());
+    }
+}
+
+bool
+CommunicationState6::failureDetectedInternal() const {
     return ((config_->getMaxUnackedClients() == 0) ||
             (getUnackedClientsCount() > config_->getMaxUnackedClients()));
 }
 
 size_t
 CommunicationState6::getConnectingClientsCount() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (getConnectingClientsCountInternal());
+    } else {
+        return (getConnectingClientsCountInternal());
+    }
+}
+
+size_t
+CommunicationState6::getConnectingClientsCountInternal() const {
     return (connecting_clients_.size());
 }
 
 size_t
 CommunicationState6::getUnackedClientsCount() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (getUnackedClientsCountInternal());
+    } else {
+        return (getUnackedClientsCountInternal());
+    }
+}
+
+size_t
+CommunicationState6::getUnackedClientsCountInternal() const {
     return (connecting_clients_.get<1>().count(true));
 }
 
 void
 CommunicationState6::clearConnectingClients() {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        clearConnectingClientsInternal();
+    } else {
+        clearConnectingClientsInternal();
+    }
+}
+
+void
+CommunicationState6::clearConnectingClientsInternal() {
     connecting_clients_.clear();
 }
 
index 2ffffa4f8ab89eaa832c08fb0ff44bb8141257df..fc0e0b8761c7371f14f7ccabd9265d5590b5df68 100644 (file)
@@ -497,6 +497,57 @@ public:
 
 protected:
 
+    /// @brief Checks if the DHCPv4 message appears to be unanswered.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// This method uses "secs" field value for detecting client
+    /// communication failures as described in the
+    /// @c CommunicationState::analyzeMessage. Some misbehaving Windows
+    /// clients were reported to swap "secs" field bytes. In this case
+    /// the first byte is set to non-zero byte and the second byte is
+    /// set to 0. This method handles such cases and corrects bytes
+    /// order before comparing against the threshold.
+    ///
+    /// @param message DHCPv4 message to be analyzed. This must be the
+    /// message which belongs to the partner, i.e. the caller must
+    /// filter out messages belonging to the partner prior to calling
+    /// this method.
+    virtual void analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message);
+
+    /// @brief Checks if the partner failure has been detected based
+    /// on the DHCP traffic analysis.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// @return true if the partner failure has been detected, false
+    /// otherwise.
+    virtual bool failureDetectedInternal() const;
+
+    /// @brief Returns the current number of clients which attempted
+    /// to get a lease from the partner server.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// The returned number is reset to 0 when the server successfully
+    /// establishes communication with the partner. The number is
+    /// incremented only in the communications interrupted case.
+    ///
+    /// @return The number of clients including unacked clients.
+    virtual size_t getConnectingClientsCountInternal() const;
+
+    /// @brief Returns the current number of clients which haven't gotten
+    /// a lease from the partner server.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// The returned number is reset to 0 when the server successfully
+    /// establishes communication with the partner. The number is
+    /// incremented only in the communications interrupted case.
+    ///
+    /// @return Number of unacked clients.
+    virtual size_t getUnackedClientsCountInternal() const;
+
     /// @brief Removes information about the clients the partner server
     /// should respond to while communication with the partner was
     /// interrupted.
@@ -504,6 +555,15 @@ protected:
     /// See @c CommunicationState::analyzeMessage for details.
     virtual void clearConnectingClients();
 
+    /// @brief Removes information about the clients the partner server
+    /// should respond to while communication with the partner was
+    /// interrupted.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// See @c CommunicationState::analyzeMessage for details.
+    virtual void clearConnectingClientsInternal();
+
     /// @brief Structure holding information about the client which has
     /// send the packet being analyzed.
     struct ConnectingClient4 {
@@ -540,6 +600,9 @@ protected:
     /// the partner server while the servers are in communications
     /// interrupted state.
     ConnectingClients4 connecting_clients_;
+
+    /// @brief The mutex used to protect internal state.
+    const boost::scoped_ptr<std::mutex> mutex_;
 };
 
 /// @brief Pointer to the @c CommunicationState4 object.
@@ -599,6 +662,51 @@ public:
 
 protected:
 
+    /// @brief Checks if the DHCPv6 message appears to be unanswered.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// See @c CommunicationState::analyzeMessage for details.
+    ///
+    /// @param message DHCPv6 message to be analyzed. This must be the
+    /// message which belongs to the partner, i.e. the caller must
+    /// filter out messages belonging to the partner prior to calling
+    /// this method.
+    virtual void analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message);
+
+    /// @brief Checks if the partner failure has been detected based
+    /// on the DHCP traffic analysis.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// @return true if the partner failure has been detected, false
+    /// otherwise.
+    virtual bool failureDetectedInternal() const;
+
+    /// @brief Returns the current number of clients which attempted
+    /// to get a lease from the partner server.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// The returned number is reset to 0 when the server successfully
+    /// establishes communication with the partner. The number is
+    /// incremented only in the communications interrupted case.
+    ///
+    /// @return The number of clients including unacked clients.
+    virtual size_t getConnectingClientsCountInternal() const;
+
+    /// @brief Returns the current number of clients which haven't gotten
+    /// a lease from the partner server.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// The returned number is reset to 0 when the server successfully
+    /// establishes communication with the partner. The number is
+    /// incremented only in the communications interrupted case.
+    ///
+    /// @return Number of unacked clients.
+    virtual size_t getUnackedClientsCountInternal() const;
+
     /// @brief Removes information about the clients the partner server
     /// should respond to while communication with the partner was
     /// interrupted.
@@ -606,6 +714,15 @@ protected:
     /// See @c CommunicationState::analyzeMessage for details.
     virtual void clearConnectingClients();
 
+    /// @brief Removes information about the clients the partner server
+    /// should respond to while communication with the partner was
+    /// interrupted.
+    ///
+    /// Should be called in a thread safe context.
+    ///
+    /// See @c CommunicationState::analyzeMessage for details.
+    virtual void clearConnectingClientsInternal();
+
     /// @brief Structure holding information about a client which
     /// sent a packet being analyzed.
     struct ConnectingClient6 {
@@ -635,6 +752,9 @@ protected:
     /// the partner server while the servers are in communications
     /// interrupted state.
     ConnectingClients6 connecting_clients_;
+
+    /// @brief The mutex used to protect internal state.
+    const boost::scoped_ptr<std::mutex> mutex_;
 };
 
 /// @brief Pointer to the @c CommunicationState6 object.
index dd2e73c6f9b7ca65571c35f794e7abc34e1bac8e..5fc9d63f09c86fbf990c5ba3f46b4de34bc1c939 100644 (file)
@@ -416,20 +416,11 @@ public:
         closeAll();
     }
 
-    /// @brief Returns next queued request for the given URL.
+    /// @brief Process next queued request for the given URL.
     ///
-    /// @param url URL for which next queued request should be retrieved.
-    /// @param [out] request Pointer to the queued request.
-    /// @param [out] response Pointer to the object into which response should
-    /// be stored.
-    /// @param request_timeout Requested timeout for the transaction.
-    /// @param callback Pointer to the user callback for this request.
-    /// @param connect_callback Pointer to the user callback invoked when
-    /// the client connects to the server.
-    /// @param close_callback Pointer to the user callback invoked when
-    /// the client closes the connection to the server.
+    /// @param url URL for which next queued request should be processed.
     ///
-    /// @return true if the request for the given URL has been retrieved,
+    /// @return true if the request for the given URL has been processed,
     /// false if there are no more requests queued for this URL.
     bool processNextRequest(const Url& url) {
         if (MultiThreadingMgr::instance().getMode()) {