/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include <netinet/tcp.h>
#include <sys/resource.h>

#include <cerrno>
#include <cstring>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#if defined (__OpenBSD__) || defined(__NetBSD__)
#include <readline/readline.h>
#else
#include <editline/readline.h>
#endif

#ifdef HAVE_SYSTEMD
#include <systemd/sd-daemon.h>
#endif

#include "dnsdist-cache.hh"
#include "dnsdist-console.hh"
#include "dnsdist-ecs.hh"
#include "dnsdist-lua.hh"
#include "dnsdist-rings.hh"
#include "dnsdist-secpoll.hh"
#include "dnsdist-xpf.hh"

#include "delaypipe.hh"
#include "dnsparser.hh"
#include "dnswriter.hh"
#include "ednsoptions.hh"
#include "sodcrypto.hh"
#include "threadname.hh"
/* Receiver is currently single threaded
      not *that* bad actually, but now that we are thread safe, might want to scale

   Set of Rules, if one matches, it leads to an Action
   Both rules and actions could conceivably be Lua based.
   On the C++ side, both could be inherited from a class Rule and a class Action,
   on the Lua side we can't do that. */
83 struct DNSDistStats g_stats
;
84 MetricDefinitionStorage g_metricDefinitions
;
86 uint16_t g_maxOutstanding
{std::numeric_limits
<uint16_t>::max()};
87 bool g_verboseHealthChecks
{false};
88 uint32_t g_staleCacheEntriesTTL
{0};
90 bool g_allowEmptyResponse
{false};
92 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
93 string g_outputBuffer
;
95 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
96 std::vector
<std::shared_ptr
<DOHFrontend
>> g_dohlocals
;
97 std::vector
<std::shared_ptr
<DNSCryptContext
>> g_dnsCryptLocals
;
99 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
100 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
101 #endif /* HAVE_EBPF */
102 std::vector
<std::unique_ptr
<ClientState
>> g_frontends
;
103 GlobalStateHolder
<pools_t
> g_pools
;
104 size_t g_udpVectorSize
{1};
106 bool g_snmpEnabled
{false};
107 bool g_snmpTrapsEnabled
{false};
108 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
114 For the return path, per downstream server we have a thread that listens to responses.
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
122 IDs are assigned by atomic increments of the socket offset.
125 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
126 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
128 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
133 GlobalStateHolder
<servers_t
> g_dstates
;
134 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
135 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
136 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
137 int g_tcpRecvTimeout
{2};
138 int g_tcpSendTimeout
{2};
141 bool g_servFailOnNoPolicy
{false};
142 bool g_truncateTC
{false};
143 bool g_fixupCase
{false};
144 bool g_preserveTrailingData
{false};
145 bool g_roundrobinFailOnNoServer
{false};
147 static void truncateTC(char* packet
, uint16_t* len
, size_t responseSize
, unsigned int consumed
)
150 bool hadEDNS
= false;
151 uint16_t payloadSize
= 0;
154 if (g_addEDNSToSelfGeneratedResponses
) {
155 hadEDNS
= getEDNSUDPPayloadSizeAndZ(packet
, *len
, &payloadSize
, &z
);
158 *len
=static_cast<uint16_t>(sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
159 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
160 dh
->ancount
= dh
->arcount
= dh
->nscount
= 0;
163 addEDNS(dh
, *len
, responseSize
, z
& EDNS_HEADER_FLAG_DO
, payloadSize
, 0);
175 ComboAddress destination
;
176 ComboAddress origDest
;
180 if(origDest
.sin4
.sin_family
== 0) {
181 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
184 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
188 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
193 DelayPipe
<DelayedPacket
>* g_delay
= nullptr;
195 void doLatencyStats(double udiff
)
197 if(udiff
< 1000) ++g_stats
.latency0_1
;
198 else if(udiff
< 10000) ++g_stats
.latency1_10
;
199 else if(udiff
< 50000) ++g_stats
.latency10_50
;
200 else if(udiff
< 100000) ++g_stats
.latency50_100
;
201 else if(udiff
< 1000000) ++g_stats
.latency100_1000
;
202 else ++g_stats
.latencySlow
;
203 g_stats
.latencySum
+= udiff
/ 1000;
205 auto doAvg
= [](double& var
, double n
, double weight
) {
206 var
= (weight
-1) * var
/weight
+ n
/weight
;
209 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
210 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
211 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
212 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
215 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
, unsigned int& consumed
)
217 if (responseLen
< sizeof(dnsheader
)) {
221 const struct dnsheader
* dh
= reinterpret_cast<const struct dnsheader
*>(response
);
222 if (dh
->qdcount
== 0) {
223 if ((dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) || g_allowEmptyResponse
) {
227 ++g_stats
.nonCompliantResponses
;
232 uint16_t rqtype
, rqclass
;
235 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
237 catch(const std::exception
& e
) {
238 if(responseLen
> 0 && static_cast<size_t>(responseLen
) > sizeof(dnsheader
)) {
239 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
241 ++g_stats
.nonCompliantResponses
;
245 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
252 static void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
254 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
255 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
256 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
257 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
258 /* clear the flags we are about to restore */
259 *flags
&= restoreFlagsMask
;
260 /* only keep the flags we want to restore */
261 origFlags
&= ~restoreFlagsMask
;
262 /* set the saved flags as they were */
266 static bool fixUpQueryTurnedResponse(DNSQuestion
& dq
, const uint16_t origFlags
)
268 restoreFlags(dq
.dh
, origFlags
);
270 return addEDNSToQueryTurnedResponse(dq
);
273 static bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
, bool* zeroScope
)
275 if (*responseLen
< sizeof(dnsheader
)) {
279 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(*response
);
280 restoreFlags(dh
, origFlags
);
282 if (*responseLen
== sizeof(dnsheader
)) {
287 string realname
= qname
.toDNSString();
288 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
289 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
293 if (ednsAdded
|| ecsAdded
) {
298 const std::string
responseStr(*response
, *responseLen
);
299 int res
= locateEDNSOptRR(responseStr
, &optStart
, &optLen
, &last
);
302 if (zeroScope
) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
303 size_t optContentStart
= 0;
304 uint16_t optContentLen
= 0;
305 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
306 if (isEDNSOptionInOpt(responseStr
, optStart
, optLen
, EDNSOptionCode::ECS
, &optContentStart
, &optContentLen
) && optContentLen
>= 4) {
307 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
309 *zeroScope
= responseStr
.at(optContentStart
+ 3) == 0;
314 /* we added the entire OPT RR,
315 therefore we need to remove it entirely */
317 /* simply remove the last AR */
318 *responseLen
-= optLen
;
319 uint16_t arcount
= ntohs(dh
->arcount
);
321 dh
->arcount
= htons(arcount
);
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNS(responseStr
, rewrittenResponse
) == 0) {
326 *responseLen
= rewrittenResponse
.size();
327 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
328 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
330 *responseSize
= rewrittenResponse
.capacity();
331 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
334 warnlog("Error rewriting content");
339 /* the OPT RR was already present, but without ECS,
340 we need to remove the ECS option if any */
342 /* nothing after the OPT RR, we can simply remove the
344 size_t existingOptLen
= optLen
;
345 removeEDNSOptionFromOPT(*response
+ optStart
, &optLen
, EDNSOptionCode::ECS
);
346 *responseLen
-= (existingOptLen
- optLen
);
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNSOption(responseStr
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
351 *responseLen
= rewrittenResponse
.size();
352 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
353 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
355 *responseSize
= rewrittenResponse
.capacity();
356 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
359 warnlog("Error rewriting content");
370 static bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
373 uint16_t encryptedResponseLen
= 0;
375 /* save the original header before encrypting it in place */
376 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
377 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
381 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
383 *responseLen
= encryptedResponseLen
;
385 /* dropping response */
386 vinfolog("Error encrypting the response, dropping.");
392 #endif /* HAVE_DNSCRYPT */
394 static bool applyRulesToResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
)
396 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
397 std::string ruleresult
;
398 for(const auto& lr
: *localRespRulactions
) {
399 if(lr
.d_rule
->matches(&dr
)) {
400 lr
.d_rule
->d_matches
++;
401 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
403 case DNSResponseAction::Action::Allow
:
406 case DNSResponseAction::Action::Drop
:
409 case DNSResponseAction::Action::HeaderModify
:
412 case DNSResponseAction::Action::ServFail
:
413 dr
.dh
->rcode
= RCode::ServFail
;
416 /* non-terminal actions follow */
417 case DNSResponseAction::Action::Delay
:
418 dr
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
420 case DNSResponseAction::Action::None
:
429 bool processResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, size_t addRoom
, std::vector
<uint8_t>& rewrittenResponse
, bool muted
)
431 if (!applyRulesToResponse(localRespRulactions
, dr
)) {
435 bool zeroScope
= false;
436 if (!fixUpResponse(response
, responseLen
, responseSize
, *dr
.qname
, dr
.origFlags
, dr
.ednsAdded
, dr
.ecsAdded
, rewrittenResponse
, addRoom
, dr
.useZeroScope
? &zeroScope
: nullptr)) {
440 if (dr
.packetCache
&& !dr
.skipCache
) {
441 if (!dr
.useZeroScope
) {
442 /* if the query was not suitable for zero-scope, for
443 example because it had an existing ECS entry so the hash is
444 not really 'no ECS', so just insert it for the existing subnet
446 - we don't have the correct hash for a non-ECS query
447 - inserting with hash computed before the ECS replacement but with
448 the subnet extracted _after_ the replacement would not work.
452 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
453 dr
.packetCache
->insert(zeroScope
? dr
.cacheKeyNoECS
: dr
.cacheKey
, zeroScope
? boost::none
: dr
.subnet
, dr
.origFlags
, dr
.dnssecOK
, *dr
.qname
, dr
.qtype
, dr
.qclass
, *response
, *responseLen
, dr
.tcp
, dr
.dh
->rcode
, dr
.tempFailureTTL
);
458 if (!encryptResponse(*response
, responseLen
, *responseSize
, dr
.tcp
, dr
.dnsCryptQuery
, nullptr, nullptr)) {
462 #endif /* HAVE_DNSCRYPT */
467 static bool sendUDPResponse(int origFD
, const char* response
, const uint16_t responseLen
, const int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
469 if(delayMsec
&& g_delay
) {
470 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
471 g_delay
->submit(dp
, delayMsec
);
475 if(origDest
.sin4
.sin_family
== 0) {
476 res
= sendto(origFD
, response
, responseLen
, 0, reinterpret_cast<const struct sockaddr
*>(&origRemote
), origRemote
.getSocklen());
479 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
483 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), stringerror(err
));
491 int pickBackendSocketForSending(std::shared_ptr
<DownstreamState
>& state
)
493 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
496 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
500 if (state
->sockets
.size() == 1) {
501 ready
.push_back(state
->sockets
[0]);
506 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
507 state
->mplexer
->getAvailableFDs(ready
, -1);
511 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
512 void responderThread(std::shared_ptr
<DownstreamState
> dss
)
514 setThreadName("dnsdist/respond");
515 auto localRespRulactions
= g_resprulactions
.getLocal();
516 char packet
[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
517 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
518 /* when the answer is encrypted in place, we need to get a copy
519 of the original header before encryption to fill the ring buffer */
520 dnsheader cleartextDH
;
521 vector
<uint8_t> rewrittenResponse
;
523 uint16_t queryId
= 0;
524 std::vector
<int> sockets
;
525 sockets
.reserve(dss
->sockets
.size());
528 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
530 pickBackendSocketsReadyForReceiving(dss
, sockets
);
531 for (const auto& fd
: sockets
) {
532 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
533 char * response
= packet
;
534 size_t responseSize
= sizeof(packet
);
536 if (got
< 0 || static_cast<size_t>(got
) < sizeof(dnsheader
))
539 uint16_t responseLen
= static_cast<uint16_t>(got
);
542 if(queryId
>= dss
->idStates
.size()) {
546 IDState
* ids
= &dss
->idStates
[queryId
];
547 int64_t usageIndicator
= ids
->usageIndicator
;
549 if(!IDState::isInUse(usageIndicator
)) {
550 /* the corresponding state is marked as not in use, meaning that:
551 - it was already cleaned up by another thread and the state is gone ;
552 - we already got a response for this query and this one is a duplicate.
553 Either way, we don't touch it.
558 /* read the potential DOHUnit state as soon as possible, but don't use it
559 until we have confirmed that we own this state by updating usageIndicator */
561 /* setting age to 0 to prevent the maintainer thread from
562 cleaning this IDS while we process the response.
565 int origFD
= ids
->origFD
;
567 unsigned int consumed
= 0;
568 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
, consumed
)) {
572 bool isDoH
= du
!= nullptr;
573 /* atomically mark the state as available, but only if it has not been altered
575 if (ids
->tryMarkUnused(usageIndicator
)) {
576 /* clear the potential DOHUnit asap, it's ours now
577 and since we just marked the state as unused,
578 someone could overwrite it. */
580 /* we only decrement the outstanding counter if the value was not
581 altered in the meantime, which would mean that the state has been actively reused
582 and the other thread has not incremented the outstanding counter, so we don't
583 want it to be decremented twice. */
584 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
586 /* someone updated the state in the meantime, we can't touch the existing pointer */
588 /* since the state has been updated, we can't safely access it so let's just drop
593 if(dh
->tc
&& g_truncateTC
) {
594 truncateTC(response
, &responseLen
, responseSize
, consumed
);
597 dh
->id
= ids
->origID
;
599 uint16_t addRoom
= 0;
600 DNSResponse dr
= makeDNSResponseFromIDState(*ids
, dh
, sizeof(packet
), responseLen
, false);
601 if (dr
.dnsCryptQuery
) {
602 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
605 memcpy(&cleartextDH
, dr
.dh
, sizeof(cleartextDH
));
606 if (!processResponse(&response
, &responseLen
, &responseSize
, localRespRulactions
, dr
, addRoom
, rewrittenResponse
, ids
->cs
&& ids
->cs
->muted
)) {
610 if (ids
->cs
&& !ids
->cs
->muted
) {
612 #ifdef HAVE_DNS_OVER_HTTPS
614 du
->response
= std::string(response
, responseLen
);
615 if (send(du
->rsock
, &du
, sizeof(du
), 0) != sizeof(du
)) {
616 /* at this point we have the only remaining pointer on this
617 DOHUnit object since we did set ids->du to nullptr earlier */
620 #endif /* HAVE_DNS_OVER_HTTPS */
625 empty
.sin4
.sin_family
= 0;
626 /* if ids->destHarvested is false, origDest holds the listening address.
627 We don't want to use that as a source since it could be 0.0.0.0 for example. */
628 sendUDPResponse(origFD
, response
, responseLen
, dr
.delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
633 ++ids
->cs
->responses
;
636 double udiff
= ids
->sentTime
.udiff();
637 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(),
638 isDoH
? " (https)": "", udiff
);
642 g_rings
.insertResponse(ts
, *dr
.remote
, *dr
.qname
, dr
.qtype
, static_cast<unsigned int>(udiff
), static_cast<unsigned int>(got
), cleartextDH
, dss
->remote
);
644 switch (cleartextDH
.rcode
) {
645 case RCode::NXDomain
:
646 ++g_stats
.frontendNXDomain
;
648 case RCode::ServFail
:
649 ++g_stats
.servfailResponses
;
650 ++g_stats
.frontendServFail
;
653 ++g_stats
.frontendNoError
;
656 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
658 doLatencyStats(udiff
);
660 rewrittenResponse
.clear();
663 catch(const std::exception
& e
){
664 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
668 catch(const std::exception
& e
)
670 errlog("UDP responder thread died because of exception: %s", e
.what());
672 catch(const PDNSException
& e
)
674 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
678 errlog("UDP responder thread died because of an exception: %s", "unknown");
681 bool DownstreamState::reconnect()
683 std::unique_lock
<std::mutex
> tl(connectLock
, std::try_to_lock
);
684 if (!tl
.owns_lock()) {
685 /* we are already reconnecting */
690 for (auto& fd
: sockets
) {
692 if (sockets
.size() > 1) {
693 std::lock_guard
<std::mutex
> lock(socketsLock
);
694 mplexer
->removeReadFD(fd
);
696 /* shutdown() is needed to wake up recv() in the responderThread */
697 shutdown(fd
, SHUT_RDWR
);
701 if (!IsAnyAddress(remote
)) {
702 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
703 if (!IsAnyAddress(sourceAddr
)) {
704 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
705 SBind(fd
, sourceAddr
);
708 SConnect(fd
, remote
);
709 if (sockets
.size() > 1) {
710 std::lock_guard
<std::mutex
> lock(socketsLock
);
711 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
715 catch(const std::runtime_error
& error
) {
716 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
723 /* if at least one (re-)connection failed, close all sockets */
725 for (auto& fd
: sockets
) {
727 if (sockets
.size() > 1) {
728 std::lock_guard
<std::mutex
> lock(socketsLock
);
729 mplexer
->removeReadFD(fd
);
731 /* shutdown() is needed to wake up recv() in the responderThread */
732 shutdown(fd
, SHUT_RDWR
);
741 void DownstreamState::hash()
743 vinfolog("Computing hashes for id=%s and weight=%d", id
, weight
);
745 WriteLock
wl(&d_lock
);
748 std::string uuid
= boost::str(boost::format("%s-%d") % id
% w
);
749 unsigned int wshash
= burtleCI((const unsigned char*)uuid
.c_str(), uuid
.size(), g_hashperturb
);
750 hashes
.insert(wshash
);
755 void DownstreamState::setId(const boost::uuids::uuid
& newId
)
758 // compute hashes only if already done
759 if (!hashes
.empty()) {
764 void DownstreamState::setWeight(int newWeight
)
767 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
771 if (!hashes
.empty()) {
776 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, size_t numberOfSockets
): remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
778 pthread_rwlock_init(&d_lock
, nullptr);
780 threadStarted
.clear();
782 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
784 sockets
.resize(numberOfSockets
);
785 for (auto& fd
: sockets
) {
789 if (!IsAnyAddress(remote
)) {
791 idStates
.resize(g_maxOutstanding
);
793 infolog("Added downstream server %s", remote
.toStringWithPort());
798 std::mutex g_luamutex
;
801 GlobalStateHolder
<ServerPolicy
> g_policy
;
803 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
805 for(auto& d
: servers
) {
806 if(d
.second
->isUp() && d
.second
->qps
.check())
809 return leastOutstanding(servers
, dq
);
812 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
813 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
815 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
816 return servers
[0].second
;
819 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
820 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
821 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
822 poss
.reserve(servers
.size());
823 for(auto& d
: servers
) {
824 if(d
.second
->isUp()) {
825 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
829 return shared_ptr
<DownstreamState
>();
830 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
831 return poss
.begin()->second
;
834 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
836 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
838 int max
= std::numeric_limits
<int>::max();
840 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
841 if(d
.second
->isUp()) {
842 // Don't overflow sum when adding high weights
843 if(d
.second
->weight
> max
- sum
) {
846 sum
+= d
.second
->weight
;
849 poss
.push_back({sum
, d
.second
});
853 // Catch poss & sum are empty to avoid SIGFPE
855 return shared_ptr
<DownstreamState
>();
858 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
860 return shared_ptr
<DownstreamState
>();
864 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
866 return valrandom(random(), servers
, dq
);
869 uint32_t g_hashperturb
;
870 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
872 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
875 shared_ptr
<DownstreamState
> chashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
877 unsigned int qhash
= dq
->qname
->hash(g_hashperturb
);
878 unsigned int sel
= std::numeric_limits
<unsigned int>::max();
879 unsigned int min
= std::numeric_limits
<unsigned int>::max();
880 shared_ptr
<DownstreamState
> ret
= nullptr, first
= nullptr;
882 for (const auto& d
: servers
) {
883 if (d
.second
->isUp()) {
884 // make sure hashes have been computed
885 if (d
.second
->hashes
.empty()) {
889 ReadLock
rl(&(d
.second
->d_lock
));
890 const auto& server
= d
.second
;
891 // we want to keep track of the last hash
892 if (min
> *(server
->hashes
.begin())) {
893 min
= *(server
->hashes
.begin());
897 auto hash_it
= server
->hashes
.lower_bound(qhash
);
898 if (hash_it
!= server
->hashes
.end()) {
899 if (*hash_it
< sel
) {
907 if (ret
!= nullptr) {
910 if (first
!= nullptr) {
913 return shared_ptr
<DownstreamState
>();
916 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
918 NumberedServerVector poss
;
920 for(auto& d
: servers
) {
921 if(d
.second
->isUp()) {
926 const auto *res
=&poss
;
927 if(poss
.empty() && !g_roundrobinFailOnNoServer
)
931 return shared_ptr
<DownstreamState
>();
933 static unsigned int counter
;
935 return (*res
)[(counter
++) % res
->size()].second
;
938 ComboAddress g_serverControl
{"127.0.0.1:5199"};
940 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
942 std::shared_ptr
<ServerPool
> pool
;
943 pools_t::iterator it
= pools
.find(poolName
);
944 if (it
!= pools
.end()) {
948 if (!poolName
.empty())
949 vinfolog("Creating pool %s", poolName
);
950 pool
= std::make_shared
<ServerPool
>();
951 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
956 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
958 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
959 if (!poolName
.empty()) {
960 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
962 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
964 pool
->policy
= policy
;
967 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
969 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
970 if (!poolName
.empty()) {
971 vinfolog("Adding server to pool %s", poolName
);
973 vinfolog("Adding server to default pool");
975 pool
->addServer(server
);
978 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
980 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
982 if (!poolName
.empty()) {
983 vinfolog("Removing server from pool %s", poolName
);
986 vinfolog("Removing server from default pool");
989 pool
->removeServer(server
);
992 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
994 pools_t::const_iterator it
= pools
.find(poolName
);
996 if (it
== pools
.end()) {
997 throw std::out_of_range("No pool named " + poolName
);
1003 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
1005 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
1006 return pool
->getServers();
1009 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
)
1013 std::vector
<std::string
> addrs
;
1014 stringtok(addrs
, spoofContent
, " ,");
1016 if (addrs
.size() == 1) {
1018 ComboAddress
spoofAddr(spoofContent
);
1019 SpoofAction
sa({spoofAddr
});
1022 catch(const PDNSException
&e
) {
1023 SpoofAction
sa(spoofContent
); // CNAME then
1027 std::vector
<ComboAddress
> cas
;
1028 for (const auto& addr
: addrs
) {
1030 cas
.push_back(ComboAddress(addr
));
1035 SpoofAction
sa(cas
);
1040 bool processRulesResult(const DNSAction::Action
& action
, DNSQuestion
& dq
, std::string
& ruleresult
, bool& drop
)
1043 case DNSAction::Action::Allow
:
1046 case DNSAction::Action::Drop
:
1051 case DNSAction::Action::Nxdomain
:
1052 dq
.dh
->rcode
= RCode::NXDomain
;
1054 ++g_stats
.ruleNXDomain
;
1057 case DNSAction::Action::Refused
:
1058 dq
.dh
->rcode
= RCode::Refused
;
1060 ++g_stats
.ruleRefused
;
1063 case DNSAction::Action::ServFail
:
1064 dq
.dh
->rcode
= RCode::ServFail
;
1066 ++g_stats
.ruleServFail
;
1069 case DNSAction::Action::Spoof
:
1070 spoofResponseFromString(dq
, ruleresult
);
1073 case DNSAction::Action::Truncate
:
1078 case DNSAction::Action::HeaderModify
:
1081 case DNSAction::Action::Pool
:
1082 dq
.poolname
=ruleresult
;
1085 case DNSAction::Action::NoRecurse
:
1089 /* non-terminal actions follow */
1090 case DNSAction::Action::Delay
:
1091 dq
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1093 case DNSAction::Action::None
:
1095 case DNSAction::Action::NoOp
:
1099 /* false means that we don't stop the processing */
/* Applies dynamic blocks (by client netmask, then by qname suffix) and the configured
   rule/action chain to an incoming query. Returns false when the query must be dropped,
   true when processing should continue (possibly with a modified query).
   NOTE(review): structural-only lines ('{', '}', 'break;', 'return ...;', 'else',
   'switch(action) {') were lost in extraction; the tokens below are as recovered and
   must be restored against upstream before this compiles. The declarations of
   'ruleresult' and 'drop' used at the bottom are among the lost lines. */
static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
  /* record the query in the in-memory rings for live inspection (dnsdist console) */
  g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
  /* per-qname query counting, optionally filtered by a Lua callback */
  if(g_qcount.enabled) {
    string qname = (*dq.qname).toLogString();
    bool countQuery{true};
    if(g_qcount.filter) {
      /* the Lua filter runs under the global Lua mutex */
      std::lock_guard<std::mutex> lock(g_luamutex);
      std::tie (countQuery, qname) = g_qcount.filter(&dq);
      /* NOTE(review): the closing brace of the filter scope and the 'if (countQuery)'
         guard appear to be among the lost lines */
      WriteLock wl(&g_qcount.queryLock);
      if(!g_qcount.records.count(qname)) {
        g_qcount.records[qname] = 0;
      g_qcount.records[qname]++;
  /* dynamic block lookup by client address (NetmaskGroup) */
  if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
    auto updateBlockStats = [&got]() {
      ++g_stats.dynBlocked;
      got->second.blocks++;
    /* only enforce the block while it has not expired */
    if(now < got->second.until) {
      DNSAction::Action action = got->second.action;
      if (action == DNSAction::Action::None) {
        /* fall back to the globally configured dynamic-block action */
        action = g_dynBlockAction;
      /* NOTE(review): 'switch(action) {' lost here */
      case DNSAction::Action::NoOp:
      case DNSAction::Action::Nxdomain:
        vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
        dq.dh->rcode = RCode::NXDomain;
      case DNSAction::Action::Refused:
        vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
        dq.dh->rcode = RCode::Refused;
      case DNSAction::Action::Truncate:
        /* TC=1 only makes sense over UDP; over TCP the query is passed through */
        vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
        vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
      case DNSAction::Action::NoRecurse:
        vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
        /* default: drop */
        vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
  /* dynamic block lookup by query name (SuffixMatchTree) — same action handling as above */
  if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
    auto updateBlockStats = [&got]() {
      ++g_stats.dynBlocked;
    if(now < got->until) {
      DNSAction::Action action = got->action;
      if (action == DNSAction::Action::None) {
        action = g_dynBlockAction;
      case DNSAction::Action::NoOp:
      case DNSAction::Action::Nxdomain:
        vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
        dq.dh->rcode = RCode::NXDomain;
      case DNSAction::Action::Refused:
        vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
        dq.dh->rcode = RCode::Refused;
      case DNSAction::Action::Truncate:
        vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
        vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
      case DNSAction::Action::NoRecurse:
        vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
        vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
  /* run the configured rule/action chain; first matching terminal action wins */
  DNSAction::Action action=DNSAction::Action::None;
  for(const auto& lr : *holders.rulactions) {
    if(lr.d_rule->matches(&dq)) {
      lr.d_rule->d_matches++;
      action=(*lr.d_action)(&dq, &ruleresult);
      /* processRulesResult returns true when the action is terminal */
      if (processRulesResult(action, dq, ruleresult, drop)) {
/* Sends a (possibly already encrypted) query over UDP to the given backend socket.
   When the backend has a specific source interface configured, the source address
   is attached via a control message (sendmsg); otherwise a plain send() is used.
   Returns the sendmsg/send result; on failure logs the errno and may trigger a
   reconnect of the backend socket (not for health checks, which use their own socket).
   NOTE(review): brace/return lines were lost in extraction; restore against upstream. */
ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
  if (ss->sourceItf == 0) {
    /* no source interface pinned: simple send on the connected socket */
    result = send(sd, request, requestLen, 0);
    /* source interface configured: build a msghdr carrying the source address */
    cmsgbuf_aligned cbuf;
    ComboAddress remote(ss->remote);
    fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
    addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
    result = sendmsg(sd, &msgh, 0);
  /* error path: save errno before any call that might clobber it */
  int savederrno = errno;
  vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
  /* This might sound silly, but on Linux send() might fail with EINVAL
     if the interface the socket was bound to doesn't exist anymore.
     We don't want to reconnect the real socket if the healthcheck failed,
     because it's not using the same socket. */
  if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
/* Early sanity/ACL checks on a received UDP datagram. Returns false when the
   message must be dropped (truncated on receive, or client not in the ACL).
   Also harvests the real destination address into 'dest' when available.
   NOTE(review): 'return false;'/'return true;'/brace lines were lost in extraction. */
static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
  if (msgh->msg_flags & MSG_TRUNC) {
    /* message was too large for our buffer */
    vinfolog("Dropping message too large for our buffer");
    ++g_stats.nonCompliantQueries;
  if(!holders.acl->match(remote)) {
    vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
  if (HarvestDestinationAddress(msgh, &dest)) {
    /* we don't get the port, only the address */
    dest.sin4.sin_port = cs.local.sin4.sin_port;
    /* sin_family == 0 is used elsewhere as "destination not harvested" marker */
    dest.sin4.sin_family = 0;
/* When the frontend has a DNSCrypt context, tries to decrypt the incoming query
   in place. On success, 'len' is updated to the decrypted length and dnsCryptQuery
   is populated. If decryption fails but a response was generated (e.g. a certificate
   response), that response is returned so the caller can send it; otherwise a
   runtime_error is thrown to drop the query.
   NOTE(review): brace/return lines were lost in extraction; restore against upstream. */
boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
  if (cs.dnscryptCtx) {
#ifdef HAVE_DNSCRYPT
    vector<uint8_t> response;
    uint16_t decryptedQueryLen = 0;
    dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
    bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
    /* a non-empty response means we must answer (e.g. cert query) instead of decrypting */
    if (response.size() > 0) {
    throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
    len = decryptedQueryLen;
#endif /* HAVE_DNSCRYPT */
/* Validates the DNS header of an incoming query: rejects responses (QR set) and
   queries with an empty question section, and counts RD queries.
   NOTE(review): 'return false;'/'return true;'/brace lines were lost in extraction. */
bool checkQueryHeaders(const struct dnsheader* dh)
  if (dh->qr) {   // don't respond to responses
    ++g_stats.nonCompliantQueries;
  if (dh->qdcount == 0) {
    ++g_stats.emptyQueries;
  /* presumably guarded by 'if (dh->rd)' on a lost line — TODO confirm against upstream */
  ++g_stats.rdQueries;
1363 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Fills one mmsghdr slot with an outgoing response so it can later be sent in a
   batch via sendmmsg(). When the destination address was harvested (sin_family != 0)
   the source address is attached as a control message; otherwise msg_control is cleared.
   NOTE(review): brace/else lines were lost in extraction; restore against upstream. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
  fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));
  if (dest.sin4.sin_family == 0) {
    /* no harvested destination: no source-address control message */
    outMsg.msg_hdr.msg_control = nullptr;
    addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
1376 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
/* self-generated responses or cache hits */
/* Runs the response rule chain (cache-hit or self-answered variant) over a response
   built from the query buffer, re-encrypts it for DNSCrypt clients when needed, and
   updates front-end rcode statistics. Returns false when the response must be dropped.
   NOTE(review): brace/'return'/'break'/'#endif' lines were lost in extraction;
   restore against upstream before compiling. */
static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
  /* build a DNSResponse view over the same buffer as the query */
  DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
#ifdef HAVE_PROTOBUF
  dr.uniqueId = dq.uniqueId;
  dr.delayMsec = dq.delayMsec;
  /* cache hits and self-answered responses run different response-rule chains */
  if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
  /* in case a rule changed it */
  dq.delayMsec = dr.delayMsec;
#ifdef HAVE_DNSCRYPT
  if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
#endif /* HAVE_DNSCRYPT */
  /* presumably only on the cacheHit path — the 'if (cacheHit)' line appears lost */
  ++g_stats.cacheHits;
  switch (dr.dh->rcode) {
  case RCode::NXDomain:
    ++g_stats.frontendNXDomain;
  case RCode::ServFail:
    ++g_stats.frontendServFail;
  case RCode::NoError:
    ++g_stats.frontendNoError;
  doLatencyStats(0);  // we're not going to measure this
/* Core query pipeline, shared by UDP/TCP/DoH paths: applies rules, handles queries
   turned into responses by a rule, selects a backend via the pool's (or global)
   load-balancing policy, consults the packet cache (with optional ECS zero-scope
   handling), and decides whether to answer directly, drop, or pass to a backend.
   NOTE(review): structural lines ('{', '}', 'else', 'try {', 'return', 'if (...)')
   were lost in extraction; tokens below are as recovered. The duplicated
   'selectedBackend = policy.policy(...)' lines are the Lua-locked and non-Lua
   branches whose surrounding if/else was lost. */
ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
  const uint16_t queryId = ntohs(dq.dh->id);
  /* we need an accurate ("real") value for the response and
     to store into the IDS, but not for insertion into the
     rings for example */
  struct timespec now;
  if (!applyRulesToQuery(holders, dq, now)) {
    return ProcessQueryResult::Drop;
  if(dq.dh->qr) { // something turned it into a response
    fixUpQueryTurnedResponse(dq, dq.origFlags);
    if (!prepareOutgoingResponse(holders, cs, dq, false)) {
      return ProcessQueryResult::Drop;
    ++g_stats.selfAnswered;
    return ProcessQueryResult::SendAnswer;
  /* pick the pool the rules routed this query to, and its cache/policy */
  std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
  dq.packetCache = serverPool->packetCache;
  auto policy = *(holders.policy);
  if (serverPool->policy != nullptr) {
    /* per-pool policy overrides the global one */
    policy = *(serverPool->policy);
  auto servers = serverPool->getServers();
  /* Lua-based policies must run under the global Lua mutex */
  std::lock_guard<std::mutex> lock(g_luamutex);
  selectedBackend = policy.policy(servers, &dq);
  selectedBackend = policy.policy(servers, &dq);
  uint16_t cachedResponseSize = dq.size;
  /* serve stale cache entries only when no backend is available */
  uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
  if (dq.packetCache && !dq.skipCache) {
    dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
  if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
    // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
    if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
      /* first lookup with the ECS-less cache key */
      if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
        dq.len = cachedResponseSize;
        if (!prepareOutgoingResponse(holders, cs, dq, true)) {
          return ProcessQueryResult::Drop;
        return ProcessQueryResult::SendAnswer;
      /* there was no existing ECS on the query, enable the zero-scope feature */
      dq.useZeroScope = true;
    if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
      vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
      return ProcessQueryResult::Drop;
  if (dq.packetCache && !dq.skipCache) {
    /* regular cache lookup (key includes ECS when present) */
    if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
      dq.len = cachedResponseSize;
      if (!prepareOutgoingResponse(holders, cs, dq, true)) {
        return ProcessQueryResult::Drop;
      return ProcessQueryResult::SendAnswer;
    ++g_stats.cacheMisses;
  if(!selectedBackend) {
    /* no backend available: either ServFail or drop, per configuration */
    vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
    if (g_servFailOnNoPolicy) {
      restoreFlags(dq.dh, dq.origFlags);
      dq.dh->rcode = RCode::ServFail;
      if (!prepareOutgoingResponse(holders, cs, dq, false)) {
        return ProcessQueryResult::Drop;
      // no response-only statistics counter to update.
      return ProcessQueryResult::SendAnswer;
    return ProcessQueryResult::Drop;
  /* optionally tag the query with an XPF record before forwarding */
  if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
    addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
  selectedBackend->queries++;
  return ProcessQueryResult::PassToBackend;
catch(const std::exception& e){
  vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
  return ProcessQueryResult::Drop;
/* Handles a single UDP query from a client: ACL/size checks, optional DNSCrypt
   decryption, header validation, rule processing via processQuery(), then either
   sends/queues the answer directly or claims an IDState slot on the selected
   backend and forwards the query. The responsesVect/queuedResponses/respIOV/respCBuf
   parameters are only used on the recvmmsg/sendmmsg batching path.
   NOTE(review): structural lines ('{', '}', 'return;', 'else', 'try {', 'if (ret < 0)')
   were lost in extraction; tokens below are as recovered. */
static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
  assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
  uint16_t queryId = 0;
  if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
  /* we need an accurate ("real") value for the response and
     to store into the IDS, but not for insertion into the
     rings for example */
  struct timespec queryRealTime;
  gettime(&queryRealTime, true);
  std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
  auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
  if (dnsCryptResponse) {
    /* a self-contained DNSCrypt response (e.g. certificate) — send it and stop */
    sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
  struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
  queryId = ntohs(dh->id);
  if (!checkQueryHeaders(dh)) {
  uint16_t qtype, qclass;
  unsigned int consumed = 0;
  DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
  /* use the harvested destination when we have one, else the listening address */
  DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
  dq.dnsCryptQuery = std::move(dnsCryptQuery);
  std::shared_ptr<DownstreamState> ss{nullptr};
  auto result = processQuery(dq, cs, holders, ss);
  if (result == ProcessQueryResult::Drop) {
  if (result == ProcessQueryResult::SendAnswer) {
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
    /* batch path: queue the response for a later sendmmsg() unless it is delayed */
    if (dq.delayMsec == 0 && responsesVect != nullptr) {
      queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
      (*queuedResponses)++;
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
    /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
    sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
  if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
  /* claim a slot in the backend's in-flight ID table (round-robin over the table) */
  unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
  IDState* ids = &ss->idStates[idOffset];
  DOHUnit* du = nullptr;
  /* that means that the state was in use, possibly with an allocated
     DOHUnit that we will need to handle, but we can't touch it before
     confirming that we now own this state */
  if (ids->isInUse()) {
  /* we atomically replace the value, we now own this state */
  if (!ids->markAsUsed()) {
    /* the state was not in use.
       we reset 'du' because it might have still been in use when we read it. */
  /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
     to handle it because it's about to be overwritten. */
  ++g_stats.downstreamTimeouts;
  handleDOHTimeout(du);
  ids->origFD = cs.udpFD;
  ids->origID = dh->id;
  setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
  /* If we couldn't harvest the real dest addr, still
     write down the listening addr since it will be useful
     (especially if it's not an 'any' one).
     We need to keep track of which one it is since we may
     want to use the real but not the listening addr to reply. */
  if (dest.sin4.sin_family != 0) {
    ids->origDest = dest;
    ids->destHarvested = true;
    ids->origDest = cs.local;
    ids->destHarvested = false;
  int fd = pickBackendSocketForSending(ss);
  ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
  /* presumably guarded by 'if (ret < 0)' on a lost line — TODO confirm */
  ++g_stats.downstreamSendErrors;
  vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
catch(const std::exception& e){
  vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1668 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* UDP listener loop using recvmmsg()/sendmmsg() to receive and answer queries in
   batches of up to g_udpVectorSize messages, saving one syscall per message.
   Each slot keeps its own packet buffer, iovec, cmsg buffer and peer address.
   NOTE(review): structural lines ('{', '}', 'for(;;) {', 'if (msgsGot <= 0)',
   'continue;') were lost in extraction; tokens below are as recovered. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
  ComboAddress remote;
  /* used by HarvestDestinationAddress */
  cmsgbuf_aligned cbuf;
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
  /* reset the IO vector, since it's also used to send the vector of responses
     to avoid having to copy the data around */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].iov.iov_base = recvData[idx].packet;
    recvData[idx].iov.iov_len = sizeof(recvData[idx].packet);
  /* block until we have at least one message ready, but return
     as many as possible to save the syscall costs */
  int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);
  vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
  unsigned int msgsToSend = 0;
  /* process the received messages */
  for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
    const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
    unsigned int got = msgVec[msgIdx].msg_len;
    const ComboAddress& remote = recvData[msgIdx].remote;
    if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
      ++g_stats.nonCompliantQueries;
    processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);
  /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
     or the cache) can be sent in batch too */
  if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
    int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);
    if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
      vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
1747 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
// listens to incoming queries, sends out to downstream servers, noting the intended return path
/* One UDP client thread per listening socket. Dispatches to the batched
   recvmmsg/sendmmsg loop when g_udpVectorSize > 1, otherwise runs a simple
   one-message-at-a-time recvmsg loop. Fatal exceptions are logged and kill the thread.
   NOTE(review): structural lines ('{', '}', 'try {', 'for(;;) {', 'else {',
   declarations of 'packet'/'msgh'/'iov'/'dest') were lost in extraction. */
static void udpClientThread(ClientState* cs)
  setThreadName("dnsdist/udpClie");
  LocalHolders holders;
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
  if (g_udpVectorSize > 1) {
    MultipleMessagesUDPClientThread(cs, holders);
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size */
  static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
  /* used by HarvestDestinationAddress */
  cmsgbuf_aligned cbuf;
  ComboAddress remote;
  remote.sin4.sin_family = cs->local.sin4.sin_family;
  fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
  ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
  if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
    ++g_stats.nonCompliantQueries;
  processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
catch(const std::exception &e)
  errlog("UDP client thread died because of exception: %s", e.what());
catch(const PDNSException &e)
  errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
  /* catch(...) — the catch-all line was lost in extraction */
  errlog("UDP client thread died because of an exception: %s", "unknown");
/* Returns a pseudo-random 16-bit DNS message ID.
   Uses libsodium's CSPRNG when available, otherwise falls back to random().
   The fallback is not cryptographically strong; health-check IDs only need to
   be unpredictable enough to match responses to requests.
   (Restored the #else/#endif and braces that were lost in extraction.) */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  return (randombytes_random() % 65536);
#else
  return (random() % 65536);
#endif
}
/* Performs one active health check against a backend: builds a query for the
   configured check name/type/class (optionally rewritten by a Lua checkFunction),
   sends it over a dedicated non-blocking UDP socket, waits up to checkTimeout ms,
   and validates the response (peer, size, id, QR, rcode, question). Returns true
   when the backend is considered healthy.
   NOTE(review): structural lines ('{', '}', 'try {', 'return false;'/'return true;',
   'if (sent < 0)', declarations of 'reply'/'from') were lost in extraction;
   tokens below are as recovered. */
static bool upCheck(const shared_ptr<DownstreamState>& ds)
  DNSName checkName = ds->checkName;
  uint16_t checkType = ds->checkType.getCode();
  uint16_t checkClass = ds->checkClass;
  dnsheader checkHeader;
  memset(&checkHeader, 0, sizeof(checkHeader));
  checkHeader.qdcount = htons(1);
  checkHeader.id = getRandomDNSID();
  checkHeader.rd = true;
  /* CD set so DNSSEC-validating backends do not fail the check on bogus zones */
  checkHeader.cd = true;
  if (ds->checkFunction) {
    /* Lua check function may rewrite name/type/class/header; needs the Lua lock */
    std::lock_guard<std::mutex> lock(g_luamutex);
    auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
    checkName = std::get<0>(ret);
    checkType = std::get<1>(ret);
    checkClass = std::get<2>(ret);
  vector<uint8_t> packet;
  DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
  dnsheader* requestHeader = dpw.getHeader();
  *requestHeader = checkHeader;
  Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
  sock.setNonBlocking();
  if (!IsAnyAddress(ds->sourceAddr)) {
    sock.setReuseAddr();
    sock.bind(ds->sourceAddr);
  sock.connect(ds->remote);
  ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
  /* presumably guarded by 'if (sent < 0)' on a lost line — TODO confirm */
  if (g_verboseHealthChecks)
    infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
  int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
  if(ret < 0 || !ret) { // error, timeout, both are down!
    if (g_verboseHealthChecks)
      infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
    if (g_verboseHealthChecks)
      infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
  sock.recvFrom(reply, from);
  /* we are using a connected socket but hey.. */
  if (from != ds->remote) {
    if (g_verboseHealthChecks)
      infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
  const dnsheader* responseHeader = reinterpret_cast<const dnsheader*>(reply.c_str());
  if (reply.size() < sizeof(*responseHeader)) {
    if (g_verboseHealthChecks)
      infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
  if (responseHeader->id != requestHeader->id) {
    if (g_verboseHealthChecks)
      infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
  if (!responseHeader->qr) {
    if (g_verboseHealthChecks)
      infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
  if (responseHeader->rcode == RCode::ServFail) {
    if (g_verboseHealthChecks)
      infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
  if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
    if (g_verboseHealthChecks)
      infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
  uint16_t receivedType;
  uint16_t receivedClass;
  DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
  /* the echoed question must match what we asked */
  if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
    if (g_verboseHealthChecks)
      infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
catch(const std::exception& e)
  if (g_verboseHealthChecks)
    infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
  /* catch(...) — the catch-all line was lost in extraction */
  if (g_verboseHealthChecks)
    infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
/* Upper bound on the number of TCP client threads dnsdist may spawn; also used
   when estimating the required file-descriptor limit. */
uint64_t g_maxTCPClientThreads{10};
/* Number of maintenance passes between packet-cache purge runs (compared against
   a pass counter in the maintenance loop below). */
std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* Percentage of a cache's max entries that purging may remove; the purge keeps
   at least maxEntries*(100-pct)/100 entries (100 = remove expired entries only). */
std::atomic<uint16_t> g_cacheCleaningPercentage{100};
/* Body of the periodic maintenance thread (its signature line — presumably
   'static void maintThread()' — was lost in extraction, together with the loop,
   try/catch and brace lines; tokens below are as recovered).
   Each pass runs the optional Lua 'maintenance' function, and every
   g_cacheCleaningDelay passes purges expired packet-cache entries. */
  setThreadName("dnsdist/main");
  /* rate-limits the "maintenance function failed" log line to once a minute */
  int32_t secondsToWaitLog = 0;
  /* the Lua 'maintenance' hook runs under the global Lua mutex */
  std::lock_guard<std::mutex> lock(g_luamutex);
  auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
  secondsToWaitLog = 0;
  catch(std::exception &e) {
    if (secondsToWaitLog <= 0) {
      infolog("Error during execution of maintenance function: %s", e.what());
      secondsToWaitLog = 61;
    secondsToWaitLog -= interval;
  if (counter >= g_cacheCleaningDelay) {
    /* keep track, for each cache, of whether we should keep
    std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
    /* gather all caches actually used by at least one pool, and see
       if something prevents us from cleaning the expired entries */
    auto localPools = g_pools.getLocal();
    for (const auto& entry : *localPools) {
      auto& pool = entry.second;
      auto packetCache = pool->packetCache;
      auto pair = caches.insert({packetCache, false});
      auto& iter = pair.first;
      /* if we need to keep stale data for this cache (ie, not clear
         expired entries when at least one pool using this cache
         has all its backends down) */
      if (packetCache->keepStaleData() && iter->second == false) {
        /* so far all pools had at least one backend up */
        if (pool->countServers(true) == 0) {
          iter->second = true;
    for (auto pair : caches) {
      /* shall we keep expired entries ? */
      if (pair.second == true) {
      auto& packetCache = pair.first;
      /* purge down to (100 - g_cacheCleaningPercentage)% of the cache's capacity */
      size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
      packetCache->purgeExpired(upTo);
  // ponder pruning g_dynblocks of expired entries here
/* Security-polling thread: periodically queries the PowerDNS security status
   zone (g_secPollSuffix) every g_secPollInterval seconds.
   NOTE(review): the loop/try/catch and brace lines were lost in extraction. */
static void secPollThread()
  setThreadName("dnsdist/secpoll");
  doSecPoll(g_secPollSuffix);
  sleep(g_secPollInterval);
/* Periodic health-check thread: spawns extra TCP client threads when the queue
   backs up, runs upCheck() against every backend in 'Auto' availability mode
   (with rise/fall thresholds), refreshes per-backend load/drop-rate statistics,
   and reaps timed-out in-flight UDP states (including their DoH units).
   NOTE(review): loop/try/brace/'continue;'/'else' lines were lost in extraction;
   tokens below are as recovered. */
static void healthChecksThread()
  setThreadName("dnsdist/healthC");
  if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
    g_tcpclientthreads->addTCPClientThread();
  auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
  for(auto& dss : *states) {
    /* only check a backend once every checkInterval passes */
    if(++dss->lastCheck < dss->checkInterval)
    if(dss->availability ==DownstreamState::Availability::Auto) {
      bool newState=upCheck(dss);
        /* check succeeded */
        dss->currentCheckFailures = 0;
        if (!dss->upStatus) {
          /* we were marked as down */
          dss->consecutiveSuccessfulChecks++;
          if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
            /* if we need more than one successful check to rise
               and we didn't reach the threshold yet, stay down */
        /* check failed */
        dss->consecutiveSuccessfulChecks = 0;
        if (dss->upStatus) {
          /* we are currently up */
          dss->currentCheckFailures++;
          if (dss->currentCheckFailures < dss->maxCheckFailures) {
            /* we need more than one failure to be marked as down,
               and we did not reach the threshold yet, let's stay down */
      if(newState != dss->upStatus) {
        warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
        if (newState && !dss->connected) {
          /* (re)connect the backend socket before marking it up */
          newState = dss->reconnect();
          if (dss->connected && !dss->threadStarted.test_and_set()) {
            dss->tid = thread(responderThread, dss);
        dss->upStatus = newState;
        dss->currentCheckFailures = 0;
        dss->consecutiveSuccessfulChecks = 0;
        if (g_snmpAgent && g_snmpTrapsEnabled) {
          g_snmpAgent->sendBackendStatusChangeTrap(dss);
    /* refresh per-backend query-load / drop-rate over the elapsed interval */
    auto delta = dss->sw.udiffAndSet()/1000000.0;
    dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
    dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
    dss->prev.queries.store(dss->queries.load());
    dss->prev.reuseds.store(dss->reuseds.load());
    for(IDState& ids : dss->idStates) { // timeouts
      int64_t usageIndicator = ids.usageIndicator;
      if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
        /* We mark the state as unused as soon as possible
           to limit the risk of racing with the responder thread */
        auto oldDU = ids.du;
        if (!ids.tryMarkUnused(usageIndicator)) {
          /* this state has been altered in the meantime,
             don't go anywhere near it */
        handleDOHTimeout(oldDU);
        ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
        vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
                 dss->remote.toStringWithPort(), dss->name,
                 ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
        /* record a synthetic response in the rings so the timeout is visible */
        struct dnsheader fake;
        memset(&fake, 0, sizeof(fake));
        fake.id = ids.origID;
        g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2147 static void bindAny(int af
, int sock
)
2149 __attribute__((unused
)) int one
= 1;
2152 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
2153 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2158 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
2159 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2163 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
2164 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2167 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
2168 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2172 static void dropGroupPrivs(gid_t gid
)
2175 if (setgid(gid
) == 0) {
2176 if (setgroups(0, NULL
) < 0) {
2177 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2181 warnlog("Warning: Unable to set group ID to %d: %s", gid
, stringerror());
2186 static void dropUserPrivs(uid_t uid
)
2189 if(setuid(uid
) < 0) {
2190 warnlog("Warning: Unable to set user ID to %d: %s", uid
, stringerror());
/* Estimates the number of file descriptors this configuration may need (backend
   sockets, listeners, TCP client threads and their pipes, queued TCP connections,
   the DelayPipe) and warns when RLIMIT_NOFILE is too low to cover it.
   NOTE(review): brace/'#ifdef HAVE_SYSTEMD'/else and the 'struct rlimit rl;'
   declaration lines were lost in extraction; tokens below are as recovered. */
static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
  /* stdin, stdout, stderr */
  size_t requiredFDsCount = 3;
  auto backends = g_dstates.getLocal();
  /* UDP sockets to backends */
  size_t backendUDPSocketsCount = 0;
  for (const auto& backend : *backends) {
    backendUDPSocketsCount += backend->sockets.size();
  requiredFDsCount += backendUDPSocketsCount;
  /* TCP sockets to backends */
  requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
  /* listening sockets */
  requiredFDsCount += udpBindsCount;
  requiredFDsCount += tcpBindsCount;
  /* max TCP connections currently served */
  requiredFDsCount += g_maxTCPClientThreads;
  /* max pipes for communicating between TCP acceptors and client threads */
  requiredFDsCount += (g_maxTCPClientThreads * 2);
  /* max TCP queued connections */
  requiredFDsCount += g_maxTCPQueuedConnections;
  /* DelayPipe pipe */
  requiredFDsCount += 2;
  /* webserver main socket */
  /* console main socket */
  getrlimit(RLIMIT_NOFILE, &rl);
  if (rl.rlim_cur <= requiredFDsCount) {
    warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
    /* systemd vs non-systemd hint — the surrounding #ifdef was lost in extraction */
    warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
    warnlog("You can increase this value by using ulimit.");
2241 static void setUpLocalBind(std::unique_ptr
<ClientState
>& cs
)
2243 /* skip some warnings if there is an identical UDP context */
2244 bool warn
= cs
->tcp
== false || cs
->tlsFrontend
!= nullptr || cs
->dohFrontend
!= nullptr;
2245 int& fd
= cs
->tcp
== false ? cs
->udpFD
: cs
->tcpFD
;
2248 fd
= SSocket(cs
->local
.sin4
.sin_family
, cs
->tcp
== false ? SOCK_DGRAM
: SOCK_STREAM
, 0);
2251 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2252 #ifdef TCP_DEFER_ACCEPT
2253 SSetsockopt(fd
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2255 if (cs
->fastOpenQueueSize
> 0) {
2257 SSetsockopt(fd
, IPPROTO_TCP
, TCP_FASTOPEN
, cs
->fastOpenQueueSize
);
2260 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2266 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2267 SSetsockopt(fd
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2270 bindAny(cs
->local
.sin4
.sin_family
, fd
);
2272 if(!cs
->tcp
&& IsAnyAddress(cs
->local
)) {
2274 setsockopt(fd
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2275 #ifdef IPV6_RECVPKTINFO
2276 setsockopt(fd
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2280 if (cs
->reuseport
) {
2282 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2285 /* no need to warn again if configured but support is not available, we already did for UDP */
2286 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2292 if (cs
->local
.isIPv4()) {
2294 setSocketIgnorePMTU(cs
->udpFD
);
2296 catch(const std::exception
& e
) {
2297 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs
->local
.toStringWithPort(), e
.what());
2302 const std::string
& itf
= cs
->interface
;
2304 #ifdef SO_BINDTODEVICE
2305 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2307 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), stringerror());
2311 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2317 if (g_defaultBPFFilter
) {
2318 cs
->attachFilter(g_defaultBPFFilter
);
2319 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs
->tcp
? "UDP" : "TCP"), cs
->local
.toStringWithPort());
2321 #endif /* HAVE_EBPF */
2323 if (cs
->tlsFrontend
!= nullptr) {
2324 if (!cs
->tlsFrontend
->setupTLS()) {
2325 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2326 _exit(EXIT_FAILURE
);
2330 if (cs
->dohFrontend
!= nullptr) {
2331 cs
->dohFrontend
->setup();
2334 SBind(fd
, cs
->local
);
2337 SListen(cs
->tcpFD
, SOMAXCONN
);
2338 if (cs
->tlsFrontend
!= nullptr) {
2339 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2341 else if (cs
->dohFrontend
!= nullptr) {
2342 warnlog("Listening on %s for DoH", cs
->local
.toStringWithPort());
2344 else if (cs
->dnscryptCtx
!= nullptr) {
2345 warnlog("Listening on %s for DNSCrypt", cs
->local
.toStringWithPort());
2348 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2357 vector
<string
> locals
;
2358 vector
<string
> remotes
;
2359 bool checkConfig
{false};
2360 bool beClient
{false};
2361 bool beSupervised
{false};
// Flipped to true in main() once the Lua configuration has been loaded and
// the frontends have been created; atomic so other threads can poll it safely.
std::atomic<bool> g_configurationDone{false};
2373 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2374 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2375 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
2377 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2378 cout
<<"-C,--config file Load configuration from 'file'\n";
2379 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2380 cout
<<" controlSocket from your configuration file, but also\n";
2381 cout
<<" accepts an IP:PORT argument\n";
2382 #ifdef HAVE_LIBSODIUM
2383 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2384 cout
<<" is similar to setting setKey in the configuration file.\n";
2385 cout
<<" NOTE: this will leak this key in your shell's history\n";
2386 cout
<<" and in the systems running process list.\n";
2388 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2389 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2390 cout
<<" Any errors are printed as well.\n";
2391 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2392 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2393 cout
<<"-h,--help Display this helpful message\n";
2394 cout
<<"-l,--local address Listen on this local address\n";
2395 cout
<<"--supervised Don't open a console, I'm supervised\n";
2396 cout
<<" (use with e.g. systemd and daemontools)\n";
2397 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2398 cout
<<" (use with e.g. systemd)\n";
2399 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2400 cout
<<"-v,--verbose Enable verbose mode\n";
2401 cout
<<"-V,--version Show dnsdist version information and exit\n";
/* dnsdist entry point: parse the command line, load the Lua configuration,
   set up frontends and backends, drop privileges and start worker threads.
   NOTE(review): this extraction is lossy -- opening/closing braces, the
   getopt switch/case scaffolding, the enclosing try block and several
   statement runs were dropped.  Visible tokens are kept verbatim below;
   comments mark every visible gap. */
int main(int argc, char** argv)
  size_t udpBindsCount = 0;  // counted while frontends are set up, checked against RLIMIT_NOFILE later
  size_t tcpBindsCount = 0;
  /* readline/editline hooks for the interactive console client */
  rl_attempted_completion_function = my_completion;
  rl_completion_append_character = 0;
  /* ignore broken pipes and reaped children; log via syslog */
  signal(SIGPIPE, SIG_IGN);
  signal(SIGCHLD, SIG_IGN);
  openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
#ifdef HAVE_LIBSODIUM
  if (sodium_init() == -1) {
    cerr<<"Unable to initialize crypto library"<<endl;
  /* (error-exit elided) seed hash perturbation and libc PRNG from libsodium */
  g_hashperturb=randombytes_uniform(0xffffffff);
  srandom(randombytes_uniform(0xffffffff));
  /* (#else branch: no libsodium -- 'struct timeval tv;' declaration elided) */
  gettimeofday(&tv, 0);
  srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
  g_hashperturb=random();
  /* (#endif elided) */
  ComboAddress clientAddress = ComboAddress();  // console address in client mode
  g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";  // default configuration path
  struct option longopts[]={
    {"acl", required_argument, 0, 'a'},
    {"check-config", no_argument, 0, 1},
    {"client", no_argument, 0, 'c'},
    {"config", required_argument, 0, 'C'},
    {"disable-syslog", no_argument, 0, 2},
    {"execute", required_argument, 0, 'e'},
    {"gid", required_argument, 0, 'g'},
    {"help", no_argument, 0, 'h'},
    {"local", required_argument, 0, 'l'},
    {"setkey", required_argument, 0, 'k'},
    {"supervised", no_argument, 0, 3},
    {"uid", required_argument, 0, 'u'},
    {"verbose", no_argument, 0, 'v'},
    {"version", no_argument, 0, 'V'},
  /* (array terminator, option loop and switch scaffolding elided) */
  int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
  /* the assignments below are the bodies of the elided switch cases */
  g_cmdLine.checkConfig=true;       // --check-config
  g_cmdLine.beSupervised=true;      // --supervised
  g_cmdLine.config=optarg;          // -C,--config
  g_cmdLine.beClient=true;          // -c,--client
  g_cmdLine.command=optarg;         // -e,--execute
  g_cmdLine.gid=optarg;             // -g,--gid
  cout<<"dnsdist "<<VERSION<<endl;  // -h,--help banner (usage() call elided)
  g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });  // -a,--acl
#ifdef HAVE_LIBSODIUM
  if (B64Decode(string(optarg), g_consoleKey) < 0) {   // -k,--setkey
    cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
  /* (#else branch) */
  cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
  g_cmdLine.locals.push_back(trim_copy(string(optarg)));  // -l,--local
  g_cmdLine.uid=optarg;             // -u,--uid
  /* -V,--version: print version, Lua flavour and compiled-in features */
#ifdef LUAJIT_VERSION
  cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
  cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
  cout<<"Enabled features: ";
#ifdef HAVE_DNS_OVER_TLS
  cout<<"dns-over-tls(";
  /* (TLS provider listing elided) */
#ifdef HAVE_DNS_OVER_HTTPS
  cout<<"dns-over-https(DOH) ";
#ifdef HAVE_DNSCRYPT
#ifdef HAVE_LIBCRYPTO
#ifdef HAVE_LIBSODIUM
#ifdef HAVE_PROTOBUF
  /* (feature-name couts for the blocks above elided) */
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
  cout<<"recvmmsg/sendmmsg ";
#ifdef HAVE_NET_SNMP
  /* (snmp cout, exit and default case elided) */
  //getopt_long printed an error message.
  /* remaining arguments: console target in client mode, extra backends otherwise */
  for(auto p = argv; *p; ++p) {
    if(g_cmdLine.beClient) {
      clientAddress = ComboAddress(*p, 5199);  // default console port
      g_cmdLine.remotes.push_back(*p);
  ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
  g_policy.setState(leastOutstandingPol);  // default load-balancing policy
  if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
    /* client mode: read controlSocket/setKey from the config, then talk to the daemon */
    setupLua(true, g_cmdLine.config);
    if (clientAddress != ComboAddress())
      g_serverControl = clientAddress;
    doClient(g_serverControl, g_cmdLine.command);
    _exit(EXIT_SUCCESS);
  auto acl = g_ACL.getCopy();
  /* default query ACL: loopback + RFC1918 + CGNAT + link-local + ULA */
  for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
  g_ACL.setState(acl);
  /* default console ACL: local connections only */
  auto consoleACL = g_consoleACL.getCopy();
  for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
    consoleACL.addMask(mask);
  g_consoleACL.setState(consoleACL);
  if (g_cmdLine.checkConfig) {
    setupLua(true, g_cmdLine.config);
    // No exception was thrown
    infolog("Configuration '%s' OK!", g_cmdLine.config);
    _exit(EXIT_SUCCESS);
  /* load the real configuration; 'todo' holds deferred actions */
  auto todo=setupLua(false, g_cmdLine.config);
  auto localPools = g_pools.getCopy();
  /* pre-compute hashes when any policy in use is 'chashed' */
  bool precompute = false;
  if (g_policy.getLocal()->name == "chashed") {
  for (const auto& entry : localPools) {
    if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
  vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
  // pre compute hashes
  auto backends = g_dstates.getLocal();
  for (auto& backend : *backends) {
  /* (per-backend hash computation elided) */
  if (!g_cmdLine.locals.empty()) {
    /* -l overrides plain listeners from the config file */
    for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
      /* DoH, DoT and DNSCrypt frontends are separate */
      if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
        it = g_frontends.erase(it);
    /* one UDP and one TCP frontend per -l address, port 53 by default */
    for(const auto& loc : g_cmdLine.locals) {
      g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
      g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
  if (g_frontends.empty()) {
    /* nothing configured at all: listen on localhost */
    g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
    g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
  g_configurationDone = true;
  for(auto& frontend : g_frontends) {
    setUpLocalBind(frontend);
    if (frontend->tcp == false) {
  /* (udp/tcp bind counting elided) */
  warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
  /* log the effective query and console ACLs */
  g_ACL.getLocal()->toStringVector(&vec);
  for(const auto& s : vec) {
  /* ('acls' accumulation elided) */
  infolog("ACL allowing queries from: %s", acls.c_str());
  g_consoleACL.getLocal()->toStringVector(&vec);
  for (const auto& entry : vec) {
    if (!acls.empty()) {
  infolog("Console ACL allowing connections from: %s", acls.c_str());
#ifdef HAVE_LIBSODIUM
  if (g_consoleEnabled && g_consoleKey.empty()) {
    warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
  /* drop privileges now that sockets are bound (newgid/newuid decls elided) */
  if(!g_cmdLine.gid.empty())
    newgid = strToGID(g_cmdLine.gid.c_str());
  if(!g_cmdLine.uid.empty())
    newuid = strToUID(g_cmdLine.uid.c_str());
  dropGroupPrivs(newgid);
  dropUserPrivs(newuid);
  /* we might still have capabilities remaining,
     for example if we have been started as root
     without --uid or --gid (please don't do that)
     or as an unprivileged user with ambient
     capabilities like CAP_NET_BIND_SERVICE. */
  catch(const std::exception& e) {
    warnlog("%s", e.what());
  /* this need to be done _after_ dropping privileges */
  g_delay = new DelayPipe<DelayedPacket>();
  g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
  localPools = g_pools.getCopy();
  /* create the default pool no matter what */
  createPoolIfNotExists(localPools, "");
  if(g_cmdLine.remotes.size()) {
    /* command-line backends, port 53, added to the default pool */
    for(const auto& address : g_cmdLine.remotes) {
      auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
      addServerToPool(localPools, "", ret);
      if (ret->connected && !ret->threadStarted.test_and_set()) {
        ret->tid = thread(responderThread, ret);
      g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
  g_pools.setState(localPools);
  if(g_dstates.getLocal()->empty()) {
    errlog("No downstream servers defined: all packets will get dropped");
    // you might define them later, but you need to know
  checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
  /* initial availability probe for backends configured as 'auto' */
  for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
    if(dss->availability==DownstreamState::Availability::Auto) {
      bool newState=upCheck(dss);
      warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
      dss->upStatus = newState;
  /* one worker thread per frontend, pinned to the configured CPU set */
  for(auto& cs : g_frontends) {
    if (cs->dohFrontend != nullptr) {
#ifdef HAVE_DNS_OVER_HTTPS
      std::thread t1(dohThread, cs.get());
      if (!cs->cpus.empty()) {
        mapThreadToCPUList(t1.native_handle(), cs->cpus);
#endif /* HAVE_DNS_OVER_HTTPS */
    if (cs->udpFD >= 0) {
      thread t1(udpClientThread, cs.get());
      if (!cs->cpus.empty()) {
        mapThreadToCPUList(t1.native_handle(), cs->cpus);
    else if (cs->tcpFD >= 0) {
      thread t1(tcpAcceptorThread, cs.get());
      if (!cs->cpus.empty()) {
        mapThreadToCPUList(t1.native_handle(), cs->cpus);
  /* background threads: carbon export, maintenance, health checks, secpoll */
  thread carbonthread(carbonDumpThread);
  carbonthread.detach();
  thread stattid(maintThread);
  thread healththread(healthChecksThread);
  if (!g_secPollSuffix.empty()) {
    thread secpollthread(secPollThread);
    secpollthread.detach();
  if(g_cmdLine.beSupervised) {
    sd_notify(0, "READY=1");  // guarded by HAVE_SYSTEMD (directive elided)
    healththread.join();
    /* (else branch: interactive console -- doConsole() call elided) */
    healththread.detach();
  _exit(EXIT_SUCCESS);
/* fatal-error handlers for the elided enclosing try block */
catch(const LuaContext::ExecutionErrorException& e) {
  errlog("Fatal Lua error: %s", e.what());
  /* Lua errors are often nested; surface the underlying cause too */
  std::rethrow_if_nested(e);
} catch(const std::exception& ne) {
  errlog("Details: %s", ne.what());
catch(PDNSException &ae)
  errlog("Fatal pdns error: %s", ae.reason);
  _exit(EXIT_FAILURE);
catch(std::exception &e)
  errlog("Fatal error: %s", e.what());
  _exit(EXIT_FAILURE);
catch(PDNSException &ae)
  errlog("Fatal pdns error: %s", ae.reason);
  _exit(EXIT_FAILURE);
2855 uint64_t getLatencyCount(const std::string
&)
2857 return g_stats
.responses
+ g_stats
.selfAnswered
+ g_stats
.cacheHits
;