2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <netinet/tcp.h>
31 #include <sys/resource.h>
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
37 #include <editline/readline.h>
41 #include <systemd/sd-daemon.h>
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
52 #include "delaypipe.hh"
55 #include "dnswriter.hh"
56 #include "ednsoptions.hh"
60 #include "sodcrypto.hh"
// Per-thread generator of random UUIDs. In this file it is called by the
// DownstreamState constructor to assign each backend its id.
64 thread_local
boost::uuids::random_generator t_uuidGenerator
;
/* Receiver is currently single threaded.
   Not *that* bad actually, but now that we are thread safe, we might want to scale.

   Set of Rules: if one matches, it leads to an Action.
   Both rules and actions could conceivably be Lua based.
   On the C++ side, both could be inherited from a class Rule and a class Action;
   on the Lua side we can't do that. */
// Process-wide state defined in this translation unit.
// g_stats holds the counters incremented throughout this file.
82 struct DNSDistStats g_stats
;
83 MetricDefinitionStorage g_metricDefinitions
;
// g_maxOutstanding is the size given to each backend's idStates table by the
// DownstreamState constructor.
85 uint16_t g_maxOutstanding
{10240};
86 bool g_verboseHealthChecks
{false};
87 uint32_t g_staleCacheEntriesTTL
{0};
90 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
91 string g_outputBuffer
;
// Configured listening endpoints (plain, TLS and DNSCrypt).
// NOTE(review): the meaning of each tuple field is not visible in this chunk.
93 vector
<std::tuple
<ComboAddress
, bool, bool, int, string
, std::set
<int>>> g_locals
;
94 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
96 std::vector
<std::tuple
<ComboAddress
,std::shared_ptr
<DNSCryptContext
>,bool, int, string
, std::set
<int> >> g_dnsCryptLocals
;
// eBPF filter handles. NOTE(review): the matching opening preprocessor guard
// for the HAVE_EBPF endif below is not part of this listing.
99 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
100 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
101 #endif /* HAVE_EBPF */
102 vector
<ClientState
*> g_frontends
;
// Named server pools; read and modified through the pool helpers further down
// (createPoolIfNotExists, getPool, ...), which take a pools_t argument.
103 GlobalStateHolder
<pools_t
> g_pools
;
104 size_t g_udpVectorSize
{1};
106 bool g_snmpEnabled
{false};
107 bool g_snmpTrapsEnabled
{false};
108 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
   Then we have a bunch of connected sockets for talking to downstream servers.
   We send directly to those sockets.

   For the return path, per downstream server we have a thread that listens to responses.

   Per socket there is an array of 2^16 states; when we send out a packet downstream, we note
   there the original requestor and the original id. The new ID is the offset in the array.

   When an answer comes in on a socket, we look up the offset by the id, and lob it to the
   original requestor.

   IDs are assigned by atomic increments of the socket offset.
 */
// Rule/action lists. g_resprulactions is the list the UDP responder thread
// takes a local copy of; processQuery walks the query rules through its
// LocalHolders argument.
125 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
126 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
128 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
133 GlobalStateHolder
<servers_t
> g_dstates
;
// Dynamic block tables, keyed by client netmask and by name suffix.
134 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
135 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
// Action applied by processQuery when a dynamic block entry carries
// Action::None.
136 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
137 int g_tcpRecvTimeout
{2};
138 int g_tcpSendTimeout
{2};
141 bool g_servFailOnNoPolicy
{false};
// When set, the UDP responder thread calls truncateTC() on responses that
// have the TC bit set.
142 bool g_truncateTC
{false};
// Shrinks a response in place so that only the DNS header and the question
// remain: *len becomes sizeof(dnsheader) + consumed + type size + class size,
// and the answer, authority and additional counts in the header are zeroed.
//   packet   - start of the response buffer (interpreted as a dnsheader)
//   len      - in/out response length
//   consumed - presumably the wire length of the question name, as reported
//              by the DNSName parser; verify against the caller
// NOTE(review): this listing omits some lines of the original definition
// (braces and possibly error handling); confirm against the full file.
145 static void truncateTC(char* packet
, uint16_t* len
, unsigned int consumed
)
148 *len
=(uint16_t) (sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
149 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
150 dh
->ancount
= dh
->arcount
= dh
->nscount
= 0;
// Fragment of the delayed-response record (presumably struct DelayedPacket,
// the type stored in g_delay; its header and first members are not part of
// this listing). The visible code sends the stored packet to `destination`:
// with plain sendto() when origDest has no address family set, otherwise with
// sendfromto() so that origDest is used as the source. A failure is logged at
// verbose level.
161 ComboAddress destination
;
162 ComboAddress origDest
;
166 if(origDest
.sin4
.sin_family
== 0) {
167 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
170 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
174 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
// Queue used by sendUDPResponse() to submit responses that must be sent after
// a delay; null until something outside this chunk creates it.
179 DelayPipe
<DelayedPacket
> * g_delay
= 0;
181 void doLatencyStats(double udiff
)
183 if(udiff
< 1000) g_stats
.latency0_1
++;
184 else if(udiff
< 10000) g_stats
.latency1_10
++;
185 else if(udiff
< 50000) g_stats
.latency10_50
++;
186 else if(udiff
< 100000) g_stats
.latency50_100
++;
187 else if(udiff
< 1000000) g_stats
.latency100_1000
++;
188 else g_stats
.latencySlow
++;
190 auto doAvg
= [](double& var
, double n
, double weight
) {
191 var
= (weight
-1) * var
/weight
+ n
/weight
;
194 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
195 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
196 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
197 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
// Checks that a backend response corresponds to the query it is supposed to
// answer.
//   response/responseLen - raw response bytes
//   qname/qtype/qclass   - the question that was sent
//   remote               - backend address, used only for logging
//   consumed             - out: filled in by the DNSName parser
// Visible steps: reject responses shorter than a DNS header; special-case
// responses with qdcount == 0 depending on the rcode; parse the question from
// the response (a parse failure is logged and counted in
// g_stats.nonCompliantResponses); finally compare type, class and name with
// the expected ones.
// NOTE(review): the return statements of each branch are not part of this
// listing; confirm which branches accept and which reject.
200 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
, unsigned int& consumed
)
202 uint16_t rqtype
, rqclass
;
204 const struct dnsheader
* dh
= (struct dnsheader
*) response
;
206 if (responseLen
< sizeof(dnsheader
)) {
210 if (dh
->qdcount
== 0) {
211 if (dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) {
215 g_stats
.nonCompliantResponses
++;
221 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
223 catch(std::exception
& e
) {
224 if(responseLen
> (ssize_t
)sizeof(dnsheader
))
225 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
226 g_stats
.nonCompliantResponses
++;
230 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
// Puts the RD and CD bits of a DNS header back to the values saved in
// origFlags.
//   dh        - header to modify in place
//   origFlags - flags word saved from the original query (passed by value, so
//               masking it here does not affect the caller)
// restoreFlagsMask has every bit set except RD and CD: and-ing the header
// flags with it clears those two bits, and and-ing origFlags with its
// complement keeps only those two bits.
// NOTE(review): the statement that merges origFlags back into *flags is not
// part of this listing; only its comment is visible.
237 void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
239 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
240 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
241 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
242 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
243 /* clear the flags we are about to restore */
244 *flags
&= restoreFlagsMask
;
245 /* only keep the flags we want to restore */
246 origFlags
&= ~restoreFlagsMask
;
247 /* set the saved flags as they were */
251 bool fixUpQueryTurnedResponse(DNSQuestion
& dq
, const uint16_t origFlags
)
253 restoreFlags(dq
.dh
, origFlags
);
255 return addEDNSToQueryTurnedResponse(dq
);
// Undoes, on a backend response, the modifications that were made to the
// query before it was forwarded.
//   response/responseLen/responseSize - in/out: buffer pointer, used length
//       and capacity; they are redirected to rewrittenResponse when the
//       packet has to be rebuilt
//   qname             - original query name, copied back over the question
//   origFlags         - saved header flags, restored through restoreFlags()
//   ednsAdded         - an OPT record was added to the query
//   ecsAdded          - an ECS option was added to the query
//   rewrittenResponse - scratch storage for a rebuilt packet
//   addRoom           - extra capacity to reserve in the rebuilt packet
// Visible flow: header-size check, flag restore, question name restore, then,
// if EDNS or ECS was added, locate the OPT record and either trim it from the
// end of the packet or rewrite the whole packet without it (or without the
// ECS option only).
// NOTE(review): several conditions, declarations and all return statements
// are not part of this listing; confirm branch structure against the full
// file.
258 bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
)
260 struct dnsheader
* dh
= (struct dnsheader
*) *response
;
262 if (*responseLen
< sizeof(dnsheader
)) {
266 restoreFlags(dh
, origFlags
);
268 if (*responseLen
== sizeof(dnsheader
)) {
// Write the original name back over the question, but only if it fits in the
// response.
273 string realname
= qname
.toDNSString();
274 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
275 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
279 if (ednsAdded
|| ecsAdded
) {
284 const std::string
responseStr(*response
, *responseLen
);
285 int res
= locateEDNSOptRR(responseStr
, &optStart
, &optLen
, &last
);
289 /* we added the entire OPT RR,
290 therefore we need to remove it entirely */
292 /* simply remove the last AR */
293 *responseLen
-= optLen
;
294 uint16_t arcount
= ntohs(dh
->arcount
);
296 dh
->arcount
= htons(arcount
);
299 /* Removing an intermediary RR could lead to compression error */
300 if (rewriteResponseWithoutEDNS(responseStr
, rewrittenResponse
) == 0) {
301 *responseLen
= rewrittenResponse
.size();
302 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
303 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
305 *responseSize
= rewrittenResponse
.capacity();
306 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
309 warnlog("Error rewriting content");
314 /* the OPT RR was already present, but without ECS,
315 we need to remove the ECS option if any */
317 /* nothing after the OPT RR, we can simply remove the
319 size_t existingOptLen
= optLen
;
320 removeEDNSOptionFromOPT(*response
+ optStart
, &optLen
, EDNSOptionCode::ECS
);
321 *responseLen
-= (existingOptLen
- optLen
);
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNSOption(responseStr
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
326 *responseLen
= rewrittenResponse
.size();
327 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
328 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
330 *responseSize
= rewrittenResponse
.capacity();
331 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
334 warnlog("Error rewriting content");
// Encrypts a response in place for a DNSCrypt client.
//   response/responseLen - buffer and in/out length; on success *responseLen
//                          is replaced by the encrypted length
//   responseSize         - capacity of the buffer
//   tcp                  - forwarded to DNSCryptQuery::encryptResponse
//   dnsCryptQuery        - DNSCrypt context of the query
//   dh, dhCopy           - when all are non-null, the clear-text header is
//                          copied to dhCopy before encryption overwrites it
// A failure is logged as "dropping".
// NOTE(review): the surrounding conditions (including any guard on
// dnsCryptQuery being set) and the return statements are not part of this
// listing.
345 bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
348 uint16_t encryptedResponseLen
= 0;
350 /* save the original header before encrypting it in place */
351 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
352 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
356 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
358 *responseLen
= encryptedResponseLen
;
360 /* dropping response */
361 vinfolog("Error encrypting the response, dropping.");
// Sends a UDP response to the client, now or after a delay.
//   origFD      - socket the query arrived on
//   response/responseLen - bytes to send
//   delayMsec   - when non-zero and g_delay exists, the packet is copied into
//                 a DelayedPacket and submitted to g_delay instead of being
//                 sent here
//   origDest    - local address to send from; a zero address family means
//                 "no specific source" and plain sendto() is used, otherwise
//                 sendfromto()
//   origRemote  - client address
// A send error is logged at verbose level.
// NOTE(review): the return statements are not part of this listing.
369 static bool sendUDPResponse(int origFD
, char* response
, uint16_t responseLen
, int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
371 if(delayMsec
&& g_delay
) {
372 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
373 g_delay
->submit(dp
, delayMsec
);
377 if(origDest
.sin4
.sin_family
== 0) {
378 res
= sendto(origFD
, response
, responseLen
, 0, (struct sockaddr
*)&origRemote
, origRemote
.getSocklen());
381 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
385 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), strerror(err
));
393 static int pickBackendSocketForSending(DownstreamState
* state
)
395 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
// Fills `ready` with the backend sockets that should be read from.
// With a single socket, that socket is returned directly. Otherwise the
// backend's multiplexer is asked for the available descriptors while holding
// socketsLock; the -1 argument is presumably "no timeout" - verify against
// FDMultiplexer.
// NOTE(review): some lines of the original (such as an early return or a
// clear of `ready`) are not part of this listing.
398 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
402 if (state
->sockets
.size() == 1) {
403 ready
.push_back(state
->sockets
[0]);
408 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
409 state
->mplexer
->getAvailableFDs(ready
, -1);
// UDP responder thread body, one per backend (dss).
// Visible flow for every datagram read from a backend socket:
//   1. drop datagrams shorter than a DNS header;
//   2. find the IDState slot for the response id and skip slots whose origFD
//      is negative (marked "duplicate");
//   3. check that the response matches the stored question;
//   4. release the slot with an atomic exchange of origFD to -1 and decrement
//      dss->outstanding only if the slot was not reused in the meantime;
//   5. optionally truncate TC responses, restore the client's query id, run
//      the response rules, undo the query modifications (fixUpResponse),
//      insert into the packet cache, encrypt for DNSCrypt and send to the
//      client unless the frontend is muted;
//   6. update latency statistics, the response ring and the backend's
//      smoothed latency.
// Exceptions raised while handling one response are logged and do not stop
// the thread; the outer handlers log that the thread died.
// NOTE(review): loop headers, several declarations (ts, dhCopy, empty) and
// the statements executed when a check fails are not part of this listing.
413 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
414 void* responderThread(std::shared_ptr
<DownstreamState
> dss
)
416 auto localRespRulactions
= g_resprulactions
.getLocal();
418 char packet
[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
419 /* when the answer is encrypted in place, we need to get a copy
420 of the original header before encryption to fill the ring buffer */
425 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
426 vector
<uint8_t> rewrittenResponse
;
428 uint16_t queryId
= 0;
429 std::vector
<int> sockets
;
430 sockets
.reserve(dss
->sockets
.size());
433 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
435 pickBackendSocketsReadyForReceiving(dss
, sockets
);
436 for (const auto& fd
: sockets
) {
437 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
438 char * response
= packet
;
439 size_t responseSize
= sizeof(packet
);
441 if (got
< (ssize_t
) sizeof(dnsheader
))
444 uint16_t responseLen
= (uint16_t) got
;
447 if(queryId
>= dss
->idStates
.size())
450 IDState
* ids
= &dss
->idStates
[queryId
];
451 int origFD
= ids
->origFD
;
453 if(origFD
< 0) // duplicate
456 /* setting age to 0 to prevent the maintainer thread from
457 cleaning this IDS while we process the response.
458 We have already a copy of the origFD, so it would
459 mostly mess up the outstanding counter.
463 unsigned int consumed
= 0;
464 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
, consumed
)) {
468 int oldFD
= ids
->origFD
.exchange(-1);
469 if (oldFD
== origFD
) {
470 /* we only decrement the outstanding counter if the value was not
471 altered in the meantime, which would mean that the state has been actively reused
472 and the other thread has not incremented the outstanding counter, so we don't
473 want it to be decremented twice. */
474 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
477 if(dh
->tc
&& g_truncateTC
) {
478 truncateTC(response
, &responseLen
, consumed
);
481 dh
->id
= ids
->origID
;
483 uint16_t addRoom
= 0;
484 DNSResponse
dr(&ids
->qname
, ids
->qtype
, ids
->qclass
, consumed
, &ids
->origDest
, &ids
->origRemote
, dh
, sizeof(packet
), responseLen
, false, &ids
->sentTime
.d_start
);
486 dr
.uniqueId
= ids
->uniqueId
;
490 if (!processResponse(localRespRulactions
, dr
, &ids
->delayMsec
)) {
495 if (ids
->dnsCryptQuery
) {
496 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
499 if (!fixUpResponse(&response
, &responseLen
, &responseSize
, ids
->qname
, ids
->origFlags
, ids
->ednsAdded
, ids
->ecsAdded
, rewrittenResponse
, addRoom
)) {
503 if (ids
->packetCache
&& !ids
->skipCache
) {
504 ids
->packetCache
->insert(ids
->cacheKey
, ids
->subnet
, ids
->origFlags
, ids
->qname
, ids
->qtype
, ids
->qclass
, response
, responseLen
, false, dh
->rcode
, ids
->tempFailureTTL
);
507 if (ids
->cs
&& !ids
->cs
->muted
) {
509 if (!encryptResponse(response
, &responseLen
, responseSize
, false, ids
->dnsCryptQuery
, &dh
, &dhCopy
)) {
515 empty
.sin4
.sin_family
= 0;
516 /* if ids->destHarvested is false, origDest holds the listening address.
517 We don't want to use that as a source since it could be 0.0.0.0 for example. */
518 sendUDPResponse(origFD
, response
, responseLen
, ids
->delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
523 double udiff
= ids
->sentTime
.udiff();
524 vinfolog("Got answer from %s, relayed to %s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(), udiff
);
528 g_rings
.insertResponse(ts
, ids
->origRemote
, ids
->qname
, ids
->qtype
, (unsigned int)udiff
, (unsigned int)got
, *dh
, dss
->remote
);
530 if(dh
->rcode
== RCode::ServFail
)
531 g_stats
.servfailResponses
++;
532 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
534 doLatencyStats(udiff
);
536 /* if the FD is not -1, the state has been actively reused and we should
537 not alter anything */
538 if (ids
->origFD
== -1) {
540 ids
->dnsCryptQuery
= nullptr;
544 rewrittenResponse
.clear();
547 catch(const std::exception
& e
){
548 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
553 catch(const std::exception
& e
)
555 errlog("UDP responder thread died because of exception: %s", e
.what());
558 catch(const PDNSException
& e
)
560 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
565 errlog("UDP responder thread died because of an exception: %s", "unknown");
// Re-creates the UDP sockets connected to this backend.
// connectLock is taken with try_to_lock, so a call made while another
// reconnect is running does not wait for it. For each socket the visible
// code: unregisters the old descriptor from the multiplexer (only when there
// is more than one socket, under socketsLock), shuts it down so a blocked
// recv() in the responder thread wakes up, then, when `remote` is a real
// address, opens a new datagram socket, optionally binds it to sourceAddr
// with SO_REUSEADDR, connects it and registers it for reading again.
// A std::runtime_error during connection is logged; if any (re-)connection
// failed, all sockets are shut down again.
// NOTE(review): close() calls, the failure flag and the return statements are
// not part of this listing.
569 bool DownstreamState::reconnect()
571 std::unique_lock
<std::mutex
> tl(connectLock
, std::try_to_lock
);
572 if (!tl
.owns_lock()) {
573 /* we are already reconnecting */
578 for (auto& fd
: sockets
) {
580 if (sockets
.size() > 1) {
581 std::lock_guard
<std::mutex
> lock(socketsLock
);
582 mplexer
->removeReadFD(fd
);
584 /* shutdown() is needed to wake up recv() in the responderThread */
585 shutdown(fd
, SHUT_RDWR
);
589 if (!IsAnyAddress(remote
)) {
590 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
591 if (!IsAnyAddress(sourceAddr
)) {
592 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
593 SBind(fd
, sourceAddr
);
596 SConnect(fd
, remote
);
597 if (sockets
.size() > 1) {
598 std::lock_guard
<std::mutex
> lock(socketsLock
);
599 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
603 catch(const std::runtime_error
& error
) {
604 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
611 /* if at least one (re-)connection failed, close all sockets */
613 for (auto& fd
: sockets
) {
615 if (sockets
.size() > 1) {
616 std::lock_guard
<std::mutex
> lock(socketsLock
);
617 mplexer
->removeReadFD(fd
);
619 /* shutdown() is needed to wake up recv() in the responderThread */
620 shutdown(fd
, SHUT_RDWR
);
// Computes the hash values stored in `hashes` for this server (read by
// chashed()). Under a write lock on d_lock, a string of the form "<id>-<w>"
// is hashed with burtleCI, seeded with g_hashperturb, and the result is
// inserted into `hashes`.
// NOTE(review): the loop that produces `w` is not part of this listing;
// presumably it runs once per unit of weight - confirm.
629 void DownstreamState::hash()
631 vinfolog("Computing hashes for id=%s and weight=%d", id
, weight
);
633 WriteLock
wl(&d_lock
);
636 std::string uuid
= boost::str(boost::format("%s-%d") % id
% w
);
637 unsigned int wshash
= burtleCI((const unsigned char*)uuid
.c_str(), uuid
.size(), g_hashperturb
);
638 hashes
.insert(wshash
);
// Changes the server's UUID. Hashes are only recomputed when they had already
// been computed (i.e. `hashes` is not empty).
// NOTE(review): the assignment of newId and the call made inside the
// condition are not part of this listing.
643 void DownstreamState::setId(const boost::uuids::uuid
& newId
)
646 // compute hashes only if already done
647 if (!hashes
.empty()) {
// Changes the server's weight. The error message shows that values not
// greater than 0 are rejected; as in setId(), follow-up work is only done
// when `hashes` is not empty.
// NOTE(review): the validation condition, the assignment and the statement
// inside the final condition are not part of this listing.
652 void DownstreamState::setWeight(int newWeight
)
655 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
659 if (!hashes
.empty()) {
// Builds the state for one backend server.
//   remote_         - backend address
//   sourceAddr_     - local address to send from (may be the any-address)
//   sourceItf_      - local interface index (0 means none, see
//                     udpClientSendRequestToBackend)
//   numberOfSockets - how many UDP sockets to keep towards this backend
// The constructor initialises the rwlock, draws a random UUID as id, clears
// the threadStarted flag, creates the FD multiplexer and sizes `sockets`.
// When `remote` is a real address it also sizes idStates to g_maxOutstanding
// and logs the addition.
// NOTE(review): the per-socket initial value and any connection attempt are
// not part of this listing.
664 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, size_t numberOfSockets
): remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
666 pthread_rwlock_init(&d_lock
, nullptr);
667 id
= t_uuidGenerator();
668 threadStarted
.clear();
670 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
672 sockets
.resize(numberOfSockets
);
673 for (auto& fd
: sockets
) {
677 if (!IsAnyAddress(remote
)) {
679 idStates
.resize(g_maxOutstanding
);
681 infolog("Added downstream server %s", remote
.toStringWithPort());
// g_luamutex is held by processQuery() around the call to the Lua-provided
// query-count filter.
686 std::mutex g_luamutex
;
// Holder for the global server selection policy.
689 GlobalStateHolder
<ServerPolicy
> g_policy
;
// Server selection policy: walks the servers in order looking for one that is
// up and whose QPS limiter accepts the query; when the loop ends without a
// pick, falls back to leastOutstanding().
// NOTE(review): the statement executed on a match is not part of this
// listing; presumably it returns that server.
691 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
693 for(auto& d
: servers
) {
694 if(d
.second
->isUp() && d
.second
->qps
.check())
697 return leastOutstanding(servers
, dq
);
// Server selection policy ordering candidates by the tuple
// (outstanding queries, order, latencyUsec), smallest first.
// Fast path: a single server that is up is returned directly. Otherwise the
// sort keys of all up servers are copied into `poss` first, so that values
// changing concurrently cannot disturb the comparison, and nth_element with
// the first position as target moves the smallest entry to the front.
// An empty shared_ptr is returned on one path.
// NOTE(review): the condition guarding that empty return is not part of this
// listing; presumably it is "no server is up".
700 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
701 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
703 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
704 return servers
[0].second
;
707 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
708 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
709 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
710 poss
.reserve(servers
.size());
711 for(auto& d
: servers
) {
712 if(d
.second
->isUp()) {
713 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
717 return shared_ptr
<DownstreamState
>();
718 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
719 return poss
.begin()->second
;
// Weighted selection driven by an externally supplied value.
//   val     - number used to choose the server (random for wrandom(), a hash
//             of the query name for whashed())
//   servers - candidates; only servers that are up take part
// Each up server is stored in `poss` with the cumulative sum of weights up to
// and including it (weights 1 and 10 give entries 1 and 11). Adding a weight
// that would overflow the int sum is guarded against. The entry is then found
// with upper_bound on `r`.
// Empty shared_ptrs are returned on the failure paths.
// NOTE(review): the declaration of `sum`, the computation of `r` from `val`,
// the overflow handling and the final return are not part of this listing.
722 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
724 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
726 int max
= std::numeric_limits
<int>::max();
728 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
729 if(d
.second
->isUp()) {
730 // Don't overflow sum when adding high weights
731 if(d
.second
->weight
> max
- sum
) {
734 sum
+= d
.second
->weight
;
737 poss
.push_back({sum
, d
.second
});
741 // Catch poss & sum are empty to avoid SIGFPE
743 return shared_ptr
<DownstreamState
>();
746 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
748 return shared_ptr
<DownstreamState
>();
752 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
754 return valrandom(random(), servers
, dq
);
// Seed mixed into the query-name hashes (whashed, chashed) and into the
// per-server hashes computed by DownstreamState::hash().
757 uint32_t g_hashperturb
;
758 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
760 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
// Hash-based server selection policy using the per-server hash sets computed
// by DownstreamState::hash().
// The query name is hashed with g_hashperturb into qhash. For every server
// that is up (hashes are checked for emptiness first, then read under a read
// lock on d_lock), the code tracks the largest hash seen overall (`max`) and
// scans the server's hashes that are below qhash, comparing them with `sel`.
// The result is `ret` when set, otherwise `last` when set, otherwise an empty
// shared_ptr.
// NOTE(review): the statements that compute missing hashes and that update
// sel, ret and last are not part of this listing; confirm the exact selection
// rule against the full file.
763 shared_ptr
<DownstreamState
> chashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
765 std::map
<unsigned int, shared_ptr
<DownstreamState
>> circle
= {};
766 unsigned int qhash
= dq
->qname
->hash(g_hashperturb
);
767 unsigned int sel
= 0, max
= 0;
768 shared_ptr
<DownstreamState
> ret
= nullptr, last
= nullptr;
770 for (const auto& d
: servers
) {
771 if (d
.second
->isUp()) {
772 // make sure hashes have been computed
773 if (d
.second
->hashes
.empty()) {
777 ReadLock
rl(&(d
.second
->d_lock
));
778 const auto& server
= d
.second
;
779 // we want to keep track of the last hash
780 if (max
< *(server
->hashes
.rbegin())) {
781 max
= *(server
->hashes
.rbegin());
784 auto hash_it
= server
->hashes
.begin();
785 while (hash_it
!= server
->hashes
.end()
786 && *hash_it
< qhash
) {
787 if (*hash_it
> sel
) {
796 if (ret
!= nullptr) {
799 if (last
!= nullptr) {
802 return shared_ptr
<DownstreamState
>();
// Round-robin server selection policy.
// Servers that are up are considered first (collected in `poss`); `res`
// points at the list that is finally indexed. A function-local static counter
// is post-incremented on every pick and reduced modulo the list size.
// An empty shared_ptr is returned on one path.
// NOTE(review): the statement filling `poss`, any fallback that re-points
// `res`, and the condition guarding the empty return are not part of this
// listing. The static counter is a plain unsigned int shared by all callers.
805 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
807 NumberedServerVector poss
;
809 for(auto& d
: servers
) {
810 if(d
.second
->isUp()) {
815 const auto *res
=&poss
;
820 return shared_ptr
<DownstreamState
>();
822 static unsigned int counter
;
824 return (*res
)[(counter
++) % res
->size()].second
;
// Address of the control endpoint; defaults to 127.0.0.1 port 5199.
// NOTE(review): its users are outside this chunk.
827 ComboAddress g_serverControl
{"127.0.0.1:5199"};
// Looks up the pool named poolName in `pools`, creating and inserting an
// empty ServerPool when there is none yet. Creation of a named pool is logged
// at verbose level; the unnamed pool (empty name, the default pool) is
// created silently.
// NOTE(review): the statement taken when the pool already exists and the
// final return are not part of this listing; callers use the result as the
// pool for poolName.
829 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
831 std::shared_ptr
<ServerPool
> pool
;
832 pools_t::iterator it
= pools
.find(poolName
);
833 if (it
!= pools
.end()) {
837 if (!poolName
.empty())
838 vinfolog("Creating pool %s", poolName
);
839 pool
= std::make_shared
<ServerPool
>();
840 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
845 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
847 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
848 if (!poolName
.empty()) {
849 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
851 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
853 pool
->policy
= policy
;
856 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
858 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
859 if (!poolName
.empty()) {
860 vinfolog("Adding server to pool %s", poolName
);
862 vinfolog("Adding server to default pool");
864 pool
->addServer(server
);
867 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
869 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
871 if (!poolName
.empty()) {
872 vinfolog("Removing server from pool %s", poolName
);
875 vinfolog("Removing server from default pool");
878 pool
->removeServer(server
);
// Returns the pool named poolName from `pools`.
// Throws std::out_of_range("No pool named <name>") when there is no such
// pool; nothing is created.
// NOTE(review): the return statement for the found case is not part of this
// listing.
881 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
883 pools_t::const_iterator it
= pools
.find(poolName
);
885 if (it
== pools
.end()) {
886 throw std::out_of_range("No pool named " + poolName
);
892 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
894 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
895 return pool
->getServers();
// Turns the query in `dq` into a spoofed answer described by spoofContent.
// spoofContent is split on spaces and commas. A single token is first tried
// as an IP address; if ComboAddress rejects it with a PDNSException, the
// string is handed to SpoofAction as-is (a CNAME target, per the original
// comment). Several tokens are all parsed as addresses and collected in
// `cas`.
// NOTE(review): the statements that invoke the SpoofAction objects, and the
// error handling for the multi-address case, are not part of this listing.
898 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
)
902 std::vector
<std::string
> addrs
;
903 stringtok(addrs
, spoofContent
, " ,");
905 if (addrs
.size() == 1) {
907 ComboAddress
spoofAddr(spoofContent
);
908 SpoofAction
sa({spoofAddr
});
911 catch(const PDNSException
&e
) {
912 SpoofAction
sa(spoofContent
); // CNAME then
916 std::vector
<ComboAddress
> cas
;
917 for (const auto& addr
: addrs
) {
919 cas
.push_back(ComboAddress(addr
));
// Applies the query-side policy to one incoming query.
//   holders   - thread-local snapshots of the rule list and dynamic block
//               tables
//   dq        - the query; its header may be rewritten (rcode set) by
//               terminal actions
//   poolname  - out: set by a Pool action
//   delayMsec - out: set by a Delay action
//   now       - current time, compared with the expiry of dynamic blocks
// Visible stages:
//   1. record the query in g_rings;
//   2. when query counting is enabled, optionally run the Lua filter under
//      g_luamutex and bump the per-name counter under a write lock;
//   3. dynamic block lookup by client address, then by query name: an
//      unexpired entry is applied with its own action, or g_dynBlockAction
//      when the entry says None (the log messages show the refused,
//      truncated, not-truncated-over-TCP and dropped outcomes);
//   4. walk the rule list: every matching rule has its match counter
//      incremented and its action executed, and the returned action is
//      dispatched in a switch.
// NOTE(review): the switch headers, the bodies of most cases and all return
// statements are not part of this listing; confirm what each outcome returns.
929 bool processQuery(LocalHolders
& holders
, DNSQuestion
& dq
, string
& poolname
, int* delayMsec
, const struct timespec
& now
)
931 g_rings
.insertQuery(now
,*dq
.remote
,*dq
.qname
,dq
.qtype
,dq
.len
,*dq
.dh
);
933 if(g_qcount
.enabled
) {
934 string qname
= (*dq
.qname
).toString(".");
935 bool countQuery
{true};
936 if(g_qcount
.filter
) {
937 std::lock_guard
<std::mutex
> lock(g_luamutex
);
938 std::tie (countQuery
, qname
) = g_qcount
.filter(dq
);
942 WriteLock
wl(&g_qcount
.queryLock
);
943 if(!g_qcount
.records
.count(qname
)) {
944 g_qcount
.records
[qname
] = 0;
946 g_qcount
.records
[qname
]++;
// Dynamic blocks keyed by client address.
950 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
951 auto updateBlockStats
= [&got
]() {
952 g_stats
.dynBlocked
++;
953 got
->second
.blocks
++;
956 if(now
< got
->second
.until
) {
957 DNSAction::Action action
= got
->second
.action
;
958 if (action
== DNSAction::Action::None
) {
959 action
= g_dynBlockAction
;
962 case DNSAction::Action::NoOp
:
965 case DNSAction::Action::Refused
:
966 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
969 dq
.dh
->rcode
= RCode::Refused
;
973 case DNSAction::Action::Truncate
:
976 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
982 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
987 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
// Dynamic blocks keyed by query name suffix.
993 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
994 auto updateBlockStats
= [&got
]() {
995 g_stats
.dynBlocked
++;
999 if(now
< got
->until
) {
1000 DNSAction::Action action
= got
->action
;
1001 if (action
== DNSAction::Action::None
) {
1002 action
= g_dynBlockAction
;
1005 case DNSAction::Action::NoOp
:
1008 case DNSAction::Action::Refused
:
1009 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1012 dq
.dh
->rcode
= RCode::Refused
;
1015 case DNSAction::Action::Truncate
:
1019 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1025 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
1030 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
// Configured rules: run the action of each matching rule.
1036 DNSAction::Action action
=DNSAction::Action::None
;
1038 for(const auto& lr
: *holders
.rulactions
) {
1039 if(lr
.d_rule
->matches(&dq
)) {
1040 lr
.d_rule
->d_matches
++;
1041 action
=(*lr
.d_action
)(&dq
, &ruleresult
);
1044 case DNSAction::Action::Allow
:
1047 case DNSAction::Action::Drop
:
1051 case DNSAction::Action::Nxdomain
:
1052 dq
.dh
->rcode
= RCode::NXDomain
;
1054 g_stats
.ruleNXDomain
++;
1057 case DNSAction::Action::Refused
:
1058 dq
.dh
->rcode
= RCode::Refused
;
1060 g_stats
.ruleRefused
++;
1063 case DNSAction::Action::ServFail
:
1064 dq
.dh
->rcode
= RCode::ServFail
;
1066 g_stats
.ruleServFail
++;
1069 case DNSAction::Action::Spoof
:
1070 spoofResponseFromString(dq
, ruleresult
);
1073 case DNSAction::Action::Truncate
:
1078 case DNSAction::Action::HeaderModify
:
1081 case DNSAction::Action::Pool
:
1082 poolname
=ruleresult
;
1085 /* non-terminal actions follow */
1086 case DNSAction::Action::Delay
:
1087 *delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1089 case DNSAction::Action::None
:
1091 case DNSAction::Action::NoOp
:
// Applies the response rules to one response.
//   localRespRulactions - thread-local snapshot of the response rule list
//   dr                  - the response; a ServFail action rewrites its rcode
//   delayMsec           - out: set from the rule result by a Delay action
// Every matching rule has its match counter incremented and its action
// executed; the returned action is dispatched in a switch (Allow, Drop,
// HeaderModify, ServFail, then the non-terminal Delay and None).
// NOTE(review): the switch header, the case bodies and the return statements
// are not part of this listing; the responder thread treats a false result as
// "do not send this response".
1100 bool processResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, int* delayMsec
)
1102 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
1103 std::string ruleresult
;
1104 for(const auto& lr
: *localRespRulactions
) {
1105 if(lr
.d_rule
->matches(&dr
)) {
1106 lr
.d_rule
->d_matches
++;
1107 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
1109 case DNSResponseAction::Action::Allow
:
1112 case DNSResponseAction::Action::Drop
:
1115 case DNSResponseAction::Action::HeaderModify
:
1118 case DNSResponseAction::Action::ServFail
:
1119 dr
.dh
->rcode
= RCode::ServFail
;
1122 /* non-terminal actions follow */
1123 case DNSResponseAction::Action::Delay
:
1124 *delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1126 case DNSResponseAction::Action::None
:
// Sends one request to a backend over UDP.
//   ss          - backend state (remote, sourceAddr, sourceItf)
//   sd          - socket to send on
//   request/requestLen - bytes to send
//   healthCheck - true when called for a health check, which uses a different
//                 socket than regular traffic
// Without a source interface (sourceItf == 0) a plain send() is used.
// Otherwise a message header is built for sendmsg() with a control message
// carrying the source address and interface.
// On error the errno value is saved and logged; EINVAL or ENODEV on a
// non-health-check send gets special handling, because on Linux they can mean
// that the bound interface no longer exists.
// NOTE(review): the local declarations, the statement executed in that
// special case (presumably a reconnect) and the return are not part of this
// listing.
1135 static ssize_t
udpClientSendRequestToBackend(DownstreamState
* ss
, const int sd
, const char* request
, const size_t requestLen
, bool healthCheck
=false)
1139 if (ss
->sourceItf
== 0) {
1140 result
= send(sd
, request
, requestLen
, 0);
1146 ComboAddress
remote(ss
->remote
);
1147 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &remote
);
1148 addCMsgSrcAddr(&msgh
, cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1149 result
= sendmsg(sd
, &msgh
, 0);
1153 int savederrno
= errno
;
1154 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1156 /* This might sound silly, but on Linux send() might fail with EINVAL
1157 if the interface the socket was bound to doesn't exist anymore.
1158 We don't want to reconnect the real socket if the healthcheck failed,
1159 because it's not using the same socket.
1161 if (!healthCheck
&& (savederrno
== EINVAL
|| savederrno
== ENODEV
)) {
// Appends an XPF record to the query held in `dq`.
//   dq         - query buffer (dq.dh), current length (dq.len) and capacity
//                (dq.size)
//   optionCode - record type to use for the XPF record
// The record consists of a root owner name (single zero byte), a record
// header with the given type, class IN and the payload length, followed by
// the payload produced by generateXPFPayload() from the transport and the
// client/local addresses. Nothing is written when the record does not fit in
// the remaining buffer space. After the copy the additional-record count in
// the header is incremented.
// NOTE(review): the TTL assignment, the position advance after the header
// copy, the update of dq.len and the return statements are not part of this
// listing.
1169 bool addXPF(DNSQuestion
& dq
, uint16_t optionCode
)
1171 std::string payload
= generateXPFPayload(dq
.tcp
, *dq
.remote
, *dq
.local
);
1172 uint8_t root
= '\0';
1173 dnsrecordheader drh
;
1174 drh
.d_type
= htons(optionCode
);
1175 drh
.d_class
= htons(QClass::IN
);
1177 drh
.d_clen
= htons(payload
.size());
1178 size_t recordHeaderLen
= sizeof(root
) + sizeof(drh
);
1180 size_t available
= dq
.size
- dq
.len
;
1182 if ((payload
.size() + recordHeaderLen
) > available
) {
1186 size_t pos
= dq
.len
;
1187 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, &root
, sizeof(root
));
1188 pos
+= sizeof(root
);
1189 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, &drh
, sizeof(drh
));
1191 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, payload
.data(), payload
.size());
1192 pos
+= payload
.size();
1196 dq
.dh
->arcount
= htons(ntohs(dq
.dh
->arcount
) + 1);
// First-level checks on a datagram received on a UDP frontend.
//   cs      - frontend the datagram arrived on
//   holders - thread-local snapshots, including the ACL
//   msgh    - message header filled in by the receive call
//   remote  - client address
//   dest    - out: destination address of the datagram when it can be
//             harvested from the control data (the port is taken from the
//             frontend's local address, since only the address is
//             available); otherwise its address family is set to 0
// Datagrams flagged MSG_TRUNC (larger than the receive buffer) are logged and
// counted in g_stats.nonCompliantQueries; clients not matching the ACL are
// logged.
// NOTE(review): the return statements and any further counters are not part
// of this listing.
1201 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1203 if (msgh
->msg_flags
& MSG_TRUNC
) {
1204 /* message was too large for our buffer */
1205 vinfolog("Dropping message too large for our buffer");
1206 g_stats
.nonCompliantQueries
++;
1210 if(!holders
.acl
->match(remote
)) {
1211 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1219 if (HarvestDestinationAddress(msgh
, &dest
)) {
1220 /* we don't get the port, only the address */
1221 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1224 dest
.sin4
.sin_family
= 0;
// DNSCrypt pre-processing of a UDP query (only built with HAVE_DNSCRYPT).
//   cs            - frontend; nothing is done unless it has a DNSCrypt context
//   query/len     - in/out: query buffer and length; len is replaced by the
//                   decrypted length
//   dnsCryptQuery - out: freshly created DNSCrypt state for this query
//   dest/remote   - addresses used when a response is sent directly from here
//   now           - current time, forwarded to handleDNSCryptQuery
// handleDNSCryptQuery() may produce a response of its own; when it does, that
// response is sent immediately with sendUDPResponse().
// NOTE(review): the handling of the `decrypted` flag and the return
// statements are not part of this listing.
1230 #ifdef HAVE_DNSCRYPT
1231 static bool checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DNSCryptQuery
>& dnsCryptQuery
, const ComboAddress
& dest
, const ComboAddress
& remote
, time_t now
)
1233 if (cs
.dnscryptCtx
) {
1234 vector
<uint8_t> response
;
1235 uint16_t decryptedQueryLen
= 0;
1237 dnsCryptQuery
= std::make_shared
<DNSCryptQuery
>(cs
.dnscryptCtx
);
1239 bool decrypted
= handleDNSCryptQuery(const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, false, now
, response
);
1242 if (response
.size() > 0) {
1243 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(response
.data()), static_cast<uint16_t>(response
.size()), 0, dest
, remote
);
1248 len
= decryptedQueryLen
;
1252 #endif /* HAVE_DNSCRYPT */
// Sanity checks on the DNS header of an incoming query, updating global counters:
// a packet with QR set counts as non-compliant, one with qdcount == 0 as an empty query.
//   dh - header of the received packet.
// NOTE(review): return statements are not visible in this extraction, nor is the condition
// guarding the rdQueries increment (presumably a test of dh->rd) - confirm.
1254 bool checkQueryHeaders(const struct dnsheader
* dh
)
1256 if (dh
->qr
) { // don't respond to responses
1257 g_stats
.nonCompliantQueries
++;
1261 if (dh
->qdcount
== 0) {
1262 g_stats
.emptyQueries
++;
1267 g_stats
.rdQueries
++;
// Prepares one slot of an outgoing mmsghdr vector so a response can later be sent in a batch.
//   cs                    - frontend state (not referenced in the lines visible here).
//   response, responseLen - bytes to send.
//   dest                  - local address to send from; sin_family == 0 means "not known".
//   remote                - client address the response goes to.
//   outMsg                - slot to fill.
//   iov, cbuf             - IO vector and control buffer backing outMsg.
1273 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1274 static void queueResponse(const ClientState
& cs
, const char* response
, uint16_t responseLen
, const ComboAddress
& dest
, const ComboAddress
& remote
, struct mmsghdr
& outMsg
, struct iovec
* iov
, char* cbuf
)
1277 fillMSGHdr(&outMsg
.msg_hdr
, iov
, nullptr, 0, const_cast<char*>(response
), responseLen
, const_cast<ComboAddress
*>(&remote
));
// Without a known destination address no source-address control message is attached.
1279 if (dest
.sin4
.sin_family
== 0) {
1280 outMsg
.msg_hdr
.msg_control
= nullptr;
// Otherwise ask for the reply to leave from the address the query was sent to
// (presumably in an else branch - the keyword is not visible in this extraction).
1283 addCMsgSrcAddr(&outMsg
.msg_hdr
, cbuf
, &dest
, 0);
1286 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
// Handles one UDP query from reception to relay. The visible steps are: acceptance checks,
// optional DNSCrypt handling, header validation, qname parsing, rule processing
// (processQuery), answering directly when a rule turned the query into a response, backend
// selection through the pool/global policy, optional ECS insertion, packet-cache lookup,
// optional ServFail when no backend was selected, optional XPF record, bookkeeping in the
// backend's IDState slot and finally sending the query to the backend.
//   cs, holders     - frontend state and per-thread configuration holders.
//   msgh            - receive message header, passed to isUDPQueryAcceptable().
//   remote, dest    - client address and destination address (dest may be filled in here).
//   query, len      - packet buffer and number of bytes received into it.
//   queryBufferSize - capacity given to DNSQuestion for in-place modifications.
//   responsesVect, queuedResponses, respIOV, respCBuf - when responsesVect is non-null the
//                     other three must be non-null too (see the assert); immediate responses
//                     are then queued with queueResponse().
// NOTE(review): this extraction shows no return/else/try lines and no declarations for
// poolname or delayMsec, and 'now' is passed to processQuery() without a visible
// initialisation - confirm all of these against the complete file.
1288 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, char* respCBuf
)
1290 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
// Kept outside the parsing code so the exception handler at the end can log it.
1291 uint16_t queryId
= 0;
1294 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1298 /* we need an accurate ("real") value for the response and
1299 to store into the IDS, but not for insertion into the
1300 rings for example */
1301 struct timespec queryRealTime
;
1302 struct timespec now
;
1304 gettime(&queryRealTime
, true);
1306 #ifdef HAVE_DNSCRYPT
1307 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
= nullptr;
1309 if (!checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, dest
, remote
, queryRealTime
.tv_sec
)) {
// The buffer is interpreted in place as a DNS header from here on.
1314 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
1315 queryId
= ntohs(dh
->id
);
1317 if (!checkQueryHeaders(dh
)) {
// Remember the original header flags so they can be restored later.
1323 const uint16_t * flags
= getFlagsFromDNSHeader(dh
);
1324 const uint16_t origFlags
= *flags
;
1325 uint16_t qtype
, qclass
;
1326 unsigned int consumed
= 0;
1327 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
// The local address is the harvested destination when available, else the listening address.
1328 DNSQuestion
dq(&qname
, qtype
, qclass
, consumed
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false, &queryRealTime
);
// Apply the query rules; they may set the pool name and a delay.
1330 if (!processQuery(holders
, dq
, poolname
, &delayMsec
, now
))
// Self-answered path: a rule turned the query into a response.
1335 if(dq
.dh
->qr
) { // something turned it into a response
1336 fixUpQueryTurnedResponse(dq
, origFlags
);
1339 char* response
= query
;
1340 uint16_t responseLen
= dq
.len
;
1342 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(response
), dq
.size
, responseLen
, false, &queryRealTime
);
1343 #ifdef HAVE_PROTOBUF
1344 dr
.uniqueId
= dq
.uniqueId
;
1348 if (!processResponse(holders
.selfAnsweredRespRulactions
, dr
, &delayMsec
)) {
1352 #ifdef HAVE_DNSCRYPT
1353 if (!encryptResponse(response
, &responseLen
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1357 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
// Undelayed responses can be batched when the caller supplied a response vector.
1358 if (delayMsec
== 0 && responsesVect
!= nullptr) {
1359 queueResponse(cs
, response
, responseLen
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1360 (*queuedResponses
)++;
1363 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1365 sendUDPResponse(cs
.udpFD
, response
, responseLen
, delayMsec
, dest
, remote
);
1368 g_stats
.selfAnswered
++;
1369 doLatencyStats(0); // we're not going to measure this
// Backend selection: the pool's own policy, when set, replaces the global one.
1375 DownstreamState
* ss
= nullptr;
1376 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, poolname
);
1377 std::shared_ptr
<DNSDistPacketCache
> packetCache
= serverPool
->packetCache
;
1378 auto policy
= *(holders
.policy
);
1379 if (serverPool
->policy
!= nullptr) {
1380 policy
= *(serverPool
->policy
);
1382 auto servers
= serverPool
->getServers();
// The policy is invoked either while holding g_luamutex or without it; the condition
// choosing between the two calls is not visible in this extraction.
1384 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1385 ss
= policy
.policy(servers
, &dq
).get();
1388 ss
= policy
.policy(servers
, &dq
).get();
// EDNS Client Subnet: added when the question allows it and either the selected backend or,
// with no backend selected, the pool asks for it.
1391 bool ednsAdded
= false;
1392 bool ecsAdded
= false;
1393 if (dq
.useECS
&& ((ss
&& ss
->useECS
) || (!ss
&& serverPool
->getECS()))) {
1394 if (!handleEDNSClientSubnet(query
, dq
.size
, consumed
, &dq
.len
, &(ednsAdded
), &(ecsAdded
), dq
.ecsSet
? dq
.ecs
.getNetwork() : remote
, dq
.ecsOverride
, dq
.ecsSet
? dq
.ecs
.getBits() : dq
.ecsPrefixLength
)) {
1395 vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote
.toStringWithPort());
// Packet cache lookup; on a hit the cached answer is written into the query buffer.
1400 uint32_t cacheKey
= 0;
1401 boost::optional
<Netmask
> subnet
;
1402 if (packetCache
&& !dq
.skipCache
) {
1403 uint16_t cachedResponseSize
= dq
.size
;
// Stale entries are only allowed when no backend was selected.
1404 uint32_t allowExpired
= ss
? 0 : g_staleCacheEntriesTTL
;
1405 if (packetCache
->get(dq
, consumed
, dh
->id
, query
, &cachedResponseSize
, &cacheKey
, subnet
, allowExpired
)) {
1406 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(query
), dq
.size
, cachedResponseSize
, false, &queryRealTime
);
1407 #ifdef HAVE_PROTOBUF
1408 dr
.uniqueId
= dq
.uniqueId
;
1412 if (!processResponse(holders
.cacheHitRespRulactions
, dr
, &delayMsec
)) {
1417 #ifdef HAVE_DNSCRYPT
1418 if (!encryptResponse(query
, &cachedResponseSize
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1422 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1423 if (delayMsec
== 0 && responsesVect
!= nullptr) {
1424 queueResponse(cs
, query
, cachedResponseSize
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1425 (*queuedResponses
)++;
1428 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1430 sendUDPResponse(cs
.udpFD
, query
, cachedResponseSize
, delayMsec
, dest
, remote
);
1434 g_stats
.cacheHits
++;
1435 doLatencyStats(0); // we're not going to measure this
1438 g_stats
.cacheMisses
++;
// No usable backend (the enclosing test on ss is not visible here - confirm): optionally
// answer ServFail ourselves, unless the frontend is muted.
1444 if (g_servFailOnNoPolicy
&& !cs
.muted
) {
1445 char* response
= query
;
1446 uint16_t responseLen
= dq
.len
;
1447 restoreFlags(dh
, origFlags
);
1449 dq
.dh
->rcode
= RCode::ServFail
;
1452 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.consumed
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(response
), dq
.size
, responseLen
, false, &queryRealTime
);
1453 #ifdef HAVE_PROTOBUF
1454 dr
.uniqueId
= dq
.uniqueId
;
1458 if (!processResponse(holders
.selfAnsweredRespRulactions
, dr
, &delayMsec
)) {
1462 #ifdef HAVE_DNSCRYPT
1463 if (!encryptResponse(response
, &responseLen
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1467 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1468 if (responsesVect
!= nullptr) {
1469 queueResponse(cs
, response
, responseLen
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1470 (*queuedResponses
)++;
1473 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1475 sendUDPResponse(cs
.udpFD
, response
, responseLen
, 0, dest
, remote
);
1478 // no response-only statistics counter to update.
1479 doLatencyStats(0); // we're not going to measure this
1481 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "ServFailed" : "Dropped", dq
.qname
->toString(), QType(dq
.qtype
).getName(), remote
.toStringWithPort());
// From here on ss is dereferenced without a check, so it must be non-null at this point.
1485 if (dq
.addXPF
&& ss
->xpfRRCode
!= 0) {
1486 addXPF(dq
, ss
->xpfRRCode
);
// Pick the next IDState slot of the backend, wrapping around the table.
1491 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1492 IDState
* ids
= &ss
->idStates
[idOffset
];
// Claim the slot by storing our frontend descriptor; the previous value is kept in oldFD
// (how it is tested is not visible in this extraction).
1495 int oldFD
= ids
->origFD
.exchange(cs
.udpFD
);
1497 // if we are reusing, no change in outstanding
1502 g_stats
.downstreamTimeouts
++;
// Record what is needed to route and post-process the backend's answer.
1506 ids
->origID
= dh
->id
;
1507 ids
->origRemote
= remote
;
1508 ids
->sentTime
.set(queryRealTime
);
1510 ids
->qtype
= dq
.qtype
;
1511 ids
->qclass
= dq
.qclass
;
1512 ids
->delayMsec
= delayMsec
;
1513 ids
->tempFailureTTL
= dq
.tempFailureTTL
;
1514 ids
->origFlags
= origFlags
;
1515 ids
->cacheKey
= cacheKey
;
1516 ids
->subnet
= subnet
;
1517 ids
->skipCache
= dq
.skipCache
;
1518 ids
->packetCache
= packetCache
;
1519 ids
->ednsAdded
= ednsAdded
;
1520 ids
->ecsAdded
= ecsAdded
;
1521 ids
->qTag
= dq
.qTag
;
1523 /* If we couldn't harvest the real dest addr, still
1524 write down the listening addr since it will be useful
1525 (especially if it's not an 'any' one).
1526 We need to keep track of which one it is since we may
1527 want to use the real but not the listening addr to reply.
1529 if (dest
.sin4
.sin_family
!= 0) {
1530 ids
->origDest
= dest
;
1531 ids
->destHarvested
= true;
1534 ids
->origDest
= cs
.local
;
1535 ids
->destHarvested
= false;
1537 #ifdef HAVE_DNSCRYPT
1538 ids
->dnsCryptQuery
= dnsCryptQuery
;
1540 #ifdef HAVE_PROTOBUF
1541 ids
->uniqueId
= dq
.uniqueId
;
// Relay the (possibly modified) query to the selected backend.
1546 int fd
= pickBackendSocketForSending(ss
);
1547 ssize_t ret
= udpClientSendRequestToBackend(ss
, fd
, query
, dq
.len
);
1551 g_stats
.downstreamSendErrors
++;
1554 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
// Errors raised while handling the query are logged, not propagated to the caller.
1556 catch(const std::exception
& e
){
1557 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
// UDP frontend worker that receives queries in batches with recvmmsg() and sends the
// immediate responses in batches with sendmmsg(). Up to g_udpVectorSize messages are handled
// per batch; each message goes through processUDPQuery().
//   cs      - frontend whose udpFD is read from and written to.
//   holders - per-thread configuration holders passed on to processUDPQuery().
// NOTE(review): the MMReceiver type, the enclosing loop and the check on the recvmmsg()
// result are not visible in this extraction.
1561 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1562 static void MultipleMessagesUDPClientThread(ClientState
* cs
, LocalHolders
& holders
)
1567 /* used by HarvestDestinationAddress */
1569 ComboAddress remote
;
1573 const size_t vectSize
= g_udpVectorSize
;
1574 /* the actual buffer is larger because:
1575 - we may have to add EDNS and/or ECS
1576 - we use it for self-generated responses (from rule or cache)
1577 but we only accept incoming payloads up to that size
1579 static_assert(s_udpIncomingBufferSize
<= sizeof(MMReceiver::packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
// One receive slot, one incoming header and one outgoing header per message of a batch.
1581 auto recvData
= std::unique_ptr
<MMReceiver
[]>(new MMReceiver
[vectSize
]);
1582 auto msgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1583 auto outMsgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1585 /* initialize the structures needed to receive our messages */
1586 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1587 recvData
[idx
].remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1588 fillMSGHdr(&msgVec
[idx
].msg_hdr
, &recvData
[idx
].iov
, recvData
[idx
].cbuf
, sizeof(recvData
[idx
].cbuf
), recvData
[idx
].packet
, s_udpIncomingBufferSize
, &recvData
[idx
].remote
);
1594 /* reset the IO vector, since it's also used to send the vector of responses
1595 to avoid having to copy the data around */
// NOTE(review): iov_len is reset to sizeof(packet) here, whereas the initial fillMSGHdr()
// call above uses s_udpIncomingBufferSize as the receive length - confirm which limit is
// intended for incoming payloads.
1596 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1597 recvData
[idx
].iov
.iov_base
= recvData
[idx
].packet
;
1598 recvData
[idx
].iov
.iov_len
= sizeof(recvData
[idx
].packet
);
1601 /* block until we have at least one message ready, but return
1602 as many as possible to save the syscall costs */
1603 int msgsGot
= recvmmsg(cs
->udpFD
, msgVec
.get(), vectSize
, MSG_WAITFORONE
| MSG_TRUNC
, nullptr);
1606 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno
));
// Number of responses queued into outMsgVec by processUDPQuery() for this batch.
1610 unsigned int msgsToSend
= 0;
1612 /* process the received messages */
1613 for (int msgIdx
= 0; msgIdx
< msgsGot
; msgIdx
++) {
1614 const struct msghdr
* msgh
= &msgVec
[msgIdx
].msg_hdr
;
1615 unsigned int got
= msgVec
[msgIdx
].msg_len
;
1616 const ComboAddress
& remote
= recvData
[msgIdx
].remote
;
// Anything shorter than a DNS header cannot be a query.
1618 if (got
< sizeof(struct dnsheader
)) {
1619 g_stats
.nonCompliantQueries
++;
// The slot's own iov and cbuf are handed over so a response can be sent from the same buffer.
1623 processUDPQuery(*cs
, holders
, msgh
, remote
, recvData
[msgIdx
].dest
, recvData
[msgIdx
].packet
, static_cast<uint16_t>(got
), sizeof(recvData
[msgIdx
].packet
), outMsgVec
.get(), &msgsToSend
, &recvData
[msgIdx
].iov
, recvData
[msgIdx
].cbuf
);
1627 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1628 or the cache) can be sent in batch too */
1630 if (msgsToSend
> 0 && msgsToSend
<= static_cast<unsigned int>(msgsGot
)) {
1631 int sent
= sendmmsg(cs
->udpFD
, outMsgVec
.get(), msgsToSend
, 0);
// A partial send is only logged; nothing visible here retries the unsent responses.
1633 if (sent
< 0 || static_cast<unsigned int>(sent
) != msgsToSend
) {
1634 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent
, msgsToSend
, strerror(errno
));
1640 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1642 // listens to incoming queries, sends out to downstream servers, noting the intended return path
// Entry point of a UDP frontend worker. When recvmmsg/sendmmsg support is compiled in and
// g_udpVectorSize > 1 the work is delegated to MultipleMessagesUDPClientThread(); otherwise
// datagrams are read one at a time with recvmsg() and passed to processUDPQuery() with null
// batching arguments. Exceptions reaching the handlers at the end are logged with errlog().
//   cs - frontend to serve.
// NOTE(review): the declarations of packet, msgh, iov, cbuf and dest, the receive loop and
// the return statements are not visible in this extraction.
1643 static void* udpClientThread(ClientState
* cs
)
1646 LocalHolders holders
;
1648 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1649 if (g_udpVectorSize
> 1) {
1650 MultipleMessagesUDPClientThread(cs
, holders
);
1654 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1657 /* the actual buffer is larger because:
1658 - we may have to add EDNS and/or ECS
1659 - we use it for self-generated responses (from rule or cache)
1660 but we only accept incoming payloads up to that size
// NOTE(review): the assertion message below names MMReceiver::packet although the
// expression checks the local packet buffer.
1662 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1665 /* used by HarvestDestinationAddress */
1668 ComboAddress remote
;
1670 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
// NOTE(review): the receive length given here is sizeof(packet) and the buffer size later
// given to processUDPQuery() is s_udpIncomingBufferSize. The multi-message path does the
// opposite at initialisation (receive length s_udpIncomingBufferSize, processing size
// sizeof(packet)), which is also what the comment above describes - confirm whether the two
// arguments are swapped here.
1671 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), packet
, sizeof(packet
), &remote
);
1674 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
// Receive errors and packets shorter than a DNS header are both counted as non-compliant.
1676 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1677 g_stats
.nonCompliantQueries
++;
1681 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), s_udpIncomingBufferSize
, nullptr, nullptr, nullptr, nullptr);
1687 catch(const std::exception
&e
)
1689 errlog("UDP client thread died because of exception: %s", e
.what());
1692 catch(const PDNSException
&e
)
1694 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1699 errlog("UDP client thread died because of an exception: %s", "unknown");
// Performs one health check against backend ds: builds a query from the backend's check
// name/type/class (optionally adjusted by ds.checkFunction), sends it over a fresh
// non-blocking UDP socket connected to ds.remote, waits for a reply and validates it
// (sender, size, id, QR bit, rcode, and echoed qname/qtype/qclass).
// Failures are reported with infolog() only when g_verboseHealthChecks is set.
//   ds - backend to probe.
// NOTE(review): return statements, the declarations of reply and from, and the test on the
// send result are not visible in this extraction; each failed validation presumably
// returns false.
1703 static bool upCheck(DownstreamState
& ds
)
1706 DNSName checkName
= ds
.checkName
;
1707 uint16_t checkType
= ds
.checkType
.getCode();
1708 uint16_t checkClass
= ds
.checkClass
;
// Header template for the check query: one question and a random id.
1709 dnsheader checkHeader
;
1710 memset(&checkHeader
, 0, sizeof(checkHeader
));
1712 checkHeader
.qdcount
= htons(1);
1713 #ifdef HAVE_LIBSODIUM
1714 checkHeader
.id
= randombytes_random() % 65536;
1716 checkHeader
.id
= random() % 65536;
// The conditions guarding the rd and cd assignments are not visible in this extraction.
1719 checkHeader
.rd
= true;
1721 checkHeader
.cd
= true;
// A user-supplied check function may replace name, type and class and edit the header;
// it is called while holding g_luamutex.
1725 if (ds
.checkFunction
) {
1726 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1727 auto ret
= ds
.checkFunction(checkName
, checkType
, checkClass
, &checkHeader
);
1728 checkName
= std::get
<0>(ret
);
1729 checkType
= std::get
<1>(ret
);
1730 checkClass
= std::get
<2>(ret
);
// Build the wire-format query and overwrite its header with the prepared one.
1733 vector
<uint8_t> packet
;
1734 DNSPacketWriter
dpw(packet
, checkName
, checkType
, checkClass
);
1735 dnsheader
* requestHeader
= dpw
.getHeader();
1736 *requestHeader
= checkHeader
;
// Dedicated socket for the check, bound to the configured source address when one is set.
1738 Socket
sock(ds
.remote
.sin4
.sin_family
, SOCK_DGRAM
);
1739 sock
.setNonBlocking();
1740 if (!IsAnyAddress(ds
.sourceAddr
)) {
1741 sock
.setReuseAddr();
1742 sock
.bind(ds
.sourceAddr
);
1744 sock
.connect(ds
.remote
);
// The trailing 'true' marks this send as a health check.
1745 ssize_t sent
= udpClientSendRequestToBackend(&ds
, sock
.getHandle(), (char*)&packet
[0], packet
.size(), true);
1748 if (g_verboseHealthChecks
)
1749 infolog("Error while sending a health check query to backend %s: %d", ds
.getNameWithAddr(), ret
);
// Wait for the socket to become readable (arguments: handle, true, 1, 0).
1753 int ret
=waitForRWData(sock
.getHandle(), true, 1, 0);
1754 if(ret
< 0 || !ret
) { // error, timeout, both are down!
1757 if (g_verboseHealthChecks
)
1758 infolog("Error while waiting for the health check response from backend %s: %d", ds
.getNameWithAddr(), ret
);
1761 if (g_verboseHealthChecks
)
1762 infolog("Timeout while waiting for the health check response from backend %s", ds
.getNameWithAddr());
1769 sock
.recvFrom(reply
, from
);
1771 /* we are using a connected socket but hey.. */
1772 if (from
!= ds
.remote
) {
1773 if (g_verboseHealthChecks
)
1774 infolog("Invalid health check response received from %s, expecting one from %s", from
.toStringWithPort(), ds
.remote
.toStringWithPort());
// The pointer is formed before the size check below; it is only dereferenced after it.
1778 const dnsheader
* responseHeader
= reinterpret_cast<const dnsheader
*>(reply
.c_str());
1780 if (reply
.size() < sizeof(*responseHeader
)) {
1781 if (g_verboseHealthChecks
)
1782 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply
.size(), ds
.getNameWithAddr(), sizeof(*responseHeader
));
1786 if (responseHeader
->id
!= requestHeader
->id
) {
1787 if (g_verboseHealthChecks
)
1788 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader
->id
, ds
.getNameWithAddr(), requestHeader
->id
);
1792 if (!responseHeader
->qr
) {
1793 if (g_verboseHealthChecks
)
1794 infolog("Invalid health check response from backend %s, expecting QR to be set", ds
.getNameWithAddr());
1798 if (responseHeader
->rcode
== RCode::ServFail
) {
1799 if (g_verboseHealthChecks
)
1800 infolog("Backend %s responded to health check with ServFail", ds
.getNameWithAddr());
// With mustResolve set, NXDomain and Refused are rejected as well.
1804 if (ds
.mustResolve
&& (responseHeader
->rcode
== RCode::NXDomain
|| responseHeader
->rcode
== RCode::Refused
)) {
1805 if (g_verboseHealthChecks
)
1806 infolog("Backend %s responded to health check with %s while mustResolve is set", ds
.getNameWithAddr(), responseHeader
->rcode
== RCode::NXDomain
? "NXDomain" : "Refused");
// The question echoed in the reply must match what was asked.
1810 uint16_t receivedType
;
1811 uint16_t receivedClass
;
1812 DNSName
receivedName(reply
.c_str(), reply
.size(), sizeof(dnsheader
), false, &receivedType
, &receivedClass
);
1814 if (receivedName
!= checkName
|| receivedType
!= checkType
|| receivedClass
!= checkClass
) {
1815 if (g_verboseHealthChecks
)
1816 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds
.getNameWithAddr(), receivedName
.toLogString(), checkName
.toLogString(), QType(receivedType
).getName(), QType(checkType
).getName(), receivedClass
, checkClass
);
// Exceptions raised during the check are logged (when verbose) rather than propagated.
1822 catch(const std::exception
& e
)
1824 if (g_verboseHealthChecks
)
1825 infolog("Error checking the health of backend %s: %s", ds
.getNameWithAddr(), e
.what());
1830 if (g_verboseHealthChecks
)
1831 infolog("Unknown exception while checking the health of backend %s", ds
.getNameWithAddr());
// Upper bound on TCP client threads (default 10); also used below when estimating the
// number of file descriptors required.
1835 uint64_t g_maxTCPClientThreads
{10};
// Compared against a counter in the maintenance code to decide when caches are cleaned
// (default 60; presumably seconds - confirm).
1836 std::atomic
<uint16_t> g_cacheCleaningDelay
{60};
// Percentage used when computing how far each packet cache is purged (default 100).
1837 std::atomic
<uint16_t> g_cacheCleaningPercentage
{100};
// Body of the periodic maintenance code (its function header, loop and the declarations of
// interval and counter are not visible in this extraction). Visible behaviour: look up an
// optional Lua function named "maintenance" under g_luamutex, rate-limit the logging of
// errors raised by it, and once counter reaches g_cacheCleaningDelay purge expired entries
// from the packet cache of every pool.
1843 int32_t secondsToWaitLog
= 0;
1849 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1850 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1854 secondsToWaitLog
= 0;
1856 catch(std::exception
&e
) {
// Log the error only when the wait counter has run out, then re-arm it to 61.
1857 if (secondsToWaitLog
<= 0) {
1858 infolog("Error during execution of maintenance function: %s", e
.what());
1859 secondsToWaitLog
= 61;
1861 secondsToWaitLog
-= interval
;
1867 if (counter
>= g_cacheCleaningDelay
) {
1868 auto localPools
= g_pools
.getLocal();
1869 std::shared_ptr
<DNSDistPacketCache
> packetCache
= nullptr;
1870 for (const auto& entry
: *localPools
) {
1871 packetCache
= entry
.second
->packetCache
;
// NOTE(review): packetCache is dereferenced below; a null check is presumably on a line
// missing from this extraction - confirm.
// Target size after purging: the share of the maximum that is to be kept.
1873 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1874 packetCache
->purgeExpired(upTo
);
1880 // ponder pruning g_dynblocks of expired entries here
// Periodic supervision of the backends. Visible behaviour per pass:
//   - start an extra TCP client thread when more than one connection is queued and the
//     thread limit has not been reached;
//   - for each backend in Auto availability mode, run upCheck(), apply the failure counter
//     against maxCheckFailures, log and record status changes (reconnecting and starting
//     the responder thread when needed, sending an SNMP trap when enabled);
//   - refresh each backend's queryLoad and dropRate from its counters;
//   - expire outstanding queries whose age exceeds g_udpTimeout and record them as
//     timeouts in the rings.
// NOTE(review): the enclosing loop, its sleep and several branch lines are not visible in
// this extraction.
1885 void* healthChecksThread()
1892 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads())
1893 g_tcpclientthreads
->addTCPClientThread();
1895 auto states
= g_dstates
.getLocal(); // this points to the actual shared_ptrs!
1896 for(auto& dss
: *states
) {
1897 if(dss
->availability
==DownstreamState::Availability::Auto
) {
1898 bool newState
=upCheck(*dss
);
// The condition enclosing this reset is not visible here (presumably a successful check).
1900 if (dss
->currentCheckFailures
!= 0) {
1901 dss
->currentCheckFailures
= 0;
// A failed check on a backend currently up only counts towards maxCheckFailures.
1904 else if (!newState
&& dss
->upStatus
) {
1905 dss
->currentCheckFailures
++;
1906 if (dss
->currentCheckFailures
< dss
->maxCheckFailures
) {
1911 if(newState
!= dss
->upStatus
) {
1912 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
// Coming back up requires a working connection; reconnect() decides the final state.
1914 if (newState
&& !dss
->connected
) {
1915 newState
= dss
->reconnect();
// test_and_set() makes sure the responder thread is started only once.
1917 if (dss
->connected
&& !dss
->threadStarted
.test_and_set()) {
1918 dss
->tid
= thread(responderThread
, dss
);
1922 dss
->upStatus
= newState
;
1923 dss
->currentCheckFailures
= 0;
1924 if (g_snmpAgent
&& g_snmpTrapsEnabled
) {
1925 g_snmpAgent
->sendBackendStatusChangeTrap(dss
);
// Rates over the time elapsed since the previous pass (microseconds converted to seconds).
1930 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
1931 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
1932 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
1933 dss
->prev
.queries
.store(dss
->queries
.load());
1934 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
1936 for(IDState
& ids
: dss
->idStates
) { // timeouts
1937 int origFD
= ids
.origFD
;
// A slot is in use when origFD >= 0; its age is incremented on every pass.
1938 if(origFD
>=0 && ids
.age
++ > g_udpTimeout
) {
1939 /* We set origFD to -1 as soon as possible
1940 to limit the risk of racing with the
1942 The UDP client thread only checks origFD to
1943 know whether outstanding has to be incremented,
1944 so the sooner the better any way since we _will_
1947 if (ids
.origFD
.exchange(-1) != origFD
) {
1948 /* this state has been altered in the meantime,
1949 don't go anywhere near it */
1955 g_stats
.downstreamTimeouts
++; // this is an 'actively' discovered timeout
1956 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1957 dss
->remote
.toStringWithPort(), dss
->name
,
1958 ids
.qname
.toString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
// Record the timeout in the response ring with a zeroed header carrying the original id
// and the maximum unsigned int as the latency argument.
1963 struct dnsheader fake
;
1964 memset(&fake
, 0, sizeof(fake
));
1965 fake
.id
= ids
.origID
;
1967 g_rings
.insertResponse(ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
);
// Tries to enable the platform's "bind to a non-local address" socket option on sock.
// Four options are attempted below (IP_FREEBIND, IP_BINDANY, IPV6_BINDANY, SO_BINDANY);
// a failing setsockopt() only produces a warning.
//   af   - address family of the socket (its use is not visible in this extraction).
//   sock - socket descriptor to configure.
// NOTE(review): the preprocessor guards and address-family tests selecting among the four
// calls are not visible here; 'one' is marked unused presumably because all calls can be
// compiled out - confirm.
1975 static void bindAny(int af
, int sock
)
1977 __attribute__((unused
)) int one
= 1;
1980 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
1981 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno
));
1986 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
1987 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno
));
1991 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
1992 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno
));
1995 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
1996 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno
));
// Switches the process to group gid and, when that succeeds, clears the supplementary
// group list. Failures are reported with warnlog() only.
//   gid - group id to switch to.
// NOTE(review): any guard around these calls (for example skipping a zero gid) and the
// else keyword before the second warning are not visible in this extraction.
2000 static void dropGroupPrivs(gid_t gid
)
2003 if (setgid(gid
) == 0) {
2004 if (setgroups(0, NULL
) < 0) {
2005 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno
));
2009 warnlog("Warning: Unable to set group ID to %d: %s", gid
, strerror(errno
));
// Switches the process to user uid; a failing setuid() is reported with warnlog() only.
//   uid - user id to switch to.
// NOTE(review): any guard around the call is not visible in this extraction.
2014 static void dropUserPrivs(uid_t uid
)
2017 if(setuid(uid
) < 0) {
2018 warnlog("Warning: Unable to set user ID to %d: %s", uid
, strerror(errno
));
// Estimates how many file descriptors the current configuration may need and warns when
// the RLIMIT_NOFILE soft limit is not above that estimate.
//   udpBindsCount - number of UDP listening sockets.
//   tcpBindsCount - number of TCP listening sockets.
// NOTE(review): the declaration of rl and the increments for the web server and console
// sockets are not visible in this extraction; the return value of getrlimit() is not
// checked in the lines shown.
2023 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
2025 /* stdin, stdout, stderr */
2026 size_t requiredFDsCount
= 3;
2027 auto backends
= g_dstates
.getLocal();
2028 /* UDP sockets to backends */
2029 size_t backendUDPSocketsCount
= 0;
2030 for (const auto& backend
: *backends
) {
2031 backendUDPSocketsCount
+= backend
->sockets
.size();
2033 requiredFDsCount
+= backendUDPSocketsCount
;
2034 /* TCP sockets to backends */
2035 requiredFDsCount
+= (backends
->size() * g_maxTCPClientThreads
);
2036 /* listening sockets */
2037 requiredFDsCount
+= udpBindsCount
;
2038 requiredFDsCount
+= tcpBindsCount
;
2039 /* max TCP connections currently served */
2040 requiredFDsCount
+= g_maxTCPClientThreads
;
2041 /* max pipes for communicating between TCP acceptors and client threads */
2042 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
2043 /* max TCP queued connections */
2044 requiredFDsCount
+= g_maxTCPQueuedConnections
;
2045 /* DelayPipe pipe */
2046 requiredFDsCount
+= 2;
2049 /* webserver main socket */
2051 /* console main socket */
// Compare the estimate with the current soft limit on open descriptors.
2058 getrlimit(RLIMIT_NOFILE
, &rl
);
2059 if (rl
.rlim_cur
<= requiredFDsCount
) {
2060 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
// Two alternative hints follow; which one is emitted is decided on lines not visible here.
2062 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2064 warnlog("You can increase this value by using ulimit.");
2071 vector
<string
> locals
;
2072 vector
<string
> remotes
;
2073 bool checkConfig
{false};
2074 bool beClient
{false};
2075 bool beSupervised
{false};
2082 std::atomic
<bool> g_configurationDone
{false};
2087 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2088 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2089 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
2091 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2092 cout
<<"-C,--config file Load configuration from 'file'\n";
2093 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2094 cout
<<" controlSocket from your configuration file, but also\n";
2095 cout
<<" accepts an IP:PORT argument\n";
2096 #ifdef HAVE_LIBSODIUM
2097 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2098 cout
<<" is similar to setting setKey in the configuration file.\n";
2099 cout
<<" NOTE: this will leak this key in your shell's history\n";
2100 cout
<<" and in the systems running process list.\n";
2102 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2103 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2104 cout
<<" Any errors are printed as well.\n";
2105 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2106 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2107 cout
<<"-h,--help Display this helpful message\n";
2108 cout
<<"-l,--local address Listen on this local address\n";
2109 cout
<<"--supervised Don't open a console, I'm supervised\n";
2110 cout
<<" (use with e.g. systemd and daemontools)\n";
2111 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2112 cout
<<" (use with e.g. systemd)\n";
2113 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2114 cout
<<"-v,--verbose Enable verbose mode\n";
2115 cout
<<"-V,--version Show dnsdist version information and exit\n";
2118 int main(int argc
, char** argv
)
2121 size_t udpBindsCount
= 0;
2122 size_t tcpBindsCount
= 0;
2123 rl_attempted_completion_function
= my_completion
;
2124 rl_completion_append_character
= 0;
2126 signal(SIGPIPE
, SIG_IGN
);
2127 signal(SIGCHLD
, SIG_IGN
);
2128 openlog("dnsdist", LOG_PID
, LOG_DAEMON
);
2130 #ifdef HAVE_LIBSODIUM
2131 if (sodium_init() == -1) {
2132 cerr
<<"Unable to initialize crypto library"<<endl
;
2135 g_hashperturb
=randombytes_uniform(0xffffffff);
2136 srandom(randombytes_uniform(0xffffffff));
2140 gettimeofday(&tv
, 0);
2141 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
2142 g_hashperturb
=random();
2146 ComboAddress clientAddress
= ComboAddress();
2147 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
2148 struct option longopts
[]={
2149 {"acl", required_argument
, 0, 'a'},
2150 {"check-config", no_argument
, 0, 1},
2151 {"client", no_argument
, 0, 'c'},
2152 {"config", required_argument
, 0, 'C'},
2153 {"disable-syslog", no_argument
, 0, 2},
2154 {"execute", required_argument
, 0, 'e'},
2155 {"gid", required_argument
, 0, 'g'},
2156 {"help", no_argument
, 0, 'h'},
2157 {"local", required_argument
, 0, 'l'},
2158 {"setkey", required_argument
, 0, 'k'},
2159 {"supervised", no_argument
, 0, 3},
2160 {"uid", required_argument
, 0, 'u'},
2161 {"verbose", no_argument
, 0, 'v'},
2162 {"version", no_argument
, 0, 'V'},
2168 int c
=getopt_long(argc
, argv
, "a:cC:e:g:hk:l:u:vV", longopts
, &longindex
);
2173 g_cmdLine
.checkConfig
=true;
2179 g_cmdLine
.beSupervised
=true;
2182 g_cmdLine
.config
=optarg
;
2185 g_cmdLine
.beClient
=true;
2188 g_cmdLine
.command
=optarg
;
2191 g_cmdLine
.gid
=optarg
;
2194 cout
<<"dnsdist "<<VERSION
<<endl
;
2201 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
2204 #ifdef HAVE_LIBSODIUM
2205 if (B64Decode(string(optarg
), g_consoleKey
) < 0) {
2206 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2210 cerr
<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl
;
2215 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
2218 g_cmdLine
.uid
=optarg
;
2224 #ifdef LUAJIT_VERSION
2225 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2227 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2229 cout
<<"Enabled features: ";
2230 #ifdef HAVE_DNS_OVER_TLS
2231 cout
<<"dns-over-tls(";
2243 #ifdef HAVE_DNSCRYPT
2252 #ifdef HAVE_LIBSODIUM
2255 #ifdef HAVE_PROTOBUF
2261 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2262 cout
<<"recvmmsg/sendmmsg ";
2264 #ifdef HAVE_NET_SNMP
2274 //getopt_long printed an error message.
2283 for(auto p
= argv
; *p
; ++p
) {
2284 if(g_cmdLine
.beClient
) {
2285 clientAddress
= ComboAddress(*p
, 5199);
2287 g_cmdLine
.remotes
.push_back(*p
);
2291 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
, false};
2293 g_policy
.setState(leastOutstandingPol
);
2294 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2295 setupLua(true, g_cmdLine
.config
);
2296 if (clientAddress
!= ComboAddress())
2297 g_serverControl
= clientAddress
;
2298 doClient(g_serverControl
, g_cmdLine
.command
);
2299 _exit(EXIT_SUCCESS
);
2302 auto acl
= g_ACL
.getCopy();
2304 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2306 g_ACL
.setState(acl
);
2309 auto consoleACL
= g_consoleACL
.getCopy();
2310 for (const auto& mask
: { "127.0.0.1/8", "::1/128" }) {
2311 consoleACL
.addMask(mask
);
2313 g_consoleACL
.setState(consoleACL
);
2315 if (g_cmdLine
.checkConfig
) {
2316 setupLua(true, g_cmdLine
.config
);
2317 // No exception was thrown
2318 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2319 _exit(EXIT_SUCCESS
);
2322 auto todo
=setupLua(false, g_cmdLine
.config
);
2324 auto localPools
= g_pools
.getCopy();
2326 bool precompute
= false;
2327 if (g_policy
.getLocal()->name
== "chashed") {
2330 for (const auto& entry
: localPools
) {
2331 if (entry
.second
->policy
!= nullptr && entry
.second
->policy
->name
== "chashed") {
2338 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2339 // pre compute hashes
2340 auto backends
= g_dstates
.getLocal();
2341 for (auto& backend
: *backends
) {
2347 if(g_cmdLine
.locals
.size()) {
2349 for(auto loc
: g_cmdLine
.locals
)
2350 g_locals
.push_back(std::make_tuple(ComboAddress(loc
, 53), true, false, 0, "", std::set
<int>()));
2353 if(g_locals
.empty())
2354 g_locals
.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set
<int>()));
2356 g_configurationDone
= true;
2358 vector
<ClientState
*> toLaunch
;
2359 for(const auto& local
: g_locals
) {
2360 ClientState
* cs
= new ClientState
;
2361 cs
->local
= std::get
<0>(local
);
2362 cs
->udpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_DGRAM
, 0);
2363 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2364 SSetsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2366 //if(g_vm.count("bind-non-local"))
2367 bindAny(cs
->local
.sin4
.sin_family
, cs
->udpFD
);
2369 // if (!setSocketTimestamps(cs->udpFD))
2370 // g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2373 if(IsAnyAddress(cs
->local
)) {
2375 setsockopt(cs
->udpFD
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2376 #ifdef IPV6_RECVPKTINFO
2377 setsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2381 if (std::get
<2>(local
)) {
2383 SSetsockopt(cs
->udpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2385 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get
<0>(local
).toStringWithPort());
2389 const std::string
& itf
= std::get
<4>(local
);
2391 #ifdef SO_BINDTODEVICE
2392 int res
= setsockopt(cs
->udpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2394 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(local
).toStringWithPort(), strerror(errno
));
2397 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(local
).toStringWithPort());
2402 if (g_defaultBPFFilter
) {
2403 cs
->attachFilter(g_defaultBPFFilter
);
2404 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs
->local
.toStringWithPort());
2406 #endif /* HAVE_EBPF */
2408 cs
->cpus
= std::get
<5>(local
);
2410 SBind(cs
->udpFD
, cs
->local
);
2411 toLaunch
.push_back(cs
);
2412 g_frontends
.push_back(cs
);
2416 for(const auto& local
: g_locals
) {
2417 if(!std::get
<1>(local
)) { // no TCP/IP
2418 warnlog("Not providing TCP/IP service on local address '%s'", std::get
<0>(local
).toStringWithPort());
2421 ClientState
* cs
= new ClientState
;
2422 cs
->local
= std::get
<0>(local
);
2424 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2426 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2427 #ifdef TCP_DEFER_ACCEPT
2428 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2430 if (std::get
<3>(local
) > 0) {
2432 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, std::get
<3>(local
));
2434 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get
<0>(local
).toStringWithPort());
2437 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2438 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2441 /* no need to warn again if configured but support is not available, we already did for UDP */
2442 if (std::get
<2>(local
)) {
2443 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2447 const std::string
& itf
= std::get
<4>(local
);
2449 #ifdef SO_BINDTODEVICE
2450 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2452 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(local
).toStringWithPort(), strerror(errno
));
2455 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(local
).toStringWithPort());
2460 if (g_defaultBPFFilter
) {
2461 cs
->attachFilter(g_defaultBPFFilter
);
2462 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs
->local
.toStringWithPort());
2464 #endif /* HAVE_EBPF */
2466 // if(g_vm.count("bind-non-local"))
2467 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2468 SBind(cs
->tcpFD
, cs
->local
);
2469 SListen(cs
->tcpFD
, 64);
2470 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2472 toLaunch
.push_back(cs
);
2473 g_frontends
.push_back(cs
);
2477 #ifdef HAVE_DNSCRYPT
2478 for(auto& dcLocal
: g_dnsCryptLocals
) {
2479 ClientState
* cs
= new ClientState
;
2480 cs
->local
= std::get
<0>(dcLocal
);
2481 cs
->dnscryptCtx
= std::get
<1>(dcLocal
);
2482 cs
->udpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_DGRAM
, 0);
2483 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2484 SSetsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2486 bindAny(cs
->local
.sin4
.sin_family
, cs
->udpFD
);
2487 if(IsAnyAddress(cs
->local
)) {
2489 setsockopt(cs
->udpFD
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2490 #ifdef IPV6_RECVPKTINFO
2491 setsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2494 if (std::get
<2>(dcLocal
)) {
2496 SSetsockopt(cs
->udpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2498 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2502 const std::string
& itf
= std::get
<4>(dcLocal
);
2504 #ifdef SO_BINDTODEVICE
2505 int res
= setsockopt(cs
->udpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2507 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(dcLocal
).toStringWithPort(), strerror(errno
));
2510 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2515 if (g_defaultBPFFilter
) {
2516 cs
->attachFilter(g_defaultBPFFilter
);
2517 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs
->local
.toStringWithPort());
2519 #endif /* HAVE_EBPF */
2520 SBind(cs
->udpFD
, cs
->local
);
2521 toLaunch
.push_back(cs
);
2522 g_frontends
.push_back(cs
);
2525 cs
= new ClientState
;
2526 cs
->local
= std::get
<0>(dcLocal
);
2527 cs
->dnscryptCtx
= std::get
<1>(dcLocal
);
2528 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2529 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2530 #ifdef TCP_DEFER_ACCEPT
2531 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2533 if (std::get
<3>(dcLocal
) > 0) {
2535 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, std::get
<3>(dcLocal
));
2537 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2542 /* no need to warn again if configured but support is not available, we already did for UDP */
2543 if (std::get
<2>(dcLocal
)) {
2544 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2549 #ifdef SO_BINDTODEVICE
2550 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2552 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(dcLocal
).toStringWithPort(), strerror(errno
));
2555 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2559 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2560 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2563 if (g_defaultBPFFilter
) {
2564 cs
->attachFilter(g_defaultBPFFilter
);
2565 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs
->local
.toStringWithPort());
2567 #endif /* HAVE_EBPF */
2569 cs
->cpus
= std::get
<5>(dcLocal
);
2571 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2572 SBind(cs
->tcpFD
, cs
->local
);
2573 SListen(cs
->tcpFD
, 64);
2574 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2575 toLaunch
.push_back(cs
);
2576 g_frontends
.push_back(cs
);
2581 for(auto& frontend
: g_tlslocals
) {
2582 ClientState
* cs
= new ClientState
;
2583 cs
->local
= frontend
->d_addr
;
2584 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2585 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2586 #ifdef TCP_DEFER_ACCEPT
2587 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2589 if (frontend
->d_tcpFastOpenQueueSize
> 0) {
2591 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, frontend
->d_tcpFastOpenQueueSize
);
2593 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2596 if (frontend
->d_reusePort
) {
2598 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2600 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2603 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2604 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2607 if (!frontend
->d_interface
.empty()) {
2608 #ifdef SO_BINDTODEVICE
2609 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, frontend
->d_interface
.c_str(), frontend
->d_interface
.length());
2611 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), strerror(errno
));
2614 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2618 cs
->cpus
= frontend
->d_cpus
;
2620 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2621 if (frontend
->setupTLS()) {
2622 cs
->tlsFrontend
= frontend
;
2623 SBind(cs
->tcpFD
, cs
->local
);
2624 SListen(cs
->tcpFD
, 64);
2625 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2626 toLaunch
.push_back(cs
);
2627 g_frontends
.push_back(cs
);
2632 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2633 _exit(EXIT_FAILURE
);
2637 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
2641 g_ACL
.getLocal()->toStringVector(&vec
);
2642 for(const auto& s
: vec
) {
2647 infolog("ACL allowing queries from: %s", acls
.c_str());
2650 g_consoleACL
.getLocal()->toStringVector(&vec
);
2651 for (const auto& entry
: vec
) {
2652 if (!acls
.empty()) {
2657 infolog("Console ACL allowing connections from: %s", acls
.c_str());
2659 #ifdef HAVE_LIBSODIUM
2660 if (g_consoleEnabled
&& g_consoleKey
.empty()) {
2661 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2668 if(!g_cmdLine
.gid
.empty())
2669 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2671 if(!g_cmdLine
.uid
.empty())
2672 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2674 dropGroupPrivs(newgid
);
2675 dropUserPrivs(newuid
);
2677 /* this need to be done _after_ dropping privileges */
2678 g_delay
= new DelayPipe
<DelayedPacket
>();
2684 g_tcpclientthreads
= std::make_shared
<TCPClientCollection
>(g_maxTCPClientThreads
, g_useTCPSinglePipe
);
2689 localPools
= g_pools
.getCopy();
2690 /* create the default pool no matter what */
2691 createPoolIfNotExists(localPools
, "");
2692 if(g_cmdLine
.remotes
.size()) {
2693 for(const auto& address
: g_cmdLine
.remotes
) {
2694 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2695 addServerToPool(localPools
, "", ret
);
2696 if (ret
->connected
&& !ret
->threadStarted
.test_and_set()) {
2697 ret
->tid
= thread(responderThread
, ret
);
2699 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
2702 g_pools
.setState(localPools
);
2704 if(g_dstates
.getLocal()->empty()) {
2705 errlog("No downstream servers defined: all packets will get dropped");
2706 // you might define them later, but you need to know
2709 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
2711 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2712 if(dss
->availability
==DownstreamState::Availability::Auto
) {
2713 bool newState
=upCheck(*dss
);
2714 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
2715 dss
->upStatus
= newState
;
2719 for(auto& cs
: toLaunch
) {
2720 if (cs
->udpFD
>= 0) {
2721 thread
t1(udpClientThread
, cs
);
2722 if (!cs
->cpus
.empty()) {
2723 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2727 else if (cs
->tcpFD
>= 0) {
2728 thread
t1(tcpAcceptorThread
, cs
);
2729 if (!cs
->cpus
.empty()) {
2730 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2736 thread
carbonthread(carbonDumpThread
);
2737 carbonthread
.detach();
2739 thread
stattid(maintThread
);
2742 thread
healththread(healthChecksThread
);
2744 if(g_cmdLine
.beSupervised
) {
2746 sd_notify(0, "READY=1");
2748 healththread
.join();
2751 healththread
.detach();
2754 _exit(EXIT_SUCCESS
);
2757 catch(const LuaContext::ExecutionErrorException
& e
) {
2759 errlog("Fatal Lua error: %s", e
.what());
2760 std::rethrow_if_nested(e
);
2761 } catch(const std::exception
& ne
) {
2762 errlog("Details: %s", ne
.what());
2764 catch(PDNSException
&ae
)
2766 errlog("Fatal pdns error: %s", ae
.reason
);
2768 _exit(EXIT_FAILURE
);
2770 catch(std::exception
&e
)
2772 errlog("Fatal error: %s", e
.what());
2773 _exit(EXIT_FAILURE
);
2775 catch(PDNSException
&ae
)
2777 errlog("Fatal pdns error: %s", ae
.reason
);
2778 _exit(EXIT_FAILURE
);