From: Francis Dupont Date: Fri, 8 May 2020 23:53:17 +0000 (+0200) Subject: [#1219] Made pending_requests_ thread safe X-Git-Tag: Kea-1.7.8~81 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bee3e709b7220f0b9d9779189e9b7d21e45cd247;p=thirdparty%2Fkea.git [#1219] Made pending_requests_ thread safe --- diff --git a/src/hooks/dhcp/high_availability/ha_service.cc b/src/hooks/dhcp/high_availability/ha_service.cc index 9f475ecc16..72c669fc81 100644 --- a/src/hooks/dhcp/high_availability/ha_service.cc +++ b/src/hooks/dhcp/high_availability/ha_service.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -51,7 +52,7 @@ HAService::HAService(const IOServicePtr& io_service, const NetworkStatePtr& netw const HAConfigPtr& config, const HAServerType& server_type) : io_service_(io_service), network_state_(network_state), config_(config), server_type_(server_type), client_(*io_service), communication_state_(), - query_filter_(config), pending_requests_() { + query_filter_(config), pending_requests_mutex_(), pending_requests_() { if (server_type == HAServerType::DHCPv4) { communication_state_.reset(new CommunicationState4(io_service_, config)); @@ -913,6 +914,61 @@ HAService::asyncSendLeaseUpdates(const dhcp::Pkt6Ptr& query, return (sent_num); } +template +bool +HAService::leaseUpdateComplete(QueryPtrType& query, + const ParkingLotHandlePtr& parking_lot) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lock(pending_requests_mutex_); + return (leaseUpdateCompleteInternal(query, parking_lot)); + } else { + return (leaseUpdateCompleteInternal(query, parking_lot)); + } +} + +template +bool +HAService::leaseUpdateCompleteInternal(QueryPtrType& query, + const ParkingLotHandlePtr& parking_lot) { + auto it = pending_requests_.find(query); + + // If there are no more pending requests for this query, let's unpark + // the DHCP packet. + if (it == pending_requests_.end() || (--pending_requests_[query] <= 0)) { + parking_lot->unpark(query); + + // If we have unparked the packet we can clear pending requests for + // this query. + if (it != pending_requests_.end()) { + pending_requests_.erase(it); + } + return (true); + } + return (false); +} + +template +void +HAService::updatePendingRequest(QueryPtrType& query) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lock(pending_requests_mutex_); + updatePendingRequestInternal(query); + } else { + updatePendingRequestInternal(query); + } +} + +template +void +HAService::updatePendingRequestInternal(QueryPtrType& query) { + if (pending_requests_.count(query) == 0) { + pending_requests_[query] = 1; + + } else { + ++pending_requests_[query]; + } +} + template void HAService::asyncSendLeaseUpdate(const QueryPtrType& query, @@ -1020,19 +1076,7 @@ HAService::asyncSendLeaseUpdate(const QueryPtrType& query, return; } - auto it = pending_requests_.find(query); - - // If there are no more pending requests for this query, let's unpark - // the DHCP packet. - if (it == pending_requests_.end() || (--pending_requests_[query] <= 0)) { - parking_lot->unpark(query); - - // If we have unparked the packet we can clear pending requests for - // this query. - if (it != pending_requests_.end()) { - pending_requests_.erase(it); - } - + if (leaseUpdateComplete(query, parking_lot)) { // If we have finished sending the lease updates we need to run the // state machine until the state machine finds that additional events // are required, such as next heartbeat or a lease update. The runModel() @@ -1052,12 +1096,7 @@ HAService::asyncSendLeaseUpdate(const QueryPtrType& query, // a backup increase the number of pending requests. if (config_->amWaitingBackupAck() || (config->getRole() != HAConfig::PeerConfig::BACKUP)) { // Request scheduled, so update the request counters for the query. - if (pending_requests_.count(query) == 0) { - pending_requests_[query] = 1; - - } else { - ++pending_requests_[query]; - } + updatePendingRequest(query); } } @@ -2200,5 +2239,15 @@ HAService::clientCloseHandler(int tcp_native_fd) { } }; +size_t +HAService::pendingRequestSize() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lock(pending_requests_mutex_); + return (pending_requests_.size()); + } else { + return (pending_requests_.size()); + } +} + } // end of namespace isc::ha } // end of namespace isc diff --git a/src/hooks/dhcp/high_availability/ha_service.h b/src/hooks/dhcp/high_availability/ha_service.h index 8f438920ea..448b11962e 100644 --- a/src/hooks/dhcp/high_availability/ha_service.h +++ b/src/hooks/dhcp/high_availability/ha_service.h @@ -25,6 +25,7 @@ #include #include #include +#include #include namespace isc { @@ -376,7 +377,7 @@ private: /// This is a generic implementation of the public @c inScope method /// variants. /// - /// @tparam type of the pointer to the DHCP query. + /// @tparam QueryPtrType type of the pointer to the DHCP query. /// @param [out] query6 pointer to the DHCP query received. A client class /// will be appended to this query instance, appropriate for the server to /// process this query, e.g. "HA_server1" if the "server1" should process @@ -917,6 +918,72 @@ protected: /// @brief Selects queries to be processed/dropped. QueryFilter query_filter_; + /// @brief Handle last pending request for this query. + /// + /// Search if there are pending requests for this query: + /// - if there are decrement the count + /// - if there were at least two return false + /// - if there was none or one unpark the query + /// - if there was one remove the query from the map + /// - return true + /// + /// @tparam QueryPtrType Type of the pointer to the DHCP client's message, + /// i.e. Pkt4Ptr or Pkt6Ptr. + /// @param query Pointer to the DHCP client's query. + /// @param [out] parking_lot Parking lot where the query is parked. + /// This method uses this handle to unpark the packet when all asynchronous + /// requests have been completed. + /// @return True when all lease updates are complete, false otherwise. + template + bool leaseUpdateComplete(QueryPtrType& query, + const hooks::ParkingLotHandlePtr& parking_lot); + + /// @brief Update pending request counter for this query. + /// + /// @tparam QueryPtrType Type of the pointer to the DHCP client's message, + /// i.e. Pkt4Ptr or Pkt6Ptr. + /// @param query Pointer to the DHCP client's query. + template + void updatePendingRequest(QueryPtrType& query); + +protected: + /// @brief Get the number of entries in the pending request map. + /// @note Currently for testing purposes only. + /// @return Number of entries in the pending request map. + size_t pendingRequestSize(); + +private: + /// @brief Handle last pending request for this query. + /// + /// Search if there are pending requests for this query: + /// - if there are decrement the count + /// - if there were at least two return false + /// - if there was none or one unpark the query + /// - if there was one remove the query from the map + /// - return true + /// + /// @tparam QueryPtrType Type of the pointer to the DHCP client's message, + /// i.e. Pkt4Ptr or Pkt6Ptr. + /// @param query Pointer to the DHCP client's query. + /// @param [out] parking_lot Parking lot where the query is parked. + /// This method uses this handle to unpark the packet when all asynchronous + /// requests have been completed. + /// @return True when all lease updates are complete, false otherwise. + template + bool leaseUpdateCompleteInternal(QueryPtrType& query, + const hooks::ParkingLotHandlePtr& parking_lot); + + /// @brief Update pending request counter for this query. + /// + /// @tparam QueryPtrType Type of the pointer to the DHCP client's message, + /// i.e. Pkt4Ptr or Pkt6Ptr. + /// @param query Pointer to the DHCP client's query. + template + void updatePendingRequestInternal(QueryPtrType& query); + + /// @brief Mutex to protect the pending_requests_ map. + std::mutex pending_requests_mutex_; + /// @brief Map holding a number of scheduled requests for a given packet. /// /// A single callout may send multiple requests at the same time, e.g. diff --git a/src/hooks/dhcp/high_availability/tests/ha_service_unittest.cc b/src/hooks/dhcp/high_availability/tests/ha_service_unittest.cc index 74750902c3..1d39a83c6f 100644 --- a/src/hooks/dhcp/high_availability/tests/ha_service_unittest.cc +++ b/src/hooks/dhcp/high_availability/tests/ha_service_unittest.cc @@ -189,11 +189,11 @@ public: using HAService::transition; using HAService::verboseTransition; using HAService::shouldSendLeaseUpdates; + using HAService::pendingRequestSize; using HAService::network_state_; using HAService::config_; using HAService::communication_state_; using HAService::query_filter_; - using HAService::pending_requests_; }; /// @brief Pointer to the @c TestHAService. @@ -738,7 +738,7 @@ public: // Actually perform the lease updates. ASSERT_NO_THROW(runIOService(TEST_TIMEOUT, [&service]() { // Finish running IO service when there are no more pending requests. - return (service.pending_requests_.empty()); + return (service.pendingRequestSize() == 0); })); // Only if we wait for lease updates to complete it makes senst to test @@ -842,7 +842,7 @@ public: // Actually perform the lease updates. ASSERT_NO_THROW(runIOService(TEST_TIMEOUT, [&service]() { // Finish running IO service when there are no more pending requests. - return (service.pending_requests_.empty()); + return (service.pendingRequestSize() == 0); })); // Only if we wait for lease updates to complete it makes senst to test