]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
Merge pull request #8391 from omoerbeek/rec-out-of-order
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
79 using std::atomic;
80 using std::thread;
81 bool g_verbose;
82
83 struct DNSDistStats g_stats;
84 MetricDefinitionStorage g_metricDefinitions;
85
86 uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
87 bool g_verboseHealthChecks{false};
88 uint32_t g_staleCacheEntriesTTL{0};
89 bool g_syslog{true};
90 bool g_allowEmptyResponse{false};
91
92 GlobalStateHolder<NetmaskGroup> g_ACL;
93 string g_outputBuffer;
94
95 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
96 std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
97 std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
98 #ifdef HAVE_EBPF
99 shared_ptr<BPFFilter> g_defaultBPFFilter;
100 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
101 #endif /* HAVE_EBPF */
102 std::vector<std::unique_ptr<ClientState>> g_frontends;
103 GlobalStateHolder<pools_t> g_pools;
104 size_t g_udpVectorSize{1};
105
106 bool g_snmpEnabled{false};
107 bool g_snmpTrapsEnabled{false};
108 DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
125 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
128 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
129
130 Rings g_rings;
131 QueryCount g_qcount;
132
133 GlobalStateHolder<servers_t> g_dstates;
134 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
135 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
136 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
137 int g_tcpRecvTimeout{2};
138 int g_tcpSendTimeout{2};
139 int g_udpTimeout{2};
140
141 bool g_servFailOnNoPolicy{false};
142 bool g_truncateTC{false};
143 bool g_fixupCase{false};
144 bool g_preserveTrailingData{false};
145 bool g_roundrobinFailOnNoServer{false};
146
147 std::set<std::string> g_capabilitiesToRetain;
148
149 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
150 try
151 {
152 bool hadEDNS = false;
153 uint16_t payloadSize = 0;
154 uint16_t z = 0;
155
156 if (g_addEDNSToSelfGeneratedResponses) {
157 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
158 }
159
160 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
161 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
162 dh->ancount = dh->arcount = dh->nscount = 0;
163
164 if (hadEDNS) {
165 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
166 }
167 }
168 catch(...)
169 {
170 g_stats.truncFail++;
171 }
172
173 struct DelayedPacket
174 {
175 int fd;
176 string packet;
177 ComboAddress destination;
178 ComboAddress origDest;
179 void operator()()
180 {
181 ssize_t res;
182 if(origDest.sin4.sin_family == 0) {
183 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
184 }
185 else {
186 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
187 }
188 if (res == -1) {
189 int err = errno;
190 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
191 }
192 }
193 };
194
195 DelayPipe<DelayedPacket>* g_delay = nullptr;
196
197 void doLatencyStats(double udiff)
198 {
199 if(udiff < 1000) ++g_stats.latency0_1;
200 else if(udiff < 10000) ++g_stats.latency1_10;
201 else if(udiff < 50000) ++g_stats.latency10_50;
202 else if(udiff < 100000) ++g_stats.latency50_100;
203 else if(udiff < 1000000) ++g_stats.latency100_1000;
204 else ++g_stats.latencySlow;
205 g_stats.latencySum += udiff / 1000;
206
207 auto doAvg = [](double& var, double n, double weight) {
208 var = (weight -1) * var/weight + n/weight;
209 };
210
211 doAvg(g_stats.latencyAvg100, udiff, 100);
212 doAvg(g_stats.latencyAvg1000, udiff, 1000);
213 doAvg(g_stats.latencyAvg10000, udiff, 10000);
214 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
215 }
216
217 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
218 {
219 if (responseLen < sizeof(dnsheader)) {
220 return false;
221 }
222
223 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
224 if (dh->qdcount == 0) {
225 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
226 return true;
227 }
228 else {
229 ++g_stats.nonCompliantResponses;
230 return false;
231 }
232 }
233
234 uint16_t rqtype, rqclass;
235 DNSName rqname;
236 try {
237 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
238 }
239 catch(const std::exception& e) {
240 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
241 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
242 }
243 ++g_stats.nonCompliantResponses;
244 return false;
245 }
246
247 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
248 return false;
249 }
250
251 return true;
252 }
253
254 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
255 {
256 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
257 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
258 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
259 uint16_t * flags = getFlagsFromDNSHeader(dh);
260 /* clear the flags we are about to restore */
261 *flags &= restoreFlagsMask;
262 /* only keep the flags we want to restore */
263 origFlags &= ~restoreFlagsMask;
264 /* set the saved flags as they were */
265 *flags |= origFlags;
266 }
267
268 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
269 {
270 restoreFlags(dq.dh, origFlags);
271
272 return addEDNSToQueryTurnedResponse(dq);
273 }
274
275 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
276 {
277 if (*responseLen < sizeof(dnsheader)) {
278 return false;
279 }
280
281 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
282 restoreFlags(dh, origFlags);
283
284 if (*responseLen == sizeof(dnsheader)) {
285 return true;
286 }
287
288 if(g_fixupCase) {
289 string realname = qname.toDNSString();
290 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
291 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
292 }
293 }
294
295 if (ednsAdded || ecsAdded) {
296 uint16_t optStart;
297 size_t optLen = 0;
298 bool last = false;
299
300 const std::string responseStr(*response, *responseLen);
301 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
302
303 if (res == 0) {
304 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
305 size_t optContentStart = 0;
306 uint16_t optContentLen = 0;
307 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
308 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
309 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
310 we care about. */
311 *zeroScope = responseStr.at(optContentStart + 3) == 0;
312 }
313 }
314
315 if (ednsAdded) {
316 /* we added the entire OPT RR,
317 therefore we need to remove it entirely */
318 if (last) {
319 /* simply remove the last AR */
320 *responseLen -= optLen;
321 uint16_t arcount = ntohs(dh->arcount);
322 arcount--;
323 dh->arcount = htons(arcount);
324 }
325 else {
326 /* Removing an intermediary RR could lead to compression error */
327 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
328 *responseLen = rewrittenResponse.size();
329 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
330 rewrittenResponse.reserve(*responseLen + addRoom);
331 }
332 *responseSize = rewrittenResponse.capacity();
333 *response = reinterpret_cast<char*>(rewrittenResponse.data());
334 }
335 else {
336 warnlog("Error rewriting content");
337 }
338 }
339 }
340 else {
341 /* the OPT RR was already present, but without ECS,
342 we need to remove the ECS option if any */
343 if (last) {
344 /* nothing after the OPT RR, we can simply remove the
345 ECS option */
346 size_t existingOptLen = optLen;
347 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
348 *responseLen -= (existingOptLen - optLen);
349 }
350 else {
351 /* Removing an intermediary RR could lead to compression error */
352 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
353 *responseLen = rewrittenResponse.size();
354 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
355 rewrittenResponse.reserve(*responseLen + addRoom);
356 }
357 *responseSize = rewrittenResponse.capacity();
358 *response = reinterpret_cast<char*>(rewrittenResponse.data());
359 }
360 else {
361 warnlog("Error rewriting content");
362 }
363 }
364 }
365 }
366 }
367
368 return true;
369 }
370
371 #ifdef HAVE_DNSCRYPT
372 static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
373 {
374 if (dnsCryptQuery) {
375 uint16_t encryptedResponseLen = 0;
376
377 /* save the original header before encrypting it in place */
378 if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
379 memcpy(dhCopy, *dh, sizeof(dnsheader));
380 *dh = dhCopy;
381 }
382
383 int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
384 if (res == 0) {
385 *responseLen = encryptedResponseLen;
386 } else {
387 /* dropping response */
388 vinfolog("Error encrypting the response, dropping.");
389 return false;
390 }
391 }
392 return true;
393 }
394 #endif /* HAVE_DNSCRYPT */
395
396 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
397 {
398 DNSResponseAction::Action action=DNSResponseAction::Action::None;
399 std::string ruleresult;
400 for(const auto& lr : *localRespRulactions) {
401 if(lr.d_rule->matches(&dr)) {
402 lr.d_rule->d_matches++;
403 action=(*lr.d_action)(&dr, &ruleresult);
404 switch(action) {
405 case DNSResponseAction::Action::Allow:
406 return true;
407 break;
408 case DNSResponseAction::Action::Drop:
409 return false;
410 break;
411 case DNSResponseAction::Action::HeaderModify:
412 return true;
413 break;
414 case DNSResponseAction::Action::ServFail:
415 dr.dh->rcode = RCode::ServFail;
416 return true;
417 break;
418 /* non-terminal actions follow */
419 case DNSResponseAction::Action::Delay:
420 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
421 break;
422 case DNSResponseAction::Action::None:
423 break;
424 }
425 }
426 }
427
428 return true;
429 }
430
431 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
432 {
433 if (!applyRulesToResponse(localRespRulactions, dr)) {
434 return false;
435 }
436
437 bool zeroScope = false;
438 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
439 return false;
440 }
441
442 if (dr.packetCache && !dr.skipCache && *responseLen <= s_maxPacketCacheEntrySize) {
443 if (!dr.useZeroScope) {
444 /* if the query was not suitable for zero-scope, for
445 example because it had an existing ECS entry so the hash is
446 not really 'no ECS', so just insert it for the existing subnet
447 since:
448 - we don't have the correct hash for a non-ECS query
449 - inserting with hash computed before the ECS replacement but with
450 the subnet extracted _after_ the replacement would not work.
451 */
452 zeroScope = false;
453 }
454 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
455 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
456 }
457
458 #ifdef HAVE_DNSCRYPT
459 if (!muted) {
460 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
461 return false;
462 }
463 }
464 #endif /* HAVE_DNSCRYPT */
465
466 return true;
467 }
468
469 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
470 {
471 if(delayMsec && g_delay) {
472 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
473 g_delay->submit(dp, delayMsec);
474 }
475 else {
476 ssize_t res;
477 if(origDest.sin4.sin_family == 0) {
478 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
479 }
480 else {
481 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
482 }
483 if (res == -1) {
484 int err = errno;
485 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), stringerror(err));
486 }
487 }
488
489 return true;
490 }
491
492
493 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
494 {
495 return state->sockets[state->socketsOffset++ % state->sockets.size()];
496 }
497
498 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
499 {
500 ready.clear();
501
502 if (state->sockets.size() == 1) {
503 ready.push_back(state->sockets[0]);
504 return ;
505 }
506
507 {
508 std::lock_guard<std::mutex> lock(state->socketsLock);
509 state->mplexer->getAvailableFDs(ready, -1);
510 }
511 }
512
513 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
514 void responderThread(std::shared_ptr<DownstreamState> dss)
515 try {
516 setThreadName("dnsdist/respond");
517 auto localRespRulactions = g_resprulactions.getLocal();
518 char packet[s_maxPacketCacheEntrySize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
519 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
520 /* when the answer is encrypted in place, we need to get a copy
521 of the original header before encryption to fill the ring buffer */
522 dnsheader cleartextDH;
523 vector<uint8_t> rewrittenResponse;
524
525 uint16_t queryId = 0;
526 std::vector<int> sockets;
527 sockets.reserve(dss->sockets.size());
528
529 for(;;) {
530 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
531 try {
532 pickBackendSocketsReadyForReceiving(dss, sockets);
533 for (const auto& fd : sockets) {
534 ssize_t got = recv(fd, packet, sizeof(packet), 0);
535 char * response = packet;
536 size_t responseSize = sizeof(packet);
537
538 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
539 continue;
540
541 uint16_t responseLen = static_cast<uint16_t>(got);
542 queryId = dh->id;
543
544 if(queryId >= dss->idStates.size()) {
545 continue;
546 }
547
548 IDState* ids = &dss->idStates[queryId];
549 int64_t usageIndicator = ids->usageIndicator;
550
551 if(!IDState::isInUse(usageIndicator)) {
552 /* the corresponding state is marked as not in use, meaning that:
553 - it was already cleaned up by another thread and the state is gone ;
554 - we already got a response for this query and this one is a duplicate.
555 Either way, we don't touch it.
556 */
557 continue;
558 }
559
560 /* read the potential DOHUnit state as soon as possible, but don't use it
561 until we have confirmed that we own this state by updating usageIndicator */
562 auto du = ids->du;
563 /* setting age to 0 to prevent the maintainer thread from
564 cleaning this IDS while we process the response.
565 */
566 ids->age = 0;
567 int origFD = ids->origFD;
568
569 unsigned int consumed = 0;
570 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
571 continue;
572 }
573
574 bool isDoH = du != nullptr;
575 /* atomically mark the state as available, but only if it has not been altered
576 in the meantime */
577 if (ids->tryMarkUnused(usageIndicator)) {
578 /* clear the potential DOHUnit asap, it's ours now
579 and since we just marked the state as unused,
580 someone could overwrite it. */
581 ids->du = nullptr;
582 /* we only decrement the outstanding counter if the value was not
583 altered in the meantime, which would mean that the state has been actively reused
584 and the other thread has not incremented the outstanding counter, so we don't
585 want it to be decremented twice. */
586 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
587 } else {
588 /* someone updated the state in the meantime, we can't touch the existing pointer */
589 du = nullptr;
590 /* since the state has been updated, we can't safely access it so let's just drop
591 this response */
592 continue;
593 }
594
595 if(dh->tc && g_truncateTC) {
596 truncateTC(response, &responseLen, responseSize, consumed);
597 }
598
599 dh->id = ids->origID;
600
601 uint16_t addRoom = 0;
602 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
603 if (dr.dnsCryptQuery) {
604 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
605 }
606
607 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
608 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
609 continue;
610 }
611
612 if (ids->cs && !ids->cs->muted) {
613 if (du) {
614 #ifdef HAVE_DNS_OVER_HTTPS
615 // DoH query
616 du->response = std::string(response, responseLen);
617 if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
618 /* at this point we have the only remaining pointer on this
619 DOHUnit object since we did set ids->du to nullptr earlier */
620 delete du;
621 }
622 #endif /* HAVE_DNS_OVER_HTTPS */
623 du = nullptr;
624 }
625 else {
626 ComboAddress empty;
627 empty.sin4.sin_family = 0;
628 /* if ids->destHarvested is false, origDest holds the listening address.
629 We don't want to use that as a source since it could be 0.0.0.0 for example. */
630 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
631 }
632 }
633
634 ++g_stats.responses;
635 ++ids->cs->responses;
636 ++dss->responses;
637
638 double udiff = ids->sentTime.udiff();
639 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
640 isDoH ? " (https)": "", udiff);
641
642 struct timespec ts;
643 gettime(&ts);
644 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
645
646 switch (cleartextDH.rcode) {
647 case RCode::NXDomain:
648 ++g_stats.frontendNXDomain;
649 break;
650 case RCode::ServFail:
651 ++g_stats.servfailResponses;
652 ++g_stats.frontendServFail;
653 break;
654 case RCode::NoError:
655 ++g_stats.frontendNoError;
656 break;
657 }
658 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
659
660 doLatencyStats(udiff);
661
662 rewrittenResponse.clear();
663 }
664 }
665 catch(const std::exception& e){
666 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
667 }
668 }
669 }
670 catch(const std::exception& e)
671 {
672 errlog("UDP responder thread died because of exception: %s", e.what());
673 }
674 catch(const PDNSException& e)
675 {
676 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
677 }
678 catch(...)
679 {
680 errlog("UDP responder thread died because of an exception: %s", "unknown");
681 }
682
683 bool DownstreamState::reconnect()
684 {
685 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
686 if (!tl.owns_lock()) {
687 /* we are already reconnecting */
688 return false;
689 }
690
691 connected = false;
692 for (auto& fd : sockets) {
693 if (fd != -1) {
694 if (sockets.size() > 1) {
695 std::lock_guard<std::mutex> lock(socketsLock);
696 mplexer->removeReadFD(fd);
697 }
698 /* shutdown() is needed to wake up recv() in the responderThread */
699 shutdown(fd, SHUT_RDWR);
700 close(fd);
701 fd = -1;
702 }
703 if (!IsAnyAddress(remote)) {
704 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
705 if (!IsAnyAddress(sourceAddr)) {
706 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
707 if (!sourceItfName.empty()) {
708 #ifdef SO_BINDTODEVICE
709 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
710 if (res != 0) {
711 infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
712 }
713 #endif
714 }
715
716 SBind(fd, sourceAddr);
717 }
718 try {
719 SConnect(fd, remote);
720 if (sockets.size() > 1) {
721 std::lock_guard<std::mutex> lock(socketsLock);
722 mplexer->addReadFD(fd, [](int, boost::any) {});
723 }
724 connected = true;
725 }
726 catch(const std::runtime_error& error) {
727 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
728 connected = false;
729 break;
730 }
731 }
732 }
733
734 /* if at least one (re-)connection failed, close all sockets */
735 if (!connected) {
736 for (auto& fd : sockets) {
737 if (fd != -1) {
738 if (sockets.size() > 1) {
739 std::lock_guard<std::mutex> lock(socketsLock);
740 mplexer->removeReadFD(fd);
741 }
742 /* shutdown() is needed to wake up recv() in the responderThread */
743 shutdown(fd, SHUT_RDWR);
744 close(fd);
745 fd = -1;
746 }
747 }
748 }
749
750 return connected;
751 }
752 void DownstreamState::hash()
753 {
754 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
755 auto w = weight;
756 WriteLock wl(&d_lock);
757 hashes.clear();
758 while (w > 0) {
759 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
760 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
761 hashes.insert(wshash);
762 --w;
763 }
764 }
765
766 void DownstreamState::setId(const boost::uuids::uuid& newId)
767 {
768 id = newId;
769 // compute hashes only if already done
770 if (!hashes.empty()) {
771 hash();
772 }
773 }
774
775 void DownstreamState::setWeight(int newWeight)
776 {
777 if (newWeight < 1) {
778 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
779 return ;
780 }
781 weight = newWeight;
782 if (!hashes.empty()) {
783 hash();
784 }
785 }
786
787 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
788 {
789 pthread_rwlock_init(&d_lock, nullptr);
790 id = getUniqueID();
791 threadStarted.clear();
792
793 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
794
795 sockets.resize(numberOfSockets);
796 for (auto& fd : sockets) {
797 fd = -1;
798 }
799
800 if (!IsAnyAddress(remote)) {
801 reconnect();
802 idStates.resize(g_maxOutstanding);
803 sw.start();
804 infolog("Added downstream server %s", remote.toStringWithPort());
805 }
806
807 }
808
809 std::mutex g_luamutex;
810 LuaContext g_lua;
811
812 GlobalStateHolder<ServerPolicy> g_policy;
813
814 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
815 {
816 for(auto& d : servers) {
817 if(d.second->isUp() && d.second->qps.check())
818 return d.second;
819 }
820 return leastOutstanding(servers, dq);
821 }
822
823 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
824 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
825 {
826 if (servers.size() == 1 && servers[0].second->isUp()) {
827 return servers[0].second;
828 }
829
830 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
831 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
832 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
833 poss.reserve(servers.size());
834 for(auto& d : servers) {
835 if(d.second->isUp()) {
836 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
837 }
838 }
839 if(poss.empty())
840 return shared_ptr<DownstreamState>();
841 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
842 return poss.begin()->second;
843 }
844
845 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
846 {
847 vector<pair<int, shared_ptr<DownstreamState>>> poss;
848 int sum = 0;
849 int max = std::numeric_limits<int>::max();
850
851 for(auto& d : servers) { // w=1, w=10 -> 1, 11
852 if(d.second->isUp()) {
853 // Don't overflow sum when adding high weights
854 if(d.second->weight > max - sum) {
855 sum = max;
856 } else {
857 sum += d.second->weight;
858 }
859
860 poss.push_back({sum, d.second});
861 }
862 }
863
864 // Catch poss & sum are empty to avoid SIGFPE
865 if(poss.empty())
866 return shared_ptr<DownstreamState>();
867
868 int r = val % sum;
869 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
870 if(p==poss.end())
871 return shared_ptr<DownstreamState>();
872 return p->second;
873 }
874
875 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
876 {
877 return valrandom(random(), servers, dq);
878 }
879
880 uint32_t g_hashperturb;
881 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
882 {
883 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
884 }
885
886 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
887 {
888 unsigned int qhash = dq->qname->hash(g_hashperturb);
889 unsigned int sel = std::numeric_limits<unsigned int>::max();
890 unsigned int min = std::numeric_limits<unsigned int>::max();
891 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
892
893 for (const auto& d: servers) {
894 if (d.second->isUp()) {
895 // make sure hashes have been computed
896 if (d.second->hashes.empty()) {
897 d.second->hash();
898 }
899 {
900 ReadLock rl(&(d.second->d_lock));
901 const auto& server = d.second;
902 // we want to keep track of the last hash
903 if (min > *(server->hashes.begin())) {
904 min = *(server->hashes.begin());
905 first = server;
906 }
907
908 auto hash_it = server->hashes.lower_bound(qhash);
909 if (hash_it != server->hashes.end()) {
910 if (*hash_it < sel) {
911 sel = *hash_it;
912 ret = server;
913 }
914 }
915 }
916 }
917 }
918 if (ret != nullptr) {
919 return ret;
920 }
921 if (first != nullptr) {
922 return first;
923 }
924 return shared_ptr<DownstreamState>();
925 }
926
927 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
928 {
929 NumberedServerVector poss;
930
931 for(auto& d : servers) {
932 if(d.second->isUp()) {
933 poss.push_back(d);
934 }
935 }
936
937 const auto *res=&poss;
938 if(poss.empty() && !g_roundrobinFailOnNoServer)
939 res = &servers;
940
941 if(res->empty())
942 return shared_ptr<DownstreamState>();
943
944 static unsigned int counter;
945
946 return (*res)[(counter++) % res->size()].second;
947 }
948
949 ComboAddress g_serverControl{"127.0.0.1:5199"};
950
951 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
952 {
953 std::shared_ptr<ServerPool> pool;
954 pools_t::iterator it = pools.find(poolName);
955 if (it != pools.end()) {
956 pool = it->second;
957 }
958 else {
959 if (!poolName.empty())
960 vinfolog("Creating pool %s", poolName);
961 pool = std::make_shared<ServerPool>();
962 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
963 }
964 return pool;
965 }
966
967 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
968 {
969 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
970 if (!poolName.empty()) {
971 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
972 } else {
973 vinfolog("Setting default pool server selection policy to %s", policy->name);
974 }
975 pool->policy = policy;
976 }
977
978 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
979 {
980 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
981 if (!poolName.empty()) {
982 vinfolog("Adding server to pool %s", poolName);
983 } else {
984 vinfolog("Adding server to default pool");
985 }
986 pool->addServer(server);
987 }
988
989 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
990 {
991 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
992
993 if (!poolName.empty()) {
994 vinfolog("Removing server from pool %s", poolName);
995 }
996 else {
997 vinfolog("Removing server from default pool");
998 }
999
1000 pool->removeServer(server);
1001 }
1002
1003 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
1004 {
1005 pools_t::const_iterator it = pools.find(poolName);
1006
1007 if (it == pools.end()) {
1008 throw std::out_of_range("No pool named " + poolName);
1009 }
1010
1011 return it->second;
1012 }
1013
1014 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1015 {
1016 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1017 return pool->getServers();
1018 }
1019
1020 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
1021 {
1022 string result;
1023
1024 std::vector<std::string> addrs;
1025 stringtok(addrs, spoofContent, " ,");
1026
1027 if (addrs.size() == 1) {
1028 try {
1029 ComboAddress spoofAddr(spoofContent);
1030 SpoofAction sa({spoofAddr});
1031 sa(&dq, &result);
1032 }
1033 catch(const PDNSException &e) {
1034 SpoofAction sa(spoofContent); // CNAME then
1035 sa(&dq, &result);
1036 }
1037 } else {
1038 std::vector<ComboAddress> cas;
1039 for (const auto& addr : addrs) {
1040 try {
1041 cas.push_back(ComboAddress(addr));
1042 }
1043 catch (...) {
1044 }
1045 }
1046 SpoofAction sa(cas);
1047 sa(&dq, &result);
1048 }
1049 }
1050
1051 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
1052 {
1053 switch(action) {
1054 case DNSAction::Action::Allow:
1055 return true;
1056 break;
1057 case DNSAction::Action::Drop:
1058 ++g_stats.ruleDrop;
1059 drop = true;
1060 return true;
1061 break;
1062 case DNSAction::Action::Nxdomain:
1063 dq.dh->rcode = RCode::NXDomain;
1064 dq.dh->qr=true;
1065 ++g_stats.ruleNXDomain;
1066 return true;
1067 break;
1068 case DNSAction::Action::Refused:
1069 dq.dh->rcode = RCode::Refused;
1070 dq.dh->qr=true;
1071 ++g_stats.ruleRefused;
1072 return true;
1073 break;
1074 case DNSAction::Action::ServFail:
1075 dq.dh->rcode = RCode::ServFail;
1076 dq.dh->qr=true;
1077 ++g_stats.ruleServFail;
1078 return true;
1079 break;
1080 case DNSAction::Action::Spoof:
1081 spoofResponseFromString(dq, ruleresult);
1082 return true;
1083 break;
1084 case DNSAction::Action::Truncate:
1085 dq.dh->tc = true;
1086 dq.dh->qr = true;
1087 return true;
1088 break;
1089 case DNSAction::Action::HeaderModify:
1090 return true;
1091 break;
1092 case DNSAction::Action::Pool:
1093 dq.poolname=ruleresult;
1094 return true;
1095 break;
1096 case DNSAction::Action::NoRecurse:
1097 dq.dh->rd = false;
1098 return true;
1099 break;
1100 /* non-terminal actions follow */
1101 case DNSAction::Action::Delay:
1102 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1103 break;
1104 case DNSAction::Action::None:
1105 /* fall-through */
1106 case DNSAction::Action::NoOp:
1107 break;
1108 }
1109
1110 /* false means that we don't stop the processing */
1111 return false;
1112 }
1113
1114
1115 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
1116 {
1117 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1118
1119 if(g_qcount.enabled) {
1120 string qname = (*dq.qname).toLogString();
1121 bool countQuery{true};
1122 if(g_qcount.filter) {
1123 std::lock_guard<std::mutex> lock(g_luamutex);
1124 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1125 }
1126
1127 if(countQuery) {
1128 WriteLock wl(&g_qcount.queryLock);
1129 if(!g_qcount.records.count(qname)) {
1130 g_qcount.records[qname] = 0;
1131 }
1132 g_qcount.records[qname]++;
1133 }
1134 }
1135
1136 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1137 auto updateBlockStats = [&got]() {
1138 ++g_stats.dynBlocked;
1139 got->second.blocks++;
1140 };
1141
1142 if(now < got->second.until) {
1143 DNSAction::Action action = got->second.action;
1144 if (action == DNSAction::Action::None) {
1145 action = g_dynBlockAction;
1146 }
1147 switch (action) {
1148 case DNSAction::Action::NoOp:
1149 /* do nothing */
1150 break;
1151
1152 case DNSAction::Action::Nxdomain:
1153 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1154 updateBlockStats();
1155
1156 dq.dh->rcode = RCode::NXDomain;
1157 dq.dh->qr=true;
1158 return true;
1159
1160 case DNSAction::Action::Refused:
1161 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1162 updateBlockStats();
1163
1164 dq.dh->rcode = RCode::Refused;
1165 dq.dh->qr = true;
1166 return true;
1167
1168 case DNSAction::Action::Truncate:
1169 if(!dq.tcp) {
1170 updateBlockStats();
1171 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1172 dq.dh->tc = true;
1173 dq.dh->qr = true;
1174 return true;
1175 }
1176 else {
1177 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1178 }
1179 break;
1180 case DNSAction::Action::NoRecurse:
1181 updateBlockStats();
1182 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1183 dq.dh->rd = false;
1184 return true;
1185 default:
1186 updateBlockStats();
1187 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1188 return false;
1189 }
1190 }
1191 }
1192
1193 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1194 auto updateBlockStats = [&got]() {
1195 ++g_stats.dynBlocked;
1196 got->blocks++;
1197 };
1198
1199 if(now < got->until) {
1200 DNSAction::Action action = got->action;
1201 if (action == DNSAction::Action::None) {
1202 action = g_dynBlockAction;
1203 }
1204 switch (action) {
1205 case DNSAction::Action::NoOp:
1206 /* do nothing */
1207 break;
1208 case DNSAction::Action::Nxdomain:
1209 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1210 updateBlockStats();
1211
1212 dq.dh->rcode = RCode::NXDomain;
1213 dq.dh->qr=true;
1214 return true;
1215 case DNSAction::Action::Refused:
1216 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1217 updateBlockStats();
1218
1219 dq.dh->rcode = RCode::Refused;
1220 dq.dh->qr=true;
1221 return true;
1222 case DNSAction::Action::Truncate:
1223 if(!dq.tcp) {
1224 updateBlockStats();
1225
1226 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1227 dq.dh->tc = true;
1228 dq.dh->qr = true;
1229 return true;
1230 }
1231 else {
1232 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1233 }
1234 break;
1235 case DNSAction::Action::NoRecurse:
1236 updateBlockStats();
1237 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1238 dq.dh->rd = false;
1239 return true;
1240 default:
1241 updateBlockStats();
1242 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1243 return false;
1244 }
1245 }
1246 }
1247
1248 DNSAction::Action action=DNSAction::Action::None;
1249 string ruleresult;
1250 bool drop = false;
1251 for(const auto& lr : *holders.rulactions) {
1252 if(lr.d_rule->matches(&dq)) {
1253 lr.d_rule->d_matches++;
1254 action=(*lr.d_action)(&dq, &ruleresult);
1255 if (processRulesResult(action, dq, ruleresult, drop)) {
1256 break;
1257 }
1258 }
1259 }
1260
1261 if (drop) {
1262 return false;
1263 }
1264
1265 return true;
1266 }
1267
1268 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1269 {
1270 ssize_t result;
1271
1272 if (ss->sourceItf == 0) {
1273 result = send(sd, request, requestLen, 0);
1274 }
1275 else {
1276 struct msghdr msgh;
1277 struct iovec iov;
1278 cmsgbuf_aligned cbuf;
1279 ComboAddress remote(ss->remote);
1280 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1281 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1282 result = sendmsg(sd, &msgh, 0);
1283 }
1284
1285 if (result == -1) {
1286 int savederrno = errno;
1287 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1288
1289 /* This might sound silly, but on Linux send() might fail with EINVAL
1290 if the interface the socket was bound to doesn't exist anymore.
1291 We don't want to reconnect the real socket if the healthcheck failed,
1292 because it's not using the same socket.
1293 */
1294 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1295 ss->reconnect();
1296 }
1297 }
1298
1299 return result;
1300 }
1301
1302 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1303 {
1304 if (msgh->msg_flags & MSG_TRUNC) {
1305 /* message was too large for our buffer */
1306 vinfolog("Dropping message too large for our buffer");
1307 ++g_stats.nonCompliantQueries;
1308 return false;
1309 }
1310
1311 if(!holders.acl->match(remote)) {
1312 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1313 ++g_stats.aclDrops;
1314 return false;
1315 }
1316
1317 cs.queries++;
1318 ++g_stats.queries;
1319
1320 if (HarvestDestinationAddress(msgh, &dest)) {
1321 /* we don't get the port, only the address */
1322 dest.sin4.sin_port = cs.local.sin4.sin_port;
1323 }
1324 else {
1325 dest.sin4.sin_family = 0;
1326 }
1327
1328 return true;
1329 }
1330
1331 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1332 {
1333 if (cs.dnscryptCtx) {
1334 #ifdef HAVE_DNSCRYPT
1335 vector<uint8_t> response;
1336 uint16_t decryptedQueryLen = 0;
1337
1338 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1339
1340 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1341
1342 if (!decrypted) {
1343 if (response.size() > 0) {
1344 return response;
1345 }
1346 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1347 }
1348
1349 len = decryptedQueryLen;
1350 #endif /* HAVE_DNSCRYPT */
1351 }
1352 return boost::none;
1353 }
1354
1355 bool checkQueryHeaders(const struct dnsheader* dh)
1356 {
1357 if (dh->qr) { // don't respond to responses
1358 ++g_stats.nonCompliantQueries;
1359 return false;
1360 }
1361
1362 if (dh->qdcount == 0) {
1363 ++g_stats.emptyQueries;
1364 return false;
1365 }
1366
1367 if (dh->rd) {
1368 ++g_stats.rdQueries;
1369 }
1370
1371 return true;
1372 }
1373
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Fill outMsg, one entry of a sendmmsg() vector, to send response (responseLen
   bytes) to remote. When dest has a non-zero family it is set as the source
   address through a control message built in cbuf; otherwise no control data
   is attached. iov is used to describe the response. cs is not used. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  struct msghdr* hdr = &outMsg.msg_hdr;
  fillMSGHdr(hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));
  outMsg.msg_len = 0;

  const bool haveSource = dest.sin4.sin_family != 0;
  if (haveSource) {
    addCMsgSrcAddr(hdr, cbuf, &dest, 0);
  }
  else {
    hdr->msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1388
1389 /* self-generated responses or cache hits */
1390 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1391 {
1392 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1393
1394 #ifdef HAVE_PROTOBUF
1395 dr.uniqueId = dq.uniqueId;
1396 #endif
1397 dr.qTag = dq.qTag;
1398 dr.delayMsec = dq.delayMsec;
1399
1400 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1401 return false;
1402 }
1403
1404 /* in case a rule changed it */
1405 dq.delayMsec = dr.delayMsec;
1406
1407 #ifdef HAVE_DNSCRYPT
1408 if (!cs.muted) {
1409 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1410 return false;
1411 }
1412 }
1413 #endif /* HAVE_DNSCRYPT */
1414
1415 if (cacheHit) {
1416 ++g_stats.cacheHits;
1417 }
1418
1419 switch (dr.dh->rcode) {
1420 case RCode::NXDomain:
1421 ++g_stats.frontendNXDomain;
1422 break;
1423 case RCode::ServFail:
1424 ++g_stats.frontendServFail;
1425 break;
1426 case RCode::NoError:
1427 ++g_stats.frontendNoError;
1428 break;
1429 }
1430
1431 doLatencyStats(0); // we're not going to measure this
1432 return true;
1433 }
1434
1435 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1436 {
1437 const uint16_t queryId = ntohs(dq.dh->id);
1438
1439 try {
1440 /* we need an accurate ("real") value for the response and
1441 to store into the IDS, but not for insertion into the
1442 rings for example */
1443 struct timespec now;
1444 gettime(&now);
1445
1446 if (!applyRulesToQuery(holders, dq, now)) {
1447 return ProcessQueryResult::Drop;
1448 }
1449
1450 if(dq.dh->qr) { // something turned it into a response
1451 fixUpQueryTurnedResponse(dq, dq.origFlags);
1452
1453 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1454 return ProcessQueryResult::Drop;
1455 }
1456
1457 ++g_stats.selfAnswered;
1458 ++cs.responses;
1459 return ProcessQueryResult::SendAnswer;
1460 }
1461
1462 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
1463 dq.packetCache = serverPool->packetCache;
1464 auto policy = *(holders.policy);
1465 if (serverPool->policy != nullptr) {
1466 policy = *(serverPool->policy);
1467 }
1468 auto servers = serverPool->getServers();
1469 if (policy.isLua) {
1470 std::lock_guard<std::mutex> lock(g_luamutex);
1471 selectedBackend = policy.policy(servers, &dq);
1472 }
1473 else {
1474 selectedBackend = policy.policy(servers, &dq);
1475 }
1476
1477 uint16_t cachedResponseSize = dq.size;
1478 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1479
1480 if (dq.packetCache && !dq.skipCache) {
1481 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1482 }
1483
1484 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1485 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1486 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1487 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1488 dq.len = cachedResponseSize;
1489
1490 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1491 return ProcessQueryResult::Drop;
1492 }
1493
1494 return ProcessQueryResult::SendAnswer;
1495 }
1496
1497 if (!dq.subnet) {
1498 /* there was no existing ECS on the query, enable the zero-scope feature */
1499 dq.useZeroScope = true;
1500 }
1501 }
1502
1503 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1504 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1505 return ProcessQueryResult::Drop;
1506 }
1507 }
1508
1509 if (dq.packetCache && !dq.skipCache) {
1510 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1511 dq.len = cachedResponseSize;
1512
1513 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1514 return ProcessQueryResult::Drop;
1515 }
1516
1517 return ProcessQueryResult::SendAnswer;
1518 }
1519 ++g_stats.cacheMisses;
1520 }
1521
1522 if(!selectedBackend) {
1523 ++g_stats.noPolicy;
1524
1525 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1526 if (g_servFailOnNoPolicy) {
1527 restoreFlags(dq.dh, dq.origFlags);
1528
1529 dq.dh->rcode = RCode::ServFail;
1530 dq.dh->qr = true;
1531
1532 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1533 return ProcessQueryResult::Drop;
1534 }
1535 // no response-only statistics counter to update.
1536 return ProcessQueryResult::SendAnswer;
1537 }
1538
1539 return ProcessQueryResult::Drop;
1540 }
1541
1542 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1543 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1544 }
1545
1546 selectedBackend->queries++;
1547 return ProcessQueryResult::PassToBackend;
1548 }
1549 catch(const std::exception& e){
1550 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1551 }
1552 return ProcessQueryResult::Drop;
1553 }
1554
1555 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
1556 {
1557 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1558 uint16_t queryId = 0;
1559
1560 try {
1561 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1562 return;
1563 }
1564
1565 /* we need an accurate ("real") value for the response and
1566 to store into the IDS, but not for insertion into the
1567 rings for example */
1568 struct timespec queryRealTime;
1569 gettime(&queryRealTime, true);
1570
1571 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1572 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1573 if (dnsCryptResponse) {
1574 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1575 return;
1576 }
1577
1578 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1579 queryId = ntohs(dh->id);
1580
1581 if (!checkQueryHeaders(dh)) {
1582 return;
1583 }
1584
1585 uint16_t qtype, qclass;
1586 unsigned int consumed = 0;
1587 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1588 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1589 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1590 std::shared_ptr<DownstreamState> ss{nullptr};
1591 auto result = processQuery(dq, cs, holders, ss);
1592
1593 if (result == ProcessQueryResult::Drop) {
1594 return;
1595 }
1596
1597 if (result == ProcessQueryResult::SendAnswer) {
1598 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1599 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1600 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1601 (*queuedResponses)++;
1602 return;
1603 }
1604 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1605 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1606 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1607 return;
1608 }
1609
1610 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1611 return;
1612 }
1613
1614 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1615 IDState* ids = &ss->idStates[idOffset];
1616 ids->age = 0;
1617 DOHUnit* du = nullptr;
1618
1619 /* that means that the state was in use, possibly with an allocated
1620 DOHUnit that we will need to handle, but we can't touch it before
1621 confirming that we now own this state */
1622 if (ids->isInUse()) {
1623 du = ids->du;
1624 }
1625
1626 /* we atomically replace the value, we now own this state */
1627 if (!ids->markAsUsed()) {
1628 /* the state was not in use.
1629 we reset 'du' because it might have still been in use when we read it. */
1630 du = nullptr;
1631 ++ss->outstanding;
1632 }
1633 else {
1634 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1635 to handle it because it's about to be overwritten. */
1636 ids->du = nullptr;
1637 ++ss->reuseds;
1638 ++g_stats.downstreamTimeouts;
1639 handleDOHTimeout(du);
1640 }
1641
1642 ids->cs = &cs;
1643 ids->origFD = cs.udpFD;
1644 ids->origID = dh->id;
1645 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1646
1647 /* If we couldn't harvest the real dest addr, still
1648 write down the listening addr since it will be useful
1649 (especially if it's not an 'any' one).
1650 We need to keep track of which one it is since we may
1651 want to use the real but not the listening addr to reply.
1652 */
1653 if (dest.sin4.sin_family != 0) {
1654 ids->origDest = dest;
1655 ids->destHarvested = true;
1656 }
1657 else {
1658 ids->origDest = cs.local;
1659 ids->destHarvested = false;
1660 }
1661
1662 dh->id = idOffset;
1663
1664 int fd = pickBackendSocketForSending(ss);
1665 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1666
1667 if(ret < 0) {
1668 ++ss->sendErrors;
1669 ++g_stats.downstreamSendErrors;
1670 }
1671
1672 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1673 }
1674 catch(const std::exception& e){
1675 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1676 }
1677 }
1678
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Batched receive loop for frontend cs, used when g_udpVectorSize > 1: reads up to
   g_udpVectorSize datagrams per recvmmsg() call, processes each of them, then sends
   the responses that could be answered immediately with a single sendmmsg() call.
   Never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[s_maxPacketCacheEntrySize];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* go now */
  for(;;) {

    /* (re)initialize the structures needed to receive our messages before every call:
       - the IO vector is also used to send the vector of responses, to avoid
         having to copy the data around, so it may still describe a response;
       - msg_namelen and msg_controllen are value-result fields that the kernel
         overwrote with the sizes used by the previous message;
       - only s_udpIncomingBufferSize bytes are offered to the kernel, like in the
         single-message path: the rest of the buffer is reserved for EDNS/ECS
         insertion and self-generated responses. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
      fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);
    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
      }
    }
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1759
1760 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1761 static void udpClientThread(ClientState* cs)
1762 try
1763 {
1764 setThreadName("dnsdist/udpClie");
1765 LocalHolders holders;
1766
1767 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1768 if (g_udpVectorSize > 1) {
1769 MultipleMessagesUDPClientThread(cs, holders);
1770
1771 }
1772 else
1773 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1774 {
1775 char packet[s_maxPacketCacheEntrySize];
1776 /* the actual buffer is larger because:
1777 - we may have to add EDNS and/or ECS
1778 - we use it for self-generated responses (from rule or cache)
1779 but we only accept incoming payloads up to that size
1780 */
1781 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1782 struct msghdr msgh;
1783 struct iovec iov;
1784 /* used by HarvestDestinationAddress */
1785 cmsgbuf_aligned cbuf;
1786
1787 ComboAddress remote;
1788 ComboAddress dest;
1789 remote.sin4.sin_family = cs->local.sin4.sin_family;
1790 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, s_udpIncomingBufferSize, &remote);
1791
1792 for(;;) {
1793 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1794
1795 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1796 ++g_stats.nonCompliantQueries;
1797 continue;
1798 }
1799
1800 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), sizeof(packet), nullptr, nullptr, nullptr, nullptr);
1801 }
1802 }
1803 }
1804 catch(const std::exception &e)
1805 {
1806 errlog("UDP client thread died because of exception: %s", e.what());
1807 }
1808 catch(const PDNSException &e)
1809 {
1810 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1811 }
1812 catch(...)
1813 {
1814 errlog("UDP client thread died because of an exception: %s", "unknown");
1815 }
1816
/* Return a random 16-bit DNS query ID, drawn from libsodium's generator when
   available and from random() otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const uint32_t rnd = randombytes_random();
#else
  const long rnd = random();
#endif
  /* keep the low 16 bits */
  return static_cast<uint16_t>(rnd % 65536);
}
1825
1826 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1827 try
1828 {
1829 DNSName checkName = ds->checkName;
1830 uint16_t checkType = ds->checkType.getCode();
1831 uint16_t checkClass = ds->checkClass;
1832 dnsheader checkHeader;
1833 memset(&checkHeader, 0, sizeof(checkHeader));
1834
1835 checkHeader.qdcount = htons(1);
1836 checkHeader.id = getRandomDNSID();
1837
1838 checkHeader.rd = true;
1839 if (ds->setCD) {
1840 checkHeader.cd = true;
1841 }
1842
1843 if (ds->checkFunction) {
1844 std::lock_guard<std::mutex> lock(g_luamutex);
1845 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1846 checkName = std::get<0>(ret);
1847 checkType = std::get<1>(ret);
1848 checkClass = std::get<2>(ret);
1849 }
1850
1851 vector<uint8_t> packet;
1852 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1853 dnsheader * requestHeader = dpw.getHeader();
1854 *requestHeader = checkHeader;
1855
1856 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1857 sock.setNonBlocking();
1858 if (!IsAnyAddress(ds->sourceAddr)) {
1859 sock.setReuseAddr();
1860 if (!ds->sourceItfName.empty()) {
1861 #ifdef SO_BINDTODEVICE
1862 int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
1863 if (res != 0 && g_verboseHealthChecks) {
1864 infolog("Error settting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
1865 }
1866 #endif
1867 }
1868 sock.bind(ds->sourceAddr);
1869 }
1870 sock.connect(ds->remote);
1871 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1872 if (sent < 0) {
1873 int ret = errno;
1874 if (g_verboseHealthChecks)
1875 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1876 return false;
1877 }
1878
1879 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1880 if(ret < 0 || !ret) { // error, timeout, both are down!
1881 if (ret < 0) {
1882 ret = errno;
1883 if (g_verboseHealthChecks)
1884 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1885 }
1886 else {
1887 if (g_verboseHealthChecks)
1888 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1889 }
1890 return false;
1891 }
1892
1893 string reply;
1894 ComboAddress from;
1895 sock.recvFrom(reply, from);
1896
1897 /* we are using a connected socket but hey.. */
1898 if (from != ds->remote) {
1899 if (g_verboseHealthChecks)
1900 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1901 return false;
1902 }
1903
1904 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1905
1906 if (reply.size() < sizeof(*responseHeader)) {
1907 if (g_verboseHealthChecks)
1908 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1909 return false;
1910 }
1911
1912 if (responseHeader->id != requestHeader->id) {
1913 if (g_verboseHealthChecks)
1914 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1915 return false;
1916 }
1917
1918 if (!responseHeader->qr) {
1919 if (g_verboseHealthChecks)
1920 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1921 return false;
1922 }
1923
1924 if (responseHeader->rcode == RCode::ServFail) {
1925 if (g_verboseHealthChecks)
1926 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1927 return false;
1928 }
1929
1930 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1931 if (g_verboseHealthChecks)
1932 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1933 return false;
1934 }
1935
1936 uint16_t receivedType;
1937 uint16_t receivedClass;
1938 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1939
1940 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1941 if (g_verboseHealthChecks)
1942 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1943 return false;
1944 }
1945
1946 return true;
1947 }
1948 catch(const std::exception& e)
1949 {
1950 if (g_verboseHealthChecks)
1951 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1952 return false;
1953 }
1954 catch(...)
1955 {
1956 if (g_verboseHealthChecks)
1957 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1958 return false;
1959 }
1960
/* Maximum number of TCP client threads (handed to TCPClientCollection in main()). */
uint64_t g_maxTCPClientThreads = 10;
/* Number of maintenance loop iterations (the loop sleeps one second per iteration)
   between two passes purging expired entries from the packet caches. */
std::atomic<uint16_t> g_cacheCleaningDelay(60);
/* Used by maintThread() to compute the 'upTo' value passed to purgeExpired():
   maxEntries * (100 - percentage) / 100. */
std::atomic<uint16_t> g_cacheCleaningPercentage(100);
1964
1965 void maintThread()
1966 {
1967 setThreadName("dnsdist/main");
1968 int interval = 1;
1969 size_t counter = 0;
1970 int32_t secondsToWaitLog = 0;
1971
1972 for(;;) {
1973 sleep(interval);
1974
1975 {
1976 std::lock_guard<std::mutex> lock(g_luamutex);
1977 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1978 if(f) {
1979 try {
1980 (*f)();
1981 secondsToWaitLog = 0;
1982 }
1983 catch(std::exception &e) {
1984 if (secondsToWaitLog <= 0) {
1985 infolog("Error during execution of maintenance function: %s", e.what());
1986 secondsToWaitLog = 61;
1987 }
1988 secondsToWaitLog -= interval;
1989 }
1990 }
1991 }
1992
1993 counter++;
1994 if (counter >= g_cacheCleaningDelay) {
1995 /* keep track, for each cache, of whether we should keep
1996 expired entries */
1997 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1998
1999 /* gather all caches actually used by at least one pool, and see
2000 if something prevents us from cleaning the expired entries */
2001 auto localPools = g_pools.getLocal();
2002 for (const auto& entry : *localPools) {
2003 auto& pool = entry.second;
2004
2005 auto packetCache = pool->packetCache;
2006 if (!packetCache) {
2007 continue;
2008 }
2009
2010 auto pair = caches.insert({packetCache, false});
2011 auto& iter = pair.first;
2012 /* if we need to keep stale data for this cache (ie, not clear
2013 expired entries when at least one pool using this cache
2014 has all its backends down) */
2015 if (packetCache->keepStaleData() && iter->second == false) {
2016 /* so far all pools had at least one backend up */
2017 if (pool->countServers(true) == 0) {
2018 iter->second = true;
2019 }
2020 }
2021 }
2022
2023 for (auto pair : caches) {
2024 /* shall we keep expired entries ? */
2025 if (pair.second == true) {
2026 continue;
2027 }
2028 auto& packetCache = pair.first;
2029 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
2030 packetCache->purgeExpired(upTo);
2031 }
2032 counter = 0;
2033 }
2034
2035 // ponder pruning g_dynblocks of expired entries here
2036 }
2037 }
2038
2039 static void secPollThread()
2040 {
2041 setThreadName("dnsdist/secpoll");
2042
2043 for (;;) {
2044 try {
2045 doSecPoll(g_secPollSuffix);
2046 }
2047 catch(...) {
2048 }
2049 sleep(g_secPollInterval);
2050 }
2051 }
2052
/* Health-check and housekeeping loop, woken up every second. Never returns.
   - Spawns one more TCP client thread when more than one TCP connection is queued
     and the maximum number of TCP client threads has not been reached.
   - For every backend, once lastCheck reaches checkInterval (counted in iterations of
     this loop), and only if its availability is 'Auto':
       - runs upCheck();
       - applies the rise (minRiseSuccesses) and fall (maxCheckFailures) thresholds;
       - when the state changes, logs it, reconnects a backend being marked up that is
         not connected, and sends an SNMP trap if enabled.
   - Refreshes each backend's queryLoad and dropRate.
   - Expires in-flight UDP queries whose age counter (incremented once per iteration)
     exceeds g_udpTimeout.
*/
static void healthChecksThread()
{
  setThreadName("dnsdist/healthC");

  int interval = 1;

  for(;;) {
    sleep(interval);

    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
      g_tcpclientthreads->addTCPClientThread();

    auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
    for(auto& dss : *states) {
      if(++dss->lastCheck < dss->checkInterval)
        continue;
      dss->lastCheck = 0;
      if(dss->availability==DownstreamState::Availability::Auto) {
        bool newState=upCheck(dss);
        if (newState) {
          /* check succeeded */
          dss->currentCheckFailures = 0;

          if (!dss->upStatus) {
            /* we were marked as down */
            dss->consecutiveSuccessfulChecks++;
            if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
              /* if we need more than one successful check to rise
                 and we didn't reach the threshold yet,
                 let's stay down */
              newState = false;
            }
          }
        }
        else {
          /* check failed */
          dss->consecutiveSuccessfulChecks = 0;

          if (dss->upStatus) {
            /* we are currently up */
            dss->currentCheckFailures++;
            if (dss->currentCheckFailures < dss->maxCheckFailures) {
              /* we need more than one failure to be marked as down,
                 and we did not reach the threshold yet, let's stay up */
              newState = true;
            }
          }
        }

        if(newState != dss->upStatus) {
          warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");

          /* a backend about to be marked up must be connected first: the outcome of the
             reconnection becomes the new state, and the responder thread is started
             only once per backend (guarded by threadStarted) */
          if (newState && !dss->connected) {
            newState = dss->reconnect();

            if (dss->connected && !dss->threadStarted.test_and_set()) {
              dss->tid = thread(responderThread, dss);
            }
          }

          dss->upStatus = newState;
          dss->currentCheckFailures = 0;
          dss->consecutiveSuccessfulChecks = 0;
          if (g_snmpAgent && g_snmpTrapsEnabled) {
            g_snmpAgent->sendBackendStatusChangeTrap(dss);
          }
        }
      }

      /* rates since the previous pass; assumes udiffAndSet() returns microseconds
         (hence the division into seconds) - NOTE(review): confirm */
      auto delta = dss->sw.udiffAndSet()/1000000.0;
      dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
      dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
      dss->prev.queries.store(dss->queries.load());
      dss->prev.reuseds.store(dss->reuseds.load());

      for(IDState& ids : dss->idStates) { // timeouts
        int64_t usageIndicator = ids.usageIndicator;
        if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
          /* We mark the state as unused as soon as possible
             to limit the risk of racing with the
             responder thread.
          */
          auto oldDU = ids.du;

          if (!ids.tryMarkUnused(usageIndicator)) {
            /* this state has been altered in the meantime,
               don't go anywhere near it */
            continue;
          }
          ids.du = nullptr;
          handleDOHTimeout(oldDU);
          ids.age = 0;
          dss->reuseds++;
          --dss->outstanding;
          ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
          vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
                   dss->remote.toStringWithPort(), dss->name,
                   ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());

          struct timespec ts;
          gettime(&ts);

          /* record the timeout in the response ring with a zeroed header carrying only
             the original query ID */
          struct dnsheader fake;
          memset(&fake, 0, sizeof(fake));
          fake.id = ids.origID;

          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
        }
      }
    }
  }
}
2165
2166 static void bindAny(int af, int sock)
2167 {
2168 __attribute__((unused)) int one = 1;
2169
2170 #ifdef IP_FREEBIND
2171 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2172 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2173 #endif
2174
2175 #ifdef IP_BINDANY
2176 if (af == AF_INET)
2177 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2178 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2179 #endif
2180 #ifdef IPV6_BINDANY
2181 if (af == AF_INET6)
2182 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2183 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2184 #endif
2185 #ifdef SO_BINDANY
2186 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2187 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2188 #endif
2189 }
2190
2191 static void dropGroupPrivs(gid_t gid)
2192 {
2193 if (gid) {
2194 if (setgid(gid) == 0) {
2195 if (setgroups(0, NULL) < 0) {
2196 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2197 }
2198 }
2199 else {
2200 warnlog("Warning: Unable to set group ID to %d: %s", gid, stringerror());
2201 }
2202 }
2203 }
2204
2205 static void dropUserPrivs(uid_t uid)
2206 {
2207 if(uid) {
2208 if(setuid(uid) < 0) {
2209 warnlog("Warning: Unable to set user ID to %d: %s", uid, stringerror());
2210 }
2211 }
2212 }
2213
2214 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2215 {
2216 /* stdin, stdout, stderr */
2217 size_t requiredFDsCount = 3;
2218 auto backends = g_dstates.getLocal();
2219 /* UDP sockets to backends */
2220 size_t backendUDPSocketsCount = 0;
2221 for (const auto& backend : *backends) {
2222 backendUDPSocketsCount += backend->sockets.size();
2223 }
2224 requiredFDsCount += backendUDPSocketsCount;
2225 /* TCP sockets to backends */
2226 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2227 /* listening sockets */
2228 requiredFDsCount += udpBindsCount;
2229 requiredFDsCount += tcpBindsCount;
2230 /* max TCP connections currently served */
2231 requiredFDsCount += g_maxTCPClientThreads;
2232 /* max pipes for communicating between TCP acceptors and client threads */
2233 requiredFDsCount += (g_maxTCPClientThreads * 2);
2234 /* max TCP queued connections */
2235 requiredFDsCount += g_maxTCPQueuedConnections;
2236 /* DelayPipe pipe */
2237 requiredFDsCount += 2;
2238 /* syslog socket */
2239 requiredFDsCount++;
2240 /* webserver main socket */
2241 requiredFDsCount++;
2242 /* console main socket */
2243 requiredFDsCount++;
2244 /* carbon export */
2245 requiredFDsCount++;
2246 /* history file */
2247 requiredFDsCount++;
2248 struct rlimit rl;
2249 getrlimit(RLIMIT_NOFILE, &rl);
2250 if (rl.rlim_cur <= requiredFDsCount) {
2251 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2252 #ifdef HAVE_SYSTEMD
2253 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2254 #else
2255 warnlog("You can increase this value by using ulimit.");
2256 #endif
2257 }
2258 }
2259
2260 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2261 {
2262 /* skip some warnings if there is an identical UDP context */
2263 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2264 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2265 (void) warn;
2266
2267 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2268
2269 if (cs->tcp) {
2270 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2271 #ifdef TCP_DEFER_ACCEPT
2272 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2273 #endif
2274 if (cs->fastOpenQueueSize > 0) {
2275 #ifdef TCP_FASTOPEN
2276 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2277 #else
2278 if (warn) {
2279 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2280 }
2281 #endif
2282 }
2283 }
2284
2285 if(cs->local.sin4.sin_family == AF_INET6) {
2286 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2287 }
2288
2289 bindAny(cs->local.sin4.sin_family, fd);
2290
2291 if(!cs->tcp && IsAnyAddress(cs->local)) {
2292 int one=1;
2293 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2294 #ifdef IPV6_RECVPKTINFO
2295 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2296 #endif
2297 }
2298
2299 if (cs->reuseport) {
2300 #ifdef SO_REUSEPORT
2301 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2302 #else
2303 if (warn) {
2304 /* no need to warn again if configured but support is not available, we already did for UDP */
2305 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2306 }
2307 #endif
2308 }
2309
2310 if (!cs->tcp) {
2311 if (cs->local.isIPv4()) {
2312 try {
2313 setSocketIgnorePMTU(cs->udpFD);
2314 }
2315 catch(const std::exception& e) {
2316 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2317 }
2318 }
2319 }
2320
2321 const std::string& itf = cs->interface;
2322 if (!itf.empty()) {
2323 #ifdef SO_BINDTODEVICE
2324 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2325 if (res != 0) {
2326 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), stringerror());
2327 }
2328 #else
2329 if (warn) {
2330 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2331 }
2332 #endif
2333 }
2334
2335 #ifdef HAVE_EBPF
2336 if (g_defaultBPFFilter) {
2337 cs->attachFilter(g_defaultBPFFilter);
2338 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2339 }
2340 #endif /* HAVE_EBPF */
2341
2342 if (cs->tlsFrontend != nullptr) {
2343 if (!cs->tlsFrontend->setupTLS()) {
2344 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2345 _exit(EXIT_FAILURE);
2346 }
2347 }
2348
2349 if (cs->dohFrontend != nullptr) {
2350 cs->dohFrontend->setup();
2351 }
2352
2353 SBind(fd, cs->local);
2354
2355 if (cs->tcp) {
2356 SListen(cs->tcpFD, SOMAXCONN);
2357 if (cs->tlsFrontend != nullptr) {
2358 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2359 }
2360 else if (cs->dohFrontend != nullptr) {
2361 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2362 }
2363 else if (cs->dnscryptCtx != nullptr) {
2364 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2365 }
2366 else {
2367 warnlog("Listening on %s", cs->local.toStringWithPort());
2368 }
2369 }
2370
2371 cs->ready = true;
2372 }
2373
/* Options gathered from the command line by main(). */
struct
{
  std::vector<std::string> locals;   // -l/--local listen addresses
  std::vector<std::string> remotes;  // positional arguments: backend addresses (unless acting as a client)
  bool checkConfig = false;          // --check-config
  bool beClient = false;             // -c/--client
  bool beSupervised = false;         // --supervised
  std::string command;               // -e/--execute
  std::string config;                // -C/--config
  std::string uid;                   // -u/--uid
  std::string gid;                   // -g/--gid
} g_cmdLine;

/* Set to true by main() once the configuration has been loaded and the list of frontends
   is final, just before the frontends are bound. */
std::atomic<bool> g_configurationDone{false};
2388
/* Print the command-line help to standard output: an empty line first,
   then one line per entry of the table below. */
static void usage()
{
  static const char* const helpLines[] = {
    "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]",
    "[-e,--execute cmd] [-h,--help] [-l,--local addr]",
    "[-v,--verbose] [--check-config] [--version]",
    "",
    "-a,--acl netmask Add this netmask to the ACL",
    "-C,--config file Load configuration from 'file'",
    "-c,--client Operate as a client, connect to dnsdist. This reads",
    " controlSocket from your configuration file, but also",
    " accepts an IP:PORT argument",
#ifdef HAVE_LIBSODIUM
    "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This",
    " is similar to setting setKey in the configuration file.",
    " NOTE: this will leak this key in your shell's history",
    " and in the systems running process list.",
#endif
    "--check-config Validate the configuration file and exit. The exit-code",
    " reflects the validation, 0 is OK, 1 means an error.",
    " Any errors are printed as well.",
    "-e,--execute cmd Connect to dnsdist and execute 'cmd'",
    "-g,--gid gid Change the process group ID after binding sockets",
    "-h,--help Display this helpful message",
    "-l,--local address Listen on this local address",
    "--supervised Don't open a console, I'm supervised",
    " (use with e.g. systemd and daemontools)",
    "--disable-syslog Don't log to syslog, only to stdout",
    " (use with e.g. systemd)",
    "-u,--uid uid Change the process user ID after binding sockets",
    "-v,--verbose Enable verbose mode",
    "-V,--version Show dnsdist version information and exit",
  };

  std::cout << std::endl;
  for (const char* line : helpLines) {
    std::cout << line << "\n";
  }
}
2422
2423 int main(int argc, char** argv)
2424 try
2425 {
2426 size_t udpBindsCount = 0;
2427 size_t tcpBindsCount = 0;
2428 rl_attempted_completion_function = my_completion;
2429 rl_completion_append_character = 0;
2430
2431 signal(SIGPIPE, SIG_IGN);
2432 signal(SIGCHLD, SIG_IGN);
2433 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2434
2435 #ifdef HAVE_LIBSODIUM
2436 if (sodium_init() == -1) {
2437 cerr<<"Unable to initialize crypto library"<<endl;
2438 exit(EXIT_FAILURE);
2439 }
2440 g_hashperturb=randombytes_uniform(0xffffffff);
2441 srandom(randombytes_uniform(0xffffffff));
2442 #else
2443 {
2444 struct timeval tv;
2445 gettimeofday(&tv, 0);
2446 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2447 g_hashperturb=random();
2448 }
2449
2450 #endif
2451 ComboAddress clientAddress = ComboAddress();
2452 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2453 struct option longopts[]={
2454 {"acl", required_argument, 0, 'a'},
2455 {"check-config", no_argument, 0, 1},
2456 {"client", no_argument, 0, 'c'},
2457 {"config", required_argument, 0, 'C'},
2458 {"disable-syslog", no_argument, 0, 2},
2459 {"execute", required_argument, 0, 'e'},
2460 {"gid", required_argument, 0, 'g'},
2461 {"help", no_argument, 0, 'h'},
2462 {"local", required_argument, 0, 'l'},
2463 {"setkey", required_argument, 0, 'k'},
2464 {"supervised", no_argument, 0, 3},
2465 {"uid", required_argument, 0, 'u'},
2466 {"verbose", no_argument, 0, 'v'},
2467 {"version", no_argument, 0, 'V'},
2468 {0,0,0,0}
2469 };
2470 int longindex=0;
2471 string optstring;
2472 for(;;) {
2473 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2474 if(c==-1)
2475 break;
2476 switch(c) {
2477 case 1:
2478 g_cmdLine.checkConfig=true;
2479 break;
2480 case 2:
2481 g_syslog=false;
2482 break;
2483 case 3:
2484 g_cmdLine.beSupervised=true;
2485 break;
2486 case 'C':
2487 g_cmdLine.config=optarg;
2488 break;
2489 case 'c':
2490 g_cmdLine.beClient=true;
2491 break;
2492 case 'e':
2493 g_cmdLine.command=optarg;
2494 break;
2495 case 'g':
2496 g_cmdLine.gid=optarg;
2497 break;
2498 case 'h':
2499 cout<<"dnsdist "<<VERSION<<endl;
2500 usage();
2501 cout<<"\n";
2502 exit(EXIT_SUCCESS);
2503 break;
2504 case 'a':
2505 optstring=optarg;
2506 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2507 break;
2508 case 'k':
2509 #ifdef HAVE_LIBSODIUM
2510 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2511 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2512 exit(EXIT_FAILURE);
2513 }
2514 #else
2515 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2516 exit(EXIT_FAILURE);
2517 #endif
2518 break;
2519 case 'l':
2520 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2521 break;
2522 case 'u':
2523 g_cmdLine.uid=optarg;
2524 break;
2525 case 'v':
2526 g_verbose=true;
2527 break;
2528 case 'V':
2529 #ifdef LUAJIT_VERSION
2530 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2531 #else
2532 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2533 #endif
2534 cout<<"Enabled features: ";
2535 #ifdef HAVE_CDB
2536 cout<<"cdb ";
2537 #endif
2538 #ifdef HAVE_DNS_OVER_TLS
2539 cout<<"dns-over-tls(";
2540 #ifdef HAVE_GNUTLS
2541 cout<<"gnutls";
2542 #ifdef HAVE_LIBSSL
2543 cout<<" ";
2544 #endif
2545 #endif
2546 #ifdef HAVE_LIBSSL
2547 cout<<"openssl";
2548 #endif
2549 cout<<") ";
2550 #endif
2551 #ifdef HAVE_DNS_OVER_HTTPS
2552 cout<<"dns-over-https(DOH) ";
2553 #endif
2554 #ifdef HAVE_DNSCRYPT
2555 cout<<"dnscrypt ";
2556 #endif
2557 #ifdef HAVE_EBPF
2558 cout<<"ebpf ";
2559 #endif
2560 #ifdef HAVE_FSTRM
2561 cout<<"fstrm ";
2562 #endif
2563 #ifdef HAVE_LIBCRYPTO
2564 cout<<"ipcipher ";
2565 #endif
2566 #ifdef HAVE_LIBSODIUM
2567 cout<<"libsodium ";
2568 #endif
2569 #ifdef HAVE_LMDB
2570 cout<<"lmdb ";
2571 #endif
2572 #ifdef HAVE_PROTOBUF
2573 cout<<"protobuf ";
2574 #endif
2575 #ifdef HAVE_RE2
2576 cout<<"re2 ";
2577 #endif
2578 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2579 cout<<"recvmmsg/sendmmsg ";
2580 #endif
2581 #ifdef HAVE_NET_SNMP
2582 cout<<"snmp ";
2583 #endif
2584 #ifdef HAVE_SYSTEMD
2585 cout<<"systemd";
2586 #endif
2587 cout<<endl;
2588 exit(EXIT_SUCCESS);
2589 break;
2590 case '?':
2591 //getopt_long printed an error message.
2592 usage();
2593 exit(EXIT_FAILURE);
2594 break;
2595 }
2596 }
2597
2598 argc-=optind;
2599 argv+=optind;
2600 for(auto p = argv; *p; ++p) {
2601 if(g_cmdLine.beClient) {
2602 clientAddress = ComboAddress(*p, 5199);
2603 } else {
2604 g_cmdLine.remotes.push_back(*p);
2605 }
2606 }
2607
2608 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2609
2610 g_policy.setState(leastOutstandingPol);
2611 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2612 setupLua(true, g_cmdLine.config);
2613 if (clientAddress != ComboAddress())
2614 g_serverControl = clientAddress;
2615 doClient(g_serverControl, g_cmdLine.command);
2616 _exit(EXIT_SUCCESS);
2617 }
2618
2619 auto acl = g_ACL.getCopy();
2620 if(acl.empty()) {
2621 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2622 acl.addMask(addr);
2623 g_ACL.setState(acl);
2624 }
2625
2626 auto consoleACL = g_consoleACL.getCopy();
2627 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2628 consoleACL.addMask(mask);
2629 }
2630 g_consoleACL.setState(consoleACL);
2631
2632 if (g_cmdLine.checkConfig) {
2633 setupLua(true, g_cmdLine.config);
2634 // No exception was thrown
2635 infolog("Configuration '%s' OK!", g_cmdLine.config);
2636 _exit(EXIT_SUCCESS);
2637 }
2638
2639 auto todo=setupLua(false, g_cmdLine.config);
2640
2641 auto localPools = g_pools.getCopy();
2642 {
2643 bool precompute = false;
2644 if (g_policy.getLocal()->name == "chashed") {
2645 precompute = true;
2646 } else {
2647 for (const auto& entry: localPools) {
2648 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2649 precompute = true;
2650 break ;
2651 }
2652 }
2653 }
2654 if (precompute) {
2655 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2656 // pre compute hashes
2657 auto backends = g_dstates.getLocal();
2658 for (auto& backend: *backends) {
2659 backend->hash();
2660 }
2661 }
2662 }
2663
2664 if (!g_cmdLine.locals.empty()) {
2665 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2666 /* DoH, DoT and DNSCrypt frontends are separate */
2667 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2668 it = g_frontends.erase(it);
2669 }
2670 else {
2671 ++it;
2672 }
2673 }
2674
2675 for(const auto& loc : g_cmdLine.locals) {
2676 /* UDP */
2677 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2678 /* TCP */
2679 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2680 }
2681 }
2682
2683 if (g_frontends.empty()) {
2684 /* UDP */
2685 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2686 /* TCP */
2687 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2688 }
2689
2690 g_configurationDone = true;
2691
2692 for(auto& frontend : g_frontends) {
2693 setUpLocalBind(frontend);
2694
2695 if (frontend->tcp == false) {
2696 ++udpBindsCount;
2697 }
2698 else {
2699 ++tcpBindsCount;
2700 }
2701 }
2702
2703 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2704
2705 vector<string> vec;
2706 std::string acls;
2707 g_ACL.getLocal()->toStringVector(&vec);
2708 for(const auto& s : vec) {
2709 if (!acls.empty())
2710 acls += ", ";
2711 acls += s;
2712 }
2713 infolog("ACL allowing queries from: %s", acls.c_str());
2714 vec.clear();
2715 acls.clear();
2716 g_consoleACL.getLocal()->toStringVector(&vec);
2717 for (const auto& entry : vec) {
2718 if (!acls.empty()) {
2719 acls += ", ";
2720 }
2721 acls += entry;
2722 }
2723 infolog("Console ACL allowing connections from: %s", acls.c_str());
2724
2725 #ifdef HAVE_LIBSODIUM
2726 if (g_consoleEnabled && g_consoleKey.empty()) {
2727 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2728 }
2729 #endif
2730
2731 uid_t newgid=0;
2732 gid_t newuid=0;
2733
2734 if(!g_cmdLine.gid.empty())
2735 newgid = strToGID(g_cmdLine.gid.c_str());
2736
2737 if(!g_cmdLine.uid.empty())
2738 newuid = strToUID(g_cmdLine.uid.c_str());
2739
2740 dropGroupPrivs(newgid);
2741 dropUserPrivs(newuid);
2742 try {
2743 /* we might still have capabilities remaining,
2744 for example if we have been started as root
2745 without --uid or --gid (please don't do that)
2746 or as an unprivileged user with ambient
2747 capabilities like CAP_NET_BIND_SERVICE.
2748 */
2749 dropCapabilities(g_capabilitiesToRetain);
2750 }
2751 catch(const std::exception& e) {
2752 warnlog("%s", e.what());
2753 }
2754
2755 /* this need to be done _after_ dropping privileges */
2756 g_delay = new DelayPipe<DelayedPacket>();
2757
2758 if (g_snmpAgent) {
2759 g_snmpAgent->run();
2760 }
2761
2762 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2763
2764 for(auto& t : todo)
2765 t();
2766
2767 localPools = g_pools.getCopy();
2768 /* create the default pool no matter what */
2769 createPoolIfNotExists(localPools, "");
2770 if(g_cmdLine.remotes.size()) {
2771 for(const auto& address : g_cmdLine.remotes) {
2772 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2773 addServerToPool(localPools, "", ret);
2774 if (ret->connected && !ret->threadStarted.test_and_set()) {
2775 ret->tid = thread(responderThread, ret);
2776 }
2777 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2778 }
2779 }
2780 g_pools.setState(localPools);
2781
2782 if(g_dstates.getLocal()->empty()) {
2783 errlog("No downstream servers defined: all packets will get dropped");
2784 // you might define them later, but you need to know
2785 }
2786
2787 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2788
2789 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2790 if(dss->availability==DownstreamState::Availability::Auto) {
2791 bool newState=upCheck(dss);
2792 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2793 dss->upStatus = newState;
2794 }
2795 }
2796
2797 for(auto& cs : g_frontends) {
2798 if (cs->dohFrontend != nullptr) {
2799 #ifdef HAVE_DNS_OVER_HTTPS
2800 std::thread t1(dohThread, cs.get());
2801 if (!cs->cpus.empty()) {
2802 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2803 }
2804 t1.detach();
2805 #endif /* HAVE_DNS_OVER_HTTPS */
2806 continue;
2807 }
2808 if (cs->udpFD >= 0) {
2809 thread t1(udpClientThread, cs.get());
2810 if (!cs->cpus.empty()) {
2811 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2812 }
2813 t1.detach();
2814 }
2815 else if (cs->tcpFD >= 0) {
2816 thread t1(tcpAcceptorThread, cs.get());
2817 if (!cs->cpus.empty()) {
2818 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2819 }
2820 t1.detach();
2821 }
2822 }
2823
2824 thread carbonthread(carbonDumpThread);
2825 carbonthread.detach();
2826
2827 thread stattid(maintThread);
2828 stattid.detach();
2829
2830 thread healththread(healthChecksThread);
2831
2832 if (!g_secPollSuffix.empty()) {
2833 thread secpollthread(secPollThread);
2834 secpollthread.detach();
2835 }
2836
2837 if(g_cmdLine.beSupervised) {
2838 #ifdef HAVE_SYSTEMD
2839 sd_notify(0, "READY=1");
2840 #endif
2841 healththread.join();
2842 }
2843 else {
2844 healththread.detach();
2845 doConsole();
2846 }
2847 _exit(EXIT_SUCCESS);
2848
2849 }
2850 catch(const LuaContext::ExecutionErrorException& e) {
2851 try {
2852 errlog("Fatal Lua error: %s", e.what());
2853 std::rethrow_if_nested(e);
2854 } catch(const std::exception& ne) {
2855 errlog("Details: %s", ne.what());
2856 }
2857 catch(PDNSException &ae)
2858 {
2859 errlog("Fatal pdns error: %s", ae.reason);
2860 }
2861 _exit(EXIT_FAILURE);
2862 }
2863 catch(std::exception &e)
2864 {
2865 errlog("Fatal error: %s", e.what());
2866 _exit(EXIT_FAILURE);
2867 }
2868 catch(PDNSException &ae)
2869 {
2870 errlog("Fatal pdns error: %s", ae.reason);
2871 _exit(EXIT_FAILURE);
2872 }
2873
2874 uint64_t getLatencyCount(const std::string&)
2875 {
2876 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2877 }