]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
Merge pull request #7738 from cvdwel/patch-2
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
using std::atomic;
using std::thread;
bool g_verbose;

// Global counters updated throughout this file (e.g. responses, truncFail,
// nonCompliantResponses, latency buckets and moving averages).
struct DNSDistStats g_stats;
MetricDefinitionStorage g_metricDefinitions;

// Number of ID states allocated per backend (the DownstreamState constructor sizes
// idStates with it); responderThread() ignores responses whose ID is not below it.
uint16_t g_maxOutstanding{10240};
bool g_verboseHealthChecks{false};
uint32_t g_staleCacheEntriesTTL{0};
bool g_syslog{true};
// When true, responseContentMatches() accepts NoError/NXDomain responses that carry
// no question section (qdcount == 0) instead of counting them as non-compliant.
bool g_allowEmptyResponse{false};

GlobalStateHolder<NetmaskGroup> g_ACL;
string g_outputBuffer;

std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
#ifdef HAVE_EBPF
shared_ptr<BPFFilter> g_defaultBPFFilter;
std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
#endif /* HAVE_EBPF */
std::vector<std::unique_ptr<ClientState>> g_frontends;
GlobalStateHolder<pools_t> g_pools;
size_t g_udpVectorSize{1};

bool g_snmpEnabled{false};
bool g_snmpTrapsEnabled{false};
DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
Per downstream server there is an array of g_maxOutstanding states (idStates); when we send out a packet downstream, we note
there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
// Response rules; responderThread() takes a local copy and applies them to every
// backend answer through processResponse().
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;

// Ring buffers of recent queries and responses (filled by insertQuery()/insertResponse()).
Rings g_rings;
// Optional per-qname query counting, updated in applyRulesToQuery() when enabled.
QueryCount g_qcount;

GlobalStateHolder<servers_t> g_dstates;
GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
// Action applied for a dynamic block entry whose own action is DNSAction::Action::None.
DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
// NOTE(review): presumably timeouts in seconds — TODO confirm against their users.
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
int g_udpTimeout{2};

bool g_servFailOnNoPolicy{false};
// When true, backend answers with the TC bit set are cut down to header + question by truncateTC().
bool g_truncateTC{false};
// When true, fixUpResponse() writes the query's qname (with its original case) back into the response.
bool g_fixupCase{false};
bool g_preserveTrailingData{false};
// When true, roundrobin() returns no server if none is up, instead of cycling over all servers.
bool g_roundrobinFailOnNoServer{false};
146
147 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
148 try
149 {
150 bool hadEDNS = false;
151 uint16_t payloadSize = 0;
152 uint16_t z = 0;
153
154 if (g_addEDNSToSelfGeneratedResponses) {
155 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
156 }
157
158 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
159 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
160 dh->ancount = dh->arcount = dh->nscount = 0;
161
162 if (hadEDNS) {
163 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
164 }
165 }
166 catch(...)
167 {
168 g_stats.truncFail++;
169 }
170
171 struct DelayedPacket
172 {
173 int fd;
174 string packet;
175 ComboAddress destination;
176 ComboAddress origDest;
177 void operator()()
178 {
179 ssize_t res;
180 if(origDest.sin4.sin_family == 0) {
181 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
182 }
183 else {
184 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
185 }
186 if (res == -1) {
187 int err = errno;
188 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
189 }
190 }
191 };
192
// Delay pipe used by sendUDPResponse() for responses with a non-zero delayMsec;
// while it is nullptr, responses are sent immediately.
DelayPipe<DelayedPacket>* g_delay = nullptr;
194
195 void doLatencyStats(double udiff)
196 {
197 if(udiff < 1000) ++g_stats.latency0_1;
198 else if(udiff < 10000) ++g_stats.latency1_10;
199 else if(udiff < 50000) ++g_stats.latency10_50;
200 else if(udiff < 100000) ++g_stats.latency50_100;
201 else if(udiff < 1000000) ++g_stats.latency100_1000;
202 else ++g_stats.latencySlow;
203
204 auto doAvg = [](double& var, double n, double weight) {
205 var = (weight -1) * var/weight + n/weight;
206 };
207
208 doAvg(g_stats.latencyAvg100, udiff, 100);
209 doAvg(g_stats.latencyAvg1000, udiff, 1000);
210 doAvg(g_stats.latencyAvg10000, udiff, 10000);
211 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
212 }
213
214 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
215 {
216 if (responseLen < sizeof(dnsheader)) {
217 return false;
218 }
219
220 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
221 if (dh->qdcount == 0) {
222 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
223 return true;
224 }
225 else {
226 ++g_stats.nonCompliantResponses;
227 return false;
228 }
229 }
230
231 uint16_t rqtype, rqclass;
232 DNSName rqname;
233 try {
234 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
235 }
236 catch(const std::exception& e) {
237 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
238 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
239 }
240 ++g_stats.nonCompliantResponses;
241 return false;
242 }
243
244 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
245 return false;
246 }
247
248 return true;
249 }
250
251 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
252 {
253 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
254 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
255 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
256 uint16_t * flags = getFlagsFromDNSHeader(dh);
257 /* clear the flags we are about to restore */
258 *flags &= restoreFlagsMask;
259 /* only keep the flags we want to restore */
260 origFlags &= ~restoreFlagsMask;
261 /* set the saved flags as they were */
262 *flags |= origFlags;
263 }
264
265 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
266 {
267 restoreFlags(dq.dh, origFlags);
268
269 return addEDNSToQueryTurnedResponse(dq);
270 }
271
/* Undoes the changes dnsdist made to the query, on the backend's response:
   - restores the RD/CD flags from origFlags,
   - if g_fixupCase is set, copies qname (with the query's case) over the response qname,
   - if dnsdist added the OPT RR (ednsAdded), removes it entirely; if it only added an
     ECS option to an existing OPT RR (ecsAdded), removes that option.
   When the OPT RR is not the last record, the response is rewritten into
   rewrittenResponse, and *response, *responseLen and *responseSize are updated to point
   to that buffer; addRoom extra bytes are reserved there when they fit in a uint16_t.
   If zeroScope is non-null and the response carries an ECS option of at least 4 bytes,
   *zeroScope is set to whether its SCOPE PREFIX-LENGTH byte is 0.
   Returns false only when the response is shorter than a DNS header. */
static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
{
  if (*responseLen < sizeof(dnsheader)) {
    return false;
  }

  struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
  restoreFlags(dh, origFlags);

  /* header only: nothing else to fix */
  if (*responseLen == sizeof(dnsheader)) {
    return true;
  }

  if(g_fixupCase) {
    /* overwrite the response qname in place with the query's wire-format name,
       provided the response is long enough to hold it */
    string realname = qname.toDNSString();
    if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
      memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
    }
  }

  if (ednsAdded || ecsAdded) {
    uint16_t optStart;
    size_t optLen = 0;
    bool last = false;

    /* work on a copy of the response: the rewrite helpers read from it */
    const std::string responseStr(*response, *responseLen);
    int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);

    if (res == 0) {
      if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
        size_t optContentStart = 0;
        uint16_t optContentLen = 0;
        /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
        if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
          /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
             we care about. */
          *zeroScope = responseStr.at(optContentStart + 3) == 0;
        }
      }

      if (ednsAdded) {
        /* we added the entire OPT RR,
           therefore we need to remove it entirely */
        if (last) {
          /* simply remove the last AR */
          *responseLen -= optLen;
          uint16_t arcount = ntohs(dh->arcount);
          arcount--;
          dh->arcount = htons(arcount);
        }
        else {
          /* Removing an intermediary RR could lead to compression error */
          if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
            /* from now on the caller must use the rewritten buffer */
            *responseLen = rewrittenResponse.size();
            if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
              rewrittenResponse.reserve(*responseLen + addRoom);
            }
            *responseSize = rewrittenResponse.capacity();
            *response = reinterpret_cast<char*>(rewrittenResponse.data());
          }
          else {
            /* the response is left as it was, OPT RR included */
            warnlog("Error rewriting content");
          }
        }
      }
      else {
        /* the OPT RR was already present, but without ECS,
           we need to remove the ECS option if any */
        if (last) {
          /* nothing after the OPT RR, we can simply remove the
             ECS option */
          size_t existingOptLen = optLen;
          removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
          /* shrink the response by however many bytes the option removal freed */
          *responseLen -= (existingOptLen - optLen);
        }
        else {
          /* Removing an intermediary RR could lead to compression error */
          if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
            *responseLen = rewrittenResponse.size();
            if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
              rewrittenResponse.reserve(*responseLen + addRoom);
            }
            *responseSize = rewrittenResponse.capacity();
            *response = reinterpret_cast<char*>(rewrittenResponse.data());
          }
          else {
            warnlog("Error rewriting content");
          }
        }
      }
    }
  }

  return true;
}
367
#ifdef HAVE_DNSCRYPT
/* Encrypts a response in place for a DNSCrypt client. Without a DNSCrypt query this is
   a no-op returning true. When dh, *dh and dhCopy are all non-null, the cleartext header
   is first copied to dhCopy and *dh is redirected to that copy. On success *responseLen
   becomes the encrypted length; on failure the response must be dropped (returns false). */
static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    return true;
  }

  const bool keepCleartextHeader = dh != nullptr && *dh != nullptr && dhCopy != nullptr;
  if (keepCleartextHeader) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedResponseLen = 0;
  if (dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen) != 0) {
    /* dropping response */
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedResponseLen;
  return true;
}
#endif /* HAVE_DNSCRYPT */
392
393 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
394 {
395 DNSResponseAction::Action action=DNSResponseAction::Action::None;
396 std::string ruleresult;
397 for(const auto& lr : *localRespRulactions) {
398 if(lr.d_rule->matches(&dr)) {
399 lr.d_rule->d_matches++;
400 action=(*lr.d_action)(&dr, &ruleresult);
401 switch(action) {
402 case DNSResponseAction::Action::Allow:
403 return true;
404 break;
405 case DNSResponseAction::Action::Drop:
406 return false;
407 break;
408 case DNSResponseAction::Action::HeaderModify:
409 return true;
410 break;
411 case DNSResponseAction::Action::ServFail:
412 dr.dh->rcode = RCode::ServFail;
413 return true;
414 break;
415 /* non-terminal actions follow */
416 case DNSResponseAction::Action::Delay:
417 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
418 break;
419 case DNSResponseAction::Action::None:
420 break;
421 }
422 }
423 }
424
425 return true;
426 }
427
428 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
429 {
430 if (!applyRulesToResponse(localRespRulactions, dr)) {
431 return false;
432 }
433
434 bool zeroScope = false;
435 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
436 return false;
437 }
438
439 if (dr.packetCache && !dr.skipCache) {
440 if (!dr.useZeroScope) {
441 /* if the query was not suitable for zero-scope, for
442 example because it had an existing ECS entry so the hash is
443 not really 'no ECS', so just insert it for the existing subnet
444 since:
445 - we don't have the correct hash for a non-ECS query
446 - inserting with hash computed before the ECS replacement but with
447 the subnet extracted _after_ the replacement would not work.
448 */
449 zeroScope = false;
450 }
451 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
452 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
453 }
454
455 #ifdef HAVE_DNSCRYPT
456 if (!muted) {
457 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
458 return false;
459 }
460 }
461 #endif /* HAVE_DNSCRYPT */
462
463 return true;
464 }
465
466 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
467 {
468 if(delayMsec && g_delay) {
469 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
470 g_delay->submit(dp, delayMsec);
471 }
472 else {
473 ssize_t res;
474 if(origDest.sin4.sin_family == 0) {
475 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
476 }
477 else {
478 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
479 }
480 if (res == -1) {
481 int err = errno;
482 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
483 }
484 }
485
486 return true;
487 }
488
489
490 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
491 {
492 return state->sockets[state->socketsOffset++ % state->sockets.size()];
493 }
494
495 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
496 {
497 ready.clear();
498
499 if (state->sockets.size() == 1) {
500 ready.push_back(state->sockets[0]);
501 return ;
502 }
503
504 {
505 std::lock_guard<std::mutex> lock(state->socketsLock);
506 state->mplexer->getAvailableFDs(ready, -1);
507 }
508 }
509
510 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
511 void responderThread(std::shared_ptr<DownstreamState> dss)
512 try {
513 setThreadName("dnsdist/respond");
514 auto localRespRulactions = g_resprulactions.getLocal();
515 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
516 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
517 /* when the answer is encrypted in place, we need to get a copy
518 of the original header before encryption to fill the ring buffer */
519 dnsheader cleartextDH;
520 vector<uint8_t> rewrittenResponse;
521
522 uint16_t queryId = 0;
523 std::vector<int> sockets;
524 sockets.reserve(dss->sockets.size());
525
526 for(;;) {
527 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
528 try {
529 pickBackendSocketsReadyForReceiving(dss, sockets);
530 for (const auto& fd : sockets) {
531 ssize_t got = recv(fd, packet, sizeof(packet), 0);
532 char * response = packet;
533 size_t responseSize = sizeof(packet);
534
535 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
536 continue;
537
538 uint16_t responseLen = static_cast<uint16_t>(got);
539 queryId = dh->id;
540
541 if(queryId >= dss->idStates.size()) {
542 continue;
543 }
544
545 IDState* ids = &dss->idStates[queryId];
546 int origFD = ids->origFD;
547
548 if(origFD < 0 && ids->du == nullptr) // duplicate
549 continue;
550
551 /* setting age to 0 to prevent the maintainer thread from
552 cleaning this IDS while we process the response.
553 We have already a copy of the origFD, so it would
554 mostly mess up the outstanding counter.
555 */
556 ids->age = 0;
557
558 unsigned int consumed = 0;
559 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
560 continue;
561 }
562
563 int oldFD = ids->origFD.exchange(-1);
564 if (oldFD == origFD) {
565 /* we only decrement the outstanding counter if the value was not
566 altered in the meantime, which would mean that the state has been actively reused
567 and the other thread has not incremented the outstanding counter, so we don't
568 want it to be decremented twice. */
569 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
570 }
571
572 if(dh->tc && g_truncateTC) {
573 truncateTC(response, &responseLen, responseSize, consumed);
574 }
575
576 dh->id = ids->origID;
577
578 uint16_t addRoom = 0;
579 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
580 if (dr.dnsCryptQuery) {
581 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
582 }
583
584 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
585 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
586 continue;
587 }
588
589 if (ids->cs && !ids->cs->muted) {
590 if (ids->du) {
591 #ifdef HAVE_DNS_OVER_HTTPS
592 // DoH query
593 ids->du->query = std::string(response, responseLen);
594 if (send(ids->du->rsock, &ids->du, sizeof(ids->du), 0) != sizeof(ids->du)) {
595 delete ids->du;
596 }
597 #endif /* HAVE_DNS_OVER_HTTPS */
598 ids->du = nullptr;
599 }
600 else {
601 ComboAddress empty;
602 empty.sin4.sin_family = 0;
603 /* if ids->destHarvested is false, origDest holds the listening address.
604 We don't want to use that as a source since it could be 0.0.0.0 for example. */
605 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
606 }
607 }
608
609 ++g_stats.responses;
610
611 double udiff = ids->sentTime.udiff();
612 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
613 ids->du ? " (https)": "", udiff);
614
615 struct timespec ts;
616 gettime(&ts);
617 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
618
619 switch (dh->rcode) {
620 case RCode::NXDomain:
621 ++g_stats.frontendNXDomain;
622 break;
623 case RCode::ServFail:
624 ++g_stats.servfailResponses;
625 ++g_stats.frontendServFail;
626 break;
627 case RCode::NoError:
628 ++g_stats.frontendNoError;
629 break;
630 }
631 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
632
633 doLatencyStats(udiff);
634
635 rewrittenResponse.clear();
636 }
637 }
638 catch(const std::exception& e){
639 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
640 }
641 }
642 }
643 catch(const std::exception& e)
644 {
645 errlog("UDP responder thread died because of exception: %s", e.what());
646 }
647 catch(const PDNSException& e)
648 {
649 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
650 }
651 catch(...)
652 {
653 errlog("UDP responder thread died because of an exception: %s", "unknown");
654 }
655
656 bool DownstreamState::reconnect()
657 {
658 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
659 if (!tl.owns_lock()) {
660 /* we are already reconnecting */
661 return false;
662 }
663
664 connected = false;
665 for (auto& fd : sockets) {
666 if (fd != -1) {
667 if (sockets.size() > 1) {
668 std::lock_guard<std::mutex> lock(socketsLock);
669 mplexer->removeReadFD(fd);
670 }
671 /* shutdown() is needed to wake up recv() in the responderThread */
672 shutdown(fd, SHUT_RDWR);
673 close(fd);
674 fd = -1;
675 }
676 if (!IsAnyAddress(remote)) {
677 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
678 if (!IsAnyAddress(sourceAddr)) {
679 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
680 SBind(fd, sourceAddr);
681 }
682 try {
683 SConnect(fd, remote);
684 if (sockets.size() > 1) {
685 std::lock_guard<std::mutex> lock(socketsLock);
686 mplexer->addReadFD(fd, [](int, boost::any) {});
687 }
688 connected = true;
689 }
690 catch(const std::runtime_error& error) {
691 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
692 connected = false;
693 break;
694 }
695 }
696 }
697
698 /* if at least one (re-)connection failed, close all sockets */
699 if (!connected) {
700 for (auto& fd : sockets) {
701 if (fd != -1) {
702 if (sockets.size() > 1) {
703 std::lock_guard<std::mutex> lock(socketsLock);
704 mplexer->removeReadFD(fd);
705 }
706 /* shutdown() is needed to wake up recv() in the responderThread */
707 shutdown(fd, SHUT_RDWR);
708 close(fd);
709 fd = -1;
710 }
711 }
712 }
713
714 return connected;
715 }
716 void DownstreamState::hash()
717 {
718 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
719 auto w = weight;
720 WriteLock wl(&d_lock);
721 hashes.clear();
722 while (w > 0) {
723 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
724 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
725 hashes.insert(wshash);
726 --w;
727 }
728 }
729
730 void DownstreamState::setId(const boost::uuids::uuid& newId)
731 {
732 id = newId;
733 // compute hashes only if already done
734 if (!hashes.empty()) {
735 hash();
736 }
737 }
738
739 void DownstreamState::setWeight(int newWeight)
740 {
741 if (newWeight < 1) {
742 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
743 return ;
744 }
745 weight = newWeight;
746 if (!hashes.empty()) {
747 hash();
748 }
749 }
750
751 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
752 {
753 pthread_rwlock_init(&d_lock, nullptr);
754 id = getUniqueID();
755 threadStarted.clear();
756
757 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
758
759 sockets.resize(numberOfSockets);
760 for (auto& fd : sockets) {
761 fd = -1;
762 }
763
764 if (!IsAnyAddress(remote)) {
765 reconnect();
766 idStates.resize(g_maxOutstanding);
767 sw.start();
768 infolog("Added downstream server %s", remote.toStringWithPort());
769 }
770
771 }
772
// Serializes calls into Lua; e.g. applyRulesToQuery() holds it while calling the
// Lua-backed g_qcount.filter.
std::mutex g_luamutex;
LuaContext g_lua;

// NOTE(review): presumably the default server selection policy; per-pool policies
// are set via setPoolPolicy() — confirm against the users of g_policy.
GlobalStateHolder<ServerPolicy> g_policy;
777
778 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
779 {
780 for(auto& d : servers) {
781 if(d.second->isUp() && d.second->qps.check())
782 return d.second;
783 }
784 return leastOutstanding(servers, dq);
785 }
786
787 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
788 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
789 {
790 if (servers.size() == 1 && servers[0].second->isUp()) {
791 return servers[0].second;
792 }
793
794 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
795 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
796 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
797 poss.reserve(servers.size());
798 for(auto& d : servers) {
799 if(d.second->isUp()) {
800 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
801 }
802 }
803 if(poss.empty())
804 return shared_ptr<DownstreamState>();
805 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
806 return poss.begin()->second;
807 }
808
809 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
810 {
811 vector<pair<int, shared_ptr<DownstreamState>>> poss;
812 int sum = 0;
813 int max = std::numeric_limits<int>::max();
814
815 for(auto& d : servers) { // w=1, w=10 -> 1, 11
816 if(d.second->isUp()) {
817 // Don't overflow sum when adding high weights
818 if(d.second->weight > max - sum) {
819 sum = max;
820 } else {
821 sum += d.second->weight;
822 }
823
824 poss.push_back({sum, d.second});
825 }
826 }
827
828 // Catch poss & sum are empty to avoid SIGFPE
829 if(poss.empty())
830 return shared_ptr<DownstreamState>();
831
832 int r = val % sum;
833 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
834 if(p==poss.end())
835 return shared_ptr<DownstreamState>();
836 return p->second;
837 }
838
839 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
840 {
841 return valrandom(random(), servers, dq);
842 }
843
844 uint32_t g_hashperturb;
845 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
846 {
847 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
848 }
849
850 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
851 {
852 unsigned int qhash = dq->qname->hash(g_hashperturb);
853 unsigned int sel = std::numeric_limits<unsigned int>::max();
854 unsigned int min = std::numeric_limits<unsigned int>::max();
855 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
856
857 for (const auto& d: servers) {
858 if (d.second->isUp()) {
859 // make sure hashes have been computed
860 if (d.second->hashes.empty()) {
861 d.second->hash();
862 }
863 {
864 ReadLock rl(&(d.second->d_lock));
865 const auto& server = d.second;
866 // we want to keep track of the last hash
867 if (min > *(server->hashes.begin())) {
868 min = *(server->hashes.begin());
869 first = server;
870 }
871
872 auto hash_it = server->hashes.lower_bound(qhash);
873 if (hash_it != server->hashes.end()) {
874 if (*hash_it < sel) {
875 sel = *hash_it;
876 ret = server;
877 }
878 }
879 }
880 }
881 }
882 if (ret != nullptr) {
883 return ret;
884 }
885 if (first != nullptr) {
886 return first;
887 }
888 return shared_ptr<DownstreamState>();
889 }
890
891 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
892 {
893 NumberedServerVector poss;
894
895 for(auto& d : servers) {
896 if(d.second->isUp()) {
897 poss.push_back(d);
898 }
899 }
900
901 const auto *res=&poss;
902 if(poss.empty() && !g_roundrobinFailOnNoServer)
903 res = &servers;
904
905 if(res->empty())
906 return shared_ptr<DownstreamState>();
907
908 static unsigned int counter;
909
910 return (*res)[(counter++) % res->size()].second;
911 }
912
// Control server address, defaulting to 127.0.0.1:5199 (presumably overridable by
// configuration — TODO confirm).
ComboAddress g_serverControl{"127.0.0.1:5199"};
914
915 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
916 {
917 std::shared_ptr<ServerPool> pool;
918 pools_t::iterator it = pools.find(poolName);
919 if (it != pools.end()) {
920 pool = it->second;
921 }
922 else {
923 if (!poolName.empty())
924 vinfolog("Creating pool %s", poolName);
925 pool = std::make_shared<ServerPool>();
926 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
927 }
928 return pool;
929 }
930
931 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
932 {
933 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
934 if (!poolName.empty()) {
935 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
936 } else {
937 vinfolog("Setting default pool server selection policy to %s", policy->name);
938 }
939 pool->policy = policy;
940 }
941
942 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
943 {
944 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
945 if (!poolName.empty()) {
946 vinfolog("Adding server to pool %s", poolName);
947 } else {
948 vinfolog("Adding server to default pool");
949 }
950 pool->addServer(server);
951 }
952
953 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
954 {
955 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
956
957 if (!poolName.empty()) {
958 vinfolog("Removing server from pool %s", poolName);
959 }
960 else {
961 vinfolog("Removing server from default pool");
962 }
963
964 pool->removeServer(server);
965 }
966
967 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
968 {
969 pools_t::const_iterator it = pools.find(poolName);
970
971 if (it == pools.end()) {
972 throw std::out_of_range("No pool named " + poolName);
973 }
974
975 return it->second;
976 }
977
978 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
979 {
980 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
981 return pool->getServers();
982 }
983
984 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
985 {
986 string result;
987
988 std::vector<std::string> addrs;
989 stringtok(addrs, spoofContent, " ,");
990
991 if (addrs.size() == 1) {
992 try {
993 ComboAddress spoofAddr(spoofContent);
994 SpoofAction sa({spoofAddr});
995 sa(&dq, &result);
996 }
997 catch(const PDNSException &e) {
998 SpoofAction sa(spoofContent); // CNAME then
999 sa(&dq, &result);
1000 }
1001 } else {
1002 std::vector<ComboAddress> cas;
1003 for (const auto& addr : addrs) {
1004 try {
1005 cas.push_back(ComboAddress(addr));
1006 }
1007 catch (...) {
1008 }
1009 }
1010 SpoofAction sa(cas);
1011 sa(&dq, &result);
1012 }
1013 }
1014
1015 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, const struct timespec& now)
1016 {
1017 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1018
1019 if(g_qcount.enabled) {
1020 string qname = (*dq.qname).toString(".");
1021 bool countQuery{true};
1022 if(g_qcount.filter) {
1023 std::lock_guard<std::mutex> lock(g_luamutex);
1024 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1025 }
1026
1027 if(countQuery) {
1028 WriteLock wl(&g_qcount.queryLock);
1029 if(!g_qcount.records.count(qname)) {
1030 g_qcount.records[qname] = 0;
1031 }
1032 g_qcount.records[qname]++;
1033 }
1034 }
1035
1036 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1037 auto updateBlockStats = [&got]() {
1038 ++g_stats.dynBlocked;
1039 got->second.blocks++;
1040 };
1041
1042 if(now < got->second.until) {
1043 DNSAction::Action action = got->second.action;
1044 if (action == DNSAction::Action::None) {
1045 action = g_dynBlockAction;
1046 }
1047 switch (action) {
1048 case DNSAction::Action::NoOp:
1049 /* do nothing */
1050 break;
1051
1052 case DNSAction::Action::Nxdomain:
1053 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1054 updateBlockStats();
1055
1056 dq.dh->rcode = RCode::NXDomain;
1057 dq.dh->qr=true;
1058 return true;
1059
1060 case DNSAction::Action::Refused:
1061 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1062 updateBlockStats();
1063
1064 dq.dh->rcode = RCode::Refused;
1065 dq.dh->qr = true;
1066 return true;
1067
1068 case DNSAction::Action::Truncate:
1069 if(!dq.tcp) {
1070 updateBlockStats();
1071 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1072 dq.dh->tc = true;
1073 dq.dh->qr = true;
1074 return true;
1075 }
1076 else {
1077 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1078 }
1079 break;
1080 case DNSAction::Action::NoRecurse:
1081 updateBlockStats();
1082 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1083 dq.dh->rd = false;
1084 return true;
1085 default:
1086 updateBlockStats();
1087 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1088 return false;
1089 }
1090 }
1091 }
1092
1093 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1094 auto updateBlockStats = [&got]() {
1095 ++g_stats.dynBlocked;
1096 got->blocks++;
1097 };
1098
1099 if(now < got->until) {
1100 DNSAction::Action action = got->action;
1101 if (action == DNSAction::Action::None) {
1102 action = g_dynBlockAction;
1103 }
1104 switch (action) {
1105 case DNSAction::Action::NoOp:
1106 /* do nothing */
1107 break;
1108 case DNSAction::Action::Nxdomain:
1109 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1110 updateBlockStats();
1111
1112 dq.dh->rcode = RCode::NXDomain;
1113 dq.dh->qr=true;
1114 return true;
1115 case DNSAction::Action::Refused:
1116 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1117 updateBlockStats();
1118
1119 dq.dh->rcode = RCode::Refused;
1120 dq.dh->qr=true;
1121 return true;
1122 case DNSAction::Action::Truncate:
1123 if(!dq.tcp) {
1124 updateBlockStats();
1125
1126 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1127 dq.dh->tc = true;
1128 dq.dh->qr = true;
1129 return true;
1130 }
1131 else {
1132 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1133 }
1134 break;
1135 case DNSAction::Action::NoRecurse:
1136 updateBlockStats();
1137 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1138 dq.dh->rd = false;
1139 return true;
1140 default:
1141 updateBlockStats();
1142 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1143 return false;
1144 }
1145 }
1146 }
1147
1148 DNSAction::Action action=DNSAction::Action::None;
1149 string ruleresult;
1150 for(const auto& lr : *holders.rulactions) {
1151 if(lr.d_rule->matches(&dq)) {
1152 lr.d_rule->d_matches++;
1153 action=(*lr.d_action)(&dq, &ruleresult);
1154
1155 switch(action) {
1156 case DNSAction::Action::Allow:
1157 return true;
1158 break;
1159 case DNSAction::Action::Drop:
1160 ++g_stats.ruleDrop;
1161 return false;
1162 break;
1163 case DNSAction::Action::Nxdomain:
1164 dq.dh->rcode = RCode::NXDomain;
1165 dq.dh->qr=true;
1166 ++g_stats.ruleNXDomain;
1167 return true;
1168 break;
1169 case DNSAction::Action::Refused:
1170 dq.dh->rcode = RCode::Refused;
1171 dq.dh->qr=true;
1172 ++g_stats.ruleRefused;
1173 return true;
1174 break;
1175 case DNSAction::Action::ServFail:
1176 dq.dh->rcode = RCode::ServFail;
1177 dq.dh->qr=true;
1178 ++g_stats.ruleServFail;
1179 return true;
1180 break;
1181 case DNSAction::Action::Spoof:
1182 spoofResponseFromString(dq, ruleresult);
1183 return true;
1184 break;
1185 case DNSAction::Action::Truncate:
1186 dq.dh->tc = true;
1187 dq.dh->qr = true;
1188 return true;
1189 break;
1190 case DNSAction::Action::HeaderModify:
1191 return true;
1192 break;
1193 case DNSAction::Action::Pool:
1194 poolname=ruleresult;
1195 return true;
1196 break;
1197 /* non-terminal actions follow */
1198 case DNSAction::Action::Delay:
1199 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1200 break;
1201 case DNSAction::Action::None:
1202 /* fall-through */
1203 case DNSAction::Action::NoOp:
1204 break;
1205 case DNSAction::Action::NoRecurse:
1206 dq.dh->rd = false;
1207 return true;
1208 break;
1209 }
1210 }
1211 }
1212
1213 return true;
1214 }
1215
1216 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1217 {
1218 ssize_t result;
1219
1220 if (ss->sourceItf == 0) {
1221 result = send(sd, request, requestLen, 0);
1222 }
1223 else {
1224 struct msghdr msgh;
1225 struct iovec iov;
1226 char cbuf[256];
1227 ComboAddress remote(ss->remote);
1228 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1229 addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
1230 result = sendmsg(sd, &msgh, 0);
1231 }
1232
1233 if (result == -1) {
1234 int savederrno = errno;
1235 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1236
1237 /* This might sound silly, but on Linux send() might fail with EINVAL
1238 if the interface the socket was bound to doesn't exist anymore.
1239 We don't want to reconnect the real socket if the healthcheck failed,
1240 because it's not using the same socket.
1241 */
1242 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1243 ss->reconnect();
1244 }
1245 }
1246
1247 return result;
1248 }
1249
1250 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1251 {
1252 if (msgh->msg_flags & MSG_TRUNC) {
1253 /* message was too large for our buffer */
1254 vinfolog("Dropping message too large for our buffer");
1255 ++g_stats.nonCompliantQueries;
1256 return false;
1257 }
1258
1259 if(!holders.acl->match(remote)) {
1260 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1261 ++g_stats.aclDrops;
1262 return false;
1263 }
1264
1265 cs.queries++;
1266 ++g_stats.queries;
1267
1268 if (HarvestDestinationAddress(msgh, &dest)) {
1269 /* we don't get the port, only the address */
1270 dest.sin4.sin_port = cs.local.sin4.sin_port;
1271 }
1272 else {
1273 dest.sin4.sin_family = 0;
1274 }
1275
1276 return true;
1277 }
1278
1279 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1280 {
1281 if (cs.dnscryptCtx) {
1282 #ifdef HAVE_DNSCRYPT
1283 vector<uint8_t> response;
1284 uint16_t decryptedQueryLen = 0;
1285
1286 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1287
1288 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1289
1290 if (!decrypted) {
1291 if (response.size() > 0) {
1292 return response;
1293 }
1294 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1295 }
1296
1297 len = decryptedQueryLen;
1298 #endif /* HAVE_DNSCRYPT */
1299 }
1300 return boost::none;
1301 }
1302
1303 bool checkQueryHeaders(const struct dnsheader* dh)
1304 {
1305 if (dh->qr) { // don't respond to responses
1306 ++g_stats.nonCompliantQueries;
1307 return false;
1308 }
1309
1310 if (dh->qdcount == 0) {
1311 ++g_stats.emptyQueries;
1312 return false;
1313 }
1314
1315 if (dh->rd) {
1316 ++g_stats.rdQueries;
1317 }
1318
1319 return true;
1320 }
1321
1322 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1323 static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, char* cbuf)
1324 {
1325 outMsg.msg_len = 0;
1326 fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));
1327
1328 if (dest.sin4.sin_family == 0) {
1329 outMsg.msg_hdr.msg_control = nullptr;
1330 }
1331 else {
1332 addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
1333 }
1334 }
1335 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1336
1337 /* self-generated responses or cache hits */
1338 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1339 {
1340 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1341
1342 #ifdef HAVE_PROTOBUF
1343 dr.uniqueId = dq.uniqueId;
1344 #endif
1345 dr.qTag = dq.qTag;
1346 dr.delayMsec = dq.delayMsec;
1347
1348 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1349 return false;
1350 }
1351
1352 /* in case a rule changed it */
1353 dq.delayMsec = dr.delayMsec;
1354
1355 #ifdef HAVE_DNSCRYPT
1356 if (!cs.muted) {
1357 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1358 return false;
1359 }
1360 }
1361 #endif /* HAVE_DNSCRYPT */
1362
1363 if (cacheHit) {
1364 ++g_stats.cacheHits;
1365 }
1366
1367 switch (dr.dh->rcode) {
1368 case RCode::NXDomain:
1369 ++g_stats.frontendNXDomain;
1370 break;
1371 case RCode::ServFail:
1372 ++g_stats.frontendServFail;
1373 break;
1374 case RCode::NoError:
1375 ++g_stats.frontendNoError;
1376 break;
1377 }
1378
1379 doLatencyStats(0); // we're not going to measure this
1380 return true;
1381 }
1382
1383 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1384 {
1385 const uint16_t queryId = ntohs(dq.dh->id);
1386
1387 try {
1388 /* we need an accurate ("real") value for the response and
1389 to store into the IDS, but not for insertion into the
1390 rings for example */
1391 struct timespec now;
1392 gettime(&now);
1393
1394 string poolname;
1395
1396 if (!applyRulesToQuery(holders, dq, poolname, now)) {
1397 return ProcessQueryResult::Drop;
1398 }
1399
1400 if(dq.dh->qr) { // something turned it into a response
1401 fixUpQueryTurnedResponse(dq, dq.origFlags);
1402
1403 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1404 return ProcessQueryResult::Drop;
1405 }
1406
1407 ++g_stats.selfAnswered;
1408 return ProcessQueryResult::SendAnswer;
1409 }
1410
1411 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
1412 dq.packetCache = serverPool->packetCache;
1413 auto policy = *(holders.policy);
1414 if (serverPool->policy != nullptr) {
1415 policy = *(serverPool->policy);
1416 }
1417 auto servers = serverPool->getServers();
1418 if (policy.isLua) {
1419 std::lock_guard<std::mutex> lock(g_luamutex);
1420 selectedBackend = policy.policy(servers, &dq);
1421 }
1422 else {
1423 selectedBackend = policy.policy(servers, &dq);
1424 }
1425
1426 uint16_t cachedResponseSize = dq.size;
1427 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1428
1429 if (dq.packetCache && !dq.skipCache) {
1430 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1431 }
1432
1433 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1434 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1435 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1436 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1437 dq.len = cachedResponseSize;
1438
1439 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1440 return ProcessQueryResult::Drop;
1441 }
1442
1443 return ProcessQueryResult::SendAnswer;
1444 }
1445
1446 if (!dq.subnet) {
1447 /* there was no existing ECS on the query, enable the zero-scope feature */
1448 dq.useZeroScope = true;
1449 }
1450 }
1451
1452 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1453 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1454 return ProcessQueryResult::Drop;
1455 }
1456 }
1457
1458 if (dq.packetCache && !dq.skipCache) {
1459 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1460 dq.len = cachedResponseSize;
1461
1462 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1463 return ProcessQueryResult::Drop;
1464 }
1465
1466 return ProcessQueryResult::SendAnswer;
1467 }
1468 ++g_stats.cacheMisses;
1469 }
1470
1471 if(!selectedBackend) {
1472 ++g_stats.noPolicy;
1473
1474 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1475 if (g_servFailOnNoPolicy) {
1476 restoreFlags(dq.dh, dq.origFlags);
1477
1478 dq.dh->rcode = RCode::ServFail;
1479 dq.dh->qr = true;
1480
1481 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1482 return ProcessQueryResult::Drop;
1483 }
1484 // no response-only statistics counter to update.
1485 return ProcessQueryResult::SendAnswer;
1486 }
1487
1488 return ProcessQueryResult::Drop;
1489 }
1490
1491 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1492 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1493 }
1494
1495 selectedBackend->queries++;
1496 return ProcessQueryResult::PassToBackend;
1497 }
1498 catch(const std::exception& e){
1499 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1500 }
1501 return ProcessQueryResult::Drop;
1502 }
1503
1504 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
1505 {
1506 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1507 uint16_t queryId = 0;
1508
1509 try {
1510 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1511 return;
1512 }
1513
1514 /* we need an accurate ("real") value for the response and
1515 to store into the IDS, but not for insertion into the
1516 rings for example */
1517 struct timespec queryRealTime;
1518 gettime(&queryRealTime, true);
1519
1520 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1521 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1522 if (dnsCryptResponse) {
1523 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1524 return;
1525 }
1526
1527 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1528 queryId = ntohs(dh->id);
1529
1530 if (!checkQueryHeaders(dh)) {
1531 return;
1532 }
1533
1534 uint16_t qtype, qclass;
1535 unsigned int consumed = 0;
1536 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1537 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1538 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1539 std::shared_ptr<DownstreamState> ss{nullptr};
1540 auto result = processQuery(dq, cs, holders, ss);
1541
1542 if (result == ProcessQueryResult::Drop) {
1543 return;
1544 }
1545
1546 if (result == ProcessQueryResult::SendAnswer) {
1547 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1548 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1549 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1550 (*queuedResponses)++;
1551 return;
1552 }
1553 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1554 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1555 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1556 return;
1557 }
1558
1559 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1560 return;
1561 }
1562
1563 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1564 IDState* ids = &ss->idStates[idOffset];
1565 ids->age = 0;
1566 ids->du = nullptr;
1567
1568 int oldFD = ids->origFD.exchange(cs.udpFD);
1569 if(oldFD < 0) {
1570 // if we are reusing, no change in outstanding
1571 ++ss->outstanding;
1572 }
1573 else {
1574 ++ss->reuseds;
1575 ++g_stats.downstreamTimeouts;
1576 }
1577
1578 ids->cs = &cs;
1579 ids->origID = dh->id;
1580 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1581
1582 /* If we couldn't harvest the real dest addr, still
1583 write down the listening addr since it will be useful
1584 (especially if it's not an 'any' one).
1585 We need to keep track of which one it is since we may
1586 want to use the real but not the listening addr to reply.
1587 */
1588 if (dest.sin4.sin_family != 0) {
1589 ids->origDest = dest;
1590 ids->destHarvested = true;
1591 }
1592 else {
1593 ids->origDest = cs.local;
1594 ids->destHarvested = false;
1595 }
1596
1597 dh->id = idOffset;
1598
1599 int fd = pickBackendSocketForSending(ss);
1600 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1601
1602 if(ret < 0) {
1603 ++ss->sendErrors;
1604 ++g_stats.downstreamSendErrors;
1605 }
1606
1607 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1608 }
1609 catch(const std::exception& e){
1610 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1611 }
1612 }
1613
1614 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1615 static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
1616 {
1617 struct MMReceiver
1618 {
1619 char packet[4096];
1620 /* used by HarvestDestinationAddress */
1621 char cbuf[256];
1622 ComboAddress remote;
1623 ComboAddress dest;
1624 struct iovec iov;
1625 };
1626 const size_t vectSize = g_udpVectorSize;
1627 /* the actual buffer is larger because:
1628 - we may have to add EDNS and/or ECS
1629 - we use it for self-generated responses (from rule or cache)
1630 but we only accept incoming payloads up to that size
1631 */
1632 static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1633
1634 auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
1635 auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1636 auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1637
1638 /* initialize the structures needed to receive our messages */
1639 for (size_t idx = 0; idx < vectSize; idx++) {
1640 recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
1641 fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
1642 }
1643
1644 /* go now */
1645 for(;;) {
1646
1647 /* reset the IO vector, since it's also used to send the vector of responses
1648 to avoid having to copy the data around */
1649 for (size_t idx = 0; idx < vectSize; idx++) {
1650 recvData[idx].iov.iov_base = recvData[idx].packet;
1651 recvData[idx].iov.iov_len = sizeof(recvData[idx].packet);
1652 }
1653
1654 /* block until we have at least one message ready, but return
1655 as many as possible to save the syscall costs */
1656 int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);
1657
1658 if (msgsGot <= 0) {
1659 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
1660 continue;
1661 }
1662
1663 unsigned int msgsToSend = 0;
1664
1665 /* process the received messages */
1666 for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
1667 const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
1668 unsigned int got = msgVec[msgIdx].msg_len;
1669 const ComboAddress& remote = recvData[msgIdx].remote;
1670
1671 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1672 ++g_stats.nonCompliantQueries;
1673 continue;
1674 }
1675
1676 processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
1677
1678 }
1679
1680 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1681 or the cache) can be sent in batch too */
1682
1683 if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
1684 int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);
1685
1686 if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
1687 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
1688 }
1689 }
1690
1691 }
1692 }
1693 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1694
1695 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1696 static void udpClientThread(ClientState* cs)
1697 try
1698 {
1699 setThreadName("dnsdist/udpClie");
1700 LocalHolders holders;
1701
1702 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1703 if (g_udpVectorSize > 1) {
1704 MultipleMessagesUDPClientThread(cs, holders);
1705
1706 }
1707 else
1708 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1709 {
1710 char packet[4096];
1711 /* the actual buffer is larger because:
1712 - we may have to add EDNS and/or ECS
1713 - we use it for self-generated responses (from rule or cache)
1714 but we only accept incoming payloads up to that size
1715 */
1716 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1717 struct msghdr msgh;
1718 struct iovec iov;
1719 /* used by HarvestDestinationAddress */
1720 char cbuf[256];
1721
1722 ComboAddress remote;
1723 ComboAddress dest;
1724 remote.sin4.sin_family = cs->local.sin4.sin_family;
1725 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1726
1727 for(;;) {
1728 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1729
1730 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1731 ++g_stats.nonCompliantQueries;
1732 continue;
1733 }
1734
1735 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1736 }
1737 }
1738 }
1739 catch(const std::exception &e)
1740 {
1741 errlog("UDP client thread died because of exception: %s", e.what());
1742 }
1743 catch(const PDNSException &e)
1744 {
1745 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1746 }
1747 catch(...)
1748 {
1749 errlog("UDP client thread died because of an exception: %s", "unknown");
1750 }
1751
1752 uint16_t getRandomDNSID()
1753 {
1754 #ifdef HAVE_LIBSODIUM
1755 return (randombytes_random() % 65536);
1756 #else
1757 return (random() % 65536);
1758 #endif
1759 }
1760
1761 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1762 try
1763 {
1764 DNSName checkName = ds->checkName;
1765 uint16_t checkType = ds->checkType.getCode();
1766 uint16_t checkClass = ds->checkClass;
1767 dnsheader checkHeader;
1768 memset(&checkHeader, 0, sizeof(checkHeader));
1769
1770 checkHeader.qdcount = htons(1);
1771 checkHeader.id = getRandomDNSID();
1772
1773 checkHeader.rd = true;
1774 if (ds->setCD) {
1775 checkHeader.cd = true;
1776 }
1777
1778 if (ds->checkFunction) {
1779 std::lock_guard<std::mutex> lock(g_luamutex);
1780 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1781 checkName = std::get<0>(ret);
1782 checkType = std::get<1>(ret);
1783 checkClass = std::get<2>(ret);
1784 }
1785
1786 vector<uint8_t> packet;
1787 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1788 dnsheader * requestHeader = dpw.getHeader();
1789 *requestHeader = checkHeader;
1790
1791 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1792 sock.setNonBlocking();
1793 if (!IsAnyAddress(ds->sourceAddr)) {
1794 sock.setReuseAddr();
1795 sock.bind(ds->sourceAddr);
1796 }
1797 sock.connect(ds->remote);
1798 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1799 if (sent < 0) {
1800 int ret = errno;
1801 if (g_verboseHealthChecks)
1802 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1803 return false;
1804 }
1805
1806 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1807 if(ret < 0 || !ret) { // error, timeout, both are down!
1808 if (ret < 0) {
1809 ret = errno;
1810 if (g_verboseHealthChecks)
1811 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1812 }
1813 else {
1814 if (g_verboseHealthChecks)
1815 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1816 }
1817 return false;
1818 }
1819
1820 string reply;
1821 ComboAddress from;
1822 sock.recvFrom(reply, from);
1823
1824 /* we are using a connected socket but hey.. */
1825 if (from != ds->remote) {
1826 if (g_verboseHealthChecks)
1827 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1828 return false;
1829 }
1830
1831 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1832
1833 if (reply.size() < sizeof(*responseHeader)) {
1834 if (g_verboseHealthChecks)
1835 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1836 return false;
1837 }
1838
1839 if (responseHeader->id != requestHeader->id) {
1840 if (g_verboseHealthChecks)
1841 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1842 return false;
1843 }
1844
1845 if (!responseHeader->qr) {
1846 if (g_verboseHealthChecks)
1847 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1848 return false;
1849 }
1850
1851 if (responseHeader->rcode == RCode::ServFail) {
1852 if (g_verboseHealthChecks)
1853 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1854 return false;
1855 }
1856
1857 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1858 if (g_verboseHealthChecks)
1859 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1860 return false;
1861 }
1862
1863 uint16_t receivedType;
1864 uint16_t receivedClass;
1865 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1866
1867 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1868 if (g_verboseHealthChecks)
1869 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1870 return false;
1871 }
1872
1873 return true;
1874 }
1875 catch(const std::exception& e)
1876 {
1877 if (g_verboseHealthChecks)
1878 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1879 return false;
1880 }
1881 catch(...)
1882 {
1883 if (g_verboseHealthChecks)
1884 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1885 return false;
1886 }
1887
1888 uint64_t g_maxTCPClientThreads{10};
1889 std::atomic<uint16_t> g_cacheCleaningDelay{60};
1890 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1891
1892 void maintThread()
1893 {
1894 setThreadName("dnsdist/main");
1895 int interval = 1;
1896 size_t counter = 0;
1897 int32_t secondsToWaitLog = 0;
1898
1899 for(;;) {
1900 sleep(interval);
1901
1902 {
1903 std::lock_guard<std::mutex> lock(g_luamutex);
1904 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1905 if(f) {
1906 try {
1907 (*f)();
1908 secondsToWaitLog = 0;
1909 }
1910 catch(std::exception &e) {
1911 if (secondsToWaitLog <= 0) {
1912 infolog("Error during execution of maintenance function: %s", e.what());
1913 secondsToWaitLog = 61;
1914 }
1915 secondsToWaitLog -= interval;
1916 }
1917 }
1918 }
1919
1920 counter++;
1921 if (counter >= g_cacheCleaningDelay) {
1922 /* keep track, for each cache, of whether we should keep
1923 expired entries */
1924 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1925
1926 /* gather all caches actually used by at least one pool, and see
1927 if something prevents us from cleaning the expired entries */
1928 auto localPools = g_pools.getLocal();
1929 for (const auto& entry : *localPools) {
1930 auto& pool = entry.second;
1931
1932 auto packetCache = pool->packetCache;
1933 if (!packetCache) {
1934 continue;
1935 }
1936
1937 auto pair = caches.insert({packetCache, false});
1938 auto& iter = pair.first;
1939 /* if we need to keep stale data for this cache (ie, not clear
1940 expired entries when at least one pool using this cache
1941 has all its backends down) */
1942 if (packetCache->keepStaleData() && iter->second == false) {
1943 /* so far all pools had at least one backend up */
1944 if (pool->countServers(true) == 0) {
1945 iter->second = true;
1946 }
1947 }
1948 }
1949
1950 for (auto pair : caches) {
1951 /* shall we keep expired entries ? */
1952 if (pair.second == true) {
1953 continue;
1954 }
1955 auto& packetCache = pair.first;
1956 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1957 packetCache->purgeExpired(upTo);
1958 }
1959 counter = 0;
1960 }
1961
1962 // ponder pruning g_dynblocks of expired entries here
1963 }
1964 }
1965
1966 static void secPollThread()
1967 {
1968 setThreadName("dnsdist/secpoll");
1969
1970 for (;;) {
1971 try {
1972 doSecPoll(g_secPollSuffix);
1973 }
1974 catch(...) {
1975 }
1976 sleep(g_secPollInterval);
1977 }
1978 }
1979
1980 static void healthChecksThread()
1981 {
1982 setThreadName("dnsdist/healthC");
1983
1984 int interval = 1;
1985
1986 for(;;) {
1987 sleep(interval);
1988
1989 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
1990 g_tcpclientthreads->addTCPClientThread();
1991
1992 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
1993 for(auto& dss : *states) {
1994 if(++dss->lastCheck < dss->checkInterval)
1995 continue;
1996 dss->lastCheck = 0;
1997 if(dss->availability==DownstreamState::Availability::Auto) {
1998 bool newState=upCheck(dss);
1999 if (newState) {
2000 /* check succeeded */
2001 dss->currentCheckFailures = 0;
2002
2003 if (!dss->upStatus) {
2004 /* we were marked as down */
2005 dss->consecutiveSuccessfulChecks++;
2006 if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
2007 /* if we need more than one successful check to rise
2008 and we didn't reach the threshold yet,
2009 let's stay down */
2010 newState = false;
2011 }
2012 }
2013 }
2014 else {
2015 /* check failed */
2016 dss->consecutiveSuccessfulChecks = 0;
2017
2018 if (dss->upStatus) {
2019 /* we are currently up */
2020 dss->currentCheckFailures++;
2021 if (dss->currentCheckFailures < dss->maxCheckFailures) {
2022 /* we need more than one failure to be marked as down,
2023 and we did not reach the threshold yet, let's stay down */
2024 newState = true;
2025 }
2026 }
2027 }
2028
2029 if(newState != dss->upStatus) {
2030 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2031
2032 if (newState && !dss->connected) {
2033 newState = dss->reconnect();
2034
2035 if (dss->connected && !dss->threadStarted.test_and_set()) {
2036 dss->tid = thread(responderThread, dss);
2037 }
2038 }
2039
2040 dss->upStatus = newState;
2041 dss->currentCheckFailures = 0;
2042 dss->consecutiveSuccessfulChecks = 0;
2043 if (g_snmpAgent && g_snmpTrapsEnabled) {
2044 g_snmpAgent->sendBackendStatusChangeTrap(dss);
2045 }
2046 }
2047 }
2048
2049 auto delta = dss->sw.udiffAndSet()/1000000.0;
2050 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2051 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2052 dss->prev.queries.store(dss->queries.load());
2053 dss->prev.reuseds.store(dss->reuseds.load());
2054
2055 for(IDState& ids : dss->idStates) { // timeouts
2056 int origFD = ids.origFD;
2057 if(origFD >=0 && ids.age++ > g_udpTimeout) {
2058 /* We set origFD to -1 as soon as possible
2059 to limit the risk of racing with the
2060 responder thread.
2061 The UDP client thread only checks origFD to
2062 know whether outstanding has to be incremented,
2063 so the sooner the better any way since we _will_
2064 decrement it.
2065 */
2066 if (ids.origFD.exchange(-1) != origFD) {
2067 /* this state has been altered in the meantime,
2068 don't go anywhere near it */
2069 continue;
2070 }
2071 ids.du = nullptr;
2072 ids.age = 0;
2073 dss->reuseds++;
2074 --dss->outstanding;
2075 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2076 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2077 dss->remote.toStringWithPort(), dss->name,
2078 ids.qname.toString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2079
2080 struct timespec ts;
2081 gettime(&ts);
2082
2083 struct dnsheader fake;
2084 memset(&fake, 0, sizeof(fake));
2085 fake.id = ids.origID;
2086
2087 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2088 }
2089 }
2090 }
2091 }
2092 }
2093
2094 static void bindAny(int af, int sock)
2095 {
2096 __attribute__((unused)) int one = 1;
2097
2098 #ifdef IP_FREEBIND
2099 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2100 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
2101 #endif
2102
2103 #ifdef IP_BINDANY
2104 if (af == AF_INET)
2105 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2106 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
2107 #endif
2108 #ifdef IPV6_BINDANY
2109 if (af == AF_INET6)
2110 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2111 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
2112 #endif
2113 #ifdef SO_BINDANY
2114 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2115 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
2116 #endif
2117 }
2118
2119 static void dropGroupPrivs(gid_t gid)
2120 {
2121 if (gid) {
2122 if (setgid(gid) == 0) {
2123 if (setgroups(0, NULL) < 0) {
2124 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
2125 }
2126 }
2127 else {
2128 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
2129 }
2130 }
2131 }
2132
2133 static void dropUserPrivs(uid_t uid)
2134 {
2135 if(uid) {
2136 if(setuid(uid) < 0) {
2137 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
2138 }
2139 }
2140 }
2141
2142 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2143 {
2144 /* stdin, stdout, stderr */
2145 size_t requiredFDsCount = 3;
2146 auto backends = g_dstates.getLocal();
2147 /* UDP sockets to backends */
2148 size_t backendUDPSocketsCount = 0;
2149 for (const auto& backend : *backends) {
2150 backendUDPSocketsCount += backend->sockets.size();
2151 }
2152 requiredFDsCount += backendUDPSocketsCount;
2153 /* TCP sockets to backends */
2154 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2155 /* listening sockets */
2156 requiredFDsCount += udpBindsCount;
2157 requiredFDsCount += tcpBindsCount;
2158 /* max TCP connections currently served */
2159 requiredFDsCount += g_maxTCPClientThreads;
2160 /* max pipes for communicating between TCP acceptors and client threads */
2161 requiredFDsCount += (g_maxTCPClientThreads * 2);
2162 /* max TCP queued connections */
2163 requiredFDsCount += g_maxTCPQueuedConnections;
2164 /* DelayPipe pipe */
2165 requiredFDsCount += 2;
2166 /* syslog socket */
2167 requiredFDsCount++;
2168 /* webserver main socket */
2169 requiredFDsCount++;
2170 /* console main socket */
2171 requiredFDsCount++;
2172 /* carbon export */
2173 requiredFDsCount++;
2174 /* history file */
2175 requiredFDsCount++;
2176 struct rlimit rl;
2177 getrlimit(RLIMIT_NOFILE, &rl);
2178 if (rl.rlim_cur <= requiredFDsCount) {
2179 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2180 #ifdef HAVE_SYSTEMD
2181 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2182 #else
2183 warnlog("You can increase this value by using ulimit.");
2184 #endif
2185 }
2186 }
2187
/* Create and configure the listening socket for frontend 'cs', then bind it
   (and, for TCP, listen on it). The descriptor is stored in cs->udpFD or
   cs->tcpFD depending on cs->tcp, and cs->ready is set at the end.
   Options are applied before SBind() so they take effect on the bound socket.
   A TLS setup failure terminates the process with EXIT_FAILURE; the S*
   helpers presumably throw on failure (NOTE(review): confirm in iputils). */
static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
{
  /* skip some warnings if there is an identical UDP context: a plain TCP
     frontend has a UDP twin on the same address which already warned */
  bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
  /* reference to the descriptor slot this frontend uses */
  int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
  /* 'warn' may be unused depending on which socket options are available */
  (void) warn;

  fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);

  if (cs->tcp) {
    SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef TCP_DEFER_ACCEPT
    SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
#endif
    if (cs->fastOpenQueueSize > 0) {
#ifdef TCP_FASTOPEN
      SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
#else
      if (warn) {
        warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
      }
#endif
    }
  }

  if(cs->local.sin4.sin_family == AF_INET6) {
    SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
  }

  bindAny(cs->local.sin4.sin_family, fd);

  /* a UDP socket bound to the wildcard address needs packet info to know
     which local address a query was received on; return values are ignored */
  if(!cs->tcp && IsAnyAddress(cs->local)) {
    int one=1;
    setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
#ifdef IPV6_RECVPKTINFO
    setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
#endif
  }

  if (cs->reuseport) {
#ifdef SO_REUSEPORT
    SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
#else
    if (warn) {
      /* no need to warn again if configured but support is not available, we already did for UDP */
      warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
    }
#endif
  }

  if (!cs->tcp) {
    if (cs->local.isIPv4()) {
      /* failure here is not fatal, only reported */
      try {
        setSocketIgnorePMTU(cs->udpFD);
      }
      catch(const std::exception& e) {
        warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
      }
    }
  }

  const std::string& itf = cs->interface;
  if (!itf.empty()) {
#ifdef SO_BINDTODEVICE
    int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
    if (res != 0) {
      warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), strerror(errno));
    }
#else
    if (warn) {
      warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
    }
#endif
  }

#ifdef HAVE_EBPF
  if (g_defaultBPFFilter) {
    cs->attachFilter(g_defaultBPFFilter);
    vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
  }
#endif /* HAVE_EBPF */

  if (cs->tlsFrontend != nullptr) {
    if (!cs->tlsFrontend->setupTLS()) {
      errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
      _exit(EXIT_FAILURE);
    }
  }

  if (cs->dohFrontend != nullptr) {
    cs->dohFrontend->setup();
  }

  SBind(fd, cs->local);

  if (cs->tcp) {
    SListen(cs->tcpFD, SOMAXCONN);
    if (cs->tlsFrontend != nullptr) {
      warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
    }
    else if (cs->dohFrontend != nullptr) {
      warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
    }
    else if (cs->dnscryptCtx != nullptr) {
      warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
    }
    else {
      warnlog("Listening on %s", cs->local.toStringWithPort());
    }
  }

  cs->ready = true;
}
2301
/* Settings collected from the command line by main(). */
struct
{
  std::vector<std::string> locals;   // -l/--local addresses
  std::vector<std::string> remotes;  // positional arguments, used as backends when not in client mode
  bool checkConfig = false;          // --check-config
  bool beClient = false;             // -c/--client
  bool beSupervised = false;         // --supervised
  std::string command;               // -e/--execute
  std::string config;                // -C/--config
  std::string uid;                   // -u/--uid
  std::string gid;                   // -g/--gid
} g_cmdLine;

/* Set by main() once the configuration has been loaded, right before the
   frontends are bound. */
std::atomic<bool> g_configurationDone(false);
2316
2317 static void usage()
2318 {
2319 cout<<endl;
2320 cout<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2321 cout<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2322 cout<<"[-v,--verbose] [--check-config] [--version]\n";
2323 cout<<"\n";
2324 cout<<"-a,--acl netmask Add this netmask to the ACL\n";
2325 cout<<"-C,--config file Load configuration from 'file'\n";
2326 cout<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2327 cout<<" controlSocket from your configuration file, but also\n";
2328 cout<<" accepts an IP:PORT argument\n";
2329 #ifdef HAVE_LIBSODIUM
2330 cout<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2331 cout<<" is similar to setting setKey in the configuration file.\n";
2332 cout<<" NOTE: this will leak this key in your shell's history\n";
2333 cout<<" and in the systems running process list.\n";
2334 #endif
2335 cout<<"--check-config Validate the configuration file and exit. The exit-code\n";
2336 cout<<" reflects the validation, 0 is OK, 1 means an error.\n";
2337 cout<<" Any errors are printed as well.\n";
2338 cout<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2339 cout<<"-g,--gid gid Change the process group ID after binding sockets\n";
2340 cout<<"-h,--help Display this helpful message\n";
2341 cout<<"-l,--local address Listen on this local address\n";
2342 cout<<"--supervised Don't open a console, I'm supervised\n";
2343 cout<<" (use with e.g. systemd and daemontools)\n";
2344 cout<<"--disable-syslog Don't log to syslog, only to stdout\n";
2345 cout<<" (use with e.g. systemd)\n";
2346 cout<<"-u,--uid uid Change the process user ID after binding sockets\n";
2347 cout<<"-v,--verbose Enable verbose mode\n";
2348 cout<<"-V,--version Show dnsdist version information and exit\n";
2349 }
2350
2351 int main(int argc, char** argv)
2352 try
2353 {
2354 size_t udpBindsCount = 0;
2355 size_t tcpBindsCount = 0;
2356 rl_attempted_completion_function = my_completion;
2357 rl_completion_append_character = 0;
2358
2359 signal(SIGPIPE, SIG_IGN);
2360 signal(SIGCHLD, SIG_IGN);
2361 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2362
2363 #ifdef HAVE_LIBSODIUM
2364 if (sodium_init() == -1) {
2365 cerr<<"Unable to initialize crypto library"<<endl;
2366 exit(EXIT_FAILURE);
2367 }
2368 g_hashperturb=randombytes_uniform(0xffffffff);
2369 srandom(randombytes_uniform(0xffffffff));
2370 #else
2371 {
2372 struct timeval tv;
2373 gettimeofday(&tv, 0);
2374 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2375 g_hashperturb=random();
2376 }
2377
2378 #endif
2379 ComboAddress clientAddress = ComboAddress();
2380 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2381 struct option longopts[]={
2382 {"acl", required_argument, 0, 'a'},
2383 {"check-config", no_argument, 0, 1},
2384 {"client", no_argument, 0, 'c'},
2385 {"config", required_argument, 0, 'C'},
2386 {"disable-syslog", no_argument, 0, 2},
2387 {"execute", required_argument, 0, 'e'},
2388 {"gid", required_argument, 0, 'g'},
2389 {"help", no_argument, 0, 'h'},
2390 {"local", required_argument, 0, 'l'},
2391 {"setkey", required_argument, 0, 'k'},
2392 {"supervised", no_argument, 0, 3},
2393 {"uid", required_argument, 0, 'u'},
2394 {"verbose", no_argument, 0, 'v'},
2395 {"version", no_argument, 0, 'V'},
2396 {0,0,0,0}
2397 };
2398 int longindex=0;
2399 string optstring;
2400 for(;;) {
2401 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2402 if(c==-1)
2403 break;
2404 switch(c) {
2405 case 1:
2406 g_cmdLine.checkConfig=true;
2407 break;
2408 case 2:
2409 g_syslog=false;
2410 break;
2411 case 3:
2412 g_cmdLine.beSupervised=true;
2413 break;
2414 case 'C':
2415 g_cmdLine.config=optarg;
2416 break;
2417 case 'c':
2418 g_cmdLine.beClient=true;
2419 break;
2420 case 'e':
2421 g_cmdLine.command=optarg;
2422 break;
2423 case 'g':
2424 g_cmdLine.gid=optarg;
2425 break;
2426 case 'h':
2427 cout<<"dnsdist "<<VERSION<<endl;
2428 usage();
2429 cout<<"\n";
2430 exit(EXIT_SUCCESS);
2431 break;
2432 case 'a':
2433 optstring=optarg;
2434 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2435 break;
2436 case 'k':
2437 #ifdef HAVE_LIBSODIUM
2438 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2439 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2440 exit(EXIT_FAILURE);
2441 }
2442 #else
2443 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2444 exit(EXIT_FAILURE);
2445 #endif
2446 break;
2447 case 'l':
2448 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2449 break;
2450 case 'u':
2451 g_cmdLine.uid=optarg;
2452 break;
2453 case 'v':
2454 g_verbose=true;
2455 break;
2456 case 'V':
2457 #ifdef LUAJIT_VERSION
2458 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2459 #else
2460 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2461 #endif
2462 cout<<"Enabled features: ";
2463 #ifdef HAVE_DNS_OVER_TLS
2464 cout<<"dns-over-tls(";
2465 #ifdef HAVE_GNUTLS
2466 cout<<"gnutls";
2467 #ifdef HAVE_LIBSSL
2468 cout<<" ";
2469 #endif
2470 #endif
2471 #ifdef HAVE_LIBSSL
2472 cout<<"openssl";
2473 #endif
2474 cout<<") ";
2475 #endif
2476 #ifdef HAVE_DNS_OVER_HTTPS
2477 cout<<"dns-over-https(DOH) ";
2478 #endif
2479 #ifdef HAVE_DNSCRYPT
2480 cout<<"dnscrypt ";
2481 #endif
2482 #ifdef HAVE_EBPF
2483 cout<<"ebpf ";
2484 #endif
2485 #ifdef HAVE_FSTRM
2486 cout<<"fstrm ";
2487 #endif
2488 #ifdef HAVE_LIBCRYPTO
2489 cout<<"ipcipher ";
2490 #endif
2491 #ifdef HAVE_LIBSODIUM
2492 cout<<"libsodium ";
2493 #endif
2494 #ifdef HAVE_PROTOBUF
2495 cout<<"protobuf ";
2496 #endif
2497 #ifdef HAVE_RE2
2498 cout<<"re2 ";
2499 #endif
2500 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2501 cout<<"recvmmsg/sendmmsg ";
2502 #endif
2503 #ifdef HAVE_NET_SNMP
2504 cout<<"snmp ";
2505 #endif
2506 #ifdef HAVE_SYSTEMD
2507 cout<<"systemd";
2508 #endif
2509 cout<<endl;
2510 exit(EXIT_SUCCESS);
2511 break;
2512 case '?':
2513 //getopt_long printed an error message.
2514 usage();
2515 exit(EXIT_FAILURE);
2516 break;
2517 }
2518 }
2519
2520 argc-=optind;
2521 argv+=optind;
2522 for(auto p = argv; *p; ++p) {
2523 if(g_cmdLine.beClient) {
2524 clientAddress = ComboAddress(*p, 5199);
2525 } else {
2526 g_cmdLine.remotes.push_back(*p);
2527 }
2528 }
2529
2530 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2531
2532 g_policy.setState(leastOutstandingPol);
2533 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2534 setupLua(true, g_cmdLine.config);
2535 if (clientAddress != ComboAddress())
2536 g_serverControl = clientAddress;
2537 doClient(g_serverControl, g_cmdLine.command);
2538 _exit(EXIT_SUCCESS);
2539 }
2540
2541 auto acl = g_ACL.getCopy();
2542 if(acl.empty()) {
2543 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2544 acl.addMask(addr);
2545 g_ACL.setState(acl);
2546 }
2547
2548 auto consoleACL = g_consoleACL.getCopy();
2549 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2550 consoleACL.addMask(mask);
2551 }
2552 g_consoleACL.setState(consoleACL);
2553
2554 if (g_cmdLine.checkConfig) {
2555 setupLua(true, g_cmdLine.config);
2556 // No exception was thrown
2557 infolog("Configuration '%s' OK!", g_cmdLine.config);
2558 _exit(EXIT_SUCCESS);
2559 }
2560
2561 auto todo=setupLua(false, g_cmdLine.config);
2562
2563 auto localPools = g_pools.getCopy();
2564 {
2565 bool precompute = false;
2566 if (g_policy.getLocal()->name == "chashed") {
2567 precompute = true;
2568 } else {
2569 for (const auto& entry: localPools) {
2570 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2571 precompute = true;
2572 break ;
2573 }
2574 }
2575 }
2576 if (precompute) {
2577 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2578 // pre compute hashes
2579 auto backends = g_dstates.getLocal();
2580 for (auto& backend: *backends) {
2581 backend->hash();
2582 }
2583 }
2584 }
2585
2586 if (!g_cmdLine.locals.empty()) {
2587 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2588 /* DoH, DoT and DNSCrypt frontends are separate */
2589 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2590 it = g_frontends.erase(it);
2591 }
2592 else {
2593 ++it;
2594 }
2595 }
2596
2597 for(const auto& loc : g_cmdLine.locals) {
2598 /* UDP */
2599 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2600 /* TCP */
2601 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2602 }
2603 }
2604
2605 if (g_frontends.empty()) {
2606 /* UDP */
2607 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2608 /* TCP */
2609 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2610 }
2611
2612 g_configurationDone = true;
2613
2614 for(auto& frontend : g_frontends) {
2615 setUpLocalBind(frontend);
2616
2617 if (frontend->tcp == false) {
2618 ++udpBindsCount;
2619 }
2620 else {
2621 ++tcpBindsCount;
2622 }
2623 }
2624
2625 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2626
2627 vector<string> vec;
2628 std::string acls;
2629 g_ACL.getLocal()->toStringVector(&vec);
2630 for(const auto& s : vec) {
2631 if (!acls.empty())
2632 acls += ", ";
2633 acls += s;
2634 }
2635 infolog("ACL allowing queries from: %s", acls.c_str());
2636 vec.clear();
2637 acls.clear();
2638 g_consoleACL.getLocal()->toStringVector(&vec);
2639 for (const auto& entry : vec) {
2640 if (!acls.empty()) {
2641 acls += ", ";
2642 }
2643 acls += entry;
2644 }
2645 infolog("Console ACL allowing connections from: %s", acls.c_str());
2646
2647 #ifdef HAVE_LIBSODIUM
2648 if (g_consoleEnabled && g_consoleKey.empty()) {
2649 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2650 }
2651 #endif
2652
2653 uid_t newgid=0;
2654 gid_t newuid=0;
2655
2656 if(!g_cmdLine.gid.empty())
2657 newgid = strToGID(g_cmdLine.gid.c_str());
2658
2659 if(!g_cmdLine.uid.empty())
2660 newuid = strToUID(g_cmdLine.uid.c_str());
2661
2662 dropGroupPrivs(newgid);
2663 dropUserPrivs(newuid);
2664 try {
2665 /* we might still have capabilities remaining,
2666 for example if we have been started as root
2667 without --uid or --gid (please don't do that)
2668 or as an unprivileged user with ambient
2669 capabilities like CAP_NET_BIND_SERVICE.
2670 */
2671 dropCapabilities();
2672 }
2673 catch(const std::exception& e) {
2674 warnlog("%s", e.what());
2675 }
2676
2677 /* this need to be done _after_ dropping privileges */
2678 g_delay = new DelayPipe<DelayedPacket>();
2679
2680 if (g_snmpAgent) {
2681 g_snmpAgent->run();
2682 }
2683
2684 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2685
2686 for(auto& t : todo)
2687 t();
2688
2689 localPools = g_pools.getCopy();
2690 /* create the default pool no matter what */
2691 createPoolIfNotExists(localPools, "");
2692 if(g_cmdLine.remotes.size()) {
2693 for(const auto& address : g_cmdLine.remotes) {
2694 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2695 addServerToPool(localPools, "", ret);
2696 if (ret->connected && !ret->threadStarted.test_and_set()) {
2697 ret->tid = thread(responderThread, ret);
2698 }
2699 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2700 }
2701 }
2702 g_pools.setState(localPools);
2703
2704 if(g_dstates.getLocal()->empty()) {
2705 errlog("No downstream servers defined: all packets will get dropped");
2706 // you might define them later, but you need to know
2707 }
2708
2709 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2710
2711 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2712 if(dss->availability==DownstreamState::Availability::Auto) {
2713 bool newState=upCheck(dss);
2714 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2715 dss->upStatus = newState;
2716 }
2717 }
2718
2719 for(auto& cs : g_frontends) {
2720 if (cs->dohFrontend != nullptr) {
2721 #ifdef HAVE_DNS_OVER_HTTPS
2722 std::thread t1(dohThread, cs.get());
2723 if (!cs->cpus.empty()) {
2724 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2725 }
2726 t1.detach();
2727 #endif /* HAVE_DNS_OVER_HTTPS */
2728 continue;
2729 }
2730 if (cs->udpFD >= 0) {
2731 thread t1(udpClientThread, cs.get());
2732 if (!cs->cpus.empty()) {
2733 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2734 }
2735 t1.detach();
2736 }
2737 else if (cs->tcpFD >= 0) {
2738 thread t1(tcpAcceptorThread, cs.get());
2739 if (!cs->cpus.empty()) {
2740 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2741 }
2742 t1.detach();
2743 }
2744 }
2745
2746 thread carbonthread(carbonDumpThread);
2747 carbonthread.detach();
2748
2749 thread stattid(maintThread);
2750 stattid.detach();
2751
2752 thread healththread(healthChecksThread);
2753
2754 if (!g_secPollSuffix.empty()) {
2755 thread secpollthread(secPollThread);
2756 secpollthread.detach();
2757 }
2758
2759 if(g_cmdLine.beSupervised) {
2760 #ifdef HAVE_SYSTEMD
2761 sd_notify(0, "READY=1");
2762 #endif
2763 healththread.join();
2764 }
2765 else {
2766 healththread.detach();
2767 doConsole();
2768 }
2769 _exit(EXIT_SUCCESS);
2770
2771 }
2772 catch(const LuaContext::ExecutionErrorException& e) {
2773 try {
2774 errlog("Fatal Lua error: %s", e.what());
2775 std::rethrow_if_nested(e);
2776 } catch(const std::exception& ne) {
2777 errlog("Details: %s", ne.what());
2778 }
2779 catch(PDNSException &ae)
2780 {
2781 errlog("Fatal pdns error: %s", ae.reason);
2782 }
2783 _exit(EXIT_FAILURE);
2784 }
2785 catch(std::exception &e)
2786 {
2787 errlog("Fatal error: %s", e.what());
2788 _exit(EXIT_FAILURE);
2789 }
2790 catch(PDNSException &ae)
2791 {
2792 errlog("Fatal pdns error: %s", ae.reason);
2793 _exit(EXIT_FAILURE);
2794 }