]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
dnsdist: Add steal, iowait and UDP errors metrics
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-healthchecks.hh"
49 #include "dnsdist-lua.hh"
50 #include "dnsdist-rings.hh"
51 #include "dnsdist-secpoll.hh"
52 #include "dnsdist-xpf.hh"
53
54 #include "base64.hh"
55 #include "delaypipe.hh"
56 #include "dolog.hh"
57 #include "dnsname.hh"
58 #include "dnsparser.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
79 using std::atomic;
80 using std::thread;
81 bool g_verbose;
82
83 struct DNSDistStats g_stats;
84
85 uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
86 uint32_t g_staleCacheEntriesTTL{0};
87 bool g_syslog{true};
88 bool g_allowEmptyResponse{false};
89
90 GlobalStateHolder<NetmaskGroup> g_ACL;
91 string g_outputBuffer;
92
93 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
94 std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
95 std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
96 #ifdef HAVE_EBPF
97 shared_ptr<BPFFilter> g_defaultBPFFilter;
98 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
99 #endif /* HAVE_EBPF */
100 std::vector<std::unique_ptr<ClientState>> g_frontends;
101 GlobalStateHolder<pools_t> g_pools;
102 size_t g_udpVectorSize{1};
103
104 bool g_snmpEnabled{false};
105 bool g_snmpTrapsEnabled{false};
106 DNSDistSNMPAgent* g_snmpAgent{nullptr};
107
108 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
109 Then we have a bunch of connected sockets for talking to downstream servers.
110 We send directly to those sockets.
111
112 For the return path, per downstream server we have a thread that listens to responses.
113
114 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
115 there the original requestor and the original id. The new ID is the offset in the array.
116
117 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
118 original requestor.
119
120 IDs are assigned by atomic increments of the socket offset.
121 */
122
123 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
124 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
125 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
127
128 Rings g_rings;
129 QueryCount g_qcount;
130
131 GlobalStateHolder<servers_t> g_dstates;
132 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
133 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
134 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
135 int g_tcpRecvTimeout{2};
136 int g_tcpSendTimeout{2};
137 int g_udpTimeout{2};
138
139 bool g_servFailOnNoPolicy{false};
140 bool g_truncateTC{false};
141 bool g_fixupCase{false};
142 bool g_preserveTrailingData{false};
143 bool g_roundrobinFailOnNoServer{false};
144
145 std::set<std::string> g_capabilitiesToRetain;
146
147 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
148 try
149 {
150 bool hadEDNS = false;
151 uint16_t payloadSize = 0;
152 uint16_t z = 0;
153
154 if (g_addEDNSToSelfGeneratedResponses) {
155 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
156 }
157
158 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
159 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
160 dh->ancount = dh->arcount = dh->nscount = 0;
161
162 if (hadEDNS) {
163 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
164 }
165 }
166 catch(...)
167 {
168 g_stats.truncFail++;
169 }
170
171 struct DelayedPacket
172 {
173 int fd;
174 string packet;
175 ComboAddress destination;
176 ComboAddress origDest;
177 void operator()()
178 {
179 ssize_t res;
180 if(origDest.sin4.sin_family == 0) {
181 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
182 }
183 else {
184 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
185 }
186 if (res == -1) {
187 int err = errno;
188 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
189 }
190 }
191 };
192
193 DelayPipe<DelayedPacket>* g_delay = nullptr;
194
195 void doLatencyStats(double udiff)
196 {
197 if(udiff < 1000) ++g_stats.latency0_1;
198 else if(udiff < 10000) ++g_stats.latency1_10;
199 else if(udiff < 50000) ++g_stats.latency10_50;
200 else if(udiff < 100000) ++g_stats.latency50_100;
201 else if(udiff < 1000000) ++g_stats.latency100_1000;
202 else ++g_stats.latencySlow;
203 g_stats.latencySum += udiff / 1000;
204
205 auto doAvg = [](double& var, double n, double weight) {
206 var = (weight -1) * var/weight + n/weight;
207 };
208
209 doAvg(g_stats.latencyAvg100, udiff, 100);
210 doAvg(g_stats.latencyAvg1000, udiff, 1000);
211 doAvg(g_stats.latencyAvg10000, udiff, 10000);
212 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
213 }
214
215 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
216 {
217 if (responseLen < sizeof(dnsheader)) {
218 return false;
219 }
220
221 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
222 if (dh->qdcount == 0) {
223 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
224 return true;
225 }
226 else {
227 ++g_stats.nonCompliantResponses;
228 return false;
229 }
230 }
231
232 uint16_t rqtype, rqclass;
233 DNSName rqname;
234 try {
235 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
236 }
237 catch(const std::exception& e) {
238 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
239 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
240 }
241 ++g_stats.nonCompliantResponses;
242 return false;
243 }
244
245 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
246 return false;
247 }
248
249 return true;
250 }
251
252 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
253 {
254 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
255 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
256 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
257 uint16_t * flags = getFlagsFromDNSHeader(dh);
258 /* clear the flags we are about to restore */
259 *flags &= restoreFlagsMask;
260 /* only keep the flags we want to restore */
261 origFlags &= ~restoreFlagsMask;
262 /* set the saved flags as they were */
263 *flags |= origFlags;
264 }
265
266 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
267 {
268 restoreFlags(dq.dh, origFlags);
269
270 return addEDNSToQueryTurnedResponse(dq);
271 }
272
273 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
274 {
275 if (*responseLen < sizeof(dnsheader)) {
276 return false;
277 }
278
279 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
280 restoreFlags(dh, origFlags);
281
282 if (*responseLen == sizeof(dnsheader)) {
283 return true;
284 }
285
286 if(g_fixupCase) {
287 string realname = qname.toDNSString();
288 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
289 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
290 }
291 }
292
293 if (ednsAdded || ecsAdded) {
294 uint16_t optStart;
295 size_t optLen = 0;
296 bool last = false;
297
298 const std::string responseStr(*response, *responseLen);
299 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
300
301 if (res == 0) {
302 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
303 size_t optContentStart = 0;
304 uint16_t optContentLen = 0;
305 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
306 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
307 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
308 we care about. */
309 *zeroScope = responseStr.at(optContentStart + 3) == 0;
310 }
311 }
312
313 if (ednsAdded) {
314 /* we added the entire OPT RR,
315 therefore we need to remove it entirely */
316 if (last) {
317 /* simply remove the last AR */
318 *responseLen -= optLen;
319 uint16_t arcount = ntohs(dh->arcount);
320 arcount--;
321 dh->arcount = htons(arcount);
322 }
323 else {
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
326 *responseLen = rewrittenResponse.size();
327 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
328 rewrittenResponse.reserve(*responseLen + addRoom);
329 }
330 *responseSize = rewrittenResponse.capacity();
331 *response = reinterpret_cast<char*>(rewrittenResponse.data());
332 }
333 else {
334 warnlog("Error rewriting content");
335 }
336 }
337 }
338 else {
339 /* the OPT RR was already present, but without ECS,
340 we need to remove the ECS option if any */
341 if (last) {
342 /* nothing after the OPT RR, we can simply remove the
343 ECS option */
344 size_t existingOptLen = optLen;
345 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
346 *responseLen -= (existingOptLen - optLen);
347 }
348 else {
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
351 *responseLen = rewrittenResponse.size();
352 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
353 rewrittenResponse.reserve(*responseLen + addRoom);
354 }
355 *responseSize = rewrittenResponse.capacity();
356 *response = reinterpret_cast<char*>(rewrittenResponse.data());
357 }
358 else {
359 warnlog("Error rewriting content");
360 }
361 }
362 }
363 }
364 }
365
366 return true;
367 }
368
369 #ifdef HAVE_DNSCRYPT
370 static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
371 {
372 if (dnsCryptQuery) {
373 uint16_t encryptedResponseLen = 0;
374
375 /* save the original header before encrypting it in place */
376 if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
377 memcpy(dhCopy, *dh, sizeof(dnsheader));
378 *dh = dhCopy;
379 }
380
381 int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
382 if (res == 0) {
383 *responseLen = encryptedResponseLen;
384 } else {
385 /* dropping response */
386 vinfolog("Error encrypting the response, dropping.");
387 return false;
388 }
389 }
390 return true;
391 }
392 #endif /* HAVE_DNSCRYPT */
393
394 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
395 {
396 DNSResponseAction::Action action=DNSResponseAction::Action::None;
397 std::string ruleresult;
398 for(const auto& lr : *localRespRulactions) {
399 if(lr.d_rule->matches(&dr)) {
400 lr.d_rule->d_matches++;
401 action=(*lr.d_action)(&dr, &ruleresult);
402 switch(action) {
403 case DNSResponseAction::Action::Allow:
404 return true;
405 break;
406 case DNSResponseAction::Action::Drop:
407 return false;
408 break;
409 case DNSResponseAction::Action::HeaderModify:
410 return true;
411 break;
412 case DNSResponseAction::Action::ServFail:
413 dr.dh->rcode = RCode::ServFail;
414 return true;
415 break;
416 /* non-terminal actions follow */
417 case DNSResponseAction::Action::Delay:
418 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
419 break;
420 case DNSResponseAction::Action::None:
421 break;
422 }
423 }
424 }
425
426 return true;
427 }
428
429 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
430 {
431 if (!applyRulesToResponse(localRespRulactions, dr)) {
432 return false;
433 }
434
435 bool zeroScope = false;
436 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
437 return false;
438 }
439
440 if (dr.packetCache && !dr.skipCache && *responseLen <= s_maxPacketCacheEntrySize) {
441 if (!dr.useZeroScope) {
442 /* if the query was not suitable for zero-scope, for
443 example because it had an existing ECS entry so the hash is
444 not really 'no ECS', so just insert it for the existing subnet
445 since:
446 - we don't have the correct hash for a non-ECS query
447 - inserting with hash computed before the ECS replacement but with
448 the subnet extracted _after_ the replacement would not work.
449 */
450 zeroScope = false;
451 }
452 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
453 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
454 }
455
456 #ifdef HAVE_DNSCRYPT
457 if (!muted) {
458 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
459 return false;
460 }
461 }
462 #endif /* HAVE_DNSCRYPT */
463
464 return true;
465 }
466
467 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
468 {
469 if(delayMsec && g_delay) {
470 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
471 g_delay->submit(dp, delayMsec);
472 }
473 else {
474 ssize_t res;
475 if(origDest.sin4.sin_family == 0) {
476 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
477 }
478 else {
479 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
480 }
481 if (res == -1) {
482 int err = errno;
483 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), stringerror(err));
484 }
485 }
486
487 return true;
488 }
489
490
491 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
492 {
493 return state->sockets[state->socketsOffset++ % state->sockets.size()];
494 }
495
496 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
497 {
498 ready.clear();
499
500 if (state->sockets.size() == 1) {
501 ready.push_back(state->sockets[0]);
502 return ;
503 }
504
505 {
506 std::lock_guard<std::mutex> lock(state->socketsLock);
507 state->mplexer->getAvailableFDs(ready, -1);
508 }
509 }
510
511 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
512 void responderThread(std::shared_ptr<DownstreamState> dss)
513 try {
514 setThreadName("dnsdist/respond");
515 auto localRespRulactions = g_resprulactions.getLocal();
516 char packet[s_maxPacketCacheEntrySize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
517 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
518 /* when the answer is encrypted in place, we need to get a copy
519 of the original header before encryption to fill the ring buffer */
520 dnsheader cleartextDH;
521 vector<uint8_t> rewrittenResponse;
522
523 uint16_t queryId = 0;
524 std::vector<int> sockets;
525 sockets.reserve(dss->sockets.size());
526
527 for(;;) {
528 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
529 try {
530 pickBackendSocketsReadyForReceiving(dss, sockets);
531 for (const auto& fd : sockets) {
532 ssize_t got = recv(fd, packet, sizeof(packet), 0);
533 char * response = packet;
534 size_t responseSize = sizeof(packet);
535
536 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
537 continue;
538
539 uint16_t responseLen = static_cast<uint16_t>(got);
540 queryId = dh->id;
541
542 if(queryId >= dss->idStates.size()) {
543 continue;
544 }
545
546 IDState* ids = &dss->idStates[queryId];
547 int64_t usageIndicator = ids->usageIndicator;
548
549 if(!IDState::isInUse(usageIndicator)) {
550 /* the corresponding state is marked as not in use, meaning that:
551 - it was already cleaned up by another thread and the state is gone ;
552 - we already got a response for this query and this one is a duplicate.
553 Either way, we don't touch it.
554 */
555 continue;
556 }
557
558 /* read the potential DOHUnit state as soon as possible, but don't use it
559 until we have confirmed that we own this state by updating usageIndicator */
560 auto du = ids->du;
561 /* setting age to 0 to prevent the maintainer thread from
562 cleaning this IDS while we process the response.
563 */
564 ids->age = 0;
565 int origFD = ids->origFD;
566
567 unsigned int consumed = 0;
568 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
569 continue;
570 }
571
572 bool isDoH = du != nullptr;
573 /* atomically mark the state as available, but only if it has not been altered
574 in the meantime */
575 if (ids->tryMarkUnused(usageIndicator)) {
576 /* clear the potential DOHUnit asap, it's ours now
577 and since we just marked the state as unused,
578 someone could overwrite it. */
579 ids->du = nullptr;
580 /* we only decrement the outstanding counter if the value was not
581 altered in the meantime, which would mean that the state has been actively reused
582 and the other thread has not incremented the outstanding counter, so we don't
583 want it to be decremented twice. */
584 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
585 } else {
586 /* someone updated the state in the meantime, we can't touch the existing pointer */
587 du = nullptr;
588 /* since the state has been updated, we can't safely access it so let's just drop
589 this response */
590 continue;
591 }
592
593 if(dh->tc && g_truncateTC) {
594 truncateTC(response, &responseLen, responseSize, consumed);
595 }
596
597 dh->id = ids->origID;
598
599 uint16_t addRoom = 0;
600 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
601 if (dr.dnsCryptQuery) {
602 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
603 }
604
605 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
606 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
607 continue;
608 }
609
610 if (ids->cs && !ids->cs->muted) {
611 if (du) {
612 #ifdef HAVE_DNS_OVER_HTTPS
613 // DoH query
614 du->response = std::string(response, responseLen);
615 if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
616 /* at this point we have the only remaining pointer on this
617 DOHUnit object since we did set ids->du to nullptr earlier,
618 except if we got the response before the pointer could be
619 released by the frontend */
620 du->release();
621 }
622 #endif /* HAVE_DNS_OVER_HTTPS */
623 du = nullptr;
624 }
625 else {
626 ComboAddress empty;
627 empty.sin4.sin_family = 0;
628 /* if ids->destHarvested is false, origDest holds the listening address.
629 We don't want to use that as a source since it could be 0.0.0.0 for example. */
630 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
631 }
632 }
633
634 ++g_stats.responses;
635 if (ids->cs) {
636 ++ids->cs->responses;
637 }
638 ++dss->responses;
639
640 double udiff = ids->sentTime.udiff();
641 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
642 isDoH ? " (https)": "", udiff);
643
644 struct timespec ts;
645 gettime(&ts);
646 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
647
648 switch (cleartextDH.rcode) {
649 case RCode::NXDomain:
650 ++g_stats.frontendNXDomain;
651 break;
652 case RCode::ServFail:
653 ++g_stats.servfailResponses;
654 ++g_stats.frontendServFail;
655 break;
656 case RCode::NoError:
657 ++g_stats.frontendNoError;
658 break;
659 }
660 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
661
662 doLatencyStats(udiff);
663
664 rewrittenResponse.clear();
665 }
666 }
667 catch(const std::exception& e){
668 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
669 }
670 }
671 }
672 catch(const std::exception& e)
673 {
674 errlog("UDP responder thread died because of exception: %s", e.what());
675 }
676 catch(const PDNSException& e)
677 {
678 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
679 }
680 catch(...)
681 {
682 errlog("UDP responder thread died because of an exception: %s", "unknown");
683 }
684
685 bool DownstreamState::reconnect()
686 {
687 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
688 if (!tl.owns_lock()) {
689 /* we are already reconnecting */
690 return false;
691 }
692
693 connected = false;
694 for (auto& fd : sockets) {
695 if (fd != -1) {
696 if (sockets.size() > 1) {
697 std::lock_guard<std::mutex> lock(socketsLock);
698 mplexer->removeReadFD(fd);
699 }
700 /* shutdown() is needed to wake up recv() in the responderThread */
701 shutdown(fd, SHUT_RDWR);
702 close(fd);
703 fd = -1;
704 }
705 if (!IsAnyAddress(remote)) {
706 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
707 if (!IsAnyAddress(sourceAddr)) {
708 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
709 if (!sourceItfName.empty()) {
710 #ifdef SO_BINDTODEVICE
711 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
712 if (res != 0) {
713 infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
714 }
715 #endif
716 }
717
718 SBind(fd, sourceAddr);
719 }
720 try {
721 SConnect(fd, remote);
722 if (sockets.size() > 1) {
723 std::lock_guard<std::mutex> lock(socketsLock);
724 mplexer->addReadFD(fd, [](int, boost::any) {});
725 }
726 connected = true;
727 }
728 catch(const std::runtime_error& error) {
729 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
730 connected = false;
731 break;
732 }
733 }
734 }
735
736 /* if at least one (re-)connection failed, close all sockets */
737 if (!connected) {
738 for (auto& fd : sockets) {
739 if (fd != -1) {
740 if (sockets.size() > 1) {
741 std::lock_guard<std::mutex> lock(socketsLock);
742 mplexer->removeReadFD(fd);
743 }
744 /* shutdown() is needed to wake up recv() in the responderThread */
745 shutdown(fd, SHUT_RDWR);
746 close(fd);
747 fd = -1;
748 }
749 }
750 }
751
752 return connected;
753 }
754 void DownstreamState::hash()
755 {
756 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
757 auto w = weight;
758 WriteLock wl(&d_lock);
759 hashes.clear();
760 while (w > 0) {
761 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
762 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
763 hashes.insert(wshash);
764 --w;
765 }
766 }
767
768 void DownstreamState::setId(const boost::uuids::uuid& newId)
769 {
770 id = newId;
771 // compute hashes only if already done
772 if (!hashes.empty()) {
773 hash();
774 }
775 }
776
777 void DownstreamState::setWeight(int newWeight)
778 {
779 if (newWeight < 1) {
780 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
781 return ;
782 }
783 weight = newWeight;
784 if (!hashes.empty()) {
785 hash();
786 }
787 }
788
789 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
790 {
791 pthread_rwlock_init(&d_lock, nullptr);
792 id = getUniqueID();
793 threadStarted.clear();
794
795 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
796
797 sockets.resize(numberOfSockets);
798 for (auto& fd : sockets) {
799 fd = -1;
800 }
801
802 if (connect && !IsAnyAddress(remote)) {
803 reconnect();
804 idStates.resize(g_maxOutstanding);
805 sw.start();
806 infolog("Added downstream server %s", remote.toStringWithPort());
807 }
808
809 }
810
811 std::mutex g_luamutex;
812 LuaContext g_lua;
813
814 GlobalStateHolder<ServerPolicy> g_policy;
815
816 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
817 {
818 for(auto& d : servers) {
819 if(d.second->isUp() && d.second->qps.check())
820 return d.second;
821 }
822 return leastOutstanding(servers, dq);
823 }
824
825 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
826 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
827 {
828 if (servers.size() == 1 && servers[0].second->isUp()) {
829 return servers[0].second;
830 }
831
832 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
833 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
834 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
835 poss.reserve(servers.size());
836 for(auto& d : servers) {
837 if(d.second->isUp()) {
838 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
839 }
840 }
841 if(poss.empty())
842 return shared_ptr<DownstreamState>();
843 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
844 return poss.begin()->second;
845 }
846
847 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
848 {
849 vector<pair<int, shared_ptr<DownstreamState>>> poss;
850 int sum = 0;
851 int max = std::numeric_limits<int>::max();
852
853 for(auto& d : servers) { // w=1, w=10 -> 1, 11
854 if(d.second->isUp()) {
855 // Don't overflow sum when adding high weights
856 if(d.second->weight > max - sum) {
857 sum = max;
858 } else {
859 sum += d.second->weight;
860 }
861
862 poss.push_back({sum, d.second});
863 }
864 }
865
866 // Catch poss & sum are empty to avoid SIGFPE
867 if(poss.empty())
868 return shared_ptr<DownstreamState>();
869
870 int r = val % sum;
871 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
872 if(p==poss.end())
873 return shared_ptr<DownstreamState>();
874 return p->second;
875 }
876
877 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
878 {
879 return valrandom(random(), servers, dq);
880 }
881
882 uint32_t g_hashperturb;
883 double g_consistentHashBalancingFactor = 0;
884 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
885 {
886 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
887 }
888
889 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
890 {
891 unsigned int qhash = dq->qname->hash(g_hashperturb);
892 unsigned int sel = std::numeric_limits<unsigned int>::max();
893 unsigned int min = std::numeric_limits<unsigned int>::max();
894 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
895
896 double targetLoad = std::numeric_limits<double>::max();
897 if (g_consistentHashBalancingFactor > 0) {
898 /* we start with one, representing the query we are currently handling */
899 double currentLoad = 1;
900 for (const auto& pair : servers) {
901 currentLoad += pair.second->outstanding;
902 }
903 targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
904 }
905
906 for (const auto& d: servers) {
907 if (d.second->isUp() && d.second->outstanding <= targetLoad) {
908 // make sure hashes have been computed
909 if (d.second->hashes.empty()) {
910 d.second->hash();
911 }
912 {
913 ReadLock rl(&(d.second->d_lock));
914 const auto& server = d.second;
915 // we want to keep track of the last hash
916 if (min > *(server->hashes.begin())) {
917 min = *(server->hashes.begin());
918 first = server;
919 }
920
921 auto hash_it = server->hashes.lower_bound(qhash);
922 if (hash_it != server->hashes.end()) {
923 if (*hash_it < sel) {
924 sel = *hash_it;
925 ret = server;
926 }
927 }
928 }
929 }
930 }
931 if (ret != nullptr) {
932 return ret;
933 }
934 if (first != nullptr) {
935 return first;
936 }
937 return shared_ptr<DownstreamState>();
938 }
939
940 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
941 {
942 NumberedServerVector poss;
943
944 for(auto& d : servers) {
945 if(d.second->isUp()) {
946 poss.push_back(d);
947 }
948 }
949
950 const auto *res=&poss;
951 if(poss.empty() && !g_roundrobinFailOnNoServer)
952 res = &servers;
953
954 if(res->empty())
955 return shared_ptr<DownstreamState>();
956
957 static unsigned int counter;
958
959 return (*res)[(counter++) % res->size()].second;
960 }
961
/* Address initialised to 127.0.0.1:5199. Presumably the default address of the
   control endpoint, going by the name; it is not used in this part of the file,
   so confirm against its users. */
962 ComboAddress g_serverControl{"127.0.0.1:5199"};
963
964 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
965 {
966 std::shared_ptr<ServerPool> pool;
967 pools_t::iterator it = pools.find(poolName);
968 if (it != pools.end()) {
969 pool = it->second;
970 }
971 else {
972 if (!poolName.empty())
973 vinfolog("Creating pool %s", poolName);
974 pool = std::make_shared<ServerPool>();
975 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
976 }
977 return pool;
978 }
979
980 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
981 {
982 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
983 if (!poolName.empty()) {
984 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
985 } else {
986 vinfolog("Setting default pool server selection policy to %s", policy->name);
987 }
988 pool->policy = policy;
989 }
990
991 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
992 {
993 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
994 if (!poolName.empty()) {
995 vinfolog("Adding server to pool %s", poolName);
996 } else {
997 vinfolog("Adding server to default pool");
998 }
999 pool->addServer(server);
1000 }
1001
1002 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
1003 {
1004 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1005
1006 if (!poolName.empty()) {
1007 vinfolog("Removing server from pool %s", poolName);
1008 }
1009 else {
1010 vinfolog("Removing server from default pool");
1011 }
1012
1013 pool->removeServer(server);
1014 }
1015
1016 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
1017 {
1018 pools_t::const_iterator it = pools.find(poolName);
1019
1020 if (it == pools.end()) {
1021 throw std::out_of_range("No pool named " + poolName);
1022 }
1023
1024 return it->second;
1025 }
1026
1027 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1028 {
1029 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1030 return pool->getServers();
1031 }
1032
1033 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
1034 {
1035 string result;
1036
1037 std::vector<std::string> addrs;
1038 stringtok(addrs, spoofContent, " ,");
1039
1040 if (addrs.size() == 1) {
1041 try {
1042 ComboAddress spoofAddr(spoofContent);
1043 SpoofAction sa({spoofAddr});
1044 sa(&dq, &result);
1045 }
1046 catch(const PDNSException &e) {
1047 SpoofAction sa(spoofContent); // CNAME then
1048 sa(&dq, &result);
1049 }
1050 } else {
1051 std::vector<ComboAddress> cas;
1052 for (const auto& addr : addrs) {
1053 try {
1054 cas.push_back(ComboAddress(addr));
1055 }
1056 catch (...) {
1057 }
1058 }
1059 SpoofAction sa(cas);
1060 sa(&dq, &result);
1061 }
1062 }
1063
1064 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
1065 {
1066 switch(action) {
1067 case DNSAction::Action::Allow:
1068 return true;
1069 break;
1070 case DNSAction::Action::Drop:
1071 ++g_stats.ruleDrop;
1072 drop = true;
1073 return true;
1074 break;
1075 case DNSAction::Action::Nxdomain:
1076 dq.dh->rcode = RCode::NXDomain;
1077 dq.dh->qr=true;
1078 ++g_stats.ruleNXDomain;
1079 return true;
1080 break;
1081 case DNSAction::Action::Refused:
1082 dq.dh->rcode = RCode::Refused;
1083 dq.dh->qr=true;
1084 ++g_stats.ruleRefused;
1085 return true;
1086 break;
1087 case DNSAction::Action::ServFail:
1088 dq.dh->rcode = RCode::ServFail;
1089 dq.dh->qr=true;
1090 ++g_stats.ruleServFail;
1091 return true;
1092 break;
1093 case DNSAction::Action::Spoof:
1094 spoofResponseFromString(dq, ruleresult);
1095 return true;
1096 break;
1097 case DNSAction::Action::Truncate:
1098 dq.dh->tc = true;
1099 dq.dh->qr = true;
1100 dq.dh->ra = dq.dh->rd;
1101 dq.dh->aa = false;
1102 dq.dh->ad = false;
1103 return true;
1104 break;
1105 case DNSAction::Action::HeaderModify:
1106 return true;
1107 break;
1108 case DNSAction::Action::Pool:
1109 dq.poolname=ruleresult;
1110 return true;
1111 break;
1112 case DNSAction::Action::NoRecurse:
1113 dq.dh->rd = false;
1114 return true;
1115 break;
1116 /* non-terminal actions follow */
1117 case DNSAction::Action::Delay:
1118 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1119 break;
1120 case DNSAction::Action::None:
1121 /* fall-through */
1122 case DNSAction::Action::NoOp:
1123 break;
1124 }
1125
1126 /* false means that we don't stop the processing */
1127 return false;
1128 }
1129
1130
1131 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
1132 {
1133 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1134
1135 if(g_qcount.enabled) {
1136 string qname = (*dq.qname).toLogString();
1137 bool countQuery{true};
1138 if(g_qcount.filter) {
1139 std::lock_guard<std::mutex> lock(g_luamutex);
1140 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1141 }
1142
1143 if(countQuery) {
1144 WriteLock wl(&g_qcount.queryLock);
1145 if(!g_qcount.records.count(qname)) {
1146 g_qcount.records[qname] = 0;
1147 }
1148 g_qcount.records[qname]++;
1149 }
1150 }
1151
1152 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1153 auto updateBlockStats = [&got]() {
1154 ++g_stats.dynBlocked;
1155 got->second.blocks++;
1156 };
1157
1158 if(now < got->second.until) {
1159 DNSAction::Action action = got->second.action;
1160 if (action == DNSAction::Action::None) {
1161 action = g_dynBlockAction;
1162 }
1163 switch (action) {
1164 case DNSAction::Action::NoOp:
1165 /* do nothing */
1166 break;
1167
1168 case DNSAction::Action::Nxdomain:
1169 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1170 updateBlockStats();
1171
1172 dq.dh->rcode = RCode::NXDomain;
1173 dq.dh->qr=true;
1174 return true;
1175
1176 case DNSAction::Action::Refused:
1177 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1178 updateBlockStats();
1179
1180 dq.dh->rcode = RCode::Refused;
1181 dq.dh->qr = true;
1182 return true;
1183
1184 case DNSAction::Action::Truncate:
1185 if(!dq.tcp) {
1186 updateBlockStats();
1187 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1188 dq.dh->tc = true;
1189 dq.dh->qr = true;
1190 dq.dh->ra = dq.dh->rd;
1191 dq.dh->aa = false;
1192 dq.dh->ad = false;
1193 return true;
1194 }
1195 else {
1196 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1197 }
1198 break;
1199 case DNSAction::Action::NoRecurse:
1200 updateBlockStats();
1201 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1202 dq.dh->rd = false;
1203 return true;
1204 default:
1205 updateBlockStats();
1206 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1207 return false;
1208 }
1209 }
1210 }
1211
1212 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1213 auto updateBlockStats = [&got]() {
1214 ++g_stats.dynBlocked;
1215 got->blocks++;
1216 };
1217
1218 if(now < got->until) {
1219 DNSAction::Action action = got->action;
1220 if (action == DNSAction::Action::None) {
1221 action = g_dynBlockAction;
1222 }
1223 switch (action) {
1224 case DNSAction::Action::NoOp:
1225 /* do nothing */
1226 break;
1227 case DNSAction::Action::Nxdomain:
1228 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1229 updateBlockStats();
1230
1231 dq.dh->rcode = RCode::NXDomain;
1232 dq.dh->qr=true;
1233 return true;
1234 case DNSAction::Action::Refused:
1235 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1236 updateBlockStats();
1237
1238 dq.dh->rcode = RCode::Refused;
1239 dq.dh->qr=true;
1240 return true;
1241 case DNSAction::Action::Truncate:
1242 if(!dq.tcp) {
1243 updateBlockStats();
1244
1245 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1246 dq.dh->tc = true;
1247 dq.dh->qr = true;
1248 dq.dh->ra = dq.dh->rd;
1249 dq.dh->aa = false;
1250 dq.dh->ad = false;
1251 return true;
1252 }
1253 else {
1254 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1255 }
1256 break;
1257 case DNSAction::Action::NoRecurse:
1258 updateBlockStats();
1259 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1260 dq.dh->rd = false;
1261 return true;
1262 default:
1263 updateBlockStats();
1264 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1265 return false;
1266 }
1267 }
1268 }
1269
1270 DNSAction::Action action=DNSAction::Action::None;
1271 string ruleresult;
1272 bool drop = false;
1273 for(const auto& lr : *holders.rulactions) {
1274 if(lr.d_rule->matches(&dq)) {
1275 lr.d_rule->d_matches++;
1276 action=(*lr.d_action)(&dq, &ruleresult);
1277 if (processRulesResult(action, dq, ruleresult, drop)) {
1278 break;
1279 }
1280 }
1281 }
1282
1283 if (drop) {
1284 return false;
1285 }
1286
1287 return true;
1288 }
1289
1290 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1291 {
1292 ssize_t result;
1293
1294 if (ss->sourceItf == 0) {
1295 result = send(sd, request, requestLen, 0);
1296 }
1297 else {
1298 struct msghdr msgh;
1299 struct iovec iov;
1300 cmsgbuf_aligned cbuf;
1301 ComboAddress remote(ss->remote);
1302 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1303 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1304 result = sendmsg(sd, &msgh, 0);
1305 }
1306
1307 if (result == -1) {
1308 int savederrno = errno;
1309 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1310
1311 /* This might sound silly, but on Linux send() might fail with EINVAL
1312 if the interface the socket was bound to doesn't exist anymore.
1313 We don't want to reconnect the real socket if the healthcheck failed,
1314 because it's not using the same socket.
1315 */
1316 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1317 ss->reconnect();
1318 }
1319 }
1320
1321 return result;
1322 }
1323
1324 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1325 {
1326 if (msgh->msg_flags & MSG_TRUNC) {
1327 /* message was too large for our buffer */
1328 vinfolog("Dropping message too large for our buffer");
1329 ++g_stats.nonCompliantQueries;
1330 return false;
1331 }
1332
1333 if(!holders.acl->match(remote)) {
1334 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1335 ++g_stats.aclDrops;
1336 return false;
1337 }
1338
1339 cs.queries++;
1340 ++g_stats.queries;
1341
1342 if (HarvestDestinationAddress(msgh, &dest)) {
1343 /* we don't get the port, only the address */
1344 dest.sin4.sin_port = cs.local.sin4.sin_port;
1345 }
1346 else {
1347 dest.sin4.sin_family = 0;
1348 }
1349
1350 return true;
1351 }
1352
1353 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1354 {
1355 if (cs.dnscryptCtx) {
1356 #ifdef HAVE_DNSCRYPT
1357 vector<uint8_t> response;
1358 uint16_t decryptedQueryLen = 0;
1359
1360 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1361
1362 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1363
1364 if (!decrypted) {
1365 if (response.size() > 0) {
1366 return response;
1367 }
1368 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1369 }
1370
1371 len = decryptedQueryLen;
1372 #endif /* HAVE_DNSCRYPT */
1373 }
1374 return boost::none;
1375 }
1376
1377 bool checkQueryHeaders(const struct dnsheader* dh)
1378 {
1379 if (dh->qr) { // don't respond to responses
1380 ++g_stats.nonCompliantQueries;
1381 return false;
1382 }
1383
1384 if (dh->qdcount == 0) {
1385 ++g_stats.emptyQueries;
1386 return false;
1387 }
1388
1389 if (dh->rd) {
1390 ++g_stats.rdQueries;
1391 }
1392
1393 return true;
1394 }
1395
1396 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1397 static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
1398 {
1399 outMsg.msg_len = 0;
1400 fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));
1401
1402 if (dest.sin4.sin_family == 0) {
1403 outMsg.msg_hdr.msg_control = nullptr;
1404 }
1405 else {
1406 addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
1407 }
1408 }
1409 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1410
1411 /* self-generated responses or cache hits */
1412 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1413 {
1414 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1415
1416 #ifdef HAVE_PROTOBUF
1417 dr.uniqueId = dq.uniqueId;
1418 #endif
1419 dr.qTag = dq.qTag;
1420 dr.delayMsec = dq.delayMsec;
1421
1422 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1423 return false;
1424 }
1425
1426 /* in case a rule changed it */
1427 dq.delayMsec = dr.delayMsec;
1428
1429 #ifdef HAVE_DNSCRYPT
1430 if (!cs.muted) {
1431 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1432 return false;
1433 }
1434 }
1435 #endif /* HAVE_DNSCRYPT */
1436
1437 if (cacheHit) {
1438 ++g_stats.cacheHits;
1439 }
1440
1441 switch (dr.dh->rcode) {
1442 case RCode::NXDomain:
1443 ++g_stats.frontendNXDomain;
1444 break;
1445 case RCode::ServFail:
1446 ++g_stats.frontendServFail;
1447 break;
1448 case RCode::NoError:
1449 ++g_stats.frontendNoError;
1450 break;
1451 }
1452
1453 doLatencyStats(0); // we're not going to measure this
1454 return true;
1455 }
1456
1457 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1458 {
1459 const uint16_t queryId = ntohs(dq.dh->id);
1460
1461 try {
1462 /* we need an accurate ("real") value for the response and
1463 to store into the IDS, but not for insertion into the
1464 rings for example */
1465 struct timespec now;
1466 gettime(&now);
1467
1468 if (!applyRulesToQuery(holders, dq, now)) {
1469 return ProcessQueryResult::Drop;
1470 }
1471
1472 if(dq.dh->qr) { // something turned it into a response
1473 fixUpQueryTurnedResponse(dq, dq.origFlags);
1474
1475 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1476 return ProcessQueryResult::Drop;
1477 }
1478
1479 ++g_stats.selfAnswered;
1480 ++cs.responses;
1481 return ProcessQueryResult::SendAnswer;
1482 }
1483
1484 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
1485 dq.packetCache = serverPool->packetCache;
1486 auto policy = *(holders.policy);
1487 if (serverPool->policy != nullptr) {
1488 policy = *(serverPool->policy);
1489 }
1490 auto servers = serverPool->getServers();
1491 if (policy.isLua) {
1492 std::lock_guard<std::mutex> lock(g_luamutex);
1493 selectedBackend = policy.policy(servers, &dq);
1494 }
1495 else {
1496 selectedBackend = policy.policy(servers, &dq);
1497 }
1498
1499 uint16_t cachedResponseSize = dq.size;
1500 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1501
1502 if (dq.packetCache && !dq.skipCache) {
1503 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1504 }
1505
1506 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1507 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1508 // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
1509 // ECS option, which would make it unsuitable for the zero-scope feature.
1510 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1511 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1512 dq.len = cachedResponseSize;
1513
1514 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1515 return ProcessQueryResult::Drop;
1516 }
1517
1518 return ProcessQueryResult::SendAnswer;
1519 }
1520
1521 if (!dq.subnet) {
1522 /* there was no existing ECS on the query, enable the zero-scope feature */
1523 dq.useZeroScope = true;
1524 }
1525 }
1526
1527 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1528 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1529 return ProcessQueryResult::Drop;
1530 }
1531 }
1532
1533 if (dq.packetCache && !dq.skipCache) {
1534 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1535 dq.len = cachedResponseSize;
1536
1537 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1538 return ProcessQueryResult::Drop;
1539 }
1540
1541 return ProcessQueryResult::SendAnswer;
1542 }
1543 ++g_stats.cacheMisses;
1544 }
1545
1546 if(!selectedBackend) {
1547 ++g_stats.noPolicy;
1548
1549 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1550 if (g_servFailOnNoPolicy) {
1551 restoreFlags(dq.dh, dq.origFlags);
1552
1553 dq.dh->rcode = RCode::ServFail;
1554 dq.dh->qr = true;
1555
1556 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1557 return ProcessQueryResult::Drop;
1558 }
1559 // no response-only statistics counter to update.
1560 return ProcessQueryResult::SendAnswer;
1561 }
1562
1563 return ProcessQueryResult::Drop;
1564 }
1565
1566 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1567 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1568 }
1569
1570 selectedBackend->queries++;
1571 return ProcessQueryResult::PassToBackend;
1572 }
1573 catch(const std::exception& e){
1574 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1575 }
1576 return ProcessQueryResult::Drop;
1577 }
1578
1579 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
1580 {
1581 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1582 uint16_t queryId = 0;
1583
1584 try {
1585 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1586 return;
1587 }
1588
1589 /* we need an accurate ("real") value for the response and
1590 to store into the IDS, but not for insertion into the
1591 rings for example */
1592 struct timespec queryRealTime;
1593 gettime(&queryRealTime, true);
1594
1595 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1596 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1597 if (dnsCryptResponse) {
1598 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1599 return;
1600 }
1601
1602 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1603 queryId = ntohs(dh->id);
1604
1605 if (!checkQueryHeaders(dh)) {
1606 return;
1607 }
1608
1609 uint16_t qtype, qclass;
1610 unsigned int consumed = 0;
1611 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1612 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1613 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1614 std::shared_ptr<DownstreamState> ss{nullptr};
1615 auto result = processQuery(dq, cs, holders, ss);
1616
1617 if (result == ProcessQueryResult::Drop) {
1618 return;
1619 }
1620
1621 if (result == ProcessQueryResult::SendAnswer) {
1622 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1623 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1624 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1625 (*queuedResponses)++;
1626 return;
1627 }
1628 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1629 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1630 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1631 return;
1632 }
1633
1634 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1635 return;
1636 }
1637
1638 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1639 IDState* ids = &ss->idStates[idOffset];
1640 ids->age = 0;
1641 DOHUnit* du = nullptr;
1642
1643 /* that means that the state was in use, possibly with an allocated
1644 DOHUnit that we will need to handle, but we can't touch it before
1645 confirming that we now own this state */
1646 if (ids->isInUse()) {
1647 du = ids->du;
1648 }
1649
1650 /* we atomically replace the value, we now own this state */
1651 if (!ids->markAsUsed()) {
1652 /* the state was not in use.
1653 we reset 'du' because it might have still been in use when we read it. */
1654 du = nullptr;
1655 ++ss->outstanding;
1656 }
1657 else {
1658 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1659 to handle it because it's about to be overwritten. */
1660 ids->du = nullptr;
1661 ++ss->reuseds;
1662 ++g_stats.downstreamTimeouts;
1663 handleDOHTimeout(du);
1664 }
1665
1666 ids->cs = &cs;
1667 ids->origFD = cs.udpFD;
1668 ids->origID = dh->id;
1669 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1670
1671 /* If we couldn't harvest the real dest addr, still
1672 write down the listening addr since it will be useful
1673 (especially if it's not an 'any' one).
1674 We need to keep track of which one it is since we may
1675 want to use the real but not the listening addr to reply.
1676 */
1677 if (dest.sin4.sin_family != 0) {
1678 ids->origDest = dest;
1679 ids->destHarvested = true;
1680 }
1681 else {
1682 ids->origDest = cs.local;
1683 ids->destHarvested = false;
1684 }
1685
1686 dh->id = idOffset;
1687
1688 int fd = pickBackendSocketForSending(ss);
1689 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1690
1691 if(ret < 0) {
1692 ++ss->sendErrors;
1693 ++g_stats.downstreamSendErrors;
1694 }
1695
1696 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1697 }
1698 catch(const std::exception& e){
1699 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1700 }
1701 }
1702
1703 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1704 static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
1705 {
1706 struct MMReceiver
1707 {
1708 char packet[s_maxPacketCacheEntrySize];
1709 ComboAddress remote;
1710 ComboAddress dest;
1711 struct iovec iov;
1712 /* used by HarvestDestinationAddress */
1713 cmsgbuf_aligned cbuf;
1714 };
1715 const size_t vectSize = g_udpVectorSize;
1716 /* the actual buffer is larger because:
1717 - we may have to add EDNS and/or ECS
1718 - we use it for self-generated responses (from rule or cache)
1719 but we only accept incoming payloads up to that size
1720 */
1721 static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1722
1723 auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
1724 auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1725 auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1726
1727 /* initialize the structures needed to receive our messages */
1728 for (size_t idx = 0; idx < vectSize; idx++) {
1729 recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
1730 fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
1731 }
1732
1733 /* go now */
1734 for(;;) {
1735
1736 /* reset the IO vector, since it's also used to send the vector of responses
1737 to avoid having to copy the data around */
1738 for (size_t idx = 0; idx < vectSize; idx++) {
1739 recvData[idx].iov.iov_base = recvData[idx].packet;
1740 recvData[idx].iov.iov_len = sizeof(recvData[idx].packet);
1741 }
1742
1743 /* block until we have at least one message ready, but return
1744 as many as possible to save the syscall costs */
1745 int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);
1746
1747 if (msgsGot <= 0) {
1748 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
1749 continue;
1750 }
1751
1752 unsigned int msgsToSend = 0;
1753
1754 /* process the received messages */
1755 for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
1756 const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
1757 unsigned int got = msgVec[msgIdx].msg_len;
1758 const ComboAddress& remote = recvData[msgIdx].remote;
1759
1760 if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1761 ++g_stats.nonCompliantQueries;
1762 continue;
1763 }
1764
1765 processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);
1766
1767 }
1768
1769 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1770 or the cache) can be sent in batch too */
1771
1772 if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
1773 int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);
1774
1775 if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
1776 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
1777 }
1778 }
1779
1780 }
1781 }
1782 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1783
1784 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1785 static void udpClientThread(ClientState* cs)
1786 try
1787 {
1788 setThreadName("dnsdist/udpClie");
1789 LocalHolders holders;
1790
1791 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1792 if (g_udpVectorSize > 1) {
1793 MultipleMessagesUDPClientThread(cs, holders);
1794
1795 }
1796 else
1797 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1798 {
1799 char packet[s_maxPacketCacheEntrySize];
1800 /* the actual buffer is larger because:
1801 - we may have to add EDNS and/or ECS
1802 - we use it for self-generated responses (from rule or cache)
1803 but we only accept incoming payloads up to that size
1804 */
1805 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1806 struct msghdr msgh;
1807 struct iovec iov;
1808 /* used by HarvestDestinationAddress */
1809 cmsgbuf_aligned cbuf;
1810
1811 ComboAddress remote;
1812 ComboAddress dest;
1813 remote.sin4.sin_family = cs->local.sin4.sin_family;
1814 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, s_udpIncomingBufferSize, &remote);
1815
1816 for(;;) {
1817 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1818
1819 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1820 ++g_stats.nonCompliantQueries;
1821 continue;
1822 }
1823
1824 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), sizeof(packet), nullptr, nullptr, nullptr, nullptr);
1825 }
1826 }
1827 }
1828 catch(const std::exception &e)
1829 {
1830 errlog("UDP client thread died because of exception: %s", e.what());
1831 }
1832 catch(const PDNSException &e)
1833 {
1834 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1835 }
1836 catch(...)
1837 {
1838 errlog("UDP client thread died because of an exception: %s", "unknown");
1839 }
1840
/* Returns a random DNS message ID in the [0, 65535] range, drawn from
   libsodium when it is available and from random() otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const auto drawn = randombytes_uniform(65536);
#else
  const auto drawn = random() % 65536;
#endif
  return static_cast<uint16_t>(drawn);
}
1849
/* presumably the upper bound on the number of TCP client threads; it is not
   used in this part of the file, confirm against its users */
1850 uint64_t g_maxTCPClientThreads{10};
/* number of iterations of the maintenance loop (which sleeps one second per
   iteration) between two cleanings of the packet caches */
1851 std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* percentage used by the maintenance loop to compute the 'upTo' value handed
   to purgeExpired(): upTo = maxEntries * (100 - percentage) / 100 */
1852 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1853
1854 void maintThread()
1855 {
1856 setThreadName("dnsdist/main");
1857 int interval = 1;
1858 size_t counter = 0;
1859 int32_t secondsToWaitLog = 0;
1860
1861 for(;;) {
1862 sleep(interval);
1863
1864 {
1865 std::lock_guard<std::mutex> lock(g_luamutex);
1866 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1867 if(f) {
1868 try {
1869 (*f)();
1870 secondsToWaitLog = 0;
1871 }
1872 catch(std::exception &e) {
1873 if (secondsToWaitLog <= 0) {
1874 infolog("Error during execution of maintenance function: %s", e.what());
1875 secondsToWaitLog = 61;
1876 }
1877 secondsToWaitLog -= interval;
1878 }
1879 }
1880 }
1881
1882 counter++;
1883 if (counter >= g_cacheCleaningDelay) {
1884 /* keep track, for each cache, of whether we should keep
1885 expired entries */
1886 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1887
1888 /* gather all caches actually used by at least one pool, and see
1889 if something prevents us from cleaning the expired entries */
1890 auto localPools = g_pools.getLocal();
1891 for (const auto& entry : *localPools) {
1892 auto& pool = entry.second;
1893
1894 auto packetCache = pool->packetCache;
1895 if (!packetCache) {
1896 continue;
1897 }
1898
1899 auto pair = caches.insert({packetCache, false});
1900 auto& iter = pair.first;
1901 /* if we need to keep stale data for this cache (ie, not clear
1902 expired entries when at least one pool using this cache
1903 has all its backends down) */
1904 if (packetCache->keepStaleData() && iter->second == false) {
1905 /* so far all pools had at least one backend up */
1906 if (pool->countServers(true) == 0) {
1907 iter->second = true;
1908 }
1909 }
1910 }
1911
1912 for (auto pair : caches) {
1913 /* shall we keep expired entries ? */
1914 if (pair.second == true) {
1915 continue;
1916 }
1917 auto& packetCache = pair.first;
1918 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1919 packetCache->purgeExpired(upTo);
1920 }
1921 counter = 0;
1922 }
1923
1924 // ponder pruning g_dynblocks of expired entries here
1925 }
1926 }
1927
1928 static void secPollThread()
1929 {
1930 setThreadName("dnsdist/secpoll");
1931
1932 for (;;) {
1933 try {
1934 doSecPoll(g_secPollSuffix);
1935 }
1936 catch(...) {
1937 }
1938 sleep(g_secPollInterval);
1939 }
1940 }
1941
/* Health-check thread body, never returns.

   Once per second it:
   - asks for one more TCP client thread when more than one connection is
     queued and the maximum number of threads has not been reached;
   - for every backend whose checkInterval has elapsed: queues a health check
     (only when availability is 'Auto'), refreshes the queryLoad and dropRate
     figures, and expires the in-flight UDP states that have been pending for
     too long;
   - processes the health checks queued during this pass.

   Note that the rate computation and the timeout sweep are skipped, together
   with the health check, while a backend's checkInterval has not elapsed. */
static void healthChecksThread()
{
  setThreadName("dnsdist/healthC");

  static const int interval = 1;

  for(;;) {
    sleep(interval);

    /* grow the TCP worker collection if connections are piling up */
    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads()) {
      g_tcpclientthreads->addTCPClientThread();
    }

    /* a fresh multiplexer is used for each pass */
    auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
    auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
    for(auto& dss : *states) {
      /* lastCheck counts the passes since the last time this backend was handled */
      if(++dss->lastCheck < dss->checkInterval) {
        continue;
      }

      dss->lastCheck = 0;

      if (dss->availability == DownstreamState::Availability::Auto) {
        /* a check that could not even be queued is recorded as a failure */
        if (!queueHealthCheck(mplexer, dss)) {
          updateHealthCheckResult(dss, false);
        }
      }

      /* udiffAndSet() result divided by one million: presumably the number of
         seconds since the previous pass for this backend - TODO confirm the unit */
      auto delta = dss->sw.udiffAndSet()/1000000.0;
      dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
      dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
      /* remember the current counters for the next computation */
      dss->prev.queries.store(dss->queries.load());
      dss->prev.reuseds.store(dss->reuseds.load());

      for (IDState& ids : dss->idStates) { // timeouts
        /* work on a snapshot of the indicator, tryMarkUnused() below
           tells us whether it changed in the meantime */
        int64_t usageIndicator = ids.usageIndicator;
        /* the age is only incremented for states that are in use */
        if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
          /* We mark the state as unused as soon as possible
             to limit the risk of racing with the
             responder thread.
          */
          auto oldDU = ids.du;

          if (!ids.tryMarkUnused(usageIndicator)) {
            /* this state has been altered in the meantime,
               don't go anywhere near it */
            continue;
          }
          /* from here on this thread is the one cleaning up the state */
          ids.du = nullptr;
          handleDOHTimeout(oldDU);
          ids.age = 0;
          dss->reuseds++;
          --dss->outstanding;
          ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
          vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
                   dss->remote.toStringWithPort(), dss->name,
                   ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());

          struct timespec ts;
          gettime(&ts);

          /* record the timeout in the response ring, using a zeroed header
             carrying only the original query ID, the maximum unsigned int
             value and 0 for the two numeric arguments */
          struct dnsheader fake;
          memset(&fake, 0, sizeof(fake));
          fake.id = ids.origID;

          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
        }
      }
    }

    handleQueuedHealthChecks(mplexer);
  }
}
2015
2016 static void bindAny(int af, int sock)
2017 {
2018 __attribute__((unused)) int one = 1;
2019
2020 #ifdef IP_FREEBIND
2021 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2022 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2023 #endif
2024
2025 #ifdef IP_BINDANY
2026 if (af == AF_INET)
2027 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2028 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2029 #endif
2030 #ifdef IPV6_BINDANY
2031 if (af == AF_INET6)
2032 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2033 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2034 #endif
2035 #ifdef SO_BINDANY
2036 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2037 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2038 #endif
2039 }
2040
2041 static void dropGroupPrivs(gid_t gid)
2042 {
2043 if (gid) {
2044 if (setgid(gid) == 0) {
2045 if (setgroups(0, NULL) < 0) {
2046 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2047 }
2048 }
2049 else {
2050 warnlog("Warning: Unable to set group ID to %d: %s", gid, stringerror());
2051 }
2052 }
2053 }
2054
2055 static void dropUserPrivs(uid_t uid)
2056 {
2057 if(uid) {
2058 if(setuid(uid) < 0) {
2059 warnlog("Warning: Unable to set user ID to %d: %s", uid, stringerror());
2060 }
2061 }
2062 }
2063
2064 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2065 {
2066 /* stdin, stdout, stderr */
2067 size_t requiredFDsCount = 3;
2068 auto backends = g_dstates.getLocal();
2069 /* UDP sockets to backends */
2070 size_t backendUDPSocketsCount = 0;
2071 for (const auto& backend : *backends) {
2072 backendUDPSocketsCount += backend->sockets.size();
2073 }
2074 requiredFDsCount += backendUDPSocketsCount;
2075 /* TCP sockets to backends */
2076 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2077 /* listening sockets */
2078 requiredFDsCount += udpBindsCount;
2079 requiredFDsCount += tcpBindsCount;
2080 /* max TCP connections currently served */
2081 requiredFDsCount += g_maxTCPClientThreads;
2082 /* max pipes for communicating between TCP acceptors and client threads */
2083 requiredFDsCount += (g_maxTCPClientThreads * 2);
2084 /* max TCP queued connections */
2085 requiredFDsCount += g_maxTCPQueuedConnections;
2086 /* DelayPipe pipe */
2087 requiredFDsCount += 2;
2088 /* syslog socket */
2089 requiredFDsCount++;
2090 /* webserver main socket */
2091 requiredFDsCount++;
2092 /* console main socket */
2093 requiredFDsCount++;
2094 /* carbon export */
2095 requiredFDsCount++;
2096 /* history file */
2097 requiredFDsCount++;
2098 struct rlimit rl;
2099 getrlimit(RLIMIT_NOFILE, &rl);
2100 if (rl.rlim_cur <= requiredFDsCount) {
2101 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2102 #ifdef HAVE_SYSTEMD
2103 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2104 #else
2105 warnlog("You can increase this value by using ulimit.");
2106 #endif
2107 }
2108 }
2109
2110 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2111 {
2112 /* skip some warnings if there is an identical UDP context */
2113 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2114 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2115 (void) warn;
2116
2117 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2118
2119 if (cs->tcp) {
2120 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2121 #ifdef TCP_DEFER_ACCEPT
2122 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2123 #endif
2124 if (cs->fastOpenQueueSize > 0) {
2125 #ifdef TCP_FASTOPEN
2126 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2127 #else
2128 if (warn) {
2129 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2130 }
2131 #endif
2132 }
2133 }
2134
2135 if(cs->local.sin4.sin_family == AF_INET6) {
2136 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2137 }
2138
2139 bindAny(cs->local.sin4.sin_family, fd);
2140
2141 if(!cs->tcp && IsAnyAddress(cs->local)) {
2142 int one=1;
2143 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2144 #ifdef IPV6_RECVPKTINFO
2145 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2146 #endif
2147 }
2148
2149 if (cs->reuseport) {
2150 #ifdef SO_REUSEPORT
2151 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2152 #else
2153 if (warn) {
2154 /* no need to warn again if configured but support is not available, we already did for UDP */
2155 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2156 }
2157 #endif
2158 }
2159
2160 if (!cs->tcp) {
2161 if (cs->local.isIPv4()) {
2162 try {
2163 setSocketIgnorePMTU(cs->udpFD);
2164 }
2165 catch(const std::exception& e) {
2166 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2167 }
2168 }
2169 }
2170
2171 const std::string& itf = cs->interface;
2172 if (!itf.empty()) {
2173 #ifdef SO_BINDTODEVICE
2174 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2175 if (res != 0) {
2176 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), stringerror());
2177 }
2178 #else
2179 if (warn) {
2180 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2181 }
2182 #endif
2183 }
2184
2185 #ifdef HAVE_EBPF
2186 if (g_defaultBPFFilter) {
2187 cs->attachFilter(g_defaultBPFFilter);
2188 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2189 }
2190 #endif /* HAVE_EBPF */
2191
2192 if (cs->tlsFrontend != nullptr) {
2193 if (!cs->tlsFrontend->setupTLS()) {
2194 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2195 _exit(EXIT_FAILURE);
2196 }
2197 }
2198
2199 if (cs->dohFrontend != nullptr) {
2200 cs->dohFrontend->setup();
2201 }
2202
2203 SBind(fd, cs->local);
2204
2205 if (cs->tcp) {
2206 SListen(cs->tcpFD, SOMAXCONN);
2207 if (cs->tlsFrontend != nullptr) {
2208 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2209 }
2210 else if (cs->dohFrontend != nullptr) {
2211 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2212 }
2213 else if (cs->dnscryptCtx != nullptr) {
2214 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2215 }
2216 else {
2217 warnlog("Listening on %s", cs->local.toStringWithPort());
2218 }
2219 }
2220
2221 cs->ready = true;
2222 }
2223
/* Values gathered from the command line by main(). */
struct
{
  /* addresses passed via -l/--local; when not empty they replace the plain
     (non DoH, DoT or DNSCrypt) frontends */
  vector<string> locals;
  /* positional arguments when not running as a client, each one is added
     as a backend to the default pool */
  vector<string> remotes;
  /* --check-config: only load the configuration, then exit */
  bool checkConfig{false};
  /* -c/--client: act as a console client */
  bool beClient{false};
  /* --supervised: do not open a console */
  bool beSupervised{false};
  /* -e/--execute: command to send to a running instance */
  string command;
  /* -C/--config: path to the configuration file */
  string config;
  /* -u/--uid: user to switch to after binding the sockets */
  string uid;
  /* -g/--gid: group to switch to after binding the sockets */
  string gid;
} g_cmdLine;
2236
/* Set to true by main() once the list of frontends is final, right before
   the listening sockets are set up. */
std::atomic<bool> g_configurationDone{false};
2238
/* Prints the command-line help on the standard output, after an empty line.
   The -k/--setkey option is only listed when built with libsodium. */
static void usage()
{
  /* adjacent literals are merged by the compiler, so the conditional
     section below simply drops out of the text when not supported */
  static const char* const helpText =
    "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
    "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
    "[-v,--verbose] [--check-config] [--version]\n"
    "\n"
    "-a,--acl netmask Add this netmask to the ACL\n"
    "-C,--config file Load configuration from 'file'\n"
    "-c,--client Operate as a client, connect to dnsdist. This reads\n"
    " controlSocket from your configuration file, but also\n"
    " accepts an IP:PORT argument\n"
#ifdef HAVE_LIBSODIUM
    "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
    " is similar to setting setKey in the configuration file.\n"
    " NOTE: this will leak this key in your shell's history\n"
    " and in the systems running process list.\n"
#endif
    "--check-config Validate the configuration file and exit. The exit-code\n"
    " reflects the validation, 0 is OK, 1 means an error.\n"
    " Any errors are printed as well.\n"
    "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
    "-g,--gid gid Change the process group ID after binding sockets\n"
    "-h,--help Display this helpful message\n"
    "-l,--local address Listen on this local address\n"
    "--supervised Don't open a console, I'm supervised\n"
    " (use with e.g. systemd and daemontools)\n"
    "--disable-syslog Don't log to syslog, only to stdout\n"
    " (use with e.g. systemd)\n"
    "-u,--uid uid Change the process user ID after binding sockets\n"
    "-v,--verbose Enable verbose mode\n"
    "-V,--version Show dnsdist version information and exit\n";

  cout<<endl;
  cout<<helpText;
}
2272
2273 int main(int argc, char** argv)
2274 try
2275 {
2276 size_t udpBindsCount = 0;
2277 size_t tcpBindsCount = 0;
2278 rl_attempted_completion_function = my_completion;
2279 rl_completion_append_character = 0;
2280
2281 signal(SIGPIPE, SIG_IGN);
2282 signal(SIGCHLD, SIG_IGN);
2283 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2284
2285 #ifdef HAVE_LIBSODIUM
2286 if (sodium_init() == -1) {
2287 cerr<<"Unable to initialize crypto library"<<endl;
2288 exit(EXIT_FAILURE);
2289 }
2290 g_hashperturb=randombytes_uniform(0xffffffff);
2291 srandom(randombytes_uniform(0xffffffff));
2292 #else
2293 {
2294 struct timeval tv;
2295 gettimeofday(&tv, 0);
2296 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2297 g_hashperturb=random();
2298 }
2299
2300 #endif
2301 ComboAddress clientAddress = ComboAddress();
2302 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2303 struct option longopts[]={
2304 {"acl", required_argument, 0, 'a'},
2305 {"check-config", no_argument, 0, 1},
2306 {"client", no_argument, 0, 'c'},
2307 {"config", required_argument, 0, 'C'},
2308 {"disable-syslog", no_argument, 0, 2},
2309 {"execute", required_argument, 0, 'e'},
2310 {"gid", required_argument, 0, 'g'},
2311 {"help", no_argument, 0, 'h'},
2312 {"local", required_argument, 0, 'l'},
2313 {"setkey", required_argument, 0, 'k'},
2314 {"supervised", no_argument, 0, 3},
2315 {"uid", required_argument, 0, 'u'},
2316 {"verbose", no_argument, 0, 'v'},
2317 {"version", no_argument, 0, 'V'},
2318 {0,0,0,0}
2319 };
2320 int longindex=0;
2321 string optstring;
2322 for(;;) {
2323 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2324 if(c==-1)
2325 break;
2326 switch(c) {
2327 case 1:
2328 g_cmdLine.checkConfig=true;
2329 break;
2330 case 2:
2331 g_syslog=false;
2332 break;
2333 case 3:
2334 g_cmdLine.beSupervised=true;
2335 break;
2336 case 'C':
2337 g_cmdLine.config=optarg;
2338 break;
2339 case 'c':
2340 g_cmdLine.beClient=true;
2341 break;
2342 case 'e':
2343 g_cmdLine.command=optarg;
2344 break;
2345 case 'g':
2346 g_cmdLine.gid=optarg;
2347 break;
2348 case 'h':
2349 cout<<"dnsdist "<<VERSION<<endl;
2350 usage();
2351 cout<<"\n";
2352 exit(EXIT_SUCCESS);
2353 break;
2354 case 'a':
2355 optstring=optarg;
2356 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2357 break;
2358 case 'k':
2359 #ifdef HAVE_LIBSODIUM
2360 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2361 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2362 exit(EXIT_FAILURE);
2363 }
2364 #else
2365 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2366 exit(EXIT_FAILURE);
2367 #endif
2368 break;
2369 case 'l':
2370 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2371 break;
2372 case 'u':
2373 g_cmdLine.uid=optarg;
2374 break;
2375 case 'v':
2376 g_verbose=true;
2377 break;
2378 case 'V':
2379 #ifdef LUAJIT_VERSION
2380 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2381 #else
2382 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2383 #endif
2384 cout<<"Enabled features: ";
2385 #ifdef HAVE_CDB
2386 cout<<"cdb ";
2387 #endif
2388 #ifdef HAVE_DNS_OVER_TLS
2389 cout<<"dns-over-tls(";
2390 #ifdef HAVE_GNUTLS
2391 cout<<"gnutls";
2392 #ifdef HAVE_LIBSSL
2393 cout<<" ";
2394 #endif
2395 #endif
2396 #ifdef HAVE_LIBSSL
2397 cout<<"openssl";
2398 #endif
2399 cout<<") ";
2400 #endif
2401 #ifdef HAVE_DNS_OVER_HTTPS
2402 cout<<"dns-over-https(DOH) ";
2403 #endif
2404 #ifdef HAVE_DNSCRYPT
2405 cout<<"dnscrypt ";
2406 #endif
2407 #ifdef HAVE_EBPF
2408 cout<<"ebpf ";
2409 #endif
2410 #ifdef HAVE_FSTRM
2411 cout<<"fstrm ";
2412 #endif
2413 #ifdef HAVE_LIBCRYPTO
2414 cout<<"ipcipher ";
2415 #endif
2416 #ifdef HAVE_LIBSODIUM
2417 cout<<"libsodium ";
2418 #endif
2419 #ifdef HAVE_LMDB
2420 cout<<"lmdb ";
2421 #endif
2422 #ifdef HAVE_PROTOBUF
2423 cout<<"protobuf ";
2424 #endif
2425 #ifdef HAVE_RE2
2426 cout<<"re2 ";
2427 #endif
2428 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2429 cout<<"recvmmsg/sendmmsg ";
2430 #endif
2431 #ifdef HAVE_NET_SNMP
2432 cout<<"snmp ";
2433 #endif
2434 #ifdef HAVE_SYSTEMD
2435 cout<<"systemd";
2436 #endif
2437 cout<<endl;
2438 exit(EXIT_SUCCESS);
2439 break;
2440 case '?':
2441 //getopt_long printed an error message.
2442 usage();
2443 exit(EXIT_FAILURE);
2444 break;
2445 }
2446 }
2447
2448 argc-=optind;
2449 argv+=optind;
2450 for(auto p = argv; *p; ++p) {
2451 if(g_cmdLine.beClient) {
2452 clientAddress = ComboAddress(*p, 5199);
2453 } else {
2454 g_cmdLine.remotes.push_back(*p);
2455 }
2456 }
2457
2458 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2459
2460 g_policy.setState(leastOutstandingPol);
2461 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2462 setupLua(true, false, g_cmdLine.config);
2463 if (clientAddress != ComboAddress())
2464 g_serverControl = clientAddress;
2465 doClient(g_serverControl, g_cmdLine.command);
2466 _exit(EXIT_SUCCESS);
2467 }
2468
2469 auto acl = g_ACL.getCopy();
2470 if(acl.empty()) {
2471 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2472 acl.addMask(addr);
2473 g_ACL.setState(acl);
2474 }
2475
2476 auto consoleACL = g_consoleACL.getCopy();
2477 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2478 consoleACL.addMask(mask);
2479 }
2480 g_consoleACL.setState(consoleACL);
2481
2482 if (g_cmdLine.checkConfig) {
2483 setupLua(false, true, g_cmdLine.config);
2484 // No exception was thrown
2485 infolog("Configuration '%s' OK!", g_cmdLine.config);
2486 _exit(EXIT_SUCCESS);
2487 }
2488
2489 auto todo=setupLua(false, false, g_cmdLine.config);
2490
2491 auto localPools = g_pools.getCopy();
2492 {
2493 bool precompute = false;
2494 if (g_policy.getLocal()->name == "chashed") {
2495 precompute = true;
2496 } else {
2497 for (const auto& entry: localPools) {
2498 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2499 precompute = true;
2500 break ;
2501 }
2502 }
2503 }
2504 if (precompute) {
2505 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2506 // pre compute hashes
2507 auto backends = g_dstates.getLocal();
2508 for (auto& backend: *backends) {
2509 backend->hash();
2510 }
2511 }
2512 }
2513
2514 if (!g_cmdLine.locals.empty()) {
2515 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2516 /* DoH, DoT and DNSCrypt frontends are separate */
2517 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2518 it = g_frontends.erase(it);
2519 }
2520 else {
2521 ++it;
2522 }
2523 }
2524
2525 for(const auto& loc : g_cmdLine.locals) {
2526 /* UDP */
2527 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2528 /* TCP */
2529 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2530 }
2531 }
2532
2533 if (g_frontends.empty()) {
2534 /* UDP */
2535 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2536 /* TCP */
2537 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2538 }
2539
2540 g_configurationDone = true;
2541
2542 for(auto& frontend : g_frontends) {
2543 setUpLocalBind(frontend);
2544
2545 if (frontend->tcp == false) {
2546 ++udpBindsCount;
2547 }
2548 else {
2549 ++tcpBindsCount;
2550 }
2551 }
2552
2553 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2554
2555 vector<string> vec;
2556 std::string acls;
2557 g_ACL.getLocal()->toStringVector(&vec);
2558 for(const auto& s : vec) {
2559 if (!acls.empty())
2560 acls += ", ";
2561 acls += s;
2562 }
2563 infolog("ACL allowing queries from: %s", acls.c_str());
2564 vec.clear();
2565 acls.clear();
2566 g_consoleACL.getLocal()->toStringVector(&vec);
2567 for (const auto& entry : vec) {
2568 if (!acls.empty()) {
2569 acls += ", ";
2570 }
2571 acls += entry;
2572 }
2573 infolog("Console ACL allowing connections from: %s", acls.c_str());
2574
2575 #ifdef HAVE_LIBSODIUM
2576 if (g_consoleEnabled && g_consoleKey.empty()) {
2577 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2578 }
2579 #endif
2580
2581 uid_t newgid=0;
2582 gid_t newuid=0;
2583
2584 if(!g_cmdLine.gid.empty())
2585 newgid = strToGID(g_cmdLine.gid.c_str());
2586
2587 if(!g_cmdLine.uid.empty())
2588 newuid = strToUID(g_cmdLine.uid.c_str());
2589
2590 dropGroupPrivs(newgid);
2591 dropUserPrivs(newuid);
2592 try {
2593 /* we might still have capabilities remaining,
2594 for example if we have been started as root
2595 without --uid or --gid (please don't do that)
2596 or as an unprivileged user with ambient
2597 capabilities like CAP_NET_BIND_SERVICE.
2598 */
2599 dropCapabilities(g_capabilitiesToRetain);
2600 }
2601 catch(const std::exception& e) {
2602 warnlog("%s", e.what());
2603 }
2604
2605 /* this need to be done _after_ dropping privileges */
2606 g_delay = new DelayPipe<DelayedPacket>();
2607
2608 if (g_snmpAgent) {
2609 g_snmpAgent->run();
2610 }
2611
2612 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2613
2614 for(auto& t : todo)
2615 t();
2616
2617 localPools = g_pools.getCopy();
2618 /* create the default pool no matter what */
2619 createPoolIfNotExists(localPools, "");
2620 if(g_cmdLine.remotes.size()) {
2621 for(const auto& address : g_cmdLine.remotes) {
2622 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2623 addServerToPool(localPools, "", ret);
2624 if (ret->connected && !ret->threadStarted.test_and_set()) {
2625 ret->tid = thread(responderThread, ret);
2626 }
2627 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2628 }
2629 }
2630 g_pools.setState(localPools);
2631
2632 if(g_dstates.getLocal()->empty()) {
2633 errlog("No downstream servers defined: all packets will get dropped");
2634 // you might define them later, but you need to know
2635 }
2636
2637 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2638
2639 auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
2640 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2641 if (dss->availability == DownstreamState::Availability::Auto) {
2642 if (!queueHealthCheck(mplexer, dss, true)) {
2643 dss->upStatus = false;
2644 warnlog("Marking downstream %s as 'down'", dss->getNameWithAddr());
2645 }
2646 }
2647 }
2648 handleQueuedHealthChecks(mplexer, true);
2649
2650 for(auto& cs : g_frontends) {
2651 if (cs->dohFrontend != nullptr) {
2652 #ifdef HAVE_DNS_OVER_HTTPS
2653 std::thread t1(dohThread, cs.get());
2654 if (!cs->cpus.empty()) {
2655 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2656 }
2657 t1.detach();
2658 #endif /* HAVE_DNS_OVER_HTTPS */
2659 continue;
2660 }
2661 if (cs->udpFD >= 0) {
2662 thread t1(udpClientThread, cs.get());
2663 if (!cs->cpus.empty()) {
2664 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2665 }
2666 t1.detach();
2667 }
2668 else if (cs->tcpFD >= 0) {
2669 thread t1(tcpAcceptorThread, cs.get());
2670 if (!cs->cpus.empty()) {
2671 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2672 }
2673 t1.detach();
2674 }
2675 }
2676
2677 thread carbonthread(carbonDumpThread);
2678 carbonthread.detach();
2679
2680 thread stattid(maintThread);
2681 stattid.detach();
2682
2683 thread healththread(healthChecksThread);
2684
2685 if (!g_secPollSuffix.empty()) {
2686 thread secpollthread(secPollThread);
2687 secpollthread.detach();
2688 }
2689
2690 if(g_cmdLine.beSupervised) {
2691 #ifdef HAVE_SYSTEMD
2692 sd_notify(0, "READY=1");
2693 #endif
2694 healththread.join();
2695 }
2696 else {
2697 healththread.detach();
2698 doConsole();
2699 }
2700 _exit(EXIT_SUCCESS);
2701
2702 }
2703 catch(const LuaContext::ExecutionErrorException& e) {
2704 try {
2705 errlog("Fatal Lua error: %s", e.what());
2706 std::rethrow_if_nested(e);
2707 } catch(const std::exception& ne) {
2708 errlog("Details: %s", ne.what());
2709 }
2710 catch(PDNSException &ae)
2711 {
2712 errlog("Fatal pdns error: %s", ae.reason);
2713 }
2714 _exit(EXIT_FAILURE);
2715 }
2716 catch(std::exception &e)
2717 {
2718 errlog("Fatal error: %s", e.what());
2719 _exit(EXIT_FAILURE);
2720 }
2721 catch(PDNSException &ae)
2722 {
2723 errlog("Fatal pdns error: %s", ae.reason);
2724 _exit(EXIT_FAILURE);
2725 }
2726
2727 uint64_t getLatencyCount(const std::string&)
2728 {
2729 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2730 }