2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <netinet/tcp.h>
31 #include <sys/resource.h>
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
37 #include <editline/readline.h>
41 #include <systemd/sd-daemon.h>
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
54 #include "delaypipe.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
63 #include "sodcrypto.hh"
65 #include "threadname.hh"
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
83 struct DNSDistStats g_stats
;
84 MetricDefinitionStorage g_metricDefinitions
;
86 uint16_t g_maxOutstanding
{10240};
87 bool g_verboseHealthChecks
{false};
88 uint32_t g_staleCacheEntriesTTL
{0};
90 bool g_allowEmptyResponse
{false};
92 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
93 string g_outputBuffer
;
95 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
96 std::vector
<std::shared_ptr
<DOHFrontend
>> g_dohlocals
;
97 std::vector
<std::shared_ptr
<DNSCryptContext
>> g_dnsCryptLocals
;
99 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
100 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
101 #endif /* HAVE_EBPF */
102 std::vector
<std::unique_ptr
<ClientState
>> g_frontends
;
103 GlobalStateHolder
<pools_t
> g_pools
;
104 size_t g_udpVectorSize
{1};
106 bool g_snmpEnabled
{false};
107 bool g_snmpTrapsEnabled
{false};
108 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
114 For the return path, per downstream server we have a thread that listens to responses.
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
122 IDs are assigned by atomic increments of the socket offset.
125 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
126 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
128 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
133 GlobalStateHolder
<servers_t
> g_dstates
;
134 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
135 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
136 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
137 int g_tcpRecvTimeout
{2};
138 int g_tcpSendTimeout
{2};
141 bool g_servFailOnNoPolicy
{false};
142 bool g_truncateTC
{false};
143 bool g_fixupCase
{false};
144 bool g_preserveTrailingData
{false};
145 bool g_roundrobinFailOnNoServer
{false};
147 static void truncateTC(char* packet
, uint16_t* len
, size_t responseSize
, unsigned int consumed
)
150 bool hadEDNS
= false;
151 uint16_t payloadSize
= 0;
154 if (g_addEDNSToSelfGeneratedResponses
) {
155 hadEDNS
= getEDNSUDPPayloadSizeAndZ(packet
, *len
, &payloadSize
, &z
);
158 *len
=static_cast<uint16_t>(sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
159 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
160 dh
->ancount
= dh
->arcount
= dh
->nscount
= 0;
163 addEDNS(dh
, *len
, responseSize
, z
& EDNS_HEADER_FLAG_DO
, payloadSize
, 0);
175 ComboAddress destination
;
176 ComboAddress origDest
;
180 if(origDest
.sin4
.sin_family
== 0) {
181 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
184 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
188 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
193 DelayPipe
<DelayedPacket
>* g_delay
= nullptr;
195 void doLatencyStats(double udiff
)
197 if(udiff
< 1000) ++g_stats
.latency0_1
;
198 else if(udiff
< 10000) ++g_stats
.latency1_10
;
199 else if(udiff
< 50000) ++g_stats
.latency10_50
;
200 else if(udiff
< 100000) ++g_stats
.latency50_100
;
201 else if(udiff
< 1000000) ++g_stats
.latency100_1000
;
202 else ++g_stats
.latencySlow
;
203 g_stats
.latencySum
+= udiff
/ 1000;
205 auto doAvg
= [](double& var
, double n
, double weight
) {
206 var
= (weight
-1) * var
/weight
+ n
/weight
;
209 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
210 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
211 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
212 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
215 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
, unsigned int& consumed
)
217 if (responseLen
< sizeof(dnsheader
)) {
221 const struct dnsheader
* dh
= reinterpret_cast<const struct dnsheader
*>(response
);
222 if (dh
->qdcount
== 0) {
223 if ((dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) || g_allowEmptyResponse
) {
227 ++g_stats
.nonCompliantResponses
;
232 uint16_t rqtype
, rqclass
;
235 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
237 catch(const std::exception
& e
) {
238 if(responseLen
> 0 && static_cast<size_t>(responseLen
) > sizeof(dnsheader
)) {
239 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
241 ++g_stats
.nonCompliantResponses
;
245 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
252 static void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
254 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
255 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
256 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
257 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
258 /* clear the flags we are about to restore */
259 *flags
&= restoreFlagsMask
;
260 /* only keep the flags we want to restore */
261 origFlags
&= ~restoreFlagsMask
;
262 /* set the saved flags as they were */
266 static bool fixUpQueryTurnedResponse(DNSQuestion
& dq
, const uint16_t origFlags
)
268 restoreFlags(dq
.dh
, origFlags
);
270 return addEDNSToQueryTurnedResponse(dq
);
273 static bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
, bool* zeroScope
)
275 if (*responseLen
< sizeof(dnsheader
)) {
279 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(*response
);
280 restoreFlags(dh
, origFlags
);
282 if (*responseLen
== sizeof(dnsheader
)) {
287 string realname
= qname
.toDNSString();
288 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
289 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
293 if (ednsAdded
|| ecsAdded
) {
298 const std::string
responseStr(*response
, *responseLen
);
299 int res
= locateEDNSOptRR(responseStr
, &optStart
, &optLen
, &last
);
302 if (zeroScope
) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
303 size_t optContentStart
= 0;
304 uint16_t optContentLen
= 0;
305 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
306 if (isEDNSOptionInOpt(responseStr
, optStart
, optLen
, EDNSOptionCode::ECS
, &optContentStart
, &optContentLen
) && optContentLen
>= 4) {
307 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
309 *zeroScope
= responseStr
.at(optContentStart
+ 3) == 0;
314 /* we added the entire OPT RR,
315 therefore we need to remove it entirely */
317 /* simply remove the last AR */
318 *responseLen
-= optLen
;
319 uint16_t arcount
= ntohs(dh
->arcount
);
321 dh
->arcount
= htons(arcount
);
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNS(responseStr
, rewrittenResponse
) == 0) {
326 *responseLen
= rewrittenResponse
.size();
327 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
328 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
330 *responseSize
= rewrittenResponse
.capacity();
331 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
334 warnlog("Error rewriting content");
339 /* the OPT RR was already present, but without ECS,
340 we need to remove the ECS option if any */
342 /* nothing after the OPT RR, we can simply remove the
344 size_t existingOptLen
= optLen
;
345 removeEDNSOptionFromOPT(*response
+ optStart
, &optLen
, EDNSOptionCode::ECS
);
346 *responseLen
-= (existingOptLen
- optLen
);
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNSOption(responseStr
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
351 *responseLen
= rewrittenResponse
.size();
352 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
353 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
355 *responseSize
= rewrittenResponse
.capacity();
356 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
359 warnlog("Error rewriting content");
370 static bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
373 uint16_t encryptedResponseLen
= 0;
375 /* save the original header before encrypting it in place */
376 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
377 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
381 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
383 *responseLen
= encryptedResponseLen
;
385 /* dropping response */
386 vinfolog("Error encrypting the response, dropping.");
392 #endif /* HAVE_DNSCRYPT */
394 static bool applyRulesToResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
)
396 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
397 std::string ruleresult
;
398 for(const auto& lr
: *localRespRulactions
) {
399 if(lr
.d_rule
->matches(&dr
)) {
400 lr
.d_rule
->d_matches
++;
401 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
403 case DNSResponseAction::Action::Allow
:
406 case DNSResponseAction::Action::Drop
:
409 case DNSResponseAction::Action::HeaderModify
:
412 case DNSResponseAction::Action::ServFail
:
413 dr
.dh
->rcode
= RCode::ServFail
;
416 /* non-terminal actions follow */
417 case DNSResponseAction::Action::Delay
:
418 dr
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
420 case DNSResponseAction::Action::None
:
429 bool processResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, size_t addRoom
, std::vector
<uint8_t>& rewrittenResponse
, bool muted
)
431 if (!applyRulesToResponse(localRespRulactions
, dr
)) {
435 bool zeroScope
= false;
436 if (!fixUpResponse(response
, responseLen
, responseSize
, *dr
.qname
, dr
.origFlags
, dr
.ednsAdded
, dr
.ecsAdded
, rewrittenResponse
, addRoom
, dr
.useZeroScope
? &zeroScope
: nullptr)) {
440 if (dr
.packetCache
&& !dr
.skipCache
) {
441 if (!dr
.useZeroScope
) {
442 /* if the query was not suitable for zero-scope, for
443 example because it had an existing ECS entry so the hash is
444 not really 'no ECS', so just insert it for the existing subnet
446 - we don't have the correct hash for a non-ECS query
447 - inserting with hash computed before the ECS replacement but with
448 the subnet extracted _after_ the replacement would not work.
452 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
453 dr
.packetCache
->insert(zeroScope
? dr
.cacheKeyNoECS
: dr
.cacheKey
, zeroScope
? boost::none
: dr
.subnet
, dr
.origFlags
, dr
.dnssecOK
, *dr
.qname
, dr
.qtype
, dr
.qclass
, *response
, *responseLen
, dr
.tcp
, dr
.dh
->rcode
, dr
.tempFailureTTL
);
458 if (!encryptResponse(*response
, responseLen
, *responseSize
, dr
.tcp
, dr
.dnsCryptQuery
, nullptr, nullptr)) {
462 #endif /* HAVE_DNSCRYPT */
467 static bool sendUDPResponse(int origFD
, const char* response
, const uint16_t responseLen
, const int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
469 if(delayMsec
&& g_delay
) {
470 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
471 g_delay
->submit(dp
, delayMsec
);
475 if(origDest
.sin4
.sin_family
== 0) {
476 res
= sendto(origFD
, response
, responseLen
, 0, reinterpret_cast<const struct sockaddr
*>(&origRemote
), origRemote
.getSocklen());
479 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
483 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), strerror(err
));
491 int pickBackendSocketForSending(std::shared_ptr
<DownstreamState
>& state
)
493 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
496 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
500 if (state
->sockets
.size() == 1) {
501 ready
.push_back(state
->sockets
[0]);
506 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
507 state
->mplexer
->getAvailableFDs(ready
, -1);
511 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
512 void responderThread(std::shared_ptr
<DownstreamState
> dss
)
514 setThreadName("dnsdist/respond");
515 auto localRespRulactions
= g_resprulactions
.getLocal();
516 char packet
[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
517 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
518 /* when the answer is encrypted in place, we need to get a copy
519 of the original header before encryption to fill the ring buffer */
520 dnsheader cleartextDH
;
521 vector
<uint8_t> rewrittenResponse
;
523 uint16_t queryId
= 0;
524 std::vector
<int> sockets
;
525 sockets
.reserve(dss
->sockets
.size());
528 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
530 pickBackendSocketsReadyForReceiving(dss
, sockets
);
531 for (const auto& fd
: sockets
) {
532 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
533 char * response
= packet
;
534 size_t responseSize
= sizeof(packet
);
536 if (got
< 0 || static_cast<size_t>(got
) < sizeof(dnsheader
))
539 uint16_t responseLen
= static_cast<uint16_t>(got
);
542 if(queryId
>= dss
->idStates
.size()) {
546 IDState
* ids
= &dss
->idStates
[queryId
];
547 int origFD
= ids
->origFD
;
549 if(origFD
< 0 && ids
->du
== nullptr) // duplicate
552 /* setting age to 0 to prevent the maintainer thread from
553 cleaning this IDS while we process the response.
554 We have already a copy of the origFD, so it would
555 mostly mess up the outstanding counter.
559 unsigned int consumed
= 0;
560 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
, consumed
)) {
564 int oldFD
= ids
->origFD
.exchange(-1);
565 if (oldFD
== origFD
) {
566 /* we only decrement the outstanding counter if the value was not
567 altered in the meantime, which would mean that the state has been actively reused
568 and the other thread has not incremented the outstanding counter, so we don't
569 want it to be decremented twice. */
570 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
573 if(dh
->tc
&& g_truncateTC
) {
574 truncateTC(response
, &responseLen
, responseSize
, consumed
);
577 dh
->id
= ids
->origID
;
579 uint16_t addRoom
= 0;
580 DNSResponse dr
= makeDNSResponseFromIDState(*ids
, dh
, sizeof(packet
), responseLen
, false);
581 if (dr
.dnsCryptQuery
) {
582 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
585 memcpy(&cleartextDH
, dr
.dh
, sizeof(cleartextDH
));
586 if (!processResponse(&response
, &responseLen
, &responseSize
, localRespRulactions
, dr
, addRoom
, rewrittenResponse
, ids
->cs
&& ids
->cs
->muted
)) {
590 if (ids
->cs
&& !ids
->cs
->muted
) {
592 #ifdef HAVE_DNS_OVER_HTTPS
594 ids
->du
->query
= std::string(response
, responseLen
);
595 if (send(ids
->du
->rsock
, &ids
->du
, sizeof(ids
->du
), 0) != sizeof(ids
->du
)) {
598 #endif /* HAVE_DNS_OVER_HTTPS */
603 empty
.sin4
.sin_family
= 0;
604 /* if ids->destHarvested is false, origDest holds the listening address.
605 We don't want to use that as a source since it could be 0.0.0.0 for example. */
606 sendUDPResponse(origFD
, response
, responseLen
, dr
.delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
612 double udiff
= ids
->sentTime
.udiff();
613 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(),
614 ids
->du
? " (https)": "", udiff
);
618 g_rings
.insertResponse(ts
, *dr
.remote
, *dr
.qname
, dr
.qtype
, static_cast<unsigned int>(udiff
), static_cast<unsigned int>(got
), cleartextDH
, dss
->remote
);
621 case RCode::NXDomain
:
622 ++g_stats
.frontendNXDomain
;
624 case RCode::ServFail
:
625 ++g_stats
.servfailResponses
;
626 ++g_stats
.frontendServFail
;
629 ++g_stats
.frontendNoError
;
632 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
634 doLatencyStats(udiff
);
636 rewrittenResponse
.clear();
639 catch(const std::exception
& e
){
640 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
644 catch(const std::exception
& e
)
646 errlog("UDP responder thread died because of exception: %s", e
.what());
648 catch(const PDNSException
& e
)
650 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
654 errlog("UDP responder thread died because of an exception: %s", "unknown");
657 bool DownstreamState::reconnect()
659 std::unique_lock
<std::mutex
> tl(connectLock
, std::try_to_lock
);
660 if (!tl
.owns_lock()) {
661 /* we are already reconnecting */
666 for (auto& fd
: sockets
) {
668 if (sockets
.size() > 1) {
669 std::lock_guard
<std::mutex
> lock(socketsLock
);
670 mplexer
->removeReadFD(fd
);
672 /* shutdown() is needed to wake up recv() in the responderThread */
673 shutdown(fd
, SHUT_RDWR
);
677 if (!IsAnyAddress(remote
)) {
678 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
679 if (!IsAnyAddress(sourceAddr
)) {
680 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
681 SBind(fd
, sourceAddr
);
684 SConnect(fd
, remote
);
685 if (sockets
.size() > 1) {
686 std::lock_guard
<std::mutex
> lock(socketsLock
);
687 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
691 catch(const std::runtime_error
& error
) {
692 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
699 /* if at least one (re-)connection failed, close all sockets */
701 for (auto& fd
: sockets
) {
703 if (sockets
.size() > 1) {
704 std::lock_guard
<std::mutex
> lock(socketsLock
);
705 mplexer
->removeReadFD(fd
);
707 /* shutdown() is needed to wake up recv() in the responderThread */
708 shutdown(fd
, SHUT_RDWR
);
717 void DownstreamState::hash()
719 vinfolog("Computing hashes for id=%s and weight=%d", id
, weight
);
721 WriteLock
wl(&d_lock
);
724 std::string uuid
= boost::str(boost::format("%s-%d") % id
% w
);
725 unsigned int wshash
= burtleCI((const unsigned char*)uuid
.c_str(), uuid
.size(), g_hashperturb
);
726 hashes
.insert(wshash
);
731 void DownstreamState::setId(const boost::uuids::uuid
& newId
)
734 // compute hashes only if already done
735 if (!hashes
.empty()) {
740 void DownstreamState::setWeight(int newWeight
)
743 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
747 if (!hashes
.empty()) {
752 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, size_t numberOfSockets
): remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
754 pthread_rwlock_init(&d_lock
, nullptr);
756 threadStarted
.clear();
758 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
760 sockets
.resize(numberOfSockets
);
761 for (auto& fd
: sockets
) {
765 if (!IsAnyAddress(remote
)) {
767 idStates
.resize(g_maxOutstanding
);
769 infolog("Added downstream server %s", remote
.toStringWithPort());
774 std::mutex g_luamutex
;
777 GlobalStateHolder
<ServerPolicy
> g_policy
;
779 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
781 for(auto& d
: servers
) {
782 if(d
.second
->isUp() && d
.second
->qps
.check())
785 return leastOutstanding(servers
, dq
);
788 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
789 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
791 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
792 return servers
[0].second
;
795 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
796 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
797 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
798 poss
.reserve(servers
.size());
799 for(auto& d
: servers
) {
800 if(d
.second
->isUp()) {
801 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
805 return shared_ptr
<DownstreamState
>();
806 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
807 return poss
.begin()->second
;
810 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
812 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
814 int max
= std::numeric_limits
<int>::max();
816 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
817 if(d
.second
->isUp()) {
818 // Don't overflow sum when adding high weights
819 if(d
.second
->weight
> max
- sum
) {
822 sum
+= d
.second
->weight
;
825 poss
.push_back({sum
, d
.second
});
829 // Catch poss & sum are empty to avoid SIGFPE
831 return shared_ptr
<DownstreamState
>();
834 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
836 return shared_ptr
<DownstreamState
>();
840 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
842 return valrandom(random(), servers
, dq
);
845 uint32_t g_hashperturb
;
846 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
848 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
851 shared_ptr
<DownstreamState
> chashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
853 unsigned int qhash
= dq
->qname
->hash(g_hashperturb
);
854 unsigned int sel
= std::numeric_limits
<unsigned int>::max();
855 unsigned int min
= std::numeric_limits
<unsigned int>::max();
856 shared_ptr
<DownstreamState
> ret
= nullptr, first
= nullptr;
858 for (const auto& d
: servers
) {
859 if (d
.second
->isUp()) {
860 // make sure hashes have been computed
861 if (d
.second
->hashes
.empty()) {
865 ReadLock
rl(&(d
.second
->d_lock
));
866 const auto& server
= d
.second
;
867 // we want to keep track of the last hash
868 if (min
> *(server
->hashes
.begin())) {
869 min
= *(server
->hashes
.begin());
873 auto hash_it
= server
->hashes
.lower_bound(qhash
);
874 if (hash_it
!= server
->hashes
.end()) {
875 if (*hash_it
< sel
) {
883 if (ret
!= nullptr) {
886 if (first
!= nullptr) {
889 return shared_ptr
<DownstreamState
>();
892 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
894 NumberedServerVector poss
;
896 for(auto& d
: servers
) {
897 if(d
.second
->isUp()) {
902 const auto *res
=&poss
;
903 if(poss
.empty() && !g_roundrobinFailOnNoServer
)
907 return shared_ptr
<DownstreamState
>();
909 static unsigned int counter
;
911 return (*res
)[(counter
++) % res
->size()].second
;
914 ComboAddress g_serverControl
{"127.0.0.1:5199"};
916 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
918 std::shared_ptr
<ServerPool
> pool
;
919 pools_t::iterator it
= pools
.find(poolName
);
920 if (it
!= pools
.end()) {
924 if (!poolName
.empty())
925 vinfolog("Creating pool %s", poolName
);
926 pool
= std::make_shared
<ServerPool
>();
927 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
932 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
934 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
935 if (!poolName
.empty()) {
936 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
938 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
940 pool
->policy
= policy
;
943 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
945 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
946 if (!poolName
.empty()) {
947 vinfolog("Adding server to pool %s", poolName
);
949 vinfolog("Adding server to default pool");
951 pool
->addServer(server
);
954 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
956 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
958 if (!poolName
.empty()) {
959 vinfolog("Removing server from pool %s", poolName
);
962 vinfolog("Removing server from default pool");
965 pool
->removeServer(server
);
968 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
970 pools_t::const_iterator it
= pools
.find(poolName
);
972 if (it
== pools
.end()) {
973 throw std::out_of_range("No pool named " + poolName
);
979 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
981 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
982 return pool
->getServers();
985 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
)
989 std::vector
<std::string
> addrs
;
990 stringtok(addrs
, spoofContent
, " ,");
992 if (addrs
.size() == 1) {
994 ComboAddress
spoofAddr(spoofContent
);
995 SpoofAction
sa({spoofAddr
});
998 catch(const PDNSException
&e
) {
999 SpoofAction
sa(spoofContent
); // CNAME then
1003 std::vector
<ComboAddress
> cas
;
1004 for (const auto& addr
: addrs
) {
1006 cas
.push_back(ComboAddress(addr
));
1011 SpoofAction
sa(cas
);
1016 static bool applyRulesToQuery(LocalHolders
& holders
, DNSQuestion
& dq
, string
& poolname
, const struct timespec
& now
)
1018 g_rings
.insertQuery(now
, *dq
.remote
, *dq
.qname
, dq
.qtype
, dq
.len
, *dq
.dh
);
1020 if(g_qcount
.enabled
) {
1021 string qname
= (*dq
.qname
).toString(".");
1022 bool countQuery
{true};
1023 if(g_qcount
.filter
) {
1024 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1025 std::tie (countQuery
, qname
) = g_qcount
.filter(&dq
);
1029 WriteLock
wl(&g_qcount
.queryLock
);
1030 if(!g_qcount
.records
.count(qname
)) {
1031 g_qcount
.records
[qname
] = 0;
1033 g_qcount
.records
[qname
]++;
1037 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
1038 auto updateBlockStats
= [&got
]() {
1039 ++g_stats
.dynBlocked
;
1040 got
->second
.blocks
++;
1043 if(now
< got
->second
.until
) {
1044 DNSAction::Action action
= got
->second
.action
;
1045 if (action
== DNSAction::Action::None
) {
1046 action
= g_dynBlockAction
;
1049 case DNSAction::Action::NoOp
:
1053 case DNSAction::Action::Nxdomain
:
1054 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort());
1057 dq
.dh
->rcode
= RCode::NXDomain
;
1061 case DNSAction::Action::Refused
:
1062 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
1065 dq
.dh
->rcode
= RCode::Refused
;
1069 case DNSAction::Action::Truncate
:
1072 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
1078 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1081 case DNSAction::Action::NoRecurse
:
1083 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1088 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
1094 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
1095 auto updateBlockStats
= [&got
]() {
1096 ++g_stats
.dynBlocked
;
1100 if(now
< got
->until
) {
1101 DNSAction::Action action
= got
->action
;
1102 if (action
== DNSAction::Action::None
) {
1103 action
= g_dynBlockAction
;
1106 case DNSAction::Action::NoOp
:
1109 case DNSAction::Action::Nxdomain
:
1110 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1113 dq
.dh
->rcode
= RCode::NXDomain
;
1116 case DNSAction::Action::Refused
:
1117 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1120 dq
.dh
->rcode
= RCode::Refused
;
1123 case DNSAction::Action::Truncate
:
1127 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1133 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1136 case DNSAction::Action::NoRecurse
:
1138 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1143 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1149 DNSAction::Action action
=DNSAction::Action::None
;
1151 for(const auto& lr
: *holders
.rulactions
) {
1152 if(lr
.d_rule
->matches(&dq
)) {
1153 lr
.d_rule
->d_matches
++;
1154 action
=(*lr
.d_action
)(&dq
, &ruleresult
);
1157 case DNSAction::Action::Allow
:
1160 case DNSAction::Action::Drop
:
1164 case DNSAction::Action::Nxdomain
:
1165 dq
.dh
->rcode
= RCode::NXDomain
;
1167 ++g_stats
.ruleNXDomain
;
1170 case DNSAction::Action::Refused
:
1171 dq
.dh
->rcode
= RCode::Refused
;
1173 ++g_stats
.ruleRefused
;
1176 case DNSAction::Action::ServFail
:
1177 dq
.dh
->rcode
= RCode::ServFail
;
1179 ++g_stats
.ruleServFail
;
1182 case DNSAction::Action::Spoof
:
1183 spoofResponseFromString(dq
, ruleresult
);
1186 case DNSAction::Action::Truncate
:
1191 case DNSAction::Action::HeaderModify
:
1194 case DNSAction::Action::Pool
:
1195 poolname
=ruleresult
;
1198 /* non-terminal actions follow */
1199 case DNSAction::Action::Delay
:
1200 dq
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1202 case DNSAction::Action::None
:
1204 case DNSAction::Action::NoOp
:
1206 case DNSAction::Action::NoRecurse
:
1217 ssize_t
udpClientSendRequestToBackend(const std::shared_ptr
<DownstreamState
>& ss
, const int sd
, const char* request
, const size_t requestLen
, bool healthCheck
)
1221 if (ss
->sourceItf
== 0) {
1222 result
= send(sd
, request
, requestLen
, 0);
1228 ComboAddress
remote(ss
->remote
);
1229 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &remote
);
1230 addCMsgSrcAddr(&msgh
, cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1231 result
= sendmsg(sd
, &msgh
, 0);
1235 int savederrno
= errno
;
1236 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1238 /* This might sound silly, but on Linux send() might fail with EINVAL
1239 if the interface the socket was bound to doesn't exist anymore.
1240 We don't want to reconnect the real socket if the healthcheck failed,
1241 because it's not using the same socket.
1243 if (!healthCheck
&& (savederrno
== EINVAL
|| savederrno
== ENODEV
)) {
1251 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1253 if (msgh
->msg_flags
& MSG_TRUNC
) {
1254 /* message was too large for our buffer */
1255 vinfolog("Dropping message too large for our buffer");
1256 ++g_stats
.nonCompliantQueries
;
1260 if(!holders
.acl
->match(remote
)) {
1261 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1269 if (HarvestDestinationAddress(msgh
, &dest
)) {
1270 /* we don't get the port, only the address */
1271 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1274 dest
.sin4
.sin_family
= 0;
1280 boost::optional
<std::vector
<uint8_t>> checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DNSCryptQuery
>& dnsCryptQuery
, time_t now
, bool tcp
)
1282 if (cs
.dnscryptCtx
) {
1283 #ifdef HAVE_DNSCRYPT
1284 vector
<uint8_t> response
;
1285 uint16_t decryptedQueryLen
= 0;
1287 dnsCryptQuery
= std::make_shared
<DNSCryptQuery
>(cs
.dnscryptCtx
);
1289 bool decrypted
= handleDNSCryptQuery(const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, tcp
, now
, response
);
1292 if (response
.size() > 0) {
1295 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1298 len
= decryptedQueryLen
;
1299 #endif /* HAVE_DNSCRYPT */
1304 bool checkQueryHeaders(const struct dnsheader
* dh
)
1306 if (dh
->qr
) { // don't respond to responses
1307 ++g_stats
.nonCompliantQueries
;
1311 if (dh
->qdcount
== 0) {
1312 ++g_stats
.emptyQueries
;
1317 ++g_stats
.rdQueries
;
1323 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1324 static void queueResponse(const ClientState
& cs
, const char* response
, uint16_t responseLen
, const ComboAddress
& dest
, const ComboAddress
& remote
, struct mmsghdr
& outMsg
, struct iovec
* iov
, char* cbuf
)
1327 fillMSGHdr(&outMsg
.msg_hdr
, iov
, nullptr, 0, const_cast<char*>(response
), responseLen
, const_cast<ComboAddress
*>(&remote
));
1329 if (dest
.sin4
.sin_family
== 0) {
1330 outMsg
.msg_hdr
.msg_control
= nullptr;
1333 addCMsgSrcAddr(&outMsg
.msg_hdr
, cbuf
, &dest
, 0);
1336 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1338 /* self-generated responses or cache hits */
1339 static bool prepareOutgoingResponse(LocalHolders
& holders
, ClientState
& cs
, DNSQuestion
& dq
, bool cacheHit
)
1341 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(dq
.dh
), dq
.size
, dq
.len
, dq
.tcp
, dq
.queryTime
);
1343 #ifdef HAVE_PROTOBUF
1344 dr
.uniqueId
= dq
.uniqueId
;
1347 dr
.delayMsec
= dq
.delayMsec
;
1349 if (!applyRulesToResponse(cacheHit
? holders
.cacheHitRespRulactions
: holders
.selfAnsweredRespRulactions
, dr
)) {
1353 /* in case a rule changed it */
1354 dq
.delayMsec
= dr
.delayMsec
;
1356 #ifdef HAVE_DNSCRYPT
1358 if (!encryptResponse(reinterpret_cast<char*>(dq
.dh
), &dq
.len
, dq
.size
, dq
.tcp
, dq
.dnsCryptQuery
, nullptr, nullptr)) {
1362 #endif /* HAVE_DNSCRYPT */
1365 ++g_stats
.cacheHits
;
1368 switch (dr
.dh
->rcode
) {
1369 case RCode::NXDomain
:
1370 ++g_stats
.frontendNXDomain
;
1372 case RCode::ServFail
:
1373 ++g_stats
.frontendServFail
;
1375 case RCode::NoError
:
1376 ++g_stats
.frontendNoError
;
1380 doLatencyStats(0); // we're not going to measure this
1384 ProcessQueryResult
processQuery(DNSQuestion
& dq
, ClientState
& cs
, LocalHolders
& holders
, std::shared_ptr
<DownstreamState
>& selectedBackend
)
1386 const uint16_t queryId
= ntohs(dq
.dh
->id
);
1389 /* we need an accurate ("real") value for the response and
1390 to store into the IDS, but not for insertion into the
1391 rings for example */
1392 struct timespec now
;
1397 if (!applyRulesToQuery(holders
, dq
, poolname
, now
)) {
1398 return ProcessQueryResult::Drop
;
1401 if(dq
.dh
->qr
) { // something turned it into a response
1402 fixUpQueryTurnedResponse(dq
, dq
.origFlags
);
1404 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1405 return ProcessQueryResult::Drop
;
1408 ++g_stats
.selfAnswered
;
1409 return ProcessQueryResult::SendAnswer
;
1412 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, poolname
);
1413 dq
.packetCache
= serverPool
->packetCache
;
1414 auto policy
= *(holders
.policy
);
1415 if (serverPool
->policy
!= nullptr) {
1416 policy
= *(serverPool
->policy
);
1418 auto servers
= serverPool
->getServers();
1420 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1421 selectedBackend
= policy
.policy(servers
, &dq
);
1424 selectedBackend
= policy
.policy(servers
, &dq
);
1427 uint16_t cachedResponseSize
= dq
.size
;
1428 uint32_t allowExpired
= selectedBackend
? 0 : g_staleCacheEntriesTTL
;
1430 if (dq
.packetCache
&& !dq
.skipCache
) {
1431 dq
.dnssecOK
= (getEDNSZ(dq
) & EDNS_HEADER_FLAG_DO
);
1434 if (dq
.useECS
&& ((selectedBackend
&& selectedBackend
->useECS
) || (!selectedBackend
&& serverPool
->getECS()))) {
1435 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1436 if (dq
.packetCache
&& !dq
.skipCache
&& (!selectedBackend
|| !selectedBackend
->disableZeroScope
) && dq
.packetCache
->isECSParsingEnabled()) {
1437 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKeyNoECS
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1438 dq
.len
= cachedResponseSize
;
1440 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1441 return ProcessQueryResult::Drop
;
1444 return ProcessQueryResult::SendAnswer
;
1448 /* there was no existing ECS on the query, enable the zero-scope feature */
1449 dq
.useZeroScope
= true;
1453 if (!handleEDNSClientSubnet(dq
, &(dq
.ednsAdded
), &(dq
.ecsAdded
), g_preserveTrailingData
)) {
1454 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq
.remote
->toStringWithPort());
1455 return ProcessQueryResult::Drop
;
1459 if (dq
.packetCache
&& !dq
.skipCache
) {
1460 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKey
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1461 dq
.len
= cachedResponseSize
;
1463 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1464 return ProcessQueryResult::Drop
;
1467 return ProcessQueryResult::SendAnswer
;
1469 ++g_stats
.cacheMisses
;
1472 if(!selectedBackend
) {
1475 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "ServFailed" : "Dropped", dq
.qname
->toString(), QType(dq
.qtype
).getName(), dq
.remote
->toStringWithPort());
1476 if (g_servFailOnNoPolicy
) {
1477 restoreFlags(dq
.dh
, dq
.origFlags
);
1479 dq
.dh
->rcode
= RCode::ServFail
;
1482 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1483 return ProcessQueryResult::Drop
;
1485 // no response-only statistics counter to update.
1486 return ProcessQueryResult::SendAnswer
;
1489 return ProcessQueryResult::Drop
;
1492 if (dq
.addXPF
&& selectedBackend
->xpfRRCode
!= 0) {
1493 addXPF(dq
, selectedBackend
->xpfRRCode
, g_preserveTrailingData
);
1496 selectedBackend
->queries
++;
1497 return ProcessQueryResult::PassToBackend
;
1499 catch(const std::exception
& e
){
1500 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq
.tcp
? "TCP" : "UDP"), dq
.remote
->toStringWithPort(), queryId
, e
.what());
1502 return ProcessQueryResult::Drop
;
1505 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, char* respCBuf
)
1507 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
1508 uint16_t queryId
= 0;
1511 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1515 /* we need an accurate ("real") value for the response and
1516 to store into the IDS, but not for insertion into the
1517 rings for example */
1518 struct timespec queryRealTime
;
1519 gettime(&queryRealTime
, true);
1521 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
= nullptr;
1522 auto dnsCryptResponse
= checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, queryRealTime
.tv_sec
, false);
1523 if (dnsCryptResponse
) {
1524 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dnsCryptResponse
->data()), static_cast<uint16_t>(dnsCryptResponse
->size()), 0, dest
, remote
);
1528 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
1529 queryId
= ntohs(dh
->id
);
1531 if (!checkQueryHeaders(dh
)) {
1535 uint16_t qtype
, qclass
;
1536 unsigned int consumed
= 0;
1537 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
1538 DNSQuestion
dq(&qname
, qtype
, qclass
, consumed
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false, &queryRealTime
);
1539 dq
.dnsCryptQuery
= std::move(dnsCryptQuery
);
1540 std::shared_ptr
<DownstreamState
> ss
{nullptr};
1541 auto result
= processQuery(dq
, cs
, holders
, ss
);
1543 if (result
== ProcessQueryResult::Drop
) {
1547 if (result
== ProcessQueryResult::SendAnswer
) {
1548 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1549 if (dq
.delayMsec
== 0 && responsesVect
!= nullptr) {
1550 queueResponse(cs
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, *dq
.local
, *dq
.remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1551 (*queuedResponses
)++;
1554 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1555 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1556 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, dq
.delayMsec
, dest
, *dq
.remote
);
1560 if (result
!= ProcessQueryResult::PassToBackend
|| ss
== nullptr) {
1564 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1565 IDState
* ids
= &ss
->idStates
[idOffset
];
1569 int oldFD
= ids
->origFD
.exchange(cs
.udpFD
);
1571 // if we are reusing, no change in outstanding
1576 ++g_stats
.downstreamTimeouts
;
1580 ids
->origID
= dh
->id
;
1581 setIDStateFromDNSQuestion(*ids
, dq
, std::move(qname
));
1583 /* If we couldn't harvest the real dest addr, still
1584 write down the listening addr since it will be useful
1585 (especially if it's not an 'any' one).
1586 We need to keep track of which one it is since we may
1587 want to use the real but not the listening addr to reply.
1589 if (dest
.sin4
.sin_family
!= 0) {
1590 ids
->origDest
= dest
;
1591 ids
->destHarvested
= true;
1594 ids
->origDest
= cs
.local
;
1595 ids
->destHarvested
= false;
1600 int fd
= pickBackendSocketForSending(ss
);
1601 ssize_t ret
= udpClientSendRequestToBackend(ss
, fd
, query
, dq
.len
);
1605 ++g_stats
.downstreamSendErrors
;
1608 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
1610 catch(const std::exception
& e
){
1611 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
1615 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1616 static void MultipleMessagesUDPClientThread(ClientState
* cs
, LocalHolders
& holders
)
1621 /* used by HarvestDestinationAddress */
1623 ComboAddress remote
;
1627 const size_t vectSize
= g_udpVectorSize
;
1628 /* the actual buffer is larger because:
1629 - we may have to add EDNS and/or ECS
1630 - we use it for self-generated responses (from rule or cache)
1631 but we only accept incoming payloads up to that size
1633 static_assert(s_udpIncomingBufferSize
<= sizeof(MMReceiver::packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1635 auto recvData
= std::unique_ptr
<MMReceiver
[]>(new MMReceiver
[vectSize
]);
1636 auto msgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1637 auto outMsgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1639 /* initialize the structures needed to receive our messages */
1640 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1641 recvData
[idx
].remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1642 fillMSGHdr(&msgVec
[idx
].msg_hdr
, &recvData
[idx
].iov
, recvData
[idx
].cbuf
, sizeof(recvData
[idx
].cbuf
), recvData
[idx
].packet
, s_udpIncomingBufferSize
, &recvData
[idx
].remote
);
1648 /* reset the IO vector, since it's also used to send the vector of responses
1649 to avoid having to copy the data around */
1650 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1651 recvData
[idx
].iov
.iov_base
= recvData
[idx
].packet
;
1652 recvData
[idx
].iov
.iov_len
= sizeof(recvData
[idx
].packet
);
1655 /* block until we have at least one message ready, but return
1656 as many as possible to save the syscall costs */
1657 int msgsGot
= recvmmsg(cs
->udpFD
, msgVec
.get(), vectSize
, MSG_WAITFORONE
| MSG_TRUNC
, nullptr);
1660 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno
));
1664 unsigned int msgsToSend
= 0;
1666 /* process the received messages */
1667 for (int msgIdx
= 0; msgIdx
< msgsGot
; msgIdx
++) {
1668 const struct msghdr
* msgh
= &msgVec
[msgIdx
].msg_hdr
;
1669 unsigned int got
= msgVec
[msgIdx
].msg_len
;
1670 const ComboAddress
& remote
= recvData
[msgIdx
].remote
;
1672 if (static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1673 ++g_stats
.nonCompliantQueries
;
1677 processUDPQuery(*cs
, holders
, msgh
, remote
, recvData
[msgIdx
].dest
, recvData
[msgIdx
].packet
, static_cast<uint16_t>(got
), sizeof(recvData
[msgIdx
].packet
), outMsgVec
.get(), &msgsToSend
, &recvData
[msgIdx
].iov
, recvData
[msgIdx
].cbuf
);
1681 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1682 or the cache) can be sent in batch too */
1684 if (msgsToSend
> 0 && msgsToSend
<= static_cast<unsigned int>(msgsGot
)) {
1685 int sent
= sendmmsg(cs
->udpFD
, outMsgVec
.get(), msgsToSend
, 0);
1687 if (sent
< 0 || static_cast<unsigned int>(sent
) != msgsToSend
) {
1688 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent
, msgsToSend
, strerror(errno
));
1694 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1696 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1697 static void udpClientThread(ClientState
* cs
)
1700 setThreadName("dnsdist/udpClie");
1701 LocalHolders holders
;
1703 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1704 if (g_udpVectorSize
> 1) {
1705 MultipleMessagesUDPClientThread(cs
, holders
);
1709 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1712 /* the actual buffer is larger because:
1713 - we may have to add EDNS and/or ECS
1714 - we use it for self-generated responses (from rule or cache)
1715 but we only accept incoming payloads up to that size
1717 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1720 /* used by HarvestDestinationAddress */
1723 ComboAddress remote
;
1725 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1726 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), packet
, sizeof(packet
), &remote
);
1729 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
1731 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1732 ++g_stats
.nonCompliantQueries
;
1736 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), s_udpIncomingBufferSize
, nullptr, nullptr, nullptr, nullptr);
1740 catch(const std::exception
&e
)
1742 errlog("UDP client thread died because of exception: %s", e
.what());
1744 catch(const PDNSException
&e
)
1746 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1750 errlog("UDP client thread died because of an exception: %s", "unknown");
1753 uint16_t getRandomDNSID()
1755 #ifdef HAVE_LIBSODIUM
1756 return (randombytes_random() % 65536);
1758 return (random() % 65536);
1762 static bool upCheck(const shared_ptr
<DownstreamState
>& ds
)
1765 DNSName checkName
= ds
->checkName
;
1766 uint16_t checkType
= ds
->checkType
.getCode();
1767 uint16_t checkClass
= ds
->checkClass
;
1768 dnsheader checkHeader
;
1769 memset(&checkHeader
, 0, sizeof(checkHeader
));
1771 checkHeader
.qdcount
= htons(1);
1772 checkHeader
.id
= getRandomDNSID();
1774 checkHeader
.rd
= true;
1776 checkHeader
.cd
= true;
1779 if (ds
->checkFunction
) {
1780 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1781 auto ret
= ds
->checkFunction(checkName
, checkType
, checkClass
, &checkHeader
);
1782 checkName
= std::get
<0>(ret
);
1783 checkType
= std::get
<1>(ret
);
1784 checkClass
= std::get
<2>(ret
);
1787 vector
<uint8_t> packet
;
1788 DNSPacketWriter
dpw(packet
, checkName
, checkType
, checkClass
);
1789 dnsheader
* requestHeader
= dpw
.getHeader();
1790 *requestHeader
= checkHeader
;
1792 Socket
sock(ds
->remote
.sin4
.sin_family
, SOCK_DGRAM
);
1793 sock
.setNonBlocking();
1794 if (!IsAnyAddress(ds
->sourceAddr
)) {
1795 sock
.setReuseAddr();
1796 sock
.bind(ds
->sourceAddr
);
1798 sock
.connect(ds
->remote
);
1799 ssize_t sent
= udpClientSendRequestToBackend(ds
, sock
.getHandle(), reinterpret_cast<char*>(&packet
[0]), packet
.size(), true);
1802 if (g_verboseHealthChecks
)
1803 infolog("Error while sending a health check query to backend %s: %d", ds
->getNameWithAddr(), ret
);
1807 int ret
= waitForRWData(sock
.getHandle(), true, /* ms to seconds */ ds
->checkTimeout
/ 1000, /* remaining ms to us */ (ds
->checkTimeout
% 1000) * 1000);
1808 if(ret
< 0 || !ret
) { // error, timeout, both are down!
1811 if (g_verboseHealthChecks
)
1812 infolog("Error while waiting for the health check response from backend %s: %d", ds
->getNameWithAddr(), ret
);
1815 if (g_verboseHealthChecks
)
1816 infolog("Timeout while waiting for the health check response from backend %s", ds
->getNameWithAddr());
1823 sock
.recvFrom(reply
, from
);
1825 /* we are using a connected socket but hey.. */
1826 if (from
!= ds
->remote
) {
1827 if (g_verboseHealthChecks
)
1828 infolog("Invalid health check response received from %s, expecting one from %s", from
.toStringWithPort(), ds
->remote
.toStringWithPort());
1832 const dnsheader
* responseHeader
= reinterpret_cast<const dnsheader
*>(reply
.c_str());
1834 if (reply
.size() < sizeof(*responseHeader
)) {
1835 if (g_verboseHealthChecks
)
1836 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply
.size(), ds
->getNameWithAddr(), sizeof(*responseHeader
));
1840 if (responseHeader
->id
!= requestHeader
->id
) {
1841 if (g_verboseHealthChecks
)
1842 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader
->id
, ds
->getNameWithAddr(), requestHeader
->id
);
1846 if (!responseHeader
->qr
) {
1847 if (g_verboseHealthChecks
)
1848 infolog("Invalid health check response from backend %s, expecting QR to be set", ds
->getNameWithAddr());
1852 if (responseHeader
->rcode
== RCode::ServFail
) {
1853 if (g_verboseHealthChecks
)
1854 infolog("Backend %s responded to health check with ServFail", ds
->getNameWithAddr());
1858 if (ds
->mustResolve
&& (responseHeader
->rcode
== RCode::NXDomain
|| responseHeader
->rcode
== RCode::Refused
)) {
1859 if (g_verboseHealthChecks
)
1860 infolog("Backend %s responded to health check with %s while mustResolve is set", ds
->getNameWithAddr(), responseHeader
->rcode
== RCode::NXDomain
? "NXDomain" : "Refused");
1864 uint16_t receivedType
;
1865 uint16_t receivedClass
;
1866 DNSName
receivedName(reply
.c_str(), reply
.size(), sizeof(dnsheader
), false, &receivedType
, &receivedClass
);
1868 if (receivedName
!= checkName
|| receivedType
!= checkType
|| receivedClass
!= checkClass
) {
1869 if (g_verboseHealthChecks
)
1870 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds
->getNameWithAddr(), receivedName
.toLogString(), checkName
.toLogString(), QType(receivedType
).getName(), QType(checkType
).getName(), receivedClass
, checkClass
);
1876 catch(const std::exception
& e
)
1878 if (g_verboseHealthChecks
)
1879 infolog("Error checking the health of backend %s: %s", ds
->getNameWithAddr(), e
.what());
1884 if (g_verboseHealthChecks
)
1885 infolog("Unknown exception while checking the health of backend %s", ds
->getNameWithAddr());
1889 uint64_t g_maxTCPClientThreads
{10};
1890 std::atomic
<uint16_t> g_cacheCleaningDelay
{60};
1891 std::atomic
<uint16_t> g_cacheCleaningPercentage
{100};
1895 setThreadName("dnsdist/main");
1898 int32_t secondsToWaitLog
= 0;
1904 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1905 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1909 secondsToWaitLog
= 0;
1911 catch(std::exception
&e
) {
1912 if (secondsToWaitLog
<= 0) {
1913 infolog("Error during execution of maintenance function: %s", e
.what());
1914 secondsToWaitLog
= 61;
1916 secondsToWaitLog
-= interval
;
1922 if (counter
>= g_cacheCleaningDelay
) {
1923 /* keep track, for each cache, of whether we should keep
1925 std::map
<std::shared_ptr
<DNSDistPacketCache
>, bool> caches
;
1927 /* gather all caches actually used by at least one pool, and see
1928 if something prevents us from cleaning the expired entries */
1929 auto localPools
= g_pools
.getLocal();
1930 for (const auto& entry
: *localPools
) {
1931 auto& pool
= entry
.second
;
1933 auto packetCache
= pool
->packetCache
;
1938 auto pair
= caches
.insert({packetCache
, false});
1939 auto& iter
= pair
.first
;
1940 /* if we need to keep stale data for this cache (ie, not clear
1941 expired entries when at least one pool using this cache
1942 has all its backends down) */
1943 if (packetCache
->keepStaleData() && iter
->second
== false) {
1944 /* so far all pools had at least one backend up */
1945 if (pool
->countServers(true) == 0) {
1946 iter
->second
= true;
1951 for (auto pair
: caches
) {
1952 /* shall we keep expired entries ? */
1953 if (pair
.second
== true) {
1956 auto& packetCache
= pair
.first
;
1957 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1958 packetCache
->purgeExpired(upTo
);
1963 // ponder pruning g_dynblocks of expired entries here
1967 static void secPollThread()
1969 setThreadName("dnsdist/secpoll");
1973 doSecPoll(g_secPollSuffix
);
1977 sleep(g_secPollInterval
);
1981 static void healthChecksThread()
1983 setThreadName("dnsdist/healthC");
1990 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads())
1991 g_tcpclientthreads
->addTCPClientThread();
1993 auto states
= g_dstates
.getLocal(); // this points to the actual shared_ptrs!
1994 for(auto& dss
: *states
) {
1995 if(++dss
->lastCheck
< dss
->checkInterval
)
1998 if(dss
->availability
==DownstreamState::Availability::Auto
) {
1999 bool newState
=upCheck(dss
);
2001 /* check succeeded */
2002 dss
->currentCheckFailures
= 0;
2004 if (!dss
->upStatus
) {
2005 /* we were marked as down */
2006 dss
->consecutiveSuccessfulChecks
++;
2007 if (dss
->consecutiveSuccessfulChecks
< dss
->minRiseSuccesses
) {
2008 /* if we need more than one successful check to rise
2009 and we didn't reach the threshold yet,
2017 dss
->consecutiveSuccessfulChecks
= 0;
2019 if (dss
->upStatus
) {
2020 /* we are currently up */
2021 dss
->currentCheckFailures
++;
2022 if (dss
->currentCheckFailures
< dss
->maxCheckFailures
) {
2023 /* we need more than one failure to be marked as down,
2024 and we did not reach the threshold yet, let's stay down */
2030 if(newState
!= dss
->upStatus
) {
2031 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
2033 if (newState
&& !dss
->connected
) {
2034 newState
= dss
->reconnect();
2036 if (dss
->connected
&& !dss
->threadStarted
.test_and_set()) {
2037 dss
->tid
= thread(responderThread
, dss
);
2041 dss
->upStatus
= newState
;
2042 dss
->currentCheckFailures
= 0;
2043 dss
->consecutiveSuccessfulChecks
= 0;
2044 if (g_snmpAgent
&& g_snmpTrapsEnabled
) {
2045 g_snmpAgent
->sendBackendStatusChangeTrap(dss
);
2050 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
2051 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
2052 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
2053 dss
->prev
.queries
.store(dss
->queries
.load());
2054 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
2056 for(IDState
& ids
: dss
->idStates
) { // timeouts
2057 int origFD
= ids
.origFD
;
2058 if(origFD
>=0 && ids
.age
++ > g_udpTimeout
) {
2059 /* We set origFD to -1 as soon as possible
2060 to limit the risk of racing with the
2062 The UDP client thread only checks origFD to
2063 know whether outstanding has to be incremented,
2064 so the sooner the better any way since we _will_
2067 if (ids
.origFD
.exchange(-1) != origFD
) {
2068 /* this state has been altered in the meantime,
2069 don't go anywhere near it */
2076 ++g_stats
.downstreamTimeouts
; // this is an 'actively' discovered timeout
2077 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2078 dss
->remote
.toStringWithPort(), dss
->name
,
2079 ids
.qname
.toString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
2084 struct dnsheader fake
;
2085 memset(&fake
, 0, sizeof(fake
));
2086 fake
.id
= ids
.origID
;
2088 g_rings
.insertResponse(ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
);
2095 static void bindAny(int af
, int sock
)
2097 __attribute__((unused
)) int one
= 1;
2100 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
2101 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno
));
2106 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
2107 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno
));
2111 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
2112 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno
));
2115 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
2116 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno
));
2120 static void dropGroupPrivs(gid_t gid
)
2123 if (setgid(gid
) == 0) {
2124 if (setgroups(0, NULL
) < 0) {
2125 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno
));
2129 warnlog("Warning: Unable to set group ID to %d: %s", gid
, strerror(errno
));
2134 static void dropUserPrivs(uid_t uid
)
2137 if(setuid(uid
) < 0) {
2138 warnlog("Warning: Unable to set user ID to %d: %s", uid
, strerror(errno
));
2143 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
2145 /* stdin, stdout, stderr */
2146 size_t requiredFDsCount
= 3;
2147 auto backends
= g_dstates
.getLocal();
2148 /* UDP sockets to backends */
2149 size_t backendUDPSocketsCount
= 0;
2150 for (const auto& backend
: *backends
) {
2151 backendUDPSocketsCount
+= backend
->sockets
.size();
2153 requiredFDsCount
+= backendUDPSocketsCount
;
2154 /* TCP sockets to backends */
2155 requiredFDsCount
+= (backends
->size() * g_maxTCPClientThreads
);
2156 /* listening sockets */
2157 requiredFDsCount
+= udpBindsCount
;
2158 requiredFDsCount
+= tcpBindsCount
;
2159 /* max TCP connections currently served */
2160 requiredFDsCount
+= g_maxTCPClientThreads
;
2161 /* max pipes for communicating between TCP acceptors and client threads */
2162 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
2163 /* max TCP queued connections */
2164 requiredFDsCount
+= g_maxTCPQueuedConnections
;
2165 /* DelayPipe pipe */
2166 requiredFDsCount
+= 2;
2169 /* webserver main socket */
2171 /* console main socket */
2178 getrlimit(RLIMIT_NOFILE
, &rl
);
2179 if (rl
.rlim_cur
<= requiredFDsCount
) {
2180 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
2182 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2184 warnlog("You can increase this value by using ulimit.");
2189 static void setUpLocalBind(std::unique_ptr
<ClientState
>& cs
)
2191 /* skip some warnings if there is an identical UDP context */
2192 bool warn
= cs
->tcp
== false || cs
->tlsFrontend
!= nullptr || cs
->dohFrontend
!= nullptr;
2193 int& fd
= cs
->tcp
== false ? cs
->udpFD
: cs
->tcpFD
;
2196 fd
= SSocket(cs
->local
.sin4
.sin_family
, cs
->tcp
== false ? SOCK_DGRAM
: SOCK_STREAM
, 0);
2199 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2200 #ifdef TCP_DEFER_ACCEPT
2201 SSetsockopt(fd
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2203 if (cs
->fastOpenQueueSize
> 0) {
2205 SSetsockopt(fd
, IPPROTO_TCP
, TCP_FASTOPEN
, cs
->fastOpenQueueSize
);
2208 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2214 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2215 SSetsockopt(fd
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2218 bindAny(cs
->local
.sin4
.sin_family
, fd
);
2220 if(!cs
->tcp
&& IsAnyAddress(cs
->local
)) {
2222 setsockopt(fd
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2223 #ifdef IPV6_RECVPKTINFO
2224 setsockopt(fd
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2228 if (cs
->reuseport
) {
2230 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2233 /* no need to warn again if configured but support is not available, we already did for UDP */
2234 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2240 if (cs
->local
.isIPv4()) {
2242 setSocketIgnorePMTU(cs
->udpFD
);
2244 catch(const std::exception
& e
) {
2245 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs
->local
.toStringWithPort(), e
.what());
2250 const std::string
& itf
= cs
->interface
;
2252 #ifdef SO_BINDTODEVICE
2253 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2255 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), strerror(errno
));
2259 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2265 if (g_defaultBPFFilter
) {
2266 cs
->attachFilter(g_defaultBPFFilter
);
2267 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs
->tcp
? "UDP" : "TCP"), cs
->local
.toStringWithPort());
2269 #endif /* HAVE_EBPF */
2271 if (cs
->tlsFrontend
!= nullptr) {
2272 if (!cs
->tlsFrontend
->setupTLS()) {
2273 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2274 _exit(EXIT_FAILURE
);
2278 if (cs
->dohFrontend
!= nullptr) {
2279 cs
->dohFrontend
->setup();
2282 SBind(fd
, cs
->local
);
2285 SListen(cs
->tcpFD
, SOMAXCONN
);
2286 if (cs
->tlsFrontend
!= nullptr) {
2287 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2289 else if (cs
->dohFrontend
!= nullptr) {
2290 warnlog("Listening on %s for DoH", cs
->local
.toStringWithPort());
2292 else if (cs
->dnscryptCtx
!= nullptr) {
2293 warnlog("Listening on %s for DNSCrypt", cs
->local
.toStringWithPort());
2296 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2305 vector
<string
> locals
;
2306 vector
<string
> remotes
;
2307 bool checkConfig
{false};
2308 bool beClient
{false};
2309 bool beSupervised
{false};
2316 std::atomic
<bool> g_configurationDone
{false};
2321 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2322 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2323 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
2325 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2326 cout
<<"-C,--config file Load configuration from 'file'\n";
2327 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2328 cout
<<" controlSocket from your configuration file, but also\n";
2329 cout
<<" accepts an IP:PORT argument\n";
2330 #ifdef HAVE_LIBSODIUM
2331 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2332 cout
<<" is similar to setting setKey in the configuration file.\n";
2333 cout
<<" NOTE: this will leak this key in your shell's history\n";
2334 cout
<<" and in the systems running process list.\n";
2336 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2337 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2338 cout
<<" Any errors are printed as well.\n";
2339 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2340 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2341 cout
<<"-h,--help Display this helpful message\n";
2342 cout
<<"-l,--local address Listen on this local address\n";
2343 cout
<<"--supervised Don't open a console, I'm supervised\n";
2344 cout
<<" (use with e.g. systemd and daemontools)\n";
2345 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2346 cout
<<" (use with e.g. systemd)\n";
2347 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2348 cout
<<"-v,--verbose Enable verbose mode\n";
2349 cout
<<"-V,--version Show dnsdist version information and exit\n";
2352 int main(int argc
, char** argv
)
2355 size_t udpBindsCount
= 0;
2356 size_t tcpBindsCount
= 0;
2357 rl_attempted_completion_function
= my_completion
;
2358 rl_completion_append_character
= 0;
2360 signal(SIGPIPE
, SIG_IGN
);
2361 signal(SIGCHLD
, SIG_IGN
);
2362 openlog("dnsdist", LOG_PID
|LOG_NDELAY
, LOG_DAEMON
);
2364 #ifdef HAVE_LIBSODIUM
2365 if (sodium_init() == -1) {
2366 cerr
<<"Unable to initialize crypto library"<<endl
;
2369 g_hashperturb
=randombytes_uniform(0xffffffff);
2370 srandom(randombytes_uniform(0xffffffff));
2374 gettimeofday(&tv
, 0);
2375 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
2376 g_hashperturb
=random();
2380 ComboAddress clientAddress
= ComboAddress();
2381 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
2382 struct option longopts
[]={
2383 {"acl", required_argument
, 0, 'a'},
2384 {"check-config", no_argument
, 0, 1},
2385 {"client", no_argument
, 0, 'c'},
2386 {"config", required_argument
, 0, 'C'},
2387 {"disable-syslog", no_argument
, 0, 2},
2388 {"execute", required_argument
, 0, 'e'},
2389 {"gid", required_argument
, 0, 'g'},
2390 {"help", no_argument
, 0, 'h'},
2391 {"local", required_argument
, 0, 'l'},
2392 {"setkey", required_argument
, 0, 'k'},
2393 {"supervised", no_argument
, 0, 3},
2394 {"uid", required_argument
, 0, 'u'},
2395 {"verbose", no_argument
, 0, 'v'},
2396 {"version", no_argument
, 0, 'V'},
2402 int c
=getopt_long(argc
, argv
, "a:cC:e:g:hk:l:u:vV", longopts
, &longindex
);
2407 g_cmdLine
.checkConfig
=true;
2413 g_cmdLine
.beSupervised
=true;
2416 g_cmdLine
.config
=optarg
;
2419 g_cmdLine
.beClient
=true;
2422 g_cmdLine
.command
=optarg
;
2425 g_cmdLine
.gid
=optarg
;
2428 cout
<<"dnsdist "<<VERSION
<<endl
;
2435 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
2438 #ifdef HAVE_LIBSODIUM
2439 if (B64Decode(string(optarg
), g_consoleKey
) < 0) {
2440 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2444 cerr
<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl
;
2449 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
2452 g_cmdLine
.uid
=optarg
;
2458 #ifdef LUAJIT_VERSION
2459 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2461 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2463 cout
<<"Enabled features: ";
2464 #ifdef HAVE_DNS_OVER_TLS
2465 cout
<<"dns-over-tls(";
2477 #ifdef HAVE_DNS_OVER_HTTPS
2478 cout
<<"dns-over-https(DOH) ";
2480 #ifdef HAVE_DNSCRYPT
2489 #ifdef HAVE_LIBCRYPTO
2492 #ifdef HAVE_LIBSODIUM
2495 #ifdef HAVE_PROTOBUF
2501 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2502 cout
<<"recvmmsg/sendmmsg ";
2504 #ifdef HAVE_NET_SNMP
2514 //getopt_long printed an error message.
2523 for(auto p
= argv
; *p
; ++p
) {
2524 if(g_cmdLine
.beClient
) {
2525 clientAddress
= ComboAddress(*p
, 5199);
2527 g_cmdLine
.remotes
.push_back(*p
);
2531 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
, false};
2533 g_policy
.setState(leastOutstandingPol
);
2534 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2535 setupLua(true, g_cmdLine
.config
);
2536 if (clientAddress
!= ComboAddress())
2537 g_serverControl
= clientAddress
;
2538 doClient(g_serverControl
, g_cmdLine
.command
);
2539 _exit(EXIT_SUCCESS
);
2542 auto acl
= g_ACL
.getCopy();
2544 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2546 g_ACL
.setState(acl
);
2549 auto consoleACL
= g_consoleACL
.getCopy();
2550 for (const auto& mask
: { "127.0.0.1/8", "::1/128" }) {
2551 consoleACL
.addMask(mask
);
2553 g_consoleACL
.setState(consoleACL
);
2555 if (g_cmdLine
.checkConfig
) {
2556 setupLua(true, g_cmdLine
.config
);
2557 // No exception was thrown
2558 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2559 _exit(EXIT_SUCCESS
);
2562 auto todo
=setupLua(false, g_cmdLine
.config
);
2564 auto localPools
= g_pools
.getCopy();
2566 bool precompute
= false;
2567 if (g_policy
.getLocal()->name
== "chashed") {
2570 for (const auto& entry
: localPools
) {
2571 if (entry
.second
->policy
!= nullptr && entry
.second
->policy
->name
== "chashed") {
2578 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2579 // pre compute hashes
2580 auto backends
= g_dstates
.getLocal();
2581 for (auto& backend
: *backends
) {
2587 if (!g_cmdLine
.locals
.empty()) {
2588 for (auto it
= g_frontends
.begin(); it
!= g_frontends
.end(); ) {
2589 /* DoH, DoT and DNSCrypt frontends are separate */
2590 if ((*it
)->dohFrontend
== nullptr && (*it
)->tlsFrontend
== nullptr && (*it
)->dnscryptCtx
== nullptr) {
2591 it
= g_frontends
.erase(it
);
2598 for(const auto& loc
: g_cmdLine
.locals
) {
2600 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), false, false, 0, "", {})));
2602 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), true, false, 0, "", {})));
2606 if (g_frontends
.empty()) {
2608 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2610 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2613 g_configurationDone
= true;
2615 for(auto& frontend
: g_frontends
) {
2616 setUpLocalBind(frontend
);
2618 if (frontend
->tcp
== false) {
2626 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
2630 g_ACL
.getLocal()->toStringVector(&vec
);
2631 for(const auto& s
: vec
) {
2636 infolog("ACL allowing queries from: %s", acls
.c_str());
2639 g_consoleACL
.getLocal()->toStringVector(&vec
);
2640 for (const auto& entry
: vec
) {
2641 if (!acls
.empty()) {
2646 infolog("Console ACL allowing connections from: %s", acls
.c_str());
2648 #ifdef HAVE_LIBSODIUM
2649 if (g_consoleEnabled
&& g_consoleKey
.empty()) {
2650 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2657 if(!g_cmdLine
.gid
.empty())
2658 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2660 if(!g_cmdLine
.uid
.empty())
2661 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2663 dropGroupPrivs(newgid
);
2664 dropUserPrivs(newuid
);
2666 /* we might still have capabilities remaining,
2667 for example if we have been started as root
2668 without --uid or --gid (please don't do that)
2669 or as an unprivileged user with ambient
2670 capabilities like CAP_NET_BIND_SERVICE.
2674 catch(const std::exception
& e
) {
2675 warnlog("%s", e
.what());
2678 /* this need to be done _after_ dropping privileges */
2679 g_delay
= new DelayPipe
<DelayedPacket
>();
2685 g_tcpclientthreads
= std::unique_ptr
<TCPClientCollection
>(new TCPClientCollection(g_maxTCPClientThreads
, g_useTCPSinglePipe
));
2690 localPools
= g_pools
.getCopy();
2691 /* create the default pool no matter what */
2692 createPoolIfNotExists(localPools
, "");
2693 if(g_cmdLine
.remotes
.size()) {
2694 for(const auto& address
: g_cmdLine
.remotes
) {
2695 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2696 addServerToPool(localPools
, "", ret
);
2697 if (ret
->connected
&& !ret
->threadStarted
.test_and_set()) {
2698 ret
->tid
= thread(responderThread
, ret
);
2700 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
2703 g_pools
.setState(localPools
);
2705 if(g_dstates
.getLocal()->empty()) {
2706 errlog("No downstream servers defined: all packets will get dropped");
2707 // you might define them later, but you need to know
2710 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
2712 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2713 if(dss
->availability
==DownstreamState::Availability::Auto
) {
2714 bool newState
=upCheck(dss
);
2715 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
2716 dss
->upStatus
= newState
;
2720 for(auto& cs
: g_frontends
) {
2721 if (cs
->dohFrontend
!= nullptr) {
2722 #ifdef HAVE_DNS_OVER_HTTPS
2723 std::thread
t1(dohThread
, cs
.get());
2724 if (!cs
->cpus
.empty()) {
2725 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2728 #endif /* HAVE_DNS_OVER_HTTPS */
2731 if (cs
->udpFD
>= 0) {
2732 thread
t1(udpClientThread
, cs
.get());
2733 if (!cs
->cpus
.empty()) {
2734 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2738 else if (cs
->tcpFD
>= 0) {
2739 thread
t1(tcpAcceptorThread
, cs
.get());
2740 if (!cs
->cpus
.empty()) {
2741 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2747 thread
carbonthread(carbonDumpThread
);
2748 carbonthread
.detach();
2750 thread
stattid(maintThread
);
2753 thread
healththread(healthChecksThread
);
2755 if (!g_secPollSuffix
.empty()) {
2756 thread
secpollthread(secPollThread
);
2757 secpollthread
.detach();
2760 if(g_cmdLine
.beSupervised
) {
2762 sd_notify(0, "READY=1");
2764 healththread
.join();
2767 healththread
.detach();
2770 _exit(EXIT_SUCCESS
);
2773 catch(const LuaContext::ExecutionErrorException
& e
) {
2775 errlog("Fatal Lua error: %s", e
.what());
2776 std::rethrow_if_nested(e
);
2777 } catch(const std::exception
& ne
) {
2778 errlog("Details: %s", ne
.what());
2780 catch(PDNSException
&ae
)
2782 errlog("Fatal pdns error: %s", ae
.reason
);
2784 _exit(EXIT_FAILURE
);
2786 catch(std::exception
&e
)
2788 errlog("Fatal error: %s", e
.what());
2789 _exit(EXIT_FAILURE
);
2791 catch(PDNSException
&ae
)
2793 errlog("Fatal pdns error: %s", ae
.reason
);
2794 _exit(EXIT_FAILURE
);
2797 uint64_t getLatencyCount(const std::string
&)
2799 return g_stats
.responses
+ g_stats
.selfAnswered
+ g_stats
.cacheHits
;