2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <netinet/tcp.h>
31 #include <sys/resource.h>
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
37 #include <editline/readline.h>
41 #include <systemd/sd-daemon.h>
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
52 #include "delaypipe.hh"
55 #include "dnsparser.hh"
56 #include "dnswriter.hh"
57 #include "ednsoptions.hh"
61 #include "sodcrypto.hh"
65 thread_local
boost::uuids::random_generator t_uuidGenerator
;
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
83 struct DNSDistStats g_stats
;
84 MetricDefinitionStorage g_metricDefinitions
;
86 uint16_t g_maxOutstanding
{10240};
87 bool g_verboseHealthChecks
{false};
88 uint32_t g_staleCacheEntriesTTL
{0};
91 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
92 string g_outputBuffer
;
94 vector
<std::tuple
<ComboAddress
, bool, bool, int, string
, std::set
<int>>> g_locals
;
95 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
97 std::vector
<std::tuple
<ComboAddress
,std::shared_ptr
<DNSCryptContext
>,bool, int, string
, std::set
<int> >> g_dnsCryptLocals
;
100 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
101 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
102 #endif /* HAVE_EBPF */
103 vector
<ClientState
*> g_frontends
;
104 GlobalStateHolder
<pools_t
> g_pools
;
105 size_t g_udpVectorSize
{1};
107 bool g_snmpEnabled
{false};
108 bool g_snmpTrapsEnabled
{false};
109 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
111 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
112 Then we have a bunch of connected sockets for talking to downstream servers.
113 We send directly to those sockets.
115 For the return path, per downstream server we have a thread that listens to responses.
117 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
118 there the original requestor and the original id. The new ID is the offset in the array.
120 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
123 IDs are assigned by atomic increments of the socket offset.
126 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
128 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
129 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
134 GlobalStateHolder
<servers_t
> g_dstates
;
135 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
136 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
137 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
138 int g_tcpRecvTimeout
{2};
139 int g_tcpSendTimeout
{2};
142 bool g_servFailOnNoPolicy
{false};
143 bool g_truncateTC
{false};
146 static void truncateTC(char* packet
, uint16_t* len
, size_t responseSize
, unsigned int consumed
)
149 bool hadEDNS
= false;
150 uint16_t payloadSize
= 0;
153 if (g_addEDNSToSelfGeneratedResponses
) {
154 hadEDNS
= getEDNSUDPPayloadSizeAndZ(packet
, *len
, &payloadSize
, &z
);
157 *len
=(uint16_t) (sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
158 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
159 dh
->ancount
= dh
->arcount
= dh
->nscount
= 0;
162 addEDNS(dh
, *len
, responseSize
, z
& EDNS_HEADER_FLAG_DO
, payloadSize
);
174 ComboAddress destination
;
175 ComboAddress origDest
;
179 if(origDest
.sin4
.sin_family
== 0) {
180 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
183 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
187 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
192 DelayPipe
<DelayedPacket
> * g_delay
= 0;
194 void doLatencyStats(double udiff
)
196 if(udiff
< 1000) g_stats
.latency0_1
++;
197 else if(udiff
< 10000) g_stats
.latency1_10
++;
198 else if(udiff
< 50000) g_stats
.latency10_50
++;
199 else if(udiff
< 100000) g_stats
.latency50_100
++;
200 else if(udiff
< 1000000) g_stats
.latency100_1000
++;
201 else g_stats
.latencySlow
++;
203 auto doAvg
= [](double& var
, double n
, double weight
) {
204 var
= (weight
-1) * var
/weight
+ n
/weight
;
207 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
208 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
209 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
210 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
213 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
, unsigned int& consumed
)
215 uint16_t rqtype
, rqclass
;
217 const struct dnsheader
* dh
= (struct dnsheader
*) response
;
219 if (responseLen
< sizeof(dnsheader
)) {
223 if (dh
->qdcount
== 0) {
224 if (dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) {
228 g_stats
.nonCompliantResponses
++;
234 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
236 catch(std::exception
& e
) {
237 if(responseLen
> (ssize_t
)sizeof(dnsheader
))
238 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
239 g_stats
.nonCompliantResponses
++;
243 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
250 void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
252 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
253 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
254 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
255 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
256 /* clear the flags we are about to restore */
257 *flags
&= restoreFlagsMask
;
258 /* only keep the flags we want to restore */
259 origFlags
&= ~restoreFlagsMask
;
260 /* set the saved flags as they were */
264 bool fixUpQueryTurnedResponse(DNSQuestion
& dq
, const uint16_t origFlags
)
266 restoreFlags(dq
.dh
, origFlags
);
268 return addEDNSToQueryTurnedResponse(dq
);
271 bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
)
273 struct dnsheader
* dh
= (struct dnsheader
*) *response
;
275 if (*responseLen
< sizeof(dnsheader
)) {
279 restoreFlags(dh
, origFlags
);
281 if (*responseLen
== sizeof(dnsheader
)) {
286 string realname
= qname
.toDNSString();
287 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
288 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
292 if (ednsAdded
|| ecsAdded
) {
297 const std::string
responseStr(*response
, *responseLen
);
298 int res
= locateEDNSOptRR(responseStr
, &optStart
, &optLen
, &last
);
302 /* we added the entire OPT RR,
303 therefore we need to remove it entirely */
305 /* simply remove the last AR */
306 *responseLen
-= optLen
;
307 uint16_t arcount
= ntohs(dh
->arcount
);
309 dh
->arcount
= htons(arcount
);
312 /* Removing an intermediary RR could lead to compression error */
313 if (rewriteResponseWithoutEDNS(responseStr
, rewrittenResponse
) == 0) {
314 *responseLen
= rewrittenResponse
.size();
315 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
316 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
318 *responseSize
= rewrittenResponse
.capacity();
319 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
322 warnlog("Error rewriting content");
327 /* the OPT RR was already present, but without ECS,
328 we need to remove the ECS option if any */
330 /* nothing after the OPT RR, we can simply remove the
332 size_t existingOptLen
= optLen
;
333 removeEDNSOptionFromOPT(*response
+ optStart
, &optLen
, EDNSOptionCode::ECS
);
334 *responseLen
-= (existingOptLen
- optLen
);
337 /* Removing an intermediary RR could lead to compression error */
338 if (rewriteResponseWithoutEDNSOption(responseStr
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
339 *responseLen
= rewrittenResponse
.size();
340 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
341 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
343 *responseSize
= rewrittenResponse
.capacity();
344 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
347 warnlog("Error rewriting content");
358 bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
361 uint16_t encryptedResponseLen
= 0;
363 /* save the original header before encrypting it in place */
364 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
365 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
369 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
371 *responseLen
= encryptedResponseLen
;
373 /* dropping response */
374 vinfolog("Error encrypting the response, dropping.");
382 static bool sendUDPResponse(int origFD
, char* response
, uint16_t responseLen
, int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
384 if(delayMsec
&& g_delay
) {
385 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
386 g_delay
->submit(dp
, delayMsec
);
390 if(origDest
.sin4
.sin_family
== 0) {
391 res
= sendto(origFD
, response
, responseLen
, 0, (struct sockaddr
*)&origRemote
, origRemote
.getSocklen());
394 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
398 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), strerror(err
));
406 static int pickBackendSocketForSending(DownstreamState
* state
)
408 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
411 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
415 if (state
->sockets
.size() == 1) {
416 ready
.push_back(state
->sockets
[0]);
421 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
422 state
->mplexer
->getAvailableFDs(ready
, -1);
426 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
427 void* responderThread(std::shared_ptr
<DownstreamState
> dss
)
429 auto localRespRulactions
= g_resprulactions
.getLocal();
431 char packet
[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
432 /* when the answer is encrypted in place, we need to get a copy
433 of the original header before encryption to fill the ring buffer */
438 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
439 vector
<uint8_t> rewrittenResponse
;
441 uint16_t queryId
= 0;
442 std::vector
<int> sockets
;
443 sockets
.reserve(dss
->sockets
.size());
446 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
448 pickBackendSocketsReadyForReceiving(dss
, sockets
);
449 for (const auto& fd
: sockets
) {
450 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
451 char * response
= packet
;
452 size_t responseSize
= sizeof(packet
);
454 if (got
< (ssize_t
) sizeof(dnsheader
))
457 uint16_t responseLen
= (uint16_t) got
;
460 if(queryId
>= dss
->idStates
.size())
463 IDState
* ids
= &dss
->idStates
[queryId
];
464 int origFD
= ids
->origFD
;
466 if(origFD
< 0) // duplicate
469 /* setting age to 0 to prevent the maintainer thread from
470 cleaning this IDS while we process the response.
471 We have already a copy of the origFD, so it would
472 mostly mess up the outstanding counter.
476 unsigned int consumed
= 0;
477 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
, consumed
)) {
481 int oldFD
= ids
->origFD
.exchange(-1);
482 if (oldFD
== origFD
) {
483 /* we only decrement the outstanding counter if the value was not
484 altered in the meantime, which would mean that the state has been actively reused
485 and the other thread has not incremented the outstanding counter, so we don't
486 want it to be decremented twice. */
487 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
490 if(dh
->tc
&& g_truncateTC
) {
491 truncateTC(response
, &responseLen
, responseSize
, consumed
);
494 dh
->id
= ids
->origID
;
496 uint16_t addRoom
= 0;
497 DNSResponse
dr(&ids
->qname
, ids
->qtype
, ids
->qclass
, consumed
, &ids
->origDest
, &ids
->origRemote
, dh
, sizeof(packet
), responseLen
, false, &ids
->sentTime
.d_start
);
499 dr
.uniqueId
= ids
->uniqueId
;
503 if (!processResponse(localRespRulactions
, dr
, &ids
->delayMsec
)) {
508 if (ids
->dnsCryptQuery
) {
509 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
512 if (!fixUpResponse(&response
, &responseLen
, &responseSize
, ids
->qname
, ids
->origFlags
, ids
->ednsAdded
, ids
->ecsAdded
, rewrittenResponse
, addRoom
)) {
516 if (ids
->packetCache
&& !ids
->skipCache
) {
517 ids
->packetCache
->insert(ids
->cacheKey
, ids
->subnet
, ids
->origFlags
, ids
->qname
, ids
->qtype
, ids
->qclass
, response
, responseLen
, false, dh
->rcode
, ids
->tempFailureTTL
);
520 if (ids
->cs
&& !ids
->cs
->muted
) {
522 if (!encryptResponse(response
, &responseLen
, responseSize
, false, ids
->dnsCryptQuery
, &dh
, &dhCopy
)) {
528 empty
.sin4
.sin_family
= 0;
529 /* if ids->destHarvested is false, origDest holds the listening address.
530 We don't want to use that as a source since it could be 0.0.0.0 for example. */
531 sendUDPResponse(origFD
, response
, responseLen
, ids
->delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
536 double udiff
= ids
->sentTime
.udiff();
537 vinfolog("Got answer from %s, relayed to %s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(), udiff
);
541 g_rings
.insertResponse(ts
, ids
->origRemote
, ids
->qname
, ids
->qtype
, (unsigned int)udiff
, (unsigned int)got
, *dh
, dss
->remote
);
543 if(dh
->rcode
== RCode::ServFail
)
544 g_stats
.servfailResponses
++;
545 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
547 doLatencyStats(udiff
);
549 /* if the FD is not -1, the state has been actively reused and we should
550 not alter anything */
551 if (ids
->origFD
== -1) {
553 ids
->dnsCryptQuery
= nullptr;
557 rewrittenResponse
.clear();
560 catch(const std::exception
& e
){
561 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
566 catch(const std::exception
& e
)
568 errlog("UDP responder thread died because of exception: %s", e
.what());
571 catch(const PDNSException
& e
)
573 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
578 errlog("UDP responder thread died because of an exception: %s", "unknown");
582 bool DownstreamState::reconnect()
584 std::unique_lock
<std::mutex
> tl(connectLock
, std::try_to_lock
);
585 if (!tl
.owns_lock()) {
586 /* we are already reconnecting */
591 for (auto& fd
: sockets
) {
593 if (sockets
.size() > 1) {
594 std::lock_guard
<std::mutex
> lock(socketsLock
);
595 mplexer
->removeReadFD(fd
);
597 /* shutdown() is needed to wake up recv() in the responderThread */
598 shutdown(fd
, SHUT_RDWR
);
602 if (!IsAnyAddress(remote
)) {
603 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
604 if (!IsAnyAddress(sourceAddr
)) {
605 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
606 SBind(fd
, sourceAddr
);
609 SConnect(fd
, remote
);
610 if (sockets
.size() > 1) {
611 std::lock_guard
<std::mutex
> lock(socketsLock
);
612 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
616 catch(const std::runtime_error
& error
) {
617 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
624 /* if at least one (re-)connection failed, close all sockets */
626 for (auto& fd
: sockets
) {
628 if (sockets
.size() > 1) {
629 std::lock_guard
<std::mutex
> lock(socketsLock
);
630 mplexer
->removeReadFD(fd
);
632 /* shutdown() is needed to wake up recv() in the responderThread */
633 shutdown(fd
, SHUT_RDWR
);
642 void DownstreamState::hash()
644 vinfolog("Computing hashes for id=%s and weight=%d", id
, weight
);
646 WriteLock
wl(&d_lock
);
649 std::string uuid
= boost::str(boost::format("%s-%d") % id
% w
);
650 unsigned int wshash
= burtleCI((const unsigned char*)uuid
.c_str(), uuid
.size(), g_hashperturb
);
651 hashes
.insert(wshash
);
656 void DownstreamState::setId(const boost::uuids::uuid
& newId
)
659 // compute hashes only if already done
660 if (!hashes
.empty()) {
665 void DownstreamState::setWeight(int newWeight
)
668 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
672 if (!hashes
.empty()) {
677 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, size_t numberOfSockets
): remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
679 pthread_rwlock_init(&d_lock
, nullptr);
680 id
= t_uuidGenerator();
681 threadStarted
.clear();
683 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
685 sockets
.resize(numberOfSockets
);
686 for (auto& fd
: sockets
) {
690 if (!IsAnyAddress(remote
)) {
692 idStates
.resize(g_maxOutstanding
);
694 infolog("Added downstream server %s", remote
.toStringWithPort());
699 std::mutex g_luamutex
;
702 GlobalStateHolder
<ServerPolicy
> g_policy
;
704 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
706 for(auto& d
: servers
) {
707 if(d
.second
->isUp() && d
.second
->qps
.check())
710 return leastOutstanding(servers
, dq
);
713 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
714 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
716 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
717 return servers
[0].second
;
720 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
721 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
722 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
723 poss
.reserve(servers
.size());
724 for(auto& d
: servers
) {
725 if(d
.second
->isUp()) {
726 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
730 return shared_ptr
<DownstreamState
>();
731 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
732 return poss
.begin()->second
;
735 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
737 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
739 int max
= std::numeric_limits
<int>::max();
741 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
742 if(d
.second
->isUp()) {
743 // Don't overflow sum when adding high weights
744 if(d
.second
->weight
> max
- sum
) {
747 sum
+= d
.second
->weight
;
750 poss
.push_back({sum
, d
.second
});
754 // Catch poss & sum are empty to avoid SIGFPE
756 return shared_ptr
<DownstreamState
>();
759 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
761 return shared_ptr
<DownstreamState
>();
765 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
767 return valrandom(random(), servers
, dq
);
770 uint32_t g_hashperturb
;
771 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
773 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
776 shared_ptr
<DownstreamState
> chashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
778 unsigned int qhash
= dq
->qname
->hash(g_hashperturb
);
779 unsigned int sel
= std::numeric_limits
<unsigned int>::max();
780 unsigned int min
= std::numeric_limits
<unsigned int>::max();
781 shared_ptr
<DownstreamState
> ret
= nullptr, first
= nullptr;
783 for (const auto& d
: servers
) {
784 if (d
.second
->isUp()) {
785 // make sure hashes have been computed
786 if (d
.second
->hashes
.empty()) {
790 ReadLock
rl(&(d
.second
->d_lock
));
791 const auto& server
= d
.second
;
792 // we want to keep track of the last hash
793 if (min
> *(server
->hashes
.begin())) {
794 min
= *(server
->hashes
.begin());
798 auto hash_it
= server
->hashes
.lower_bound(qhash
);
799 if (hash_it
!= server
->hashes
.end()) {
800 if (*hash_it
< sel
) {
808 if (ret
!= nullptr) {
811 if (first
!= nullptr) {
814 return shared_ptr
<DownstreamState
>();
817 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
819 NumberedServerVector poss
;
821 for(auto& d
: servers
) {
822 if(d
.second
->isUp()) {
827 const auto *res
=&poss
;
832 return shared_ptr
<DownstreamState
>();
834 static unsigned int counter
;
836 return (*res
)[(counter
++) % res
->size()].second
;
839 ComboAddress g_serverControl
{"127.0.0.1:5199"};
841 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
843 std::shared_ptr
<ServerPool
> pool
;
844 pools_t::iterator it
= pools
.find(poolName
);
845 if (it
!= pools
.end()) {
849 if (!poolName
.empty())
850 vinfolog("Creating pool %s", poolName
);
851 pool
= std::make_shared
<ServerPool
>();
852 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
857 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
859 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
860 if (!poolName
.empty()) {
861 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
863 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
865 pool
->policy
= policy
;
868 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
870 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
871 if (!poolName
.empty()) {
872 vinfolog("Adding server to pool %s", poolName
);
874 vinfolog("Adding server to default pool");
876 pool
->addServer(server
);
879 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
881 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
883 if (!poolName
.empty()) {
884 vinfolog("Removing server from pool %s", poolName
);
887 vinfolog("Removing server from default pool");
890 pool
->removeServer(server
);
893 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
895 pools_t::const_iterator it
= pools
.find(poolName
);
897 if (it
== pools
.end()) {
898 throw std::out_of_range("No pool named " + poolName
);
904 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
906 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
907 return pool
->getServers();
910 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
)
914 std::vector
<std::string
> addrs
;
915 stringtok(addrs
, spoofContent
, " ,");
917 if (addrs
.size() == 1) {
919 ComboAddress
spoofAddr(spoofContent
);
920 SpoofAction
sa({spoofAddr
});
923 catch(const PDNSException
&e
) {
924 SpoofAction
sa(spoofContent
); // CNAME then
928 std::vector
<ComboAddress
> cas
;
929 for (const auto& addr
: addrs
) {
931 cas
.push_back(ComboAddress(addr
));
941 bool processQuery(LocalHolders
& holders
, DNSQuestion
& dq
, string
& poolname
, int* delayMsec
, const struct timespec
& now
)
943 g_rings
.insertQuery(now
,*dq
.remote
,*dq
.qname
,dq
.qtype
,dq
.len
,*dq
.dh
);
945 if(g_qcount
.enabled
) {
946 string qname
= (*dq
.qname
).toString(".");
947 bool countQuery
{true};
948 if(g_qcount
.filter
) {
949 std::lock_guard
<std::mutex
> lock(g_luamutex
);
950 std::tie (countQuery
, qname
) = g_qcount
.filter(dq
);
954 WriteLock
wl(&g_qcount
.queryLock
);
955 if(!g_qcount
.records
.count(qname
)) {
956 g_qcount
.records
[qname
] = 0;
958 g_qcount
.records
[qname
]++;
962 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
963 auto updateBlockStats
= [&got
]() {
964 g_stats
.dynBlocked
++;
965 got
->second
.blocks
++;
968 if(now
< got
->second
.until
) {
969 DNSAction::Action action
= got
->second
.action
;
970 if (action
== DNSAction::Action::None
) {
971 action
= g_dynBlockAction
;
974 case DNSAction::Action::NoOp
:
977 case DNSAction::Action::Refused
:
978 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
981 dq
.dh
->rcode
= RCode::Refused
;
985 case DNSAction::Action::Truncate
:
988 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
994 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
999 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
1005 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
1006 auto updateBlockStats
= [&got
]() {
1007 g_stats
.dynBlocked
++;
1011 if(now
< got
->until
) {
1012 DNSAction::Action action
= got
->action
;
1013 if (action
== DNSAction::Action::None
) {
1014 action
= g_dynBlockAction
;
1017 case DNSAction::Action::NoOp
:
1020 case DNSAction::Action::Refused
:
1021 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1024 dq
.dh
->rcode
= RCode::Refused
;
1027 case DNSAction::Action::Truncate
:
1031 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1037 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1042 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1048 DNSAction::Action action
=DNSAction::Action::None
;
1050 for(const auto& lr
: *holders
.rulactions
) {
1051 if(lr
.d_rule
->matches(&dq
)) {
1052 lr
.d_rule
->d_matches
++;
1053 action
=(*lr
.d_action
)(&dq
, &ruleresult
);
1056 case DNSAction::Action::Allow
:
1059 case DNSAction::Action::Drop
:
1063 case DNSAction::Action::Nxdomain
:
1064 dq
.dh
->rcode
= RCode::NXDomain
;
1066 g_stats
.ruleNXDomain
++;
1069 case DNSAction::Action::Refused
:
1070 dq
.dh
->rcode
= RCode::Refused
;
1072 g_stats
.ruleRefused
++;
1075 case DNSAction::Action::ServFail
:
1076 dq
.dh
->rcode
= RCode::ServFail
;
1078 g_stats
.ruleServFail
++;
1081 case DNSAction::Action::Spoof
:
1082 spoofResponseFromString(dq
, ruleresult
);
1085 case DNSAction::Action::Truncate
:
1090 case DNSAction::Action::HeaderModify
:
1093 case DNSAction::Action::Pool
:
1094 poolname
=ruleresult
;
1097 /* non-terminal actions follow */
1098 case DNSAction::Action::Delay
:
1099 *delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1101 case DNSAction::Action::None
:
1103 case DNSAction::Action::NoOp
:
1112 bool processResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, int* delayMsec
)
1114 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
1115 std::string ruleresult
;
1116 for(const auto& lr
: *localRespRulactions
) {
1117 if(lr
.d_rule
->matches(&dr
)) {
1118 lr
.d_rule
->d_matches
++;
1119 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
1121 case DNSResponseAction::Action::Allow
:
1124 case DNSResponseAction::Action::Drop
:
1127 case DNSResponseAction::Action::HeaderModify
:
1130 case DNSResponseAction::Action::ServFail
:
1131 dr
.dh
->rcode
= RCode::ServFail
;
1134 /* non-terminal actions follow */
1135 case DNSResponseAction::Action::Delay
:
1136 *delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1138 case DNSResponseAction::Action::None
:
1147 static ssize_t
udpClientSendRequestToBackend(DownstreamState
* ss
, const int sd
, const char* request
, const size_t requestLen
, bool healthCheck
=false)
1151 if (ss
->sourceItf
== 0) {
1152 result
= send(sd
, request
, requestLen
, 0);
1158 ComboAddress
remote(ss
->remote
);
1159 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &remote
);
1160 addCMsgSrcAddr(&msgh
, cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1161 result
= sendmsg(sd
, &msgh
, 0);
1165 int savederrno
= errno
;
1166 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1168 /* This might sound silly, but on Linux send() might fail with EINVAL
1169 if the interface the socket was bound to doesn't exist anymore.
1170 We don't want to reconnect the real socket if the healthcheck failed,
1171 because it's not using the same socket.
1173 if (!healthCheck
&& (savederrno
== EINVAL
|| savederrno
== ENODEV
)) {
1181 bool addXPF(DNSQuestion
& dq
, uint16_t optionCode
)
1183 std::string payload
= generateXPFPayload(dq
.tcp
, *dq
.remote
, *dq
.local
);
1184 uint8_t root
= '\0';
1185 dnsrecordheader drh
;
1186 drh
.d_type
= htons(optionCode
);
1187 drh
.d_class
= htons(QClass::IN
);
1189 drh
.d_clen
= htons(payload
.size());
1190 size_t recordHeaderLen
= sizeof(root
) + sizeof(drh
);
1192 size_t available
= dq
.size
- dq
.len
;
1194 if ((payload
.size() + recordHeaderLen
) > available
) {
1198 size_t pos
= dq
.len
;
1199 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, &root
, sizeof(root
));
1200 pos
+= sizeof(root
);
1201 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, &drh
, sizeof(drh
));
1203 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, payload
.data(), payload
.size());
1204 pos
+= payload
.size();
1208 dq
.dh
->arcount
= htons(ntohs(dq
.dh
->arcount
) + 1);
1213 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1215 if (msgh
->msg_flags
& MSG_TRUNC
) {
1216 /* message was too large for our buffer */
1217 vinfolog("Dropping message too large for our buffer");
1218 g_stats
.nonCompliantQueries
++;
1222 if(!holders
.acl
->match(remote
)) {
1223 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1231 if (HarvestDestinationAddress(msgh
, &dest
)) {
1232 /* we don't get the port, only the address */
1233 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1236 dest
.sin4
.sin_family
= 0;
1242 #ifdef HAVE_DNSCRYPT
1243 static bool checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DNSCryptQuery
>& dnsCryptQuery
, const ComboAddress
& dest
, const ComboAddress
& remote
, time_t now
)
1245 if (cs
.dnscryptCtx
) {
1246 vector
<uint8_t> response
;
1247 uint16_t decryptedQueryLen
= 0;
1249 dnsCryptQuery
= std::make_shared
<DNSCryptQuery
>(cs
.dnscryptCtx
);
1251 bool decrypted
= handleDNSCryptQuery(const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, false, now
, response
);
1254 if (response
.size() > 0) {
1255 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(response
.data()), static_cast<uint16_t>(response
.size()), 0, dest
, remote
);
1260 len
= decryptedQueryLen
;
1264 #endif /* HAVE_DNSCRYPT */
1266 bool checkQueryHeaders(const struct dnsheader
* dh
)
1268 if (dh
->qr
) { // don't respond to responses
1269 g_stats
.nonCompliantQueries
++;
1273 if (dh
->qdcount
== 0) {
1274 g_stats
.emptyQueries
++;
1279 g_stats
.rdQueries
++;
1285 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1286 static void queueResponse(const ClientState
& cs
, const char* response
, uint16_t responseLen
, const ComboAddress
& dest
, const ComboAddress
& remote
, struct mmsghdr
& outMsg
, struct iovec
* iov
, char* cbuf
)
1289 fillMSGHdr(&outMsg
.msg_hdr
, iov
, nullptr, 0, const_cast<char*>(response
), responseLen
, const_cast<ComboAddress
*>(&remote
));
1291 if (dest
.sin4
.sin_family
== 0) {
1292 outMsg
.msg_hdr
.msg_control
= nullptr;
1295 addCMsgSrcAddr(&outMsg
.msg_hdr
, cbuf
, &dest
, 0);
1298 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1300 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, char* respCBuf
)
1302 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
1303 uint16_t queryId
= 0;
1306 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1310 /* we need an accurate ("real") value for the response and
1311 to store into the IDS, but not for insertion into the
1312 rings for example */
1313 struct timespec queryRealTime
;
1314 struct timespec now
;
1316 gettime(&queryRealTime
, true);
1318 #ifdef HAVE_DNSCRYPT
1319 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
= nullptr;
1321 if (!checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, dest
, remote
, queryRealTime
.tv_sec
)) {
1326 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
1327 queryId
= ntohs(dh
->id
);
1329 if (!checkQueryHeaders(dh
)) {
1335 const uint16_t * flags
= getFlagsFromDNSHeader(dh
);
1336 const uint16_t origFlags
= *flags
;
1337 uint16_t qtype
, qclass
;
1338 unsigned int consumed
= 0;
1339 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
1340 DNSQuestion
dq(&qname
, qtype
, qclass
, consumed
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false, &queryRealTime
);
1342 if (!processQuery(holders
, dq
, poolname
, &delayMsec
, now
))
1347 if(dq
.dh
->qr
) { // something turned it into a response
1348 fixUpQueryTurnedResponse(dq
, origFlags
);
1351 char* response
= query
;
1352 uint16_t responseLen
= dq
.len
;
1354 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(response
), dq
.size
, responseLen
, false, &queryRealTime
);
1355 #ifdef HAVE_PROTOBUF
1356 dr
.uniqueId
= dq
.uniqueId
;
1360 if (!processResponse(holders
.selfAnsweredRespRulactions
, dr
, &delayMsec
)) {
1364 #ifdef HAVE_DNSCRYPT
1365 if (!encryptResponse(response
, &responseLen
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1369 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1370 if (delayMsec
== 0 && responsesVect
!= nullptr) {
1371 queueResponse(cs
, response
, responseLen
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1372 (*queuedResponses
)++;
1375 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1377 sendUDPResponse(cs
.udpFD
, response
, responseLen
, delayMsec
, dest
, remote
);
1380 g_stats
.selfAnswered
++;
1381 doLatencyStats(0); // we're not going to measure this
1387 DownstreamState
* ss
= nullptr;
1388 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, poolname
);
1389 std::shared_ptr
<DNSDistPacketCache
> packetCache
= serverPool
->packetCache
;
1390 auto policy
= *(holders
.policy
);
1391 if (serverPool
->policy
!= nullptr) {
1392 policy
= *(serverPool
->policy
);
1394 auto servers
= serverPool
->getServers();
1396 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1397 ss
= policy
.policy(servers
, &dq
).get();
1400 ss
= policy
.policy(servers
, &dq
).get();
1403 bool ednsAdded
= false;
1404 bool ecsAdded
= false;
1405 if (dq
.useECS
&& ((ss
&& ss
->useECS
) || (!ss
&& serverPool
->getECS()))) {
1406 if (!handleEDNSClientSubnet(dq
, &(ednsAdded
), &(ecsAdded
))) {
1407 vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote
.toStringWithPort());
1412 uint32_t cacheKey
= 0;
1413 boost::optional
<Netmask
> subnet
;
1414 if (packetCache
&& !dq
.skipCache
) {
1415 uint16_t cachedResponseSize
= dq
.size
;
1416 uint32_t allowExpired
= ss
? 0 : g_staleCacheEntriesTTL
;
1417 if (packetCache
->get(dq
, consumed
, dh
->id
, query
, &cachedResponseSize
, &cacheKey
, subnet
, allowExpired
)) {
1418 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(query
), dq
.size
, cachedResponseSize
, false, &queryRealTime
);
1419 #ifdef HAVE_PROTOBUF
1420 dr
.uniqueId
= dq
.uniqueId
;
1424 if (!processResponse(holders
.cacheHitRespRulactions
, dr
, &delayMsec
)) {
1429 #ifdef HAVE_DNSCRYPT
1430 if (!encryptResponse(query
, &cachedResponseSize
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1434 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1435 if (delayMsec
== 0 && responsesVect
!= nullptr) {
1436 queueResponse(cs
, query
, cachedResponseSize
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1437 (*queuedResponses
)++;
1440 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1442 sendUDPResponse(cs
.udpFD
, query
, cachedResponseSize
, delayMsec
, dest
, remote
);
1446 g_stats
.cacheHits
++;
1447 doLatencyStats(0); // we're not going to measure this
1450 g_stats
.cacheMisses
++;
1456 if (g_servFailOnNoPolicy
&& !cs
.muted
) {
1457 char* response
= query
;
1458 uint16_t responseLen
= dq
.len
;
1459 restoreFlags(dh
, origFlags
);
1461 dq
.dh
->rcode
= RCode::ServFail
;
1464 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(response
), dq
.size
, responseLen
, false, &queryRealTime
);
1465 #ifdef HAVE_PROTOBUF
1466 dr
.uniqueId
= dq
.uniqueId
;
1470 if (!processResponse(holders
.selfAnsweredRespRulactions
, dr
, &delayMsec
)) {
1474 #ifdef HAVE_DNSCRYPT
1475 if (!encryptResponse(response
, &responseLen
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1479 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1480 if (responsesVect
!= nullptr) {
1481 queueResponse(cs
, response
, responseLen
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1482 (*queuedResponses
)++;
1485 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1487 sendUDPResponse(cs
.udpFD
, response
, responseLen
, 0, dest
, remote
);
1490 // no response-only statistics counter to update.
1491 doLatencyStats(0); // we're not going to measure this
1493 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "ServFailed" : "Dropped", dq
.qname
->toString(), QType(dq
.qtype
).getName(), remote
.toStringWithPort());
1497 if (dq
.addXPF
&& ss
->xpfRRCode
!= 0) {
1498 addXPF(dq
, ss
->xpfRRCode
);
1503 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1504 IDState
* ids
= &ss
->idStates
[idOffset
];
1507 int oldFD
= ids
->origFD
.exchange(cs
.udpFD
);
1509 // if we are reusing, no change in outstanding
1514 g_stats
.downstreamTimeouts
++;
1518 ids
->origID
= dh
->id
;
1519 ids
->origRemote
= remote
;
1520 ids
->sentTime
.set(queryRealTime
);
1522 ids
->qtype
= dq
.qtype
;
1523 ids
->qclass
= dq
.qclass
;
1524 ids
->delayMsec
= delayMsec
;
1525 ids
->tempFailureTTL
= dq
.tempFailureTTL
;
1526 ids
->origFlags
= origFlags
;
1527 ids
->cacheKey
= cacheKey
;
1528 ids
->subnet
= subnet
;
1529 ids
->skipCache
= dq
.skipCache
;
1530 ids
->packetCache
= packetCache
;
1531 ids
->ednsAdded
= ednsAdded
;
1532 ids
->ecsAdded
= ecsAdded
;
1533 ids
->qTag
= dq
.qTag
;
1535 /* If we couldn't harvest the real dest addr, still
1536 write down the listening addr since it will be useful
1537 (especially if it's not an 'any' one).
1538 We need to keep track of which one it is since we may
1539 want to use the real but not the listening addr to reply.
1541 if (dest
.sin4
.sin_family
!= 0) {
1542 ids
->origDest
= dest
;
1543 ids
->destHarvested
= true;
1546 ids
->origDest
= cs
.local
;
1547 ids
->destHarvested
= false;
1549 #ifdef HAVE_DNSCRYPT
1550 ids
->dnsCryptQuery
= dnsCryptQuery
;
1552 #ifdef HAVE_PROTOBUF
1553 ids
->uniqueId
= dq
.uniqueId
;
1558 int fd
= pickBackendSocketForSending(ss
);
1559 ssize_t ret
= udpClientSendRequestToBackend(ss
, fd
, query
, dq
.len
);
1563 g_stats
.downstreamSendErrors
++;
1566 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
1568 catch(const std::exception
& e
){
1569 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
1573 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1574 static void MultipleMessagesUDPClientThread(ClientState
* cs
, LocalHolders
& holders
)
1579 /* used by HarvestDestinationAddress */
1581 ComboAddress remote
;
1585 const size_t vectSize
= g_udpVectorSize
;
1586 /* the actual buffer is larger because:
1587 - we may have to add EDNS and/or ECS
1588 - we use it for self-generated responses (from rule or cache)
1589 but we only accept incoming payloads up to that size
1591 static_assert(s_udpIncomingBufferSize
<= sizeof(MMReceiver::packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1593 auto recvData
= std::unique_ptr
<MMReceiver
[]>(new MMReceiver
[vectSize
]);
1594 auto msgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1595 auto outMsgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1597 /* initialize the structures needed to receive our messages */
1598 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1599 recvData
[idx
].remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1600 fillMSGHdr(&msgVec
[idx
].msg_hdr
, &recvData
[idx
].iov
, recvData
[idx
].cbuf
, sizeof(recvData
[idx
].cbuf
), recvData
[idx
].packet
, s_udpIncomingBufferSize
, &recvData
[idx
].remote
);
1606 /* reset the IO vector, since it's also used to send the vector of responses
1607 to avoid having to copy the data around */
1608 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1609 recvData
[idx
].iov
.iov_base
= recvData
[idx
].packet
;
1610 recvData
[idx
].iov
.iov_len
= sizeof(recvData
[idx
].packet
);
1613 /* block until we have at least one message ready, but return
1614 as many as possible to save the syscall costs */
1615 int msgsGot
= recvmmsg(cs
->udpFD
, msgVec
.get(), vectSize
, MSG_WAITFORONE
| MSG_TRUNC
, nullptr);
1618 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno
));
1622 unsigned int msgsToSend
= 0;
1624 /* process the received messages */
1625 for (int msgIdx
= 0; msgIdx
< msgsGot
; msgIdx
++) {
1626 const struct msghdr
* msgh
= &msgVec
[msgIdx
].msg_hdr
;
1627 unsigned int got
= msgVec
[msgIdx
].msg_len
;
1628 const ComboAddress
& remote
= recvData
[msgIdx
].remote
;
1630 if (got
< sizeof(struct dnsheader
)) {
1631 g_stats
.nonCompliantQueries
++;
1635 processUDPQuery(*cs
, holders
, msgh
, remote
, recvData
[msgIdx
].dest
, recvData
[msgIdx
].packet
, static_cast<uint16_t>(got
), sizeof(recvData
[msgIdx
].packet
), outMsgVec
.get(), &msgsToSend
, &recvData
[msgIdx
].iov
, recvData
[msgIdx
].cbuf
);
1639 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1640 or the cache) can be sent in batch too */
1642 if (msgsToSend
> 0 && msgsToSend
<= static_cast<unsigned int>(msgsGot
)) {
1643 int sent
= sendmmsg(cs
->udpFD
, outMsgVec
.get(), msgsToSend
, 0);
1645 if (sent
< 0 || static_cast<unsigned int>(sent
) != msgsToSend
) {
1646 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent
, msgsToSend
, strerror(errno
));
1652 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1654 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1655 static void* udpClientThread(ClientState
* cs
)
1658 LocalHolders holders
;
1660 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1661 if (g_udpVectorSize
> 1) {
1662 MultipleMessagesUDPClientThread(cs
, holders
);
1666 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1669 /* the actual buffer is larger because:
1670 - we may have to add EDNS and/or ECS
1671 - we use it for self-generated responses (from rule or cache)
1672 but we only accept incoming payloads up to that size
1674 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1677 /* used by HarvestDestinationAddress */
1680 ComboAddress remote
;
1682 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1683 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), packet
, sizeof(packet
), &remote
);
1686 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
1688 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1689 g_stats
.nonCompliantQueries
++;
1693 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), s_udpIncomingBufferSize
, nullptr, nullptr, nullptr, nullptr);
1699 catch(const std::exception
&e
)
1701 errlog("UDP client thread died because of exception: %s", e
.what());
1704 catch(const PDNSException
&e
)
1706 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1711 errlog("UDP client thread died because of an exception: %s", "unknown");
1715 static bool upCheck(DownstreamState
& ds
)
1718 DNSName checkName
= ds
.checkName
;
1719 uint16_t checkType
= ds
.checkType
.getCode();
1720 uint16_t checkClass
= ds
.checkClass
;
1721 dnsheader checkHeader
;
1722 memset(&checkHeader
, 0, sizeof(checkHeader
));
1724 checkHeader
.qdcount
= htons(1);
1725 #ifdef HAVE_LIBSODIUM
1726 checkHeader
.id
= randombytes_random() % 65536;
1728 checkHeader
.id
= random() % 65536;
1731 checkHeader
.rd
= true;
1733 checkHeader
.cd
= true;
1737 if (ds
.checkFunction
) {
1738 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1739 auto ret
= ds
.checkFunction(checkName
, checkType
, checkClass
, &checkHeader
);
1740 checkName
= std::get
<0>(ret
);
1741 checkType
= std::get
<1>(ret
);
1742 checkClass
= std::get
<2>(ret
);
1745 vector
<uint8_t> packet
;
1746 DNSPacketWriter
dpw(packet
, checkName
, checkType
, checkClass
);
1747 dnsheader
* requestHeader
= dpw
.getHeader();
1748 *requestHeader
= checkHeader
;
1750 Socket
sock(ds
.remote
.sin4
.sin_family
, SOCK_DGRAM
);
1751 sock
.setNonBlocking();
1752 if (!IsAnyAddress(ds
.sourceAddr
)) {
1753 sock
.setReuseAddr();
1754 sock
.bind(ds
.sourceAddr
);
1756 sock
.connect(ds
.remote
);
1757 ssize_t sent
= udpClientSendRequestToBackend(&ds
, sock
.getHandle(), (char*)&packet
[0], packet
.size(), true);
1760 if (g_verboseHealthChecks
)
1761 infolog("Error while sending a health check query to backend %s: %d", ds
.getNameWithAddr(), ret
);
1765 int ret
=waitForRWData(sock
.getHandle(), true, 1, 0);
1766 if(ret
< 0 || !ret
) { // error, timeout, both are down!
1769 if (g_verboseHealthChecks
)
1770 infolog("Error while waiting for the health check response from backend %s: %d", ds
.getNameWithAddr(), ret
);
1773 if (g_verboseHealthChecks
)
1774 infolog("Timeout while waiting for the health check response from backend %s", ds
.getNameWithAddr());
1781 sock
.recvFrom(reply
, from
);
1783 /* we are using a connected socket but hey.. */
1784 if (from
!= ds
.remote
) {
1785 if (g_verboseHealthChecks
)
1786 infolog("Invalid health check response received from %s, expecting one from %s", from
.toStringWithPort(), ds
.remote
.toStringWithPort());
1790 const dnsheader
* responseHeader
= reinterpret_cast<const dnsheader
*>(reply
.c_str());
1792 if (reply
.size() < sizeof(*responseHeader
)) {
1793 if (g_verboseHealthChecks
)
1794 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply
.size(), ds
.getNameWithAddr(), sizeof(*responseHeader
));
1798 if (responseHeader
->id
!= requestHeader
->id
) {
1799 if (g_verboseHealthChecks
)
1800 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader
->id
, ds
.getNameWithAddr(), requestHeader
->id
);
1804 if (!responseHeader
->qr
) {
1805 if (g_verboseHealthChecks
)
1806 infolog("Invalid health check response from backend %s, expecting QR to be set", ds
.getNameWithAddr());
1810 if (responseHeader
->rcode
== RCode::ServFail
) {
1811 if (g_verboseHealthChecks
)
1812 infolog("Backend %s responded to health check with ServFail", ds
.getNameWithAddr());
1816 if (ds
.mustResolve
&& (responseHeader
->rcode
== RCode::NXDomain
|| responseHeader
->rcode
== RCode::Refused
)) {
1817 if (g_verboseHealthChecks
)
1818 infolog("Backend %s responded to health check with %s while mustResolve is set", ds
.getNameWithAddr(), responseHeader
->rcode
== RCode::NXDomain
? "NXDomain" : "Refused");
1822 uint16_t receivedType
;
1823 uint16_t receivedClass
;
1824 DNSName
receivedName(reply
.c_str(), reply
.size(), sizeof(dnsheader
), false, &receivedType
, &receivedClass
);
1826 if (receivedName
!= checkName
|| receivedType
!= checkType
|| receivedClass
!= checkClass
) {
1827 if (g_verboseHealthChecks
)
1828 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds
.getNameWithAddr(), receivedName
.toLogString(), checkName
.toLogString(), QType(receivedType
).getName(), QType(checkType
).getName(), receivedClass
, checkClass
);
1834 catch(const std::exception
& e
)
1836 if (g_verboseHealthChecks
)
1837 infolog("Error checking the health of backend %s: %s", ds
.getNameWithAddr(), e
.what());
1842 if (g_verboseHealthChecks
)
1843 infolog("Unknown exception while checking the health of backend %s", ds
.getNameWithAddr());
1847 uint64_t g_maxTCPClientThreads
{10};
1848 std::atomic
<uint16_t> g_cacheCleaningDelay
{60};
1849 std::atomic
<uint16_t> g_cacheCleaningPercentage
{100};
1855 int32_t secondsToWaitLog
= 0;
1861 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1862 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1866 secondsToWaitLog
= 0;
1868 catch(std::exception
&e
) {
1869 if (secondsToWaitLog
<= 0) {
1870 infolog("Error during execution of maintenance function: %s", e
.what());
1871 secondsToWaitLog
= 61;
1873 secondsToWaitLog
-= interval
;
1879 if (counter
>= g_cacheCleaningDelay
) {
1880 auto localPools
= g_pools
.getLocal();
1881 std::shared_ptr
<DNSDistPacketCache
> packetCache
= nullptr;
1882 for (const auto& entry
: *localPools
) {
1883 packetCache
= entry
.second
->packetCache
;
1885 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1886 packetCache
->purgeExpired(upTo
);
1892 // ponder pruning g_dynblocks of expired entries here
1897 void* healthChecksThread()
1904 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads())
1905 g_tcpclientthreads
->addTCPClientThread();
1907 auto states
= g_dstates
.getLocal(); // this points to the actual shared_ptrs!
1908 for(auto& dss
: *states
) {
1909 if(dss
->availability
==DownstreamState::Availability::Auto
) {
1910 bool newState
=upCheck(*dss
);
1912 if (dss
->currentCheckFailures
!= 0) {
1913 dss
->currentCheckFailures
= 0;
1916 else if (!newState
&& dss
->upStatus
) {
1917 dss
->currentCheckFailures
++;
1918 if (dss
->currentCheckFailures
< dss
->maxCheckFailures
) {
1923 if(newState
!= dss
->upStatus
) {
1924 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
1926 if (newState
&& !dss
->connected
) {
1927 newState
= dss
->reconnect();
1929 if (dss
->connected
&& !dss
->threadStarted
.test_and_set()) {
1930 dss
->tid
= thread(responderThread
, dss
);
1934 dss
->upStatus
= newState
;
1935 dss
->currentCheckFailures
= 0;
1936 if (g_snmpAgent
&& g_snmpTrapsEnabled
) {
1937 g_snmpAgent
->sendBackendStatusChangeTrap(dss
);
1942 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
1943 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
1944 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
1945 dss
->prev
.queries
.store(dss
->queries
.load());
1946 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
1948 for(IDState
& ids
: dss
->idStates
) { // timeouts
1949 int origFD
= ids
.origFD
;
1950 if(origFD
>=0 && ids
.age
++ > g_udpTimeout
) {
1951 /* We set origFD to -1 as soon as possible
1952 to limit the risk of racing with the
1954 The UDP client thread only checks origFD to
1955 know whether outstanding has to be incremented,
1956 so the sooner the better any way since we _will_
1959 if (ids
.origFD
.exchange(-1) != origFD
) {
1960 /* this state has been altered in the meantime,
1961 don't go anywhere near it */
1967 g_stats
.downstreamTimeouts
++; // this is an 'actively' discovered timeout
1968 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1969 dss
->remote
.toStringWithPort(), dss
->name
,
1970 ids
.qname
.toString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
1975 struct dnsheader fake
;
1976 memset(&fake
, 0, sizeof(fake
));
1977 fake
.id
= ids
.origID
;
1979 g_rings
.insertResponse(ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
);
1987 static void bindAny(int af
, int sock
)
1989 __attribute__((unused
)) int one
= 1;
1992 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
1993 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno
));
1998 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
1999 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno
));
2003 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
2004 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno
));
2007 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
2008 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno
));
2012 static void dropGroupPrivs(gid_t gid
)
2015 if (setgid(gid
) == 0) {
2016 if (setgroups(0, NULL
) < 0) {
2017 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno
));
2021 warnlog("Warning: Unable to set group ID to %d: %s", gid
, strerror(errno
));
2026 static void dropUserPrivs(uid_t uid
)
2029 if(setuid(uid
) < 0) {
2030 warnlog("Warning: Unable to set user ID to %d: %s", uid
, strerror(errno
));
2035 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
2037 /* stdin, stdout, stderr */
2038 size_t requiredFDsCount
= 3;
2039 auto backends
= g_dstates
.getLocal();
2040 /* UDP sockets to backends */
2041 size_t backendUDPSocketsCount
= 0;
2042 for (const auto& backend
: *backends
) {
2043 backendUDPSocketsCount
+= backend
->sockets
.size();
2045 requiredFDsCount
+= backendUDPSocketsCount
;
2046 /* TCP sockets to backends */
2047 requiredFDsCount
+= (backends
->size() * g_maxTCPClientThreads
);
2048 /* listening sockets */
2049 requiredFDsCount
+= udpBindsCount
;
2050 requiredFDsCount
+= tcpBindsCount
;
2051 /* max TCP connections currently served */
2052 requiredFDsCount
+= g_maxTCPClientThreads
;
2053 /* max pipes for communicating between TCP acceptors and client threads */
2054 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
2055 /* max TCP queued connections */
2056 requiredFDsCount
+= g_maxTCPQueuedConnections
;
2057 /* DelayPipe pipe */
2058 requiredFDsCount
+= 2;
2061 /* webserver main socket */
2063 /* console main socket */
2070 getrlimit(RLIMIT_NOFILE
, &rl
);
2071 if (rl
.rlim_cur
<= requiredFDsCount
) {
2072 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
2074 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2076 warnlog("You can increase this value by using ulimit.");
2083 vector
<string
> locals
;
2084 vector
<string
> remotes
;
2085 bool checkConfig
{false};
2086 bool beClient
{false};
2087 bool beSupervised
{false};
2094 std::atomic
<bool> g_configurationDone
{false};
2099 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2100 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2101 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
2103 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2104 cout
<<"-C,--config file Load configuration from 'file'\n";
2105 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2106 cout
<<" controlSocket from your configuration file, but also\n";
2107 cout
<<" accepts an IP:PORT argument\n";
2108 #ifdef HAVE_LIBSODIUM
2109 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2110 cout
<<" is similar to setting setKey in the configuration file.\n";
2111 cout
<<" NOTE: this will leak this key in your shell's history\n";
2112 cout
<<" and in the systems running process list.\n";
2114 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2115 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2116 cout
<<" Any errors are printed as well.\n";
2117 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2118 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2119 cout
<<"-h,--help Display this helpful message\n";
2120 cout
<<"-l,--local address Listen on this local address\n";
2121 cout
<<"--supervised Don't open a console, I'm supervised\n";
2122 cout
<<" (use with e.g. systemd and daemontools)\n";
2123 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2124 cout
<<" (use with e.g. systemd)\n";
2125 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2126 cout
<<"-v,--verbose Enable verbose mode\n";
2127 cout
<<"-V,--version Show dnsdist version information and exit\n";
2130 int main(int argc
, char** argv
)
2133 size_t udpBindsCount
= 0;
2134 size_t tcpBindsCount
= 0;
2135 rl_attempted_completion_function
= my_completion
;
2136 rl_completion_append_character
= 0;
2138 signal(SIGPIPE
, SIG_IGN
);
2139 signal(SIGCHLD
, SIG_IGN
);
2140 openlog("dnsdist", LOG_PID
, LOG_DAEMON
);
2142 #ifdef HAVE_LIBSODIUM
2143 if (sodium_init() == -1) {
2144 cerr
<<"Unable to initialize crypto library"<<endl
;
2147 g_hashperturb
=randombytes_uniform(0xffffffff);
2148 srandom(randombytes_uniform(0xffffffff));
2152 gettimeofday(&tv
, 0);
2153 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
2154 g_hashperturb
=random();
2158 ComboAddress clientAddress
= ComboAddress();
2159 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
2160 struct option longopts
[]={
2161 {"acl", required_argument
, 0, 'a'},
2162 {"check-config", no_argument
, 0, 1},
2163 {"client", no_argument
, 0, 'c'},
2164 {"config", required_argument
, 0, 'C'},
2165 {"disable-syslog", no_argument
, 0, 2},
2166 {"execute", required_argument
, 0, 'e'},
2167 {"gid", required_argument
, 0, 'g'},
2168 {"help", no_argument
, 0, 'h'},
2169 {"local", required_argument
, 0, 'l'},
2170 {"setkey", required_argument
, 0, 'k'},
2171 {"supervised", no_argument
, 0, 3},
2172 {"uid", required_argument
, 0, 'u'},
2173 {"verbose", no_argument
, 0, 'v'},
2174 {"version", no_argument
, 0, 'V'},
2180 int c
=getopt_long(argc
, argv
, "a:cC:e:g:hk:l:u:vV", longopts
, &longindex
);
2185 g_cmdLine
.checkConfig
=true;
2191 g_cmdLine
.beSupervised
=true;
2194 g_cmdLine
.config
=optarg
;
2197 g_cmdLine
.beClient
=true;
2200 g_cmdLine
.command
=optarg
;
2203 g_cmdLine
.gid
=optarg
;
2206 cout
<<"dnsdist "<<VERSION
<<endl
;
2213 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
2216 #ifdef HAVE_LIBSODIUM
2217 if (B64Decode(string(optarg
), g_consoleKey
) < 0) {
2218 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2222 cerr
<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl
;
2227 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
2230 g_cmdLine
.uid
=optarg
;
2236 #ifdef LUAJIT_VERSION
2237 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2239 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2241 cout
<<"Enabled features: ";
2242 #ifdef HAVE_DNS_OVER_TLS
2243 cout
<<"dns-over-tls(";
2255 #ifdef HAVE_DNSCRYPT
2264 #ifdef HAVE_LIBSODIUM
2267 #ifdef HAVE_PROTOBUF
2273 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2274 cout
<<"recvmmsg/sendmmsg ";
2276 #ifdef HAVE_NET_SNMP
2286 //getopt_long printed an error message.
2295 for(auto p
= argv
; *p
; ++p
) {
2296 if(g_cmdLine
.beClient
) {
2297 clientAddress
= ComboAddress(*p
, 5199);
2299 g_cmdLine
.remotes
.push_back(*p
);
2303 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
, false};
2305 g_policy
.setState(leastOutstandingPol
);
2306 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2307 setupLua(true, g_cmdLine
.config
);
2308 if (clientAddress
!= ComboAddress())
2309 g_serverControl
= clientAddress
;
2310 doClient(g_serverControl
, g_cmdLine
.command
);
2311 _exit(EXIT_SUCCESS
);
2314 auto acl
= g_ACL
.getCopy();
2316 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2318 g_ACL
.setState(acl
);
2321 auto consoleACL
= g_consoleACL
.getCopy();
2322 for (const auto& mask
: { "127.0.0.1/8", "::1/128" }) {
2323 consoleACL
.addMask(mask
);
2325 g_consoleACL
.setState(consoleACL
);
2327 if (g_cmdLine
.checkConfig
) {
2328 setupLua(true, g_cmdLine
.config
);
2329 // No exception was thrown
2330 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2331 _exit(EXIT_SUCCESS
);
2334 auto todo
=setupLua(false, g_cmdLine
.config
);
2336 auto localPools
= g_pools
.getCopy();
2338 bool precompute
= false;
2339 if (g_policy
.getLocal()->name
== "chashed") {
2342 for (const auto& entry
: localPools
) {
2343 if (entry
.second
->policy
!= nullptr && entry
.second
->policy
->name
== "chashed") {
2350 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2351 // pre compute hashes
2352 auto backends
= g_dstates
.getLocal();
2353 for (auto& backend
: *backends
) {
2359 if(g_cmdLine
.locals
.size()) {
2361 for(auto loc
: g_cmdLine
.locals
)
2362 g_locals
.push_back(std::make_tuple(ComboAddress(loc
, 53), true, false, 0, "", std::set
<int>()));
2365 if(g_locals
.empty())
2366 g_locals
.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set
<int>()));
2368 g_configurationDone
= true;
2370 vector
<ClientState
*> toLaunch
;
2371 for(const auto& local
: g_locals
) {
2372 ClientState
* cs
= new ClientState
;
2373 cs
->local
= std::get
<0>(local
);
2374 cs
->udpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_DGRAM
, 0);
2375 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2376 SSetsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2378 //if(g_vm.count("bind-non-local"))
2379 bindAny(cs
->local
.sin4
.sin_family
, cs
->udpFD
);
2381 // if (!setSocketTimestamps(cs->udpFD))
2382 // g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2385 if(IsAnyAddress(cs
->local
)) {
2387 setsockopt(cs
->udpFD
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2388 #ifdef IPV6_RECVPKTINFO
2389 setsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2393 if (std::get
<2>(local
)) {
2395 SSetsockopt(cs
->udpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2397 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get
<0>(local
).toStringWithPort());
2401 const std::string
& itf
= std::get
<4>(local
);
2403 #ifdef SO_BINDTODEVICE
2404 int res
= setsockopt(cs
->udpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2406 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(local
).toStringWithPort(), strerror(errno
));
2409 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(local
).toStringWithPort());
2414 if (g_defaultBPFFilter
) {
2415 cs
->attachFilter(g_defaultBPFFilter
);
2416 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs
->local
.toStringWithPort());
2418 #endif /* HAVE_EBPF */
2420 cs
->cpus
= std::get
<5>(local
);
2422 SBind(cs
->udpFD
, cs
->local
);
2423 toLaunch
.push_back(cs
);
2424 g_frontends
.push_back(cs
);
2428 for(const auto& local
: g_locals
) {
2429 if(!std::get
<1>(local
)) { // no TCP/IP
2430 warnlog("Not providing TCP/IP service on local address '%s'", std::get
<0>(local
).toStringWithPort());
2433 ClientState
* cs
= new ClientState
;
2434 cs
->local
= std::get
<0>(local
);
2436 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2438 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2439 #ifdef TCP_DEFER_ACCEPT
2440 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2442 if (std::get
<3>(local
) > 0) {
2444 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, std::get
<3>(local
));
2446 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get
<0>(local
).toStringWithPort());
2449 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2450 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2453 /* no need to warn again if configured but support is not available, we already did for UDP */
2454 if (std::get
<2>(local
)) {
2455 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2459 const std::string
& itf
= std::get
<4>(local
);
2461 #ifdef SO_BINDTODEVICE
2462 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2464 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(local
).toStringWithPort(), strerror(errno
));
2467 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(local
).toStringWithPort());
2472 if (g_defaultBPFFilter
) {
2473 cs
->attachFilter(g_defaultBPFFilter
);
2474 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs
->local
.toStringWithPort());
2476 #endif /* HAVE_EBPF */
2478 // if(g_vm.count("bind-non-local"))
2479 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2480 SBind(cs
->tcpFD
, cs
->local
);
2481 SListen(cs
->tcpFD
, 64);
2482 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2484 toLaunch
.push_back(cs
);
2485 g_frontends
.push_back(cs
);
2489 #ifdef HAVE_DNSCRYPT
2490 for(auto& dcLocal
: g_dnsCryptLocals
) {
2491 ClientState
* cs
= new ClientState
;
2492 cs
->local
= std::get
<0>(dcLocal
);
2493 cs
->dnscryptCtx
= std::get
<1>(dcLocal
);
2494 cs
->udpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_DGRAM
, 0);
2495 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2496 SSetsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2498 bindAny(cs
->local
.sin4
.sin_family
, cs
->udpFD
);
2499 if(IsAnyAddress(cs
->local
)) {
2501 setsockopt(cs
->udpFD
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2502 #ifdef IPV6_RECVPKTINFO
2503 setsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2506 if (std::get
<2>(dcLocal
)) {
2508 SSetsockopt(cs
->udpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2510 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2514 const std::string
& itf
= std::get
<4>(dcLocal
);
2516 #ifdef SO_BINDTODEVICE
2517 int res
= setsockopt(cs
->udpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2519 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(dcLocal
).toStringWithPort(), strerror(errno
));
2522 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2527 if (g_defaultBPFFilter
) {
2528 cs
->attachFilter(g_defaultBPFFilter
);
2529 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs
->local
.toStringWithPort());
2531 #endif /* HAVE_EBPF */
2532 SBind(cs
->udpFD
, cs
->local
);
2533 toLaunch
.push_back(cs
);
2534 g_frontends
.push_back(cs
);
2537 cs
= new ClientState
;
2538 cs
->local
= std::get
<0>(dcLocal
);
2539 cs
->dnscryptCtx
= std::get
<1>(dcLocal
);
2540 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2541 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2542 #ifdef TCP_DEFER_ACCEPT
2543 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2545 if (std::get
<3>(dcLocal
) > 0) {
2547 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, std::get
<3>(dcLocal
));
2549 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2554 /* no need to warn again if configured but support is not available, we already did for UDP */
2555 if (std::get
<2>(dcLocal
)) {
2556 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2561 #ifdef SO_BINDTODEVICE
2562 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2564 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(dcLocal
).toStringWithPort(), strerror(errno
));
2567 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2571 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2572 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2575 if (g_defaultBPFFilter
) {
2576 cs
->attachFilter(g_defaultBPFFilter
);
2577 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs
->local
.toStringWithPort());
2579 #endif /* HAVE_EBPF */
2581 cs
->cpus
= std::get
<5>(dcLocal
);
2583 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2584 SBind(cs
->tcpFD
, cs
->local
);
2585 SListen(cs
->tcpFD
, 64);
2586 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2587 toLaunch
.push_back(cs
);
2588 g_frontends
.push_back(cs
);
2593 for(auto& frontend
: g_tlslocals
) {
2594 ClientState
* cs
= new ClientState
;
2595 cs
->local
= frontend
->d_addr
;
2596 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2597 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2598 #ifdef TCP_DEFER_ACCEPT
2599 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2601 if (frontend
->d_tcpFastOpenQueueSize
> 0) {
2603 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, frontend
->d_tcpFastOpenQueueSize
);
2605 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2608 if (frontend
->d_reusePort
) {
2610 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2612 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2615 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2616 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2619 if (!frontend
->d_interface
.empty()) {
2620 #ifdef SO_BINDTODEVICE
2621 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, frontend
->d_interface
.c_str(), frontend
->d_interface
.length());
2623 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), strerror(errno
));
2626 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2630 cs
->cpus
= frontend
->d_cpus
;
2632 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2633 if (frontend
->setupTLS()) {
2634 cs
->tlsFrontend
= frontend
;
2635 SBind(cs
->tcpFD
, cs
->local
);
2636 SListen(cs
->tcpFD
, 64);
2637 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2638 toLaunch
.push_back(cs
);
2639 g_frontends
.push_back(cs
);
2644 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2645 _exit(EXIT_FAILURE
);
2649 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
2653 g_ACL
.getLocal()->toStringVector(&vec
);
2654 for(const auto& s
: vec
) {
2659 infolog("ACL allowing queries from: %s", acls
.c_str());
2662 g_consoleACL
.getLocal()->toStringVector(&vec
);
2663 for (const auto& entry
: vec
) {
2664 if (!acls
.empty()) {
2669 infolog("Console ACL allowing connections from: %s", acls
.c_str());
2671 #ifdef HAVE_LIBSODIUM
2672 if (g_consoleEnabled
&& g_consoleKey
.empty()) {
2673 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2680 if(!g_cmdLine
.gid
.empty())
2681 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2683 if(!g_cmdLine
.uid
.empty())
2684 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2686 dropGroupPrivs(newgid
);
2687 dropUserPrivs(newuid
);
2689 /* this need to be done _after_ dropping privileges */
2690 g_delay
= new DelayPipe
<DelayedPacket
>();
2696 g_tcpclientthreads
= std::make_shared
<TCPClientCollection
>(g_maxTCPClientThreads
, g_useTCPSinglePipe
);
2701 localPools
= g_pools
.getCopy();
2702 /* create the default pool no matter what */
2703 createPoolIfNotExists(localPools
, "");
2704 if(g_cmdLine
.remotes
.size()) {
2705 for(const auto& address
: g_cmdLine
.remotes
) {
2706 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2707 addServerToPool(localPools
, "", ret
);
2708 if (ret
->connected
&& !ret
->threadStarted
.test_and_set()) {
2709 ret
->tid
= thread(responderThread
, ret
);
2711 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
2714 g_pools
.setState(localPools
);
2716 if(g_dstates
.getLocal()->empty()) {
2717 errlog("No downstream servers defined: all packets will get dropped");
2718 // you might define them later, but you need to know
2721 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
2723 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2724 if(dss
->availability
==DownstreamState::Availability::Auto
) {
2725 bool newState
=upCheck(*dss
);
2726 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
2727 dss
->upStatus
= newState
;
2731 for(auto& cs
: toLaunch
) {
2732 if (cs
->udpFD
>= 0) {
2733 thread
t1(udpClientThread
, cs
);
2734 if (!cs
->cpus
.empty()) {
2735 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2739 else if (cs
->tcpFD
>= 0) {
2740 thread
t1(tcpAcceptorThread
, cs
);
2741 if (!cs
->cpus
.empty()) {
2742 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2748 thread
carbonthread(carbonDumpThread
);
2749 carbonthread
.detach();
2751 thread
stattid(maintThread
);
2754 thread
healththread(healthChecksThread
);
2756 if(g_cmdLine
.beSupervised
) {
2758 sd_notify(0, "READY=1");
2760 healththread
.join();
2763 healththread
.detach();
2766 _exit(EXIT_SUCCESS
);
2769 catch(const LuaContext::ExecutionErrorException
& e
) {
2771 errlog("Fatal Lua error: %s", e
.what());
2772 std::rethrow_if_nested(e
);
2773 } catch(const std::exception
& ne
) {
2774 errlog("Details: %s", ne
.what());
2776 catch(PDNSException
&ae
)
2778 errlog("Fatal pdns error: %s", ae
.reason
);
2780 _exit(EXIT_FAILURE
);
2782 catch(std::exception
&e
)
2784 errlog("Fatal error: %s", e
.what());
2785 _exit(EXIT_FAILURE
);
2787 catch(PDNSException
&ae
)
2789 errlog("Fatal pdns error: %s", ae
.reason
);
2790 _exit(EXIT_FAILURE
);