2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <netinet/tcp.h>
31 #include <sys/resource.h>
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
37 #include <editline/readline.h>
41 #include <systemd/sd-daemon.h>
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-healthchecks.hh"
49 #include "dnsdist-lua.hh"
50 #include "dnsdist-rings.hh"
51 #include "dnsdist-secpoll.hh"
52 #include "dnsdist-xpf.hh"
55 #include "delaypipe.hh"
58 #include "dnsparser.hh"
59 #include "ednsoptions.hh"
63 #include "sodcrypto.hh"
65 #include "threadname.hh"
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
// Process-wide globals used by the code below. g_stats holds the counters
// incremented throughout this file (latency buckets, nonCompliantResponses,
// dynBlocked, frontend rcode counters, ...).
83 struct DNSDistStats g_stats
;
84 MetricDefinitionStorage g_metricDefinitions
;
// Defaults to the largest uint16_t; the DownstreamState constructor sizes each
// backend's idStates vector from this value.
86 uint16_t g_maxOutstanding
{std::numeric_limits
<uint16_t>::max()};
// Initialised to 0; not read anywhere in this excerpt.
87 uint32_t g_staleCacheEntriesTTL
{0};
// Consulted by responseContentMatches() when a response carries no question
// section (qdcount == 0).
89 bool g_allowEmptyResponse
{false};
91 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
92 string g_outputBuffer
;
// Per-listener objects for TLS, DNS over HTTPS and DNSCrypt frontends.
94 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
95 std::vector
<std::shared_ptr
<DOHFrontend
>> g_dohlocals
;
96 std::vector
<std::shared_ptr
<DNSCryptContext
>> g_dnsCryptLocals
;
// eBPF filter objects. The closing directive below belongs to a HAVE_EBPF
// conditional whose opening line is not visible in this excerpt.
98 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
99 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
100 #endif /* HAVE_EBPF */
// All configured frontends, owned through unique_ptr.
101 std::vector
<std::unique_ptr
<ClientState
>> g_frontends
;
// Server pools keyed by pool name (see createPoolIfNotExists() / getPool()).
102 GlobalStateHolder
<pools_t
> g_pools
;
// Defaults to 1; not read in this excerpt.
103 size_t g_udpVectorSize
{1};
// SNMP support: both switches default to off and the agent pointer to nullptr.
105 bool g_snmpEnabled
{false};
106 bool g_snmpTrapsEnabled
{false};
107 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
109 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
110 Then we have a bunch of connected sockets for talking to downstream servers.
111 We send directly to those sockets.
113 For the return path, per downstream server we have a thread that listens to responses.
115 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
116 there the original requestor and the original id. The new ID is the offset in the array.
118 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
121 IDs are assigned by atomic increments of the socket offset.
// Rule chains. g_rulactions holds DNSDistRuleAction entries; the other three
// hold DNSDistResponseRuleAction entries. responderThread() runs
// g_resprulactions on every backend response via applyRulesToResponse().
// The cache-hit and self-answered chains are not used in this excerpt.
124 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
125 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
126 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
// All downstream servers.
132 GlobalStateHolder
<servers_t
> g_dstates
;
// Dynamic block tables: one keyed by netmask (NetmaskTree), one by DNS
// suffix (SuffixMatchTree).
133 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
134 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
// Fallback used by applyRulesToQuery() when a dynamic block entry has the
// action None; defaults to Drop.
135 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
// TCP receive/send timeouts, both defaulting to 2 (unit not shown here).
136 int g_tcpRecvTimeout
{2};
137 int g_tcpSendTimeout
{2};
// Behaviour switches, all off by default. g_truncateTC is checked in
// responderThread(); g_roundrobinFailOnNoServer is checked in roundrobin().
140 bool g_servFailOnNoPolicy
{false};
141 bool g_truncateTC
{false};
142 bool g_fixupCase
{false};
143 bool g_preserveTrailingData
{false};
144 bool g_roundrobinFailOnNoServer
{false};
// Names of capabilities to keep; not read in this excerpt.
146 std::set
<std::string
> g_capabilitiesToRetain
;
// Shrinks a response to its header plus the question section.
// The length becomes header + qname (`consumed` bytes) + qtype + qclass, and
// the answer, authority and additional counts are zeroed. responderThread()
// calls this when the backend set TC and g_truncateTC is enabled.
//   packet:       response buffer, modified in place
//   len:          in/out response length
//   responseSize: buffer capacity, forwarded to addEDNS()
//   consumed:     wire length of the question name
// NOTE(review): this excerpt shows neither the declaration of `z` nor whether
// the addEDNS() call below is guarded by hadEDNS — confirm against the full file.
148 static void truncateTC(char* packet
, uint16_t* len
, size_t responseSize
, unsigned int consumed
)
151 bool hadEDNS
= false;
152 uint16_t payloadSize
= 0;
// Before truncating, read the original EDNS payload size and Z flags so that
// EDNS can be re-added afterwards.
155 if (g_addEDNSToSelfGeneratedResponses
) {
156 hadEDNS
= getEDNSUDPPayloadSizeAndZ(packet
, *len
, &payloadSize
, &z
);
// Keep only the header and the question.
159 *len
=static_cast<uint16_t>(sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
160 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
// No records remain besides the question.
161 dh
->ancount
= dh
->arcount
= dh
->nscount
= 0;
// Re-add EDNS, carrying over the DO bit from the saved Z flags.
164 addEDNS(dh
, *len
, responseSize
, z
& EDNS_HEADER_FLAG_DO
, payloadSize
, 0);
// Fragment of the packet type queued on g_delay. Its struct header and first
// members are not visible here. sendUDPResponse() builds one as
// {fd, payload, origRemote, origDest}, and the body below sends the payload.
176 ComboAddress destination
;
177 ComboAddress origDest
;
// No source address recorded (family 0): plain sendto() to the destination.
181 if(origDest
.sin4
.sin_family
== 0) {
182 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
// Otherwise send with origDest as the source address.
185 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
// Failures are only logged; `err` is set on a line not visible here.
189 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
// Delay queue for responses with a non-zero delayMsec. sendUDPResponse() only
// uses it when this pointer is non-null.
194 DelayPipe
<DelayedPacket
>* g_delay
= nullptr;
// Records one backend round-trip time in the global latency statistics.
//   udiff: elapsed time in microseconds (responderThread() logs the same value
//          with a "usec" suffix).
// Increments exactly one histogram bucket (<1ms, <10ms, <50ms, <100ms, <1s,
// slower), adds the value in milliseconds to latencySum, and updates four
// exponential moving averages.
196 void doLatencyStats(double udiff
)
198 if(udiff
< 1000) ++g_stats
.latency0_1
;
199 else if(udiff
< 10000) ++g_stats
.latency1_10
;
200 else if(udiff
< 50000) ++g_stats
.latency10_50
;
201 else if(udiff
< 100000) ++g_stats
.latency50_100
;
202 else if(udiff
< 1000000) ++g_stats
.latency100_1000
;
203 else ++g_stats
.latencySlow
;
204 g_stats
.latencySum
+= udiff
/ 1000;
// Exponential moving average: new = ((weight - 1) * old + sample) / weight.
206 auto doAvg
= [](double& var
, double n
, double weight
) {
207 var
= (weight
-1) * var
/weight
+ n
/weight
;
// Larger weights react more slowly to new samples.
210 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
211 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
212 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
213 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
// Checks that a backend response answers the question that was sent.
//   response/responseLen: raw response bytes
//   qname/qtype/qclass:   question recorded for this query ID
//   remote:               backend address, used only for logging
//   consumed:             out-param filled by the DNSName parse; truncateTC()
//                         uses it as the question-name length
// Malformed responses increment g_stats.nonCompliantResponses.
// NOTE(review): the return statements are not visible in this excerpt. The
// contract (true means match) is inferred from responderThread(), which drops
// the response when `!responseContentMatches(...)`.
216 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
, unsigned int& consumed
)
// Too short to hold a DNS header.
218 if (responseLen
< sizeof(dnsheader
)) {
222 const struct dnsheader
* dh
= reinterpret_cast<const struct dnsheader
*>(response
);
// No question section: the outcome depends on the rcode and on
// g_allowEmptyResponse (exact returns not visible). Otherwise the response
// is counted as non-compliant.
223 if (dh
->qdcount
== 0) {
224 if ((dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) || g_allowEmptyResponse
) {
228 ++g_stats
.nonCompliantResponses
;
// Parse the question that follows the header.
233 uint16_t rqtype
, rqclass
;
236 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
// Unparseable question: log it (only when there is data past the header)
// and count it as non-compliant.
238 catch(const std::exception
& e
) {
239 if(responseLen
> 0 && static_cast<size_t>(responseLen
) > sizeof(dnsheader
)) {
240 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
242 ++g_stats
.nonCompliantResponses
;
// A difference in type, class or name means a mismatch.
246 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
// Puts the RD and CD bits from origFlags back into the header's flags,
// leaving every other flag bit as the response set it (per the step
// comments below).
// NOTE(review): the final statement that merges origFlags into *flags is not
// visible in this excerpt.
253 static void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
255 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
256 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
// Every flag bit except RD and CD.
257 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
258 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
259 /* clear the flags we are about to restore */
260 *flags
&= restoreFlagsMask
;
261 /* only keep the flags we want to restore */
262 origFlags
&= ~restoreFlagsMask
;
263 /* set the saved flags as they were */
// Fix-up for a query that is being turned into a response (per the name).
// It restores the RD/CD bits from origFlags and then returns the result of
// addEDNSToQueryTurnedResponse(dq).
267 static bool fixUpQueryTurnedResponse(DNSQuestion
& dq
, const uint16_t origFlags
)
269 restoreFlags(dq
.dh
, origFlags
);
271 return addEDNSToQueryTurnedResponse(dq
);
// Post-processes a backend response before it is returned to the client.
//   response/responseLen/responseSize: buffer pointer, length and capacity.
//       All three may be repointed at rewrittenResponse when the packet is
//       rebuilt.
//   qname:      name written back into the question section
//   origFlags:  flags whose RD/CD bits are restored into the header
//   ednsAdded/ecsAdded: whether an OPT RR / ECS option was added to the query;
//       if so it is removed from the response here
//   rewrittenResponse: storage for a rebuilt packet; it must outlive any use
//       of *response
//   addRoom:    extra capacity to reserve in a rebuilt packet, when the result
//       still fits in a uint16_t length
//   zeroScope:  optional out-param, set when the response's ECS option carries
//       a scope prefix-length of 0
// NOTE(review): several lines (returns, some conditions, the arcount
// decrement) are missing from this excerpt. The comments below describe only
// the visible statements.
274 static bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
, bool* zeroScope
)
// Too short for a DNS header.
276 if (*responseLen
< sizeof(dnsheader
)) {
280 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(*response
);
281 restoreFlags(dh
, origFlags
);
// Header-only response (its handling is not visible here).
283 if (*responseLen
== sizeof(dnsheader
)) {
// Overwrite the question name with qname's wire form when the packet is long
// enough. This is presumably a case fix-up; the guarding condition is not visible.
288 string realname
= qname
.toDNSString();
289 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
290 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
// Undo the EDNS/ECS changes that were made to the query.
294 if (ednsAdded
|| ecsAdded
) {
299 const std::string
responseStr(*response
, *responseLen
);
300 int res
= locateEDNSOptRR(responseStr
, &optStart
, &optLen
, &last
);
303 if (zeroScope
) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
304 size_t optContentStart
= 0;
305 uint16_t optContentLen
= 0;
306 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
307 if (isEDNSOptionInOpt(responseStr
, optStart
, optLen
, EDNSOptionCode::ECS
, &optContentStart
, &optContentLen
) && optContentLen
>= 4) {
308 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
310 *zeroScope
= responseStr
.at(optContentStart
+ 3) == 0;
315 /* we added the entire OPT RR,
316 therefore we need to remove it entirely */
318 /* simply remove the last AR */
319 *responseLen
-= optLen
;
320 uint16_t arcount
= ntohs(dh
->arcount
);
322 dh
->arcount
= htons(arcount
);
325 /* Removing an intermediary RR could lead to compression error */
326 if (rewriteResponseWithoutEDNS(responseStr
, rewrittenResponse
) == 0) {
327 *responseLen
= rewrittenResponse
.size();
// Reserve the extra room only if the total still fits in a uint16_t.
328 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
329 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
331 *responseSize
= rewrittenResponse
.capacity();
332 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
335 warnlog("Error rewriting content");
340 /* the OPT RR was already present, but without ECS,
341 we need to remove the ECS option if any */
343 /* nothing after the OPT RR, we can simply remove the
345 size_t existingOptLen
= optLen
;
346 removeEDNSOptionFromOPT(*response
+ optStart
, &optLen
, EDNSOptionCode::ECS
);
347 *responseLen
-= (existingOptLen
- optLen
);
350 /* Removing an intermediary RR could lead to compression error */
351 if (rewriteResponseWithoutEDNSOption(responseStr
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
352 *responseLen
= rewrittenResponse
.size();
353 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
354 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
356 *responseSize
= rewrittenResponse
.capacity();
357 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
360 warnlog("Error rewriting content");
// Encrypts a response in place for a DNSCrypt client.
//   response/responseLen/responseSize: buffer, in/out length, capacity
//   tcp:       forwarded to DNSCryptQuery::encryptResponse()
//   dh/dhCopy: when dh, *dh and dhCopy are all non-null, the cleartext header
//              is copied to dhCopy before encryption overwrites it
// On failure the response is dropped and the failure is logged at verbose level.
// The trailing directive closes a HAVE_DNSCRYPT conditional whose opening
// line is not visible in this excerpt.
371 static bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
374 uint16_t encryptedResponseLen
= 0;
376 /* save the original header before encrypting it in place */
377 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
378 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
382 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
// Success path: adopt the encrypted length.
384 *responseLen
= encryptedResponseLen
;
386 /* dropping response */
387 vinfolog("Error encrypting the response, dropping.");
393 #endif /* HAVE_DNSCRYPT */
// Runs the response rule chain against dr, in order.
// For each matching rule the rule's match counter is incremented and its
// action invoked. The returned action then decides what happens; the Allow,
// Drop and HeaderModify case bodies are not visible here. ServFail rewrites
// the rcode, and Delay and None are non-terminal (per the comment in the
// switch). processResponse() checks the boolean result.
395 static bool applyRulesToResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
)
397 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
398 std::string ruleresult
;
399 for(const auto& lr
: *localRespRulactions
) {
400 if(lr
.d_rule
->matches(&dr
)) {
401 lr
.d_rule
->d_matches
++;
402 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
404 case DNSResponseAction::Action::Allow
:
407 case DNSResponseAction::Action::Drop
:
410 case DNSResponseAction::Action::HeaderModify
:
413 case DNSResponseAction::Action::ServFail
:
414 dr
.dh
->rcode
= RCode::ServFail
;
417 /* non-terminal actions follow */
418 case DNSResponseAction::Action::Delay
:
// The action's string result is parsed into delayMsec.
419 dr
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
421 case DNSResponseAction::Action::None
:
// Response pipeline for a backend answer, run in this order: response rules,
// then header/EDNS fix-ups, then optional packet-cache insertion, then
// optional DNSCrypt encryption.
//   addRoom: extra capacity to reserve if fixUpResponse() rebuilds the packet
//            (responderThread() passes the DNSCrypt padding/MAC size).
//            NOTE(review): declared size_t here but fixUpResponse() takes uint16_t.
//   muted:   passed in by responderThread(); not used in the visible lines.
// Presumably returns false when one of the steps fails (returns not visible).
430 bool processResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, size_t addRoom
, std::vector
<uint8_t>& rewrittenResponse
, bool muted
)
432 if (!applyRulesToResponse(localRespRulactions
, dr
)) {
// zeroScope is only computed when the query was flagged useZeroScope.
436 bool zeroScope
= false;
437 if (!fixUpResponse(response
, responseLen
, responseSize
, *dr
.qname
, dr
.origFlags
, dr
.ednsAdded
, dr
.ecsAdded
, rewrittenResponse
, addRoom
, dr
.useZeroScope
? &zeroScope
: nullptr)) {
// Cache the response unless caching is skipped or the response is too large.
441 if (dr
.packetCache
&& !dr
.skipCache
&& *responseLen
<= s_maxPacketCacheEntrySize
) {
442 if (!dr
.useZeroScope
) {
443 /* if the query was not suitable for zero-scope, for
444 example because it had an existing ECS entry so the hash is
445 not really 'no ECS', so just insert it for the existing subnet
447 - we don't have the correct hash for a non-ECS query
448 - inserting with hash computed before the ECS replacement but with
449 the subnet extracted _after_ the replacement would not work.
453 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
454 dr
.packetCache
->insert(zeroScope
? dr
.cacheKeyNoECS
: dr
.cacheKey
, zeroScope
? boost::none
: dr
.subnet
, dr
.origFlags
, dr
.dnssecOK
, *dr
.qname
, dr
.qtype
, dr
.qclass
, *response
, *responseLen
, dr
.tcp
, dr
.dh
->rcode
, dr
.tempFailureTTL
);
// DNSCrypt encryption. The trailing directive closes a HAVE_DNSCRYPT block
// whose opening line is not visible.
459 if (!encryptResponse(*response
, responseLen
, *responseSize
, dr
.tcp
, dr
.dnsCryptQuery
, nullptr, nullptr)) {
463 #endif /* HAVE_DNSCRYPT */
// Sends a UDP response to the client, or queues it on g_delay when delayMsec
// is non-zero and a delay queue exists.
//   origFD:     socket to send on
//   origDest:   source address to send from; family 0 means no source is
//               forced (plain sendto())
//   origRemote: client address
// Send errors are only logged at verbose level.
468 static bool sendUDPResponse(int origFD
, const char* response
, const uint16_t responseLen
, const int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
470 if(delayMsec
&& g_delay
) {
471 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
472 g_delay
->submit(dp
, delayMsec
);
476 if(origDest
.sin4
.sin_family
== 0) {
477 res
= sendto(origFD
, response
, responseLen
, 0, reinterpret_cast<const struct sockaddr
*>(&origRemote
), origRemote
.getSocklen());
480 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
484 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), stringerror(err
));
// Picks one of the backend's sockets in round-robin order by post-incrementing
// socketsOffset.
// NOTE(review): assumes state->sockets is non-empty; with no sockets the
// modulo would divide by zero.
492 int pickBackendSocketForSending(std::shared_ptr
<DownstreamState
>& state
)
494 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
// Fills `ready` with the backend sockets that have data to read.
// With a single socket, that socket is pushed directly without polling.
// Otherwise the multiplexer is queried while holding socketsLock.
// NOTE(review): the -1 timeout presumably means "wait with no timeout". If so,
// reconnect() takes the same socketsLock to remove or add FDs and could stall
// behind this wait. Confirm the FDMultiplexer semantics and consider a finite
// timeout.
497 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
501 if (state
->sockets
.size() == 1) {
502 ready
.push_back(state
->sockets
[0]);
507 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
508 state
->mplexer
->getAvailableFDs(ready
, -1);
512 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
// Runs for one backend (dss). It receives responses on the backend's sockets,
// matches each one to its pending query state in dss->idStates, processes it,
// and relays it to the original client over UDP (sendUDPResponse()) or DoH
// (du->rsock). The enclosing loop and try headers are not visible in this
// excerpt.
513 void responderThread(std::shared_ptr
<DownstreamState
> dss
)
515 setThreadName("dnsdist/respond");
516 auto localRespRulactions
= g_resprulactions
.getLocal();
// Sized for the largest cacheable response plus the DNSCrypt padding/MAC.
517 char packet
[s_maxPacketCacheEntrySize
+ DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
518 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
519 /* when the answer is encrypted in place, we need to get a copy
520 of the original header before encryption to fill the ring buffer */
521 dnsheader cleartextDH
;
522 vector
<uint8_t> rewrittenResponse
;
524 uint16_t queryId
= 0;
525 std::vector
<int> sockets
;
526 sockets
.reserve(dss
->sockets
.size());
529 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
// Wait for sockets with pending data, then read one datagram from each.
531 pickBackendSocketsReadyForReceiving(dss
, sockets
);
532 for (const auto& fd
: sockets
) {
533 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
534 char * response
= packet
;
535 size_t responseSize
= sizeof(packet
);
// Skip errors and datagrams shorter than a DNS header.
537 if (got
< 0 || static_cast<size_t>(got
) < sizeof(dnsheader
))
540 uint16_t responseLen
= static_cast<uint16_t>(got
);
// Reject IDs outside the state table. The assignment of queryId from the
// header is not visible here.
543 if(queryId
>= dss
->idStates
.size()) {
547 IDState
* ids
= &dss
->idStates
[queryId
];
// Snapshot of usageIndicator, used below to claim the state atomically.
548 int64_t usageIndicator
= ids
->usageIndicator
;
550 if(!IDState::isInUse(usageIndicator
)) {
551 /* the corresponding state is marked as not in use, meaning that:
552 - it was already cleaned up by another thread and the state is gone ;
553 - we already got a response for this query and this one is a duplicate.
554 Either way, we don't touch it.
559 /* read the potential DOHUnit state as soon as possible, but don't use it
560 until we have confirmed that we own this state by updating usageIndicator */
562 /* setting age to 0 to prevent the maintainer thread from
563 cleaning this IDS while we process the response.
566 int origFD
= ids
->origFD
;
// Reject responses whose question does not match what was sent.
568 unsigned int consumed
= 0;
569 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
, consumed
)) {
573 bool isDoH
= du
!= nullptr;
574 /* atomically mark the state as available, but only if it has not been altered
576 if (ids
->tryMarkUnused(usageIndicator
)) {
577 /* clear the potential DOHUnit asap, it's ours now
578 and since we just marked the state as unused,
579 someone could overwrite it. */
581 /* we only decrement the outstanding counter if the value was not
582 altered in the meantime, which would mean that the state has been actively reused
583 and the other thread has not incremented the outstanding counter, so we don't
584 want it to be decremented twice. */
585 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
587 /* someone updated the state in the meantime, we can't touch the existing pointer */
589 /* since the state has been updated, we can't safely access it so let's just drop
// Optionally reduce truncated responses to header + question.
594 if(dh
->tc
&& g_truncateTC
) {
595 truncateTC(response
, &responseLen
, responseSize
, consumed
);
// Restore the client's original query ID.
598 dh
->id
= ids
->origID
;
// DNSCrypt responses need extra room for padding and the MAC.
600 uint16_t addRoom
= 0;
601 DNSResponse dr
= makeDNSResponseFromIDState(*ids
, dh
, sizeof(packet
), responseLen
, false);
602 if (dr
.dnsCryptQuery
) {
603 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
// Cleartext copy of the header, used for the ring buffer and rcode statistics.
606 memcpy(&cleartextDH
, dr
.dh
, sizeof(cleartextDH
));
607 if (!processResponse(&response
, &responseLen
, &responseSize
, localRespRulactions
, dr
, addRoom
, rewrittenResponse
, ids
->cs
&& ids
->cs
->muted
)) {
// Relay unless the frontend is muted: a DoH unit is handed back through
// du->rsock, and a plain UDP response goes out via sendUDPResponse().
611 if (ids
->cs
&& !ids
->cs
->muted
) {
613 #ifdef HAVE_DNS_OVER_HTTPS
615 du
->response
= std::string(response
, responseLen
);
616 if (send(du
->rsock
, &du
, sizeof(du
), 0) != sizeof(du
)) {
617 /* at this point we have the only remaining pointer on this
618 DOHUnit object since we did set ids->du to nullptr earlier,
619 except if we got the response before the pointer could be
620 released by the frontend */
623 #endif /* HAVE_DNS_OVER_HTTPS */
628 empty
.sin4
.sin_family
= 0;
629 /* if ids->destHarvested is false, origDest holds the listening address.
630 We don't want to use that as a source since it could be 0.0.0.0 for example. */
631 sendUDPResponse(origFD
, response
, responseLen
, dr
.delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
637 ++ids
->cs
->responses
;
// Accounting: timing, ring buffer entry and rcode counters.
641 double udiff
= ids
->sentTime
.udiff();
642 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(),
643 isDoH
? " (https)": "", udiff
);
647 g_rings
.insertResponse(ts
, *dr
.remote
, *dr
.qname
, dr
.qtype
, static_cast<unsigned int>(udiff
), static_cast<unsigned int>(got
), cleartextDH
, dss
->remote
);
649 switch (cleartextDH
.rcode
) {
650 case RCode::NXDomain
:
651 ++g_stats
.frontendNXDomain
;
653 case RCode::ServFail
:
654 ++g_stats
.servfailResponses
;
655 ++g_stats
.frontendServFail
;
658 ++g_stats
.frontendNoError
;
// Per-backend latency: exponential moving average with weight 1/128.
661 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
663 doLatencyStats(udiff
);
// rewrittenResponse may back `response`; reset it for the next datagram.
665 rewrittenResponse
.clear();
// Errors while handling a single response are logged at verbose level.
668 catch(const std::exception
& e
){
669 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
// Exceptions reaching these handlers end the thread and are logged as errors.
673 catch(const std::exception
& e
)
675 errlog("UDP responder thread died because of exception: %s", e
.what());
677 catch(const PDNSException
& e
)
679 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
683 errlog("UDP responder thread died because of an exception: %s", "unknown");
// (Re)creates the backend's UDP sockets and connects them to `remote`.
// connectLock is only try-locked, so a caller that finds another reconnect in
// progress bails out.
// For each socket:
//   - the old FD is removed from the multiplexer (when there are several
//     sockets) and shut down, so that a responder thread blocked in recv()
//     wakes up;
//   - when remote is not the any-address, a new socket is created, optionally
//     bound to sourceAddr / sourceItfName, connected, and registered again.
// std::runtime_error during connection is logged. If any socket failed, all
// sockets are shut down.
// NOTE(review): the return statements and close() calls are not visible in
// this excerpt.
686 bool DownstreamState::reconnect()
688 std::unique_lock
<std::mutex
> tl(connectLock
, std::try_to_lock
);
689 if (!tl
.owns_lock()) {
690 /* we are already reconnecting */
695 for (auto& fd
: sockets
) {
697 if (sockets
.size() > 1) {
698 std::lock_guard
<std::mutex
> lock(socketsLock
);
699 mplexer
->removeReadFD(fd
);
701 /* shutdown() is needed to wake up recv() in the responderThread */
702 shutdown(fd
, SHUT_RDWR
);
706 if (!IsAnyAddress(remote
)) {
707 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
708 if (!IsAnyAddress(sourceAddr
)) {
709 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
710 if (!sourceItfName
.empty()) {
711 #ifdef SO_BINDTODEVICE
712 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, sourceItfName
.c_str(), sourceItfName
.length());
714 infolog("Error setting up the interface on backend socket '%s': %s", remote
.toStringWithPort(), stringerror());
719 SBind(fd
, sourceAddr
);
722 SConnect(fd
, remote
);
723 if (sockets
.size() > 1) {
724 std::lock_guard
<std::mutex
> lock(socketsLock
);
725 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
729 catch(const std::runtime_error
& error
) {
730 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
737 /* if at least one (re-)connection failed, close all sockets */
739 for (auto& fd
: sockets
) {
741 if (sockets
.size() > 1) {
742 std::lock_guard
<std::mutex
> lock(socketsLock
);
743 mplexer
->removeReadFD(fd
);
745 /* shutdown() is needed to wake up recv() in the responderThread */
746 shutdown(fd
, SHUT_RDWR
);
// Builds this server's points on the consistent-hash ring, under the write
// lock d_lock. Each point is the burtleCI hash (seeded with g_hashperturb) of
// the string "<id>-<w>", inserted into `hashes`.
// NOTE(review): the loop that drives `w` is not visible here; presumably it
// runs once per unit of weight.
755 void DownstreamState::hash()
757 vinfolog("Computing hashes for id=%s and weight=%d", id
, weight
);
759 WriteLock
wl(&d_lock
);
762 std::string uuid
= boost::str(boost::format("%s-%d") % id
% w
);
763 unsigned int wshash
= burtleCI((const unsigned char*)uuid
.c_str(), uuid
.size(), g_hashperturb
);
764 hashes
.insert(wshash
);
// Sets the server's UUID. Consistent-hash points are only recomputed when they
// were computed before (the statements inside the check are not visible here).
769 void DownstreamState::setId(const boost::uuids::uuid
& newId
)
772 // compute hashes only if already done
773 if (!hashes
.empty()) {
// Sets the server's weight. Per the error message, a weight that is not
// greater than 0 is rejected with an error log (the check itself is not
// visible). Hash points are only recomputed when they already exist.
778 void DownstreamState::setWeight(int newWeight
)
781 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
785 if (!hashes
.empty()) {
// Creates a backend description. It initialises the rwlock protecting the
// hashes, clears threadStarted, creates the FD multiplexer and allocates
// numberOfSockets socket slots (the per-socket initialisation is not visible).
// When connect is true and remote is not the any-address, idStates is sized to
// g_maxOutstanding and the server is logged as added.
790 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, const std::string
& sourceItfName_
, size_t numberOfSockets
, bool connect
=true): sourceItfName(sourceItfName_
), remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
792 pthread_rwlock_init(&d_lock
, nullptr);
794 threadStarted
.clear();
796 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
798 sockets
.resize(numberOfSockets
);
799 for (auto& fd
: sockets
) {
803 if (connect
&& !IsAnyAddress(remote
)) {
805 idStates
.resize(g_maxOutstanding
);
807 infolog("Added downstream server %s", remote
.toStringWithPort());
// Mutex guarding Lua calls; for example, applyRulesToQuery() holds it while
// running the g_qcount filter.
812 std::mutex g_luamutex
;
// Global server selection policy.
815 GlobalStateHolder
<ServerPolicy
> g_policy
;
// Selection policy: the first server in list order that is up and within its
// QPS limit (the matching return is not visible here). If no server
// qualifies, it falls back to leastOutstanding().
817 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
819 for(auto& d
: servers
) {
820 if(d
.second
->isUp() && d
.second
->qps
.check())
823 return leastOutstanding(servers
, dq
);
826 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
// Fast path: a single server that is up is returned directly.
// Otherwise (outstanding, order, latencyUsec) is snapshotted for every server
// that is up. nth_element then places the lexicographically smallest tuple
// first, which implements the priority described above. An empty snapshot
// yields a null pointer (the guarding condition is not visible here).
827 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
829 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
830 return servers
[0].second
;
833 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
834 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
835 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
836 poss
.reserve(servers
.size());
837 for(auto& d
: servers
) {
838 if(d
.second
->isUp()) {
839 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
843 return shared_ptr
<DownstreamState
>();
844 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
845 return poss
.begin()->second
;
// Weighted selection driven by `val`.
// Servers that are up are laid out on a cumulative-weight line, and the first
// entry whose cumulative weight exceeds `r` wins (upper_bound). The sum is
// kept from overflowing int. `sum`, `r` (presumably derived from val modulo
// sum) and the matching return are on lines not visible in this excerpt.
// Returns a null pointer when no server is up or nothing is found.
848 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
850 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
852 int max
= std::numeric_limits
<int>::max();
854 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
855 if(d
.second
->isUp()) {
856 // Don't overflow sum when adding high weights
857 if(d
.second
->weight
> max
- sum
) {
860 sum
+= d
.second
->weight
;
863 poss
.push_back({sum
, d
.second
});
867 // Catch poss & sum are empty to avoid SIGFPE
869 return shared_ptr
<DownstreamState
>();
872 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
874 return shared_ptr
<DownstreamState
>();
// Weighted random selection: valrandom() driven by random().
878 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
880 return valrandom(random(), servers
, dq
);
// Seed mixed into qname and server hashes (see whashed(), chashed(),
// DownstreamState::hash()).
883 uint32_t g_hashperturb
;
// chashed() only bounds per-server load when this is greater than 0; the
// default is 0 (disabled).
884 double g_consistentHashBalancingFactor
= 0;
// Weighted selection keyed on the query name, so the same qname maps to the
// same server while the set of up servers and their weights is unchanged.
885 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
887 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
// Consistent-hash selection on the query name.
// Each eligible server contributes the smallest of its hash points that is
// >= qhash, and the server with the lowest such point wins (ret). If no server
// has such a point, selection presumably wraps to the server holding the
// smallest point overall (first/min). The assignments of ret and first are
// not visible here.
// When g_consistentHashBalancingFactor > 0, servers whose outstanding count
// exceeds factor * average load are skipped. The average counts the current
// query and is taken over all servers, including those that are down.
// NOTE(review): hashes.empty() is checked before d_lock is taken, while
// hash() writes `hashes` under the write lock. Confirm that this race is
// acceptable.
890 shared_ptr
<DownstreamState
> chashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
892 unsigned int qhash
= dq
->qname
->hash(g_hashperturb
);
893 unsigned int sel
= std::numeric_limits
<unsigned int>::max();
894 unsigned int min
= std::numeric_limits
<unsigned int>::max();
895 shared_ptr
<DownstreamState
> ret
= nullptr, first
= nullptr;
897 double targetLoad
= std::numeric_limits
<double>::max();
898 if (g_consistentHashBalancingFactor
> 0) {
899 /* we start with one, representing the query we are currently handling */
900 double currentLoad
= 1;
901 for (const auto& pair
: servers
) {
902 currentLoad
+= pair
.second
->outstanding
;
904 targetLoad
= (currentLoad
/ servers
.size()) * g_consistentHashBalancingFactor
;
907 for (const auto& d
: servers
) {
908 if (d
.second
->isUp() && d
.second
->outstanding
<= targetLoad
) {
909 // make sure hashes have been computed
910 if (d
.second
->hashes
.empty()) {
914 ReadLock
rl(&(d
.second
->d_lock
));
915 const auto& server
= d
.second
;
916 // we want to keep track of the last hash
917 if (min
> *(server
->hashes
.begin())) {
918 min
= *(server
->hashes
.begin());
922 auto hash_it
= server
->hashes
.lower_bound(qhash
);
923 if (hash_it
!= server
->hashes
.end()) {
924 if (*hash_it
< sel
) {
932 if (ret
!= nullptr) {
935 if (first
!= nullptr) {
938 return shared_ptr
<DownstreamState
>();
// Round-robin selection over the servers that are up (collected into poss).
// When none is up and g_roundrobinFailOnNoServer is not set, the visible
// condition suggests it falls back to the full server list; the fallback
// statements are not visible. Otherwise an empty result yields a null pointer.
// NOTE(review): `counter` is a plain static unsigned int. If this policy runs
// on several threads at once, the increment is a data race; consider
// std::atomic (TODO confirm the threading model).
941 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
943 NumberedServerVector poss
;
945 for(auto& d
: servers
) {
946 if(d
.second
->isUp()) {
951 const auto *res
=&poss
;
952 if(poss
.empty() && !g_roundrobinFailOnNoServer
)
956 return shared_ptr
<DownstreamState
>();
958 static unsigned int counter
;
960 return (*res
)[(counter
++) % res
->size()].second
;
// Control address; defaults to 127.0.0.1 port 5199.
963 ComboAddress g_serverControl
{"127.0.0.1:5199"};
// Returns the pool named poolName, creating and inserting an empty ServerPool
// when it does not exist yet. The empty name denotes the default pool;
// creation is only logged for named pools. (The return for the existing-pool
// branch is not visible here.)
965 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
967 std::shared_ptr
<ServerPool
> pool
;
968 pools_t::iterator it
= pools
.find(poolName
);
969 if (it
!= pools
.end()) {
973 if (!poolName
.empty())
974 vinfolog("Creating pool %s", poolName
);
975 pool
= std::make_shared
<ServerPool
>();
976 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
// Sets the server selection policy of a pool, creating the pool if needed.
// An empty poolName means the default pool.
981 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
983 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
984 if (!poolName
.empty()) {
985 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
987 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
989 pool
->policy
= policy
;
// Adds a server to a pool, creating the pool if needed.
// An empty poolName means the default pool.
992 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
994 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
995 if (!poolName
.empty()) {
996 vinfolog("Adding server to pool %s", poolName
);
998 vinfolog("Adding server to default pool");
1000 pool
->addServer(server
);
// Removes a server from an existing pool. An empty poolName means the default
// pool. Unlike addServerToPool(), this does not create the pool: getPool()
// throws std::out_of_range if it is missing.
1003 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
1005 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
1007 if (!poolName
.empty()) {
1008 vinfolog("Removing server from pool %s", poolName
);
1011 vinfolog("Removing server from default pool");
1014 pool
->removeServer(server
);
// Looks up a pool by name.
// Throws std::out_of_range("No pool named ...") when the pool does not exist.
1017 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
1019 pools_t::const_iterator it
= pools
.find(poolName
);
1021 if (it
== pools
.end()) {
1022 throw std::out_of_range("No pool named " + poolName
);
// Returns the servers of the named pool.
// Propagates std::out_of_range from getPool() when the pool does not exist.
1028 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
1030 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
1031 return pool
->getServers();
// Builds a spoofed answer for dq from a rule result string, which is split on
// spaces and commas.
//   - A single token is tried as an address. If ComboAddress throws
//     PDNSException, the string is used as a CNAME target instead.
//   - Several tokens are all parsed as addresses and spoofed together.
// How `sa` is applied to dq, and how parse errors in the multi-address case
// are handled, is not visible in this excerpt.
1034 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
)
1038 std::vector
<std::string
> addrs
;
1039 stringtok(addrs
, spoofContent
, " ,");
1041 if (addrs
.size() == 1) {
1043 ComboAddress
spoofAddr(spoofContent
);
1044 SpoofAction
sa({spoofAddr
});
1047 catch(const PDNSException
&e
) {
1048 SpoofAction
sa(spoofContent
); // CNAME then
1052 std::vector
<ComboAddress
> cas
;
1053 for (const auto& addr
: addrs
) {
1055 cas
.push_back(ComboAddress(addr
));
1060 SpoofAction
sa(cas
);
// Applies a query rule's action to dq.
//   - Nxdomain, Refused and ServFail set dq.dh->rcode and bump the matching
//     rule counter in g_stats.
//   - Spoof builds an answer from ruleresult.
//   - Truncate copies RD into RA (its other statements are not visible).
//   - Pool stores ruleresult in dq.poolname.
//   - Delay parses ruleresult into delayMsec.
//   - The Allow, Drop, HeaderModify and NoRecurse bodies are not visible.
// `drop` is an out-param whose assignment is not visible here. Per the final
// comment, a false return means processing continues.
1065 bool processRulesResult(const DNSAction::Action
& action
, DNSQuestion
& dq
, std::string
& ruleresult
, bool& drop
)
1068 case DNSAction::Action::Allow
:
1071 case DNSAction::Action::Drop
:
1076 case DNSAction::Action::Nxdomain
:
1077 dq
.dh
->rcode
= RCode::NXDomain
;
1079 ++g_stats
.ruleNXDomain
;
1082 case DNSAction::Action::Refused
:
1083 dq
.dh
->rcode
= RCode::Refused
;
1085 ++g_stats
.ruleRefused
;
1088 case DNSAction::Action::ServFail
:
1089 dq
.dh
->rcode
= RCode::ServFail
;
1091 ++g_stats
.ruleServFail
;
1094 case DNSAction::Action::Spoof
:
1095 spoofResponseFromString(dq
, ruleresult
);
1098 case DNSAction::Action::Truncate
:
1101 dq
.dh
->ra
= dq
.dh
->rd
;
1106 case DNSAction::Action::HeaderModify
:
1109 case DNSAction::Action::Pool
:
1110 dq
.poolname
=ruleresult
;
1113 case DNSAction::Action::NoRecurse
:
1117 /* non-terminal actions follow */
1118 case DNSAction::Action::Delay
:
1119 dq
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1121 case DNSAction::Action::None
:
1123 case DNSAction::Action::NoOp
:
1127 /* false means that we don't stop the processing */
1132 static bool applyRulesToQuery(LocalHolders
& holders
, DNSQuestion
& dq
, const struct timespec
& now
)
1134 g_rings
.insertQuery(now
, *dq
.remote
, *dq
.qname
, dq
.qtype
, dq
.len
, *dq
.dh
);
1136 if(g_qcount
.enabled
) {
1137 string qname
= (*dq
.qname
).toLogString();
1138 bool countQuery
{true};
1139 if(g_qcount
.filter
) {
1140 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1141 std::tie (countQuery
, qname
) = g_qcount
.filter(&dq
);
1145 WriteLock
wl(&g_qcount
.queryLock
);
1146 if(!g_qcount
.records
.count(qname
)) {
1147 g_qcount
.records
[qname
] = 0;
1149 g_qcount
.records
[qname
]++;
1153 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
1154 auto updateBlockStats
= [&got
]() {
1155 ++g_stats
.dynBlocked
;
1156 got
->second
.blocks
++;
1159 if(now
< got
->second
.until
) {
1160 DNSAction::Action action
= got
->second
.action
;
1161 if (action
== DNSAction::Action::None
) {
1162 action
= g_dynBlockAction
;
1165 case DNSAction::Action::NoOp
:
1169 case DNSAction::Action::Nxdomain
:
1170 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort());
1173 dq
.dh
->rcode
= RCode::NXDomain
;
1177 case DNSAction::Action::Refused
:
1178 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
1181 dq
.dh
->rcode
= RCode::Refused
;
1185 case DNSAction::Action::Truncate
:
1188 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
1191 dq
.dh
->ra
= dq
.dh
->rd
;
1197 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1200 case DNSAction::Action::NoRecurse
:
1202 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1207 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
1213 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
1214 auto updateBlockStats
= [&got
]() {
1215 ++g_stats
.dynBlocked
;
1219 if(now
< got
->until
) {
1220 DNSAction::Action action
= got
->action
;
1221 if (action
== DNSAction::Action::None
) {
1222 action
= g_dynBlockAction
;
1225 case DNSAction::Action::NoOp
:
1228 case DNSAction::Action::Nxdomain
:
1229 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1232 dq
.dh
->rcode
= RCode::NXDomain
;
1235 case DNSAction::Action::Refused
:
1236 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1239 dq
.dh
->rcode
= RCode::Refused
;
1242 case DNSAction::Action::Truncate
:
1246 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1249 dq
.dh
->ra
= dq
.dh
->rd
;
1255 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1258 case DNSAction::Action::NoRecurse
:
1260 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1265 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1271 DNSAction::Action action
=DNSAction::Action::None
;
1274 for(const auto& lr
: *holders
.rulactions
) {
1275 if(lr
.d_rule
->matches(&dq
)) {
1276 lr
.d_rule
->d_matches
++;
1277 action
=(*lr
.d_action
)(&dq
, &ruleresult
);
1278 if (processRulesResult(action
, dq
, ruleresult
, drop
)) {
1291 ssize_t
udpClientSendRequestToBackend(const std::shared_ptr
<DownstreamState
>& ss
, const int sd
, const char* request
, const size_t requestLen
, bool healthCheck
)
1295 if (ss
->sourceItf
== 0) {
1296 result
= send(sd
, request
, requestLen
, 0);
1301 cmsgbuf_aligned cbuf
;
1302 ComboAddress
remote(ss
->remote
);
1303 fillMSGHdr(&msgh
, &iov
, &cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &remote
);
1304 addCMsgSrcAddr(&msgh
, &cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1305 result
= sendmsg(sd
, &msgh
, 0);
1309 int savederrno
= errno
;
1310 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1312 /* This might sound silly, but on Linux send() might fail with EINVAL
1313 if the interface the socket was bound to doesn't exist anymore.
1314 We don't want to reconnect the real socket if the healthcheck failed,
1315 because it's not using the same socket.
1317 if (!healthCheck
&& (savederrno
== EINVAL
|| savederrno
== ENODEV
)) {
1325 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1327 if (msgh
->msg_flags
& MSG_TRUNC
) {
1328 /* message was too large for our buffer */
1329 vinfolog("Dropping message too large for our buffer");
1330 ++g_stats
.nonCompliantQueries
;
1334 if(!holders
.acl
->match(remote
)) {
1335 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1343 if (HarvestDestinationAddress(msgh
, &dest
)) {
1344 /* we don't get the port, only the address */
1345 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1348 dest
.sin4
.sin_family
= 0;
1354 boost::optional
<std::vector
<uint8_t>> checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DNSCryptQuery
>& dnsCryptQuery
, time_t now
, bool tcp
)
1356 if (cs
.dnscryptCtx
) {
1357 #ifdef HAVE_DNSCRYPT
1358 vector
<uint8_t> response
;
1359 uint16_t decryptedQueryLen
= 0;
1361 dnsCryptQuery
= std::make_shared
<DNSCryptQuery
>(cs
.dnscryptCtx
);
1363 bool decrypted
= handleDNSCryptQuery(const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, tcp
, now
, response
);
1366 if (response
.size() > 0) {
1369 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1372 len
= decryptedQueryLen
;
1373 #endif /* HAVE_DNSCRYPT */
1378 bool checkQueryHeaders(const struct dnsheader
* dh
)
1380 if (dh
->qr
) { // don't respond to responses
1381 ++g_stats
.nonCompliantQueries
;
1385 if (dh
->qdcount
== 0) {
1386 ++g_stats
.emptyQueries
;
1391 ++g_stats
.rdQueries
;
1397 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1398 static void queueResponse(const ClientState
& cs
, const char* response
, uint16_t responseLen
, const ComboAddress
& dest
, const ComboAddress
& remote
, struct mmsghdr
& outMsg
, struct iovec
* iov
, cmsgbuf_aligned
* cbuf
)
1401 fillMSGHdr(&outMsg
.msg_hdr
, iov
, nullptr, 0, const_cast<char*>(response
), responseLen
, const_cast<ComboAddress
*>(&remote
));
1403 if (dest
.sin4
.sin_family
== 0) {
1404 outMsg
.msg_hdr
.msg_control
= nullptr;
1407 addCMsgSrcAddr(&outMsg
.msg_hdr
, cbuf
, &dest
, 0);
1410 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1412 /* self-generated responses or cache hits */
1413 static bool prepareOutgoingResponse(LocalHolders
& holders
, ClientState
& cs
, DNSQuestion
& dq
, bool cacheHit
)
1415 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(dq
.dh
), dq
.size
, dq
.len
, dq
.tcp
, dq
.queryTime
);
1417 #ifdef HAVE_PROTOBUF
1418 dr
.uniqueId
= dq
.uniqueId
;
1421 dr
.delayMsec
= dq
.delayMsec
;
1423 if (!applyRulesToResponse(cacheHit
? holders
.cacheHitRespRulactions
: holders
.selfAnsweredRespRulactions
, dr
)) {
1427 /* in case a rule changed it */
1428 dq
.delayMsec
= dr
.delayMsec
;
1430 #ifdef HAVE_DNSCRYPT
1432 if (!encryptResponse(reinterpret_cast<char*>(dq
.dh
), &dq
.len
, dq
.size
, dq
.tcp
, dq
.dnsCryptQuery
, nullptr, nullptr)) {
1436 #endif /* HAVE_DNSCRYPT */
1439 ++g_stats
.cacheHits
;
1442 switch (dr
.dh
->rcode
) {
1443 case RCode::NXDomain
:
1444 ++g_stats
.frontendNXDomain
;
1446 case RCode::ServFail
:
1447 ++g_stats
.frontendServFail
;
1449 case RCode::NoError
:
1450 ++g_stats
.frontendNoError
;
1454 doLatencyStats(0); // we're not going to measure this
1458 ProcessQueryResult
processQuery(DNSQuestion
& dq
, ClientState
& cs
, LocalHolders
& holders
, std::shared_ptr
<DownstreamState
>& selectedBackend
)
1460 const uint16_t queryId
= ntohs(dq
.dh
->id
);
1463 /* we need an accurate ("real") value for the response and
1464 to store into the IDS, but not for insertion into the
1465 rings for example */
1466 struct timespec now
;
1469 if (!applyRulesToQuery(holders
, dq
, now
)) {
1470 return ProcessQueryResult::Drop
;
1473 if(dq
.dh
->qr
) { // something turned it into a response
1474 fixUpQueryTurnedResponse(dq
, dq
.origFlags
);
1476 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1477 return ProcessQueryResult::Drop
;
1480 ++g_stats
.selfAnswered
;
1482 return ProcessQueryResult::SendAnswer
;
1485 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, dq
.poolname
);
1486 dq
.packetCache
= serverPool
->packetCache
;
1487 auto policy
= *(holders
.policy
);
1488 if (serverPool
->policy
!= nullptr) {
1489 policy
= *(serverPool
->policy
);
1491 auto servers
= serverPool
->getServers();
1493 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1494 selectedBackend
= policy
.policy(servers
, &dq
);
1497 selectedBackend
= policy
.policy(servers
, &dq
);
1500 uint16_t cachedResponseSize
= dq
.size
;
1501 uint32_t allowExpired
= selectedBackend
? 0 : g_staleCacheEntriesTTL
;
1503 if (dq
.packetCache
&& !dq
.skipCache
) {
1504 dq
.dnssecOK
= (getEDNSZ(dq
) & EDNS_HEADER_FLAG_DO
);
1507 if (dq
.useECS
&& ((selectedBackend
&& selectedBackend
->useECS
) || (!selectedBackend
&& serverPool
->getECS()))) {
1508 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1509 // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
1510 // ECS option, which would make it unsuitable for the zero-scope feature.
1511 if (dq
.packetCache
&& !dq
.skipCache
&& (!selectedBackend
|| !selectedBackend
->disableZeroScope
) && dq
.packetCache
->isECSParsingEnabled()) {
1512 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKeyNoECS
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1513 dq
.len
= cachedResponseSize
;
1515 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1516 return ProcessQueryResult::Drop
;
1519 return ProcessQueryResult::SendAnswer
;
1523 /* there was no existing ECS on the query, enable the zero-scope feature */
1524 dq
.useZeroScope
= true;
1528 if (!handleEDNSClientSubnet(dq
, &(dq
.ednsAdded
), &(dq
.ecsAdded
), g_preserveTrailingData
)) {
1529 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq
.remote
->toStringWithPort());
1530 return ProcessQueryResult::Drop
;
1534 if (dq
.packetCache
&& !dq
.skipCache
) {
1535 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKey
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1536 dq
.len
= cachedResponseSize
;
1538 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1539 return ProcessQueryResult::Drop
;
1542 return ProcessQueryResult::SendAnswer
;
1544 ++g_stats
.cacheMisses
;
1547 if(!selectedBackend
) {
1550 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "ServFailed" : "Dropped", dq
.qname
->toLogString(), QType(dq
.qtype
).getName(), dq
.remote
->toStringWithPort());
1551 if (g_servFailOnNoPolicy
) {
1552 restoreFlags(dq
.dh
, dq
.origFlags
);
1554 dq
.dh
->rcode
= RCode::ServFail
;
1557 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1558 return ProcessQueryResult::Drop
;
1560 // no response-only statistics counter to update.
1561 return ProcessQueryResult::SendAnswer
;
1564 return ProcessQueryResult::Drop
;
1567 if (dq
.addXPF
&& selectedBackend
->xpfRRCode
!= 0) {
1568 addXPF(dq
, selectedBackend
->xpfRRCode
, g_preserveTrailingData
);
1571 selectedBackend
->queries
++;
1572 return ProcessQueryResult::PassToBackend
;
1574 catch(const std::exception
& e
){
1575 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq
.tcp
? "TCP" : "UDP"), dq
.remote
->toStringWithPort(), queryId
, e
.what());
1577 return ProcessQueryResult::Drop
;
1580 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, cmsgbuf_aligned
* respCBuf
)
1582 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
1583 uint16_t queryId
= 0;
1586 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1590 /* we need an accurate ("real") value for the response and
1591 to store into the IDS, but not for insertion into the
1592 rings for example */
1593 struct timespec queryRealTime
;
1594 gettime(&queryRealTime
, true);
1596 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
= nullptr;
1597 auto dnsCryptResponse
= checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, queryRealTime
.tv_sec
, false);
1598 if (dnsCryptResponse
) {
1599 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dnsCryptResponse
->data()), static_cast<uint16_t>(dnsCryptResponse
->size()), 0, dest
, remote
);
1603 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
1604 queryId
= ntohs(dh
->id
);
1606 if (!checkQueryHeaders(dh
)) {
1610 uint16_t qtype
, qclass
;
1611 unsigned int consumed
= 0;
1612 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
1613 DNSQuestion
dq(&qname
, qtype
, qclass
, consumed
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false, &queryRealTime
);
1614 dq
.dnsCryptQuery
= std::move(dnsCryptQuery
);
1615 std::shared_ptr
<DownstreamState
> ss
{nullptr};
1616 auto result
= processQuery(dq
, cs
, holders
, ss
);
1618 if (result
== ProcessQueryResult::Drop
) {
1622 if (result
== ProcessQueryResult::SendAnswer
) {
1623 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1624 if (dq
.delayMsec
== 0 && responsesVect
!= nullptr) {
1625 queueResponse(cs
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, *dq
.local
, *dq
.remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1626 (*queuedResponses
)++;
1629 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1630 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1631 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, dq
.delayMsec
, dest
, *dq
.remote
);
1635 if (result
!= ProcessQueryResult::PassToBackend
|| ss
== nullptr) {
1639 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1640 IDState
* ids
= &ss
->idStates
[idOffset
];
1642 DOHUnit
* du
= nullptr;
1644 /* that means that the state was in use, possibly with an allocated
1645 DOHUnit that we will need to handle, but we can't touch it before
1646 confirming that we now own this state */
1647 if (ids
->isInUse()) {
1651 /* we atomically replace the value, we now own this state */
1652 if (!ids
->markAsUsed()) {
1653 /* the state was not in use.
1654 we reset 'du' because it might have still been in use when we read it. */
1659 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1660 to handle it because it's about to be overwritten. */
1663 ++g_stats
.downstreamTimeouts
;
1664 handleDOHTimeout(du
);
1668 ids
->origFD
= cs
.udpFD
;
1669 ids
->origID
= dh
->id
;
1670 setIDStateFromDNSQuestion(*ids
, dq
, std::move(qname
));
1672 /* If we couldn't harvest the real dest addr, still
1673 write down the listening addr since it will be useful
1674 (especially if it's not an 'any' one).
1675 We need to keep track of which one it is since we may
1676 want to use the real but not the listening addr to reply.
1678 if (dest
.sin4
.sin_family
!= 0) {
1679 ids
->origDest
= dest
;
1680 ids
->destHarvested
= true;
1683 ids
->origDest
= cs
.local
;
1684 ids
->destHarvested
= false;
1689 int fd
= pickBackendSocketForSending(ss
);
1690 ssize_t ret
= udpClientSendRequestToBackend(ss
, fd
, query
, dq
.len
);
1694 ++g_stats
.downstreamSendErrors
;
1697 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toLogString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
1699 catch(const std::exception
& e
){
1700 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
1704 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1705 static void MultipleMessagesUDPClientThread(ClientState
* cs
, LocalHolders
& holders
)
1709 char packet
[s_maxPacketCacheEntrySize
];
1710 ComboAddress remote
;
1713 /* used by HarvestDestinationAddress */
1714 cmsgbuf_aligned cbuf
;
1716 const size_t vectSize
= g_udpVectorSize
;
1717 /* the actual buffer is larger because:
1718 - we may have to add EDNS and/or ECS
1719 - we use it for self-generated responses (from rule or cache)
1720 but we only accept incoming payloads up to that size
1722 static_assert(s_udpIncomingBufferSize
<= sizeof(MMReceiver::packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1724 auto recvData
= std::unique_ptr
<MMReceiver
[]>(new MMReceiver
[vectSize
]);
1725 auto msgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1726 auto outMsgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1728 /* initialize the structures needed to receive our messages */
1729 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1730 recvData
[idx
].remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1731 fillMSGHdr(&msgVec
[idx
].msg_hdr
, &recvData
[idx
].iov
, &recvData
[idx
].cbuf
, sizeof(recvData
[idx
].cbuf
), recvData
[idx
].packet
, s_udpIncomingBufferSize
, &recvData
[idx
].remote
);
1737 /* reset the IO vector, since it's also used to send the vector of responses
1738 to avoid having to copy the data around */
1739 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1740 recvData
[idx
].iov
.iov_base
= recvData
[idx
].packet
;
1741 recvData
[idx
].iov
.iov_len
= sizeof(recvData
[idx
].packet
);
1744 /* block until we have at least one message ready, but return
1745 as many as possible to save the syscall costs */
1746 int msgsGot
= recvmmsg(cs
->udpFD
, msgVec
.get(), vectSize
, MSG_WAITFORONE
| MSG_TRUNC
, nullptr);
1749 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
1753 unsigned int msgsToSend
= 0;
1755 /* process the received messages */
1756 for (int msgIdx
= 0; msgIdx
< msgsGot
; msgIdx
++) {
1757 const struct msghdr
* msgh
= &msgVec
[msgIdx
].msg_hdr
;
1758 unsigned int got
= msgVec
[msgIdx
].msg_len
;
1759 const ComboAddress
& remote
= recvData
[msgIdx
].remote
;
1761 if (static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1762 ++g_stats
.nonCompliantQueries
;
1766 processUDPQuery(*cs
, holders
, msgh
, remote
, recvData
[msgIdx
].dest
, recvData
[msgIdx
].packet
, static_cast<uint16_t>(got
), sizeof(recvData
[msgIdx
].packet
), outMsgVec
.get(), &msgsToSend
, &recvData
[msgIdx
].iov
, &recvData
[msgIdx
].cbuf
);
1770 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1771 or the cache) can be sent in batch too */
1773 if (msgsToSend
> 0 && msgsToSend
<= static_cast<unsigned int>(msgsGot
)) {
1774 int sent
= sendmmsg(cs
->udpFD
, outMsgVec
.get(), msgsToSend
, 0);
1776 if (sent
< 0 || static_cast<unsigned int>(sent
) != msgsToSend
) {
1777 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent
, msgsToSend
, stringerror());
1783 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1785 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1786 static void udpClientThread(ClientState
* cs
)
1789 setThreadName("dnsdist/udpClie");
1790 LocalHolders holders
;
1792 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1793 if (g_udpVectorSize
> 1) {
1794 MultipleMessagesUDPClientThread(cs
, holders
);
1798 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1800 char packet
[s_maxPacketCacheEntrySize
];
1801 /* the actual buffer is larger because:
1802 - we may have to add EDNS and/or ECS
1803 - we use it for self-generated responses (from rule or cache)
1804 but we only accept incoming payloads up to that size
1806 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1809 /* used by HarvestDestinationAddress */
1810 cmsgbuf_aligned cbuf
;
1812 ComboAddress remote
;
1814 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1815 fillMSGHdr(&msgh
, &iov
, &cbuf
, sizeof(cbuf
), packet
, s_udpIncomingBufferSize
, &remote
);
1818 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
1820 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1821 ++g_stats
.nonCompliantQueries
;
1825 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), sizeof(packet
), nullptr, nullptr, nullptr, nullptr);
1829 catch(const std::exception
&e
)
1831 errlog("UDP client thread died because of exception: %s", e
.what());
1833 catch(const PDNSException
&e
)
1835 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1839 errlog("UDP client thread died because of an exception: %s", "unknown");
1842 uint16_t getRandomDNSID()
1844 #ifdef HAVE_LIBSODIUM
1845 return randombytes_uniform(65536);
1847 return (random() % 65536);
1851 uint64_t g_maxTCPClientThreads
{10};
1852 std::atomic
<uint16_t> g_cacheCleaningDelay
{60};
1853 std::atomic
<uint16_t> g_cacheCleaningPercentage
{100};
1857 setThreadName("dnsdist/main");
1860 int32_t secondsToWaitLog
= 0;
1866 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1867 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1871 secondsToWaitLog
= 0;
1873 catch(std::exception
&e
) {
1874 if (secondsToWaitLog
<= 0) {
1875 infolog("Error during execution of maintenance function: %s", e
.what());
1876 secondsToWaitLog
= 61;
1878 secondsToWaitLog
-= interval
;
1884 if (counter
>= g_cacheCleaningDelay
) {
1885 /* keep track, for each cache, of whether we should keep
1887 std::map
<std::shared_ptr
<DNSDistPacketCache
>, bool> caches
;
1889 /* gather all caches actually used by at least one pool, and see
1890 if something prevents us from cleaning the expired entries */
1891 auto localPools
= g_pools
.getLocal();
1892 for (const auto& entry
: *localPools
) {
1893 auto& pool
= entry
.second
;
1895 auto packetCache
= pool
->packetCache
;
1900 auto pair
= caches
.insert({packetCache
, false});
1901 auto& iter
= pair
.first
;
1902 /* if we need to keep stale data for this cache (ie, not clear
1903 expired entries when at least one pool using this cache
1904 has all its backends down) */
1905 if (packetCache
->keepStaleData() && iter
->second
== false) {
1906 /* so far all pools had at least one backend up */
1907 if (pool
->countServers(true) == 0) {
1908 iter
->second
= true;
1913 for (auto pair
: caches
) {
1914 /* shall we keep expired entries ? */
1915 if (pair
.second
== true) {
1918 auto& packetCache
= pair
.first
;
1919 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1920 packetCache
->purgeExpired(upTo
);
1925 // ponder pruning g_dynblocks of expired entries here
1929 static void secPollThread()
1931 setThreadName("dnsdist/secpoll");
1935 doSecPoll(g_secPollSuffix
);
1939 sleep(g_secPollInterval
);
1943 static void healthChecksThread()
1945 setThreadName("dnsdist/healthC");
1947 static const int interval
= 1;
1952 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads()) {
1953 g_tcpclientthreads
->addTCPClientThread();
1956 auto mplexer
= std::shared_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
1957 auto states
= g_dstates
.getLocal(); // this points to the actual shared_ptrs!
1958 for(auto& dss
: *states
) {
1959 if(++dss
->lastCheck
< dss
->checkInterval
) {
1965 if (dss
->availability
== DownstreamState::Availability::Auto
) {
1966 if (!queueHealthCheck(mplexer
, dss
)) {
1967 updateHealthCheckResult(dss
, false);
1971 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
1972 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
1973 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
1974 dss
->prev
.queries
.store(dss
->queries
.load());
1975 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
1977 for (IDState
& ids
: dss
->idStates
) { // timeouts
1978 int64_t usageIndicator
= ids
.usageIndicator
;
1979 if(IDState::isInUse(usageIndicator
) && ids
.age
++ > g_udpTimeout
) {
1980 /* We mark the state as unused as soon as possible
1981 to limit the risk of racing with the
1984 auto oldDU
= ids
.du
;
1986 if (!ids
.tryMarkUnused(usageIndicator
)) {
1987 /* this state has been altered in the meantime,
1988 don't go anywhere near it */
1992 handleDOHTimeout(oldDU
);
1996 ++g_stats
.downstreamTimeouts
; // this is an 'actively' discovered timeout
1997 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1998 dss
->remote
.toStringWithPort(), dss
->name
,
1999 ids
.qname
.toLogString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
2004 struct dnsheader fake
;
2005 memset(&fake
, 0, sizeof(fake
));
2006 fake
.id
= ids
.origID
;
2008 g_rings
.insertResponse(ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
);
2013 handleQueuedHealthChecks(mplexer
);
2017 static void bindAny(int af
, int sock
)
2019 __attribute__((unused
)) int one
= 1;
2022 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
2023 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2028 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
2029 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2033 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
2034 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2037 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
2038 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2042 static void dropGroupPrivs(gid_t gid
)
2045 if (setgid(gid
) == 0) {
2046 if (setgroups(0, NULL
) < 0) {
2047 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2051 warnlog("Warning: Unable to set group ID to %d: %s", gid
, stringerror());
2056 static void dropUserPrivs(uid_t uid
)
2059 if(setuid(uid
) < 0) {
2060 warnlog("Warning: Unable to set user ID to %d: %s", uid
, stringerror());
2065 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
2067 /* stdin, stdout, stderr */
2068 size_t requiredFDsCount
= 3;
2069 auto backends
= g_dstates
.getLocal();
2070 /* UDP sockets to backends */
2071 size_t backendUDPSocketsCount
= 0;
2072 for (const auto& backend
: *backends
) {
2073 backendUDPSocketsCount
+= backend
->sockets
.size();
2075 requiredFDsCount
+= backendUDPSocketsCount
;
2076 /* TCP sockets to backends */
2077 requiredFDsCount
+= (backends
->size() * g_maxTCPClientThreads
);
2078 /* listening sockets */
2079 requiredFDsCount
+= udpBindsCount
;
2080 requiredFDsCount
+= tcpBindsCount
;
2081 /* max TCP connections currently served */
2082 requiredFDsCount
+= g_maxTCPClientThreads
;
2083 /* max pipes for communicating between TCP acceptors and client threads */
2084 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
2085 /* max TCP queued connections */
2086 requiredFDsCount
+= g_maxTCPQueuedConnections
;
2087 /* DelayPipe pipe */
2088 requiredFDsCount
+= 2;
2091 /* webserver main socket */
2093 /* console main socket */
2100 getrlimit(RLIMIT_NOFILE
, &rl
);
2101 if (rl
.rlim_cur
<= requiredFDsCount
) {
2102 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
2104 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2106 warnlog("You can increase this value by using ulimit.");
2111 static void setUpLocalBind(std::unique_ptr
<ClientState
>& cs
)
2113 /* skip some warnings if there is an identical UDP context */
2114 bool warn
= cs
->tcp
== false || cs
->tlsFrontend
!= nullptr || cs
->dohFrontend
!= nullptr;
2115 int& fd
= cs
->tcp
== false ? cs
->udpFD
: cs
->tcpFD
;
2118 fd
= SSocket(cs
->local
.sin4
.sin_family
, cs
->tcp
== false ? SOCK_DGRAM
: SOCK_STREAM
, 0);
2121 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2122 #ifdef TCP_DEFER_ACCEPT
2123 SSetsockopt(fd
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2125 if (cs
->fastOpenQueueSize
> 0) {
2127 SSetsockopt(fd
, IPPROTO_TCP
, TCP_FASTOPEN
, cs
->fastOpenQueueSize
);
2130 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2136 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2137 SSetsockopt(fd
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2140 bindAny(cs
->local
.sin4
.sin_family
, fd
);
2142 if(!cs
->tcp
&& IsAnyAddress(cs
->local
)) {
2144 setsockopt(fd
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2145 #ifdef IPV6_RECVPKTINFO
2146 setsockopt(fd
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2150 if (cs
->reuseport
) {
2152 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2155 /* no need to warn again if configured but support is not available, we already did for UDP */
2156 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2162 if (cs
->local
.isIPv4()) {
2164 setSocketIgnorePMTU(cs
->udpFD
);
2166 catch(const std::exception
& e
) {
2167 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs
->local
.toStringWithPort(), e
.what());
2172 const std::string
& itf
= cs
->interface
;
2174 #ifdef SO_BINDTODEVICE
2175 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2177 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), stringerror());
2181 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2187 if (g_defaultBPFFilter
) {
2188 cs
->attachFilter(g_defaultBPFFilter
);
2189 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs
->tcp
? "UDP" : "TCP"), cs
->local
.toStringWithPort());
2191 #endif /* HAVE_EBPF */
2193 if (cs
->tlsFrontend
!= nullptr) {
2194 if (!cs
->tlsFrontend
->setupTLS()) {
2195 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2196 _exit(EXIT_FAILURE
);
2200 if (cs
->dohFrontend
!= nullptr) {
2201 cs
->dohFrontend
->setup();
2204 SBind(fd
, cs
->local
);
2207 SListen(cs
->tcpFD
, SOMAXCONN
);
2208 if (cs
->tlsFrontend
!= nullptr) {
2209 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2211 else if (cs
->dohFrontend
!= nullptr) {
2212 warnlog("Listening on %s for DoH", cs
->local
.toStringWithPort());
2214 else if (cs
->dnscryptCtx
!= nullptr) {
2215 warnlog("Listening on %s for DNSCrypt", cs
->local
.toStringWithPort());
2218 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2227 vector
<string
> locals
;
2228 vector
<string
> remotes
;
2229 bool checkConfig
{false};
2230 bool beClient
{false};
2231 bool beSupervised
{false};
2238 std::atomic
<bool> g_configurationDone
{false};
2243 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2244 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2245 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
2247 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2248 cout
<<"-C,--config file Load configuration from 'file'\n";
2249 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2250 cout
<<" controlSocket from your configuration file, but also\n";
2251 cout
<<" accepts an IP:PORT argument\n";
2252 #ifdef HAVE_LIBSODIUM
2253 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2254 cout
<<" is similar to setting setKey in the configuration file.\n";
2255 cout
<<" NOTE: this will leak this key in your shell's history\n";
2256 cout
<<" and in the systems running process list.\n";
2258 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2259 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2260 cout
<<" Any errors are printed as well.\n";
2261 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2262 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2263 cout
<<"-h,--help Display this helpful message\n";
2264 cout
<<"-l,--local address Listen on this local address\n";
2265 cout
<<"--supervised Don't open a console, I'm supervised\n";
2266 cout
<<" (use with e.g. systemd and daemontools)\n";
2267 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2268 cout
<<" (use with e.g. systemd)\n";
2269 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2270 cout
<<"-v,--verbose Enable verbose mode\n";
2271 cout
<<"-V,--version Show dnsdist version information and exit\n";
2274 int main(int argc
, char** argv
)
2277 size_t udpBindsCount
= 0;
2278 size_t tcpBindsCount
= 0;
2279 rl_attempted_completion_function
= my_completion
;
2280 rl_completion_append_character
= 0;
2282 signal(SIGPIPE
, SIG_IGN
);
2283 signal(SIGCHLD
, SIG_IGN
);
2284 openlog("dnsdist", LOG_PID
|LOG_NDELAY
, LOG_DAEMON
);
2286 #ifdef HAVE_LIBSODIUM
2287 if (sodium_init() == -1) {
2288 cerr
<<"Unable to initialize crypto library"<<endl
;
2291 g_hashperturb
=randombytes_uniform(0xffffffff);
2292 srandom(randombytes_uniform(0xffffffff));
2296 gettimeofday(&tv
, 0);
2297 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
2298 g_hashperturb
=random();
2302 ComboAddress clientAddress
= ComboAddress();
2303 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
2304 struct option longopts
[]={
2305 {"acl", required_argument
, 0, 'a'},
2306 {"check-config", no_argument
, 0, 1},
2307 {"client", no_argument
, 0, 'c'},
2308 {"config", required_argument
, 0, 'C'},
2309 {"disable-syslog", no_argument
, 0, 2},
2310 {"execute", required_argument
, 0, 'e'},
2311 {"gid", required_argument
, 0, 'g'},
2312 {"help", no_argument
, 0, 'h'},
2313 {"local", required_argument
, 0, 'l'},
2314 {"setkey", required_argument
, 0, 'k'},
2315 {"supervised", no_argument
, 0, 3},
2316 {"uid", required_argument
, 0, 'u'},
2317 {"verbose", no_argument
, 0, 'v'},
2318 {"version", no_argument
, 0, 'V'},
2324 int c
=getopt_long(argc
, argv
, "a:cC:e:g:hk:l:u:vV", longopts
, &longindex
);
2329 g_cmdLine
.checkConfig
=true;
2335 g_cmdLine
.beSupervised
=true;
2338 g_cmdLine
.config
=optarg
;
2341 g_cmdLine
.beClient
=true;
2344 g_cmdLine
.command
=optarg
;
2347 g_cmdLine
.gid
=optarg
;
2350 cout
<<"dnsdist "<<VERSION
<<endl
;
2357 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
2360 #ifdef HAVE_LIBSODIUM
2361 if (B64Decode(string(optarg
), g_consoleKey
) < 0) {
2362 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2366 cerr
<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl
;
2371 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
2374 g_cmdLine
.uid
=optarg
;
2380 #ifdef LUAJIT_VERSION
2381 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2383 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2385 cout
<<"Enabled features: ";
2389 #ifdef HAVE_DNS_OVER_TLS
2390 cout
<<"dns-over-tls(";
2402 #ifdef HAVE_DNS_OVER_HTTPS
2403 cout
<<"dns-over-https(DOH) ";
2405 #ifdef HAVE_DNSCRYPT
2414 #ifdef HAVE_LIBCRYPTO
2417 #ifdef HAVE_LIBSODIUM
2423 #ifdef HAVE_PROTOBUF
2429 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2430 cout
<<"recvmmsg/sendmmsg ";
2432 #ifdef HAVE_NET_SNMP
2442 //getopt_long printed an error message.
2451 for(auto p
= argv
; *p
; ++p
) {
2452 if(g_cmdLine
.beClient
) {
2453 clientAddress
= ComboAddress(*p
, 5199);
2455 g_cmdLine
.remotes
.push_back(*p
);
2459 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
, false};
2461 g_policy
.setState(leastOutstandingPol
);
2462 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2463 setupLua(true, false, g_cmdLine
.config
);
2464 if (clientAddress
!= ComboAddress())
2465 g_serverControl
= clientAddress
;
2466 doClient(g_serverControl
, g_cmdLine
.command
);
2467 _exit(EXIT_SUCCESS
);
2470 auto acl
= g_ACL
.getCopy();
2472 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2474 g_ACL
.setState(acl
);
2477 auto consoleACL
= g_consoleACL
.getCopy();
2478 for (const auto& mask
: { "127.0.0.1/8", "::1/128" }) {
2479 consoleACL
.addMask(mask
);
2481 g_consoleACL
.setState(consoleACL
);
2483 if (g_cmdLine
.checkConfig
) {
2484 setupLua(false, true, g_cmdLine
.config
);
2485 // No exception was thrown
2486 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2487 _exit(EXIT_SUCCESS
);
2490 auto todo
=setupLua(false, false, g_cmdLine
.config
);
2492 auto localPools
= g_pools
.getCopy();
2494 bool precompute
= false;
2495 if (g_policy
.getLocal()->name
== "chashed") {
2498 for (const auto& entry
: localPools
) {
2499 if (entry
.second
->policy
!= nullptr && entry
.second
->policy
->name
== "chashed") {
2506 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2507 // pre compute hashes
2508 auto backends
= g_dstates
.getLocal();
2509 for (auto& backend
: *backends
) {
2515 if (!g_cmdLine
.locals
.empty()) {
2516 for (auto it
= g_frontends
.begin(); it
!= g_frontends
.end(); ) {
2517 /* DoH, DoT and DNSCrypt frontends are separate */
2518 if ((*it
)->dohFrontend
== nullptr && (*it
)->tlsFrontend
== nullptr && (*it
)->dnscryptCtx
== nullptr) {
2519 it
= g_frontends
.erase(it
);
2526 for(const auto& loc
: g_cmdLine
.locals
) {
2528 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), false, false, 0, "", {})));
2530 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), true, false, 0, "", {})));
2534 if (g_frontends
.empty()) {
2536 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2538 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2541 g_configurationDone
= true;
2543 for(auto& frontend
: g_frontends
) {
2544 setUpLocalBind(frontend
);
2546 if (frontend
->tcp
== false) {
2554 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
2558 g_ACL
.getLocal()->toStringVector(&vec
);
2559 for(const auto& s
: vec
) {
2564 infolog("ACL allowing queries from: %s", acls
.c_str());
2567 g_consoleACL
.getLocal()->toStringVector(&vec
);
2568 for (const auto& entry
: vec
) {
2569 if (!acls
.empty()) {
2574 infolog("Console ACL allowing connections from: %s", acls
.c_str());
2576 #ifdef HAVE_LIBSODIUM
2577 if (g_consoleEnabled
&& g_consoleKey
.empty()) {
2578 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2585 if(!g_cmdLine
.gid
.empty())
2586 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2588 if(!g_cmdLine
.uid
.empty())
2589 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2591 dropGroupPrivs(newgid
);
2592 dropUserPrivs(newuid
);
2594 /* we might still have capabilities remaining,
2595 for example if we have been started as root
2596 without --uid or --gid (please don't do that)
2597 or as an unprivileged user with ambient
2598 capabilities like CAP_NET_BIND_SERVICE.
2600 dropCapabilities(g_capabilitiesToRetain
);
2602 catch(const std::exception
& e
) {
2603 warnlog("%s", e
.what());
2606 /* this need to be done _after_ dropping privileges */
2607 g_delay
= new DelayPipe
<DelayedPacket
>();
2613 g_tcpclientthreads
= std::unique_ptr
<TCPClientCollection
>(new TCPClientCollection(g_maxTCPClientThreads
, g_useTCPSinglePipe
));
2618 localPools
= g_pools
.getCopy();
2619 /* create the default pool no matter what */
2620 createPoolIfNotExists(localPools
, "");
2621 if(g_cmdLine
.remotes
.size()) {
2622 for(const auto& address
: g_cmdLine
.remotes
) {
2623 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2624 addServerToPool(localPools
, "", ret
);
2625 if (ret
->connected
&& !ret
->threadStarted
.test_and_set()) {
2626 ret
->tid
= thread(responderThread
, ret
);
2628 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
2631 g_pools
.setState(localPools
);
2633 if(g_dstates
.getLocal()->empty()) {
2634 errlog("No downstream servers defined: all packets will get dropped");
2635 // you might define them later, but you need to know
2638 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
2640 auto mplexer
= std::shared_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
2641 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2642 if (dss
->availability
== DownstreamState::Availability::Auto
) {
2643 if (!queueHealthCheck(mplexer
, dss
, true)) {
2644 dss
->upStatus
= false;
2645 warnlog("Marking downstream %s as 'down'", dss
->getNameWithAddr());
2649 handleQueuedHealthChecks(mplexer
, true);
2651 for(auto& cs
: g_frontends
) {
2652 if (cs
->dohFrontend
!= nullptr) {
2653 #ifdef HAVE_DNS_OVER_HTTPS
2654 std::thread
t1(dohThread
, cs
.get());
2655 if (!cs
->cpus
.empty()) {
2656 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2659 #endif /* HAVE_DNS_OVER_HTTPS */
2662 if (cs
->udpFD
>= 0) {
2663 thread
t1(udpClientThread
, cs
.get());
2664 if (!cs
->cpus
.empty()) {
2665 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2669 else if (cs
->tcpFD
>= 0) {
2670 thread
t1(tcpAcceptorThread
, cs
.get());
2671 if (!cs
->cpus
.empty()) {
2672 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2678 thread
carbonthread(carbonDumpThread
);
2679 carbonthread
.detach();
2681 thread
stattid(maintThread
);
2684 thread
healththread(healthChecksThread
);
2686 if (!g_secPollSuffix
.empty()) {
2687 thread
secpollthread(secPollThread
);
2688 secpollthread
.detach();
2691 if(g_cmdLine
.beSupervised
) {
2693 sd_notify(0, "READY=1");
2695 healththread
.join();
2698 healththread
.detach();
2701 _exit(EXIT_SUCCESS
);
2704 catch(const LuaContext::ExecutionErrorException
& e
) {
2706 errlog("Fatal Lua error: %s", e
.what());
2707 std::rethrow_if_nested(e
);
2708 } catch(const std::exception
& ne
) {
2709 errlog("Details: %s", ne
.what());
2711 catch(PDNSException
&ae
)
2713 errlog("Fatal pdns error: %s", ae
.reason
);
2715 _exit(EXIT_FAILURE
);
2717 catch(std::exception
&e
)
2719 errlog("Fatal error: %s", e
.what());
2720 _exit(EXIT_FAILURE
);
2722 catch(PDNSException
&ae
)
2724 errlog("Fatal pdns error: %s", ae
.reason
);
2725 _exit(EXIT_FAILURE
);
2728 uint64_t getLatencyCount(const std::string
&)
2730 return g_stats
.responses
+ g_stats
.selfAnswered
+ g_stats
.cacheHits
;