2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 #include "dnsdist-ecs.hh"
26 #include <netinet/tcp.h>
30 #if defined (__OpenBSD__)
31 #include <readline/readline.h>
33 #include <editline/readline.h>
37 #include "dnswriter.hh"
40 #include "delaypipe.hh"
42 #include "sodcrypto.hh"
43 #include "dnsrulactions.hh"
48 #include <sys/resource.h>
49 #include "dnsdist-cache.hh"
53 #include <systemd/sd-daemon.h>
// Per-thread UUID generator: declared thread_local, so every thread owns its
// own instance (presumably so it can be used without locking; its users are
// not visible in this chunk).
57 thread_local
boost::uuids::random_generator t_uuidGenerator
;
62 Receiver is currently single threaded
63 not *that* bad actually, but now that we are thread safe, might want to scale
67 Set of Rules, if one matches, it leads to an Action
68 Both rules and actions could conceivably be Lua based.
69 On the C++ side, both could be inherited from a class Rule and a class Action,
70 on the Lua side we can't do that. */
// Process-wide globals (first group).
// global statistics counters, updated throughout this file
76 struct DNSDistStats g_stats
;
// number of idStates entries allocated per backend (see the DownstreamState constructor)
77 uint16_t g_maxOutstanding
{10240};
79 bool g_verboseHealthChecks
{false};
80 uint32_t g_staleCacheEntriesTTL
{0};
// client ACL (presumably what LocalHolders::acl snapshots -- confirm)
83 GlobalStateHolder
<NetmaskGroup
> g_ACL
;
84 string g_outputBuffer
;
// configured listeners; the meaning of the tuple fields is not documented here
85 vector
<std::tuple
<ComboAddress
, bool, bool, int, string
, std::set
<int>>> g_locals
;
87 std::vector
<std::tuple
<ComboAddress
,DnsCryptContext
,bool, int, string
, std::set
<int>>> g_dnsCryptLocals
;
// NOTE(review): the #endif below closes a HAVE_EBPF conditional whose opening
// line is not visible in this chunk; presumably it guards the two BPF globals.
90 shared_ptr
<BPFFilter
> g_defaultBPFFilter
;
91 std::vector
<std::shared_ptr
<DynBPFFilter
> > g_dynBPFFilters
;
92 #endif /* HAVE_EBPF */
93 vector
<ClientState
*> g_frontends
;
// server pools by name (presumably what LocalHolders::pools snapshots -- confirm)
94 GlobalStateHolder
<pools_t
> g_pools
;
// presumably the recvmmsg/sendmmsg batch size -- confirm
95 size_t g_udpVectorSize
{1};
97 bool g_snmpEnabled
{false};
98 bool g_snmpTrapsEnabled
{false};
99 DNSDistSNMPAgent
* g_snmpAgent
{nullptr};
101 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
102 Then we have a bunch of connected sockets for talking to downstream servers.
103 We send directly to those sockets.
105 For the return path, per downstream server we have a thread that listens to responses.
107 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
108 there the original requestor and the original id. The new ID is the offset in the array.
110 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
113 IDs are assigned by atomic increments of the socket offset.
116 /* for our load balancing, we want to support:
118 Round-robin with basic uptime checks
119 Send to least loaded server (least outstanding)
120 Send it to the first server that is not overloaded
121 Hashed weighted random
125 Multiple server groups, by default we load balance to the group with no name.
126 Each instance is either 'up', 'down' or 'auto', where 'auto' means that dnsdist
127 determines if the instance is up or not. Auto should be the default and very very good.
129 In addition, to each instance you can attach a QPS object with rate & burst, which will optionally
130 limit the amount of queries we send there.
132 If all downstreams are over QPS, we pick the fastest server */
// (rule, action) lists: one for queries, one for responses from a backend
// (responderThread reads it through getLocal()), and one for responses served
// from the cache (judging by the name).
134 GlobalStateHolder
<vector
<pair
<std::shared_ptr
<DNSRule
>, std::shared_ptr
<DNSAction
> > > > g_rulactions
;
135 GlobalStateHolder
<vector
<pair
<std::shared_ptr
<DNSRule
>, std::shared_ptr
<DNSResponseAction
> > > > g_resprulactions
;
136 GlobalStateHolder
<vector
<pair
<std::shared_ptr
<DNSRule
>, std::shared_ptr
<DNSResponseAction
> > > > g_cachehitresprulactions
;
140 GlobalStateHolder
<servers_t
> g_dstates
;
// dynamic blocks keyed by client netmask (NMG) and by query-name suffix (SMT);
// processQuery consults them through holders.dynNMGBlock / holders.dynSMTBlock
// (presumably snapshots of these -- confirm)
141 GlobalStateHolder
<NetmaskTree
<DynBlock
>> g_dynblockNMG
;
142 GlobalStateHolder
<SuffixMatchTree
<DynBlock
>> g_dynblockSMT
;
// action used by processQuery when a dynamic block's own action is None
143 DNSAction::Action g_dynBlockAction
= DNSAction::Action::Drop
;
// TCP timeouts (units not visible here; presumably seconds)
144 int g_tcpRecvTimeout
{2};
145 int g_tcpSendTimeout
{2};
148 bool g_servFailOnNoPolicy
{false};
// when set, responderThread passes responses with TC set through truncateTC()
149 bool g_truncateTC
{false};
// presumably the size of the UDP receive buffer; its users are not visible here
152 static const size_t s_udpIncomingBufferSize
{1500};
// truncateTC: cut a response down to its header and question section.
// responderThread calls it for responses with TC set when g_truncateTC is on.
// *len becomes sizeof(dnsheader) + length of the question name + type + class,
// and the answer, authority and additional counts are zeroed.
// The header is written through a pointer cast from `packet`, so the buffer is
// modified in place despite the const char* parameter.
// NOTE(review): DNSName parsing may throw on a malformed name; any handler for
// that is on lines not visible in this chunk.
// @param packet response buffer (modified in place)
// @param len    in/out: response length
154 static void truncateTC(const char* packet
, uint16_t* len
)
157 unsigned int consumed
;
158 DNSName
qname(packet
, *len
, sizeof(dnsheader
), false, 0, 0, &consumed
);
159 *len
=(uint16_t) (sizeof(dnsheader
)+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
);
160 struct dnsheader
* dh
=(struct dnsheader
*)packet
;
161 dh
->ancount
= dh
->arcount
= dh
->nscount
=0;
// Members and sending logic of DelayedPacket, the record queued into g_delay by
// sendUDPResponse, which builds it as
// {origFD, string(response,responseLen), origRemote, origDest}.
// The type's opening lines (including the fd and packet members) are not
// visible in this chunk.
// destination: client address the delayed response is sent to
172 ComboAddress destination
;
// origDest: local address to send from, or sin_family == 0 when none was recorded
173 ComboAddress origDest
;
// when the delay expires: plain sendto() if no source address was recorded,
// otherwise sendfromto() with origDest as the source; failures are only logged
177 if(origDest
.sin4
.sin_family
== 0) {
178 res
= sendto(fd
, packet
.c_str(), packet
.size(), 0, (struct sockaddr
*)&destination
, destination
.getSocklen());
181 res
= sendfromto(fd
, packet
.c_str(), packet
.size(), 0, origDest
, destination
);
185 vinfolog("Error sending delayed response to %s: %s", destination
.toStringWithPort(), strerror(err
));
190 DelayPipe
<DelayedPacket
> * g_delay
= 0;
192 static void doLatencyAverages(double udiff
)
194 auto doAvg
= [](double& var
, double n
, double weight
) {
195 var
= (weight
-1) * var
/weight
+ n
/weight
;
198 doAvg(g_stats
.latencyAvg100
, udiff
, 100);
199 doAvg(g_stats
.latencyAvg1000
, udiff
, 1000);
200 doAvg(g_stats
.latencyAvg10000
, udiff
, 10000);
201 doAvg(g_stats
.latencyAvg1000000
, udiff
, 1000000);
// responseContentMatches: sanity-check a backend response against the question
// recorded for its query ID (qname, qtype, qclass), presumably so an answer is
// never relayed for the wrong question.
// Visible behavior:
//  - the length is checked against sizeof(dnsheader) before any header field is read;
//  - a response without a question section is judged on its rcode alone, and
//    nonCompliantResponses is bumped on one of those branches;
//  - a question that fails to parse is logged and counted as non-compliant;
//  - otherwise the parsed type, class and name must equal the recorded ones.
// The individual return statements are not visible in this chunk.
// @param response, responseLen raw bytes received from the backend
// @param qname, qtype, qclass  the question recorded for this query ID
// @param remote                backend address, used only for logging
204 bool responseContentMatches(const char* response
, const uint16_t responseLen
, const DNSName
& qname
, const uint16_t qtype
, const uint16_t qclass
, const ComboAddress
& remote
)
206 uint16_t rqtype
, rqclass
;
207 unsigned int consumed
;
209 const struct dnsheader
* dh
= (struct dnsheader
*) response
;
211 if (responseLen
< sizeof(dnsheader
)) {
215 if (dh
->qdcount
== 0) {
216 if (dh
->rcode
!= RCode::NoError
&& dh
->rcode
!= RCode::NXDomain
) {
220 g_stats
.nonCompliantResponses
++;
226 rqname
=DNSName(response
, responseLen
, sizeof(dnsheader
), false, &rqtype
, &rqclass
, &consumed
);
228 catch(std::exception
& e
) {
// only log when there was something beyond the bare header to parse
229 if(responseLen
> (ssize_t
)sizeof(dnsheader
))
230 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote
.toStringWithPort(), ntohs(dh
->id
), e
.what());
231 g_stats
.nonCompliantResponses
++;
235 if (rqtype
!= qtype
|| rqclass
!= qclass
|| rqname
!= qname
) {
// restoreFlags: put the RD and CD bits of the original query (origFlags) back
// into the header at dh, leaving every other flag bit as it currently is in dh.
// restoreFlagsMask has every bit set except RD and CD: it is used to clear those
// two bits in *flags and, inverted, to keep only those two bits of origFlags.
// NOTE(review): the statement applying origFlags to *flags after the last comment
// (presumably `*flags |= origFlags;`) is not visible in this chunk. Confirm it is
// present; otherwise the bits are cleared but never restored.
242 void restoreFlags(struct dnsheader
* dh
, uint16_t origFlags
)
244 static const uint16_t rdMask
= 1 << FLAGS_RD_OFFSET
;
245 static const uint16_t cdMask
= 1 << FLAGS_CD_OFFSET
;
246 static const uint16_t restoreFlagsMask
= UINT16_MAX
& ~(rdMask
| cdMask
);
247 uint16_t * flags
= getFlagsFromDNSHeader(dh
);
248 /* clear the flags we are about to restore */
249 *flags
&= restoreFlagsMask
;
250 /* only keep the flags we want to restore */
251 origFlags
&= ~restoreFlagsMask
;
252 /* set the saved flags as they were */
// fixUpResponse: undo, on a backend response, the changes dnsdist made to the
// outgoing query before the response is cached and relayed.
//  - It restores the RD and CD header bits from origFlags (restoreFlags).
//  - It copies qname's wire form back over the start of the question section
//    when the response is long enough (presumably to restore the client's
//    original spelling/case of the name).
//  - If dnsdist added the whole OPT RR (ednsAdded) or only the ECS option
//    (ecsAdded), it removes it again. This is done either in place or by
//    rewriting the packet into rewrittenResponse when removing it in place
//    could break name compression.
// @param response          in/out: pointer to the response bytes; may be
//                          redirected to rewrittenResponse.data()
// @param responseLen       in/out: length of the response
// @param responseSize      in/out: usable size of the buffer *response points to
// @param qname             the query name recorded for this query
// @param origFlags         header flags of the original query
// @param ednsAdded         dnsdist added the whole OPT RR to the query
// @param ecsAdded          dnsdist added the ECS option to the query
// @param rewrittenResponse scratch buffer that may receive a rewritten copy; it
//                          must outlive every use of *response
// @param addRoom           extra capacity to reserve in a rewritten copy
//                          (responderThread passes the DNSCrypt padding/MAC size)
// @return presumably false when the response cannot be used (e.g. shorter than a
//         header); the return statements are not visible in this chunk.
256 bool fixUpResponse(char** response
, uint16_t* responseLen
, size_t* responseSize
, const DNSName
& qname
, uint16_t origFlags
, bool ednsAdded
, bool ecsAdded
, std::vector
<uint8_t>& rewrittenResponse
, uint16_t addRoom
)
258 struct dnsheader
* dh
= (struct dnsheader
*) *response
;
260 if (*responseLen
< sizeof(dnsheader
)) {
264 restoreFlags(dh
, origFlags
);
266 if (*responseLen
== sizeof(dnsheader
)) {
271 string realname
= qname
.toDNSString();
272 if (*responseLen
>= (sizeof(dnsheader
) + realname
.length())) {
273 memcpy(*response
+ sizeof(dnsheader
), realname
.c_str(), realname
.length());
277 if (ednsAdded
|| ecsAdded
) {
278 char * optStart
= NULL
;
282 int res
= locateEDNSOptRR(*response
, *responseLen
, &optStart
, &optLen
, &last
);
286 /* we added the entire OPT RR,
287 therefore we need to remove it entirely */
289 /* simply remove the last AR */
290 *responseLen
-= optLen
;
// NOTE(review): the decrement of arcount between the next two statements is
// not visible in this chunk
291 uint16_t arcount
= ntohs(dh
->arcount
);
293 dh
->arcount
= htons(arcount
);
296 /* Removing an intermediary RR could lead to compression error */
297 if (rewriteResponseWithoutEDNS(*response
, *responseLen
, rewrittenResponse
) == 0) {
298 *responseLen
= rewrittenResponse
.size();
// reserve room before taking data() below: reserve() may reallocate, so the
// pointer must be taken afterwards; the UINT16_MAX test keeps len + addRoom in 16 bits
299 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
300 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
302 *responseSize
= rewrittenResponse
.capacity();
303 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
306 warnlog("Error rewriting content");
311 /* the OPT RR was already present, but without ECS,
312 we need to remove the ECS option if any */
314 /* nothing after the OPT RR, we can simply remove the
316 size_t existingOptLen
= optLen
;
317 removeEDNSOptionFromOPT(optStart
, &optLen
, EDNSOptionCode::ECS
);
318 *responseLen
-= (existingOptLen
- optLen
);
321 /* Removing an intermediary RR could lead to compression error */
322 if (rewriteResponseWithoutEDNSOption(*response
, *responseLen
, EDNSOptionCode::ECS
, rewrittenResponse
) == 0) {
323 *responseLen
= rewrittenResponse
.size();
// same ordering constraint as above: reserve first, then take data()
324 if (addRoom
&& (UINT16_MAX
- *responseLen
) > addRoom
) {
325 rewrittenResponse
.reserve(*responseLen
+ addRoom
);
327 *responseSize
= rewrittenResponse
.capacity();
328 *response
= reinterpret_cast<char*>(rewrittenResponse
.data());
331 warnlog("Error rewriting content");
// encryptResponse: DNSCrypt-encrypt a response in place before it is sent.
// If dh, *dh and dhCopy are all non-null, the current (plaintext) header is first
// copied into dhCopy. responderThread relies on this to still have a plaintext
// header for the response ring after encryption; presumably *dh is then pointed
// at dhCopy on a line not visible here.
// On success *responseLen becomes the encrypted length; on failure the response
// is dropped (logged with vinfolog).
// @param response     buffer holding the response, encrypted in place
// @param responseLen  in/out: response length
// @param responseSize total usable size of the buffer (room for padding/MAC)
// @param tcp          whether the response goes out over TCP
// NOTE(review): processUDPQuery passes a dnsCryptQuery that may be null. The guard
// before the dnsCryptQuery->ctx dereference is not visible in this chunk;
// confirm it exists.
342 bool encryptResponse(char* response
, uint16_t* responseLen
, size_t responseSize
, bool tcp
, std::shared_ptr
<DnsCryptQuery
> dnsCryptQuery
, dnsheader
** dh
, dnsheader
* dhCopy
)
345 uint16_t encryptedResponseLen
= 0;
347 /* save the original header before encrypting it in place */
348 if (dh
!= nullptr && *dh
!= nullptr && dhCopy
!= nullptr) {
349 memcpy(dhCopy
, *dh
, sizeof(dnsheader
));
353 int res
= dnsCryptQuery
->ctx
->encryptResponse(response
, *responseLen
, responseSize
, dnsCryptQuery
, tcp
, &encryptedResponseLen
);
355 *responseLen
= encryptedResponseLen
;
357 /* dropping response */
358 vinfolog("Error encrypting the response, dropping.");
// sendUDPResponse: send, or schedule for later, a UDP response to a client.
// When delayMsec is non-zero and a delay pipe exists (g_delay), the response is
// copied into a DelayedPacket (string copy) and submitted to g_delay, so the
// caller's buffer does not need to stay valid.
// Otherwise the response is sent right away on origFD:
//  - with sendto() when origDest is unset (sin_family == 0);
//  - with sendfromto() using origDest as the source address otherwise.
// Send failures are logged with vinfolog; the return statements are not visible
// in this chunk.
// @param origFD     socket the query arrived on
// @param delayMsec  requested delay in milliseconds, 0 for none
// @param origDest   local address to answer from, or sin_family == 0
// @param origRemote client address the response goes to
366 static bool sendUDPResponse(int origFD
, char* response
, uint16_t responseLen
, int delayMsec
, const ComboAddress
& origDest
, const ComboAddress
& origRemote
)
368 if(delayMsec
&& g_delay
) {
369 DelayedPacket dp
{origFD
, string(response
,responseLen
), origRemote
, origDest
};
370 g_delay
->submit(dp
, delayMsec
);
374 if(origDest
.sin4
.sin_family
== 0) {
375 res
= sendto(origFD
, response
, responseLen
, 0, (struct sockaddr
*)&origRemote
, origRemote
.getSocklen());
378 res
= sendfromto(origFD
, response
, responseLen
, 0, origDest
, origRemote
);
382 vinfolog("Error sending response to %s: %s", origRemote
.toStringWithPort(), strerror(err
));
// responderThread: per-backend loop that reads responses from state->fd and,
// for each one:
//  - looks up the in-flight query in state->idStates (indexed by queryId,
//    presumably the response's DNS ID; the assignment is not visible);
//  - checks the response against the recorded question;
//  - runs the response rules;
//  - undoes dnsdist's own query modifications (fixUpResponse);
//  - optionally caches and DNSCrypt-encrypts the answer;
//  - relays it to the original client;
//  - updates latency statistics and the response ring.
// @param state the downstream server whose connected socket this thread reads
// Errors while handling one response are logged by the inner catch. Exceptions
// reaching the outer handlers are logged as the thread dying.
// NOTE(review): several lines of this body (loop head, early `continue`s, the
// declarations of dhCopy, ts and empty, the queryId assignment) are not visible
// in this chunk; the comments below only describe the visible statements.
389 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
390 void* responderThread(std::shared_ptr
<DownstreamState
> state
)
392 auto localRespRulactions
= g_resprulactions
.getLocal();
// receive buffer; the extra DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE bytes leave
// room to encrypt the answer in place
394 char packet
[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
];
395 /* when the answer is encrypted in place, we need to get a copy
396 of the original header before encryption to fill the ring buffer */
401 static_assert(sizeof(packet
) <= UINT16_MAX
, "Packet size should fit in a uint16_t");
402 vector
<uint8_t> rewrittenResponse
;
404 uint16_t queryId
= 0;
406 dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(packet
);
408 ssize_t got
= recv(state
->fd
, packet
, sizeof(packet
), 0);
409 char * response
= packet
;
410 size_t responseSize
= sizeof(packet
);
// anything shorter than a DNS header cannot be a usable response
412 if (got
< (ssize_t
) sizeof(dnsheader
))
415 uint16_t responseLen
= (uint16_t) got
;
// reject IDs outside the per-backend state table before indexing it
418 if(queryId
>= state
->idStates
.size())
421 IDState
* ids
= &state
->idStates
[queryId
];
422 int origFD
= ids
->origFD
;
424 if(origFD
< 0) // duplicate
427 /* setting age to 0 to prevent the maintainer thread from
428 cleaning this IDS while we process the response.
429 We have already a copy of the origFD, so it would
430 mostly mess up the outstanding counter.
434 if (!responseContentMatches(response
, responseLen
, ids
->qname
, ids
->qtype
, ids
->qclass
, state
->remote
)) {
438 --state
->outstanding
; // you'd think an attacker could game this, but we're using connected socket
440 if(dh
->tc
&& g_truncateTC
) {
441 truncateTC(response
, &responseLen
);
444 dh
->id
= ids
->origID
;
446 uint16_t addRoom
= 0;
447 DNSResponse
dr(&ids
->qname
, ids
->qtype
, ids
->qclass
, &ids
->origDest
, &ids
->origRemote
, dh
, sizeof(packet
), responseLen
, false, &ids
->sentTime
.d_start
);
449 dr
.uniqueId
= ids
->uniqueId
;
451 if (!processResponse(localRespRulactions
, dr
, &ids
->delayMsec
)) {
456 if (ids
->dnsCryptQuery
) {
457 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
460 if (!fixUpResponse(&response
, &responseLen
, &responseSize
, ids
->qname
, ids
->origFlags
, ids
->ednsAdded
, ids
->ecsAdded
, rewrittenResponse
, addRoom
)) {
464 if (ids
->packetCache
&& !ids
->skipCache
) {
465 ids
->packetCache
->insert(ids
->cacheKey
, ids
->qname
, ids
->qtype
, ids
->qclass
, response
, responseLen
, false, dh
->rcode
);
468 if (ids
->cs
&& !ids
->cs
->muted
) {
470 if (!encryptResponse(response
, &responseLen
, responseSize
, false, ids
->dnsCryptQuery
, &dh
, &dhCopy
)) {
476 empty
.sin4
.sin_family
= 0;
477 /* if ids->destHarvested is false, origDest holds the listening address.
478 We don't want to use that as a source since it could be 0.0.0.0 for example. */
479 sendUDPResponse(origFD
, response
, responseLen
, ids
->delayMsec
, ids
->destHarvested
? ids
->origDest
: empty
, ids
->origRemote
);
// round-trip time of this query, in microseconds (as logged below)
484 double udiff
= ids
->sentTime
.udiff();
485 vinfolog("Got answer from %s, relayed to %s, took %f usec", state
->remote
.toStringWithPort(), ids
->origRemote
.toStringWithPort(), udiff
);
// the ring records the size received from the backend (got), not the relayed size
490 std::lock_guard
<std::mutex
> lock(g_rings
.respMutex
);
491 g_rings
.respRing
.push_back({ts
, ids
->origRemote
, ids
->qname
, ids
->qtype
, (unsigned int)udiff
, (unsigned int)got
, *dh
, state
->remote
});
494 if(dh
->rcode
== RCode::ServFail
)
495 g_stats
.servfailResponses
++;
// per-backend latency: moving average with weight 1/128 for the new sample
496 state
->latencyUsec
= (127.0 * state
->latencyUsec
/ 128.0) + udiff
/128.0;
// latency histogram buckets (microseconds)
498 if(udiff
< 1000) g_stats
.latency0_1
++;
499 else if(udiff
< 10000) g_stats
.latency1_10
++;
500 else if(udiff
< 50000) g_stats
.latency10_50
++;
501 else if(udiff
< 100000) g_stats
.latency50_100
++;
502 else if(udiff
< 1000000) g_stats
.latency100_1000
++;
503 else g_stats
.latencySlow
++;
505 doLatencyAverages(udiff
);
// only release the DNSCrypt state if origFD still matches the copy taken earlier
// (presumably guards against the slot having been reused meanwhile)
507 if (ids
->origFD
== origFD
) {
509 ids
->dnsCryptQuery
= nullptr;
514 rewrittenResponse
.clear();
516 catch(std::exception
& e
){
517 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", state
->remote
.toStringWithPort(), queryId
, e
.what());
522 catch(const std::exception
& e
)
524 errlog("UDP responder thread died because of exception: %s", e
.what());
527 catch(const PDNSException
& e
)
529 errlog("UDP responder thread died because of PowerDNS exception: %s", e
.reason
);
534 errlog("UDP responder thread died because of an exception: %s", "unknown");
// DownstreamState::reconnect: replace the UDP socket used to talk to this backend.
// The current fd is shut down first, so the recv() blocked in responderThread
// wakes up. Then, unless remote is the any-address, a new datagram socket is:
//  - created for remote's address family;
//  - bound to sourceAddr (with SO_REUSEADDR) when one is configured;
//  - connect()ed to remote.
// A std::runtime_error during that setup is logged with infolog.
// NOTE(review): any close() of the previous fd and the opening of the try block
// are on lines not visible in this chunk.
538 void DownstreamState::reconnect()
542 /* shutdown() is needed to wake up recv() in the responderThread */
543 shutdown(fd
, SHUT_RDWR
);
547 if (!IsAnyAddress(remote
)) {
548 fd
= SSocket(remote
.sin4
.sin_family
, SOCK_DGRAM
, 0);
549 if (!IsAnyAddress(sourceAddr
)) {
550 SSetsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, 1);
551 SBind(fd
, sourceAddr
);
554 SConnect(fd
, remote
);
557 catch(const std::runtime_error
& error
) {
558 infolog("Error connecting to new server with address %s: %s", remote
.toStringWithPort(), error
.what());
// DownstreamState constructor: record the backend address, the optional source
// address and the source interface index.
// For a real (non-any) remote it sizes idStates to g_maxOutstanding entries (the
// table responderThread indexes by query ID) and logs the addition.
// NOTE(review): a statement between the IsAnyAddress check and the resize is not
// visible in this chunk (presumably it opens the socket -- confirm).
563 DownstreamState::DownstreamState(const ComboAddress
& remote_
, const ComboAddress
& sourceAddr_
, unsigned int sourceItf_
): remote(remote_
), sourceAddr(sourceAddr_
), sourceItf(sourceItf_
)
565 if (!IsAnyAddress(remote
)) {
567 idStates
.resize(g_maxOutstanding
);
569 infolog("Added downstream server %s", remote
.toStringWithPort());
// Mutex named after the Lua state. In this file it is held around the g_qcount
// filter call (processQuery) and around the server-selection policy call
// (processUDPQuery).
573 std::mutex g_luamutex
;
// default server-selection policy; pools without their own policy fall back to
// holders.policy (presumably a snapshot of this -- confirm)
576 GlobalStateHolder
<ServerPolicy
> g_policy
;
578 shared_ptr
<DownstreamState
> firstAvailable(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
580 for(auto& d
: servers
) {
581 if(d
.second
->isUp() && d
.second
->qps
.check())
584 return leastOutstanding(servers
, dq
);
587 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
588 shared_ptr
<DownstreamState
> leastOutstanding(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
590 if (servers
.size() == 1 && servers
[0].second
->isUp()) {
591 return servers
[0].second
;
594 vector
<pair
<tuple
<int,int,double>, shared_ptr
<DownstreamState
>>> poss
;
595 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
596 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
597 poss
.reserve(servers
.size());
598 for(auto& d
: servers
) {
599 if(d
.second
->isUp()) {
600 poss
.push_back({make_tuple(d
.second
->outstanding
.load(), d
.second
->order
, d
.second
->latencyUsec
), d
.second
});
604 return shared_ptr
<DownstreamState
>();
605 nth_element(poss
.begin(), poss
.begin(), poss
.end(), [](const decltype(poss
)::value_type
& a
, const decltype(poss
)::value_type
& b
) { return a
.first
< b
.first
; });
606 return poss
.begin()->second
;
// valrandom: pick an up server by cumulative weight, using val as the draw.
// Each up server adds its weight to a running sum, and poss stores
// (running sum, server): e.g. weights 1 and 10 give 1 and 11. upper_bound then
// finds the first entry whose running sum is greater than r.
// The cumulative sums are stored as int.
// NOTE(review): the lines declaring sum, deriving r from val (the SIGFPE comment
// suggests a modulo by sum) and the success-path return are not visible here.
609 shared_ptr
<DownstreamState
> valrandom(unsigned int val
, const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
611 vector
<pair
<int, shared_ptr
<DownstreamState
>>> poss
;
613 for(auto& d
: servers
) { // w=1, w=10 -> 1, 11
614 if(d
.second
->isUp()) {
615 sum
+=d
.second
->weight
;
616 poss
.push_back({sum
, d
.second
});
620 // Catch poss & sum are empty to avoid SIGFPE
622 return shared_ptr
<DownstreamState
>();
625 auto p
= upper_bound(poss
.begin(), poss
.end(),r
, [](int r_
, const decltype(poss
)::value_type
& a
) { return r_
< a
.first
;});
627 return shared_ptr
<DownstreamState
>();
631 shared_ptr
<DownstreamState
> wrandom(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
633 return valrandom(random(), servers
, dq
);
636 uint32_t g_hashperturb
;
637 shared_ptr
<DownstreamState
> whashed(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
639 return valrandom(dq
->qname
->hash(g_hashperturb
), servers
, dq
);
// roundrobin: cycle through the servers using a function-static counter.
// The visible code gathers the up servers into poss, points res at a candidate
// list, and returns an empty pointer in some case. The conditions and the
// fallback between poss and servers are on lines not visible in this chunk.
// NOTE(review): counter is a plain static unsigned int incremented without
// synchronization. processUDPQuery calls the policy while holding g_luamutex;
// confirm every other caller does too.
643 shared_ptr
<DownstreamState
> roundrobin(const NumberedServerVector
& servers
, const DNSQuestion
* dq
)
645 NumberedServerVector poss
;
647 for(auto& d
: servers
) {
648 if(d
.second
->isUp()) {
653 const auto *res
=&poss
;
658 return shared_ptr
<DownstreamState
>();
660 static unsigned int counter
;
662 return (*res
)[(counter
++) % res
->size()].second
;
// writepid: if pidfile is non-empty, remove any stale file at that path, then
// open it with an ofstream, presumably to write the process ID. A failure is
// reported with errlog. The write itself and the success test are on lines not
// visible in this chunk.
// NOTE(review): pidfile is taken by value; const string& would avoid a copy.
665 static void writepid(string pidfile
) {
666 if (!pidfile
.empty()) {
667 // Clean up possible stale file
668 unlink(pidfile
.c_str());
671 ofstream
of(pidfile
.c_str());
675 errlog("Unable to write PID-file to '%s'.", pidfile
);
// daemonize: presumably detaches from the terminal (the fork/setsid steps are on
// lines not visible in this chunk), then redirects stdin, stdout and stderr to
// /dev/null.
// NOTE(review): on the failure path for open() (its condition is not visible),
// the only statement is an empty one with the log call commented out, so the
// failure is silently ignored.
681 static void daemonize(void)
689 int i
=open("/dev/null",O_RDWR
); /* open stdin */
691 ; // L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
693 dup2(i
,0); /* stdin */
694 dup2(i
,1); /* stdout */
695 dup2(i
,2); /* stderr */
// Default server-control address, 127.0.0.1:5199 (presumably overridable by
// configuration -- confirm).
700 ComboAddress g_serverControl
{"127.0.0.1:5199"};
// createPoolIfNotExists: return the pool named poolName from pools. If there is
// none, create an empty ServerPool, insert it under that name and return it.
// An empty poolName is the default pool; its creation is not logged.
// The body of the "already exists" branch is not visible in this chunk.
702 std::shared_ptr
<ServerPool
> createPoolIfNotExists(pools_t
& pools
, const string
& poolName
)
704 std::shared_ptr
<ServerPool
> pool
;
705 pools_t::iterator it
= pools
.find(poolName
);
706 if (it
!= pools
.end()) {
710 if (!poolName
.empty())
711 vinfolog("Creating pool %s", poolName
);
712 pool
= std::make_shared
<ServerPool
>();
713 pools
.insert(std::pair
<std::string
,std::shared_ptr
<ServerPool
> >(poolName
, pool
));
718 void setPoolPolicy(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<ServerPolicy
> policy
)
720 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
721 if (!poolName
.empty()) {
722 vinfolog("Setting pool %s server selection policy to %s", poolName
, policy
->name
);
724 vinfolog("Setting default pool server selection policy to %s", policy
->name
);
726 pool
->policy
= policy
;
// addServerToPool: append server to the pool named poolName, creating the pool if
// needed (empty name = default pool).
// The new entry gets number size + 1 (numbers are 1-based). The list is then
// stable-sorted by the servers' 'order', so servers with equal order keep their
// insertion order, and finally renumbered for Lua custom policies. The
// renumbering statements inside the loop are not visible in this chunk.
729 void addServerToPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
731 std::shared_ptr
<ServerPool
> pool
= createPoolIfNotExists(pools
, poolName
);
732 unsigned int count
= (unsigned int) pool
->servers
.size();
733 if (!poolName
.empty()) {
734 vinfolog("Adding server to pool %s", poolName
);
736 vinfolog("Adding server to default pool");
738 pool
->servers
.push_back(make_pair(++count
, server
));
739 /* we need to reorder based on the server 'order' */
740 std::stable_sort(pool
->servers
.begin(), pool
->servers
.end(), [](const std::pair
<unsigned int,std::shared_ptr
<DownstreamState
> >& a
, const std::pair
<unsigned int,std::shared_ptr
<DownstreamState
> >& b
) {
741 return a
.second
->order
< b
.second
->order
;
743 /* and now we need to renumber for Lua (custom policies) */
745 for (auto& serv
: pool
->servers
) {
// removeServerFromPool: remove server from the pool named poolName (empty name =
// default pool). getPool throws std::out_of_range when the pool does not exist.
// The loop erases matching entries via the iterator returned by erase(), which
// keeps iteration valid. Servers placed after the removed one are renumbered for
// Lua custom policies, by statements not visible in this chunk.
750 void removeServerFromPool(pools_t
& pools
, const string
& poolName
, std::shared_ptr
<DownstreamState
> server
)
752 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
754 if (!poolName
.empty()) {
755 vinfolog("Removing server from pool %s", poolName
);
758 vinfolog("Removing server from default pool");
763 for (NumberedVector
<shared_ptr
<DownstreamState
> >::iterator it
= pool
->servers
.begin(); it
!= pool
->servers
.end();) {
765 /* we need to renumber the servers placed
766 after the removed one, for Lua (custom policies) */
770 else if (it
->second
== server
) {
771 it
= pool
->servers
.erase(it
);
// getPool: look up a pool by name. Throws std::out_of_range("No pool named ...")
// when it does not exist. The success-path return (presumably it->second) is not
// visible in this chunk.
780 std::shared_ptr
<ServerPool
> getPool(const pools_t
& pools
, const std::string
& poolName
)
782 pools_t::const_iterator it
= pools
.find(poolName
);
784 if (it
== pools
.end()) {
785 throw std::out_of_range("No pool named " + poolName
);
791 const NumberedServerVector
& getDownstreamCandidates(const pools_t
& pools
, const std::string
& poolName
)
793 std::shared_ptr
<ServerPool
> pool
= getPool(pools
, poolName
);
794 return pool
->servers
;
797 // goal in life - if you send us a reasonably normal packet, we'll get Z for you, otherwise 0
798 int getEDNSZ(const char* packet
, unsigned int len
)
801 struct dnsheader
* dh
=(struct dnsheader
*)packet
;
803 if(ntohs(dh
->qdcount
) != 1 || dh
->ancount
!=0 || ntohs(dh
->arcount
)!=1 || dh
->nscount
!=0)
806 if (len
<= sizeof(dnsheader
))
809 unsigned int consumed
;
810 DNSName
qname(packet
, len
, sizeof(dnsheader
), false, 0, 0, &consumed
);
811 size_t pos
= consumed
+ DNS_TYPE_SIZE
+ DNS_CLASS_SIZE
;
812 uint16_t qtype
, qclass
;
814 if (len
<= (sizeof(dnsheader
)+pos
))
817 DNSName
aname(packet
, len
, sizeof(dnsheader
)+pos
, true, &qtype
, &qclass
, &consumed
);
819 if(qtype
!=QType::OPT
|| sizeof(dnsheader
)+pos
+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
+EDNS_EXTENDED_RCODE_SIZE
+EDNS_VERSION_SIZE
+1 >= len
)
822 uint8_t* z
= (uint8_t*)packet
+sizeof(dnsheader
)+pos
+consumed
+DNS_TYPE_SIZE
+DNS_CLASS_SIZE
+EDNS_EXTENDED_RCODE_SIZE
+EDNS_VERSION_SIZE
;
823 return 0x100 * (*z
) + *(z
+1);
// spoofResponseFromString: turn the result string of a Spoof rule into a spoofed
// answer for dq. spoofContent is split on spaces and commas.
//  - A single token is first parsed as an address; if that throws
//    PDNSException, it is treated as a CNAME target.
//  - Several tokens are converted to a list of ComboAddress.
// Applying the resulting SpoofAction to dq happens on lines not visible in this
// chunk.
// NOTE(review): the single-token branch parses spoofContent itself rather than
// addrs[0], so a value with a stray separator (e.g. "192.0.2.1,") may fail the
// address parse and fall back to the CNAME path. Confirm this is intended.
830 static void spoofResponseFromString(DNSQuestion
& dq
, const string
& spoofContent
)
834 std::vector
<std::string
> addrs
;
835 stringtok(addrs
, spoofContent
, " ,");
837 if (addrs
.size() == 1) {
839 ComboAddress
spoofAddr(spoofContent
);
840 SpoofAction
sa({spoofAddr
});
843 catch(const PDNSException
&e
) {
844 SpoofAction
sa(spoofContent
); // CNAME then
848 std::vector
<ComboAddress
> cas
;
849 for (const auto& addr
: addrs
) {
851 cas
.push_back(ComboAddress(addr
));
// processQuery: apply the query-side policy to dq before it is forwarded.
// In order, the visible code:
//  1. records the query in g_rings.queryRing (under queryLock);
//  2. when g_qcount is enabled, counts the query per name, optionally letting
//     the filter (called under g_luamutex) decide whether to count it and under
//     which name;
//  3. applies a dynamic block matching the client address, then one matching
//     the query name, while the block has not expired (now < until). A block
//     action of None falls back to g_dynBlockAction;
//  4. runs the rule/action list in order. The actions listed before the
//     "non-terminal actions follow" marker presumably end the scan.
// @param holders   per-thread snapshots of the global state (presumably)
// @param dq        the query; its header may be modified (e.g. rcode)
// @param poolname  out: set by a Pool action
// @param delayMsec out: set by a Delay action
// @param now       timestamp used for the ring entry and for block expiry
// @return the return statements are not visible in this chunk; presumably false
//         means the query must be dropped -- confirm.
861 bool processQuery(LocalHolders
& holders
, DNSQuestion
& dq
, string
& poolname
, int* delayMsec
, const struct timespec
& now
)
864 WriteLock
wl(&g_rings
.queryLock
);
865 g_rings
.queryRing
.push_back({now
,*dq
.remote
,*dq
.qname
,dq
.len
,dq
.qtype
,*dq
.dh
});
868 if(g_qcount
.enabled
) {
869 string qname
= (*dq
.qname
).toString(".");
870 bool countQuery
{true};
871 if(g_qcount
.filter
) {
872 std::lock_guard
<std::mutex
> lock(g_luamutex
);
873 std::tie (countQuery
, qname
) = g_qcount
.filter(dq
);
877 WriteLock
wl(&g_qcount
.queryLock
);
878 if(!g_qcount
.records
.count(qname
)) {
879 g_qcount
.records
[qname
] = 0;
881 g_qcount
.records
[qname
]++;
// dynamic block keyed on the client address
885 if(auto got
= holders
.dynNMGBlock
->lookup(*dq
.remote
)) {
886 auto updateBlockStats
= [&got
]() {
887 g_stats
.dynBlocked
++;
888 got
->second
.blocks
++;
891 if(now
< got
->second
.until
) {
892 DNSAction::Action action
= got
->second
.action
;
893 if (action
== DNSAction::Action::None
) {
894 action
= g_dynBlockAction
;
896 if (action
== DNSAction::Action::Refused
) {
897 vinfolog("Query from %s refused because of dynamic block", dq
.remote
->toStringWithPort());
900 dq
.dh
->rcode
= RCode::Refused
;
904 else if (action
== DNSAction::Action::Truncate
) {
907 vinfolog("Query from %s truncated because of dynamic block", dq
.remote
->toStringWithPort());
913 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
919 vinfolog("Query from %s dropped because of dynamic block", dq
.remote
->toStringWithPort());
// dynamic block keyed on the query name (suffix-match tree)
925 if(auto got
= holders
.dynSMTBlock
->lookup(*dq
.qname
)) {
926 auto updateBlockStats
= [&got
]() {
927 g_stats
.dynBlocked
++;
931 if(now
< got
->until
) {
932 DNSAction::Action action
= got
->action
;
933 if (action
== DNSAction::Action::None
) {
934 action
= g_dynBlockAction
;
936 if (action
== DNSAction::Action::Refused
) {
937 vinfolog("Query from %s for %s refused because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
940 dq
.dh
->rcode
= RCode::Refused
;
944 else if (action
== DNSAction::Action::Truncate
) {
948 vinfolog("Query from %s for %s truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
954 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
959 vinfolog("Query from %s for %s dropped because of dynamic block", dq
.remote
->toStringWithPort(), dq
.qname
->toString());
// rule evaluation: each matching rule's counter is bumped and its action run;
// ruleresult carries action-specific data (spoof content, pool name, delay)
965 DNSAction::Action action
=DNSAction::Action::None
;
967 for(const auto& lr
: *holders
.rulactions
) {
968 if(lr
.first
->matches(&dq
)) {
969 lr
.first
->d_matches
++;
970 action
=(*lr
.second
)(&dq
, &ruleresult
);
973 case DNSAction::Action::Allow
:
976 case DNSAction::Action::Drop
:
980 case DNSAction::Action::Nxdomain
:
981 dq
.dh
->rcode
= RCode::NXDomain
;
983 g_stats
.ruleNXDomain
++;
986 case DNSAction::Action::Refused
:
987 dq
.dh
->rcode
= RCode::Refused
;
989 g_stats
.ruleRefused
++;
992 case DNSAction::Action::Spoof
:
993 spoofResponseFromString(dq
, ruleresult
);
996 case DNSAction::Action::Truncate
:
1001 case DNSAction::Action::HeaderModify
:
1004 case DNSAction::Action::Pool
:
1005 poolname
=ruleresult
;
1008 /* non-terminal actions follow */
1009 case DNSAction::Action::Delay
:
// NOTE(review): pdns_stou on a non-numeric ruleresult presumably throws
1010 *delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1012 case DNSAction::Action::None
:
// processResponse: run the response rule/action list against dr.
// Every matching rule has its d_matches counter bumped and its action run.
// A Delay action stores the delay parsed from the action's result string in
// *delayMsec. The actions before the "non-terminal actions follow" marker
// presumably end the scan; the return statements are not visible in this chunk.
// @param localRespRulactions per-thread view of the (rule, response action) list
// @param dr                  the response being processed
// @param delayMsec           out: delay in milliseconds requested by a Delay action
1021 bool processResponse(LocalStateHolder
<vector
<pair
<std::shared_ptr
<DNSRule
>, std::shared_ptr
<DNSResponseAction
> > > >& localRespRulactions
, DNSResponse
& dr
, int* delayMsec
)
1023 DNSResponseAction::Action action
=DNSResponseAction::Action::None
;
1024 std::string ruleresult
;
1025 for(const auto& lr
: *localRespRulactions
) {
1026 if(lr
.first
->matches(&dr
)) {
1027 lr
.first
->d_matches
++;
1028 action
=(*lr
.second
)(&dr
, &ruleresult
);
1030 case DNSResponseAction::Action::Allow
:
1033 case DNSResponseAction::Action::Drop
:
1036 case DNSResponseAction::Action::HeaderModify
:
1039 /* non-terminal actions follow */
1040 case DNSResponseAction::Action::Delay
:
1041 *delayMsec
= static_cast<int>(pdns_stou(ruleresult
)); // sorry
1043 case DNSResponseAction::Action::None
:
// udpClientSendRequestToBackend: send a query to backend ss over socket sd.
// Without a configured source interface (sourceItf == 0) a plain send() is used.
// Otherwise sendmsg() is used, with a control message (addCMsgSrcAddr) carrying
// ss->sourceAddr and ss->sourceItf.
// On failure the errno value is logged, and EINVAL gets special treatment (see the
// comment below; the handling itself is not visible in this chunk).
// Presumably returns the result of send()/sendmsg().
1052 static ssize_t
udpClientSendRequestToBackend(DownstreamState
* ss
, const int sd
, const char* request
, const size_t requestLen
)
1056 if (ss
->sourceItf
== 0) {
1057 result
= send(sd
, request
, requestLen
, 0);
1063 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), const_cast<char*>(request
), requestLen
, &ss
->remote
);
1064 addCMsgSrcAddr(&msgh
, cbuf
, &ss
->sourceAddr
, ss
->sourceItf
);
1065 result
= sendmsg(sd
, &msgh
, 0);
// capture errno immediately: the logging call below may overwrite it
1069 int savederrno
= errno
;
1070 vinfolog("Error sending request to backend %s: %d", ss
->remote
.toStringWithPort(), savederrno
);
1072 /* This might sound silly, but on Linux send() might fail with EINVAL
1073 if the interface the socket was bound to doesn't exist anymore. */
1074 if (savederrno
== EINVAL
) {
// isUDPQueryAcceptable: first-line checks on a received UDP datagram.
// It rejects datagrams the kernel truncated (MSG_TRUNC, counted as
// nonCompliantQueries) and clients that do not match the ACL.
// It then fills dest:
//  - with the datagram's harvested destination address, taking the port from
//    cs.local since only the address is available;
//  - or, when nothing was harvested, with sin_family = 0, which the send paths
//    (sendUDPResponse, queueResponse) treat as "no source address".
// The return statements are not visible in this chunk.
1082 static bool isUDPQueryAcceptable(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
)
1084 if (msgh
->msg_flags
& MSG_TRUNC
) {
1085 /* message was too large for our buffer */
1086 vinfolog("Dropping message too large for our buffer");
1087 g_stats
.nonCompliantQueries
++;
1091 if(!holders
.acl
->match(remote
)) {
1092 vinfolog("Query from %s dropped because of ACL", remote
.toStringWithPort());
1100 if (HarvestDestinationAddress(msgh
, &dest
)) {
1101 /* we don't get the port, only the address */
1102 dest
.sin4
.sin_port
= cs
.local
.sin4
.sin_port
;
1105 dest
.sin4
.sin_family
= 0;
// checkDNSCryptQuery (DNSCrypt builds only): when cs is a DNSCrypt listener,
// allocate a fresh DnsCryptQuery and pass the query to handleDnsCryptQuery.
// If that produced a response of its own, it is sent straight back to the client
// with sendUDPResponse (no delay). len is then set to decryptedQueryLen.
// query is handed on as non-const (const_cast), so the buffer is presumably
// decrypted in place; confirm against handleDnsCryptQuery.
// How `decrypted` is used and the return statements are not visible here.
1111 #ifdef HAVE_DNSCRYPT
1112 static bool checkDNSCryptQuery(const ClientState
& cs
, const char* query
, uint16_t& len
, std::shared_ptr
<DnsCryptQuery
>& dnsCryptQuery
, const ComboAddress
& dest
, const ComboAddress
& remote
)
1114 if (cs
.dnscryptCtx
) {
1115 vector
<uint8_t> response
;
1116 uint16_t decryptedQueryLen
= 0;
1118 dnsCryptQuery
= std::make_shared
<DnsCryptQuery
>();
1120 bool decrypted
= handleDnsCryptQuery(cs
.dnscryptCtx
, const_cast<char*>(query
), len
, dnsCryptQuery
, &decryptedQueryLen
, false, response
);
1123 if (response
.size() > 0) {
1124 sendUDPResponse(cs
.udpFD
, reinterpret_cast<char*>(response
.data()), static_cast<uint16_t>(response
.size()), 0, dest
, remote
);
1129 len
= decryptedQueryLen
;
1133 #endif /* HAVE_DNSCRYPT */
// checkQueryHeaders: reject packets that are not plain queries.
// A packet with QR set is a response and is counted in nonCompliantQueries; one
// with no question is counted in emptyQueries. rdQueries is incremented on a
// path whose condition is not visible (presumably when RD is set).
// The return statements are not visible in this chunk.
1135 bool checkQueryHeaders(const struct dnsheader
* dh
)
1137 if (dh
->qr
) { // don't respond to responses
1138 g_stats
.nonCompliantQueries
++;
1142 if (dh
->qdcount
== 0) {
1143 g_stats
.emptyQueries
++;
1148 g_stats
.rdQueries
++;
// queueResponse (recvmmsg/sendmmsg builds only): fill one mmsghdr entry,
// presumably so the response can be sent later as part of a batch. It does not
// send anything itself.
// The entry points at response/responseLen and targets remote. When dest is
// unset (sin_family == 0) no control message is attached; otherwise
// addCMsgSrcAddr stores dest in cbuf as the source address.
// cs is not used by the visible statements.
1154 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1155 static void queueResponse(const ClientState
& cs
, const char* response
, uint16_t responseLen
, const ComboAddress
& dest
, const ComboAddress
& remote
, struct mmsghdr
& outMsg
, struct iovec
* iov
, char* cbuf
)
1158 fillMSGHdr(&outMsg
.msg_hdr
, iov
, nullptr, 0, const_cast<char*>(response
), responseLen
, const_cast<ComboAddress
*>(&remote
));
1160 if (dest
.sin4
.sin_family
== 0) {
1161 outMsg
.msg_hdr
.msg_control
= nullptr;
1164 addCMsgSrcAddr(&outMsg
.msg_hdr
, cbuf
, &dest
, 0);
1167 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1169 static void processUDPQuery(ClientState
& cs
, LocalHolders
& holders
, const struct msghdr
* msgh
, const ComboAddress
& remote
, ComboAddress
& dest
, char* query
, uint16_t len
, size_t queryBufferSize
, struct mmsghdr
* responsesVect
, unsigned int* queuedResponses
, struct iovec
* respIOV
, char* respCBuf
)
1171 assert(responsesVect
== nullptr || (queuedResponses
!= nullptr && respIOV
!= nullptr && respCBuf
!= nullptr));
1172 uint16_t queryId
= 0;
1175 if (!isUDPQueryAcceptable(cs
, holders
, msgh
, remote
, dest
)) {
1179 #ifdef HAVE_DNSCRYPT
1180 std::shared_ptr
<DnsCryptQuery
> dnsCryptQuery
= nullptr;
1182 if (!checkDNSCryptQuery(cs
, query
, len
, dnsCryptQuery
, dest
, remote
)) {
1187 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(query
);
1188 queryId
= ntohs(dh
->id
);
1190 if (!checkQueryHeaders(dh
)) {
1194 const uint16_t * flags
= getFlagsFromDNSHeader(dh
);
1195 const uint16_t origFlags
= *flags
;
1196 uint16_t qtype
, qclass
;
1197 unsigned int consumed
= 0;
1198 DNSName
qname(query
, len
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
1199 DNSQuestion
dq(&qname
, qtype
, qclass
, dest
.sin4
.sin_family
!= 0 ? &dest
: &cs
.local
, &remote
, dh
, queryBufferSize
, len
, false);
1203 /* we need an accurate ("real") value for the response and
1204 to store into the IDS, but not for insertion into the
1205 rings for example */
1206 struct timespec realTime
;
1207 struct timespec now
;
1209 gettime(&realTime
, true);
1211 if (!processQuery(holders
, dq
, poolname
, &delayMsec
, now
))
1216 if(dq
.dh
->qr
) { // something turned it into a response
1217 g_stats
.selfAnswered
++;
1218 restoreFlags(dh
, origFlags
);
1221 char* response
= query
;
1222 uint16_t responseLen
= dq
.len
;
1224 #ifdef HAVE_DNSCRYPT
1225 if (!encryptResponse(response
, &responseLen
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1229 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1230 if (delayMsec
== 0 && responsesVect
!= nullptr) {
1231 queueResponse(cs
, response
, responseLen
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1232 (*queuedResponses
)++;
1235 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1237 sendUDPResponse(cs
.udpFD
, response
, responseLen
, delayMsec
, dest
, remote
);
1244 DownstreamState
* ss
= nullptr;
1245 std::shared_ptr
<ServerPool
> serverPool
= getPool(*holders
.pools
, poolname
);
1246 std::shared_ptr
<DNSDistPacketCache
> packetCache
= nullptr;
1247 auto policy
= holders
.policy
->policy
;
1248 if (serverPool
->policy
!= nullptr) {
1249 policy
= serverPool
->policy
->policy
;
1252 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1253 ss
= policy(serverPool
->servers
, &dq
).get();
1254 packetCache
= serverPool
->packetCache
;
1257 bool ednsAdded
= false;
1258 bool ecsAdded
= false;
1259 if (dq
.useECS
&& ss
&& ss
->useECS
) {
1260 if (!handleEDNSClientSubnet(query
, dq
.size
, consumed
, &dq
.len
, &(ednsAdded
), &(ecsAdded
), remote
, dq
.ecsOverride
, dq
.ecsPrefixLength
)) {
1261 vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote
.toStringWithPort());
1266 uint32_t cacheKey
= 0;
1267 if (packetCache
&& !dq
.skipCache
) {
1268 uint16_t cachedResponseSize
= dq
.size
;
1269 uint32_t allowExpired
= ss
? 0 : g_staleCacheEntriesTTL
;
1270 if (packetCache
->get(dq
, consumed
, dh
->id
, query
, &cachedResponseSize
, &cacheKey
, allowExpired
)) {
1271 DNSResponse
dr(dq
.qname
, dq
.qtype
, dq
.qclass
, dq
.local
, dq
.remote
, reinterpret_cast<dnsheader
*>(query
), dq
.size
, cachedResponseSize
, false, &realTime
);
1272 #ifdef HAVE_PROTOBUF
1273 dr
.uniqueId
= dq
.uniqueId
;
1275 if (!processResponse(holders
.cacheHitRespRulactions
, dr
, &delayMsec
)) {
1280 #ifdef HAVE_DNSCRYPT
1281 if (!encryptResponse(query
, &cachedResponseSize
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1285 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1286 if (delayMsec
== 0 && responsesVect
!= nullptr) {
1287 queueResponse(cs
, query
, cachedResponseSize
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1288 (*queuedResponses
)++;
1291 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1293 sendUDPResponse(cs
.udpFD
, query
, cachedResponseSize
, delayMsec
, dest
, remote
);
1297 g_stats
.cacheHits
++;
1298 g_stats
.latency0_1
++; // we're not going to measure this
1299 doLatencyAverages(0); // same
1302 g_stats
.cacheMisses
++;
1308 if (g_servFailOnNoPolicy
&& !cs
.muted
) {
1309 char* response
= query
;
1310 uint16_t responseLen
= dq
.len
;
1311 restoreFlags(dh
, origFlags
);
1313 dq
.dh
->rcode
= RCode::ServFail
;
1316 #ifdef HAVE_DNSCRYPT
1317 if (!encryptResponse(response
, &responseLen
, dq
.size
, false, dnsCryptQuery
, nullptr, nullptr)) {
1321 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1322 if (responsesVect
!= nullptr) {
1323 queueResponse(cs
, response
, responseLen
, dest
, remote
, responsesVect
[*queuedResponses
], respIOV
, respCBuf
);
1324 (*queuedResponses
)++;
1327 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1329 sendUDPResponse(cs
.udpFD
, response
, responseLen
, 0, dest
, remote
);
1332 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy
? "Dropped" : "ServFailed", dq
.qname
->toString(), QType(dq
.qtype
).getName(), remote
.toStringWithPort());
1338 unsigned int idOffset
= (ss
->idOffset
++) % ss
->idStates
.size();
1339 IDState
* ids
= &ss
->idStates
[idOffset
];
1342 if(ids
->origFD
< 0) // if we are reusing, no change in outstanding
1346 g_stats
.downstreamTimeouts
++;
1350 ids
->origFD
= cs
.udpFD
;
1351 ids
->origID
= dh
->id
;
1352 ids
->origRemote
= remote
;
1353 ids
->sentTime
.set(realTime
);
1355 ids
->qtype
= dq
.qtype
;
1356 ids
->qclass
= dq
.qclass
;
1357 ids
->delayMsec
= delayMsec
;
1358 ids
->origFlags
= origFlags
;
1359 ids
->cacheKey
= cacheKey
;
1360 ids
->skipCache
= dq
.skipCache
;
1361 ids
->packetCache
= packetCache
;
1362 ids
->ednsAdded
= ednsAdded
;
1363 ids
->ecsAdded
= ecsAdded
;
1365 /* If we couldn't harvest the real dest addr, still
1366 write down the listening addr since it will be useful
1367 (especially if it's not an 'any' one).
1368 We need to keep track of which one it is since we may
1369 want to use the real but not the listening addr to reply.
1371 if (dest
.sin4
.sin_family
!= 0) {
1372 ids
->origDest
= dest
;
1373 ids
->destHarvested
= true;
1376 ids
->origDest
= cs
.local
;
1377 ids
->destHarvested
= false;
1379 #ifdef HAVE_DNSCRYPT
1380 ids
->dnsCryptQuery
= dnsCryptQuery
;
1382 #ifdef HAVE_PROTOBUF
1383 ids
->uniqueId
= dq
.uniqueId
;
1388 ssize_t ret
= udpClientSendRequestToBackend(ss
, ss
->fd
, query
, dq
.len
);
1392 g_stats
.downstreamSendErrors
++;
1395 vinfolog("Got query for %s|%s from %s, relayed to %s", ids
->qname
.toString(), QType(ids
->qtype
).getName(), remote
.toStringWithPort(), ss
->getName());
1397 catch(const std::exception
& e
){
1398 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
1402 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1403 static void MultipleMessagesUDPClientThread(ClientState
* cs
, LocalHolders
& holders
)
1408 /* used by HarvestDestinationAddress */
1410 ComboAddress remote
;
1414 const size_t vectSize
= g_udpVectorSize
;
1415 /* the actual buffer is larger because:
1416 - we may have to add EDNS and/or ECS
1417 - we use it for self-generated responses (from rule or cache)
1418 but we only accept incoming payloads up to that size
1420 static_assert(s_udpIncomingBufferSize
<= sizeof(MMReceiver::packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1422 auto recvData
= std::unique_ptr
<MMReceiver
[]>(new MMReceiver
[vectSize
]);
1423 auto msgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1424 auto outMsgVec
= std::unique_ptr
<struct mmsghdr
[]>(new struct mmsghdr
[vectSize
]);
1426 /* initialize the structures needed to receive our messages */
1427 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1428 recvData
[idx
].remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1429 fillMSGHdr(&msgVec
[idx
].msg_hdr
, &recvData
[idx
].iov
, recvData
[idx
].cbuf
, sizeof(recvData
[idx
].cbuf
), recvData
[idx
].packet
, s_udpIncomingBufferSize
, &recvData
[idx
].remote
);
1435 /* reset the IO vector, since it's also used to send the vector of responses
1436 to avoid having to copy the data around */
1437 for (size_t idx
= 0; idx
< vectSize
; idx
++) {
1438 recvData
[idx
].iov
.iov_base
= recvData
[idx
].packet
;
1439 recvData
[idx
].iov
.iov_len
= sizeof(recvData
[idx
].packet
);
1442 /* block until we have at least one message ready, but return
1443 as many as possible to save the syscall costs */
1444 int msgsGot
= recvmmsg(cs
->udpFD
, msgVec
.get(), vectSize
, MSG_WAITFORONE
| MSG_TRUNC
, nullptr);
1447 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno
));
1451 unsigned int msgsToSend
= 0;
1453 /* process the received messages */
1454 for (int msgIdx
= 0; msgIdx
< msgsGot
; msgIdx
++) {
1455 const struct msghdr
* msgh
= &msgVec
[msgIdx
].msg_hdr
;
1456 unsigned int got
= msgVec
[msgIdx
].msg_len
;
1457 const ComboAddress
& remote
= recvData
[msgIdx
].remote
;
1459 if (got
< sizeof(struct dnsheader
)) {
1460 g_stats
.nonCompliantQueries
++;
1464 processUDPQuery(*cs
, holders
, msgh
, remote
, recvData
[msgIdx
].dest
, recvData
[msgIdx
].packet
, static_cast<uint16_t>(got
), sizeof(recvData
[msgIdx
].packet
), outMsgVec
.get(), &msgsToSend
, &recvData
[msgIdx
].iov
, recvData
[msgIdx
].cbuf
);
1468 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1469 or the cache) can be sent in batch too */
1471 if (msgsToSend
> 0 && msgsToSend
<= static_cast<unsigned int>(msgsGot
)) {
1472 int sent
= sendmmsg(cs
->udpFD
, outMsgVec
.get(), msgsToSend
, 0);
1474 if (sent
< 0 || static_cast<unsigned int>(sent
) != msgsToSend
) {
1475 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent
, msgsToSend
, strerror(errno
));
1481 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1483 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1484 static void* udpClientThread(ClientState
* cs
)
1487 LocalHolders holders
;
1489 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1490 if (g_udpVectorSize
> 1) {
1491 MultipleMessagesUDPClientThread(cs
, holders
);
1495 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1498 /* the actual buffer is larger because:
1499 - we may have to add EDNS and/or ECS
1500 - we use it for self-generated responses (from rule or cache)
1501 but we only accept incoming payloads up to that size
1503 static_assert(s_udpIncomingBufferSize
<= sizeof(packet
), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1506 /* used by HarvestDestinationAddress */
1509 ComboAddress remote
;
1511 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1512 fillMSGHdr(&msgh
, &iov
, cbuf
, sizeof(cbuf
), packet
, sizeof(packet
), &remote
);
1515 ssize_t got
= recvmsg(cs
->udpFD
, &msgh
, 0);
1517 if (got
< 0 || static_cast<size_t>(got
) < sizeof(struct dnsheader
)) {
1518 g_stats
.nonCompliantQueries
++;
1522 processUDPQuery(*cs
, holders
, &msgh
, remote
, dest
, packet
, static_cast<uint16_t>(got
), s_udpIncomingBufferSize
, nullptr, nullptr, nullptr, nullptr);
1528 catch(const std::exception
&e
)
1530 errlog("UDP client thread died because of exception: %s", e
.what());
1533 catch(const PDNSException
&e
)
1535 errlog("UDP client thread died because of PowerDNS exception: %s", e
.reason
);
1540 errlog("UDP client thread died because of an exception: %s", "unknown");
1544 static bool upCheck(DownstreamState
& ds
)
1547 vector
<uint8_t> packet
;
1548 DNSPacketWriter
dpw(packet
, ds
.checkName
, ds
.checkType
.getCode());
1549 dnsheader
* requestHeader
= dpw
.getHeader();
1550 requestHeader
->rd
=true;
1552 requestHeader
->cd
= true;
1555 Socket
sock(ds
.remote
.sin4
.sin_family
, SOCK_DGRAM
);
1556 sock
.setNonBlocking();
1557 if (!IsAnyAddress(ds
.sourceAddr
)) {
1558 sock
.setReuseAddr();
1559 sock
.bind(ds
.sourceAddr
);
1561 sock
.connect(ds
.remote
);
1562 ssize_t sent
= udpClientSendRequestToBackend(&ds
, sock
.getHandle(), (char*)&packet
[0], packet
.size());
1565 if (g_verboseHealthChecks
)
1566 infolog("Error while sending a health check query to backend %s: %d", ds
.getNameWithAddr(), ret
);
1570 int ret
=waitForRWData(sock
.getHandle(), true, 1, 0);
1571 if(ret
< 0 || !ret
) { // error, timeout, both are down!
1574 if (g_verboseHealthChecks
)
1575 infolog("Error while waiting for the health check response from backend %s: %d", ds
.getNameWithAddr(), ret
);
1578 if (g_verboseHealthChecks
)
1579 infolog("Timeout while waiting for the health check response from backend %s", ds
.getNameWithAddr());
1585 sock
.recvFrom(reply
, ds
.remote
);
1587 const dnsheader
* responseHeader
= (const dnsheader
*) reply
.c_str();
1589 if (reply
.size() < sizeof(*responseHeader
)) {
1590 if (g_verboseHealthChecks
)
1591 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply
.size(), ds
.getNameWithAddr(), sizeof(*responseHeader
));
1595 if (responseHeader
->id
!= requestHeader
->id
) {
1596 if (g_verboseHealthChecks
)
1597 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader
->id
, ds
.getNameWithAddr(), requestHeader
->id
);
1601 if (!responseHeader
->qr
) {
1602 if (g_verboseHealthChecks
)
1603 infolog("Invalid health check response from backend %s, expecting QR to be set", ds
.getNameWithAddr());
1607 if (responseHeader
->rcode
== RCode::ServFail
) {
1608 if (g_verboseHealthChecks
)
1609 infolog("Backend %s responded to health check with ServFail", ds
.getNameWithAddr());
1613 if (ds
.mustResolve
&& (responseHeader
->rcode
== RCode::NXDomain
|| responseHeader
->rcode
== RCode::Refused
)) {
1614 if (g_verboseHealthChecks
)
1615 infolog("Backend %s responded to health check with %s while mustResolve is set", ds
.getNameWithAddr(), responseHeader
->rcode
== RCode::NXDomain
? "NXDomain" : "Refused");
1619 // XXX fixme do bunch of checking here etc
1622 catch(const std::exception
& e
)
1624 if (g_verboseHealthChecks
)
1625 infolog("Error checking the health of backend %s: %s", ds
.getNameWithAddr(), e
.what());
1630 if (g_verboseHealthChecks
)
1631 infolog("Unknown exception while checking the health of backend %s", ds
.getNameWithAddr());
1635 uint64_t g_maxTCPClientThreads
{10};
1636 std::atomic
<uint16_t> g_cacheCleaningDelay
{60};
1637 std::atomic
<uint16_t> g_cacheCleaningPercentage
{100};
1643 int32_t secondsToWaitLog
= 0;
1649 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1650 auto f
= g_lua
.readVariable
<boost::optional
<std::function
<void()> > >("maintenance");
1654 secondsToWaitLog
= 0;
1656 catch(std::exception
&e
) {
1657 if (secondsToWaitLog
<= 0) {
1658 infolog("Error during execution of maintenance function: %s", e
.what());
1659 secondsToWaitLog
= 61;
1661 secondsToWaitLog
-= interval
;
1667 if (counter
>= g_cacheCleaningDelay
) {
1668 const auto localPools
= g_pools
.getCopy();
1669 std::shared_ptr
<DNSDistPacketCache
> packetCache
= nullptr;
1670 for (const auto& entry
: localPools
) {
1672 std::lock_guard
<std::mutex
> lock(g_luamutex
);
1673 packetCache
= entry
.second
->packetCache
;
1676 size_t upTo
= (packetCache
->getMaxEntries()* (100 - g_cacheCleaningPercentage
)) / 100;
1677 packetCache
->purgeExpired(upTo
);
1683 // ponder pruning g_dynblocks of expired entries here
1688 void* healthChecksThread()
1695 if(g_tcpclientthreads
->getQueuedCount() > 1 && !g_tcpclientthreads
->hasReachedMaxThreads())
1696 g_tcpclientthreads
->addTCPClientThread();
1698 for(auto& dss
: g_dstates
.getCopy()) { // this points to the actual shared_ptrs!
1699 if(dss
->availability
==DownstreamState::Availability::Auto
) {
1700 bool newState
=upCheck(*dss
);
1702 if (dss
->currentCheckFailures
!= 0) {
1703 dss
->currentCheckFailures
= 0;
1706 else if (!newState
&& dss
->upStatus
) {
1707 dss
->currentCheckFailures
++;
1708 if (dss
->currentCheckFailures
< dss
->maxCheckFailures
) {
1713 if(newState
!= dss
->upStatus
) {
1714 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
1716 if (newState
&& !dss
->connected
) {
1718 SConnect(dss
->fd
, dss
->remote
);
1719 dss
->connected
= true;
1720 dss
->tid
= thread(responderThread
, dss
);
1722 catch(const std::runtime_error
& error
) {
1723 infolog("Error connecting to new server with address %s: %s", dss
->remote
.toStringWithPort(), error
.what());
1725 dss
->connected
= false;
1729 dss
->upStatus
= newState
;
1730 dss
->currentCheckFailures
= 0;
1731 if (g_snmpAgent
&& g_snmpTrapsEnabled
) {
1732 g_snmpAgent
->sendBackendStatusChangeTrap(dss
);
1737 auto delta
= dss
->sw
.udiffAndSet()/1000000.0;
1738 dss
->queryLoad
= 1.0*(dss
->queries
.load() - dss
->prev
.queries
.load())/delta
;
1739 dss
->dropRate
= 1.0*(dss
->reuseds
.load() - dss
->prev
.reuseds
.load())/delta
;
1740 dss
->prev
.queries
.store(dss
->queries
.load());
1741 dss
->prev
.reuseds
.store(dss
->reuseds
.load());
1743 for(IDState
& ids
: dss
->idStates
) { // timeouts
1744 if(ids
.origFD
>=0 && ids
.age
++ > g_udpTimeout
) {
1745 /* We set origFD to -1 as soon as possible
1746 to limit the risk of racing with the
1748 The UDP client thread only checks origFD to
1749 know whether outstanding has to be incremented,
1750 so the sooner the better any way since we _will_
1757 g_stats
.downstreamTimeouts
++; // this is an 'actively' discovered timeout
1758 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1759 dss
->remote
.toStringWithPort(), dss
->name
,
1760 ids
.qname
.toString(), QType(ids
.qtype
).getName(), ids
.origRemote
.toStringWithPort());
1765 struct dnsheader fake
;
1766 memset(&fake
, 0, sizeof(fake
));
1767 fake
.id
= ids
.origID
;
1769 std::lock_guard
<std::mutex
> lock(g_rings
.respMutex
);
1770 g_rings
.respRing
.push_back({ts
, ids
.origRemote
, ids
.qname
, ids
.qtype
, std::numeric_limits
<unsigned int>::max(), 0, fake
, dss
->remote
});
1781 void controlThread(int fd
, ComboAddress local
)
1784 ComboAddress client
;
1786 warnlog("Accepting control connections on %s", local
.toStringWithPort());
1787 while((sock
=SAccept(fd
, client
)) >= 0) {
1788 if (g_logConsoleConnections
) {
1789 warnlog("Got control connection from %s", client
.toStringWithPort());
1792 thread
t(controlClientThread
, sock
, client
);
1796 catch(std::exception
& e
)
1799 errlog("Control connection died: %s", e
.what());
1804 static void bindAny(int af
, int sock
)
1809 if (setsockopt(sock
, IPPROTO_IP
, IP_FREEBIND
, &one
, sizeof(one
)) < 0)
1810 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno
));
1815 if (setsockopt(sock
, IPPROTO_IP
, IP_BINDANY
, &one
, sizeof(one
)) < 0)
1816 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno
));
1820 if (setsockopt(sock
, IPPROTO_IPV6
, IPV6_BINDANY
, &one
, sizeof(one
)) < 0)
1821 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno
));
1824 if (setsockopt(sock
, SOL_SOCKET
, SO_BINDANY
, &one
, sizeof(one
)) < 0)
1825 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno
));
1829 static void dropGroupPrivs(gid_t gid
)
1832 if (setgid(gid
) == 0) {
1833 if (setgroups(0, NULL
) < 0) {
1834 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno
));
1838 warnlog("Warning: Unable to set group ID to %d: %s", gid
, strerror(errno
));
1843 static void dropUserPrivs(uid_t uid
)
1846 if(setuid(uid
) < 0) {
1847 warnlog("Warning: Unable to set user ID to %d: %s", uid
, strerror(errno
));
1852 static void checkFileDescriptorsLimits(size_t udpBindsCount
, size_t tcpBindsCount
)
1854 /* stdin, stdout, stderr */
1855 size_t requiredFDsCount
= 3;
1856 size_t backendsCount
= g_dstates
.getCopy().size();
1857 /* listening sockets */
1858 requiredFDsCount
+= udpBindsCount
;
1859 requiredFDsCount
+= tcpBindsCount
;
1860 /* max TCP connections currently served */
1861 requiredFDsCount
+= g_maxTCPClientThreads
;
1862 /* max pipes for communicating between TCP acceptors and client threads */
1863 requiredFDsCount
+= (g_maxTCPClientThreads
* 2);
1864 /* UDP sockets to backends */
1865 requiredFDsCount
+= backendsCount
;
1866 /* TCP sockets to backends */
1867 requiredFDsCount
+= (backendsCount
* g_maxTCPClientThreads
);
1868 /* max TCP queued connections */
1869 requiredFDsCount
+= g_maxTCPQueuedConnections
;
1870 /* DelayPipe pipe */
1871 requiredFDsCount
+= 2;
1874 /* webserver main socket */
1876 /* console main socket */
1883 getrlimit(RLIMIT_NOFILE
, &rl
);
1884 if (rl
.rlim_cur
<= requiredFDsCount
) {
1885 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount
), std::to_string(rl
.rlim_cur
));
1887 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
1889 warnlog("You can increase this value by using ulimit.");
1896 vector
<string
> locals
;
1897 vector
<string
> remotes
;
1898 bool checkConfig
{false};
1899 bool beDaemon
{false};
1900 bool beClient
{false};
1901 bool beSupervised
{false};
1909 std::atomic
<bool> g_configurationDone
{false};
1911 int main(int argc
, char** argv
)
1914 size_t udpBindsCount
= 0;
1915 size_t tcpBindsCount
= 0;
1916 rl_attempted_completion_function
= my_completion
;
1917 rl_completion_append_character
= 0;
1919 signal(SIGPIPE
, SIG_IGN
);
1920 signal(SIGCHLD
, SIG_IGN
);
1921 openlog("dnsdist", LOG_PID
, LOG_DAEMON
);
1924 #ifdef HAVE_LIBSODIUM
1925 if (sodium_init() == -1) {
1926 cerr
<<"Unable to initialize crypto library"<<endl
;
1929 g_hashperturb
=randombytes_uniform(0xffffffff);
1930 srandom(randombytes_uniform(0xffffffff));
1934 gettimeofday(&tv
, 0);
1935 srandom(tv
.tv_sec
^ tv
.tv_usec
^ getpid());
1936 g_hashperturb
=random();
1940 ComboAddress clientAddress
= ComboAddress();
1941 g_cmdLine
.config
=SYSCONFDIR
"/dnsdist.conf";
1942 struct option longopts
[]={
1943 {"acl", required_argument
, 0, 'a'},
1944 {"config", required_argument
, 0, 'C'},
1945 {"check-config", 0, 0, 1},
1946 {"execute", required_argument
, 0, 'e'},
1947 {"client", 0, 0, 'c'},
1948 {"gid", required_argument
, 0, 'g'},
1949 #ifdef HAVE_LIBSODIUM
1950 {"setkey", required_argument
, 0, 'k'},
1952 {"local", required_argument
, 0, 'l'},
1953 {"daemon", 0, 0, 'd'},
1954 {"pidfile", required_argument
, 0, 'p'},
1955 {"supervised", 0, 0, 's'},
1956 {"disable-syslog", 0, 0, 2},
1957 {"uid", required_argument
, 0, 'u'},
1958 {"verbose", 0, 0, 'v'},
1959 {"version", 0, 0, 'V'},
1960 {"help", 0, 0, 'h'},
1966 #ifdef HAVE_LIBSODIUM
1967 int c
=getopt_long(argc
, argv
, "a:hcde:C:k:l:vp:g:u:V", longopts
, &longindex
);
1969 int c
=getopt_long(argc
, argv
, "a:hcde:C:l:vp:g:u:V", longopts
, &longindex
);
1975 g_cmdLine
.checkConfig
=true;
1981 g_cmdLine
.config
=optarg
;
1984 g_cmdLine
.beClient
=true;
1987 g_cmdLine
.beDaemon
=true;
1990 g_cmdLine
.command
=optarg
;
1993 g_cmdLine
.gid
=optarg
;
1996 cout
<<"dnsdist "<<VERSION
<<endl
;
1998 cout
<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]] [-d,--daemon]\n";
1999 cout
<<"[-p,--pidfile file] [-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2000 cout
<<"[-v,--verbose] [--check-config]\n";
2002 cout
<<"-a,--acl netmask Add this netmask to the ACL\n";
2003 cout
<<"-C,--config file Load configuration from 'file'\n";
2004 cout
<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2005 cout
<<" controlSocket from your configuration file, but also\n";
2006 cout
<<" accepts an IP:PORT argument\n";
2007 #ifdef HAVE_LIBSODIUM
2008 cout
<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2009 cout
<<" is similar to setting setKey in the configuration file.\n";
2010 cout
<<" NOTE: this will leak this key in your shell's history!\n";
2012 cout
<<"--check-config Validate the configuration file and exit. The exit-code\n";
2013 cout
<<" reflects the validation, 0 is OK, 1 means an error.\n";
2014 cout
<<" Any errors are printed as well.\n";
2015 cout
<<"-d,--daemon Operate as a daemon\n";
2016 cout
<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2017 cout
<<"-g,--gid gid Change the process group ID after binding sockets\n";
2018 cout
<<"-h,--help Display this helpful message\n";
2019 cout
<<"-l,--local address Listen on this local address\n";
2020 cout
<<"--supervised Don't open a console, I'm supervised\n";
2021 cout
<<" (use with e.g. systemd and daemontools)\n";
2022 cout
<<"--disable-syslog Don't log to syslog, only to stdout\n";
2023 cout
<<" (use with e.g. systemd)\n";
2024 cout
<<"-p,--pidfile file Write a pidfile, works only with --daemon\n";
2025 cout
<<"-u,--uid uid Change the process user ID after binding sockets\n";
2026 cout
<<"-v,--verbose Enable verbose mode\n";
2032 g_ACL
.modify([optstring
](NetmaskGroup
& nmg
) { nmg
.addMask(optstring
); });
2034 #ifdef HAVE_LIBSODIUM
2036 if (B64Decode(string(optarg
), g_key
) < 0) {
2037 cerr
<<"Unable to decode key '"<<optarg
<<"'."<<endl
;
2043 g_cmdLine
.locals
.push_back(trim_copy(string(optarg
)));
2046 g_cmdLine
.pidfile
=optarg
;
2049 g_cmdLine
.beSupervised
=true;
2052 g_cmdLine
.uid
=optarg
;
2058 #ifdef LUAJIT_VERSION
2059 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<" ["<<LUAJIT_VERSION
<<"])"<<endl
;
2061 cout
<<"dnsdist "<<VERSION
<<" ("<<LUA_RELEASE
<<")"<<endl
;
2063 cout
<<"Enabled features: ";
2064 #ifdef HAVE_DNSCRYPT
2070 #ifdef HAVE_LIBSODIUM
2073 #ifdef HAVE_PROTOBUF
2079 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2080 cout
<<"recvmmsg/sendmmsg ";
2082 #ifdef HAVE_NET_SNMP
2096 for(auto p
= argv
; *p
; ++p
) {
2097 if(g_cmdLine
.beClient
) {
2098 clientAddress
= ComboAddress(*p
, 5199);
2100 g_cmdLine
.remotes
.push_back(*p
);
2104 ServerPolicy leastOutstandingPol
{"leastOutstanding", leastOutstanding
};
2106 g_policy
.setState(leastOutstandingPol
);
2107 if(g_cmdLine
.beClient
|| !g_cmdLine
.command
.empty()) {
2108 setupLua(true, g_cmdLine
.config
);
2109 if (clientAddress
!= ComboAddress())
2110 g_serverControl
= clientAddress
;
2111 doClient(g_serverControl
, g_cmdLine
.command
);
2112 _exit(EXIT_SUCCESS
);
2115 auto acl
= g_ACL
.getCopy();
2117 for(auto& addr
: {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2119 g_ACL
.setState(acl
);
2122 if (g_cmdLine
.checkConfig
) {
2123 setupLua(true, g_cmdLine
.config
);
2124 // No exception was thrown
2125 infolog("Configuration '%s' OK!", g_cmdLine
.config
);
2126 _exit(EXIT_SUCCESS
);
2129 auto todo
=setupLua(false, g_cmdLine
.config
);
2131 if(g_cmdLine
.locals
.size()) {
2133 for(auto loc
: g_cmdLine
.locals
)
2134 g_locals
.push_back(std::make_tuple(ComboAddress(loc
, 53), true, false, 0, "", std::set
<int>()));
2137 if(g_locals
.empty())
2138 g_locals
.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set
<int>()));
2140 g_configurationDone
= true;
2142 vector
<ClientState
*> toLaunch
;
2143 for(const auto& local
: g_locals
) {
2144 ClientState
* cs
= new ClientState
;
2145 cs
->local
= std::get
<0>(local
);
2146 cs
->udpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_DGRAM
, 0);
2147 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2148 SSetsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2150 //if(g_vm.count("bind-non-local"))
2151 bindAny(cs
->local
.sin4
.sin_family
, cs
->udpFD
);
2153 // if (!setSocketTimestamps(cs->udpFD))
2154 // L<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2157 if(IsAnyAddress(cs
->local
)) {
2159 setsockopt(cs
->udpFD
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2160 #ifdef IPV6_RECVPKTINFO
2161 setsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2165 if (std::get
<2>(local
)) {
2167 SSetsockopt(cs
->udpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2169 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get
<0>(local
).toStringWithPort());
2173 const std::string
& itf
= std::get
<4>(local
);
2175 #ifdef SO_BINDTODEVICE
2176 int res
= setsockopt(cs
->udpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2178 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(local
).toStringWithPort(), strerror(errno
));
2181 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(local
).toStringWithPort());
2186 if (g_defaultBPFFilter
) {
2187 cs
->attachFilter(g_defaultBPFFilter
);
2188 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs
->local
.toStringWithPort());
2190 #endif /* HAVE_EBPF */
2192 cs
->cpus
= std::get
<5>(local
);
2194 SBind(cs
->udpFD
, cs
->local
);
2195 toLaunch
.push_back(cs
);
2196 g_frontends
.push_back(cs
);
2200 for(const auto& local
: g_locals
) {
2201 if(!std::get
<1>(local
)) { // no TCP/IP
2202 warnlog("Not providing TCP/IP service on local address '%s'", std::get
<0>(local
).toStringWithPort());
2205 ClientState
* cs
= new ClientState
;
2206 cs
->local
= std::get
<0>(local
);
2208 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2210 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2211 #ifdef TCP_DEFER_ACCEPT
2212 SSetsockopt(cs
->tcpFD
, SOL_TCP
,TCP_DEFER_ACCEPT
, 1);
2214 if (std::get
<3>(local
) > 0) {
2216 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, std::get
<3>(local
));
2218 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get
<0>(local
).toStringWithPort());
2221 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2222 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2225 /* no need to warn again if configured but support is not available, we already did for UDP */
2226 if (std::get
<2>(local
)) {
2227 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2231 const std::string
& itf
= std::get
<4>(local
);
2233 #ifdef SO_BINDTODEVICE
2234 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2236 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(local
).toStringWithPort(), strerror(errno
));
2239 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(local
).toStringWithPort());
2244 if (g_defaultBPFFilter
) {
2245 cs
->attachFilter(g_defaultBPFFilter
);
2246 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs
->local
.toStringWithPort());
2248 #endif /* HAVE_EBPF */
2250 // if(g_vm.count("bind-non-local"))
2251 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2252 SBind(cs
->tcpFD
, cs
->local
);
2253 SListen(cs
->tcpFD
, 64);
2254 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2256 toLaunch
.push_back(cs
);
2257 g_frontends
.push_back(cs
);
2261 #ifdef HAVE_DNSCRYPT
2262 for(auto& dcLocal
: g_dnsCryptLocals
) {
2263 ClientState
* cs
= new ClientState
;
2264 cs
->local
= std::get
<0>(dcLocal
);
2265 cs
->dnscryptCtx
= &(std::get
<1>(dcLocal
));
2266 cs
->udpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_DGRAM
, 0);
2267 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2268 SSetsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2270 bindAny(cs
->local
.sin4
.sin_family
, cs
->udpFD
);
2271 if(IsAnyAddress(cs
->local
)) {
2273 setsockopt(cs
->udpFD
, IPPROTO_IP
, GEN_IP_PKTINFO
, &one
, sizeof(one
)); // linux supports this, so why not - might fail on other systems
2274 #ifdef IPV6_RECVPKTINFO
2275 setsockopt(cs
->udpFD
, IPPROTO_IPV6
, IPV6_RECVPKTINFO
, &one
, sizeof(one
));
2278 if (std::get
<2>(dcLocal
)) {
2280 SSetsockopt(cs
->udpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2282 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2286 const std::string
& itf
= std::get
<4>(dcLocal
);
2288 #ifdef SO_BINDTODEVICE
2289 int res
= setsockopt(cs
->udpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2291 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(dcLocal
).toStringWithPort(), strerror(errno
));
2294 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2299 if (g_defaultBPFFilter
) {
2300 cs
->attachFilter(g_defaultBPFFilter
);
2301 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs
->local
.toStringWithPort());
2303 #endif /* HAVE_EBPF */
2304 SBind(cs
->udpFD
, cs
->local
);
2305 toLaunch
.push_back(cs
);
2306 g_frontends
.push_back(cs
);
2309 cs
= new ClientState
;
2310 cs
->local
= std::get
<0>(dcLocal
);
2311 cs
->dnscryptCtx
= &(std::get
<1>(dcLocal
));
2312 cs
->tcpFD
= SSocket(cs
->local
.sin4
.sin_family
, SOCK_STREAM
, 0);
2313 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEADDR
, 1);
2314 #ifdef TCP_DEFER_ACCEPT
2315 SSetsockopt(cs
->tcpFD
, SOL_TCP
,TCP_DEFER_ACCEPT
, 1);
2317 if (std::get
<3>(dcLocal
) > 0) {
2319 SSetsockopt(cs
->tcpFD
, IPPROTO_TCP
, TCP_FASTOPEN
, std::get
<3>(dcLocal
));
2321 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2326 /* no need to warn again if configured but support is not available, we already did for UDP */
2327 if (std::get
<2>(dcLocal
)) {
2328 SSetsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_REUSEPORT
, 1);
2333 #ifdef SO_BINDTODEVICE
2334 int res
= setsockopt(cs
->tcpFD
, SOL_SOCKET
, SO_BINDTODEVICE
, itf
.c_str(), itf
.length());
2336 warnlog("Error setting up the interface on local address '%s': %s", std::get
<0>(dcLocal
).toStringWithPort(), strerror(errno
));
2339 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get
<0>(dcLocal
).toStringWithPort());
2343 if(cs
->local
.sin4
.sin_family
== AF_INET6
) {
2344 SSetsockopt(cs
->tcpFD
, IPPROTO_IPV6
, IPV6_V6ONLY
, 1);
2347 if (g_defaultBPFFilter
) {
2348 cs
->attachFilter(g_defaultBPFFilter
);
2349 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs
->local
.toStringWithPort());
2351 #endif /* HAVE_EBPF */
2353 cs
->cpus
= std::get
<5>(dcLocal
);
2355 bindAny(cs
->local
.sin4
.sin_family
, cs
->tcpFD
);
2356 SBind(cs
->tcpFD
, cs
->local
);
2357 SListen(cs
->tcpFD
, 64);
2358 warnlog("Listening on %s", cs
->local
.toStringWithPort());
2359 toLaunch
.push_back(cs
);
2360 g_frontends
.push_back(cs
);
2365 if(g_cmdLine
.beDaemon
) {
2368 writepid(g_cmdLine
.pidfile
);
2371 vinfolog("Running in the foreground");
2372 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION
);
2375 g_ACL
.getCopy().toStringVector(&vec
);
2376 for(const auto& s
: vec
) {
2381 infolog("ACL allowing queries from: %s", acls
.c_str());
2387 if(!g_cmdLine
.gid
.empty())
2388 newgid
= strToGID(g_cmdLine
.gid
.c_str());
2390 if(!g_cmdLine
.uid
.empty())
2391 newuid
= strToUID(g_cmdLine
.uid
.c_str());
2393 dropGroupPrivs(newgid
);
2394 dropUserPrivs(newuid
);
2396 /* this need to be done _after_ dropping privileges */
2397 g_delay
= new DelayPipe
<DelayedPacket
>();
2403 g_tcpclientthreads
= std::make_shared
<TCPClientCollection
>(g_maxTCPClientThreads
, g_useTCPSinglePipe
);
2408 auto localPools
= g_pools
.getCopy();
2409 /* create the default pool no matter what */
2410 createPoolIfNotExists(localPools
, "");
2411 if(g_cmdLine
.remotes
.size()) {
2412 for(const auto& address
: g_cmdLine
.remotes
) {
2413 auto ret
=std::make_shared
<DownstreamState
>(ComboAddress(address
, 53));
2414 addServerToPool(localPools
, "", ret
);
2415 if (ret
->connected
) {
2416 ret
->tid
= thread(responderThread
, ret
);
2418 g_dstates
.modify([ret
](servers_t
& servers
) { servers
.push_back(ret
); });
2421 g_pools
.setState(localPools
);
2423 if(g_dstates
.getCopy().empty()) {
2424 errlog("No downstream servers defined: all packets will get dropped");
2425 // you might define them later, but you need to know
2428 checkFileDescriptorsLimits(udpBindsCount
, tcpBindsCount
);
2430 for(auto& dss
: g_dstates
.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2431 if(dss
->availability
==DownstreamState::Availability::Auto
) {
2432 bool newState
=upCheck(*dss
);
2433 warnlog("Marking downstream %s as '%s'", dss
->getNameWithAddr(), newState
? "up" : "down");
2434 dss
->upStatus
= newState
;
2438 for(auto& cs
: toLaunch
) {
2439 if (cs
->udpFD
>= 0) {
2440 thread
t1(udpClientThread
, cs
);
2441 if (!cs
->cpus
.empty()) {
2442 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2446 else if (cs
->tcpFD
>= 0) {
2447 thread
t1(tcpAcceptorThread
, cs
);
2448 if (!cs
->cpus
.empty()) {
2449 mapThreadToCPUList(t1
.native_handle(), cs
->cpus
);
2455 thread
carbonthread(carbonDumpThread
);
2456 carbonthread
.detach();
2458 thread
stattid(maintThread
);
2461 thread
healththread(healthChecksThread
);
2463 if(g_cmdLine
.beDaemon
|| g_cmdLine
.beSupervised
) {
2465 sd_notify(0, "READY=1");
2467 healththread
.join();
2470 healththread
.detach();
2473 _exit(EXIT_SUCCESS
);
2476 catch(const LuaContext::ExecutionErrorException
& e
) {
2478 errlog("Fatal Lua error: %s", e
.what());
2479 std::rethrow_if_nested(e
);
2480 } catch(const std::exception
& e
) {
2481 errlog("Details: %s", e
.what());
2483 catch(PDNSException
&ae
)
2485 errlog("Fatal pdns error: %s", ae
.reason
);
2487 _exit(EXIT_FAILURE
);
2489 catch(std::exception
&e
)
2491 errlog("Fatal error: %s", e
.what());
2492 _exit(EXIT_FAILURE
);
2494 catch(PDNSException
&ae
)
2496 errlog("Fatal pdns error: %s", ae
.reason
);
2497 _exit(EXIT_FAILURE
);