2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <netinet/tcp.h>
31 #include <sys/resource.h>
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
37 #include <editline/readline.h>
41 #include <systemd/sd-daemon.h>
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
52 #include "delaypipe.hh"
55 #include "dnswriter.hh"
56 #include "ednsoptions.hh"
60 #include "sodcrypto.hh"
64 thread_local
boost::uuids::random_generator t_uuidGenerator
;
68 Receiver is currently single threaded
69 not *that* bad actually, but now that we are thread safe, might want to scale
73 Set of Rules, if one matches, it leads to an Action
74 Both rules and actions could conceivably be Lua based.
75 On the C++ side, both could be inherited from a class Rule and a class Action,
76 on the Lua side we can't do that. */
82 struct DNSDistStats g_stats
;
83 uint16_t g_maxOutstanding
{10240};
85 bool g_verboseHealthChecks
{false};
86 uint32_t g_staleCacheEntriesTTL
{0};
89 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
90 string g_outputBuffer
;
92 vector
<std::tuple
<ComboAddress
, bool, bool, int, string
, std::set
<int>>> g_locals
;
93 std::vector
<std::shared_ptr
<TLSFrontend
>> g_tlslocals
;
95 std::vector
<std::tuple
<ComboAddress
,std::shared_ptr
<DNSCryptContext
>,bool, int, string
, std::set
<int> >> g_dnsCryptLocals
;
98 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
99 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
100 #endif /* HAVE_EBPF */
101 vector
<ClientState
*> g_frontends
;
102 GlobalStateHolder
<pools_t
> g_pools
;
103 size_t g_udpVectorSize
{1};
105 bool g_snmpEnabled
{false};
106 bool g_snmpTrapsEnabled
{false};
107 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
109 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
110 Then we have a bunch of connected sockets for talking to downstream servers.
111 We send directly to those sockets.
113 For the return path, per downstream server we have a thread that listens to responses.
115 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
116 there the original requestor and the original id. The new ID is the offset in the array.
118 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
121 IDs are assigned by atomic increments of the socket offset.
124 GlobalStateHolder
<vector
<DNSDistRuleAction
> > g_rulactions
;
125 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_resprulactions
;
126 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_cachehitresprulactions
;
127 GlobalStateHolder
<vector
<DNSDistResponseRuleAction
> > g_selfansweredresprulactions
;
132 GlobalStateHolder
<servers_t
> g_dstates
;
133 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
134 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
135 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
136 int g_tcpRecvTimeout
{2};
137 int g_tcpSendTimeout
{2};
140 bool g_servFailOnNoPolicy
{false};
141 bool g_truncateTC
{false};
144 static const size_t s_udpIncomingBufferSize
{1500};
146 static void truncateTC(const char* packet
, uint16_t* len
)
149 unsigned int consumed
;
150 DNSName
qname(packet
, *len
, sizeof(dnsheader
), false, 0, 0, &consumed
);
151 *len
=(uint16_t) (sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
152 struct dnsheader
* dh
=(struct dnsheader
*)packet
;
153 dh
->ancount
= dh
->arcount
= dh
->nscount
=0;
164 ComboAddress destination
;
165 ComboAddress origDest
;
169 if(origDest
.sin4
.sin_family
== 0) {
170 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
173 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
177 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
182 DelayPipe
<DelayedPacket
> * g_delay
= 0;
184 void doLatencyStats(double udiff
)
186 if(udiff
< 1000) g_stats
.latency0_1
++;
187 else if(udiff
< 10000) g_stats
.latency1_10
++;
188 else if(udiff
< 50000) g_stats
.latency10_50
++;
189 else if(udiff
< 100000) g_stats
.latency50_100
++;
190 else if(udiff
< 1000000) g_stats
.latency100_1000
++;
191 else g_stats
.latencySlow
++;
193 auto doAvg
= [](double& var
, double n
, double weight
) {
194 var
= (weight
-1) * var
/weight
+ n
/weight
;
197 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
198 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
199 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
200 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
203 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
)
205 uint16_t rqtype
, rqclass
;
206 unsigned int consumed
;
208 const struct dnsheader
* dh
= (struct dnsheader
*) response
;
210 if (responseLen
< sizeof(dnsheader
)) {
214 if (dh
->qdcount
== 0) {
215 if (dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) {
219 g_stats
.nonCompliantResponses
++;
225 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
227 catch(std::exception
& e
) {
228 if(responseLen
> (ssize_t
)sizeof(dnsheader
))
229 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
230 g_stats
.nonCompliantResponses
++;
234 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
241 void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
243 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
244 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
245 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
246 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
247 /* clear the flags we are about to restore */
248 *flags
&= restoreFlagsMask
;
249 /* only keep the flags we want to restore */
250 origFlags
&= ~restoreFlagsMask
;
251 /* set the saved flags as they were */
255 bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
)
257 struct dnsheader
* dh
= (struct dnsheader
*) *response
;
259 if (*responseLen
< sizeof(dnsheader
)) {
263 restoreFlags(dh
, origFlags
);
265 if (*responseLen
== sizeof(dnsheader
)) {
270 string realname
= qname
.toDNSString();
271 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
272 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
276 if (ednsAdded
|| ecsAdded
) {
277 char * optStart
= NULL
;
281 int res
= locateEDNSOptRR(*response
, *responseLen
, &optStart
, &optLen
, &last
);
285 /* we added the entire OPT RR,
286 therefore we need to remove it entirely */
288 /* simply remove the last AR */
289 *responseLen
-= optLen
;
290 uint16_t arcount
= ntohs(dh
->arcount
);
292 dh
->arcount
= htons(arcount
);
295 /* Removing an intermediary RR could lead to compression error */
296 if (rewriteResponseWithoutEDNS(*response
, *responseLen
, rewrittenResponse
) == 0) {
297 *responseLen
= rewrittenResponse
.size();
298 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
299 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
301 *responseSize
= rewrittenResponse
.capacity();
302 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
305 warnlog("Error rewriting content");
310 /* the OPT RR was already present, but without ECS,
311 we need to remove the ECS option if any */
313 /* nothing after the OPT RR, we can simply remove the
315 size_t existingOptLen
= optLen
;
316 removeEDNSOptionFromOPT(optStart
, &optLen
, EDNSOptionCode::ECS
);
317 *responseLen
-= (existingOptLen
- optLen
);
320 /* Removing an intermediary RR could lead to compression error */
321 if (rewriteResponseWithoutEDNSOption(*response
, *responseLen
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
322 *responseLen
= rewrittenResponse
.size();
323 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
324 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
326 *responseSize
= rewrittenResponse
.capacity();
327 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
330 warnlog("Error rewriting content");
341 bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
344 uint16_t encryptedResponseLen
= 0;
346 /* save the original header before encrypting it in place */
347 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
348 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
352 int res
= dnsCryptQuery
->encryptResponse(response
, *responseLen
, responseSize
, tcp
, &encryptedResponseLen
);
354 *responseLen
= encryptedResponseLen
;
356 /* dropping response */
357 vinfolog("Error encrypting the response, dropping.");
365 static bool sendUDPResponse(int origFD
, char* response
, uint16_t responseLen
, int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
367 if(delayMsec
&& g_delay
) {
368 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
369 g_delay
->submit(dp
, delayMsec
);
373 if(origDest
.sin4
.sin_family
== 0) {
374 res
= sendto(origFD
, response
, responseLen
, 0, (struct sockaddr
*)&origRemote
, origRemote
.getSocklen());
377 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
381 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), strerror(err
));
389 static int pickBackendSocketForSending(DownstreamState
* state
)
391 return state
->sockets
[state
->socketsOffset
++ % state
->sockets
.size()];
394 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
<DownstreamState
>& state
, std::vector
<int>& ready
)
398 if (state
->sockets
.size() == 1) {
399 ready
.push_back(state
->sockets
[0]);
404 std::lock_guard
<std::mutex
> lock(state
->socketsLock
);
405 state
->mplexer
->getAvailableFDs(ready
, -1);
409 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
410 void* responderThread(std::shared_ptr
<DownstreamState
> dss
)
412 auto localRespRulactions
= g_resprulactions
.getLocal();
414 char packet
[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
415 /* when the answer is encrypted in place, we need to get a copy
416 of the original header before encryption to fill the ring buffer */
421 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
422 vector
<uint8_t> rewrittenResponse
;
424 uint16_t queryId
= 0;
425 std::vector
<int> sockets
;
426 sockets
.reserve(dss
->sockets
.size());
429 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
430 bool outstandingDecreased
= false;
432 pickBackendSocketsReadyForReceiving(dss
, sockets
);
433 for (const auto& fd
: sockets
) {
434 ssize_t got
= recv(fd
, packet
, sizeof(packet
), 0);
435 char * response
= packet
;
436 size_t responseSize
= sizeof(packet
);
438 if (got
< (ssize_t
) sizeof(dnsheader
))
441 uint16_t responseLen
= (uint16_t) got
;
444 if(queryId
>= dss
->idStates
.size())
447 IDState
* ids
= &dss
->idStates
[queryId
];
448 int origFD
= ids
->origFD
;
450 if(origFD
< 0) // duplicate
453 /* setting age to 0 to prevent the maintainer thread from
454 cleaning this IDS while we process the response.
455 We have already a copy of the origFD, so it would
456 mostly mess up the outstanding counter.
460 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, dss
->remote
)) {
464 --dss
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
465 outstandingDecreased
= true;
467 if(dh
->tc
&& g_truncateTC
) {
468 truncateTC(response
, &responseLen
);
471 dh
->id
= ids
->origID
;
473 uint16_t addRoom
= 0;
474 DNSResponse
dr(&ids
->qname
, ids
->qtype
, ids
->qclass
, &ids
->origDest
, &ids
->origRemote
, dh
, sizeof(packet
), responseLen
, false, &ids
->sentTime
.d_start
);
476 dr
.uniqueId
= ids
->uniqueId
;
480 if (!processResponse(localRespRulactions
, dr
, &ids
->delayMsec
)) {
485 if (ids
->dnsCryptQuery
) {
486 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
489 if (!fixUpResponse(&response
, &responseLen
, &responseSize
, ids
->qname
, ids
->origFlags
, ids
->ednsAdded
, ids
->ecsAdded
, rewrittenResponse
, addRoom
)) {
493 if (ids
->packetCache
&& !ids
->skipCache
) {
494 ids
->packetCache
->insert(ids
->cacheKey
, ids
->qname
, ids
->qtype
, ids
->qclass
, response
, responseLen
, false, dh
->rcode
, ids
->tempFailureTTL
);
497 if (ids
->cs
&& !ids
->cs
->muted
) {
499 if (!encryptResponse(response
, &responseLen
, responseSize
, false, ids
->dnsCryptQuery
, &dh
, &dhCopy
)) {
505 empty
.sin4
.sin_family
= 0;
506 /* if ids->destHarvested is false, origDest holds the listening address.
507 We don't want to use that as a source since it could be 0.0.0.0 for example. */
508 sendUDPResponse(origFD
, response
, responseLen
, ids
->delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
513 double udiff
= ids
->sentTime
.udiff();
514 vinfolog("Got answer from %s, relayed to %s, took %f usec", dss
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(), udiff
);
518 g_rings
.insertResponse(ts
, ids
->origRemote
, ids
->qname
, ids
->qtype
, (unsigned int)udiff
, (unsigned int)got
, *dh
, dss
->remote
);
520 if(dh
->rcode
== RCode::ServFail
)
521 g_stats
.servfailResponses
++;
522 dss
->latencyUsec
= (127.0 * dss
->latencyUsec
/ 128.0) + udiff
/128.0;
524 doLatencyStats(udiff
);
526 if (ids
->origFD
== origFD
) {
528 ids
->dnsCryptQuery
= nullptr;
531 outstandingDecreased
= false;
534 rewrittenResponse
.clear();
537 catch(const std::exception
& e
){
538 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss
->remote
.toStringWithPort(), queryId
, e
.what());
539 if (outstandingDecreased
) {
540 /* so an exception was raised after we decreased the outstanding queries counter,
541 but before we could set ids->origFD to -1 (because we also set outstandingDecreased
542 to false then), meaning the IDS is still considered active and we will decrease the
543 counter again on a duplicate, or simply while reaping downstream timeouts, so let's
551 catch(const std::exception
& e
)
553 errlog("UDP responder thread died because of exception: %s", e
.what());
556 catch(const PDNSException
& e
)
558 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
563 errlog("UDP responder thread died because of an exception: %s", "unknown");
567 void DownstreamState::reconnect()
570 for (auto& fd
: sockets
) {
573 std::lock_guard
<std::mutex
> lock(socketsLock
);
574 mplexer
->removeReadFD(fd
);
576 /* shutdown() is needed to wake up recv() in the responderThread */
577 shutdown(fd
, SHUT_RDWR
);
581 if (!IsAnyAddress(remote
)) {
582 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
583 if (!IsAnyAddress(sourceAddr
)) {
584 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
585 SBind(fd
, sourceAddr
);
588 SConnect(fd
, remote
);
590 std::lock_guard
<std::mutex
> lock(socketsLock
);
591 mplexer
->addReadFD(fd
, [](int, boost::any
) {});
595 catch(const std::runtime_error
& error
) {
596 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
603 /* if at least one (re-)connection failed, close all sockets */
605 for (auto& fd
: sockets
) {
607 /* shutdown() is needed to wake up recv() in the responderThread */
608 shutdown(fd
, SHUT_RDWR
);
616 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
, size_t numberOfSockets
): remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
618 mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
620 sockets
.resize(numberOfSockets
);
621 for (auto& fd
: sockets
) {
625 if (!IsAnyAddress(remote
)) {
627 idStates
.resize(g_maxOutstanding
);
629 infolog("Added downstream server %s", remote
.toStringWithPort());
633 std::mutex g_luamutex
;
636 GlobalStateHolder
<ServerPolicy
> g_policy
;
638 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
640 for(auto& d
: servers
) {
641 if(d
.second
->isUp() && d
.second
->qps
.check())
644 return leastOutstanding(servers
, dq
);
647 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
648 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
650 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
651 return servers
[0].second
;
654 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
655 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
656 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
657 poss
.reserve(servers
.size());
658 for(auto& d
: servers
) {
659 if(d
.second
->isUp()) {
660 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
664 return shared_ptr
<DownstreamState
>();
665 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
666 return poss
.begin()->second
;
669 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
671 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
673 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
674 if(d
.second
->isUp()) {
675 sum
+=d
.second
->weight
;
676 poss
.push_back({sum
, d
.second
});
680 // Catch poss & sum are empty to avoid SIGFPE
682 return shared_ptr
<DownstreamState
>();
685 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
687 return shared_ptr
<DownstreamState
>();
691 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
693 return valrandom(random(), servers
, dq
);
696 uint32_t g_hashperturb
;
697 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
699 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
703 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
705 NumberedServerVector poss
;
707 for(auto& d
: servers
) {
708 if(d
.second
->isUp()) {
713 const auto *res
=&poss
;
718 return shared_ptr
<DownstreamState
>();
720 static unsigned int counter
;
722 return (*res
)[(counter
++) % res
->size()].second
;
725 ComboAddress g_serverControl
{"127.0.0.1:5199"};
727 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
729 std::shared_ptr
<ServerPool
> pool
;
730 pools_t::iterator it
= pools
.find(poolName
);
731 if (it
!= pools
.end()) {
735 if (!poolName
.empty())
736 vinfolog("Creating pool %s", poolName
);
737 pool
= std::make_shared
<ServerPool
>();
738 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
743 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
745 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
746 if (!poolName
.empty()) {
747 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
749 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
751 pool
->policy
= policy
;
754 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
756 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
757 if (!poolName
.empty()) {
758 vinfolog("Adding server to pool %s", poolName
);
760 vinfolog("Adding server to default pool");
762 pool
->addServer(server
);
765 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
767 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
769 if (!poolName
.empty()) {
770 vinfolog("Removing server from pool %s", poolName
);
773 vinfolog("Removing server from default pool");
776 pool
->removeServer(server
);
779 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
781 pools_t::const_iterator it
= pools
.find(poolName
);
783 if (it
== pools
.end()) {
784 throw std::out_of_range("No pool named " + poolName
);
790 NumberedServerVector
getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
792 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
793 return pool
->getServers();
796 // goal in life - if you send us a reasonably normal packet, we'll get Z for you, otherwise 0
797 int getEDNSZ(const char* packet
, unsigned int len
)
800 struct dnsheader
* dh
=(struct dnsheader
*)packet
;
802 if(ntohs(dh
->qdcount
) != 1 || dh
->ancount
!=0 || ntohs(dh
->arcount
)!=1 || dh
->nscount
!=0)
805 if (len
<= sizeof(dnsheader
))
808 unsigned int consumed
;
809 DNSName
qname(packet
, len
, sizeof(dnsheader
), false, 0, 0, &consumed
);
810 size_t pos
= consumed
+ DNS_TYPE_SIZE
+ DNS_CLASS_SIZE
;
811 uint16_t qtype
, qclass
;
813 if (len
<= (sizeof(dnsheader
)+pos
))
816 DNSName
aname(packet
, len
, sizeof(dnsheader
)+pos
, true, &qtype
, &qclass
, &consumed
);
818 if(qtype
!=QType::OPT
|| sizeof(dnsheader
)+pos
+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
+EDNS_EXTENDED_RCODE_SIZE
+EDNS_VERSION_SIZE
+1 >= len
)
821 uint8_t* z
= (uint8_t*)packet
+sizeof(dnsheader
)+pos
+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
+EDNS_EXTENDED_RCODE_SIZE
+EDNS_VERSION_SIZE
;
822 return 0x100 * (*z
) + *(z
+1);
829 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
)
833 std::vector
<std::string
> addrs
;
834 stringtok(addrs
, spoofContent
, " ,");
836 if (addrs
.size() == 1) {
838 ComboAddress
spoofAddr(spoofContent
);
839 SpoofAction
sa({spoofAddr
});
842 catch(const PDNSException
&e
) {
843 SpoofAction
sa(spoofContent
); // CNAME then
847 std::vector
<ComboAddress
> cas
;
848 for (const auto& addr
: addrs
) {
850 cas
.push_back(ComboAddress(addr
));
860 bool processQuery(LocalHolders
& holders
, DNSQuestion
& dq
, string
& poolname
, int* delayMsec
, const struct timespec
& now
)
862 g_rings
.insertQuery(now
,*dq
.remote
,*dq
.qname
,dq
.qtype
,dq
.len
,*dq
.dh
);
864 if(g_qcount
.enabled
) {
865 string qname
= (*dq
.qname
).toString(".");
866 bool countQuery
{true};
867 if(g_qcount
.filter
) {
868 std::lock_guard
<std::mutex
> lock(g_luamutex
);
869 std::tie (countQuery
, qname
) = g_qcount
.filter(dq
);
873 WriteLock
wl(&g_qcount
.queryLock
);
874 if(!g_qcount
.records
.count(qname
)) {
875 g_qcount
.records
[qname
] = 0;
877 g_qcount
.records
[qname
]++;
881 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
882 auto updateBlockStats
= [&got
]() {
883 g_stats
.dynBlocked
++;
884 got
->second
.blocks
++;
887 if(now
< got
->second
.until
) {
888 DNSAction::Action action
= got
->second
.action
;
889 if (action
== DNSAction::Action::None
) {
890 action
= g_dynBlockAction
;
892 if (action
== DNSAction::Action::Refused
) {
893 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
896 dq
.dh
->rcode
= RCode::Refused
;
900 else if (action
== DNSAction::Action::Truncate
) {
903 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
909 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
915 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
921 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
922 auto updateBlockStats
= [&got
]() {
923 g_stats
.dynBlocked
++;
927 if(now
< got
->until
) {
928 DNSAction::Action action
= got
->action
;
929 if (action
== DNSAction::Action::None
) {
930 action
= g_dynBlockAction
;
932 if (action
== DNSAction::Action::Refused
) {
933 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
936 dq
.dh
->rcode
= RCode::Refused
;
940 else if (action
== DNSAction::Action::Truncate
) {
944 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
950 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
955 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
961 DNSAction::Action action
=DNSAction::Action::None
;
963 for(const auto& lr
: *holders
.rulactions
) {
964 if(lr
.d_rule
->matches(&dq
)) {
965 lr
.d_rule
->d_matches
++;
966 action
=(*lr
.d_action
)(&dq
, &ruleresult
);
969 case DNSAction::Action::Allow
:
972 case DNSAction::Action::Drop
:
976 case DNSAction::Action::Nxdomain
:
977 dq
.dh
->rcode
= RCode::NXDomain
;
979 g_stats
.ruleNXDomain
++;
982 case DNSAction::Action::Refused
:
983 dq
.dh
->rcode
= RCode::Refused
;
985 g_stats
.ruleRefused
++;
988 case DNSAction::Action::ServFail
:
989 dq
.dh
->rcode
= RCode::ServFail
;
991 g_stats
.ruleServFail
++;
994 case DNSAction::Action::Spoof
:
995 spoofResponseFromString(dq
, ruleresult
);
998 case DNSAction::Action::Truncate
:
1003 case DNSAction::Action::HeaderModify
:
1006 case DNSAction::Action::Pool
:
1007 poolname
=ruleresult
;
1010 /* non-terminal actions follow */
1011 case DNSAction::Action::Delay
:
1012 *delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1014 case DNSAction::Action::None
:
1023 bool processResponse(LocalStateHolder
<vector
<DNSDistResponseRuleAction
> >& localRespRulactions
, DNSResponse
& dr
, int* delayMsec
)
1025 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
1026 std::string ruleresult
;
1027 for(const auto& lr
: *localRespRulactions
) {
1028 if(lr
.d_rule
->matches(&dr
)) {
1029 lr
.d_rule
->d_matches
++;
1030 action
=(*lr
.d_action
)(&dr
, &ruleresult
);
1032 case DNSResponseAction::Action::Allow
:
1035 case DNSResponseAction::Action::Drop
:
1038 case DNSResponseAction::Action::HeaderModify
:
1041 case DNSResponseAction::Action::ServFail
:
1042 dr
.dh
->rcode
= RCode::ServFail
;
1045 /* non-terminal actions follow */
1046 case DNSResponseAction::Action::Delay
:
1047 *delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1049 case DNSResponseAction::Action::None
:
1058 static ssize_t
udpClientSendRequestToBackend(DownstreamState
* ss
, const int sd
, const char* request
, const size_t requestLen
)
1062 if (ss
->sourceItf
== 0) {
1063 result
= send(sd
, request
, requestLen
, 0);
1069 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &ss
->remote
);
1070 addCMsgSrcAddr(&msgh
, cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1071 result
= sendmsg(sd
, &msgh
, 0);
1075 int savederrno
= errno
;
1076 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1078 /* This might sound silly, but on Linux send() might fail with EINVAL
1079 if the interface the socket was bound to doesn't exist anymore. */
1080 if (savederrno
== EINVAL
) {
1088 bool addXPF(DNSQuestion
& dq
, uint16_t optionCode
)
1090 std::string payload
= generateXPFPayload(dq
.tcp
, *dq
.remote
, *dq
.local
);
1091 uint8_t root
= '\0';
1092 dnsrecordheader drh
;
1093 drh
.d_type
= htons(optionCode
);
1094 drh
.d_class
= htons(QClass::IN
);
1096 drh
.d_clen
= htons(payload
.size());
1097 size_t recordHeaderLen
= sizeof(root
) + sizeof(drh
);
1099 size_t available
= dq
.size
- dq
.len
;
1101 if ((payload
.size() + recordHeaderLen
) > available
) {
1105 size_t pos
= dq
.len
;
1106 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, &root
, sizeof(root
));
1107 pos
+= sizeof(root
);
1108 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, &drh
, sizeof(drh
));
1110 memcpy(reinterpret_cast<char*>(dq
.dh
) + pos
, payload
.data(), payload
.size());
1111 pos
+= payload
.size();
1115 dq
.dh
->arcount
= htons(ntohs(dq
.dh
->arcount
) + 1);
1120 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1122 if (msgh
->msg_flags
& MSG_TRUNC
) {
1123 /* message was too large for our buffer */
1124 vinfolog("Dropping message too large for our buffer");
1125 g_stats
.nonCompliantQueries
++;
1129 if(!holders
.acl
->match(remote
)) {
1130 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1138 if (HarvestDestinationAddress(msgh
, &dest
)) {
1139 /* we don't get the port, only the address */
1140 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1143 dest
.sin4
.sin_family
= 0;
1149 #ifdef HAVE_DNSCRYPT
1150 static bool checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DNSCryptQuery
>& dnsCryptQuery
, const ComboAddress
& dest
, const ComboAddress
& remote
, time_t now
)
1152 if (cs
.dnscryptCtx
) {
1153 vector
<uint8_t> response
;
1154 uint16_t decryptedQueryLen
= 0;
1156 dnsCryptQuery
= std::make_shared
<DNSCryptQuery
>(cs
.dnscryptCtx
);
1158 bool decrypted
= handleDNSCryptQuery(const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, false, now
, response
);
1161 if (response
.size() > 0) {
1162 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(response
.data()), static_cast<uint16_t>(response
.size()), 0, dest
, remote
);
1167 len
= decryptedQueryLen
;
1171 #endif /* HAVE_DNSCRYPT */
1173 bool checkQueryHeaders(const struct dnsheader
* dh
)
1175 if (dh
->qr
) { // don't respond to responses
1176 g_stats
.nonCompliantQueries
++;
1180 if (dh
->qdcount
== 0) {
1181 g_stats
.emptyQueries
++;
1186 g_stats
.rdQueries
++;
1192 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1193 static void queueResponse(const ClientState
& cs
, const char* response
, uint16_t responseLen
, const ComboAddress
& dest
, const ComboAddress
& remote
, struct mmsghdr
& outMsg
, struct iovec
* iov
, char* cbuf
)
1196 fillMSGHdr(&outMsg
.msg_hdr
, iov
, nullptr, 0, const_cast<char*>(response
), responseLen
, const_cast<ComboAddress
*>(&remote
));
1198 if (dest
.sin4
.sin_family
== 0) {
1199 outMsg
.msg_hdr
.msg_control
= nullptr;
1202 addCMsgSrcAddr(&outMsg
.msg_hdr
, cbuf
, &dest
, 0);
1205 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
// Processes one UDP query received on the frontend cs.
//
// Visible flow:
//   1. isUDPQueryAcceptable() check
//   2. DNSCrypt handling via checkDNSCryptQuery() (HAVE_DNSCRYPT builds)
//   3. header checks via checkQueryHeaders()
//   4. parsing of qname / qtype / qclass, then processQuery() on a DNSQuestion
//   5. if the QR bit got set during step 4, the packet is answered from here
//      (self-answered) after running holders.selfAnsweredRespRulactions
//   6. pool, policy and backend selection
//   7. optional EDNS Client Subnet insertion
//   8. packet cache lookup; a hit is answered from here after running
//      holders.cacheHitRespRulactions
//   9. ServFail or drop when no backend was selected
//  10. bookkeeping in an IDState slot of the backend, then relay of the query
//
// Parameters:
//   msgh             message header of the received datagram
//   remote           address of the client
//   dest             destination address of the query; a family of 0 means
//                    it is not known and cs.local is used instead
//   query, len       buffer holding the query and its length; the same
//                    buffer is reused for locally generated responses
//   queryBufferSize  total size of that buffer
//   responsesVect, queuedResponses, respIOV, respCBuf
//                    when responsesVect is not null, responses that can be
//                    sent immediately are queued with queueResponse() and
//                    *queuedResponses is incremented, instead of being sent
//                    one by one
//
// std::exception thrown while handling the query is caught at the end and
// only logged through vinfolog.
// NOTE(review): several lines of this function are not visible in this
// excerpt (early returns, some declarations such as poolname and delayMsec,
// else branches); comments below hedge where they depend on them.
1207 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, char* respCBuf
)
// Batch mode requires all three companion pointers to be set as well.
1209 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
1210 uint16_t queryId
= 0;
1213 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1217 /* we need an accurate ("real") value for the response and
1218 to store into the IDS, but not for insertion into the
1219 rings for example */
1220 struct timespec queryRealTime
;
1221 struct timespec now
;
1223 gettime(&queryRealTime
, true);
1225 #ifdef HAVE_DNSCRYPT
1226 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
= nullptr;
1228 if (!checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, dest
, remote
, queryRealTime
.tv_sec
)) {
1233 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
// Remembered so that the catch handler at the end can log it.
1234 queryId
= ntohs(dh
->id
);
1236 if (!checkQueryHeaders(dh
)) {
// The original header flags are saved so that they can be restored with
// restoreFlags() on locally generated responses, and stored in the IDState.
1242 const uint16_t * flags
= getFlagsFromDNSHeader(dh
);
1243 const uint16_t origFlags
= *flags
;
1244 uint16_t qtype
, qclass
;
1245 unsigned int consumed
= 0;
1246 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
// The local address is the harvested destination when known, the
// listening address of the frontend otherwise.
1247 DNSQuestion
dq(&qname
, qtype
, qclass
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false, &queryRealTime
);
1249 if (!processQuery(holders
, dq
, poolname
, &delayMsec
, now
))
// Self-answered path: the query buffer now holds a response.
1254 if(dq
.dh
->qr
) { // something turned it into a response
1255 restoreFlags(dh
, origFlags
);
1258 char* response
= query
;
1259 uint16_t responseLen
= dq
.len
;
1261 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(response
), dq
.size
, responseLen
, false, &queryRealTime
);
1262 #ifdef HAVE_PROTOBUF
1263 dr
.uniqueId
= dq
.uniqueId
;
1267 if (!processResponse(holders
.selfAnsweredRespRulactions
, dr
, &delayMsec
)) {
1271 #ifdef HAVE_DNSCRYPT
1272 if (!encryptResponse(response
, &responseLen
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1276 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
// Only responses without a delay can be batched.
1277 if (delayMsec
== 0 && responsesVect
!= nullptr) {
1278 queueResponse(cs
, response
, responseLen
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1279 (*queuedResponses
)++;
1282 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1284 sendUDPResponse(cs
.udpFD
, response
, responseLen
, delayMsec
, dest
, remote
);
1287 g_stats
.selfAnswered
++;
1288 doLatencyStats(0); // we're not going to measure this
// Backend selection: the pool named by poolname provides the packet cache
// and the candidate servers; a policy set on the pool overrides the
// global one.
1294 DownstreamState
* ss
= nullptr;
1295 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, poolname
);
1296 std::shared_ptr
<DNSDistPacketCache
> packetCache
= serverPool
->packetCache
;
1297 auto policy
= *(holders
.policy
);
1298 if (serverPool
->policy
!= nullptr) {
1299 policy
= *(serverPool
->policy
);
1301 auto servers
= serverPool
->getServers();
// The policy is invoked either with g_luamutex held or without it; the
// condition choosing between the two calls is not visible in this excerpt.
1303 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1304 ss
= policy
.policy(servers
, &dq
).get();
1307 ss
= policy
.policy(servers
, &dq
).get();
1310 bool ednsAdded
= false;
1311 bool ecsAdded
= false;
// ECS is added when the question allows it and either the selected backend
// or, without a backend, the pool asks for it.
1312 if (dq
.useECS
&& ((ss
&& ss
->useECS
) || (!ss
&& serverPool
->getECS()))) {
1313 if (!handleEDNSClientSubnet(query
, dq
.size
, consumed
, &dq
.len
, &(ednsAdded
), &(ecsAdded
), remote
, dq
.ecsOverride
, dq
.ecsPrefixLength
)) {
1314 vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote
.toStringWithPort());
// Packet cache lookup. Expired entries are only accepted (for up to
// g_staleCacheEntriesTTL) when no backend was selected.
1319 uint32_t cacheKey
= 0;
1320 if (packetCache
&& !dq
.skipCache
) {
1321 uint16_t cachedResponseSize
= dq
.size
;
1322 uint32_t allowExpired
= ss
? 0 : g_staleCacheEntriesTTL
;
1323 if (packetCache
->get(dq
, consumed
, dh
->id
, query
, &cachedResponseSize
, &cacheKey
, allowExpired
)) {
1324 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(query
), dq
.size
, cachedResponseSize
, false, &queryRealTime
);
1325 #ifdef HAVE_PROTOBUF
1326 dr
.uniqueId
= dq
.uniqueId
;
1330 if (!processResponse(holders
.cacheHitRespRulactions
, dr
, &delayMsec
)) {
1335 #ifdef HAVE_DNSCRYPT
1336 if (!encryptResponse(query
, &cachedResponseSize
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1340 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1341 if (delayMsec
== 0 && responsesVect
!= nullptr) {
1342 queueResponse(cs
, query
, cachedResponseSize
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1343 (*queuedResponses
)++;
1346 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1348 sendUDPResponse(cs
.udpFD
, query
, cachedResponseSize
, delayMsec
, dest
, remote
);
1352 g_stats
.cacheHits
++;
1353 doLatencyStats(0); // we're not going to measure this
1356 g_stats
.cacheMisses
++;
// Presumably reached only when no backend was selected (the enclosing
// condition is not visible in this excerpt): answer with ServFail when
// g_servFailOnNoPolicy is set and the frontend is not muted.
1362 if (g_servFailOnNoPolicy
&& !cs
.muted
) {
1363 char* response
= query
;
1364 uint16_t responseLen
= dq
.len
;
1365 restoreFlags(dh
, origFlags
);
1367 dq
.dh
->rcode
= RCode::ServFail
;
1370 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(response
), dq
.size
, responseLen
, false, &queryRealTime
);
1371 #ifdef HAVE_PROTOBUF
1372 dr
.uniqueId
= dq
.uniqueId
;
1376 if (!processResponse(holders
.selfAnsweredRespRulactions
, dr
, &delayMsec
)) {
1380 #ifdef HAVE_DNSCRYPT
1381 if (!encryptResponse(response
, &responseLen
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1385 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
// Unlike the two other queueing sites, delayMsec is not tested here and
// the non-batched send below uses a delay of 0.
1386 if (responsesVect
!= nullptr) {
1387 queueResponse(cs
, response
, responseLen
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1388 (*queuedResponses
)++;
1391 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1393 sendUDPResponse(cs
.udpFD
, response
, responseLen
, 0, dest
, remote
);
1396 // no response-only statistics counter to update.
1397 doLatencyStats(0); // we're not going to measure this
1399 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "ServFailed" : "Dropped", dq
.qname
->toString(), QType(dq
.qtype
).getName(), remote
.toStringWithPort());
// From here on ss is dereferenced without a null check, so a backend is
// expected to have been selected.
1403 if (dq
.addXPF
&& ss
->xpfRRCode
!= 0) {
1404 addXPF(dq
, ss
->xpfRRCode
);
// Pick the next IDState slot of the backend, wrapping around the size of
// idStates. The slot may still belong to an older in-flight query.
1409 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1410 IDState
* ids
= &ss
->idStates
[idOffset
];
1413 if(ids
->origFD
< 0) // if we are reusing, no change in outstanding
1417 g_stats
.downstreamTimeouts
++;
// Record everything needed to route the backend response back to the
// client and to post-process it.
1421 ids
->origFD
= cs
.udpFD
;
1422 ids
->origID
= dh
->id
;
1423 ids
->origRemote
= remote
;
1424 ids
->sentTime
.set(queryRealTime
);
1426 ids
->qtype
= dq
.qtype
;
1427 ids
->qclass
= dq
.qclass
;
1428 ids
->delayMsec
= delayMsec
;
1429 ids
->tempFailureTTL
= dq
.tempFailureTTL
;
1430 ids
->origFlags
= origFlags
;
1431 ids
->cacheKey
= cacheKey
;
1432 ids
->skipCache
= dq
.skipCache
;
1433 ids
->packetCache
= packetCache
;
1434 ids
->ednsAdded
= ednsAdded
;
1435 ids
->ecsAdded
= ecsAdded
;
1436 ids
->qTag
= dq
.qTag
;
1438 /* If we couldn't harvest the real dest addr, still
1439 write down the listening addr since it will be useful
1440 (especially if it's not an 'any' one).
1441 We need to keep track of which one it is since we may
1442 want to use the real but not the listening addr to reply.
1444 if (dest
.sin4
.sin_family
!= 0) {
1445 ids
->origDest
= dest
;
1446 ids
->destHarvested
= true;
1449 ids
->origDest
= cs
.local
;
1450 ids
->destHarvested
= false;
1452 #ifdef HAVE_DNSCRYPT
1453 ids
->dnsCryptQuery
= dnsCryptQuery
;
1455 #ifdef HAVE_PROTOBUF
1456 ids
->uniqueId
= dq
.uniqueId
;
// Relay the (possibly rewritten) query to the selected backend.
1461 int fd
= pickBackendSocketForSending(ss
);
1462 ssize_t ret
= udpClientSendRequestToBackend(ss
, fd
, query
, dq
.len
);
1466 g_stats
.downstreamSendErrors
++;
1469 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
// Errors raised while handling this query are logged and do not propagate.
1471 catch(const std::exception
& e
){
1472 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
1476 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
// Receive loop for the UDP frontend cs that reads queries with recvmmsg()
// and sends immediate responses with sendmmsg(), so that several datagrams
// are handled per system call.
// Only compiled when recvmmsg, sendmmsg and MSG_WAITFORONE are available.
//
// Storage: g_udpVectorSize MMReceiver slots (fields used below: packet,
// cbuf, remote, dest, iov) plus one mmsghdr vector for incoming messages and
// one for outgoing ones.
// Each received message holding at least a DNS header is handed to
// processUDPQuery() in batch mode; responses queued there are flushed in one
// sendmmsg() call after the whole batch has been processed.
// NOTE(review): the definition of MMReceiver and the enclosing endless loop
// are only partly visible in this excerpt.
1477 static void MultipleMessagesUDPClientThread(ClientState
* cs
, LocalHolders
& holders
)
1482 /* used by HarvestDestinationAddress */
1484 ComboAddress remote
;
1488 const size_t vectSize
= g_udpVectorSize
;
1489 /* the actual buffer is larger because:
1490 - we may have to add EDNS and/or ECS
1491 - we use it for self-generated responses (from rule or cache)
1492 but we only accept incoming payloads up to that size
1494 static_assert(s_udpIncomingBufferSize
<= sizeof(MMReceiver::packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1496 auto recvData
= std::unique_ptr
<MMReceiver
[]>(new MMReceiver
[vectSize
]);
1497 auto msgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1498 auto outMsgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1500 /* initialize the structures needed to receive our messages */
1501 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1502 recvData
[idx
].remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1503 fillMSGHdr(&msgVec
[idx
].msg_hdr
, &recvData
[idx
].iov
, recvData
[idx
].cbuf
, sizeof(recvData
[idx
].cbuf
), recvData
[idx
].packet
, s_udpIncomingBufferSize
, &recvData
[idx
].remote
);
1509 /* reset the IO vector, since it's also used to send the vector of responses
1510 to avoid having to copy the data around */
1511 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1512 recvData
[idx
].iov
.iov_base
= recvData
[idx
].packet
;
1513 recvData
[idx
].iov
.iov_len
= sizeof(recvData
[idx
].packet
);
1516 /* block until we have at least one message ready, but return
1517 as many as possible to save the syscall costs */
1518 int msgsGot
= recvmmsg(cs
->udpFD
, msgVec
.get(), vectSize
, MSG_WAITFORONE
| MSG_TRUNC
, nullptr);
1521 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno
));
// Number of responses queued into outMsgVec for the current batch.
1525 unsigned int msgsToSend
= 0;
1527 /* process the received messages */
1528 for (int msgIdx
= 0; msgIdx
< msgsGot
; msgIdx
++) {
1529 const struct msghdr
* msgh
= &msgVec
[msgIdx
].msg_hdr
;
1530 unsigned int got
= msgVec
[msgIdx
].msg_len
;
1531 const ComboAddress
& remote
= recvData
[msgIdx
].remote
;
// Too short to hold a DNS header: counted as non compliant (and presumably
// skipped; the statement doing so is not visible in this excerpt).
1533 if (got
< sizeof(struct dnsheader
)) {
1534 g_stats
.nonCompliantQueries
++;
// The receive slot also provides the iovec and control buffer used for a
// queued response, and the full packet size is the usable buffer size.
1538 processUDPQuery(*cs
, holders
, msgh
, remote
, recvData
[msgIdx
].dest
, recvData
[msgIdx
].packet
, static_cast<uint16_t>(got
), sizeof(recvData
[msgIdx
].packet
), outMsgVec
.get(), &msgsToSend
, &recvData
[msgIdx
].iov
, recvData
[msgIdx
].cbuf
);
1542 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1543 or the cache) can be sent in batch too */
1545 if (msgsToSend
> 0 && msgsToSend
<= static_cast<unsigned int>(msgsGot
)) {
1546 int sent
= sendmmsg(cs
->udpFD
, outMsgVec
.get(), msgsToSend
, 0);
// A partial send is only logged; nothing visible here retries it.
1548 if (sent
< 0 || static_cast<unsigned int>(sent
) != msgsToSend
) {
1549 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent
, msgsToSend
, strerror(errno
));
1555 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1557 // listens to incoming queries, sends out to downstream servers, noting the intended return path
// Thread entry point for the UDP frontend cs.
// When batching is compiled in and g_udpVectorSize is greater than 1, the
// work is delegated to MultipleMessagesUDPClientThread(). Otherwise queries
// are read one at a time with recvmsg() and passed to processUDPQuery()
// without batching (all four batch pointers are null).
// Exceptions are caught and reported through errlog as the death of the
// thread; the return value is not visible in this excerpt.
1558 static void* udpClientThread(ClientState
* cs
)
1561 LocalHolders holders
;
1563 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1564 if (g_udpVectorSize
> 1) {
1565 MultipleMessagesUDPClientThread(cs
, holders
);
1569 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1572 /* the actual buffer is larger because:
1573 - we may have to add EDNS and/or ECS
1574 - we use it for self-generated responses (from rule or cache)
1575 but we only accept incoming payloads up to that size
1577 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1580 /* used by HarvestDestinationAddress */
1583 ComboAddress remote
;
1585 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1586 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), packet
, sizeof(packet
), &remote
);
1589 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
// Receive errors and datagrams shorter than a DNS header are both counted
// as non compliant queries.
1591 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1592 g_stats
.nonCompliantQueries
++;
// NOTE(review): the buffer size passed here is s_udpIncomingBufferSize,
// while the batched path passes the full size of its packet buffer -
// confirm which one is intended.
1596 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), s_udpIncomingBufferSize
, nullptr, nullptr, nullptr, nullptr);
1602 catch(const std::exception
&e
)
1604 errlog("UDP client thread died because of exception: %s", e
.what());
1607 catch(const PDNSException
&e
)
1609 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1614 errlog("UDP client thread died because of an exception: %s", "unknown");
// Performs one health check against the backend ds.
//
// A query for ds.checkName / ds.checkType / ds.checkClass is built with a
// random ID, optionally customised by ds.checkFunction (called with
// g_luamutex held), sent over a fresh non-blocking UDP socket connected to
// ds.remote, and the response is validated:
//   - at least as large as a DNS header
//   - same ID as the request
//   - QR bit set
//   - rcode is not ServFail
//   - rcode is neither NXDomain nor Refused when ds.mustResolve is set
//   - qname, qtype and qclass match what was asked
// Every failure is logged with infolog when g_verboseHealthChecks is set.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably true means the backend passed the check - confirm.
1618 static bool upCheck(DownstreamState
& ds
)
1621 DNSName checkName
= ds
.checkName
;
1622 uint16_t checkType
= ds
.checkType
.getCode();
1623 uint16_t checkClass
= ds
.checkClass
;
1624 dnsheader checkHeader
;
1625 memset(&checkHeader
, 0, sizeof(checkHeader
));
1627 checkHeader
.qdcount
= htons(1);
// The query ID comes from libsodium when available, from random() otherwise.
1628 #ifdef HAVE_LIBSODIUM
1629 checkHeader
.id
= randombytes_random() % 65536;
1631 checkHeader
.id
= random() % 65536;
1634 checkHeader
.rd
= true;
// NOTE(review): any condition guarding the CD bit is not visible here.
1636 checkHeader
.cd
= true;
// A user-supplied check function may replace the name, type and class and
// is given a pointer to the header.
1640 if (ds
.checkFunction
) {
1641 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1642 auto ret
= ds
.checkFunction(checkName
, checkType
, checkClass
, &checkHeader
);
1643 checkName
= std::get
<0>(ret
);
1644 checkType
= std::get
<1>(ret
);
1645 checkClass
= std::get
<2>(ret
);
1648 vector
<uint8_t> packet
;
1649 DNSPacketWriter
dpw(packet
, checkName
, checkType
, checkClass
);
1650 dnsheader
* requestHeader
= dpw
.getHeader();
1651 *requestHeader
= checkHeader
;
1653 Socket
sock(ds
.remote
.sin4
.sin_family
, SOCK_DGRAM
);
1654 sock
.setNonBlocking();
// Bind to the configured source address only when one is set.
1655 if (!IsAnyAddress(ds
.sourceAddr
)) {
1656 sock
.setReuseAddr();
1657 sock
.bind(ds
.sourceAddr
);
1659 sock
.connect(ds
.remote
);
1660 ssize_t sent
= udpClientSendRequestToBackend(&ds
, sock
.getHandle(), (char*)&packet
[0], packet
.size());
1663 if (g_verboseHealthChecks
)
1664 infolog("Error while sending a health check query to backend %s: %d", ds
.getNameWithAddr(), ret
);
// Wait for the socket to become readable (timeout arguments 1, 0).
1668 int ret
=waitForRWData(sock
.getHandle(), true, 1, 0);
1669 if(ret
< 0 || !ret
) { // error, timeout, both are down!
1672 if (g_verboseHealthChecks
)
1673 infolog("Error while waiting for the health check response from backend %s: %d", ds
.getNameWithAddr(), ret
);
1676 if (g_verboseHealthChecks
)
1677 infolog("Timeout while waiting for the health check response from backend %s", ds
.getNameWithAddr());
1683 sock
.recvFrom(reply
, ds
.remote
);
1685 const dnsheader
* responseHeader
= reinterpret_cast<const dnsheader
*>(reply
.c_str());
// The size is validated before any field of responseHeader is read.
1687 if (reply
.size() < sizeof(*responseHeader
)) {
1688 if (g_verboseHealthChecks
)
1689 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply
.size(), ds
.getNameWithAddr(), sizeof(*responseHeader
));
1693 if (responseHeader
->id
!= requestHeader
->id
) {
1694 if (g_verboseHealthChecks
)
1695 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader
->id
, ds
.getNameWithAddr(), requestHeader
->id
);
1699 if (!responseHeader
->qr
) {
1700 if (g_verboseHealthChecks
)
1701 infolog("Invalid health check response from backend %s, expecting QR to be set", ds
.getNameWithAddr());
1705 if (responseHeader
->rcode
== RCode::ServFail
) {
1706 if (g_verboseHealthChecks
)
1707 infolog("Backend %s responded to health check with ServFail", ds
.getNameWithAddr());
1711 if (ds
.mustResolve
&& (responseHeader
->rcode
== RCode::NXDomain
|| responseHeader
->rcode
== RCode::Refused
)) {
1712 if (g_verboseHealthChecks
)
1713 infolog("Backend %s responded to health check with %s while mustResolve is set", ds
.getNameWithAddr(), responseHeader
->rcode
== RCode::NXDomain
? "NXDomain" : "Refused");
// The question section of the response must match the (possibly
// customised) question that was sent.
1717 uint16_t receivedType
;
1718 uint16_t receivedClass
;
1719 DNSName
receivedName(reply
.c_str(), reply
.size(), sizeof(dnsheader
), false, &receivedType
, &receivedClass
);
1721 if (receivedName
!= checkName
|| receivedType
!= checkType
|| receivedClass
!= checkClass
) {
1722 if (g_verboseHealthChecks
)
1723 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds
.getNameWithAddr(), receivedName
.toLogString(), checkName
.toLogString(), QType(receivedType
).getName(), QType(checkType
).getName(), receivedClass
, checkClass
);
// Exceptions raised during the check are logged (verbose mode only) and
// do not propagate.
1729 catch(const std::exception
& e
)
1731 if (g_verboseHealthChecks
)
1732 infolog("Error checking the health of backend %s: %s", ds
.getNameWithAddr(), e
.what());
1737 if (g_verboseHealthChecks
)
1738 infolog("Unknown exception while checking the health of backend %s", ds
.getNameWithAddr());
// Maximum number of TCP client threads; also used by
// checkFileDescriptorsLimits() to estimate how many descriptors are needed.
1742 uint64_t g_maxTCPClientThreads
{10};
// Threshold compared against a counter by the maintenance code to decide
// when expired packet cache entries are purged (unit presumably seconds -
// confirm against the counter increment, which is not visible here).
1743 std::atomic
<uint16_t> g_cacheCleaningDelay
{60};
// Percentage used when purging: the purge is asked to keep at most
// maxEntries * (100 - g_cacheCleaningPercentage) / 100 entries.
1744 std::atomic
<uint16_t> g_cacheCleaningPercentage
{100};
// Body of the periodic maintenance code (the enclosing function signature
// and loop are not visible in this excerpt).
// Two jobs are visible:
//   - reading the optional Lua function named maintenance with g_luamutex
//     held; exceptions are logged through infolog, and secondsToWaitLog is
//     used to avoid logging the same kind of failure on every round
//   - once counter reaches g_cacheCleaningDelay, purging expired entries
//     from the packet cache of every pool
1750 int32_t secondsToWaitLog
= 0;
1756 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1757 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1761 secondsToWaitLog
= 0;
1763 catch(std::exception
&e
) {
// Log at most once until the countdown set below has expired again.
1764 if (secondsToWaitLog
<= 0) {
1765 infolog("Error during execution of maintenance function: %s", e
.what());
1766 secondsToWaitLog
= 61;
1768 secondsToWaitLog
-= interval
;
1774 if (counter
>= g_cacheCleaningDelay
) {
1775 auto localPools
= g_pools
.getLocal();
1776 std::shared_ptr
<DNSDistPacketCache
> packetCache
= nullptr;
1777 for (const auto& entry
: *localPools
) {
1778 packetCache
= entry
.second
->packetCache
;
// NOTE(review): packetCache is dereferenced here; a null check is
// presumably on a line not visible in this excerpt - confirm.
1780 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1781 packetCache
->purgeExpired(upTo
);
1787 // ponder pruning g_dynblocks of expired entries here
// Periodic thread body that (the enclosing loop and sleep are not visible in
// this excerpt):
//   - adds a TCP client thread when more than one connection is queued and
//     the maximum number of threads has not been reached
//   - runs upCheck() on every backend whose availability is Auto and updates
//     upStatus, honouring maxCheckFailures before marking a backend down
//   - (re)connects the sockets of a backend that comes up while not
//     connected, and starts its responder thread
//   - recomputes queryLoad and dropRate of every backend from the counters
//   - expires IDState slots whose age exceeds g_udpTimeout and records the
//     timeout in g_stats and g_rings
1792 void* healthChecksThread()
1799 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads())
1800 g_tcpclientthreads
->addTCPClientThread();
1802 auto states
= g_dstates
.getLocal(); // this points to the actual shared_ptrs!
1803 for(auto& dss
: *states
) {
1804 if(dss
->availability
==DownstreamState::Availability::Auto
) {
1805 bool newState
=upCheck(*dss
);
// A successful check resets the count of consecutive failures
// (the enclosing condition is not visible in this excerpt).
1807 if (dss
->currentCheckFailures
!= 0) {
1808 dss
->currentCheckFailures
= 0;
// A failed check on a backend currently up only counts as a failure until
// maxCheckFailures is reached.
1811 else if (!newState
&& dss
->upStatus
) {
1812 dss
->currentCheckFailures
++;
1813 if (dss
->currentCheckFailures
< dss
->maxCheckFailures
) {
1818 if(newState
!= dss
->upStatus
) {
1819 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
// Backend coming up without connected sockets: connect each socket and
// register it with the multiplexer while holding socketsLock.
1821 if (newState
&& !dss
->connected
) {
1822 for (auto& fd
: dss
->sockets
) {
1824 SConnect(fd
, dss
->remote
);
1826 std::lock_guard
<std::mutex
> lock(dss
->socketsLock
);
1827 dss
->mplexer
->addReadFD(fd
, [](int, boost::any
) {});
1829 dss
->connected
= true;
1831 catch(const std::runtime_error
& error
) {
1832 infolog("Error connecting to new server with address %s: %s", dss
->remote
.toStringWithPort(), error
.what());
1834 dss
->connected
= false;
1837 if (dss
->connected
) {
1838 dss
->tid
= thread(responderThread
, dss
);
1842 dss
->upStatus
= newState
;
1843 dss
->currentCheckFailures
= 0;
1844 if (g_snmpAgent
&& g_snmpTrapsEnabled
) {
1845 g_snmpAgent
->sendBackendStatusChangeTrap(dss
);
// Rates are computed over the time elapsed since the previous round; the
// stopwatch value is divided by 1000000.0 before use.
1850 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
1851 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
1852 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
1853 dss
->prev
.queries
.store(dss
->queries
.load());
1854 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
// age is incremented on every pass for slots that are in use.
1856 for(IDState
& ids
: dss
->idStates
) { // timeouts
1857 if(ids
.origFD
>=0 && ids
.age
++ > g_udpTimeout
) {
1858 /* We set origFD to -1 as soon as possible
1859 to limit the risk of racing with the
1861 The UDP client thread only checks origFD to
1862 know whether outstanding has to be incremented,
1863 so the sooner the better any way since we _will_
1870 g_stats
.downstreamTimeouts
++; // this is an 'actively' discovered timeout
1871 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1872 dss
->remote
.toStringWithPort(), dss
->name
,
1873 ids
.qname
.toString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
// The timeout is recorded in the response ring with a zeroed header that
// only carries the original query ID.
1878 struct dnsheader fake
;
1879 memset(&fake
, 0, sizeof(fake
));
1880 fake
.id
= ids
.origID
;
1882 g_rings
.insertResponse(ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
);
// Sets a bind-related socket option on sock so that it can later be bound:
// IP_FREEBIND, IP_BINDANY, IPV6_BINDANY or SO_BINDANY.
// A failing setsockopt() only produces a warning through warnlog.
//   af    address family of the socket (its use is on lines not visible in
//         this excerpt)
//   sock  socket descriptor to configure
// NOTE(review): the preprocessor guards selecting which options are compiled
// in are not visible in this excerpt.
1890 static void bindAny(int af
, int sock
)
// Marked unused, presumably because no option may be compiled in on some
// platforms - confirm.
1892 __attribute__((unused
)) int one
= 1;
1895 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
1896 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno
));
1901 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
1902 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno
));
1906 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
1907 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno
));
1910 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
1911 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno
));
// Switches the process to the group gid with setgid() and, on success,
// clears the supplementary group list with setgroups(0, NULL).
// Failures are reported through warnlog only; the process keeps running.
// NOTE(review): a guard on gid may exist on a line not visible in this
// excerpt.
1915 static void dropGroupPrivs(gid_t gid
)
1918 if (setgid(gid
) == 0) {
1919 if (setgroups(0, NULL
) < 0) {
1920 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno
));
// Presumably the else branch of the setgid() test (the else keyword is not
// visible in this excerpt).
1924 warnlog("Warning: Unable to set group ID to %d: %s", gid
, strerror(errno
));
// Switches the process to the user uid with setuid().
// A failure is reported through warnlog only; the process keeps running.
// NOTE(review): a guard on uid may exist on a line not visible in this
// excerpt.
1929 static void dropUserPrivs(uid_t uid
)
1932 if(setuid(uid
) < 0) {
1933 warnlog("Warning: Unable to set user ID to %d: %s", uid
, strerror(errno
));
// Estimates how many file descriptors the current configuration can need
// and warns when the soft RLIMIT_NOFILE limit is not above that estimate.
//   udpBindsCount  number of UDP listening sockets
//   tcpBindsCount  number of TCP listening sockets
// The estimate adds up: the three standard streams, the UDP sockets of
// every backend, one TCP socket per backend and TCP client thread, the
// listening sockets, one served TCP connection and two pipe ends per TCP
// client thread, the queued TCP connections and the two ends of the
// DelayPipe pipe.
// The return value of getrlimit() is not checked in the visible lines.
1938 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
1940 /* stdin, stdout, stderr */
1941 size_t requiredFDsCount
= 3;
1942 auto backends
= g_dstates
.getLocal();
1943 /* UDP sockets to backends */
1944 size_t backendUDPSocketsCount
= 0;
1945 for (const auto& backend
: *backends
) {
1946 backendUDPSocketsCount
+= backend
->sockets
.size();
1948 requiredFDsCount
+= backendUDPSocketsCount
;
1949 /* TCP sockets to backends */
1950 requiredFDsCount
+= (backends
->size() * g_maxTCPClientThreads
);
1951 /* listening sockets */
1952 requiredFDsCount
+= udpBindsCount
;
1953 requiredFDsCount
+= tcpBindsCount
;
1954 /* max TCP connections currently served */
1955 requiredFDsCount
+= g_maxTCPClientThreads
;
1956 /* max pipes for communicating between TCP acceptors and client threads */
1957 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
1958 /* max TCP queued connections */
1959 requiredFDsCount
+= g_maxTCPQueuedConnections
;
1960 /* DelayPipe pipe */
1961 requiredFDsCount
+= 2;
1964 /* webserver main socket */
1966 /* console main socket */
1973 getrlimit(RLIMIT_NOFILE
, &rl
);
1974 if (rl
.rlim_cur
<= requiredFDsCount
) {
// Both numbers are passed as strings built with std::to_string.
1975 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
// One of the two hints below is emitted; the condition selecting between
// them is not visible in this excerpt.
1977 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
1979 warnlog("You can increase this value by using ulimit.");
// Fields holding the parsed command-line options; main() accesses them
// through g_cmdLine (the enclosing declaration and further fields such as
// config, command, uid and gid are not visible in this excerpt).
// Listen addresses given with -l / --local.
1986 vector
<string
> locals
;
// Remaining positional arguments when not running as a client.
1987 vector
<string
> remotes
;
// Set by --check-config.
1988 bool checkConfig
{false};
// Set by -c / --client.
1989 bool beClient
{false};
// Set by --supervised.
1990 bool beSupervised
{false};
// Set to true by main() once the configuration has been loaded and the
// default listen address has been applied.
1997 std::atomic
<bool> g_configurationDone
{false};
// Command-line help text written to cout (the enclosing function signature
// is not visible in this excerpt). The -k / --setkey entry is only compiled
// in when HAVE_LIBSODIUM is defined.
2002 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2003 cout
<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2004 cout
<<"[-v,--verbose] [--check-config] [--version]\n";
2006 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2007 cout
<<"-C,--config file Load configuration from 'file'\n";
2008 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2009 cout
<<" controlSocket from your configuration file, but also\n";
2010 cout
<<" accepts an IP:PORT argument\n";
2011 #ifdef HAVE_LIBSODIUM
2012 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2013 cout
<<" is similar to setting setKey in the configuration file.\n";
2014 cout
<<" NOTE: this will leak this key in your shell's history!\n";
2016 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2017 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2018 cout
<<" Any errors are printed as well.\n";
2019 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2020 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2021 cout
<<"-h,--help Display this helpful message\n";
2022 cout
<<"-l,--local address Listen on this local address\n";
2023 cout
<<"--supervised Don't open a console, I'm supervised\n";
2024 cout
<<" (use with e.g. systemd and daemontools)\n";
2025 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2026 cout
<<" (use with e.g. systemd)\n";
2027 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2028 cout
<<"-v,--verbose Enable verbose mode\n";
2029 cout
<<"-V,--version Show dnsdist version information and exit\n";
2032 int main(int argc
, char** argv
)
2035 size_t udpBindsCount
= 0;
2036 size_t tcpBindsCount
= 0;
2037 rl_attempted_completion_function
= my_completion
;
2038 rl_completion_append_character
= 0;
2040 signal(SIGPIPE
, SIG_IGN
);
2041 signal(SIGCHLD
, SIG_IGN
);
2042 openlog("dnsdist", LOG_PID
, LOG_DAEMON
);
2045 #ifdef HAVE_LIBSODIUM
2046 if (sodium_init() == -1) {
2047 cerr
<<"Unable to initialize crypto library"<<endl
;
2050 g_hashperturb
=randombytes_uniform(0xffffffff);
2051 srandom(randombytes_uniform(0xffffffff));
2055 gettimeofday(&tv
, 0);
2056 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
2057 g_hashperturb
=random();
2061 ComboAddress clientAddress
= ComboAddress();
2062 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
2063 struct option longopts
[]={
2064 {"acl", required_argument
, 0, 'a'},
2065 {"config", required_argument
, 0, 'C'},
2066 {"check-config", 0, 0, 1},
2067 {"execute", required_argument
, 0, 'e'},
2068 {"client", 0, 0, 'c'},
2069 {"gid", required_argument
, 0, 'g'},
2070 #ifdef HAVE_LIBSODIUM
2071 {"setkey", required_argument
, 0, 'k'},
2073 {"local", required_argument
, 0, 'l'},
2074 {"supervised", 0, 0, 's'},
2075 {"disable-syslog", 0, 0, 2},
2076 {"uid", required_argument
, 0, 'u'},
2077 {"verbose", 0, 0, 'v'},
2078 {"version", 0, 0, 'V'},
2079 {"help", 0, 0, 'h'},
2085 #ifdef HAVE_LIBSODIUM
2086 int c
=getopt_long(argc
, argv
, "a:hce:C:k:l:v:g:u:V", longopts
, &longindex
);
2088 int c
=getopt_long(argc
, argv
, "a:hce:C:l:v:g:u:V", longopts
, &longindex
);
2094 g_cmdLine
.checkConfig
=true;
2100 g_cmdLine
.config
=optarg
;
2103 g_cmdLine
.beClient
=true;
2106 g_cmdLine
.command
=optarg
;
2109 g_cmdLine
.gid
=optarg
;
2112 cout
<<"dnsdist "<<VERSION
<<endl
;
2119 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
2121 #ifdef HAVE_LIBSODIUM
2123 if (B64Decode(string(optarg
), g_consoleKey
) < 0) {
2124 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2130 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
2133 g_cmdLine
.beSupervised
=true;
2136 g_cmdLine
.uid
=optarg
;
2142 #ifdef LUAJIT_VERSION
2143 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2145 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2147 cout
<<"Enabled features: ";
2148 #ifdef HAVE_DNS_OVER_TLS
2149 cout
<<"dns-over-tls(";
2161 #ifdef HAVE_DNSCRYPT
2170 #ifdef HAVE_LIBSODIUM
2173 #ifdef HAVE_PROTOBUF
2179 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2180 cout
<<"recvmmsg/sendmmsg ";
2182 #ifdef HAVE_NET_SNMP
2192 //getopt_long printed an error message.
2201 for(auto p
= argv
; *p
; ++p
) {
2202 if(g_cmdLine
.beClient
) {
2203 clientAddress
= ComboAddress(*p
, 5199);
2205 g_cmdLine
.remotes
.push_back(*p
);
2209 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
, false};
2211 g_policy
.setState(leastOutstandingPol
);
2212 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2213 setupLua(true, g_cmdLine
.config
);
2214 if (clientAddress
!= ComboAddress())
2215 g_serverControl
= clientAddress
;
2216 doClient(g_serverControl
, g_cmdLine
.command
);
2217 _exit(EXIT_SUCCESS
);
2220 auto acl
= g_ACL
.getCopy();
2222 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2224 g_ACL
.setState(acl
);
2227 auto consoleACL
= g_consoleACL
.getCopy();
2228 for (const auto& mask
: { "127.0.0.1/8", "::1/128" }) {
2229 consoleACL
.addMask(mask
);
2231 g_consoleACL
.setState(consoleACL
);
2233 if (g_cmdLine
.checkConfig
) {
2234 setupLua(true, g_cmdLine
.config
);
2235 // No exception was thrown
2236 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2237 _exit(EXIT_SUCCESS
);
2240 auto todo
=setupLua(false, g_cmdLine
.config
);
2242 if(g_cmdLine
.locals
.size()) {
2244 for(auto loc
: g_cmdLine
.locals
)
2245 g_locals
.push_back(std::make_tuple(ComboAddress(loc
, 53), true, false, 0, "", std::set
<int>()));
2248 if(g_locals
.empty())
2249 g_locals
.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set
<int>()));
2251 g_configurationDone
= true;
2253 vector
<ClientState
*> toLaunch
;
2254 for(const auto& local
: g_locals
) {
2255 ClientState
* cs
= new ClientState
;
2256 cs
->local
= std::get
<0>(local
);
2257 cs
->udpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_DGRAM
, 0);
2258 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2259 SSetsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2261 //if(g_vm.count("bind-non-local"))
2262 bindAny(cs
->local
.sin4
.sin_family
, cs
->udpFD
);
2264 // if (!setSocketTimestamps(cs->udpFD))
2265 // g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2268 if(IsAnyAddress(cs
->local
)) {
2270 setsockopt(cs
->udpFD
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2271 #ifdef IPV6_RECVPKTINFO
2272 setsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2276 if (std::get
<2>(local
)) {
2278 SSetsockopt(cs
->udpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2280 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get
<0>(local
).toStringWithPort());
2284 const std::string
& itf
= std::get
<4>(local
);
2286 #ifdef SO_BINDTODEVICE
2287 int res
= setsockopt(cs
->udpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2289 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(local
).toStringWithPort(), strerror(errno
));
2292 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(local
).toStringWithPort());
2297 if (g_defaultBPFFilter
) {
2298 cs
->attachFilter(g_defaultBPFFilter
);
2299 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs
->local
.toStringWithPort());
2301 #endif /* HAVE_EBPF */
2303 cs
->cpus
= std::get
<5>(local
);
2305 SBind(cs
->udpFD
, cs
->local
);
2306 toLaunch
.push_back(cs
);
2307 g_frontends
.push_back(cs
);
2311 for(const auto& local
: g_locals
) {
2312 if(!std::get
<1>(local
)) { // no TCP/IP
2313 warnlog("Not providing TCP/IP service on local address '%s'", std::get
<0>(local
).toStringWithPort());
2316 ClientState
* cs
= new ClientState
;
2317 cs
->local
= std::get
<0>(local
);
2319 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2321 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2322 #ifdef TCP_DEFER_ACCEPT
2323 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2325 if (std::get
<3>(local
) > 0) {
2327 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, std::get
<3>(local
));
2329 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get
<0>(local
).toStringWithPort());
2332 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2333 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2336 /* no need to warn again if configured but support is not available, we already did for UDP */
2337 if (std::get
<2>(local
)) {
2338 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2342 const std::string
& itf
= std::get
<4>(local
);
2344 #ifdef SO_BINDTODEVICE
2345 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2347 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(local
).toStringWithPort(), strerror(errno
));
2350 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(local
).toStringWithPort());
2355 if (g_defaultBPFFilter
) {
2356 cs
->attachFilter(g_defaultBPFFilter
);
2357 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs
->local
.toStringWithPort());
2359 #endif /* HAVE_EBPF */
2361 // if(g_vm.count("bind-non-local"))
2362 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2363 SBind(cs
->tcpFD
, cs
->local
);
2364 SListen(cs
->tcpFD
, 64);
2365 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2367 toLaunch
.push_back(cs
);
2368 g_frontends
.push_back(cs
);
2372 #ifdef HAVE_DNSCRYPT
2373 for(auto& dcLocal
: g_dnsCryptLocals
) {
2374 ClientState
* cs
= new ClientState
;
2375 cs
->local
= std::get
<0>(dcLocal
);
2376 cs
->dnscryptCtx
= std::get
<1>(dcLocal
);
2377 cs
->udpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_DGRAM
, 0);
2378 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2379 SSetsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2381 bindAny(cs
->local
.sin4
.sin_family
, cs
->udpFD
);
2382 if(IsAnyAddress(cs
->local
)) {
2384 setsockopt(cs
->udpFD
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2385 #ifdef IPV6_RECVPKTINFO
2386 setsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2389 if (std::get
<2>(dcLocal
)) {
2391 SSetsockopt(cs
->udpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2393 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2397 const std::string
& itf
= std::get
<4>(dcLocal
);
2399 #ifdef SO_BINDTODEVICE
2400 int res
= setsockopt(cs
->udpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2402 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(dcLocal
).toStringWithPort(), strerror(errno
));
2405 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2410 if (g_defaultBPFFilter
) {
2411 cs
->attachFilter(g_defaultBPFFilter
);
2412 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs
->local
.toStringWithPort());
2414 #endif /* HAVE_EBPF */
2415 SBind(cs
->udpFD
, cs
->local
);
2416 toLaunch
.push_back(cs
);
2417 g_frontends
.push_back(cs
);
2420 cs
= new ClientState
;
2421 cs
->local
= std::get
<0>(dcLocal
);
2422 cs
->dnscryptCtx
= std::get
<1>(dcLocal
);
2423 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2424 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2425 #ifdef TCP_DEFER_ACCEPT
2426 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2428 if (std::get
<3>(dcLocal
) > 0) {
2430 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, std::get
<3>(dcLocal
));
2432 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2437 /* no need to warn again if configured but support is not available, we already did for UDP */
2438 if (std::get
<2>(dcLocal
)) {
2439 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2444 #ifdef SO_BINDTODEVICE
2445 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2447 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(dcLocal
).toStringWithPort(), strerror(errno
));
2450 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2454 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2455 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2458 if (g_defaultBPFFilter
) {
2459 cs
->attachFilter(g_defaultBPFFilter
);
2460 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs
->local
.toStringWithPort());
2462 #endif /* HAVE_EBPF */
2464 cs
->cpus
= std::get
<5>(dcLocal
);
2466 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2467 SBind(cs
->tcpFD
, cs
->local
);
2468 SListen(cs
->tcpFD
, 64);
2469 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2470 toLaunch
.push_back(cs
);
2471 g_frontends
.push_back(cs
);
2476 for(auto& frontend
: g_tlslocals
) {
2477 ClientState
* cs
= new ClientState
;
2478 cs
->local
= frontend
->d_addr
;
2479 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2480 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2481 #ifdef TCP_DEFER_ACCEPT
2482 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_DEFER_ACCEPT
, 1);
2484 if (frontend
->d_tcpFastOpenQueueSize
> 0) {
2486 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, frontend
->d_tcpFastOpenQueueSize
);
2488 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs
->local
.toStringWithPort());
2491 if (frontend
->d_reusePort
) {
2493 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2495 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs
.local
.toStringWithPort());
2498 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2499 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2502 if (!frontend
->d_interface
.empty()) {
2503 #ifdef SO_BINDTODEVICE
2504 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, frontend
->d_interface
.c_str(), frontend
->d_interface
.length());
2506 warnlog("Error setting up the interface on local address '%s': %s", cs
->local
.toStringWithPort(), strerror(errno
));
2509 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs
->local
.toStringWithPort());
2513 cs
->cpus
= frontend
->d_cpus
;
2515 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2516 if (frontend
->setupTLS()) {
2517 cs
->tlsFrontend
= frontend
;
2518 SBind(cs
->tcpFD
, cs
->local
);
2519 SListen(cs
->tcpFD
, 64);
2520 warnlog("Listening on %s for TLS", cs
->local
.toStringWithPort());
2521 toLaunch
.push_back(cs
);
2522 g_frontends
.push_back(cs
);
2527 errlog("Error while setting up TLS on local address '%s', exiting", cs
->local
.toStringWithPort());
2528 _exit(EXIT_FAILURE
);
2532 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
2535 g_ACL
.getLocal()->toStringVector(&vec
);
2536 for(const auto& s
: vec
) {
2541 infolog("ACL allowing queries from: %s", acls
.c_str());
2544 g_consoleACL
.getLocal()->toStringVector(&vec
);
2545 for (const auto& entry
: vec
) {
2546 if (!acls
.empty()) {
2551 infolog("Console ACL allowing connections from: %s", acls
.c_str());
2556 if(!g_cmdLine
.gid
.empty())
2557 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2559 if(!g_cmdLine
.uid
.empty())
2560 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2562 dropGroupPrivs(newgid
);
2563 dropUserPrivs(newuid
);
2565 /* this needs to be done _after_ dropping privileges */
2566 g_delay
= new DelayPipe
<DelayedPacket
>();
2572 g_tcpclientthreads
= std::make_shared
<TCPClientCollection
>(g_maxTCPClientThreads
, g_useTCPSinglePipe
);
2577 auto localPools
= g_pools
.getCopy();
2578 /* create the default pool no matter what */
2579 createPoolIfNotExists(localPools
, "");
2580 if(g_cmdLine
.remotes
.size()) {
2581 for(const auto& address
: g_cmdLine
.remotes
) {
2582 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2583 addServerToPool(localPools
, "", ret
);
2584 if (ret
->connected
) {
2585 ret
->tid
= thread(responderThread
, ret
);
2587 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
2590 g_pools
.setState(localPools
);
2592 if(g_dstates
.getLocal()->empty()) {
2593 errlog("No downstream servers defined: all packets will get dropped");
2594 // you might define them later, but you need to know
2597 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
2599 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2600 if(dss
->availability
==DownstreamState::Availability::Auto
) {
2601 bool newState
=upCheck(*dss
);
2602 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
2603 dss
->upStatus
= newState
;
2607 for(auto& cs
: toLaunch
) {
2608 if (cs
->udpFD
>= 0) {
2609 thread
t1(udpClientThread
, cs
);
2610 if (!cs
->cpus
.empty()) {
2611 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2615 else if (cs
->tcpFD
>= 0) {
2616 thread
t1(tcpAcceptorThread
, cs
);
2617 if (!cs
->cpus
.empty()) {
2618 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2624 thread
carbonthread(carbonDumpThread
);
2625 carbonthread
.detach();
2627 thread
stattid(maintThread
);
2630 thread
healththread(healthChecksThread
);
2632 if(g_cmdLine
.beSupervised
) {
2634 sd_notify(0, "READY=1");
2636 healththread
.join();
2639 healththread
.detach();
2642 _exit(EXIT_SUCCESS
);
2645 catch(const LuaContext::ExecutionErrorException
& e
) {
2647 errlog("Fatal Lua error: %s", e
.what());
2648 std::rethrow_if_nested(e
);
2649 } catch(const std::exception
& ne
) {
2650 errlog("Details: %s", ne
.what());
2652 catch(PDNSException
&ae
)
2654 errlog("Fatal pdns error: %s", ae
.reason
);
2656 _exit(EXIT_FAILURE
);
2658 catch(std::exception
&e
)
2660 errlog("Fatal error: %s", e
.what());
2661 _exit(EXIT_FAILURE
);
2663 catch(PDNSException
&ae
)
2665 errlog("Fatal pdns error: %s", ae
.reason
);
2666 _exit(EXIT_FAILURE
);