]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
Merge pull request #7820 from pieterlexis/systemd-no-setuid
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #include "dnsdist-systemd.hh"
41 #ifdef HAVE_SYSTEMD
42 #include <systemd/sd-daemon.h>
43 #endif
44
45 #include "dnsdist.hh"
46 #include "dnsdist-cache.hh"
47 #include "dnsdist-console.hh"
48 #include "dnsdist-ecs.hh"
49 #include "dnsdist-healthchecks.hh"
50 #include "dnsdist-lua.hh"
51 #include "dnsdist-rings.hh"
52 #include "dnsdist-secpoll.hh"
53 #include "dnsdist-xpf.hh"
54
55 #include "base64.hh"
56 #include "delaypipe.hh"
57 #include "dolog.hh"
58 #include "dnsname.hh"
59 #include "dnsparser.hh"
60 #include "ednsoptions.hh"
61 #include "gettime.hh"
62 #include "lock.hh"
63 #include "misc.hh"
64 #include "sodcrypto.hh"
65 #include "sstuff.hh"
66 #include "threadname.hh"
67
68 /* Known sins:
69
70 Receiver is currently single threaded
71 not *that* bad actually, but now that we are thread safe, might want to scale
72 */
73
74 /* the Rulaction plan
75 Set of Rules, if one matches, it leads to an Action
76 Both rules and actions could conceivably be Lua based.
77 On the C++ side, both could be inherited from a class Rule and a class Action,
78 on the Lua side we can't do that. */
79
80 using std::atomic;
81 using std::thread;
82 bool g_verbose;
83
84 struct DNSDistStats g_stats;
85 MetricDefinitionStorage g_metricDefinitions;
86
87 uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
88 uint32_t g_staleCacheEntriesTTL{0};
89 bool g_syslog{true};
90 bool g_allowEmptyResponse{false};
91
92 GlobalStateHolder<NetmaskGroup> g_ACL;
93 string g_outputBuffer;
94
95 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
96 std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
97 std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
98 #ifdef HAVE_EBPF
99 shared_ptr<BPFFilter> g_defaultBPFFilter;
100 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
101 #endif /* HAVE_EBPF */
102 std::vector<std::unique_ptr<ClientState>> g_frontends;
103 GlobalStateHolder<pools_t> g_pools;
104 size_t g_udpVectorSize{1};
105
106 bool g_snmpEnabled{false};
107 bool g_snmpTrapsEnabled{false};
108 DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
125 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
128 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
129
130 Rings g_rings;
131 QueryCount g_qcount;
132
133 GlobalStateHolder<servers_t> g_dstates;
134 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
135 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
136 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
137 int g_tcpRecvTimeout{2};
138 int g_tcpSendTimeout{2};
139 int g_udpTimeout{2};
140
141 bool g_servFailOnNoPolicy{false};
142 bool g_truncateTC{false};
143 bool g_fixupCase{false};
144 bool g_preserveTrailingData{false};
145 bool g_roundrobinFailOnNoServer{false};
146
147 std::set<std::string> g_capabilitiesToRetain;
148
149 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
150 try
151 {
152 bool hadEDNS = false;
153 uint16_t payloadSize = 0;
154 uint16_t z = 0;
155
156 if (g_addEDNSToSelfGeneratedResponses) {
157 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
158 }
159
160 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
161 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
162 dh->ancount = dh->arcount = dh->nscount = 0;
163
164 if (hadEDNS) {
165 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
166 }
167 }
168 catch(...)
169 {
170 g_stats.truncFail++;
171 }
172
173 struct DelayedPacket
174 {
175 int fd;
176 string packet;
177 ComboAddress destination;
178 ComboAddress origDest;
179 void operator()()
180 {
181 ssize_t res;
182 if(origDest.sin4.sin_family == 0) {
183 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
184 }
185 else {
186 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
187 }
188 if (res == -1) {
189 int err = errno;
190 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
191 }
192 }
193 };
194
195 DelayPipe<DelayedPacket>* g_delay = nullptr;
196
197 void doLatencyStats(double udiff)
198 {
199 if(udiff < 1000) ++g_stats.latency0_1;
200 else if(udiff < 10000) ++g_stats.latency1_10;
201 else if(udiff < 50000) ++g_stats.latency10_50;
202 else if(udiff < 100000) ++g_stats.latency50_100;
203 else if(udiff < 1000000) ++g_stats.latency100_1000;
204 else ++g_stats.latencySlow;
205 g_stats.latencySum += udiff / 1000;
206
207 auto doAvg = [](double& var, double n, double weight) {
208 var = (weight -1) * var/weight + n/weight;
209 };
210
211 doAvg(g_stats.latencyAvg100, udiff, 100);
212 doAvg(g_stats.latencyAvg1000, udiff, 1000);
213 doAvg(g_stats.latencyAvg10000, udiff, 10000);
214 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
215 }
216
217 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
218 {
219 if (responseLen < sizeof(dnsheader)) {
220 return false;
221 }
222
223 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
224 if (dh->qdcount == 0) {
225 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
226 return true;
227 }
228 else {
229 ++g_stats.nonCompliantResponses;
230 return false;
231 }
232 }
233
234 uint16_t rqtype, rqclass;
235 DNSName rqname;
236 try {
237 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
238 }
239 catch(const std::exception& e) {
240 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
241 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
242 }
243 ++g_stats.nonCompliantResponses;
244 return false;
245 }
246
247 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
248 return false;
249 }
250
251 return true;
252 }
253
254 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
255 {
256 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
257 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
258 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
259 uint16_t * flags = getFlagsFromDNSHeader(dh);
260 /* clear the flags we are about to restore */
261 *flags &= restoreFlagsMask;
262 /* only keep the flags we want to restore */
263 origFlags &= ~restoreFlagsMask;
264 /* set the saved flags as they were */
265 *flags |= origFlags;
266 }
267
268 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
269 {
270 restoreFlags(dq.dh, origFlags);
271
272 return addEDNSToQueryTurnedResponse(dq);
273 }
274
275 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
276 {
277 if (*responseLen < sizeof(dnsheader)) {
278 return false;
279 }
280
281 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
282 restoreFlags(dh, origFlags);
283
284 if (*responseLen == sizeof(dnsheader)) {
285 return true;
286 }
287
288 if(g_fixupCase) {
289 string realname = qname.toDNSString();
290 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
291 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
292 }
293 }
294
295 if (ednsAdded || ecsAdded) {
296 uint16_t optStart;
297 size_t optLen = 0;
298 bool last = false;
299
300 const std::string responseStr(*response, *responseLen);
301 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
302
303 if (res == 0) {
304 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
305 size_t optContentStart = 0;
306 uint16_t optContentLen = 0;
307 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
308 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
309 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
310 we care about. */
311 *zeroScope = responseStr.at(optContentStart + 3) == 0;
312 }
313 }
314
315 if (ednsAdded) {
316 /* we added the entire OPT RR,
317 therefore we need to remove it entirely */
318 if (last) {
319 /* simply remove the last AR */
320 *responseLen -= optLen;
321 uint16_t arcount = ntohs(dh->arcount);
322 arcount--;
323 dh->arcount = htons(arcount);
324 }
325 else {
326 /* Removing an intermediary RR could lead to compression error */
327 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
328 *responseLen = rewrittenResponse.size();
329 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
330 rewrittenResponse.reserve(*responseLen + addRoom);
331 }
332 *responseSize = rewrittenResponse.capacity();
333 *response = reinterpret_cast<char*>(rewrittenResponse.data());
334 }
335 else {
336 warnlog("Error rewriting content");
337 }
338 }
339 }
340 else {
341 /* the OPT RR was already present, but without ECS,
342 we need to remove the ECS option if any */
343 if (last) {
344 /* nothing after the OPT RR, we can simply remove the
345 ECS option */
346 size_t existingOptLen = optLen;
347 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
348 *responseLen -= (existingOptLen - optLen);
349 }
350 else {
351 /* Removing an intermediary RR could lead to compression error */
352 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
353 *responseLen = rewrittenResponse.size();
354 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
355 rewrittenResponse.reserve(*responseLen + addRoom);
356 }
357 *responseSize = rewrittenResponse.capacity();
358 *response = reinterpret_cast<char*>(rewrittenResponse.data());
359 }
360 else {
361 warnlog("Error rewriting content");
362 }
363 }
364 }
365 }
366 }
367
368 return true;
369 }
370
#ifdef HAVE_DNSCRYPT
/* Encrypt a response in place when the query came in over DNSCrypt.

   response/responseLen/responseSize  buffer, in/out used length, capacity
   tcp            passed through to DNSCryptQuery::encryptResponse()
   dnsCryptQuery  DNSCrypt state of the query, empty for a regular query
   dh, dhCopy     optional: when all are non-null the cleartext header is saved
                  into dhCopy and *dh is redirected to it

   Returns false when encryption failed and the response must be dropped. */
static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    /* not a DNSCrypt query, the response goes out as it is */
    return true;
  }

  /* the header is about to be encrypted in place, keep a readable copy */
  const bool keepHeaderCopy = dh != nullptr && *dh != nullptr && dhCopy != nullptr;
  if (keepHeaderCopy) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedResponseLen = 0;
  const int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
  if (res != 0) {
    /* dropping response */
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedResponseLen;
  return true;
}
#endif /* HAVE_DNSCRYPT */
395
396 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
397 {
398 DNSResponseAction::Action action=DNSResponseAction::Action::None;
399 std::string ruleresult;
400 for(const auto& lr : *localRespRulactions) {
401 if(lr.d_rule->matches(&dr)) {
402 lr.d_rule->d_matches++;
403 action=(*lr.d_action)(&dr, &ruleresult);
404 switch(action) {
405 case DNSResponseAction::Action::Allow:
406 return true;
407 break;
408 case DNSResponseAction::Action::Drop:
409 return false;
410 break;
411 case DNSResponseAction::Action::HeaderModify:
412 return true;
413 break;
414 case DNSResponseAction::Action::ServFail:
415 dr.dh->rcode = RCode::ServFail;
416 return true;
417 break;
418 /* non-terminal actions follow */
419 case DNSResponseAction::Action::Delay:
420 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
421 break;
422 case DNSResponseAction::Action::None:
423 break;
424 }
425 }
426 }
427
428 return true;
429 }
430
431 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
432 {
433 if (!applyRulesToResponse(localRespRulactions, dr)) {
434 return false;
435 }
436
437 bool zeroScope = false;
438 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
439 return false;
440 }
441
442 if (dr.packetCache && !dr.skipCache && *responseLen <= s_maxPacketCacheEntrySize) {
443 if (!dr.useZeroScope) {
444 /* if the query was not suitable for zero-scope, for
445 example because it had an existing ECS entry so the hash is
446 not really 'no ECS', so just insert it for the existing subnet
447 since:
448 - we don't have the correct hash for a non-ECS query
449 - inserting with hash computed before the ECS replacement but with
450 the subnet extracted _after_ the replacement would not work.
451 */
452 zeroScope = false;
453 }
454 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
455 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
456 }
457
458 #ifdef HAVE_DNSCRYPT
459 if (!muted) {
460 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
461 return false;
462 }
463 }
464 #endif /* HAVE_DNSCRYPT */
465
466 return true;
467 }
468
469 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
470 {
471 if(delayMsec && g_delay) {
472 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
473 g_delay->submit(dp, delayMsec);
474 }
475 else {
476 ssize_t res;
477 if(origDest.sin4.sin_family == 0) {
478 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
479 }
480 else {
481 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
482 }
483 if (res == -1) {
484 int err = errno;
485 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), stringerror(err));
486 }
487 }
488
489 return true;
490 }
491
492
493 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
494 {
495 return state->sockets[state->socketsOffset++ % state->sockets.size()];
496 }
497
498 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
499 {
500 ready.clear();
501
502 if (state->sockets.size() == 1) {
503 ready.push_back(state->sockets[0]);
504 return ;
505 }
506
507 {
508 std::lock_guard<std::mutex> lock(state->socketsLock);
509 state->mplexer->getAvailableFDs(ready, -1);
510 }
511 }
512
513 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
514 void responderThread(std::shared_ptr<DownstreamState> dss)
515 try {
516 setThreadName("dnsdist/respond");
517 auto localRespRulactions = g_resprulactions.getLocal();
518 char packet[s_maxPacketCacheEntrySize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
519 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
520 /* when the answer is encrypted in place, we need to get a copy
521 of the original header before encryption to fill the ring buffer */
522 dnsheader cleartextDH;
523 vector<uint8_t> rewrittenResponse;
524
525 uint16_t queryId = 0;
526 std::vector<int> sockets;
527 sockets.reserve(dss->sockets.size());
528
529 for(;;) {
530 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
531 try {
532 pickBackendSocketsReadyForReceiving(dss, sockets);
533 for (const auto& fd : sockets) {
534 ssize_t got = recv(fd, packet, sizeof(packet), 0);
535 char * response = packet;
536 size_t responseSize = sizeof(packet);
537
538 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
539 continue;
540
541 uint16_t responseLen = static_cast<uint16_t>(got);
542 queryId = dh->id;
543
544 if(queryId >= dss->idStates.size()) {
545 continue;
546 }
547
548 IDState* ids = &dss->idStates[queryId];
549 int64_t usageIndicator = ids->usageIndicator;
550
551 if(!IDState::isInUse(usageIndicator)) {
552 /* the corresponding state is marked as not in use, meaning that:
553 - it was already cleaned up by another thread and the state is gone ;
554 - we already got a response for this query and this one is a duplicate.
555 Either way, we don't touch it.
556 */
557 continue;
558 }
559
560 /* read the potential DOHUnit state as soon as possible, but don't use it
561 until we have confirmed that we own this state by updating usageIndicator */
562 auto du = ids->du;
563 /* setting age to 0 to prevent the maintainer thread from
564 cleaning this IDS while we process the response.
565 */
566 ids->age = 0;
567 int origFD = ids->origFD;
568
569 unsigned int consumed = 0;
570 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
571 continue;
572 }
573
574 bool isDoH = du != nullptr;
575 /* atomically mark the state as available, but only if it has not been altered
576 in the meantime */
577 if (ids->tryMarkUnused(usageIndicator)) {
578 /* clear the potential DOHUnit asap, it's ours now
579 and since we just marked the state as unused,
580 someone could overwrite it. */
581 ids->du = nullptr;
582 /* we only decrement the outstanding counter if the value was not
583 altered in the meantime, which would mean that the state has been actively reused
584 and the other thread has not incremented the outstanding counter, so we don't
585 want it to be decremented twice. */
586 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
587 } else {
588 /* someone updated the state in the meantime, we can't touch the existing pointer */
589 du = nullptr;
590 /* since the state has been updated, we can't safely access it so let's just drop
591 this response */
592 continue;
593 }
594
595 if(dh->tc && g_truncateTC) {
596 truncateTC(response, &responseLen, responseSize, consumed);
597 }
598
599 dh->id = ids->origID;
600
601 uint16_t addRoom = 0;
602 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
603 if (dr.dnsCryptQuery) {
604 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
605 }
606
607 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
608 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
609 continue;
610 }
611
612 if (ids->cs && !ids->cs->muted) {
613 if (du) {
614 #ifdef HAVE_DNS_OVER_HTTPS
615 // DoH query
616 du->response = std::string(response, responseLen);
617 if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
618 /* at this point we have the only remaining pointer on this
619 DOHUnit object since we did set ids->du to nullptr earlier,
620 except if we got the response before the pointer could be
621 released by the frontend */
622 du->release();
623 }
624 #endif /* HAVE_DNS_OVER_HTTPS */
625 du = nullptr;
626 }
627 else {
628 ComboAddress empty;
629 empty.sin4.sin_family = 0;
630 /* if ids->destHarvested is false, origDest holds the listening address.
631 We don't want to use that as a source since it could be 0.0.0.0 for example. */
632 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
633 }
634 }
635
636 ++g_stats.responses;
637 if (ids->cs) {
638 ++ids->cs->responses;
639 }
640 ++dss->responses;
641
642 double udiff = ids->sentTime.udiff();
643 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
644 isDoH ? " (https)": "", udiff);
645
646 struct timespec ts;
647 gettime(&ts);
648 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
649
650 switch (cleartextDH.rcode) {
651 case RCode::NXDomain:
652 ++g_stats.frontendNXDomain;
653 break;
654 case RCode::ServFail:
655 ++g_stats.servfailResponses;
656 ++g_stats.frontendServFail;
657 break;
658 case RCode::NoError:
659 ++g_stats.frontendNoError;
660 break;
661 }
662 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
663
664 doLatencyStats(udiff);
665
666 rewrittenResponse.clear();
667 }
668 }
669 catch(const std::exception& e){
670 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
671 }
672 }
673 }
674 catch(const std::exception& e)
675 {
676 errlog("UDP responder thread died because of exception: %s", e.what());
677 }
678 catch(const PDNSException& e)
679 {
680 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
681 }
682 catch(...)
683 {
684 errlog("UDP responder thread died because of an exception: %s", "unknown");
685 }
686
687 bool DownstreamState::reconnect()
688 {
689 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
690 if (!tl.owns_lock()) {
691 /* we are already reconnecting */
692 return false;
693 }
694
695 connected = false;
696 for (auto& fd : sockets) {
697 if (fd != -1) {
698 if (sockets.size() > 1) {
699 std::lock_guard<std::mutex> lock(socketsLock);
700 mplexer->removeReadFD(fd);
701 }
702 /* shutdown() is needed to wake up recv() in the responderThread */
703 shutdown(fd, SHUT_RDWR);
704 close(fd);
705 fd = -1;
706 }
707 if (!IsAnyAddress(remote)) {
708 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
709 if (!IsAnyAddress(sourceAddr)) {
710 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
711 if (!sourceItfName.empty()) {
712 #ifdef SO_BINDTODEVICE
713 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
714 if (res != 0) {
715 infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
716 }
717 #endif
718 }
719
720 SBind(fd, sourceAddr);
721 }
722 try {
723 SConnect(fd, remote);
724 if (sockets.size() > 1) {
725 std::lock_guard<std::mutex> lock(socketsLock);
726 mplexer->addReadFD(fd, [](int, boost::any) {});
727 }
728 connected = true;
729 }
730 catch(const std::runtime_error& error) {
731 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
732 connected = false;
733 break;
734 }
735 }
736 }
737
738 /* if at least one (re-)connection failed, close all sockets */
739 if (!connected) {
740 for (auto& fd : sockets) {
741 if (fd != -1) {
742 if (sockets.size() > 1) {
743 std::lock_guard<std::mutex> lock(socketsLock);
744 mplexer->removeReadFD(fd);
745 }
746 /* shutdown() is needed to wake up recv() in the responderThread */
747 shutdown(fd, SHUT_RDWR);
748 close(fd);
749 fd = -1;
750 }
751 }
752 }
753
754 return connected;
755 }
756 void DownstreamState::hash()
757 {
758 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
759 auto w = weight;
760 WriteLock wl(&d_lock);
761 hashes.clear();
762 while (w > 0) {
763 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
764 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
765 hashes.insert(wshash);
766 --w;
767 }
768 }
769
770 void DownstreamState::setId(const boost::uuids::uuid& newId)
771 {
772 id = newId;
773 // compute hashes only if already done
774 if (!hashes.empty()) {
775 hash();
776 }
777 }
778
779 void DownstreamState::setWeight(int newWeight)
780 {
781 if (newWeight < 1) {
782 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
783 return ;
784 }
785 weight = newWeight;
786 if (!hashes.empty()) {
787 hash();
788 }
789 }
790
791 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
792 {
793 pthread_rwlock_init(&d_lock, nullptr);
794 id = getUniqueID();
795 threadStarted.clear();
796
797 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
798
799 sockets.resize(numberOfSockets);
800 for (auto& fd : sockets) {
801 fd = -1;
802 }
803
804 if (connect && !IsAnyAddress(remote)) {
805 reconnect();
806 idStates.resize(g_maxOutstanding);
807 sw.start();
808 infolog("Added downstream server %s", remote.toStringWithPort());
809 }
810
811 }
812
813 std::mutex g_luamutex;
814 LuaContext g_lua;
815
816 GlobalStateHolder<ServerPolicy> g_policy;
817
818 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
819 {
820 for(auto& d : servers) {
821 if(d.second->isUp() && d.second->qps.check())
822 return d.second;
823 }
824 return leastOutstanding(servers, dq);
825 }
826
827 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
828 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
829 {
830 if (servers.size() == 1 && servers[0].second->isUp()) {
831 return servers[0].second;
832 }
833
834 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
835 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
836 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
837 poss.reserve(servers.size());
838 for(auto& d : servers) {
839 if(d.second->isUp()) {
840 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
841 }
842 }
843 if(poss.empty())
844 return shared_ptr<DownstreamState>();
845 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
846 return poss.begin()->second;
847 }
848
849 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
850 {
851 vector<pair<int, shared_ptr<DownstreamState>>> poss;
852 int sum = 0;
853 int max = std::numeric_limits<int>::max();
854
855 for(auto& d : servers) { // w=1, w=10 -> 1, 11
856 if(d.second->isUp()) {
857 // Don't overflow sum when adding high weights
858 if(d.second->weight > max - sum) {
859 sum = max;
860 } else {
861 sum += d.second->weight;
862 }
863
864 poss.push_back({sum, d.second});
865 }
866 }
867
868 // Catch poss & sum are empty to avoid SIGFPE
869 if(poss.empty())
870 return shared_ptr<DownstreamState>();
871
872 int r = val % sum;
873 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
874 if(p==poss.end())
875 return shared_ptr<DownstreamState>();
876 return p->second;
877 }
878
879 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
880 {
881 return valrandom(random(), servers, dq);
882 }
883
884 uint32_t g_hashperturb;
885 double g_consistentHashBalancingFactor = 0;
886 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
887 {
888 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
889 }
890
891 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
892 {
893 unsigned int qhash = dq->qname->hash(g_hashperturb);
894 unsigned int sel = std::numeric_limits<unsigned int>::max();
895 unsigned int min = std::numeric_limits<unsigned int>::max();
896 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
897
898 double targetLoad = std::numeric_limits<double>::max();
899 if (g_consistentHashBalancingFactor > 0) {
900 /* we start with one, representing the query we are currently handling */
901 double currentLoad = 1;
902 for (const auto& pair : servers) {
903 currentLoad += pair.second->outstanding;
904 }
905 targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
906 }
907
908 for (const auto& d: servers) {
909 if (d.second->isUp() && d.second->outstanding <= targetLoad) {
910 // make sure hashes have been computed
911 if (d.second->hashes.empty()) {
912 d.second->hash();
913 }
914 {
915 ReadLock rl(&(d.second->d_lock));
916 const auto& server = d.second;
917 // we want to keep track of the last hash
918 if (min > *(server->hashes.begin())) {
919 min = *(server->hashes.begin());
920 first = server;
921 }
922
923 auto hash_it = server->hashes.lower_bound(qhash);
924 if (hash_it != server->hashes.end()) {
925 if (*hash_it < sel) {
926 sel = *hash_it;
927 ret = server;
928 }
929 }
930 }
931 }
932 }
933 if (ret != nullptr) {
934 return ret;
935 }
936 if (first != nullptr) {
937 return first;
938 }
939 return shared_ptr<DownstreamState>();
940 }
941
942 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
943 {
944 NumberedServerVector poss;
945
946 for(auto& d : servers) {
947 if(d.second->isUp()) {
948 poss.push_back(d);
949 }
950 }
951
952 const auto *res=&poss;
953 if(poss.empty() && !g_roundrobinFailOnNoServer)
954 res = &servers;
955
956 if(res->empty())
957 return shared_ptr<DownstreamState>();
958
959 static unsigned int counter;
960
961 return (*res)[(counter++) % res->size()].second;
962 }
963
/* Defaults to 127.0.0.1:5199.
   NOTE(review): presumably the address the control/console socket uses; it is
   not referenced in this part of the file, confirm against its users. */
ComboAddress g_serverControl{"127.0.0.1:5199"};
965
966 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
967 {
968 std::shared_ptr<ServerPool> pool;
969 pools_t::iterator it = pools.find(poolName);
970 if (it != pools.end()) {
971 pool = it->second;
972 }
973 else {
974 if (!poolName.empty())
975 vinfolog("Creating pool %s", poolName);
976 pool = std::make_shared<ServerPool>();
977 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
978 }
979 return pool;
980 }
981
982 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
983 {
984 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
985 if (!poolName.empty()) {
986 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
987 } else {
988 vinfolog("Setting default pool server selection policy to %s", policy->name);
989 }
990 pool->policy = policy;
991 }
992
993 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
994 {
995 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
996 if (!poolName.empty()) {
997 vinfolog("Adding server to pool %s", poolName);
998 } else {
999 vinfolog("Adding server to default pool");
1000 }
1001 pool->addServer(server);
1002 }
1003
1004 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
1005 {
1006 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1007
1008 if (!poolName.empty()) {
1009 vinfolog("Removing server from pool %s", poolName);
1010 }
1011 else {
1012 vinfolog("Removing server from default pool");
1013 }
1014
1015 pool->removeServer(server);
1016 }
1017
1018 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
1019 {
1020 pools_t::const_iterator it = pools.find(poolName);
1021
1022 if (it == pools.end()) {
1023 throw std::out_of_range("No pool named " + poolName);
1024 }
1025
1026 return it->second;
1027 }
1028
1029 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1030 {
1031 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1032 return pool->getServers();
1033 }
1034
1035 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent, bool raw)
1036 {
1037 string result;
1038
1039 if (raw) {
1040 SpoofAction sa(spoofContent);
1041 sa(&dq, &result);
1042 }
1043 else {
1044 std::vector<std::string> addrs;
1045 stringtok(addrs, spoofContent, " ,");
1046
1047 if (addrs.size() == 1) {
1048 try {
1049 ComboAddress spoofAddr(spoofContent);
1050 SpoofAction sa({spoofAddr});
1051 sa(&dq, &result);
1052 }
1053 catch(const PDNSException &e) {
1054 DNSName cname(spoofContent);
1055 SpoofAction sa(cname); // CNAME then
1056 sa(&dq, &result);
1057 }
1058 } else {
1059 std::vector<ComboAddress> cas;
1060 for (const auto& addr : addrs) {
1061 try {
1062 cas.push_back(ComboAddress(addr));
1063 }
1064 catch (...) {
1065 }
1066 }
1067 SpoofAction sa(cas);
1068 sa(&dq, &result);
1069 }
1070 }
1071 }
1072
1073 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
1074 {
1075 switch(action) {
1076 case DNSAction::Action::Allow:
1077 return true;
1078 break;
1079 case DNSAction::Action::Drop:
1080 ++g_stats.ruleDrop;
1081 drop = true;
1082 return true;
1083 break;
1084 case DNSAction::Action::Nxdomain:
1085 dq.dh->rcode = RCode::NXDomain;
1086 dq.dh->qr=true;
1087 ++g_stats.ruleNXDomain;
1088 return true;
1089 break;
1090 case DNSAction::Action::Refused:
1091 dq.dh->rcode = RCode::Refused;
1092 dq.dh->qr=true;
1093 ++g_stats.ruleRefused;
1094 return true;
1095 break;
1096 case DNSAction::Action::ServFail:
1097 dq.dh->rcode = RCode::ServFail;
1098 dq.dh->qr=true;
1099 ++g_stats.ruleServFail;
1100 return true;
1101 break;
1102 case DNSAction::Action::Spoof:
1103 spoofResponseFromString(dq, ruleresult, false);
1104 return true;
1105 break;
1106 case DNSAction::Action::SpoofRaw:
1107 spoofResponseFromString(dq, ruleresult, true);
1108 return true;
1109 break;
1110 case DNSAction::Action::Truncate:
1111 dq.dh->tc = true;
1112 dq.dh->qr = true;
1113 dq.dh->ra = dq.dh->rd;
1114 dq.dh->aa = false;
1115 dq.dh->ad = false;
1116 return true;
1117 break;
1118 case DNSAction::Action::HeaderModify:
1119 return true;
1120 break;
1121 case DNSAction::Action::Pool:
1122 dq.poolname=ruleresult;
1123 return true;
1124 break;
1125 case DNSAction::Action::NoRecurse:
1126 dq.dh->rd = false;
1127 return true;
1128 break;
1129 /* non-terminal actions follow */
1130 case DNSAction::Action::Delay:
1131 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1132 break;
1133 case DNSAction::Action::None:
1134 /* fall-through */
1135 case DNSAction::Action::NoOp:
1136 break;
1137 }
1138
1139 /* false means that we don't stop the processing */
1140 return false;
1141 }
1142
1143
1144 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
1145 {
1146 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1147
1148 if(g_qcount.enabled) {
1149 string qname = (*dq.qname).toLogString();
1150 bool countQuery{true};
1151 if(g_qcount.filter) {
1152 std::lock_guard<std::mutex> lock(g_luamutex);
1153 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1154 }
1155
1156 if(countQuery) {
1157 WriteLock wl(&g_qcount.queryLock);
1158 if(!g_qcount.records.count(qname)) {
1159 g_qcount.records[qname] = 0;
1160 }
1161 g_qcount.records[qname]++;
1162 }
1163 }
1164
1165 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1166 auto updateBlockStats = [&got]() {
1167 ++g_stats.dynBlocked;
1168 got->second.blocks++;
1169 };
1170
1171 if(now < got->second.until) {
1172 DNSAction::Action action = got->second.action;
1173 if (action == DNSAction::Action::None) {
1174 action = g_dynBlockAction;
1175 }
1176 switch (action) {
1177 case DNSAction::Action::NoOp:
1178 /* do nothing */
1179 break;
1180
1181 case DNSAction::Action::Nxdomain:
1182 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1183 updateBlockStats();
1184
1185 dq.dh->rcode = RCode::NXDomain;
1186 dq.dh->qr=true;
1187 return true;
1188
1189 case DNSAction::Action::Refused:
1190 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1191 updateBlockStats();
1192
1193 dq.dh->rcode = RCode::Refused;
1194 dq.dh->qr = true;
1195 return true;
1196
1197 case DNSAction::Action::Truncate:
1198 if(!dq.tcp) {
1199 updateBlockStats();
1200 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1201 dq.dh->tc = true;
1202 dq.dh->qr = true;
1203 dq.dh->ra = dq.dh->rd;
1204 dq.dh->aa = false;
1205 dq.dh->ad = false;
1206 return true;
1207 }
1208 else {
1209 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1210 }
1211 break;
1212 case DNSAction::Action::NoRecurse:
1213 updateBlockStats();
1214 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1215 dq.dh->rd = false;
1216 return true;
1217 default:
1218 updateBlockStats();
1219 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1220 return false;
1221 }
1222 }
1223 }
1224
1225 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1226 auto updateBlockStats = [&got]() {
1227 ++g_stats.dynBlocked;
1228 got->blocks++;
1229 };
1230
1231 if(now < got->until) {
1232 DNSAction::Action action = got->action;
1233 if (action == DNSAction::Action::None) {
1234 action = g_dynBlockAction;
1235 }
1236 switch (action) {
1237 case DNSAction::Action::NoOp:
1238 /* do nothing */
1239 break;
1240 case DNSAction::Action::Nxdomain:
1241 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1242 updateBlockStats();
1243
1244 dq.dh->rcode = RCode::NXDomain;
1245 dq.dh->qr=true;
1246 return true;
1247 case DNSAction::Action::Refused:
1248 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1249 updateBlockStats();
1250
1251 dq.dh->rcode = RCode::Refused;
1252 dq.dh->qr=true;
1253 return true;
1254 case DNSAction::Action::Truncate:
1255 if(!dq.tcp) {
1256 updateBlockStats();
1257
1258 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1259 dq.dh->tc = true;
1260 dq.dh->qr = true;
1261 dq.dh->ra = dq.dh->rd;
1262 dq.dh->aa = false;
1263 dq.dh->ad = false;
1264 return true;
1265 }
1266 else {
1267 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1268 }
1269 break;
1270 case DNSAction::Action::NoRecurse:
1271 updateBlockStats();
1272 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1273 dq.dh->rd = false;
1274 return true;
1275 default:
1276 updateBlockStats();
1277 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1278 return false;
1279 }
1280 }
1281 }
1282
1283 DNSAction::Action action=DNSAction::Action::None;
1284 string ruleresult;
1285 bool drop = false;
1286 for(const auto& lr : *holders.rulactions) {
1287 if(lr.d_rule->matches(&dq)) {
1288 lr.d_rule->d_matches++;
1289 action=(*lr.d_action)(&dq, &ruleresult);
1290 if (processRulesResult(action, dq, ruleresult, drop)) {
1291 break;
1292 }
1293 }
1294 }
1295
1296 if (drop) {
1297 return false;
1298 }
1299
1300 return true;
1301 }
1302
1303 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1304 {
1305 ssize_t result;
1306
1307 if (ss->sourceItf == 0) {
1308 result = send(sd, request, requestLen, 0);
1309 }
1310 else {
1311 struct msghdr msgh;
1312 struct iovec iov;
1313 cmsgbuf_aligned cbuf;
1314 ComboAddress remote(ss->remote);
1315 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1316 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1317 result = sendmsg(sd, &msgh, 0);
1318 }
1319
1320 if (result == -1) {
1321 int savederrno = errno;
1322 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1323
1324 /* This might sound silly, but on Linux send() might fail with EINVAL
1325 if the interface the socket was bound to doesn't exist anymore.
1326 We don't want to reconnect the real socket if the healthcheck failed,
1327 because it's not using the same socket.
1328 */
1329 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1330 ss->reconnect();
1331 }
1332 }
1333
1334 return result;
1335 }
1336
1337 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1338 {
1339 if (msgh->msg_flags & MSG_TRUNC) {
1340 /* message was too large for our buffer */
1341 vinfolog("Dropping message too large for our buffer");
1342 ++g_stats.nonCompliantQueries;
1343 return false;
1344 }
1345
1346 if(!holders.acl->match(remote)) {
1347 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1348 ++g_stats.aclDrops;
1349 return false;
1350 }
1351
1352 cs.queries++;
1353 ++g_stats.queries;
1354
1355 if (HarvestDestinationAddress(msgh, &dest)) {
1356 /* we don't get the port, only the address */
1357 dest.sin4.sin_port = cs.local.sin4.sin_port;
1358 }
1359 else {
1360 dest.sin4.sin_family = 0;
1361 }
1362
1363 return true;
1364 }
1365
1366 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1367 {
1368 if (cs.dnscryptCtx) {
1369 #ifdef HAVE_DNSCRYPT
1370 vector<uint8_t> response;
1371 uint16_t decryptedQueryLen = 0;
1372
1373 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1374
1375 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1376
1377 if (!decrypted) {
1378 if (response.size() > 0) {
1379 return response;
1380 }
1381 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1382 }
1383
1384 len = decryptedQueryLen;
1385 #endif /* HAVE_DNSCRYPT */
1386 }
1387 return boost::none;
1388 }
1389
1390 bool checkQueryHeaders(const struct dnsheader* dh)
1391 {
1392 if (dh->qr) { // don't respond to responses
1393 ++g_stats.nonCompliantQueries;
1394 return false;
1395 }
1396
1397 if (dh->qdcount == 0) {
1398 ++g_stats.emptyQueries;
1399 return false;
1400 }
1401
1402 if (dh->rd) {
1403 ++g_stats.rdQueries;
1404 }
1405
1406 return true;
1407 }
1408
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Prepares 'outMsg' so that 'response' ('responseLen' bytes) is sent to
   'remote' by a later sendmmsg() call, using 'iov' to describe the payload.
   If 'dest' is known (family different from 0) it is attached as the source
   address of the response via 'cbuf', otherwise no control data is sent. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  outMsg.msg_len = 0;
  fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destinationKnown = (dest.sin4.sin_family != 0);
  if (destinationKnown) {
    addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
  }
  else {
    outMsg.msg_hdr.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1423
1424 /* self-generated responses or cache hits */
1425 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1426 {
1427 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1428
1429 #ifdef HAVE_PROTOBUF
1430 dr.uniqueId = dq.uniqueId;
1431 #endif
1432 dr.qTag = dq.qTag;
1433 dr.delayMsec = dq.delayMsec;
1434
1435 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1436 return false;
1437 }
1438
1439 /* in case a rule changed it */
1440 dq.delayMsec = dr.delayMsec;
1441
1442 #ifdef HAVE_DNSCRYPT
1443 if (!cs.muted) {
1444 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1445 return false;
1446 }
1447 }
1448 #endif /* HAVE_DNSCRYPT */
1449
1450 if (cacheHit) {
1451 ++g_stats.cacheHits;
1452 }
1453
1454 switch (dr.dh->rcode) {
1455 case RCode::NXDomain:
1456 ++g_stats.frontendNXDomain;
1457 break;
1458 case RCode::ServFail:
1459 ++g_stats.frontendServFail;
1460 break;
1461 case RCode::NoError:
1462 ++g_stats.frontendNoError;
1463 break;
1464 }
1465
1466 doLatencyStats(0); // we're not going to measure this
1467 return true;
1468 }
1469
1470 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1471 {
1472 const uint16_t queryId = ntohs(dq.dh->id);
1473
1474 try {
1475 /* we need an accurate ("real") value for the response and
1476 to store into the IDS, but not for insertion into the
1477 rings for example */
1478 struct timespec now;
1479 gettime(&now);
1480
1481 if (!applyRulesToQuery(holders, dq, now)) {
1482 return ProcessQueryResult::Drop;
1483 }
1484
1485 if(dq.dh->qr) { // something turned it into a response
1486 fixUpQueryTurnedResponse(dq, dq.origFlags);
1487
1488 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1489 return ProcessQueryResult::Drop;
1490 }
1491
1492 ++g_stats.selfAnswered;
1493 ++cs.responses;
1494 return ProcessQueryResult::SendAnswer;
1495 }
1496
1497 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
1498 dq.packetCache = serverPool->packetCache;
1499 auto policy = *(holders.policy);
1500 if (serverPool->policy != nullptr) {
1501 policy = *(serverPool->policy);
1502 }
1503 auto servers = serverPool->getServers();
1504 if (policy.isLua) {
1505 std::lock_guard<std::mutex> lock(g_luamutex);
1506 selectedBackend = policy.policy(servers, &dq);
1507 }
1508 else {
1509 selectedBackend = policy.policy(servers, &dq);
1510 }
1511
1512 uint16_t cachedResponseSize = dq.size;
1513 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1514
1515 if (dq.packetCache && !dq.skipCache) {
1516 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1517 }
1518
1519 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1520 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1521 // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
1522 // ECS option, which would make it unsuitable for the zero-scope feature.
1523 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1524 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1525 dq.len = cachedResponseSize;
1526
1527 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1528 return ProcessQueryResult::Drop;
1529 }
1530
1531 return ProcessQueryResult::SendAnswer;
1532 }
1533
1534 if (!dq.subnet) {
1535 /* there was no existing ECS on the query, enable the zero-scope feature */
1536 dq.useZeroScope = true;
1537 }
1538 }
1539
1540 if (!handleEDNSClientSubnet(dq, dq.ednsAdded, dq.ecsAdded, g_preserveTrailingData)) {
1541 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1542 return ProcessQueryResult::Drop;
1543 }
1544 }
1545
1546 if (dq.packetCache && !dq.skipCache) {
1547 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1548 dq.len = cachedResponseSize;
1549
1550 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1551 return ProcessQueryResult::Drop;
1552 }
1553
1554 return ProcessQueryResult::SendAnswer;
1555 }
1556 ++g_stats.cacheMisses;
1557 }
1558
1559 if(!selectedBackend) {
1560 ++g_stats.noPolicy;
1561
1562 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1563 if (g_servFailOnNoPolicy) {
1564 restoreFlags(dq.dh, dq.origFlags);
1565
1566 dq.dh->rcode = RCode::ServFail;
1567 dq.dh->qr = true;
1568
1569 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1570 return ProcessQueryResult::Drop;
1571 }
1572 // no response-only statistics counter to update.
1573 return ProcessQueryResult::SendAnswer;
1574 }
1575
1576 return ProcessQueryResult::Drop;
1577 }
1578
1579 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1580 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1581 }
1582
1583 selectedBackend->queries++;
1584 return ProcessQueryResult::PassToBackend;
1585 }
1586 catch(const std::exception& e){
1587 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1588 }
1589 return ProcessQueryResult::Drop;
1590 }
1591
1592 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
1593 {
1594 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1595 uint16_t queryId = 0;
1596
1597 try {
1598 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1599 return;
1600 }
1601
1602 /* we need an accurate ("real") value for the response and
1603 to store into the IDS, but not for insertion into the
1604 rings for example */
1605 struct timespec queryRealTime;
1606 gettime(&queryRealTime, true);
1607
1608 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1609 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1610 if (dnsCryptResponse) {
1611 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1612 return;
1613 }
1614
1615 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1616 queryId = ntohs(dh->id);
1617
1618 if (!checkQueryHeaders(dh)) {
1619 return;
1620 }
1621
1622 uint16_t qtype, qclass;
1623 unsigned int consumed = 0;
1624 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1625 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1626 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1627 std::shared_ptr<DownstreamState> ss{nullptr};
1628 auto result = processQuery(dq, cs, holders, ss);
1629
1630 if (result == ProcessQueryResult::Drop) {
1631 return;
1632 }
1633
1634 if (result == ProcessQueryResult::SendAnswer) {
1635 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1636 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1637 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1638 (*queuedResponses)++;
1639 return;
1640 }
1641 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1642 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1643 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1644 return;
1645 }
1646
1647 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1648 return;
1649 }
1650
1651 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1652 IDState* ids = &ss->idStates[idOffset];
1653 ids->age = 0;
1654 DOHUnit* du = nullptr;
1655
1656 /* that means that the state was in use, possibly with an allocated
1657 DOHUnit that we will need to handle, but we can't touch it before
1658 confirming that we now own this state */
1659 if (ids->isInUse()) {
1660 du = ids->du;
1661 }
1662
1663 /* we atomically replace the value, we now own this state */
1664 if (!ids->markAsUsed()) {
1665 /* the state was not in use.
1666 we reset 'du' because it might have still been in use when we read it. */
1667 du = nullptr;
1668 ++ss->outstanding;
1669 }
1670 else {
1671 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1672 to handle it because it's about to be overwritten. */
1673 ids->du = nullptr;
1674 ++ss->reuseds;
1675 ++g_stats.downstreamTimeouts;
1676 handleDOHTimeout(du);
1677 }
1678
1679 ids->cs = &cs;
1680 ids->origFD = cs.udpFD;
1681 ids->origID = dh->id;
1682 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1683
1684 /* If we couldn't harvest the real dest addr, still
1685 write down the listening addr since it will be useful
1686 (especially if it's not an 'any' one).
1687 We need to keep track of which one it is since we may
1688 want to use the real but not the listening addr to reply.
1689 */
1690 if (dest.sin4.sin_family != 0) {
1691 ids->origDest = dest;
1692 ids->destHarvested = true;
1693 }
1694 else {
1695 ids->origDest = cs.local;
1696 ids->destHarvested = false;
1697 }
1698
1699 dh->id = idOffset;
1700
1701 int fd = pickBackendSocketForSending(ss);
1702 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1703
1704 if(ret < 0) {
1705 ++ss->sendErrors;
1706 ++g_stats.downstreamSendErrors;
1707 }
1708
1709 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1710 }
1711 catch(const std::exception& e){
1712 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1713 }
1714 }
1715
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Receiving loop used when more than one UDP message can be handled per
   system call: up to g_udpVectorSize queries are received with a single
   recvmmsg() call, processed one by one, and the responses that can be sent
   immediately are sent back with a single sendmmsg() call.
   This function never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[s_maxPacketCacheEntrySize];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
  }

  /* go now */
  for (;;) {

    /* reset the IO vector, since it's also used to send the vector of responses
       to avoid having to copy the data around.
       The length is reset to the incoming buffer size, not to the size of the
       whole buffer, so that the limit on incoming payloads set up above keeps
       being enforced and room is left to add EDNS and/or ECS. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      recvData[idx].iov.iov_base = recvData[idx].packet;
      recvData[idx].iov.iov_len = s_udpIncomingBufferSize;
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);

    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1796
1797 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1798 static void udpClientThread(ClientState* cs)
1799 try
1800 {
1801 setThreadName("dnsdist/udpClie");
1802 LocalHolders holders;
1803
1804 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1805 if (g_udpVectorSize > 1) {
1806 MultipleMessagesUDPClientThread(cs, holders);
1807
1808 }
1809 else
1810 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1811 {
1812 char packet[s_maxPacketCacheEntrySize];
1813 /* the actual buffer is larger because:
1814 - we may have to add EDNS and/or ECS
1815 - we use it for self-generated responses (from rule or cache)
1816 but we only accept incoming payloads up to that size
1817 */
1818 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1819 struct msghdr msgh;
1820 struct iovec iov;
1821 /* used by HarvestDestinationAddress */
1822 cmsgbuf_aligned cbuf;
1823
1824 ComboAddress remote;
1825 ComboAddress dest;
1826 remote.sin4.sin_family = cs->local.sin4.sin_family;
1827 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, s_udpIncomingBufferSize, &remote);
1828
1829 for(;;) {
1830 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1831
1832 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1833 ++g_stats.nonCompliantQueries;
1834 continue;
1835 }
1836
1837 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), sizeof(packet), nullptr, nullptr, nullptr, nullptr);
1838 }
1839 }
1840 }
1841 catch(const std::exception &e)
1842 {
1843 errlog("UDP client thread died because of exception: %s", e.what());
1844 }
1845 catch(const PDNSException &e)
1846 {
1847 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1848 }
1849 catch(...)
1850 {
1851 errlog("UDP client thread died because of an exception: %s", "unknown");
1852 }
1853
/* Returns a random 16-bit value suitable for a DNS message ID, drawn with
   randombytes_uniform() when libsodium is available and with random()
   otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const uint32_t candidate = randombytes_uniform(65536);
#else
  const long candidate = random() % 65536;
#endif
  /* already in [0, 65535], the conversion cannot lose anything */
  return static_cast<uint16_t>(candidate);
}
1862
/* NOTE(review): presumably the upper bound on the number of TCP client
   threads; it is not used in this part of the file, confirm against its users. */
uint64_t g_maxTCPClientThreads{10};
/* number of iterations of the maintThread() loop, which sleeps one second
   per iteration, between two cleaning passes of the packet caches */
std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* used by maintThread() to compute the purgeExpired() argument of each cache:
   maxEntries * (100 - g_cacheCleaningPercentage) / 100 */
std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1866
1867 void maintThread()
1868 {
1869 setThreadName("dnsdist/main");
1870 int interval = 1;
1871 size_t counter = 0;
1872 int32_t secondsToWaitLog = 0;
1873
1874 for(;;) {
1875 sleep(interval);
1876
1877 {
1878 std::lock_guard<std::mutex> lock(g_luamutex);
1879 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1880 if(f) {
1881 try {
1882 (*f)();
1883 secondsToWaitLog = 0;
1884 }
1885 catch(std::exception &e) {
1886 if (secondsToWaitLog <= 0) {
1887 infolog("Error during execution of maintenance function: %s", e.what());
1888 secondsToWaitLog = 61;
1889 }
1890 secondsToWaitLog -= interval;
1891 }
1892 }
1893 }
1894
1895 counter++;
1896 if (counter >= g_cacheCleaningDelay) {
1897 /* keep track, for each cache, of whether we should keep
1898 expired entries */
1899 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1900
1901 /* gather all caches actually used by at least one pool, and see
1902 if something prevents us from cleaning the expired entries */
1903 auto localPools = g_pools.getLocal();
1904 for (const auto& entry : *localPools) {
1905 auto& pool = entry.second;
1906
1907 auto packetCache = pool->packetCache;
1908 if (!packetCache) {
1909 continue;
1910 }
1911
1912 auto pair = caches.insert({packetCache, false});
1913 auto& iter = pair.first;
1914 /* if we need to keep stale data for this cache (ie, not clear
1915 expired entries when at least one pool using this cache
1916 has all its backends down) */
1917 if (packetCache->keepStaleData() && iter->second == false) {
1918 /* so far all pools had at least one backend up */
1919 if (pool->countServers(true) == 0) {
1920 iter->second = true;
1921 }
1922 }
1923 }
1924
1925 for (auto pair : caches) {
1926 /* shall we keep expired entries ? */
1927 if (pair.second == true) {
1928 continue;
1929 }
1930 auto& packetCache = pair.first;
1931 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1932 packetCache->purgeExpired(upTo);
1933 }
1934 counter = 0;
1935 }
1936
1937 // ponder pruning g_dynblocks of expired entries here
1938 }
1939 }
1940
1941 static void secPollThread()
1942 {
1943 setThreadName("dnsdist/secpoll");
1944
1945 for (;;) {
1946 try {
1947 doSecPoll(g_secPollSuffix);
1948 }
1949 catch(...) {
1950 }
1951 sleep(g_secPollInterval);
1952 }
1953 }
1954
1955 static void healthChecksThread()
1956 {
1957 setThreadName("dnsdist/healthC");
1958
1959 static const int interval = 1;
1960
1961 for(;;) {
1962 sleep(interval);
1963
1964 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads()) {
1965 g_tcpclientthreads->addTCPClientThread();
1966 }
1967
1968 auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
1969 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
1970 for(auto& dss : *states) {
1971 if(++dss->lastCheck < dss->checkInterval) {
1972 continue;
1973 }
1974
1975 dss->lastCheck = 0;
1976
1977 if (dss->availability == DownstreamState::Availability::Auto) {
1978 if (!queueHealthCheck(mplexer, dss)) {
1979 updateHealthCheckResult(dss, false);
1980 }
1981 }
1982
1983 auto delta = dss->sw.udiffAndSet()/1000000.0;
1984 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
1985 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
1986 dss->prev.queries.store(dss->queries.load());
1987 dss->prev.reuseds.store(dss->reuseds.load());
1988
1989 for (IDState& ids : dss->idStates) { // timeouts
1990 int64_t usageIndicator = ids.usageIndicator;
1991 if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
1992 /* We mark the state as unused as soon as possible
1993 to limit the risk of racing with the
1994 responder thread.
1995 */
1996 auto oldDU = ids.du;
1997
1998 if (!ids.tryMarkUnused(usageIndicator)) {
1999 /* this state has been altered in the meantime,
2000 don't go anywhere near it */
2001 continue;
2002 }
2003 ids.du = nullptr;
2004 handleDOHTimeout(oldDU);
2005 ids.age = 0;
2006 dss->reuseds++;
2007 --dss->outstanding;
2008 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2009 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2010 dss->remote.toStringWithPort(), dss->name,
2011 ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2012
2013 struct timespec ts;
2014 gettime(&ts);
2015
2016 struct dnsheader fake;
2017 memset(&fake, 0, sizeof(fake));
2018 fake.id = ids.origID;
2019
2020 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2021 }
2022 }
2023 }
2024
2025 handleQueuedHealthChecks(mplexer);
2026 }
2027 }
2028
2029 static void bindAny(int af, int sock)
2030 {
2031 __attribute__((unused)) int one = 1;
2032
2033 #ifdef IP_FREEBIND
2034 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2035 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2036 #endif
2037
2038 #ifdef IP_BINDANY
2039 if (af == AF_INET)
2040 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2041 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2042 #endif
2043 #ifdef IPV6_BINDANY
2044 if (af == AF_INET6)
2045 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2046 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2047 #endif
2048 #ifdef SO_BINDANY
2049 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2050 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2051 #endif
2052 }
2053
2054 static void dropGroupPrivs(gid_t gid)
2055 {
2056 if (gid) {
2057 if (setgid(gid) == 0) {
2058 if (setgroups(0, NULL) < 0) {
2059 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2060 }
2061 }
2062 else {
2063 warnlog("Warning: Unable to set group ID to %d: %s", gid, stringerror());
2064 }
2065 }
2066 }
2067
2068 static void dropUserPrivs(uid_t uid)
2069 {
2070 if(uid) {
2071 if(setuid(uid) < 0) {
2072 warnlog("Warning: Unable to set user ID to %d: %s", uid, stringerror());
2073 }
2074 }
2075 }
2076
2077 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2078 {
2079 /* stdin, stdout, stderr */
2080 size_t requiredFDsCount = 3;
2081 auto backends = g_dstates.getLocal();
2082 /* UDP sockets to backends */
2083 size_t backendUDPSocketsCount = 0;
2084 for (const auto& backend : *backends) {
2085 backendUDPSocketsCount += backend->sockets.size();
2086 }
2087 requiredFDsCount += backendUDPSocketsCount;
2088 /* TCP sockets to backends */
2089 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2090 /* listening sockets */
2091 requiredFDsCount += udpBindsCount;
2092 requiredFDsCount += tcpBindsCount;
2093 /* max TCP connections currently served */
2094 requiredFDsCount += g_maxTCPClientThreads;
2095 /* max pipes for communicating between TCP acceptors and client threads */
2096 requiredFDsCount += (g_maxTCPClientThreads * 2);
2097 /* max TCP queued connections */
2098 requiredFDsCount += g_maxTCPQueuedConnections;
2099 /* DelayPipe pipe */
2100 requiredFDsCount += 2;
2101 /* syslog socket */
2102 requiredFDsCount++;
2103 /* webserver main socket */
2104 requiredFDsCount++;
2105 /* console main socket */
2106 requiredFDsCount++;
2107 /* carbon export */
2108 requiredFDsCount++;
2109 /* history file */
2110 requiredFDsCount++;
2111 struct rlimit rl;
2112 getrlimit(RLIMIT_NOFILE, &rl);
2113 if (rl.rlim_cur <= requiredFDsCount) {
2114 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2115 #ifdef HAVE_SYSTEMD
2116 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2117 #else
2118 warnlog("You can increase this value by using ulimit.");
2119 #endif
2120 }
2121 }
2122
2123 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2124 {
2125 /* skip some warnings if there is an identical UDP context */
2126 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2127 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2128 (void) warn;
2129
2130 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2131
2132 if (cs->tcp) {
2133 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2134 #ifdef TCP_DEFER_ACCEPT
2135 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2136 #endif
2137 if (cs->fastOpenQueueSize > 0) {
2138 #ifdef TCP_FASTOPEN
2139 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2140 #else
2141 if (warn) {
2142 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2143 }
2144 #endif
2145 }
2146 }
2147
2148 if(cs->local.sin4.sin_family == AF_INET6) {
2149 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2150 }
2151
2152 bindAny(cs->local.sin4.sin_family, fd);
2153
2154 if(!cs->tcp && IsAnyAddress(cs->local)) {
2155 int one=1;
2156 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2157 #ifdef IPV6_RECVPKTINFO
2158 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2159 #endif
2160 }
2161
2162 if (cs->reuseport) {
2163 #ifdef SO_REUSEPORT
2164 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2165 #else
2166 if (warn) {
2167 /* no need to warn again if configured but support is not available, we already did for UDP */
2168 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2169 }
2170 #endif
2171 }
2172
2173 if (!cs->tcp) {
2174 if (cs->local.isIPv4()) {
2175 try {
2176 setSocketIgnorePMTU(cs->udpFD);
2177 }
2178 catch(const std::exception& e) {
2179 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2180 }
2181 }
2182 }
2183
2184 const std::string& itf = cs->interface;
2185 if (!itf.empty()) {
2186 #ifdef SO_BINDTODEVICE
2187 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2188 if (res != 0) {
2189 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), stringerror());
2190 }
2191 #else
2192 if (warn) {
2193 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2194 }
2195 #endif
2196 }
2197
2198 #ifdef HAVE_EBPF
2199 if (g_defaultBPFFilter) {
2200 cs->attachFilter(g_defaultBPFFilter);
2201 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2202 }
2203 #endif /* HAVE_EBPF */
2204
2205 if (cs->tlsFrontend != nullptr) {
2206 if (!cs->tlsFrontend->setupTLS()) {
2207 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2208 _exit(EXIT_FAILURE);
2209 }
2210 }
2211
2212 if (cs->dohFrontend != nullptr) {
2213 cs->dohFrontend->setup();
2214 }
2215
2216 SBind(fd, cs->local);
2217
2218 if (cs->tcp) {
2219 SListen(cs->tcpFD, SOMAXCONN);
2220 if (cs->tlsFrontend != nullptr) {
2221 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2222 }
2223 else if (cs->dohFrontend != nullptr) {
2224 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2225 }
2226 else if (cs->dnscryptCtx != nullptr) {
2227 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2228 }
2229 else {
2230 warnlog("Listening on %s", cs->local.toStringWithPort());
2231 }
2232 }
2233
2234 cs->ready = true;
2235 }
2236
/* Settings gathered from the command line by main() */
struct
{
  /* addresses passed via -l / --local */
  std::vector<std::string> locals;
  /* remaining positional arguments, when not acting as a client */
  std::vector<std::string> remotes;
  /* --check-config */
  bool checkConfig = false;
  /* -c / --client */
  bool beClient = false;
  /* --supervised */
  bool beSupervised = false;
  /* -e / --execute */
  std::string command;
  /* -C / --config */
  std::string config;
  /* -u / --uid */
  std::string uid;
  /* -g / --gid */
  std::string gid;
} g_cmdLine;

/* set to true by main() once the configuration has been loaded and the
   list of frontends is final, right before the sockets are bound */
std::atomic<bool> g_configurationDone(false);
2251
2252 static void usage()
2253 {
2254 cout<<endl;
2255 cout<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2256 cout<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2257 cout<<"[-v,--verbose] [--check-config] [--version]\n";
2258 cout<<"\n";
2259 cout<<"-a,--acl netmask Add this netmask to the ACL\n";
2260 cout<<"-C,--config file Load configuration from 'file'\n";
2261 cout<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2262 cout<<" controlSocket from your configuration file, but also\n";
2263 cout<<" accepts an IP:PORT argument\n";
2264 #ifdef HAVE_LIBSODIUM
2265 cout<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2266 cout<<" is similar to setting setKey in the configuration file.\n";
2267 cout<<" NOTE: this will leak this key in your shell's history\n";
2268 cout<<" and in the systems running process list.\n";
2269 #endif
2270 cout<<"--check-config Validate the configuration file and exit. The exit-code\n";
2271 cout<<" reflects the validation, 0 is OK, 1 means an error.\n";
2272 cout<<" Any errors are printed as well.\n";
2273 cout<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2274 cout<<"-g,--gid gid Change the process group ID after binding sockets\n";
2275 cout<<"-h,--help Display this helpful message\n";
2276 cout<<"-l,--local address Listen on this local address\n";
2277 cout<<"--supervised Don't open a console, I'm supervised\n";
2278 cout<<" (use with e.g. systemd and daemontools)\n";
2279 cout<<"--disable-syslog Don't log to syslog, only to stdout\n";
2280 cout<<" (use with e.g. systemd)\n";
2281 cout<<"-u,--uid uid Change the process user ID after binding sockets\n";
2282 cout<<"-v,--verbose Enable verbose mode\n";
2283 cout<<"-V,--version Show dnsdist version information and exit\n";
2284 }
2285
2286 int main(int argc, char** argv)
2287 try
2288 {
2289 size_t udpBindsCount = 0;
2290 size_t tcpBindsCount = 0;
2291 rl_attempted_completion_function = my_completion;
2292 rl_completion_append_character = 0;
2293
2294 signal(SIGPIPE, SIG_IGN);
2295 signal(SIGCHLD, SIG_IGN);
2296 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2297
2298 #ifdef HAVE_LIBSODIUM
2299 if (sodium_init() == -1) {
2300 cerr<<"Unable to initialize crypto library"<<endl;
2301 exit(EXIT_FAILURE);
2302 }
2303 g_hashperturb=randombytes_uniform(0xffffffff);
2304 srandom(randombytes_uniform(0xffffffff));
2305 #else
2306 {
2307 struct timeval tv;
2308 gettimeofday(&tv, 0);
2309 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2310 g_hashperturb=random();
2311 }
2312
2313 #endif
2314 ComboAddress clientAddress = ComboAddress();
2315 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2316 struct option longopts[]={
2317 {"acl", required_argument, 0, 'a'},
2318 {"check-config", no_argument, 0, 1},
2319 {"client", no_argument, 0, 'c'},
2320 {"config", required_argument, 0, 'C'},
2321 {"disable-syslog", no_argument, 0, 2},
2322 {"execute", required_argument, 0, 'e'},
2323 {"gid", required_argument, 0, 'g'},
2324 {"help", no_argument, 0, 'h'},
2325 {"local", required_argument, 0, 'l'},
2326 {"setkey", required_argument, 0, 'k'},
2327 {"supervised", no_argument, 0, 3},
2328 {"uid", required_argument, 0, 'u'},
2329 {"verbose", no_argument, 0, 'v'},
2330 {"version", no_argument, 0, 'V'},
2331 {0,0,0,0}
2332 };
2333 int longindex=0;
2334 string optstring;
2335 for(;;) {
2336 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2337 if(c==-1)
2338 break;
2339 switch(c) {
2340 case 1:
2341 g_cmdLine.checkConfig=true;
2342 break;
2343 case 2:
2344 g_syslog=false;
2345 break;
2346 case 3:
2347 g_cmdLine.beSupervised=true;
2348 break;
2349 case 'C':
2350 g_cmdLine.config=optarg;
2351 break;
2352 case 'c':
2353 g_cmdLine.beClient=true;
2354 break;
2355 case 'e':
2356 g_cmdLine.command=optarg;
2357 break;
2358 case 'g':
2359 g_cmdLine.gid=optarg;
2360 break;
2361 case 'h':
2362 cout<<"dnsdist "<<VERSION<<endl;
2363 usage();
2364 cout<<"\n";
2365 exit(EXIT_SUCCESS);
2366 break;
2367 case 'a':
2368 optstring=optarg;
2369 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2370 break;
2371 case 'k':
2372 #ifdef HAVE_LIBSODIUM
2373 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2374 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2375 exit(EXIT_FAILURE);
2376 }
2377 #else
2378 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2379 exit(EXIT_FAILURE);
2380 #endif
2381 break;
2382 case 'l':
2383 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2384 break;
2385 case 'u':
2386 g_cmdLine.uid=optarg;
2387 break;
2388 case 'v':
2389 g_verbose=true;
2390 break;
2391 case 'V':
2392 #ifdef LUAJIT_VERSION
2393 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2394 #else
2395 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2396 #endif
2397 cout<<"Enabled features: ";
2398 #ifdef HAVE_CDB
2399 cout<<"cdb ";
2400 #endif
2401 #ifdef HAVE_DNS_OVER_TLS
2402 cout<<"dns-over-tls(";
2403 #ifdef HAVE_GNUTLS
2404 cout<<"gnutls";
2405 #ifdef HAVE_LIBSSL
2406 cout<<" ";
2407 #endif
2408 #endif
2409 #ifdef HAVE_LIBSSL
2410 cout<<"openssl";
2411 #endif
2412 cout<<") ";
2413 #endif
2414 #ifdef HAVE_DNS_OVER_HTTPS
2415 cout<<"dns-over-https(DOH) ";
2416 #endif
2417 #ifdef HAVE_DNSCRYPT
2418 cout<<"dnscrypt ";
2419 #endif
2420 #ifdef HAVE_EBPF
2421 cout<<"ebpf ";
2422 #endif
2423 #ifdef HAVE_FSTRM
2424 cout<<"fstrm ";
2425 #endif
2426 #ifdef HAVE_LIBCRYPTO
2427 cout<<"ipcipher ";
2428 #endif
2429 #ifdef HAVE_LIBSODIUM
2430 cout<<"libsodium ";
2431 #endif
2432 #ifdef HAVE_LMDB
2433 cout<<"lmdb ";
2434 #endif
2435 #ifdef HAVE_PROTOBUF
2436 cout<<"protobuf ";
2437 #endif
2438 #ifdef HAVE_RE2
2439 cout<<"re2 ";
2440 #endif
2441 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2442 cout<<"recvmmsg/sendmmsg ";
2443 #endif
2444 #ifdef HAVE_NET_SNMP
2445 cout<<"snmp ";
2446 #endif
2447 #ifdef HAVE_SYSTEMD
2448 cout<<"systemd";
2449 #endif
2450 cout<<endl;
2451 exit(EXIT_SUCCESS);
2452 break;
2453 case '?':
2454 //getopt_long printed an error message.
2455 usage();
2456 exit(EXIT_FAILURE);
2457 break;
2458 }
2459 }
2460
2461 argc-=optind;
2462 argv+=optind;
2463 for(auto p = argv; *p; ++p) {
2464 if(g_cmdLine.beClient) {
2465 clientAddress = ComboAddress(*p, 5199);
2466 } else {
2467 g_cmdLine.remotes.push_back(*p);
2468 }
2469 }
2470
2471 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2472
2473 g_policy.setState(leastOutstandingPol);
2474 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2475 setupLua(true, false, g_cmdLine.config);
2476 if (clientAddress != ComboAddress())
2477 g_serverControl = clientAddress;
2478 doClient(g_serverControl, g_cmdLine.command);
2479 _exit(EXIT_SUCCESS);
2480 }
2481
2482 auto acl = g_ACL.getCopy();
2483 if(acl.empty()) {
2484 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2485 acl.addMask(addr);
2486 g_ACL.setState(acl);
2487 }
2488
2489 auto consoleACL = g_consoleACL.getCopy();
2490 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2491 consoleACL.addMask(mask);
2492 }
2493 g_consoleACL.setState(consoleACL);
2494
2495 if (g_cmdLine.checkConfig) {
2496 setupLua(false, true, g_cmdLine.config);
2497 // No exception was thrown
2498 infolog("Configuration '%s' OK!", g_cmdLine.config);
2499 _exit(EXIT_SUCCESS);
2500 }
2501
2502 auto todo=setupLua(false, false, g_cmdLine.config);
2503
2504 auto localPools = g_pools.getCopy();
2505 {
2506 bool precompute = false;
2507 if (g_policy.getLocal()->name == "chashed") {
2508 precompute = true;
2509 } else {
2510 for (const auto& entry: localPools) {
2511 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2512 precompute = true;
2513 break ;
2514 }
2515 }
2516 }
2517 if (precompute) {
2518 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2519 // pre compute hashes
2520 auto backends = g_dstates.getLocal();
2521 for (auto& backend: *backends) {
2522 backend->hash();
2523 }
2524 }
2525 }
2526
2527 if (!g_cmdLine.locals.empty()) {
2528 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2529 /* DoH, DoT and DNSCrypt frontends are separate */
2530 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2531 it = g_frontends.erase(it);
2532 }
2533 else {
2534 ++it;
2535 }
2536 }
2537
2538 for(const auto& loc : g_cmdLine.locals) {
2539 /* UDP */
2540 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2541 /* TCP */
2542 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2543 }
2544 }
2545
2546 if (g_frontends.empty()) {
2547 /* UDP */
2548 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2549 /* TCP */
2550 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2551 }
2552
2553 g_configurationDone = true;
2554
2555 for(auto& frontend : g_frontends) {
2556 setUpLocalBind(frontend);
2557
2558 if (frontend->tcp == false) {
2559 ++udpBindsCount;
2560 }
2561 else {
2562 ++tcpBindsCount;
2563 }
2564 }
2565
2566 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2567
2568 vector<string> vec;
2569 std::string acls;
2570 g_ACL.getLocal()->toStringVector(&vec);
2571 for(const auto& s : vec) {
2572 if (!acls.empty())
2573 acls += ", ";
2574 acls += s;
2575 }
2576 infolog("ACL allowing queries from: %s", acls.c_str());
2577 vec.clear();
2578 acls.clear();
2579 g_consoleACL.getLocal()->toStringVector(&vec);
2580 for (const auto& entry : vec) {
2581 if (!acls.empty()) {
2582 acls += ", ";
2583 }
2584 acls += entry;
2585 }
2586 infolog("Console ACL allowing connections from: %s", acls.c_str());
2587
2588 #ifdef HAVE_LIBSODIUM
2589 if (g_consoleEnabled && g_consoleKey.empty()) {
2590 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2591 }
2592 #endif
2593
2594 uid_t newgid=getegid();
2595 gid_t newuid=geteuid();
2596
2597 if(!g_cmdLine.gid.empty())
2598 newgid = strToGID(g_cmdLine.gid.c_str());
2599
2600 if(!g_cmdLine.uid.empty())
2601 newuid = strToUID(g_cmdLine.uid.c_str());
2602
2603 if (getegid() != newgid) {
2604 if (running_in_service_mgr()) {
2605 errlog("--gid/-g set on command-line, but dnsdist was started as a systemd service. Use the 'Group' setting in the systemd unit file to set the group to run as");
2606 _exit(EXIT_FAILURE);
2607 }
2608 dropGroupPrivs(newgid);
2609 }
2610
2611 if (geteuid() != newuid) {
2612 if (running_in_service_mgr()) {
2613 errlog("--uid/-u set on command-line, but dnsdist was started as a systemd service. Use the 'User' setting in the systemd unit file to set the user to run as");
2614 _exit(EXIT_FAILURE);
2615 }
2616 dropUserPrivs(newuid);
2617 }
2618
2619 try {
2620 /* we might still have capabilities remaining,
2621 for example if we have been started as root
2622 without --uid or --gid (please don't do that)
2623 or as an unprivileged user with ambient
2624 capabilities like CAP_NET_BIND_SERVICE.
2625 */
2626 dropCapabilities(g_capabilitiesToRetain);
2627 }
2628 catch(const std::exception& e) {
2629 warnlog("%s", e.what());
2630 }
2631
2632 /* this need to be done _after_ dropping privileges */
2633 g_delay = new DelayPipe<DelayedPacket>();
2634
2635 if (g_snmpAgent) {
2636 g_snmpAgent->run();
2637 }
2638
2639 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2640
2641 for(auto& t : todo)
2642 t();
2643
2644 localPools = g_pools.getCopy();
2645 /* create the default pool no matter what */
2646 createPoolIfNotExists(localPools, "");
2647 if(g_cmdLine.remotes.size()) {
2648 for(const auto& address : g_cmdLine.remotes) {
2649 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2650 addServerToPool(localPools, "", ret);
2651 if (ret->connected && !ret->threadStarted.test_and_set()) {
2652 ret->tid = thread(responderThread, ret);
2653 }
2654 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2655 }
2656 }
2657 g_pools.setState(localPools);
2658
2659 if(g_dstates.getLocal()->empty()) {
2660 errlog("No downstream servers defined: all packets will get dropped");
2661 // you might define them later, but you need to know
2662 }
2663
2664 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2665
2666 auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
2667 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2668 if (dss->availability == DownstreamState::Availability::Auto) {
2669 if (!queueHealthCheck(mplexer, dss, true)) {
2670 dss->upStatus = false;
2671 warnlog("Marking downstream %s as 'down'", dss->getNameWithAddr());
2672 }
2673 }
2674 }
2675 handleQueuedHealthChecks(mplexer, true);
2676
2677 for(auto& cs : g_frontends) {
2678 if (cs->dohFrontend != nullptr) {
2679 #ifdef HAVE_DNS_OVER_HTTPS
2680 std::thread t1(dohThread, cs.get());
2681 if (!cs->cpus.empty()) {
2682 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2683 }
2684 t1.detach();
2685 #endif /* HAVE_DNS_OVER_HTTPS */
2686 continue;
2687 }
2688 if (cs->udpFD >= 0) {
2689 thread t1(udpClientThread, cs.get());
2690 if (!cs->cpus.empty()) {
2691 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2692 }
2693 t1.detach();
2694 }
2695 else if (cs->tcpFD >= 0) {
2696 thread t1(tcpAcceptorThread, cs.get());
2697 if (!cs->cpus.empty()) {
2698 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2699 }
2700 t1.detach();
2701 }
2702 }
2703
2704 thread carbonthread(carbonDumpThread);
2705 carbonthread.detach();
2706
2707 thread stattid(maintThread);
2708 stattid.detach();
2709
2710 thread healththread(healthChecksThread);
2711
2712 if (!g_secPollSuffix.empty()) {
2713 thread secpollthread(secPollThread);
2714 secpollthread.detach();
2715 }
2716
2717 if(g_cmdLine.beSupervised) {
2718 #ifdef HAVE_SYSTEMD
2719 sd_notify(0, "READY=1");
2720 #endif
2721 healththread.join();
2722 }
2723 else {
2724 healththread.detach();
2725 doConsole();
2726 }
2727 _exit(EXIT_SUCCESS);
2728
2729 }
2730 catch(const LuaContext::ExecutionErrorException& e) {
2731 try {
2732 errlog("Fatal Lua error: %s", e.what());
2733 std::rethrow_if_nested(e);
2734 } catch(const std::exception& ne) {
2735 errlog("Details: %s", ne.what());
2736 }
2737 catch(PDNSException &ae)
2738 {
2739 errlog("Fatal pdns error: %s", ae.reason);
2740 }
2741 _exit(EXIT_FAILURE);
2742 }
2743 catch(std::exception &e)
2744 {
2745 errlog("Fatal error: %s", e.what());
2746 _exit(EXIT_FAILURE);
2747 }
2748 catch(PDNSException &ae)
2749 {
2750 errlog("Fatal pdns error: %s", ae.reason);
2751 _exit(EXIT_FAILURE);
2752 }
2753
2754 uint64_t getLatencyCount(const std::string&)
2755 {
2756 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2757 }