]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
75d7c56ea299d30c8a31bd31fb61128377ee9d9c
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #include "dnsdist-systemd.hh"
41 #ifdef HAVE_SYSTEMD
42 #include <systemd/sd-daemon.h>
43 #endif
44
45 #include "dnsdist.hh"
46 #include "dnsdist-cache.hh"
47 #include "dnsdist-console.hh"
48 #include "dnsdist-ecs.hh"
49 #include "dnsdist-healthchecks.hh"
50 #include "dnsdist-lua.hh"
51 #include "dnsdist-rings.hh"
52 #include "dnsdist-secpoll.hh"
53 #include "dnsdist-xpf.hh"
54
55 #include "base64.hh"
56 #include "delaypipe.hh"
57 #include "dolog.hh"
58 #include "dnsname.hh"
59 #include "dnsparser.hh"
60 #include "ednsoptions.hh"
61 #include "gettime.hh"
62 #include "lock.hh"
63 #include "misc.hh"
64 #include "sodcrypto.hh"
65 #include "sstuff.hh"
66 #include "threadname.hh"
67
68 /* Known sins:
69
70 Receiver is currently single threaded
71 not *that* bad actually, but now that we are thread safe, might want to scale
72 */
73
74 /* the Rulaction plan
75 Set of Rules, if one matches, it leads to an Action
76 Both rules and actions could conceivably be Lua based.
77 On the C++ side, both could be inherited from a class Rule and a class Action,
78 on the Lua side we can't do that. */
79
using std::atomic;
using std::thread;
// NOTE(review): presumably enables verbose logging (vinfolog) — confirm in dolog.hh.
bool g_verbose;

// Global statistics counters, updated throughout this file
// (responses, latency buckets, truncFail, nonCompliantResponses, ...).
struct DNSDistStats g_stats;

// Used as the number of IDState slots allocated for each backend
// (see DownstreamState's constructor); defaults to the largest uint16_t value.
uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
uint32_t g_staleCacheEntriesTTL{0};
bool g_syslog{true};
// When set, responseContentMatches() also accepts NoError/NXDomain responses
// that carry no question section.
bool g_allowEmptyResponse{false};

GlobalStateHolder<NetmaskGroup> g_ACL;
string g_outputBuffer;

// Frontends (TLS, DoH, DNSCrypt, generic ClientState) and backend pools.
// NOTE(review): presumably populated from the configuration — confirm in dnsdist-lua.
std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
#ifdef HAVE_EBPF
shared_ptr<BPFFilter> g_defaultBPFFilter;
std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
#endif /* HAVE_EBPF */
std::vector<std::unique_ptr<ClientState>> g_frontends;
GlobalStateHolder<pools_t> g_pools;
size_t g_udpVectorSize{1};

bool g_snmpEnabled{false};
bool g_snmpTrapsEnabled{false};
DNSDistSNMPAgent* g_snmpAgent{nullptr};
108
/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
   Then we have a bunch of connected sockets for talking to downstream servers.
   We send directly to those sockets.

   For the return path, per downstream server we have a thread that listens to responses
   (responderThread() below).

   Per downstream server there is an array of IDState entries (idStates, sized by
   g_maxOutstanding, i.e. 2^16 - 1 entries by default). When we send out a packet
   downstream, we note there the original requestor and the original id. The new ID
   is the offset in the array.

   When an answer comes in on a socket, we look up the state using the id as the offset,
   put the original id back and lob the answer to the original requestor.

   IDs are assigned by atomic increments of a per-backend offset.
   NOTE(review): the assignment code is not in this part of the file — confirm.
*/

// Query rule chain, followed by the response rule chains. g_resprulactions is the
// chain applied to backend responses in responderThread().
GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;

// Ring buffers of recent traffic (responses are inserted in responderThread()).
Rings g_rings;
QueryCount g_qcount;

// Backend servers and dynamic block tables.
GlobalStateHolder<servers_t> g_dstates;
GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
// NOTE(review): timeouts, presumably in seconds — confirm where they are consumed.
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
int g_udpTimeout{2};

bool g_servFailOnNoPolicy{false};
// Strip responses that have TC set down to the question (see truncateTC()).
bool g_truncateTC{false};
// Copy the original qname (with its case) back into responses (see fixUpResponse()).
bool g_fixupCase{false};
bool g_preserveTrailingData{false};
// When no server is up, roundrobin() returns no server instead of falling back
// to the whole pool.
bool g_roundrobinFailOnNoServer{false};

std::set<std::string> g_capabilitiesToRetain;
147
148 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
149 try
150 {
151 bool hadEDNS = false;
152 uint16_t payloadSize = 0;
153 uint16_t z = 0;
154
155 if (g_addEDNSToSelfGeneratedResponses) {
156 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
157 }
158
159 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
160 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
161 dh->ancount = dh->arcount = dh->nscount = 0;
162
163 if (hadEDNS) {
164 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
165 }
166 }
167 catch(...)
168 {
169 g_stats.truncFail++;
170 }
171
172 struct DelayedPacket
173 {
174 int fd;
175 string packet;
176 ComboAddress destination;
177 ComboAddress origDest;
178 void operator()()
179 {
180 ssize_t res;
181 if(origDest.sin4.sin_family == 0) {
182 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
183 }
184 else {
185 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
186 }
187 if (res == -1) {
188 int err = errno;
189 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
190 }
191 }
192 };
193
194 DelayPipe<DelayedPacket>* g_delay = nullptr;
195
196 std::string DNSQuestion::getTrailingData() const
197 {
198 const char* message = reinterpret_cast<const char*>(this->dh);
199 const uint16_t messageLen = getDNSPacketLength(message, this->len);
200 return std::string(message + messageLen, this->len - messageLen);
201 }
202
203 bool DNSQuestion::setTrailingData(const std::string& tail)
204 {
205 char* message = reinterpret_cast<char*>(this->dh);
206 const uint16_t messageLen = getDNSPacketLength(message, this->len);
207 const uint16_t tailLen = tail.size();
208 if (tailLen > (this->size - messageLen)) {
209 return false;
210 }
211
212 /* Update length and copy data from the Lua string. */
213 this->len = messageLen + tailLen;
214 if(tailLen > 0) {
215 tail.copy(message + messageLen, tailLen);
216 }
217 return true;
218 }
219
220 void doLatencyStats(double udiff)
221 {
222 if(udiff < 1000) ++g_stats.latency0_1;
223 else if(udiff < 10000) ++g_stats.latency1_10;
224 else if(udiff < 50000) ++g_stats.latency10_50;
225 else if(udiff < 100000) ++g_stats.latency50_100;
226 else if(udiff < 1000000) ++g_stats.latency100_1000;
227 else ++g_stats.latencySlow;
228 g_stats.latencySum += udiff / 1000;
229
230 auto doAvg = [](double& var, double n, double weight) {
231 var = (weight -1) * var/weight + n/weight;
232 };
233
234 doAvg(g_stats.latencyAvg100, udiff, 100);
235 doAvg(g_stats.latencyAvg1000, udiff, 1000);
236 doAvg(g_stats.latencyAvg10000, udiff, 10000);
237 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
238 }
239
240 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
241 {
242 if (responseLen < sizeof(dnsheader)) {
243 return false;
244 }
245
246 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
247 if (dh->qdcount == 0) {
248 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
249 return true;
250 }
251 else {
252 ++g_stats.nonCompliantResponses;
253 return false;
254 }
255 }
256
257 uint16_t rqtype, rqclass;
258 DNSName rqname;
259 try {
260 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
261 }
262 catch(const std::exception& e) {
263 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
264 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
265 }
266 ++g_stats.nonCompliantResponses;
267 return false;
268 }
269
270 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
271 return false;
272 }
273
274 return true;
275 }
276
277 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
278 {
279 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
280 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
281 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
282 uint16_t * flags = getFlagsFromDNSHeader(dh);
283 /* clear the flags we are about to restore */
284 *flags &= restoreFlagsMask;
285 /* only keep the flags we want to restore */
286 origFlags &= ~restoreFlagsMask;
287 /* set the saved flags as they were */
288 *flags |= origFlags;
289 }
290
291 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
292 {
293 restoreFlags(dq.dh, origFlags);
294
295 return addEDNSToQueryTurnedResponse(dq);
296 }
297
298 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
299 {
300 if (*responseLen < sizeof(dnsheader)) {
301 return false;
302 }
303
304 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
305 restoreFlags(dh, origFlags);
306
307 if (*responseLen == sizeof(dnsheader)) {
308 return true;
309 }
310
311 if(g_fixupCase) {
312 string realname = qname.toDNSString();
313 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
314 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
315 }
316 }
317
318 if (ednsAdded || ecsAdded) {
319 uint16_t optStart;
320 size_t optLen = 0;
321 bool last = false;
322
323 const std::string responseStr(*response, *responseLen);
324 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
325
326 if (res == 0) {
327 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
328 size_t optContentStart = 0;
329 uint16_t optContentLen = 0;
330 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
331 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
332 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
333 we care about. */
334 *zeroScope = responseStr.at(optContentStart + 3) == 0;
335 }
336 }
337
338 if (ednsAdded) {
339 /* we added the entire OPT RR,
340 therefore we need to remove it entirely */
341 if (last) {
342 /* simply remove the last AR */
343 *responseLen -= optLen;
344 uint16_t arcount = ntohs(dh->arcount);
345 arcount--;
346 dh->arcount = htons(arcount);
347 }
348 else {
349 /* Removing an intermediary RR could lead to compression error */
350 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
351 *responseLen = rewrittenResponse.size();
352 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
353 rewrittenResponse.reserve(*responseLen + addRoom);
354 }
355 *responseSize = rewrittenResponse.capacity();
356 *response = reinterpret_cast<char*>(rewrittenResponse.data());
357 }
358 else {
359 warnlog("Error rewriting content");
360 }
361 }
362 }
363 else {
364 /* the OPT RR was already present, but without ECS,
365 we need to remove the ECS option if any */
366 if (last) {
367 /* nothing after the OPT RR, we can simply remove the
368 ECS option */
369 size_t existingOptLen = optLen;
370 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
371 *responseLen -= (existingOptLen - optLen);
372 }
373 else {
374 /* Removing an intermediary RR could lead to compression error */
375 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
376 *responseLen = rewrittenResponse.size();
377 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
378 rewrittenResponse.reserve(*responseLen + addRoom);
379 }
380 *responseSize = rewrittenResponse.capacity();
381 *response = reinterpret_cast<char*>(rewrittenResponse.data());
382 }
383 else {
384 warnlog("Error rewriting content");
385 }
386 }
387 }
388 }
389 }
390
391 return true;
392 }
393
#ifdef HAVE_DNSCRYPT
/* Encrypt the response in place when the query was received over DNSCrypt
   (dnsCryptQuery is set); otherwise do nothing and return true.
   If dh, *dh and dhCopy are all non-null, the cleartext header pointed to by *dh is
   first copied to dhCopy and *dh is redirected to that copy, so that the caller can
   still read the header once the packet has been encrypted.
   On success *responseLen is updated to the encrypted length. Returns false when
   encryption failed, meaning the response must be dropped.
   dnsCryptQuery is only used, not retained, so it is taken by const reference to
   avoid an atomic reference-count increment/decrement per response. */
static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, const std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    return true;
  }

  /* save the original header before encrypting it in place */
  if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedResponseLen = 0;
  int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
  if (res != 0) {
    /* dropping response */
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedResponseLen;
  return true;
}
#endif /* HAVE_DNSCRYPT */
418
419 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
420 {
421 DNSResponseAction::Action action=DNSResponseAction::Action::None;
422 std::string ruleresult;
423 for(const auto& lr : *localRespRulactions) {
424 if(lr.d_rule->matches(&dr)) {
425 lr.d_rule->d_matches++;
426 action=(*lr.d_action)(&dr, &ruleresult);
427 switch(action) {
428 case DNSResponseAction::Action::Allow:
429 return true;
430 break;
431 case DNSResponseAction::Action::Drop:
432 return false;
433 break;
434 case DNSResponseAction::Action::HeaderModify:
435 return true;
436 break;
437 case DNSResponseAction::Action::ServFail:
438 dr.dh->rcode = RCode::ServFail;
439 return true;
440 break;
441 /* non-terminal actions follow */
442 case DNSResponseAction::Action::Delay:
443 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
444 break;
445 case DNSResponseAction::Action::None:
446 break;
447 }
448 }
449 }
450
451 return true;
452 }
453
454 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
455 {
456 if (!applyRulesToResponse(localRespRulactions, dr)) {
457 return false;
458 }
459
460 bool zeroScope = false;
461 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
462 return false;
463 }
464
465 if (dr.packetCache && !dr.skipCache && *responseLen <= s_maxPacketCacheEntrySize) {
466 if (!dr.useZeroScope) {
467 /* if the query was not suitable for zero-scope, for
468 example because it had an existing ECS entry so the hash is
469 not really 'no ECS', so just insert it for the existing subnet
470 since:
471 - we don't have the correct hash for a non-ECS query
472 - inserting with hash computed before the ECS replacement but with
473 the subnet extracted _after_ the replacement would not work.
474 */
475 zeroScope = false;
476 }
477 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
478 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
479 }
480
481 #ifdef HAVE_DNSCRYPT
482 if (!muted) {
483 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
484 return false;
485 }
486 }
487 #endif /* HAVE_DNSCRYPT */
488
489 return true;
490 }
491
492 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
493 {
494 if(delayMsec && g_delay) {
495 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
496 g_delay->submit(dp, delayMsec);
497 }
498 else {
499 ssize_t res;
500 if(origDest.sin4.sin_family == 0) {
501 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
502 }
503 else {
504 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
505 }
506 if (res == -1) {
507 int err = errno;
508 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), stringerror(err));
509 }
510 }
511
512 return true;
513 }
514
515
516 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
517 {
518 return state->sockets[state->socketsOffset++ % state->sockets.size()];
519 }
520
521 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
522 {
523 ready.clear();
524
525 if (state->sockets.size() == 1) {
526 ready.push_back(state->sockets[0]);
527 return ;
528 }
529
530 {
531 std::lock_guard<std::mutex> lock(state->socketsLock);
532 state->mplexer->getAvailableFDs(ready, -1);
533 }
534 }
535
536 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
537 void responderThread(std::shared_ptr<DownstreamState> dss)
538 try {
539 setThreadName("dnsdist/respond");
540 auto localRespRulactions = g_resprulactions.getLocal();
541 char packet[s_maxPacketCacheEntrySize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
542 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
543 /* when the answer is encrypted in place, we need to get a copy
544 of the original header before encryption to fill the ring buffer */
545 dnsheader cleartextDH;
546 vector<uint8_t> rewrittenResponse;
547
548 uint16_t queryId = 0;
549 std::vector<int> sockets;
550 sockets.reserve(dss->sockets.size());
551
552 for(;;) {
553 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
554 try {
555 pickBackendSocketsReadyForReceiving(dss, sockets);
556 for (const auto& fd : sockets) {
557 ssize_t got = recv(fd, packet, sizeof(packet), 0);
558 char * response = packet;
559 size_t responseSize = sizeof(packet);
560
561 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
562 continue;
563
564 uint16_t responseLen = static_cast<uint16_t>(got);
565 queryId = dh->id;
566
567 if(queryId >= dss->idStates.size()) {
568 continue;
569 }
570
571 IDState* ids = &dss->idStates[queryId];
572 int64_t usageIndicator = ids->usageIndicator;
573
574 if(!IDState::isInUse(usageIndicator)) {
575 /* the corresponding state is marked as not in use, meaning that:
576 - it was already cleaned up by another thread and the state is gone ;
577 - we already got a response for this query and this one is a duplicate.
578 Either way, we don't touch it.
579 */
580 continue;
581 }
582
583 /* read the potential DOHUnit state as soon as possible, but don't use it
584 until we have confirmed that we own this state by updating usageIndicator */
585 auto du = ids->du;
586 /* setting age to 0 to prevent the maintainer thread from
587 cleaning this IDS while we process the response.
588 */
589 ids->age = 0;
590 int origFD = ids->origFD;
591
592 unsigned int consumed = 0;
593 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
594 continue;
595 }
596
597 bool isDoH = du != nullptr;
598 /* atomically mark the state as available, but only if it has not been altered
599 in the meantime */
600 if (ids->tryMarkUnused(usageIndicator)) {
601 /* clear the potential DOHUnit asap, it's ours now
602 and since we just marked the state as unused,
603 someone could overwrite it. */
604 ids->du = nullptr;
605 /* we only decrement the outstanding counter if the value was not
606 altered in the meantime, which would mean that the state has been actively reused
607 and the other thread has not incremented the outstanding counter, so we don't
608 want it to be decremented twice. */
609 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
610 } else {
611 /* someone updated the state in the meantime, we can't touch the existing pointer */
612 du = nullptr;
613 /* since the state has been updated, we can't safely access it so let's just drop
614 this response */
615 continue;
616 }
617
618 if(dh->tc && g_truncateTC) {
619 truncateTC(response, &responseLen, responseSize, consumed);
620 }
621
622 dh->id = ids->origID;
623
624 uint16_t addRoom = 0;
625 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
626 if (dr.dnsCryptQuery) {
627 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
628 }
629
630 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
631 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
632 continue;
633 }
634
635 if (ids->cs && !ids->cs->muted) {
636 if (du) {
637 #ifdef HAVE_DNS_OVER_HTTPS
638 // DoH query
639 du->response = std::string(response, responseLen);
640 if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
641 /* at this point we have the only remaining pointer on this
642 DOHUnit object since we did set ids->du to nullptr earlier,
643 except if we got the response before the pointer could be
644 released by the frontend */
645 du->release();
646 }
647 #endif /* HAVE_DNS_OVER_HTTPS */
648 du = nullptr;
649 }
650 else {
651 ComboAddress empty;
652 empty.sin4.sin_family = 0;
653 /* if ids->destHarvested is false, origDest holds the listening address.
654 We don't want to use that as a source since it could be 0.0.0.0 for example. */
655 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
656 }
657 }
658
659 ++g_stats.responses;
660 if (ids->cs) {
661 ++ids->cs->responses;
662 }
663 ++dss->responses;
664
665 double udiff = ids->sentTime.udiff();
666 vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
667 isDoH ? " (https)": "", udiff);
668
669 struct timespec ts;
670 gettime(&ts);
671 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
672
673 switch (cleartextDH.rcode) {
674 case RCode::NXDomain:
675 ++g_stats.frontendNXDomain;
676 break;
677 case RCode::ServFail:
678 ++g_stats.servfailResponses;
679 ++g_stats.frontendServFail;
680 break;
681 case RCode::NoError:
682 ++g_stats.frontendNoError;
683 break;
684 }
685 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
686
687 doLatencyStats(udiff);
688
689 rewrittenResponse.clear();
690 }
691 }
692 catch(const std::exception& e){
693 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
694 }
695 }
696 }
697 catch(const std::exception& e)
698 {
699 errlog("UDP responder thread died because of exception: %s", e.what());
700 }
701 catch(const PDNSException& e)
702 {
703 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
704 }
705 catch(...)
706 {
707 errlog("UDP responder thread died because of an exception: %s", "unknown");
708 }
709
710 bool DownstreamState::reconnect()
711 {
712 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
713 if (!tl.owns_lock()) {
714 /* we are already reconnecting */
715 return false;
716 }
717
718 connected = false;
719 for (auto& fd : sockets) {
720 if (fd != -1) {
721 if (sockets.size() > 1) {
722 std::lock_guard<std::mutex> lock(socketsLock);
723 mplexer->removeReadFD(fd);
724 }
725 /* shutdown() is needed to wake up recv() in the responderThread */
726 shutdown(fd, SHUT_RDWR);
727 close(fd);
728 fd = -1;
729 }
730 if (!IsAnyAddress(remote)) {
731 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
732 if (!IsAnyAddress(sourceAddr)) {
733 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
734 if (!sourceItfName.empty()) {
735 #ifdef SO_BINDTODEVICE
736 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
737 if (res != 0) {
738 infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
739 }
740 #endif
741 }
742
743 SBind(fd, sourceAddr);
744 }
745 try {
746 SConnect(fd, remote);
747 if (sockets.size() > 1) {
748 std::lock_guard<std::mutex> lock(socketsLock);
749 mplexer->addReadFD(fd, [](int, boost::any) {});
750 }
751 connected = true;
752 }
753 catch(const std::runtime_error& error) {
754 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
755 connected = false;
756 break;
757 }
758 }
759 }
760
761 /* if at least one (re-)connection failed, close all sockets */
762 if (!connected) {
763 for (auto& fd : sockets) {
764 if (fd != -1) {
765 if (sockets.size() > 1) {
766 std::lock_guard<std::mutex> lock(socketsLock);
767 mplexer->removeReadFD(fd);
768 }
769 /* shutdown() is needed to wake up recv() in the responderThread */
770 shutdown(fd, SHUT_RDWR);
771 close(fd);
772 fd = -1;
773 }
774 }
775 }
776
777 return connected;
778 }
779 void DownstreamState::hash()
780 {
781 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
782 auto w = weight;
783 WriteLock wl(&d_lock);
784 hashes.clear();
785 while (w > 0) {
786 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
787 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
788 hashes.insert(wshash);
789 --w;
790 }
791 }
792
793 void DownstreamState::setId(const boost::uuids::uuid& newId)
794 {
795 id = newId;
796 // compute hashes only if already done
797 if (!hashes.empty()) {
798 hash();
799 }
800 }
801
802 void DownstreamState::setWeight(int newWeight)
803 {
804 if (newWeight < 1) {
805 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
806 return ;
807 }
808 weight = newWeight;
809 if (!hashes.empty()) {
810 hash();
811 }
812 }
813
814 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
815 {
816 pthread_rwlock_init(&d_lock, nullptr);
817 id = getUniqueID();
818 threadStarted.clear();
819
820 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
821
822 sockets.resize(numberOfSockets);
823 for (auto& fd : sockets) {
824 fd = -1;
825 }
826
827 if (connect && !IsAnyAddress(remote)) {
828 reconnect();
829 idStates.resize(g_maxOutstanding);
830 sw.start();
831 infolog("Added downstream server %s", remote.toStringWithPort());
832 }
833
834 }
835
// Global Lua context, with a mutex that (presumably) serializes access to it —
// NOTE(review): confirm at the call sites that every g_lua use holds g_luamutex.
std::mutex g_luamutex;
LuaContext g_lua;

// Server selection policy, presumably the one used when a pool has no policy of its own — confirm.
GlobalStateHolder<ServerPolicy> g_policy;
840
841 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
842 {
843 for(auto& d : servers) {
844 if(d.second->isUp() && d.second->qps.check())
845 return d.second;
846 }
847 return leastOutstanding(servers, dq);
848 }
849
850 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
851 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
852 {
853 if (servers.size() == 1 && servers[0].second->isUp()) {
854 return servers[0].second;
855 }
856
857 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
858 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
859 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
860 poss.reserve(servers.size());
861 for(auto& d : servers) {
862 if(d.second->isUp()) {
863 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
864 }
865 }
866 if(poss.empty())
867 return shared_ptr<DownstreamState>();
868 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
869 return poss.begin()->second;
870 }
871
872 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
873 {
874 vector<pair<int, shared_ptr<DownstreamState>>> poss;
875 int sum = 0;
876 int max = std::numeric_limits<int>::max();
877
878 for(auto& d : servers) { // w=1, w=10 -> 1, 11
879 if(d.second->isUp()) {
880 // Don't overflow sum when adding high weights
881 if(d.second->weight > max - sum) {
882 sum = max;
883 } else {
884 sum += d.second->weight;
885 }
886
887 poss.push_back({sum, d.second});
888 }
889 }
890
891 // Catch poss & sum are empty to avoid SIGFPE
892 if(poss.empty())
893 return shared_ptr<DownstreamState>();
894
895 int r = val % sum;
896 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
897 if(p==poss.end())
898 return shared_ptr<DownstreamState>();
899 return p->second;
900 }
901
902 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
903 {
904 return valrandom(random(), servers, dq);
905 }
906
907 uint32_t g_hashperturb;
908 double g_consistentHashBalancingFactor = 0;
909 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
910 {
911 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
912 }
913
/* Consistent hashing policy.
   Each server owns a set of points (server->hashes, computed on first use by
   hash()). The query name is hashed with g_hashperturb, and the query goes to
   the server owning the smallest point that is >= that hash. When no server has
   such a point, the ring wraps around and the server owning the smallest point
   overall is used.
   Only servers that are up are eligible. When g_consistentHashBalancingFactor
   is > 0, servers whose outstanding count exceeds
   factor * (1 + sum of outstanding) / servers.size() are skipped as well.
   Returns an empty pointer when no server is eligible.
   NOTE(review): hashes is presumably an ordered set — begin() is used as its
   minimum and lower_bound() as the successor lookup; confirm. */
shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
{
  unsigned int qhash = dq->qname->hash(g_hashperturb);
  /* smallest point >= qhash seen so far, and its owner */
  unsigned int sel = std::numeric_limits<unsigned int>::max();
  /* smallest point overall seen so far, and its owner (wrap-around fallback) */
  unsigned int min = std::numeric_limits<unsigned int>::max();
  shared_ptr<DownstreamState> ret = nullptr, first = nullptr;

  /* no load bound by default */
  double targetLoad = std::numeric_limits<double>::max();
  if (g_consistentHashBalancingFactor > 0) {
    /* we start with one, representing the query we are currently handling */
    double currentLoad = 1;
    for (const auto& pair : servers) {
      currentLoad += pair.second->outstanding;
    }
    targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
  }

  for (const auto& d: servers) {
    if (d.second->isUp() && d.second->outstanding <= targetLoad) {
      // make sure hashes have been computed
      // NOTE(review): hashes.empty() is read before d_lock is taken; presumably
      // hash() takes the write lock itself — confirm this unlocked read is safe.
      if (d.second->hashes.empty()) {
        d.second->hash();
      }
      {
        ReadLock rl(&(d.second->d_lock));
        const auto& server = d.second;
        // keep track of the server owning the smallest point overall: it is
        // the answer when no point >= qhash exists (wrap-around of the ring)
        if (min > *(server->hashes.begin())) {
          min = *(server->hashes.begin());
          first = server;
        }

        // first point of this server at or after the query hash
        auto hash_it = server->hashes.lower_bound(qhash);
        if (hash_it != server->hashes.end()) {
          if (*hash_it < sel) {
            sel = *hash_it;
            ret = server;
          }
        }
      }
    }
  }
  if (ret != nullptr) {
    return ret;
  }
  if (first != nullptr) {
    return first;
  }
  return shared_ptr<DownstreamState>();
}
964
965 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
966 {
967 NumberedServerVector poss;
968
969 for(auto& d : servers) {
970 if(d.second->isUp()) {
971 poss.push_back(d);
972 }
973 }
974
975 const auto *res=&poss;
976 if(poss.empty() && !g_roundrobinFailOnNoServer)
977 res = &servers;
978
979 if(res->empty())
980 return shared_ptr<DownstreamState>();
981
982 static unsigned int counter;
983
984 return (*res)[(counter++) % res->size()].second;
985 }
986
/* Default control (console) address: 127.0.0.1, port 5199.
   NOTE(review): presumably overridden by the configuration — confirm. */
ComboAddress g_serverControl{"127.0.0.1:5199"};
988
989 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
990 {
991 std::shared_ptr<ServerPool> pool;
992 pools_t::iterator it = pools.find(poolName);
993 if (it != pools.end()) {
994 pool = it->second;
995 }
996 else {
997 if (!poolName.empty())
998 vinfolog("Creating pool %s", poolName);
999 pool = std::make_shared<ServerPool>();
1000 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
1001 }
1002 return pool;
1003 }
1004
1005 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
1006 {
1007 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
1008 if (!poolName.empty()) {
1009 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
1010 } else {
1011 vinfolog("Setting default pool server selection policy to %s", policy->name);
1012 }
1013 pool->policy = policy;
1014 }
1015
1016 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
1017 {
1018 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
1019 if (!poolName.empty()) {
1020 vinfolog("Adding server to pool %s", poolName);
1021 } else {
1022 vinfolog("Adding server to default pool");
1023 }
1024 pool->addServer(server);
1025 }
1026
1027 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
1028 {
1029 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1030
1031 if (!poolName.empty()) {
1032 vinfolog("Removing server from pool %s", poolName);
1033 }
1034 else {
1035 vinfolog("Removing server from default pool");
1036 }
1037
1038 pool->removeServer(server);
1039 }
1040
1041 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
1042 {
1043 pools_t::const_iterator it = pools.find(poolName);
1044
1045 if (it == pools.end()) {
1046 throw std::out_of_range("No pool named " + poolName);
1047 }
1048
1049 return it->second;
1050 }
1051
1052 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1053 {
1054 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1055 return pool->getServers();
1056 }
1057
1058 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent, bool raw)
1059 {
1060 string result;
1061
1062 if (raw) {
1063 SpoofAction sa(spoofContent);
1064 sa(&dq, &result);
1065 }
1066 else {
1067 std::vector<std::string> addrs;
1068 stringtok(addrs, spoofContent, " ,");
1069
1070 if (addrs.size() == 1) {
1071 try {
1072 ComboAddress spoofAddr(spoofContent);
1073 SpoofAction sa({spoofAddr});
1074 sa(&dq, &result);
1075 }
1076 catch(const PDNSException &e) {
1077 DNSName cname(spoofContent);
1078 SpoofAction sa(cname); // CNAME then
1079 sa(&dq, &result);
1080 }
1081 } else {
1082 std::vector<ComboAddress> cas;
1083 for (const auto& addr : addrs) {
1084 try {
1085 cas.push_back(ComboAddress(addr));
1086 }
1087 catch (...) {
1088 }
1089 }
1090 SpoofAction sa(cas);
1091 sa(&dq, &result);
1092 }
1093 }
1094 }
1095
1096 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
1097 {
1098 switch(action) {
1099 case DNSAction::Action::Allow:
1100 return true;
1101 break;
1102 case DNSAction::Action::Drop:
1103 ++g_stats.ruleDrop;
1104 drop = true;
1105 return true;
1106 break;
1107 case DNSAction::Action::Nxdomain:
1108 dq.dh->rcode = RCode::NXDomain;
1109 dq.dh->qr=true;
1110 ++g_stats.ruleNXDomain;
1111 return true;
1112 break;
1113 case DNSAction::Action::Refused:
1114 dq.dh->rcode = RCode::Refused;
1115 dq.dh->qr=true;
1116 ++g_stats.ruleRefused;
1117 return true;
1118 break;
1119 case DNSAction::Action::ServFail:
1120 dq.dh->rcode = RCode::ServFail;
1121 dq.dh->qr=true;
1122 ++g_stats.ruleServFail;
1123 return true;
1124 break;
1125 case DNSAction::Action::Spoof:
1126 spoofResponseFromString(dq, ruleresult, false);
1127 return true;
1128 break;
1129 case DNSAction::Action::SpoofRaw:
1130 spoofResponseFromString(dq, ruleresult, true);
1131 return true;
1132 break;
1133 case DNSAction::Action::Truncate:
1134 dq.dh->tc = true;
1135 dq.dh->qr = true;
1136 dq.dh->ra = dq.dh->rd;
1137 dq.dh->aa = false;
1138 dq.dh->ad = false;
1139 return true;
1140 break;
1141 case DNSAction::Action::HeaderModify:
1142 return true;
1143 break;
1144 case DNSAction::Action::Pool:
1145 dq.poolname=ruleresult;
1146 return true;
1147 break;
1148 case DNSAction::Action::NoRecurse:
1149 dq.dh->rd = false;
1150 return true;
1151 break;
1152 /* non-terminal actions follow */
1153 case DNSAction::Action::Delay:
1154 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1155 break;
1156 case DNSAction::Action::None:
1157 /* fall-through */
1158 case DNSAction::Action::NoOp:
1159 break;
1160 }
1161
1162 /* false means that we don't stop the processing */
1163 return false;
1164 }
1165
1166
/* Runs the query-side checks on dq, in this order:
   - records the query in g_rings and, when g_qcount is enabled, in the
     per-name query counters (the optional g_qcount.filter may veto the count
     or change the counted name; it is called with g_luamutex held);
   - applies a still-active dynamic block matching the source address
     (dynNMGBlock), then one matching the qname (dynSMTBlock); a block whose
     action is None uses g_dynBlockAction instead;
   - evaluates the query rules in order until one returns a terminal action
     (see processRulesResult()).
   Returns false when the query must be dropped, and true otherwise. A true
   return does not mean the query will be forwarded: a block or a rule may have
   turned it into a response (qr set), which the caller checks. */
static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
{
  g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);

  if(g_qcount.enabled) {
    string qname = (*dq.qname).toLogString();
    bool countQuery{true};
    if(g_qcount.filter) {
      std::lock_guard<std::mutex> lock(g_luamutex);
      std::tie (countQuery, qname) = g_qcount.filter(&dq);
    }

    if(countQuery) {
      WriteLock wl(&g_qcount.queryLock);
      if(!g_qcount.records.count(qname)) {
        g_qcount.records[qname] = 0;
      }
      g_qcount.records[qname]++;
    }
  }

  /* dynamic blocks keyed on the source address */
  if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
    auto updateBlockStats = [&got]() {
      ++g_stats.dynBlocked;
      got->second.blocks++;
    };

    /* the block is only applied until its expiration time */
    if(now < got->second.until) {
      DNSAction::Action action = got->second.action;
      if (action == DNSAction::Action::None) {
        action = g_dynBlockAction;
      }
      switch (action) {
      case DNSAction::Action::NoOp:
        /* do nothing */
        break;

      case DNSAction::Action::Nxdomain:
        vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
        updateBlockStats();

        dq.dh->rcode = RCode::NXDomain;
        dq.dh->qr=true;
        return true;

      case DNSAction::Action::Refused:
        vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
        updateBlockStats();

        dq.dh->rcode = RCode::Refused;
        dq.dh->qr = true;
        return true;

      case DNSAction::Action::Truncate:
        /* truncation is only applied over UDP; over TCP the block is only
           logged and processing continues */
        if(!dq.tcp) {
          updateBlockStats();
          vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
          dq.dh->tc = true;
          dq.dh->qr = true;
          dq.dh->ra = dq.dh->rd;
          dq.dh->aa = false;
          dq.dh->ad = false;
          return true;
        }
        else {
          vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
        }
        break;
      case DNSAction::Action::NoRecurse:
        updateBlockStats();
        vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
        dq.dh->rd = false;
        return true;
      default:
        /* every other action, Drop included, drops the query */
        updateBlockStats();
        vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
        return false;
      }
    }
  }

  /* dynamic blocks keyed on the query name, same semantics as above */
  if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
    auto updateBlockStats = [&got]() {
      ++g_stats.dynBlocked;
      got->blocks++;
    };

    if(now < got->until) {
      DNSAction::Action action = got->action;
      if (action == DNSAction::Action::None) {
        action = g_dynBlockAction;
      }
      switch (action) {
      case DNSAction::Action::NoOp:
        /* do nothing */
        break;
      case DNSAction::Action::Nxdomain:
        vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
        updateBlockStats();

        dq.dh->rcode = RCode::NXDomain;
        dq.dh->qr=true;
        return true;
      case DNSAction::Action::Refused:
        vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
        updateBlockStats();

        dq.dh->rcode = RCode::Refused;
        dq.dh->qr=true;
        return true;
      case DNSAction::Action::Truncate:
        if(!dq.tcp) {
          updateBlockStats();

          vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
          dq.dh->tc = true;
          dq.dh->qr = true;
          dq.dh->ra = dq.dh->rd;
          dq.dh->aa = false;
          dq.dh->ad = false;
          return true;
        }
        else {
          vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
        }
        break;
      case DNSAction::Action::NoRecurse:
        updateBlockStats();
        vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
        dq.dh->rd = false;
        return true;
      default:
        updateBlockStats();
        vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
        return false;
      }
    }
  }

  /* regular query rules, evaluated in order until a terminal action */
  DNSAction::Action action=DNSAction::Action::None;
  string ruleresult;
  bool drop = false;
  for(const auto& lr : *holders.rulactions) {
    if(lr.d_rule->matches(&dq)) {
      lr.d_rule->d_matches++;
      action=(*lr.d_action)(&dq, &ruleresult);
      if (processRulesResult(action, dq, ruleresult, drop)) {
        break;
      }
    }
  }

  if (drop) {
    return false;
  }

  return true;
}
1325
1326 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1327 {
1328 ssize_t result;
1329
1330 if (ss->sourceItf == 0) {
1331 result = send(sd, request, requestLen, 0);
1332 }
1333 else {
1334 struct msghdr msgh;
1335 struct iovec iov;
1336 cmsgbuf_aligned cbuf;
1337 ComboAddress remote(ss->remote);
1338 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1339 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1340 result = sendmsg(sd, &msgh, 0);
1341 }
1342
1343 if (result == -1) {
1344 int savederrno = errno;
1345 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1346
1347 /* This might sound silly, but on Linux send() might fail with EINVAL
1348 if the interface the socket was bound to doesn't exist anymore.
1349 We don't want to reconnect the real socket if the healthcheck failed,
1350 because it's not using the same socket.
1351 */
1352 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1353 ss->reconnect();
1354 }
1355 }
1356
1357 return result;
1358 }
1359
1360 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1361 {
1362 if (msgh->msg_flags & MSG_TRUNC) {
1363 /* message was too large for our buffer */
1364 vinfolog("Dropping message too large for our buffer");
1365 ++g_stats.nonCompliantQueries;
1366 return false;
1367 }
1368
1369 if(!holders.acl->match(remote)) {
1370 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1371 ++g_stats.aclDrops;
1372 return false;
1373 }
1374
1375 cs.queries++;
1376 ++g_stats.queries;
1377
1378 if (HarvestDestinationAddress(msgh, &dest)) {
1379 /* we don't get the port, only the address */
1380 dest.sin4.sin_port = cs.local.sin4.sin_port;
1381 }
1382 else {
1383 dest.sin4.sin_family = 0;
1384 }
1385
1386 return true;
1387 }
1388
1389 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1390 {
1391 if (cs.dnscryptCtx) {
1392 #ifdef HAVE_DNSCRYPT
1393 vector<uint8_t> response;
1394 uint16_t decryptedQueryLen = 0;
1395
1396 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1397
1398 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1399
1400 if (!decrypted) {
1401 if (response.size() > 0) {
1402 return response;
1403 }
1404 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1405 }
1406
1407 len = decryptedQueryLen;
1408 #endif /* HAVE_DNSCRYPT */
1409 }
1410 return boost::none;
1411 }
1412
1413 bool checkQueryHeaders(const struct dnsheader* dh)
1414 {
1415 if (dh->qr) { // don't respond to responses
1416 ++g_stats.nonCompliantQueries;
1417 return false;
1418 }
1419
1420 if (dh->qdcount == 0) {
1421 ++g_stats.emptyQueries;
1422 return false;
1423 }
1424
1425 if (dh->rd) {
1426 ++g_stats.rdQueries;
1427 }
1428
1429 return true;
1430 }
1431
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Prepares outMsg so that response (responseLen bytes) is sent to remote as
   part of a sendmmsg() batch. iov and cbuf are caller-provided storage,
   presumably referenced by the header until the batch is sent — confirm in
   fillMSGHdr(). When dest has a non-zero family (a harvested destination),
   it is attached through addCMsgSrcAddr(); otherwise no control data is set.
   cs is currently unused. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  struct msghdr& hdr = outMsg.msg_hdr;
  outMsg.msg_len = 0;
  fillMSGHdr(&hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool haveDest = dest.sin4.sin_family != 0;
  if (haveDest) {
    addCMsgSrcAddr(&hdr, cbuf, &dest, 0);
  }
  else {
    hdr.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1446
1447 /* self-generated responses or cache hits */
1448 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1449 {
1450 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1451
1452 #ifdef HAVE_PROTOBUF
1453 dr.uniqueId = dq.uniqueId;
1454 #endif
1455 dr.qTag = dq.qTag;
1456 dr.delayMsec = dq.delayMsec;
1457
1458 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1459 return false;
1460 }
1461
1462 /* in case a rule changed it */
1463 dq.delayMsec = dr.delayMsec;
1464
1465 #ifdef HAVE_DNSCRYPT
1466 if (!cs.muted) {
1467 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1468 return false;
1469 }
1470 }
1471 #endif /* HAVE_DNSCRYPT */
1472
1473 if (cacheHit) {
1474 ++g_stats.cacheHits;
1475 }
1476
1477 switch (dr.dh->rcode) {
1478 case RCode::NXDomain:
1479 ++g_stats.frontendNXDomain;
1480 break;
1481 case RCode::ServFail:
1482 ++g_stats.frontendServFail;
1483 break;
1484 case RCode::NoError:
1485 ++g_stats.frontendNoError;
1486 break;
1487 }
1488
1489 doLatencyStats(0); // we're not going to measure this
1490 return true;
1491 }
1492
/* Runs the query-processing pipeline for dq, received on frontend cs:
   - query rules and dynamic blocks;
   - server selection through the pool's policy (or the global one);
   - packet cache lookups, including the ECS zero-scope variant;
   - ECS and XPF insertion.
   Results:
   - PassToBackend: selectedBackend is the chosen server, and its query
     counter has been incremented.
   - SendAnswer: dq now holds a response to send back to the client.
   - Drop: nothing must be sent.
   Exceptions derived from std::exception (including std::out_of_range from
   getPool() for an unknown pool name) are logged and turned into Drop. */
ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
{
  const uint16_t queryId = ntohs(dq.dh->id);

  try {
    /* we need an accurate ("real") value for the response and
       to store into the IDS, but not for insertion into the
       rings for example */
    /* NOTE(review): the comment above looks copied from processUDPQuery();
       here gettime() is called without the second argument, unlike the
       "real" timestamp taken there — confirm which clock is intended. */
    struct timespec now;
    gettime(&now);

    if (!applyRulesToQuery(holders, dq, now)) {
      return ProcessQueryResult::Drop;
    }

    if(dq.dh->qr) { // something turned it into a response
      fixUpQueryTurnedResponse(dq, dq.origFlags);

      if (!prepareOutgoingResponse(holders, cs, dq, false)) {
        return ProcessQueryResult::Drop;
      }

      ++g_stats.selfAnswered;
      ++cs.responses;
      return ProcessQueryResult::SendAnswer;
    }

    /* the pool's own policy, when set, overrides the global one */
    std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
    dq.packetCache = serverPool->packetCache;
    auto policy = *(holders.policy);
    if (serverPool->policy != nullptr) {
      policy = *(serverPool->policy);
    }
    auto servers = serverPool->getServers();
    if (policy.isLua) {
      /* Lua policies are called with g_luamutex held */
      std::lock_guard<std::mutex> lock(g_luamutex);
      selectedBackend = policy.policy(servers, &dq);
    }
    else {
      selectedBackend = policy.policy(servers, &dq);
    }

    uint16_t cachedResponseSize = dq.size;
    /* when no backend could be selected, allow the cache lookups below to use
       g_staleCacheEntriesTTL (presumably serving expired entries — confirm) */
    uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;

    if (dq.packetCache && !dq.skipCache) {
      dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
    }

    if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
      // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
      // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
      // ECS option, which would make it unsuitable for the zero-scope feature.
      if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
        /* lookup with the ECS-less key (dq.cacheKeyNoECS) */
        if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
          dq.len = cachedResponseSize;

          if (!prepareOutgoingResponse(holders, cs, dq, true)) {
            return ProcessQueryResult::Drop;
          }

          return ProcessQueryResult::SendAnswer;
        }

        if (!dq.subnet) {
          /* there was no existing ECS on the query, enable the zero-scope feature */
          dq.useZeroScope = true;
        }
      }

      if (!handleEDNSClientSubnet(dq, dq.ednsAdded, dq.ecsAdded, g_preserveTrailingData)) {
        vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
        return ProcessQueryResult::Drop;
      }
    }

    /* regular cache lookup (dq.cacheKey), done after ECS handling */
    if (dq.packetCache && !dq.skipCache) {
      if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
        dq.len = cachedResponseSize;

        if (!prepareOutgoingResponse(holders, cs, dq, true)) {
          return ProcessQueryResult::Drop;
        }

        return ProcessQueryResult::SendAnswer;
      }
      ++g_stats.cacheMisses;
    }

    /* no backend and no cached answer: ServFail or drop, depending on g_servFailOnNoPolicy */
    if(!selectedBackend) {
      ++g_stats.noPolicy;

      vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
      if (g_servFailOnNoPolicy) {
        restoreFlags(dq.dh, dq.origFlags);

        dq.dh->rcode = RCode::ServFail;
        dq.dh->qr = true;

        if (!prepareOutgoingResponse(holders, cs, dq, false)) {
          return ProcessQueryResult::Drop;
        }
        // no response-only statistics counter to update.
        return ProcessQueryResult::SendAnswer;
      }

      return ProcessQueryResult::Drop;
    }

    if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
      addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
    }

    selectedBackend->queries++;
    return ProcessQueryResult::PassToBackend;
  }
  catch(const std::exception& e){
    vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
  }
  return ProcessQueryResult::Drop;
}
1614
1615 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
1616 {
1617 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1618 uint16_t queryId = 0;
1619
1620 try {
1621 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1622 return;
1623 }
1624
1625 /* we need an accurate ("real") value for the response and
1626 to store into the IDS, but not for insertion into the
1627 rings for example */
1628 struct timespec queryRealTime;
1629 gettime(&queryRealTime, true);
1630
1631 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1632 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1633 if (dnsCryptResponse) {
1634 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1635 return;
1636 }
1637
1638 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1639 queryId = ntohs(dh->id);
1640
1641 if (!checkQueryHeaders(dh)) {
1642 return;
1643 }
1644
1645 uint16_t qtype, qclass;
1646 unsigned int consumed = 0;
1647 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1648 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1649 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1650 std::shared_ptr<DownstreamState> ss{nullptr};
1651 auto result = processQuery(dq, cs, holders, ss);
1652
1653 if (result == ProcessQueryResult::Drop) {
1654 return;
1655 }
1656
1657 if (result == ProcessQueryResult::SendAnswer) {
1658 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1659 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1660 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1661 (*queuedResponses)++;
1662 return;
1663 }
1664 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1665 /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
1666 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
1667 return;
1668 }
1669
1670 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1671 return;
1672 }
1673
1674 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1675 IDState* ids = &ss->idStates[idOffset];
1676 ids->age = 0;
1677 DOHUnit* du = nullptr;
1678
1679 /* that means that the state was in use, possibly with an allocated
1680 DOHUnit that we will need to handle, but we can't touch it before
1681 confirming that we now own this state */
1682 if (ids->isInUse()) {
1683 du = ids->du;
1684 }
1685
1686 /* we atomically replace the value, we now own this state */
1687 if (!ids->markAsUsed()) {
1688 /* the state was not in use.
1689 we reset 'du' because it might have still been in use when we read it. */
1690 du = nullptr;
1691 ++ss->outstanding;
1692 }
1693 else {
1694 /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
1695 to handle it because it's about to be overwritten. */
1696 ids->du = nullptr;
1697 ++ss->reuseds;
1698 ++g_stats.downstreamTimeouts;
1699 handleDOHTimeout(du);
1700 }
1701
1702 ids->cs = &cs;
1703 ids->origFD = cs.udpFD;
1704 ids->origID = dh->id;
1705 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1706
1707 /* If we couldn't harvest the real dest addr, still
1708 write down the listening addr since it will be useful
1709 (especially if it's not an 'any' one).
1710 We need to keep track of which one it is since we may
1711 want to use the real but not the listening addr to reply.
1712 */
1713 if (dest.sin4.sin_family != 0) {
1714 ids->origDest = dest;
1715 ids->destHarvested = true;
1716 }
1717 else {
1718 ids->origDest = cs.local;
1719 ids->destHarvested = false;
1720 }
1721
1722 dh->id = idOffset;
1723
1724 int fd = pickBackendSocketForSending(ss);
1725 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1726
1727 if(ret < 0) {
1728 ++ss->sendErrors;
1729 ++g_stats.downstreamSendErrors;
1730 }
1731
1732 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1733 }
1734 catch(const std::exception& e){
1735 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1736 }
1737 }
1738
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Receive loop of a UDP frontend in batched mode. It never returns.
   Each iteration:
   - receives up to g_udpVectorSize datagrams with a single recvmmsg() call;
   - processes them one by one with processUDPQuery();
   - sends the answers that could be produced immediately (no delay), queued
     in outMsgVec, back with a single sendmmsg() call. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[s_maxPacketCacheEntrySize];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
  }

  /* go now */
  for(;;) {

    /* Reset the IO vector, since it is also used to send the vector of
       responses to avoid having to copy the data around.
       The length must be s_udpIncomingBufferSize, like the initial fillMSGHdr()
       call above and the single-message path, not the full buffer size. With
       the full size, this path would accept larger payloads than intended
       (oversized datagrams would not be flagged MSG_TRUNC and dropped), and
       there would be less room left to add EDNS/ECS in place. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      recvData[idx].iov.iov_base = recvData[idx].packet;
      recvData[idx].iov.iov_len = s_udpIncomingBufferSize;
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);

    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1819
// listens to incoming queries, sends out to downstream servers, noting the intended return path
/* Thread entry point of a UDP frontend. When recvmmsg()/sendmmsg() are
   available and g_udpVectorSize > 1, it delegates to the batched loop
   (MultipleMessagesUDPClientThread). Otherwise it receives one datagram at a
   time with recvmsg(). Any exception escaping the loop ends the thread and
   is logged. */
static void udpClientThread(ClientState* cs)
try
{
  setThreadName("dnsdist/udpClie");
  LocalHolders holders;

#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
  if (g_udpVectorSize > 1) {
    MultipleMessagesUDPClientThread(cs, holders);

  }
  else
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
  {
    char packet[s_maxPacketCacheEntrySize];
    /* the actual buffer is larger because:
       - we may have to add EDNS and/or ECS
       - we use it for self-generated responses (from rule or cache)
       but we only accept incoming payloads up to that size
    */
    /* NOTE(review): the assertion message mentions MMReceiver::packet, but
       here it checks the local 'packet' buffer. */
    static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
    struct msghdr msgh;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;

    ComboAddress remote;
    ComboAddress dest;
    remote.sin4.sin_family = cs->local.sin4.sin_family;
    /* only s_udpIncomingBufferSize bytes of 'packet' are offered to the kernel */
    fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, s_udpIncomingBufferSize, &remote);

    for(;;) {
      ssize_t got = recvmsg(cs->udpFD, &msgh, 0);

      /* NOTE(review): recvmsg() errors (got < 0) are counted as
         non-compliant queries as well. */
      if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), sizeof(packet), nullptr, nullptr, nullptr, nullptr);
    }
  }
}
catch(const std::exception &e)
{
  errlog("UDP client thread died because of exception: %s", e.what());
}
catch(const PDNSException &e)
{
  errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
}
catch(...)
{
  errlog("UDP client thread died because of an exception: %s", "unknown");
}
1876
/* Returns a random 16-bit DNS ID: from libsodium's uniform generator when
   HAVE_LIBSODIUM is defined, from random() otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const uint32_t value = randombytes_uniform(65536);
#else
  const long value = random() % 65536;
#endif
  return static_cast<uint16_t>(value);
}
1885
/* Presumably the upper bound on the number of TCP client threads (default 10);
   not used in this part of the file — confirm. */
uint64_t g_maxTCPClientThreads{10};
/* Number of maintThread() iterations (each one sleeps 1 second) between two
   purges of expired packet cache entries. */
std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* Used by maintThread() to compute the purgeExpired() target of each cache as
   maxEntries * (100 - percentage) / 100; the default of 100 yields 0. */
std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1889
1890 void maintThread()
1891 {
1892 setThreadName("dnsdist/main");
1893 int interval = 1;
1894 size_t counter = 0;
1895 int32_t secondsToWaitLog = 0;
1896
1897 for(;;) {
1898 sleep(interval);
1899
1900 {
1901 std::lock_guard<std::mutex> lock(g_luamutex);
1902 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1903 if(f) {
1904 try {
1905 (*f)();
1906 secondsToWaitLog = 0;
1907 }
1908 catch(std::exception &e) {
1909 if (secondsToWaitLog <= 0) {
1910 infolog("Error during execution of maintenance function: %s", e.what());
1911 secondsToWaitLog = 61;
1912 }
1913 secondsToWaitLog -= interval;
1914 }
1915 }
1916 }
1917
1918 counter++;
1919 if (counter >= g_cacheCleaningDelay) {
1920 /* keep track, for each cache, of whether we should keep
1921 expired entries */
1922 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1923
1924 /* gather all caches actually used by at least one pool, and see
1925 if something prevents us from cleaning the expired entries */
1926 auto localPools = g_pools.getLocal();
1927 for (const auto& entry : *localPools) {
1928 auto& pool = entry.second;
1929
1930 auto packetCache = pool->packetCache;
1931 if (!packetCache) {
1932 continue;
1933 }
1934
1935 auto pair = caches.insert({packetCache, false});
1936 auto& iter = pair.first;
1937 /* if we need to keep stale data for this cache (ie, not clear
1938 expired entries when at least one pool using this cache
1939 has all its backends down) */
1940 if (packetCache->keepStaleData() && iter->second == false) {
1941 /* so far all pools had at least one backend up */
1942 if (pool->countServers(true) == 0) {
1943 iter->second = true;
1944 }
1945 }
1946 }
1947
1948 for (auto pair : caches) {
1949 /* shall we keep expired entries ? */
1950 if (pair.second == true) {
1951 continue;
1952 }
1953 auto& packetCache = pair.first;
1954 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1955 packetCache->purgeExpired(upTo);
1956 }
1957 counter = 0;
1958 }
1959
1960 // ponder pruning g_dynblocks of expired entries here
1961 }
1962 }
1963
1964 static void secPollThread()
1965 {
1966 setThreadName("dnsdist/secpoll");
1967
1968 for (;;) {
1969 try {
1970 doSecPoll(g_secPollSuffix);
1971 }
1972 catch(...) {
1973 }
1974 sleep(g_secPollInterval);
1975 }
1976 }
1977
1978 static void healthChecksThread()
1979 {
1980 setThreadName("dnsdist/healthC");
1981
1982 static const int interval = 1;
1983
1984 for(;;) {
1985 sleep(interval);
1986
1987 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads()) {
1988 g_tcpclientthreads->addTCPClientThread();
1989 }
1990
1991 auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
1992 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
1993 for(auto& dss : *states) {
1994 if(++dss->lastCheck < dss->checkInterval) {
1995 continue;
1996 }
1997
1998 dss->lastCheck = 0;
1999
2000 if (dss->availability == DownstreamState::Availability::Auto) {
2001 if (!queueHealthCheck(mplexer, dss)) {
2002 updateHealthCheckResult(dss, false);
2003 }
2004 }
2005
2006 auto delta = dss->sw.udiffAndSet()/1000000.0;
2007 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2008 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2009 dss->prev.queries.store(dss->queries.load());
2010 dss->prev.reuseds.store(dss->reuseds.load());
2011
2012 for (IDState& ids : dss->idStates) { // timeouts
2013 int64_t usageIndicator = ids.usageIndicator;
2014 if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
2015 /* We mark the state as unused as soon as possible
2016 to limit the risk of racing with the
2017 responder thread.
2018 */
2019 auto oldDU = ids.du;
2020
2021 if (!ids.tryMarkUnused(usageIndicator)) {
2022 /* this state has been altered in the meantime,
2023 don't go anywhere near it */
2024 continue;
2025 }
2026 ids.du = nullptr;
2027 handleDOHTimeout(oldDU);
2028 ids.age = 0;
2029 dss->reuseds++;
2030 --dss->outstanding;
2031 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2032 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2033 dss->remote.toStringWithPort(), dss->name,
2034 ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2035
2036 struct timespec ts;
2037 gettime(&ts);
2038
2039 struct dnsheader fake;
2040 memset(&fake, 0, sizeof(fake));
2041 fake.id = ids.origID;
2042
2043 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2044 }
2045 }
2046 }
2047
2048 handleQueuedHealthChecks(mplexer);
2049 }
2050 }
2051
2052 static void bindAny(int af, int sock)
2053 {
2054 __attribute__((unused)) int one = 1;
2055
2056 #ifdef IP_FREEBIND
2057 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2058 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2059 #endif
2060
2061 #ifdef IP_BINDANY
2062 if (af == AF_INET)
2063 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2064 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2065 #endif
2066 #ifdef IPV6_BINDANY
2067 if (af == AF_INET6)
2068 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2069 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2070 #endif
2071 #ifdef SO_BINDANY
2072 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2073 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2074 #endif
2075 }
2076
2077 static void dropGroupPrivs(gid_t gid)
2078 {
2079 if (gid) {
2080 if (setgid(gid) == 0) {
2081 if (setgroups(0, NULL) < 0) {
2082 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2083 }
2084 }
2085 else {
2086 warnlog("Warning: Unable to set group ID to %d: %s", gid, stringerror());
2087 }
2088 }
2089 }
2090
2091 static void dropUserPrivs(uid_t uid)
2092 {
2093 if(uid) {
2094 if(setuid(uid) < 0) {
2095 warnlog("Warning: Unable to set user ID to %d: %s", uid, stringerror());
2096 }
2097 }
2098 }
2099
2100 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2101 {
2102 /* stdin, stdout, stderr */
2103 size_t requiredFDsCount = 3;
2104 auto backends = g_dstates.getLocal();
2105 /* UDP sockets to backends */
2106 size_t backendUDPSocketsCount = 0;
2107 for (const auto& backend : *backends) {
2108 backendUDPSocketsCount += backend->sockets.size();
2109 }
2110 requiredFDsCount += backendUDPSocketsCount;
2111 /* TCP sockets to backends */
2112 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2113 /* listening sockets */
2114 requiredFDsCount += udpBindsCount;
2115 requiredFDsCount += tcpBindsCount;
2116 /* max TCP connections currently served */
2117 requiredFDsCount += g_maxTCPClientThreads;
2118 /* max pipes for communicating between TCP acceptors and client threads */
2119 requiredFDsCount += (g_maxTCPClientThreads * 2);
2120 /* max TCP queued connections */
2121 requiredFDsCount += g_maxTCPQueuedConnections;
2122 /* DelayPipe pipe */
2123 requiredFDsCount += 2;
2124 /* syslog socket */
2125 requiredFDsCount++;
2126 /* webserver main socket */
2127 requiredFDsCount++;
2128 /* console main socket */
2129 requiredFDsCount++;
2130 /* carbon export */
2131 requiredFDsCount++;
2132 /* history file */
2133 requiredFDsCount++;
2134 struct rlimit rl;
2135 getrlimit(RLIMIT_NOFILE, &rl);
2136 if (rl.rlim_cur <= requiredFDsCount) {
2137 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2138 #ifdef HAVE_SYSTEMD
2139 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2140 #else
2141 warnlog("You can increase this value by using ulimit.");
2142 #endif
2143 }
2144 }
2145
2146 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2147 {
2148 /* skip some warnings if there is an identical UDP context */
2149 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2150 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2151 (void) warn;
2152
2153 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2154
2155 if (cs->tcp) {
2156 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2157 #ifdef TCP_DEFER_ACCEPT
2158 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2159 #endif
2160 if (cs->fastOpenQueueSize > 0) {
2161 #ifdef TCP_FASTOPEN
2162 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2163 #else
2164 if (warn) {
2165 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2166 }
2167 #endif
2168 }
2169 }
2170
2171 if(cs->local.sin4.sin_family == AF_INET6) {
2172 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2173 }
2174
2175 bindAny(cs->local.sin4.sin_family, fd);
2176
2177 if(!cs->tcp && IsAnyAddress(cs->local)) {
2178 int one=1;
2179 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2180 #ifdef IPV6_RECVPKTINFO
2181 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2182 #endif
2183 }
2184
2185 if (cs->reuseport) {
2186 #ifdef SO_REUSEPORT
2187 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2188 #else
2189 if (warn) {
2190 /* no need to warn again if configured but support is not available, we already did for UDP */
2191 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2192 }
2193 #endif
2194 }
2195
2196 if (!cs->tcp) {
2197 if (cs->local.isIPv4()) {
2198 try {
2199 setSocketIgnorePMTU(cs->udpFD);
2200 }
2201 catch(const std::exception& e) {
2202 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2203 }
2204 }
2205 }
2206
2207 const std::string& itf = cs->interface;
2208 if (!itf.empty()) {
2209 #ifdef SO_BINDTODEVICE
2210 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2211 if (res != 0) {
2212 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), stringerror());
2213 }
2214 #else
2215 if (warn) {
2216 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2217 }
2218 #endif
2219 }
2220
2221 #ifdef HAVE_EBPF
2222 if (g_defaultBPFFilter) {
2223 cs->attachFilter(g_defaultBPFFilter);
2224 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2225 }
2226 #endif /* HAVE_EBPF */
2227
2228 if (cs->tlsFrontend != nullptr) {
2229 if (!cs->tlsFrontend->setupTLS()) {
2230 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2231 _exit(EXIT_FAILURE);
2232 }
2233 }
2234
2235 if (cs->dohFrontend != nullptr) {
2236 cs->dohFrontend->setup();
2237 }
2238
2239 SBind(fd, cs->local);
2240
2241 if (cs->tcp) {
2242 SListen(cs->tcpFD, SOMAXCONN);
2243 if (cs->tlsFrontend != nullptr) {
2244 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2245 }
2246 else if (cs->dohFrontend != nullptr) {
2247 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2248 }
2249 else if (cs->dnscryptCtx != nullptr) {
2250 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2251 }
2252 else {
2253 warnlog("Listening on %s", cs->local.toStringWithPort());
2254 }
2255 }
2256
2257 cs->ready = true;
2258 }
2259
/* Settings collected from the command line by main(). */
struct
{
  std::vector<std::string> locals;   // -l/--local listen addresses
  std::vector<std::string> remotes;  // positional arguments: backend addresses
  bool checkConfig = false;          // --check-config
  bool beClient = false;             // -c/--client
  bool beSupervised = false;         // --supervised
  std::string command;               // -e/--execute
  std::string config;                // -C/--config
  std::string uid;                   // -u/--uid
  std::string gid;                   // -g/--gid
} g_cmdLine;

/* Set to true by main() once the configuration has been loaded and the list
   of frontends is final, right before the frontends are bound. */
std::atomic<bool> g_configurationDone{false};
2274
/* Print the command-line help on stdout. The -k/--setkey option is only
   listed when dnsdist is built with libsodium. */
static void usage()
{
  static const char synopsisAndClientOptions[] =
    "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
    "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
    "[-v,--verbose] [--check-config] [--version]\n"
    "\n"
    "-a,--acl netmask Add this netmask to the ACL\n"
    "-C,--config file Load configuration from 'file'\n"
    "-c,--client Operate as a client, connect to dnsdist. This reads\n"
    " controlSocket from your configuration file, but also\n"
    " accepts an IP:PORT argument\n";

#ifdef HAVE_LIBSODIUM
  static const char keyOption[] =
    "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
    " is similar to setting setKey in the configuration file.\n"
    " NOTE: this will leak this key in your shell's history\n"
    " and in the systems running process list.\n";
#endif

  static const char remainingOptions[] =
    "--check-config Validate the configuration file and exit. The exit-code\n"
    " reflects the validation, 0 is OK, 1 means an error.\n"
    " Any errors are printed as well.\n"
    "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
    "-g,--gid gid Change the process group ID after binding sockets\n"
    "-h,--help Display this helpful message\n"
    "-l,--local address Listen on this local address\n"
    "--supervised Don't open a console, I'm supervised\n"
    " (use with e.g. systemd and daemontools)\n"
    "--disable-syslog Don't log to syslog, only to stdout\n"
    " (use with e.g. systemd)\n"
    "-u,--uid uid Change the process user ID after binding sockets\n"
    "-v,--verbose Enable verbose mode\n"
    "-V,--version Show dnsdist version information and exit\n";

  std::cout << std::endl << synopsisAndClientOptions;
#ifdef HAVE_LIBSODIUM
  std::cout << keyOption;
#endif
  std::cout << remainingOptions;
}
2308
2309 int main(int argc, char** argv)
2310 try
2311 {
2312 size_t udpBindsCount = 0;
2313 size_t tcpBindsCount = 0;
2314 rl_attempted_completion_function = my_completion;
2315 rl_completion_append_character = 0;
2316
2317 signal(SIGPIPE, SIG_IGN);
2318 signal(SIGCHLD, SIG_IGN);
2319 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2320
2321 #ifdef HAVE_LIBSODIUM
2322 if (sodium_init() == -1) {
2323 cerr<<"Unable to initialize crypto library"<<endl;
2324 exit(EXIT_FAILURE);
2325 }
2326 g_hashperturb=randombytes_uniform(0xffffffff);
2327 srandom(randombytes_uniform(0xffffffff));
2328 #else
2329 {
2330 struct timeval tv;
2331 gettimeofday(&tv, 0);
2332 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2333 g_hashperturb=random();
2334 }
2335
2336 #endif
2337 ComboAddress clientAddress = ComboAddress();
2338 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2339 struct option longopts[]={
2340 {"acl", required_argument, 0, 'a'},
2341 {"check-config", no_argument, 0, 1},
2342 {"client", no_argument, 0, 'c'},
2343 {"config", required_argument, 0, 'C'},
2344 {"disable-syslog", no_argument, 0, 2},
2345 {"execute", required_argument, 0, 'e'},
2346 {"gid", required_argument, 0, 'g'},
2347 {"help", no_argument, 0, 'h'},
2348 {"local", required_argument, 0, 'l'},
2349 {"setkey", required_argument, 0, 'k'},
2350 {"supervised", no_argument, 0, 3},
2351 {"uid", required_argument, 0, 'u'},
2352 {"verbose", no_argument, 0, 'v'},
2353 {"version", no_argument, 0, 'V'},
2354 {0,0,0,0}
2355 };
2356 int longindex=0;
2357 string optstring;
2358 for(;;) {
2359 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2360 if(c==-1)
2361 break;
2362 switch(c) {
2363 case 1:
2364 g_cmdLine.checkConfig=true;
2365 break;
2366 case 2:
2367 g_syslog=false;
2368 break;
2369 case 3:
2370 g_cmdLine.beSupervised=true;
2371 break;
2372 case 'C':
2373 g_cmdLine.config=optarg;
2374 break;
2375 case 'c':
2376 g_cmdLine.beClient=true;
2377 break;
2378 case 'e':
2379 g_cmdLine.command=optarg;
2380 break;
2381 case 'g':
2382 g_cmdLine.gid=optarg;
2383 break;
2384 case 'h':
2385 cout<<"dnsdist "<<VERSION<<endl;
2386 usage();
2387 cout<<"\n";
2388 exit(EXIT_SUCCESS);
2389 break;
2390 case 'a':
2391 optstring=optarg;
2392 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2393 break;
2394 case 'k':
2395 #ifdef HAVE_LIBSODIUM
2396 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2397 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2398 exit(EXIT_FAILURE);
2399 }
2400 #else
2401 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2402 exit(EXIT_FAILURE);
2403 #endif
2404 break;
2405 case 'l':
2406 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2407 break;
2408 case 'u':
2409 g_cmdLine.uid=optarg;
2410 break;
2411 case 'v':
2412 g_verbose=true;
2413 break;
2414 case 'V':
2415 #ifdef LUAJIT_VERSION
2416 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2417 #else
2418 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2419 #endif
2420 cout<<"Enabled features: ";
2421 #ifdef HAVE_CDB
2422 cout<<"cdb ";
2423 #endif
2424 #ifdef HAVE_DNS_OVER_TLS
2425 cout<<"dns-over-tls(";
2426 #ifdef HAVE_GNUTLS
2427 cout<<"gnutls";
2428 #ifdef HAVE_LIBSSL
2429 cout<<" ";
2430 #endif
2431 #endif
2432 #ifdef HAVE_LIBSSL
2433 cout<<"openssl";
2434 #endif
2435 cout<<") ";
2436 #endif
2437 #ifdef HAVE_DNS_OVER_HTTPS
2438 cout<<"dns-over-https(DOH) ";
2439 #endif
2440 #ifdef HAVE_DNSCRYPT
2441 cout<<"dnscrypt ";
2442 #endif
2443 #ifdef HAVE_EBPF
2444 cout<<"ebpf ";
2445 #endif
2446 #ifdef HAVE_FSTRM
2447 cout<<"fstrm ";
2448 #endif
2449 #ifdef HAVE_LIBCRYPTO
2450 cout<<"ipcipher ";
2451 #endif
2452 #ifdef HAVE_LIBSODIUM
2453 cout<<"libsodium ";
2454 #endif
2455 #ifdef HAVE_LMDB
2456 cout<<"lmdb ";
2457 #endif
2458 #ifdef HAVE_PROTOBUF
2459 cout<<"protobuf ";
2460 #endif
2461 #ifdef HAVE_RE2
2462 cout<<"re2 ";
2463 #endif
2464 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2465 cout<<"recvmmsg/sendmmsg ";
2466 #endif
2467 #ifdef HAVE_NET_SNMP
2468 cout<<"snmp ";
2469 #endif
2470 #ifdef HAVE_SYSTEMD
2471 cout<<"systemd";
2472 #endif
2473 cout<<endl;
2474 exit(EXIT_SUCCESS);
2475 break;
2476 case '?':
2477 //getopt_long printed an error message.
2478 usage();
2479 exit(EXIT_FAILURE);
2480 break;
2481 }
2482 }
2483
2484 argc-=optind;
2485 argv+=optind;
2486 for(auto p = argv; *p; ++p) {
2487 if(g_cmdLine.beClient) {
2488 clientAddress = ComboAddress(*p, 5199);
2489 } else {
2490 g_cmdLine.remotes.push_back(*p);
2491 }
2492 }
2493
2494 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2495
2496 g_policy.setState(leastOutstandingPol);
2497 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2498 setupLua(true, false, g_cmdLine.config);
2499 if (clientAddress != ComboAddress())
2500 g_serverControl = clientAddress;
2501 doClient(g_serverControl, g_cmdLine.command);
2502 _exit(EXIT_SUCCESS);
2503 }
2504
2505 auto acl = g_ACL.getCopy();
2506 if(acl.empty()) {
2507 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2508 acl.addMask(addr);
2509 g_ACL.setState(acl);
2510 }
2511
2512 auto consoleACL = g_consoleACL.getCopy();
2513 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2514 consoleACL.addMask(mask);
2515 }
2516 g_consoleACL.setState(consoleACL);
2517
2518 if (g_cmdLine.checkConfig) {
2519 setupLua(false, true, g_cmdLine.config);
2520 // No exception was thrown
2521 infolog("Configuration '%s' OK!", g_cmdLine.config);
2522 _exit(EXIT_SUCCESS);
2523 }
2524
2525 auto todo=setupLua(false, false, g_cmdLine.config);
2526
2527 auto localPools = g_pools.getCopy();
2528 {
2529 bool precompute = false;
2530 if (g_policy.getLocal()->name == "chashed") {
2531 precompute = true;
2532 } else {
2533 for (const auto& entry: localPools) {
2534 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2535 precompute = true;
2536 break ;
2537 }
2538 }
2539 }
2540 if (precompute) {
2541 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2542 // pre compute hashes
2543 auto backends = g_dstates.getLocal();
2544 for (auto& backend: *backends) {
2545 backend->hash();
2546 }
2547 }
2548 }
2549
2550 if (!g_cmdLine.locals.empty()) {
2551 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2552 /* DoH, DoT and DNSCrypt frontends are separate */
2553 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2554 it = g_frontends.erase(it);
2555 }
2556 else {
2557 ++it;
2558 }
2559 }
2560
2561 for(const auto& loc : g_cmdLine.locals) {
2562 /* UDP */
2563 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2564 /* TCP */
2565 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2566 }
2567 }
2568
2569 if (g_frontends.empty()) {
2570 /* UDP */
2571 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2572 /* TCP */
2573 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2574 }
2575
2576 g_configurationDone = true;
2577
2578 for(auto& frontend : g_frontends) {
2579 setUpLocalBind(frontend);
2580
2581 if (frontend->tcp == false) {
2582 ++udpBindsCount;
2583 }
2584 else {
2585 ++tcpBindsCount;
2586 }
2587 }
2588
2589 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2590
2591 vector<string> vec;
2592 std::string acls;
2593 g_ACL.getLocal()->toStringVector(&vec);
2594 for(const auto& s : vec) {
2595 if (!acls.empty())
2596 acls += ", ";
2597 acls += s;
2598 }
2599 infolog("ACL allowing queries from: %s", acls.c_str());
2600 vec.clear();
2601 acls.clear();
2602 g_consoleACL.getLocal()->toStringVector(&vec);
2603 for (const auto& entry : vec) {
2604 if (!acls.empty()) {
2605 acls += ", ";
2606 }
2607 acls += entry;
2608 }
2609 infolog("Console ACL allowing connections from: %s", acls.c_str());
2610
2611 #ifdef HAVE_LIBSODIUM
2612 if (g_consoleEnabled && g_consoleKey.empty()) {
2613 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2614 }
2615 #endif
2616
2617 uid_t newgid=getegid();
2618 gid_t newuid=geteuid();
2619
2620 if(!g_cmdLine.gid.empty())
2621 newgid = strToGID(g_cmdLine.gid.c_str());
2622
2623 if(!g_cmdLine.uid.empty())
2624 newuid = strToUID(g_cmdLine.uid.c_str());
2625
2626 if (getegid() != newgid) {
2627 if (running_in_service_mgr()) {
2628 errlog("--gid/-g set on command-line, but dnsdist was started as a systemd service. Use the 'Group' setting in the systemd unit file to set the group to run as");
2629 _exit(EXIT_FAILURE);
2630 }
2631 dropGroupPrivs(newgid);
2632 }
2633
2634 if (geteuid() != newuid) {
2635 if (running_in_service_mgr()) {
2636 errlog("--uid/-u set on command-line, but dnsdist was started as a systemd service. Use the 'User' setting in the systemd unit file to set the user to run as");
2637 _exit(EXIT_FAILURE);
2638 }
2639 dropUserPrivs(newuid);
2640 }
2641
2642 try {
2643 /* we might still have capabilities remaining,
2644 for example if we have been started as root
2645 without --uid or --gid (please don't do that)
2646 or as an unprivileged user with ambient
2647 capabilities like CAP_NET_BIND_SERVICE.
2648 */
2649 dropCapabilities(g_capabilitiesToRetain);
2650 }
2651 catch(const std::exception& e) {
2652 warnlog("%s", e.what());
2653 }
2654
2655 /* this need to be done _after_ dropping privileges */
2656 g_delay = new DelayPipe<DelayedPacket>();
2657
2658 if (g_snmpAgent) {
2659 g_snmpAgent->run();
2660 }
2661
2662 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2663
2664 for(auto& t : todo)
2665 t();
2666
2667 localPools = g_pools.getCopy();
2668 /* create the default pool no matter what */
2669 createPoolIfNotExists(localPools, "");
2670 if(g_cmdLine.remotes.size()) {
2671 for(const auto& address : g_cmdLine.remotes) {
2672 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2673 addServerToPool(localPools, "", ret);
2674 if (ret->connected && !ret->threadStarted.test_and_set()) {
2675 ret->tid = thread(responderThread, ret);
2676 }
2677 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2678 }
2679 }
2680 g_pools.setState(localPools);
2681
2682 if(g_dstates.getLocal()->empty()) {
2683 errlog("No downstream servers defined: all packets will get dropped");
2684 // you might define them later, but you need to know
2685 }
2686
2687 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2688
2689 auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
2690 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2691 if (dss->availability == DownstreamState::Availability::Auto) {
2692 if (!queueHealthCheck(mplexer, dss, true)) {
2693 dss->upStatus = false;
2694 warnlog("Marking downstream %s as 'down'", dss->getNameWithAddr());
2695 }
2696 }
2697 }
2698 handleQueuedHealthChecks(mplexer, true);
2699
2700 for(auto& cs : g_frontends) {
2701 if (cs->dohFrontend != nullptr) {
2702 #ifdef HAVE_DNS_OVER_HTTPS
2703 std::thread t1(dohThread, cs.get());
2704 if (!cs->cpus.empty()) {
2705 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2706 }
2707 t1.detach();
2708 #endif /* HAVE_DNS_OVER_HTTPS */
2709 continue;
2710 }
2711 if (cs->udpFD >= 0) {
2712 thread t1(udpClientThread, cs.get());
2713 if (!cs->cpus.empty()) {
2714 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2715 }
2716 t1.detach();
2717 }
2718 else if (cs->tcpFD >= 0) {
2719 thread t1(tcpAcceptorThread, cs.get());
2720 if (!cs->cpus.empty()) {
2721 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2722 }
2723 t1.detach();
2724 }
2725 }
2726
2727 thread carbonthread(carbonDumpThread);
2728 carbonthread.detach();
2729
2730 thread stattid(maintThread);
2731 stattid.detach();
2732
2733 thread healththread(healthChecksThread);
2734
2735 if (!g_secPollSuffix.empty()) {
2736 thread secpollthread(secPollThread);
2737 secpollthread.detach();
2738 }
2739
2740 if(g_cmdLine.beSupervised) {
2741 #ifdef HAVE_SYSTEMD
2742 sd_notify(0, "READY=1");
2743 #endif
2744 healththread.join();
2745 }
2746 else {
2747 healththread.detach();
2748 doConsole();
2749 }
2750 _exit(EXIT_SUCCESS);
2751
2752 }
2753 catch(const LuaContext::ExecutionErrorException& e) {
2754 try {
2755 errlog("Fatal Lua error: %s", e.what());
2756 std::rethrow_if_nested(e);
2757 } catch(const std::exception& ne) {
2758 errlog("Details: %s", ne.what());
2759 }
2760 catch(PDNSException &ae)
2761 {
2762 errlog("Fatal pdns error: %s", ae.reason);
2763 }
2764 _exit(EXIT_FAILURE);
2765 }
2766 catch(std::exception &e)
2767 {
2768 errlog("Fatal error: %s", e.what());
2769 _exit(EXIT_FAILURE);
2770 }
2771 catch(PDNSException &ae)
2772 {
2773 errlog("Fatal pdns error: %s", ae.reason);
2774 _exit(EXIT_FAILURE);
2775 }
2776
2777 uint64_t getLatencyCount(const std::string&)
2778 {
2779 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2780 }