]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdistdist/dnsdist-backend.cc
8c3eefc2399e1a25f2aa582ec5957ea9d82e6e3d
[thirdparty/pdns.git] / pdns / dnsdistdist / dnsdist-backend.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "dnsdist.hh"
24 #include "dnsdist-backoff.hh"
25 #include "dnsdist-metrics.hh"
26 #include "dnsdist-nghttp2.hh"
27 #include "dnsdist-random.hh"
28 #include "dnsdist-rings.hh"
29 #include "dnsdist-tcp.hh"
30 #include "dolog.hh"
31
32 bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
33 {
34 #if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
35 if (!d_config.d_dohPath.empty()) {
36 return g_dohClientThreads && g_dohClientThreads->passCrossProtocolQueryToThread(std::move(cpq));
37 }
38 #endif
39 return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
40 }
41
bool DownstreamState::reconnect(bool initialAttempt)
{
  /* (Re-)establish all UDP sockets towards this backend. Returns true when
     every socket could be connected, false when another thread is already
     reconnecting, the backend is stopped, or a connection attempt failed. */
  std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
  if (!tl.owns_lock() || isStopped()) {
    /* we are already reconnecting or stopped anyway */
    return false;
  }

  /* no remote address configured: nothing to connect, report success */
  if (IsAnyAddress(d_config.remote)) {
    return true;
  }

  connected = false;
  for (auto& fd : sockets) {
    /* tear down the previous socket for this slot, if any */
    if (fd != -1) {
      if (sockets.size() > 1) {
        (*mplexer.lock())->removeReadFD(fd);
      }
      /* shutdown() is needed to wake up recv() in the responderThread */
      shutdown(fd, SHUT_RDWR);
      close(fd);
      fd = -1;
    }
    fd = SSocket(d_config.remote.sin4.sin_family, SOCK_DGRAM, 0);

#ifdef SO_BINDTODEVICE
    /* bind the new socket to the configured outgoing interface, if any;
       a failure is logged but not fatal */
    if (!d_config.sourceItfName.empty()) {
      int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, d_config.sourceItfName.c_str(), d_config.sourceItfName.length());
      if (res != 0) {
        infolog("Error setting up the interface on backend socket '%s': %s", d_config.remote.toStringWithPort(), stringerror());
      }
    }
#endif

    /* bind to the configured source address, if any */
    if (!IsAnyAddress(d_config.sourceAddr)) {
#ifdef IP_BIND_ADDRESS_NO_PORT
      if (d_config.ipBindAddrNoPort) {
        SSetsockopt(fd, SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
      }
#endif
      SBind(fd, d_config.sourceAddr);
    }

    try {
      SConnect(fd, d_config.remote);
      /* with more than one socket we poll via the multiplexer, so register
         the new descriptor (the callback does nothing, we only need readiness) */
      if (sockets.size() > 1) {
        (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
      }
      connected = true;
    }
    catch (const std::runtime_error& error) {
      if (initialAttempt || g_verbose) {
        infolog("Error connecting to new server with address %s: %s", d_config.remote.toStringWithPort(), error.what());
      }
      connected = false;
      break;
    }
  }

  /* if at least one (re-)connection failed, close all sockets */
  if (!connected) {
    for (auto& fd : sockets) {
      if (fd != -1) {
        if (sockets.size() > 1) {
          try {
            (*mplexer.lock())->removeReadFD(fd);
          }
          catch (const FDMultiplexerException& e) {
            /* some sockets might not have been added to the multiplexer
               yet, that's fine */
          }
        }
        /* shutdown() is needed to wake up recv() in the responderThread */
        shutdown(fd, SHUT_RDWR);
        close(fd);
        fd = -1;
      }
    }
  }

  if (connected) {
    /* release the lock before waking up waitUntilConnected() waiters */
    tl.unlock();
    d_connectedWait.notify_all();
    if (!initialAttempt) {
      /* we need to be careful not to start this
         thread too soon, as the creation should only
         happen after the configuration has been parsed */
      start();
    }
  }

  return connected;
}
135
void DownstreamState::waitUntilConnected()
{
  /* Block the calling thread until this backend's sockets are connected;
     reconnect() notifies d_connectedWait once the connection succeeds. */
  if (d_stopped) {
    return;
  }
  /* fast path: already connected, no need to take the lock */
  if (connected) {
    return;
  }
  {
    std::unique_lock<std::mutex> lock(connectLock);
    /* NOTE(review): the predicate only re-checks 'connected', not
       'd_stopped' — a backend stopped while we wait here would presumably
       keep us blocked until a connection succeeds; confirm callers cannot
       hit that interleaving */
    d_connectedWait.wait(lock, [this]{
      return connected.load();
    });
  }
}
151
void DownstreamState::stop()
{
  /* Mark this backend as stopped and wake up the responder thread, which
     is likely blocked in recv() on one of our sockets. Idempotent. */
  if (d_stopped) {
    return;
  }
  d_stopped = true;

  {
    /* take both the connect lock and the multiplexer lock so we cannot
       race with a concurrent reconnect() touching the same sockets */
    std::lock_guard<std::mutex> tl(connectLock);
    auto slock = mplexer.lock();

    for (auto& fd : sockets) {
      if (fd != -1) {
        /* shutdown() is needed to wake up recv() in the responderThread */
        shutdown(fd, SHUT_RDWR);
      }
    }
  }
}
171
172 void DownstreamState::hash()
173 {
174 vinfolog("Computing hashes for id=%s and weight=%d", *d_config.id, d_config.d_weight);
175 auto w = d_config.d_weight;
176 auto idStr = boost::str(boost::format("%s") % *d_config.id);
177 auto lockedHashes = hashes.write_lock();
178 lockedHashes->clear();
179 lockedHashes->reserve(w);
180 while (w > 0) {
181 std::string uuid = boost::str(boost::format("%s-%d") % idStr % w);
182 unsigned int wshash = burtleCI(reinterpret_cast<const unsigned char*>(uuid.c_str()), uuid.size(), g_hashperturb);
183 lockedHashes->push_back(wshash);
184 --w;
185 }
186 std::sort(lockedHashes->begin(), lockedHashes->end());
187 hashesComputed = true;
188 }
189
190 void DownstreamState::setId(const boost::uuids::uuid& newId)
191 {
192 d_config.id = newId;
193 // compute hashes only if already done
194 if (hashesComputed) {
195 hash();
196 }
197 }
198
199 void DownstreamState::setWeight(int newWeight)
200 {
201 if (newWeight < 1) {
202 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
203 return ;
204 }
205
206 d_config.d_weight = newWeight;
207
208 if (hashesComputed) {
209 hash();
210 }
211 }
212
/* Build a backend from its configuration. 'tlsCtx' is non-null for DoT/DoH
   backends; 'connect' requests that the UDP sockets be connected right away
   (skipped for TCP-only backends or when no remote address is set). */
DownstreamState::DownstreamState(DownstreamState::Config&& config, std::shared_ptr<TLSCtx> tlsCtx, bool connect): d_config(std::move(config)), d_tlsCtx(std::move(tlsCtx))
{
  threadStarted.clear();

  /* set up QPS limiting if a limit was configured */
  if (d_config.d_qpsLimit > 0) {
    qps = QPSLimiter(d_config.d_qpsLimit, d_config.d_qpsLimit);
  }

  /* keep the configured identifier (refreshing hashes if needed via setId),
     or generate a fresh one */
  if (d_config.id) {
    setId(*d_config.id);
  }
  else {
    d_config.id = getUniqueID();
  }

  if (d_config.d_weight > 0) {
    setWeight(d_config.d_weight);
  }

  /* lazy health-checked backends start 'up' and keep a bounded sliding
     window of the last results */
  if (d_config.availability == Availability::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
    d_lazyHealthCheckStats.lock()->d_lastResults.set_capacity(d_config.d_lazyHealthCheckSampleSize);
    setUpStatus(true);
  }

  setName(d_config.name);

  if (d_tlsCtx) {
    /* a non-empty DoH path selects DNS over HTTPS, otherwise DoT */
    if (!d_config.d_dohPath.empty()) {
#ifdef HAVE_NGHTTP2
      setupDoHClientProtocolNegotiation(d_tlsCtx);

      if (g_configurationDone && g_outgoingDoHWorkerThreads && *g_outgoingDoHWorkerThreads == 0) {
        throw std::runtime_error("Error: setOutgoingDoHWorkerThreads() is set to 0 so no outgoing DoH worker thread is available to serve queries");
      }

      /* make sure at least one outgoing DoH worker thread will be started */
      if (!g_outgoingDoHWorkerThreads || *g_outgoingDoHWorkerThreads == 0) {
        g_outgoingDoHWorkerThreads = 1;
      }
#endif /* HAVE_NGHTTP2 */
    }
    else {
      setupDoTProtocolNegotiation(d_tlsCtx);
    }
  }

  if (connect && !isTCPOnly()) {
    if (!IsAnyAddress(d_config.remote)) {
      connectUDPSockets();
    }
  }

  /* start the uptime stopwatch */
  sw.start();
}
266
267
268 void DownstreamState::start()
269 {
270 if (connected && !threadStarted.test_and_set()) {
271 tid = std::thread(responderThread, shared_from_this());
272
273 if (!d_config.d_cpus.empty()) {
274 mapThreadToCPUList(tid.native_handle(), d_config.d_cpus);
275 }
276
277 tid.detach();
278 }
279 }
280
281 void DownstreamState::connectUDPSockets()
282 {
283 if (s_randomizeIDs) {
284 idStates.clear();
285 }
286 else {
287 idStates.resize(g_maxOutstanding);
288 }
289 sockets.resize(d_config.d_numberOfSockets);
290
291 if (sockets.size() > 1) {
292 *(mplexer.lock()) = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(sockets.size()));
293 }
294
295 for (auto& fd : sockets) {
296 fd = -1;
297 }
298
299 reconnect(true);
300 }
301
302 DownstreamState::~DownstreamState()
303 {
304 for (auto& fd : sockets) {
305 if (fd >= 0) {
306 close(fd);
307 fd = -1;
308 }
309 }
310 }
311
312 void DownstreamState::incCurrentConnectionsCount()
313 {
314 auto currentConnectionsCount = ++tcpCurrentConnections;
315 if (currentConnectionsCount > tcpMaxConcurrentConnections) {
316 tcpMaxConcurrentConnections.store(currentConnectionsCount);
317 }
318 }
319
320 int DownstreamState::pickSocketForSending()
321 {
322 size_t numberOfSockets = sockets.size();
323 if (numberOfSockets == 1) {
324 return sockets[0];
325 }
326
327 size_t idx;
328 if (s_randomizeSockets) {
329 idx = dnsdist::getRandomValue(numberOfSockets);
330 }
331 else {
332 idx = socketsOffset++;
333 }
334
335 return sockets[idx % numberOfSockets];
336 }
337
338 void DownstreamState::pickSocketsReadyForReceiving(std::vector<int>& ready)
339 {
340 ready.clear();
341
342 if (sockets.size() == 1) {
343 ready.push_back(sockets[0]);
344 return ;
345 }
346
347 (*mplexer.lock())->getAvailableFDs(ready, 1000);
348 }
349
/* pick the outgoing socket at random instead of round-robin (see pickSocketForSending()) */
bool DownstreamState::s_randomizeSockets{false};
/* use random outgoing query IDs tracked in a map instead of sequential IDs in a vector
   (see saveState()/getState()/connectUDPSockets()) */
bool DownstreamState::s_randomizeIDs{false};
/* seconds after which an outstanding UDP query is considered timed out (see isIDSExpired()) */
int DownstreamState::s_udpTimeout{2};
353
354 static bool isIDSExpired(const IDState& ids)
355 {
356 auto age = ids.age.load();
357 return age > DownstreamState::s_udpTimeout;
358 }
359
void DownstreamState::handleUDPTimeout(IDState& ids)
{
  /* Give up on this in-flight UDP query: release the state slot, update
     the timeout counters, log it and record it in the response rings. */
  ids.age = 0;
  ids.inUse = false;
  /* if the query came in over DoH, its unit must be notified of the timeout */
  DOHUnitInterface::handleTimeout(std::move(ids.internal.du));
  ++reuseds;
  --outstanding;
  ++dnsdist::metrics::g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
  vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
           d_config.remote.toStringWithPort(), getName(),
           ids.internal.qname.toLogString(), QType(ids.internal.qtype).toString(), ids.internal.origRemote.toStringWithPort());

  if (g_rings.shouldRecordResponses()) {
    struct timespec ts;
    gettime(&ts);

    /* synthesize a DNS header carrying the original query ID and flags
       so the ring entry matches the query */
    struct dnsheader fake;
    memset(&fake, 0, sizeof(fake));
    fake.id = ids.internal.origID;
    uint16_t* flags = getFlagsFromDNSHeader(&fake);
    *flags = ids.internal.origFlags;

    /* presumably the max() latency value marks a timeout in the rings — confirm */
    g_rings.insertResponse(ts, ids.internal.origRemote, ids.internal.qname, ids.internal.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, d_config.remote, getProtocol());
  }

  /* feed the lazy health-checking window, if enabled */
  reportTimeoutOrError();
}
387
388 void DownstreamState::reportResponse(uint8_t rcode)
389 {
390 if (d_config.availability == Availability::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
391 bool failure = d_config.d_lazyHealthCheckMode == LazyHealthCheckMode::TimeoutOrServFail ? rcode == RCode::ServFail : false;
392 d_lazyHealthCheckStats.lock()->d_lastResults.push_back(failure);
393 }
394 }
395
396 void DownstreamState::reportTimeoutOrError()
397 {
398 if (d_config.availability == Availability::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
399 d_lazyHealthCheckStats.lock()->d_lastResults.push_back(true);
400 }
401 }
402
void DownstreamState::handleUDPTimeouts()
{
  /* Walk the in-flight UDP query states, aging each one and expiring the
     ones past the UDP timeout. Only applies to plain UDP backends. */
  if (getProtocol() != dnsdist::Protocol::DoUDP) {
    return;
  }

  if (s_randomizeIDs) {
    /* random IDs: states live in a map, expired entries are erased */
    auto map = d_idStatesMap.lock();
    for (auto it = map->begin(); it != map->end(); ) {
      auto& ids = it->second;
      if (isIDSExpired(ids)) {
        handleUDPTimeout(ids);
        it = map->erase(it);
        continue;
      }
      ++ids.age;
      ++it;
    }
  }
  else {
    /* sequential IDs: states live in a fixed-size vector and are marked
       in-use / free instead of being erased */
    if (outstanding.load() > 0) {
      for (IDState& ids : idStates) {
        if (!ids.isInUse()) {
          continue;
        }
        /* unlocked pre-check to avoid acquiring states that are not expired */
        if (!isIDSExpired(ids)) {
          ++ids.age;
          continue;
        }
        auto guard = ids.acquire();
        if (!guard) {
          continue;
        }
        /* check again, now that we have locked this state */
        if (ids.isInUse() && isIDSExpired(ids)) {
          handleUDPTimeout(ids);
        }
      }
    }
  }
}
444
/* Store the state of a query about to be sent to this backend and return
   the query ID to use on the wire. May evict (and time out) an existing
   in-flight state if a free slot cannot be found. */
uint16_t DownstreamState::saveState(InternalQueryState&& state)
{
  if (s_randomizeIDs) {
    /* if the state is already in use we will retry,
       up to 5 five times. The last selected one is used
       even if it was already in use */
    size_t remainingAttempts = 5;
    auto map = d_idStatesMap.lock();

    do {
      uint16_t selectedID = dnsdist::getRandomValue(std::numeric_limits<uint16_t>::max());
      auto [it, inserted] = map->emplace(selectedID, IDState());

      if (!inserted) {
        remainingAttempts--;
        if (remainingAttempts > 0) {
          continue;
        }

        /* out of attempts: evict the existing entry. Its DoH unit (if any)
           gets a timeout; 'outstanding' is unchanged since one in-flight
           query replaces another */
        auto oldDU = std::move(it->second.internal.du);
        ++reuseds;
        ++dnsdist::metrics::g_stats.downstreamTimeouts;
        DOHUnitInterface::handleTimeout(std::move(oldDU));
      }
      else {
        ++outstanding;
      }

      it->second.internal = std::move(state);
      it->second.age.store(0);

      return it->first;
    }
    while (true);
  }

  /* sequential IDs: take the next slot in the vector, skipping slots we
     cannot lock right now */
  do {
    uint16_t selectedID = (idOffset++) % idStates.size();
    IDState& ids = idStates[selectedID];
    auto guard = ids.acquire();
    if (!guard) {
      continue;
    }
    if (ids.isInUse()) {
      /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
         to handle it because it's about to be overwritten. */
      auto oldDU = std::move(ids.internal.du);
      ++reuseds;
      ++dnsdist::metrics::g_stats.downstreamTimeouts;
      DOHUnitInterface::handleTimeout(std::move(oldDU));
    }
    else {
      ++outstanding;
    }
    ids.internal = std::move(state);
    ids.age.store(0);
    ids.inUse = true;
    return selectedID;
  }
  while (true);
}
506
/* Put a query state back under the given ID, typically after getState()
   retrieved it but the response could not be processed. If the slot has
   been taken in the meantime the state is dropped and counted as a
   timeout. */
void DownstreamState::restoreState(uint16_t id, InternalQueryState&& state)
{
  if (s_randomizeIDs) {
    auto map = d_idStatesMap.lock();

    auto [it, inserted] = map->emplace(id, IDState());
    if (!inserted) {
      /* already used */
      ++reuseds;
      ++dnsdist::metrics::g_stats.downstreamTimeouts;
      DOHUnitInterface::handleTimeout(std::move(state.du));
    }
    else {
      it->second.internal = std::move(state);
      ++outstanding;
    }
    return;
  }

  auto& ids = idStates[id];
  auto guard = ids.acquire();
  if (!guard) {
    /* already used */
    ++reuseds;
    ++dnsdist::metrics::g_stats.downstreamTimeouts;
    DOHUnitInterface::handleTimeout(std::move(state.du));
    return;
  }
  if (ids.isInUse()) {
    /* already used */
    ++reuseds;
    ++dnsdist::metrics::g_stats.downstreamTimeouts;
    DOHUnitInterface::handleTimeout(std::move(state.du));
    return;
  }
  ids.internal = std::move(state);
  ids.inUse = true;
  ++outstanding;
}
546
547 std::optional<InternalQueryState> DownstreamState::getState(uint16_t id)
548 {
549 std::optional<InternalQueryState> result = std::nullopt;
550
551 if (s_randomizeIDs) {
552 auto map = d_idStatesMap.lock();
553
554 auto it = map->find(id);
555 if (it == map->end()) {
556 return result;
557 }
558
559 result = std::move(it->second.internal);
560 map->erase(it);
561 --outstanding;
562 return result;
563 }
564
565 if (id > idStates.size()) {
566 return result;
567 }
568
569 auto& ids = idStates[id];
570 auto guard = ids.acquire();
571 if (!guard) {
572 return result;
573 }
574
575 if (ids.isInUse()) {
576 result = std::move(ids.internal);
577 --outstanding;
578 }
579 ids.inUse = false;
580 return result;
581 }
582
/* Decide whether a health-check query should be sent to this backend now.
   For 'Lazy' backends this depends on the lazy state machine and the
   sliding window of recent results; for 'Auto' backends it is a simple
   per-interval countdown. Returns true when a check should be sent. */
bool DownstreamState::healthCheckRequired(std::optional<time_t> currentTime)
{
  if (d_config.availability == DownstreamState::Availability::Lazy) {
    auto stats = d_lazyHealthCheckStats.lock();
    /* PotentialFailure: keep probing until the state is resolved */
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::PotentialFailure) {
      vinfolog("Sending health-check query for %s which is still in the Potential Failure state", getNameWithAddr());
      return true;
    }
    /* Failed: only probe once the scheduled next-check time has passed */
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::Failed) {
      auto now = currentTime ? *currentTime : time(nullptr);
      if (stats->d_nextCheck <= now) {
        /* we update the next check time here because the check might time out,
           and we do not want to send a second check during that time unless
           the timer is actually very short */
        vinfolog("Sending health-check query for %s which is still in the Failed state", getNameWithAddr());
        updateNextLazyHealthCheck(*stats, true, now);
        return true;
      }
      return false;
    }
    /* Healthy: start checking only once the failure rate over the recent
       results window crosses the configured threshold */
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::Healthy) {
      auto& lastResults = stats->d_lastResults;
      size_t totalCount = lastResults.size();
      /* not enough samples yet to make a decision */
      if (totalCount < d_config.d_lazyHealthCheckMinSampleCount) {
        return false;
      }

      size_t failures = 0;
      for (const auto& result : lastResults) {
        if (result) {
          ++failures;
        }
      }

      const auto maxFailureRate = static_cast<float>(d_config.d_lazyHealthCheckThreshold);
      auto current = (100.0 * failures) / totalCount;
      if (current >= maxFailureRate) {
        lastResults.clear();
        vinfolog("Backend %s reached the lazy health-check threshold (%f%% out of %f%%, looking at sample of %d items with %d failures), moving to Potential Failure state", getNameWithAddr(), current, maxFailureRate, totalCount, failures);
        stats->d_status = LazyHealthCheckStats::LazyStatus::PotentialFailure;
        /* we update the next check time here because the check might time out,
           and we do not want to send a second check during that time unless
           the timer is actually very short */
        updateNextLazyHealthCheck(*stats, true);
        return true;
      }
    }

    return false;
  }
  else if (d_config.availability == DownstreamState::Availability::Auto) {

    /* simple countdown: check once every checkInterval invocations */
    if (d_nextCheck > 1) {
      --d_nextCheck;
      return false;
    }

    d_nextCheck = d_config.checkInterval;
    return true;
  }

  /* other availability modes presumably never need active checks — confirm */
  return false;
}
646
647 time_t DownstreamState::getNextLazyHealthCheck()
648 {
649 auto stats = d_lazyHealthCheckStats.lock();
650 return stats->d_nextCheck;
651 }
652
/* Compute when the next lazy health-check should happen, based on the
   current lazy state and (when exponential back-off is enabled) on the
   number of consecutive failed checks. 'checkScheduled' means a check has
   just been scheduled, so the failure count is evaluated as if that check
   had failed. */
void DownstreamState::updateNextLazyHealthCheck(LazyHealthCheckStats& stats, bool checkScheduled, std::optional<time_t> currentTime)
{
  auto now = currentTime ? * currentTime : time(nullptr);
  if (d_config.d_lazyHealthCheckUseExponentialBackOff) {
    if (stats.d_status == DownstreamState::LazyHealthCheckStats::LazyStatus::PotentialFailure) {
      /* we are still in the "up" state, we need to send the next query quickly to
         determine if the backend is really down */
      stats.d_nextCheck = now + d_config.checkInterval;
      vinfolog("Backend %s is in potential failure state, next check in %d seconds", getNameWithAddr(), d_config.checkInterval);
    }
    else if (consecutiveSuccessfulChecks > 0) {
      /* we are in 'Failed' state, but just had one (or more) successful check,
         so we want the next one to happen quite quickly as the backend might
         be available again. */
      stats.d_nextCheck = now + d_config.d_lazyHealthCheckFailedInterval;
      if (!checkScheduled) {
        vinfolog("Backend %s is in failed state but had %d consecutive successful checks, next check in %d seconds", getNameWithAddr(), std::to_string(consecutiveSuccessfulChecks), d_config.d_lazyHealthCheckFailedInterval);
      }
    }
    else {
      uint16_t failedTests = currentCheckFailures;
      if (checkScheduled) {
        /* we are planning the check after that one, which will only
           occur if there is a failure */
        failedTests++;
      }

      /* exponential back-off: multiply the failed interval by a coefficient
         derived from the failure count, capped at d_lazyHealthCheckMaxBackOff
         and guarded against time_t overflow */
      time_t backOff = d_config.d_lazyHealthCheckMaxBackOff;
      const ExponentialBackOffTimer backOffTimer(d_config.d_lazyHealthCheckMaxBackOff);
      auto backOffCoeffTmp = backOffTimer.get(failedTests);
      /* backOffCoeffTmp cannot be higher than d_config.d_lazyHealthCheckMaxBackOff */
      const auto backOffCoeff = static_cast<time_t>(backOffCoeffTmp);
      if ((std::numeric_limits<time_t>::max() / d_config.d_lazyHealthCheckFailedInterval) >= backOffCoeff) {
        backOff = d_config.d_lazyHealthCheckFailedInterval * backOffCoeff;
        if (backOff > d_config.d_lazyHealthCheckMaxBackOff || (std::numeric_limits<time_t>::max() - now) <= backOff) {
          backOff = d_config.d_lazyHealthCheckMaxBackOff;
        }
      }

      stats.d_nextCheck = now + backOff;
      vinfolog("Backend %s is in failed state and has failed %d consecutive checks, next check in %d seconds", getNameWithAddr(), failedTests, backOff);
    }
  }
  else {
    /* no exponential back-off: fixed retry interval */
    stats.d_nextCheck = now + d_config.d_lazyHealthCheckFailedInterval;
    vinfolog("Backend %s is in %s state, next check in %d seconds", getNameWithAddr(), (stats.d_status == DownstreamState::LazyHealthCheckStats::LazyStatus::PotentialFailure ? "potential failure" : "failed"), d_config.d_lazyHealthCheckFailedInterval);
  }
}
701
/* Process the outcome of a health-check ('newResult' is true on success).
   Applies the rise/fall hysteresis (minRiseSuccesses / maxCheckFailures),
   drives the lazy health-check state machine, and flips the backend's
   up/down status when the thresholds are crossed. 'initial' marks the
   startup check, which bypasses the hysteresis. */
void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
{
  if (!newResult) {
    ++d_healthCheckMetrics.d_failures;
  }

  if (initial) {
    /* if this is the initial health-check, at startup, we do not care
       about the minimum number of failed/successful health-checks */
    if (!IsAnyAddress(d_config.remote)) {
      infolog("Marking downstream %s as '%s'", getNameWithAddr(), newResult ? "up" : "down");
    }
    setUpStatus(newResult);
    if (newResult == false) {
      /* start in the Failed lazy state and schedule the next check */
      currentCheckFailures++;
      auto stats = d_lazyHealthCheckStats.lock();
      stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
      updateNextLazyHealthCheck(*stats, false);
    }
    return;
  }

  bool newState = newResult;

  if (newResult) {
    /* check succeeded */
    currentCheckFailures = 0;

    if (!upStatus) {
      /* we were previously marked as "down" and had a successful health-check,
         let's see if this is enough to move to the "up" state or if we need
         more successful health-checks for that */
      consecutiveSuccessfulChecks++;
      if (consecutiveSuccessfulChecks < d_config.minRiseSuccesses) {
        /* we need more than one successful check to rise
           and we didn't reach the threshold yet, let's stay down */
        newState = false;

        if (d_config.availability == DownstreamState::Availability::Lazy) {
          auto stats = d_lazyHealthCheckStats.lock();
          updateNextLazyHealthCheck(*stats, false);
        }
      }
    }

    if (newState) {
      /* enough successful checks: a lazy backend goes back to Healthy and
         its results window is reset */
      if (d_config.availability == DownstreamState::Availability::Lazy) {
        auto stats = d_lazyHealthCheckStats.lock();
        vinfolog("Backend %s had %d successful checks, moving to Healthy", getNameWithAddr(), std::to_string(consecutiveSuccessfulChecks));
        stats->d_status = LazyHealthCheckStats::LazyStatus::Healthy;
        stats->d_lastResults.clear();
      }
    }
  }
  else {
    /* check failed */
    consecutiveSuccessfulChecks = 0;

    currentCheckFailures++;

    if (upStatus) {
      /* we were previously marked as "up" and failed a health-check,
         let's see if this is enough to move to the "down" state or if
         need more failed checks for that */
      if (currentCheckFailures < d_config.maxCheckFailures) {
        /* we need more than one failure to be marked as down,
           and we did not reach the threshold yet, let's stay up */
        newState = true;
      }
      else if (d_config.availability == DownstreamState::Availability::Lazy) {
        auto stats = d_lazyHealthCheckStats.lock();
        vinfolog("Backend %s failed its health-check, moving from Potential failure to Failed", getNameWithAddr());
        stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
        currentCheckFailures = 0;
        updateNextLazyHealthCheck(*stats, false);
      }
    }
  }

  if (newState != upStatus) {
    /* we are actually moving to a new state */
    if (!IsAnyAddress(d_config.remote)) {
      infolog("Marking downstream %s as '%s'", getNameWithAddr(), newState ? "up" : "down");
    }

    /* coming back up may require reconnecting the UDP sockets; a failed
       reconnect keeps the backend down */
    if (newState && !isTCPOnly() && (!connected || d_config.reconnectOnUp)) {
      newState = reconnect();
    }

    setUpStatus(newState);
    if (g_snmpAgent && g_snmpTrapsEnabled) {
      g_snmpAgent->sendBackendStatusChangeTrap(*this);
    }
  }
}
797
798 size_t ServerPool::countServers(bool upOnly)
799 {
800 std::shared_ptr<const ServerPolicy::NumberedServerVector> servers = nullptr;
801 {
802 auto lock = d_servers.read_lock();
803 servers = *lock;
804 }
805
806 size_t count = 0;
807 for (const auto& server : *servers) {
808 if (!upOnly || std::get<1>(server)->isUp() ) {
809 count++;
810 }
811 }
812
813 return count;
814 }
815
816 size_t ServerPool::poolLoad()
817 {
818 std::shared_ptr<const ServerPolicy::NumberedServerVector> servers = nullptr;
819 {
820 auto lock = d_servers.read_lock();
821 servers = *lock;
822 }
823
824 size_t load = 0;
825 for (const auto& server : *servers) {
826 size_t serverOutstanding = std::get<1>(server)->outstanding.load();
827 load += serverOutstanding;
828 }
829 return load;
830 }
831
832 const std::shared_ptr<const ServerPolicy::NumberedServerVector> ServerPool::getServers()
833 {
834 std::shared_ptr<const ServerPolicy::NumberedServerVector> result;
835 {
836 result = *(d_servers.read_lock());
837 }
838 return result;
839 }
840
841 void ServerPool::addServer(shared_ptr<DownstreamState>& server)
842 {
843 auto servers = d_servers.write_lock();
844 /* we can't update the content of the shared pointer directly even when holding the lock,
845 as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
846 unsigned int count = static_cast<unsigned int>((*servers)->size());
847 auto newServers = ServerPolicy::NumberedServerVector(*(*servers));
848 newServers.emplace_back(++count, server);
849 /* we need to reorder based on the server 'order' */
850 std::stable_sort(newServers.begin(), newServers.end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
851 return a.second->d_config.order < b.second->d_config.order;
852 });
853 /* and now we need to renumber for Lua (custom policies) */
854 size_t idx = 1;
855 for (auto& serv : newServers) {
856 serv.first = idx++;
857 }
858 *servers = std::make_shared<const ServerPolicy::NumberedServerVector>(std::move(newServers));
859 }
860
/* Remove a server from this pool, renumbering the remaining entries so
   Lua-based (custom) policies keep seeing a contiguous 1..N numbering. */
void ServerPool::removeServer(shared_ptr<DownstreamState>& server)
{
  auto servers = d_servers.write_lock();
  /* we can't update the content of the shared pointer directly even when holding the lock,
     as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
  auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*(*servers));
  size_t idx = 1;
  bool found = false;
  for (auto it = newServers->begin(); it != newServers->end();) {
    if (found) {
      /* we need to renumber the servers placed
         after the removed one, for Lua (custom policies) */
      it->first = idx++;
      it++;
    }
    else if (it->second == server) {
      it = newServers->erase(it);
      found = true;
    } else {
      /* not the one we are removing: its number is already correct */
      idx++;
      it++;
    }
  }
  *servers = std::move(newServers);
}