From: Razvan Becheriu Date: Thu, 4 Jun 2020 21:04:59 +0000 (+0300) Subject: [#1239] communication state is now thread safe X-Git-Tag: Kea-1.7.9~62 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7a3f93e83bc83083d077dbcf821668e2ef63838d;p=thirdparty%2Fkea.git [#1239] communication state is now thread safe --- diff --git a/src/hooks/dhcp/high_availability/communication_state.cc b/src/hooks/dhcp/high_availability/communication_state.cc index 4888493902..9bac62e578 100644 --- a/src/hooks/dhcp/high_availability/communication_state.cc +++ b/src/hooks/dhcp/high_availability/communication_state.cc @@ -364,6 +364,16 @@ CommunicationState4::CommunicationState4(const IOServicePtr& io_service, void CommunicationState4::analyzeMessage(const boost::shared_ptr& message) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + analyzeMessageInternal(message); + } else { + analyzeMessageInternal(message); + } +} + +void +CommunicationState4::analyzeMessageInternal(const boost::shared_ptr& message) { // The DHCP message must successfully cast to a Pkt4 object. Pkt4Ptr msg = boost::dynamic_pointer_cast(message); if (!msg) { @@ -432,34 +442,74 @@ CommunicationState4::analyzeMessage(const boost::shared_ptr& message) // Only log the first time we detect a client is unacked. if (log_unacked) { unsigned unacked_left = 0; - if (config_->getMaxUnackedClients() > getUnackedClientsCount()) { - unacked_left = config_->getMaxUnackedClients() > getUnackedClientsCount(); + if (config_->getMaxUnackedClients() > getUnackedClientsCountInternal()) { + unacked_left = config_->getMaxUnackedClients() - getUnackedClientsCountInternal(); } LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT4_UNACKED) .arg(message->getLabel()) - .arg(getUnackedClientsCount()) + .arg(getUnackedClientsCountInternal()) .arg(unacked_left); } } bool CommunicationState4::failureDetected() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (failureDetectedInternal()); + } else { + return (failureDetectedInternal()); + } +} + +bool +CommunicationState4::failureDetectedInternal() const { return ((config_->getMaxUnackedClients() == 0) || (getUnackedClientsCount() > config_->getMaxUnackedClients())); } size_t CommunicationState4::getConnectingClientsCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (getConnectingClientsCountInternal()); + } else { + return (getConnectingClientsCountInternal()); + } +} + +size_t +CommunicationState4::getConnectingClientsCountInternal() const { return (connecting_clients_.size()); } size_t CommunicationState4::getUnackedClientsCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (getUnackedClientsCountInternal()); + } else { + return (getUnackedClientsCountInternal()); + } +} + +size_t +CommunicationState4::getUnackedClientsCountInternal() const { return (connecting_clients_.get<1>().count(true)); } void CommunicationState4::clearConnectingClients() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + clearConnectingClientsInternal(); + } else { + clearConnectingClientsInternal(); + } +} + +void +CommunicationState4::clearConnectingClientsInternal() { connecting_clients_.clear(); } @@ -470,6 +520,16 @@ CommunicationState6::CommunicationState6(const IOServicePtr& io_service, void CommunicationState6::analyzeMessage(const boost::shared_ptr& message) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + analyzeMessageInternal(message); + } else { + analyzeMessageInternal(message); + } +} + +void +CommunicationState6::analyzeMessageInternal(const boost::shared_ptr& message) { // The DHCP message must successfully cast to a Pkt6 object. Pkt6Ptr msg = boost::dynamic_pointer_cast(message); if (!msg) { @@ -526,34 +586,74 @@ CommunicationState6::analyzeMessage(const boost::shared_ptr& message) // Only log the first time we detect a client is unacked. if (log_unacked) { unsigned unacked_left = 0; - if (config_->getMaxUnackedClients() > getUnackedClientsCount()) { - unacked_left = config_->getMaxUnackedClients() > getUnackedClientsCount(); + if (config_->getMaxUnackedClients() > getUnackedClientsCountInternal()) { + unacked_left = config_->getMaxUnackedClients() - getUnackedClientsCountInternal(); } LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT6_UNACKED) .arg(message->getLabel()) - .arg(getUnackedClientsCount()) + .arg(getUnackedClientsCountInternal()) .arg(unacked_left); } } bool CommunicationState6::failureDetected() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (failureDetectedInternal()); + } else { + return (failureDetectedInternal()); + } +} + +bool +CommunicationState6::failureDetectedInternal() const { return ((config_->getMaxUnackedClients() == 0) || (getUnackedClientsCount() > config_->getMaxUnackedClients())); } size_t CommunicationState6::getConnectingClientsCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (getConnectingClientsCountInternal()); + } else { + return (getConnectingClientsCountInternal()); + } +} + +size_t +CommunicationState6::getConnectingClientsCountInternal() const { return (connecting_clients_.size()); } size_t CommunicationState6::getUnackedClientsCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (getUnackedClientsCountInternal()); + } else { + return (getUnackedClientsCountInternal()); + } +} + +size_t +CommunicationState6::getUnackedClientsCountInternal() const { return (connecting_clients_.get<1>().count(true)); } void CommunicationState6::clearConnectingClients() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + clearConnectingClientsInternal(); + } else { + clearConnectingClientsInternal(); + } +} + +void +CommunicationState6::clearConnectingClientsInternal() { connecting_clients_.clear(); } diff --git a/src/hooks/dhcp/high_availability/communication_state.h b/src/hooks/dhcp/high_availability/communication_state.h index 2ffffa4f8a..fc0e0b8761 100644 --- a/src/hooks/dhcp/high_availability/communication_state.h +++ b/src/hooks/dhcp/high_availability/communication_state.h @@ -497,6 +497,57 @@ public: protected: + /// @brief Checks if the DHCPv4 message appears to be unanswered. + /// + /// Should be called in a thread safe context. + /// + /// This method uses "secs" field value for detecting client + /// communication failures as described in the + /// @c CommunicationState::analyzeMessage. Some misbehaving Windows + /// clients were reported to swap "secs" field bytes. In this case + /// the first byte is set to non-zero byte and the second byte is + /// set to 0. This method handles such cases and corrects bytes + /// order before comparing against the threshold. + /// + /// @param message DHCPv4 message to be analyzed. This must be the + /// message which belongs to the partner, i.e. the caller must + /// filter out messages belonging to the partner prior to calling + /// this method. + virtual void analyzeMessageInternal(const boost::shared_ptr& message); + + /// @brief Checks if the partner failure has been detected based + /// on the DHCP traffic analysis. + /// + /// Should be called in a thread safe context. + /// + /// @return true if the partner failure has been detected, false + /// otherwise. + virtual bool failureDetectedInternal() const; + + /// @brief Returns the current number of clients which attempted + /// to get a lease from the partner server. + /// + /// Should be called in a thread safe context. + /// + /// The returned number is reset to 0 when the server successfully + /// establishes communication with the partner. The number is + /// incremented only in the communications interrupted case. + /// + /// @return The number of clients including unacked clients. + virtual size_t getConnectingClientsCountInternal() const; + + /// @brief Returns the current number of clients which haven't gotten + /// a lease from the partner server. + /// + /// Should be called in a thread safe context. + /// + /// The returned number is reset to 0 when the server successfully + /// establishes communication with the partner. The number is + /// incremented only in the communications interrupted case. + /// + /// @return Number of unacked clients. + virtual size_t getUnackedClientsCountInternal() const; + /// @brief Removes information about the clients the partner server /// should respond to while communication with the partner was /// interrupted. @@ -504,6 +555,15 @@ protected: /// See @c CommunicationState::analyzeMessage for details. virtual void clearConnectingClients(); + /// @brief Removes information about the clients the partner server + /// should respond to while communication with the partner was + /// interrupted. + /// + /// Should be called in a thread safe context. + /// + /// See @c CommunicationState::analyzeMessage for details. + virtual void clearConnectingClientsInternal(); + /// @brief Structure holding information about the client which has /// send the packet being analyzed. struct ConnectingClient4 { @@ -540,6 +600,9 @@ protected: /// the partner server while the servers are in communications /// interrupted state. ConnectingClients4 connecting_clients_; + + /// @brief The mutex used to protect internal state. + const boost::scoped_ptr mutex_; }; /// @brief Pointer to the @c CommunicationState4 object. @@ -599,6 +662,51 @@ public: protected: + /// @brief Checks if the DHCPv6 message appears to be unanswered. + /// + /// Should be called in a thread safe context. + /// + /// See @c CommunicationState::analyzeMessage for details. + /// + /// @param message DHCPv6 message to be analyzed. This must be the + /// message which belongs to the partner, i.e. the caller must + /// filter out messages belonging to the partner prior to calling + /// this method. + virtual void analyzeMessageInternal(const boost::shared_ptr& message); + + /// @brief Checks if the partner failure has been detected based + /// on the DHCP traffic analysis. + /// + /// Should be called in a thread safe context. + /// + /// @return true if the partner failure has been detected, false + /// otherwise. + virtual bool failureDetectedInternal() const; + + /// @brief Returns the current number of clients which attempted + /// to get a lease from the partner server. + /// + /// Should be called in a thread safe context. + /// + /// The returned number is reset to 0 when the server successfully + /// establishes communication with the partner. The number is + /// incremented only in the communications interrupted case. + /// + /// @return The number of clients including unacked clients. + virtual size_t getConnectingClientsCountInternal() const; + + /// @brief Returns the current number of clients which haven't gotten + /// a lease from the partner server. + /// + /// Should be called in a thread safe context. + /// + /// The returned number is reset to 0 when the server successfully + /// establishes communication with the partner. The number is + /// incremented only in the communications interrupted case. + /// + /// @return Number of unacked clients. + virtual size_t getUnackedClientsCountInternal() const; + /// @brief Removes information about the clients the partner server /// should respond to while communication with the partner was /// interrupted. @@ -606,6 +714,15 @@ protected: /// See @c CommunicationState::analyzeMessage for details. virtual void clearConnectingClients(); + /// @brief Removes information about the clients the partner server + /// should respond to while communication with the partner was + /// interrupted. + /// + /// Should be called in a thread safe context. + /// + /// See @c CommunicationState::analyzeMessage for details. + virtual void clearConnectingClientsInternal(); + /// @brief Structure holding information about a client which /// sent a packet being analyzed. struct ConnectingClient6 { @@ -635,6 +752,9 @@ protected: /// the partner server while the servers are in communications /// interrupted state. ConnectingClients6 connecting_clients_; + + /// @brief The mutex used to protect internal state. + const boost::scoped_ptr mutex_; }; /// @brief Pointer to the @c CommunicationState6 object. diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index dd2e73c6f9..5fc9d63f09 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -416,20 +416,11 @@ public: closeAll(); } - /// @brief Returns next queued request for the given URL. + /// @brief Process next queued request for the given URL. /// - /// @param url URL for which next queued request should be retrieved. - /// @param [out] request Pointer to the queued request. - /// @param [out] response Pointer to the object into which response should - /// be stored. - /// @param request_timeout Requested timeout for the transaction. - /// @param callback Pointer to the user callback for this request. - /// @param connect_callback Pointer to the user callback invoked when - /// the client connects to the server. - /// @param close_callback Pointer to the user callback invoked when - /// the client closes the connection to the server. + /// @param url URL for which next queued request should be processed. /// - /// @return true if the request for the given URL has been retrieved, + /// @return true if the request for the given URL has been processed, /// false if there are no more requests queued for this URL. bool processNextRequest(const Url& url) { if (MultiThreadingMgr::instance().getMode()) {