2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <netinet/tcp.h>
31 #include <sys/resource.h>
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
37 #include <editline/readline.h>
41 #include <systemd/sd-daemon.h>
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
54 #include "delaypipe.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
63 #include "sodcrypto.hh"
65 #include "threadname.hh"
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
83 struct DNSDistStats g_stats
;
84 MetricDefinitionStorage g_metricDefinitions
;
86 uint16_t g_maxOutstanding
{10240};
87 bool g_verboseHealthChecks
{false};
88 uint32_t g_staleCacheEntriesTTL
{0};
90 bool g_allowEmptyResponse
{false};
92 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
93 string g_outputBuffer
;
95 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
96 std::vector
<std::shared_ptr
<DOHFrontend
>> g_dohlocals
;
97 std::vector
<std::shared_ptr
<DNSCryptContext
>> g_dnsCryptLocals
;
99 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
100 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
101 #endif /* HAVE_EBPF */
102 std::vector
<std::unique_ptr
<ClientState
>> g_frontends
;
103 GlobalStateHolder
<pools_t
> g_pools
;
104 size_t g_udpVectorSize
{1};
106 bool g_snmpEnabled
{false};
107 bool g_snmpTrapsEnabled
{false};
108 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
114 For the return path, per downstream server we have a thread that listens to responses.
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
122 IDs are assigned by atomic increments of the socket offset.
125 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
126 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
128 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
133 GlobalStateHolder
<servers_t
> g_dstates
;
134 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
135 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
136 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
137 int g_tcpRecvTimeout
{2};
138 int g_tcpSendTimeout
{2};
141 bool g_servFailOnNoPolicy
{false};
142 bool g_truncateTC
{false};
143 bool g_fixupCase
{false};
144 bool g_preserveTrailingData
{false};
145 bool g_roundrobinFailOnNoServer
{false};
147 static void truncateTC(char* packet
, uint16_t* len
, size_t responseSize
, unsigned int consumed
)
150 bool hadEDNS
= false;
151 uint16_t payloadSize
= 0;
154 if (g_addEDNSToSelfGeneratedResponses
) {
155 hadEDNS
= getEDNSUDPPayloadSizeAndZ(packet
, *len
, &payloadSize
, &z
);
158 *len
=static_cast<uint16_t>(sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
159 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
160 dh
->ancount
= dh
->arcount
= dh
->nscount
= 0;
163 addEDNS(dh
, *len
, responseSize
, z
& EDNS_HEADER_FLAG_DO
, payloadSize
, 0);
175 ComboAddress destination
;
176 ComboAddress origDest
;
180 if(origDest
.sin4
.sin_family
== 0) {
181 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
184 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
188 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
193 DelayPipe
<DelayedPacket
>* g_delay
= nullptr;
195 void doLatencyStats(double udiff
)
197 if(udiff
< 1000) ++g_stats
.latency0_1
;
198 else if(udiff
< 10000) ++g_stats
.latency1_10
;
199 else if(udiff
< 50000) ++g_stats
.latency10_50
;
200 else if(udiff
< 100000) ++g_stats
.latency50_100
;
201 else if(udiff
< 1000000) ++g_stats
.latency100_1000
;
202 else ++g_stats
.latencySlow
;
204 auto doAvg
= [](double& var
, double n
, double weight
) {
205 var
= (weight
-1) * var
/weight
+ n
/weight
;
208 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
209 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
210 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
211 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
214 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
, unsigned int& consumed
)
216 if (responseLen
< sizeof(dnsheader
)) {
220 const struct dnsheader
* dh
= reinterpret_cast<const struct dnsheader
*>(response
);
221 if (dh
->qdcount
== 0) {
222 if ((dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) || g_allowEmptyResponse
) {
226 ++g_stats
.nonCompliantResponses
;
231 uint16_t rqtype
, rqclass
;
234 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
236 catch(const std::exception
& e
) {
237 if(responseLen
> 0 && static_cast<size_t>(responseLen
) > sizeof(dnsheader
)) {
238 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
240 ++g_stats
.nonCompliantResponses
;
244 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
251 static void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
253 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
254 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
255 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
256 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
257 /* clear the flags we are about to restore */
258 *flags
&= restoreFlagsMask
;
259 /* only keep the flags we want to restore */
260 origFlags
&= ~restoreFlagsMask
;
261 /* set the saved flags as they were */
265 static bool fixUpQueryTurnedResponse(DNSQuestion
& dq
, const uint16_t origFlags
)
267 restoreFlags(dq
.dh
, origFlags
);
269 return addEDNSToQueryTurnedResponse(dq
);
272 static bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
, bool* zeroScope
)
274 if (*responseLen
< sizeof(dnsheader
)) {
278 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(*response
);
279 restoreFlags(dh
, origFlags
);
281 if (*responseLen
== sizeof(dnsheader
)) {
286 string realname
= qname
.toDNSString();
287 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
288 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
292 if (ednsAdded
|| ecsAdded
) {
297 const std::string
responseStr(*response
, *responseLen
);
298 int res
= locateEDNSOptRR(responseStr
, &optStart
, &optLen
, &last
);
301 if (zeroScope
) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
302 size_t optContentStart
= 0;
303 uint16_t optContentLen
= 0;
304 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
305 if (isEDNSOptionInOpt(responseStr
, optStart
, optLen
, EDNSOptionCode::ECS
, &optContentStart
, &optContentLen
) && optContentLen
>= 4) {
306 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
308 *zeroScope
= responseStr
.at(optContentStart
+ 3) == 0;
313 /* we added the entire OPT RR,
314 therefore we need to remove it entirely */
316 /* simply remove the last AR */
317 *responseLen
-= optLen
;
318 uint16_t arcount
= ntohs(dh
->arcount
);
320 dh
->arcount
= htons(arcount
);
323 /* Removing an intermediary RR could lead to compression error */
324 if (rewriteResponseWithoutEDNS(responseStr
, rewrittenResponse
) == 0) {
325 *responseLen
= rewrittenResponse
.size();
326 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
327 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
329 *responseSize
= rewrittenResponse
.capacity();
330 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
333 warnlog("Error rewriting content");
338 /* the OPT RR was already present, but without ECS,
339 we need to remove the ECS option if any */
341 /* nothing after the OPT RR, we can simply remove the
343 size_t existingOptLen
= optLen
;
344 removeEDNSOptionFromOPT(*response
+ optStart
, &optLen
, EDNSOptionCode::ECS
);
345 *responseLen
-= (existingOptLen
- optLen
);
348 /* Removing an intermediary RR could lead to compression error */
349 if (rewriteResponseWithoutEDNSOption(responseStr
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
350 *responseLen
= rewrittenResponse
.size();
351 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
352 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
354 *responseSize
= rewrittenResponse
.capacity();
355 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
358 warnlog("Error rewriting content");
369 static bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
372 uint16_t encryptedResponseLen
= 0;
374 /* save the original header before encrypting it in place */
375 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
376 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
380 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
382 *responseLen
= encryptedResponseLen
;
384 /* dropping response */
385 vinfolog("Error encrypting the response, dropping.");
391 #endif /* HAVE_DNSCRYPT */
393 static bool applyRulesToResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
)
395 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
396 std::string ruleresult
;
397 for(const auto& lr
: *localRespRulactions
) {
398 if(lr
.d_rule
->matches(&dr
)) {
399 lr
.d_rule
->d_matches
++;
400 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
402 case DNSResponseAction::Action::Allow
:
405 case DNSResponseAction::Action::Drop
:
408 case DNSResponseAction::Action::HeaderModify
:
411 case DNSResponseAction::Action::ServFail
:
412 dr
.dh
->rcode
= RCode::ServFail
;
415 /* non-terminal actions follow */
416 case DNSResponseAction::Action::Delay
:
417 dr
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
419 case DNSResponseAction::Action::None
:
428 bool processResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, size_t addRoom
, std::vector
<uint8_t>& rewrittenResponse
, bool muted
)
430 if (!applyRulesToResponse(localRespRulactions
, dr
)) {
434 bool zeroScope
= false;
435 if (!fixUpResponse(response
, responseLen
, responseSize
, *dr
.qname
, dr
.origFlags
, dr
.ednsAdded
, dr
.ecsAdded
, rewrittenResponse
, addRoom
, dr
.useZeroScope
? &zeroScope
: nullptr)) {
439 if (dr
.packetCache
&& !dr
.skipCache
) {
440 if (!dr
.useZeroScope
) {
441 /* if the query was not suitable for zero-scope, for
442 example because it had an existing ECS entry so the hash is
443 not really 'no ECS', so just insert it for the existing subnet
445 - we don't have the correct hash for a non-ECS query
446 - inserting with hash computed before the ECS replacement but with
447 the subnet extracted _after_ the replacement would not work.
451 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
452 dr
.packetCache
->insert(zeroScope
? dr
.cacheKeyNoECS
: dr
.cacheKey
, zeroScope
? boost::none
: dr
.subnet
, dr
.origFlags
, dr
.dnssecOK
, *dr
.qname
, dr
.qtype
, dr
.qclass
, *response
, *responseLen
, dr
.tcp
, dr
.dh
->rcode
, dr
.tempFailureTTL
);
457 if (!encryptResponse(*response
, responseLen
, *responseSize
, dr
.tcp
, dr
.dnsCryptQuery
, nullptr, nullptr)) {
461 #endif /* HAVE_DNSCRYPT */
466 static bool sendUDPResponse(int origFD
, const char* response
, const uint16_t responseLen
, const int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
468 if(delayMsec
&& g_delay
) {
469 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
470 g_delay
->submit(dp
, delayMsec
);
474 if(origDest
.sin4
.sin_family
== 0) {
475 res
= sendto(origFD
, response
, responseLen
, 0, reinterpret_cast<const struct sockaddr
*>(&origRemote
), origRemote
.getSocklen());
478 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
482 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), strerror(err
));
490 int pickBackendSocketForSending(std::shared_ptr
<DownstreamState
>& state
)
492 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
495 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
499 if (state
->sockets
.size() == 1) {
500 ready
.push_back(state
->sockets
[0]);
505 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
506 state
->mplexer
->getAvailableFDs(ready
, -1);
510 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
511 void responderThread(std::shared_ptr
<DownstreamState
> dss
)
513 setThreadName("dnsdist/respond");
514 auto localRespRulactions
= g_resprulactions
.getLocal();
515 char packet
[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
516 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
517 /* when the answer is encrypted in place, we need to get a copy
518 of the original header before encryption to fill the ring buffer */
519 dnsheader cleartextDH
;
520 vector
<uint8_t> rewrittenResponse
;
522 uint16_t queryId
= 0;
523 std::vector
<int> sockets
;
524 sockets
.reserve(dss
->sockets
.size());
527 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
529 pickBackendSocketsReadyForReceiving(dss
, sockets
);
530 for (const auto& fd
: sockets
) {
531 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
532 char * response
= packet
;
533 size_t responseSize
= sizeof(packet
);
535 if (got
< 0 || static_cast<size_t>(got
) < sizeof(dnsheader
))
538 uint16_t responseLen
= static_cast<uint16_t>(got
);
541 if(queryId
>= dss
->idStates
.size()) {
545 IDState
* ids
= &dss
->idStates
[queryId
];
546 int origFD
= ids
->origFD
;
548 if(origFD
< 0 && ids
->du
== nullptr) // duplicate
551 /* setting age to 0 to prevent the maintainer thread from
552 cleaning this IDS while we process the response.
553 We have already a copy of the origFD, so it would
554 mostly mess up the outstanding counter.
558 unsigned int consumed
= 0;
559 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
, consumed
)) {
563 int oldFD
= ids
->origFD
.exchange(-1);
564 if (oldFD
== origFD
) {
565 /* we only decrement the outstanding counter if the value was not
566 altered in the meantime, which would mean that the state has been actively reused
567 and the other thread has not incremented the outstanding counter, so we don't
568 want it to be decremented twice. */
569 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
572 if(dh
->tc
&& g_truncateTC
) {
573 truncateTC(response
, &responseLen
, responseSize
, consumed
);
576 dh
->id
= ids
->origID
;
578 uint16_t addRoom
= 0;
579 DNSResponse dr
= makeDNSResponseFromIDState(*ids
, dh
, sizeof(packet
), responseLen
, false);
580 if (dr
.dnsCryptQuery
) {
581 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
584 memcpy(&cleartextDH
, dr
.dh
, sizeof(cleartextDH
));
585 if (!processResponse(&response
, &responseLen
, &responseSize
, localRespRulactions
, dr
, addRoom
, rewrittenResponse
, ids
->cs
&& ids
->cs
->muted
)) {
589 if (ids
->cs
&& !ids
->cs
->muted
) {
591 #ifdef HAVE_DNS_OVER_HTTPS
593 ids
->du
->query
= std::string(response
, responseLen
);
594 if (send(ids
->du
->rsock
, &ids
->du
, sizeof(ids
->du
), 0) != sizeof(ids
->du
)) {
597 #endif /* HAVE_DNS_OVER_HTTPS */
602 empty
.sin4
.sin_family
= 0;
603 /* if ids->destHarvested is false, origDest holds the listening address.
604 We don't want to use that as a source since it could be 0.0.0.0 for example. */
605 sendUDPResponse(origFD
, response
, responseLen
, dr
.delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
611 double udiff
= ids
->sentTime
.udiff();
612 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(),
613 ids
->du
? " (https)": "", udiff
);
617 g_rings
.insertResponse(ts
, *dr
.remote
, *dr
.qname
, dr
.qtype
, static_cast<unsigned int>(udiff
), static_cast<unsigned int>(got
), cleartextDH
, dss
->remote
);
620 case RCode::NXDomain
:
621 ++g_stats
.frontendNXDomain
;
623 case RCode::ServFail
:
624 ++g_stats
.servfailResponses
;
625 ++g_stats
.frontendServFail
;
628 ++g_stats
.frontendNoError
;
631 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
633 doLatencyStats(udiff
);
635 rewrittenResponse
.clear();
638 catch(const std::exception
& e
){
639 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
643 catch(const std::exception
& e
)
645 errlog("UDP responder thread died because of exception: %s", e
.what());
647 catch(const PDNSException
& e
)
649 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
653 errlog("UDP responder thread died because of an exception: %s", "unknown");
656 bool DownstreamState::reconnect()
658 std::unique_lock
<std::mutex
> tl(connectLock
, std::try_to_lock
);
659 if (!tl
.owns_lock()) {
660 /* we are already reconnecting */
665 for (auto& fd
: sockets
) {
667 if (sockets
.size() > 1) {
668 std::lock_guard
<std::mutex
> lock(socketsLock
);
669 mplexer
->removeReadFD(fd
);
671 /* shutdown() is needed to wake up recv() in the responderThread */
672 shutdown(fd
, SHUT_RDWR
);
676 if (!IsAnyAddress(remote
)) {
677 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
678 if (!IsAnyAddress(sourceAddr
)) {
679 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
680 SBind(fd
, sourceAddr
);
683 SConnect(fd
, remote
);
684 if (sockets
.size() > 1) {
685 std::lock_guard
<std::mutex
> lock(socketsLock
);
686 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
690 catch(const std::runtime_error
& error
) {
691 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
698 /* if at least one (re-)connection failed, close all sockets */
700 for (auto& fd
: sockets
) {
702 if (sockets
.size() > 1) {
703 std::lock_guard
<std::mutex
> lock(socketsLock
);
704 mplexer
->removeReadFD(fd
);
706 /* shutdown() is needed to wake up recv() in the responderThread */
707 shutdown(fd
, SHUT_RDWR
);
716 void DownstreamState::hash()
718 vinfolog("Computing hashes for id=%s and weight=%d", id
, weight
);
720 WriteLock
wl(&d_lock
);
723 std::string uuid
= boost::str(boost::format("%s-%d") % id
% w
);
724 unsigned int wshash
= burtleCI((const unsigned char*)uuid
.c_str(), uuid
.size(), g_hashperturb
);
725 hashes
.insert(wshash
);
730 void DownstreamState::setId(const boost::uuids::uuid
& newId
)
733 // compute hashes only if already done
734 if (!hashes
.empty()) {
739 void DownstreamState::setWeight(int newWeight
)
742 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
746 if (!hashes
.empty()) {
751 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, size_t numberOfSockets
): remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
753 pthread_rwlock_init(&d_lock
, nullptr);
755 threadStarted
.clear();
757 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
759 sockets
.resize(numberOfSockets
);
760 for (auto& fd
: sockets
) {
764 if (!IsAnyAddress(remote
)) {
766 idStates
.resize(g_maxOutstanding
);
768 infolog("Added downstream server %s", remote
.toStringWithPort());
773 std::mutex g_luamutex
;
776 GlobalStateHolder
<ServerPolicy
> g_policy
;
778 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
780 for(auto& d
: servers
) {
781 if(d
.second
->isUp() && d
.second
->qps
.check())
784 return leastOutstanding(servers
, dq
);
787 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
788 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
790 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
791 return servers
[0].second
;
794 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
795 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
796 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
797 poss
.reserve(servers
.size());
798 for(auto& d
: servers
) {
799 if(d
.second
->isUp()) {
800 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
804 return shared_ptr
<DownstreamState
>();
805 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
806 return poss
.begin()->second
;
809 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
811 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
813 int max
= std::numeric_limits
<int>::max();
815 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
816 if(d
.second
->isUp()) {
817 // Don't overflow sum when adding high weights
818 if(d
.second
->weight
> max
- sum
) {
821 sum
+= d
.second
->weight
;
824 poss
.push_back({sum
, d
.second
});
828 // Catch poss & sum are empty to avoid SIGFPE
830 return shared_ptr
<DownstreamState
>();
833 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
835 return shared_ptr
<DownstreamState
>();
839 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
841 return valrandom(random(), servers
, dq
);
844 uint32_t g_hashperturb
;
845 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
847 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
850 shared_ptr
<DownstreamState
> chashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
852 unsigned int qhash
= dq
->qname
->hash(g_hashperturb
);
853 unsigned int sel
= std::numeric_limits
<unsigned int>::max();
854 unsigned int min
= std::numeric_limits
<unsigned int>::max();
855 shared_ptr
<DownstreamState
> ret
= nullptr, first
= nullptr;
857 for (const auto& d
: servers
) {
858 if (d
.second
->isUp()) {
859 // make sure hashes have been computed
860 if (d
.second
->hashes
.empty()) {
864 ReadLock
rl(&(d
.second
->d_lock
));
865 const auto& server
= d
.second
;
866 // we want to keep track of the last hash
867 if (min
> *(server
->hashes
.begin())) {
868 min
= *(server
->hashes
.begin());
872 auto hash_it
= server
->hashes
.lower_bound(qhash
);
873 if (hash_it
!= server
->hashes
.end()) {
874 if (*hash_it
< sel
) {
882 if (ret
!= nullptr) {
885 if (first
!= nullptr) {
888 return shared_ptr
<DownstreamState
>();
891 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
893 NumberedServerVector poss
;
895 for(auto& d
: servers
) {
896 if(d
.second
->isUp()) {
901 const auto *res
=&poss
;
902 if(poss
.empty() && !g_roundrobinFailOnNoServer
)
906 return shared_ptr
<DownstreamState
>();
908 static unsigned int counter
;
910 return (*res
)[(counter
++) % res
->size()].second
;
913 ComboAddress g_serverControl
{"127.0.0.1:5199"};
915 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
917 std::shared_ptr
<ServerPool
> pool
;
918 pools_t::iterator it
= pools
.find(poolName
);
919 if (it
!= pools
.end()) {
923 if (!poolName
.empty())
924 vinfolog("Creating pool %s", poolName
);
925 pool
= std::make_shared
<ServerPool
>();
926 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
931 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
933 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
934 if (!poolName
.empty()) {
935 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
937 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
939 pool
->policy
= policy
;
942 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
944 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
945 if (!poolName
.empty()) {
946 vinfolog("Adding server to pool %s", poolName
);
948 vinfolog("Adding server to default pool");
950 pool
->addServer(server
);
953 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
955 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
957 if (!poolName
.empty()) {
958 vinfolog("Removing server from pool %s", poolName
);
961 vinfolog("Removing server from default pool");
964 pool
->removeServer(server
);
967 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
969 pools_t::const_iterator it
= pools
.find(poolName
);
971 if (it
== pools
.end()) {
972 throw std::out_of_range("No pool named " + poolName
);
978 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
980 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
981 return pool
->getServers();
984 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
)
988 std::vector
<std::string
> addrs
;
989 stringtok(addrs
, spoofContent
, " ,");
991 if (addrs
.size() == 1) {
993 ComboAddress
spoofAddr(spoofContent
);
994 SpoofAction
sa({spoofAddr
});
997 catch(const PDNSException
&e
) {
998 SpoofAction
sa(spoofContent
); // CNAME then
1002 std::vector
<ComboAddress
> cas
;
1003 for (const auto& addr
: addrs
) {
1005 cas
.push_back(ComboAddress(addr
));
1010 SpoofAction
sa(cas
);
1015 static bool applyRulesToQuery(LocalHolders
& holders
, DNSQuestion
& dq
, string
& poolname
, const struct timespec
& now
)
1017 g_rings
.insertQuery(now
, *dq
.remote
, *dq
.qname
, dq
.qtype
, dq
.len
, *dq
.dh
);
1019 if(g_qcount
.enabled
) {
1020 string qname
= (*dq
.qname
).toString(".");
1021 bool countQuery
{true};
1022 if(g_qcount
.filter
) {
1023 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1024 std::tie (countQuery
, qname
) = g_qcount
.filter(&dq
);
1028 WriteLock
wl(&g_qcount
.queryLock
);
1029 if(!g_qcount
.records
.count(qname
)) {
1030 g_qcount
.records
[qname
] = 0;
1032 g_qcount
.records
[qname
]++;
1036 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
1037 auto updateBlockStats
= [&got
]() {
1038 ++g_stats
.dynBlocked
;
1039 got
->second
.blocks
++;
1042 if(now
< got
->second
.until
) {
1043 DNSAction::Action action
= got
->second
.action
;
1044 if (action
== DNSAction::Action::None
) {
1045 action
= g_dynBlockAction
;
1048 case DNSAction::Action::NoOp
:
1052 case DNSAction::Action::Nxdomain
:
1053 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort());
1056 dq
.dh
->rcode
= RCode::NXDomain
;
1060 case DNSAction::Action::Refused
:
1061 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
1064 dq
.dh
->rcode
= RCode::Refused
;
1068 case DNSAction::Action::Truncate
:
1071 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
1077 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1080 case DNSAction::Action::NoRecurse
:
1082 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1087 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
1093 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
1094 auto updateBlockStats
= [&got
]() {
1095 ++g_stats
.dynBlocked
;
1099 if(now
< got
->until
) {
1100 DNSAction::Action action
= got
->action
;
1101 if (action
== DNSAction::Action::None
) {
1102 action
= g_dynBlockAction
;
1105 case DNSAction::Action::NoOp
:
1108 case DNSAction::Action::Nxdomain
:
1109 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1112 dq
.dh
->rcode
= RCode::NXDomain
;
1115 case DNSAction::Action::Refused
:
1116 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1119 dq
.dh
->rcode
= RCode::Refused
;
1122 case DNSAction::Action::Truncate
:
1126 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1132 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1135 case DNSAction::Action::NoRecurse
:
1137 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1142 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1148 DNSAction::Action action
=DNSAction::Action::None
;
1150 for(const auto& lr
: *holders
.rulactions
) {
1151 if(lr
.d_rule
->matches(&dq
)) {
1152 lr
.d_rule
->d_matches
++;
1153 action
=(*lr
.d_action
)(&dq
, &ruleresult
);
1156 case DNSAction::Action::Allow
:
1159 case DNSAction::Action::Drop
:
1163 case DNSAction::Action::Nxdomain
:
1164 dq
.dh
->rcode
= RCode::NXDomain
;
1166 ++g_stats
.ruleNXDomain
;
1169 case DNSAction::Action::Refused
:
1170 dq
.dh
->rcode
= RCode::Refused
;
1172 ++g_stats
.ruleRefused
;
1175 case DNSAction::Action::ServFail
:
1176 dq
.dh
->rcode
= RCode::ServFail
;
1178 ++g_stats
.ruleServFail
;
1181 case DNSAction::Action::Spoof
:
1182 spoofResponseFromString(dq
, ruleresult
);
1185 case DNSAction::Action::Truncate
:
1190 case DNSAction::Action::HeaderModify
:
1193 case DNSAction::Action::Pool
:
1194 poolname
=ruleresult
;
1197 /* non-terminal actions follow */
1198 case DNSAction::Action::Delay
:
1199 dq
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1201 case DNSAction::Action::None
:
1203 case DNSAction::Action::NoOp
:
1205 case DNSAction::Action::NoRecurse
:
1216 ssize_t
udpClientSendRequestToBackend(const std::shared_ptr
<DownstreamState
>& ss
, const int sd
, const char* request
, const size_t requestLen
, bool healthCheck
)
1220 if (ss
->sourceItf
== 0) {
1221 result
= send(sd
, request
, requestLen
, 0);
1227 ComboAddress
remote(ss
->remote
);
1228 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &remote
);
1229 addCMsgSrcAddr(&msgh
, cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1230 result
= sendmsg(sd
, &msgh
, 0);
1234 int savederrno
= errno
;
1235 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1237 /* This might sound silly, but on Linux send() might fail with EINVAL
1238 if the interface the socket was bound to doesn't exist anymore.
1239 We don't want to reconnect the real socket if the healthcheck failed,
1240 because it's not using the same socket.
1242 if (!healthCheck
&& (savederrno
== EINVAL
|| savederrno
== ENODEV
)) {
1250 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1252 if (msgh
->msg_flags
& MSG_TRUNC
) {
1253 /* message was too large for our buffer */
1254 vinfolog("Dropping message too large for our buffer");
1255 ++g_stats
.nonCompliantQueries
;
1259 if(!holders
.acl
->match(remote
)) {
1260 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1268 if (HarvestDestinationAddress(msgh
, &dest
)) {
1269 /* we don't get the port, only the address */
1270 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1273 dest
.sin4
.sin_family
= 0;
1279 boost::optional
<std::vector
<uint8_t>> checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DNSCryptQuery
>& dnsCryptQuery
, time_t now
, bool tcp
)
1281 if (cs
.dnscryptCtx
) {
1282 #ifdef HAVE_DNSCRYPT
1283 vector
<uint8_t> response
;
1284 uint16_t decryptedQueryLen
= 0;
1286 dnsCryptQuery
= std::make_shared
<DNSCryptQuery
>(cs
.dnscryptCtx
);
1288 bool decrypted
= handleDNSCryptQuery(const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, tcp
, now
, response
);
1291 if (response
.size() > 0) {
1294 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1297 len
= decryptedQueryLen
;
1298 #endif /* HAVE_DNSCRYPT */
1303 bool checkQueryHeaders(const struct dnsheader
* dh
)
1305 if (dh
->qr
) { // don't respond to responses
1306 ++g_stats
.nonCompliantQueries
;
1310 if (dh
->qdcount
== 0) {
1311 ++g_stats
.emptyQueries
;
1316 ++g_stats
.rdQueries
;
1322 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1323 static void queueResponse(const ClientState
& cs
, const char* response
, uint16_t responseLen
, const ComboAddress
& dest
, const ComboAddress
& remote
, struct mmsghdr
& outMsg
, struct iovec
* iov
, char* cbuf
)
1326 fillMSGHdr(&outMsg
.msg_hdr
, iov
, nullptr, 0, const_cast<char*>(response
), responseLen
, const_cast<ComboAddress
*>(&remote
));
1328 if (dest
.sin4
.sin_family
== 0) {
1329 outMsg
.msg_hdr
.msg_control
= nullptr;
1332 addCMsgSrcAddr(&outMsg
.msg_hdr
, cbuf
, &dest
, 0);
1335 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1337 /* self-generated responses or cache hits */
1338 static bool prepareOutgoingResponse(LocalHolders
& holders
, ClientState
& cs
, DNSQuestion
& dq
, bool cacheHit
)
1340 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(dq
.dh
), dq
.size
, dq
.len
, dq
.tcp
, dq
.queryTime
);
1342 #ifdef HAVE_PROTOBUF
1343 dr
.uniqueId
= dq
.uniqueId
;
1346 dr
.delayMsec
= dq
.delayMsec
;
1348 if (!applyRulesToResponse(cacheHit
? holders
.cacheHitRespRulactions
: holders
.selfAnsweredRespRulactions
, dr
)) {
1352 /* in case a rule changed it */
1353 dq
.delayMsec
= dr
.delayMsec
;
1355 #ifdef HAVE_DNSCRYPT
1357 if (!encryptResponse(reinterpret_cast<char*>(dq
.dh
), &dq
.len
, dq
.size
, dq
.tcp
, dq
.dnsCryptQuery
, nullptr, nullptr)) {
1361 #endif /* HAVE_DNSCRYPT */
1364 ++g_stats
.cacheHits
;
1367 switch (dr
.dh
->rcode
) {
1368 case RCode::NXDomain
:
1369 ++g_stats
.frontendNXDomain
;
1371 case RCode::ServFail
:
1372 ++g_stats
.frontendServFail
;
1374 case RCode::NoError
:
1375 ++g_stats
.frontendNoError
;
1379 doLatencyStats(0); // we're not going to measure this
1383 ProcessQueryResult
processQuery(DNSQuestion
& dq
, ClientState
& cs
, LocalHolders
& holders
, std::shared_ptr
<DownstreamState
>& selectedBackend
)
1385 const uint16_t queryId
= ntohs(dq
.dh
->id
);
1388 /* we need an accurate ("real") value for the response and
1389 to store into the IDS, but not for insertion into the
1390 rings for example */
1391 struct timespec now
;
1396 if (!applyRulesToQuery(holders
, dq
, poolname
, now
)) {
1397 return ProcessQueryResult::Drop
;
1400 if(dq
.dh
->qr
) { // something turned it into a response
1401 fixUpQueryTurnedResponse(dq
, dq
.origFlags
);
1403 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1404 return ProcessQueryResult::Drop
;
1407 ++g_stats
.selfAnswered
;
1408 return ProcessQueryResult::SendAnswer
;
1411 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, poolname
);
1412 dq
.packetCache
= serverPool
->packetCache
;
1413 auto policy
= *(holders
.policy
);
1414 if (serverPool
->policy
!= nullptr) {
1415 policy
= *(serverPool
->policy
);
1417 auto servers
= serverPool
->getServers();
1419 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1420 selectedBackend
= policy
.policy(servers
, &dq
);
1423 selectedBackend
= policy
.policy(servers
, &dq
);
1426 uint16_t cachedResponseSize
= dq
.size
;
1427 uint32_t allowExpired
= selectedBackend
? 0 : g_staleCacheEntriesTTL
;
1429 if (dq
.packetCache
&& !dq
.skipCache
) {
1430 dq
.dnssecOK
= (getEDNSZ(dq
) & EDNS_HEADER_FLAG_DO
);
1433 if (dq
.useECS
&& ((selectedBackend
&& selectedBackend
->useECS
) || (!selectedBackend
&& serverPool
->getECS()))) {
1434 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1435 if (dq
.packetCache
&& !dq
.skipCache
&& (!selectedBackend
|| !selectedBackend
->disableZeroScope
) && dq
.packetCache
->isECSParsingEnabled()) {
1436 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKeyNoECS
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1437 dq
.len
= cachedResponseSize
;
1439 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1440 return ProcessQueryResult::Drop
;
1443 return ProcessQueryResult::SendAnswer
;
1447 /* there was no existing ECS on the query, enable the zero-scope feature */
1448 dq
.useZeroScope
= true;
1452 if (!handleEDNSClientSubnet(dq
, &(dq
.ednsAdded
), &(dq
.ecsAdded
), g_preserveTrailingData
)) {
1453 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq
.remote
->toStringWithPort());
1454 return ProcessQueryResult::Drop
;
1458 if (dq
.packetCache
&& !dq
.skipCache
) {
1459 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKey
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1460 dq
.len
= cachedResponseSize
;
1462 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1463 return ProcessQueryResult::Drop
;
1466 return ProcessQueryResult::SendAnswer
;
1468 ++g_stats
.cacheMisses
;
1471 if(!selectedBackend
) {
1474 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "ServFailed" : "Dropped", dq
.qname
->toString(), QType(dq
.qtype
).getName(), dq
.remote
->toStringWithPort());
1475 if (g_servFailOnNoPolicy
) {
1476 restoreFlags(dq
.dh
, dq
.origFlags
);
1478 dq
.dh
->rcode
= RCode::ServFail
;
1481 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1482 return ProcessQueryResult::Drop
;
1484 // no response-only statistics counter to update.
1485 return ProcessQueryResult::SendAnswer
;
1488 return ProcessQueryResult::Drop
;
1491 if (dq
.addXPF
&& selectedBackend
->xpfRRCode
!= 0) {
1492 addXPF(dq
, selectedBackend
->xpfRRCode
, g_preserveTrailingData
);
1495 selectedBackend
->queries
++;
1496 return ProcessQueryResult::PassToBackend
;
1498 catch(const std::exception
& e
){
1499 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq
.tcp
? "TCP" : "UDP"), dq
.remote
->toStringWithPort(), queryId
, e
.what());
1501 return ProcessQueryResult::Drop
;
1504 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, char* respCBuf
)
1506 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
1507 uint16_t queryId
= 0;
1510 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1514 /* we need an accurate ("real") value for the response and
1515 to store into the IDS, but not for insertion into the
1516 rings for example */
1517 struct timespec queryRealTime
;
1518 gettime(&queryRealTime
, true);
1520 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
= nullptr;
1521 auto dnsCryptResponse
= checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, queryRealTime
.tv_sec
, false);
1522 if (dnsCryptResponse
) {
1523 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dnsCryptResponse
->data()), static_cast<uint16_t>(dnsCryptResponse
->size()), 0, dest
, remote
);
1527 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
1528 queryId
= ntohs(dh
->id
);
1530 if (!checkQueryHeaders(dh
)) {
1534 uint16_t qtype
, qclass
;
1535 unsigned int consumed
= 0;
1536 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
1537 DNSQuestion
dq(&qname
, qtype
, qclass
, consumed
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false, &queryRealTime
);
1538 dq
.dnsCryptQuery
= std::move(dnsCryptQuery
);
1539 std::shared_ptr
<DownstreamState
> ss
{nullptr};
1540 auto result
= processQuery(dq
, cs
, holders
, ss
);
1542 if (result
== ProcessQueryResult::Drop
) {
1546 if (result
== ProcessQueryResult::SendAnswer
) {
1547 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1548 if (dq
.delayMsec
== 0 && responsesVect
!= nullptr) {
1549 queueResponse(cs
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, *dq
.local
, *dq
.remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1550 (*queuedResponses
)++;
1553 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1554 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1555 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, dq
.delayMsec
, dest
, *dq
.remote
);
1559 if (result
!= ProcessQueryResult::PassToBackend
|| ss
== nullptr) {
1563 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1564 IDState
* ids
= &ss
->idStates
[idOffset
];
1568 int oldFD
= ids
->origFD
.exchange(cs
.udpFD
);
1570 // if we are reusing, no change in outstanding
1575 ++g_stats
.downstreamTimeouts
;
1579 ids
->origID
= dh
->id
;
1580 setIDStateFromDNSQuestion(*ids
, dq
, std::move(qname
));
1582 /* If we couldn't harvest the real dest addr, still
1583 write down the listening addr since it will be useful
1584 (especially if it's not an 'any' one).
1585 We need to keep track of which one it is since we may
1586 want to use the real but not the listening addr to reply.
1588 if (dest
.sin4
.sin_family
!= 0) {
1589 ids
->origDest
= dest
;
1590 ids
->destHarvested
= true;
1593 ids
->origDest
= cs
.local
;
1594 ids
->destHarvested
= false;
1599 int fd
= pickBackendSocketForSending(ss
);
1600 ssize_t ret
= udpClientSendRequestToBackend(ss
, fd
, query
, dq
.len
);
1604 ++g_stats
.downstreamSendErrors
;
1607 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
1609 catch(const std::exception
& e
){
1610 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
1614 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1615 static void MultipleMessagesUDPClientThread(ClientState
* cs
, LocalHolders
& holders
)
1620 /* used by HarvestDestinationAddress */
1622 ComboAddress remote
;
1626 const size_t vectSize
= g_udpVectorSize
;
1627 /* the actual buffer is larger because:
1628 - we may have to add EDNS and/or ECS
1629 - we use it for self-generated responses (from rule or cache)
1630 but we only accept incoming payloads up to that size
1632 static_assert(s_udpIncomingBufferSize
<= sizeof(MMReceiver::packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1634 auto recvData
= std::unique_ptr
<MMReceiver
[]>(new MMReceiver
[vectSize
]);
1635 auto msgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1636 auto outMsgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1638 /* initialize the structures needed to receive our messages */
1639 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1640 recvData
[idx
].remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1641 fillMSGHdr(&msgVec
[idx
].msg_hdr
, &recvData
[idx
].iov
, recvData
[idx
].cbuf
, sizeof(recvData
[idx
].cbuf
), recvData
[idx
].packet
, s_udpIncomingBufferSize
, &recvData
[idx
].remote
);
1647 /* reset the IO vector, since it's also used to send the vector of responses
1648 to avoid having to copy the data around */
1649 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1650 recvData
[idx
].iov
.iov_base
= recvData
[idx
].packet
;
1651 recvData
[idx
].iov
.iov_len
= sizeof(recvData
[idx
].packet
);
1654 /* block until we have at least one message ready, but return
1655 as many as possible to save the syscall costs */
1656 int msgsGot
= recvmmsg(cs
->udpFD
, msgVec
.get(), vectSize
, MSG_WAITFORONE
| MSG_TRUNC
, nullptr);
1659 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno
));
1663 unsigned int msgsToSend
= 0;
1665 /* process the received messages */
1666 for (int msgIdx
= 0; msgIdx
< msgsGot
; msgIdx
++) {
1667 const struct msghdr
* msgh
= &msgVec
[msgIdx
].msg_hdr
;
1668 unsigned int got
= msgVec
[msgIdx
].msg_len
;
1669 const ComboAddress
& remote
= recvData
[msgIdx
].remote
;
1671 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1672 ++g_stats
.nonCompliantQueries
;
1676 processUDPQuery(*cs
, holders
, msgh
, remote
, recvData
[msgIdx
].dest
, recvData
[msgIdx
].packet
, static_cast<uint16_t>(got
), sizeof(recvData
[msgIdx
].packet
), outMsgVec
.get(), &msgsToSend
, &recvData
[msgIdx
].iov
, recvData
[msgIdx
].cbuf
);
1680 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1681 or the cache) can be sent in batch too */
1683 if (msgsToSend
> 0 && msgsToSend
<= static_cast<unsigned int>(msgsGot
)) {
1684 int sent
= sendmmsg(cs
->udpFD
, outMsgVec
.get(), msgsToSend
, 0);
1686 if (sent
< 0 || static_cast<unsigned int>(sent
) != msgsToSend
) {
1687 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent
, msgsToSend
, strerror(errno
));
1693 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1695 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1696 static void udpClientThread(ClientState
* cs
)
1699 setThreadName("dnsdist/udpClie");
1700 LocalHolders holders
;
1702 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1703 if (g_udpVectorSize
> 1) {
1704 MultipleMessagesUDPClientThread(cs
, holders
);
1708 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1711 /* the actual buffer is larger because:
1712 - we may have to add EDNS and/or ECS
1713 - we use it for self-generated responses (from rule or cache)
1714 but we only accept incoming payloads up to that size
1716 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1719 /* used by HarvestDestinationAddress */
1722 ComboAddress remote
;
1724 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1725 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), packet
, sizeof(packet
), &remote
);
1728 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
1730 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1731 ++g_stats
.nonCompliantQueries
;
1735 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), s_udpIncomingBufferSize
, nullptr, nullptr, nullptr, nullptr);
1739 catch(const std::exception
&e
)
1741 errlog("UDP client thread died because of exception: %s", e
.what());
1743 catch(const PDNSException
&e
)
1745 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1749 errlog("UDP client thread died because of an exception: %s", "unknown");
1752 uint16_t getRandomDNSID()
1754 #ifdef HAVE_LIBSODIUM
1755 return (randombytes_random() % 65536);
1757 return (random() % 65536);
1761 static bool upCheck(const shared_ptr
<DownstreamState
>& ds
)
1764 DNSName checkName
= ds
->checkName
;
1765 uint16_t checkType
= ds
->checkType
.getCode();
1766 uint16_t checkClass
= ds
->checkClass
;
1767 dnsheader checkHeader
;
1768 memset(&checkHeader
, 0, sizeof(checkHeader
));
1770 checkHeader
.qdcount
= htons(1);
1771 checkHeader
.id
= getRandomDNSID();
1773 checkHeader
.rd
= true;
1775 checkHeader
.cd
= true;
1778 if (ds
->checkFunction
) {
1779 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1780 auto ret
= ds
->checkFunction(checkName
, checkType
, checkClass
, &checkHeader
);
1781 checkName
= std::get
<0>(ret
);
1782 checkType
= std::get
<1>(ret
);
1783 checkClass
= std::get
<2>(ret
);
1786 vector
<uint8_t> packet
;
1787 DNSPacketWriter
dpw(packet
, checkName
, checkType
, checkClass
);
1788 dnsheader
* requestHeader
= dpw
.getHeader();
1789 *requestHeader
= checkHeader
;
1791 Socket
sock(ds
->remote
.sin4
.sin_family
, SOCK_DGRAM
);
1792 sock
.setNonBlocking();
1793 if (!IsAnyAddress(ds
->sourceAddr
)) {
1794 sock
.setReuseAddr();
1795 sock
.bind(ds
->sourceAddr
);
1797 sock
.connect(ds
->remote
);
1798 ssize_t sent
= udpClientSendRequestToBackend(ds
, sock
.getHandle(), reinterpret_cast<char*>(&packet
[0]), packet
.size(), true);
1801 if (g_verboseHealthChecks
)
1802 infolog("Error while sending a health check query to backend %s: %d", ds
->getNameWithAddr(), ret
);
1806 int ret
= waitForRWData(sock
.getHandle(), true, /* ms to seconds */ ds
->checkTimeout
/ 1000, /* remaining ms to us */ (ds
->checkTimeout
% 1000) * 1000);
1807 if(ret
< 0 || !ret
) { // error, timeout, both are down!
1810 if (g_verboseHealthChecks
)
1811 infolog("Error while waiting for the health check response from backend %s: %d", ds
->getNameWithAddr(), ret
);
1814 if (g_verboseHealthChecks
)
1815 infolog("Timeout while waiting for the health check response from backend %s", ds
->getNameWithAddr());
1822 sock
.recvFrom(reply
, from
);
1824 /* we are using a connected socket but hey.. */
1825 if (from
!= ds
->remote
) {
1826 if (g_verboseHealthChecks
)
1827 infolog("Invalid health check response received from %s, expecting one from %s", from
.toStringWithPort(), ds
->remote
.toStringWithPort());
1831 const dnsheader
* responseHeader
= reinterpret_cast<const dnsheader
*>(reply
.c_str());
1833 if (reply
.size() < sizeof(*responseHeader
)) {
1834 if (g_verboseHealthChecks
)
1835 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply
.size(), ds
->getNameWithAddr(), sizeof(*responseHeader
));
1839 if (responseHeader
->id
!= requestHeader
->id
) {
1840 if (g_verboseHealthChecks
)
1841 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader
->id
, ds
->getNameWithAddr(), requestHeader
->id
);
1845 if (!responseHeader
->qr
) {
1846 if (g_verboseHealthChecks
)
1847 infolog("Invalid health check response from backend %s, expecting QR to be set", ds
->getNameWithAddr());
1851 if (responseHeader
->rcode
== RCode::ServFail
) {
1852 if (g_verboseHealthChecks
)
1853 infolog("Backend %s responded to health check with ServFail", ds
->getNameWithAddr());
1857 if (ds
->mustResolve
&& (responseHeader
->rcode
== RCode::NXDomain
|| responseHeader
->rcode
== RCode::Refused
)) {
1858 if (g_verboseHealthChecks
)
1859 infolog("Backend %s responded to health check with %s while mustResolve is set", ds
->getNameWithAddr(), responseHeader
->rcode
== RCode::NXDomain
? "NXDomain" : "Refused");
1863 uint16_t receivedType
;
1864 uint16_t receivedClass
;
1865 DNSName
receivedName(reply
.c_str(), reply
.size(), sizeof(dnsheader
), false, &receivedType
, &receivedClass
);
1867 if (receivedName
!= checkName
|| receivedType
!= checkType
|| receivedClass
!= checkClass
) {
1868 if (g_verboseHealthChecks
)
1869 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds
->getNameWithAddr(), receivedName
.toLogString(), checkName
.toLogString(), QType(receivedType
).getName(), QType(checkType
).getName(), receivedClass
, checkClass
);
1875 catch(const std::exception
& e
)
1877 if (g_verboseHealthChecks
)
1878 infolog("Error checking the health of backend %s: %s", ds
->getNameWithAddr(), e
.what());
1883 if (g_verboseHealthChecks
)
1884 infolog("Unknown exception while checking the health of backend %s", ds
->getNameWithAddr());
1888 uint64_t g_maxTCPClientThreads
{10};
1889 std::atomic
<uint16_t> g_cacheCleaningDelay
{60};
1890 std::atomic
<uint16_t> g_cacheCleaningPercentage
{100};
1894 setThreadName("dnsdist/main");
1897 int32_t secondsToWaitLog
= 0;
1903 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1904 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1908 secondsToWaitLog
= 0;
1910 catch(std::exception
&e
) {
1911 if (secondsToWaitLog
<= 0) {
1912 infolog("Error during execution of maintenance function: %s", e
.what());
1913 secondsToWaitLog
= 61;
1915 secondsToWaitLog
-= interval
;
1921 if (counter
>= g_cacheCleaningDelay
) {
1922 /* keep track, for each cache, of whether we should keep
1924 std::map
<std::shared_ptr
<DNSDistPacketCache
>, bool> caches
;
1926 /* gather all caches actually used by at least one pool, and see
1927 if something prevents us from cleaning the expired entries */
1928 auto localPools
= g_pools
.getLocal();
1929 for (const auto& entry
: *localPools
) {
1930 auto& pool
= entry
.second
;
1932 auto packetCache
= pool
->packetCache
;
1937 auto pair
= caches
.insert({packetCache
, false});
1938 auto& iter
= pair
.first
;
1939 /* if we need to keep stale data for this cache (ie, not clear
1940 expired entries when at least one pool using this cache
1941 has all its backends down) */
1942 if (packetCache
->keepStaleData() && iter
->second
== false) {
1943 /* so far all pools had at least one backend up */
1944 if (pool
->countServers(true) == 0) {
1945 iter
->second
= true;
1950 for (auto pair
: caches
) {
1951 /* shall we keep expired entries ? */
1952 if (pair
.second
== true) {
1955 auto& packetCache
= pair
.first
;
1956 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1957 packetCache
->purgeExpired(upTo
);
1962 // ponder pruning g_dynblocks of expired entries here
1966 static void secPollThread()
1968 setThreadName("dnsdist/secpoll");
1972 doSecPoll(g_secPollSuffix
);
1976 sleep(g_secPollInterval
);
1980 static void healthChecksThread()
1982 setThreadName("dnsdist/healthC");
1989 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads())
1990 g_tcpclientthreads
->addTCPClientThread();
1992 auto states
= g_dstates
.getLocal(); // this points to the actual shared_ptrs!
1993 for(auto& dss
: *states
) {
1994 if(++dss
->lastCheck
< dss
->checkInterval
)
1997 if(dss
->availability
==DownstreamState::Availability::Auto
) {
1998 bool newState
=upCheck(dss
);
2000 /* check succeeded */
2001 dss
->currentCheckFailures
= 0;
2003 if (!dss
->upStatus
) {
2004 /* we were marked as down */
2005 dss
->consecutiveSuccessfulChecks
++;
2006 if (dss
->consecutiveSuccessfulChecks
< dss
->minRiseSuccesses
) {
2007 /* if we need more than one successful check to rise
2008 and we didn't reach the threshold yet,
2016 dss
->consecutiveSuccessfulChecks
= 0;
2018 if (dss
->upStatus
) {
2019 /* we are currently up */
2020 dss
->currentCheckFailures
++;
2021 if (dss
->currentCheckFailures
< dss
->maxCheckFailures
) {
2022 /* we need more than one failure to be marked as down,
2023 and we did not reach the threshold yet, let's stay down */
2029 if(newState
!= dss
->upStatus
) {
2030 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
2032 if (newState
&& !dss
->connected
) {
2033 newState
= dss
->reconnect();
2035 if (dss
->connected
&& !dss
->threadStarted
.test_and_set()) {
2036 dss
->tid
= thread(responderThread
, dss
);
2040 dss
->upStatus
= newState
;
2041 dss
->currentCheckFailures
= 0;
2042 dss
->consecutiveSuccessfulChecks
= 0;
2043 if (g_snmpAgent
&& g_snmpTrapsEnabled
) {
2044 g_snmpAgent
->sendBackendStatusChangeTrap(dss
);
2049 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
2050 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
2051 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
2052 dss
->prev
.queries
.store(dss
->queries
.load());
2053 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
2055 for(IDState
& ids
: dss
->idStates
) { // timeouts
2056 int origFD
= ids
.origFD
;
2057 if(origFD
>=0 && ids
.age
++ > g_udpTimeout
) {
2058 /* We set origFD to -1 as soon as possible
2059 to limit the risk of racing with the
2061 The UDP client thread only checks origFD to
2062 know whether outstanding has to be incremented,
2063 so the sooner the better any way since we _will_
2066 if (ids
.origFD
.exchange(-1) != origFD
) {
2067 /* this state has been altered in the meantime,
2068 don't go anywhere near it */
2075 ++g_stats
.downstreamTimeouts
; // this is an 'actively' discovered timeout
2076 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2077 dss
->remote
.toStringWithPort(), dss
->name
,
2078 ids
.qname
.toString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
2083 struct dnsheader fake
;
2084 memset(&fake
, 0, sizeof(fake
));
2085 fake
.id
= ids
.origID
;
2087 g_rings
.insertResponse(ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
);
2094 static void bindAny(int af
, int sock
)
2096 __attribute__((unused
)) int one
= 1;
2099 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
2100 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno
));
2105 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
2106 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno
));
2110 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
2111 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno
));
2114 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
2115 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno
));
2119 static void dropGroupPrivs(gid_t gid
)
2122 if (setgid(gid
) == 0) {
2123 if (setgroups(0, NULL
) < 0) {
2124 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno
));
2128 warnlog("Warning: Unable to set group ID to %d: %s", gid
, strerror(errno
));
2133 static void dropUserPrivs(uid_t uid
)
2136 if(setuid(uid
) < 0) {
2137 warnlog("Warning: Unable to set user ID to %d: %s", uid
, strerror(errno
));
2142 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
2144 /* stdin, stdout, stderr */
2145 size_t requiredFDsCount
= 3;
2146 auto backends
= g_dstates
.getLocal();
2147 /* UDP sockets to backends */
2148 size_t backendUDPSocketsCount
= 0;
2149 for (const auto& backend
: *backends
) {
2150 backendUDPSocketsCount
+= backend
->sockets
.size();
2152 requiredFDsCount
+= backendUDPSocketsCount
;
2153 /* TCP sockets to backends */
2154 requiredFDsCount
+= (backends
->size() * g_maxTCPClientThreads
);
2155 /* listening sockets */
2156 requiredFDsCount
+= udpBindsCount
;
2157 requiredFDsCount
+= tcpBindsCount
;
2158 /* max TCP connections currently served */
2159 requiredFDsCount
+= g_maxTCPClientThreads
;
2160 /* max pipes for communicating between TCP acceptors and client threads */
2161 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
2162 /* max TCP queued connections */
2163 requiredFDsCount
+= g_maxTCPQueuedConnections
;
2164 /* DelayPipe pipe */
2165 requiredFDsCount
+= 2;
2168 /* webserver main socket */
2170 /* console main socket */
2177 getrlimit(RLIMIT_NOFILE
, &rl
);
2178 if (rl
.rlim_cur
<= requiredFDsCount
) {
2179 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
2181 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2183 warnlog("You can increase this value by using ulimit.");
2188 static void setUpLocalBind(std::unique_ptr
<ClientState
>& cs
)
2190 /* skip some warnings if there is an identical UDP context */
2191 bool warn
= cs
->tcp
== false || cs
->tlsFrontend
!= nullptr || cs
->dohFrontend
!= nullptr;
2192 int& fd
= cs
->tcp
== false ? cs
->udpFD
: cs
->tcpFD
;
2195 fd
= SSocket(cs
->local
.sin4
.sin_family
, cs
->tcp
== false ? SOCK_DGRAM
: SOCK_STREAM
, 0);
2198 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2199 #ifdef TCP_DEFER_ACCEPT
2200 SSetsockopt(fd
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2202 if (cs
->fastOpenQueueSize
> 0) {
2204 SSetsockopt(fd
, IPPROTO_TCP
, TCP_FASTOPEN
, cs
->fastOpenQueueSize
);
2207 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2213 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2214 SSetsockopt(fd
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2217 bindAny(cs
->local
.sin4
.sin_family
, fd
);
2219 if(!cs
->tcp
&& IsAnyAddress(cs
->local
)) {
2221 setsockopt(fd
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2222 #ifdef IPV6_RECVPKTINFO
2223 setsockopt(fd
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2227 if (cs
->reuseport
) {
2229 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2232 /* no need to warn again if configured but support is not available, we already did for UDP */
2233 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2239 if (cs
->local
.isIPv4()) {
2241 setSocketIgnorePMTU(cs
->udpFD
);
2243 catch(const std::exception
& e
) {
2244 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs
->local
.toStringWithPort(), e
.what());
2249 const std::string
& itf
= cs
->interface
;
2251 #ifdef SO_BINDTODEVICE
2252 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2254 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), strerror(errno
));
2258 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2264 if (g_defaultBPFFilter
) {
2265 cs
->attachFilter(g_defaultBPFFilter
);
2266 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs
->tcp
? "UDP" : "TCP"), cs
->local
.toStringWithPort());
2268 #endif /* HAVE_EBPF */
2270 if (cs
->tlsFrontend
!= nullptr) {
2271 if (!cs
->tlsFrontend
->setupTLS()) {
2272 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2273 _exit(EXIT_FAILURE
);
2277 if (cs
->dohFrontend
!= nullptr) {
2278 cs
->dohFrontend
->setup();
2281 SBind(fd
, cs
->local
);
2284 SListen(cs
->tcpFD
, SOMAXCONN
);
2285 if (cs
->tlsFrontend
!= nullptr) {
2286 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2288 else if (cs
->dohFrontend
!= nullptr) {
2289 warnlog("Listening on %s for DoH", cs
->local
.toStringWithPort());
2291 else if (cs
->dnscryptCtx
!= nullptr) {
2292 warnlog("Listening on %s for DNSCrypt", cs
->local
.toStringWithPort());
2295 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2304 vector
<string
> locals
;
2305 vector
<string
> remotes
;
2306 bool checkConfig
{false};
2307 bool beClient
{false};
2308 bool beSupervised
{false};
2315 std::atomic
<bool> g_configurationDone
{false};
2320 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2321 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2322 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
2324 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2325 cout
<<"-C,--config file Load configuration from 'file'\n";
2326 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2327 cout
<<" controlSocket from your configuration file, but also\n";
2328 cout
<<" accepts an IP:PORT argument\n";
2329 #ifdef HAVE_LIBSODIUM
2330 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2331 cout
<<" is similar to setting setKey in the configuration file.\n";
2332 cout
<<" NOTE: this will leak this key in your shell's history\n";
2333 cout
<<" and in the systems running process list.\n";
2335 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2336 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2337 cout
<<" Any errors are printed as well.\n";
2338 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2339 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2340 cout
<<"-h,--help Display this helpful message\n";
2341 cout
<<"-l,--local address Listen on this local address\n";
2342 cout
<<"--supervised Don't open a console, I'm supervised\n";
2343 cout
<<" (use with e.g. systemd and daemontools)\n";
2344 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2345 cout
<<" (use with e.g. systemd)\n";
2346 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2347 cout
<<"-v,--verbose Enable verbose mode\n";
2348 cout
<<"-V,--version Show dnsdist version information and exit\n";
2351 int main(int argc
, char** argv
)
2354 size_t udpBindsCount
= 0;
2355 size_t tcpBindsCount
= 0;
2356 rl_attempted_completion_function
= my_completion
;
2357 rl_completion_append_character
= 0;
2359 signal(SIGPIPE
, SIG_IGN
);
2360 signal(SIGCHLD
, SIG_IGN
);
2361 openlog("dnsdist", LOG_PID
|LOG_NDELAY
, LOG_DAEMON
);
2363 #ifdef HAVE_LIBSODIUM
2364 if (sodium_init() == -1) {
2365 cerr
<<"Unable to initialize crypto library"<<endl
;
2368 g_hashperturb
=randombytes_uniform(0xffffffff);
2369 srandom(randombytes_uniform(0xffffffff));
2373 gettimeofday(&tv
, 0);
2374 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
2375 g_hashperturb
=random();
2379 ComboAddress clientAddress
= ComboAddress();
2380 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
2381 struct option longopts
[]={
2382 {"acl", required_argument
, 0, 'a'},
2383 {"check-config", no_argument
, 0, 1},
2384 {"client", no_argument
, 0, 'c'},
2385 {"config", required_argument
, 0, 'C'},
2386 {"disable-syslog", no_argument
, 0, 2},
2387 {"execute", required_argument
, 0, 'e'},
2388 {"gid", required_argument
, 0, 'g'},
2389 {"help", no_argument
, 0, 'h'},
2390 {"local", required_argument
, 0, 'l'},
2391 {"setkey", required_argument
, 0, 'k'},
2392 {"supervised", no_argument
, 0, 3},
2393 {"uid", required_argument
, 0, 'u'},
2394 {"verbose", no_argument
, 0, 'v'},
2395 {"version", no_argument
, 0, 'V'},
2401 int c
=getopt_long(argc
, argv
, "a:cC:e:g:hk:l:u:vV", longopts
, &longindex
);
2406 g_cmdLine
.checkConfig
=true;
2412 g_cmdLine
.beSupervised
=true;
2415 g_cmdLine
.config
=optarg
;
2418 g_cmdLine
.beClient
=true;
2421 g_cmdLine
.command
=optarg
;
2424 g_cmdLine
.gid
=optarg
;
2427 cout
<<"dnsdist "<<VERSION
<<endl
;
2434 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
2437 #ifdef HAVE_LIBSODIUM
2438 if (B64Decode(string(optarg
), g_consoleKey
) < 0) {
2439 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2443 cerr
<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl
;
2448 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
2451 g_cmdLine
.uid
=optarg
;
2457 #ifdef LUAJIT_VERSION
2458 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2460 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2462 cout
<<"Enabled features: ";
2463 #ifdef HAVE_DNS_OVER_TLS
2464 cout
<<"dns-over-tls(";
2476 #ifdef HAVE_DNS_OVER_HTTPS
2477 cout
<<"dns-over-https(DOH) ";
2479 #ifdef HAVE_DNSCRYPT
2488 #ifdef HAVE_LIBCRYPTO
2491 #ifdef HAVE_LIBSODIUM
2494 #ifdef HAVE_PROTOBUF
2500 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2501 cout
<<"recvmmsg/sendmmsg ";
2503 #ifdef HAVE_NET_SNMP
2513 //getopt_long printed an error message.
2522 for(auto p
= argv
; *p
; ++p
) {
2523 if(g_cmdLine
.beClient
) {
2524 clientAddress
= ComboAddress(*p
, 5199);
2526 g_cmdLine
.remotes
.push_back(*p
);
2530 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
, false};
2532 g_policy
.setState(leastOutstandingPol
);
2533 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2534 setupLua(true, g_cmdLine
.config
);
2535 if (clientAddress
!= ComboAddress())
2536 g_serverControl
= clientAddress
;
2537 doClient(g_serverControl
, g_cmdLine
.command
);
2538 _exit(EXIT_SUCCESS
);
2541 auto acl
= g_ACL
.getCopy();
2543 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2545 g_ACL
.setState(acl
);
2548 auto consoleACL
= g_consoleACL
.getCopy();
2549 for (const auto& mask
: { "127.0.0.1/8", "::1/128" }) {
2550 consoleACL
.addMask(mask
);
2552 g_consoleACL
.setState(consoleACL
);
2554 if (g_cmdLine
.checkConfig
) {
2555 setupLua(true, g_cmdLine
.config
);
2556 // No exception was thrown
2557 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2558 _exit(EXIT_SUCCESS
);
2561 auto todo
=setupLua(false, g_cmdLine
.config
);
2563 auto localPools
= g_pools
.getCopy();
2565 bool precompute
= false;
2566 if (g_policy
.getLocal()->name
== "chashed") {
2569 for (const auto& entry
: localPools
) {
2570 if (entry
.second
->policy
!= nullptr && entry
.second
->policy
->name
== "chashed") {
2577 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2578 // pre compute hashes
2579 auto backends
= g_dstates
.getLocal();
2580 for (auto& backend
: *backends
) {
2586 if (!g_cmdLine
.locals
.empty()) {
2587 for (auto it
= g_frontends
.begin(); it
!= g_frontends
.end(); ) {
2588 /* DoH, DoT and DNSCrypt frontends are separate */
2589 if ((*it
)->dohFrontend
== nullptr && (*it
)->tlsFrontend
== nullptr && (*it
)->dnscryptCtx
== nullptr) {
2590 it
= g_frontends
.erase(it
);
2597 for(const auto& loc
: g_cmdLine
.locals
) {
2599 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), false, false, 0, "", {})));
2601 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), true, false, 0, "", {})));
2605 if (g_frontends
.empty()) {
2607 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2609 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2612 g_configurationDone
= true;
2614 for(auto& frontend
: g_frontends
) {
2615 setUpLocalBind(frontend
);
2617 if (frontend
->tcp
== false) {
2625 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
2629 g_ACL
.getLocal()->toStringVector(&vec
);
2630 for(const auto& s
: vec
) {
2635 infolog("ACL allowing queries from: %s", acls
.c_str());
2638 g_consoleACL
.getLocal()->toStringVector(&vec
);
2639 for (const auto& entry
: vec
) {
2640 if (!acls
.empty()) {
2645 infolog("Console ACL allowing connections from: %s", acls
.c_str());
2647 #ifdef HAVE_LIBSODIUM
2648 if (g_consoleEnabled
&& g_consoleKey
.empty()) {
2649 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2656 if(!g_cmdLine
.gid
.empty())
2657 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2659 if(!g_cmdLine
.uid
.empty())
2660 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2662 dropGroupPrivs(newgid
);
2663 dropUserPrivs(newuid
);
2665 /* we might still have capabilities remaining,
2666 for example if we have been started as root
2667 without --uid or --gid (please don't do that)
2668 or as an unprivileged user with ambient
2669 capabilities like CAP_NET_BIND_SERVICE.
2673 catch(const std::exception
& e
) {
2674 warnlog("%s", e
.what());
2677 /* this need to be done _after_ dropping privileges */
2678 g_delay
= new DelayPipe
<DelayedPacket
>();
2684 g_tcpclientthreads
= std::unique_ptr
<TCPClientCollection
>(new TCPClientCollection(g_maxTCPClientThreads
, g_useTCPSinglePipe
));
2689 localPools
= g_pools
.getCopy();
2690 /* create the default pool no matter what */
2691 createPoolIfNotExists(localPools
, "");
2692 if(g_cmdLine
.remotes
.size()) {
2693 for(const auto& address
: g_cmdLine
.remotes
) {
2694 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2695 addServerToPool(localPools
, "", ret
);
2696 if (ret
->connected
&& !ret
->threadStarted
.test_and_set()) {
2697 ret
->tid
= thread(responderThread
, ret
);
2699 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
2702 g_pools
.setState(localPools
);
2704 if(g_dstates
.getLocal()->empty()) {
2705 errlog("No downstream servers defined: all packets will get dropped");
2706 // you might define them later, but you need to know
2709 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
2711 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2712 if(dss
->availability
==DownstreamState::Availability::Auto
) {
2713 bool newState
=upCheck(dss
);
2714 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
2715 dss
->upStatus
= newState
;
2719 for(auto& cs
: g_frontends
) {
2720 if (cs
->dohFrontend
!= nullptr) {
2721 #ifdef HAVE_DNS_OVER_HTTPS
2722 std::thread
t1(dohThread
, cs
.get());
2723 if (!cs
->cpus
.empty()) {
2724 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2727 #endif /* HAVE_DNS_OVER_HTTPS */
2730 if (cs
->udpFD
>= 0) {
2731 thread
t1(udpClientThread
, cs
.get());
2732 if (!cs
->cpus
.empty()) {
2733 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2737 else if (cs
->tcpFD
>= 0) {
2738 thread
t1(tcpAcceptorThread
, cs
.get());
2739 if (!cs
->cpus
.empty()) {
2740 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2746 thread
carbonthread(carbonDumpThread
);
2747 carbonthread
.detach();
2749 thread
stattid(maintThread
);
2752 thread
healththread(healthChecksThread
);
2754 if (!g_secPollSuffix
.empty()) {
2755 thread
secpollthread(secPollThread
);
2756 secpollthread
.detach();
2759 if(g_cmdLine
.beSupervised
) {
2761 sd_notify(0, "READY=1");
2763 healththread
.join();
2766 healththread
.detach();
2769 _exit(EXIT_SUCCESS
);
2772 catch(const LuaContext::ExecutionErrorException
& e
) {
2774 errlog("Fatal Lua error: %s", e
.what());
2775 std::rethrow_if_nested(e
);
2776 } catch(const std::exception
& ne
) {
2777 errlog("Details: %s", ne
.what());
2779 catch(PDNSException
&ae
)
2781 errlog("Fatal pdns error: %s", ae
.reason
);
2783 _exit(EXIT_FAILURE
);
2785 catch(std::exception
&e
)
2787 errlog("Fatal error: %s", e
.what());
2788 _exit(EXIT_FAILURE
);
2790 catch(PDNSException
&ae
)
2792 errlog("Fatal pdns error: %s", ae
.reason
);
2793 _exit(EXIT_FAILURE
);