/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include "dnsdist-backoff.hh"
#include "dnsdist-metrics.hh"
#include "dnsdist-nghttp2.hh"
#include "dnsdist-random.hh"
#include "dnsdist-rings.hh"
#include "dnsdist-tcp.hh"
bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  if (!d_config.d_dohPath.empty()) {
    return g_dohClientThreads && g_dohClientThreads->passCrossProtocolQueryToThread(std::move(cpq));
  }
#endif
  return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
}
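/* (Re-)establish the UDP sockets towards this backend. Note the
   std::try_to_lock below: if another thread is already reconnecting,
   we bail out instead of queueing a second reconnection behind it. */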
bool DownstreamState::reconnect(bool initialAttempt)
{
  std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
  if (!tl.owns_lock() || isStopped()) {
    /* we are already reconnecting or stopped anyway */
    return false;
  }

  if (IsAnyAddress(d_config.remote)) {
    return true;
  }

  connected = false;
  for (auto& fd : sockets) {
    if (fd != -1) {
      if (sockets.size() > 1) {
        (*mplexer.lock())->removeReadFD(fd);
      }
      /* shutdown() is needed to wake up recv() in the responderThread */
      shutdown(fd, SHUT_RDWR);
      close(fd);
      fd = -1;
    }
    fd = SSocket(d_config.remote.sin4.sin_family, SOCK_DGRAM, 0);

#ifdef SO_BINDTODEVICE
    if (!d_config.sourceItfName.empty()) {
      int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, d_config.sourceItfName.c_str(), d_config.sourceItfName.length());
      if (res != 0) {
        infolog("Error setting up the interface on backend socket '%s': %s", d_config.remote.toStringWithPort(), stringerror());
      }
    }
#endif

    if (!IsAnyAddress(d_config.sourceAddr)) {
#ifdef IP_BIND_ADDRESS_NO_PORT
      if (d_config.ipBindAddrNoPort) {
        SSetsockopt(fd, SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
      }
#endif
      SBind(fd, d_config.sourceAddr);
    }

    try {
      SConnect(fd, d_config.remote);
      if (sockets.size() > 1) {
        (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
      }
      connected = true;
    }
    catch (const std::runtime_error& error) {
      if (initialAttempt || g_verbose) {
        infolog("Error connecting to new server with address %s: %s", d_config.remote.toStringWithPort(), error.what());
      }
      connected = false;
      break;
    }
  }

  /* if at least one (re-)connection failed, close all sockets */
  if (!connected) {
    for (auto& fd : sockets) {
      if (fd != -1) {
        if (sockets.size() > 1) {
          try {
            (*mplexer.lock())->removeReadFD(fd);
          }
          catch (const FDMultiplexerException& e) {
            /* some sockets might not have been added to the multiplexer
               yet, that's fine */
          }
        }
        /* shutdown() is needed to wake up recv() in the responderThread */
        shutdown(fd, SHUT_RDWR);
        close(fd);
        fd = -1;
      }
    }
  }

  if (connected) {
    tl.unlock();
    d_connectedWait.notify_all();
    if (!initialAttempt) {
      /* we need to be careful not to start this
         thread too soon, as the creation should only
         happen after the configuration has been parsed */
      start();
    }
  }

  return connected;
}
void DownstreamState::waitUntilConnected()
{
  if (d_stopped) {
    return;
  }
  if (connected) {
    return;
  }
  {
    std::unique_lock<std::mutex> lock(connectLock);
    d_connectedWait.wait(lock, [this]{
      return connected.load();
    });
  }
}
void DownstreamState::stop()
{
  if (d_stopped) {
    return;
  }
  d_stopped = true;

  {
    std::lock_guard<std::mutex> tl(connectLock);
    auto slock = mplexer.lock();

    for (auto& fd : sockets) {
      if (fd != -1) {
        /* shutdown() is needed to wake up recv() in the responderThread */
        shutdown(fd, SHUT_RDWR);
      }
    }
  }
}
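/* Pre-compute the hash ring used by the 'whashed' and 'chashed'
   load-balancing policies: one hash point per unit of weight, derived
   from the server's UUID so the ring stays stable across restarts as
   long as the UUID and weight do not change. */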
void DownstreamState::hash()
{
  vinfolog("Computing hashes for id=%s and weight=%d", *d_config.id, d_config.d_weight);
  auto w = d_config.d_weight;
  auto idStr = boost::str(boost::format("%s") % *d_config.id);
  auto lockedHashes = hashes.write_lock();
  lockedHashes->clear();
  lockedHashes->reserve(w);
  while (w > 0) {
    std::string uuid = boost::str(boost::format("%s-%d") % idStr % w);
    unsigned int wshash = burtleCI(reinterpret_cast<const unsigned char*>(uuid.c_str()), uuid.size(), g_hashperturb);
    lockedHashes->push_back(wshash);
    --w;
  }
  std::sort(lockedHashes->begin(), lockedHashes->end());
  hashesComputed = true;
}
void DownstreamState::setId(const boost::uuids::uuid& newId)
{
  d_config.id = newId;
  // compute hashes only if already done
  if (hashesComputed) {
    hash();
  }
}
void DownstreamState::setWeight(int newWeight)
{
  if (newWeight < 1) {
    errlog("Error setting server's weight: downstream weight value must be greater than 0.");
    return;
  }

  d_config.d_weight = newWeight;

  if (hashesComputed) {
    hash();
  }
}
DownstreamState::DownstreamState(DownstreamState::Config&& config, std::shared_ptr<TLSCtx> tlsCtx, bool connect): d_config(std::move(config)), d_tlsCtx(std::move(tlsCtx))
{
  threadStarted.clear();

  if (d_config.d_qpsLimit > 0) {
    qps = QPSLimiter(d_config.d_qpsLimit, d_config.d_qpsLimit);
  }

  if (d_config.id) {
    setId(*d_config.id);
  }
  else {
    d_config.id = getUniqueID();
  }

  if (d_config.d_weight > 0) {
    setWeight(d_config.d_weight);
  }

  if (d_config.availability == Availability::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
    d_lazyHealthCheckStats.lock()->d_lastResults.set_capacity(d_config.d_lazyHealthCheckSampleSize);
    d_nextCheck = d_config.checkInterval;
  }

  setName(d_config.name);

  if (d_tlsCtx) {
    if (!d_config.d_dohPath.empty()) {
#ifdef HAVE_NGHTTP2
      setupDoHClientProtocolNegotiation(d_tlsCtx);

      if (g_configurationDone && g_outgoingDoHWorkerThreads && *g_outgoingDoHWorkerThreads == 0) {
        throw std::runtime_error("Error: setOutgoingDoHWorkerThreads() is set to 0 so no outgoing DoH worker thread is available to serve queries");
      }

      if (!g_outgoingDoHWorkerThreads || *g_outgoingDoHWorkerThreads == 0) {
        g_outgoingDoHWorkerThreads = 1;
      }
#endif /* HAVE_NGHTTP2 */
    }
    else {
      setupDoTProtocolNegotiation(d_tlsCtx);
    }
  }

  if (connect && !isTCPOnly()) {
    if (!IsAnyAddress(d_config.remote)) {
      connectUDPSockets();
    }
  }

  sw.start();
}
void DownstreamState::start()
{
  if (connected && !threadStarted.test_and_set()) {
    tid = std::thread(responderThread, shared_from_this());

    if (!d_config.d_cpus.empty()) {
      mapThreadToCPUList(tid.native_handle(), d_config.d_cpus);
    }
  }
}
void DownstreamState::connectUDPSockets()
{
  if (s_randomizeIDs) {
    idStates.clear();
  }
  else {
    idStates.resize(g_maxOutstanding);
  }
  sockets.resize(d_config.d_numberOfSockets);

  if (sockets.size() > 1) {
    *(mplexer.lock()) = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(sockets.size()));
  }

  for (auto& fd : sockets) {
    fd = -1;
  }

  reconnect(true);
}
DownstreamState::~DownstreamState()
{
  for (auto& fd : sockets) {
    if (fd >= 0) {
      close(fd);
      fd = -1;
    }
  }
}
void DownstreamState::incCurrentConnectionsCount()
{
  auto currentConnectionsCount = ++tcpCurrentConnections;
  if (currentConnectionsCount > tcpMaxConcurrentConnections) {
    tcpMaxConcurrentConnections.store(currentConnectionsCount);
  }
}
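/* A backend may use several UDP sockets (d_config.d_numberOfSockets) to
   spread queries over multiple source ports. Selection is either random
   (s_randomizeSockets) or a simple round-robin over socketsOffset. */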
int DownstreamState::pickSocketForSending()
{
  size_t numberOfSockets = sockets.size();
  if (numberOfSockets == 1) {
    return sockets[0];
  }

  size_t idx = 0;
  if (s_randomizeSockets) {
    idx = dnsdist::getRandomValue(numberOfSockets);
  }
  else {
    idx = socketsOffset++;
  }

  return sockets[idx % numberOfSockets];
}
void DownstreamState::pickSocketsReadyForReceiving(std::vector<int>& ready)
{
  ready.clear();

  if (sockets.size() == 1) {
    ready.push_back(sockets[0]);
    return;
  }

  (*mplexer.lock())->getAvailableFDs(ready, 1000);
}
bool DownstreamState::s_randomizeSockets{false};
bool DownstreamState::s_randomizeIDs{false};
int DownstreamState::s_udpTimeout{2};
static bool isIDSExpired(const IDState& ids)
{
  auto age = ids.age.load();
  return age > DownstreamState::s_udpTimeout;
}
void DownstreamState::handleUDPTimeout(IDState& ids)
{
  ids.age = 0;
  ids.inUse = false;
  DOHUnitInterface::handleTimeout(std::move(ids.internal.du));
  --outstanding;
  ++reuseds;
  ++dnsdist::metrics::g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
  vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
           d_config.remote.toStringWithPort(), getName(),
           ids.internal.qname.toLogString(), QType(ids.internal.qtype).toString(), ids.internal.origRemote.toStringWithPort());

  if (g_rings.shouldRecordResponses()) {
    struct timespec ts;
    gettime(&ts);

    struct dnsheader fake;
    memset(&fake, 0, sizeof(fake));
    fake.id = ids.internal.origID;
    uint16_t* flags = getFlagsFromDNSHeader(&fake);
    *flags = ids.internal.origFlags;

    g_rings.insertResponse(ts, ids.internal.origRemote, ids.internal.qname, ids.internal.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, d_config.remote, getProtocol());
  }

  reportTimeoutOrError();
}
void DownstreamState::reportResponse(uint8_t rcode)
{
  if (d_config.availability == Availability::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
    bool failure = d_config.d_lazyHealthCheckMode == LazyHealthCheckMode::TimeoutOrServFail ? rcode == RCode::ServFail : false;
    d_lazyHealthCheckStats.lock()->d_lastResults.push_back(failure);
  }
}
void DownstreamState::reportTimeoutOrError()
{
  if (d_config.availability == Availability::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
    d_lazyHealthCheckStats.lock()->d_lastResults.push_back(true);
  }
}
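/* Walk the in-flight UDP states and report those that have timed out.
   This relies on being called periodically: every pass increments the
   age of the states it visits, and isIDSExpired() flags a state once
   its age exceeds s_udpTimeout (seconds, 2 by default). */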
void DownstreamState::handleUDPTimeouts()
{
  if (getProtocol() != dnsdist::Protocol::DoUDP) {
    return;
  }

  if (s_randomizeIDs) {
    auto map = d_idStatesMap.lock();
    for (auto it = map->begin(); it != map->end(); ) {
      auto& ids = it->second;
      if (isIDSExpired(ids)) {
        handleUDPTimeout(ids);
        it = map->erase(it);
        continue;
      }
      ++ids.age;
      ++it;
    }
  }
  else {
    if (outstanding.load() > 0) {
      for (IDState& ids : idStates) {
        if (!ids.isInUse()) {
          continue;
        }
        if (!isIDSExpired(ids)) {
          ++ids.age;
          continue;
        }
        auto guard = ids.acquire();
        if (!guard) {
          continue;
        }
        /* check again, now that we have locked this state */
        if (ids.isInUse() && isIDSExpired(ids)) {
          handleUDPTimeout(ids);
        }
      }
    }
  }
}
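/* Associate an outgoing UDP query with an ID and store its
   InternalQueryState. Two strategies: with s_randomizeIDs the ID is
   drawn at random and kept in a map, making answers harder to spoof;
   otherwise IDs are handed out round-robin from a pre-allocated vector
   indexed by ID, which is cheaper but predictable. */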
uint16_t DownstreamState::saveState(InternalQueryState&& state)
{
  if (s_randomizeIDs) {
    /* if the state is already in use we will retry,
       up to five times. The last selected one is used
       even if it was already in use */
    size_t remainingAttempts = 5;
    auto map = d_idStatesMap.lock();

    do {
      uint16_t selectedID = dnsdist::getRandomValue(std::numeric_limits<uint16_t>::max());
      auto [it, inserted] = map->emplace(selectedID, IDState());

      if (!inserted) {
        remainingAttempts--;
        if (remainingAttempts > 0) {
          continue;
        }

        auto oldDU = std::move(it->second.internal.du);
        ++dnsdist::metrics::g_stats.downstreamTimeouts;
        DOHUnitInterface::handleTimeout(std::move(oldDU));
      }
      else {
        ++outstanding;
      }

      it->second.internal = std::move(state);
      it->second.age.store(0);

      return it->first;
    }
    while (true);
  }

  do {
    uint16_t selectedID = (idOffset++) % idStates.size();
    IDState& ids = idStates[selectedID];
    auto guard = ids.acquire();
    if (!guard) {
      continue;
    }
    if (ids.isInUse()) {
      /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
         to handle it because it's about to be overwritten. */
      auto oldDU = std::move(ids.internal.du);
      ++dnsdist::metrics::g_stats.downstreamTimeouts;
      DOHUnitInterface::handleTimeout(std::move(oldDU));
    }
    else {
      ++outstanding;
    }
    ids.internal = std::move(state);

    return selectedID;
  }
  while (true);
}
void DownstreamState::restoreState(uint16_t id, InternalQueryState&& state)
{
  if (s_randomizeIDs) {
    auto map = d_idStatesMap.lock();

    auto [it, inserted] = map->emplace(id, IDState());
    if (!inserted) {
      /* this ID has already been reused in the meantime */
      ++dnsdist::metrics::g_stats.downstreamTimeouts;
      DOHUnitInterface::handleTimeout(std::move(state.du));
    }
    else {
      it->second.internal = std::move(state);
      ++outstanding;
    }
    return;
  }

  auto& ids = idStates[id];
  auto guard = ids.acquire();
  if (!guard) {
    /* the state is locked by someone else, it is being reused */
    ++dnsdist::metrics::g_stats.downstreamTimeouts;
    DOHUnitInterface::handleTimeout(std::move(state.du));
    return;
  }
  if (ids.isInUse()) {
    /* the state has already been reused in the meantime */
    ++dnsdist::metrics::g_stats.downstreamTimeouts;
    DOHUnitInterface::handleTimeout(std::move(state.du));
    return;
  }
  ids.internal = std::move(state);
  ++outstanding;
}
std::optional<InternalQueryState> DownstreamState::getState(uint16_t id)
{
  std::optional<InternalQueryState> result = std::nullopt;

  if (s_randomizeIDs) {
    auto map = d_idStatesMap.lock();

    auto it = map->find(id);
    if (it == map->end()) {
      return result;
    }

    result = std::move(it->second.internal);
    map->erase(it);
    --outstanding;
    return result;
  }

  if (id >= idStates.size()) {
    return result;
  }

  auto& ids = idStates[id];
  auto guard = ids.acquire();
  if (!guard) {
    return result;
  }

  if (ids.isInUse()) {
    result = std::move(ids.internal);
    --outstanding;
  }

  return result;
}
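/* Decide whether an active health-check should be sent now. For 'lazy'
   backends this drives a small state machine: Healthy becomes
   PotentialFailure once the failure rate over the recent sample reaches
   d_lazyHealthCheckThreshold percent; PotentialFailure becomes Failed
   (in submitHealthCheckResult()) after enough failed checks; Failed
   backends are only probed again once d_nextCheck comes around. */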
bool DownstreamState::healthCheckRequired(std::optional<time_t> currentTime)
{
  if (d_config.availability == DownstreamState::Availability::Lazy) {
    auto stats = d_lazyHealthCheckStats.lock();
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::PotentialFailure) {
      vinfolog("Sending health-check query for %s which is still in the Potential Failure state", getNameWithAddr());
      return true;
    }
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::Failed) {
      auto now = currentTime ? *currentTime : time(nullptr);
      if (stats->d_nextCheck <= now) {
        /* we update the next check time here because the check might time out,
           and we do not want to send a second check during that time unless
           the timer is actually very short */
        vinfolog("Sending health-check query for %s which is still in the Failed state", getNameWithAddr());
        updateNextLazyHealthCheck(*stats, true, now);
        return true;
      }
      return false;
    }
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::Healthy) {
      auto& lastResults = stats->d_lastResults;
      size_t totalCount = lastResults.size();
      if (totalCount < d_config.d_lazyHealthCheckMinSampleCount) {
        return false;
      }

      size_t failures = 0;
      for (const auto& result : lastResults) {
        if (result) {
          ++failures;
        }
      }

      const auto maxFailureRate = static_cast<float>(d_config.d_lazyHealthCheckThreshold);
      auto current = (100.0 * failures) / totalCount;
      if (current >= maxFailureRate) {
        lastResults.clear();
        vinfolog("Backend %s reached the lazy health-check threshold (%f%% out of %f%%, looking at sample of %d items with %d failures), moving to Potential Failure state", getNameWithAddr(), current, maxFailureRate, totalCount, failures);
        stats->d_status = LazyHealthCheckStats::LazyStatus::PotentialFailure;
        /* we update the next check time here because the check might time out,
           and we do not want to send a second check during that time unless
           the timer is actually very short */
        updateNextLazyHealthCheck(*stats, true);
        return true;
      }
    }

    return false;
  }
  else if (d_config.availability == DownstreamState::Availability::Auto) {
    if (d_nextCheck > 1) {
      --d_nextCheck;
      return false;
    }

    d_nextCheck = d_config.checkInterval;
    return true;
  }

  return false;
}
time_t DownstreamState::getNextLazyHealthCheck()
{
  auto stats = d_lazyHealthCheckStats.lock();
  return stats->d_nextCheck;
}
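/* Schedule the next lazy health-check. With exponential back-off
   enabled, the delay grows with the number of consecutive failed checks
   and is capped at d_lazyHealthCheckMaxBackOff. Illustrative figures,
   assuming a doubling coefficient and a 30s failed interval: successive
   checks would land roughly 30s, 60s, 120s, ... apart until the cap. */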
void DownstreamState::updateNextLazyHealthCheck(LazyHealthCheckStats& stats, bool checkScheduled, std::optional<time_t> currentTime)
{
  auto now = currentTime ? *currentTime : time(nullptr);
  if (d_config.d_lazyHealthCheckUseExponentialBackOff) {
    if (stats.d_status == DownstreamState::LazyHealthCheckStats::LazyStatus::PotentialFailure) {
      /* we are still in the "up" state, we need to send the next query quickly to
         determine if the backend is really down */
      stats.d_nextCheck = now + d_config.checkInterval;
      vinfolog("Backend %s is in potential failure state, next check in %d seconds", getNameWithAddr(), d_config.checkInterval);
    }
    else if (consecutiveSuccessfulChecks > 0) {
      /* we are in 'Failed' state, but just had one (or more) successful check,
         so we want the next one to happen quite quickly as the backend might
         be available again. */
      stats.d_nextCheck = now + d_config.d_lazyHealthCheckFailedInterval;
      if (!checkScheduled) {
        vinfolog("Backend %s is in failed state but had %d consecutive successful checks, next check in %d seconds", getNameWithAddr(), std::to_string(consecutiveSuccessfulChecks), d_config.d_lazyHealthCheckFailedInterval);
      }
    }
    else {
      uint16_t failedTests = currentCheckFailures;
      if (checkScheduled) {
        /* we are planning the check after that one, which will only
           occur if there is a failure */
        failedTests++;
      }

      time_t backOff = d_config.d_lazyHealthCheckMaxBackOff;
      const ExponentialBackOffTimer backOffTimer(d_config.d_lazyHealthCheckMaxBackOff);
      auto backOffCoeffTmp = backOffTimer.get(failedTests);
      /* backOffCoeffTmp cannot be higher than d_config.d_lazyHealthCheckMaxBackOff */
      const auto backOffCoeff = static_cast<time_t>(backOffCoeffTmp);
      if ((std::numeric_limits<time_t>::max() / d_config.d_lazyHealthCheckFailedInterval) >= backOffCoeff) {
        backOff = d_config.d_lazyHealthCheckFailedInterval * backOffCoeff;
        if (backOff > d_config.d_lazyHealthCheckMaxBackOff || (std::numeric_limits<time_t>::max() - now) <= backOff) {
          backOff = d_config.d_lazyHealthCheckMaxBackOff;
        }
      }

      stats.d_nextCheck = now + backOff;
      vinfolog("Backend %s is in failed state and has failed %d consecutive checks, next check in %d seconds", getNameWithAddr(), failedTests, backOff);
    }
  }
  else {
    stats.d_nextCheck = now + d_config.d_lazyHealthCheckFailedInterval;
    vinfolog("Backend %s is in %s state, next check in %d seconds", getNameWithAddr(), (stats.d_status == DownstreamState::LazyHealthCheckStats::LazyStatus::PotentialFailure ? "potential failure" : "failed"), d_config.d_lazyHealthCheckFailedInterval);
  }
}
void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
{
  if (!newResult) {
    ++d_healthCheckMetrics.d_failures;
  }

  if (initial) {
    /* if this is the initial health-check, at startup, we do not care
       about the minimum number of failed/successful health-checks */
    if (!IsAnyAddress(d_config.remote)) {
      infolog("Marking downstream %s as '%s'", getNameWithAddr(), newResult ? "up" : "down");
    }
    setUpStatus(newResult);
    if (newResult == false) {
      currentCheckFailures++;
      auto stats = d_lazyHealthCheckStats.lock();
      stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
      updateNextLazyHealthCheck(*stats, false);
    }
    return;
  }

  bool newState = newResult;

  if (newResult) {
    /* check succeeded */
    currentCheckFailures = 0;

    if (!upStatus) {
      /* we were previously marked as "down" and had a successful health-check,
         let's see if this is enough to move to the "up" state or if we need
         more successful health-checks for that */
      consecutiveSuccessfulChecks++;
      if (consecutiveSuccessfulChecks < d_config.minRiseSuccesses) {
        /* we need more than one successful check to rise
           and we didn't reach the threshold yet, let's stay down */
        newState = false;

        if (d_config.availability == DownstreamState::Availability::Lazy) {
          auto stats = d_lazyHealthCheckStats.lock();
          updateNextLazyHealthCheck(*stats, false);
        }
      }
    }

    if (newState) {
      if (d_config.availability == DownstreamState::Availability::Lazy) {
        auto stats = d_lazyHealthCheckStats.lock();
        vinfolog("Backend %s had %d successful checks, moving to Healthy", getNameWithAddr(), std::to_string(consecutiveSuccessfulChecks));
        stats->d_status = LazyHealthCheckStats::LazyStatus::Healthy;
        stats->d_lastResults.clear();
      }
    }
  }
  else {
    /* check failed */
    consecutiveSuccessfulChecks = 0;

    currentCheckFailures++;

    if (upStatus) {
      /* we were previously marked as "up" and failed a health-check,
         let's see if this is enough to move to the "down" state or if
         we need more failed checks for that */
      if (currentCheckFailures < d_config.maxCheckFailures) {
        /* we need more than one failure to be marked as down,
           and we did not reach the threshold yet, let's stay up */
        newState = true;
      }
      else if (d_config.availability == DownstreamState::Availability::Lazy) {
        auto stats = d_lazyHealthCheckStats.lock();
        vinfolog("Backend %s failed its health-check, moving from Potential failure to Failed", getNameWithAddr());
        stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
        currentCheckFailures = 0;
        updateNextLazyHealthCheck(*stats, false);
      }
    }
  }

  if (newState != upStatus) {
    /* we are actually moving to a new state */
    if (!IsAnyAddress(d_config.remote)) {
      infolog("Marking downstream %s as '%s'", getNameWithAddr(), newState ? "up" : "down");
    }

    if (newState && !isTCPOnly() && (!connected || d_config.reconnectOnUp)) {
      newState = reconnect();
    }

    setUpStatus(newState);

    if (g_snmpAgent && g_snmpTrapsEnabled) {
      g_snmpAgent->sendBackendStatusChangeTrap(*this);
    }
  }
}
size_t ServerPool::countServers(bool upOnly)
{
  std::shared_ptr<const ServerPolicy::NumberedServerVector> servers = nullptr;
  {
    auto lock = d_servers.read_lock();
    servers = *lock;
  }

  size_t count = 0;
  for (const auto& server : *servers) {
    if (!upOnly || std::get<1>(server)->isUp()) {
      count++;
    }
  }

  return count;
}
size_t ServerPool::poolLoad()
{
  std::shared_ptr<const ServerPolicy::NumberedServerVector> servers = nullptr;
  {
    auto lock = d_servers.read_lock();
    servers = *lock;
  }

  size_t load = 0;
  for (const auto& server : *servers) {
    size_t serverOutstanding = std::get<1>(server)->outstanding.load();
    load += serverOutstanding;
  }

  return load;
}
const std::shared_ptr<const ServerPolicy::NumberedServerVector> ServerPool::getServers()
{
  std::shared_ptr<const ServerPolicy::NumberedServerVector> result;
  {
    result = *(d_servers.read_lock());
  }
  return result;
}
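/* The server list of a pool is published as a shared_ptr to an
   immutable vector: readers copy the pointer under a short read lock,
   while the writers below build a new vector and swap the pointer in
   under the write lock, so readers never see a partially updated list. */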
void ServerPool::addServer(shared_ptr<DownstreamState>& server)
{
  auto servers = d_servers.write_lock();
  /* we can't update the content of the shared pointer directly even when holding the lock,
     as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
  unsigned int count = static_cast<unsigned int>((*servers)->size());
  auto newServers = ServerPolicy::NumberedServerVector(*(*servers));
  newServers.emplace_back(++count, server);
  /* we need to reorder based on the server 'order' */
  std::stable_sort(newServers.begin(), newServers.end(), [](const std::pair<unsigned int, std::shared_ptr<DownstreamState>>& a, const std::pair<unsigned int, std::shared_ptr<DownstreamState>>& b) {
    return a.second->d_config.order < b.second->d_config.order;
  });
  /* and now we need to renumber for Lua (custom policies) */
  size_t idx = 1;
  for (auto& serv : newServers) {
    serv.first = idx++;
  }
  *servers = std::make_shared<const ServerPolicy::NumberedServerVector>(std::move(newServers));
}
void ServerPool::removeServer(shared_ptr<DownstreamState>& server)
{
  auto servers = d_servers.write_lock();
  /* we can't update the content of the shared pointer directly even when holding the lock,
     as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
  auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*(*servers));
  size_t idx = 1;
  bool found = false;
  for (auto it = newServers->begin(); it != newServers->end();) {
    if (found) {
      /* we need to renumber the servers placed
         after the removed one, for Lua (custom policies) */
      it->first = idx++;
      it++;
    }
    else if (it->second == server) {
      it = newServers->erase(it);
      found = true;
    }
    else {
      idx++;
      it++;
    }
  }
  *servers = std::move(newServers);
}