From: Thomas Markwalder Date: Mon, 19 Apr 2021 14:59:38 +0000 (-0400) Subject: [#1736] Make HA CommunicationState class thread-safe X-Git-Tag: Kea-1.9.7~30 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6ea9ac9c34ca67fc68bc501336b7124c53c817ea;p=thirdparty%2Fkea.git [#1736] Make HA CommunicationState class thread-safe communication_state.cc communication_state.h - Added mutexing to avoid race conditions --- diff --git a/src/hooks/dhcp/high_availability/communication_state.cc b/src/hooks/dhcp/high_availability/communication_state.cc index e91b5be802..7e11af1b82 100644 --- a/src/hooks/dhcp/high_availability/communication_state.cc +++ b/src/hooks/dhcp/high_availability/communication_state.cc @@ -76,19 +76,58 @@ CommunicationState::modifyPokeTime(const long secs) { } } +int +CommunicationState::getPartnerState() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (partner_state_); + } else { + return (partner_state_); + } +} + void CommunicationState::setPartnerState(const std::string& state) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + setPartnerStateInternal(state); + } else { + setPartnerStateInternal(state); + } +} + +void +CommunicationState::setPartnerStateInternal(const std::string& state) { try { partner_state_ = stringToState(state); - } catch (...) { isc_throw(BadValue, "unsupported HA partner state returned " << state); } } +std::set +CommunicationState::getPartnerScopes() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (partner_scopes_); + } else { + return (partner_scopes_); + } +} + void CommunicationState::setPartnerScopes(ConstElementPtr new_scopes) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + setPartnerScopesInternal(new_scopes); + } else { + setPartnerScopesInternal(new_scopes); + } +} + +void +CommunicationState::setPartnerScopesInternal(ConstElementPtr new_scopes) { if (!new_scopes || (new_scopes->getType() != Element::list)) { isc_throw(BadValue, "unable to record partner's HA scopes because" " the received value is not a valid JSON list"); @@ -181,6 +220,16 @@ CommunicationState::stopHeartbeatInternal() { } } +bool +CommunicationState::isHeartbeatRunning() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return (static_cast(timer_)); + } else { + return (static_cast(timer_)); + } +} + boost::posix_time::time_duration CommunicationState::updatePokeTime() { if (MultiThreadingMgr::instance().getMode()) { @@ -202,8 +251,18 @@ CommunicationState::updatePokeTimeInternal() { void CommunicationState::poke() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + pokeInternal(); + } else { + pokeInternal(); + } +} + +void +CommunicationState::pokeInternal() { // Update poke time and compute duration. - boost::posix_time::time_duration duration_since_poke = updatePokeTime(); + boost::posix_time::time_duration duration_since_poke = updatePokeTimeInternal(); // If we have been tracking the DHCP messages directed to the partner, // we need to clear any gathered information because the connection @@ -256,6 +315,16 @@ CommunicationState::getAnalyzedMessagesCount() const { bool CommunicationState::clockSkewShouldWarn() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return(clockSkewShouldWarnInternal()); + } else { + return(clockSkewShouldWarnInternal()); + } +} + +bool +CommunicationState::clockSkewShouldWarnInternal() { // First check if the clock skew is beyond the threshold. if (isClockSkewGreater(WARN_CLOCK_SKEW)) { @@ -283,18 +352,33 @@ CommunicationState::clockSkewShouldWarn() { bool CommunicationState::clockSkewShouldTerminate() const { - // Issue a warning if the clock skew is greater than 60s. - return (isClockSkewGreater(TERM_CLOCK_SKEW)); + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + // Issue a warning if the clock skew is greater than 60s. + return (isClockSkewGreater(TERM_CLOCK_SKEW)); + } else { + return (isClockSkewGreater(TERM_CLOCK_SKEW)); + } } bool CommunicationState::isClockSkewGreater(const long seconds) const { return ((clock_skew_.total_seconds() > seconds) || - (clock_skew_.total_seconds() < -seconds)); + (clock_skew_.total_seconds() < -seconds)); } void CommunicationState::setPartnerTime(const std::string& time_text) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + setPartnerTimeInternal(time_text); + } else { + setPartnerTimeInternal(time_text); + } +} + +void +CommunicationState::setPartnerTimeInternal(const std::string& time_text) { partner_time_at_skew_ = HttpDateTime().fromRfc1123(time_text).getPtime(); my_time_at_skew_ = HttpDateTime().getPtime(); clock_skew_ = partner_time_at_skew_ - my_time_at_skew_; @@ -302,6 +386,16 @@ CommunicationState::setPartnerTime(const std::string& time_text) { std::string CommunicationState::logFormatClockSkew() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(*mutex_); + return(logFormatClockSkewInternal()); + } else { + return(logFormatClockSkewInternal()); + } +} + +std::string +CommunicationState::logFormatClockSkewInternal() const { std::ostringstream os; if ((my_time_at_skew_.is_not_a_date_time()) || @@ -501,12 +595,7 @@ CommunicationState4::getUnackedClientsCount() const { void CommunicationState4::clearConnectingClients() { - if (MultiThreadingMgr::instance().getMode()) { - std::lock_guard lk(*mutex_); - connecting_clients_.clear(); - } else { - connecting_clients_.clear(); - } + connecting_clients_.clear(); } CommunicationState6::CommunicationState6(const IOServicePtr& io_service, @@ -632,12 +721,7 @@ CommunicationState6::getUnackedClientsCount() const { void CommunicationState6::clearConnectingClients() { - if (MultiThreadingMgr::instance().getMode()) { - std::lock_guard lk(*mutex_); - connecting_clients_.clear(); - } else { - connecting_clients_.clear(); - } + connecting_clients_.clear(); } } // end of namespace isc::ha diff --git a/src/hooks/dhcp/high_availability/communication_state.h b/src/hooks/dhcp/high_availability/communication_state.h index cdc444f6d2..1d533b0232 100644 --- a/src/hooks/dhcp/high_availability/communication_state.h +++ b/src/hooks/dhcp/high_availability/communication_state.h @@ -100,9 +100,7 @@ public: /// @brief Returns last known state of the partner. /// /// @return Partner's state if it is known, or a negative value otherwise. - int getPartnerState() const { - return (partner_state_); - } + int getPartnerState() const; /// @brief Sets partner state. /// @@ -111,18 +109,32 @@ public: /// @throw BadValue if unsupported state value was provided. void setPartnerState(const std::string& state); +protected: + /// @brief Sets partner state. + /// + /// @param state new partner's state in a textual form. Supported values are + /// those returned in response to a ha-heartbeat command. + /// @throw BadValue if unsupported state value was provided. + void setPartnerStateInternal(const std::string& state); + +public: /// @brief Returns scopes served by the partner server. /// /// @return A set of scopes served by the partner. - std::set getPartnerScopes() const { - return (partner_scopes_); - } + std::set getPartnerScopes() const; /// @brief Sets partner scopes. /// /// @param new_scopes Partner scopes enclosed in a JSON list. void setPartnerScopes(data::ConstElementPtr new_scopes); +protected: + /// @brief Sets partner scopes. + /// + /// @param new_scopes Partner scopes enclosed in a JSON list. + void setPartnerScopesInternal(data::ConstElementPtr new_scopes); + +public: /// @brief Starts recurring heartbeat (public interface). /// /// @param interval heartbeat interval in milliseconds. @@ -152,9 +164,7 @@ public: /// @brief Checks if recurring heartbeat is running. /// /// @return true if heartbeat is running, false otherwise. - bool isHeartbeatRunning() const { - return (static_cast(timer_)); - } + bool isHeartbeatRunning() const; /// @brief Pokes the communication state. /// @@ -163,6 +173,15 @@ public: /// to the next heartbeat). void poke(); +protected: + /// @brief Pokes the communication state. + /// + /// Sets the last poke time to current time. If the heartbeat timer + /// has been scheduled, it is reset (starts over measuring the time + /// to the next heartbeat). + void pokeInternal(); + +public: /// @brief Returns duration between the poke time and current time. /// /// @return Duration between the poke time and current time. @@ -296,6 +315,32 @@ public: /// skew exceeding a warning threshold. bool clockSkewShouldWarn(); +protected: + /// @brief Indicates whether the HA service should issue a warning about + /// high clock skew between the active servers. + /// + /// The HA service monitors the clock skew between the active servers. The + /// clock skew is calculated from the local time and the time returned by + /// the partner in response to a heartbeat. When clock skew exceeds a certain + /// threshold the HA service starts issuing a warning message. This method + /// returns true if the HA service should issue this message. + /// + /// Currently, the warning threshold for the clock skew is hardcoded to + /// 30 seconds. In the future it may become configurable. + /// + /// This method is called for each heartbeat. If we issue a warning for each + /// heartbeat it may flood logs with those messages. This method provides + /// a gating mechanism which prevents the HA service from logging the + /// warning more often than every 60 seconds. If the last warning was issued + /// less than 60 seconds ago this method will return false even if the clock + /// skew exceeds the 30 seconds threshold. The correction of the clock skew + /// will reset the gating counter. + /// + /// @return true if the warning message should be logged because of the clock + /// skew exceeding a warning threshold. + bool clockSkewShouldWarnInternal(); + +public: /// @brief Indicates whether the HA service should enter "terminated" /// state as a result of the clock skew exceeding maximum value. /// @@ -339,9 +384,29 @@ public: /// precision. void setPartnerTime(const std::string& time_text); +protected: + /// @brief Provide partner's notion of time so the new clock skew can be + /// calculated. + /// + /// @param time_text Partner's time received in response to a heartbeat. The + /// time must be provided in the RFC 1123 format. It stores the current + /// time, partner's time, and the difference (skew) between them. + /// + /// @throw isc::http::HttpTimeConversionError if the time format is invalid. + /// + /// @todo Consider some other time formats which include millisecond + /// precision. + void setPartnerTimeInternal(const std::string& time_text); + +public: /// @brief Returns current clock skew value in the logger friendly format. std::string logFormatClockSkew() const; +protected: + /// @brief Returns current clock skew value in the logger friendly format. + std::string logFormatClockSkewInternal() const; + +public: /// @brief Returns the report about current communication state. /// /// This function returns a JSON map describing the state of communication