git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
Merge pull request #7918 from mind04/soacomplete
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
using std::atomic;
using std::thread;
bool g_verbose;

/* process-wide counters; this file bumps e.g. responses, nonCompliantResponses,
   truncFail, the frontend rcode counters and the latency buckets */
struct DNSDistStats g_stats;
MetricDefinitionStorage g_metricDefinitions;

/* number of in-flight query slots (IDState entries) that the DownstreamState
   constructor allocates for each backend */
uint16_t g_maxOutstanding{10240};
bool g_verboseHealthChecks{false};
uint32_t g_staleCacheEntriesTTL{0};
bool g_syslog{true};
/* when true, responseContentMatches() accepts NoError/NXDomain responses carrying
   no question section instead of counting them as non-compliant */
bool g_allowEmptyResponse{false};

GlobalStateHolder<NetmaskGroup> g_ACL;
string g_outputBuffer;

std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
#ifdef HAVE_EBPF
shared_ptr<BPFFilter> g_defaultBPFFilter;
std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
#endif /* HAVE_EBPF */
std::vector<std::unique_ptr<ClientState>> g_frontends;
/* named server pools, looked up through getPool() / createPoolIfNotExists() */
GlobalStateHolder<pools_t> g_pools;
size_t g_udpVectorSize{1};

bool g_snmpEnabled{false};
bool g_snmpTrapsEnabled{false};
DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
/* response rules; responderThread() evaluates them through applyRulesToResponse() */
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;

/* applyRulesToQuery() records queries here, responderThread() records responses */
Rings g_rings;
/* per-qname query counters, filled by applyRulesToQuery() when g_qcount.enabled is set */
QueryCount g_qcount;

GlobalStateHolder<servers_t> g_dstates;
GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
/* action applied by applyRulesToQuery() to a dynamic block whose own action is None */
DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
int g_udpTimeout{2};

bool g_servFailOnNoPolicy{false};
/* when set, responderThread() strips TC=1 responses down to their question with truncateTC() */
bool g_truncateTC{false};
/* when set, fixUpResponse() copies the query's name over the response's question name
   (as the variable name suggests, to restore the original case) */
bool g_fixupCase{false};
bool g_preserveTrailingData{false};
/* when set, roundrobin() returns no server instead of falling back to all servers when none is up */
bool g_roundrobinFailOnNoServer{false};
146
147 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
148 try
149 {
150 bool hadEDNS = false;
151 uint16_t payloadSize = 0;
152 uint16_t z = 0;
153
154 if (g_addEDNSToSelfGeneratedResponses) {
155 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
156 }
157
158 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
159 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
160 dh->ancount = dh->arcount = dh->nscount = 0;
161
162 if (hadEDNS) {
163 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
164 }
165 }
166 catch(...)
167 {
168 g_stats.truncFail++;
169 }
170
171 struct DelayedPacket
172 {
173 int fd;
174 string packet;
175 ComboAddress destination;
176 ComboAddress origDest;
177 void operator()()
178 {
179 ssize_t res;
180 if(origDest.sin4.sin_family == 0) {
181 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
182 }
183 else {
184 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
185 }
186 if (res == -1) {
187 int err = errno;
188 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
189 }
190 }
191 };
192
193 DelayPipe<DelayedPacket>* g_delay = nullptr;
194
195 void doLatencyStats(double udiff)
196 {
197 if(udiff < 1000) ++g_stats.latency0_1;
198 else if(udiff < 10000) ++g_stats.latency1_10;
199 else if(udiff < 50000) ++g_stats.latency10_50;
200 else if(udiff < 100000) ++g_stats.latency50_100;
201 else if(udiff < 1000000) ++g_stats.latency100_1000;
202 else ++g_stats.latencySlow;
203 g_stats.latencySum += udiff / 1000;
204
205 auto doAvg = [](double& var, double n, double weight) {
206 var = (weight -1) * var/weight + n/weight;
207 };
208
209 doAvg(g_stats.latencyAvg100, udiff, 100);
210 doAvg(g_stats.latencyAvg1000, udiff, 1000);
211 doAvg(g_stats.latencyAvg10000, udiff, 10000);
212 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
213 }
214
215 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
216 {
217 if (responseLen < sizeof(dnsheader)) {
218 return false;
219 }
220
221 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
222 if (dh->qdcount == 0) {
223 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
224 return true;
225 }
226 else {
227 ++g_stats.nonCompliantResponses;
228 return false;
229 }
230 }
231
232 uint16_t rqtype, rqclass;
233 DNSName rqname;
234 try {
235 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
236 }
237 catch(const std::exception& e) {
238 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
239 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
240 }
241 ++g_stats.nonCompliantResponses;
242 return false;
243 }
244
245 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
246 return false;
247 }
248
249 return true;
250 }
251
252 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
253 {
254 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
255 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
256 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
257 uint16_t * flags = getFlagsFromDNSHeader(dh);
258 /* clear the flags we are about to restore */
259 *flags &= restoreFlagsMask;
260 /* only keep the flags we want to restore */
261 origFlags &= ~restoreFlagsMask;
262 /* set the saved flags as they were */
263 *flags |= origFlags;
264 }
265
266 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
267 {
268 restoreFlags(dq.dh, origFlags);
269
270 return addEDNSToQueryTurnedResponse(dq);
271 }
272
273 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
274 {
275 if (*responseLen < sizeof(dnsheader)) {
276 return false;
277 }
278
279 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
280 restoreFlags(dh, origFlags);
281
282 if (*responseLen == sizeof(dnsheader)) {
283 return true;
284 }
285
286 if(g_fixupCase) {
287 string realname = qname.toDNSString();
288 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
289 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
290 }
291 }
292
293 if (ednsAdded || ecsAdded) {
294 uint16_t optStart;
295 size_t optLen = 0;
296 bool last = false;
297
298 const std::string responseStr(*response, *responseLen);
299 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
300
301 if (res == 0) {
302 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
303 size_t optContentStart = 0;
304 uint16_t optContentLen = 0;
305 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
306 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
307 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
308 we care about. */
309 *zeroScope = responseStr.at(optContentStart + 3) == 0;
310 }
311 }
312
313 if (ednsAdded) {
314 /* we added the entire OPT RR,
315 therefore we need to remove it entirely */
316 if (last) {
317 /* simply remove the last AR */
318 *responseLen -= optLen;
319 uint16_t arcount = ntohs(dh->arcount);
320 arcount--;
321 dh->arcount = htons(arcount);
322 }
323 else {
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
326 *responseLen = rewrittenResponse.size();
327 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
328 rewrittenResponse.reserve(*responseLen + addRoom);
329 }
330 *responseSize = rewrittenResponse.capacity();
331 *response = reinterpret_cast<char*>(rewrittenResponse.data());
332 }
333 else {
334 warnlog("Error rewriting content");
335 }
336 }
337 }
338 else {
339 /* the OPT RR was already present, but without ECS,
340 we need to remove the ECS option if any */
341 if (last) {
342 /* nothing after the OPT RR, we can simply remove the
343 ECS option */
344 size_t existingOptLen = optLen;
345 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
346 *responseLen -= (existingOptLen - optLen);
347 }
348 else {
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
351 *responseLen = rewrittenResponse.size();
352 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
353 rewrittenResponse.reserve(*responseLen + addRoom);
354 }
355 *responseSize = rewrittenResponse.capacity();
356 *response = reinterpret_cast<char*>(rewrittenResponse.data());
357 }
358 else {
359 warnlog("Error rewriting content");
360 }
361 }
362 }
363 }
364 }
365
366 return true;
367 }
368
#ifdef HAVE_DNSCRYPT
/* Encrypt the response in place for a DNSCrypt client. Without a dnsCryptQuery
   there is nothing to do and true is returned. When dh, *dh and dhCopy are all
   non-null, the cleartext header is first copied into dhCopy and *dh is repointed
   at that copy. Returns false (the response must be dropped) when encryption fails. */
static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    return true;
  }

  /* save the original header before encrypting it in place */
  if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedResponseLen = 0;
  if (dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen) != 0) {
    /* dropping response */
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedResponseLen;
  return true;
}
#endif /* HAVE_DNSCRYPT */
393
394 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
395 {
396 DNSResponseAction::Action action=DNSResponseAction::Action::None;
397 std::string ruleresult;
398 for(const auto& lr : *localRespRulactions) {
399 if(lr.d_rule->matches(&dr)) {
400 lr.d_rule->d_matches++;
401 action=(*lr.d_action)(&dr, &ruleresult);
402 switch(action) {
403 case DNSResponseAction::Action::Allow:
404 return true;
405 break;
406 case DNSResponseAction::Action::Drop:
407 return false;
408 break;
409 case DNSResponseAction::Action::HeaderModify:
410 return true;
411 break;
412 case DNSResponseAction::Action::ServFail:
413 dr.dh->rcode = RCode::ServFail;
414 return true;
415 break;
416 /* non-terminal actions follow */
417 case DNSResponseAction::Action::Delay:
418 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
419 break;
420 case DNSResponseAction::Action::None:
421 break;
422 }
423 }
424 }
425
426 return true;
427 }
428
429 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
430 {
431 if (!applyRulesToResponse(localRespRulactions, dr)) {
432 return false;
433 }
434
435 bool zeroScope = false;
436 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
437 return false;
438 }
439
440 if (dr.packetCache && !dr.skipCache) {
441 if (!dr.useZeroScope) {
442 /* if the query was not suitable for zero-scope, for
443 example because it had an existing ECS entry so the hash is
444 not really 'no ECS', so just insert it for the existing subnet
445 since:
446 - we don't have the correct hash for a non-ECS query
447 - inserting with hash computed before the ECS replacement but with
448 the subnet extracted _after_ the replacement would not work.
449 */
450 zeroScope = false;
451 }
452 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
453 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
454 }
455
456 #ifdef HAVE_DNSCRYPT
457 if (!muted) {
458 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
459 return false;
460 }
461 }
462 #endif /* HAVE_DNSCRYPT */
463
464 return true;
465 }
466
467 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
468 {
469 if(delayMsec && g_delay) {
470 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
471 g_delay->submit(dp, delayMsec);
472 }
473 else {
474 ssize_t res;
475 if(origDest.sin4.sin_family == 0) {
476 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
477 }
478 else {
479 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
480 }
481 if (res == -1) {
482 int err = errno;
483 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
484 }
485 }
486
487 return true;
488 }
489
490
491 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
492 {
493 return state->sockets[state->socketsOffset++ % state->sockets.size()];
494 }
495
496 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
497 {
498 ready.clear();
499
500 if (state->sockets.size() == 1) {
501 ready.push_back(state->sockets[0]);
502 return ;
503 }
504
505 {
506 std::lock_guard<std::mutex> lock(state->socketsLock);
507 state->mplexer->getAvailableFDs(ready, -1);
508 }
509 }
510
511 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
512 void responderThread(std::shared_ptr<DownstreamState> dss)
513 try {
514 setThreadName("dnsdist/respond");
515 auto localRespRulactions = g_resprulactions.getLocal();
516 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
517 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
518 /* when the answer is encrypted in place, we need to get a copy
519 of the original header before encryption to fill the ring buffer */
520 dnsheader cleartextDH;
521 vector<uint8_t> rewrittenResponse;
522
523 uint16_t queryId = 0;
524 std::vector<int> sockets;
525 sockets.reserve(dss->sockets.size());
526
527 for(;;) {
528 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
529 try {
530 pickBackendSocketsReadyForReceiving(dss, sockets);
531 for (const auto& fd : sockets) {
532 ssize_t got = recv(fd, packet, sizeof(packet), 0);
533 char * response = packet;
534 size_t responseSize = sizeof(packet);
535
536 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
537 continue;
538
539 uint16_t responseLen = static_cast<uint16_t>(got);
540 queryId = dh->id;
541
542 if(queryId >= dss->idStates.size()) {
543 continue;
544 }
545
546 IDState* ids = &dss->idStates[queryId];
547 int origFD = ids->origFD;
548
549 if(origFD < 0 && ids->du == nullptr) // duplicate
550 continue;
551
552 /* setting age to 0 to prevent the maintainer thread from
553 cleaning this IDS while we process the response.
554 We have already a copy of the origFD, so it would
555 mostly mess up the outstanding counter.
556 */
557 ids->age = 0;
558
559 unsigned int consumed = 0;
560 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
561 continue;
562 }
563
564 int oldFD = ids->origFD.exchange(-1);
565 if (oldFD == origFD) {
566 /* we only decrement the outstanding counter if the value was not
567 altered in the meantime, which would mean that the state has been actively reused
568 and the other thread has not incremented the outstanding counter, so we don't
569 want it to be decremented twice. */
570 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
571 }
572
573 if(dh->tc && g_truncateTC) {
574 truncateTC(response, &responseLen, responseSize, consumed);
575 }
576
577 dh->id = ids->origID;
578
579 uint16_t addRoom = 0;
580 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
581 if (dr.dnsCryptQuery) {
582 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
583 }
584
585 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
586 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
587 continue;
588 }
589
590 if (ids->cs && !ids->cs->muted) {
591 if (ids->du) {
592 #ifdef HAVE_DNS_OVER_HTTPS
593 // DoH query
594 ids->du->query = std::string(response, responseLen);
595 if (send(ids->du->rsock, &ids->du, sizeof(ids->du), 0) != sizeof(ids->du)) {
596 delete ids->du;
597 }
598 #endif /* HAVE_DNS_OVER_HTTPS */
599 ids->du = nullptr;
600 }
601 else {
602 ComboAddress empty;
603 empty.sin4.sin_family = 0;
604 /* if ids->destHarvested is false, origDest holds the listening address.
605 We don't want to use that as a source since it could be 0.0.0.0 for example. */
606 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
607 }
608 }
609
610 ++g_stats.responses;
611
612 double udiff = ids->sentTime.udiff();
613 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
614 ids->du ? " (https)": "", udiff);
615
616 struct timespec ts;
617 gettime(&ts);
618 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
619
620 switch (dh->rcode) {
621 case RCode::NXDomain:
622 ++g_stats.frontendNXDomain;
623 break;
624 case RCode::ServFail:
625 ++g_stats.servfailResponses;
626 ++g_stats.frontendServFail;
627 break;
628 case RCode::NoError:
629 ++g_stats.frontendNoError;
630 break;
631 }
632 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
633
634 doLatencyStats(udiff);
635
636 rewrittenResponse.clear();
637 }
638 }
639 catch(const std::exception& e){
640 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
641 }
642 }
643 }
644 catch(const std::exception& e)
645 {
646 errlog("UDP responder thread died because of exception: %s", e.what());
647 }
648 catch(const PDNSException& e)
649 {
650 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
651 }
652 catch(...)
653 {
654 errlog("UDP responder thread died because of an exception: %s", "unknown");
655 }
656
657 bool DownstreamState::reconnect()
658 {
659 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
660 if (!tl.owns_lock()) {
661 /* we are already reconnecting */
662 return false;
663 }
664
665 connected = false;
666 for (auto& fd : sockets) {
667 if (fd != -1) {
668 if (sockets.size() > 1) {
669 std::lock_guard<std::mutex> lock(socketsLock);
670 mplexer->removeReadFD(fd);
671 }
672 /* shutdown() is needed to wake up recv() in the responderThread */
673 shutdown(fd, SHUT_RDWR);
674 close(fd);
675 fd = -1;
676 }
677 if (!IsAnyAddress(remote)) {
678 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
679 if (!IsAnyAddress(sourceAddr)) {
680 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
681 SBind(fd, sourceAddr);
682 }
683 try {
684 SConnect(fd, remote);
685 if (sockets.size() > 1) {
686 std::lock_guard<std::mutex> lock(socketsLock);
687 mplexer->addReadFD(fd, [](int, boost::any) {});
688 }
689 connected = true;
690 }
691 catch(const std::runtime_error& error) {
692 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
693 connected = false;
694 break;
695 }
696 }
697 }
698
699 /* if at least one (re-)connection failed, close all sockets */
700 if (!connected) {
701 for (auto& fd : sockets) {
702 if (fd != -1) {
703 if (sockets.size() > 1) {
704 std::lock_guard<std::mutex> lock(socketsLock);
705 mplexer->removeReadFD(fd);
706 }
707 /* shutdown() is needed to wake up recv() in the responderThread */
708 shutdown(fd, SHUT_RDWR);
709 close(fd);
710 fd = -1;
711 }
712 }
713 }
714
715 return connected;
716 }
717 void DownstreamState::hash()
718 {
719 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
720 auto w = weight;
721 WriteLock wl(&d_lock);
722 hashes.clear();
723 while (w > 0) {
724 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
725 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
726 hashes.insert(wshash);
727 --w;
728 }
729 }
730
731 void DownstreamState::setId(const boost::uuids::uuid& newId)
732 {
733 id = newId;
734 // compute hashes only if already done
735 if (!hashes.empty()) {
736 hash();
737 }
738 }
739
740 void DownstreamState::setWeight(int newWeight)
741 {
742 if (newWeight < 1) {
743 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
744 return ;
745 }
746 weight = newWeight;
747 if (!hashes.empty()) {
748 hash();
749 }
750 }
751
752 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
753 {
754 pthread_rwlock_init(&d_lock, nullptr);
755 id = getUniqueID();
756 threadStarted.clear();
757
758 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
759
760 sockets.resize(numberOfSockets);
761 for (auto& fd : sockets) {
762 fd = -1;
763 }
764
765 if (!IsAnyAddress(remote)) {
766 reconnect();
767 idStates.resize(g_maxOutstanding);
768 sw.start();
769 infolog("Added downstream server %s", remote.toStringWithPort());
770 }
771
772 }
773
/* presumably guards every use of g_lua; applyRulesToQuery() holds it while
   calling the Lua query-count filter -- TODO confirm it covers all callers */
std::mutex g_luamutex;
LuaContext g_lua;

GlobalStateHolder<ServerPolicy> g_policy;
778
779 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
780 {
781 for(auto& d : servers) {
782 if(d.second->isUp() && d.second->qps.check())
783 return d.second;
784 }
785 return leastOutstanding(servers, dq);
786 }
787
788 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
789 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
790 {
791 if (servers.size() == 1 && servers[0].second->isUp()) {
792 return servers[0].second;
793 }
794
795 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
796 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
797 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
798 poss.reserve(servers.size());
799 for(auto& d : servers) {
800 if(d.second->isUp()) {
801 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
802 }
803 }
804 if(poss.empty())
805 return shared_ptr<DownstreamState>();
806 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
807 return poss.begin()->second;
808 }
809
810 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
811 {
812 vector<pair<int, shared_ptr<DownstreamState>>> poss;
813 int sum = 0;
814 int max = std::numeric_limits<int>::max();
815
816 for(auto& d : servers) { // w=1, w=10 -> 1, 11
817 if(d.second->isUp()) {
818 // Don't overflow sum when adding high weights
819 if(d.second->weight > max - sum) {
820 sum = max;
821 } else {
822 sum += d.second->weight;
823 }
824
825 poss.push_back({sum, d.second});
826 }
827 }
828
829 // Catch poss & sum are empty to avoid SIGFPE
830 if(poss.empty())
831 return shared_ptr<DownstreamState>();
832
833 int r = val % sum;
834 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
835 if(p==poss.end())
836 return shared_ptr<DownstreamState>();
837 return p->second;
838 }
839
840 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
841 {
842 return valrandom(random(), servers, dq);
843 }
844
845 uint32_t g_hashperturb;
846 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
847 {
848 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
849 }
850
851 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
852 {
853 unsigned int qhash = dq->qname->hash(g_hashperturb);
854 unsigned int sel = std::numeric_limits<unsigned int>::max();
855 unsigned int min = std::numeric_limits<unsigned int>::max();
856 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
857
858 for (const auto& d: servers) {
859 if (d.second->isUp()) {
860 // make sure hashes have been computed
861 if (d.second->hashes.empty()) {
862 d.second->hash();
863 }
864 {
865 ReadLock rl(&(d.second->d_lock));
866 const auto& server = d.second;
867 // we want to keep track of the last hash
868 if (min > *(server->hashes.begin())) {
869 min = *(server->hashes.begin());
870 first = server;
871 }
872
873 auto hash_it = server->hashes.lower_bound(qhash);
874 if (hash_it != server->hashes.end()) {
875 if (*hash_it < sel) {
876 sel = *hash_it;
877 ret = server;
878 }
879 }
880 }
881 }
882 }
883 if (ret != nullptr) {
884 return ret;
885 }
886 if (first != nullptr) {
887 return first;
888 }
889 return shared_ptr<DownstreamState>();
890 }
891
892 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
893 {
894 NumberedServerVector poss;
895
896 for(auto& d : servers) {
897 if(d.second->isUp()) {
898 poss.push_back(d);
899 }
900 }
901
902 const auto *res=&poss;
903 if(poss.empty() && !g_roundrobinFailOnNoServer)
904 res = &servers;
905
906 if(res->empty())
907 return shared_ptr<DownstreamState>();
908
909 static unsigned int counter;
910
911 return (*res)[(counter++) % res->size()].second;
912 }
913
/* default address for the control/console listener, as the name suggests
   (presumably overridable from the configuration -- TODO confirm) */
ComboAddress g_serverControl{"127.0.0.1:5199"};
915
916 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
917 {
918 std::shared_ptr<ServerPool> pool;
919 pools_t::iterator it = pools.find(poolName);
920 if (it != pools.end()) {
921 pool = it->second;
922 }
923 else {
924 if (!poolName.empty())
925 vinfolog("Creating pool %s", poolName);
926 pool = std::make_shared<ServerPool>();
927 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
928 }
929 return pool;
930 }
931
932 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
933 {
934 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
935 if (!poolName.empty()) {
936 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
937 } else {
938 vinfolog("Setting default pool server selection policy to %s", policy->name);
939 }
940 pool->policy = policy;
941 }
942
943 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
944 {
945 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
946 if (!poolName.empty()) {
947 vinfolog("Adding server to pool %s", poolName);
948 } else {
949 vinfolog("Adding server to default pool");
950 }
951 pool->addServer(server);
952 }
953
954 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
955 {
956 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
957
958 if (!poolName.empty()) {
959 vinfolog("Removing server from pool %s", poolName);
960 }
961 else {
962 vinfolog("Removing server from default pool");
963 }
964
965 pool->removeServer(server);
966 }
967
968 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
969 {
970 pools_t::const_iterator it = pools.find(poolName);
971
972 if (it == pools.end()) {
973 throw std::out_of_range("No pool named " + poolName);
974 }
975
976 return it->second;
977 }
978
979 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
980 {
981 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
982 return pool->getServers();
983 }
984
985 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
986 {
987 string result;
988
989 std::vector<std::string> addrs;
990 stringtok(addrs, spoofContent, " ,");
991
992 if (addrs.size() == 1) {
993 try {
994 ComboAddress spoofAddr(spoofContent);
995 SpoofAction sa({spoofAddr});
996 sa(&dq, &result);
997 }
998 catch(const PDNSException &e) {
999 SpoofAction sa(spoofContent); // CNAME then
1000 sa(&dq, &result);
1001 }
1002 } else {
1003 std::vector<ComboAddress> cas;
1004 for (const auto& addr : addrs) {
1005 try {
1006 cas.push_back(ComboAddress(addr));
1007 }
1008 catch (...) {
1009 }
1010 }
1011 SpoofAction sa(cas);
1012 sa(&dq, &result);
1013 }
1014 }
1015
1016 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, const struct timespec& now)
1017 {
1018 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1019
1020 if(g_qcount.enabled) {
1021 string qname = (*dq.qname).toString(".");
1022 bool countQuery{true};
1023 if(g_qcount.filter) {
1024 std::lock_guard<std::mutex> lock(g_luamutex);
1025 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1026 }
1027
1028 if(countQuery) {
1029 WriteLock wl(&g_qcount.queryLock);
1030 if(!g_qcount.records.count(qname)) {
1031 g_qcount.records[qname] = 0;
1032 }
1033 g_qcount.records[qname]++;
1034 }
1035 }
1036
1037 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1038 auto updateBlockStats = [&got]() {
1039 ++g_stats.dynBlocked;
1040 got->second.blocks++;
1041 };
1042
1043 if(now < got->second.until) {
1044 DNSAction::Action action = got->second.action;
1045 if (action == DNSAction::Action::None) {
1046 action = g_dynBlockAction;
1047 }
1048 switch (action) {
1049 case DNSAction::Action::NoOp:
1050 /* do nothing */
1051 break;
1052
1053 case DNSAction::Action::Nxdomain:
1054 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1055 updateBlockStats();
1056
1057 dq.dh->rcode = RCode::NXDomain;
1058 dq.dh->qr=true;
1059 return true;
1060
1061 case DNSAction::Action::Refused:
1062 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1063 updateBlockStats();
1064
1065 dq.dh->rcode = RCode::Refused;
1066 dq.dh->qr = true;
1067 return true;
1068
1069 case DNSAction::Action::Truncate:
1070 if(!dq.tcp) {
1071 updateBlockStats();
1072 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1073 dq.dh->tc = true;
1074 dq.dh->qr = true;
1075 return true;
1076 }
1077 else {
1078 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1079 }
1080 break;
1081 case DNSAction::Action::NoRecurse:
1082 updateBlockStats();
1083 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1084 dq.dh->rd = false;
1085 return true;
1086 default:
1087 updateBlockStats();
1088 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1089 return false;
1090 }
1091 }
1092 }
1093
1094 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1095 auto updateBlockStats = [&got]() {
1096 ++g_stats.dynBlocked;
1097 got->blocks++;
1098 };
1099
1100 if(now < got->until) {
1101 DNSAction::Action action = got->action;
1102 if (action == DNSAction::Action::None) {
1103 action = g_dynBlockAction;
1104 }
1105 switch (action) {
1106 case DNSAction::Action::NoOp:
1107 /* do nothing */
1108 break;
1109 case DNSAction::Action::Nxdomain:
1110 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1111 updateBlockStats();
1112
1113 dq.dh->rcode = RCode::NXDomain;
1114 dq.dh->qr=true;
1115 return true;
1116 case DNSAction::Action::Refused:
1117 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1118 updateBlockStats();
1119
1120 dq.dh->rcode = RCode::Refused;
1121 dq.dh->qr=true;
1122 return true;
1123 case DNSAction::Action::Truncate:
1124 if(!dq.tcp) {
1125 updateBlockStats();
1126
1127 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1128 dq.dh->tc = true;
1129 dq.dh->qr = true;
1130 return true;
1131 }
1132 else {
1133 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1134 }
1135 break;
1136 case DNSAction::Action::NoRecurse:
1137 updateBlockStats();
1138 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1139 dq.dh->rd = false;
1140 return true;
1141 default:
1142 updateBlockStats();
1143 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1144 return false;
1145 }
1146 }
1147 }
1148
1149 DNSAction::Action action=DNSAction::Action::None;
1150 string ruleresult;
1151 for(const auto& lr : *holders.rulactions) {
1152 if(lr.d_rule->matches(&dq)) {
1153 lr.d_rule->d_matches++;
1154 action=(*lr.d_action)(&dq, &ruleresult);
1155
1156 switch(action) {
1157 case DNSAction::Action::Allow:
1158 return true;
1159 break;
1160 case DNSAction::Action::Drop:
1161 ++g_stats.ruleDrop;
1162 return false;
1163 break;
1164 case DNSAction::Action::Nxdomain:
1165 dq.dh->rcode = RCode::NXDomain;
1166 dq.dh->qr=true;
1167 ++g_stats.ruleNXDomain;
1168 return true;
1169 break;
1170 case DNSAction::Action::Refused:
1171 dq.dh->rcode = RCode::Refused;
1172 dq.dh->qr=true;
1173 ++g_stats.ruleRefused;
1174 return true;
1175 break;
1176 case DNSAction::Action::ServFail:
1177 dq.dh->rcode = RCode::ServFail;
1178 dq.dh->qr=true;
1179 ++g_stats.ruleServFail;
1180 return true;
1181 break;
1182 case DNSAction::Action::Spoof:
1183 spoofResponseFromString(dq, ruleresult);
1184 return true;
1185 break;
1186 case DNSAction::Action::Truncate:
1187 dq.dh->tc = true;
1188 dq.dh->qr = true;
1189 return true;
1190 break;
1191 case DNSAction::Action::HeaderModify:
1192 return true;
1193 break;
1194 case DNSAction::Action::Pool:
1195 poolname=ruleresult;
1196 return true;
1197 break;
1198 /* non-terminal actions follow */
1199 case DNSAction::Action::Delay:
1200 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1201 break;
1202 case DNSAction::Action::None:
1203 /* fall-through */
1204 case DNSAction::Action::NoOp:
1205 break;
1206 case DNSAction::Action::NoRecurse:
1207 dq.dh->rd = false;
1208 return true;
1209 break;
1210 }
1211 }
1212 }
1213
1214 return true;
1215 }
1216
1217 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1218 {
1219 ssize_t result;
1220
1221 if (ss->sourceItf == 0) {
1222 result = send(sd, request, requestLen, 0);
1223 }
1224 else {
1225 struct msghdr msgh;
1226 struct iovec iov;
1227 cmsgbuf_aligned cbuf;
1228 ComboAddress remote(ss->remote);
1229 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1230 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1231 result = sendmsg(sd, &msgh, 0);
1232 }
1233
1234 if (result == -1) {
1235 int savederrno = errno;
1236 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1237
1238 /* This might sound silly, but on Linux send() might fail with EINVAL
1239 if the interface the socket was bound to doesn't exist anymore.
1240 We don't want to reconnect the real socket if the healthcheck failed,
1241 because it's not using the same socket.
1242 */
1243 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1244 ss->reconnect();
1245 }
1246 }
1247
1248 return result;
1249 }
1250
1251 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1252 {
1253 if (msgh->msg_flags & MSG_TRUNC) {
1254 /* message was too large for our buffer */
1255 vinfolog("Dropping message too large for our buffer");
1256 ++g_stats.nonCompliantQueries;
1257 return false;
1258 }
1259
1260 if(!holders.acl->match(remote)) {
1261 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1262 ++g_stats.aclDrops;
1263 return false;
1264 }
1265
1266 cs.queries++;
1267 ++g_stats.queries;
1268
1269 if (HarvestDestinationAddress(msgh, &dest)) {
1270 /* we don't get the port, only the address */
1271 dest.sin4.sin_port = cs.local.sin4.sin_port;
1272 }
1273 else {
1274 dest.sin4.sin_family = 0;
1275 }
1276
1277 return true;
1278 }
1279
1280 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1281 {
1282 if (cs.dnscryptCtx) {
1283 #ifdef HAVE_DNSCRYPT
1284 vector<uint8_t> response;
1285 uint16_t decryptedQueryLen = 0;
1286
1287 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1288
1289 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1290
1291 if (!decrypted) {
1292 if (response.size() > 0) {
1293 return response;
1294 }
1295 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1296 }
1297
1298 len = decryptedQueryLen;
1299 #endif /* HAVE_DNSCRYPT */
1300 }
1301 return boost::none;
1302 }
1303
1304 bool checkQueryHeaders(const struct dnsheader* dh)
1305 {
1306 if (dh->qr) { // don't respond to responses
1307 ++g_stats.nonCompliantQueries;
1308 return false;
1309 }
1310
1311 if (dh->qdcount == 0) {
1312 ++g_stats.emptyQueries;
1313 return false;
1314 }
1315
1316 if (dh->rd) {
1317 ++g_stats.rdQueries;
1318 }
1319
1320 return true;
1321 }
1322
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Fill outMsg so that a later sendmmsg() call sends response (responseLen
   bytes) to remote. iov describes the payload. When dest is known
   (non-zero family), cbuf receives a control message selecting dest as the
   source address; otherwise no control data is attached. cs is unused. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  struct msghdr& hdr = outMsg.msg_hdr;

  outMsg.msg_len = 0;
  fillMSGHdr(&hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destKnown = (dest.sin4.sin_family != 0);
  if (destKnown) {
    addCMsgSrcAddr(&hdr, cbuf, &dest, 0);
  }
  else {
    hdr.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1337
1338 /* self-generated responses or cache hits */
1339 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1340 {
1341 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1342
1343 #ifdef HAVE_PROTOBUF
1344 dr.uniqueId = dq.uniqueId;
1345 #endif
1346 dr.qTag = dq.qTag;
1347 dr.delayMsec = dq.delayMsec;
1348
1349 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1350 return false;
1351 }
1352
1353 /* in case a rule changed it */
1354 dq.delayMsec = dr.delayMsec;
1355
1356 #ifdef HAVE_DNSCRYPT
1357 if (!cs.muted) {
1358 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1359 return false;
1360 }
1361 }
1362 #endif /* HAVE_DNSCRYPT */
1363
1364 if (cacheHit) {
1365 ++g_stats.cacheHits;
1366 }
1367
1368 switch (dr.dh->rcode) {
1369 case RCode::NXDomain:
1370 ++g_stats.frontendNXDomain;
1371 break;
1372 case RCode::ServFail:
1373 ++g_stats.frontendServFail;
1374 break;
1375 case RCode::NoError:
1376 ++g_stats.frontendNoError;
1377 break;
1378 }
1379
1380 doLatencyStats(0); // we're not going to measure this
1381 return true;
1382 }
1383
1384 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1385 {
1386 const uint16_t queryId = ntohs(dq.dh->id);
1387
1388 try {
1389 /* we need an accurate ("real") value for the response and
1390 to store into the IDS, but not for insertion into the
1391 rings for example */
1392 struct timespec now;
1393 gettime(&now);
1394
1395 string poolname;
1396
1397 if (!applyRulesToQuery(holders, dq, poolname, now)) {
1398 return ProcessQueryResult::Drop;
1399 }
1400
1401 if(dq.dh->qr) { // something turned it into a response
1402 fixUpQueryTurnedResponse(dq, dq.origFlags);
1403
1404 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1405 return ProcessQueryResult::Drop;
1406 }
1407
1408 ++g_stats.selfAnswered;
1409 return ProcessQueryResult::SendAnswer;
1410 }
1411
1412 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
1413 dq.packetCache = serverPool->packetCache;
1414 auto policy = *(holders.policy);
1415 if (serverPool->policy != nullptr) {
1416 policy = *(serverPool->policy);
1417 }
1418 auto servers = serverPool->getServers();
1419 if (policy.isLua) {
1420 std::lock_guard<std::mutex> lock(g_luamutex);
1421 selectedBackend = policy.policy(servers, &dq);
1422 }
1423 else {
1424 selectedBackend = policy.policy(servers, &dq);
1425 }
1426
1427 uint16_t cachedResponseSize = dq.size;
1428 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1429
1430 if (dq.packetCache && !dq.skipCache) {
1431 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1432 }
1433
1434 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1435 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1436 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1437 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1438 dq.len = cachedResponseSize;
1439
1440 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1441 return ProcessQueryResult::Drop;
1442 }
1443
1444 return ProcessQueryResult::SendAnswer;
1445 }
1446
1447 if (!dq.subnet) {
1448 /* there was no existing ECS on the query, enable the zero-scope feature */
1449 dq.useZeroScope = true;
1450 }
1451 }
1452
1453 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1454 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1455 return ProcessQueryResult::Drop;
1456 }
1457 }
1458
1459 if (dq.packetCache && !dq.skipCache) {
1460 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1461 dq.len = cachedResponseSize;
1462
1463 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1464 return ProcessQueryResult::Drop;
1465 }
1466
1467 return ProcessQueryResult::SendAnswer;
1468 }
1469 ++g_stats.cacheMisses;
1470 }
1471
1472 if(!selectedBackend) {
1473 ++g_stats.noPolicy;
1474
1475 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1476 if (g_servFailOnNoPolicy) {
1477 restoreFlags(dq.dh, dq.origFlags);
1478
1479 dq.dh->rcode = RCode::ServFail;
1480 dq.dh->qr = true;
1481
1482 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1483 return ProcessQueryResult::Drop;
1484 }
1485 // no response-only statistics counter to update.
1486 return ProcessQueryResult::SendAnswer;
1487 }
1488
1489 return ProcessQueryResult::Drop;
1490 }
1491
1492 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1493 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1494 }
1495
1496 selectedBackend->queries++;
1497 return ProcessQueryResult::PassToBackend;
1498 }
1499 catch(const std::exception& e){
1500 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1501 }
1502 return ProcessQueryResult::Drop;
1503 }
1504
/* Handle one UDP datagram received on cs, from remote.
   query/len describe the received payload, queryBufferSize is the total size
   of the buffer query points to (which may be larger than len).
   Steps: acceptance checks (truncation, ACL), DNSCrypt decryption, header
   checks, then processQuery(). Depending on its result the query is:
   - dropped;
   - answered directly: queued into responsesVect for a later sendmmsg() when
     batching is available, no delay was requested and responsesVect is set,
     otherwise sent right away with sendUDPResponse();
   - relayed to the selected backend, after recording in one of the backend's
     IDState slots what is needed to route the answer back to the client.
   responsesVect is either nullptr, or set together with queuedResponses,
   respIOV and respCBuf (see the assert below).
   std::exception errors are caught and logged; the query is then dropped. */
static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
{
  assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
  uint16_t queryId = 0;

  try {
    if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
      return;
    }

    /* we need an accurate ("real") value for the response and
       to store into the IDS, but not for insertion into the
       rings for example */
    struct timespec queryRealTime;
    gettime(&queryRealTime, true);

    std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
    auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
    if (dnsCryptResponse) {
      /* the DNSCrypt layer answered by itself */
      sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
      return;
    }

    struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
    queryId = ntohs(dh->id);

    if (!checkQueryHeaders(dh)) {
      return;
    }

    uint16_t qtype, qclass;
    unsigned int consumed = 0;
    DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
    /* the local address is the harvested destination when available, the listening address otherwise */
    DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
    dq.dnsCryptQuery = std::move(dnsCryptQuery);
    std::shared_ptr<DownstreamState> ss{nullptr};
    auto result = processQuery(dq, cs, holders, ss);

    if (result == ProcessQueryResult::Drop) {
      return;
    }

    if (result == ProcessQueryResult::SendAnswer) {
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
      if (dq.delayMsec == 0 && responsesVect != nullptr) {
        /* NOTE(review): unlike the sendUDPResponse() path below, which always
           uses dest, this passes *dq.local, i.e. the listening address when the
           destination could not be harvested (possibly an 'any' address).
           queueResponse() then attaches it as source address - confirm this is
           intended. */
        queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
        (*queuedResponses)++;
        return;
      }
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
      /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
      sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
      return;
    }

    if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
      return;
    }

    /* pick the next IDState slot of the backend, round-robin; its index is
       used as the ID of the query sent to the backend (dh->id below) */
    unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
    IDState* ids = &ss->idStates[idOffset];
    ids->age = 0;
    ids->du = nullptr;

    /* a slot whose origFD was already set (>= 0) still had a query in flight */
    int oldFD = ids->origFD.exchange(cs.udpFD);
    if(oldFD < 0) {
      // free slot: one more outstanding query
      ++ss->outstanding;
    }
    else {
      // reused slot: no change in outstanding, the previous query is counted as timed out
      ++ss->reuseds;
      ++g_stats.downstreamTimeouts;
    }

    ids->cs = &cs;
    ids->origID = dh->id;
    /* qname is moved into the IDState: dq.qname points to it and must not be
       used past this point */
    setIDStateFromDNSQuestion(*ids, dq, std::move(qname));

    /* If we couldn't harvest the real dest addr, still
       write down the listening addr since it will be useful
       (especially if it's not an 'any' one).
       We need to keep track of which one it is since we may
       want to use the real but not the listening addr to reply.
    */
    if (dest.sin4.sin_family != 0) {
      ids->origDest = dest;
      ids->destHarvested = true;
    }
    else {
      ids->origDest = cs.local;
      ids->destHarvested = false;
    }

    dh->id = idOffset;

    int fd = pickBackendSocketForSending(ss);
    ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);

    if(ret < 0) {
      ++ss->sendErrors;
      ++g_stats.downstreamSendErrors;
    }

    /* NOTE(review): ids is read after the query has been handed to the backend;
       presumably the thread handling backend responses may already be working
       on this slot - confirm this access is safe. */
    vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
  }
  catch(const std::exception& e){
    vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
  }
}
1614
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* UDP listener loop for cs, used when g_udpVectorSize > 1: receives up to
   g_udpVectorSize datagrams per recvmmsg() call and processes each of them with
   processUDPQuery(). The answers that can be sent right away (not delayed, not
   relayed to a backend) are then sent in a single sendmmsg() call.
   Never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[4096];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* the remote address buffers use the family of the listening socket */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
  }

  /* go now */
  for(;;) {

    /* (Re)build every receive header before each recvmmsg() call:
       - the iovec is also used to send the vector of responses (see
         queueResponse()), to avoid having to copy the data around;
       - msg_namelen and msg_controllen are value-result fields, overwritten
         by the kernel with the sizes actually received;
       - the receive length must stay capped to s_udpIncomingBufferSize, the
         rest of the packet buffer being reserved for EDNS/ECS additions and
         self-generated responses. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      /* datagrams larger than s_udpIncomingBufferSize come back with MSG_TRUNC
         set in msg_flags and are dropped by isUDPQueryAcceptable() */
      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);
    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1695
1696 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1697 static void udpClientThread(ClientState* cs)
1698 try
1699 {
1700 setThreadName("dnsdist/udpClie");
1701 LocalHolders holders;
1702
1703 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1704 if (g_udpVectorSize > 1) {
1705 MultipleMessagesUDPClientThread(cs, holders);
1706
1707 }
1708 else
1709 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1710 {
1711 char packet[4096];
1712 /* the actual buffer is larger because:
1713 - we may have to add EDNS and/or ECS
1714 - we use it for self-generated responses (from rule or cache)
1715 but we only accept incoming payloads up to that size
1716 */
1717 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1718 struct msghdr msgh;
1719 struct iovec iov;
1720 /* used by HarvestDestinationAddress */
1721 cmsgbuf_aligned cbuf;
1722
1723 ComboAddress remote;
1724 ComboAddress dest;
1725 remote.sin4.sin_family = cs->local.sin4.sin_family;
1726 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1727
1728 for(;;) {
1729 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1730
1731 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1732 ++g_stats.nonCompliantQueries;
1733 continue;
1734 }
1735
1736 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1737 }
1738 }
1739 }
1740 catch(const std::exception &e)
1741 {
1742 errlog("UDP client thread died because of exception: %s", e.what());
1743 }
1744 catch(const PDNSException &e)
1745 {
1746 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1747 }
1748 catch(...)
1749 {
1750 errlog("UDP client thread died because of an exception: %s", "unknown");
1751 }
1752
/* Return a random 16-bit DNS query ID: libsodium's randombytes_random() when
   available, random() otherwise, reduced modulo 65536. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const auto raw = randombytes_random();
#else
  const auto raw = random();
#endif
  return static_cast<uint16_t>(raw % 65536);
}
1761
1762 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1763 try
1764 {
1765 DNSName checkName = ds->checkName;
1766 uint16_t checkType = ds->checkType.getCode();
1767 uint16_t checkClass = ds->checkClass;
1768 dnsheader checkHeader;
1769 memset(&checkHeader, 0, sizeof(checkHeader));
1770
1771 checkHeader.qdcount = htons(1);
1772 checkHeader.id = getRandomDNSID();
1773
1774 checkHeader.rd = true;
1775 if (ds->setCD) {
1776 checkHeader.cd = true;
1777 }
1778
1779 if (ds->checkFunction) {
1780 std::lock_guard<std::mutex> lock(g_luamutex);
1781 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1782 checkName = std::get<0>(ret);
1783 checkType = std::get<1>(ret);
1784 checkClass = std::get<2>(ret);
1785 }
1786
1787 vector<uint8_t> packet;
1788 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1789 dnsheader * requestHeader = dpw.getHeader();
1790 *requestHeader = checkHeader;
1791
1792 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1793 sock.setNonBlocking();
1794 if (!IsAnyAddress(ds->sourceAddr)) {
1795 sock.setReuseAddr();
1796 sock.bind(ds->sourceAddr);
1797 }
1798 sock.connect(ds->remote);
1799 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1800 if (sent < 0) {
1801 int ret = errno;
1802 if (g_verboseHealthChecks)
1803 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1804 return false;
1805 }
1806
1807 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1808 if(ret < 0 || !ret) { // error, timeout, both are down!
1809 if (ret < 0) {
1810 ret = errno;
1811 if (g_verboseHealthChecks)
1812 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1813 }
1814 else {
1815 if (g_verboseHealthChecks)
1816 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1817 }
1818 return false;
1819 }
1820
1821 string reply;
1822 ComboAddress from;
1823 sock.recvFrom(reply, from);
1824
1825 /* we are using a connected socket but hey.. */
1826 if (from != ds->remote) {
1827 if (g_verboseHealthChecks)
1828 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1829 return false;
1830 }
1831
1832 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1833
1834 if (reply.size() < sizeof(*responseHeader)) {
1835 if (g_verboseHealthChecks)
1836 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1837 return false;
1838 }
1839
1840 if (responseHeader->id != requestHeader->id) {
1841 if (g_verboseHealthChecks)
1842 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1843 return false;
1844 }
1845
1846 if (!responseHeader->qr) {
1847 if (g_verboseHealthChecks)
1848 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1849 return false;
1850 }
1851
1852 if (responseHeader->rcode == RCode::ServFail) {
1853 if (g_verboseHealthChecks)
1854 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1855 return false;
1856 }
1857
1858 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1859 if (g_verboseHealthChecks)
1860 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1861 return false;
1862 }
1863
1864 uint16_t receivedType;
1865 uint16_t receivedClass;
1866 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1867
1868 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1869 if (g_verboseHealthChecks)
1870 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1871 return false;
1872 }
1873
1874 return true;
1875 }
1876 catch(const std::exception& e)
1877 {
1878 if (g_verboseHealthChecks)
1879 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1880 return false;
1881 }
1882 catch(...)
1883 {
1884 if (g_verboseHealthChecks)
1885 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1886 return false;
1887 }
1888
/* presumably the upper bound on the number of TCP client worker threads - TODO confirm against the TCP thread pool */
uint64_t g_maxTCPClientThreads(10);
/* number of maintThread() iterations (one per second) between two packet cache cleaning passes */
std::atomic<uint16_t> g_cacheCleaningDelay(60);
/* maintThread() asks each cache to purge expired entries with upTo = maxEntries * (100 - this) / 100 */
std::atomic<uint16_t> g_cacheCleaningPercentage(100);
1892
1893 void maintThread()
1894 {
1895 setThreadName("dnsdist/main");
1896 int interval = 1;
1897 size_t counter = 0;
1898 int32_t secondsToWaitLog = 0;
1899
1900 for(;;) {
1901 sleep(interval);
1902
1903 {
1904 std::lock_guard<std::mutex> lock(g_luamutex);
1905 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1906 if(f) {
1907 try {
1908 (*f)();
1909 secondsToWaitLog = 0;
1910 }
1911 catch(std::exception &e) {
1912 if (secondsToWaitLog <= 0) {
1913 infolog("Error during execution of maintenance function: %s", e.what());
1914 secondsToWaitLog = 61;
1915 }
1916 secondsToWaitLog -= interval;
1917 }
1918 }
1919 }
1920
1921 counter++;
1922 if (counter >= g_cacheCleaningDelay) {
1923 /* keep track, for each cache, of whether we should keep
1924 expired entries */
1925 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1926
1927 /* gather all caches actually used by at least one pool, and see
1928 if something prevents us from cleaning the expired entries */
1929 auto localPools = g_pools.getLocal();
1930 for (const auto& entry : *localPools) {
1931 auto& pool = entry.second;
1932
1933 auto packetCache = pool->packetCache;
1934 if (!packetCache) {
1935 continue;
1936 }
1937
1938 auto pair = caches.insert({packetCache, false});
1939 auto& iter = pair.first;
1940 /* if we need to keep stale data for this cache (ie, not clear
1941 expired entries when at least one pool using this cache
1942 has all its backends down) */
1943 if (packetCache->keepStaleData() && iter->second == false) {
1944 /* so far all pools had at least one backend up */
1945 if (pool->countServers(true) == 0) {
1946 iter->second = true;
1947 }
1948 }
1949 }
1950
1951 for (auto pair : caches) {
1952 /* shall we keep expired entries ? */
1953 if (pair.second == true) {
1954 continue;
1955 }
1956 auto& packetCache = pair.first;
1957 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1958 packetCache->purgeExpired(upTo);
1959 }
1960 counter = 0;
1961 }
1962
1963 // ponder pruning g_dynblocks of expired entries here
1964 }
1965 }
1966
1967 static void secPollThread()
1968 {
1969 setThreadName("dnsdist/secpoll");
1970
1971 for (;;) {
1972 try {
1973 doSecPoll(g_secPollSuffix);
1974 }
1975 catch(...) {
1976 }
1977 sleep(g_secPollInterval);
1978 }
1979 }
1980
1981 static void healthChecksThread()
1982 {
1983 setThreadName("dnsdist/healthC");
1984
1985 int interval = 1;
1986
1987 for(;;) {
1988 sleep(interval);
1989
1990 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
1991 g_tcpclientthreads->addTCPClientThread();
1992
1993 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
1994 for(auto& dss : *states) {
1995 if(++dss->lastCheck < dss->checkInterval)
1996 continue;
1997 dss->lastCheck = 0;
1998 if(dss->availability==DownstreamState::Availability::Auto) {
1999 bool newState=upCheck(dss);
2000 if (newState) {
2001 /* check succeeded */
2002 dss->currentCheckFailures = 0;
2003
2004 if (!dss->upStatus) {
2005 /* we were marked as down */
2006 dss->consecutiveSuccessfulChecks++;
2007 if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
2008 /* if we need more than one successful check to rise
2009 and we didn't reach the threshold yet,
2010 let's stay down */
2011 newState = false;
2012 }
2013 }
2014 }
2015 else {
2016 /* check failed */
2017 dss->consecutiveSuccessfulChecks = 0;
2018
2019 if (dss->upStatus) {
2020 /* we are currently up */
2021 dss->currentCheckFailures++;
2022 if (dss->currentCheckFailures < dss->maxCheckFailures) {
2023 /* we need more than one failure to be marked as down,
2024 and we did not reach the threshold yet, let's stay down */
2025 newState = true;
2026 }
2027 }
2028 }
2029
2030 if(newState != dss->upStatus) {
2031 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2032
2033 if (newState && !dss->connected) {
2034 newState = dss->reconnect();
2035
2036 if (dss->connected && !dss->threadStarted.test_and_set()) {
2037 dss->tid = thread(responderThread, dss);
2038 }
2039 }
2040
2041 dss->upStatus = newState;
2042 dss->currentCheckFailures = 0;
2043 dss->consecutiveSuccessfulChecks = 0;
2044 if (g_snmpAgent && g_snmpTrapsEnabled) {
2045 g_snmpAgent->sendBackendStatusChangeTrap(dss);
2046 }
2047 }
2048 }
2049
2050 auto delta = dss->sw.udiffAndSet()/1000000.0;
2051 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2052 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2053 dss->prev.queries.store(dss->queries.load());
2054 dss->prev.reuseds.store(dss->reuseds.load());
2055
2056 for(IDState& ids : dss->idStates) { // timeouts
2057 int origFD = ids.origFD;
2058 if(origFD >=0 && ids.age++ > g_udpTimeout) {
2059 /* We set origFD to -1 as soon as possible
2060 to limit the risk of racing with the
2061 responder thread.
2062 The UDP client thread only checks origFD to
2063 know whether outstanding has to be incremented,
2064 so the sooner the better any way since we _will_
2065 decrement it.
2066 */
2067 if (ids.origFD.exchange(-1) != origFD) {
2068 /* this state has been altered in the meantime,
2069 don't go anywhere near it */
2070 continue;
2071 }
2072 ids.du = nullptr;
2073 ids.age = 0;
2074 dss->reuseds++;
2075 --dss->outstanding;
2076 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2077 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2078 dss->remote.toStringWithPort(), dss->name,
2079 ids.qname.toString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2080
2081 struct timespec ts;
2082 gettime(&ts);
2083
2084 struct dnsheader fake;
2085 memset(&fake, 0, sizeof(fake));
2086 fake.id = ids.origID;
2087
2088 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2089 }
2090 }
2091 }
2092 }
2093 }
2094
2095 static void bindAny(int af, int sock)
2096 {
2097 __attribute__((unused)) int one = 1;
2098
2099 #ifdef IP_FREEBIND
2100 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2101 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
2102 #endif
2103
2104 #ifdef IP_BINDANY
2105 if (af == AF_INET)
2106 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2107 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
2108 #endif
2109 #ifdef IPV6_BINDANY
2110 if (af == AF_INET6)
2111 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2112 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
2113 #endif
2114 #ifdef SO_BINDANY
2115 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2116 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
2117 #endif
2118 }
2119
2120 static void dropGroupPrivs(gid_t gid)
2121 {
2122 if (gid) {
2123 if (setgid(gid) == 0) {
2124 if (setgroups(0, NULL) < 0) {
2125 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
2126 }
2127 }
2128 else {
2129 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
2130 }
2131 }
2132 }
2133
2134 static void dropUserPrivs(uid_t uid)
2135 {
2136 if(uid) {
2137 if(setuid(uid) < 0) {
2138 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
2139 }
2140 }
2141 }
2142
2143 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2144 {
2145 /* stdin, stdout, stderr */
2146 size_t requiredFDsCount = 3;
2147 auto backends = g_dstates.getLocal();
2148 /* UDP sockets to backends */
2149 size_t backendUDPSocketsCount = 0;
2150 for (const auto& backend : *backends) {
2151 backendUDPSocketsCount += backend->sockets.size();
2152 }
2153 requiredFDsCount += backendUDPSocketsCount;
2154 /* TCP sockets to backends */
2155 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2156 /* listening sockets */
2157 requiredFDsCount += udpBindsCount;
2158 requiredFDsCount += tcpBindsCount;
2159 /* max TCP connections currently served */
2160 requiredFDsCount += g_maxTCPClientThreads;
2161 /* max pipes for communicating between TCP acceptors and client threads */
2162 requiredFDsCount += (g_maxTCPClientThreads * 2);
2163 /* max TCP queued connections */
2164 requiredFDsCount += g_maxTCPQueuedConnections;
2165 /* DelayPipe pipe */
2166 requiredFDsCount += 2;
2167 /* syslog socket */
2168 requiredFDsCount++;
2169 /* webserver main socket */
2170 requiredFDsCount++;
2171 /* console main socket */
2172 requiredFDsCount++;
2173 /* carbon export */
2174 requiredFDsCount++;
2175 /* history file */
2176 requiredFDsCount++;
2177 struct rlimit rl;
2178 getrlimit(RLIMIT_NOFILE, &rl);
2179 if (rl.rlim_cur <= requiredFDsCount) {
2180 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2181 #ifdef HAVE_SYSTEMD
2182 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2183 #else
2184 warnlog("You can increase this value by using ulimit.");
2185 #endif
2186 }
2187 }
2188
2189 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2190 {
2191 /* skip some warnings if there is an identical UDP context */
2192 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2193 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2194 (void) warn;
2195
2196 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2197
2198 if (cs->tcp) {
2199 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2200 #ifdef TCP_DEFER_ACCEPT
2201 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2202 #endif
2203 if (cs->fastOpenQueueSize > 0) {
2204 #ifdef TCP_FASTOPEN
2205 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2206 #else
2207 if (warn) {
2208 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2209 }
2210 #endif
2211 }
2212 }
2213
2214 if(cs->local.sin4.sin_family == AF_INET6) {
2215 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2216 }
2217
2218 bindAny(cs->local.sin4.sin_family, fd);
2219
2220 if(!cs->tcp && IsAnyAddress(cs->local)) {
2221 int one=1;
2222 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2223 #ifdef IPV6_RECVPKTINFO
2224 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2225 #endif
2226 }
2227
2228 if (cs->reuseport) {
2229 #ifdef SO_REUSEPORT
2230 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2231 #else
2232 if (warn) {
2233 /* no need to warn again if configured but support is not available, we already did for UDP */
2234 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2235 }
2236 #endif
2237 }
2238
2239 if (!cs->tcp) {
2240 if (cs->local.isIPv4()) {
2241 try {
2242 setSocketIgnorePMTU(cs->udpFD);
2243 }
2244 catch(const std::exception& e) {
2245 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2246 }
2247 }
2248 }
2249
2250 const std::string& itf = cs->interface;
2251 if (!itf.empty()) {
2252 #ifdef SO_BINDTODEVICE
2253 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2254 if (res != 0) {
2255 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), strerror(errno));
2256 }
2257 #else
2258 if (warn) {
2259 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2260 }
2261 #endif
2262 }
2263
2264 #ifdef HAVE_EBPF
2265 if (g_defaultBPFFilter) {
2266 cs->attachFilter(g_defaultBPFFilter);
2267 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2268 }
2269 #endif /* HAVE_EBPF */
2270
2271 if (cs->tlsFrontend != nullptr) {
2272 if (!cs->tlsFrontend->setupTLS()) {
2273 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2274 _exit(EXIT_FAILURE);
2275 }
2276 }
2277
2278 if (cs->dohFrontend != nullptr) {
2279 cs->dohFrontend->setup();
2280 }
2281
2282 SBind(fd, cs->local);
2283
2284 if (cs->tcp) {
2285 SListen(cs->tcpFD, SOMAXCONN);
2286 if (cs->tlsFrontend != nullptr) {
2287 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2288 }
2289 else if (cs->dohFrontend != nullptr) {
2290 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2291 }
2292 else if (cs->dnscryptCtx != nullptr) {
2293 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2294 }
2295 else {
2296 warnlog("Listening on %s", cs->local.toStringWithPort());
2297 }
2298 }
2299
2300 cs->ready = true;
2301 }
2302
/* Settings collected from the command line by main(). */
struct
{
  std::vector<std::string> locals;   // -l/--local listen addresses
  std::vector<std::string> remotes;  // positional arguments, added as backends (unless -c)
  bool checkConfig = false;          // --check-config
  bool beClient = false;             // -c/--client
  bool beSupervised = false;         // --supervised
  std::string command;               // -e/--execute
  std::string config;                // -C/--config
  std::string uid;                   // -u/--uid
  std::string gid;                   // -g/--gid
} g_cmdLine;

/* Set to true by main() once the configuration has been loaded and the list
   of frontends finalized, right before the frontends are bound. */
std::atomic<bool> g_configurationDone{false};
2317
/* Prints the command-line help on standard output, preceded by an empty line
   (written with std::endl, which also flushes). The -k/--setkey entry is
   only listed when built with libsodium. */
static void usage()
{
  cout << endl
       << "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
          "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
          "[-v,--verbose] [--check-config] [--version]\n"
          "\n"
          "-a,--acl netmask Add this netmask to the ACL\n"
          "-C,--config file Load configuration from 'file'\n"
          "-c,--client Operate as a client, connect to dnsdist. This reads\n"
          " controlSocket from your configuration file, but also\n"
          " accepts an IP:PORT argument\n";
#ifdef HAVE_LIBSODIUM
  cout << "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
          " is similar to setting setKey in the configuration file.\n"
          " NOTE: this will leak this key in your shell's history\n"
          " and in the systems running process list.\n";
#endif
  cout << "--check-config Validate the configuration file and exit. The exit-code\n"
          " reflects the validation, 0 is OK, 1 means an error.\n"
          " Any errors are printed as well.\n"
          "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
          "-g,--gid gid Change the process group ID after binding sockets\n"
          "-h,--help Display this helpful message\n"
          "-l,--local address Listen on this local address\n"
          "--supervised Don't open a console, I'm supervised\n"
          " (use with e.g. systemd and daemontools)\n"
          "--disable-syslog Don't log to syslog, only to stdout\n"
          " (use with e.g. systemd)\n"
          "-u,--uid uid Change the process user ID after binding sockets\n"
          "-v,--verbose Enable verbose mode\n"
          "-V,--version Show dnsdist version information and exit\n";
}
2351
2352 int main(int argc, char** argv)
2353 try
2354 {
2355 size_t udpBindsCount = 0;
2356 size_t tcpBindsCount = 0;
2357 rl_attempted_completion_function = my_completion;
2358 rl_completion_append_character = 0;
2359
2360 signal(SIGPIPE, SIG_IGN);
2361 signal(SIGCHLD, SIG_IGN);
2362 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2363
2364 #ifdef HAVE_LIBSODIUM
2365 if (sodium_init() == -1) {
2366 cerr<<"Unable to initialize crypto library"<<endl;
2367 exit(EXIT_FAILURE);
2368 }
2369 g_hashperturb=randombytes_uniform(0xffffffff);
2370 srandom(randombytes_uniform(0xffffffff));
2371 #else
2372 {
2373 struct timeval tv;
2374 gettimeofday(&tv, 0);
2375 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2376 g_hashperturb=random();
2377 }
2378
2379 #endif
2380 ComboAddress clientAddress = ComboAddress();
2381 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2382 struct option longopts[]={
2383 {"acl", required_argument, 0, 'a'},
2384 {"check-config", no_argument, 0, 1},
2385 {"client", no_argument, 0, 'c'},
2386 {"config", required_argument, 0, 'C'},
2387 {"disable-syslog", no_argument, 0, 2},
2388 {"execute", required_argument, 0, 'e'},
2389 {"gid", required_argument, 0, 'g'},
2390 {"help", no_argument, 0, 'h'},
2391 {"local", required_argument, 0, 'l'},
2392 {"setkey", required_argument, 0, 'k'},
2393 {"supervised", no_argument, 0, 3},
2394 {"uid", required_argument, 0, 'u'},
2395 {"verbose", no_argument, 0, 'v'},
2396 {"version", no_argument, 0, 'V'},
2397 {0,0,0,0}
2398 };
2399 int longindex=0;
2400 string optstring;
2401 for(;;) {
2402 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2403 if(c==-1)
2404 break;
2405 switch(c) {
2406 case 1:
2407 g_cmdLine.checkConfig=true;
2408 break;
2409 case 2:
2410 g_syslog=false;
2411 break;
2412 case 3:
2413 g_cmdLine.beSupervised=true;
2414 break;
2415 case 'C':
2416 g_cmdLine.config=optarg;
2417 break;
2418 case 'c':
2419 g_cmdLine.beClient=true;
2420 break;
2421 case 'e':
2422 g_cmdLine.command=optarg;
2423 break;
2424 case 'g':
2425 g_cmdLine.gid=optarg;
2426 break;
2427 case 'h':
2428 cout<<"dnsdist "<<VERSION<<endl;
2429 usage();
2430 cout<<"\n";
2431 exit(EXIT_SUCCESS);
2432 break;
2433 case 'a':
2434 optstring=optarg;
2435 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2436 break;
2437 case 'k':
2438 #ifdef HAVE_LIBSODIUM
2439 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2440 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2441 exit(EXIT_FAILURE);
2442 }
2443 #else
2444 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2445 exit(EXIT_FAILURE);
2446 #endif
2447 break;
2448 case 'l':
2449 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2450 break;
2451 case 'u':
2452 g_cmdLine.uid=optarg;
2453 break;
2454 case 'v':
2455 g_verbose=true;
2456 break;
2457 case 'V':
2458 #ifdef LUAJIT_VERSION
2459 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2460 #else
2461 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2462 #endif
2463 cout<<"Enabled features: ";
2464 #ifdef HAVE_DNS_OVER_TLS
2465 cout<<"dns-over-tls(";
2466 #ifdef HAVE_GNUTLS
2467 cout<<"gnutls";
2468 #ifdef HAVE_LIBSSL
2469 cout<<" ";
2470 #endif
2471 #endif
2472 #ifdef HAVE_LIBSSL
2473 cout<<"openssl";
2474 #endif
2475 cout<<") ";
2476 #endif
2477 #ifdef HAVE_DNS_OVER_HTTPS
2478 cout<<"dns-over-https(DOH) ";
2479 #endif
2480 #ifdef HAVE_DNSCRYPT
2481 cout<<"dnscrypt ";
2482 #endif
2483 #ifdef HAVE_EBPF
2484 cout<<"ebpf ";
2485 #endif
2486 #ifdef HAVE_FSTRM
2487 cout<<"fstrm ";
2488 #endif
2489 #ifdef HAVE_LIBCRYPTO
2490 cout<<"ipcipher ";
2491 #endif
2492 #ifdef HAVE_LIBSODIUM
2493 cout<<"libsodium ";
2494 #endif
2495 #ifdef HAVE_PROTOBUF
2496 cout<<"protobuf ";
2497 #endif
2498 #ifdef HAVE_RE2
2499 cout<<"re2 ";
2500 #endif
2501 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2502 cout<<"recvmmsg/sendmmsg ";
2503 #endif
2504 #ifdef HAVE_NET_SNMP
2505 cout<<"snmp ";
2506 #endif
2507 #ifdef HAVE_SYSTEMD
2508 cout<<"systemd";
2509 #endif
2510 cout<<endl;
2511 exit(EXIT_SUCCESS);
2512 break;
2513 case '?':
2514 //getopt_long printed an error message.
2515 usage();
2516 exit(EXIT_FAILURE);
2517 break;
2518 }
2519 }
2520
2521 argc-=optind;
2522 argv+=optind;
2523 for(auto p = argv; *p; ++p) {
2524 if(g_cmdLine.beClient) {
2525 clientAddress = ComboAddress(*p, 5199);
2526 } else {
2527 g_cmdLine.remotes.push_back(*p);
2528 }
2529 }
2530
2531 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2532
2533 g_policy.setState(leastOutstandingPol);
2534 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2535 setupLua(true, g_cmdLine.config);
2536 if (clientAddress != ComboAddress())
2537 g_serverControl = clientAddress;
2538 doClient(g_serverControl, g_cmdLine.command);
2539 _exit(EXIT_SUCCESS);
2540 }
2541
2542 auto acl = g_ACL.getCopy();
2543 if(acl.empty()) {
2544 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2545 acl.addMask(addr);
2546 g_ACL.setState(acl);
2547 }
2548
2549 auto consoleACL = g_consoleACL.getCopy();
2550 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2551 consoleACL.addMask(mask);
2552 }
2553 g_consoleACL.setState(consoleACL);
2554
2555 if (g_cmdLine.checkConfig) {
2556 setupLua(true, g_cmdLine.config);
2557 // No exception was thrown
2558 infolog("Configuration '%s' OK!", g_cmdLine.config);
2559 _exit(EXIT_SUCCESS);
2560 }
2561
2562 auto todo=setupLua(false, g_cmdLine.config);
2563
2564 auto localPools = g_pools.getCopy();
2565 {
2566 bool precompute = false;
2567 if (g_policy.getLocal()->name == "chashed") {
2568 precompute = true;
2569 } else {
2570 for (const auto& entry: localPools) {
2571 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2572 precompute = true;
2573 break ;
2574 }
2575 }
2576 }
2577 if (precompute) {
2578 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2579 // pre compute hashes
2580 auto backends = g_dstates.getLocal();
2581 for (auto& backend: *backends) {
2582 backend->hash();
2583 }
2584 }
2585 }
2586
2587 if (!g_cmdLine.locals.empty()) {
2588 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2589 /* DoH, DoT and DNSCrypt frontends are separate */
2590 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2591 it = g_frontends.erase(it);
2592 }
2593 else {
2594 ++it;
2595 }
2596 }
2597
2598 for(const auto& loc : g_cmdLine.locals) {
2599 /* UDP */
2600 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2601 /* TCP */
2602 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2603 }
2604 }
2605
2606 if (g_frontends.empty()) {
2607 /* UDP */
2608 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2609 /* TCP */
2610 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2611 }
2612
2613 g_configurationDone = true;
2614
2615 for(auto& frontend : g_frontends) {
2616 setUpLocalBind(frontend);
2617
2618 if (frontend->tcp == false) {
2619 ++udpBindsCount;
2620 }
2621 else {
2622 ++tcpBindsCount;
2623 }
2624 }
2625
2626 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2627
2628 vector<string> vec;
2629 std::string acls;
2630 g_ACL.getLocal()->toStringVector(&vec);
2631 for(const auto& s : vec) {
2632 if (!acls.empty())
2633 acls += ", ";
2634 acls += s;
2635 }
2636 infolog("ACL allowing queries from: %s", acls.c_str());
2637 vec.clear();
2638 acls.clear();
2639 g_consoleACL.getLocal()->toStringVector(&vec);
2640 for (const auto& entry : vec) {
2641 if (!acls.empty()) {
2642 acls += ", ";
2643 }
2644 acls += entry;
2645 }
2646 infolog("Console ACL allowing connections from: %s", acls.c_str());
2647
2648 #ifdef HAVE_LIBSODIUM
2649 if (g_consoleEnabled && g_consoleKey.empty()) {
2650 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2651 }
2652 #endif
2653
2654 uid_t newgid=0;
2655 gid_t newuid=0;
2656
2657 if(!g_cmdLine.gid.empty())
2658 newgid = strToGID(g_cmdLine.gid.c_str());
2659
2660 if(!g_cmdLine.uid.empty())
2661 newuid = strToUID(g_cmdLine.uid.c_str());
2662
2663 dropGroupPrivs(newgid);
2664 dropUserPrivs(newuid);
2665 try {
2666 /* we might still have capabilities remaining,
2667 for example if we have been started as root
2668 without --uid or --gid (please don't do that)
2669 or as an unprivileged user with ambient
2670 capabilities like CAP_NET_BIND_SERVICE.
2671 */
2672 dropCapabilities();
2673 }
2674 catch(const std::exception& e) {
2675 warnlog("%s", e.what());
2676 }
2677
2678 /* this need to be done _after_ dropping privileges */
2679 g_delay = new DelayPipe<DelayedPacket>();
2680
2681 if (g_snmpAgent) {
2682 g_snmpAgent->run();
2683 }
2684
2685 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2686
2687 for(auto& t : todo)
2688 t();
2689
2690 localPools = g_pools.getCopy();
2691 /* create the default pool no matter what */
2692 createPoolIfNotExists(localPools, "");
2693 if(g_cmdLine.remotes.size()) {
2694 for(const auto& address : g_cmdLine.remotes) {
2695 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2696 addServerToPool(localPools, "", ret);
2697 if (ret->connected && !ret->threadStarted.test_and_set()) {
2698 ret->tid = thread(responderThread, ret);
2699 }
2700 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2701 }
2702 }
2703 g_pools.setState(localPools);
2704
2705 if(g_dstates.getLocal()->empty()) {
2706 errlog("No downstream servers defined: all packets will get dropped");
2707 // you might define them later, but you need to know
2708 }
2709
2710 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2711
2712 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2713 if(dss->availability==DownstreamState::Availability::Auto) {
2714 bool newState=upCheck(dss);
2715 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2716 dss->upStatus = newState;
2717 }
2718 }
2719
2720 for(auto& cs : g_frontends) {
2721 if (cs->dohFrontend != nullptr) {
2722 #ifdef HAVE_DNS_OVER_HTTPS
2723 std::thread t1(dohThread, cs.get());
2724 if (!cs->cpus.empty()) {
2725 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2726 }
2727 t1.detach();
2728 #endif /* HAVE_DNS_OVER_HTTPS */
2729 continue;
2730 }
2731 if (cs->udpFD >= 0) {
2732 thread t1(udpClientThread, cs.get());
2733 if (!cs->cpus.empty()) {
2734 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2735 }
2736 t1.detach();
2737 }
2738 else if (cs->tcpFD >= 0) {
2739 thread t1(tcpAcceptorThread, cs.get());
2740 if (!cs->cpus.empty()) {
2741 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2742 }
2743 t1.detach();
2744 }
2745 }
2746
2747 thread carbonthread(carbonDumpThread);
2748 carbonthread.detach();
2749
2750 thread stattid(maintThread);
2751 stattid.detach();
2752
2753 thread healththread(healthChecksThread);
2754
2755 if (!g_secPollSuffix.empty()) {
2756 thread secpollthread(secPollThread);
2757 secpollthread.detach();
2758 }
2759
2760 if(g_cmdLine.beSupervised) {
2761 #ifdef HAVE_SYSTEMD
2762 sd_notify(0, "READY=1");
2763 #endif
2764 healththread.join();
2765 }
2766 else {
2767 healththread.detach();
2768 doConsole();
2769 }
2770 _exit(EXIT_SUCCESS);
2771
2772 }
2773 catch(const LuaContext::ExecutionErrorException& e) {
2774 try {
2775 errlog("Fatal Lua error: %s", e.what());
2776 std::rethrow_if_nested(e);
2777 } catch(const std::exception& ne) {
2778 errlog("Details: %s", ne.what());
2779 }
2780 catch(PDNSException &ae)
2781 {
2782 errlog("Fatal pdns error: %s", ae.reason);
2783 }
2784 _exit(EXIT_FAILURE);
2785 }
2786 catch(std::exception &e)
2787 {
2788 errlog("Fatal error: %s", e.what());
2789 _exit(EXIT_FAILURE);
2790 }
2791 catch(PDNSException &ae)
2792 {
2793 errlog("Fatal pdns error: %s", ae.reason);
2794 _exit(EXIT_FAILURE);
2795 }
2796
2797 uint64_t getLatencyCount(const std::string&)
2798 {
2799 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2800 }