]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
dnsdist: Always use DNSName::toLogString() instead of ::toString()
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
79 using std::atomic;
80 using std::thread;
81 bool g_verbose;
82
83 struct DNSDistStats g_stats;
84 MetricDefinitionStorage g_metricDefinitions;
85
/* Number of entries given to DownstreamState::idStates when a backend is
   created (see the DownstreamState constructor). */
86 uint16_t g_maxOutstanding{10240};
87 bool g_verboseHealthChecks{false};
88 uint32_t g_staleCacheEntriesTTL{0};
89 bool g_syslog{true};
/* When set, responseContentMatches() accepts NoError/NXDomain responses that
   carry no question section instead of counting them as non-compliant. */
90 bool g_allowEmptyResponse{false};
91
92 GlobalStateHolder<NetmaskGroup> g_ACL;
93 string g_outputBuffer;
94
95 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
96 std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
97 std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
98 #ifdef HAVE_EBPF
99 shared_ptr<BPFFilter> g_defaultBPFFilter;
100 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
101 #endif /* HAVE_EBPF */
102 std::vector<std::unique_ptr<ClientState>> g_frontends;
103 GlobalStateHolder<pools_t> g_pools;
104 size_t g_udpVectorSize{1};
105
106 bool g_snmpEnabled{false};
107 bool g_snmpTrapsEnabled{false};
108 DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
125 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
128 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
129
130 Rings g_rings;
131 QueryCount g_qcount;
132
133 GlobalStateHolder<servers_t> g_dstates;
134 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
135 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
136 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
137 int g_tcpRecvTimeout{2};
138 int g_tcpSendTimeout{2};
139 int g_udpTimeout{2};
140
141 bool g_servFailOnNoPolicy{false};
/* When set, responderThread() passes responses with the TC bit set through
   truncateTC() before relaying them. */
142 bool g_truncateTC{false};
/* When set, fixUpResponse() overwrites the name in the response's question
   section with the wire form of the name from the original query. */
143 bool g_fixupCase{false};
144 bool g_preserveTrailingData{false};
/* When set, roundrobin() returns no server if none is up, instead of falling
   back to the full server list. */
145 bool g_roundrobinFailOnNoServer{false};
146
147 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
148 try
149 {
150 bool hadEDNS = false;
151 uint16_t payloadSize = 0;
152 uint16_t z = 0;
153
154 if (g_addEDNSToSelfGeneratedResponses) {
155 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
156 }
157
158 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
159 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
160 dh->ancount = dh->arcount = dh->nscount = 0;
161
162 if (hadEDNS) {
163 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
164 }
165 }
166 catch(...)
167 {
168 g_stats.truncFail++;
169 }
170
171 struct DelayedPacket
172 {
173 int fd;
174 string packet;
175 ComboAddress destination;
176 ComboAddress origDest;
177 void operator()()
178 {
179 ssize_t res;
180 if(origDest.sin4.sin_family == 0) {
181 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
182 }
183 else {
184 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
185 }
186 if (res == -1) {
187 int err = errno;
188 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
189 }
190 }
191 };
192
/* Pipe that sendUDPResponse() submits DelayedPacket objects to when a delay
   was requested. It is null here; while it is null, sendUDPResponse() sends
   the response immediately even if a delay was asked for. */
193 DelayPipe<DelayedPacket>* g_delay = nullptr;
194
195 void doLatencyStats(double udiff)
196 {
197 if(udiff < 1000) ++g_stats.latency0_1;
198 else if(udiff < 10000) ++g_stats.latency1_10;
199 else if(udiff < 50000) ++g_stats.latency10_50;
200 else if(udiff < 100000) ++g_stats.latency50_100;
201 else if(udiff < 1000000) ++g_stats.latency100_1000;
202 else ++g_stats.latencySlow;
203 g_stats.latencySum += udiff / 1000;
204
205 auto doAvg = [](double& var, double n, double weight) {
206 var = (weight -1) * var/weight + n/weight;
207 };
208
209 doAvg(g_stats.latencyAvg100, udiff, 100);
210 doAvg(g_stats.latencyAvg1000, udiff, 1000);
211 doAvg(g_stats.latencyAvg10000, udiff, 10000);
212 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
213 }
214
215 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
216 {
217 if (responseLen < sizeof(dnsheader)) {
218 return false;
219 }
220
221 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
222 if (dh->qdcount == 0) {
223 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
224 return true;
225 }
226 else {
227 ++g_stats.nonCompliantResponses;
228 return false;
229 }
230 }
231
232 uint16_t rqtype, rqclass;
233 DNSName rqname;
234 try {
235 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
236 }
237 catch(const std::exception& e) {
238 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
239 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
240 }
241 ++g_stats.nonCompliantResponses;
242 return false;
243 }
244
245 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
246 return false;
247 }
248
249 return true;
250 }
251
252 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
253 {
254 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
255 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
256 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
257 uint16_t * flags = getFlagsFromDNSHeader(dh);
258 /* clear the flags we are about to restore */
259 *flags &= restoreFlagsMask;
260 /* only keep the flags we want to restore */
261 origFlags &= ~restoreFlagsMask;
262 /* set the saved flags as they were */
263 *flags |= origFlags;
264 }
265
266 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
267 {
268 restoreFlags(dq.dh, origFlags);
269
270 return addEDNSToQueryTurnedResponse(dq);
271 }
272
273 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
274 {
275 if (*responseLen < sizeof(dnsheader)) {
276 return false;
277 }
278
279 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
280 restoreFlags(dh, origFlags);
281
282 if (*responseLen == sizeof(dnsheader)) {
283 return true;
284 }
285
286 if(g_fixupCase) {
287 string realname = qname.toDNSString();
288 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
289 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
290 }
291 }
292
293 if (ednsAdded || ecsAdded) {
294 uint16_t optStart;
295 size_t optLen = 0;
296 bool last = false;
297
298 const std::string responseStr(*response, *responseLen);
299 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
300
301 if (res == 0) {
302 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
303 size_t optContentStart = 0;
304 uint16_t optContentLen = 0;
305 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
306 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
307 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
308 we care about. */
309 *zeroScope = responseStr.at(optContentStart + 3) == 0;
310 }
311 }
312
313 if (ednsAdded) {
314 /* we added the entire OPT RR,
315 therefore we need to remove it entirely */
316 if (last) {
317 /* simply remove the last AR */
318 *responseLen -= optLen;
319 uint16_t arcount = ntohs(dh->arcount);
320 arcount--;
321 dh->arcount = htons(arcount);
322 }
323 else {
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
326 *responseLen = rewrittenResponse.size();
327 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
328 rewrittenResponse.reserve(*responseLen + addRoom);
329 }
330 *responseSize = rewrittenResponse.capacity();
331 *response = reinterpret_cast<char*>(rewrittenResponse.data());
332 }
333 else {
334 warnlog("Error rewriting content");
335 }
336 }
337 }
338 else {
339 /* the OPT RR was already present, but without ECS,
340 we need to remove the ECS option if any */
341 if (last) {
342 /* nothing after the OPT RR, we can simply remove the
343 ECS option */
344 size_t existingOptLen = optLen;
345 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
346 *responseLen -= (existingOptLen - optLen);
347 }
348 else {
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
351 *responseLen = rewrittenResponse.size();
352 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
353 rewrittenResponse.reserve(*responseLen + addRoom);
354 }
355 *responseSize = rewrittenResponse.capacity();
356 *response = reinterpret_cast<char*>(rewrittenResponse.data());
357 }
358 else {
359 warnlog("Error rewriting content");
360 }
361 }
362 }
363 }
364 }
365
366 return true;
367 }
368
369 #ifdef HAVE_DNSCRYPT
370 static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
371 {
372 if (dnsCryptQuery) {
373 uint16_t encryptedResponseLen = 0;
374
375 /* save the original header before encrypting it in place */
376 if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
377 memcpy(dhCopy, *dh, sizeof(dnsheader));
378 *dh = dhCopy;
379 }
380
381 int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
382 if (res == 0) {
383 *responseLen = encryptedResponseLen;
384 } else {
385 /* dropping response */
386 vinfolog("Error encrypting the response, dropping.");
387 return false;
388 }
389 }
390 return true;
391 }
392 #endif /* HAVE_DNSCRYPT */
393
394 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
395 {
396 DNSResponseAction::Action action=DNSResponseAction::Action::None;
397 std::string ruleresult;
398 for(const auto& lr : *localRespRulactions) {
399 if(lr.d_rule->matches(&dr)) {
400 lr.d_rule->d_matches++;
401 action=(*lr.d_action)(&dr, &ruleresult);
402 switch(action) {
403 case DNSResponseAction::Action::Allow:
404 return true;
405 break;
406 case DNSResponseAction::Action::Drop:
407 return false;
408 break;
409 case DNSResponseAction::Action::HeaderModify:
410 return true;
411 break;
412 case DNSResponseAction::Action::ServFail:
413 dr.dh->rcode = RCode::ServFail;
414 return true;
415 break;
416 /* non-terminal actions follow */
417 case DNSResponseAction::Action::Delay:
418 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
419 break;
420 case DNSResponseAction::Action::None:
421 break;
422 }
423 }
424 }
425
426 return true;
427 }
428
429 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
430 {
431 if (!applyRulesToResponse(localRespRulactions, dr)) {
432 return false;
433 }
434
435 bool zeroScope = false;
436 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
437 return false;
438 }
439
440 if (dr.packetCache && !dr.skipCache) {
441 if (!dr.useZeroScope) {
442 /* if the query was not suitable for zero-scope, for
443 example because it had an existing ECS entry so the hash is
444 not really 'no ECS', so just insert it for the existing subnet
445 since:
446 - we don't have the correct hash for a non-ECS query
447 - inserting with hash computed before the ECS replacement but with
448 the subnet extracted _after_ the replacement would not work.
449 */
450 zeroScope = false;
451 }
452 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
453 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
454 }
455
456 #ifdef HAVE_DNSCRYPT
457 if (!muted) {
458 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
459 return false;
460 }
461 }
462 #endif /* HAVE_DNSCRYPT */
463
464 return true;
465 }
466
467 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
468 {
469 if(delayMsec && g_delay) {
470 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
471 g_delay->submit(dp, delayMsec);
472 }
473 else {
474 ssize_t res;
475 if(origDest.sin4.sin_family == 0) {
476 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
477 }
478 else {
479 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
480 }
481 if (res == -1) {
482 int err = errno;
483 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
484 }
485 }
486
487 return true;
488 }
489
490
491 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
492 {
493 return state->sockets[state->socketsOffset++ % state->sockets.size()];
494 }
495
496 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
497 {
498 ready.clear();
499
500 if (state->sockets.size() == 1) {
501 ready.push_back(state->sockets[0]);
502 return ;
503 }
504
505 {
506 std::lock_guard<std::mutex> lock(state->socketsLock);
507 state->mplexer->getAvailableFDs(ready, -1);
508 }
509 }
510
511 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
/* Thread body, one per backend ('dss').
   Loops forever: waits for the backend sockets that are ready, reads one
   datagram from each, looks up the pending query in dss->idStates using the
   id found in the response header, checks that the response matches that
   query, claims the state, post-processes the response (processResponse) and
   relays it to the original client over UDP or hands it to the DoH unit.
   It then updates the global and per-backend statistics and the ring buffer.
   Exceptions raised while handling a batch are logged and the loop goes on;
   the function-level handlers only log when an exception escapes the loop. */
512 void responderThread(std::shared_ptr<DownstreamState> dss)
513 try {
514 setThreadName("dnsdist/respond");
515 auto localRespRulactions = g_resprulactions.getLocal();
516 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
517 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
518 /* when the answer is encrypted in place, we need to get a copy
519 of the original header before encryption to fill the ring buffer */
520 dnsheader cleartextDH;
521 vector<uint8_t> rewrittenResponse;
522
523 uint16_t queryId = 0;
524 std::vector<int> sockets;
525 sockets.reserve(dss->sockets.size());
526
527 for(;;) {
/* dh always points at the start of the receive buffer, not at 'response',
   which processResponse() may redirect to rewrittenResponse */
528 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
529 try {
530 pickBackendSocketsReadyForReceiving(dss, sockets);
531 for (const auto& fd : sockets) {
532 ssize_t got = recv(fd, packet, sizeof(packet), 0);
533 char * response = packet;
534 size_t responseSize = sizeof(packet);
535
/* receive error, or too short to even hold a DNS header: ignore */
536 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
537 continue;
538
539 uint16_t responseLen = static_cast<uint16_t>(got);
/* the id is used as read from the packet (no byte-order conversion) as
   index into idStates */
540 queryId = dh->id;
541
542 if(queryId >= dss->idStates.size()) {
543 continue;
544 }
545
546 IDState* ids = &dss->idStates[queryId];
547 int64_t usageIndicator = ids->usageIndicator;
548
549 if(usageIndicator < 0) {
550 /* the corresponding state is marked as not in use, meaning that:
551 - it was already cleaned up by another thread and the state is gone ;
552 - we already got a response for this query and this one is a duplicate.
553 Either way, we don't touch it.
554 */
555 continue;
556 }
557
558 /* read the potential DOHUnit state as soon as possible, but don't use it
559 until we have confirmed that we own this state by updating usageIndicator */
560 auto du = ids->du;
561 /* setting age to 0 to prevent the maintainer thread from
562 cleaning this IDS while we process the response.
563 */
564 ids->age = 0;
565 int origFD = ids->origFD;
566
567 unsigned int consumed = 0;
568 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
569 continue;
570 }
571
572 bool isDoH = du != nullptr;
573 /* atomically mark the state as available, but only if it has not been altered
574 in the meantime */
575 if (ids->usageIndicator.compare_exchange_strong(usageIndicator, -1)) {
576 /* clear the potential DOHUnit asap, it's ours now
577 and since we just marked the state as unused,
578 someone could overwrite it. */
579 ids->du = nullptr;
580 /* we only decrement the outstanding counter if the value was not
581 altered in the meantime, which would mean that the state has been actively reused
582 and the other thread has not incremented the outstanding counter, so we don't
583 want it to be decremented twice. */
584 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
585 } else {
586 /* someone updated the state in the meantime, we can't touch the existing pointer */
587 du = nullptr;
588 /* since the state has been updated, we can't safely access it so let's just drop
589 this response */
590 continue;
591 }
592
593 if(dh->tc && g_truncateTC) {
594 truncateTC(response, &responseLen, responseSize, consumed);
595 }
596
/* put the id of the client's query back before relaying */
597 dh->id = ids->origID;
598
599 uint16_t addRoom = 0;
600 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
601 if (dr.dnsCryptQuery) {
602 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
603 }
604
/* keep a copy of the header as it is before processResponse() runs; it is
   the one recorded in the ring buffer below */
605 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
606 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
607 continue;
608 }
609
610 if (ids->cs && !ids->cs->muted) {
611 if (du) {
612 #ifdef HAVE_DNS_OVER_HTTPS
613 // DoH query
614 du->response = std::string(response, responseLen);
/* the pointer value itself is written to du->rsock; if that fails nobody
   else will receive it, so it is freed here */
615 if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
616 /* at this point we have the only remaining pointer on this
617 DOHUnit object since we did set ids->du to nullptr earlier */
618 delete du;
619 }
620 #endif /* HAVE_DNS_OVER_HTTPS */
621 du = nullptr;
622 }
623 else {
624 ComboAddress empty;
625 empty.sin4.sin_family = 0;
626 /* if ids->destHarvested is false, origDest holds the listening address.
627 We don't want to use that as a source since it could be 0.0.0.0 for example. */
628 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
629 }
630 }
631
/* statistics are updated even when the frontend is muted and nothing was sent */
632 ++g_stats.responses;
633
634 double udiff = ids->sentTime.udiff();
635 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
636 isDoH ? " (https)": "", udiff);
637
638 struct timespec ts;
639 gettime(&ts);
/* the recorded size is the number of bytes received from the backend ('got'),
   not the possibly modified responseLen */
640 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
641
642 switch (dh->rcode) {
643 case RCode::NXDomain:
644 ++g_stats.frontendNXDomain;
645 break;
646 case RCode::ServFail:
647 ++g_stats.servfailResponses;
648 ++g_stats.frontendServFail;
649 break;
650 case RCode::NoError:
651 ++g_stats.frontendNoError;
652 break;
653 }
/* per-backend running latency average: the new sample weighs 1/128 */
654 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
655
656 doLatencyStats(udiff);
657
658 rewrittenResponse.clear();
659 }
660 }
661 catch(const std::exception& e){
662 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
663 }
664 }
665 }
666 catch(const std::exception& e)
667 {
668 errlog("UDP responder thread died because of exception: %s", e.what());
669 }
670 catch(const PDNSException& e)
671 {
672 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
673 }
674 catch(...)
675 {
676 errlog("UDP responder thread died because of an exception: %s", "unknown");
677 }
678
679 bool DownstreamState::reconnect()
680 {
681 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
682 if (!tl.owns_lock()) {
683 /* we are already reconnecting */
684 return false;
685 }
686
687 connected = false;
688 for (auto& fd : sockets) {
689 if (fd != -1) {
690 if (sockets.size() > 1) {
691 std::lock_guard<std::mutex> lock(socketsLock);
692 mplexer->removeReadFD(fd);
693 }
694 /* shutdown() is needed to wake up recv() in the responderThread */
695 shutdown(fd, SHUT_RDWR);
696 close(fd);
697 fd = -1;
698 }
699 if (!IsAnyAddress(remote)) {
700 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
701 if (!IsAnyAddress(sourceAddr)) {
702 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
703 SBind(fd, sourceAddr);
704 }
705 try {
706 SConnect(fd, remote);
707 if (sockets.size() > 1) {
708 std::lock_guard<std::mutex> lock(socketsLock);
709 mplexer->addReadFD(fd, [](int, boost::any) {});
710 }
711 connected = true;
712 }
713 catch(const std::runtime_error& error) {
714 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
715 connected = false;
716 break;
717 }
718 }
719 }
720
721 /* if at least one (re-)connection failed, close all sockets */
722 if (!connected) {
723 for (auto& fd : sockets) {
724 if (fd != -1) {
725 if (sockets.size() > 1) {
726 std::lock_guard<std::mutex> lock(socketsLock);
727 mplexer->removeReadFD(fd);
728 }
729 /* shutdown() is needed to wake up recv() in the responderThread */
730 shutdown(fd, SHUT_RDWR);
731 close(fd);
732 fd = -1;
733 }
734 }
735 }
736
737 return connected;
738 }
739 void DownstreamState::hash()
740 {
741 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
742 auto w = weight;
743 WriteLock wl(&d_lock);
744 hashes.clear();
745 while (w > 0) {
746 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
747 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
748 hashes.insert(wshash);
749 --w;
750 }
751 }
752
753 void DownstreamState::setId(const boost::uuids::uuid& newId)
754 {
755 id = newId;
756 // compute hashes only if already done
757 if (!hashes.empty()) {
758 hash();
759 }
760 }
761
762 void DownstreamState::setWeight(int newWeight)
763 {
764 if (newWeight < 1) {
765 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
766 return ;
767 }
768 weight = newWeight;
769 if (!hashes.empty()) {
770 hash();
771 }
772 }
773
774 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
775 {
776 pthread_rwlock_init(&d_lock, nullptr);
777 id = getUniqueID();
778 threadStarted.clear();
779
780 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
781
782 sockets.resize(numberOfSockets);
783 for (auto& fd : sockets) {
784 fd = -1;
785 }
786
787 if (!IsAnyAddress(remote)) {
788 reconnect();
789 idStates.resize(g_maxOutstanding);
790 sw.start();
791 infolog("Added downstream server %s", remote.toStringWithPort());
792 }
793
794 }
795
796 std::mutex g_luamutex;
797 LuaContext g_lua;
798
799 GlobalStateHolder<ServerPolicy> g_policy;
800
801 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
802 {
803 for(auto& d : servers) {
804 if(d.second->isUp() && d.second->qps.check())
805 return d.second;
806 }
807 return leastOutstanding(servers, dq);
808 }
809
810 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
811 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
812 {
813 if (servers.size() == 1 && servers[0].second->isUp()) {
814 return servers[0].second;
815 }
816
817 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
818 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
819 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
820 poss.reserve(servers.size());
821 for(auto& d : servers) {
822 if(d.second->isUp()) {
823 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
824 }
825 }
826 if(poss.empty())
827 return shared_ptr<DownstreamState>();
828 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
829 return poss.begin()->second;
830 }
831
832 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
833 {
834 vector<pair<int, shared_ptr<DownstreamState>>> poss;
835 int sum = 0;
836 int max = std::numeric_limits<int>::max();
837
838 for(auto& d : servers) { // w=1, w=10 -> 1, 11
839 if(d.second->isUp()) {
840 // Don't overflow sum when adding high weights
841 if(d.second->weight > max - sum) {
842 sum = max;
843 } else {
844 sum += d.second->weight;
845 }
846
847 poss.push_back({sum, d.second});
848 }
849 }
850
851 // Catch poss & sum are empty to avoid SIGFPE
852 if(poss.empty())
853 return shared_ptr<DownstreamState>();
854
855 int r = val % sum;
856 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
857 if(p==poss.end())
858 return shared_ptr<DownstreamState>();
859 return p->second;
860 }
861
862 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
863 {
864 return valrandom(random(), servers, dq);
865 }
866
/* Perturbation value passed to every hash computed for server selection:
   the query name hash in whashed() and chashed(), and the per-server hash
   points in DownstreamState::hash(). */
867 uint32_t g_hashperturb;
868 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
869 {
870 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
871 }
872
873 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
874 {
875 unsigned int qhash = dq->qname->hash(g_hashperturb);
876 unsigned int sel = std::numeric_limits<unsigned int>::max();
877 unsigned int min = std::numeric_limits<unsigned int>::max();
878 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
879
880 for (const auto& d: servers) {
881 if (d.second->isUp()) {
882 // make sure hashes have been computed
883 if (d.second->hashes.empty()) {
884 d.second->hash();
885 }
886 {
887 ReadLock rl(&(d.second->d_lock));
888 const auto& server = d.second;
889 // we want to keep track of the last hash
890 if (min > *(server->hashes.begin())) {
891 min = *(server->hashes.begin());
892 first = server;
893 }
894
895 auto hash_it = server->hashes.lower_bound(qhash);
896 if (hash_it != server->hashes.end()) {
897 if (*hash_it < sel) {
898 sel = *hash_it;
899 ret = server;
900 }
901 }
902 }
903 }
904 }
905 if (ret != nullptr) {
906 return ret;
907 }
908 if (first != nullptr) {
909 return first;
910 }
911 return shared_ptr<DownstreamState>();
912 }
913
914 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
915 {
916 NumberedServerVector poss;
917
918 for(auto& d : servers) {
919 if(d.second->isUp()) {
920 poss.push_back(d);
921 }
922 }
923
924 const auto *res=&poss;
925 if(poss.empty() && !g_roundrobinFailOnNoServer)
926 res = &servers;
927
928 if(res->empty())
929 return shared_ptr<DownstreamState>();
930
931 static unsigned int counter;
932
933 return (*res)[(counter++) % res->size()].second;
934 }
935
936 ComboAddress g_serverControl{"127.0.0.1:5199"};
937
938 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
939 {
940 std::shared_ptr<ServerPool> pool;
941 pools_t::iterator it = pools.find(poolName);
942 if (it != pools.end()) {
943 pool = it->second;
944 }
945 else {
946 if (!poolName.empty())
947 vinfolog("Creating pool %s", poolName);
948 pool = std::make_shared<ServerPool>();
949 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
950 }
951 return pool;
952 }
953
954 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
955 {
956 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
957 if (!poolName.empty()) {
958 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
959 } else {
960 vinfolog("Setting default pool server selection policy to %s", policy->name);
961 }
962 pool->policy = policy;
963 }
964
965 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
966 {
967 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
968 if (!poolName.empty()) {
969 vinfolog("Adding server to pool %s", poolName);
970 } else {
971 vinfolog("Adding server to default pool");
972 }
973 pool->addServer(server);
974 }
975
976 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
977 {
978 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
979
980 if (!poolName.empty()) {
981 vinfolog("Removing server from pool %s", poolName);
982 }
983 else {
984 vinfolog("Removing server from default pool");
985 }
986
987 pool->removeServer(server);
988 }
989
990 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
991 {
992 pools_t::const_iterator it = pools.find(poolName);
993
994 if (it == pools.end()) {
995 throw std::out_of_range("No pool named " + poolName);
996 }
997
998 return it->second;
999 }
1000
1001 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1002 {
1003 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1004 return pool->getServers();
1005 }
1006
1007 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
1008 {
1009 string result;
1010
1011 std::vector<std::string> addrs;
1012 stringtok(addrs, spoofContent, " ,");
1013
1014 if (addrs.size() == 1) {
1015 try {
1016 ComboAddress spoofAddr(spoofContent);
1017 SpoofAction sa({spoofAddr});
1018 sa(&dq, &result);
1019 }
1020 catch(const PDNSException &e) {
1021 SpoofAction sa(spoofContent); // CNAME then
1022 sa(&dq, &result);
1023 }
1024 } else {
1025 std::vector<ComboAddress> cas;
1026 for (const auto& addr : addrs) {
1027 try {
1028 cas.push_back(ComboAddress(addr));
1029 }
1030 catch (...) {
1031 }
1032 }
1033 SpoofAction sa(cas);
1034 sa(&dq, &result);
1035 }
1036 }
1037
1038 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, const struct timespec& now)
1039 {
1040 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1041
1042 if(g_qcount.enabled) {
1043 string qname = (*dq.qname).toLogString();
1044 bool countQuery{true};
1045 if(g_qcount.filter) {
1046 std::lock_guard<std::mutex> lock(g_luamutex);
1047 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1048 }
1049
1050 if(countQuery) {
1051 WriteLock wl(&g_qcount.queryLock);
1052 if(!g_qcount.records.count(qname)) {
1053 g_qcount.records[qname] = 0;
1054 }
1055 g_qcount.records[qname]++;
1056 }
1057 }
1058
1059 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1060 auto updateBlockStats = [&got]() {
1061 ++g_stats.dynBlocked;
1062 got->second.blocks++;
1063 };
1064
1065 if(now < got->second.until) {
1066 DNSAction::Action action = got->second.action;
1067 if (action == DNSAction::Action::None) {
1068 action = g_dynBlockAction;
1069 }
1070 switch (action) {
1071 case DNSAction::Action::NoOp:
1072 /* do nothing */
1073 break;
1074
1075 case DNSAction::Action::Nxdomain:
1076 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1077 updateBlockStats();
1078
1079 dq.dh->rcode = RCode::NXDomain;
1080 dq.dh->qr=true;
1081 return true;
1082
1083 case DNSAction::Action::Refused:
1084 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1085 updateBlockStats();
1086
1087 dq.dh->rcode = RCode::Refused;
1088 dq.dh->qr = true;
1089 return true;
1090
1091 case DNSAction::Action::Truncate:
1092 if(!dq.tcp) {
1093 updateBlockStats();
1094 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1095 dq.dh->tc = true;
1096 dq.dh->qr = true;
1097 return true;
1098 }
1099 else {
1100 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1101 }
1102 break;
1103 case DNSAction::Action::NoRecurse:
1104 updateBlockStats();
1105 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1106 dq.dh->rd = false;
1107 return true;
1108 default:
1109 updateBlockStats();
1110 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1111 return false;
1112 }
1113 }
1114 }
1115
1116 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1117 auto updateBlockStats = [&got]() {
1118 ++g_stats.dynBlocked;
1119 got->blocks++;
1120 };
1121
1122 if(now < got->until) {
1123 DNSAction::Action action = got->action;
1124 if (action == DNSAction::Action::None) {
1125 action = g_dynBlockAction;
1126 }
1127 switch (action) {
1128 case DNSAction::Action::NoOp:
1129 /* do nothing */
1130 break;
1131 case DNSAction::Action::Nxdomain:
1132 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1133 updateBlockStats();
1134
1135 dq.dh->rcode = RCode::NXDomain;
1136 dq.dh->qr=true;
1137 return true;
1138 case DNSAction::Action::Refused:
1139 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1140 updateBlockStats();
1141
1142 dq.dh->rcode = RCode::Refused;
1143 dq.dh->qr=true;
1144 return true;
1145 case DNSAction::Action::Truncate:
1146 if(!dq.tcp) {
1147 updateBlockStats();
1148
1149 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1150 dq.dh->tc = true;
1151 dq.dh->qr = true;
1152 return true;
1153 }
1154 else {
1155 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1156 }
1157 break;
1158 case DNSAction::Action::NoRecurse:
1159 updateBlockStats();
1160 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1161 dq.dh->rd = false;
1162 return true;
1163 default:
1164 updateBlockStats();
1165 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1166 return false;
1167 }
1168 }
1169 }
1170
1171 DNSAction::Action action=DNSAction::Action::None;
1172 string ruleresult;
1173 for(const auto& lr : *holders.rulactions) {
1174 if(lr.d_rule->matches(&dq)) {
1175 lr.d_rule->d_matches++;
1176 action=(*lr.d_action)(&dq, &ruleresult);
1177
1178 switch(action) {
1179 case DNSAction::Action::Allow:
1180 return true;
1181 break;
1182 case DNSAction::Action::Drop:
1183 ++g_stats.ruleDrop;
1184 return false;
1185 break;
1186 case DNSAction::Action::Nxdomain:
1187 dq.dh->rcode = RCode::NXDomain;
1188 dq.dh->qr=true;
1189 ++g_stats.ruleNXDomain;
1190 return true;
1191 break;
1192 case DNSAction::Action::Refused:
1193 dq.dh->rcode = RCode::Refused;
1194 dq.dh->qr=true;
1195 ++g_stats.ruleRefused;
1196 return true;
1197 break;
1198 case DNSAction::Action::ServFail:
1199 dq.dh->rcode = RCode::ServFail;
1200 dq.dh->qr=true;
1201 ++g_stats.ruleServFail;
1202 return true;
1203 break;
1204 case DNSAction::Action::Spoof:
1205 spoofResponseFromString(dq, ruleresult);
1206 return true;
1207 break;
1208 case DNSAction::Action::Truncate:
1209 dq.dh->tc = true;
1210 dq.dh->qr = true;
1211 return true;
1212 break;
1213 case DNSAction::Action::HeaderModify:
1214 return true;
1215 break;
1216 case DNSAction::Action::Pool:
1217 poolname=ruleresult;
1218 return true;
1219 break;
1220 /* non-terminal actions follow */
1221 case DNSAction::Action::Delay:
1222 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1223 break;
1224 case DNSAction::Action::None:
1225 /* fall-through */
1226 case DNSAction::Action::NoOp:
1227 break;
1228 case DNSAction::Action::NoRecurse:
1229 dq.dh->rd = false;
1230 return true;
1231 break;
1232 }
1233 }
1234 }
1235
1236 return true;
1237 }
1238
1239 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1240 {
1241 ssize_t result;
1242
1243 if (ss->sourceItf == 0) {
1244 result = send(sd, request, requestLen, 0);
1245 }
1246 else {
1247 struct msghdr msgh;
1248 struct iovec iov;
1249 cmsgbuf_aligned cbuf;
1250 ComboAddress remote(ss->remote);
1251 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1252 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1253 result = sendmsg(sd, &msgh, 0);
1254 }
1255
1256 if (result == -1) {
1257 int savederrno = errno;
1258 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1259
1260 /* This might sound silly, but on Linux send() might fail with EINVAL
1261 if the interface the socket was bound to doesn't exist anymore.
1262 We don't want to reconnect the real socket if the healthcheck failed,
1263 because it's not using the same socket.
1264 */
1265 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1266 ss->reconnect();
1267 }
1268 }
1269
1270 return result;
1271 }
1272
1273 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1274 {
1275 if (msgh->msg_flags & MSG_TRUNC) {
1276 /* message was too large for our buffer */
1277 vinfolog("Dropping message too large for our buffer");
1278 ++g_stats.nonCompliantQueries;
1279 return false;
1280 }
1281
1282 if(!holders.acl->match(remote)) {
1283 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1284 ++g_stats.aclDrops;
1285 return false;
1286 }
1287
1288 cs.queries++;
1289 ++g_stats.queries;
1290
1291 if (HarvestDestinationAddress(msgh, &dest)) {
1292 /* we don't get the port, only the address */
1293 dest.sin4.sin_port = cs.local.sin4.sin_port;
1294 }
1295 else {
1296 dest.sin4.sin_family = 0;
1297 }
1298
1299 return true;
1300 }
1301
1302 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1303 {
1304 if (cs.dnscryptCtx) {
1305 #ifdef HAVE_DNSCRYPT
1306 vector<uint8_t> response;
1307 uint16_t decryptedQueryLen = 0;
1308
1309 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1310
1311 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1312
1313 if (!decrypted) {
1314 if (response.size() > 0) {
1315 return response;
1316 }
1317 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1318 }
1319
1320 len = decryptedQueryLen;
1321 #endif /* HAVE_DNSCRYPT */
1322 }
1323 return boost::none;
1324 }
1325
1326 bool checkQueryHeaders(const struct dnsheader* dh)
1327 {
1328 if (dh->qr) { // don't respond to responses
1329 ++g_stats.nonCompliantQueries;
1330 return false;
1331 }
1332
1333 if (dh->qdcount == 0) {
1334 ++g_stats.emptyQueries;
1335 return false;
1336 }
1337
1338 if (dh->rd) {
1339 ++g_stats.rdQueries;
1340 }
1341
1342 return true;
1343 }
1344
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Prepares 'outMsg' so that 'response' gets sent to 'remote' by a later
   sendmmsg() call, using 'iov' and 'cbuf' as storage. The source address is
   set to 'dest' unless its family is 0, meaning that it is not known. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  outMsg.msg_len = 0;
  fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destinationKnown = (dest.sin4.sin_family != 0);
  if (destinationKnown) {
    addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
    return;
  }

  /* no ancillary data then */
  outMsg.msg_hdr.msg_control = nullptr;
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1359
1360 /* self-generated responses or cache hits */
1361 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1362 {
1363 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1364
1365 #ifdef HAVE_PROTOBUF
1366 dr.uniqueId = dq.uniqueId;
1367 #endif
1368 dr.qTag = dq.qTag;
1369 dr.delayMsec = dq.delayMsec;
1370
1371 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1372 return false;
1373 }
1374
1375 /* in case a rule changed it */
1376 dq.delayMsec = dr.delayMsec;
1377
1378 #ifdef HAVE_DNSCRYPT
1379 if (!cs.muted) {
1380 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1381 return false;
1382 }
1383 }
1384 #endif /* HAVE_DNSCRYPT */
1385
1386 if (cacheHit) {
1387 ++g_stats.cacheHits;
1388 }
1389
1390 switch (dr.dh->rcode) {
1391 case RCode::NXDomain:
1392 ++g_stats.frontendNXDomain;
1393 break;
1394 case RCode::ServFail:
1395 ++g_stats.frontendServFail;
1396 break;
1397 case RCode::NoError:
1398 ++g_stats.frontendNoError;
1399 break;
1400 }
1401
1402 doLatencyStats(0); // we're not going to measure this
1403 return true;
1404 }
1405
/* Processes a parsed query, regardless of the protocol it came over.

   The query goes through applyRulesToQuery(), then a pool and a backend are
   selected using the pool's policy (or the global one), and the packet cache
   of the pool, if any, is looked up.

   Returns:
   - ProcessQueryResult::Drop if the query should be dropped, which is also
     what happens when a std::exception is caught;
   - ProcessQueryResult::SendAnswer if dq's buffer now holds a response
     (self-generated or from the cache) to send to the client;
   - ProcessQueryResult::PassToBackend if the query has to be sent to
     'selectedBackend', which has been set. */
1406 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1407 {
/* saved now for the error message, since the header might get altered below */
1408 const uint16_t queryId = ntohs(dq.dh->id);
1409
1410 try {
1411 /* we need an accurate ("real") value for the response and
1412 to store into the IDS, but not for insertion into the
1413 rings for example */
1414 struct timespec now;
1415 gettime(&now);
1416
/* stays empty, meaning the default pool, unless a rule selects another one */
1417 string poolname;
1418
1419 if (!applyRulesToQuery(holders, dq, poolname, now)) {
1420 return ProcessQueryResult::Drop;
1421 }
1422
1423 if(dq.dh->qr) { // something turned it into a response
1424 fixUpQueryTurnedResponse(dq, dq.origFlags);
1425
1426 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1427 return ProcessQueryResult::Drop;
1428 }
1429
1430 ++g_stats.selfAnswered;
1431 return ProcessQueryResult::SendAnswer;
1432 }
1433
/* getPool() throws std::out_of_range for an unknown pool, which is caught below */
1434 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
1435 dq.packetCache = serverPool->packetCache;
/* the policy of the pool, if any, takes precedence over the global one */
1436 auto policy = *(holders.policy);
1437 if (serverPool->policy != nullptr) {
1438 policy = *(serverPool->policy);
1439 }
1440 auto servers = serverPool->getServers();
/* only policies implemented in Lua are called with the Lua mutex held */
1441 if (policy.isLua) {
1442 std::lock_guard<std::mutex> lock(g_luamutex);
1443 selectedBackend = policy.policy(servers, &dq);
1444 }
1445 else {
1446 selectedBackend = policy.policy(servers, &dq);
1447 }
1448
1449 uint16_t cachedResponseSize = dq.size;
/* g_staleCacheEntriesTTL is passed to the cache lookups only when no backend has been selected */
1450 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1451
1452 if (dq.packetCache && !dq.skipCache) {
1453 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1454 }
1455
1456 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1457 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1458 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
/* this first lookup is done before handleEDNSClientSubnet() below gets a chance to add our ECS option to the query */
1459 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1460 dq.len = cachedResponseSize;
1461
1462 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1463 return ProcessQueryResult::Drop;
1464 }
1465
1466 return ProcessQueryResult::SendAnswer;
1467 }
1468
1469 if (!dq.subnet) {
1470 /* there was no existing ECS on the query, enable the zero-scope feature */
1471 dq.useZeroScope = true;
1472 }
1473 }
1474
1475 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1476 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1477 return ProcessQueryResult::Drop;
1478 }
1479 }
1480
/* regular cache lookup, the key being stored into dq.cacheKey */
1481 if (dq.packetCache && !dq.skipCache) {
1482 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1483 dq.len = cachedResponseSize;
1484
1485 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1486 return ProcessQueryResult::Drop;
1487 }
1488
1489 return ProcessQueryResult::SendAnswer;
1490 }
1491 ++g_stats.cacheMisses;
1492 }
1493
/* the policy did not return any backend: answer with ServFail or drop, depending on g_servFailOnNoPolicy */
1494 if(!selectedBackend) {
1495 ++g_stats.noPolicy;
1496
1497 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1498 if (g_servFailOnNoPolicy) {
1499 restoreFlags(dq.dh, dq.origFlags);
1500
1501 dq.dh->rcode = RCode::ServFail;
1502 dq.dh->qr = true;
1503
1504 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1505 return ProcessQueryResult::Drop;
1506 }
1507 // no response-only statistics counter to update.
1508 return ProcessQueryResult::SendAnswer;
1509 }
1510
1511 return ProcessQueryResult::Drop;
1512 }
1513
1514 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1515 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1516 }
1517
1518 selectedBackend->queries++;
1519 return ProcessQueryResult::PassToBackend;
1520 }
1521 catch(const std::exception& e){
1522 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1523 }
/* only reached after an exception has been caught */
1524 return ProcessQueryResult::Drop;
1525 }
1526
1527 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
1528 {
1529 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1530 uint16_t queryId = 0;
1531
1532 try {
1533 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1534 return;
1535 }
1536
1537 /* we need an accurate ("real") value for the response and
1538 to store into the IDS, but not for insertion into the
1539 rings for example */
1540 struct timespec queryRealTime;
1541 gettime(&queryRealTime, true);
1542
1543 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1544 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1545 if (dnsCryptResponse) {
1546 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1547 return;
1548 }
1549
1550 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1551 queryId = ntohs(dh->id);
1552
1553 if (!checkQueryHeaders(dh)) {
1554 return;
1555 }
1556
1557 uint16_t qtype, qclass;
1558 unsigned int consumed = 0;
1559 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1560 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1561 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1562 std::shared_ptr<DownstreamState> ss{nullptr};
1563 auto result = processQuery(dq, cs, holders, ss);
1564
1565 if (result == ProcessQueryResult::Drop) {
1566 return;
1567 }
1568
1569 if (result == ProcessQueryResult::SendAnswer) {
1570 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1571 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1572 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1573 (*queuedResponses)++;
1574 return;
1575 }
1576 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1577 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1578 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1579 return;
1580 }
1581
1582 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1583 return;
1584 }
1585
1586 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1587 IDState* ids = &ss->idStates[idOffset];
1588 ids->age = 0;
1589 DOHUnit* du = nullptr;
1590
1591 /* that means that the state was in use, possibly with an allocated
1592 DOHUnit that we will need to handle, but we can't touch it before
1593 confirming that we now own this state */
1594 if (ids->usageIndicator != -1) {
1595 du = ids->du;
1596 }
1597
1598 /* we atomically replace the value, we now own this state */
1599 auto generation = ids->generation++;
1600 int64_t oldUsage = ids->usageIndicator.exchange(generation);
1601 if(oldUsage < 0) {
1602 /* the value was -1, meaning that the state was not in use.
1603 we reset 'du' because it might have still been in use when we read it. */
1604 du = nullptr;
1605 ++ss->outstanding;
1606 }
1607 else {
1608 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1609 to handle it because it's about to be overwritten. */
1610 ids->du = nullptr;
1611 ++ss->reuseds;
1612 ++g_stats.downstreamTimeouts;
1613 handleDOHTimeout(du);
1614 }
1615
1616 ids->cs = &cs;
1617 ids->origFD = cs.udpFD;
1618 ids->origID = dh->id;
1619 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1620
1621 /* If we couldn't harvest the real dest addr, still
1622 write down the listening addr since it will be useful
1623 (especially if it's not an 'any' one).
1624 We need to keep track of which one it is since we may
1625 want to use the real but not the listening addr to reply.
1626 */
1627 if (dest.sin4.sin_family != 0) {
1628 ids->origDest = dest;
1629 ids->destHarvested = true;
1630 }
1631 else {
1632 ids->origDest = cs.local;
1633 ids->destHarvested = false;
1634 }
1635
1636 dh->id = idOffset;
1637
1638 int fd = pickBackendSocketForSending(ss);
1639 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1640
1641 if(ret < 0) {
1642 ++ss->sendErrors;
1643 ++g_stats.downstreamSendErrors;
1644 }
1645
1646 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1647 }
1648 catch(const std::exception& e){
1649 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1650 }
1651 }
1652
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Main loop of a UDP frontend thread when g_udpVectorSize messages can be
   received with a single recvmmsg() call. Every message is passed to
   processUDPQuery(), and the responses that can be sent right away are
   queued then sent in one go with sendmmsg(). Never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[4096];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
  }

  /* go now */
  for(;;) {

    /* reset the IO vector, since it's also used to send the vector of responses
       to avoid having to copy the data around.
       The length is reset to the size of the incoming payloads we accept, not to
       the size of the whole buffer, otherwise the limit set up above would never
       apply and there would be no room left to add EDNS and/or ECS. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      recvData[idx].iov.iov_base = recvData[idx].packet;
      recvData[idx].iov.iov_len = s_udpIncomingBufferSize;
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      /* the IO vector and control buffer of the message are passed along
         so that they can be reused for a queued response */
      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);

    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1733
1734 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1735 static void udpClientThread(ClientState* cs)
1736 try
1737 {
1738 setThreadName("dnsdist/udpClie");
1739 LocalHolders holders;
1740
1741 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1742 if (g_udpVectorSize > 1) {
1743 MultipleMessagesUDPClientThread(cs, holders);
1744
1745 }
1746 else
1747 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1748 {
1749 char packet[4096];
1750 /* the actual buffer is larger because:
1751 - we may have to add EDNS and/or ECS
1752 - we use it for self-generated responses (from rule or cache)
1753 but we only accept incoming payloads up to that size
1754 */
1755 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1756 struct msghdr msgh;
1757 struct iovec iov;
1758 /* used by HarvestDestinationAddress */
1759 cmsgbuf_aligned cbuf;
1760
1761 ComboAddress remote;
1762 ComboAddress dest;
1763 remote.sin4.sin_family = cs->local.sin4.sin_family;
1764 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1765
1766 for(;;) {
1767 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1768
1769 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1770 ++g_stats.nonCompliantQueries;
1771 continue;
1772 }
1773
1774 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1775 }
1776 }
1777 }
1778 catch(const std::exception &e)
1779 {
1780 errlog("UDP client thread died because of exception: %s", e.what());
1781 }
1782 catch(const PDNSException &e)
1783 {
1784 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1785 }
1786 catch(...)
1787 {
1788 errlog("UDP client thread died because of an exception: %s", "unknown");
1789 }
1790
/* Returns a random value usable as a DNS query ID, from libsodium when it is
   available (HAVE_LIBSODIUM) and from random() otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const auto raw = randombytes_random();
#else
  const auto raw = random();
#endif
  /* only keep what fits into the 16 bits of a DNS ID */
  return (raw % 65536);
}
1799
/* Performs one health check of the backend 'ds' over UDP.

   A query for ds->checkName / ds->checkType / ds->checkClass is sent from a
   dedicated socket, and the backend is considered healthy (true is returned)
   if a valid response comes back within ds->checkTimeout milliseconds.
   Every failure, including any exception, results in false being returned,
   the reason being logged only if g_verboseHealthChecks is set. */
1800 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1801 try
1802 {
1803 DNSName checkName = ds->checkName;
1804 uint16_t checkType = ds->checkType.getCode();
1805 uint16_t checkClass = ds->checkClass;
1806 dnsheader checkHeader;
1807 memset(&checkHeader, 0, sizeof(checkHeader));
1808
1809 checkHeader.qdcount = htons(1);
1810 checkHeader.id = getRandomDNSID();
1811
1812 checkHeader.rd = true;
1813 if (ds->setCD) {
1814 checkHeader.cd = true;
1815 }
1816
/* the optional check function is called with the Lua mutex held, it returns the name, type and class
   to use and is given a pointer to the header */
1817 if (ds->checkFunction) {
1818 std::lock_guard<std::mutex> lock(g_luamutex);
1819 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1820 checkName = std::get<0>(ret);
1821 checkType = std::get<1>(ret);
1822 checkClass = std::get<2>(ret);
1823 }
1824
1825 vector<uint8_t> packet;
1826 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1827 dnsheader * requestHeader = dpw.getHeader();
1828 *requestHeader = checkHeader;
1829
/* a new socket is used for every check, bound to the source address of the backend if there is one */
1830 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1831 sock.setNonBlocking();
1832 if (!IsAnyAddress(ds->sourceAddr)) {
1833 sock.setReuseAddr();
1834 sock.bind(ds->sourceAddr);
1835 }
1836 sock.connect(ds->remote);
/* the last parameter flags this as a health check, so that a failure does not make the backend reconnect */
1837 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1838 if (sent < 0) {
1839 int ret = errno;
1840 if (g_verboseHealthChecks)
1841 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1842 return false;
1843 }
1844
1845 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1846 if(ret < 0 || !ret) { // error, timeout, both are down!
1847 if (ret < 0) {
1848 ret = errno;
1849 if (g_verboseHealthChecks)
1850 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1851 }
1852 else {
1853 if (g_verboseHealthChecks)
1854 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1855 }
1856 return false;
1857 }
1858
1859 string reply;
1860 ComboAddress from;
1861 sock.recvFrom(reply, from);
1862
1863 /* we are using a connected socket but hey.. */
1864 if (from != ds->remote) {
1865 if (g_verboseHealthChecks)
1866 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1867 return false;
1868 }
1869
/* the header is only read after the size of the reply has been checked */
1870 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1871
1872 if (reply.size() < sizeof(*responseHeader)) {
1873 if (g_verboseHealthChecks)
1874 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1875 return false;
1876 }
1877
1878 if (responseHeader->id != requestHeader->id) {
1879 if (g_verboseHealthChecks)
1880 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1881 return false;
1882 }
1883
1884 if (!responseHeader->qr) {
1885 if (g_verboseHealthChecks)
1886 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1887 return false;
1888 }
1889
1890 if (responseHeader->rcode == RCode::ServFail) {
1891 if (g_verboseHealthChecks)
1892 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1893 return false;
1894 }
1895
/* NXDomain and Refused are only considered failures if mustResolve is set for this backend */
1896 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1897 if (g_verboseHealthChecks)
1898 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1899 return false;
1900 }
1901
/* finally, the question of the response has to match the one that has been sent */
1902 uint16_t receivedType;
1903 uint16_t receivedClass;
1904 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1905
1906 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1907 if (g_verboseHealthChecks)
1908 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1909 return false;
1910 }
1911
1912 return true;
1913 }
1914 catch(const std::exception& e)
1915 {
1916 if (g_verboseHealthChecks)
1917 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1918 return false;
1919 }
1920 catch(...)
1921 {
1922 if (g_verboseHealthChecks)
1923 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1924 return false;
1925 }
1926
/* Number of TCP client threads handed to the TCPClientCollection created in main();
   also used by checkFileDescriptorsLimits() to estimate the file descriptors needed. */
1927 uint64_t g_maxTCPClientThreads{10};
/* Number of maintThread() iterations (one per second) between two packet cache cleanings. */
1928 std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* maintThread() asks each cache to purge expired entries with a threshold of
   getMaxEntries() * (100 - g_cacheCleaningPercentage) / 100. */
1929 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1930
1931 void maintThread()
1932 {
1933 setThreadName("dnsdist/main");
1934 int interval = 1;
1935 size_t counter = 0;
1936 int32_t secondsToWaitLog = 0;
1937
1938 for(;;) {
1939 sleep(interval);
1940
1941 {
1942 std::lock_guard<std::mutex> lock(g_luamutex);
1943 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1944 if(f) {
1945 try {
1946 (*f)();
1947 secondsToWaitLog = 0;
1948 }
1949 catch(std::exception &e) {
1950 if (secondsToWaitLog <= 0) {
1951 infolog("Error during execution of maintenance function: %s", e.what());
1952 secondsToWaitLog = 61;
1953 }
1954 secondsToWaitLog -= interval;
1955 }
1956 }
1957 }
1958
1959 counter++;
1960 if (counter >= g_cacheCleaningDelay) {
1961 /* keep track, for each cache, of whether we should keep
1962 expired entries */
1963 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1964
1965 /* gather all caches actually used by at least one pool, and see
1966 if something prevents us from cleaning the expired entries */
1967 auto localPools = g_pools.getLocal();
1968 for (const auto& entry : *localPools) {
1969 auto& pool = entry.second;
1970
1971 auto packetCache = pool->packetCache;
1972 if (!packetCache) {
1973 continue;
1974 }
1975
1976 auto pair = caches.insert({packetCache, false});
1977 auto& iter = pair.first;
1978 /* if we need to keep stale data for this cache (ie, not clear
1979 expired entries when at least one pool using this cache
1980 has all its backends down) */
1981 if (packetCache->keepStaleData() && iter->second == false) {
1982 /* so far all pools had at least one backend up */
1983 if (pool->countServers(true) == 0) {
1984 iter->second = true;
1985 }
1986 }
1987 }
1988
1989 for (auto pair : caches) {
1990 /* shall we keep expired entries ? */
1991 if (pair.second == true) {
1992 continue;
1993 }
1994 auto& packetCache = pair.first;
1995 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1996 packetCache->purgeExpired(upTo);
1997 }
1998 counter = 0;
1999 }
2000
2001 // ponder pruning g_dynblocks of expired entries here
2002 }
2003 }
2004
2005 static void secPollThread()
2006 {
2007 setThreadName("dnsdist/secpoll");
2008
2009 for (;;) {
2010 try {
2011 doSecPoll(g_secPollSuffix);
2012 }
2013 catch(...) {
2014 }
2015 sleep(g_secPollInterval);
2016 }
2017 }
2018
2019 static void healthChecksThread()
2020 {
2021 setThreadName("dnsdist/healthC");
2022
2023 int interval = 1;
2024
2025 for(;;) {
2026 sleep(interval);
2027
2028 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
2029 g_tcpclientthreads->addTCPClientThread();
2030
2031 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
2032 for(auto& dss : *states) {
2033 if(++dss->lastCheck < dss->checkInterval)
2034 continue;
2035 dss->lastCheck = 0;
2036 if(dss->availability==DownstreamState::Availability::Auto) {
2037 bool newState=upCheck(dss);
2038 if (newState) {
2039 /* check succeeded */
2040 dss->currentCheckFailures = 0;
2041
2042 if (!dss->upStatus) {
2043 /* we were marked as down */
2044 dss->consecutiveSuccessfulChecks++;
2045 if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
2046 /* if we need more than one successful check to rise
2047 and we didn't reach the threshold yet,
2048 let's stay down */
2049 newState = false;
2050 }
2051 }
2052 }
2053 else {
2054 /* check failed */
2055 dss->consecutiveSuccessfulChecks = 0;
2056
2057 if (dss->upStatus) {
2058 /* we are currently up */
2059 dss->currentCheckFailures++;
2060 if (dss->currentCheckFailures < dss->maxCheckFailures) {
2061 /* we need more than one failure to be marked as down,
2062 and we did not reach the threshold yet, let's stay down */
2063 newState = true;
2064 }
2065 }
2066 }
2067
2068 if(newState != dss->upStatus) {
2069 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2070
2071 if (newState && !dss->connected) {
2072 newState = dss->reconnect();
2073
2074 if (dss->connected && !dss->threadStarted.test_and_set()) {
2075 dss->tid = thread(responderThread, dss);
2076 }
2077 }
2078
2079 dss->upStatus = newState;
2080 dss->currentCheckFailures = 0;
2081 dss->consecutiveSuccessfulChecks = 0;
2082 if (g_snmpAgent && g_snmpTrapsEnabled) {
2083 g_snmpAgent->sendBackendStatusChangeTrap(dss);
2084 }
2085 }
2086 }
2087
2088 auto delta = dss->sw.udiffAndSet()/1000000.0;
2089 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2090 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2091 dss->prev.queries.store(dss->queries.load());
2092 dss->prev.reuseds.store(dss->reuseds.load());
2093
2094 for(IDState& ids : dss->idStates) { // timeouts
2095 int64_t usageIndicator = ids.usageIndicator;
2096 if(usageIndicator >=0 && ids.age++ > g_udpTimeout) {
2097 /* We set usageIndicator to -1 as soon as possible
2098 to limit the risk of racing with the
2099 responder thread.
2100 */
2101 auto oldDU = ids.du;
2102
2103 if (!ids.usageIndicator.compare_exchange_strong(usageIndicator, -1)) {
2104 /* this state has been altered in the meantime,
2105 don't go anywhere near it */
2106 continue;
2107 }
2108 ids.du = nullptr;
2109 handleDOHTimeout(oldDU);
2110 ids.age = 0;
2111 dss->reuseds++;
2112 --dss->outstanding;
2113 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2114 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2115 dss->remote.toStringWithPort(), dss->name,
2116 ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2117
2118 struct timespec ts;
2119 gettime(&ts);
2120
2121 struct dnsheader fake;
2122 memset(&fake, 0, sizeof(fake));
2123 fake.id = ids.origID;
2124
2125 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2126 }
2127 }
2128 }
2129 }
2130 }
2131
2132 static void bindAny(int af, int sock)
2133 {
2134 __attribute__((unused)) int one = 1;
2135
2136 #ifdef IP_FREEBIND
2137 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2138 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
2139 #endif
2140
2141 #ifdef IP_BINDANY
2142 if (af == AF_INET)
2143 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2144 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
2145 #endif
2146 #ifdef IPV6_BINDANY
2147 if (af == AF_INET6)
2148 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2149 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
2150 #endif
2151 #ifdef SO_BINDANY
2152 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2153 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
2154 #endif
2155 }
2156
2157 static void dropGroupPrivs(gid_t gid)
2158 {
2159 if (gid) {
2160 if (setgid(gid) == 0) {
2161 if (setgroups(0, NULL) < 0) {
2162 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
2163 }
2164 }
2165 else {
2166 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
2167 }
2168 }
2169 }
2170
2171 static void dropUserPrivs(uid_t uid)
2172 {
2173 if(uid) {
2174 if(setuid(uid) < 0) {
2175 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
2176 }
2177 }
2178 }
2179
2180 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2181 {
2182 /* stdin, stdout, stderr */
2183 size_t requiredFDsCount = 3;
2184 auto backends = g_dstates.getLocal();
2185 /* UDP sockets to backends */
2186 size_t backendUDPSocketsCount = 0;
2187 for (const auto& backend : *backends) {
2188 backendUDPSocketsCount += backend->sockets.size();
2189 }
2190 requiredFDsCount += backendUDPSocketsCount;
2191 /* TCP sockets to backends */
2192 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2193 /* listening sockets */
2194 requiredFDsCount += udpBindsCount;
2195 requiredFDsCount += tcpBindsCount;
2196 /* max TCP connections currently served */
2197 requiredFDsCount += g_maxTCPClientThreads;
2198 /* max pipes for communicating between TCP acceptors and client threads */
2199 requiredFDsCount += (g_maxTCPClientThreads * 2);
2200 /* max TCP queued connections */
2201 requiredFDsCount += g_maxTCPQueuedConnections;
2202 /* DelayPipe pipe */
2203 requiredFDsCount += 2;
2204 /* syslog socket */
2205 requiredFDsCount++;
2206 /* webserver main socket */
2207 requiredFDsCount++;
2208 /* console main socket */
2209 requiredFDsCount++;
2210 /* carbon export */
2211 requiredFDsCount++;
2212 /* history file */
2213 requiredFDsCount++;
2214 struct rlimit rl;
2215 getrlimit(RLIMIT_NOFILE, &rl);
2216 if (rl.rlim_cur <= requiredFDsCount) {
2217 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2218 #ifdef HAVE_SYSTEMD
2219 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2220 #else
2221 warnlog("You can increase this value by using ulimit.");
2222 #endif
2223 }
2224 }
2225
2226 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2227 {
2228 /* skip some warnings if there is an identical UDP context */
2229 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2230 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2231 (void) warn;
2232
2233 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2234
2235 if (cs->tcp) {
2236 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2237 #ifdef TCP_DEFER_ACCEPT
2238 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2239 #endif
2240 if (cs->fastOpenQueueSize > 0) {
2241 #ifdef TCP_FASTOPEN
2242 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2243 #else
2244 if (warn) {
2245 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2246 }
2247 #endif
2248 }
2249 }
2250
2251 if(cs->local.sin4.sin_family == AF_INET6) {
2252 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2253 }
2254
2255 bindAny(cs->local.sin4.sin_family, fd);
2256
2257 if(!cs->tcp && IsAnyAddress(cs->local)) {
2258 int one=1;
2259 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2260 #ifdef IPV6_RECVPKTINFO
2261 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2262 #endif
2263 }
2264
2265 if (cs->reuseport) {
2266 #ifdef SO_REUSEPORT
2267 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2268 #else
2269 if (warn) {
2270 /* no need to warn again if configured but support is not available, we already did for UDP */
2271 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2272 }
2273 #endif
2274 }
2275
2276 if (!cs->tcp) {
2277 if (cs->local.isIPv4()) {
2278 try {
2279 setSocketIgnorePMTU(cs->udpFD);
2280 }
2281 catch(const std::exception& e) {
2282 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2283 }
2284 }
2285 }
2286
2287 const std::string& itf = cs->interface;
2288 if (!itf.empty()) {
2289 #ifdef SO_BINDTODEVICE
2290 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2291 if (res != 0) {
2292 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), strerror(errno));
2293 }
2294 #else
2295 if (warn) {
2296 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2297 }
2298 #endif
2299 }
2300
2301 #ifdef HAVE_EBPF
2302 if (g_defaultBPFFilter) {
2303 cs->attachFilter(g_defaultBPFFilter);
2304 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2305 }
2306 #endif /* HAVE_EBPF */
2307
2308 if (cs->tlsFrontend != nullptr) {
2309 if (!cs->tlsFrontend->setupTLS()) {
2310 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2311 _exit(EXIT_FAILURE);
2312 }
2313 }
2314
2315 if (cs->dohFrontend != nullptr) {
2316 cs->dohFrontend->setup();
2317 }
2318
2319 SBind(fd, cs->local);
2320
2321 if (cs->tcp) {
2322 SListen(cs->tcpFD, SOMAXCONN);
2323 if (cs->tlsFrontend != nullptr) {
2324 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2325 }
2326 else if (cs->dohFrontend != nullptr) {
2327 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2328 }
2329 else if (cs->dnscryptCtx != nullptr) {
2330 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2331 }
2332 else {
2333 warnlog("Listening on %s", cs->local.toStringWithPort());
2334 }
2335 }
2336
2337 cs->ready = true;
2338 }
2339
/* Settings collected from the command line by main(). */
struct
{
  /* addresses given with -l,--local */
  std::vector<std::string> locals;
  /* positional arguments, used as backend addresses when not running as a client */
  std::vector<std::string> remotes;
  /* --check-config */
  bool checkConfig = false;
  /* -c,--client */
  bool beClient = false;
  /* --supervised */
  bool beSupervised = false;
  /* -e,--execute */
  std::string command;
  /* -C,--config */
  std::string config;
  /* -u,--uid */
  std::string uid;
  /* -g,--gid */
  std::string gid;
} g_cmdLine;

/* set to true by main() once the frontends list is final, right before binding the sockets */
std::atomic<bool> g_configurationDone{false};
2354
/* Prints the command-line help to the standard output.
   The -k,--setkey entry is only listed when built with libsodium. */
static void usage()
{
  /* leading empty line, flushed like the original std::endl */
  std::cout << std::endl;

  /* the whole text is a single literal, adjacent string literals being concatenated */
  std::cout <<
    "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
    "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
    "[-v,--verbose] [--check-config] [--version]\n"
    "\n"
    "-a,--acl netmask Add this netmask to the ACL\n"
    "-C,--config file Load configuration from 'file'\n"
    "-c,--client Operate as a client, connect to dnsdist. This reads\n"
    " controlSocket from your configuration file, but also\n"
    " accepts an IP:PORT argument\n"
#ifdef HAVE_LIBSODIUM
    "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
    " is similar to setting setKey in the configuration file.\n"
    " NOTE: this will leak this key in your shell's history\n"
    " and in the systems running process list.\n"
#endif
    "--check-config Validate the configuration file and exit. The exit-code\n"
    " reflects the validation, 0 is OK, 1 means an error.\n"
    " Any errors are printed as well.\n"
    "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
    "-g,--gid gid Change the process group ID after binding sockets\n"
    "-h,--help Display this helpful message\n"
    "-l,--local address Listen on this local address\n"
    "--supervised Don't open a console, I'm supervised\n"
    " (use with e.g. systemd and daemontools)\n"
    "--disable-syslog Don't log to syslog, only to stdout\n"
    " (use with e.g. systemd)\n"
    "-u,--uid uid Change the process user ID after binding sockets\n"
    "-v,--verbose Enable verbose mode\n"
    "-V,--version Show dnsdist version information and exit\n";
}
2388
2389 int main(int argc, char** argv)
2390 try
2391 {
2392 size_t udpBindsCount = 0;
2393 size_t tcpBindsCount = 0;
2394 rl_attempted_completion_function = my_completion;
2395 rl_completion_append_character = 0;
2396
2397 signal(SIGPIPE, SIG_IGN);
2398 signal(SIGCHLD, SIG_IGN);
2399 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2400
2401 #ifdef HAVE_LIBSODIUM
2402 if (sodium_init() == -1) {
2403 cerr<<"Unable to initialize crypto library"<<endl;
2404 exit(EXIT_FAILURE);
2405 }
2406 g_hashperturb=randombytes_uniform(0xffffffff);
2407 srandom(randombytes_uniform(0xffffffff));
2408 #else
2409 {
2410 struct timeval tv;
2411 gettimeofday(&tv, 0);
2412 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2413 g_hashperturb=random();
2414 }
2415
2416 #endif
2417 ComboAddress clientAddress = ComboAddress();
2418 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2419 struct option longopts[]={
2420 {"acl", required_argument, 0, 'a'},
2421 {"check-config", no_argument, 0, 1},
2422 {"client", no_argument, 0, 'c'},
2423 {"config", required_argument, 0, 'C'},
2424 {"disable-syslog", no_argument, 0, 2},
2425 {"execute", required_argument, 0, 'e'},
2426 {"gid", required_argument, 0, 'g'},
2427 {"help", no_argument, 0, 'h'},
2428 {"local", required_argument, 0, 'l'},
2429 {"setkey", required_argument, 0, 'k'},
2430 {"supervised", no_argument, 0, 3},
2431 {"uid", required_argument, 0, 'u'},
2432 {"verbose", no_argument, 0, 'v'},
2433 {"version", no_argument, 0, 'V'},
2434 {0,0,0,0}
2435 };
2436 int longindex=0;
2437 string optstring;
2438 for(;;) {
2439 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2440 if(c==-1)
2441 break;
2442 switch(c) {
2443 case 1:
2444 g_cmdLine.checkConfig=true;
2445 break;
2446 case 2:
2447 g_syslog=false;
2448 break;
2449 case 3:
2450 g_cmdLine.beSupervised=true;
2451 break;
2452 case 'C':
2453 g_cmdLine.config=optarg;
2454 break;
2455 case 'c':
2456 g_cmdLine.beClient=true;
2457 break;
2458 case 'e':
2459 g_cmdLine.command=optarg;
2460 break;
2461 case 'g':
2462 g_cmdLine.gid=optarg;
2463 break;
2464 case 'h':
2465 cout<<"dnsdist "<<VERSION<<endl;
2466 usage();
2467 cout<<"\n";
2468 exit(EXIT_SUCCESS);
2469 break;
2470 case 'a':
2471 optstring=optarg;
2472 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2473 break;
2474 case 'k':
2475 #ifdef HAVE_LIBSODIUM
2476 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2477 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2478 exit(EXIT_FAILURE);
2479 }
2480 #else
2481 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2482 exit(EXIT_FAILURE);
2483 #endif
2484 break;
2485 case 'l':
2486 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2487 break;
2488 case 'u':
2489 g_cmdLine.uid=optarg;
2490 break;
2491 case 'v':
2492 g_verbose=true;
2493 break;
2494 case 'V':
2495 #ifdef LUAJIT_VERSION
2496 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2497 #else
2498 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2499 #endif
2500 cout<<"Enabled features: ";
2501 #ifdef HAVE_DNS_OVER_TLS
2502 cout<<"dns-over-tls(";
2503 #ifdef HAVE_GNUTLS
2504 cout<<"gnutls";
2505 #ifdef HAVE_LIBSSL
2506 cout<<" ";
2507 #endif
2508 #endif
2509 #ifdef HAVE_LIBSSL
2510 cout<<"openssl";
2511 #endif
2512 cout<<") ";
2513 #endif
2514 #ifdef HAVE_DNS_OVER_HTTPS
2515 cout<<"dns-over-https(DOH) ";
2516 #endif
2517 #ifdef HAVE_DNSCRYPT
2518 cout<<"dnscrypt ";
2519 #endif
2520 #ifdef HAVE_EBPF
2521 cout<<"ebpf ";
2522 #endif
2523 #ifdef HAVE_FSTRM
2524 cout<<"fstrm ";
2525 #endif
2526 #ifdef HAVE_LIBCRYPTO
2527 cout<<"ipcipher ";
2528 #endif
2529 #ifdef HAVE_LIBSODIUM
2530 cout<<"libsodium ";
2531 #endif
2532 #ifdef HAVE_PROTOBUF
2533 cout<<"protobuf ";
2534 #endif
2535 #ifdef HAVE_RE2
2536 cout<<"re2 ";
2537 #endif
2538 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2539 cout<<"recvmmsg/sendmmsg ";
2540 #endif
2541 #ifdef HAVE_NET_SNMP
2542 cout<<"snmp ";
2543 #endif
2544 #ifdef HAVE_SYSTEMD
2545 cout<<"systemd";
2546 #endif
2547 cout<<endl;
2548 exit(EXIT_SUCCESS);
2549 break;
2550 case '?':
2551 //getopt_long printed an error message.
2552 usage();
2553 exit(EXIT_FAILURE);
2554 break;
2555 }
2556 }
2557
2558 argc-=optind;
2559 argv+=optind;
2560 for(auto p = argv; *p; ++p) {
2561 if(g_cmdLine.beClient) {
2562 clientAddress = ComboAddress(*p, 5199);
2563 } else {
2564 g_cmdLine.remotes.push_back(*p);
2565 }
2566 }
2567
2568 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2569
2570 g_policy.setState(leastOutstandingPol);
2571 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2572 setupLua(true, g_cmdLine.config);
2573 if (clientAddress != ComboAddress())
2574 g_serverControl = clientAddress;
2575 doClient(g_serverControl, g_cmdLine.command);
2576 _exit(EXIT_SUCCESS);
2577 }
2578
2579 auto acl = g_ACL.getCopy();
2580 if(acl.empty()) {
2581 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2582 acl.addMask(addr);
2583 g_ACL.setState(acl);
2584 }
2585
2586 auto consoleACL = g_consoleACL.getCopy();
2587 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2588 consoleACL.addMask(mask);
2589 }
2590 g_consoleACL.setState(consoleACL);
2591
2592 if (g_cmdLine.checkConfig) {
2593 setupLua(true, g_cmdLine.config);
2594 // No exception was thrown
2595 infolog("Configuration '%s' OK!", g_cmdLine.config);
2596 _exit(EXIT_SUCCESS);
2597 }
2598
2599 auto todo=setupLua(false, g_cmdLine.config);
2600
2601 auto localPools = g_pools.getCopy();
2602 {
2603 bool precompute = false;
2604 if (g_policy.getLocal()->name == "chashed") {
2605 precompute = true;
2606 } else {
2607 for (const auto& entry: localPools) {
2608 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2609 precompute = true;
2610 break ;
2611 }
2612 }
2613 }
2614 if (precompute) {
2615 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2616 // pre compute hashes
2617 auto backends = g_dstates.getLocal();
2618 for (auto& backend: *backends) {
2619 backend->hash();
2620 }
2621 }
2622 }
2623
2624 if (!g_cmdLine.locals.empty()) {
2625 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2626 /* DoH, DoT and DNSCrypt frontends are separate */
2627 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2628 it = g_frontends.erase(it);
2629 }
2630 else {
2631 ++it;
2632 }
2633 }
2634
2635 for(const auto& loc : g_cmdLine.locals) {
2636 /* UDP */
2637 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2638 /* TCP */
2639 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2640 }
2641 }
2642
2643 if (g_frontends.empty()) {
2644 /* UDP */
2645 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2646 /* TCP */
2647 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2648 }
2649
2650 g_configurationDone = true;
2651
2652 for(auto& frontend : g_frontends) {
2653 setUpLocalBind(frontend);
2654
2655 if (frontend->tcp == false) {
2656 ++udpBindsCount;
2657 }
2658 else {
2659 ++tcpBindsCount;
2660 }
2661 }
2662
2663 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2664
2665 vector<string> vec;
2666 std::string acls;
2667 g_ACL.getLocal()->toStringVector(&vec);
2668 for(const auto& s : vec) {
2669 if (!acls.empty())
2670 acls += ", ";
2671 acls += s;
2672 }
2673 infolog("ACL allowing queries from: %s", acls.c_str());
2674 vec.clear();
2675 acls.clear();
2676 g_consoleACL.getLocal()->toStringVector(&vec);
2677 for (const auto& entry : vec) {
2678 if (!acls.empty()) {
2679 acls += ", ";
2680 }
2681 acls += entry;
2682 }
2683 infolog("Console ACL allowing connections from: %s", acls.c_str());
2684
2685 #ifdef HAVE_LIBSODIUM
2686 if (g_consoleEnabled && g_consoleKey.empty()) {
2687 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2688 }
2689 #endif
2690
2691 uid_t newgid=0;
2692 gid_t newuid=0;
2693
2694 if(!g_cmdLine.gid.empty())
2695 newgid = strToGID(g_cmdLine.gid.c_str());
2696
2697 if(!g_cmdLine.uid.empty())
2698 newuid = strToUID(g_cmdLine.uid.c_str());
2699
2700 dropGroupPrivs(newgid);
2701 dropUserPrivs(newuid);
2702 try {
2703 /* we might still have capabilities remaining,
2704 for example if we have been started as root
2705 without --uid or --gid (please don't do that)
2706 or as an unprivileged user with ambient
2707 capabilities like CAP_NET_BIND_SERVICE.
2708 */
2709 dropCapabilities();
2710 }
2711 catch(const std::exception& e) {
2712 warnlog("%s", e.what());
2713 }
2714
2715 /* this need to be done _after_ dropping privileges */
2716 g_delay = new DelayPipe<DelayedPacket>();
2717
2718 if (g_snmpAgent) {
2719 g_snmpAgent->run();
2720 }
2721
2722 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2723
2724 for(auto& t : todo)
2725 t();
2726
2727 localPools = g_pools.getCopy();
2728 /* create the default pool no matter what */
2729 createPoolIfNotExists(localPools, "");
2730 if(g_cmdLine.remotes.size()) {
2731 for(const auto& address : g_cmdLine.remotes) {
2732 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2733 addServerToPool(localPools, "", ret);
2734 if (ret->connected && !ret->threadStarted.test_and_set()) {
2735 ret->tid = thread(responderThread, ret);
2736 }
2737 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2738 }
2739 }
2740 g_pools.setState(localPools);
2741
2742 if(g_dstates.getLocal()->empty()) {
2743 errlog("No downstream servers defined: all packets will get dropped");
2744 // you might define them later, but you need to know
2745 }
2746
2747 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2748
2749 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2750 if(dss->availability==DownstreamState::Availability::Auto) {
2751 bool newState=upCheck(dss);
2752 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2753 dss->upStatus = newState;
2754 }
2755 }
2756
2757 for(auto& cs : g_frontends) {
2758 if (cs->dohFrontend != nullptr) {
2759 #ifdef HAVE_DNS_OVER_HTTPS
2760 std::thread t1(dohThread, cs.get());
2761 if (!cs->cpus.empty()) {
2762 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2763 }
2764 t1.detach();
2765 #endif /* HAVE_DNS_OVER_HTTPS */
2766 continue;
2767 }
2768 if (cs->udpFD >= 0) {
2769 thread t1(udpClientThread, cs.get());
2770 if (!cs->cpus.empty()) {
2771 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2772 }
2773 t1.detach();
2774 }
2775 else if (cs->tcpFD >= 0) {
2776 thread t1(tcpAcceptorThread, cs.get());
2777 if (!cs->cpus.empty()) {
2778 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2779 }
2780 t1.detach();
2781 }
2782 }
2783
2784 thread carbonthread(carbonDumpThread);
2785 carbonthread.detach();
2786
2787 thread stattid(maintThread);
2788 stattid.detach();
2789
2790 thread healththread(healthChecksThread);
2791
2792 if (!g_secPollSuffix.empty()) {
2793 thread secpollthread(secPollThread);
2794 secpollthread.detach();
2795 }
2796
2797 if(g_cmdLine.beSupervised) {
2798 #ifdef HAVE_SYSTEMD
2799 sd_notify(0, "READY=1");
2800 #endif
2801 healththread.join();
2802 }
2803 else {
2804 healththread.detach();
2805 doConsole();
2806 }
2807 _exit(EXIT_SUCCESS);
2808
2809 }
2810 catch(const LuaContext::ExecutionErrorException& e) {
2811 try {
2812 errlog("Fatal Lua error: %s", e.what());
2813 std::rethrow_if_nested(e);
2814 } catch(const std::exception& ne) {
2815 errlog("Details: %s", ne.what());
2816 }
2817 catch(PDNSException &ae)
2818 {
2819 errlog("Fatal pdns error: %s", ae.reason);
2820 }
2821 _exit(EXIT_FAILURE);
2822 }
2823 catch(std::exception &e)
2824 {
2825 errlog("Fatal error: %s", e.what());
2826 _exit(EXIT_FAILURE);
2827 }
2828 catch(PDNSException &ae)
2829 {
2830 errlog("Fatal pdns error: %s", ae.reason);
2831 _exit(EXIT_FAILURE);
2832 }
2833
2834 uint64_t getLatencyCount(const std::string&)
2835 {
2836 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2837 }