]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1736] Make HA CommunicationState class thread-safe
authorThomas Markwalder <tmark@isc.org>
Mon, 19 Apr 2021 14:59:38 +0000 (10:59 -0400)
committerThomas Markwalder <tmark@isc.org>
Fri, 23 Apr 2021 12:54:31 +0000 (08:54 -0400)
    communication_state.cc
    communication_state.h
       - Added mutexing to avoid race conditions

src/hooks/dhcp/high_availability/communication_state.cc
src/hooks/dhcp/high_availability/communication_state.h

index e91b5be8021baa414c1a00ec33c99653bcbef873..7e11af1b8292e8816994ecbf7867c41675ffe44e 100644 (file)
@@ -76,19 +76,58 @@ CommunicationState::modifyPokeTime(const long secs) {
     }
 }
 
+int
+CommunicationState::getPartnerState() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (partner_state_);
+    } else {
+        return (partner_state_);
+    }
+}
+
 void
 CommunicationState::setPartnerState(const std::string& state) {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        setPartnerStateInternal(state);
+    } else {
+        setPartnerStateInternal(state);
+    }
+}
+
+void
+CommunicationState::setPartnerStateInternal(const std::string& state) {
     try {
         partner_state_ = stringToState(state);
-
     } catch (...) {
         isc_throw(BadValue, "unsupported HA partner state returned "
                   << state);
     }
 }
 
+std::set<std::string>
+CommunicationState::getPartnerScopes() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (partner_scopes_);
+    } else {
+        return (partner_scopes_);
+    }
+}
+
 void
 CommunicationState::setPartnerScopes(ConstElementPtr new_scopes) {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        setPartnerScopesInternal(new_scopes);
+    } else {
+        setPartnerScopesInternal(new_scopes);
+    }
+}
+
+void
+CommunicationState::setPartnerScopesInternal(ConstElementPtr new_scopes) {
     if (!new_scopes || (new_scopes->getType() != Element::list)) {
         isc_throw(BadValue, "unable to record partner's HA scopes because"
                   " the received value is not a valid JSON list");
@@ -181,6 +220,16 @@ CommunicationState::stopHeartbeatInternal() {
     }
 }
 
+bool
+CommunicationState::isHeartbeatRunning() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return (static_cast<bool>(timer_));
+    } else {
+        return (static_cast<bool>(timer_));
+    }
+}
+
 boost::posix_time::time_duration
 CommunicationState::updatePokeTime() {
     if (MultiThreadingMgr::instance().getMode()) {
@@ -202,8 +251,18 @@ CommunicationState::updatePokeTimeInternal() {
 
 void
 CommunicationState::poke() {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        pokeInternal();
+    } else {
+        pokeInternal();
+    }
+}
+
+void
+CommunicationState::pokeInternal() {
     // Update poke time and compute duration.
-    boost::posix_time::time_duration duration_since_poke = updatePokeTime();
+    boost::posix_time::time_duration duration_since_poke = updatePokeTimeInternal();
 
     // If we have been tracking the DHCP messages directed to the partner,
     // we need to clear any gathered information because the connection
@@ -256,6 +315,16 @@ CommunicationState::getAnalyzedMessagesCount() const {
 
 bool
 CommunicationState::clockSkewShouldWarn() {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return(clockSkewShouldWarnInternal());
+    } else {
+        return(clockSkewShouldWarnInternal());
+    }
+}
+
+bool
+CommunicationState::clockSkewShouldWarnInternal() {
     // First check if the clock skew is beyond the threshold.
     if (isClockSkewGreater(WARN_CLOCK_SKEW)) {
 
@@ -283,18 +352,33 @@ CommunicationState::clockSkewShouldWarn() {
 
 bool
 CommunicationState::clockSkewShouldTerminate() const {
-    // Issue a warning if the clock skew is greater than 60s.
-    return (isClockSkewGreater(TERM_CLOCK_SKEW));
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        // Issue a warning if the clock skew is greater than 60s.
+        return (isClockSkewGreater(TERM_CLOCK_SKEW));
+    } else {
+        return (isClockSkewGreater(TERM_CLOCK_SKEW));
+    }
 }
 
 bool
 CommunicationState::isClockSkewGreater(const long seconds) const {
     return ((clock_skew_.total_seconds() > seconds) ||
-            (clock_skew_.total_seconds() < -seconds));
+             (clock_skew_.total_seconds() < -seconds));
 }
 
 void
 CommunicationState::setPartnerTime(const std::string& time_text) {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        setPartnerTimeInternal(time_text);
+    } else {
+        setPartnerTimeInternal(time_text);
+    }
+}
+
+void
+CommunicationState::setPartnerTimeInternal(const std::string& time_text) {
     partner_time_at_skew_ = HttpDateTime().fromRfc1123(time_text).getPtime();
     my_time_at_skew_ = HttpDateTime().getPtime();
     clock_skew_ = partner_time_at_skew_ - my_time_at_skew_;
@@ -302,6 +386,16 @@ CommunicationState::setPartnerTime(const std::string& time_text) {
 
 std::string
 CommunicationState::logFormatClockSkew() const {
+    if (MultiThreadingMgr::instance().getMode()) {
+        std::lock_guard<std::mutex> lk(*mutex_);
+        return(logFormatClockSkewInternal());
+    } else {
+        return(logFormatClockSkewInternal());
+    }
+}
+
+std::string
+CommunicationState::logFormatClockSkewInternal() const {
     std::ostringstream os;
 
     if ((my_time_at_skew_.is_not_a_date_time()) ||
@@ -501,12 +595,7 @@ CommunicationState4::getUnackedClientsCount() const {
 
 void
 CommunicationState4::clearConnectingClients() {
-    if (MultiThreadingMgr::instance().getMode()) {
-        std::lock_guard<std::mutex> lk(*mutex_);
-        connecting_clients_.clear();
-    } else {
-        connecting_clients_.clear();
-    }
+    connecting_clients_.clear();
 }
 
 CommunicationState6::CommunicationState6(const IOServicePtr& io_service,
@@ -632,12 +721,7 @@ CommunicationState6::getUnackedClientsCount() const {
 
 void
 CommunicationState6::clearConnectingClients() {
-    if (MultiThreadingMgr::instance().getMode()) {
-        std::lock_guard<std::mutex> lk(*mutex_);
-        connecting_clients_.clear();
-    } else {
-        connecting_clients_.clear();
-    }
+    connecting_clients_.clear();
 }
 
 } // end of namespace isc::ha
index cdc444f6d27d6c0afb5659b5426943e9bc3076c3..1d533b0232f2ed3aa24ea981ae9c1a9e11891a52 100644 (file)
@@ -100,9 +100,7 @@ public:
     /// @brief Returns last known state of the partner.
     ///
     /// @return Partner's state if it is known, or a negative value otherwise.
-    int getPartnerState() const {
-        return (partner_state_);
-    }
+    int getPartnerState() const;
 
     /// @brief Sets partner state.
     ///
@@ -111,18 +109,32 @@ public:
     /// @throw BadValue if unsupported state value was provided.
     void setPartnerState(const std::string& state);
 
+protected:
+    /// @brief Sets partner state.
+    ///
+    /// @param state new partner's state in a textual form. Supported values are
+    /// those returned in response to a ha-heartbeat command.
+    /// @throw BadValue if unsupported state value was provided.
+    void setPartnerStateInternal(const std::string& state);
+
+public:
     /// @brief Returns scopes served by the partner server.
     ///
     /// @return A set of scopes served by the partner.
-    std::set<std::string> getPartnerScopes() const {
-        return (partner_scopes_);
-    }
+    std::set<std::string> getPartnerScopes() const;
 
     /// @brief Sets partner scopes.
     ///
     /// @param new_scopes Partner scopes enclosed in a JSON list.
     void setPartnerScopes(data::ConstElementPtr new_scopes);
 
+protected:
+    /// @brief Sets partner scopes.
+    ///
+    /// @param new_scopes Partner scopes enclosed in a JSON list.
+    void setPartnerScopesInternal(data::ConstElementPtr new_scopes);
+
+public:
     /// @brief Starts recurring heartbeat (public interface).
     ///
     /// @param interval heartbeat interval in milliseconds.
@@ -152,9 +164,7 @@ public:
     /// @brief Checks if recurring heartbeat is running.
     ///
     /// @return true if heartbeat is running, false otherwise.
-    bool isHeartbeatRunning() const {
-        return (static_cast<bool>(timer_));
-    }
+    bool isHeartbeatRunning() const;
 
     /// @brief Pokes the communication state.
     ///
@@ -163,6 +173,15 @@ public:
     /// to the next heartbeat).
     void poke();
 
+protected:
+    /// @brief Pokes the communication state.
+    ///
+    /// Sets the last poke time to current time. If the heartbeat timer
+    /// has been scheduled, it is reset (starts over measuring the time
+    /// to the next heartbeat).
+    void pokeInternal();
+
+public:
     /// @brief Returns duration between the poke time and current time.
     ///
     /// @return Duration between the poke time and current time.
@@ -296,6 +315,32 @@ public:
     /// skew exceeding a warning threshold.
     bool clockSkewShouldWarn();
 
+protected:
+    /// @brief Indicates whether the HA service should issue a warning about
+    /// high clock skew between the active servers.
+    ///
+    /// The HA service monitors the clock skew between the active servers. The
+    /// clock skew is calculated from the local time and the time returned by
+    /// the partner in response to a heartbeat. When clock skew exceeds a certain
+    /// threshold the HA service starts issuing a warning message. This method
+    /// returns true if the HA service should issue this message.
+    ///
+    /// Currently, the warning threshold for the clock skew is hardcoded to
+    /// 30 seconds.  In the future it may become configurable.
+    ///
+    /// This method is called for each heartbeat. If we issue a warning for each
+    /// heartbeat it may flood logs with those messages. This method provides
+    /// a gating mechanism which prevents the HA service from logging the
+    /// warning more often than every 60 seconds. If the last warning was issued
+    /// less than 60 seconds ago this method will return false even if the clock
+    /// skew exceeds the 30 seconds threshold. The correction of the clock skew
+    /// will reset the gating counter.
+    ///
+    /// @return true if the warning message should be logged because of the clock
+    /// skew exceeding a warning threshold.
+    bool clockSkewShouldWarnInternal();
+
+public:
     /// @brief Indicates whether the HA service should enter "terminated"
     /// state as a result of the clock skew exceeding maximum value.
     ///
@@ -339,9 +384,29 @@ public:
     /// precision.
     void setPartnerTime(const std::string& time_text);
 
+protected:
+    /// @brief Provide partner's notion of time so the new clock skew can be
+    /// calculated.
+    ///
+    /// @param time_text Partner's time received in response to a heartbeat. The
+    /// time must be provided in the RFC 1123 format.  It stores the current
+    /// time, partner's time, and the difference (skew) between them.
+    ///
+    /// @throw isc::http::HttpTimeConversionError if the time format is invalid.
+    ///
+    /// @todo Consider some other time formats which include millisecond
+    /// precision.
+    void setPartnerTimeInternal(const std::string& time_text);
+
+public:
     /// @brief Returns current clock skew value in the logger friendly format.
     std::string logFormatClockSkew() const;
 
+protected:
+    /// @brief Returns current clock skew value in the logger friendly format.
+    std::string logFormatClockSkewInternal() const;
+
+public:
     /// @brief Returns the report about current communication state.
     ///
     /// This function returns a JSON map describing the state of communication