]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
Merge pull request #8342 from chbruyand/pipebackend-unused-warning
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
// Process-wide state for dnsdist. Only the settings whose use is visible in
// this part of the file are described; the others are declared here and
// consumed elsewhere.
79 using std::atomic;
80 using std::thread;
81 bool g_verbose;
82
83 struct DNSDistStats g_stats;
84 MetricDefinitionStorage g_metricDefinitions;
85
// Number of entries each DownstreamState's idStates table is resized to when
// the backend is constructed with a real remote address.
86 uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
87 bool g_verboseHealthChecks{false};
88 uint32_t g_staleCacheEntriesTTL{0};
89 bool g_syslog{true};
// When true, responseContentMatches() accepts a response with an empty
// question section even if its rcode is NoError or NXDomain.
90 bool g_allowEmptyResponse{false};
91
92 GlobalStateHolder<NetmaskGroup> g_ACL;
93 string g_outputBuffer;
94
// NOTE(review): presumably the configured TLS / DoH / DNSCrypt listeners — confirm against the code that fills them.
95 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
96 std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
97 std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
98 #ifdef HAVE_EBPF
99 shared_ptr<BPFFilter> g_defaultBPFFilter;
100 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
101 #endif /* HAVE_EBPF */
102 std::vector<std::unique_ptr<ClientState>> g_frontends;
103 GlobalStateHolder<pools_t> g_pools;
104 size_t g_udpVectorSize{1};
105
// SNMP support is off by default and no agent object exists until one is assigned.
106 bool g_snmpEnabled{false};
107 bool g_snmpTrapsEnabled{false};
108 DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
// Rule/action chains. g_resprulactions is the chain the UDP responder thread
// applies to backend responses; NOTE(review): the other three are presumably
// applied to queries, cache hits and self-generated answers — confirm.
125 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
128 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
129
// g_rings receives one entry per relayed response (see responderThread).
130 Rings g_rings;
131 QueryCount g_qcount;
132
133 GlobalStateHolder<servers_t> g_dstates;
134 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
135 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
// Dynamic blocks default to dropping.
136 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
// NOTE(review): timeouts are presumably expressed in seconds — confirm where they are consumed.
137 int g_tcpRecvTimeout{2};
138 int g_tcpSendTimeout{2};
139 int g_udpTimeout{2};
140
141 bool g_servFailOnNoPolicy{false};
// When set, the UDP responder thread passes responses carrying the TC bit
// through truncateTC() before relaying them.
142 bool g_truncateTC{false};
// When set, fixUpResponse() overwrites the name right after the DNS header
// with the wire form of the qname from the original query.
143 bool g_fixupCase{false};
144 bool g_preserveTrailingData{false};
// When false, roundrobin() falls back to the full server list if no server is up.
145 bool g_roundrobinFailOnNoServer{false};
146
147 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
148 try
149 {
150 bool hadEDNS = false;
151 uint16_t payloadSize = 0;
152 uint16_t z = 0;
153
154 if (g_addEDNSToSelfGeneratedResponses) {
155 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
156 }
157
158 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
159 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
160 dh->ancount = dh->arcount = dh->nscount = 0;
161
162 if (hadEDNS) {
163 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
164 }
165 }
166 catch(...)
167 {
168 g_stats.truncFail++;
169 }
170
171 struct DelayedPacket
172 {
173 int fd;
174 string packet;
175 ComboAddress destination;
176 ComboAddress origDest;
177 void operator()()
178 {
179 ssize_t res;
180 if(origDest.sin4.sin_family == 0) {
181 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
182 }
183 else {
184 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
185 }
186 if (res == -1) {
187 int err = errno;
188 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
189 }
190 }
191 };
192
193 DelayPipe<DelayedPacket>* g_delay = nullptr;
194
195 void doLatencyStats(double udiff)
196 {
197 if(udiff < 1000) ++g_stats.latency0_1;
198 else if(udiff < 10000) ++g_stats.latency1_10;
199 else if(udiff < 50000) ++g_stats.latency10_50;
200 else if(udiff < 100000) ++g_stats.latency50_100;
201 else if(udiff < 1000000) ++g_stats.latency100_1000;
202 else ++g_stats.latencySlow;
203 g_stats.latencySum += udiff / 1000;
204
205 auto doAvg = [](double& var, double n, double weight) {
206 var = (weight -1) * var/weight + n/weight;
207 };
208
209 doAvg(g_stats.latencyAvg100, udiff, 100);
210 doAvg(g_stats.latencyAvg1000, udiff, 1000);
211 doAvg(g_stats.latencyAvg10000, udiff, 10000);
212 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
213 }
214
215 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
216 {
217 if (responseLen < sizeof(dnsheader)) {
218 return false;
219 }
220
221 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
222 if (dh->qdcount == 0) {
223 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
224 return true;
225 }
226 else {
227 ++g_stats.nonCompliantResponses;
228 return false;
229 }
230 }
231
232 uint16_t rqtype, rqclass;
233 DNSName rqname;
234 try {
235 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
236 }
237 catch(const std::exception& e) {
238 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
239 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
240 }
241 ++g_stats.nonCompliantResponses;
242 return false;
243 }
244
245 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
246 return false;
247 }
248
249 return true;
250 }
251
252 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
253 {
254 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
255 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
256 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
257 uint16_t * flags = getFlagsFromDNSHeader(dh);
258 /* clear the flags we are about to restore */
259 *flags &= restoreFlagsMask;
260 /* only keep the flags we want to restore */
261 origFlags &= ~restoreFlagsMask;
262 /* set the saved flags as they were */
263 *flags |= origFlags;
264 }
265
266 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
267 {
268 restoreFlags(dq.dh, origFlags);
269
270 return addEDNSToQueryTurnedResponse(dq);
271 }
272
273 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
274 {
275 if (*responseLen < sizeof(dnsheader)) {
276 return false;
277 }
278
279 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
280 restoreFlags(dh, origFlags);
281
282 if (*responseLen == sizeof(dnsheader)) {
283 return true;
284 }
285
286 if(g_fixupCase) {
287 string realname = qname.toDNSString();
288 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
289 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
290 }
291 }
292
293 if (ednsAdded || ecsAdded) {
294 uint16_t optStart;
295 size_t optLen = 0;
296 bool last = false;
297
298 const std::string responseStr(*response, *responseLen);
299 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
300
301 if (res == 0) {
302 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
303 size_t optContentStart = 0;
304 uint16_t optContentLen = 0;
305 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
306 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
307 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
308 we care about. */
309 *zeroScope = responseStr.at(optContentStart + 3) == 0;
310 }
311 }
312
313 if (ednsAdded) {
314 /* we added the entire OPT RR,
315 therefore we need to remove it entirely */
316 if (last) {
317 /* simply remove the last AR */
318 *responseLen -= optLen;
319 uint16_t arcount = ntohs(dh->arcount);
320 arcount--;
321 dh->arcount = htons(arcount);
322 }
323 else {
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
326 *responseLen = rewrittenResponse.size();
327 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
328 rewrittenResponse.reserve(*responseLen + addRoom);
329 }
330 *responseSize = rewrittenResponse.capacity();
331 *response = reinterpret_cast<char*>(rewrittenResponse.data());
332 }
333 else {
334 warnlog("Error rewriting content");
335 }
336 }
337 }
338 else {
339 /* the OPT RR was already present, but without ECS,
340 we need to remove the ECS option if any */
341 if (last) {
342 /* nothing after the OPT RR, we can simply remove the
343 ECS option */
344 size_t existingOptLen = optLen;
345 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
346 *responseLen -= (existingOptLen - optLen);
347 }
348 else {
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
351 *responseLen = rewrittenResponse.size();
352 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
353 rewrittenResponse.reserve(*responseLen + addRoom);
354 }
355 *responseSize = rewrittenResponse.capacity();
356 *response = reinterpret_cast<char*>(rewrittenResponse.data());
357 }
358 else {
359 warnlog("Error rewriting content");
360 }
361 }
362 }
363 }
364 }
365
366 return true;
367 }
368
#ifdef HAVE_DNSCRYPT
/* Encrypts a response in place when the query came in over DNSCrypt.

   response/responseLen/responseSize - buffer, current length (updated on
                                       success) and capacity
   tcp           - passed through to the DNSCrypt encryption routine
   dnsCryptQuery - DNSCrypt state of the query; when null nothing is done
   dh, dhCopy    - optional: when all of dh, *dh and dhCopy are non-null the
                   clear-text header is copied to dhCopy and *dh is pointed at
                   the copy, so the caller still has a readable header

   Returns false if encryption failed and the response must be dropped. */
static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    return true;
  }

  /* save the original header before encrypting it in place */
  const bool headerCopyRequested = dh != nullptr && *dh != nullptr && dhCopy != nullptr;
  if (headerCopyRequested) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedLen = 0;
  const int status = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedLen);
  if (status != 0) {
    /* dropping response */
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedLen;
  return true;
}
#endif /* HAVE_DNSCRYPT */
393
394 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
395 {
396 DNSResponseAction::Action action=DNSResponseAction::Action::None;
397 std::string ruleresult;
398 for(const auto& lr : *localRespRulactions) {
399 if(lr.d_rule->matches(&dr)) {
400 lr.d_rule->d_matches++;
401 action=(*lr.d_action)(&dr, &ruleresult);
402 switch(action) {
403 case DNSResponseAction::Action::Allow:
404 return true;
405 break;
406 case DNSResponseAction::Action::Drop:
407 return false;
408 break;
409 case DNSResponseAction::Action::HeaderModify:
410 return true;
411 break;
412 case DNSResponseAction::Action::ServFail:
413 dr.dh->rcode = RCode::ServFail;
414 return true;
415 break;
416 /* non-terminal actions follow */
417 case DNSResponseAction::Action::Delay:
418 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
419 break;
420 case DNSResponseAction::Action::None:
421 break;
422 }
423 }
424 }
425
426 return true;
427 }
428
429 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
430 {
431 if (!applyRulesToResponse(localRespRulactions, dr)) {
432 return false;
433 }
434
435 bool zeroScope = false;
436 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
437 return false;
438 }
439
440 if (dr.packetCache && !dr.skipCache) {
441 if (!dr.useZeroScope) {
442 /* if the query was not suitable for zero-scope, for
443 example because it had an existing ECS entry so the hash is
444 not really 'no ECS', so just insert it for the existing subnet
445 since:
446 - we don't have the correct hash for a non-ECS query
447 - inserting with hash computed before the ECS replacement but with
448 the subnet extracted _after_ the replacement would not work.
449 */
450 zeroScope = false;
451 }
452 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
453 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
454 }
455
456 #ifdef HAVE_DNSCRYPT
457 if (!muted) {
458 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
459 return false;
460 }
461 }
462 #endif /* HAVE_DNSCRYPT */
463
464 return true;
465 }
466
467 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
468 {
469 if(delayMsec && g_delay) {
470 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
471 g_delay->submit(dp, delayMsec);
472 }
473 else {
474 ssize_t res;
475 if(origDest.sin4.sin_family == 0) {
476 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
477 }
478 else {
479 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
480 }
481 if (res == -1) {
482 int err = errno;
483 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), stringerror(err));
484 }
485 }
486
487 return true;
488 }
489
490
491 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
492 {
493 return state->sockets[state->socketsOffset++ % state->sockets.size()];
494 }
495
496 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
497 {
498 ready.clear();
499
500 if (state->sockets.size() == 1) {
501 ready.push_back(state->sockets[0]);
502 return ;
503 }
504
505 {
506 std::lock_guard<std::mutex> lock(state->socketsLock);
507 state->mplexer->getAvailableFDs(ready, -1);
508 }
509 }
510
511 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
512 void responderThread(std::shared_ptr<DownstreamState> dss)
513 try {
514 setThreadName("dnsdist/respond");
515 auto localRespRulactions = g_resprulactions.getLocal();
516 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
517 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
518 /* when the answer is encrypted in place, we need to get a copy
519 of the original header before encryption to fill the ring buffer */
520 dnsheader cleartextDH;
521 vector<uint8_t> rewrittenResponse;
522
523 uint16_t queryId = 0;
524 std::vector<int> sockets;
525 sockets.reserve(dss->sockets.size());
526
527 for(;;) {
528 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
529 try {
530 pickBackendSocketsReadyForReceiving(dss, sockets);
531 for (const auto& fd : sockets) {
532 ssize_t got = recv(fd, packet, sizeof(packet), 0);
533 char * response = packet;
534 size_t responseSize = sizeof(packet);
535
536 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
537 continue;
538
539 uint16_t responseLen = static_cast<uint16_t>(got);
540 queryId = dh->id;
541
542 if(queryId >= dss->idStates.size()) {
543 continue;
544 }
545
546 IDState* ids = &dss->idStates[queryId];
547 int64_t usageIndicator = ids->usageIndicator;
548
549 if(!IDState::isInUse(usageIndicator)) {
550 /* the corresponding state is marked as not in use, meaning that:
551 - it was already cleaned up by another thread and the state is gone ;
552 - we already got a response for this query and this one is a duplicate.
553 Either way, we don't touch it.
554 */
555 continue;
556 }
557
558 /* read the potential DOHUnit state as soon as possible, but don't use it
559 until we have confirmed that we own this state by updating usageIndicator */
560 auto du = ids->du;
561 /* setting age to 0 to prevent the maintainer thread from
562 cleaning this IDS while we process the response.
563 */
564 ids->age = 0;
565 int origFD = ids->origFD;
566
567 unsigned int consumed = 0;
568 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
569 continue;
570 }
571
572 bool isDoH = du != nullptr;
573 /* atomically mark the state as available, but only if it has not been altered
574 in the meantime */
575 if (ids->tryMarkUnused(usageIndicator)) {
576 /* clear the potential DOHUnit asap, it's ours now
577 and since we just marked the state as unused,
578 someone could overwrite it. */
579 ids->du = nullptr;
580 /* we only decrement the outstanding counter if the value was not
581 altered in the meantime, which would mean that the state has been actively reused
582 and the other thread has not incremented the outstanding counter, so we don't
583 want it to be decremented twice. */
584 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
585 } else {
586 /* someone updated the state in the meantime, we can't touch the existing pointer */
587 du = nullptr;
588 /* since the state has been updated, we can't safely access it so let's just drop
589 this response */
590 continue;
591 }
592
593 if(dh->tc && g_truncateTC) {
594 truncateTC(response, &responseLen, responseSize, consumed);
595 }
596
597 dh->id = ids->origID;
598
599 uint16_t addRoom = 0;
600 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
601 if (dr.dnsCryptQuery) {
602 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
603 }
604
605 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
606 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
607 continue;
608 }
609
610 if (ids->cs && !ids->cs->muted) {
611 if (du) {
612 #ifdef HAVE_DNS_OVER_HTTPS
613 // DoH query
614 du->response = std::string(response, responseLen);
615 if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
616 /* at this point we have the only remaining pointer on this
617 DOHUnit object since we did set ids->du to nullptr earlier */
618 delete du;
619 }
620 #endif /* HAVE_DNS_OVER_HTTPS */
621 du = nullptr;
622 }
623 else {
624 ComboAddress empty;
625 empty.sin4.sin_family = 0;
626 /* if ids->destHarvested is false, origDest holds the listening address.
627 We don't want to use that as a source since it could be 0.0.0.0 for example. */
628 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
629 }
630 }
631
632 ++g_stats.responses;
633 ++ids->cs->responses;
634 ++dss->responses;
635
636 double udiff = ids->sentTime.udiff();
637 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
638 isDoH ? " (https)": "", udiff);
639
640 struct timespec ts;
641 gettime(&ts);
642 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
643
644 switch (cleartextDH.rcode) {
645 case RCode::NXDomain:
646 ++g_stats.frontendNXDomain;
647 break;
648 case RCode::ServFail:
649 ++g_stats.servfailResponses;
650 ++g_stats.frontendServFail;
651 break;
652 case RCode::NoError:
653 ++g_stats.frontendNoError;
654 break;
655 }
656 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
657
658 doLatencyStats(udiff);
659
660 rewrittenResponse.clear();
661 }
662 }
663 catch(const std::exception& e){
664 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
665 }
666 }
667 }
668 catch(const std::exception& e)
669 {
670 errlog("UDP responder thread died because of exception: %s", e.what());
671 }
672 catch(const PDNSException& e)
673 {
674 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
675 }
676 catch(...)
677 {
678 errlog("UDP responder thread died because of an exception: %s", "unknown");
679 }
680
681 bool DownstreamState::reconnect()
682 {
683 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
684 if (!tl.owns_lock()) {
685 /* we are already reconnecting */
686 return false;
687 }
688
689 connected = false;
690 for (auto& fd : sockets) {
691 if (fd != -1) {
692 if (sockets.size() > 1) {
693 std::lock_guard<std::mutex> lock(socketsLock);
694 mplexer->removeReadFD(fd);
695 }
696 /* shutdown() is needed to wake up recv() in the responderThread */
697 shutdown(fd, SHUT_RDWR);
698 close(fd);
699 fd = -1;
700 }
701 if (!IsAnyAddress(remote)) {
702 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
703 if (!IsAnyAddress(sourceAddr)) {
704 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
705 SBind(fd, sourceAddr);
706 }
707 try {
708 SConnect(fd, remote);
709 if (sockets.size() > 1) {
710 std::lock_guard<std::mutex> lock(socketsLock);
711 mplexer->addReadFD(fd, [](int, boost::any) {});
712 }
713 connected = true;
714 }
715 catch(const std::runtime_error& error) {
716 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
717 connected = false;
718 break;
719 }
720 }
721 }
722
723 /* if at least one (re-)connection failed, close all sockets */
724 if (!connected) {
725 for (auto& fd : sockets) {
726 if (fd != -1) {
727 if (sockets.size() > 1) {
728 std::lock_guard<std::mutex> lock(socketsLock);
729 mplexer->removeReadFD(fd);
730 }
731 /* shutdown() is needed to wake up recv() in the responderThread */
732 shutdown(fd, SHUT_RDWR);
733 close(fd);
734 fd = -1;
735 }
736 }
737 }
738
739 return connected;
740 }
741 void DownstreamState::hash()
742 {
743 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
744 auto w = weight;
745 WriteLock wl(&d_lock);
746 hashes.clear();
747 while (w > 0) {
748 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
749 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
750 hashes.insert(wshash);
751 --w;
752 }
753 }
754
755 void DownstreamState::setId(const boost::uuids::uuid& newId)
756 {
757 id = newId;
758 // compute hashes only if already done
759 if (!hashes.empty()) {
760 hash();
761 }
762 }
763
764 void DownstreamState::setWeight(int newWeight)
765 {
766 if (newWeight < 1) {
767 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
768 return ;
769 }
770 weight = newWeight;
771 if (!hashes.empty()) {
772 hash();
773 }
774 }
775
776 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
777 {
778 pthread_rwlock_init(&d_lock, nullptr);
779 id = getUniqueID();
780 threadStarted.clear();
781
782 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
783
784 sockets.resize(numberOfSockets);
785 for (auto& fd : sockets) {
786 fd = -1;
787 }
788
789 if (!IsAnyAddress(remote)) {
790 reconnect();
791 idStates.resize(g_maxOutstanding);
792 sw.start();
793 infolog("Added downstream server %s", remote.toStringWithPort());
794 }
795
796 }
797
// Global Lua context and the mutex declared alongside it.
// NOTE(review): the mutex presumably serialises access to g_lua — confirm against the callers.
798 std::mutex g_luamutex;
799 LuaContext g_lua;
800
// NOTE(review): presumably the server selection policy used when a pool has none of its own — confirm.
801 GlobalStateHolder<ServerPolicy> g_policy;
802
803 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
804 {
805 for(auto& d : servers) {
806 if(d.second->isUp() && d.second->qps.check())
807 return d.second;
808 }
809 return leastOutstanding(servers, dq);
810 }
811
812 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
813 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
814 {
815 if (servers.size() == 1 && servers[0].second->isUp()) {
816 return servers[0].second;
817 }
818
819 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
820 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
821 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
822 poss.reserve(servers.size());
823 for(auto& d : servers) {
824 if(d.second->isUp()) {
825 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
826 }
827 }
828 if(poss.empty())
829 return shared_ptr<DownstreamState>();
830 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
831 return poss.begin()->second;
832 }
833
834 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
835 {
836 vector<pair<int, shared_ptr<DownstreamState>>> poss;
837 int sum = 0;
838 int max = std::numeric_limits<int>::max();
839
840 for(auto& d : servers) { // w=1, w=10 -> 1, 11
841 if(d.second->isUp()) {
842 // Don't overflow sum when adding high weights
843 if(d.second->weight > max - sum) {
844 sum = max;
845 } else {
846 sum += d.second->weight;
847 }
848
849 poss.push_back({sum, d.second});
850 }
851 }
852
853 // Catch poss & sum are empty to avoid SIGFPE
854 if(poss.empty())
855 return shared_ptr<DownstreamState>();
856
857 int r = val % sum;
858 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
859 if(p==poss.end())
860 return shared_ptr<DownstreamState>();
861 return p->second;
862 }
863
864 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
865 {
866 return valrandom(random(), servers, dq);
867 }
868
869 uint32_t g_hashperturb;
870 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
871 {
872 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
873 }
874
875 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
876 {
877 unsigned int qhash = dq->qname->hash(g_hashperturb);
878 unsigned int sel = std::numeric_limits<unsigned int>::max();
879 unsigned int min = std::numeric_limits<unsigned int>::max();
880 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
881
882 for (const auto& d: servers) {
883 if (d.second->isUp()) {
884 // make sure hashes have been computed
885 if (d.second->hashes.empty()) {
886 d.second->hash();
887 }
888 {
889 ReadLock rl(&(d.second->d_lock));
890 const auto& server = d.second;
891 // we want to keep track of the last hash
892 if (min > *(server->hashes.begin())) {
893 min = *(server->hashes.begin());
894 first = server;
895 }
896
897 auto hash_it = server->hashes.lower_bound(qhash);
898 if (hash_it != server->hashes.end()) {
899 if (*hash_it < sel) {
900 sel = *hash_it;
901 ret = server;
902 }
903 }
904 }
905 }
906 }
907 if (ret != nullptr) {
908 return ret;
909 }
910 if (first != nullptr) {
911 return first;
912 }
913 return shared_ptr<DownstreamState>();
914 }
915
916 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
917 {
918 NumberedServerVector poss;
919
920 for(auto& d : servers) {
921 if(d.second->isUp()) {
922 poss.push_back(d);
923 }
924 }
925
926 const auto *res=&poss;
927 if(poss.empty() && !g_roundrobinFailOnNoServer)
928 res = &servers;
929
930 if(res->empty())
931 return shared_ptr<DownstreamState>();
932
933 static unsigned int counter;
934
935 return (*res)[(counter++) % res->size()].second;
936 }
937
// Defaults to the loopback address, port 5199.
// NOTE(review): presumably the address the control/console socket listens on — confirm.
938 ComboAddress g_serverControl{"127.0.0.1:5199"};
939
940 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
941 {
942 std::shared_ptr<ServerPool> pool;
943 pools_t::iterator it = pools.find(poolName);
944 if (it != pools.end()) {
945 pool = it->second;
946 }
947 else {
948 if (!poolName.empty())
949 vinfolog("Creating pool %s", poolName);
950 pool = std::make_shared<ServerPool>();
951 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
952 }
953 return pool;
954 }
955
956 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
957 {
958 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
959 if (!poolName.empty()) {
960 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
961 } else {
962 vinfolog("Setting default pool server selection policy to %s", policy->name);
963 }
964 pool->policy = policy;
965 }
966
967 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
968 {
969 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
970 if (!poolName.empty()) {
971 vinfolog("Adding server to pool %s", poolName);
972 } else {
973 vinfolog("Adding server to default pool");
974 }
975 pool->addServer(server);
976 }
977
978 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
979 {
980 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
981
982 if (!poolName.empty()) {
983 vinfolog("Removing server from pool %s", poolName);
984 }
985 else {
986 vinfolog("Removing server from default pool");
987 }
988
989 pool->removeServer(server);
990 }
991
992 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
993 {
994 pools_t::const_iterator it = pools.find(poolName);
995
996 if (it == pools.end()) {
997 throw std::out_of_range("No pool named " + poolName);
998 }
999
1000 return it->second;
1001 }
1002
1003 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1004 {
1005 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1006 return pool->getServers();
1007 }
1008
1009 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
1010 {
1011 string result;
1012
1013 std::vector<std::string> addrs;
1014 stringtok(addrs, spoofContent, " ,");
1015
1016 if (addrs.size() == 1) {
1017 try {
1018 ComboAddress spoofAddr(spoofContent);
1019 SpoofAction sa({spoofAddr});
1020 sa(&dq, &result);
1021 }
1022 catch(const PDNSException &e) {
1023 SpoofAction sa(spoofContent); // CNAME then
1024 sa(&dq, &result);
1025 }
1026 } else {
1027 std::vector<ComboAddress> cas;
1028 for (const auto& addr : addrs) {
1029 try {
1030 cas.push_back(ComboAddress(addr));
1031 }
1032 catch (...) {
1033 }
1034 }
1035 SpoofAction sa(cas);
1036 sa(&dq, &result);
1037 }
1038 }
1039
1040 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
1041 {
1042 switch(action) {
1043 case DNSAction::Action::Allow:
1044 return true;
1045 break;
1046 case DNSAction::Action::Drop:
1047 ++g_stats.ruleDrop;
1048 drop = true;
1049 return true;
1050 break;
1051 case DNSAction::Action::Nxdomain:
1052 dq.dh->rcode = RCode::NXDomain;
1053 dq.dh->qr=true;
1054 ++g_stats.ruleNXDomain;
1055 return true;
1056 break;
1057 case DNSAction::Action::Refused:
1058 dq.dh->rcode = RCode::Refused;
1059 dq.dh->qr=true;
1060 ++g_stats.ruleRefused;
1061 return true;
1062 break;
1063 case DNSAction::Action::ServFail:
1064 dq.dh->rcode = RCode::ServFail;
1065 dq.dh->qr=true;
1066 ++g_stats.ruleServFail;
1067 return true;
1068 break;
1069 case DNSAction::Action::Spoof:
1070 spoofResponseFromString(dq, ruleresult);
1071 return true;
1072 break;
1073 case DNSAction::Action::Truncate:
1074 dq.dh->tc = true;
1075 dq.dh->qr = true;
1076 return true;
1077 break;
1078 case DNSAction::Action::HeaderModify:
1079 return true;
1080 break;
1081 case DNSAction::Action::Pool:
1082 dq.poolname=ruleresult;
1083 return true;
1084 break;
1085 case DNSAction::Action::NoRecurse:
1086 dq.dh->rd = false;
1087 return true;
1088 break;
1089 /* non-terminal actions follow */
1090 case DNSAction::Action::Delay:
1091 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1092 break;
1093 case DNSAction::Action::None:
1094 /* fall-through */
1095 case DNSAction::Action::NoOp:
1096 break;
1097 }
1098
1099 /* false means that we don't stop the processing */
1100 return false;
1101 }
1102
1103
1104 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
1105 {
1106 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1107
1108 if(g_qcount.enabled) {
1109 string qname = (*dq.qname).toLogString();
1110 bool countQuery{true};
1111 if(g_qcount.filter) {
1112 std::lock_guard<std::mutex> lock(g_luamutex);
1113 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1114 }
1115
1116 if(countQuery) {
1117 WriteLock wl(&g_qcount.queryLock);
1118 if(!g_qcount.records.count(qname)) {
1119 g_qcount.records[qname] = 0;
1120 }
1121 g_qcount.records[qname]++;
1122 }
1123 }
1124
1125 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1126 auto updateBlockStats = [&got]() {
1127 ++g_stats.dynBlocked;
1128 got->second.blocks++;
1129 };
1130
1131 if(now < got->second.until) {
1132 DNSAction::Action action = got->second.action;
1133 if (action == DNSAction::Action::None) {
1134 action = g_dynBlockAction;
1135 }
1136 switch (action) {
1137 case DNSAction::Action::NoOp:
1138 /* do nothing */
1139 break;
1140
1141 case DNSAction::Action::Nxdomain:
1142 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1143 updateBlockStats();
1144
1145 dq.dh->rcode = RCode::NXDomain;
1146 dq.dh->qr=true;
1147 return true;
1148
1149 case DNSAction::Action::Refused:
1150 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1151 updateBlockStats();
1152
1153 dq.dh->rcode = RCode::Refused;
1154 dq.dh->qr = true;
1155 return true;
1156
1157 case DNSAction::Action::Truncate:
1158 if(!dq.tcp) {
1159 updateBlockStats();
1160 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1161 dq.dh->tc = true;
1162 dq.dh->qr = true;
1163 return true;
1164 }
1165 else {
1166 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1167 }
1168 break;
1169 case DNSAction::Action::NoRecurse:
1170 updateBlockStats();
1171 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1172 dq.dh->rd = false;
1173 return true;
1174 default:
1175 updateBlockStats();
1176 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1177 return false;
1178 }
1179 }
1180 }
1181
1182 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1183 auto updateBlockStats = [&got]() {
1184 ++g_stats.dynBlocked;
1185 got->blocks++;
1186 };
1187
1188 if(now < got->until) {
1189 DNSAction::Action action = got->action;
1190 if (action == DNSAction::Action::None) {
1191 action = g_dynBlockAction;
1192 }
1193 switch (action) {
1194 case DNSAction::Action::NoOp:
1195 /* do nothing */
1196 break;
1197 case DNSAction::Action::Nxdomain:
1198 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1199 updateBlockStats();
1200
1201 dq.dh->rcode = RCode::NXDomain;
1202 dq.dh->qr=true;
1203 return true;
1204 case DNSAction::Action::Refused:
1205 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1206 updateBlockStats();
1207
1208 dq.dh->rcode = RCode::Refused;
1209 dq.dh->qr=true;
1210 return true;
1211 case DNSAction::Action::Truncate:
1212 if(!dq.tcp) {
1213 updateBlockStats();
1214
1215 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1216 dq.dh->tc = true;
1217 dq.dh->qr = true;
1218 return true;
1219 }
1220 else {
1221 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1222 }
1223 break;
1224 case DNSAction::Action::NoRecurse:
1225 updateBlockStats();
1226 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1227 dq.dh->rd = false;
1228 return true;
1229 default:
1230 updateBlockStats();
1231 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1232 return false;
1233 }
1234 }
1235 }
1236
1237 DNSAction::Action action=DNSAction::Action::None;
1238 string ruleresult;
1239 bool drop = false;
1240 for(const auto& lr : *holders.rulactions) {
1241 if(lr.d_rule->matches(&dq)) {
1242 lr.d_rule->d_matches++;
1243 action=(*lr.d_action)(&dq, &ruleresult);
1244 if (processRulesResult(action, dq, ruleresult, drop)) {
1245 break;
1246 }
1247 }
1248 }
1249
1250 if (drop) {
1251 return false;
1252 }
1253
1254 return true;
1255 }
1256
1257 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1258 {
1259 ssize_t result;
1260
1261 if (ss->sourceItf == 0) {
1262 result = send(sd, request, requestLen, 0);
1263 }
1264 else {
1265 struct msghdr msgh;
1266 struct iovec iov;
1267 cmsgbuf_aligned cbuf;
1268 ComboAddress remote(ss->remote);
1269 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1270 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1271 result = sendmsg(sd, &msgh, 0);
1272 }
1273
1274 if (result == -1) {
1275 int savederrno = errno;
1276 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1277
1278 /* This might sound silly, but on Linux send() might fail with EINVAL
1279 if the interface the socket was bound to doesn't exist anymore.
1280 We don't want to reconnect the real socket if the healthcheck failed,
1281 because it's not using the same socket.
1282 */
1283 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1284 ss->reconnect();
1285 }
1286 }
1287
1288 return result;
1289 }
1290
1291 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1292 {
1293 if (msgh->msg_flags & MSG_TRUNC) {
1294 /* message was too large for our buffer */
1295 vinfolog("Dropping message too large for our buffer");
1296 ++g_stats.nonCompliantQueries;
1297 return false;
1298 }
1299
1300 if(!holders.acl->match(remote)) {
1301 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1302 ++g_stats.aclDrops;
1303 return false;
1304 }
1305
1306 cs.queries++;
1307 ++g_stats.queries;
1308
1309 if (HarvestDestinationAddress(msgh, &dest)) {
1310 /* we don't get the port, only the address */
1311 dest.sin4.sin_port = cs.local.sin4.sin_port;
1312 }
1313 else {
1314 dest.sin4.sin_family = 0;
1315 }
1316
1317 return true;
1318 }
1319
1320 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1321 {
1322 if (cs.dnscryptCtx) {
1323 #ifdef HAVE_DNSCRYPT
1324 vector<uint8_t> response;
1325 uint16_t decryptedQueryLen = 0;
1326
1327 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1328
1329 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1330
1331 if (!decrypted) {
1332 if (response.size() > 0) {
1333 return response;
1334 }
1335 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1336 }
1337
1338 len = decryptedQueryLen;
1339 #endif /* HAVE_DNSCRYPT */
1340 }
1341 return boost::none;
1342 }
1343
1344 bool checkQueryHeaders(const struct dnsheader* dh)
1345 {
1346 if (dh->qr) { // don't respond to responses
1347 ++g_stats.nonCompliantQueries;
1348 return false;
1349 }
1350
1351 if (dh->qdcount == 0) {
1352 ++g_stats.emptyQueries;
1353 return false;
1354 }
1355
1356 if (dh->rd) {
1357 ++g_stats.rdQueries;
1358 }
1359
1360 return true;
1361 }
1362
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Prepares 'outMsg' so that 'response' can be sent to 'remote' as part of
   a batch. 'iov' and 'cbuf' provide the storage referenced by the message
   header. When 'dest' is known (family != 0) it is attached as the source
   address of the response, otherwise no control message is set. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  outMsg.msg_len = 0;
  fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destKnown = (dest.sin4.sin_family != 0);
  if (destKnown) {
    addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
  }
  else {
    outMsg.msg_hdr.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1377
1378 /* self-generated responses or cache hits */
1379 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1380 {
1381 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1382
1383 #ifdef HAVE_PROTOBUF
1384 dr.uniqueId = dq.uniqueId;
1385 #endif
1386 dr.qTag = dq.qTag;
1387 dr.delayMsec = dq.delayMsec;
1388
1389 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1390 return false;
1391 }
1392
1393 /* in case a rule changed it */
1394 dq.delayMsec = dr.delayMsec;
1395
1396 #ifdef HAVE_DNSCRYPT
1397 if (!cs.muted) {
1398 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1399 return false;
1400 }
1401 }
1402 #endif /* HAVE_DNSCRYPT */
1403
1404 if (cacheHit) {
1405 ++g_stats.cacheHits;
1406 }
1407
1408 switch (dr.dh->rcode) {
1409 case RCode::NXDomain:
1410 ++g_stats.frontendNXDomain;
1411 break;
1412 case RCode::ServFail:
1413 ++g_stats.frontendServFail;
1414 break;
1415 case RCode::NoError:
1416 ++g_stats.frontendNoError;
1417 break;
1418 }
1419
1420 doLatencyStats(0); // we're not going to measure this
1421 return true;
1422 }
1423
1424 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1425 {
1426 const uint16_t queryId = ntohs(dq.dh->id);
1427
1428 try {
1429 /* we need an accurate ("real") value for the response and
1430 to store into the IDS, but not for insertion into the
1431 rings for example */
1432 struct timespec now;
1433 gettime(&now);
1434
1435 if (!applyRulesToQuery(holders, dq, now)) {
1436 return ProcessQueryResult::Drop;
1437 }
1438
1439 if(dq.dh->qr) { // something turned it into a response
1440 fixUpQueryTurnedResponse(dq, dq.origFlags);
1441
1442 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1443 return ProcessQueryResult::Drop;
1444 }
1445
1446 ++g_stats.selfAnswered;
1447 ++cs.responses;
1448 return ProcessQueryResult::SendAnswer;
1449 }
1450
1451 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
1452 dq.packetCache = serverPool->packetCache;
1453 auto policy = *(holders.policy);
1454 if (serverPool->policy != nullptr) {
1455 policy = *(serverPool->policy);
1456 }
1457 auto servers = serverPool->getServers();
1458 if (policy.isLua) {
1459 std::lock_guard<std::mutex> lock(g_luamutex);
1460 selectedBackend = policy.policy(servers, &dq);
1461 }
1462 else {
1463 selectedBackend = policy.policy(servers, &dq);
1464 }
1465
1466 uint16_t cachedResponseSize = dq.size;
1467 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1468
1469 if (dq.packetCache && !dq.skipCache) {
1470 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1471 }
1472
1473 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1474 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1475 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1476 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1477 dq.len = cachedResponseSize;
1478
1479 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1480 return ProcessQueryResult::Drop;
1481 }
1482
1483 return ProcessQueryResult::SendAnswer;
1484 }
1485
1486 if (!dq.subnet) {
1487 /* there was no existing ECS on the query, enable the zero-scope feature */
1488 dq.useZeroScope = true;
1489 }
1490 }
1491
1492 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1493 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1494 return ProcessQueryResult::Drop;
1495 }
1496 }
1497
1498 if (dq.packetCache && !dq.skipCache) {
1499 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1500 dq.len = cachedResponseSize;
1501
1502 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1503 return ProcessQueryResult::Drop;
1504 }
1505
1506 return ProcessQueryResult::SendAnswer;
1507 }
1508 ++g_stats.cacheMisses;
1509 }
1510
1511 if(!selectedBackend) {
1512 ++g_stats.noPolicy;
1513
1514 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1515 if (g_servFailOnNoPolicy) {
1516 restoreFlags(dq.dh, dq.origFlags);
1517
1518 dq.dh->rcode = RCode::ServFail;
1519 dq.dh->qr = true;
1520
1521 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1522 return ProcessQueryResult::Drop;
1523 }
1524 // no response-only statistics counter to update.
1525 return ProcessQueryResult::SendAnswer;
1526 }
1527
1528 return ProcessQueryResult::Drop;
1529 }
1530
1531 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1532 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1533 }
1534
1535 selectedBackend->queries++;
1536 return ProcessQueryResult::PassToBackend;
1537 }
1538 catch(const std::exception& e){
1539 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1540 }
1541 return ProcessQueryResult::Drop;
1542 }
1543
1544 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
1545 {
1546 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1547 uint16_t queryId = 0;
1548
1549 try {
1550 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1551 return;
1552 }
1553
1554 /* we need an accurate ("real") value for the response and
1555 to store into the IDS, but not for insertion into the
1556 rings for example */
1557 struct timespec queryRealTime;
1558 gettime(&queryRealTime, true);
1559
1560 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1561 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1562 if (dnsCryptResponse) {
1563 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1564 return;
1565 }
1566
1567 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1568 queryId = ntohs(dh->id);
1569
1570 if (!checkQueryHeaders(dh)) {
1571 return;
1572 }
1573
1574 uint16_t qtype, qclass;
1575 unsigned int consumed = 0;
1576 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1577 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1578 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1579 std::shared_ptr<DownstreamState> ss{nullptr};
1580 auto result = processQuery(dq, cs, holders, ss);
1581
1582 if (result == ProcessQueryResult::Drop) {
1583 return;
1584 }
1585
1586 if (result == ProcessQueryResult::SendAnswer) {
1587 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1588 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1589 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1590 (*queuedResponses)++;
1591 return;
1592 }
1593 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1594 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1595 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1596 return;
1597 }
1598
1599 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1600 return;
1601 }
1602
1603 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1604 IDState* ids = &ss->idStates[idOffset];
1605 ids->age = 0;
1606 DOHUnit* du = nullptr;
1607
1608 /* that means that the state was in use, possibly with an allocated
1609 DOHUnit that we will need to handle, but we can't touch it before
1610 confirming that we now own this state */
1611 if (ids->isInUse()) {
1612 du = ids->du;
1613 }
1614
1615 /* we atomically replace the value, we now own this state */
1616 if (!ids->markAsUsed()) {
1617 /* the state was not in use.
1618 we reset 'du' because it might have still been in use when we read it. */
1619 du = nullptr;
1620 ++ss->outstanding;
1621 }
1622 else {
1623 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1624 to handle it because it's about to be overwritten. */
1625 ids->du = nullptr;
1626 ++ss->reuseds;
1627 ++g_stats.downstreamTimeouts;
1628 handleDOHTimeout(du);
1629 }
1630
1631 ids->cs = &cs;
1632 ids->origFD = cs.udpFD;
1633 ids->origID = dh->id;
1634 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1635
1636 /* If we couldn't harvest the real dest addr, still
1637 write down the listening addr since it will be useful
1638 (especially if it's not an 'any' one).
1639 We need to keep track of which one it is since we may
1640 want to use the real but not the listening addr to reply.
1641 */
1642 if (dest.sin4.sin_family != 0) {
1643 ids->origDest = dest;
1644 ids->destHarvested = true;
1645 }
1646 else {
1647 ids->origDest = cs.local;
1648 ids->destHarvested = false;
1649 }
1650
1651 dh->id = idOffset;
1652
1653 int fd = pickBackendSocketForSending(ss);
1654 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1655
1656 if(ret < 0) {
1657 ++ss->sendErrors;
1658 ++g_stats.downstreamSendErrors;
1659 }
1660
1661 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1662 }
1663 catch(const std::exception& e){
1664 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1665 }
1666 }
1667
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Receiving loop of a UDP frontend using recvmmsg()/sendmmsg() to handle up
   to g_udpVectorSize messages per system call. Every received message is
   passed to processUDPQuery(); the immediate responses it queues are then
   sent in one batch. This function never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[4096];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
  }

  /* go now */
  for (;;) {

    /* reset the IO vector, since it's also used to send the vector of responses
       to avoid having to copy the data around.
       The length is reset to the incoming limit, not to the size of the
       whole buffer, so that the limit set up above holds for every batch. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      recvData[idx].iov.iov_base = recvData[idx].packet;
      recvData[idx].iov.iov_len = s_udpIncomingBufferSize;
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      /* too short to even hold a DNS header */
      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);
    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1748
1749 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1750 static void udpClientThread(ClientState* cs)
1751 try
1752 {
1753 setThreadName("dnsdist/udpClie");
1754 LocalHolders holders;
1755
1756 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1757 if (g_udpVectorSize > 1) {
1758 MultipleMessagesUDPClientThread(cs, holders);
1759
1760 }
1761 else
1762 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1763 {
1764 char packet[4096];
1765 /* the actual buffer is larger because:
1766 - we may have to add EDNS and/or ECS
1767 - we use it for self-generated responses (from rule or cache)
1768 but we only accept incoming payloads up to that size
1769 */
1770 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1771 struct msghdr msgh;
1772 struct iovec iov;
1773 /* used by HarvestDestinationAddress */
1774 cmsgbuf_aligned cbuf;
1775
1776 ComboAddress remote;
1777 ComboAddress dest;
1778 remote.sin4.sin_family = cs->local.sin4.sin_family;
1779 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1780
1781 for(;;) {
1782 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1783
1784 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1785 ++g_stats.nonCompliantQueries;
1786 continue;
1787 }
1788
1789 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1790 }
1791 }
1792 }
1793 catch(const std::exception &e)
1794 {
1795 errlog("UDP client thread died because of exception: %s", e.what());
1796 }
1797 catch(const PDNSException &e)
1798 {
1799 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1800 }
1801 catch(...)
1802 {
1803 errlog("UDP client thread died because of an exception: %s", "unknown");
1804 }
1805
/* Returns a random 16-bit value usable as a DNS message ID, drawn from
   randombytes_random() when libsodium is available and from random()
   otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const auto raw = randombytes_random();
#else
  const auto raw = random();
#endif
  return (raw % 65536);
}
1814
1815 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1816 try
1817 {
1818 DNSName checkName = ds->checkName;
1819 uint16_t checkType = ds->checkType.getCode();
1820 uint16_t checkClass = ds->checkClass;
1821 dnsheader checkHeader;
1822 memset(&checkHeader, 0, sizeof(checkHeader));
1823
1824 checkHeader.qdcount = htons(1);
1825 checkHeader.id = getRandomDNSID();
1826
1827 checkHeader.rd = true;
1828 if (ds->setCD) {
1829 checkHeader.cd = true;
1830 }
1831
1832 if (ds->checkFunction) {
1833 std::lock_guard<std::mutex> lock(g_luamutex);
1834 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1835 checkName = std::get<0>(ret);
1836 checkType = std::get<1>(ret);
1837 checkClass = std::get<2>(ret);
1838 }
1839
1840 vector<uint8_t> packet;
1841 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1842 dnsheader * requestHeader = dpw.getHeader();
1843 *requestHeader = checkHeader;
1844
1845 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1846 sock.setNonBlocking();
1847 if (!IsAnyAddress(ds->sourceAddr)) {
1848 sock.setReuseAddr();
1849 sock.bind(ds->sourceAddr);
1850 }
1851 sock.connect(ds->remote);
1852 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1853 if (sent < 0) {
1854 int ret = errno;
1855 if (g_verboseHealthChecks)
1856 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1857 return false;
1858 }
1859
1860 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1861 if(ret < 0 || !ret) { // error, timeout, both are down!
1862 if (ret < 0) {
1863 ret = errno;
1864 if (g_verboseHealthChecks)
1865 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1866 }
1867 else {
1868 if (g_verboseHealthChecks)
1869 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1870 }
1871 return false;
1872 }
1873
1874 string reply;
1875 ComboAddress from;
1876 sock.recvFrom(reply, from);
1877
1878 /* we are using a connected socket but hey.. */
1879 if (from != ds->remote) {
1880 if (g_verboseHealthChecks)
1881 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1882 return false;
1883 }
1884
1885 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1886
1887 if (reply.size() < sizeof(*responseHeader)) {
1888 if (g_verboseHealthChecks)
1889 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1890 return false;
1891 }
1892
1893 if (responseHeader->id != requestHeader->id) {
1894 if (g_verboseHealthChecks)
1895 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1896 return false;
1897 }
1898
1899 if (!responseHeader->qr) {
1900 if (g_verboseHealthChecks)
1901 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1902 return false;
1903 }
1904
1905 if (responseHeader->rcode == RCode::ServFail) {
1906 if (g_verboseHealthChecks)
1907 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1908 return false;
1909 }
1910
1911 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1912 if (g_verboseHealthChecks)
1913 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1914 return false;
1915 }
1916
1917 uint16_t receivedType;
1918 uint16_t receivedClass;
1919 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1920
1921 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1922 if (g_verboseHealthChecks)
1923 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1924 return false;
1925 }
1926
1927 return true;
1928 }
1929 catch(const std::exception& e)
1930 {
1931 if (g_verboseHealthChecks)
1932 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1933 return false;
1934 }
1935 catch(...)
1936 {
1937 if (g_verboseHealthChecks)
1938 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1939 return false;
1940 }
1941
/* Upper bound on the number of TCP client threads: it is passed to the
   TCPClientCollection created in main() and used by
   checkFileDescriptorsLimits() to estimate the file descriptors needed. */
uint64_t g_maxTCPClientThreads = 10;
/* Number of iterations of the maintenance loop (which sleeps one second per
   iteration) between two cleanups of the packet caches. */
std::atomic<uint16_t> g_cacheCleaningDelay(60);
/* Percentage from which maintThread() derives the 'upTo' value handed to
   purgeExpired(): upTo = maxEntries * (100 - percentage) / 100. */
std::atomic<uint16_t> g_cacheCleaningPercentage(100);
1945
1946 void maintThread()
1947 {
1948 setThreadName("dnsdist/main");
1949 int interval = 1;
1950 size_t counter = 0;
1951 int32_t secondsToWaitLog = 0;
1952
1953 for(;;) {
1954 sleep(interval);
1955
1956 {
1957 std::lock_guard<std::mutex> lock(g_luamutex);
1958 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1959 if(f) {
1960 try {
1961 (*f)();
1962 secondsToWaitLog = 0;
1963 }
1964 catch(std::exception &e) {
1965 if (secondsToWaitLog <= 0) {
1966 infolog("Error during execution of maintenance function: %s", e.what());
1967 secondsToWaitLog = 61;
1968 }
1969 secondsToWaitLog -= interval;
1970 }
1971 }
1972 }
1973
1974 counter++;
1975 if (counter >= g_cacheCleaningDelay) {
1976 /* keep track, for each cache, of whether we should keep
1977 expired entries */
1978 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1979
1980 /* gather all caches actually used by at least one pool, and see
1981 if something prevents us from cleaning the expired entries */
1982 auto localPools = g_pools.getLocal();
1983 for (const auto& entry : *localPools) {
1984 auto& pool = entry.second;
1985
1986 auto packetCache = pool->packetCache;
1987 if (!packetCache) {
1988 continue;
1989 }
1990
1991 auto pair = caches.insert({packetCache, false});
1992 auto& iter = pair.first;
1993 /* if we need to keep stale data for this cache (ie, not clear
1994 expired entries when at least one pool using this cache
1995 has all its backends down) */
1996 if (packetCache->keepStaleData() && iter->second == false) {
1997 /* so far all pools had at least one backend up */
1998 if (pool->countServers(true) == 0) {
1999 iter->second = true;
2000 }
2001 }
2002 }
2003
2004 for (auto pair : caches) {
2005 /* shall we keep expired entries ? */
2006 if (pair.second == true) {
2007 continue;
2008 }
2009 auto& packetCache = pair.first;
2010 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
2011 packetCache->purgeExpired(upTo);
2012 }
2013 counter = 0;
2014 }
2015
2016 // ponder pruning g_dynblocks of expired entries here
2017 }
2018 }
2019
2020 static void secPollThread()
2021 {
2022 setThreadName("dnsdist/secpoll");
2023
2024 for (;;) {
2025 try {
2026 doSecPoll(g_secPollSuffix);
2027 }
2028 catch(...) {
2029 }
2030 sleep(g_secPollInterval);
2031 }
2032 }
2033
2034 static void healthChecksThread()
2035 {
2036 setThreadName("dnsdist/healthC");
2037
2038 int interval = 1;
2039
2040 for(;;) {
2041 sleep(interval);
2042
2043 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
2044 g_tcpclientthreads->addTCPClientThread();
2045
2046 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
2047 for(auto& dss : *states) {
2048 if(++dss->lastCheck < dss->checkInterval)
2049 continue;
2050 dss->lastCheck = 0;
2051 if(dss->availability==DownstreamState::Availability::Auto) {
2052 bool newState=upCheck(dss);
2053 if (newState) {
2054 /* check succeeded */
2055 dss->currentCheckFailures = 0;
2056
2057 if (!dss->upStatus) {
2058 /* we were marked as down */
2059 dss->consecutiveSuccessfulChecks++;
2060 if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
2061 /* if we need more than one successful check to rise
2062 and we didn't reach the threshold yet,
2063 let's stay down */
2064 newState = false;
2065 }
2066 }
2067 }
2068 else {
2069 /* check failed */
2070 dss->consecutiveSuccessfulChecks = 0;
2071
2072 if (dss->upStatus) {
2073 /* we are currently up */
2074 dss->currentCheckFailures++;
2075 if (dss->currentCheckFailures < dss->maxCheckFailures) {
2076 /* we need more than one failure to be marked as down,
2077 and we did not reach the threshold yet, let's stay down */
2078 newState = true;
2079 }
2080 }
2081 }
2082
2083 if(newState != dss->upStatus) {
2084 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2085
2086 if (newState && !dss->connected) {
2087 newState = dss->reconnect();
2088
2089 if (dss->connected && !dss->threadStarted.test_and_set()) {
2090 dss->tid = thread(responderThread, dss);
2091 }
2092 }
2093
2094 dss->upStatus = newState;
2095 dss->currentCheckFailures = 0;
2096 dss->consecutiveSuccessfulChecks = 0;
2097 if (g_snmpAgent && g_snmpTrapsEnabled) {
2098 g_snmpAgent->sendBackendStatusChangeTrap(dss);
2099 }
2100 }
2101 }
2102
2103 auto delta = dss->sw.udiffAndSet()/1000000.0;
2104 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2105 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2106 dss->prev.queries.store(dss->queries.load());
2107 dss->prev.reuseds.store(dss->reuseds.load());
2108
2109 for(IDState& ids : dss->idStates) { // timeouts
2110 int64_t usageIndicator = ids.usageIndicator;
2111 if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
2112 /* We mark the state as unused as soon as possible
2113 to limit the risk of racing with the
2114 responder thread.
2115 */
2116 auto oldDU = ids.du;
2117
2118 if (!ids.tryMarkUnused(usageIndicator)) {
2119 /* this state has been altered in the meantime,
2120 don't go anywhere near it */
2121 continue;
2122 }
2123 ids.du = nullptr;
2124 handleDOHTimeout(oldDU);
2125 ids.age = 0;
2126 dss->reuseds++;
2127 --dss->outstanding;
2128 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2129 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2130 dss->remote.toStringWithPort(), dss->name,
2131 ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2132
2133 struct timespec ts;
2134 gettime(&ts);
2135
2136 struct dnsheader fake;
2137 memset(&fake, 0, sizeof(fake));
2138 fake.id = ids.origID;
2139
2140 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2141 }
2142 }
2143 }
2144 }
2145 }
2146
2147 static void bindAny(int af, int sock)
2148 {
2149 __attribute__((unused)) int one = 1;
2150
2151 #ifdef IP_FREEBIND
2152 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2153 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2154 #endif
2155
2156 #ifdef IP_BINDANY
2157 if (af == AF_INET)
2158 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2159 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2160 #endif
2161 #ifdef IPV6_BINDANY
2162 if (af == AF_INET6)
2163 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2164 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2165 #endif
2166 #ifdef SO_BINDANY
2167 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2168 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2169 #endif
2170 }
2171
2172 static void dropGroupPrivs(gid_t gid)
2173 {
2174 if (gid) {
2175 if (setgid(gid) == 0) {
2176 if (setgroups(0, NULL) < 0) {
2177 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2178 }
2179 }
2180 else {
2181 warnlog("Warning: Unable to set group ID to %d: %s", gid, stringerror());
2182 }
2183 }
2184 }
2185
2186 static void dropUserPrivs(uid_t uid)
2187 {
2188 if(uid) {
2189 if(setuid(uid) < 0) {
2190 warnlog("Warning: Unable to set user ID to %d: %s", uid, stringerror());
2191 }
2192 }
2193 }
2194
2195 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2196 {
2197 /* stdin, stdout, stderr */
2198 size_t requiredFDsCount = 3;
2199 auto backends = g_dstates.getLocal();
2200 /* UDP sockets to backends */
2201 size_t backendUDPSocketsCount = 0;
2202 for (const auto& backend : *backends) {
2203 backendUDPSocketsCount += backend->sockets.size();
2204 }
2205 requiredFDsCount += backendUDPSocketsCount;
2206 /* TCP sockets to backends */
2207 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2208 /* listening sockets */
2209 requiredFDsCount += udpBindsCount;
2210 requiredFDsCount += tcpBindsCount;
2211 /* max TCP connections currently served */
2212 requiredFDsCount += g_maxTCPClientThreads;
2213 /* max pipes for communicating between TCP acceptors and client threads */
2214 requiredFDsCount += (g_maxTCPClientThreads * 2);
2215 /* max TCP queued connections */
2216 requiredFDsCount += g_maxTCPQueuedConnections;
2217 /* DelayPipe pipe */
2218 requiredFDsCount += 2;
2219 /* syslog socket */
2220 requiredFDsCount++;
2221 /* webserver main socket */
2222 requiredFDsCount++;
2223 /* console main socket */
2224 requiredFDsCount++;
2225 /* carbon export */
2226 requiredFDsCount++;
2227 /* history file */
2228 requiredFDsCount++;
2229 struct rlimit rl;
2230 getrlimit(RLIMIT_NOFILE, &rl);
2231 if (rl.rlim_cur <= requiredFDsCount) {
2232 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2233 #ifdef HAVE_SYSTEMD
2234 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2235 #else
2236 warnlog("You can increase this value by using ulimit.");
2237 #endif
2238 }
2239 }
2240
2241 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2242 {
2243 /* skip some warnings if there is an identical UDP context */
2244 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2245 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2246 (void) warn;
2247
2248 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2249
2250 if (cs->tcp) {
2251 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2252 #ifdef TCP_DEFER_ACCEPT
2253 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2254 #endif
2255 if (cs->fastOpenQueueSize > 0) {
2256 #ifdef TCP_FASTOPEN
2257 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2258 #else
2259 if (warn) {
2260 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2261 }
2262 #endif
2263 }
2264 }
2265
2266 if(cs->local.sin4.sin_family == AF_INET6) {
2267 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2268 }
2269
2270 bindAny(cs->local.sin4.sin_family, fd);
2271
2272 if(!cs->tcp && IsAnyAddress(cs->local)) {
2273 int one=1;
2274 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2275 #ifdef IPV6_RECVPKTINFO
2276 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2277 #endif
2278 }
2279
2280 if (cs->reuseport) {
2281 #ifdef SO_REUSEPORT
2282 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2283 #else
2284 if (warn) {
2285 /* no need to warn again if configured but support is not available, we already did for UDP */
2286 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2287 }
2288 #endif
2289 }
2290
2291 if (!cs->tcp) {
2292 if (cs->local.isIPv4()) {
2293 try {
2294 setSocketIgnorePMTU(cs->udpFD);
2295 }
2296 catch(const std::exception& e) {
2297 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2298 }
2299 }
2300 }
2301
2302 const std::string& itf = cs->interface;
2303 if (!itf.empty()) {
2304 #ifdef SO_BINDTODEVICE
2305 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2306 if (res != 0) {
2307 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), stringerror());
2308 }
2309 #else
2310 if (warn) {
2311 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2312 }
2313 #endif
2314 }
2315
2316 #ifdef HAVE_EBPF
2317 if (g_defaultBPFFilter) {
2318 cs->attachFilter(g_defaultBPFFilter);
2319 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2320 }
2321 #endif /* HAVE_EBPF */
2322
2323 if (cs->tlsFrontend != nullptr) {
2324 if (!cs->tlsFrontend->setupTLS()) {
2325 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2326 _exit(EXIT_FAILURE);
2327 }
2328 }
2329
2330 if (cs->dohFrontend != nullptr) {
2331 cs->dohFrontend->setup();
2332 }
2333
2334 SBind(fd, cs->local);
2335
2336 if (cs->tcp) {
2337 SListen(cs->tcpFD, SOMAXCONN);
2338 if (cs->tlsFrontend != nullptr) {
2339 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2340 }
2341 else if (cs->dohFrontend != nullptr) {
2342 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2343 }
2344 else if (cs->dnscryptCtx != nullptr) {
2345 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2346 }
2347 else {
2348 warnlog("Listening on %s", cs->local.toStringWithPort());
2349 }
2350 }
2351
2352 cs->ready = true;
2353 }
2354
/* Settings gathered from the command line by main() */
struct
{
  /* addresses given with -l/--local */
  std::vector<std::string> locals;
  /* positional arguments, when not operating as a client */
  std::vector<std::string> remotes;
  /* --check-config has been passed */
  bool checkConfig = false;
  /* -c/--client has been passed */
  bool beClient = false;
  /* --supervised has been passed */
  bool beSupervised = false;
  /* command given with -e/--execute */
  std::string command;
  /* configuration file, -C/--config */
  std::string config;
  /* value of -u/--uid */
  std::string uid;
  /* value of -g/--gid */
  std::string gid;
} g_cmdLine;

/* set to true by main() once the list of frontends is final, right before
   the listening sockets are set up */
std::atomic<bool> g_configurationDone(false);
2369
/* Prints the command-line help to the standard output. The -k/--setkey
   entry is only listed when built with libsodium support. */
static void usage()
{
  std::cout << std::endl;
  std::cout <<
    "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
    "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
    "[-v,--verbose] [--check-config] [--version]\n"
    "\n"
    "-a,--acl netmask Add this netmask to the ACL\n"
    "-C,--config file Load configuration from 'file'\n"
    "-c,--client Operate as a client, connect to dnsdist. This reads\n"
    " controlSocket from your configuration file, but also\n"
    " accepts an IP:PORT argument\n";
#ifdef HAVE_LIBSODIUM
  std::cout <<
    "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
    " is similar to setting setKey in the configuration file.\n"
    " NOTE: this will leak this key in your shell's history\n"
    " and in the systems running process list.\n";
#endif
  std::cout <<
    "--check-config Validate the configuration file and exit. The exit-code\n"
    " reflects the validation, 0 is OK, 1 means an error.\n"
    " Any errors are printed as well.\n"
    "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
    "-g,--gid gid Change the process group ID after binding sockets\n"
    "-h,--help Display this helpful message\n"
    "-l,--local address Listen on this local address\n"
    "--supervised Don't open a console, I'm supervised\n"
    " (use with e.g. systemd and daemontools)\n"
    "--disable-syslog Don't log to syslog, only to stdout\n"
    " (use with e.g. systemd)\n"
    "-u,--uid uid Change the process user ID after binding sockets\n"
    "-v,--verbose Enable verbose mode\n"
    "-V,--version Show dnsdist version information and exit\n";
}
2403
2404 int main(int argc, char** argv)
2405 try
2406 {
2407 size_t udpBindsCount = 0;
2408 size_t tcpBindsCount = 0;
2409 rl_attempted_completion_function = my_completion;
2410 rl_completion_append_character = 0;
2411
2412 signal(SIGPIPE, SIG_IGN);
2413 signal(SIGCHLD, SIG_IGN);
2414 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2415
2416 #ifdef HAVE_LIBSODIUM
2417 if (sodium_init() == -1) {
2418 cerr<<"Unable to initialize crypto library"<<endl;
2419 exit(EXIT_FAILURE);
2420 }
2421 g_hashperturb=randombytes_uniform(0xffffffff);
2422 srandom(randombytes_uniform(0xffffffff));
2423 #else
2424 {
2425 struct timeval tv;
2426 gettimeofday(&tv, 0);
2427 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2428 g_hashperturb=random();
2429 }
2430
2431 #endif
2432 ComboAddress clientAddress = ComboAddress();
2433 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2434 struct option longopts[]={
2435 {"acl", required_argument, 0, 'a'},
2436 {"check-config", no_argument, 0, 1},
2437 {"client", no_argument, 0, 'c'},
2438 {"config", required_argument, 0, 'C'},
2439 {"disable-syslog", no_argument, 0, 2},
2440 {"execute", required_argument, 0, 'e'},
2441 {"gid", required_argument, 0, 'g'},
2442 {"help", no_argument, 0, 'h'},
2443 {"local", required_argument, 0, 'l'},
2444 {"setkey", required_argument, 0, 'k'},
2445 {"supervised", no_argument, 0, 3},
2446 {"uid", required_argument, 0, 'u'},
2447 {"verbose", no_argument, 0, 'v'},
2448 {"version", no_argument, 0, 'V'},
2449 {0,0,0,0}
2450 };
2451 int longindex=0;
2452 string optstring;
2453 for(;;) {
2454 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2455 if(c==-1)
2456 break;
2457 switch(c) {
2458 case 1:
2459 g_cmdLine.checkConfig=true;
2460 break;
2461 case 2:
2462 g_syslog=false;
2463 break;
2464 case 3:
2465 g_cmdLine.beSupervised=true;
2466 break;
2467 case 'C':
2468 g_cmdLine.config=optarg;
2469 break;
2470 case 'c':
2471 g_cmdLine.beClient=true;
2472 break;
2473 case 'e':
2474 g_cmdLine.command=optarg;
2475 break;
2476 case 'g':
2477 g_cmdLine.gid=optarg;
2478 break;
2479 case 'h':
2480 cout<<"dnsdist "<<VERSION<<endl;
2481 usage();
2482 cout<<"\n";
2483 exit(EXIT_SUCCESS);
2484 break;
2485 case 'a':
2486 optstring=optarg;
2487 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2488 break;
2489 case 'k':
2490 #ifdef HAVE_LIBSODIUM
2491 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2492 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2493 exit(EXIT_FAILURE);
2494 }
2495 #else
2496 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2497 exit(EXIT_FAILURE);
2498 #endif
2499 break;
2500 case 'l':
2501 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2502 break;
2503 case 'u':
2504 g_cmdLine.uid=optarg;
2505 break;
2506 case 'v':
2507 g_verbose=true;
2508 break;
2509 case 'V':
2510 #ifdef LUAJIT_VERSION
2511 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2512 #else
2513 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2514 #endif
2515 cout<<"Enabled features: ";
2516 #ifdef HAVE_CDB
2517 cout<<"cdb ";
2518 #endif
2519 #ifdef HAVE_DNS_OVER_TLS
2520 cout<<"dns-over-tls(";
2521 #ifdef HAVE_GNUTLS
2522 cout<<"gnutls";
2523 #ifdef HAVE_LIBSSL
2524 cout<<" ";
2525 #endif
2526 #endif
2527 #ifdef HAVE_LIBSSL
2528 cout<<"openssl";
2529 #endif
2530 cout<<") ";
2531 #endif
2532 #ifdef HAVE_DNS_OVER_HTTPS
2533 cout<<"dns-over-https(DOH) ";
2534 #endif
2535 #ifdef HAVE_DNSCRYPT
2536 cout<<"dnscrypt ";
2537 #endif
2538 #ifdef HAVE_EBPF
2539 cout<<"ebpf ";
2540 #endif
2541 #ifdef HAVE_FSTRM
2542 cout<<"fstrm ";
2543 #endif
2544 #ifdef HAVE_LIBCRYPTO
2545 cout<<"ipcipher ";
2546 #endif
2547 #ifdef HAVE_LIBSODIUM
2548 cout<<"libsodium ";
2549 #endif
2550 #ifdef HAVE_LMDB
2551 cout<<"lmdb ";
2552 #endif
2553 #ifdef HAVE_PROTOBUF
2554 cout<<"protobuf ";
2555 #endif
2556 #ifdef HAVE_RE2
2557 cout<<"re2 ";
2558 #endif
2559 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2560 cout<<"recvmmsg/sendmmsg ";
2561 #endif
2562 #ifdef HAVE_NET_SNMP
2563 cout<<"snmp ";
2564 #endif
2565 #ifdef HAVE_SYSTEMD
2566 cout<<"systemd";
2567 #endif
2568 cout<<endl;
2569 exit(EXIT_SUCCESS);
2570 break;
2571 case '?':
2572 //getopt_long printed an error message.
2573 usage();
2574 exit(EXIT_FAILURE);
2575 break;
2576 }
2577 }
2578
2579 argc-=optind;
2580 argv+=optind;
2581 for(auto p = argv; *p; ++p) {
2582 if(g_cmdLine.beClient) {
2583 clientAddress = ComboAddress(*p, 5199);
2584 } else {
2585 g_cmdLine.remotes.push_back(*p);
2586 }
2587 }
2588
2589 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2590
2591 g_policy.setState(leastOutstandingPol);
2592 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2593 setupLua(true, g_cmdLine.config);
2594 if (clientAddress != ComboAddress())
2595 g_serverControl = clientAddress;
2596 doClient(g_serverControl, g_cmdLine.command);
2597 _exit(EXIT_SUCCESS);
2598 }
2599
2600 auto acl = g_ACL.getCopy();
2601 if(acl.empty()) {
2602 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2603 acl.addMask(addr);
2604 g_ACL.setState(acl);
2605 }
2606
2607 auto consoleACL = g_consoleACL.getCopy();
2608 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2609 consoleACL.addMask(mask);
2610 }
2611 g_consoleACL.setState(consoleACL);
2612
2613 if (g_cmdLine.checkConfig) {
2614 setupLua(true, g_cmdLine.config);
2615 // No exception was thrown
2616 infolog("Configuration '%s' OK!", g_cmdLine.config);
2617 _exit(EXIT_SUCCESS);
2618 }
2619
2620 auto todo=setupLua(false, g_cmdLine.config);
2621
2622 auto localPools = g_pools.getCopy();
2623 {
2624 bool precompute = false;
2625 if (g_policy.getLocal()->name == "chashed") {
2626 precompute = true;
2627 } else {
2628 for (const auto& entry: localPools) {
2629 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2630 precompute = true;
2631 break ;
2632 }
2633 }
2634 }
2635 if (precompute) {
2636 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2637 // pre compute hashes
2638 auto backends = g_dstates.getLocal();
2639 for (auto& backend: *backends) {
2640 backend->hash();
2641 }
2642 }
2643 }
2644
2645 if (!g_cmdLine.locals.empty()) {
2646 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2647 /* DoH, DoT and DNSCrypt frontends are separate */
2648 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2649 it = g_frontends.erase(it);
2650 }
2651 else {
2652 ++it;
2653 }
2654 }
2655
2656 for(const auto& loc : g_cmdLine.locals) {
2657 /* UDP */
2658 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2659 /* TCP */
2660 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2661 }
2662 }
2663
2664 if (g_frontends.empty()) {
2665 /* UDP */
2666 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2667 /* TCP */
2668 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2669 }
2670
2671 g_configurationDone = true;
2672
2673 for(auto& frontend : g_frontends) {
2674 setUpLocalBind(frontend);
2675
2676 if (frontend->tcp == false) {
2677 ++udpBindsCount;
2678 }
2679 else {
2680 ++tcpBindsCount;
2681 }
2682 }
2683
2684 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2685
2686 vector<string> vec;
2687 std::string acls;
2688 g_ACL.getLocal()->toStringVector(&vec);
2689 for(const auto& s : vec) {
2690 if (!acls.empty())
2691 acls += ", ";
2692 acls += s;
2693 }
2694 infolog("ACL allowing queries from: %s", acls.c_str());
2695 vec.clear();
2696 acls.clear();
2697 g_consoleACL.getLocal()->toStringVector(&vec);
2698 for (const auto& entry : vec) {
2699 if (!acls.empty()) {
2700 acls += ", ";
2701 }
2702 acls += entry;
2703 }
2704 infolog("Console ACL allowing connections from: %s", acls.c_str());
2705
2706 #ifdef HAVE_LIBSODIUM
2707 if (g_consoleEnabled && g_consoleKey.empty()) {
2708 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2709 }
2710 #endif
2711
2712 uid_t newgid=0;
2713 gid_t newuid=0;
2714
2715 if(!g_cmdLine.gid.empty())
2716 newgid = strToGID(g_cmdLine.gid.c_str());
2717
2718 if(!g_cmdLine.uid.empty())
2719 newuid = strToUID(g_cmdLine.uid.c_str());
2720
2721 dropGroupPrivs(newgid);
2722 dropUserPrivs(newuid);
2723 try {
2724 /* we might still have capabilities remaining,
2725 for example if we have been started as root
2726 without --uid or --gid (please don't do that)
2727 or as an unprivileged user with ambient
2728 capabilities like CAP_NET_BIND_SERVICE.
2729 */
2730 dropCapabilities();
2731 }
2732 catch(const std::exception& e) {
2733 warnlog("%s", e.what());
2734 }
2735
2736 /* this need to be done _after_ dropping privileges */
2737 g_delay = new DelayPipe<DelayedPacket>();
2738
2739 if (g_snmpAgent) {
2740 g_snmpAgent->run();
2741 }
2742
2743 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2744
2745 for(auto& t : todo)
2746 t();
2747
2748 localPools = g_pools.getCopy();
2749 /* create the default pool no matter what */
2750 createPoolIfNotExists(localPools, "");
2751 if(g_cmdLine.remotes.size()) {
2752 for(const auto& address : g_cmdLine.remotes) {
2753 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2754 addServerToPool(localPools, "", ret);
2755 if (ret->connected && !ret->threadStarted.test_and_set()) {
2756 ret->tid = thread(responderThread, ret);
2757 }
2758 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2759 }
2760 }
2761 g_pools.setState(localPools);
2762
2763 if(g_dstates.getLocal()->empty()) {
2764 errlog("No downstream servers defined: all packets will get dropped");
2765 // you might define them later, but you need to know
2766 }
2767
2768 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2769
2770 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2771 if(dss->availability==DownstreamState::Availability::Auto) {
2772 bool newState=upCheck(dss);
2773 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2774 dss->upStatus = newState;
2775 }
2776 }
2777
2778 for(auto& cs : g_frontends) {
2779 if (cs->dohFrontend != nullptr) {
2780 #ifdef HAVE_DNS_OVER_HTTPS
2781 std::thread t1(dohThread, cs.get());
2782 if (!cs->cpus.empty()) {
2783 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2784 }
2785 t1.detach();
2786 #endif /* HAVE_DNS_OVER_HTTPS */
2787 continue;
2788 }
2789 if (cs->udpFD >= 0) {
2790 thread t1(udpClientThread, cs.get());
2791 if (!cs->cpus.empty()) {
2792 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2793 }
2794 t1.detach();
2795 }
2796 else if (cs->tcpFD >= 0) {
2797 thread t1(tcpAcceptorThread, cs.get());
2798 if (!cs->cpus.empty()) {
2799 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2800 }
2801 t1.detach();
2802 }
2803 }
2804
2805 thread carbonthread(carbonDumpThread);
2806 carbonthread.detach();
2807
2808 thread stattid(maintThread);
2809 stattid.detach();
2810
2811 thread healththread(healthChecksThread);
2812
2813 if (!g_secPollSuffix.empty()) {
2814 thread secpollthread(secPollThread);
2815 secpollthread.detach();
2816 }
2817
2818 if(g_cmdLine.beSupervised) {
2819 #ifdef HAVE_SYSTEMD
2820 sd_notify(0, "READY=1");
2821 #endif
2822 healththread.join();
2823 }
2824 else {
2825 healththread.detach();
2826 doConsole();
2827 }
2828 _exit(EXIT_SUCCESS);
2829
2830 }
2831 catch(const LuaContext::ExecutionErrorException& e) {
2832 try {
2833 errlog("Fatal Lua error: %s", e.what());
2834 std::rethrow_if_nested(e);
2835 } catch(const std::exception& ne) {
2836 errlog("Details: %s", ne.what());
2837 }
2838 catch(PDNSException &ae)
2839 {
2840 errlog("Fatal pdns error: %s", ae.reason);
2841 }
2842 _exit(EXIT_FAILURE);
2843 }
2844 catch(std::exception &e)
2845 {
2846 errlog("Fatal error: %s", e.what());
2847 _exit(EXIT_FAILURE);
2848 }
2849 catch(PDNSException &ae)
2850 {
2851 errlog("Fatal pdns error: %s", ae.reason);
2852 _exit(EXIT_FAILURE);
2853 }
2854
2855 uint64_t getLatencyCount(const std::string&)
2856 {
2857 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2858 }