1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70       not *that* bad actually, but now that we are thread safe, we might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76    On the C++ side, both could inherit from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
79 using std::atomic;
80 using std::thread;
81 bool g_verbose;
82
83 struct DNSDistStats g_stats;
84 MetricDefinitionStorage g_metricDefinitions;
85
86 uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
87 bool g_verboseHealthChecks{false};
88 uint32_t g_staleCacheEntriesTTL{0};
89 bool g_syslog{true};
90 bool g_allowEmptyResponse{false};
91
92 GlobalStateHolder<NetmaskGroup> g_ACL;
93 string g_outputBuffer;
94
95 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
96 std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
97 std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
98 #ifdef HAVE_EBPF
99 shared_ptr<BPFFilter> g_defaultBPFFilter;
100 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
101 #endif /* HAVE_EBPF */
102 std::vector<std::unique_ptr<ClientState>> g_frontends;
103 GlobalStateHolder<pools_t> g_pools;
104 size_t g_udpVectorSize{1};
105
106 bool g_snmpEnabled{false};
107 bool g_snmpTrapsEnabled{false};
108 DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. For every socket we listen on for incoming queries, there is one thread.
111    Then we have a bunch of connected sockets for talking to downstream servers.
112    We send directly to those sockets.
113
114    For the return path, per downstream server we have a thread that listens for responses.
115
116    Per socket there is an array of 2^16 states. When we send a packet downstream, we record
117    the original requestor and the original id in that state; the new ID is the offset into the array.
118
119    When an answer comes in on a socket, we look up the state by the id, and lob the answer back to the
120    original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
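/* A minimal sketch of the scheme described above, using hypothetical names
   (SketchIDState, noteOutgoingQuery, handleDownstreamAnswer). The real data
   structures are IDState/DownstreamState from dnsdist.hh and the actual logic
   lives in processUDPQuery()/responderThread() below; this block is kept out
   of the build and is illustrative only. */
#if 0
struct SketchIDState
{
  uint16_t origID;         // the ID chosen by the original client
  ComboAddress origRemote; // where the answer has to go back to
};

static SketchIDState s_slots[65536];        // one slot per possible DNS ID
static std::atomic<uint64_t> s_idOffset{0}; // atomically incremented counter

static uint16_t noteOutgoingQuery(const dnsheader* dh, const ComboAddress& requestor)
{
  /* the slot index doubles as the ID we send downstream */
  uint16_t newId = static_cast<uint16_t>(s_idOffset++ % 65536);
  s_slots[newId].origID = dh->id;
  s_slots[newId].origRemote = requestor;
  return newId;
}

static void handleDownstreamAnswer(dnsheader* dh)
{
  /* the downstream echoed our ID back, so it is the slot index */
  SketchIDState& ids = s_slots[dh->id];
  dh->id = ids.origID; // restore the client's original ID
  /* ... and the answer is sent back to ids.origRemote */
}
#endif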
124
125 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
128 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
129
130 Rings g_rings;
131 QueryCount g_qcount;
132
133 GlobalStateHolder<servers_t> g_dstates;
134 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
135 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
136 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
137 int g_tcpRecvTimeout{2};
138 int g_tcpSendTimeout{2};
139 int g_udpTimeout{2};
140
141 bool g_servFailOnNoPolicy{false};
142 bool g_truncateTC{false};
143 bool g_fixupCase{false};
144 bool g_preserveTrailingData{false};
145 bool g_roundrobinFailOnNoServer{false};
146
147 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
148 try
149 {
150 bool hadEDNS = false;
151 uint16_t payloadSize = 0;
152 uint16_t z = 0;
153
154 if (g_addEDNSToSelfGeneratedResponses) {
155 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
156 }
157
158 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
159 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
160 dh->ancount = dh->arcount = dh->nscount = 0;
161
162 if (hadEDNS) {
163 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
164 }
165 }
166 catch(...)
167 {
168 g_stats.truncFail++;
169 }
170
171 struct DelayedPacket
172 {
173 int fd;
174 string packet;
175 ComboAddress destination;
176 ComboAddress origDest;
177 void operator()()
178 {
179 ssize_t res;
180 if(origDest.sin4.sin_family == 0) {
181 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
182 }
183 else {
184 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
185 }
186 if (res == -1) {
187 int err = errno;
188 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
189 }
190 }
191 };
192
193 DelayPipe<DelayedPacket>* g_delay = nullptr;
194
195 void doLatencyStats(double udiff)
196 {
197 if(udiff < 1000) ++g_stats.latency0_1;
198 else if(udiff < 10000) ++g_stats.latency1_10;
199 else if(udiff < 50000) ++g_stats.latency10_50;
200 else if(udiff < 100000) ++g_stats.latency50_100;
201 else if(udiff < 1000000) ++g_stats.latency100_1000;
202 else ++g_stats.latencySlow;
203 g_stats.latencySum += udiff / 1000;
204
205 auto doAvg = [](double& var, double n, double weight) {
206 var = (weight -1) * var/weight + n/weight;
207 };
208
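  /* doAvg keeps an exponential moving average with a smoothing factor of 1/weight:
     e.g. with weight=100, var=2000 and a new sample n=3000, var becomes
     99*2000/100 + 3000/100 = 1980 + 30 = 2010. */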
209 doAvg(g_stats.latencyAvg100, udiff, 100);
210 doAvg(g_stats.latencyAvg1000, udiff, 1000);
211 doAvg(g_stats.latencyAvg10000, udiff, 10000);
212 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
213 }
214
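/* Sanity-check that a backend response actually matches the query we have in flight
   for that ID: the qname, qtype and qclass must all agree, otherwise we refuse to
   relay it to the client. */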
215 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
216 {
217 if (responseLen < sizeof(dnsheader)) {
218 return false;
219 }
220
221 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
222 if (dh->qdcount == 0) {
223 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
224 return true;
225 }
226 else {
227 ++g_stats.nonCompliantResponses;
228 return false;
229 }
230 }
231
232 uint16_t rqtype, rqclass;
233 DNSName rqname;
234 try {
235 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
236 }
237 catch(const std::exception& e) {
238 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
239 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
240 }
241 ++g_stats.nonCompliantResponses;
242 return false;
243 }
244
245 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
246 return false;
247 }
248
249 return true;
250 }
251
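/* Restore the RD and CD bits from the original query flags while keeping every other
   flag as set by the backend: for example, a query whose RD bit was cleared by a rule
   before forwarding still gets an answer carrying the client's original RD value. */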
252 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
253 {
254 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
255 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
256 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
257 uint16_t * flags = getFlagsFromDNSHeader(dh);
258 /* clear the flags we are about to restore */
259 *flags &= restoreFlagsMask;
260 /* only keep the flags we want to restore */
261 origFlags &= ~restoreFlagsMask;
262 /* set the saved flags as they were */
263 *flags |= origFlags;
264 }
265
266 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
267 {
268 restoreFlags(dq.dh, origFlags);
269
270 return addEDNSToQueryTurnedResponse(dq);
271 }
272
273 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
274 {
275 if (*responseLen < sizeof(dnsheader)) {
276 return false;
277 }
278
279 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
280 restoreFlags(dh, origFlags);
281
282 if (*responseLen == sizeof(dnsheader)) {
283 return true;
284 }
285
286 if(g_fixupCase) {
287 string realname = qname.toDNSString();
288 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
289 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
290 }
291 }
292
293 if (ednsAdded || ecsAdded) {
294 uint16_t optStart;
295 size_t optLen = 0;
296 bool last = false;
297
298 const std::string responseStr(*response, *responseLen);
299 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
300
301 if (res == 0) {
302 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
303 size_t optContentStart = 0;
304 uint16_t optContentLen = 0;
305 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
306 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
307 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
308 we care about. */
309 *zeroScope = responseStr.at(optContentStart + 3) == 0;
310 }
311 }
312
313 if (ednsAdded) {
314 /* we added the entire OPT RR,
315 therefore we need to remove it entirely */
316 if (last) {
317 /* simply remove the last AR */
318 *responseLen -= optLen;
319 uint16_t arcount = ntohs(dh->arcount);
320 arcount--;
321 dh->arcount = htons(arcount);
322 }
323 else {
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
326 *responseLen = rewrittenResponse.size();
327 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
328 rewrittenResponse.reserve(*responseLen + addRoom);
329 }
330 *responseSize = rewrittenResponse.capacity();
331 *response = reinterpret_cast<char*>(rewrittenResponse.data());
332 }
333 else {
334 warnlog("Error rewriting content");
335 }
336 }
337 }
338 else {
339 /* the OPT RR was already present, but without ECS,
340 we need to remove the ECS option if any */
341 if (last) {
342 /* nothing after the OPT RR, we can simply remove the
343 ECS option */
344 size_t existingOptLen = optLen;
345 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
346 *responseLen -= (existingOptLen - optLen);
347 }
348 else {
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
351 *responseLen = rewrittenResponse.size();
352 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
353 rewrittenResponse.reserve(*responseLen + addRoom);
354 }
355 *responseSize = rewrittenResponse.capacity();
356 *response = reinterpret_cast<char*>(rewrittenResponse.data());
357 }
358 else {
359 warnlog("Error rewriting content");
360 }
361 }
362 }
363 }
364 }
365
366 return true;
367 }
368
369 #ifdef HAVE_DNSCRYPT
370 static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
371 {
372 if (dnsCryptQuery) {
373 uint16_t encryptedResponseLen = 0;
374
375 /* save the original header before encrypting it in place */
376 if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
377 memcpy(dhCopy, *dh, sizeof(dnsheader));
378 *dh = dhCopy;
379 }
380
381 int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
382 if (res == 0) {
383 *responseLen = encryptedResponseLen;
384 } else {
385 /* dropping response */
386 vinfolog("Error encrypting the response, dropping.");
387 return false;
388 }
389 }
390 return true;
391 }
392 #endif /* HAVE_DNSCRYPT */
393
394 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
395 {
396 DNSResponseAction::Action action=DNSResponseAction::Action::None;
397 std::string ruleresult;
398 for(const auto& lr : *localRespRulactions) {
399 if(lr.d_rule->matches(&dr)) {
400 lr.d_rule->d_matches++;
401 action=(*lr.d_action)(&dr, &ruleresult);
402 switch(action) {
403 case DNSResponseAction::Action::Allow:
404 return true;
405 break;
406 case DNSResponseAction::Action::Drop:
407 return false;
408 break;
409 case DNSResponseAction::Action::HeaderModify:
410 return true;
411 break;
412 case DNSResponseAction::Action::ServFail:
413 dr.dh->rcode = RCode::ServFail;
414 return true;
415 break;
416 /* non-terminal actions follow */
417 case DNSResponseAction::Action::Delay:
418 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
419 break;
420 case DNSResponseAction::Action::None:
421 break;
422 }
423 }
424 }
425
426 return true;
427 }
428
429 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
430 {
431 if (!applyRulesToResponse(localRespRulactions, dr)) {
432 return false;
433 }
434
435 bool zeroScope = false;
436 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
437 return false;
438 }
439
440 if (dr.packetCache && !dr.skipCache) {
441 if (!dr.useZeroScope) {
442          /* the query was not suitable for zero-scope, for
443             example because it already had an ECS entry, so the hash is
444             not really 'no ECS'. Just insert it for the existing subnet,
445             since:
446             - we don't have the correct hash for a non-ECS query
447             - inserting with the hash computed before the ECS replacement but with
448               the subnet extracted _after_ the replacement would not work.
449          */
450 zeroScope = false;
451 }
452 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
453 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
454 }
455
456 #ifdef HAVE_DNSCRYPT
457 if (!muted) {
458 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
459 return false;
460 }
461 }
462 #endif /* HAVE_DNSCRYPT */
463
464 return true;
465 }
466
467 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
468 {
469 if(delayMsec && g_delay) {
470 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
471 g_delay->submit(dp, delayMsec);
472 }
473 else {
474 ssize_t res;
475 if(origDest.sin4.sin_family == 0) {
476 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
477 }
478 else {
479 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
480 }
481 if (res == -1) {
482 int err = errno;
483 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), stringerror(err));
484 }
485 }
486
487 return true;
488 }
489
490
491 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
492 {
493 return state->sockets[state->socketsOffset++ % state->sockets.size()];
494 }
495
496 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
497 {
498 ready.clear();
499
500 if (state->sockets.size() == 1) {
501 ready.push_back(state->sockets[0]);
502 return ;
503 }
504
505 {
506 std::lock_guard<std::mutex> lock(state->socketsLock);
507 state->mplexer->getAvailableFDs(ready, -1);
508 }
509 }
510
511 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
512 void responderThread(std::shared_ptr<DownstreamState> dss)
513 try {
514 setThreadName("dnsdist/respond");
515 auto localRespRulactions = g_resprulactions.getLocal();
516 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
517 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
518 /* when the answer is encrypted in place, we need to get a copy
519 of the original header before encryption to fill the ring buffer */
520 dnsheader cleartextDH;
521 vector<uint8_t> rewrittenResponse;
522
523 uint16_t queryId = 0;
524 std::vector<int> sockets;
525 sockets.reserve(dss->sockets.size());
526
527 for(;;) {
528 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
529 try {
530 pickBackendSocketsReadyForReceiving(dss, sockets);
531 for (const auto& fd : sockets) {
532 ssize_t got = recv(fd, packet, sizeof(packet), 0);
533 char * response = packet;
534 size_t responseSize = sizeof(packet);
535
536 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
537 continue;
538
539 uint16_t responseLen = static_cast<uint16_t>(got);
540 queryId = dh->id;
541
542 if(queryId >= dss->idStates.size()) {
543 continue;
544 }
545
546 IDState* ids = &dss->idStates[queryId];
547 int64_t usageIndicator = ids->usageIndicator;
548
549 if(!IDState::isInUse(usageIndicator)) {
550 /* the corresponding state is marked as not in use, meaning that:
551 - it was already cleaned up by another thread and the state is gone ;
552 - we already got a response for this query and this one is a duplicate.
553 Either way, we don't touch it.
554 */
555 continue;
556 }
557
558 /* read the potential DOHUnit state as soon as possible, but don't use it
559 until we have confirmed that we own this state by updating usageIndicator */
560 auto du = ids->du;
561 /* setting age to 0 to prevent the maintainer thread from
562 cleaning this IDS while we process the response.
563 */
564 ids->age = 0;
565 int origFD = ids->origFD;
566
567 unsigned int consumed = 0;
568 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
569 continue;
570 }
571
572 bool isDoH = du != nullptr;
573 /* atomically mark the state as available, but only if it has not been altered
574 in the meantime */
575 if (ids->tryMarkUnused(usageIndicator)) {
576 /* clear the potential DOHUnit asap, it's ours now
577 and since we just marked the state as unused,
578 someone could overwrite it. */
579 ids->du = nullptr;
580 /* we only decrement the outstanding counter if the value was not
581 altered in the meantime, which would mean that the state has been actively reused
582 and the other thread has not incremented the outstanding counter, so we don't
583 want it to be decremented twice. */
584                 --dss->outstanding;  // you'd think an attacker could game this, but we're using connected sockets
585 } else {
586 /* someone updated the state in the meantime, we can't touch the existing pointer */
587 du = nullptr;
588 /* since the state has been updated, we can't safely access it so let's just drop
589 this response */
590 continue;
591 }
592
593 if(dh->tc && g_truncateTC) {
594 truncateTC(response, &responseLen, responseSize, consumed);
595 }
596
597 dh->id = ids->origID;
598
599 uint16_t addRoom = 0;
600 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
601 if (dr.dnsCryptQuery) {
602 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
603 }
604
605 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
606 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
607 continue;
608 }
609
610 if (ids->cs && !ids->cs->muted) {
611 if (du) {
612 #ifdef HAVE_DNS_OVER_HTTPS
613 // DoH query
614 du->response = std::string(response, responseLen);
615 if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
616 /* at this point we have the only remaining pointer on this
617 DOHUnit object since we did set ids->du to nullptr earlier */
618 delete du;
619 }
620 #endif /* HAVE_DNS_OVER_HTTPS */
621 du = nullptr;
622 }
623 else {
624 ComboAddress empty;
625 empty.sin4.sin_family = 0;
626 /* if ids->destHarvested is false, origDest holds the listening address.
627 We don't want to use that as a source since it could be 0.0.0.0 for example. */
628 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
629 }
630 }
631
632 ++g_stats.responses;
633 ++ids->cs->responses;
634 ++dss->responses;
635
636 double udiff = ids->sentTime.udiff();
637 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
638 isDoH ? " (https)": "", udiff);
639
640 struct timespec ts;
641 gettime(&ts);
642 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
643
644 switch (cleartextDH.rcode) {
645 case RCode::NXDomain:
646 ++g_stats.frontendNXDomain;
647 break;
648 case RCode::ServFail:
649 ++g_stats.servfailResponses;
650 ++g_stats.frontendServFail;
651 break;
652 case RCode::NoError:
653 ++g_stats.frontendNoError;
654 break;
655 }
656 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
657
658 doLatencyStats(udiff);
659
660 rewrittenResponse.clear();
661 }
662 }
663 catch(const std::exception& e){
664 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
665 }
666 }
667 }
668 catch(const std::exception& e)
669 {
670 errlog("UDP responder thread died because of exception: %s", e.what());
671 }
672 catch(const PDNSException& e)
673 {
674 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
675 }
676 catch(...)
677 {
678 errlog("UDP responder thread died because of an exception: %s", "unknown");
679 }
680
681 bool DownstreamState::reconnect()
682 {
683 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
684 if (!tl.owns_lock()) {
685 /* we are already reconnecting */
686 return false;
687 }
688
689 connected = false;
690 for (auto& fd : sockets) {
691 if (fd != -1) {
692 if (sockets.size() > 1) {
693 std::lock_guard<std::mutex> lock(socketsLock);
694 mplexer->removeReadFD(fd);
695 }
696 /* shutdown() is needed to wake up recv() in the responderThread */
697 shutdown(fd, SHUT_RDWR);
698 close(fd);
699 fd = -1;
700 }
701 if (!IsAnyAddress(remote)) {
702 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
703 if (!IsAnyAddress(sourceAddr)) {
704 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
705 if (!sourceItfName.empty()) {
706 #ifdef SO_BINDTODEVICE
707 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
708 if (res != 0) {
709 infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
710 }
711 #endif
712 }
713
714 SBind(fd, sourceAddr);
715 }
716 try {
717 SConnect(fd, remote);
718 if (sockets.size() > 1) {
719 std::lock_guard<std::mutex> lock(socketsLock);
720 mplexer->addReadFD(fd, [](int, boost::any) {});
721 }
722 connected = true;
723 }
724 catch(const std::runtime_error& error) {
725 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
726 connected = false;
727 break;
728 }
729 }
730 }
731
732 /* if at least one (re-)connection failed, close all sockets */
733 if (!connected) {
734 for (auto& fd : sockets) {
735 if (fd != -1) {
736 if (sockets.size() > 1) {
737 std::lock_guard<std::mutex> lock(socketsLock);
738 mplexer->removeReadFD(fd);
739 }
740 /* shutdown() is needed to wake up recv() in the responderThread */
741 shutdown(fd, SHUT_RDWR);
742 close(fd);
743 fd = -1;
744 }
745 }
746 }
747
748 return connected;
749 }
750 void DownstreamState::hash()
751 {
752 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
753 auto w = weight;
754 WriteLock wl(&d_lock);
755 hashes.clear();
756 while (w > 0) {
757 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
758 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
759 hashes.insert(wshash);
760 --w;
761 }
762 }
763
764 void DownstreamState::setId(const boost::uuids::uuid& newId)
765 {
766 id = newId;
767   // recompute the hashes only if they had already been computed before
768 if (!hashes.empty()) {
769 hash();
770 }
771 }
772
773 void DownstreamState::setWeight(int newWeight)
774 {
775 if (newWeight < 1) {
776 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
777 return ;
778 }
779 weight = newWeight;
780 if (!hashes.empty()) {
781 hash();
782 }
783 }
784
785 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
786 {
787 pthread_rwlock_init(&d_lock, nullptr);
788 id = getUniqueID();
789 threadStarted.clear();
790
791 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
792
793 sockets.resize(numberOfSockets);
794 for (auto& fd : sockets) {
795 fd = -1;
796 }
797
798 if (!IsAnyAddress(remote)) {
799 reconnect();
800 idStates.resize(g_maxOutstanding);
801 sw.start();
802 infolog("Added downstream server %s", remote.toStringWithPort());
803 }
804
805 }
806
807 std::mutex g_luamutex;
808 LuaContext g_lua;
809
810 GlobalStateHolder<ServerPolicy> g_policy;
811
812 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
813 {
814 for(auto& d : servers) {
815 if(d.second->isUp() && d.second->qps.check())
816 return d.second;
817 }
818 return leastOutstanding(servers, dq);
819 }
820
821 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
822 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
823 {
824 if (servers.size() == 1 && servers[0].second->isUp()) {
825 return servers[0].second;
826 }
827
828 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
829 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
830      which would suck royally and could even lead to crashes. So we first snapshot what we sort on, and then we sort */
831 poss.reserve(servers.size());
832 for(auto& d : servers) {
833 if(d.second->isUp()) {
834 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
835 }
836 }
837 if(poss.empty())
838 return shared_ptr<DownstreamState>();
839 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
840 return poss.begin()->second;
841 }
842
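/* Weighted selection: 'poss' holds running cumulative weights, e.g. two servers with
   weights 1 and 10 give entries (1, s1) and (11, s2) and sum=11. A value r = val % 11
   then selects s1 for r=0 (probability 1/11) and s2 for r=1..10 (probability 10/11),
   because upper_bound() returns the first entry whose cumulative weight is greater than r. */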
843 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
844 {
845 vector<pair<int, shared_ptr<DownstreamState>>> poss;
846 int sum = 0;
847 int max = std::numeric_limits<int>::max();
848
849 for(auto& d : servers) { // w=1, w=10 -> 1, 11
850 if(d.second->isUp()) {
851 // Don't overflow sum when adding high weights
852 if(d.second->weight > max - sum) {
853 sum = max;
854 } else {
855 sum += d.second->weight;
856 }
857
858 poss.push_back({sum, d.second});
859 }
860 }
861
862   // Catch the case where poss (and therefore sum) is empty, to avoid a SIGFPE on the modulo below
863 if(poss.empty())
864 return shared_ptr<DownstreamState>();
865
866 int r = val % sum;
867 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
868 if(p==poss.end())
869 return shared_ptr<DownstreamState>();
870 return p->second;
871 }
872
873 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
874 {
875 return valrandom(random(), servers, dq);
876 }
877
878 uint32_t g_hashperturb;
879 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
880 {
881 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
882 }
883
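/* Consistent hashing: each server owns 'weight' points on a hash ring (see
   DownstreamState::hash()). A query selects the server owning the smallest point greater
   than or equal to the query hash; if no such point exists we wrap around to the server
   owning the smallest point overall. Example with server A owning {10, 40} and server B
   owning {25, 60}: qhash=30 -> point 40 -> A, qhash=50 -> point 60 -> B, qhash=70 -> wrap to 10 -> A. */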
884 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
885 {
886 unsigned int qhash = dq->qname->hash(g_hashperturb);
887 unsigned int sel = std::numeric_limits<unsigned int>::max();
888 unsigned int min = std::numeric_limits<unsigned int>::max();
889 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
890
891 for (const auto& d: servers) {
892 if (d.second->isUp()) {
893 // make sure hashes have been computed
894 if (d.second->hashes.empty()) {
895 d.second->hash();
896 }
897 {
898 ReadLock rl(&(d.second->d_lock));
899 const auto& server = d.second;
900           // keep track of the server owning the lowest hash point, so we can wrap around the ring
901 if (min > *(server->hashes.begin())) {
902 min = *(server->hashes.begin());
903 first = server;
904 }
905
906 auto hash_it = server->hashes.lower_bound(qhash);
907 if (hash_it != server->hashes.end()) {
908 if (*hash_it < sel) {
909 sel = *hash_it;
910 ret = server;
911 }
912 }
913 }
914 }
915 }
916 if (ret != nullptr) {
917 return ret;
918 }
919 if (first != nullptr) {
920 return first;
921 }
922 return shared_ptr<DownstreamState>();
923 }
924
925 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
926 {
927 NumberedServerVector poss;
928
929 for(auto& d : servers) {
930 if(d.second->isUp()) {
931 poss.push_back(d);
932 }
933 }
934
935 const auto *res=&poss;
936 if(poss.empty() && !g_roundrobinFailOnNoServer)
937 res = &servers;
938
939 if(res->empty())
940 return shared_ptr<DownstreamState>();
941
942 static unsigned int counter;
943
944 return (*res)[(counter++) % res->size()].second;
945 }
946
947 ComboAddress g_serverControl{"127.0.0.1:5199"};
948
949 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
950 {
951 std::shared_ptr<ServerPool> pool;
952 pools_t::iterator it = pools.find(poolName);
953 if (it != pools.end()) {
954 pool = it->second;
955 }
956 else {
957 if (!poolName.empty())
958 vinfolog("Creating pool %s", poolName);
959 pool = std::make_shared<ServerPool>();
960 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
961 }
962 return pool;
963 }
964
965 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
966 {
967 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
968 if (!poolName.empty()) {
969 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
970 } else {
971 vinfolog("Setting default pool server selection policy to %s", policy->name);
972 }
973 pool->policy = policy;
974 }
975
976 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
977 {
978 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
979 if (!poolName.empty()) {
980 vinfolog("Adding server to pool %s", poolName);
981 } else {
982 vinfolog("Adding server to default pool");
983 }
984 pool->addServer(server);
985 }
986
987 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
988 {
989 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
990
991 if (!poolName.empty()) {
992 vinfolog("Removing server from pool %s", poolName);
993 }
994 else {
995 vinfolog("Removing server from default pool");
996 }
997
998 pool->removeServer(server);
999 }
1000
1001 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
1002 {
1003 pools_t::const_iterator it = pools.find(poolName);
1004
1005 if (it == pools.end()) {
1006 throw std::out_of_range("No pool named " + poolName);
1007 }
1008
1009 return it->second;
1010 }
1011
1012 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1013 {
1014 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1015 return pool->getServers();
1016 }
1017
1018 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
1019 {
1020 string result;
1021
1022 std::vector<std::string> addrs;
1023 stringtok(addrs, spoofContent, " ,");
1024
1025 if (addrs.size() == 1) {
1026 try {
1027 ComboAddress spoofAddr(spoofContent);
1028 SpoofAction sa({spoofAddr});
1029 sa(&dq, &result);
1030 }
1031 catch(const PDNSException &e) {
1032 SpoofAction sa(spoofContent); // CNAME then
1033 sa(&dq, &result);
1034 }
1035 } else {
1036 std::vector<ComboAddress> cas;
1037 for (const auto& addr : addrs) {
1038 try {
1039 cas.push_back(ComboAddress(addr));
1040 }
1041 catch (...) {
1042 }
1043 }
1044 SpoofAction sa(cas);
1045 sa(&dq, &result);
1046 }
1047 }
1048
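/* Returns true when 'action' is terminal (Allow, Drop, Nxdomain, Refused, ServFail, Spoof,
   Truncate, HeaderModify, Pool, NoRecurse) and rule evaluation should stop; Delay, None and
   NoOp only annotate the query and let the caller continue with the next rule. */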
1049 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
1050 {
1051 switch(action) {
1052 case DNSAction::Action::Allow:
1053 return true;
1054 break;
1055 case DNSAction::Action::Drop:
1056 ++g_stats.ruleDrop;
1057 drop = true;
1058 return true;
1059 break;
1060 case DNSAction::Action::Nxdomain:
1061 dq.dh->rcode = RCode::NXDomain;
1062 dq.dh->qr=true;
1063 ++g_stats.ruleNXDomain;
1064 return true;
1065 break;
1066 case DNSAction::Action::Refused:
1067 dq.dh->rcode = RCode::Refused;
1068 dq.dh->qr=true;
1069 ++g_stats.ruleRefused;
1070 return true;
1071 break;
1072 case DNSAction::Action::ServFail:
1073 dq.dh->rcode = RCode::ServFail;
1074 dq.dh->qr=true;
1075 ++g_stats.ruleServFail;
1076 return true;
1077 break;
1078 case DNSAction::Action::Spoof:
1079 spoofResponseFromString(dq, ruleresult);
1080 return true;
1081 break;
1082 case DNSAction::Action::Truncate:
1083 dq.dh->tc = true;
1084 dq.dh->qr = true;
1085 return true;
1086 break;
1087 case DNSAction::Action::HeaderModify:
1088 return true;
1089 break;
1090 case DNSAction::Action::Pool:
1091 dq.poolname=ruleresult;
1092 return true;
1093 break;
1094 case DNSAction::Action::NoRecurse:
1095 dq.dh->rd = false;
1096 return true;
1097 break;
1098 /* non-terminal actions follow */
1099 case DNSAction::Action::Delay:
1100 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1101 break;
1102 case DNSAction::Action::None:
1103 /* fall-through */
1104 case DNSAction::Action::NoOp:
1105 break;
1106 }
1107
1108 /* false means that we don't stop the processing */
1109 return false;
1110 }
1111
1112
1113 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
1114 {
1115 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1116
1117 if(g_qcount.enabled) {
1118 string qname = (*dq.qname).toLogString();
1119 bool countQuery{true};
1120 if(g_qcount.filter) {
1121 std::lock_guard<std::mutex> lock(g_luamutex);
1122 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1123 }
1124
1125 if(countQuery) {
1126 WriteLock wl(&g_qcount.queryLock);
1127 if(!g_qcount.records.count(qname)) {
1128 g_qcount.records[qname] = 0;
1129 }
1130 g_qcount.records[qname]++;
1131 }
1132 }
1133
1134 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1135 auto updateBlockStats = [&got]() {
1136 ++g_stats.dynBlocked;
1137 got->second.blocks++;
1138 };
1139
1140 if(now < got->second.until) {
1141 DNSAction::Action action = got->second.action;
1142 if (action == DNSAction::Action::None) {
1143 action = g_dynBlockAction;
1144 }
1145 switch (action) {
1146 case DNSAction::Action::NoOp:
1147 /* do nothing */
1148 break;
1149
1150 case DNSAction::Action::Nxdomain:
1151 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1152 updateBlockStats();
1153
1154 dq.dh->rcode = RCode::NXDomain;
1155 dq.dh->qr=true;
1156 return true;
1157
1158 case DNSAction::Action::Refused:
1159 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1160 updateBlockStats();
1161
1162 dq.dh->rcode = RCode::Refused;
1163 dq.dh->qr = true;
1164 return true;
1165
1166 case DNSAction::Action::Truncate:
1167 if(!dq.tcp) {
1168 updateBlockStats();
1169 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1170 dq.dh->tc = true;
1171 dq.dh->qr = true;
1172 return true;
1173 }
1174 else {
1175 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1176 }
1177 break;
1178 case DNSAction::Action::NoRecurse:
1179 updateBlockStats();
1180 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1181 dq.dh->rd = false;
1182 return true;
1183 default:
1184 updateBlockStats();
1185 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1186 return false;
1187 }
1188 }
1189 }
1190
1191 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1192 auto updateBlockStats = [&got]() {
1193 ++g_stats.dynBlocked;
1194 got->blocks++;
1195 };
1196
1197 if(now < got->until) {
1198 DNSAction::Action action = got->action;
1199 if (action == DNSAction::Action::None) {
1200 action = g_dynBlockAction;
1201 }
1202 switch (action) {
1203 case DNSAction::Action::NoOp:
1204 /* do nothing */
1205 break;
1206 case DNSAction::Action::Nxdomain:
1207 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1208 updateBlockStats();
1209
1210 dq.dh->rcode = RCode::NXDomain;
1211 dq.dh->qr=true;
1212 return true;
1213 case DNSAction::Action::Refused:
1214 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1215 updateBlockStats();
1216
1217 dq.dh->rcode = RCode::Refused;
1218 dq.dh->qr=true;
1219 return true;
1220 case DNSAction::Action::Truncate:
1221 if(!dq.tcp) {
1222 updateBlockStats();
1223
1224 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1225 dq.dh->tc = true;
1226 dq.dh->qr = true;
1227 return true;
1228 }
1229 else {
1230 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1231 }
1232 break;
1233 case DNSAction::Action::NoRecurse:
1234 updateBlockStats();
1235 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1236 dq.dh->rd = false;
1237 return true;
1238 default:
1239 updateBlockStats();
1240 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1241 return false;
1242 }
1243 }
1244 }
1245
1246 DNSAction::Action action=DNSAction::Action::None;
1247 string ruleresult;
1248 bool drop = false;
1249 for(const auto& lr : *holders.rulactions) {
1250 if(lr.d_rule->matches(&dq)) {
1251 lr.d_rule->d_matches++;
1252 action=(*lr.d_action)(&dq, &ruleresult);
1253 if (processRulesResult(action, dq, ruleresult, drop)) {
1254 break;
1255 }
1256 }
1257 }
1258
1259 if (drop) {
1260 return false;
1261 }
1262
1263 return true;
1264 }
1265
1266 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1267 {
1268 ssize_t result;
1269
1270 if (ss->sourceItf == 0) {
1271 result = send(sd, request, requestLen, 0);
1272 }
1273 else {
1274 struct msghdr msgh;
1275 struct iovec iov;
1276 cmsgbuf_aligned cbuf;
1277 ComboAddress remote(ss->remote);
1278 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1279 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1280 result = sendmsg(sd, &msgh, 0);
1281 }
1282
1283 if (result == -1) {
1284 int savederrno = errno;
1285 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1286
1287 /* This might sound silly, but on Linux send() might fail with EINVAL
1288 if the interface the socket was bound to doesn't exist anymore.
1289 We don't want to reconnect the real socket if the healthcheck failed,
1290 because it's not using the same socket.
1291 */
1292 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1293 ss->reconnect();
1294 }
1295 }
1296
1297 return result;
1298 }
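/* For reference, a rough Linux/IPv4-only sketch of the kind of control message
   addCMsgSrcAddr() builds above: an IP_PKTINFO ancillary record telling the kernel
   which source address and interface to use. The helper name setSourceViaPktInfo and
   its exact shape are illustrative, not dnsdist's API; this block is not compiled. */
#if 0
static void setSourceViaPktInfo(struct msghdr* msgh, cmsgbuf_aligned* cbuf, const ComboAddress& source, unsigned int ifIndex)
{
  msgh->msg_control = cbuf;
  msgh->msg_controllen = CMSG_SPACE(sizeof(struct in_pktinfo));

  struct cmsghdr* cmsg = CMSG_FIRSTHDR(msgh);
  cmsg->cmsg_level = IPPROTO_IP;
  cmsg->cmsg_type = IP_PKTINFO;
  cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));

  struct in_pktinfo* pkt = reinterpret_cast<struct in_pktinfo*>(CMSG_DATA(cmsg));
  memset(pkt, 0, sizeof(*pkt));
  pkt->ipi_ifindex = ifIndex;               // 0 means "let the kernel pick"
  pkt->ipi_spec_dst = source.sin4.sin_addr; // source address to send from
}
#endif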
1299
1300 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1301 {
1302 if (msgh->msg_flags & MSG_TRUNC) {
1303 /* message was too large for our buffer */
1304 vinfolog("Dropping message too large for our buffer");
1305 ++g_stats.nonCompliantQueries;
1306 return false;
1307 }
1308
1309 if(!holders.acl->match(remote)) {
1310 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1311 ++g_stats.aclDrops;
1312 return false;
1313 }
1314
1315 cs.queries++;
1316 ++g_stats.queries;
1317
1318 if (HarvestDestinationAddress(msgh, &dest)) {
1319 /* we don't get the port, only the address */
1320 dest.sin4.sin_port = cs.local.sin4.sin_port;
1321 }
1322 else {
1323 dest.sin4.sin_family = 0;
1324 }
1325
1326 return true;
1327 }
1328
1329 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1330 {
1331 if (cs.dnscryptCtx) {
1332 #ifdef HAVE_DNSCRYPT
1333 vector<uint8_t> response;
1334 uint16_t decryptedQueryLen = 0;
1335
1336 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1337
1338 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1339
1340 if (!decrypted) {
1341 if (response.size() > 0) {
1342 return response;
1343 }
1344 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1345 }
1346
1347 len = decryptedQueryLen;
1348 #endif /* HAVE_DNSCRYPT */
1349 }
1350 return boost::none;
1351 }
1352
1353 bool checkQueryHeaders(const struct dnsheader* dh)
1354 {
1355 if (dh->qr) { // don't respond to responses
1356 ++g_stats.nonCompliantQueries;
1357 return false;
1358 }
1359
1360 if (dh->qdcount == 0) {
1361 ++g_stats.emptyQueries;
1362 return false;
1363 }
1364
1365 if (dh->rd) {
1366 ++g_stats.rdQueries;
1367 }
1368
1369 return true;
1370 }
1371
1372 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1373 static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
1374 {
1375 outMsg.msg_len = 0;
1376 fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));
1377
1378 if (dest.sin4.sin_family == 0) {
1379 outMsg.msg_hdr.msg_control = nullptr;
1380 }
1381 else {
1382 addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
1383 }
1384 }
1385 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1386
1387 /* self-generated responses or cache hits */
1388 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1389 {
1390 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1391
1392 #ifdef HAVE_PROTOBUF
1393 dr.uniqueId = dq.uniqueId;
1394 #endif
1395 dr.qTag = dq.qTag;
1396 dr.delayMsec = dq.delayMsec;
1397
1398 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1399 return false;
1400 }
1401
1402 /* in case a rule changed it */
1403 dq.delayMsec = dr.delayMsec;
1404
1405 #ifdef HAVE_DNSCRYPT
1406 if (!cs.muted) {
1407 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1408 return false;
1409 }
1410 }
1411 #endif /* HAVE_DNSCRYPT */
1412
1413 if (cacheHit) {
1414 ++g_stats.cacheHits;
1415 }
1416
1417 switch (dr.dh->rcode) {
1418 case RCode::NXDomain:
1419 ++g_stats.frontendNXDomain;
1420 break;
1421 case RCode::ServFail:
1422 ++g_stats.frontendServFail;
1423 break;
1424 case RCode::NoError:
1425 ++g_stats.frontendNoError;
1426 break;
1427 }
1428
1429 doLatencyStats(0); // we're not going to measure this
1430 return true;
1431 }
1432
1433 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1434 {
1435 const uint16_t queryId = ntohs(dq.dh->id);
1436
1437 try {
1438 /* we need an accurate ("real") value for the response and
1439 to store into the IDS, but not for insertion into the
1440 rings for example */
1441 struct timespec now;
1442 gettime(&now);
1443
1444 if (!applyRulesToQuery(holders, dq, now)) {
1445 return ProcessQueryResult::Drop;
1446 }
1447
1448 if(dq.dh->qr) { // something turned it into a response
1449 fixUpQueryTurnedResponse(dq, dq.origFlags);
1450
1451 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1452 return ProcessQueryResult::Drop;
1453 }
1454
1455 ++g_stats.selfAnswered;
1456 ++cs.responses;
1457 return ProcessQueryResult::SendAnswer;
1458 }
1459
1460 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
1461 dq.packetCache = serverPool->packetCache;
1462 auto policy = *(holders.policy);
1463 if (serverPool->policy != nullptr) {
1464 policy = *(serverPool->policy);
1465 }
1466 auto servers = serverPool->getServers();
1467 if (policy.isLua) {
1468 std::lock_guard<std::mutex> lock(g_luamutex);
1469 selectedBackend = policy.policy(servers, &dq);
1470 }
1471 else {
1472 selectedBackend = policy.policy(servers, &dq);
1473 }
1474
1475 uint16_t cachedResponseSize = dq.size;
1476 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1477
1478 if (dq.packetCache && !dq.skipCache) {
1479 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1480 }
1481
1482 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1483 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1484 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1485 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1486 dq.len = cachedResponseSize;
1487
1488 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1489 return ProcessQueryResult::Drop;
1490 }
1491
1492 return ProcessQueryResult::SendAnswer;
1493 }
1494
1495 if (!dq.subnet) {
1496 /* there was no existing ECS on the query, enable the zero-scope feature */
1497 dq.useZeroScope = true;
1498 }
1499 }
1500
1501 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1502 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1503 return ProcessQueryResult::Drop;
1504 }
1505 }
1506
1507 if (dq.packetCache && !dq.skipCache) {
1508 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1509 dq.len = cachedResponseSize;
1510
1511 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1512 return ProcessQueryResult::Drop;
1513 }
1514
1515 return ProcessQueryResult::SendAnswer;
1516 }
1517 ++g_stats.cacheMisses;
1518 }
1519
1520 if(!selectedBackend) {
1521 ++g_stats.noPolicy;
1522
1523 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1524 if (g_servFailOnNoPolicy) {
1525 restoreFlags(dq.dh, dq.origFlags);
1526
1527 dq.dh->rcode = RCode::ServFail;
1528 dq.dh->qr = true;
1529
1530 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1531 return ProcessQueryResult::Drop;
1532 }
1533 // no response-only statistics counter to update.
1534 return ProcessQueryResult::SendAnswer;
1535 }
1536
1537 return ProcessQueryResult::Drop;
1538 }
1539
1540 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1541 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1542 }
1543
1544 selectedBackend->queries++;
1545 return ProcessQueryResult::PassToBackend;
1546 }
1547 catch(const std::exception& e){
1548 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1549 }
1550 return ProcessQueryResult::Drop;
1551 }
1552
1553 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
1554 {
1555 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1556 uint16_t queryId = 0;
1557
1558 try {
1559 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1560 return;
1561 }
1562
1563 /* we need an accurate ("real") value for the response and
1564 to store into the IDS, but not for insertion into the
1565 rings for example */
1566 struct timespec queryRealTime;
1567 gettime(&queryRealTime, true);
1568
1569 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1570 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1571 if (dnsCryptResponse) {
1572 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1573 return;
1574 }
1575
1576 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1577 queryId = ntohs(dh->id);
1578
1579 if (!checkQueryHeaders(dh)) {
1580 return;
1581 }
1582
1583 uint16_t qtype, qclass;
1584 unsigned int consumed = 0;
1585 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1586 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1587 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1588 std::shared_ptr<DownstreamState> ss{nullptr};
1589 auto result = processQuery(dq, cs, holders, ss);
1590
1591 if (result == ProcessQueryResult::Drop) {
1592 return;
1593 }
1594
1595 if (result == ProcessQueryResult::SendAnswer) {
1596 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1597 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1598 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1599 (*queuedResponses)++;
1600 return;
1601 }
1602 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1603 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1604 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1605 return;
1606 }
1607
1608 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1609 return;
1610 }
1611
1612 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1613 IDState* ids = &ss->idStates[idOffset];
1614 ids->age = 0;
1615 DOHUnit* du = nullptr;
1616
1617 /* that means that the state was in use, possibly with an allocated
1618 DOHUnit that we will need to handle, but we can't touch it before
1619 confirming that we now own this state */
1620 if (ids->isInUse()) {
1621 du = ids->du;
1622 }
1623
1624 /* we atomically replace the value, we now own this state */
1625 if (!ids->markAsUsed()) {
1626 /* the state was not in use.
1627 we reset 'du' because it might have still been in use when we read it. */
1628 du = nullptr;
1629 ++ss->outstanding;
1630 }
1631 else {
1632 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1633 to handle it because it's about to be overwritten. */
1634 ids->du = nullptr;
1635 ++ss->reuseds;
1636 ++g_stats.downstreamTimeouts;
1637 handleDOHTimeout(du);
1638 }
1639
1640 ids->cs = &cs;
1641 ids->origFD = cs.udpFD;
1642 ids->origID = dh->id;
1643 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1644
1645 /* If we couldn't harvest the real dest addr, still
1646 write down the listening addr since it will be useful
1647 (especially if it's not an 'any' one).
1648 We need to keep track of which one it is since we may
1649 want to use the real but not the listening addr to reply.
1650 */
1651 if (dest.sin4.sin_family != 0) {
1652 ids->origDest = dest;
1653 ids->destHarvested = true;
1654 }
1655 else {
1656 ids->origDest = cs.local;
1657 ids->destHarvested = false;
1658 }
1659
1660 dh->id = idOffset;
1661
1662 int fd = pickBackendSocketForSending(ss);
1663 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1664
1665 if(ret < 0) {
1666 ++ss->sendErrors;
1667 ++g_stats.downstreamSendErrors;
1668 }
1669
1670 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1671 }
1672 catch(const std::exception& e){
1673 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1674 }
1675 }
1676
1677 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1678 static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
1679 {
1680 struct MMReceiver
1681 {
1682 char packet[4096];
1683 ComboAddress remote;
1684 ComboAddress dest;
1685 struct iovec iov;
1686 /* used by HarvestDestinationAddress */
1687 cmsgbuf_aligned cbuf;
1688 };
1689 const size_t vectSize = g_udpVectorSize;
1690 /* the actual buffer is larger because:
1691 - we may have to add EDNS and/or ECS
1692 - we use it for self-generated responses (from rule or cache)
1693 but we only accept incoming payloads up to that size
1694 */
1695 static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1696
1697 auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
1698 auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1699 auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1700
1701 /* initialize the structures needed to receive our messages */
1702 for (size_t idx = 0; idx < vectSize; idx++) {
1703 recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
1704 fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
1705 }
1706
1707 /* go now */
1708 for(;;) {
1709
1710 /* reset the IO vector, since it's also used to send the vector of responses
1711 to avoid having to copy the data around */
1712 for (size_t idx = 0; idx < vectSize; idx++) {
1713 recvData[idx].iov.iov_base = recvData[idx].packet;
1714 recvData[idx].iov.iov_len = sizeof(recvData[idx].packet);
1715 }
1716
1717 /* block until we have at least one message ready, but return
1718 as many as possible to save the syscall costs */
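/* note: MSG_TRUNC makes msg_len report the full size of each datagram even when it
   did not fit into our buffer, which presumably lets the processing code spot
   over-sized queries instead of silently handling a truncated payload */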
1719 int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);
1720
1721 if (msgsGot <= 0) {
1722 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
1723 continue;
1724 }
1725
1726 unsigned int msgsToSend = 0;
1727
1728 /* process the received messages */
1729 for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
1730 const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
1731 unsigned int got = msgVec[msgIdx].msg_len;
1732 const ComboAddress& remote = recvData[msgIdx].remote;
1733
1734 if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1735 ++g_stats.nonCompliantQueries;
1736 continue;
1737 }
1738
1739 processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);
1740
1741 }
1742
1743 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1744 or the cache) can be sent in batch too */
1745
1746 if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
1747 int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);
1748
1749 if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
1750 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
1751 }
1752 }
1753
1754 }
1755 }
1756 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1757
1758 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1759 static void udpClientThread(ClientState* cs)
1760 try
1761 {
1762 setThreadName("dnsdist/udpClie");
1763 LocalHolders holders;
1764
1765 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1766 if (g_udpVectorSize > 1) {
1767 MultipleMessagesUDPClientThread(cs, holders);
1768
1769 }
1770 else
1771 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1772 {
1773 char packet[4096];
1774 /* the actual buffer is larger because:
1775 - we may have to add EDNS and/or ECS
1776 - we use it for self-generated responses (from rule or cache)
1777 but we only accept incoming payloads up to that size
1778 */
1779 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(packet)");
1780 struct msghdr msgh;
1781 struct iovec iov;
1782 /* used by HarvestDestinationAddress */
1783 cmsgbuf_aligned cbuf;
1784
1785 ComboAddress remote;
1786 ComboAddress dest;
1787 remote.sin4.sin_family = cs->local.sin4.sin_family;
1788 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1789
1790 for(;;) {
1791 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1792
1793 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1794 ++g_stats.nonCompliantQueries;
1795 continue;
1796 }
1797
1798 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1799 }
1800 }
1801 }
1802 catch(const std::exception &e)
1803 {
1804 errlog("UDP client thread died because of exception: %s", e.what());
1805 }
1806 catch(const PDNSException &e)
1807 {
1808 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1809 }
1810 catch(...)
1811 {
1812 errlog("UDP client thread died because of an exception: %s", "unknown");
1813 }
1814
1815 uint16_t getRandomDNSID()
1816 {
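/* 65536 is a power of two, and on typical platforms both randombytes_random() (32 bits)
   and random() (31 bits) draw from a range that is a multiple of it, so the modulo
   below does not introduce any bias */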
1817 #ifdef HAVE_LIBSODIUM
1818 return (randombytes_random() % 65536);
1819 #else
1820 return (random() % 65536);
1821 #endif
1822 }
1823
1824 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1825 try
1826 {
1827 DNSName checkName = ds->checkName;
1828 uint16_t checkType = ds->checkType.getCode();
1829 uint16_t checkClass = ds->checkClass;
1830 dnsheader checkHeader;
1831 memset(&checkHeader, 0, sizeof(checkHeader));
1832
1833 checkHeader.qdcount = htons(1);
1834 checkHeader.id = getRandomDNSID();
1835
1836 checkHeader.rd = true;
1837 if (ds->setCD) {
1838 checkHeader.cd = true;
1839 }
1840
1841 if (ds->checkFunction) {
1842 std::lock_guard<std::mutex> lock(g_luamutex);
1843 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1844 checkName = std::get<0>(ret);
1845 checkType = std::get<1>(ret);
1846 checkClass = std::get<2>(ret);
1847 }
1848
1849 vector<uint8_t> packet;
1850 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1851 dnsheader * requestHeader = dpw.getHeader();
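/* overwrite the header generated by DNSPacketWriter with the one prepared above, so the
   query goes out with our random ID and the RD/CD flags we set (qdcount is 1 in both) */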
1852 *requestHeader = checkHeader;
1853
1854 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1855 sock.setNonBlocking();
1856 if (!IsAnyAddress(ds->sourceAddr)) {
1857 sock.setReuseAddr();
1858 if (!ds->sourceItfName.empty()) {
1859 #ifdef SO_BINDTODEVICE
1860 int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
1861 if (res != 0 && g_verboseHealthChecks) {
1862 infolog("Error settting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
1863 }
1864 #endif
1865 }
1866 sock.bind(ds->sourceAddr);
1867 }
1868 sock.connect(ds->remote);
1869 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1870 if (sent < 0) {
1871 int ret = errno;
1872 if (g_verboseHealthChecks)
1873 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1874 return false;
1875 }
1876
1877 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1878 if(ret < 0 || !ret) { // error or timeout, either way the check failed
1879 if (ret < 0) {
1880 ret = errno;
1881 if (g_verboseHealthChecks)
1882 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1883 }
1884 else {
1885 if (g_verboseHealthChecks)
1886 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1887 }
1888 return false;
1889 }
1890
1891 string reply;
1892 ComboAddress from;
1893 sock.recvFrom(reply, from);
1894
1895 /* we are using a connected socket but hey.. */
1896 if (from != ds->remote) {
1897 if (g_verboseHealthChecks)
1898 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1899 return false;
1900 }
1901
1902 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1903
1904 if (reply.size() < sizeof(*responseHeader)) {
1905 if (g_verboseHealthChecks)
1906 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1907 return false;
1908 }
1909
1910 if (responseHeader->id != requestHeader->id) {
1911 if (g_verboseHealthChecks)
1912 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1913 return false;
1914 }
1915
1916 if (!responseHeader->qr) {
1917 if (g_verboseHealthChecks)
1918 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1919 return false;
1920 }
1921
1922 if (responseHeader->rcode == RCode::ServFail) {
1923 if (g_verboseHealthChecks)
1924 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1925 return false;
1926 }
1927
1928 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1929 if (g_verboseHealthChecks)
1930 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1931 return false;
1932 }
1933
1934 uint16_t receivedType;
1935 uint16_t receivedClass;
1936 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1937
1938 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1939 if (g_verboseHealthChecks)
1940 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1941 return false;
1942 }
1943
1944 return true;
1945 }
1946 catch(const std::exception& e)
1947 {
1948 if (g_verboseHealthChecks)
1949 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1950 return false;
1951 }
1952 catch(...)
1953 {
1954 if (g_verboseHealthChecks)
1955 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1956 return false;
1957 }
1958
1959 uint64_t g_maxTCPClientThreads{10};
1960 std::atomic<uint16_t> g_cacheCleaningDelay{60};
1961 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1962
1963 void maintThread()
1964 {
1965 setThreadName("dnsdist/main");
1966 int interval = 1;
1967 size_t counter = 0;
1968 int32_t secondsToWaitLog = 0;
1969
1970 for(;;) {
1971 sleep(interval);
1972
1973 {
1974 std::lock_guard<std::mutex> lock(g_luamutex);
1975 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
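/* if the configuration defines a global Lua function named 'maintenance',
   e.g. 'function maintenance() ... end', it is invoked from this thread
   roughly once per second */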
1976 if(f) {
1977 try {
1978 (*f)();
1979 secondsToWaitLog = 0;
1980 }
1981 catch(std::exception &e) {
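/* rate-limit this message: log at most once per ~minute while the
   maintenance function keeps throwing */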
1982 if (secondsToWaitLog <= 0) {
1983 infolog("Error during execution of maintenance function: %s", e.what());
1984 secondsToWaitLog = 61;
1985 }
1986 secondsToWaitLog -= interval;
1987 }
1988 }
1989 }
1990
1991 counter++;
1992 if (counter >= g_cacheCleaningDelay) {
1993 /* keep track, for each cache, of whether we should keep
1994 expired entries */
1995 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1996
1997 /* gather all caches actually used by at least one pool, and see
1998 if something prevents us from cleaning the expired entries */
1999 auto localPools = g_pools.getLocal();
2000 for (const auto& entry : *localPools) {
2001 auto& pool = entry.second;
2002
2003 auto packetCache = pool->packetCache;
2004 if (!packetCache) {
2005 continue;
2006 }
2007
2008 auto pair = caches.insert({packetCache, false});
2009 auto& iter = pair.first;
2010 /* if we need to keep stale data for this cache (i.e., not clear
2011 expired entries when at least one pool using this cache
2012 has all its backends down) */
2013 if (packetCache->keepStaleData() && iter->second == false) {
2014 /* so far all pools had at least one backend up */
2015 if (pool->countServers(true) == 0) {
2016 iter->second = true;
2017 }
2018 }
2019 }
2020
2021 for (auto pair : caches) {
2022 /* shall we keep expired entries ? */
2023 if (pair.second == true) {
2024 continue;
2025 }
2026 auto& packetCache = pair.first;
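/* with the default g_cacheCleaningPercentage of 100, upTo is 0 and every expired entry
   is removed; a lower percentage keeps a floor of entries, e.g. 60% with 1,000,000 max
   entries gives upTo = 400,000, and purgeExpired() is expected not to shrink the cache
   below that */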
2027 size_t upTo = (packetCache->getMaxEntries() * (100 - g_cacheCleaningPercentage)) / 100;
2028 packetCache->purgeExpired(upTo);
2029 }
2030 counter = 0;
2031 }
2032
2033 // ponder pruning g_dynblocks of expired entries here
2034 }
2035 }
2036
2037 static void secPollThread()
2038 {
2039 setThreadName("dnsdist/secpoll");
2040
2041 for (;;) {
2042 try {
2043 doSecPoll(g_secPollSuffix);
2044 }
2045 catch(...) {
2046 }
2047 sleep(g_secPollInterval);
2048 }
2049 }
2050
2051 static void healthChecksThread()
2052 {
2053 setThreadName("dnsdist/healthC");
2054
2055 int interval = 1;
2056
2057 for(;;) {
2058 sleep(interval);
2059
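/* grow the pool of TCP worker threads if connections are queuing up and the
   configured maximum has not been reached yet */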
2060 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
2061 g_tcpclientthreads->addTCPClientThread();
2062
2063 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
2064 for(auto& dss : *states) {
2065 if(++dss->lastCheck < dss->checkInterval)
2066 continue;
2067 dss->lastCheck = 0;
2068 if(dss->availability==DownstreamState::Availability::Auto) {
2069 bool newState=upCheck(dss);
2070 if (newState) {
2071 /* check succeeded */
2072 dss->currentCheckFailures = 0;
2073
2074 if (!dss->upStatus) {
2075 /* we were marked as down */
2076 dss->consecutiveSuccessfulChecks++;
2077 if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
2078 /* if we need more than one successful check to rise
2079 and we didn't reach the threshold yet,
2080 let's stay down */
2081 newState = false;
2082 }
2083 }
2084 }
2085 else {
2086 /* check failed */
2087 dss->consecutiveSuccessfulChecks = 0;
2088
2089 if (dss->upStatus) {
2090 /* we are currently up */
2091 dss->currentCheckFailures++;
2092 if (dss->currentCheckFailures < dss->maxCheckFailures) {
2093 /* we need more than one failure to be marked as down,
2094 and we did not reach the threshold yet, let's stay up */
2095 newState = true;
2096 }
2097 }
2098 }
2099
2100 if(newState != dss->upStatus) {
2101 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2102
2103 if (newState && !dss->connected) {
2104 newState = dss->reconnect();
2105
2106 if (dss->connected && !dss->threadStarted.test_and_set()) {
2107 dss->tid = thread(responderThread, dss);
2108 }
2109 }
2110
2111 dss->upStatus = newState;
2112 dss->currentCheckFailures = 0;
2113 dss->consecutiveSuccessfulChecks = 0;
2114 if (g_snmpAgent && g_snmpTrapsEnabled) {
2115 g_snmpAgent->sendBackendStatusChangeTrap(dss);
2116 }
2117 }
2118 }
2119
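/* udiffAndSet() returns the elapsed time in microseconds since the previous call, so
   queryLoad and dropRate below are per-second rates over the last health check interval */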
2120 auto delta = dss->sw.udiffAndSet()/1000000.0;
2121 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2122 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2123 dss->prev.queries.store(dss->queries.load());
2124 dss->prev.reuseds.store(dss->reuseds.load());
2125
2126 for(IDState& ids : dss->idStates) { // timeouts
2127 int64_t usageIndicator = ids.usageIndicator;
2128 if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
2129 /* We mark the state as unused as soon as possible
2130 to limit the risk of racing with the
2131 responder thread.
2132 */
2133 auto oldDU = ids.du;
2134
2135 if (!ids.tryMarkUnused(usageIndicator)) {
2136 /* this state has been altered in the meantime,
2137 don't go anywhere near it */
2138 continue;
2139 }
2140 ids.du = nullptr;
2141 handleDOHTimeout(oldDU);
2142 ids.age = 0;
2143 dss->reuseds++;
2144 --dss->outstanding;
2145 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2146 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2147 dss->remote.toStringWithPort(), dss->name,
2148 ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2149
2150 struct timespec ts;
2151 gettime(&ts);
2152
2153 struct dnsheader fake;
2154 memset(&fake, 0, sizeof(fake));
2155 fake.id = ids.origID;
2156
2157 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2158 }
2159 }
2160 }
2161 }
2162 }
2163
2164 static void bindAny(int af, int sock)
2165 {
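/* allow binding to addresses that are not (yet) configured on the host, using whatever
   the platform offers: IP_FREEBIND on Linux, IP_BINDANY/IPV6_BINDANY on FreeBSD,
   SO_BINDANY on OpenBSD */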
2166 __attribute__((unused)) int one = 1;
2167
2168 #ifdef IP_FREEBIND
2169 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2170 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2171 #endif
2172
2173 #ifdef IP_BINDANY
2174 if (af == AF_INET)
2175 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2176 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2177 #endif
2178 #ifdef IPV6_BINDANY
2179 if (af == AF_INET6)
2180 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2181 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2182 #endif
2183 #ifdef SO_BINDANY
2184 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2185 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2186 #endif
2187 }
2188
2189 static void dropGroupPrivs(gid_t gid)
2190 {
2191 if (gid) {
2192 if (setgid(gid) == 0) {
2193 if (setgroups(0, NULL) < 0) {
2194 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2195 }
2196 }
2197 else {
2198 warnlog("Warning: Unable to set group ID to %d: %s", gid, stringerror());
2199 }
2200 }
2201 }
2202
2203 static void dropUserPrivs(uid_t uid)
2204 {
2205 if(uid) {
2206 if(setuid(uid) < 0) {
2207 warnlog("Warning: Unable to set user ID to %d: %s", uid, stringerror());
2208 }
2209 }
2210 }
2211
2212 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2213 {
2214 /* stdin, stdout, stderr */
2215 size_t requiredFDsCount = 3;
2216 auto backends = g_dstates.getLocal();
2217 /* UDP sockets to backends */
2218 size_t backendUDPSocketsCount = 0;
2219 for (const auto& backend : *backends) {
2220 backendUDPSocketsCount += backend->sockets.size();
2221 }
2222 requiredFDsCount += backendUDPSocketsCount;
2223 /* TCP sockets to backends */
2224 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2225 /* listening sockets */
2226 requiredFDsCount += udpBindsCount;
2227 requiredFDsCount += tcpBindsCount;
2228 /* max TCP connections currently served */
2229 requiredFDsCount += g_maxTCPClientThreads;
2230 /* max pipes for communicating between TCP acceptors and client threads */
2231 requiredFDsCount += (g_maxTCPClientThreads * 2);
2232 /* max TCP queued connections */
2233 requiredFDsCount += g_maxTCPQueuedConnections;
2234 /* DelayPipe pipe */
2235 requiredFDsCount += 2;
2236 /* syslog socket */
2237 requiredFDsCount++;
2238 /* webserver main socket */
2239 requiredFDsCount++;
2240 /* console main socket */
2241 requiredFDsCount++;
2242 /* carbon export */
2243 requiredFDsCount++;
2244 /* history file */
2245 requiredFDsCount++;
2246 struct rlimit rl;
2247 getrlimit(RLIMIT_NOFILE, &rl);
2248 if (rl.rlim_cur <= requiredFDsCount) {
2249 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2250 #ifdef HAVE_SYSTEMD
2251 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2252 #else
2253 warnlog("You can increase this value by using ulimit.");
2254 #endif
2255 }
2256 }
2257
2258 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2259 {
2260 /* skip some warnings if there is an identical UDP context */
2261 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2262 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2263 (void) warn;
2264
2265 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2266
2267 if (cs->tcp) {
2268 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2269 #ifdef TCP_DEFER_ACCEPT
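/* do not wake the acceptor until actual data has arrived on the new connection */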
2270 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2271 #endif
2272 if (cs->fastOpenQueueSize > 0) {
2273 #ifdef TCP_FASTOPEN
2274 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2275 #else
2276 if (warn) {
2277 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2278 }
2279 #endif
2280 }
2281 }
2282
2283 if(cs->local.sin4.sin_family == AF_INET6) {
2284 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2285 }
2286
2287 bindAny(cs->local.sin4.sin_family, fd);
2288
2289 if(!cs->tcp && IsAnyAddress(cs->local)) {
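/* request the destination address of incoming datagrams so that replies from this
   wildcard ('any') socket can be sent from the address the client actually queried */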
2290 int one=1;
2291 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2292 #ifdef IPV6_RECVPKTINFO
2293 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2294 #endif
2295 }
2296
2297 if (cs->reuseport) {
2298 #ifdef SO_REUSEPORT
2299 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2300 #else
2301 if (warn) {
2302 /* no need to warn again if configured but support is not available, we already did for UDP */
2303 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2304 }
2305 #endif
2306 }
2307
2308 if (!cs->tcp) {
2309 if (cs->local.isIPv4()) {
2310 try {
2311 setSocketIgnorePMTU(cs->udpFD);
2312 }
2313 catch(const std::exception& e) {
2314 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2315 }
2316 }
2317 }
2318
2319 const std::string& itf = cs->interface;
2320 if (!itf.empty()) {
2321 #ifdef SO_BINDTODEVICE
2322 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2323 if (res != 0) {
2324 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), stringerror());
2325 }
2326 #else
2327 if (warn) {
2328 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2329 }
2330 #endif
2331 }
2332
2333 #ifdef HAVE_EBPF
2334 if (g_defaultBPFFilter) {
2335 cs->attachFilter(g_defaultBPFFilter);
2336 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2337 }
2338 #endif /* HAVE_EBPF */
2339
2340 if (cs->tlsFrontend != nullptr) {
2341 if (!cs->tlsFrontend->setupTLS()) {
2342 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2343 _exit(EXIT_FAILURE);
2344 }
2345 }
2346
2347 if (cs->dohFrontend != nullptr) {
2348 cs->dohFrontend->setup();
2349 }
2350
2351 SBind(fd, cs->local);
2352
2353 if (cs->tcp) {
2354 SListen(cs->tcpFD, SOMAXCONN);
2355 if (cs->tlsFrontend != nullptr) {
2356 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2357 }
2358 else if (cs->dohFrontend != nullptr) {
2359 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2360 }
2361 else if (cs->dnscryptCtx != nullptr) {
2362 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2363 }
2364 else {
2365 warnlog("Listening on %s", cs->local.toStringWithPort());
2366 }
2367 }
2368
2369 cs->ready = true;
2370 }
2371
2372 struct
2373 {
2374 vector<string> locals;
2375 vector<string> remotes;
2376 bool checkConfig{false};
2377 bool beClient{false};
2378 bool beSupervised{false};
2379 string command;
2380 string config;
2381 string uid;
2382 string gid;
2383 } g_cmdLine;
2384
2385 std::atomic<bool> g_configurationDone{false};
2386
2387 static void usage()
2388 {
2389 cout<<endl;
2390 cout<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2391 cout<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2392 cout<<"[-v,--verbose] [--check-config] [--version]\n";
2393 cout<<"\n";
2394 cout<<"-a,--acl netmask Add this netmask to the ACL\n";
2395 cout<<"-C,--config file Load configuration from 'file'\n";
2396 cout<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2397 cout<<" controlSocket from your configuration file, but also\n";
2398 cout<<" accepts an IP:PORT argument\n";
2399 #ifdef HAVE_LIBSODIUM
2400 cout<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2401 cout<<" is similar to setting setKey in the configuration file.\n";
2402 cout<<" NOTE: this will leak this key in your shell's history\n";
2403 cout<<" and in the systems running process list.\n";
2404 #endif
2405 cout<<"--check-config Validate the configuration file and exit. The exit-code\n";
2406 cout<<" reflects the validation, 0 is OK, 1 means an error.\n";
2407 cout<<" Any errors are printed as well.\n";
2408 cout<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2409 cout<<"-g,--gid gid Change the process group ID after binding sockets\n";
2410 cout<<"-h,--help Display this helpful message\n";
2411 cout<<"-l,--local address Listen on this local address\n";
2412 cout<<"--supervised Don't open a console, I'm supervised\n";
2413 cout<<" (use with e.g. systemd and daemontools)\n";
2414 cout<<"--disable-syslog Don't log to syslog, only to stdout\n";
2415 cout<<" (use with e.g. systemd)\n";
2416 cout<<"-u,--uid uid Change the process user ID after binding sockets\n";
2417 cout<<"-v,--verbose Enable verbose mode\n";
2418 cout<<"-V,--version Show dnsdist version information and exit\n";
2419 }
2420
2421 int main(int argc, char** argv)
2422 try
2423 {
2424 size_t udpBindsCount = 0;
2425 size_t tcpBindsCount = 0;
2426 rl_attempted_completion_function = my_completion;
2427 rl_completion_append_character = 0;
2428
2429 signal(SIGPIPE, SIG_IGN);
2430 signal(SIGCHLD, SIG_IGN);
2431 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2432
2433 #ifdef HAVE_LIBSODIUM
2434 if (sodium_init() == -1) {
2435 cerr<<"Unable to initialize crypto library"<<endl;
2436 exit(EXIT_FAILURE);
2437 }
2438 g_hashperturb=randombytes_uniform(0xffffffff);
2439 srandom(randombytes_uniform(0xffffffff));
2440 #else
2441 {
2442 struct timeval tv;
2443 gettimeofday(&tv, 0);
2444 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2445 g_hashperturb=random();
2446 }
2447
2448 #endif
2449 ComboAddress clientAddress = ComboAddress();
2450 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2451 struct option longopts[]={
2452 {"acl", required_argument, 0, 'a'},
2453 {"check-config", no_argument, 0, 1},
2454 {"client", no_argument, 0, 'c'},
2455 {"config", required_argument, 0, 'C'},
2456 {"disable-syslog", no_argument, 0, 2},
2457 {"execute", required_argument, 0, 'e'},
2458 {"gid", required_argument, 0, 'g'},
2459 {"help", no_argument, 0, 'h'},
2460 {"local", required_argument, 0, 'l'},
2461 {"setkey", required_argument, 0, 'k'},
2462 {"supervised", no_argument, 0, 3},
2463 {"uid", required_argument, 0, 'u'},
2464 {"verbose", no_argument, 0, 'v'},
2465 {"version", no_argument, 0, 'V'},
2466 {0,0,0,0}
2467 };
2468 int longindex=0;
2469 string optstring;
2470 for(;;) {
2471 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2472 if(c==-1)
2473 break;
2474 switch(c) {
2475 case 1:
2476 g_cmdLine.checkConfig=true;
2477 break;
2478 case 2:
2479 g_syslog=false;
2480 break;
2481 case 3:
2482 g_cmdLine.beSupervised=true;
2483 break;
2484 case 'C':
2485 g_cmdLine.config=optarg;
2486 break;
2487 case 'c':
2488 g_cmdLine.beClient=true;
2489 break;
2490 case 'e':
2491 g_cmdLine.command=optarg;
2492 break;
2493 case 'g':
2494 g_cmdLine.gid=optarg;
2495 break;
2496 case 'h':
2497 cout<<"dnsdist "<<VERSION<<endl;
2498 usage();
2499 cout<<"\n";
2500 exit(EXIT_SUCCESS);
2501 break;
2502 case 'a':
2503 optstring=optarg;
2504 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2505 break;
2506 case 'k':
2507 #ifdef HAVE_LIBSODIUM
2508 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2509 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2510 exit(EXIT_FAILURE);
2511 }
2512 #else
2513 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2514 exit(EXIT_FAILURE);
2515 #endif
2516 break;
2517 case 'l':
2518 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2519 break;
2520 case 'u':
2521 g_cmdLine.uid=optarg;
2522 break;
2523 case 'v':
2524 g_verbose=true;
2525 break;
2526 case 'V':
2527 #ifdef LUAJIT_VERSION
2528 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2529 #else
2530 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2531 #endif
2532 cout<<"Enabled features: ";
2533 #ifdef HAVE_CDB
2534 cout<<"cdb ";
2535 #endif
2536 #ifdef HAVE_DNS_OVER_TLS
2537 cout<<"dns-over-tls(";
2538 #ifdef HAVE_GNUTLS
2539 cout<<"gnutls";
2540 #ifdef HAVE_LIBSSL
2541 cout<<" ";
2542 #endif
2543 #endif
2544 #ifdef HAVE_LIBSSL
2545 cout<<"openssl";
2546 #endif
2547 cout<<") ";
2548 #endif
2549 #ifdef HAVE_DNS_OVER_HTTPS
2550 cout<<"dns-over-https(DOH) ";
2551 #endif
2552 #ifdef HAVE_DNSCRYPT
2553 cout<<"dnscrypt ";
2554 #endif
2555 #ifdef HAVE_EBPF
2556 cout<<"ebpf ";
2557 #endif
2558 #ifdef HAVE_FSTRM
2559 cout<<"fstrm ";
2560 #endif
2561 #ifdef HAVE_LIBCRYPTO
2562 cout<<"ipcipher ";
2563 #endif
2564 #ifdef HAVE_LIBSODIUM
2565 cout<<"libsodium ";
2566 #endif
2567 #ifdef HAVE_LMDB
2568 cout<<"lmdb ";
2569 #endif
2570 #ifdef HAVE_PROTOBUF
2571 cout<<"protobuf ";
2572 #endif
2573 #ifdef HAVE_RE2
2574 cout<<"re2 ";
2575 #endif
2576 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2577 cout<<"recvmmsg/sendmmsg ";
2578 #endif
2579 #ifdef HAVE_NET_SNMP
2580 cout<<"snmp ";
2581 #endif
2582 #ifdef HAVE_SYSTEMD
2583 cout<<"systemd";
2584 #endif
2585 cout<<endl;
2586 exit(EXIT_SUCCESS);
2587 break;
2588 case '?':
2589 //getopt_long printed an error message.
2590 usage();
2591 exit(EXIT_FAILURE);
2592 break;
2593 }
2594 }
2595
2596 argc-=optind;
2597 argv+=optind;
2598 for(auto p = argv; *p; ++p) {
2599 if(g_cmdLine.beClient) {
2600 clientAddress = ComboAddress(*p, 5199);
2601 } else {
2602 g_cmdLine.remotes.push_back(*p);
2603 }
2604 }
2605
2606 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2607
2608 g_policy.setState(leastOutstandingPol);
2609 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2610 setupLua(true, g_cmdLine.config);
2611 if (clientAddress != ComboAddress())
2612 g_serverControl = clientAddress;
2613 doClient(g_serverControl, g_cmdLine.command);
2614 _exit(EXIT_SUCCESS);
2615 }
2616
2617 auto acl = g_ACL.getCopy();
2618 if(acl.empty()) {
2619 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2620 acl.addMask(addr);
2621 g_ACL.setState(acl);
2622 }
2623
2624 auto consoleACL = g_consoleACL.getCopy();
2625 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2626 consoleACL.addMask(mask);
2627 }
2628 g_consoleACL.setState(consoleACL);
2629
2630 if (g_cmdLine.checkConfig) {
2631 setupLua(true, g_cmdLine.config);
2632 // No exception was thrown
2633 infolog("Configuration '%s' OK!", g_cmdLine.config);
2634 _exit(EXIT_SUCCESS);
2635 }
2636
2637 auto todo=setupLua(false, g_cmdLine.config);
2638
2639 auto localPools = g_pools.getCopy();
2640 {
2641 bool precompute = false;
2642 if (g_policy.getLocal()->name == "chashed") {
2643 precompute = true;
2644 } else {
2645 for (const auto& entry: localPools) {
2646 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2647 precompute = true;
2648 break ;
2649 }
2650 }
2651 }
2652 if (precompute) {
2653 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2654 // pre compute hashes
2655 auto backends = g_dstates.getLocal();
2656 for (auto& backend: *backends) {
2657 backend->hash();
2658 }
2659 }
2660 }
2661
2662 if (!g_cmdLine.locals.empty()) {
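/* addresses passed with -l/--local replace the plain UDP/TCP frontends from the
   configuration, while DoH, DoT and DNSCrypt frontends declared there are kept */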
2663 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2664 /* DoH, DoT and DNSCrypt frontends are separate */
2665 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2666 it = g_frontends.erase(it);
2667 }
2668 else {
2669 ++it;
2670 }
2671 }
2672
2673 for(const auto& loc : g_cmdLine.locals) {
2674 /* UDP */
2675 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2676 /* TCP */
2677 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2678 }
2679 }
2680
2681 if (g_frontends.empty()) {
2682 /* UDP */
2683 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2684 /* TCP */
2685 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2686 }
2687
2688 g_configurationDone = true;
2689
2690 for(auto& frontend : g_frontends) {
2691 setUpLocalBind(frontend);
2692
2693 if (frontend->tcp == false) {
2694 ++udpBindsCount;
2695 }
2696 else {
2697 ++tcpBindsCount;
2698 }
2699 }
2700
2701 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2702
2703 vector<string> vec;
2704 std::string acls;
2705 g_ACL.getLocal()->toStringVector(&vec);
2706 for(const auto& s : vec) {
2707 if (!acls.empty())
2708 acls += ", ";
2709 acls += s;
2710 }
2711 infolog("ACL allowing queries from: %s", acls.c_str());
2712 vec.clear();
2713 acls.clear();
2714 g_consoleACL.getLocal()->toStringVector(&vec);
2715 for (const auto& entry : vec) {
2716 if (!acls.empty()) {
2717 acls += ", ";
2718 }
2719 acls += entry;
2720 }
2721 infolog("Console ACL allowing connections from: %s", acls.c_str());
2722
2723 #ifdef HAVE_LIBSODIUM
2724 if (g_consoleEnabled && g_consoleKey.empty()) {
2725 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2726 }
2727 #endif
2728
2729 gid_t newgid=0;
2730 uid_t newuid=0;
2731
2732 if(!g_cmdLine.gid.empty())
2733 newgid = strToGID(g_cmdLine.gid.c_str());
2734
2735 if(!g_cmdLine.uid.empty())
2736 newuid = strToUID(g_cmdLine.uid.c_str());
2737
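/* drop the group first: once the user ID has been changed we may no longer be
   allowed to call setgid()/setgroups() */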
2738 dropGroupPrivs(newgid);
2739 dropUserPrivs(newuid);
2740 try {
2741 /* we might still have capabilities remaining,
2742 for example if we have been started as root
2743 without --uid or --gid (please don't do that)
2744 or as an unprivileged user with ambient
2745 capabilities like CAP_NET_BIND_SERVICE.
2746 */
2747 dropCapabilities();
2748 }
2749 catch(const std::exception& e) {
2750 warnlog("%s", e.what());
2751 }
2752
2753 /* this needs to be done _after_ dropping privileges */
2754 g_delay = new DelayPipe<DelayedPacket>();
2755
2756 if (g_snmpAgent) {
2757 g_snmpAgent->run();
2758 }
2759
2760 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2761
2762 for(auto& t : todo)
2763 t();
2764
2765 localPools = g_pools.getCopy();
2766 /* create the default pool no matter what */
2767 createPoolIfNotExists(localPools, "");
2768 if(g_cmdLine.remotes.size()) {
2769 for(const auto& address : g_cmdLine.remotes) {
2770 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2771 addServerToPool(localPools, "", ret);
2772 if (ret->connected && !ret->threadStarted.test_and_set()) {
2773 ret->tid = thread(responderThread, ret);
2774 }
2775 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2776 }
2777 }
2778 g_pools.setState(localPools);
2779
2780 if(g_dstates.getLocal()->empty()) {
2781 errlog("No downstream servers defined: all packets will get dropped");
2782 // you might define them later, but you need to know
2783 }
2784
2785 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2786
2787 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2788 if(dss->availability==DownstreamState::Availability::Auto) {
2789 bool newState=upCheck(dss);
2790 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2791 dss->upStatus = newState;
2792 }
2793 }
2794
2795 for(auto& cs : g_frontends) {
2796 if (cs->dohFrontend != nullptr) {
2797 #ifdef HAVE_DNS_OVER_HTTPS
2798 std::thread t1(dohThread, cs.get());
2799 if (!cs->cpus.empty()) {
2800 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2801 }
2802 t1.detach();
2803 #endif /* HAVE_DNS_OVER_HTTPS */
2804 continue;
2805 }
2806 if (cs->udpFD >= 0) {
2807 thread t1(udpClientThread, cs.get());
2808 if (!cs->cpus.empty()) {
2809 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2810 }
2811 t1.detach();
2812 }
2813 else if (cs->tcpFD >= 0) {
2814 thread t1(tcpAcceptorThread, cs.get());
2815 if (!cs->cpus.empty()) {
2816 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2817 }
2818 t1.detach();
2819 }
2820 }
2821
2822 thread carbonthread(carbonDumpThread);
2823 carbonthread.detach();
2824
2825 thread stattid(maintThread);
2826 stattid.detach();
2827
2828 thread healththread(healthChecksThread);
2829
2830 if (!g_secPollSuffix.empty()) {
2831 thread secpollthread(secPollThread);
2832 secpollthread.detach();
2833 }
2834
2835 if(g_cmdLine.beSupervised) {
2836 #ifdef HAVE_SYSTEMD
2837 sd_notify(0, "READY=1");
2838 #endif
2839 healththread.join();
2840 }
2841 else {
2842 healththread.detach();
2843 doConsole();
2844 }
2845 _exit(EXIT_SUCCESS);
2846
2847 }
2848 catch(const LuaContext::ExecutionErrorException& e) {
2849 try {
2850 errlog("Fatal Lua error: %s", e.what());
2851 std::rethrow_if_nested(e);
2852 } catch(const std::exception& ne) {
2853 errlog("Details: %s", ne.what());
2854 }
2855 catch(PDNSException &ae)
2856 {
2857 errlog("Fatal pdns error: %s", ae.reason);
2858 }
2859 _exit(EXIT_FAILURE);
2860 }
2861 catch(std::exception &e)
2862 {
2863 errlog("Fatal error: %s", e.what());
2864 _exit(EXIT_FAILURE);
2865 }
2866 catch(PDNSException &ae)
2867 {
2868 errlog("Fatal pdns error: %s", ae.reason);
2869 _exit(EXIT_FAILURE);
2870 }
2871
2872 uint64_t getLatencyCount(const std::string&)
2873 {
2874 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2875 }