]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
Merge pull request #8424 from Habbie/ixfrdist-fixes
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
using std::atomic;
using std::thread;
bool g_verbose;

/* Process-wide counters; doLatencyStats() and the UDP responder thread update them.
   Note: within one translation unit, namespace-scope objects are initialized in
   definition order. */
struct DNSDistStats g_stats;
MetricDefinitionStorage g_metricDefinitions;

/* Number of IDState slots per backend: the DownstreamState constructor resizes
   idStates to this value. Defaults to the largest uint16_t (65535). */
uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
bool g_verboseHealthChecks{false};
uint32_t g_staleCacheEntriesTTL{0};
bool g_syslog{true};
/* When true, responseContentMatches() also accepts NoError/NXDomain responses
   that have no question section. */
bool g_allowEmptyResponse{false};

GlobalStateHolder<NetmaskGroup> g_ACL;
string g_outputBuffer;

std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
#ifdef HAVE_EBPF
shared_ptr<BPFFilter> g_defaultBPFFilter;
std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
#endif /* HAVE_EBPF */
std::vector<std::unique_ptr<ClientState>> g_frontends;
GlobalStateHolder<pools_t> g_pools;
size_t g_udpVectorSize{1};

bool g_snmpEnabled{false};
bool g_snmpTrapsEnabled{false};
/* nullptr until an SNMP agent is created (presumably only when SNMP is enabled — TODO confirm) */
DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
Per downstream server there is an array of states (g_maxOutstanding entries, 2^16 - 1 by default), shared by all of its sockets. When we send out a packet downstream, we note
there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
/* Rule chains. g_resprulactions is the chain the UDP responder thread applies to
   backend responses (through processResponse()). */
GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;

/* g_rings records every relayed response (see responderThread()) */
Rings g_rings;
QueryCount g_qcount;

GlobalStateHolder<servers_t> g_dstates;
GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
/* timeouts, presumably in seconds — TODO confirm against their users */
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
int g_udpTimeout{2};

bool g_servFailOnNoPolicy{false};
/* when true, backend responses with TC set are stripped down by truncateTC() */
bool g_truncateTC{false};
/* when true, fixUpResponse() copies the query's qname (with its original case) back into the response */
bool g_fixupCase{false};
bool g_preserveTrailingData{false};
/* when true, roundrobin() returns no server instead of falling back to servers that are down */
bool g_roundrobinFailOnNoServer{false};

std::set<std::string> g_capabilitiesToRetain;
148
149 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
150 try
151 {
152 bool hadEDNS = false;
153 uint16_t payloadSize = 0;
154 uint16_t z = 0;
155
156 if (g_addEDNSToSelfGeneratedResponses) {
157 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
158 }
159
160 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
161 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
162 dh->ancount = dh->arcount = dh->nscount = 0;
163
164 if (hadEDNS) {
165 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
166 }
167 }
168 catch(...)
169 {
170 g_stats.truncFail++;
171 }
172
173 struct DelayedPacket
174 {
175 int fd;
176 string packet;
177 ComboAddress destination;
178 ComboAddress origDest;
179 void operator()()
180 {
181 ssize_t res;
182 if(origDest.sin4.sin_family == 0) {
183 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
184 }
185 else {
186 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
187 }
188 if (res == -1) {
189 int err = errno;
190 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
191 }
192 }
193 };
194
195 DelayPipe<DelayedPacket>* g_delay = nullptr;
196
197 void doLatencyStats(double udiff)
198 {
199 if(udiff < 1000) ++g_stats.latency0_1;
200 else if(udiff < 10000) ++g_stats.latency1_10;
201 else if(udiff < 50000) ++g_stats.latency10_50;
202 else if(udiff < 100000) ++g_stats.latency50_100;
203 else if(udiff < 1000000) ++g_stats.latency100_1000;
204 else ++g_stats.latencySlow;
205 g_stats.latencySum += udiff / 1000;
206
207 auto doAvg = [](double& var, double n, double weight) {
208 var = (weight -1) * var/weight + n/weight;
209 };
210
211 doAvg(g_stats.latencyAvg100, udiff, 100);
212 doAvg(g_stats.latencyAvg1000, udiff, 1000);
213 doAvg(g_stats.latencyAvg10000, udiff, 10000);
214 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
215 }
216
217 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
218 {
219 if (responseLen < sizeof(dnsheader)) {
220 return false;
221 }
222
223 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
224 if (dh->qdcount == 0) {
225 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
226 return true;
227 }
228 else {
229 ++g_stats.nonCompliantResponses;
230 return false;
231 }
232 }
233
234 uint16_t rqtype, rqclass;
235 DNSName rqname;
236 try {
237 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
238 }
239 catch(const std::exception& e) {
240 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
241 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
242 }
243 ++g_stats.nonCompliantResponses;
244 return false;
245 }
246
247 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
248 return false;
249 }
250
251 return true;
252 }
253
254 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
255 {
256 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
257 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
258 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
259 uint16_t * flags = getFlagsFromDNSHeader(dh);
260 /* clear the flags we are about to restore */
261 *flags &= restoreFlagsMask;
262 /* only keep the flags we want to restore */
263 origFlags &= ~restoreFlagsMask;
264 /* set the saved flags as they were */
265 *flags |= origFlags;
266 }
267
268 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
269 {
270 restoreFlags(dq.dh, origFlags);
271
272 return addEDNSToQueryTurnedResponse(dq);
273 }
274
275 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
276 {
277 if (*responseLen < sizeof(dnsheader)) {
278 return false;
279 }
280
281 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
282 restoreFlags(dh, origFlags);
283
284 if (*responseLen == sizeof(dnsheader)) {
285 return true;
286 }
287
288 if(g_fixupCase) {
289 string realname = qname.toDNSString();
290 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
291 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
292 }
293 }
294
295 if (ednsAdded || ecsAdded) {
296 uint16_t optStart;
297 size_t optLen = 0;
298 bool last = false;
299
300 const std::string responseStr(*response, *responseLen);
301 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
302
303 if (res == 0) {
304 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
305 size_t optContentStart = 0;
306 uint16_t optContentLen = 0;
307 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
308 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
309 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
310 we care about. */
311 *zeroScope = responseStr.at(optContentStart + 3) == 0;
312 }
313 }
314
315 if (ednsAdded) {
316 /* we added the entire OPT RR,
317 therefore we need to remove it entirely */
318 if (last) {
319 /* simply remove the last AR */
320 *responseLen -= optLen;
321 uint16_t arcount = ntohs(dh->arcount);
322 arcount--;
323 dh->arcount = htons(arcount);
324 }
325 else {
326 /* Removing an intermediary RR could lead to compression error */
327 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
328 *responseLen = rewrittenResponse.size();
329 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
330 rewrittenResponse.reserve(*responseLen + addRoom);
331 }
332 *responseSize = rewrittenResponse.capacity();
333 *response = reinterpret_cast<char*>(rewrittenResponse.data());
334 }
335 else {
336 warnlog("Error rewriting content");
337 }
338 }
339 }
340 else {
341 /* the OPT RR was already present, but without ECS,
342 we need to remove the ECS option if any */
343 if (last) {
344 /* nothing after the OPT RR, we can simply remove the
345 ECS option */
346 size_t existingOptLen = optLen;
347 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
348 *responseLen -= (existingOptLen - optLen);
349 }
350 else {
351 /* Removing an intermediary RR could lead to compression error */
352 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
353 *responseLen = rewrittenResponse.size();
354 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
355 rewrittenResponse.reserve(*responseLen + addRoom);
356 }
357 *responseSize = rewrittenResponse.capacity();
358 *response = reinterpret_cast<char*>(rewrittenResponse.data());
359 }
360 else {
361 warnlog("Error rewriting content");
362 }
363 }
364 }
365 }
366 }
367
368 return true;
369 }
370
#ifdef HAVE_DNSCRYPT
/* Encrypt 'response' in place for a DNSCrypt client. This is a no-op returning true
   when 'dnsCryptQuery' is empty. When 'dh', '*dh' and 'dhCopy' are all non-null, the
   cleartext header is first saved into 'dhCopy' and '*dh' is redirected to it.
   On success '*responseLen' becomes the encrypted length. Returns false, after
   logging, when encryption fails and the response must be dropped. */
static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    return true;
  }

  const bool saveHeader = dh != nullptr && *dh != nullptr && dhCopy != nullptr;
  if (saveHeader) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedResponseLen = 0;
  if (dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen) != 0) {
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedResponseLen;
  return true;
}
#endif /* HAVE_DNSCRYPT */
395
396 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
397 {
398 DNSResponseAction::Action action=DNSResponseAction::Action::None;
399 std::string ruleresult;
400 for(const auto& lr : *localRespRulactions) {
401 if(lr.d_rule->matches(&dr)) {
402 lr.d_rule->d_matches++;
403 action=(*lr.d_action)(&dr, &ruleresult);
404 switch(action) {
405 case DNSResponseAction::Action::Allow:
406 return true;
407 break;
408 case DNSResponseAction::Action::Drop:
409 return false;
410 break;
411 case DNSResponseAction::Action::HeaderModify:
412 return true;
413 break;
414 case DNSResponseAction::Action::ServFail:
415 dr.dh->rcode = RCode::ServFail;
416 return true;
417 break;
418 /* non-terminal actions follow */
419 case DNSResponseAction::Action::Delay:
420 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
421 break;
422 case DNSResponseAction::Action::None:
423 break;
424 }
425 }
426 }
427
428 return true;
429 }
430
431 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
432 {
433 if (!applyRulesToResponse(localRespRulactions, dr)) {
434 return false;
435 }
436
437 bool zeroScope = false;
438 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
439 return false;
440 }
441
442 if (dr.packetCache && !dr.skipCache && *responseLen <= s_maxPacketCacheEntrySize) {
443 if (!dr.useZeroScope) {
444 /* if the query was not suitable for zero-scope, for
445 example because it had an existing ECS entry so the hash is
446 not really 'no ECS', so just insert it for the existing subnet
447 since:
448 - we don't have the correct hash for a non-ECS query
449 - inserting with hash computed before the ECS replacement but with
450 the subnet extracted _after_ the replacement would not work.
451 */
452 zeroScope = false;
453 }
454 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
455 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
456 }
457
458 #ifdef HAVE_DNSCRYPT
459 if (!muted) {
460 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
461 return false;
462 }
463 }
464 #endif /* HAVE_DNSCRYPT */
465
466 return true;
467 }
468
469 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
470 {
471 if(delayMsec && g_delay) {
472 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
473 g_delay->submit(dp, delayMsec);
474 }
475 else {
476 ssize_t res;
477 if(origDest.sin4.sin_family == 0) {
478 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
479 }
480 else {
481 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
482 }
483 if (res == -1) {
484 int err = errno;
485 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), stringerror(err));
486 }
487 }
488
489 return true;
490 }
491
492
493 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
494 {
495 return state->sockets[state->socketsOffset++ % state->sockets.size()];
496 }
497
498 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
499 {
500 ready.clear();
501
502 if (state->sockets.size() == 1) {
503 ready.push_back(state->sockets[0]);
504 return ;
505 }
506
507 {
508 std::lock_guard<std::mutex> lock(state->socketsLock);
509 state->mplexer->getAvailableFDs(ready, -1);
510 }
511 }
512
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
/* Per-backend UDP responder loop for 'dss'. It runs forever and, for each ready backend
   socket, does the following:
   - reads one datagram and uses its DNS ID as an index into dss->idStates to find the
     pending query;
   - claims that state (tryMarkUnused());
   - runs the response rules and fix-ups (processResponse());
   - delivers the answer to the client: over UDP via sendUDPResponse(), or to the DoH
     side through du->rsock;
   - updates the counters, the rings and the latency statistics.
   A std::exception raised while handling responses is logged and the loop goes on.
   Anything else escaping the loop is logged by the function-try-block handlers at the
   bottom, and the thread function then returns. */
void responderThread(std::shared_ptr<DownstreamState> dss)
try {
  setThreadName("dnsdist/respond");
  auto localRespRulactions = g_resprulactions.getLocal();
  char packet[s_maxPacketCacheEntrySize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
  static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
  /* when the answer is encrypted in place, we need to get a copy
     of the original header before encryption to fill the ring buffer */
  dnsheader cleartextDH;
  vector<uint8_t> rewrittenResponse;

  /* declared outside the loop so that the error message in the catch handler can report it */
  uint16_t queryId = 0;
  std::vector<int> sockets;
  sockets.reserve(dss->sockets.size());

  for(;;) {
    dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
    try {
      pickBackendSocketsReadyForReceiving(dss, sockets);
      for (const auto& fd : sockets) {
        ssize_t got = recv(fd, packet, sizeof(packet), 0);
        char * response = packet;
        size_t responseSize = sizeof(packet);

        /* ignore receive errors and datagrams too short to hold a DNS header */
        if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
          continue;

        uint16_t responseLen = static_cast<uint16_t>(got);
        /* the ID we used towards the backend is the index of the query's IDState */
        queryId = dh->id;

        if(queryId >= dss->idStates.size()) {
          continue;
        }

        IDState* ids = &dss->idStates[queryId];
        int64_t usageIndicator = ids->usageIndicator;

        if(!IDState::isInUse(usageIndicator)) {
          /* the corresponding state is marked as not in use, meaning that:
             - it was already cleaned up by another thread and the state is gone ;
             - we already got a response for this query and this one is a duplicate.
             Either way, we don't touch it.
          */
          continue;
        }

        /* read the potential DOHUnit state as soon as possible, but don't use it
           until we have confirmed that we own this state by updating usageIndicator */
        auto du = ids->du;
        /* setting age to 0 to prevent the maintainer thread from
           cleaning this IDS while we process the response.
        */
        ids->age = 0;
        int origFD = ids->origFD;

        unsigned int consumed = 0;
        if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
          continue;
        }

        bool isDoH = du != nullptr;
        /* atomically mark the state as available, but only if it has not been altered
           in the meantime */
        if (ids->tryMarkUnused(usageIndicator)) {
          /* clear the potential DOHUnit asap, it's ours now
             and since we just marked the state as unused,
             someone could overwrite it. */
          ids->du = nullptr;
          /* we only decrement the outstanding counter if the value was not
             altered in the meantime, which would mean that the state has been actively reused
             and the other thread has not incremented the outstanding counter, so we don't
             want it to be decremented twice. */
          --dss->outstanding;  // you'd think an attacker could game this, but we're using connected socket
        } else {
          /* someone updated the state in the meantime, we can't touch the existing pointer */
          du = nullptr;
          /* since the state has been updated, we can't safely access it so let's just drop
             this response */
          continue;
        }

        if(dh->tc && g_truncateTC) {
          truncateTC(response, &responseLen, responseSize, consumed);
        }

        /* put back the ID of the original query (ids->origID) */
        dh->id = ids->origID;

        uint16_t addRoom = 0;
        DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
        if (dr.dnsCryptQuery) {
          /* extra room passed down to fixUpResponse() in case the response gets rewritten */
          addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
        }

        memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
        /* NOTE(review): when processResponse() returns false (e.g. a Drop rule), 'du' has
           already been detached from ids->du above but is neither sent nor deleted on this
           path. The same holds below when ids->cs is null or muted. The only deletion is
           the failed-send case. Confirm whether the DOHUnit leaks on these paths. */
        if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
          continue;
        }

        if (ids->cs && !ids->cs->muted) {
          if (du) {
#ifdef HAVE_DNS_OVER_HTTPS
            // DoH query
            du->response = std::string(response, responseLen);
            if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
              /* at this point we have the only remaining pointer on this
                 DOHUnit object since we did set ids->du to nullptr earlier */
              delete du;
            }
#endif /* HAVE_DNS_OVER_HTTPS */
            du = nullptr;
          }
          else {
            ComboAddress empty;
            empty.sin4.sin_family = 0;
            /* if ids->destHarvested is false, origDest holds the listening address.
               We don't want to use that as a source since it could be 0.0.0.0 for example. */
            sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
          }
        }

        ++g_stats.responses;
        if (ids->cs) {
          ++ids->cs->responses;
        }
        ++dss->responses;

        double udiff = ids->sentTime.udiff();
        vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
                 isDoH ? " (https)": "", udiff);

        struct timespec ts;
        gettime(&ts);
        /* the ring records the size received from the backend ('got') and the cleartext header */
        g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);

        switch (cleartextDH.rcode) {
        case RCode::NXDomain:
          ++g_stats.frontendNXDomain;
          break;
        case RCode::ServFail:
          ++g_stats.servfailResponses;
          ++g_stats.frontendServFail;
          break;
        case RCode::NoError:
          ++g_stats.frontendNoError;
          break;
        }
        /* moving average of the backend latency, each new sample weighted 1/128 */
        dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;

        doLatencyStats(udiff);

        rewrittenResponse.clear();
      }
    }
    catch(const std::exception& e){
      vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
    }
  }
}
catch(const std::exception& e)
{
  errlog("UDP responder thread died because of exception: %s", e.what());
}
catch(const PDNSException& e)
{
  errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
}
catch(...)
{
  errlog("UDP responder thread died because of an exception: %s", "unknown");
}
684
685 bool DownstreamState::reconnect()
686 {
687 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
688 if (!tl.owns_lock()) {
689 /* we are already reconnecting */
690 return false;
691 }
692
693 connected = false;
694 for (auto& fd : sockets) {
695 if (fd != -1) {
696 if (sockets.size() > 1) {
697 std::lock_guard<std::mutex> lock(socketsLock);
698 mplexer->removeReadFD(fd);
699 }
700 /* shutdown() is needed to wake up recv() in the responderThread */
701 shutdown(fd, SHUT_RDWR);
702 close(fd);
703 fd = -1;
704 }
705 if (!IsAnyAddress(remote)) {
706 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
707 if (!IsAnyAddress(sourceAddr)) {
708 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
709 if (!sourceItfName.empty()) {
710 #ifdef SO_BINDTODEVICE
711 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
712 if (res != 0) {
713 infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
714 }
715 #endif
716 }
717
718 SBind(fd, sourceAddr);
719 }
720 try {
721 SConnect(fd, remote);
722 if (sockets.size() > 1) {
723 std::lock_guard<std::mutex> lock(socketsLock);
724 mplexer->addReadFD(fd, [](int, boost::any) {});
725 }
726 connected = true;
727 }
728 catch(const std::runtime_error& error) {
729 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
730 connected = false;
731 break;
732 }
733 }
734 }
735
736 /* if at least one (re-)connection failed, close all sockets */
737 if (!connected) {
738 for (auto& fd : sockets) {
739 if (fd != -1) {
740 if (sockets.size() > 1) {
741 std::lock_guard<std::mutex> lock(socketsLock);
742 mplexer->removeReadFD(fd);
743 }
744 /* shutdown() is needed to wake up recv() in the responderThread */
745 shutdown(fd, SHUT_RDWR);
746 close(fd);
747 fd = -1;
748 }
749 }
750 }
751
752 return connected;
753 }
754 void DownstreamState::hash()
755 {
756 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
757 auto w = weight;
758 WriteLock wl(&d_lock);
759 hashes.clear();
760 while (w > 0) {
761 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
762 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
763 hashes.insert(wshash);
764 --w;
765 }
766 }
767
768 void DownstreamState::setId(const boost::uuids::uuid& newId)
769 {
770 id = newId;
771 // compute hashes only if already done
772 if (!hashes.empty()) {
773 hash();
774 }
775 }
776
777 void DownstreamState::setWeight(int newWeight)
778 {
779 if (newWeight < 1) {
780 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
781 return ;
782 }
783 weight = newWeight;
784 if (!hashes.empty()) {
785 hash();
786 }
787 }
788
789 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
790 {
791 pthread_rwlock_init(&d_lock, nullptr);
792 id = getUniqueID();
793 threadStarted.clear();
794
795 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
796
797 sockets.resize(numberOfSockets);
798 for (auto& fd : sockets) {
799 fd = -1;
800 }
801
802 if (!IsAnyAddress(remote)) {
803 reconnect();
804 idStates.resize(g_maxOutstanding);
805 sw.start();
806 infolog("Added downstream server %s", remote.toStringWithPort());
807 }
808
809 }
810
/* presumably serializes access to g_lua — TODO confirm against its users */
std::mutex g_luamutex;
LuaContext g_lua;

/* presumably the default server selection policy, for pools without their own
   policy (see setPoolPolicy()) — TODO confirm */
GlobalStateHolder<ServerPolicy> g_policy;
815
816 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
817 {
818 for(auto& d : servers) {
819 if(d.second->isUp() && d.second->qps.check())
820 return d.second;
821 }
822 return leastOutstanding(servers, dq);
823 }
824
825 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
826 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
827 {
828 if (servers.size() == 1 && servers[0].second->isUp()) {
829 return servers[0].second;
830 }
831
832 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
833 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
834 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
835 poss.reserve(servers.size());
836 for(auto& d : servers) {
837 if(d.second->isUp()) {
838 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
839 }
840 }
841 if(poss.empty())
842 return shared_ptr<DownstreamState>();
843 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
844 return poss.begin()->second;
845 }
846
847 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
848 {
849 vector<pair<int, shared_ptr<DownstreamState>>> poss;
850 int sum = 0;
851 int max = std::numeric_limits<int>::max();
852
853 for(auto& d : servers) { // w=1, w=10 -> 1, 11
854 if(d.second->isUp()) {
855 // Don't overflow sum when adding high weights
856 if(d.second->weight > max - sum) {
857 sum = max;
858 } else {
859 sum += d.second->weight;
860 }
861
862 poss.push_back({sum, d.second});
863 }
864 }
865
866 // Catch poss & sum are empty to avoid SIGFPE
867 if(poss.empty())
868 return shared_ptr<DownstreamState>();
869
870 int r = val % sum;
871 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
872 if(p==poss.end())
873 return shared_ptr<DownstreamState>();
874 return p->second;
875 }
876
877 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
878 {
879 return valrandom(random(), servers, dq);
880 }
881
882 uint32_t g_hashperturb;
883 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
884 {
885 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
886 }
887
888 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
889 {
890 unsigned int qhash = dq->qname->hash(g_hashperturb);
891 unsigned int sel = std::numeric_limits<unsigned int>::max();
892 unsigned int min = std::numeric_limits<unsigned int>::max();
893 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
894
895 for (const auto& d: servers) {
896 if (d.second->isUp()) {
897 // make sure hashes have been computed
898 if (d.second->hashes.empty()) {
899 d.second->hash();
900 }
901 {
902 ReadLock rl(&(d.second->d_lock));
903 const auto& server = d.second;
904 // we want to keep track of the last hash
905 if (min > *(server->hashes.begin())) {
906 min = *(server->hashes.begin());
907 first = server;
908 }
909
910 auto hash_it = server->hashes.lower_bound(qhash);
911 if (hash_it != server->hashes.end()) {
912 if (*hash_it < sel) {
913 sel = *hash_it;
914 ret = server;
915 }
916 }
917 }
918 }
919 }
920 if (ret != nullptr) {
921 return ret;
922 }
923 if (first != nullptr) {
924 return first;
925 }
926 return shared_ptr<DownstreamState>();
927 }
928
929 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
930 {
931 NumberedServerVector poss;
932
933 for(auto& d : servers) {
934 if(d.second->isUp()) {
935 poss.push_back(d);
936 }
937 }
938
939 const auto *res=&poss;
940 if(poss.empty() && !g_roundrobinFailOnNoServer)
941 res = &servers;
942
943 if(res->empty())
944 return shared_ptr<DownstreamState>();
945
946 static unsigned int counter;
947
948 return (*res)[(counter++) % res->size()].second;
949 }
950
/* presumably the default console/control listening address — TODO confirm where it gets overridden */
ComboAddress g_serverControl{"127.0.0.1:5199"};
952
953 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
954 {
955 std::shared_ptr<ServerPool> pool;
956 pools_t::iterator it = pools.find(poolName);
957 if (it != pools.end()) {
958 pool = it->second;
959 }
960 else {
961 if (!poolName.empty())
962 vinfolog("Creating pool %s", poolName);
963 pool = std::make_shared<ServerPool>();
964 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
965 }
966 return pool;
967 }
968
969 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
970 {
971 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
972 if (!poolName.empty()) {
973 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
974 } else {
975 vinfolog("Setting default pool server selection policy to %s", policy->name);
976 }
977 pool->policy = policy;
978 }
979
980 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
981 {
982 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
983 if (!poolName.empty()) {
984 vinfolog("Adding server to pool %s", poolName);
985 } else {
986 vinfolog("Adding server to default pool");
987 }
988 pool->addServer(server);
989 }
990
991 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
992 {
993 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
994
995 if (!poolName.empty()) {
996 vinfolog("Removing server from pool %s", poolName);
997 }
998 else {
999 vinfolog("Removing server from default pool");
1000 }
1001
1002 pool->removeServer(server);
1003 }
1004
1005 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
1006 {
1007 pools_t::const_iterator it = pools.find(poolName);
1008
1009 if (it == pools.end()) {
1010 throw std::out_of_range("No pool named " + poolName);
1011 }
1012
1013 return it->second;
1014 }
1015
1016 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1017 {
1018 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1019 return pool->getServers();
1020 }
1021
1022 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
1023 {
1024 string result;
1025
1026 std::vector<std::string> addrs;
1027 stringtok(addrs, spoofContent, " ,");
1028
1029 if (addrs.size() == 1) {
1030 try {
1031 ComboAddress spoofAddr(spoofContent);
1032 SpoofAction sa({spoofAddr});
1033 sa(&dq, &result);
1034 }
1035 catch(const PDNSException &e) {
1036 SpoofAction sa(spoofContent); // CNAME then
1037 sa(&dq, &result);
1038 }
1039 } else {
1040 std::vector<ComboAddress> cas;
1041 for (const auto& addr : addrs) {
1042 try {
1043 cas.push_back(ComboAddress(addr));
1044 }
1045 catch (...) {
1046 }
1047 }
1048 SpoofAction sa(cas);
1049 sa(&dq, &result);
1050 }
1051 }
1052
1053 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
1054 {
1055 switch(action) {
1056 case DNSAction::Action::Allow:
1057 return true;
1058 break;
1059 case DNSAction::Action::Drop:
1060 ++g_stats.ruleDrop;
1061 drop = true;
1062 return true;
1063 break;
1064 case DNSAction::Action::Nxdomain:
1065 dq.dh->rcode = RCode::NXDomain;
1066 dq.dh->qr=true;
1067 ++g_stats.ruleNXDomain;
1068 return true;
1069 break;
1070 case DNSAction::Action::Refused:
1071 dq.dh->rcode = RCode::Refused;
1072 dq.dh->qr=true;
1073 ++g_stats.ruleRefused;
1074 return true;
1075 break;
1076 case DNSAction::Action::ServFail:
1077 dq.dh->rcode = RCode::ServFail;
1078 dq.dh->qr=true;
1079 ++g_stats.ruleServFail;
1080 return true;
1081 break;
1082 case DNSAction::Action::Spoof:
1083 spoofResponseFromString(dq, ruleresult);
1084 return true;
1085 break;
1086 case DNSAction::Action::Truncate:
1087 dq.dh->tc = true;
1088 dq.dh->qr = true;
1089 return true;
1090 break;
1091 case DNSAction::Action::HeaderModify:
1092 return true;
1093 break;
1094 case DNSAction::Action::Pool:
1095 dq.poolname=ruleresult;
1096 return true;
1097 break;
1098 case DNSAction::Action::NoRecurse:
1099 dq.dh->rd = false;
1100 return true;
1101 break;
1102 /* non-terminal actions follow */
1103 case DNSAction::Action::Delay:
1104 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1105 break;
1106 case DNSAction::Action::None:
1107 /* fall-through */
1108 case DNSAction::Action::NoOp:
1109 break;
1110 }
1111
1112 /* false means that we don't stop the processing */
1113 return false;
1114 }
1115
1116
1117 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
1118 {
1119 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1120
1121 if(g_qcount.enabled) {
1122 string qname = (*dq.qname).toLogString();
1123 bool countQuery{true};
1124 if(g_qcount.filter) {
1125 std::lock_guard<std::mutex> lock(g_luamutex);
1126 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1127 }
1128
1129 if(countQuery) {
1130 WriteLock wl(&g_qcount.queryLock);
1131 if(!g_qcount.records.count(qname)) {
1132 g_qcount.records[qname] = 0;
1133 }
1134 g_qcount.records[qname]++;
1135 }
1136 }
1137
1138 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1139 auto updateBlockStats = [&got]() {
1140 ++g_stats.dynBlocked;
1141 got->second.blocks++;
1142 };
1143
1144 if(now < got->second.until) {
1145 DNSAction::Action action = got->second.action;
1146 if (action == DNSAction::Action::None) {
1147 action = g_dynBlockAction;
1148 }
1149 switch (action) {
1150 case DNSAction::Action::NoOp:
1151 /* do nothing */
1152 break;
1153
1154 case DNSAction::Action::Nxdomain:
1155 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1156 updateBlockStats();
1157
1158 dq.dh->rcode = RCode::NXDomain;
1159 dq.dh->qr=true;
1160 return true;
1161
1162 case DNSAction::Action::Refused:
1163 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1164 updateBlockStats();
1165
1166 dq.dh->rcode = RCode::Refused;
1167 dq.dh->qr = true;
1168 return true;
1169
1170 case DNSAction::Action::Truncate:
1171 if(!dq.tcp) {
1172 updateBlockStats();
1173 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1174 dq.dh->tc = true;
1175 dq.dh->qr = true;
1176 return true;
1177 }
1178 else {
1179 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1180 }
1181 break;
1182 case DNSAction::Action::NoRecurse:
1183 updateBlockStats();
1184 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1185 dq.dh->rd = false;
1186 return true;
1187 default:
1188 updateBlockStats();
1189 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1190 return false;
1191 }
1192 }
1193 }
1194
1195 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1196 auto updateBlockStats = [&got]() {
1197 ++g_stats.dynBlocked;
1198 got->blocks++;
1199 };
1200
1201 if(now < got->until) {
1202 DNSAction::Action action = got->action;
1203 if (action == DNSAction::Action::None) {
1204 action = g_dynBlockAction;
1205 }
1206 switch (action) {
1207 case DNSAction::Action::NoOp:
1208 /* do nothing */
1209 break;
1210 case DNSAction::Action::Nxdomain:
1211 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1212 updateBlockStats();
1213
1214 dq.dh->rcode = RCode::NXDomain;
1215 dq.dh->qr=true;
1216 return true;
1217 case DNSAction::Action::Refused:
1218 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1219 updateBlockStats();
1220
1221 dq.dh->rcode = RCode::Refused;
1222 dq.dh->qr=true;
1223 return true;
1224 case DNSAction::Action::Truncate:
1225 if(!dq.tcp) {
1226 updateBlockStats();
1227
1228 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1229 dq.dh->tc = true;
1230 dq.dh->qr = true;
1231 return true;
1232 }
1233 else {
1234 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1235 }
1236 break;
1237 case DNSAction::Action::NoRecurse:
1238 updateBlockStats();
1239 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1240 dq.dh->rd = false;
1241 return true;
1242 default:
1243 updateBlockStats();
1244 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1245 return false;
1246 }
1247 }
1248 }
1249
1250 DNSAction::Action action=DNSAction::Action::None;
1251 string ruleresult;
1252 bool drop = false;
1253 for(const auto& lr : *holders.rulactions) {
1254 if(lr.d_rule->matches(&dq)) {
1255 lr.d_rule->d_matches++;
1256 action=(*lr.d_action)(&dq, &ruleresult);
1257 if (processRulesResult(action, dq, ruleresult, drop)) {
1258 break;
1259 }
1260 }
1261 }
1262
1263 if (drop) {
1264 return false;
1265 }
1266
1267 return true;
1268 }
1269
1270 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1271 {
1272 ssize_t result;
1273
1274 if (ss->sourceItf == 0) {
1275 result = send(sd, request, requestLen, 0);
1276 }
1277 else {
1278 struct msghdr msgh;
1279 struct iovec iov;
1280 cmsgbuf_aligned cbuf;
1281 ComboAddress remote(ss->remote);
1282 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1283 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1284 result = sendmsg(sd, &msgh, 0);
1285 }
1286
1287 if (result == -1) {
1288 int savederrno = errno;
1289 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1290
1291 /* This might sound silly, but on Linux send() might fail with EINVAL
1292 if the interface the socket was bound to doesn't exist anymore.
1293 We don't want to reconnect the real socket if the healthcheck failed,
1294 because it's not using the same socket.
1295 */
1296 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1297 ss->reconnect();
1298 }
1299 }
1300
1301 return result;
1302 }
1303
1304 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1305 {
1306 if (msgh->msg_flags & MSG_TRUNC) {
1307 /* message was too large for our buffer */
1308 vinfolog("Dropping message too large for our buffer");
1309 ++g_stats.nonCompliantQueries;
1310 return false;
1311 }
1312
1313 if(!holders.acl->match(remote)) {
1314 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1315 ++g_stats.aclDrops;
1316 return false;
1317 }
1318
1319 cs.queries++;
1320 ++g_stats.queries;
1321
1322 if (HarvestDestinationAddress(msgh, &dest)) {
1323 /* we don't get the port, only the address */
1324 dest.sin4.sin_port = cs.local.sin4.sin_port;
1325 }
1326 else {
1327 dest.sin4.sin_family = 0;
1328 }
1329
1330 return true;
1331 }
1332
1333 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1334 {
1335 if (cs.dnscryptCtx) {
1336 #ifdef HAVE_DNSCRYPT
1337 vector<uint8_t> response;
1338 uint16_t decryptedQueryLen = 0;
1339
1340 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1341
1342 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1343
1344 if (!decrypted) {
1345 if (response.size() > 0) {
1346 return response;
1347 }
1348 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1349 }
1350
1351 len = decryptedQueryLen;
1352 #endif /* HAVE_DNSCRYPT */
1353 }
1354 return boost::none;
1355 }
1356
1357 bool checkQueryHeaders(const struct dnsheader* dh)
1358 {
1359 if (dh->qr) { // don't respond to responses
1360 ++g_stats.nonCompliantQueries;
1361 return false;
1362 }
1363
1364 if (dh->qdcount == 0) {
1365 ++g_stats.emptyQueries;
1366 return false;
1367 }
1368
1369 if (dh->rd) {
1370 ++g_stats.rdQueries;
1371 }
1372
1373 return true;
1374 }
1375
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Fills 'outMsg' so that the 'responseLen' bytes of 'response' can later be sent
   to 'remote' in a batch with sendmmsg(), using 'iov' as the I/O vector.
   When 'dest' has a non-zero family (the query's destination address is known),
   it is set as the source address of the response through ancillary data
   stored in 'cbuf'. 'cs' is not used. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  struct msghdr& hdr = outMsg.msg_hdr;

  outMsg.msg_len = 0;
  fillMSGHdr(&hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destKnown = dest.sin4.sin_family != 0;
  if (destKnown) {
    addCMsgSrcAddr(&hdr, cbuf, &dest, 0);
  }
  else {
    hdr.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1390
1391 /* self-generated responses or cache hits */
1392 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1393 {
1394 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1395
1396 #ifdef HAVE_PROTOBUF
1397 dr.uniqueId = dq.uniqueId;
1398 #endif
1399 dr.qTag = dq.qTag;
1400 dr.delayMsec = dq.delayMsec;
1401
1402 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1403 return false;
1404 }
1405
1406 /* in case a rule changed it */
1407 dq.delayMsec = dr.delayMsec;
1408
1409 #ifdef HAVE_DNSCRYPT
1410 if (!cs.muted) {
1411 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1412 return false;
1413 }
1414 }
1415 #endif /* HAVE_DNSCRYPT */
1416
1417 if (cacheHit) {
1418 ++g_stats.cacheHits;
1419 }
1420
1421 switch (dr.dh->rcode) {
1422 case RCode::NXDomain:
1423 ++g_stats.frontendNXDomain;
1424 break;
1425 case RCode::ServFail:
1426 ++g_stats.frontendServFail;
1427 break;
1428 case RCode::NoError:
1429 ++g_stats.frontendNoError;
1430 break;
1431 }
1432
1433 doLatencyStats(0); // we're not going to measure this
1434 return true;
1435 }
1436
1437 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1438 {
1439 const uint16_t queryId = ntohs(dq.dh->id);
1440
1441 try {
1442 /* we need an accurate ("real") value for the response and
1443 to store into the IDS, but not for insertion into the
1444 rings for example */
1445 struct timespec now;
1446 gettime(&now);
1447
1448 if (!applyRulesToQuery(holders, dq, now)) {
1449 return ProcessQueryResult::Drop;
1450 }
1451
1452 if(dq.dh->qr) { // something turned it into a response
1453 fixUpQueryTurnedResponse(dq, dq.origFlags);
1454
1455 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1456 return ProcessQueryResult::Drop;
1457 }
1458
1459 ++g_stats.selfAnswered;
1460 ++cs.responses;
1461 return ProcessQueryResult::SendAnswer;
1462 }
1463
1464 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
1465 dq.packetCache = serverPool->packetCache;
1466 auto policy = *(holders.policy);
1467 if (serverPool->policy != nullptr) {
1468 policy = *(serverPool->policy);
1469 }
1470 auto servers = serverPool->getServers();
1471 if (policy.isLua) {
1472 std::lock_guard<std::mutex> lock(g_luamutex);
1473 selectedBackend = policy.policy(servers, &dq);
1474 }
1475 else {
1476 selectedBackend = policy.policy(servers, &dq);
1477 }
1478
1479 uint16_t cachedResponseSize = dq.size;
1480 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1481
1482 if (dq.packetCache && !dq.skipCache) {
1483 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1484 }
1485
1486 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1487 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1488 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1489 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1490 dq.len = cachedResponseSize;
1491
1492 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1493 return ProcessQueryResult::Drop;
1494 }
1495
1496 return ProcessQueryResult::SendAnswer;
1497 }
1498
1499 if (!dq.subnet) {
1500 /* there was no existing ECS on the query, enable the zero-scope feature */
1501 dq.useZeroScope = true;
1502 }
1503 }
1504
1505 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1506 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1507 return ProcessQueryResult::Drop;
1508 }
1509 }
1510
1511 if (dq.packetCache && !dq.skipCache) {
1512 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1513 dq.len = cachedResponseSize;
1514
1515 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1516 return ProcessQueryResult::Drop;
1517 }
1518
1519 return ProcessQueryResult::SendAnswer;
1520 }
1521 ++g_stats.cacheMisses;
1522 }
1523
1524 if(!selectedBackend) {
1525 ++g_stats.noPolicy;
1526
1527 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1528 if (g_servFailOnNoPolicy) {
1529 restoreFlags(dq.dh, dq.origFlags);
1530
1531 dq.dh->rcode = RCode::ServFail;
1532 dq.dh->qr = true;
1533
1534 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1535 return ProcessQueryResult::Drop;
1536 }
1537 // no response-only statistics counter to update.
1538 return ProcessQueryResult::SendAnswer;
1539 }
1540
1541 return ProcessQueryResult::Drop;
1542 }
1543
1544 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1545 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1546 }
1547
1548 selectedBackend->queries++;
1549 return ProcessQueryResult::PassToBackend;
1550 }
1551 catch(const std::exception& e){
1552 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1553 }
1554 return ProcessQueryResult::Drop;
1555 }
1556
1557 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
1558 {
1559 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1560 uint16_t queryId = 0;
1561
1562 try {
1563 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1564 return;
1565 }
1566
1567 /* we need an accurate ("real") value for the response and
1568 to store into the IDS, but not for insertion into the
1569 rings for example */
1570 struct timespec queryRealTime;
1571 gettime(&queryRealTime, true);
1572
1573 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1574 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1575 if (dnsCryptResponse) {
1576 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1577 return;
1578 }
1579
1580 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1581 queryId = ntohs(dh->id);
1582
1583 if (!checkQueryHeaders(dh)) {
1584 return;
1585 }
1586
1587 uint16_t qtype, qclass;
1588 unsigned int consumed = 0;
1589 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1590 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1591 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1592 std::shared_ptr<DownstreamState> ss{nullptr};
1593 auto result = processQuery(dq, cs, holders, ss);
1594
1595 if (result == ProcessQueryResult::Drop) {
1596 return;
1597 }
1598
1599 if (result == ProcessQueryResult::SendAnswer) {
1600 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1601 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1602 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1603 (*queuedResponses)++;
1604 return;
1605 }
1606 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1607 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1608 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1609 return;
1610 }
1611
1612 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1613 return;
1614 }
1615
1616 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1617 IDState* ids = &ss->idStates[idOffset];
1618 ids->age = 0;
1619 DOHUnit* du = nullptr;
1620
1621 /* that means that the state was in use, possibly with an allocated
1622 DOHUnit that we will need to handle, but we can't touch it before
1623 confirming that we now own this state */
1624 if (ids->isInUse()) {
1625 du = ids->du;
1626 }
1627
1628 /* we atomically replace the value, we now own this state */
1629 if (!ids->markAsUsed()) {
1630 /* the state was not in use.
1631 we reset 'du' because it might have still been in use when we read it. */
1632 du = nullptr;
1633 ++ss->outstanding;
1634 }
1635 else {
1636 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1637 to handle it because it's about to be overwritten. */
1638 ids->du = nullptr;
1639 ++ss->reuseds;
1640 ++g_stats.downstreamTimeouts;
1641 handleDOHTimeout(du);
1642 }
1643
1644 ids->cs = &cs;
1645 ids->origFD = cs.udpFD;
1646 ids->origID = dh->id;
1647 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1648
1649 /* If we couldn't harvest the real dest addr, still
1650 write down the listening addr since it will be useful
1651 (especially if it's not an 'any' one).
1652 We need to keep track of which one it is since we may
1653 want to use the real but not the listening addr to reply.
1654 */
1655 if (dest.sin4.sin_family != 0) {
1656 ids->origDest = dest;
1657 ids->destHarvested = true;
1658 }
1659 else {
1660 ids->origDest = cs.local;
1661 ids->destHarvested = false;
1662 }
1663
1664 dh->id = idOffset;
1665
1666 int fd = pickBackendSocketForSending(ss);
1667 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1668
1669 if(ret < 0) {
1670 ++ss->sendErrors;
1671 ++g_stats.downstreamSendErrors;
1672 }
1673
1674 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1675 }
1676 catch(const std::exception& e){
1677 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1678 }
1679 }
1680
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* UDP receive loop for frontend 'cs' using recvmmsg()/sendmmsg(). It receives
   up to g_udpVectorSize datagrams per system call, processes them, and sends
   the immediate answers back with a single sendmmsg() call. It never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[s_maxPacketCacheEntrySize];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
  }

  /* go now */
  for(;;) {

    /* reset the IO vector, since it's also used to send the vector of responses
       to avoid having to copy the data around.
       The receive length must be s_udpIncomingBufferSize, as set up above and
       as in the single-message path, and not the whole buffer. The rest of the
       buffer is reserved for adding EDNS/ECS and for self-generated responses,
       and larger datagrams must be reported as truncated (MSG_TRUNC) so they
       are dropped. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      recvData[idx].iov.iov_base = recvData[idx].packet;
      recvData[idx].iov.iov_len = s_udpIncomingBufferSize;
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);

    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1761
1762 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1763 static void udpClientThread(ClientState* cs)
1764 try
1765 {
1766 setThreadName("dnsdist/udpClie");
1767 LocalHolders holders;
1768
1769 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1770 if (g_udpVectorSize > 1) {
1771 MultipleMessagesUDPClientThread(cs, holders);
1772
1773 }
1774 else
1775 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1776 {
1777 char packet[s_maxPacketCacheEntrySize];
1778 /* the actual buffer is larger because:
1779 - we may have to add EDNS and/or ECS
1780 - we use it for self-generated responses (from rule or cache)
1781 but we only accept incoming payloads up to that size
1782 */
1783 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1784 struct msghdr msgh;
1785 struct iovec iov;
1786 /* used by HarvestDestinationAddress */
1787 cmsgbuf_aligned cbuf;
1788
1789 ComboAddress remote;
1790 ComboAddress dest;
1791 remote.sin4.sin_family = cs->local.sin4.sin_family;
1792 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, s_udpIncomingBufferSize, &remote);
1793
1794 for(;;) {
1795 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1796
1797 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1798 ++g_stats.nonCompliantQueries;
1799 continue;
1800 }
1801
1802 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), sizeof(packet), nullptr, nullptr, nullptr, nullptr);
1803 }
1804 }
1805 }
1806 catch(const std::exception &e)
1807 {
1808 errlog("UDP client thread died because of exception: %s", e.what());
1809 }
1810 catch(const PDNSException &e)
1811 {
1812 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1813 }
1814 catch(...)
1815 {
1816 errlog("UDP client thread died because of an exception: %s", "unknown");
1817 }
1818
/* Returns a random 16-bit DNS query ID. It uses libsodium's randombytes_random()
   when built with HAVE_LIBSODIUM, and random() otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const auto value = randombytes_random();
#else
  const auto value = random();
#endif
  /* keep the 16 low-order bits */
  return (value % 65536);
}
1827
1828 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1829 try
1830 {
1831 DNSName checkName = ds->checkName;
1832 uint16_t checkType = ds->checkType.getCode();
1833 uint16_t checkClass = ds->checkClass;
1834 dnsheader checkHeader;
1835 memset(&checkHeader, 0, sizeof(checkHeader));
1836
1837 checkHeader.qdcount = htons(1);
1838 checkHeader.id = getRandomDNSID();
1839
1840 checkHeader.rd = true;
1841 if (ds->setCD) {
1842 checkHeader.cd = true;
1843 }
1844
1845 if (ds->checkFunction) {
1846 std::lock_guard<std::mutex> lock(g_luamutex);
1847 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1848 checkName = std::get<0>(ret);
1849 checkType = std::get<1>(ret);
1850 checkClass = std::get<2>(ret);
1851 }
1852
1853 vector<uint8_t> packet;
1854 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1855 dnsheader * requestHeader = dpw.getHeader();
1856 *requestHeader = checkHeader;
1857
1858 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1859 sock.setNonBlocking();
1860 if (!IsAnyAddress(ds->sourceAddr)) {
1861 sock.setReuseAddr();
1862 if (!ds->sourceItfName.empty()) {
1863 #ifdef SO_BINDTODEVICE
1864 int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
1865 if (res != 0 && g_verboseHealthChecks) {
1866 infolog("Error settting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
1867 }
1868 #endif
1869 }
1870 sock.bind(ds->sourceAddr);
1871 }
1872 sock.connect(ds->remote);
1873 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1874 if (sent < 0) {
1875 int ret = errno;
1876 if (g_verboseHealthChecks)
1877 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1878 return false;
1879 }
1880
1881 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1882 if(ret < 0 || !ret) { // error, timeout, both are down!
1883 if (ret < 0) {
1884 ret = errno;
1885 if (g_verboseHealthChecks)
1886 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1887 }
1888 else {
1889 if (g_verboseHealthChecks)
1890 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1891 }
1892 return false;
1893 }
1894
1895 string reply;
1896 ComboAddress from;
1897 sock.recvFrom(reply, from);
1898
1899 /* we are using a connected socket but hey.. */
1900 if (from != ds->remote) {
1901 if (g_verboseHealthChecks)
1902 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1903 return false;
1904 }
1905
1906 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1907
1908 if (reply.size() < sizeof(*responseHeader)) {
1909 if (g_verboseHealthChecks)
1910 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1911 return false;
1912 }
1913
1914 if (responseHeader->id != requestHeader->id) {
1915 if (g_verboseHealthChecks)
1916 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1917 return false;
1918 }
1919
1920 if (!responseHeader->qr) {
1921 if (g_verboseHealthChecks)
1922 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1923 return false;
1924 }
1925
1926 if (responseHeader->rcode == RCode::ServFail) {
1927 if (g_verboseHealthChecks)
1928 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1929 return false;
1930 }
1931
1932 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1933 if (g_verboseHealthChecks)
1934 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1935 return false;
1936 }
1937
1938 uint16_t receivedType;
1939 uint16_t receivedClass;
1940 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1941
1942 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1943 if (g_verboseHealthChecks)
1944 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1945 return false;
1946 }
1947
1948 return true;
1949 }
1950 catch(const std::exception& e)
1951 {
1952 if (g_verboseHealthChecks)
1953 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1954 return false;
1955 }
1956 catch(...)
1957 {
1958 if (g_verboseHealthChecks)
1959 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1960 return false;
1961 }
1962
/* maximum number of TCP client threads, handed to TCPClientCollection by main()
   and also used by checkFileDescriptorsLimits() to estimate the FD usage */
uint64_t g_maxTCPClientThreads(10);
/* number of maintThread() iterations between two passes purging expired
   entries from the packet caches */
std::atomic<uint16_t> g_cacheCleaningDelay(60);
/* maintThread() purges each cache down to at most
   maxEntries * (100 - g_cacheCleaningPercentage) / 100 entries */
std::atomic<uint16_t> g_cacheCleaningPercentage(100);
1966
1967 void maintThread()
1968 {
1969 setThreadName("dnsdist/main");
1970 int interval = 1;
1971 size_t counter = 0;
1972 int32_t secondsToWaitLog = 0;
1973
1974 for(;;) {
1975 sleep(interval);
1976
1977 {
1978 std::lock_guard<std::mutex> lock(g_luamutex);
1979 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1980 if(f) {
1981 try {
1982 (*f)();
1983 secondsToWaitLog = 0;
1984 }
1985 catch(std::exception &e) {
1986 if (secondsToWaitLog <= 0) {
1987 infolog("Error during execution of maintenance function: %s", e.what());
1988 secondsToWaitLog = 61;
1989 }
1990 secondsToWaitLog -= interval;
1991 }
1992 }
1993 }
1994
1995 counter++;
1996 if (counter >= g_cacheCleaningDelay) {
1997 /* keep track, for each cache, of whether we should keep
1998 expired entries */
1999 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
2000
2001 /* gather all caches actually used by at least one pool, and see
2002 if something prevents us from cleaning the expired entries */
2003 auto localPools = g_pools.getLocal();
2004 for (const auto& entry : *localPools) {
2005 auto& pool = entry.second;
2006
2007 auto packetCache = pool->packetCache;
2008 if (!packetCache) {
2009 continue;
2010 }
2011
2012 auto pair = caches.insert({packetCache, false});
2013 auto& iter = pair.first;
2014 /* if we need to keep stale data for this cache (ie, not clear
2015 expired entries when at least one pool using this cache
2016 has all its backends down) */
2017 if (packetCache->keepStaleData() && iter->second == false) {
2018 /* so far all pools had at least one backend up */
2019 if (pool->countServers(true) == 0) {
2020 iter->second = true;
2021 }
2022 }
2023 }
2024
2025 for (auto pair : caches) {
2026 /* shall we keep expired entries ? */
2027 if (pair.second == true) {
2028 continue;
2029 }
2030 auto& packetCache = pair.first;
2031 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
2032 packetCache->purgeExpired(upTo);
2033 }
2034 counter = 0;
2035 }
2036
2037 // ponder pruning g_dynblocks of expired entries here
2038 }
2039 }
2040
2041 static void secPollThread()
2042 {
2043 setThreadName("dnsdist/secpoll");
2044
2045 for (;;) {
2046 try {
2047 doSecPoll(g_secPollSuffix);
2048 }
2049 catch(...) {
2050 }
2051 sleep(g_secPollInterval);
2052 }
2053 }
2054
2055 static void healthChecksThread()
2056 {
2057 setThreadName("dnsdist/healthC");
2058
2059 int interval = 1;
2060
2061 for(;;) {
2062 sleep(interval);
2063
2064 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
2065 g_tcpclientthreads->addTCPClientThread();
2066
2067 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
2068 for(auto& dss : *states) {
2069 if(++dss->lastCheck < dss->checkInterval)
2070 continue;
2071 dss->lastCheck = 0;
2072 if(dss->availability==DownstreamState::Availability::Auto) {
2073 bool newState=upCheck(dss);
2074 if (newState) {
2075 /* check succeeded */
2076 dss->currentCheckFailures = 0;
2077
2078 if (!dss->upStatus) {
2079 /* we were marked as down */
2080 dss->consecutiveSuccessfulChecks++;
2081 if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
2082 /* if we need more than one successful check to rise
2083 and we didn't reach the threshold yet,
2084 let's stay down */
2085 newState = false;
2086 }
2087 }
2088 }
2089 else {
2090 /* check failed */
2091 dss->consecutiveSuccessfulChecks = 0;
2092
2093 if (dss->upStatus) {
2094 /* we are currently up */
2095 dss->currentCheckFailures++;
2096 if (dss->currentCheckFailures < dss->maxCheckFailures) {
2097 /* we need more than one failure to be marked as down,
2098 and we did not reach the threshold yet, let's stay down */
2099 newState = true;
2100 }
2101 }
2102 }
2103
2104 if(newState != dss->upStatus) {
2105 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2106
2107 if (newState && !dss->connected) {
2108 newState = dss->reconnect();
2109
2110 if (dss->connected && !dss->threadStarted.test_and_set()) {
2111 dss->tid = thread(responderThread, dss);
2112 }
2113 }
2114
2115 dss->upStatus = newState;
2116 dss->currentCheckFailures = 0;
2117 dss->consecutiveSuccessfulChecks = 0;
2118 if (g_snmpAgent && g_snmpTrapsEnabled) {
2119 g_snmpAgent->sendBackendStatusChangeTrap(dss);
2120 }
2121 }
2122 }
2123
2124 auto delta = dss->sw.udiffAndSet()/1000000.0;
2125 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2126 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2127 dss->prev.queries.store(dss->queries.load());
2128 dss->prev.reuseds.store(dss->reuseds.load());
2129
2130 for(IDState& ids : dss->idStates) { // timeouts
2131 int64_t usageIndicator = ids.usageIndicator;
2132 if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
2133 /* We mark the state as unused as soon as possible
2134 to limit the risk of racing with the
2135 responder thread.
2136 */
2137 auto oldDU = ids.du;
2138
2139 if (!ids.tryMarkUnused(usageIndicator)) {
2140 /* this state has been altered in the meantime,
2141 don't go anywhere near it */
2142 continue;
2143 }
2144 ids.du = nullptr;
2145 handleDOHTimeout(oldDU);
2146 ids.age = 0;
2147 dss->reuseds++;
2148 --dss->outstanding;
2149 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2150 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2151 dss->remote.toStringWithPort(), dss->name,
2152 ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2153
2154 struct timespec ts;
2155 gettime(&ts);
2156
2157 struct dnsheader fake;
2158 memset(&fake, 0, sizeof(fake));
2159 fake.id = ids.origID;
2160
2161 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2162 }
2163 }
2164 }
2165 }
2166 }
2167
2168 static void bindAny(int af, int sock)
2169 {
2170 __attribute__((unused)) int one = 1;
2171
2172 #ifdef IP_FREEBIND
2173 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2174 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2175 #endif
2176
2177 #ifdef IP_BINDANY
2178 if (af == AF_INET)
2179 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2180 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2181 #endif
2182 #ifdef IPV6_BINDANY
2183 if (af == AF_INET6)
2184 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2185 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2186 #endif
2187 #ifdef SO_BINDANY
2188 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2189 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2190 #endif
2191 }
2192
2193 static void dropGroupPrivs(gid_t gid)
2194 {
2195 if (gid) {
2196 if (setgid(gid) == 0) {
2197 if (setgroups(0, NULL) < 0) {
2198 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2199 }
2200 }
2201 else {
2202 warnlog("Warning: Unable to set group ID to %d: %s", gid, stringerror());
2203 }
2204 }
2205 }
2206
2207 static void dropUserPrivs(uid_t uid)
2208 {
2209 if(uid) {
2210 if(setuid(uid) < 0) {
2211 warnlog("Warning: Unable to set user ID to %d: %s", uid, stringerror());
2212 }
2213 }
2214 }
2215
2216 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2217 {
2218 /* stdin, stdout, stderr */
2219 size_t requiredFDsCount = 3;
2220 auto backends = g_dstates.getLocal();
2221 /* UDP sockets to backends */
2222 size_t backendUDPSocketsCount = 0;
2223 for (const auto& backend : *backends) {
2224 backendUDPSocketsCount += backend->sockets.size();
2225 }
2226 requiredFDsCount += backendUDPSocketsCount;
2227 /* TCP sockets to backends */
2228 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2229 /* listening sockets */
2230 requiredFDsCount += udpBindsCount;
2231 requiredFDsCount += tcpBindsCount;
2232 /* max TCP connections currently served */
2233 requiredFDsCount += g_maxTCPClientThreads;
2234 /* max pipes for communicating between TCP acceptors and client threads */
2235 requiredFDsCount += (g_maxTCPClientThreads * 2);
2236 /* max TCP queued connections */
2237 requiredFDsCount += g_maxTCPQueuedConnections;
2238 /* DelayPipe pipe */
2239 requiredFDsCount += 2;
2240 /* syslog socket */
2241 requiredFDsCount++;
2242 /* webserver main socket */
2243 requiredFDsCount++;
2244 /* console main socket */
2245 requiredFDsCount++;
2246 /* carbon export */
2247 requiredFDsCount++;
2248 /* history file */
2249 requiredFDsCount++;
2250 struct rlimit rl;
2251 getrlimit(RLIMIT_NOFILE, &rl);
2252 if (rl.rlim_cur <= requiredFDsCount) {
2253 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2254 #ifdef HAVE_SYSTEMD
2255 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2256 #else
2257 warnlog("You can increase this value by using ulimit.");
2258 #endif
2259 }
2260 }
2261
/*
 * Create, configure and bind the listening socket of the frontend `cs`,
 * and start listening on it for TCP frontends.
 *
 * The descriptor is stored in cs->udpFD (UDP) or cs->tcpFD (TCP) through the
 * `fd` reference below. All socket options are applied before SBind(). If the
 * TLS context cannot be set up, the process exits with EXIT_FAILURE.
 * NOTE(review): SSocket/SSetsockopt/SBind/SListen return values are not
 * checked here, which suggests they throw on failure; confirm.
 * On success cs->ready is set to true.
 */
static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
{
  /* skip some warnings if there is an identical UDP context */
  /* warn is true for UDP, DoT and DoH frontends; a plain TCP frontend is
     presumably paired with a UDP one on the same address (main() adds them
     in pairs) which already emitted the "not supported" warnings */
  bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
  /* reference: assigning fd stores the new socket into the ClientState */
  int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
  /* warn is only read in #else branches, silence unused warnings otherwise */
  (void) warn;

  fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);

  if (cs->tcp) {
    SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef TCP_DEFER_ACCEPT
    SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
#endif
    if (cs->fastOpenQueueSize > 0) {
#ifdef TCP_FASTOPEN
      SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
#else
      if (warn) {
        warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
      }
#endif
    }
  }

  /* an IPv6 socket only handles IPv6 traffic */
  if(cs->local.sin4.sin_family == AF_INET6) {
    SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
  }

  /* allow binding to addresses not (yet) present on this host, best effort */
  bindAny(cs->local.sin4.sin_family, fd);

  /* on a wildcard UDP socket, ask for the destination address of each
     datagram (presumably so answers can be sent from that address) */
  if(!cs->tcp && IsAnyAddress(cs->local)) {
    int one=1;
    setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
#ifdef IPV6_RECVPKTINFO
    setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
#endif
  }

  if (cs->reuseport) {
#ifdef SO_REUSEPORT
    SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
#else
    if (warn) {
      /* no need to warn again if configured but support is not available, we already did for UDP */
      warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
    }
#endif
  }

  /* IPv4 UDP only; a failure here is logged, not fatal */
  if (!cs->tcp) {
    if (cs->local.isIPv4()) {
      try {
        setSocketIgnorePMTU(cs->udpFD);
      }
      catch(const std::exception& e) {
        warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
      }
    }
  }

  /* restrict the socket to a given network interface if one was configured;
     a failure is logged, not fatal */
  const std::string& itf = cs->interface;
  if (!itf.empty()) {
#ifdef SO_BINDTODEVICE
    int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
    if (res != 0) {
      warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), stringerror());
    }
#else
    if (warn) {
      warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
    }
#endif
  }

#ifdef HAVE_EBPF
  if (g_defaultBPFFilter) {
    cs->attachFilter(g_defaultBPFFilter);
    vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
  }
#endif /* HAVE_EBPF */

  /* a DoT frontend that cannot be set up is fatal */
  if (cs->tlsFrontend != nullptr) {
    if (!cs->tlsFrontend->setupTLS()) {
      errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
      _exit(EXIT_FAILURE);
    }
  }

  if (cs->dohFrontend != nullptr) {
    cs->dohFrontend->setup();
  }

  SBind(fd, cs->local);

  if (cs->tcp) {
    SListen(cs->tcpFD, SOMAXCONN);
    if (cs->tlsFrontend != nullptr) {
      warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
    }
    else if (cs->dohFrontend != nullptr) {
      warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
    }
    else if (cs->dnscryptCtx != nullptr) {
      warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
    }
    else {
      warnlog("Listening on %s", cs->local.toStringWithPort());
    }
  }

  cs->ready = true;
}
2375
/* Settings gathered from the command line by main(). */
struct
{
  /* -l/--local: listen addresses replacing the plain DNS frontends of the configuration */
  std::vector<std::string> locals;
  /* positional arguments outside of client mode: backends added to the default pool */
  std::vector<std::string> remotes;
  /* --check-config: only validate the configuration file, then exit */
  bool checkConfig = false;
  /* -c/--client: act as a console client of a running dnsdist */
  bool beClient = false;
  /* --supervised: no interactive console, main() waits on the health-check thread */
  bool beSupervised = false;
  /* -e/--execute: console command to run */
  std::string command;
  /* -C/--config: configuration file to load */
  std::string config;
  /* -u/--uid and -g/--gid: user and group to switch to after binding sockets */
  std::string uid;
  std::string gid;
} g_cmdLine;

/* set to true by main() once the configuration has been loaded and the list
   of frontends is final, right before the frontends are bound */
std::atomic<bool> g_configurationDone{false};
2390
/* Print the command-line help on standard output. */
static void usage()
{
  cout<<endl;
  cout<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
  cout<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
  cout<<"[-v,--verbose] [--check-config] [--version]\n";
  cout<<"\n";
  cout<<"-a,--acl netmask Add this netmask to the ACL\n";
  cout<<"-C,--config file Load configuration from 'file'\n";
  cout<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
  cout<<" controlSocket from your configuration file, but also\n";
  cout<<" accepts an IP:PORT argument\n";
#ifdef HAVE_LIBSODIUM
  cout<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
  cout<<" is similar to setting setKey in the configuration file.\n";
  cout<<" NOTE: this will leak this key in your shell's history\n";
  cout<<" and in the system's running process list.\n";
#endif
  cout<<"--check-config Validate the configuration file and exit. The exit-code\n";
  cout<<" reflects the validation, 0 is OK, 1 means an error.\n";
  cout<<" Any errors are printed as well.\n";
  cout<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
  cout<<"-g,--gid gid Change the process group ID after binding sockets\n";
  cout<<"-h,--help Display this helpful message\n";
  cout<<"-l,--local address Listen on this local address\n";
  cout<<"--supervised Don't open a console, I'm supervised\n";
  cout<<" (use with e.g. systemd and daemontools)\n";
  cout<<"--disable-syslog Don't log to syslog, only to stdout\n";
  cout<<" (use with e.g. systemd)\n";
  cout<<"-u,--uid uid Change the process user ID after binding sockets\n";
  cout<<"-v,--verbose Enable verbose mode\n";
  cout<<"-V,--version Show dnsdist version information and exit\n";
}
2424
2425 int main(int argc, char** argv)
2426 try
2427 {
2428 size_t udpBindsCount = 0;
2429 size_t tcpBindsCount = 0;
2430 rl_attempted_completion_function = my_completion;
2431 rl_completion_append_character = 0;
2432
2433 signal(SIGPIPE, SIG_IGN);
2434 signal(SIGCHLD, SIG_IGN);
2435 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2436
2437 #ifdef HAVE_LIBSODIUM
2438 if (sodium_init() == -1) {
2439 cerr<<"Unable to initialize crypto library"<<endl;
2440 exit(EXIT_FAILURE);
2441 }
2442 g_hashperturb=randombytes_uniform(0xffffffff);
2443 srandom(randombytes_uniform(0xffffffff));
2444 #else
2445 {
2446 struct timeval tv;
2447 gettimeofday(&tv, 0);
2448 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2449 g_hashperturb=random();
2450 }
2451
2452 #endif
2453 ComboAddress clientAddress = ComboAddress();
2454 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2455 struct option longopts[]={
2456 {"acl", required_argument, 0, 'a'},
2457 {"check-config", no_argument, 0, 1},
2458 {"client", no_argument, 0, 'c'},
2459 {"config", required_argument, 0, 'C'},
2460 {"disable-syslog", no_argument, 0, 2},
2461 {"execute", required_argument, 0, 'e'},
2462 {"gid", required_argument, 0, 'g'},
2463 {"help", no_argument, 0, 'h'},
2464 {"local", required_argument, 0, 'l'},
2465 {"setkey", required_argument, 0, 'k'},
2466 {"supervised", no_argument, 0, 3},
2467 {"uid", required_argument, 0, 'u'},
2468 {"verbose", no_argument, 0, 'v'},
2469 {"version", no_argument, 0, 'V'},
2470 {0,0,0,0}
2471 };
2472 int longindex=0;
2473 string optstring;
2474 for(;;) {
2475 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2476 if(c==-1)
2477 break;
2478 switch(c) {
2479 case 1:
2480 g_cmdLine.checkConfig=true;
2481 break;
2482 case 2:
2483 g_syslog=false;
2484 break;
2485 case 3:
2486 g_cmdLine.beSupervised=true;
2487 break;
2488 case 'C':
2489 g_cmdLine.config=optarg;
2490 break;
2491 case 'c':
2492 g_cmdLine.beClient=true;
2493 break;
2494 case 'e':
2495 g_cmdLine.command=optarg;
2496 break;
2497 case 'g':
2498 g_cmdLine.gid=optarg;
2499 break;
2500 case 'h':
2501 cout<<"dnsdist "<<VERSION<<endl;
2502 usage();
2503 cout<<"\n";
2504 exit(EXIT_SUCCESS);
2505 break;
2506 case 'a':
2507 optstring=optarg;
2508 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2509 break;
2510 case 'k':
2511 #ifdef HAVE_LIBSODIUM
2512 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2513 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2514 exit(EXIT_FAILURE);
2515 }
2516 #else
2517 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2518 exit(EXIT_FAILURE);
2519 #endif
2520 break;
2521 case 'l':
2522 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2523 break;
2524 case 'u':
2525 g_cmdLine.uid=optarg;
2526 break;
2527 case 'v':
2528 g_verbose=true;
2529 break;
2530 case 'V':
2531 #ifdef LUAJIT_VERSION
2532 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2533 #else
2534 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2535 #endif
2536 cout<<"Enabled features: ";
2537 #ifdef HAVE_CDB
2538 cout<<"cdb ";
2539 #endif
2540 #ifdef HAVE_DNS_OVER_TLS
2541 cout<<"dns-over-tls(";
2542 #ifdef HAVE_GNUTLS
2543 cout<<"gnutls";
2544 #ifdef HAVE_LIBSSL
2545 cout<<" ";
2546 #endif
2547 #endif
2548 #ifdef HAVE_LIBSSL
2549 cout<<"openssl";
2550 #endif
2551 cout<<") ";
2552 #endif
2553 #ifdef HAVE_DNS_OVER_HTTPS
2554 cout<<"dns-over-https(DOH) ";
2555 #endif
2556 #ifdef HAVE_DNSCRYPT
2557 cout<<"dnscrypt ";
2558 #endif
2559 #ifdef HAVE_EBPF
2560 cout<<"ebpf ";
2561 #endif
2562 #ifdef HAVE_FSTRM
2563 cout<<"fstrm ";
2564 #endif
2565 #ifdef HAVE_LIBCRYPTO
2566 cout<<"ipcipher ";
2567 #endif
2568 #ifdef HAVE_LIBSODIUM
2569 cout<<"libsodium ";
2570 #endif
2571 #ifdef HAVE_LMDB
2572 cout<<"lmdb ";
2573 #endif
2574 #ifdef HAVE_PROTOBUF
2575 cout<<"protobuf ";
2576 #endif
2577 #ifdef HAVE_RE2
2578 cout<<"re2 ";
2579 #endif
2580 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2581 cout<<"recvmmsg/sendmmsg ";
2582 #endif
2583 #ifdef HAVE_NET_SNMP
2584 cout<<"snmp ";
2585 #endif
2586 #ifdef HAVE_SYSTEMD
2587 cout<<"systemd";
2588 #endif
2589 cout<<endl;
2590 exit(EXIT_SUCCESS);
2591 break;
2592 case '?':
2593 //getopt_long printed an error message.
2594 usage();
2595 exit(EXIT_FAILURE);
2596 break;
2597 }
2598 }
2599
2600 argc-=optind;
2601 argv+=optind;
2602 for(auto p = argv; *p; ++p) {
2603 if(g_cmdLine.beClient) {
2604 clientAddress = ComboAddress(*p, 5199);
2605 } else {
2606 g_cmdLine.remotes.push_back(*p);
2607 }
2608 }
2609
2610 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2611
2612 g_policy.setState(leastOutstandingPol);
2613 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2614 setupLua(true, g_cmdLine.config);
2615 if (clientAddress != ComboAddress())
2616 g_serverControl = clientAddress;
2617 doClient(g_serverControl, g_cmdLine.command);
2618 _exit(EXIT_SUCCESS);
2619 }
2620
2621 auto acl = g_ACL.getCopy();
2622 if(acl.empty()) {
2623 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2624 acl.addMask(addr);
2625 g_ACL.setState(acl);
2626 }
2627
2628 auto consoleACL = g_consoleACL.getCopy();
2629 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2630 consoleACL.addMask(mask);
2631 }
2632 g_consoleACL.setState(consoleACL);
2633
2634 if (g_cmdLine.checkConfig) {
2635 setupLua(true, g_cmdLine.config);
2636 // No exception was thrown
2637 infolog("Configuration '%s' OK!", g_cmdLine.config);
2638 _exit(EXIT_SUCCESS);
2639 }
2640
2641 auto todo=setupLua(false, g_cmdLine.config);
2642
2643 auto localPools = g_pools.getCopy();
2644 {
2645 bool precompute = false;
2646 if (g_policy.getLocal()->name == "chashed") {
2647 precompute = true;
2648 } else {
2649 for (const auto& entry: localPools) {
2650 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2651 precompute = true;
2652 break ;
2653 }
2654 }
2655 }
2656 if (precompute) {
2657 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2658 // pre compute hashes
2659 auto backends = g_dstates.getLocal();
2660 for (auto& backend: *backends) {
2661 backend->hash();
2662 }
2663 }
2664 }
2665
2666 if (!g_cmdLine.locals.empty()) {
2667 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2668 /* DoH, DoT and DNSCrypt frontends are separate */
2669 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2670 it = g_frontends.erase(it);
2671 }
2672 else {
2673 ++it;
2674 }
2675 }
2676
2677 for(const auto& loc : g_cmdLine.locals) {
2678 /* UDP */
2679 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2680 /* TCP */
2681 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2682 }
2683 }
2684
2685 if (g_frontends.empty()) {
2686 /* UDP */
2687 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2688 /* TCP */
2689 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2690 }
2691
2692 g_configurationDone = true;
2693
2694 for(auto& frontend : g_frontends) {
2695 setUpLocalBind(frontend);
2696
2697 if (frontend->tcp == false) {
2698 ++udpBindsCount;
2699 }
2700 else {
2701 ++tcpBindsCount;
2702 }
2703 }
2704
2705 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2706
2707 vector<string> vec;
2708 std::string acls;
2709 g_ACL.getLocal()->toStringVector(&vec);
2710 for(const auto& s : vec) {
2711 if (!acls.empty())
2712 acls += ", ";
2713 acls += s;
2714 }
2715 infolog("ACL allowing queries from: %s", acls.c_str());
2716 vec.clear();
2717 acls.clear();
2718 g_consoleACL.getLocal()->toStringVector(&vec);
2719 for (const auto& entry : vec) {
2720 if (!acls.empty()) {
2721 acls += ", ";
2722 }
2723 acls += entry;
2724 }
2725 infolog("Console ACL allowing connections from: %s", acls.c_str());
2726
2727 #ifdef HAVE_LIBSODIUM
2728 if (g_consoleEnabled && g_consoleKey.empty()) {
2729 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2730 }
2731 #endif
2732
2733 uid_t newgid=0;
2734 gid_t newuid=0;
2735
2736 if(!g_cmdLine.gid.empty())
2737 newgid = strToGID(g_cmdLine.gid.c_str());
2738
2739 if(!g_cmdLine.uid.empty())
2740 newuid = strToUID(g_cmdLine.uid.c_str());
2741
2742 dropGroupPrivs(newgid);
2743 dropUserPrivs(newuid);
2744 try {
2745 /* we might still have capabilities remaining,
2746 for example if we have been started as root
2747 without --uid or --gid (please don't do that)
2748 or as an unprivileged user with ambient
2749 capabilities like CAP_NET_BIND_SERVICE.
2750 */
2751 dropCapabilities(g_capabilitiesToRetain);
2752 }
2753 catch(const std::exception& e) {
2754 warnlog("%s", e.what());
2755 }
2756
2757 /* this need to be done _after_ dropping privileges */
2758 g_delay = new DelayPipe<DelayedPacket>();
2759
2760 if (g_snmpAgent) {
2761 g_snmpAgent->run();
2762 }
2763
2764 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2765
2766 for(auto& t : todo)
2767 t();
2768
2769 localPools = g_pools.getCopy();
2770 /* create the default pool no matter what */
2771 createPoolIfNotExists(localPools, "");
2772 if(g_cmdLine.remotes.size()) {
2773 for(const auto& address : g_cmdLine.remotes) {
2774 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2775 addServerToPool(localPools, "", ret);
2776 if (ret->connected && !ret->threadStarted.test_and_set()) {
2777 ret->tid = thread(responderThread, ret);
2778 }
2779 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2780 }
2781 }
2782 g_pools.setState(localPools);
2783
2784 if(g_dstates.getLocal()->empty()) {
2785 errlog("No downstream servers defined: all packets will get dropped");
2786 // you might define them later, but you need to know
2787 }
2788
2789 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2790
2791 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2792 if(dss->availability==DownstreamState::Availability::Auto) {
2793 bool newState=upCheck(dss);
2794 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2795 dss->upStatus = newState;
2796 }
2797 }
2798
2799 for(auto& cs : g_frontends) {
2800 if (cs->dohFrontend != nullptr) {
2801 #ifdef HAVE_DNS_OVER_HTTPS
2802 std::thread t1(dohThread, cs.get());
2803 if (!cs->cpus.empty()) {
2804 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2805 }
2806 t1.detach();
2807 #endif /* HAVE_DNS_OVER_HTTPS */
2808 continue;
2809 }
2810 if (cs->udpFD >= 0) {
2811 thread t1(udpClientThread, cs.get());
2812 if (!cs->cpus.empty()) {
2813 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2814 }
2815 t1.detach();
2816 }
2817 else if (cs->tcpFD >= 0) {
2818 thread t1(tcpAcceptorThread, cs.get());
2819 if (!cs->cpus.empty()) {
2820 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2821 }
2822 t1.detach();
2823 }
2824 }
2825
2826 thread carbonthread(carbonDumpThread);
2827 carbonthread.detach();
2828
2829 thread stattid(maintThread);
2830 stattid.detach();
2831
2832 thread healththread(healthChecksThread);
2833
2834 if (!g_secPollSuffix.empty()) {
2835 thread secpollthread(secPollThread);
2836 secpollthread.detach();
2837 }
2838
2839 if(g_cmdLine.beSupervised) {
2840 #ifdef HAVE_SYSTEMD
2841 sd_notify(0, "READY=1");
2842 #endif
2843 healththread.join();
2844 }
2845 else {
2846 healththread.detach();
2847 doConsole();
2848 }
2849 _exit(EXIT_SUCCESS);
2850
2851 }
2852 catch(const LuaContext::ExecutionErrorException& e) {
2853 try {
2854 errlog("Fatal Lua error: %s", e.what());
2855 std::rethrow_if_nested(e);
2856 } catch(const std::exception& ne) {
2857 errlog("Details: %s", ne.what());
2858 }
2859 catch(PDNSException &ae)
2860 {
2861 errlog("Fatal pdns error: %s", ae.reason);
2862 }
2863 _exit(EXIT_FAILURE);
2864 }
2865 catch(std::exception &e)
2866 {
2867 errlog("Fatal error: %s", e.what());
2868 _exit(EXIT_FAILURE);
2869 }
2870 catch(PDNSException &ae)
2871 {
2872 errlog("Fatal pdns error: %s", ae.reason);
2873 _exit(EXIT_FAILURE);
2874 }
2875
2876 uint64_t getLatencyCount(const std::string&)
2877 {
2878 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2879 }