2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <netinet/tcp.h>
31 #include <sys/resource.h>
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
37 #include <editline/readline.h>
40 #include "dnsdist-systemd.hh"
42 #include <systemd/sd-daemon.h>
46 #include "dnsdist-cache.hh"
47 #include "dnsdist-console.hh"
48 #include "dnsdist-ecs.hh"
49 #include "dnsdist-healthchecks.hh"
50 #include "dnsdist-lua.hh"
51 #include "dnsdist-rings.hh"
52 #include "dnsdist-secpoll.hh"
53 #include "dnsdist-xpf.hh"
56 #include "delaypipe.hh"
59 #include "dnsparser.hh"
60 #include "ednsoptions.hh"
64 #include "sodcrypto.hh"
66 #include "threadname.hh"
70 Receiver is currently single threaded
71 not *that* bad actually, but now that we are thread safe, might want to scale
75 Set of Rules, if one matches, it leads to an Action
76 Both rules and actions could conceivably be Lua based.
77 On the C++ side, both could be inherited from a class Rule and a class Action,
78 on the Lua side we can't do that. */
84 struct DNSDistStats g_stats
;
85 MetricDefinitionStorage g_metricDefinitions
;
87 uint16_t g_maxOutstanding
{std::numeric_limits
<uint16_t>::max()};
88 uint32_t g_staleCacheEntriesTTL
{0};
90 bool g_allowEmptyResponse
{false};
92 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
93 string g_outputBuffer
;
95 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
96 std::vector
<std::shared_ptr
<DOHFrontend
>> g_dohlocals
;
97 std::vector
<std::shared_ptr
<DNSCryptContext
>> g_dnsCryptLocals
;
99 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
100 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
101 #endif /* HAVE_EBPF */
102 std::vector
<std::unique_ptr
<ClientState
>> g_frontends
;
103 GlobalStateHolder
<pools_t
> g_pools
;
104 size_t g_udpVectorSize
{1};
106 bool g_snmpEnabled
{false};
107 bool g_snmpTrapsEnabled
{false};
108 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
114 For the return path, per downstream server we have a thread that listens to responses.
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
122 IDs are assigned by atomic increments of the socket offset.
125 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
126 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
128 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
133 GlobalStateHolder
<servers_t
> g_dstates
;
134 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
135 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
136 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
137 int g_tcpRecvTimeout
{2};
138 int g_tcpSendTimeout
{2};
141 bool g_servFailOnNoPolicy
{false};
142 bool g_truncateTC
{false};
143 bool g_fixupCase
{false};
144 bool g_preserveTrailingData
{false};
145 bool g_roundrobinFailOnNoServer
{false};
147 std::set
<std::string
> g_capabilitiesToRetain
;
149 static void truncateTC(char* packet
, uint16_t* len
, size_t responseSize
, unsigned int consumed
)
152 bool hadEDNS
= false;
153 uint16_t payloadSize
= 0;
156 if (g_addEDNSToSelfGeneratedResponses
) {
157 hadEDNS
= getEDNSUDPPayloadSizeAndZ(packet
, *len
, &payloadSize
, &z
);
160 *len
=static_cast<uint16_t>(sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
161 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
162 dh
->ancount
= dh
->arcount
= dh
->nscount
= 0;
165 addEDNS(dh
, *len
, responseSize
, z
& EDNS_HEADER_FLAG_DO
, payloadSize
, 0);
177 ComboAddress destination
;
178 ComboAddress origDest
;
182 if(origDest
.sin4
.sin_family
== 0) {
183 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
186 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
190 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
195 DelayPipe
<DelayedPacket
>* g_delay
= nullptr;
197 void doLatencyStats(double udiff
)
199 if(udiff
< 1000) ++g_stats
.latency0_1
;
200 else if(udiff
< 10000) ++g_stats
.latency1_10
;
201 else if(udiff
< 50000) ++g_stats
.latency10_50
;
202 else if(udiff
< 100000) ++g_stats
.latency50_100
;
203 else if(udiff
< 1000000) ++g_stats
.latency100_1000
;
204 else ++g_stats
.latencySlow
;
205 g_stats
.latencySum
+= udiff
/ 1000;
207 auto doAvg
= [](double& var
, double n
, double weight
) {
208 var
= (weight
-1) * var
/weight
+ n
/weight
;
211 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
212 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
213 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
214 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
217 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
, unsigned int& consumed
)
219 if (responseLen
< sizeof(dnsheader
)) {
223 const struct dnsheader
* dh
= reinterpret_cast<const struct dnsheader
*>(response
);
224 if (dh
->qdcount
== 0) {
225 if ((dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) || g_allowEmptyResponse
) {
229 ++g_stats
.nonCompliantResponses
;
234 uint16_t rqtype
, rqclass
;
237 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
239 catch(const std::exception
& e
) {
240 if(responseLen
> 0 && static_cast<size_t>(responseLen
) > sizeof(dnsheader
)) {
241 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
243 ++g_stats
.nonCompliantResponses
;
247 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
254 static void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
256 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
257 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
258 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
259 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
260 /* clear the flags we are about to restore */
261 *flags
&= restoreFlagsMask
;
262 /* only keep the flags we want to restore */
263 origFlags
&= ~restoreFlagsMask
;
264 /* set the saved flags as they were */
268 static bool fixUpQueryTurnedResponse(DNSQuestion
& dq
, const uint16_t origFlags
)
270 restoreFlags(dq
.dh
, origFlags
);
272 return addEDNSToQueryTurnedResponse(dq
);
275 static bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
, bool* zeroScope
)
277 if (*responseLen
< sizeof(dnsheader
)) {
281 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(*response
);
282 restoreFlags(dh
, origFlags
);
284 if (*responseLen
== sizeof(dnsheader
)) {
289 string realname
= qname
.toDNSString();
290 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
291 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
295 if (ednsAdded
|| ecsAdded
) {
300 const std::string
responseStr(*response
, *responseLen
);
301 int res
= locateEDNSOptRR(responseStr
, &optStart
, &optLen
, &last
);
304 if (zeroScope
) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
305 size_t optContentStart
= 0;
306 uint16_t optContentLen
= 0;
307 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
308 if (isEDNSOptionInOpt(responseStr
, optStart
, optLen
, EDNSOptionCode::ECS
, &optContentStart
, &optContentLen
) && optContentLen
>= 4) {
309 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
311 *zeroScope
= responseStr
.at(optContentStart
+ 3) == 0;
316 /* we added the entire OPT RR,
317 therefore we need to remove it entirely */
319 /* simply remove the last AR */
320 *responseLen
-= optLen
;
321 uint16_t arcount
= ntohs(dh
->arcount
);
323 dh
->arcount
= htons(arcount
);
326 /* Removing an intermediary RR could lead to compression error */
327 if (rewriteResponseWithoutEDNS(responseStr
, rewrittenResponse
) == 0) {
328 *responseLen
= rewrittenResponse
.size();
329 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
330 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
332 *responseSize
= rewrittenResponse
.capacity();
333 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
336 warnlog("Error rewriting content");
341 /* the OPT RR was already present, but without ECS,
342 we need to remove the ECS option if any */
344 /* nothing after the OPT RR, we can simply remove the
346 size_t existingOptLen
= optLen
;
347 removeEDNSOptionFromOPT(*response
+ optStart
, &optLen
, EDNSOptionCode::ECS
);
348 *responseLen
-= (existingOptLen
- optLen
);
351 /* Removing an intermediary RR could lead to compression error */
352 if (rewriteResponseWithoutEDNSOption(responseStr
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
353 *responseLen
= rewrittenResponse
.size();
354 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
355 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
357 *responseSize
= rewrittenResponse
.capacity();
358 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
361 warnlog("Error rewriting content");
372 static bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
375 uint16_t encryptedResponseLen
= 0;
377 /* save the original header before encrypting it in place */
378 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
379 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
383 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
385 *responseLen
= encryptedResponseLen
;
387 /* dropping response */
388 vinfolog("Error encrypting the response, dropping.");
394 #endif /* HAVE_DNSCRYPT */
396 static bool applyRulesToResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
)
398 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
399 std::string ruleresult
;
400 for(const auto& lr
: *localRespRulactions
) {
401 if(lr
.d_rule
->matches(&dr
)) {
402 lr
.d_rule
->d_matches
++;
403 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
405 case DNSResponseAction::Action::Allow
:
408 case DNSResponseAction::Action::Drop
:
411 case DNSResponseAction::Action::HeaderModify
:
414 case DNSResponseAction::Action::ServFail
:
415 dr
.dh
->rcode
= RCode::ServFail
;
418 /* non-terminal actions follow */
419 case DNSResponseAction::Action::Delay
:
420 dr
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
422 case DNSResponseAction::Action::None
:
431 bool processResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, size_t addRoom
, std::vector
<uint8_t>& rewrittenResponse
, bool muted
)
433 if (!applyRulesToResponse(localRespRulactions
, dr
)) {
437 bool zeroScope
= false;
438 if (!fixUpResponse(response
, responseLen
, responseSize
, *dr
.qname
, dr
.origFlags
, dr
.ednsAdded
, dr
.ecsAdded
, rewrittenResponse
, addRoom
, dr
.useZeroScope
? &zeroScope
: nullptr)) {
442 if (dr
.packetCache
&& !dr
.skipCache
&& *responseLen
<= s_maxPacketCacheEntrySize
) {
443 if (!dr
.useZeroScope
) {
444 /* if the query was not suitable for zero-scope, for
445 example because it had an existing ECS entry so the hash is
446 not really 'no ECS', so just insert it for the existing subnet
448 - we don't have the correct hash for a non-ECS query
449 - inserting with hash computed before the ECS replacement but with
450 the subnet extracted _after_ the replacement would not work.
454 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
455 dr
.packetCache
->insert(zeroScope
? dr
.cacheKeyNoECS
: dr
.cacheKey
, zeroScope
? boost::none
: dr
.subnet
, dr
.origFlags
, dr
.dnssecOK
, *dr
.qname
, dr
.qtype
, dr
.qclass
, *response
, *responseLen
, dr
.tcp
, dr
.dh
->rcode
, dr
.tempFailureTTL
);
460 if (!encryptResponse(*response
, responseLen
, *responseSize
, dr
.tcp
, dr
.dnsCryptQuery
, nullptr, nullptr)) {
464 #endif /* HAVE_DNSCRYPT */
469 static bool sendUDPResponse(int origFD
, const char* response
, const uint16_t responseLen
, const int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
471 if(delayMsec
&& g_delay
) {
472 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
473 g_delay
->submit(dp
, delayMsec
);
477 if(origDest
.sin4
.sin_family
== 0) {
478 res
= sendto(origFD
, response
, responseLen
, 0, reinterpret_cast<const struct sockaddr
*>(&origRemote
), origRemote
.getSocklen());
481 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
485 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), stringerror(err
));
493 int pickBackendSocketForSending(std::shared_ptr
<DownstreamState
>& state
)
495 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
498 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
502 if (state
->sockets
.size() == 1) {
503 ready
.push_back(state
->sockets
[0]);
508 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
509 state
->mplexer
->getAvailableFDs(ready
, -1);
513 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
514 void responderThread(std::shared_ptr
<DownstreamState
> dss
)
516 setThreadName("dnsdist/respond");
517 auto localRespRulactions
= g_resprulactions
.getLocal();
518 char packet
[s_maxPacketCacheEntrySize
+ DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
519 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
520 /* when the answer is encrypted in place, we need to get a copy
521 of the original header before encryption to fill the ring buffer */
522 dnsheader cleartextDH
;
523 vector
<uint8_t> rewrittenResponse
;
525 uint16_t queryId
= 0;
526 std::vector
<int> sockets
;
527 sockets
.reserve(dss
->sockets
.size());
530 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
532 pickBackendSocketsReadyForReceiving(dss
, sockets
);
533 for (const auto& fd
: sockets
) {
534 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
535 char * response
= packet
;
536 size_t responseSize
= sizeof(packet
);
538 if (got
< 0 || static_cast<size_t>(got
) < sizeof(dnsheader
))
541 uint16_t responseLen
= static_cast<uint16_t>(got
);
544 if(queryId
>= dss
->idStates
.size()) {
548 IDState
* ids
= &dss
->idStates
[queryId
];
549 int64_t usageIndicator
= ids
->usageIndicator
;
551 if(!IDState::isInUse(usageIndicator
)) {
552 /* the corresponding state is marked as not in use, meaning that:
553 - it was already cleaned up by another thread and the state is gone ;
554 - we already got a response for this query and this one is a duplicate.
555 Either way, we don't touch it.
560 /* read the potential DOHUnit state as soon as possible, but don't use it
561 until we have confirmed that we own this state by updating usageIndicator */
563 /* setting age to 0 to prevent the maintainer thread from
564 cleaning this IDS while we process the response.
567 int origFD
= ids
->origFD
;
569 unsigned int consumed
= 0;
570 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
, consumed
)) {
574 bool isDoH
= du
!= nullptr;
575 /* atomically mark the state as available, but only if it has not been altered
577 if (ids
->tryMarkUnused(usageIndicator
)) {
578 /* clear the potential DOHUnit asap, it's ours now
579 and since we just marked the state as unused,
580 someone could overwrite it. */
582 /* we only decrement the outstanding counter if the value was not
583 altered in the meantime, which would mean that the state has been actively reused
584 and the other thread has not incremented the outstanding counter, so we don't
585 want it to be decremented twice. */
586 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
588 /* someone updated the state in the meantime, we can't touch the existing pointer */
590 /* since the state has been updated, we can't safely access it so let's just drop
595 if(dh
->tc
&& g_truncateTC
) {
596 truncateTC(response
, &responseLen
, responseSize
, consumed
);
599 dh
->id
= ids
->origID
;
601 uint16_t addRoom
= 0;
602 DNSResponse dr
= makeDNSResponseFromIDState(*ids
, dh
, sizeof(packet
), responseLen
, false);
603 if (dr
.dnsCryptQuery
) {
604 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
607 memcpy(&cleartextDH
, dr
.dh
, sizeof(cleartextDH
));
608 if (!processResponse(&response
, &responseLen
, &responseSize
, localRespRulactions
, dr
, addRoom
, rewrittenResponse
, ids
->cs
&& ids
->cs
->muted
)) {
612 if (ids
->cs
&& !ids
->cs
->muted
) {
614 #ifdef HAVE_DNS_OVER_HTTPS
616 du
->response
= std::string(response
, responseLen
);
617 if (send(du
->rsock
, &du
, sizeof(du
), 0) != sizeof(du
)) {
618 /* at this point we have the only remaining pointer on this
619 DOHUnit object since we did set ids->du to nullptr earlier,
620 except if we got the response before the pointer could be
621 released by the frontend */
624 #endif /* HAVE_DNS_OVER_HTTPS */
629 empty
.sin4
.sin_family
= 0;
630 /* if ids->destHarvested is false, origDest holds the listening address.
631 We don't want to use that as a source since it could be 0.0.0.0 for example. */
632 sendUDPResponse(origFD
, response
, responseLen
, dr
.delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
638 ++ids
->cs
->responses
;
642 double udiff
= ids
->sentTime
.udiff();
643 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(),
644 isDoH
? " (https)": "", udiff
);
648 g_rings
.insertResponse(ts
, *dr
.remote
, *dr
.qname
, dr
.qtype
, static_cast<unsigned int>(udiff
), static_cast<unsigned int>(got
), cleartextDH
, dss
->remote
);
650 switch (cleartextDH
.rcode
) {
651 case RCode::NXDomain
:
652 ++g_stats
.frontendNXDomain
;
654 case RCode::ServFail
:
655 ++g_stats
.servfailResponses
;
656 ++g_stats
.frontendServFail
;
659 ++g_stats
.frontendNoError
;
662 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
664 doLatencyStats(udiff
);
666 rewrittenResponse
.clear();
669 catch(const std::exception
& e
){
670 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
674 catch(const std::exception
& e
)
676 errlog("UDP responder thread died because of exception: %s", e
.what());
678 catch(const PDNSException
& e
)
680 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
684 errlog("UDP responder thread died because of an exception: %s", "unknown");
687 bool DownstreamState::reconnect()
689 std::unique_lock
<std::mutex
> tl(connectLock
, std::try_to_lock
);
690 if (!tl
.owns_lock()) {
691 /* we are already reconnecting */
696 for (auto& fd
: sockets
) {
698 if (sockets
.size() > 1) {
699 std::lock_guard
<std::mutex
> lock(socketsLock
);
700 mplexer
->removeReadFD(fd
);
702 /* shutdown() is needed to wake up recv() in the responderThread */
703 shutdown(fd
, SHUT_RDWR
);
707 if (!IsAnyAddress(remote
)) {
708 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
709 if (!IsAnyAddress(sourceAddr
)) {
710 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
711 if (!sourceItfName
.empty()) {
712 #ifdef SO_BINDTODEVICE
713 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, sourceItfName
.c_str(), sourceItfName
.length());
715 infolog("Error setting up the interface on backend socket '%s': %s", remote
.toStringWithPort(), stringerror());
720 SBind(fd
, sourceAddr
);
723 SConnect(fd
, remote
);
724 if (sockets
.size() > 1) {
725 std::lock_guard
<std::mutex
> lock(socketsLock
);
726 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
730 catch(const std::runtime_error
& error
) {
731 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
738 /* if at least one (re-)connection failed, close all sockets */
740 for (auto& fd
: sockets
) {
742 if (sockets
.size() > 1) {
743 std::lock_guard
<std::mutex
> lock(socketsLock
);
744 mplexer
->removeReadFD(fd
);
746 /* shutdown() is needed to wake up recv() in the responderThread */
747 shutdown(fd
, SHUT_RDWR
);
756 void DownstreamState::hash()
758 vinfolog("Computing hashes for id=%s and weight=%d", id
, weight
);
760 WriteLock
wl(&d_lock
);
763 std::string uuid
= boost::str(boost::format("%s-%d") % id
% w
);
764 unsigned int wshash
= burtleCI((const unsigned char*)uuid
.c_str(), uuid
.size(), g_hashperturb
);
765 hashes
.insert(wshash
);
770 void DownstreamState::setId(const boost::uuids::uuid
& newId
)
773 // compute hashes only if already done
774 if (!hashes
.empty()) {
779 void DownstreamState::setWeight(int newWeight
)
782 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
786 if (!hashes
.empty()) {
791 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, const std::string
& sourceItfName_
, size_t numberOfSockets
, bool connect
=true): sourceItfName(sourceItfName_
), remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
793 pthread_rwlock_init(&d_lock
, nullptr);
795 threadStarted
.clear();
797 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
799 sockets
.resize(numberOfSockets
);
800 for (auto& fd
: sockets
) {
804 if (connect
&& !IsAnyAddress(remote
)) {
806 idStates
.resize(g_maxOutstanding
);
808 infolog("Added downstream server %s", remote
.toStringWithPort());
813 std::mutex g_luamutex
;
816 GlobalStateHolder
<ServerPolicy
> g_policy
;
818 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
820 for(auto& d
: servers
) {
821 if(d
.second
->isUp() && d
.second
->qps
.check())
824 return leastOutstanding(servers
, dq
);
827 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
828 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
830 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
831 return servers
[0].second
;
834 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
835 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
836 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
837 poss
.reserve(servers
.size());
838 for(auto& d
: servers
) {
839 if(d
.second
->isUp()) {
840 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
844 return shared_ptr
<DownstreamState
>();
845 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
846 return poss
.begin()->second
;
849 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
851 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
853 int max
= std::numeric_limits
<int>::max();
855 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
856 if(d
.second
->isUp()) {
857 // Don't overflow sum when adding high weights
858 if(d
.second
->weight
> max
- sum
) {
861 sum
+= d
.second
->weight
;
864 poss
.push_back({sum
, d
.second
});
868 // Catch poss & sum are empty to avoid SIGFPE
870 return shared_ptr
<DownstreamState
>();
873 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
875 return shared_ptr
<DownstreamState
>();
879 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
881 return valrandom(random(), servers
, dq
);
884 uint32_t g_hashperturb
;
885 double g_consistentHashBalancingFactor
= 0;
886 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
888 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
891 shared_ptr
<DownstreamState
> chashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
893 unsigned int qhash
= dq
->qname
->hash(g_hashperturb
);
894 unsigned int sel
= std::numeric_limits
<unsigned int>::max();
895 unsigned int min
= std::numeric_limits
<unsigned int>::max();
896 shared_ptr
<DownstreamState
> ret
= nullptr, first
= nullptr;
898 double targetLoad
= std::numeric_limits
<double>::max();
899 if (g_consistentHashBalancingFactor
> 0) {
900 /* we start with one, representing the query we are currently handling */
901 double currentLoad
= 1;
902 for (const auto& pair
: servers
) {
903 currentLoad
+= pair
.second
->outstanding
;
905 targetLoad
= (currentLoad
/ servers
.size()) * g_consistentHashBalancingFactor
;
908 for (const auto& d
: servers
) {
909 if (d
.second
->isUp() && d
.second
->outstanding
<= targetLoad
) {
910 // make sure hashes have been computed
911 if (d
.second
->hashes
.empty()) {
915 ReadLock
rl(&(d
.second
->d_lock
));
916 const auto& server
= d
.second
;
917 // we want to keep track of the last hash
918 if (min
> *(server
->hashes
.begin())) {
919 min
= *(server
->hashes
.begin());
923 auto hash_it
= server
->hashes
.lower_bound(qhash
);
924 if (hash_it
!= server
->hashes
.end()) {
925 if (*hash_it
< sel
) {
933 if (ret
!= nullptr) {
936 if (first
!= nullptr) {
939 return shared_ptr
<DownstreamState
>();
942 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
944 NumberedServerVector poss
;
946 for(auto& d
: servers
) {
947 if(d
.second
->isUp()) {
952 const auto *res
=&poss
;
953 if(poss
.empty() && !g_roundrobinFailOnNoServer
)
957 return shared_ptr
<DownstreamState
>();
959 static unsigned int counter
;
961 return (*res
)[(counter
++) % res
->size()].second
;
964 ComboAddress g_serverControl
{"127.0.0.1:5199"};
966 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
968 std::shared_ptr
<ServerPool
> pool
;
969 pools_t::iterator it
= pools
.find(poolName
);
970 if (it
!= pools
.end()) {
974 if (!poolName
.empty())
975 vinfolog("Creating pool %s", poolName
);
976 pool
= std::make_shared
<ServerPool
>();
977 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
982 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
984 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
985 if (!poolName
.empty()) {
986 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
988 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
990 pool
->policy
= policy
;
993 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
995 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
996 if (!poolName
.empty()) {
997 vinfolog("Adding server to pool %s", poolName
);
999 vinfolog("Adding server to default pool");
1001 pool
->addServer(server
);
1004 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
1006 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
1008 if (!poolName
.empty()) {
1009 vinfolog("Removing server from pool %s", poolName
);
1012 vinfolog("Removing server from default pool");
1015 pool
->removeServer(server
);
1018 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
1020 pools_t::const_iterator it
= pools
.find(poolName
);
1022 if (it
== pools
.end()) {
1023 throw std::out_of_range("No pool named " + poolName
);
1029 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
1031 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
1032 return pool
->getServers();
1035 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
, bool raw
)
1040 SpoofAction
sa(spoofContent
);
1044 std::vector
<std::string
> addrs
;
1045 stringtok(addrs
, spoofContent
, " ,");
1047 if (addrs
.size() == 1) {
1049 ComboAddress
spoofAddr(spoofContent
);
1050 SpoofAction
sa({spoofAddr
});
1053 catch(const PDNSException
&e
) {
1054 DNSName
cname(spoofContent
);
1055 SpoofAction
sa(cname
); // CNAME then
1059 std::vector
<ComboAddress
> cas
;
1060 for (const auto& addr
: addrs
) {
1062 cas
.push_back(ComboAddress(addr
));
1067 SpoofAction
sa(cas
);
1073 bool processRulesResult(const DNSAction::Action
& action
, DNSQuestion
& dq
, std::string
& ruleresult
, bool& drop
)
1076 case DNSAction::Action::Allow
:
1079 case DNSAction::Action::Drop
:
1084 case DNSAction::Action::Nxdomain
:
1085 dq
.dh
->rcode
= RCode::NXDomain
;
1087 ++g_stats
.ruleNXDomain
;
1090 case DNSAction::Action::Refused
:
1091 dq
.dh
->rcode
= RCode::Refused
;
1093 ++g_stats
.ruleRefused
;
1096 case DNSAction::Action::ServFail
:
1097 dq
.dh
->rcode
= RCode::ServFail
;
1099 ++g_stats
.ruleServFail
;
1102 case DNSAction::Action::Spoof
:
1103 spoofResponseFromString(dq
, ruleresult
, false);
1106 case DNSAction::Action::SpoofRaw
:
1107 spoofResponseFromString(dq
, ruleresult
, true);
1110 case DNSAction::Action::Truncate
:
1113 dq
.dh
->ra
= dq
.dh
->rd
;
1118 case DNSAction::Action::HeaderModify
:
1121 case DNSAction::Action::Pool
:
1122 dq
.poolname
=ruleresult
;
1125 case DNSAction::Action::NoRecurse
:
1129 /* non-terminal actions follow */
1130 case DNSAction::Action::Delay
:
1131 dq
.delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1133 case DNSAction::Action::None
:
1135 case DNSAction::Action::NoOp
:
1139 /* false means that we don't stop the processing */
// Run the per-query bookkeeping, the dynamic blocks and the rules for dq.
// Visible steps, in order:
//  1. record the query in g_rings,
//  2. when g_qcount is enabled, count the query per name; the name and the
//     decision to count may come from the g_qcount.filter callback, which is
//     invoked while holding g_luamutex,
//  3. look the client address up in holders.dynNMGBlock, then the query name
//     in holders.dynSMTBlock; while the matching entry has not expired
//     (now earlier than its until field) its action is applied, falling back
//     to g_dynBlockAction when the stored action is None,
//  4. walk holders.rulactions, bump d_matches on every matching rule, run its
//     action and hand the outcome to processRulesResult.
// NOTE(review): the return statements and parts of the case bodies are missing
// from this extract, so the return contract cannot be confirmed from here.
1144 static bool applyRulesToQuery(LocalHolders
& holders
, DNSQuestion
& dq
, const struct timespec
& now
)
1146 g_rings
.insertQuery(now
, *dq
.remote
, *dq
.qname
, dq
.qtype
, dq
.len
, *dq
.dh
);
// Optional per-name query counting.
1148 if(g_qcount
.enabled
) {
1149 string qname
= (*dq
.qname
).toLogString();
1150 bool countQuery
{true};
1151 if(g_qcount
.filter
) {
1152 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1153 std::tie (countQuery
, qname
) = g_qcount
.filter(&dq
);
1157 WriteLock
wl(&g_qcount
.queryLock
);
1158 if(!g_qcount
.records
.count(qname
)) {
1159 g_qcount
.records
[qname
] = 0;
1161 g_qcount
.records
[qname
]++;
// Dynamic block keyed on the client address.
1165 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
1166 auto updateBlockStats
= [&got
]() {
1167 ++g_stats
.dynBlocked
;
1168 got
->second
.blocks
++;
1171 if(now
< got
->second
.until
) {
1172 DNSAction::Action action
= got
->second
.action
;
1173 if (action
== DNSAction::Action::None
) {
1174 action
= g_dynBlockAction
;
1177 case DNSAction::Action::NoOp
:
1181 case DNSAction::Action::Nxdomain
:
1182 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort());
1185 dq
.dh
->rcode
= RCode::NXDomain
;
1189 case DNSAction::Action::Refused
:
1190 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
1193 dq
.dh
->rcode
= RCode::Refused
;
1197 case DNSAction::Action::Truncate
:
1200 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
1203 dq
.dh
->ra
= dq
.dh
->rd
;
1209 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1212 case DNSAction::Action::NoRecurse
:
1214 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1219 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
// Dynamic block keyed on the query name.
1225 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
1226 auto updateBlockStats
= [&got
]() {
1227 ++g_stats
.dynBlocked
;
1231 if(now
< got
->until
) {
1232 DNSAction::Action action
= got
->action
;
1233 if (action
== DNSAction::Action::None
) {
1234 action
= g_dynBlockAction
;
1237 case DNSAction::Action::NoOp
:
1240 case DNSAction::Action::Nxdomain
:
1241 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1244 dq
.dh
->rcode
= RCode::NXDomain
;
1247 case DNSAction::Action::Refused
:
1248 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1251 dq
.dh
->rcode
= RCode::Refused
;
1254 case DNSAction::Action::Truncate
:
1258 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1261 dq
.dh
->ra
= dq
.dh
->rd
;
1267 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
1270 case DNSAction::Action::NoRecurse
:
1272 vinfolog("Query from %s setting rd=0 because of dynamic block", dq
.remote
->toStringWithPort());
1277 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toLogString());
// Regular rules: run the action of each matching rule.
1283 DNSAction::Action action
=DNSAction::Action::None
;
1286 for(const auto& lr
: *holders
.rulactions
) {
1287 if(lr
.d_rule
->matches(&dq
)) {
1288 lr
.d_rule
->d_matches
++;
1289 action
=(*lr
.d_action
)(&dq
, &ruleresult
);
1290 if (processRulesResult(action
, dq
, ruleresult
, drop
)) {
// Send a UDP request of requestLen bytes to the backend ss over socket sd.
// When ss->sourceItf is 0 a plain send() is used. The other visible path
// builds a msghdr with fillMSGHdr, attaches the source address and interface
// of the backend with addCMsgSrcAddr and calls sendmsg().
// On the error path shown, errno is saved before logging; EINVAL and ENODEV
// are singled out when the request is not a health check.
// NOTE(review): what is done for EINVAL / ENODEV and the return statement are
// not present in this extract.
1303 ssize_t
udpClientSendRequestToBackend(const std::shared_ptr
<DownstreamState
>& ss
, const int sd
, const char* request
, const size_t requestLen
, bool healthCheck
)
1307 if (ss
->sourceItf
== 0) {
1308 result
= send(sd
, request
, requestLen
, 0);
1313 cmsgbuf_aligned cbuf
;
1314 ComboAddress
remote(ss
->remote
);
1315 fillMSGHdr(&msgh
, &iov
, &cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &remote
);
1316 addCMsgSrcAddr(&msgh
, &cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1317 result
= sendmsg(sd
, &msgh
, 0);
1321 int savederrno
= errno
;
1322 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1324 /* This might sound silly, but on Linux send() might fail with EINVAL
1325 if the interface the socket was bound to doesn't exist anymore.
1326 We don't want to reconnect the real socket if the healthcheck failed,
1327 because it's not using the same socket.
1329 if (!healthCheck
&& (savederrno
== EINVAL
|| savederrno
== ENODEV
)) {
// Decide whether a datagram received on the UDP frontend cs should be handled.
// Visible checks:
//  - a datagram flagged MSG_TRUNC did not fit the receive buffer; it is logged
//    and counted in g_stats.nonCompliantQueries,
//  - a sender that does not match holders.acl is logged as dropped.
// The destination address is then harvested from msgh into dest; only the
// address is recovered, so the port is copied from the listening address.
// The other visible assignment sets the family of dest to 0, presumably when
// harvesting did not apply; callers in this file test that family against 0.
// NOTE(review): the return statements are not present in this extract.
1337 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1339 if (msgh
->msg_flags
& MSG_TRUNC
) {
1340 /* message was too large for our buffer */
1341 vinfolog("Dropping message too large for our buffer");
1342 ++g_stats
.nonCompliantQueries
;
1346 if(!holders
.acl
->match(remote
)) {
1347 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1355 if (HarvestDestinationAddress(msgh
, &dest
)) {
1356 /* we don't get the port, only the address */
1357 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1360 dest
.sin4
.sin_family
= 0;
// DNSCrypt pre-processing of an incoming query.
// When the frontend cs has a DNSCrypt context and HAVE_DNSCRYPT is defined,
// a DNSCryptQuery is allocated into dnsCryptQuery and handleDNSCryptQuery is
// called on the buffer. The visible fragments then show a test for a non-empty
// response buffer, a std::runtime_error raised when the query cannot be
// decrypted, and len being replaced by the decrypted length.
// NOTE(review): the conditions around the throw and the return statements are
// not present in this extract.
1366 boost::optional
<std::vector
<uint8_t>> checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DNSCryptQuery
>& dnsCryptQuery
, time_t now
, bool tcp
)
1368 if (cs
.dnscryptCtx
) {
1369 #ifdef HAVE_DNSCRYPT
1370 vector
<uint8_t> response
;
1371 uint16_t decryptedQueryLen
= 0;
1373 dnsCryptQuery
= std::make_shared
<DNSCryptQuery
>(cs
.dnscryptCtx
);
1375 bool decrypted
= handleDNSCryptQuery(const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, tcp
, now
, response
);
1378 if (response
.size() > 0) {
1381 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1384 len
= decryptedQueryLen
;
1385 #endif /* HAVE_DNSCRYPT */
// Sanity checks on the DNS header of an incoming query.
// A header with the qr bit set is a response, not a query, and is counted in
// g_stats.nonCompliantQueries; a header with qdcount equal to 0 is counted in
// g_stats.emptyQueries. g_stats.rdQueries is incremented as well.
// NOTE(review): the condition guarding the rdQueries increment and the return
// statements are not present in this extract.
1390 bool checkQueryHeaders(const struct dnsheader
* dh
)
1392 if (dh
->qr
) { // don't respond to responses
1393 ++g_stats
.nonCompliantQueries
;
1397 if (dh
->qdcount
== 0) {
1398 ++g_stats
.emptyQueries
;
1403 ++g_stats
.rdQueries
;
// Only compiled when recvmmsg, sendmmsg and MSG_WAITFORONE are available.
// Prepare outMsg so that response can be sent to remote as part of a batch:
// the msghdr is filled by fillMSGHdr without a control buffer; when the family
// of dest is 0 the control pointer is cleared, and the other visible path
// attaches dest as the source address through addCMsgSrcAddr using cbuf.
// The const_cast calls only adapt to the non-const parameters of fillMSGHdr.
1409 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1410 static void queueResponse(const ClientState
& cs
, const char* response
, uint16_t responseLen
, const ComboAddress
& dest
, const ComboAddress
& remote
, struct mmsghdr
& outMsg
, struct iovec
* iov
, cmsgbuf_aligned
* cbuf
)
1413 fillMSGHdr(&outMsg
.msg_hdr
, iov
, nullptr, 0, const_cast<char*>(response
), responseLen
, const_cast<ComboAddress
*>(&remote
));
1415 if (dest
.sin4
.sin_family
== 0) {
1416 outMsg
.msg_hdr
.msg_control
= nullptr;
1419 addCMsgSrcAddr(&outMsg
.msg_hdr
, cbuf
, &dest
, 0);
1422 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
// Finalize a response that was produced locally, without asking a backend.
// Visible steps: build a DNSResponse (dr) over the buffer of dq, copy the
// uniqueId (with HAVE_PROTOBUF) and delayMsec, run the cache-hit response
// rules when cacheHit is true and the self-answered ones otherwise, copy
// delayMsec back in case a rule changed it, encrypt the answer through
// encryptResponse (with HAVE_DNSCRYPT), then update g_stats: cacheHits plus
// one frontend counter selected by the rcode. Latency is recorded as 0.
// NOTE(review): the conditions around encryptResponse and cacheHits, and the
// return statements, are not present in this extract.
1424 /* self-generated responses or cache hits */
1425 static bool prepareOutgoingResponse(LocalHolders
& holders
, ClientState
& cs
, DNSQuestion
& dq
, bool cacheHit
)
1427 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(dq
.dh
), dq
.size
, dq
.len
, dq
.tcp
, dq
.queryTime
);
1429 #ifdef HAVE_PROTOBUF
1430 dr
.uniqueId
= dq
.uniqueId
;
1433 dr
.delayMsec
= dq
.delayMsec
;
1435 if (!applyRulesToResponse(cacheHit
? holders
.cacheHitRespRulactions
: holders
.selfAnsweredRespRulactions
, dr
)) {
1439 /* in case a rule changed it */
1440 dq
.delayMsec
= dr
.delayMsec
;
1442 #ifdef HAVE_DNSCRYPT
1444 if (!encryptResponse(reinterpret_cast<char*>(dq
.dh
), &dq
.len
, dq
.size
, dq
.tcp
, dq
.dnsCryptQuery
, nullptr, nullptr)) {
1448 #endif /* HAVE_DNSCRYPT */
1451 ++g_stats
.cacheHits
;
1454 switch (dr
.dh
->rcode
) {
1455 case RCode::NXDomain
:
1456 ++g_stats
.frontendNXDomain
;
1458 case RCode::ServFail
:
1459 ++g_stats
.frontendServFail
;
1461 case RCode::NoError
:
1462 ++g_stats
.frontendNoError
;
1466 doLatencyStats(0); // we're not going to measure this
// Protocol independent handling of a parsed query.
// The result tells the caller what to do next: Drop, SendAnswer (dq now holds
// a response) or PassToBackend (selectedBackend is set).
// Visible flow:
//  1. apply the query rules; a false result drops the query,
//  2. if something turned the packet into a response (qr set), fix it up,
//     run prepareOutgoingResponse and answer directly,
//  3. resolve the pool from dq.poolname, take its packet cache, and pick the
//     policy (the pool one when set, the global one otherwise) to select a
//     backend; one of the two visible policy calls holds g_luamutex,
//  4. with ECS in use, try the zero-scope cache lookup (cacheKeyNoECS), then
//     add the ECS option; a failure to add it drops the query,
//  5. do the regular cache lookup (cacheKey); a miss bumps cacheMisses,
//  6. without a backend, either answer ServFail (g_servFailOnNoPolicy) or drop,
//  7. optionally add XPF, count the query on the backend and pass it on.
// Exceptions derived from std::exception are logged and lead to Drop.
// allowExpired is only non-zero (g_staleCacheEntriesTTL) when no backend was
// selected.
1470 ProcessQueryResult
processQuery(DNSQuestion
& dq
, ClientState
& cs
, LocalHolders
& holders
, std::shared_ptr
<DownstreamState
>& selectedBackend
)
1472 const uint16_t queryId
= ntohs(dq
.dh
->id
);
1475 /* we need an accurate ("real") value for the response and
1476 to store into the IDS, but not for insertion into the
1477 rings for example */
1478 struct timespec now
;
1481 if (!applyRulesToQuery(holders
, dq
, now
)) {
1482 return ProcessQueryResult::Drop
;
1485 if(dq
.dh
->qr
) { // something turned it into a response
1486 fixUpQueryTurnedResponse(dq
, dq
.origFlags
);
1488 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1489 return ProcessQueryResult::Drop
;
1492 ++g_stats
.selfAnswered
;
1494 return ProcessQueryResult::SendAnswer
;
// Select the pool, its packet cache and the policy, then pick a backend.
1497 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, dq
.poolname
);
1498 dq
.packetCache
= serverPool
->packetCache
;
1499 auto policy
= *(holders
.policy
);
1500 if (serverPool
->policy
!= nullptr) {
1501 policy
= *(serverPool
->policy
);
1503 auto servers
= serverPool
->getServers();
1505 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1506 selectedBackend
= policy
.policy(servers
, &dq
);
1509 selectedBackend
= policy
.policy(servers
, &dq
);
1512 uint16_t cachedResponseSize
= dq
.size
;
1513 uint32_t allowExpired
= selectedBackend
? 0 : g_staleCacheEntriesTTL
;
1515 if (dq
.packetCache
&& !dq
.skipCache
) {
1516 dq
.dnssecOK
= (getEDNSZ(dq
) & EDNS_HEADER_FLAG_DO
);
// ECS handling, including the zero-scope cache lookup.
1519 if (dq
.useECS
&& ((selectedBackend
&& selectedBackend
->useECS
) || (!selectedBackend
&& serverPool
->getECS()))) {
1520 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1521 // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
1522 // ECS option, which would make it unsuitable for the zero-scope feature.
1523 if (dq
.packetCache
&& !dq
.skipCache
&& (!selectedBackend
|| !selectedBackend
->disableZeroScope
) && dq
.packetCache
->isECSParsingEnabled()) {
1524 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKeyNoECS
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1525 dq
.len
= cachedResponseSize
;
1527 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1528 return ProcessQueryResult::Drop
;
1531 return ProcessQueryResult::SendAnswer
;
1535 /* there was no existing ECS on the query, enable the zero-scope feature */
1536 dq
.useZeroScope
= true;
1540 if (!handleEDNSClientSubnet(dq
, dq
.ednsAdded
, dq
.ecsAdded
, g_preserveTrailingData
)) {
1541 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq
.remote
->toStringWithPort());
1542 return ProcessQueryResult::Drop
;
// Regular packet cache lookup.
1546 if (dq
.packetCache
&& !dq
.skipCache
) {
1547 if (dq
.packetCache
->get(dq
, dq
.consumed
, dq
.dh
->id
, reinterpret_cast<char*>(dq
.dh
), &cachedResponseSize
, &dq
.cacheKey
, dq
.subnet
, dq
.dnssecOK
, allowExpired
)) {
1548 dq
.len
= cachedResponseSize
;
1550 if (!prepareOutgoingResponse(holders
, cs
, dq
, true)) {
1551 return ProcessQueryResult::Drop
;
1554 return ProcessQueryResult::SendAnswer
;
1556 ++g_stats
.cacheMisses
;
// No backend could be selected.
1559 if(!selectedBackend
) {
1562 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "ServFailed" : "Dropped", dq
.qname
->toLogString(), QType(dq
.qtype
).getName(), dq
.remote
->toStringWithPort());
1563 if (g_servFailOnNoPolicy
) {
1564 restoreFlags(dq
.dh
, dq
.origFlags
);
1566 dq
.dh
->rcode
= RCode::ServFail
;
1569 if (!prepareOutgoingResponse(holders
, cs
, dq
, false)) {
1570 return ProcessQueryResult::Drop
;
1572 // no response-only statistics counter to update.
1573 return ProcessQueryResult::SendAnswer
;
1576 return ProcessQueryResult::Drop
;
1579 if (dq
.addXPF
&& selectedBackend
->xpfRRCode
!= 0) {
1580 addXPF(dq
, selectedBackend
->xpfRRCode
, g_preserveTrailingData
);
1583 selectedBackend
->queries
++;
1584 return ProcessQueryResult::PassToBackend
;
1586 catch(const std::exception
& e
){
1587 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq
.tcp
? "TCP" : "UDP"), dq
.remote
->toStringWithPort(), queryId
, e
.what());
1589 return ProcessQueryResult::Drop
;
// Handle one datagram received on the UDP frontend cs.
// Visible flow:
//  1. validate the datagram with isUDPQueryAcceptable,
//  2. take the real time and run the DNSCrypt check; a DNSCrypt level answer
//     is sent straight back with sendUDPResponse,
//  3. check the DNS header, parse the question into a DNSQuestion and call
//     processQuery,
//  4. on SendAnswer, queue the answer into responsesVect when it is not
//     delayed and batching is in use, otherwise send it with sendUDPResponse,
//  5. on PassToBackend, claim an IDState slot of the backend, deal with a
//     DOHUnit possibly left in a reused slot, record where the query came
//     from and relay it with udpClientSendRequestToBackend.
// Exceptions derived from std::exception are logged with the query id.
// The assert requires queuedResponses, respIOV and respCBuf to be non-null
// whenever responsesVect is non-null.
1592 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, cmsgbuf_aligned
* respCBuf
)
1594 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
1595 uint16_t queryId
= 0;
1598 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1602 /* we need an accurate ("real") value for the response and
1603 to store into the IDS, but not for insertion into the
1604 rings for example */
1605 struct timespec queryRealTime
;
1606 gettime(&queryRealTime
, true);
1608 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
= nullptr;
1609 auto dnsCryptResponse
= checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, queryRealTime
.tv_sec
, false);
1610 if (dnsCryptResponse
) {
1611 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dnsCryptResponse
->data()), static_cast<uint16_t>(dnsCryptResponse
->size()), 0, dest
, remote
);
1615 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
1616 queryId
= ntohs(dh
->id
);
1618 if (!checkQueryHeaders(dh
)) {
1622 uint16_t qtype
, qclass
;
1623 unsigned int consumed
= 0;
1624 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
1625 DNSQuestion
dq(&qname
, qtype
, qclass
, consumed
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false, &queryRealTime
);
1626 dq
.dnsCryptQuery
= std::move(dnsCryptQuery
);
1627 std::shared_ptr
<DownstreamState
> ss
{nullptr};
1628 auto result
= processQuery(dq
, cs
, holders
, ss
);
1630 if (result
== ProcessQueryResult::Drop
) {
1634 if (result
== ProcessQueryResult::SendAnswer
) {
1635 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1636 if (dq
.delayMsec
== 0 && responsesVect
!= nullptr) {
1637 queueResponse(cs
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, *dq
.local
, *dq
.remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1638 (*queuedResponses
)++;
1641 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1642 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1643 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(dq
.dh
), dq
.len
, dq
.delayMsec
, dest
, *dq
.remote
);
1647 if (result
!= ProcessQueryResult::PassToBackend
|| ss
== nullptr) {
// Pick the next IDState slot of the backend: idOffset modulo the table size.
1651 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1652 IDState
* ids
= &ss
->idStates
[idOffset
];
1654 DOHUnit
* du
= nullptr;
1656 /* that means that the state was in use, possibly with an allocated
1657 DOHUnit that we will need to handle, but we can't touch it before
1658 confirming that we now own this state */
1659 if (ids
->isInUse()) {
1663 /* we atomically replace the value, we now own this state */
1664 if (!ids
->markAsUsed()) {
1665 /* the state was not in use.
1666 we reset 'du' because it might have still been in use when we read it. */
1671 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1672 to handle it because it's about to be overwritten. */
1675 ++g_stats
.downstreamTimeouts
;
1676 handleDOHTimeout(du
);
1680 ids
->origFD
= cs
.udpFD
;
1681 ids
->origID
= dh
->id
;
1682 setIDStateFromDNSQuestion(*ids
, dq
, std::move(qname
));
1684 /* If we couldn't harvest the real dest addr, still
1685 write down the listening addr since it will be useful
1686 (especially if it's not an 'any' one).
1687 We need to keep track of which one it is since we may
1688 want to use the real but not the listening addr to reply.
1690 if (dest
.sin4
.sin_family
!= 0) {
1691 ids
->origDest
= dest
;
1692 ids
->destHarvested
= true;
1695 ids
->origDest
= cs
.local
;
1696 ids
->destHarvested
= false;
// Relay the query to the selected backend.
1701 int fd
= pickBackendSocketForSending(ss
);
1702 ssize_t ret
= udpClientSendRequestToBackend(ss
, fd
, query
, dq
.len
);
1706 ++g_stats
.downstreamSendErrors
;
1709 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toLogString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
1711 catch(const std::exception
& e
){
1712 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
// Only compiled when recvmmsg, sendmmsg and MSG_WAITFORONE are available.
// UDP frontend worker that handles several datagrams per system call.
// It allocates g_udpVectorSize receive slots (MMReceiver) plus the matching
// incoming and outgoing mmsghdr arrays, and prepares every receive header to
// accept up to s_udpIncomingBufferSize bytes.
// Visible steps of the processing: reset the IO vectors (they are reused to
// send the responses), receive with recvmmsg using MSG_WAITFORONE and
// MSG_TRUNC, pass every datagram at least as large as a DNS header to
// processUDPQuery (shorter ones count as non-compliant queries), then send
// the responses queued by processUDPQuery in one sendmmsg call.
// NOTE(review): the MMReceiver definition and the loop construct are not
// present in this extract.
1716 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1717 static void MultipleMessagesUDPClientThread(ClientState
* cs
, LocalHolders
& holders
)
1721 char packet
[s_maxPacketCacheEntrySize
];
1722 ComboAddress remote
;
1725 /* used by HarvestDestinationAddress */
1726 cmsgbuf_aligned cbuf
;
1728 const size_t vectSize
= g_udpVectorSize
;
1729 /* the actual buffer is larger because:
1730 - we may have to add EDNS and/or ECS
1731 - we use it for self-generated responses (from rule or cache)
1732 but we only accept incoming payloads up to that size
1734 static_assert(s_udpIncomingBufferSize
<= sizeof(MMReceiver::packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1736 auto recvData
= std::unique_ptr
<MMReceiver
[]>(new MMReceiver
[vectSize
]);
1737 auto msgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1738 auto outMsgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1740 /* initialize the structures needed to receive our messages */
1741 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1742 recvData
[idx
].remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1743 fillMSGHdr(&msgVec
[idx
].msg_hdr
, &recvData
[idx
].iov
, &recvData
[idx
].cbuf
, sizeof(recvData
[idx
].cbuf
), recvData
[idx
].packet
, s_udpIncomingBufferSize
, &recvData
[idx
].remote
);
1749 /* reset the IO vector, since it's also used to send the vector of responses
1750 to avoid having to copy the data around */
1751 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1752 recvData
[idx
].iov
.iov_base
= recvData
[idx
].packet
;
1753 recvData
[idx
].iov
.iov_len
= sizeof(recvData
[idx
].packet
);
1756 /* block until we have at least one message ready, but return
1757 as many as possible to save the syscall costs */
1758 int msgsGot
= recvmmsg(cs
->udpFD
, msgVec
.get(), vectSize
, MSG_WAITFORONE
| MSG_TRUNC
, nullptr);
1761 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
1765 unsigned int msgsToSend
= 0;
1767 /* process the received messages */
1768 for (int msgIdx
= 0; msgIdx
< msgsGot
; msgIdx
++) {
1769 const struct msghdr
* msgh
= &msgVec
[msgIdx
].msg_hdr
;
1770 unsigned int got
= msgVec
[msgIdx
].msg_len
;
1771 const ComboAddress
& remote
= recvData
[msgIdx
].remote
;
1773 if (static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1774 ++g_stats
.nonCompliantQueries
;
1778 processUDPQuery(*cs
, holders
, msgh
, remote
, recvData
[msgIdx
].dest
, recvData
[msgIdx
].packet
, static_cast<uint16_t>(got
), sizeof(recvData
[msgIdx
].packet
), outMsgVec
.get(), &msgsToSend
, &recvData
[msgIdx
].iov
, &recvData
[msgIdx
].cbuf
);
1782 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1783 or the cache) can be sent in batch too */
1785 if (msgsToSend
> 0 && msgsToSend
<= static_cast<unsigned int>(msgsGot
)) {
1786 int sent
= sendmmsg(cs
->udpFD
, outMsgVec
.get(), msgsToSend
, 0);
1788 if (sent
< 0 || static_cast<unsigned int>(sent
) != msgsToSend
) {
1789 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent
, msgsToSend
, stringerror());
1795 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
// Entry point of a UDP frontend thread for the listening socket described by cs.
// The thread is named, and when g_udpVectorSize is above 1 (and the mmsg
// system calls are available) the work is handed to
// MultipleMessagesUDPClientThread. The other visible path receives one
// datagram at a time with recvmsg into a local buffer and passes it to
// processUDPQuery with null batching arguments. A failed recvmsg, or a
// datagram shorter than a DNS header, counts as a non-compliant query.
// The handlers at the end log the exception that ended the thread.
1797 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1798 static void udpClientThread(ClientState
* cs
)
1801 setThreadName("dnsdist/udpClie");
1802 LocalHolders holders
;
1804 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1805 if (g_udpVectorSize
> 1) {
1806 MultipleMessagesUDPClientThread(cs
, holders
);
1810 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1812 char packet
[s_maxPacketCacheEntrySize
];
1813 /* the actual buffer is larger because:
1814 - we may have to add EDNS and/or ECS
1815 - we use it for self-generated responses (from rule or cache)
1816 but we only accept incoming payloads up to that size
1818 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1821 /* used by HarvestDestinationAddress */
1822 cmsgbuf_aligned cbuf
;
1824 ComboAddress remote
;
1826 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1827 fillMSGHdr(&msgh
, &iov
, &cbuf
, sizeof(cbuf
), packet
, s_udpIncomingBufferSize
, &remote
);
1830 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
1832 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1833 ++g_stats
.nonCompliantQueries
;
1837 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), sizeof(packet
), nullptr, nullptr, nullptr, nullptr);
1841 catch(const std::exception
&e
)
1843 errlog("UDP client thread died because of exception: %s", e
.what());
1845 catch(const PDNSException
&e
)
1847 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1851 errlog("UDP client thread died because of an exception: %s", "unknown");
/* Return a random 16-bit value suitable for use as a DNS query ID.
   With libsodium available, randombytes_uniform(65536) is used, which returns
   a uniformly distributed value strictly below its upper bound. Otherwise the
   result of random() is reduced modulo 65536. Both branches therefore fit
   uint16_t without truncating meaningful bits. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  return randombytes_uniform(65536);
#else
  return (random() % 65536);
#endif
}
/* Upper bound on the number of TCP client threads; it also enters the file
   descriptor estimate computed by checkFileDescriptorsLimits(). */
uint64_t g_maxTCPClientThreads{10};
/* Threshold compared against the maintenance counter: the packet caches are
   cleaned once that counter reaches this value. */
std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* Percentage used to size a cleaning pass: purgeExpired() is asked to go down
   to maxEntries * (100 - percentage) / 100 entries. */
std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1869 setThreadName("dnsdist/main");
1872 int32_t secondsToWaitLog
= 0;
1878 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1879 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1883 secondsToWaitLog
= 0;
1885 catch(std::exception
&e
) {
1886 if (secondsToWaitLog
<= 0) {
1887 infolog("Error during execution of maintenance function: %s", e
.what());
1888 secondsToWaitLog
= 61;
1890 secondsToWaitLog
-= interval
;
1896 if (counter
>= g_cacheCleaningDelay
) {
1897 /* keep track, for each cache, of whether we should keep
1899 std::map
<std::shared_ptr
<DNSDistPacketCache
>, bool> caches
;
1901 /* gather all caches actually used by at least one pool, and see
1902 if something prevents us from cleaning the expired entries */
1903 auto localPools
= g_pools
.getLocal();
1904 for (const auto& entry
: *localPools
) {
1905 auto& pool
= entry
.second
;
1907 auto packetCache
= pool
->packetCache
;
1912 auto pair
= caches
.insert({packetCache
, false});
1913 auto& iter
= pair
.first
;
1914 /* if we need to keep stale data for this cache (ie, not clear
1915 expired entries when at least one pool using this cache
1916 has all its backends down) */
1917 if (packetCache
->keepStaleData() && iter
->second
== false) {
1918 /* so far all pools had at least one backend up */
1919 if (pool
->countServers(true) == 0) {
1920 iter
->second
= true;
1925 for (auto pair
: caches
) {
1926 /* shall we keep expired entries ? */
1927 if (pair
.second
== true) {
1930 auto& packetCache
= pair
.first
;
1931 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1932 packetCache
->purgeExpired(upTo
);
1937 // ponder pruning g_dynblocks of expired entries here
// Thread body for the security polling: names the thread, calls doSecPoll
// with g_secPollSuffix and sleeps for g_secPollInterval.
// NOTE(review): the loop and any exception handling around doSecPoll are not
// present in this extract.
1941 static void secPollThread()
1943 setThreadName("dnsdist/secpoll");
1947 doSecPoll(g_secPollSuffix
);
1951 sleep(g_secPollInterval
);
// Periodic housekeeping for the backends.
// Visible work:
//  - start one more TCP client thread when more than one connection is queued
//    and the maximum number of threads is not reached,
//  - for each backend, advance lastCheck and compare it with checkInterval
//    (presumably skipping the backend until the interval has elapsed),
//  - queue a health check for backends whose availability is Auto; failing to
//    queue one is recorded as a failed check,
//  - refresh queryLoad and dropRate from the change of the queries and
//    reuseds counters divided by the elapsed time, then save the counters,
//  - expire IDState entries that are in use and older than g_udpTimeout: the
//    entry is released first, a pending DOHUnit is handed to
//    handleDOHTimeout, the timeout is counted and logged, and an entry built
//    from a zeroed header carrying the original id is inserted in g_rings,
//  - finally process the queued health checks.
// NOTE(review): the loop, the sleep and the source of ts are not present in
// this extract.
1955 static void healthChecksThread()
1957 setThreadName("dnsdist/healthC");
1959 static const int interval
= 1;
1964 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads()) {
1965 g_tcpclientthreads
->addTCPClientThread();
1968 auto mplexer
= std::shared_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
1969 auto states
= g_dstates
.getLocal(); // this points to the actual shared_ptrs!
1970 for(auto& dss
: *states
) {
1971 if(++dss
->lastCheck
< dss
->checkInterval
) {
1977 if (dss
->availability
== DownstreamState::Availability::Auto
) {
1978 if (!queueHealthCheck(mplexer
, dss
)) {
1979 updateHealthCheckResult(dss
, false);
1983 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
1984 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
1985 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
1986 dss
->prev
.queries
.store(dss
->queries
.load());
1987 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
1989 for (IDState
& ids
: dss
->idStates
) { // timeouts
1990 int64_t usageIndicator
= ids
.usageIndicator
;
1991 if(IDState::isInUse(usageIndicator
) && ids
.age
++ > g_udpTimeout
) {
1992 /* We mark the state as unused as soon as possible
1993 to limit the risk of racing with the
1996 auto oldDU
= ids
.du
;
1998 if (!ids
.tryMarkUnused(usageIndicator
)) {
1999 /* this state has been altered in the meantime,
2000 don't go anywhere near it */
2004 handleDOHTimeout(oldDU
);
2008 ++g_stats
.downstreamTimeouts
; // this is an 'actively' discovered timeout
2009 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2010 dss
->remote
.toStringWithPort(), dss
->name
,
2011 ids
.qname
.toLogString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
2016 struct dnsheader fake
;
2017 memset(&fake
, 0, sizeof(fake
));
2018 fake
.id
= ids
.origID
;
2020 g_rings
.insertResponse(ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
);
2025 handleQueuedHealthChecks(mplexer
);
// Set, on socket sock, the options IP_FREEBIND, IP_BINDANY, IPV6_BINDANY and
// SO_BINDANY that appear below, each with the value 1. A failing setsockopt
// is only reported as a warning.
// The variable one is marked unused so that a build where none of the options
// is compiled in does not trigger a warning.
// NOTE(review): the preprocessor guards and the tests on the address family
// af are not present in this extract; presumably these options are what let
// the socket bind to an address that is not configured locally.
2029 static void bindAny(int af
, int sock
)
2031 __attribute__((unused
)) int one
= 1;
2034 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
2035 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2040 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
2041 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2045 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
2046 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2049 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
2050 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
// Switch the process to group gid with setgid and, when that succeeds, drop
// the supplementary groups with setgroups(0, NULL). Both failures are logged
// as warnings and are not fatal.
// NOTE(review): any guard on the value of gid is not present in this extract.
2054 static void dropGroupPrivs(gid_t gid
)
2057 if (setgid(gid
) == 0) {
2058 if (setgroups(0, NULL
) < 0) {
2059 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2063 warnlog("Warning: Unable to set group ID to %d: %s", gid
, stringerror());
// Switch the process to user uid with setuid. A failure is logged as a
// warning and is not fatal.
// NOTE(review): any guard on the value of uid is not present in this extract.
2068 static void dropUserPrivs(uid_t uid
)
2071 if(setuid(uid
) < 0) {
2072 warnlog("Warning: Unable to set user ID to %d: %s", uid
, stringerror());
// Estimate how many file descriptors this configuration may need and warn
// when the current RLIMIT_NOFILE soft limit (rlim_cur) is not above it.
// The estimate starts at 3 for the standard streams and adds the items named
// by the comments below: backend UDP sockets, one TCP socket per backend and
// TCP client thread, the UDP and TCP listening sockets, the TCP connections
// being served, two pipe ends per TCP client thread, the queued TCP
// connections and the DelayPipe pipe.
// NOTE(review): the result of getrlimit is not checked in the visible code,
// and the increments matching the webserver and console comments are not
// present in this extract.
2077 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
2079 /* stdin, stdout, stderr */
2080 size_t requiredFDsCount
= 3;
2081 auto backends
= g_dstates
.getLocal();
2082 /* UDP sockets to backends */
2083 size_t backendUDPSocketsCount
= 0;
2084 for (const auto& backend
: *backends
) {
2085 backendUDPSocketsCount
+= backend
->sockets
.size();
2087 requiredFDsCount
+= backendUDPSocketsCount
;
2088 /* TCP sockets to backends */
2089 requiredFDsCount
+= (backends
->size() * g_maxTCPClientThreads
);
2090 /* listening sockets */
2091 requiredFDsCount
+= udpBindsCount
;
2092 requiredFDsCount
+= tcpBindsCount
;
2093 /* max TCP connections currently served */
2094 requiredFDsCount
+= g_maxTCPClientThreads
;
2095 /* max pipes for communicating between TCP acceptors and client threads */
2096 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
2097 /* max TCP queued connections */
2098 requiredFDsCount
+= g_maxTCPQueuedConnections
;
2099 /* DelayPipe pipe */
2100 requiredFDsCount
+= 2;
2103 /* webserver main socket */
2105 /* console main socket */
2112 getrlimit(RLIMIT_NOFILE
, &rl
);
2113 if (rl
.rlim_cur
<= requiredFDsCount
) {
2114 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
2116 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2118 warnlog("You can increase this value by using ulimit.");
// Create, configure and bind the listening socket of the frontend cs.
// The descriptor is stored in cs->udpFD for a UDP frontend and in cs->tcpFD
// for a TCP one (fd is a reference to whichever applies).
// Visible steps, in order: create the socket; set SO_REUSEADDR,
// TCP_DEFER_ACCEPT when defined, and TCP_FASTOPEN when a queue size is
// configured; set IPV6_V6ONLY on IPv6 sockets; call bindAny; request packet
// info on UDP sockets bound to an any-address; set SO_REUSEPORT when asked;
// call setSocketIgnorePMTU for IPv4; bind to the configured interface with
// SO_BINDTODEVICE; attach the default BPF filter; set up TLS (a failure exits
// the process) or DoH; bind; listen on the TCP socket and log the kind of
// listener.
// The warn flag is false only for a plain TCP frontend, to avoid repeating
// warnings already emitted for the identical UDP one.
// NOTE(review): several guards (TCP only sections, feature macros) are not
// present in this extract.
2123 static void setUpLocalBind(std::unique_ptr
<ClientState
>& cs
)
2125 /* skip some warnings if there is an identical UDP context */
2126 bool warn
= cs
->tcp
== false || cs
->tlsFrontend
!= nullptr || cs
->dohFrontend
!= nullptr;
2127 int& fd
= cs
->tcp
== false ? cs
->udpFD
: cs
->tcpFD
;
2130 fd
= SSocket(cs
->local
.sin4
.sin_family
, cs
->tcp
== false ? SOCK_DGRAM
: SOCK_STREAM
, 0);
2133 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2134 #ifdef TCP_DEFER_ACCEPT
2135 SSetsockopt(fd
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2137 if (cs
->fastOpenQueueSize
> 0) {
2139 SSetsockopt(fd
, IPPROTO_TCP
, TCP_FASTOPEN
, cs
->fastOpenQueueSize
);
2142 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2148 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2149 SSetsockopt(fd
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2152 bindAny(cs
->local
.sin4
.sin_family
, fd
);
2154 if(!cs
->tcp
&& IsAnyAddress(cs
->local
)) {
2156 setsockopt(fd
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2157 #ifdef IPV6_RECVPKTINFO
2158 setsockopt(fd
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2162 if (cs
->reuseport
) {
2164 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2167 /* no need to warn again if configured but support is not available, we already did for UDP */
2168 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2174 if (cs
->local
.isIPv4()) {
2176 setSocketIgnorePMTU(cs
->udpFD
);
2178 catch(const std::exception
& e
) {
2179 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs
->local
.toStringWithPort(), e
.what());
2184 const std::string
& itf
= cs
->interface
;
2186 #ifdef SO_BINDTODEVICE
2187 int res
= setsockopt(fd
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2189 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), stringerror());
2193 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2199 if (g_defaultBPFFilter
) {
2200 cs
->attachFilter(g_defaultBPFFilter
);
2201 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs
->tcp
? "UDP" : "TCP"), cs
->local
.toStringWithPort());
2203 #endif /* HAVE_EBPF */
2205 if (cs
->tlsFrontend
!= nullptr) {
2206 if (!cs
->tlsFrontend
->setupTLS()) {
2207 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2208 _exit(EXIT_FAILURE
);
2212 if (cs
->dohFrontend
!= nullptr) {
2213 cs
->dohFrontend
->setup();
2216 SBind(fd
, cs
->local
);
2219 SListen(cs
->tcpFD
, SOMAXCONN
);
2220 if (cs
->tlsFrontend
!= nullptr) {
2221 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2223 else if (cs
->dohFrontend
!= nullptr) {
2224 warnlog("Listening on %s for DoH", cs
->local
.toStringWithPort());
2226 else if (cs
->dnscryptCtx
!= nullptr) {
2227 warnlog("Listening on %s for DNSCrypt", cs
->local
.toStringWithPort());
2230 warnlog("Listening on %s", cs
->local
.toStringWithPort());
// Command-line settings; main() reads and writes these as members of g_cmdLine.
// NOTE(review): the enclosing declaration is not visible in this excerpt.
// Listen addresses given with -l/--local (stored whitespace-trimmed by main()).
2239 vector
<string
> locals
;
// Positional arguments stored by main(), which later creates one downstream
// server per entry.
2240 vector
<string
> remotes
;
// Presumably set by --check-config: when true, main() loads the configuration,
// logs that it is OK and exits.
2241 bool checkConfig
{false};
// Presumably set by -c/--client: when true, main() runs the console client
// (doClient) and exits.
2242 bool beClient
{false};
// Presumably set by --supervised: when true, main() joins the health-check
// thread instead of detaching it.
2243 bool beSupervised
{false};
// Set to true by main() once the list of frontends is final, just before
// setUpLocalBind() is called on each of them.
// NOTE(review): the readers of this flag are not visible in this excerpt.
2250 std::atomic
<bool> g_configurationDone
{false};
// Command-line help text: writes the syntax summary followed by one entry per
// option to stdout.
// NOTE(review): the enclosing function header is not visible in this excerpt.
// Syntax summary.
2255 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2256 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2257 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
// Per-option descriptions.
2259 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2260 cout
<<"-C,--config file Load configuration from 'file'\n";
2261 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2262 cout
<<" controlSocket from your configuration file, but also\n";
2263 cout
<<" accepts an IP:PORT argument\n";
// The -k/--setkey entry is only printed when built with libsodium.
2264 #ifdef HAVE_LIBSODIUM
2265 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2266 cout
<<" is similar to setting setKey in the configuration file.\n";
2267 cout
<<" NOTE: this will leak this key in your shell's history\n";
2268 cout
<<" and in the systems running process list.\n";
2270 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2271 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2272 cout
<<" Any errors are printed as well.\n";
2273 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2274 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2275 cout
<<"-h,--help Display this helpful message\n";
2276 cout
<<"-l,--local address Listen on this local address\n";
2277 cout
<<"--supervised Don't open a console, I'm supervised\n";
2278 cout
<<" (use with e.g. systemd and daemontools)\n";
2279 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2280 cout
<<" (use with e.g. systemd)\n";
2281 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2282 cout
<<"-v,--verbose Enable verbose mode\n";
2283 cout
<<"-V,--version Show dnsdist version information and exit\n";
/* Program entry point, used both for the server and for the console client.
 * In the order visible in this file it: parses the command line, optionally
 * runs the console client and exits, loads the Lua configuration, binds the
 * frontends, drops privileges and starts the worker threads.
 * argc/argv are the process arguments, consumed by getopt_long() further down.
 * Every visible exit path goes through _exit(). */
2286 int main(int argc
, char** argv
)
// Per-protocol bind counters, later passed to checkFileDescriptorsLimits().
// NOTE(review): the statements incrementing them are not visible in this excerpt.
2289 size_t udpBindsCount
= 0;
2290 size_t tcpBindsCount
= 0;
// Readline setup for the console: install the completion callback and append
// no character after a completed word.
2291 rl_attempted_completion_function
= my_completion
;
2292 rl_completion_append_character
= 0;
// Ignore SIGPIPE and SIGCHLD for the whole process.
2294 signal(SIGPIPE
, SIG_IGN
);
2295 signal(SIGCHLD
, SIG_IGN
);
// Open the syslog connection right away (LOG_NDELAY), with the pid in every
// message, under the daemon facility.
2296 openlog("dnsdist", LOG_PID
|LOG_NDELAY
, LOG_DAEMON
);
// With libsodium: initialise the library, then take both the hash perturbation
// value and the random() seed from its generator.
2298 #ifdef HAVE_LIBSODIUM
2299 if (sodium_init() == -1) {
2300 cerr
<<"Unable to initialize crypto library"<<endl
;
2303 g_hashperturb
=randombytes_uniform(0xffffffff);
2304 srandom(randombytes_uniform(0xffffffff));
// Seeding from the wall clock xor-ed with the pid, with the hash perturbation
// taken from random().
// NOTE(review): presumably the non-libsodium branch; the preprocessor lines
// separating it from the code above are not visible in this excerpt.
2308 gettimeofday(&tv
, 0);
2309 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
2310 g_hashperturb
=random();
// Console address optionally given as a positional argument in client mode;
// it is compared against a default-constructed ComboAddress later on to find
// out whether one was given.
2314 ComboAddress clientAddress
= ComboAddress();
// Default configuration file, replaced when a configuration path is given on
// the command line.
2315 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
// Long options. Those without a short form use the codes 1, 2 and 3 as the
// value returned by getopt_long().
2316 struct option longopts
[]={
2317 {"acl", required_argument
, 0, 'a'},
2318 {"check-config", no_argument
, 0, 1},
2319 {"client", no_argument
, 0, 'c'},
2320 {"config", required_argument
, 0, 'C'},
2321 {"disable-syslog", no_argument
, 0, 2},
2322 {"execute", required_argument
, 0, 'e'},
2323 {"gid", required_argument
, 0, 'g'},
2324 {"help", no_argument
, 0, 'h'},
2325 {"local", required_argument
, 0, 'l'},
2326 {"setkey", required_argument
, 0, 'k'},
2327 {"supervised", no_argument
, 0, 3},
2328 {"uid", required_argument
, 0, 'u'},
2329 {"verbose", no_argument
, 0, 'v'},
2330 {"version", no_argument
, 0, 'V'},
// Fetch the next option.
// NOTE(review): the enclosing loop and the case labels selecting the handlers
// below are not visible in this excerpt; the option named in each comment is
// inferred from the option table and the help text and should be confirmed.
2336 int c
=getopt_long(argc
, argv
, "a:cC:e:g:hk:l:u:vV", longopts
, &longindex
);
// --check-config
2341 g_cmdLine
.checkConfig
=true;
// --supervised
2347 g_cmdLine
.beSupervised
=true;
// -C/--config
2350 g_cmdLine
.config
=optarg
;
// -c/--client
2353 g_cmdLine
.beClient
=true;
// -e/--execute
2356 g_cmdLine
.command
=optarg
;
// -g/--gid
2359 g_cmdLine
.gid
=optarg
;
// -h/--help: version banner.
2362 cout
<<"dnsdist "<<VERSION
<<endl
;
// -a/--acl: add the netmask to the query ACL; optstring is captured by value
// so the lambda does not depend on the lifetime of the local.
2369 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
// -k/--setkey: the console key is passed base64-encoded and decoded into
// g_consoleKey; builds without libsodium only print that it is unsupported.
2372 #ifdef HAVE_LIBSODIUM
2373 if (B64Decode(string(optarg
), g_consoleKey
) < 0) {
2374 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2378 cerr
<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl
;
// -l/--local: store the whitespace-trimmed listen address.
2383 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
// -u/--uid
2386 g_cmdLine
.uid
=optarg
;
// -V/--version: version banner with the Lua release (plus the LuaJIT version
// when built against LuaJIT), followed by the list of compiled-in features.
2392 #ifdef LUAJIT_VERSION
2393 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2395 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2397 cout
<<"Enabled features: ";
// One entry per optional feature, each guarded by its build-time macro.
2401 #ifdef HAVE_DNS_OVER_TLS
2402 cout
<<"dns-over-tls(";
2414 #ifdef HAVE_DNS_OVER_HTTPS
2415 cout
<<"dns-over-https(DOH) ";
2417 #ifdef HAVE_DNSCRYPT
2426 #ifdef HAVE_LIBCRYPTO
2429 #ifdef HAVE_LIBSODIUM
2435 #ifdef HAVE_PROTOBUF
2441 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2442 cout
<<"recvmmsg/sendmmsg ";
2444 #ifdef HAVE_NET_SNMP
// Unknown option.
2454 //getopt_long printed an error message.
// Walk the remaining arguments up to the terminating null pointer. In client
// mode an argument is parsed as the console address, with 5199 as the port
// passed to ComboAddress; arguments are also stored in g_cmdLine.remotes.
// NOTE(review): the line between the two statements in the loop body is not
// visible in this excerpt, presumably an else; confirm.
2463 for(auto p
= argv
; *p
; ++p
) {
2464 if(g_cmdLine
.beClient
) {
2465 clientAddress
= ComboAddress(*p
, 5199);
2467 g_cmdLine
.remotes
.push_back(*p
);
// Install leastOutstanding as the load-balancing policy before any
// configuration is loaded.
2471 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
, false};
2473 g_policy
.setState(leastOutstandingPol
);
// Client mode, or a command to execute: load the configuration in client mode,
// let an address from the command line override g_serverControl, run the
// console client and exit.
2474 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2475 setupLua(true, false, g_cmdLine
.config
);
2476 if (clientAddress
!= ComboAddress())
2477 g_serverControl
= clientAddress
;
2478 doClient(g_serverControl
, g_cmdLine
.command
);
2479 _exit(EXIT_SUCCESS
);
// Query ACL defaults: IPv4 loopback, private, shared (100.64.0.0/10) and
// link-local ranges, plus IPv6 loopback, unique-local and link-local.
// NOTE(review): the condition guarding these defaults and the loop body are
// not visible in this excerpt.
2482 auto acl
= g_ACL
.getCopy();
2484 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2486 g_ACL
.setState(acl
);
// Console ACL: the loopback masks are added to the current console ACL.
// NOTE(review): 127.0.0.1/8 has host bits set; presumably the mask parser
// normalises it to 127.0.0.0/8, confirm.
2489 auto consoleACL
= g_consoleACL
.getCopy();
2490 for (const auto& mask
: { "127.0.0.1/8", "::1/128" }) {
2491 consoleACL
.addMask(mask
);
2493 g_consoleACL
.setState(consoleACL
);
// Configuration check only: load the configuration with the second setupLua()
// flag set, report success and exit. A failure is expected to surface as an
// exception instead of reaching the log line.
2495 if (g_cmdLine
.checkConfig
) {
2496 setupLua(false, true, g_cmdLine
.config
);
2497 // No exception was thrown
2498 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2499 _exit(EXIT_SUCCESS
);
// Load the real configuration and keep what setupLua() returns.
// NOTE(review): the use of todo is not visible in this excerpt.
2502 auto todo
=setupLua(false, false, g_cmdLine
.config
);
2504 auto localPools
= g_pools
.getCopy();
// Decide whether consistent-hashing data has to be pre-computed: checked for
// the global policy and for the policy of every pool that has one.
2506 bool precompute
= false;
2507 if (g_policy
.getLocal()->name
== "chashed") {
2510 for (const auto& entry
: localPools
) {
2511 if (entry
.second
->policy
!= nullptr && entry
.second
->policy
->name
== "chashed") {
2518 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2519 // pre compute hashes
2520 auto backends
= g_dstates
.getLocal();
2521 for (auto& backend
: *backends
) {
// Listen addresses from the command line take over from the configuration:
// frontends that are neither DoH, DoT nor DNSCrypt are erased, then two
// frontends are added per address, with 53 as the port passed to ComboAddress.
// NOTE(review): the second ClientState argument presumably selects UDP (false)
// or TCP (true); confirm against the constructor.
2527 if (!g_cmdLine
.locals
.empty()) {
2528 for (auto it
= g_frontends
.begin(); it
!= g_frontends
.end(); ) {
2529 /* DoH, DoT and DNSCrypt frontends are separate */
2530 if ((*it
)->dohFrontend
== nullptr && (*it
)->tlsFrontend
== nullptr && (*it
)->dnscryptCtx
== nullptr) {
2531 it
= g_frontends
.erase(it
);
2538 for(const auto& loc
: g_cmdLine
.locals
) {
2540 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), false, false, 0, "", {})));
2542 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress(loc
, 53), true, false, 0, "", {})));
// No frontend at all: fall back to a pair of frontends on 127.0.0.1, port 53.
2546 if (g_frontends
.empty()) {
2548 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2550 g_frontends
.push_back(std::unique_ptr
<ClientState
>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
// The frontend list is final from here on.
2553 g_configurationDone
= true;
// Bind every frontend.
// NOTE(review): the branches of the tcp test are not visible in this excerpt;
// presumably they update udpBindsCount and tcpBindsCount.
2555 for(auto& frontend
: g_frontends
) {
2556 setUpLocalBind(frontend
);
2558 if (frontend
->tcp
== false) {
// Startup banner with the version and the licence notice.
2566 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
// Log the query ACL as a single line.
// NOTE(review): the declarations of vec and acls and the loop body are not
// visible in this excerpt.
2570 g_ACL
.getLocal()->toStringVector(&vec
);
2571 for(const auto& s
: vec
) {
2576 infolog("ACL allowing queries from: %s", acls
.c_str());
// Same for the console ACL.
// NOTE(review): vec and acls are reused here; the statements resetting them
// between the two listings are not visible in this excerpt.
2579 g_consoleACL
.getLocal()->toStringVector(&vec
);
2580 for (const auto& entry
: vec
) {
2581 if (!acls
.empty()) {
2586 infolog("Console ACL allowing connections from: %s", acls
.c_str());
// With libsodium: warn when the console is enabled but no key has been set.
2588 #ifdef HAVE_LIBSODIUM
2589 if (g_consoleEnabled
&& g_consoleKey
.empty()) {
2590 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2594 uid_t newgid
=getegid();
2595 gid_t newuid
=geteuid();
2597 if(!g_cmdLine
.gid
.empty())
2598 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2600 if(!g_cmdLine
.uid
.empty())
2601 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2603 if (getegid() != newgid
) {
2604 if (running_in_service_mgr()) {
2605 errlog("--gid/-g set on command-line, but dnsdist was started as a systemd service. Use the 'Group' setting in the systemd unit file to set the group to run as");
2606 _exit(EXIT_FAILURE
);
2608 dropGroupPrivs(newgid
);
2611 if (geteuid() != newuid
) {
2612 if (running_in_service_mgr()) {
2613 errlog("--uid/-u set on command-line, but dnsdist was started as a systemd service. Use the 'User' setting in the systemd unit file to set the user to run as");
2614 _exit(EXIT_FAILURE
);
2616 dropUserPrivs(newuid
);
2620 /* we might still have capabilities remaining,
2621 for example if we have been started as root
2622 without --uid or --gid (please don't do that)
2623 or as an unprivileged user with ambient
2624 capabilities like CAP_NET_BIND_SERVICE.
2626 dropCapabilities(g_capabilitiesToRetain
);
2628 catch(const std::exception
& e
) {
2629 warnlog("%s", e
.what());
2632 /* this needs to be done _after_ dropping privileges */
2633 g_delay
= new DelayPipe
<DelayedPacket
>();
// Collection of TCP client threads, sized from the configured maximum.
2639 g_tcpclientthreads
= std::unique_ptr
<TCPClientCollection
>(new TCPClientCollection(g_maxTCPClientThreads
, g_useTCPSinglePipe
));
// Take a fresh copy of the pools before modifying them.
2644 localPools
= g_pools
.getCopy();
2645 /* create the default pool no matter what */
2646 createPoolIfNotExists(localPools
, "");
// Each remote from the command line becomes a downstream server, with 53 as
// the port passed to ComboAddress. It is added to the default pool, given a
// responder thread when it is connected and has none yet, and appended to the
// global server list.
2647 if(g_cmdLine
.remotes
.size()) {
2648 for(const auto& address
: g_cmdLine
.remotes
) {
2649 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2650 addServerToPool(localPools
, "", ret
);
2651 if (ret
->connected
&& !ret
->threadStarted
.test_and_set()) {
2652 ret
->tid
= thread(responderThread
, ret
);
2654 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
// Publish the updated pools.
2657 g_pools
.setState(localPools
);
// Starting without any backend is allowed but logged as an error.
2659 if(g_dstates
.getLocal()->empty()) {
2660 errlog("No downstream servers defined: all packets will get dropped");
2661 // you might define them later, but you need to know
2664 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
// Initial health checks: queue one for every backend whose availability is
// Auto. A backend whose check cannot be queued is marked down right away.
// The queued checks are then handled in one go.
2666 auto mplexer
= std::shared_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
2667 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2668 if (dss
->availability
== DownstreamState::Availability::Auto
) {
2669 if (!queueHealthCheck(mplexer
, dss
, true)) {
2670 dss
->upStatus
= false;
2671 warnlog("Marking downstream %s as 'down'", dss
->getNameWithAddr());
2675 handleQueuedHealthChecks(mplexer
, true);
// Start one thread per frontend, pinned to the frontend CPU list when one is
// configured. DoH frontends get dohThread (only when built with DoH support);
// the others get udpClientThread when they have a UDP socket, otherwise
// tcpAcceptorThread when they have a TCP socket.
// NOTE(review): what happens to each t1 after the optional pinning is not
// visible in this excerpt.
2677 for(auto& cs
: g_frontends
) {
2678 if (cs
->dohFrontend
!= nullptr) {
2679 #ifdef HAVE_DNS_OVER_HTTPS
2680 std::thread
t1(dohThread
, cs
.get());
2681 if (!cs
->cpus
.empty()) {
2682 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2685 #endif /* HAVE_DNS_OVER_HTTPS */
2688 if (cs
->udpFD
>= 0) {
2689 thread
t1(udpClientThread
, cs
.get());
2690 if (!cs
->cpus
.empty()) {
2691 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2695 else if (cs
->tcpFD
>= 0) {
2696 thread
t1(tcpAcceptorThread
, cs
.get());
2697 if (!cs
->cpus
.empty()) {
2698 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
// Background threads: carbon export, maintenance and health checks.
2704 thread
carbonthread(carbonDumpThread
);
2705 carbonthread
.detach();
2707 thread
stattid(maintThread
);
2710 thread
healththread(healthChecksThread
);
// The security-polling thread is only started when a suffix is configured.
2712 if (!g_secPollSuffix
.empty()) {
2713 thread
secpollthread(secPollThread
);
2714 secpollthread
.detach();
// Supervised: report readiness through sd_notify() and block on the
// health-check thread.
// NOTE(review): the guard around sd_notify(), the branch separating join()
// from detach() and the statements between detach() and the final _exit() are
// not visible in this excerpt.
2717 if(g_cmdLine
.beSupervised
) {
2719 sd_notify(0, "READY=1");
2721 healththread
.join();
2724 healththread
.detach();
2727 _exit(EXIT_SUCCESS
);
// Handlers for exceptions escaping the body of main(). Each one logs the
// failure and terminates the process with EXIT_FAILURE. The handlers only read
// from the exception objects, so all of them catch by const reference.
catch (const LuaContext::ExecutionErrorException& e) {
  // Log the Lua error itself, then rethrow the nested exception, if there is
  // one, so that its message is logged as well.
  try {
    errlog("Fatal Lua error: %s", e.what());
    std::rethrow_if_nested(e);
  }
  catch (const std::exception& ne) {
    errlog("Details: %s", ne.what());
  }
  catch (const PDNSException& ae) {
    errlog("Fatal pdns error: %s", ae.reason);
  }
  _exit(EXIT_FAILURE);
}
catch (const std::exception& e) {
  errlog("Fatal error: %s", e.what());
  _exit(EXIT_FAILURE);
}
catch (const PDNSException& ae) {
  errlog("Fatal pdns error: %s", ae.reason);
  _exit(EXIT_FAILURE);
}
2754 uint64_t getLatencyCount(const std::string
&)
2756 return g_stats
.responses
+ g_stats
.selfAnswered
+ g_stats
.cacheHits
;