2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <netinet/tcp.h>
31 #include <sys/resource.h>
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
37 #include <editline/readline.h>
40 #include "dnsdist-systemd.hh"
42 #include <systemd/sd-daemon.h>
46 #include "dnsdist-cache.hh"
47 #include "dnsdist-console.hh"
48 #include "dnsdist-ecs.hh"
49 #include "dnsdist-healthchecks.hh"
50 #include "dnsdist-lua.hh"
51 #include "dnsdist-rings.hh"
52 #include "dnsdist-secpoll.hh"
53 #include "dnsdist-xpf.hh"
56 #include "delaypipe.hh"
59 #include "dnsparser.hh"
60 #include "ednsoptions.hh"
64 #include "sodcrypto.hh"
66 #include "threadname.hh"
70 Receiver is currently single threaded
71 not *that* bad actually, but now that we are thread safe, might want to scale
75 Set of Rules, if one matches, it leads to an Action
76 Both rules and actions could conceivably be Lua based.
77 On the C++ side, both could be inherited from a class Rule and a class Action,
78 on the Lua side we can't do that. */
84 struct DNSDistStats g_stats
;
86 uint16_t g_maxOutstanding
{std::numeric_limits
<uint16_t>::max()};
87 uint32_t g_staleCacheEntriesTTL
{0};
89 bool g_allowEmptyResponse
{false};
91 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
92 string g_outputBuffer
;
94 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
95 std::vector
<std::shared_ptr
<DOHFrontend
>> g_dohlocals
;
96 std::vector
<std::shared_ptr
<DNSCryptContext
>> g_dnsCryptLocals
;
98 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
99 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
100 #endif /* HAVE_EBPF */
101 std::vector
<std::unique_ptr
<ClientState
>> g_frontends
;
102 GlobalStateHolder
<pools_t
> g_pools
;
103 size_t g_udpVectorSize
{1};
105 bool g_snmpEnabled
{false};
106 bool g_snmpTrapsEnabled
{false};
107 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
109 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
110 Then we have a bunch of connected sockets for talking to downstream servers.
111 We send directly to those sockets.
113 For the return path, per downstream server we have a thread that listens to responses.
115 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
116 there the original requestor and the original id. The new ID is the offset in the array.
118 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
121 IDs are assigned by atomic increments of the socket offset.
124 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
125 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
126 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
132 GlobalStateHolder
<servers_t
> g_dstates
;
133 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
134 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
135 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
136 int g_tcpRecvTimeout
{2};
137 int g_tcpSendTimeout
{2};
140 bool g_servFailOnNoPolicy
{false};
141 bool g_truncateTC
{false};
142 bool g_fixupCase
{false};
143 bool g_preserveTrailingData
{false};
144 bool g_roundrobinFailOnNoServer
{false};
146 std::set
<std::string
> g_capabilitiesToRetain
;
148 static void truncateTC(char* packet
, uint16_t* len
, size_t responseSize
, unsigned int consumed
)
151 bool hadEDNS
= false;
152 uint16_t payloadSize
= 0;
155 if (g_addEDNSToSelfGeneratedResponses
) {
156 hadEDNS
= getEDNSUDPPayloadSizeAndZ(packet
, *len
, &payloadSize
, &z
);
159 *len
=static_cast<uint16_t>(sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
160 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
161 dh
->ancount
= dh
->arcount
= dh
->nscount
= 0;
164 addEDNS(dh
, *len
, responseSize
, z
& EDNS_HEADER_FLAG_DO
, payloadSize
, 0);
176 ComboAddress destination
;
177 ComboAddress origDest
;
181 if(origDest
.sin4
.sin_family
== 0) {
182 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
185 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
189 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
194 DelayPipe
<DelayedPacket
>* g_delay
= nullptr;
196 std::string
DNSQuestion::getTrailingData() const
198 const char* message
= reinterpret_cast<const char*>(this->dh
);
199 const uint16_t messageLen
= getDNSPacketLength(message
, this->len
);
200 return std::string(message
+ messageLen
, this->len
- messageLen
);
203 bool DNSQuestion::setTrailingData(const std::string
& tail
)
205 char* message
= reinterpret_cast<char*>(this->dh
);
206 const uint16_t messageLen
= getDNSPacketLength(message
, this->len
);
207 const uint16_t tailLen
= tail
.size();
208 if (tailLen
> (this->size
- messageLen
)) {
212 /* Update length and copy data from the Lua string. */
213 this->len
= messageLen
+ tailLen
;
215 tail
.copy(message
+ messageLen
, tailLen
);
220 void doLatencyStats(double udiff
)
222 if(udiff
< 1000) ++g_stats
.latency0_1
;
223 else if(udiff
< 10000) ++g_stats
.latency1_10
;
224 else if(udiff
< 50000) ++g_stats
.latency10_50
;
225 else if(udiff
< 100000) ++g_stats
.latency50_100
;
226 else if(udiff
< 1000000) ++g_stats
.latency100_1000
;
227 else ++g_stats
.latencySlow
;
228 g_stats
.latencySum
+= udiff
/ 1000;
230 auto doAvg
= [](double& var
, double n
, double weight
) {
231 var
= (weight
-1) * var
/weight
+ n
/weight
;
234 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
235 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
236 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
237 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
240 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
, unsigned int& consumed
)
242 if (responseLen
< sizeof(dnsheader
)) {
246 const struct dnsheader
* dh
= reinterpret_cast<const struct dnsheader
*>(response
);
247 if (dh
->qdcount
== 0) {
248 if ((dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) || g_allowEmptyResponse
) {
252 ++g_stats
.nonCompliantResponses
;
257 uint16_t rqtype
, rqclass
;
260 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
262 catch(const std::exception
& e
) {
263 if(responseLen
> 0 && static_cast<size_t>(responseLen
) > sizeof(dnsheader
)) {
264 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
266 ++g_stats
.nonCompliantResponses
;
270 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
277 static void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
279 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
280 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
281 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
282 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
283 /* clear the flags we are about to restore */
284 *flags
&= restoreFlagsMask
;
285 /* only keep the flags we want to restore */
286 origFlags
&= ~restoreFlagsMask
;
287 /* set the saved flags as they were */
291 static bool fixUpQueryTurnedResponse(DNSQuestion
& dq
, const uint16_t origFlags
)
293 restoreFlags(dq
.dh
, origFlags
);
295 return addEDNSToQueryTurnedResponse(dq
);
298 static bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
, bool* zeroScope
)
300 if (*responseLen
< sizeof(dnsheader
)) {
304 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(*response
);
305 restoreFlags(dh
, origFlags
);
307 if (*responseLen
== sizeof(dnsheader
)) {
312 string realname
= qname
.toDNSString();
313 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
314 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
318 if (ednsAdded
|| ecsAdded
) {
323 const std::string
responseStr(*response
, *responseLen
);
324 int res
= locateEDNSOptRR(responseStr
, &optStart
, &optLen
, &last
);
327 if (zeroScope
) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
328 size_t optContentStart
= 0;
329 uint16_t optContentLen
= 0;
330 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
331 if (isEDNSOptionInOpt(responseStr
, optStart
, optLen
, EDNSOptionCode::ECS
, &optContentStart
, &optContentLen
) && optContentLen
>= 4) {
332 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
334 *zeroScope
= responseStr
.at(optContentStart
+ 3) == 0;
339 /* we added the entire OPT RR,
340 therefore we need to remove it entirely */
342 /* simply remove the last AR */
343 *responseLen
-= optLen
;
344 uint16_t arcount
= ntohs(dh
->arcount
);
346 dh
->arcount
= htons(arcount
);
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNS(responseStr
, rewrittenResponse
) == 0) {
351 *responseLen
= rewrittenResponse
.size();
352 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
353 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
355 *responseSize
= rewrittenResponse
.capacity();
356 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
359 warnlog("Error rewriting content");
364 /* the OPT RR was already present, but without ECS,
365 we need to remove the ECS option if any */
367 /* nothing after the OPT RR, we can simply remove the
369 size_t existingOptLen
= optLen
;
370 removeEDNSOptionFromOPT(*response
+ optStart
, &optLen
, EDNSOptionCode::ECS
);
371 *responseLen
-= (existingOptLen
- optLen
);
374 /* Removing an intermediary RR could lead to compression error */
375 if (rewriteResponseWithoutEDNSOption(responseStr
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
376 *responseLen
= rewrittenResponse
.size();
377 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
378 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
380 *responseSize
= rewrittenResponse
.capacity();
381 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
384 warnlog("Error rewriting content");
395 static bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
398 uint16_t encryptedResponseLen
= 0;
400 /* save the original header before encrypting it in place */
401 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
402 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
406 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
408 *responseLen
= encryptedResponseLen
;
410 /* dropping response */
411 vinfolog("Error encrypting the response, dropping.");
417 #endif /* HAVE_DNSCRYPT */
419 static bool applyRulesToResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
)
421 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
422 std::string ruleresult
;
423 for(const auto& lr
: *localRespRulactions
) {
424 if(lr
.d_rule
->matches(&dr
)) {
425 lr
.d_rule
->d_matches
++;
426 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
428 case DNSResponseAction::Action::Allow
:
431 case DNSResponseAction::Action::Drop
:
434 case DNSResponseAction::Action::HeaderModify
:
437 case DNSResponseAction::Action::ServFail
:
438 dr
.dh
->rcode
= RCode::ServFail
;
441 /* non-terminal actions follow */
442 case DNSResponseAction::Action::Delay
:
443 dr
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
445 case DNSResponseAction::Action::None
:
454 bool processResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, size_t addRoom
, std::vector
<uint8_t>& rewrittenResponse
, bool muted
)
456 if (!applyRulesToResponse(localRespRulactions
, dr
)) {
460 bool zeroScope
= false;
461 if (!fixUpResponse(response
, responseLen
, responseSize
, *dr
.qname
, dr
.origFlags
, dr
.ednsAdded
, dr
.ecsAdded
, rewrittenResponse
, addRoom
, dr
.useZeroScope
? &zeroScope
: nullptr)) {
465 if (dr
.packetCache
&& !dr
.skipCache
&& *responseLen
<= s_maxPacketCacheEntrySize
) {
466 if (!dr
.useZeroScope
) {
467 /* if the query was not suitable for zero-scope, for
468 example because it had an existing ECS entry so the hash is
469 not really 'no ECS', so just insert it for the existing subnet
471 - we don't have the correct hash for a non-ECS query
472 - inserting with hash computed before the ECS replacement but with
473 the subnet extracted _after_ the replacement would not work.
477 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
478 dr
.packetCache
->insert(zeroScope
? dr
.cacheKeyNoECS
: dr
.cacheKey
, zeroScope
? boost::none
: dr
.subnet
, dr
.origFlags
, dr
.dnssecOK
, *dr
.qname
, dr
.qtype
, dr
.qclass
, *response
, *responseLen
, dr
.tcp
, dr
.dh
->rcode
, dr
.tempFailureTTL
);
483 if (!encryptResponse(*response
, responseLen
, *responseSize
, dr
.tcp
, dr
.dnsCryptQuery
, nullptr, nullptr)) {
487 #endif /* HAVE_DNSCRYPT */
492 static bool sendUDPResponse(int origFD
, const char* response
, const uint16_t responseLen
, const int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
494 if(delayMsec
&& g_delay
) {
495 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
496 g_delay
->submit(dp
, delayMsec
);
500 if(origDest
.sin4
.sin_family
== 0) {
501 res
= sendto(origFD
, response
, responseLen
, 0, reinterpret_cast<const struct sockaddr
*>(&origRemote
), origRemote
.getSocklen());
504 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
508 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), stringerror(err
));
516 int pickBackendSocketForSending(std::shared_ptr
<DownstreamState
>& state
)
518 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
521 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
525 if (state
->sockets
.size() == 1) {
526 ready
.push_back(state
->sockets
[0]);
531 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
532 state
->mplexer
->getAvailableFDs(ready
, -1);
536 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
537 void responderThread(std::shared_ptr
<DownstreamState
> dss
)
539 setThreadName("dnsdist/respond");
540 auto localRespRulactions
= g_resprulactions
.getLocal();
541 char packet
[s_maxPacketCacheEntrySize
+ DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
542 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
543 /* when the answer is encrypted in place, we need to get a copy
544 of the original header before encryption to fill the ring buffer */
545 dnsheader cleartextDH
;
546 vector
<uint8_t> rewrittenResponse
;
548 uint16_t queryId
= 0;
549 std::vector
<int> sockets
;
550 sockets
.reserve(dss
->sockets
.size());
553 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
555 pickBackendSocketsReadyForReceiving(dss
, sockets
);
556 for (const auto& fd
: sockets
) {
557 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
558 char * response
= packet
;
559 size_t responseSize
= sizeof(packet
);
561 if (got
< 0 || static_cast<size_t>(got
) < sizeof(dnsheader
))
564 uint16_t responseLen
= static_cast<uint16_t>(got
);
567 if(queryId
>= dss
->idStates
.size()) {
571 IDState
* ids
= &dss
->idStates
[queryId
];
572 int64_t usageIndicator
= ids
->usageIndicator
;
574 if(!IDState::isInUse(usageIndicator
)) {
575 /* the corresponding state is marked as not in use, meaning that:
576 - it was already cleaned up by another thread and the state is gone ;
577 - we already got a response for this query and this one is a duplicate.
578 Either way, we don't touch it.
583 /* read the potential DOHUnit state as soon as possible, but don't use it
584 until we have confirmed that we own this state by updating usageIndicator */
586 /* setting age to 0 to prevent the maintainer thread from
587 cleaning this IDS while we process the response.
590 int origFD
= ids
->origFD
;
592 unsigned int consumed
= 0;
593 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
, consumed
)) {
597 bool isDoH
= du
!= nullptr;
598 /* atomically mark the state as available, but only if it has not been altered
600 if (ids
->tryMarkUnused(usageIndicator
)) {
601 /* clear the potential DOHUnit asap, it's ours now
602 and since we just marked the state as unused,
603 someone could overwrite it. */
605 /* we only decrement the outstanding counter if the value was not
606 altered in the meantime, which would mean that the state has been actively reused
607 and the other thread has not incremented the outstanding counter, so we don't
608 want it to be decremented twice. */
609 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
611 /* someone updated the state in the meantime, we can't touch the existing pointer */
613 /* since the state has been updated, we can't safely access it so let's just drop
618 if(dh
->tc
&& g_truncateTC
) {
619 truncateTC(response
, &responseLen
, responseSize
, consumed
);
622 dh
->id
= ids
->origID
;
624 uint16_t addRoom
= 0;
625 DNSResponse dr
= makeDNSResponseFromIDState(*ids
, dh
, sizeof(packet
), responseLen
, false);
626 if (dr
.dnsCryptQuery
) {
627 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
630 memcpy(&cleartextDH
, dr
.dh
, sizeof(cleartextDH
));
631 if (!processResponse(&response
, &responseLen
, &responseSize
, localRespRulactions
, dr
, addRoom
, rewrittenResponse
, ids
->cs
&& ids
->cs
->muted
)) {
635 if (ids
->cs
&& !ids
->cs
->muted
) {
637 #ifdef HAVE_DNS_OVER_HTTPS
639 du
->response
= std::string(response
, responseLen
);
640 if (send(du
->rsock
, &du
, sizeof(du
), 0) != sizeof(du
)) {
641 /* at this point we have the only remaining pointer on this
642 DOHUnit object since we did set ids->du to nullptr earlier,
643 except if we got the response before the pointer could be
644 released by the frontend */
647 #endif /* HAVE_DNS_OVER_HTTPS */
652 empty
.sin4
.sin_family
= 0;
653 /* if ids->destHarvested is false, origDest holds the listening address.
654 We don't want to use that as a source since it could be 0.0.0.0 for example. */
655 sendUDPResponse(origFD
, response
, responseLen
, dr
.delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
661 ++ids
->cs
->responses
;
665 double udiff
= ids
->sentTime
.udiff();
666 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(),
667 isDoH
? " (https)": "", udiff
);
671 g_rings
.insertResponse(ts
, *dr
.remote
, *dr
.qname
, dr
.qtype
, static_cast<unsigned int>(udiff
), static_cast<unsigned int>(got
), cleartextDH
, dss
->remote
);
673 switch (cleartextDH
.rcode
) {
674 case RCode::NXDomain
:
675 ++g_stats
.frontendNXDomain
;
677 case RCode::ServFail
:
678 ++g_stats
.servfailResponses
;
679 ++g_stats
.frontendServFail
;
682 ++g_stats
.frontendNoError
;
685 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
687 doLatencyStats(udiff
);
689 rewrittenResponse
.clear();
692 catch(const std::exception
& e
){
693 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
697 catch(const std::exception
& e
)
699 errlog("UDP responder thread died because of exception: %s", e
.what());
701 catch(const PDNSException
& e
)
703 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
707 errlog("UDP responder thread died because of an exception: %s", "unknown");
710 bool DownstreamState::reconnect()
712 std::unique_lock
<std::mutex
> tl(connectLock
, std::try_to_lock
);
713 if (!tl
.owns_lock()) {
714 /* we are already reconnecting */
719 for (auto& fd
: sockets
) {
721 if (sockets
.size() > 1) {
722 std::lock_guard
<std::mutex
> lock(socketsLock
);
723 mplexer
->removeReadFD(fd
);
725 /* shutdown() is needed to wake up recv() in the responderThread */
726 shutdown(fd
, SHUT_RDWR
);
730 if (!IsAnyAddress(remote
)) {
731 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
732 if (!IsAnyAddress(sourceAddr
)) {
733 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
734 if (!sourceItfName
.empty()) {
735 #ifdef SO_BINDTODEVICE
736 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, sourceItfName
.c_str(), sourceItfName
.length());
738 infolog("Error setting up the interface on backend socket '%s': %s", remote
.toStringWithPort(), stringerror());
743 SBind(fd
, sourceAddr
);
746 SConnect(fd
, remote
);
747 if (sockets
.size() > 1) {
748 std::lock_guard
<std::mutex
> lock(socketsLock
);
749 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
753 catch(const std::runtime_error
& error
) {
754 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
761 /* if at least one (re-)connection failed, close all sockets */
763 for (auto& fd
: sockets
) {
765 if (sockets
.size() > 1) {
766 std::lock_guard
<std::mutex
> lock(socketsLock
);
767 mplexer
->removeReadFD(fd
);
769 /* shutdown() is needed to wake up recv() in the responderThread */
770 shutdown(fd
, SHUT_RDWR
);
779 void DownstreamState::hash()
781 vinfolog("Computing hashes for id=%s and weight=%d", id
, weight
);
783 WriteLock
wl(&d_lock
);
786 std::string uuid
= boost::str(boost::format("%s-%d") % id
% w
);
787 unsigned int wshash
= burtleCI((const unsigned char*)uuid
.c_str(), uuid
.size(), g_hashperturb
);
788 hashes
.insert(wshash
);
793 void DownstreamState::setId(const boost::uuids::uuid
& newId
)
796 // compute hashes only if already done
797 if (!hashes
.empty()) {
802 void DownstreamState::setWeight(int newWeight
)
805 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
809 if (!hashes
.empty()) {
814 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, const std::string
& sourceItfName_
, size_t numberOfSockets
, bool connect
=true): sourceItfName(sourceItfName_
), remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
816 pthread_rwlock_init(&d_lock
, nullptr);
818 threadStarted
.clear();
820 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
822 sockets
.resize(numberOfSockets
);
823 for (auto& fd
: sockets
) {
827 if (connect
&& !IsAnyAddress(remote
)) {
829 idStates
.resize(g_maxOutstanding
);
831 infolog("Added downstream server %s", remote
.toStringWithPort());
836 std::mutex g_luamutex
;
839 GlobalStateHolder
<ServerPolicy
> g_policy
;
841 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
843 for(auto& d
: servers
) {
844 if(d
.second
->isUp() && d
.second
->qps
.check())
847 return leastOutstanding(servers
, dq
);
850 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
851 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
853 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
854 return servers
[0].second
;
857 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
858 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
859 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
860 poss
.reserve(servers
.size());
861 for(auto& d
: servers
) {
862 if(d
.second
->isUp()) {
863 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
867 return shared_ptr
<DownstreamState
>();
868 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
869 return poss
.begin()->second
;
872 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
874 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
876 int max
= std::numeric_limits
<int>::max();
878 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
879 if(d
.second
->isUp()) {
880 // Don't overflow sum when adding high weights
881 if(d
.second
->weight
> max
- sum
) {
884 sum
+= d
.second
->weight
;
887 poss
.push_back({sum
, d
.second
});
891 // Catch poss & sum are empty to avoid SIGFPE
893 return shared_ptr
<DownstreamState
>();
896 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
898 return shared_ptr
<DownstreamState
>();
902 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
904 return valrandom(random(), servers
, dq
);
907 uint32_t g_hashperturb
;
908 double g_consistentHashBalancingFactor
= 0;
909 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
911 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
914 shared_ptr
<DownstreamState
> chashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
916 unsigned int qhash
= dq
->qname
->hash(g_hashperturb
);
917 unsigned int sel
= std::numeric_limits
<unsigned int>::max();
918 unsigned int min
= std::numeric_limits
<unsigned int>::max();
919 shared_ptr
<DownstreamState
> ret
= nullptr, first
= nullptr;
921 double targetLoad
= std::numeric_limits
<double>::max();
922 if (g_consistentHashBalancingFactor
> 0) {
923 /* we start with one, representing the query we are currently handling */
924 double currentLoad
= 1;
925 for (const auto& pair
: servers
) {
926 currentLoad
+= pair
.second
->outstanding
;
928 targetLoad
= (currentLoad
/ servers
.size()) * g_consistentHashBalancingFactor
;
931 for (const auto& d
: servers
) {
932 if (d
.second
->isUp() && d
.second
->outstanding
<= targetLoad
) {
933 // make sure hashes have been computed
934 if (d
.second
->hashes
.empty()) {
938 ReadLock
rl(&(d
.second
->d_lock
));
939 const auto& server
= d
.second
;
940 // we want to keep track of the last hash
941 if (min
> *(server
->hashes
.begin())) {
942 min
= *(server
->hashes
.begin());
946 auto hash_it
= server
->hashes
.lower_bound(qhash
);
947 if (hash_it
!= server
->hashes
.end()) {
948 if (*hash_it
< sel
) {
956 if (ret
!= nullptr) {
959 if (first
!= nullptr) {
962 return shared_ptr
<DownstreamState
>();
965 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
967 NumberedServerVector poss
;
969 for(auto& d
: servers
) {
970 if(d
.second
->isUp()) {
975 const auto *res
=&poss
;
976 if(poss
.empty() && !g_roundrobinFailOnNoServer
)
980 return shared_ptr
<DownstreamState
>();
982 static unsigned int counter
;
984 return (*res
)[(counter
++) % res
->size()].second
;
987 ComboAddress g_serverControl
{"127.0.0.1:5199"};
989 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
991 std::shared_ptr
<ServerPool
> pool
;
992 pools_t::iterator it
= pools
.find(poolName
);
993 if (it
!= pools
.end()) {
997 if (!poolName
.empty())
998 vinfolog("Creating pool %s", poolName
);
999 pool
= std::make_shared
<ServerPool
>();
1000 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
1005 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
1007 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
1008 if (!poolName
.empty()) {
1009 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
1011 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
1013 pool
->policy
= policy
;
1016 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
1018 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
1019 if (!poolName
.empty()) {
1020 vinfolog("Adding server to pool %s", poolName
);
1022 vinfolog("Adding server to default pool");
1024 pool
->addServer(server
);
1027 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
1029 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
1031 if (!poolName
.empty()) {
1032 vinfolog("Removing server from pool %s", poolName
);
1035 vinfolog("Removing server from default pool");
1038 pool
->removeServer(server
);
1041 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
1043 pools_t::const_iterator it
= pools
.find(poolName
);
1045 if (it
== pools
.end()) {
1046 throw std::out_of_range("No pool named " + poolName
);
1052 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
1054 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
1055 return pool
->getServers();
1058 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
, bool raw
)
1063 SpoofAction
sa(spoofContent
);
1067 std::vector
<std::string
> addrs
;
1068 stringtok(addrs
, spoofContent
, " ,");
1070 if (addrs
.size() == 1) {
1072 ComboAddress
spoofAddr(spoofContent
);
1073 SpoofAction
sa({spoofAddr
});
1076 catch(const PDNSException
&e
) {
1077 DNSName
cname(spoofContent
);
1078 SpoofAction
sa(cname
); // CNAME then
1082 std::vector
<ComboAddress
> cas
;
1083 for (const auto& addr
: addrs
) {
1085 cas
.push_back(ComboAddress(addr
));
1090 SpoofAction
sa(cas
);
1096 bool processRulesResult(const DNSAction::Action
& action
, DNSQuestion
& dq
, std::string
& ruleresult
, bool& drop
)
1099 case DNSAction::Action::Allow
:
1102 case DNSAction::Action::Drop
:
1107 case DNSAction::Action::Nxdomain
:
1108 dq
.dh
->rcode
= RCode::NXDomain
;
1110 ++g_stats
.ruleNXDomain
;
1113 case DNSAction::Action::Refused
:
1114 dq
.dh
->rcode
= RCode::Refused
;
1116 ++g_stats
.ruleRefused
;
1119 case DNSAction::Action::ServFail
:
1120 dq
.dh
->rcode
= RCode::ServFail
;
1122 ++g_stats
.ruleServFail
;
1125 case DNSAction::Action::Spoof
:
1126 spoofResponseFromString(dq
, ruleresult
, false);
1129 case DNSAction::Action::SpoofRaw
:
1130 spoofResponseFromString(dq
, ruleresult
, true);
1133 case DNSAction::Action::Truncate
:
1136 dq
.dh
->ra
= dq
.dh
->rd
;
1141 case DNSAction::Action::HeaderModify
:
1144 case DNSAction::Action::Pool
:
1145 dq
.poolname
=ruleresult
;
1148 case DNSAction::Action::NoRecurse
:
1152 /* non-terminal actions follow */
1153 case DNSAction::Action::Delay
:
1154 dq
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1156 case DNSAction::Action::None
:
1158 case DNSAction::Action::NoOp
:
1162 /* false means that we don't stop the processing */
1167 static bool applyRulesToQuery(LocalHolders
& holders
, DNSQuestion
& dq
, const struct timespec
& now
)
1169 g_rings
.insertQuery(now
, *dq
.remote
, *dq
.qname
, dq
.qtype
, dq
.len
, *dq
.dh
);
1171 if(g_qcount
.enabled
) {
1172 string qname
= (*dq
.qname
).toLogString();
1173 bool countQuery
{true};
1174 if(g_qcount
.filter
) {
1175 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1176 std::tie (countQuery
, qname
) = g_qcount
.filter(&dq
);
1180 WriteLock
wl(&g_qcount
.queryLock
);
1181 if(!g_qcount
.records
.count(qname
)) {
1182 g_qcount
.records
[qname
] = 0;
1184 g_qcount
.records
[qname
]++;
1188 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
1189 auto updateBlockStats
= [&got
]() {
1190 ++g_stats
.dynBlocked
;
1191 got
->second
.blocks
++;
1194 if(now
< got
->second
.until
) {
1195 DNSAction::Action action
= got
->second
.action
;
1196 if (action
== DNSAction::Action::None
) {
1197 action
= g_dynBlockAction
;
1200 case DNSAction::Action::NoOp
:
1204 case DNSAction::Action::Nxdomain
:
1205 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort());
1208 dq
.dh
->rcode
= RCode::NXDomain
;
1212 case DNSAction::Action::Refused
:
1213 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
1216 dq
.dh
->rcode
= RCode::Refused
;
1220 case DNSAction::Action::Truncate
:
1223 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
1226 dq
.dh
->ra
= dq
.dh
->rd
;
1232 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1235 case DNSAction::Action::NoRecurse
:
1237 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1242 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
1248 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
1249 auto updateBlockStats
= [&got
]() {
1250 ++g_stats
.dynBlocked
;
1254 if(now
< got
->until
) {
1255 DNSAction::Action action
= got
->action
;
1256 if (action
== DNSAction::Action::None
) {
1257 action
= g_dynBlockAction
;
1260 case DNSAction::Action::NoOp
:
1263 case DNSAction::Action::Nxdomain
:
1264 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1267 dq
.dh
->rcode
= RCode::NXDomain
;
1270 case DNSAction::Action::Refused
:
1271 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1274 dq
.dh
->rcode
= RCode::Refused
;
1277 case DNSAction::Action::Truncate
:
1281 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1284 dq
.dh
->ra
= dq
.dh
->rd
;
1290 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1293 case DNSAction::Action::NoRecurse
:
1295 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1300 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1306 DNSAction::Action action
=DNSAction::Action::None
;
1309 for(const auto& lr
: *holders
.rulactions
) {
1310 if(lr
.d_rule
->matches(&dq
)) {
1311 lr
.d_rule
->d_matches
++;
1312 action
=(*lr
.d_action
)(&dq
, &ruleresult
);
1313 if (processRulesResult(action
, dq
, ruleresult
, drop
)) {
1326 ssize_t
udpClientSendRequestToBackend(const std::shared_ptr
<DownstreamState
>& ss
, const int sd
, const char* request
, const size_t requestLen
, bool healthCheck
)
1330 if (ss
->sourceItf
== 0) {
1331 result
= send(sd
, request
, requestLen
, 0);
1336 cmsgbuf_aligned cbuf
;
1337 ComboAddress
remote(ss
->remote
);
1338 fillMSGHdr(&msgh
, &iov
, &cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &remote
);
1339 addCMsgSrcAddr(&msgh
, &cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1340 result
= sendmsg(sd
, &msgh
, 0);
1344 int savederrno
= errno
;
1345 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1347 /* This might sound silly, but on Linux send() might fail with EINVAL
1348 if the interface the socket was bound to doesn't exist anymore.
1349 We don't want to reconnect the real socket if the healthcheck failed,
1350 because it's not using the same socket.
1352 if (!healthCheck
&& (savederrno
== EINVAL
|| savederrno
== ENODEV
)) {
1360 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1362 if (msgh
->msg_flags
& MSG_TRUNC
) {
1363 /* message was too large for our buffer */
1364 vinfolog("Dropping message too large for our buffer");
1365 ++g_stats
.nonCompliantQueries
;
1369 if(!holders
.acl
->match(remote
)) {
1370 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1378 if (HarvestDestinationAddress(msgh
, &dest
)) {
1379 /* we don't get the port, only the address */
1380 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1383 dest
.sin4
.sin_family
= 0;
1389 boost::optional
<std::vector
<uint8_t>> checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DNSCryptQuery
>& dnsCryptQuery
, time_t now
, bool tcp
)
1391 if (cs
.dnscryptCtx
) {
1392 #ifdef HAVE_DNSCRYPT
1393 vector
<uint8_t> response
;
1394 uint16_t decryptedQueryLen
= 0;
1396 dnsCryptQuery
= std::make_shared
<DNSCryptQuery
>(cs
.dnscryptCtx
);
1398 bool decrypted
= handleDNSCryptQuery(const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, tcp
, now
, response
);
1401 if (response
.size() > 0) {
1404 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1407 len
= decryptedQueryLen
;
1408 #endif /* HAVE_DNSCRYPT */
1413 bool checkQueryHeaders(const struct dnsheader
* dh
)
1415 if (dh
->qr
) { // don't respond to responses
1416 ++g_stats
.nonCompliantQueries
;
1420 if (dh
->qdcount
== 0) {
1421 ++g_stats
.emptyQueries
;
1426 ++g_stats
.rdQueries
;
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Fill 'outMsg' so that 'response' can later be sent to 'remote' as part of
   a batch.

   dest - the address the query was received on; when it is known (family
          != 0) it is attached as the source address of the response using
          'cbuf', otherwise no ancillary data is sent
   iov  - I/O vector that will point to the response */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  outMsg.msg_len = 0;
  fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  if (dest.sin4.sin_family == 0) {
    outMsg.msg_hdr.msg_control = nullptr;
  }
  else {
    addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1447 /* self-generated responses or cache hits */
1448 static bool prepareOutgoingResponse(LocalHolders
& holders
, ClientState
& cs
, DNSQuestion
& dq
, bool cacheHit
)
1450 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(dq
.dh
), dq
.size
, dq
.len
, dq
.tcp
, dq
.queryTime
);
1452 #ifdef HAVE_PROTOBUF
1453 dr
.uniqueId
= dq
.uniqueId
;
1456 dr
.delayMsec
= dq
.delayMsec
;
1458 if (!applyRulesToResponse(cacheHit
? holders
.cacheHitRespRulactions
: holders
.selfAnsweredRespRulactions
, dr
)) {
1462 /* in case a rule changed it */
1463 dq
.delayMsec
= dr
.delayMsec
;
1465 #ifdef HAVE_DNSCRYPT
1467 if (!encryptResponse(reinterpret_cast<char*>(dq
.dh
), &dq
.len
, dq
.size
, dq
.tcp
, dq
.dnsCryptQuery
, nullptr, nullptr)) {
1471 #endif /* HAVE_DNSCRYPT */
1474 ++g_stats
.cacheHits
;
1477 switch (dr
.dh
->rcode
) {
1478 case RCode::NXDomain
:
1479 ++g_stats
.frontendNXDomain
;
1481 case RCode::ServFail
:
1482 ++g_stats
.frontendServFail
;
1484 case RCode::NoError
:
1485 ++g_stats
.frontendNoError
;
1489 doLatencyStats(0); // we're not going to measure this
1493 ProcessQueryResult
processQuery(DNSQuestion
& dq
, ClientState
& cs
, LocalHolders
& holders
, std::shared_ptr
<DownstreamState
>& selectedBackend
)
1495 const uint16_t queryId
= ntohs(dq
.dh
->id
);
1498 /* we need an accurate ("real") value for the response and
1499 to store into the IDS, but not for insertion into the
1500 rings for example */
1501 struct timespec now
;
1504 if (!applyRulesToQuery(holders
, dq
, now
)) {
1505 return ProcessQueryResult::Drop
;
1508 if(dq
.dh
->qr
) { // something turned it into a response
1509 fixUpQueryTurnedResponse(dq
, dq
.origFlags
);
1511 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1512 return ProcessQueryResult::Drop
;
1515 ++g_stats
.selfAnswered
;
1517 return ProcessQueryResult::SendAnswer
;
1520 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, dq
.poolname
);
1521 dq
.packetCache
= serverPool
->packetCache
;
1522 auto policy
= *(holders
.policy
);
1523 if (serverPool
->policy
!= nullptr) {
1524 policy
= *(serverPool
->policy
);
1526 auto servers
= serverPool
->getServers();
1528 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1529 selectedBackend
= policy
.policy(servers
, &dq
);
1532 selectedBackend
= policy
.policy(servers
, &dq
);
1535 uint16_t cachedResponseSize
= dq
.size
;
1536 uint32_t allowExpired
= selectedBackend
? 0 : g_staleCacheEntriesTTL
;
1538 if (dq
.packetCache
&& !dq
.skipCache
) {
1539 dq
.dnssecOK
= (getEDNSZ(dq
) & EDNS_HEADER_FLAG_DO
);
1542 if (dq
.useECS
&& ((selectedBackend
&& selectedBackend
->useECS
) || (!selectedBackend
&& serverPool
->getECS()))) {
1543 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1544 // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
1545 // ECS option, which would make it unsuitable for the zero-scope feature.
1546 if (dq
.packetCache
&& !dq
.skipCache
&& (!selectedBackend
|| !selectedBackend
->disableZeroScope
) && dq
.packetCache
->isECSParsingEnabled()) {
1547 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKeyNoECS
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1548 dq
.len
= cachedResponseSize
;
1550 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1551 return ProcessQueryResult::Drop
;
1554 return ProcessQueryResult::SendAnswer
;
1558 /* there was no existing ECS on the query, enable the zero-scope feature */
1559 dq
.useZeroScope
= true;
1563 if (!handleEDNSClientSubnet(dq
, dq
.ednsAdded
, dq
.ecsAdded
, g_preserveTrailingData
)) {
1564 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq
.remote
->toStringWithPort());
1565 return ProcessQueryResult::Drop
;
1569 if (dq
.packetCache
&& !dq
.skipCache
) {
1570 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKey
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1571 dq
.len
= cachedResponseSize
;
1573 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1574 return ProcessQueryResult::Drop
;
1577 return ProcessQueryResult::SendAnswer
;
1579 ++g_stats
.cacheMisses
;
1582 if(!selectedBackend
) {
1585 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "ServFailed" : "Dropped", dq
.qname
->toLogString(), QType(dq
.qtype
).getName(), dq
.remote
->toStringWithPort());
1586 if (g_servFailOnNoPolicy
) {
1587 restoreFlags(dq
.dh
, dq
.origFlags
);
1589 dq
.dh
->rcode
= RCode::ServFail
;
1592 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1593 return ProcessQueryResult::Drop
;
1595 // no response-only statistics counter to update.
1596 return ProcessQueryResult::SendAnswer
;
1599 return ProcessQueryResult::Drop
;
1602 if (dq
.addXPF
&& selectedBackend
->xpfRRCode
!= 0) {
1603 addXPF(dq
, selectedBackend
->xpfRRCode
, g_preserveTrailingData
);
1606 selectedBackend
->queries
++;
1607 return ProcessQueryResult::PassToBackend
;
1609 catch(const std::exception
& e
){
1610 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq
.tcp
? "TCP" : "UDP"), dq
.remote
->toStringWithPort(), queryId
, e
.what());
1612 return ProcessQueryResult::Drop
;
1615 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, cmsgbuf_aligned
* respCBuf
)
1617 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
1618 uint16_t queryId
= 0;
1621 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1625 /* we need an accurate ("real") value for the response and
1626 to store into the IDS, but not for insertion into the
1627 rings for example */
1628 struct timespec queryRealTime
;
1629 gettime(&queryRealTime
, true);
1631 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
= nullptr;
1632 auto dnsCryptResponse
= checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, queryRealTime
.tv_sec
, false);
1633 if (dnsCryptResponse
) {
1634 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dnsCryptResponse
->data()), static_cast<uint16_t>(dnsCryptResponse
->size()), 0, dest
, remote
);
1638 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
1639 queryId
= ntohs(dh
->id
);
1641 if (!checkQueryHeaders(dh
)) {
1645 uint16_t qtype
, qclass
;
1646 unsigned int consumed
= 0;
1647 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
1648 DNSQuestion
dq(&qname
, qtype
, qclass
, consumed
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false, &queryRealTime
);
1649 dq
.dnsCryptQuery
= std::move(dnsCryptQuery
);
1650 std::shared_ptr
<DownstreamState
> ss
{nullptr};
1651 auto result
= processQuery(dq
, cs
, holders
, ss
);
1653 if (result
== ProcessQueryResult::Drop
) {
1657 if (result
== ProcessQueryResult::SendAnswer
) {
1658 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1659 if (dq
.delayMsec
== 0 && responsesVect
!= nullptr) {
1660 queueResponse(cs
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, *dq
.local
, *dq
.remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1661 (*queuedResponses
)++;
1664 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1665 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1666 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, dq
.delayMsec
, dest
, *dq
.remote
);
1670 if (result
!= ProcessQueryResult::PassToBackend
|| ss
== nullptr) {
1674 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1675 IDState
* ids
= &ss
->idStates
[idOffset
];
1677 DOHUnit
* du
= nullptr;
1679 /* that means that the state was in use, possibly with an allocated
1680 DOHUnit that we will need to handle, but we can't touch it before
1681 confirming that we now own this state */
1682 if (ids
->isInUse()) {
1686 /* we atomically replace the value, we now own this state */
1687 if (!ids
->markAsUsed()) {
1688 /* the state was not in use.
1689 we reset 'du' because it might have still been in use when we read it. */
1694 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1695 to handle it because it's about to be overwritten. */
1698 ++g_stats
.downstreamTimeouts
;
1699 handleDOHTimeout(du
);
1703 ids
->origFD
= cs
.udpFD
;
1704 ids
->origID
= dh
->id
;
1705 setIDStateFromDNSQuestion(*ids
, dq
, std::move(qname
));
1707 /* If we couldn't harvest the real dest addr, still
1708 write down the listening addr since it will be useful
1709 (especially if it's not an 'any' one).
1710 We need to keep track of which one it is since we may
1711 want to use the real but not the listening addr to reply.
1713 if (dest
.sin4
.sin_family
!= 0) {
1714 ids
->origDest
= dest
;
1715 ids
->destHarvested
= true;
1718 ids
->origDest
= cs
.local
;
1719 ids
->destHarvested
= false;
1724 int fd
= pickBackendSocketForSending(ss
);
1725 ssize_t ret
= udpClientSendRequestToBackend(ss
, fd
, query
, dq
.len
);
1729 ++g_stats
.downstreamSendErrors
;
1732 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toLogString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
1734 catch(const std::exception
& e
){
1735 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Main loop of a UDP frontend thread when several datagrams are received
   per system call: up to g_udpVectorSize messages are read with one
   recvmmsg(), each is handed to processUDPQuery(), and the answers that
   were queued along the way are sent back with one sendmmsg().
   Never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[s_maxPacketCacheEntrySize];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
  }

  /* go now */
  for(;;) {

    /* reset the IO vector, since it's also used to send the vector of responses
       to avoid having to copy the data around */
    for (size_t idx = 0; idx < vectSize; idx++) {
      recvData[idx].iov.iov_base = recvData[idx].packet;
      recvData[idx].iov.iov_len = sizeof(recvData[idx].packet);
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      /* too short to even hold a DNS header */
      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);

    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1820 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1821 static void udpClientThread(ClientState
* cs
)
1824 setThreadName("dnsdist/udpClie");
1825 LocalHolders holders
;
1827 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1828 if (g_udpVectorSize
> 1) {
1829 MultipleMessagesUDPClientThread(cs
, holders
);
1833 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1835 char packet
[s_maxPacketCacheEntrySize
];
1836 /* the actual buffer is larger because:
1837 - we may have to add EDNS and/or ECS
1838 - we use it for self-generated responses (from rule or cache)
1839 but we only accept incoming payloads up to that size
1841 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1844 /* used by HarvestDestinationAddress */
1845 cmsgbuf_aligned cbuf
;
1847 ComboAddress remote
;
1849 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1850 fillMSGHdr(&msgh
, &iov
, &cbuf
, sizeof(cbuf
), packet
, s_udpIncomingBufferSize
, &remote
);
1853 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
1855 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1856 ++g_stats
.nonCompliantQueries
;
1860 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), sizeof(packet
), nullptr, nullptr, nullptr, nullptr);
1864 catch(const std::exception
&e
)
1866 errlog("UDP client thread died because of exception: %s", e
.what());
1868 catch(const PDNSException
&e
)
1870 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1874 errlog("UDP client thread died because of an exception: %s", "unknown");
/* Return a random DNS message ID in [0, 65535].
   libsodium's randombytes_uniform() is used when available, random()
   otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  return static_cast<uint16_t>(randombytes_uniform(65536));
#else
  return static_cast<uint16_t>(random() % 65536);
#endif
}
/* Upper bound on the number of TCP client threads; also used to estimate the
   number of file descriptors needed (see checkFileDescriptorsLimits). */
1886 uint64_t g_maxTCPClientThreads
{10};
/* Number of maintenance iterations between two purges of the packet caches. */
1887 std::atomic
<uint16_t> g_cacheCleaningDelay
{60};
/* Percentage used to compute how many entries a cache is purged down to:
   maxEntries * (100 - percentage) / 100. */
1888 std::atomic
<uint16_t> g_cacheCleaningPercentage
{100};
1892 setThreadName("dnsdist/main");
1895 int32_t secondsToWaitLog
= 0;
1901 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1902 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1906 secondsToWaitLog
= 0;
1908 catch(std::exception
&e
) {
1909 if (secondsToWaitLog
<= 0) {
1910 infolog("Error during execution of maintenance function: %s", e
.what());
1911 secondsToWaitLog
= 61;
1913 secondsToWaitLog
-= interval
;
1919 if (counter
>= g_cacheCleaningDelay
) {
1920 /* keep track, for each cache, of whether we should keep
1922 std::map
<std::shared_ptr
<DNSDistPacketCache
>, bool> caches
;
1924 /* gather all caches actually used by at least one pool, and see
1925 if something prevents us from cleaning the expired entries */
1926 auto localPools
= g_pools
.getLocal();
1927 for (const auto& entry
: *localPools
) {
1928 auto& pool
= entry
.second
;
1930 auto packetCache
= pool
->packetCache
;
1935 auto pair
= caches
.insert({packetCache
, false});
1936 auto& iter
= pair
.first
;
1937 /* if we need to keep stale data for this cache (ie, not clear
1938 expired entries when at least one pool using this cache
1939 has all its backends down) */
1940 if (packetCache
->keepStaleData() && iter
->second
== false) {
1941 /* so far all pools had at least one backend up */
1942 if (pool
->countServers(true) == 0) {
1943 iter
->second
= true;
1948 for (auto pair
: caches
) {
1949 /* shall we keep expired entries ? */
1950 if (pair
.second
== true) {
1953 auto& packetCache
= pair
.first
;
1954 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1955 packetCache
->purgeExpired(upTo
);
1960 // ponder pruning g_dynblocks of expired entries here
1964 static void secPollThread()
1966 setThreadName("dnsdist/secpoll");
1970 doSecPoll(g_secPollSuffix
);
1974 sleep(g_secPollInterval
);
1978 static void healthChecksThread()
1980 setThreadName("dnsdist/healthC");
1982 static const int interval
= 1;
1987 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads()) {
1988 g_tcpclientthreads
->addTCPClientThread();
1991 auto mplexer
= std::shared_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
1992 auto states
= g_dstates
.getLocal(); // this points to the actual shared_ptrs!
1993 for(auto& dss
: *states
) {
1994 if(++dss
->lastCheck
< dss
->checkInterval
) {
2000 if (dss
->availability
== DownstreamState::Availability::Auto
) {
2001 if (!queueHealthCheck(mplexer
, dss
)) {
2002 updateHealthCheckResult(dss
, false);
2006 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
2007 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
2008 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
2009 dss
->prev
.queries
.store(dss
->queries
.load());
2010 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
2012 for (IDState
& ids
: dss
->idStates
) { // timeouts
2013 int64_t usageIndicator
= ids
.usageIndicator
;
2014 if(IDState::isInUse(usageIndicator
) && ids
.age
++ > g_udpTimeout
) {
2015 /* We mark the state as unused as soon as possible
2016 to limit the risk of racing with the
2019 auto oldDU
= ids
.du
;
2021 if (!ids
.tryMarkUnused(usageIndicator
)) {
2022 /* this state has been altered in the meantime,
2023 don't go anywhere near it */
2027 handleDOHTimeout(oldDU
);
2031 ++g_stats
.downstreamTimeouts
; // this is an 'actively' discovered timeout
2032 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2033 dss
->remote
.toStringWithPort(), dss
->name
,
2034 ids
.qname
.toLogString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
2039 struct dnsheader fake
;
2040 memset(&fake
, 0, sizeof(fake
));
2041 fake
.id
= ids
.origID
;
2043 g_rings
.insertResponse(ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
);
2048 handleQueuedHealthChecks(mplexer
);
2052 static void bindAny(int af
, int sock
)
2054 __attribute__((unused
)) int one
= 1;
2057 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
2058 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2063 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
2064 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2068 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
2069 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2072 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
2073 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2077 static void dropGroupPrivs(gid_t gid
)
2080 if (setgid(gid
) == 0) {
2081 if (setgroups(0, NULL
) < 0) {
2082 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2086 warnlog("Warning: Unable to set group ID to %d: %s", gid
, stringerror());
2091 static void dropUserPrivs(uid_t uid
)
2094 if(setuid(uid
) < 0) {
2095 warnlog("Warning: Unable to set user ID to %d: %s", uid
, stringerror());
2100 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
2102 /* stdin, stdout, stderr */
2103 size_t requiredFDsCount
= 3;
2104 auto backends
= g_dstates
.getLocal();
2105 /* UDP sockets to backends */
2106 size_t backendUDPSocketsCount
= 0;
2107 for (const auto& backend
: *backends
) {
2108 backendUDPSocketsCount
+= backend
->sockets
.size();
2110 requiredFDsCount
+= backendUDPSocketsCount
;
2111 /* TCP sockets to backends */
2112 requiredFDsCount
+= (backends
->size() * g_maxTCPClientThreads
);
2113 /* listening sockets */
2114 requiredFDsCount
+= udpBindsCount
;
2115 requiredFDsCount
+= tcpBindsCount
;
2116 /* max TCP connections currently served */
2117 requiredFDsCount
+= g_maxTCPClientThreads
;
2118 /* max pipes for communicating between TCP acceptors and client threads */
2119 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
2120 /* max TCP queued connections */
2121 requiredFDsCount
+= g_maxTCPQueuedConnections
;
2122 /* DelayPipe pipe */
2123 requiredFDsCount
+= 2;
2126 /* webserver main socket */
2128 /* console main socket */
2135 getrlimit(RLIMIT_NOFILE
, &rl
);
2136 if (rl
.rlim_cur
<= requiredFDsCount
) {
2137 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
2139 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2141 warnlog("You can increase this value by using ulimit.");
2146 static void setUpLocalBind(std::unique_ptr
<ClientState
>& cs
)
2148 /* skip some warnings if there is an identical UDP context */
2149 bool warn
= cs
->tcp
== false || cs
->tlsFrontend
!= nullptr || cs
->dohFrontend
!= nullptr;
2150 int& fd
= cs
->tcp
== false ? cs
->udpFD
: cs
->tcpFD
;
2153 fd
= SSocket(cs
->local
.sin4
.sin_family
, cs
->tcp
== false ? SOCK_DGRAM
: SOCK_STREAM
, 0);
2156 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2157 #ifdef TCP_DEFER_ACCEPT
2158 SSetsockopt(fd
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2160 if (cs
->fastOpenQueueSize
> 0) {
2162 SSetsockopt(fd
, IPPROTO_TCP
, TCP_FASTOPEN
, cs
->fastOpenQueueSize
);
2165 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2171 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2172 SSetsockopt(fd
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2175 bindAny(cs
->local
.sin4
.sin_family
, fd
);
2177 if(!cs
->tcp
&& IsAnyAddress(cs
->local
)) {
2179 setsockopt(fd
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2180 #ifdef IPV6_RECVPKTINFO
2181 setsockopt(fd
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2185 if (cs
->reuseport
) {
2187 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2190 /* no need to warn again if configured but support is not available, we already did for UDP */
2191 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2197 if (cs
->local
.isIPv4()) {
2199 setSocketIgnorePMTU(cs
->udpFD
);
2201 catch(const std::exception
& e
) {
2202 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs
->local
.toStringWithPort(), e
.what());
2207 const std::string
& itf
= cs
->interface
;
2209 #ifdef SO_BINDTODEVICE
2210 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2212 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), stringerror());
2216 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2222 if (g_defaultBPFFilter
) {
2223 cs
->attachFilter(g_defaultBPFFilter
);
2224 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs
->tcp
? "UDP" : "TCP"), cs
->local
.toStringWithPort());
2226 #endif /* HAVE_EBPF */
2228 if (cs
->tlsFrontend
!= nullptr) {
2229 if (!cs
->tlsFrontend
->setupTLS()) {
2230 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2231 _exit(EXIT_FAILURE
);
2235 if (cs
->dohFrontend
!= nullptr) {
2236 cs
->dohFrontend
->setup();
2239 SBind(fd
, cs
->local
);
2242 SListen(cs
->tcpFD
, SOMAXCONN
);
2243 if (cs
->tlsFrontend
!= nullptr) {
2244 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2246 else if (cs
->dohFrontend
!= nullptr) {
2247 warnlog("Listening on %s for DoH", cs
->local
.toStringWithPort());
2249 else if (cs
->dnscryptCtx
!= nullptr) {
2250 warnlog("Listening on %s for DNSCrypt", cs
->local
.toStringWithPort());
2253 warnlog("Listening on %s", cs
->local
.toStringWithPort());
/* Command-line state consumed by main(). */
/* Listen addresses: main() appends trimmed option arguments here and later
   creates two ClientState frontends (port argument 53) for every entry. */
2262 vector
<string
> locals
;
/* Backend addresses: main() appends argv entries here and later builds one
   DownstreamState (port argument 53) per entry, added to the "" pool. */
2263 vector
<string
> remotes
;
/* When true, main() loads the configuration with setupLua(false, true, ...),
   logs that it is OK and exits successfully. */
2264 bool checkConfig
{false};
/* When true (or when a command was supplied), main() runs doClient() against
   the control socket and exits. */
2265 bool beClient
{false};
/* When true, main() sends sd_notify(0, "READY=1") and joins the health-check
   thread. */
2266 bool beSupervised
{false};
/* Starts out false; main() sets it to true once the list of frontends has
   been finalised, immediately before the local binds are set up. */
2273 std::atomic
<bool> g_configurationDone
{false};
// Help text, written to stdout line by line: first the compact syntax
// summary, then one entry per option (continuation lines start with a space).
2278 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2279 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2280 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
// Per-option descriptions start here.
2282 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2283 cout
<<"-C,--config file Load configuration from 'file'\n";
2284 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2285 cout
<<" controlSocket from your configuration file, but also\n";
2286 cout
<<" accepts an IP:PORT argument\n";
// The -k/--setkey entry is only advertised when built with libsodium.
2287 #ifdef HAVE_LIBSODIUM
2288 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2289 cout
<<" is similar to setting setKey in the configuration file.\n";
2290 cout
<<" NOTE: this will leak this key in your shell's history\n";
2291 cout
<<" and in the systems running process list.\n";
2293 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2294 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2295 cout
<<" Any errors are printed as well.\n";
2296 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2297 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2298 cout
<<"-h,--help Display this helpful message\n";
2299 cout
<<"-l,--local address Listen on this local address\n";
2300 cout
<<"--supervised Don't open a console, I'm supervised\n";
2301 cout
<<" (use with e.g. systemd and daemontools)\n";
2302 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2303 cout
<<" (use with e.g. systemd)\n";
2304 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2305 cout
<<"-v,--verbose Enable verbose mode\n";
2306 cout
<<"-V,--version Show dnsdist version information and exit\n";
2309 int main(int argc
, char** argv
)
2312 size_t udpBindsCount
= 0;
2313 size_t tcpBindsCount
= 0;
2314 rl_attempted_completion_function
= my_completion
;
2315 rl_completion_append_character
= 0;
2317 signal(SIGPIPE
, SIG_IGN
);
2318 signal(SIGCHLD
, SIG_IGN
);
2319 openlog("dnsdist", LOG_PID
|LOG_NDELAY
, LOG_DAEMON
);
2321 #ifdef HAVE_LIBSODIUM
2322 if (sodium_init() == -1) {
2323 cerr
<<"Unable to initialize crypto library"<<endl
;
2326 g_hashperturb
=randombytes_uniform(0xffffffff);
2327 srandom(randombytes_uniform(0xffffffff));
2331 gettimeofday(&tv
, 0);
2332 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
2333 g_hashperturb
=random();
2337 ComboAddress clientAddress
= ComboAddress();
2338 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
2339 struct option longopts
[]={
2340 {"acl", required_argument
, 0, 'a'},
2341 {"check-config", no_argument
, 0, 1},
2342 {"client", no_argument
, 0, 'c'},
2343 {"config", required_argument
, 0, 'C'},
2344 {"disable-syslog", no_argument
, 0, 2},
2345 {"execute", required_argument
, 0, 'e'},
2346 {"gid", required_argument
, 0, 'g'},
2347 {"help", no_argument
, 0, 'h'},
2348 {"local", required_argument
, 0, 'l'},
2349 {"setkey", required_argument
, 0, 'k'},
2350 {"supervised", no_argument
, 0, 3},
2351 {"uid", required_argument
, 0, 'u'},
2352 {"verbose", no_argument
, 0, 'v'},
2353 {"version", no_argument
, 0, 'V'},
2359 int c
=getopt_long(argc
, argv
, "a:cC:e:g:hk:l:u:vV", longopts
, &longindex
);
2364 g_cmdLine
.checkConfig
=true;
2370 g_cmdLine
.beSupervised
=true;
2373 g_cmdLine
.config
=optarg
;
2376 g_cmdLine
.beClient
=true;
2379 g_cmdLine
.command
=optarg
;
2382 g_cmdLine
.gid
=optarg
;
2385 cout
<<"dnsdist "<<VERSION
<<endl
;
2392 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
2395 #ifdef HAVE_LIBSODIUM
2396 if (B64Decode(string(optarg
), g_consoleKey
) < 0) {
2397 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2401 cerr
<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl
;
2406 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
2409 g_cmdLine
.uid
=optarg
;
2415 #ifdef LUAJIT_VERSION
2416 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2418 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2420 cout
<<"Enabled features: ";
2424 #ifdef HAVE_DNS_OVER_TLS
2425 cout
<<"dns-over-tls(";
2437 #ifdef HAVE_DNS_OVER_HTTPS
2438 cout
<<"dns-over-https(DOH) ";
2440 #ifdef HAVE_DNSCRYPT
2449 #ifdef HAVE_LIBCRYPTO
2452 #ifdef HAVE_LIBSODIUM
2458 #ifdef HAVE_PROTOBUF
2464 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2465 cout
<<"recvmmsg/sendmmsg ";
2467 #ifdef HAVE_NET_SNMP
2477 //getopt_long printed an error message.
2486 for(auto p
= argv
; *p
; ++p
) {
2487 if(g_cmdLine
.beClient
) {
2488 clientAddress
= ComboAddress(*p
, 5199);
2490 g_cmdLine
.remotes
.push_back(*p
);
2494 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
, false};
2496 g_policy
.setState(leastOutstandingPol
);
2497 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2498 setupLua(true, false, g_cmdLine
.config
);
2499 if (clientAddress
!= ComboAddress())
2500 g_serverControl
= clientAddress
;
2501 doClient(g_serverControl
, g_cmdLine
.command
);
2502 _exit(EXIT_SUCCESS
);
2505 auto acl
= g_ACL
.getCopy();
2507 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2509 g_ACL
.setState(acl
);
2512 auto consoleACL
= g_consoleACL
.getCopy();
2513 for (const auto& mask
: { "127.0.0.1/8", "::1/128" }) {
2514 consoleACL
.addMask(mask
);
2516 g_consoleACL
.setState(consoleACL
);
2518 if (g_cmdLine
.checkConfig
) {
2519 setupLua(false, true, g_cmdLine
.config
);
2520 // No exception was thrown
2521 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2522 _exit(EXIT_SUCCESS
);
2525 auto todo
=setupLua(false, false, g_cmdLine
.config
);
2527 auto localPools
= g_pools
.getCopy();
2529 bool precompute
= false;
2530 if (g_policy
.getLocal()->name
== "chashed") {
2533 for (const auto& entry
: localPools
) {
2534 if (entry
.second
->policy
!= nullptr && entry
.second
->policy
->name
== "chashed") {
2541 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2542 // pre compute hashes
2543 auto backends
= g_dstates
.getLocal();
2544 for (auto& backend
: *backends
) {
2550 if (!g_cmdLine
.locals
.empty()) {
2551 for (auto it
= g_frontends
.begin(); it
!= g_frontends
.end(); ) {
2552 /* DoH, DoT and DNSCrypt frontends are separate */
2553 if ((*it
)->dohFrontend
== nullptr && (*it
)->tlsFrontend
== nullptr && (*it
)->dnscryptCtx
== nullptr) {
2554 it
= g_frontends
.erase(it
);
2561 for(const auto& loc
: g_cmdLine
.locals
) {
2563 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), false, false, 0, "", {})));
2565 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), true, false, 0, "", {})));
2569 if (g_frontends
.empty()) {
2571 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2573 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2576 g_configurationDone
= true;
2578 for(auto& frontend
: g_frontends
) {
2579 setUpLocalBind(frontend
);
2581 if (frontend
->tcp
== false) {
2589 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
2593 g_ACL
.getLocal()->toStringVector(&vec
);
2594 for(const auto& s
: vec
) {
2599 infolog("ACL allowing queries from: %s", acls
.c_str());
2602 g_consoleACL
.getLocal()->toStringVector(&vec
);
2603 for (const auto& entry
: vec
) {
2604 if (!acls
.empty()) {
2609 infolog("Console ACL allowing connections from: %s", acls
.c_str());
2611 #ifdef HAVE_LIBSODIUM
2612 if (g_consoleEnabled
&& g_consoleKey
.empty()) {
2613 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2617 uid_t newgid
=getegid();
2618 gid_t newuid
=geteuid();
2620 if(!g_cmdLine
.gid
.empty())
2621 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2623 if(!g_cmdLine
.uid
.empty())
2624 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2626 if (getegid() != newgid
) {
2627 if (running_in_service_mgr()) {
2628 errlog("--gid/-g set on command-line, but dnsdist was started as a systemd service. Use the 'Group' setting in the systemd unit file to set the group to run as");
2629 _exit(EXIT_FAILURE
);
2631 dropGroupPrivs(newgid
);
2634 if (geteuid() != newuid
) {
2635 if (running_in_service_mgr()) {
2636 errlog("--uid/-u set on command-line, but dnsdist was started as a systemd service. Use the 'User' setting in the systemd unit file to set the user to run as");
2637 _exit(EXIT_FAILURE
);
2639 dropUserPrivs(newuid
);
2643 /* we might still have capabilities remaining,
2644 for example if we have been started as root
2645 without --uid or --gid (please don't do that)
2646 or as an unprivileged user with ambient
2647 capabilities like CAP_NET_BIND_SERVICE.
2649 dropCapabilities(g_capabilitiesToRetain
);
2651 catch(const std::exception
& e
) {
2652 warnlog("%s", e
.what());
2655 /* this need to be done _after_ dropping privileges */
2656 g_delay
= new DelayPipe
<DelayedPacket
>();
2662 g_tcpclientthreads
= std::unique_ptr
<TCPClientCollection
>(new TCPClientCollection(g_maxTCPClientThreads
, g_useTCPSinglePipe
));
2667 localPools
= g_pools
.getCopy();
2668 /* create the default pool no matter what */
2669 createPoolIfNotExists(localPools
, "");
2670 if(g_cmdLine
.remotes
.size()) {
2671 for(const auto& address
: g_cmdLine
.remotes
) {
2672 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2673 addServerToPool(localPools
, "", ret
);
2674 if (ret
->connected
&& !ret
->threadStarted
.test_and_set()) {
2675 ret
->tid
= thread(responderThread
, ret
);
2677 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
2680 g_pools
.setState(localPools
);
2682 if(g_dstates
.getLocal()->empty()) {
2683 errlog("No downstream servers defined: all packets will get dropped");
2684 // you might define them later, but you need to know
2687 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
2689 auto mplexer
= std::shared_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
2690 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2691 if (dss
->availability
== DownstreamState::Availability::Auto
) {
2692 if (!queueHealthCheck(mplexer
, dss
, true)) {
2693 dss
->upStatus
= false;
2694 warnlog("Marking downstream %s as 'down'", dss
->getNameWithAddr());
2698 handleQueuedHealthChecks(mplexer
, true);
2700 for(auto& cs
: g_frontends
) {
2701 if (cs
->dohFrontend
!= nullptr) {
2702 #ifdef HAVE_DNS_OVER_HTTPS
2703 std::thread
t1(dohThread
, cs
.get());
2704 if (!cs
->cpus
.empty()) {
2705 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2708 #endif /* HAVE_DNS_OVER_HTTPS */
2711 if (cs
->udpFD
>= 0) {
2712 thread
t1(udpClientThread
, cs
.get());
2713 if (!cs
->cpus
.empty()) {
2714 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2718 else if (cs
->tcpFD
>= 0) {
2719 thread
t1(tcpAcceptorThread
, cs
.get());
2720 if (!cs
->cpus
.empty()) {
2721 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2727 thread
carbonthread(carbonDumpThread
);
2728 carbonthread
.detach();
2730 thread
stattid(maintThread
);
2733 thread
healththread(healthChecksThread
);
2735 if (!g_secPollSuffix
.empty()) {
2736 thread
secpollthread(secPollThread
);
2737 secpollthread
.detach();
2740 if(g_cmdLine
.beSupervised
) {
2742 sd_notify(0, "READY=1");
2744 healththread
.join();
2747 healththread
.detach();
2750 _exit(EXIT_SUCCESS
);
/* Top-level exception handlers: every path logs the failure through errlog()
   and terminates the process with EXIT_FAILURE. All exceptions are caught by
   reference to const, since none of the handlers modifies them. */
catch (const LuaContext::ExecutionErrorException& e) {
  try {
    errlog("Fatal Lua error: %s", e.what());
    /* rethrows the exception nested inside 'e', if there is one, so that the
       handlers right below can log the underlying cause as well */
    std::rethrow_if_nested(e);
  }
  catch (const std::exception& ne) {
    errlog("Details: %s", ne.what());
  }
  catch (const PDNSException& ae) {
    errlog("Fatal pdns error: %s", ae.reason);
  }
  _exit(EXIT_FAILURE);
}
catch (const std::exception& e) {
  errlog("Fatal error: %s", e.what());
  _exit(EXIT_FAILURE);
}
catch (const PDNSException& ae) {
  errlog("Fatal pdns error: %s", ae.reason);
  _exit(EXIT_FAILURE);
}
2777 uint64_t getLatencyCount(const std::string
&)
2779 return g_stats
.responses
+ g_stats
.selfAnswered
+ g_stats
.cacheHits
;