]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
Merge pull request #8601 from omoerbeek/no-copy-rwlocks
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
using std::atomic;
using std::thread;
/* verbose logging switch (presumably consulted by vinfolog() — TODO confirm) */
bool g_verbose;

/* global counters, incremented throughout this file (e.g. ++g_stats.responses in responderThread()) */
struct DNSDistStats g_stats;
MetricDefinitionStorage g_metricDefinitions;

/* number of query states allocated per backend, see idStates.resize() in DownstreamState's constructor */
uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
bool g_verboseHealthChecks{false};
uint32_t g_staleCacheEntriesTTL{0};
bool g_syslog{true};
/* when true, responseContentMatches() also accepts NoError/NXDomain responses that have no question section */
bool g_allowEmptyResponse{false};

GlobalStateHolder<NetmaskGroup> g_ACL;
string g_outputBuffer;

/* frontends for the various transports (presumably populated from the configuration — TODO confirm) */
std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
#ifdef HAVE_EBPF
shared_ptr<BPFFilter> g_defaultBPFFilter;
std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
#endif /* HAVE_EBPF */
std::vector<std::unique_ptr<ClientState>> g_frontends;
/* server pools, see createPoolIfNotExists() / getPool() for the pools_t helpers */
GlobalStateHolder<pools_t> g_pools;
size_t g_udpVectorSize{1};

/* SNMP support, disabled by default */
bool g_snmpEnabled{false};
bool g_snmpTrapsEnabled{false};
DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
   Then we have a bunch of connected sockets for talking to downstream servers.
   We send directly to those sockets.

   For the return path, per downstream server we have a thread that listens to responses
   (see responderThread()).

   Per downstream server there is an array of g_maxOutstanding states (65535 by default).
   When we send out a packet downstream, we note there the original requestor and the
   original id. The new ID is the offset in the array.

   When an answer comes in on a socket, we look up the state at the offset given by its id,
   and send the answer back to the original requestor.

   IDs are assigned by atomic increments of the socket offset.
*/
124
/* rule chains: query rules, response rules (applied in responderThread()),
   and the response rules for cache hits and self-answered queries */
GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;

/* recent queries/responses, g_rings.insertResponse() is called from responderThread() */
Rings g_rings;
QueryCount g_qcount;

/* all configured backends, plus dynamic block tables (by netmask and by name suffix) */
GlobalStateHolder<servers_t> g_dstates;
GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
/* network timeouts, presumably in seconds — TODO confirm */
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
int g_udpTimeout{2};

bool g_servFailOnNoPolicy{false};
/* when true, truncated (TC) backend responses are cut down to the question by truncateTC() */
bool g_truncateTC{false};
/* when true, fixUpResponse() copies the qname from the original query back into the response */
bool g_fixupCase{false};
bool g_preserveTrailingData{false};
/* when true, roundrobin() does not fall back to all servers if none of them is up */
bool g_roundrobinFailOnNoServer{false};

std::set<std::string> g_capabilitiesToRetain;
148
149 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
150 try
151 {
152 bool hadEDNS = false;
153 uint16_t payloadSize = 0;
154 uint16_t z = 0;
155
156 if (g_addEDNSToSelfGeneratedResponses) {
157 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
158 }
159
160 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
161 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
162 dh->ancount = dh->arcount = dh->nscount = 0;
163
164 if (hadEDNS) {
165 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
166 }
167 }
168 catch(...)
169 {
170 g_stats.truncFail++;
171 }
172
173 struct DelayedPacket
174 {
175 int fd;
176 string packet;
177 ComboAddress destination;
178 ComboAddress origDest;
179 void operator()()
180 {
181 ssize_t res;
182 if(origDest.sin4.sin_family == 0) {
183 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
184 }
185 else {
186 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
187 }
188 if (res == -1) {
189 int err = errno;
190 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
191 }
192 }
193 };
194
195 DelayPipe<DelayedPacket>* g_delay = nullptr;
196
197 void doLatencyStats(double udiff)
198 {
199 if(udiff < 1000) ++g_stats.latency0_1;
200 else if(udiff < 10000) ++g_stats.latency1_10;
201 else if(udiff < 50000) ++g_stats.latency10_50;
202 else if(udiff < 100000) ++g_stats.latency50_100;
203 else if(udiff < 1000000) ++g_stats.latency100_1000;
204 else ++g_stats.latencySlow;
205 g_stats.latencySum += udiff / 1000;
206
207 auto doAvg = [](double& var, double n, double weight) {
208 var = (weight -1) * var/weight + n/weight;
209 };
210
211 doAvg(g_stats.latencyAvg100, udiff, 100);
212 doAvg(g_stats.latencyAvg1000, udiff, 1000);
213 doAvg(g_stats.latencyAvg10000, udiff, 10000);
214 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
215 }
216
217 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
218 {
219 if (responseLen < sizeof(dnsheader)) {
220 return false;
221 }
222
223 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
224 if (dh->qdcount == 0) {
225 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
226 return true;
227 }
228 else {
229 ++g_stats.nonCompliantResponses;
230 return false;
231 }
232 }
233
234 uint16_t rqtype, rqclass;
235 DNSName rqname;
236 try {
237 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
238 }
239 catch(const std::exception& e) {
240 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
241 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
242 }
243 ++g_stats.nonCompliantResponses;
244 return false;
245 }
246
247 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
248 return false;
249 }
250
251 return true;
252 }
253
254 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
255 {
256 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
257 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
258 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
259 uint16_t * flags = getFlagsFromDNSHeader(dh);
260 /* clear the flags we are about to restore */
261 *flags &= restoreFlagsMask;
262 /* only keep the flags we want to restore */
263 origFlags &= ~restoreFlagsMask;
264 /* set the saved flags as they were */
265 *flags |= origFlags;
266 }
267
268 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
269 {
270 restoreFlags(dq.dh, origFlags);
271
272 return addEDNSToQueryTurnedResponse(dq);
273 }
274
275 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
276 {
277 if (*responseLen < sizeof(dnsheader)) {
278 return false;
279 }
280
281 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
282 restoreFlags(dh, origFlags);
283
284 if (*responseLen == sizeof(dnsheader)) {
285 return true;
286 }
287
288 if(g_fixupCase) {
289 string realname = qname.toDNSString();
290 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
291 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
292 }
293 }
294
295 if (ednsAdded || ecsAdded) {
296 uint16_t optStart;
297 size_t optLen = 0;
298 bool last = false;
299
300 const std::string responseStr(*response, *responseLen);
301 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
302
303 if (res == 0) {
304 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
305 size_t optContentStart = 0;
306 uint16_t optContentLen = 0;
307 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
308 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
309 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
310 we care about. */
311 *zeroScope = responseStr.at(optContentStart + 3) == 0;
312 }
313 }
314
315 if (ednsAdded) {
316 /* we added the entire OPT RR,
317 therefore we need to remove it entirely */
318 if (last) {
319 /* simply remove the last AR */
320 *responseLen -= optLen;
321 uint16_t arcount = ntohs(dh->arcount);
322 arcount--;
323 dh->arcount = htons(arcount);
324 }
325 else {
326 /* Removing an intermediary RR could lead to compression error */
327 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
328 *responseLen = rewrittenResponse.size();
329 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
330 rewrittenResponse.reserve(*responseLen + addRoom);
331 }
332 *responseSize = rewrittenResponse.capacity();
333 *response = reinterpret_cast<char*>(rewrittenResponse.data());
334 }
335 else {
336 warnlog("Error rewriting content");
337 }
338 }
339 }
340 else {
341 /* the OPT RR was already present, but without ECS,
342 we need to remove the ECS option if any */
343 if (last) {
344 /* nothing after the OPT RR, we can simply remove the
345 ECS option */
346 size_t existingOptLen = optLen;
347 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
348 *responseLen -= (existingOptLen - optLen);
349 }
350 else {
351 /* Removing an intermediary RR could lead to compression error */
352 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
353 *responseLen = rewrittenResponse.size();
354 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
355 rewrittenResponse.reserve(*responseLen + addRoom);
356 }
357 *responseSize = rewrittenResponse.capacity();
358 *response = reinterpret_cast<char*>(rewrittenResponse.data());
359 }
360 else {
361 warnlog("Error rewriting content");
362 }
363 }
364 }
365 }
366 }
367
368 return true;
369 }
370
#ifdef HAVE_DNSCRYPT
/* Encrypts a response in place for a DNSCrypt client; this is a no-op returning true when
   'dnsCryptQuery' is null. If 'dh', '*dh' and 'dhCopy' are all non-null, the cleartext
   header is first copied into 'dhCopy' and '*dh' is redirected to that copy, so the
   caller can still read it after encryption.
   Returns false when encryption fails, meaning the response must be dropped.
   'dnsCryptQuery' is taken by const reference to avoid an atomic refcount round-trip
   on every response. */
static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, const std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (dnsCryptQuery) {
    uint16_t encryptedResponseLen = 0;

    /* save the original header before encrypting it in place */
    if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
      memcpy(dhCopy, *dh, sizeof(dnsheader));
      *dh = dhCopy;
    }

    int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
    if (res == 0) {
      *responseLen = encryptedResponseLen;
    } else {
      /* dropping response */
      vinfolog("Error encrypting the response, dropping.");
      return false;
    }
  }
  return true;
}
#endif /* HAVE_DNSCRYPT */
395
396 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
397 {
398 DNSResponseAction::Action action=DNSResponseAction::Action::None;
399 std::string ruleresult;
400 for(const auto& lr : *localRespRulactions) {
401 if(lr.d_rule->matches(&dr)) {
402 lr.d_rule->d_matches++;
403 action=(*lr.d_action)(&dr, &ruleresult);
404 switch(action) {
405 case DNSResponseAction::Action::Allow:
406 return true;
407 break;
408 case DNSResponseAction::Action::Drop:
409 return false;
410 break;
411 case DNSResponseAction::Action::HeaderModify:
412 return true;
413 break;
414 case DNSResponseAction::Action::ServFail:
415 dr.dh->rcode = RCode::ServFail;
416 return true;
417 break;
418 /* non-terminal actions follow */
419 case DNSResponseAction::Action::Delay:
420 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
421 break;
422 case DNSResponseAction::Action::None:
423 break;
424 }
425 }
426 }
427
428 return true;
429 }
430
431 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
432 {
433 if (!applyRulesToResponse(localRespRulactions, dr)) {
434 return false;
435 }
436
437 bool zeroScope = false;
438 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
439 return false;
440 }
441
442 if (dr.packetCache && !dr.skipCache && *responseLen <= s_maxPacketCacheEntrySize) {
443 if (!dr.useZeroScope) {
444 /* if the query was not suitable for zero-scope, for
445 example because it had an existing ECS entry so the hash is
446 not really 'no ECS', so just insert it for the existing subnet
447 since:
448 - we don't have the correct hash for a non-ECS query
449 - inserting with hash computed before the ECS replacement but with
450 the subnet extracted _after_ the replacement would not work.
451 */
452 zeroScope = false;
453 }
454 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
455 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
456 }
457
458 #ifdef HAVE_DNSCRYPT
459 if (!muted) {
460 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
461 return false;
462 }
463 }
464 #endif /* HAVE_DNSCRYPT */
465
466 return true;
467 }
468
469 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
470 {
471 if(delayMsec && g_delay) {
472 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
473 g_delay->submit(dp, delayMsec);
474 }
475 else {
476 ssize_t res;
477 if(origDest.sin4.sin_family == 0) {
478 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
479 }
480 else {
481 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
482 }
483 if (res == -1) {
484 int err = errno;
485 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), stringerror(err));
486 }
487 }
488
489 return true;
490 }
491
492
493 int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
494 {
495 return state->sockets[state->socketsOffset++ % state->sockets.size()];
496 }
497
498 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
499 {
500 ready.clear();
501
502 if (state->sockets.size() == 1) {
503 ready.push_back(state->sockets[0]);
504 return ;
505 }
506
507 {
508 std::lock_guard<std::mutex> lock(state->socketsLock);
509 state->mplexer->getAvailableFDs(ready, -1);
510 }
511 }
512
// Runs for one backend ('dss'): receives responses on that backend's socket(s), matches
// each one to its pending query state (dss->idStates) and lobs it back to the original
// requestor. Exceptions thrown while handling a response are logged and the loop goes on;
// exceptions escaping the loop end the thread with an error log.
void responderThread(std::shared_ptr<DownstreamState> dss)
try {
  setThreadName("dnsdist/respond");
  auto localRespRulactions = g_resprulactions.getLocal();
  char packet[s_maxPacketCacheEntrySize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
  static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
  /* when the answer is encrypted in place, we need to get a copy
     of the original header before encryption to fill the ring buffer */
  dnsheader cleartextDH;
  vector<uint8_t> rewrittenResponse;

  /* declared outside of the loop so that the error message in the inner catch can report it */
  uint16_t queryId = 0;
  std::vector<int> sockets;
  sockets.reserve(dss->sockets.size());

  for(;;) {
    dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
    try {
      /* either the only socket, or the ones the multiplexer reports as readable */
      pickBackendSocketsReadyForReceiving(dss, sockets);
      for (const auto& fd : sockets) {
        ssize_t got = recv(fd, packet, sizeof(packet), 0);
        char * response = packet;
        size_t responseSize = sizeof(packet);

        /* ignore errors and anything too short to hold a DNS header */
        if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
          continue;

        uint16_t responseLen = static_cast<uint16_t>(got);
        /* the ID of the response is used, as-is, as the index of the query state */
        queryId = dh->id;

        if(queryId >= dss->idStates.size()) {
          continue;
        }

        IDState* ids = &dss->idStates[queryId];
        int64_t usageIndicator = ids->usageIndicator;

        if(!IDState::isInUse(usageIndicator)) {
          /* the corresponding state is marked as not in use, meaning that:
             - it was already cleaned up by another thread and the state is gone ;
             - we already got a response for this query and this one is a duplicate.
             Either way, we don't touch it.
          */
          continue;
        }

        /* read the potential DOHUnit state as soon as possible, but don't use it
           until we have confirmed that we own this state by updating usageIndicator */
        auto du = ids->du;
        /* setting age to 0 to prevent the maintainer thread from
           cleaning this IDS while we process the response.
        */
        ids->age = 0;
        int origFD = ids->origFD;

        unsigned int consumed = 0;
        /* skip responses rejected by responseContentMatches() (question mismatch, unparsable, ...) */
        if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
          continue;
        }

        bool isDoH = du != nullptr;
        /* atomically mark the state as available, but only if it has not been altered
           in the meantime */
        if (ids->tryMarkUnused(usageIndicator)) {
          /* clear the potential DOHUnit asap, it's ours now
             and since we just marked the state as unused,
             someone could overwrite it. */
          ids->du = nullptr;
          /* we only decrement the outstanding counter if the value was not
             altered in the meantime, which would mean that the state has been actively reused
             and the other thread has not incremented the outstanding counter, so we don't
             want it to be decremented twice. */
          --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
        } else {
          /* someone updated the state in the meantime, we can't touch the existing pointer */
          du = nullptr;
          /* since the state has been updated, we can't safely access it so let's just drop
             this response */
          continue;
        }

        if(dh->tc && g_truncateTC) {
          truncateTC(response, &responseLen, responseSize, consumed);
        }

        /* put back the ID the client used */
        dh->id = ids->origID;

        uint16_t addRoom = 0;
        DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
        if (dr.dnsCryptQuery) {
          /* leave room for the DNSCrypt padding and MAC added by the encryption */
          addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
        }

        /* copied before processResponse(), which may encrypt the packet in place
           or switch 'response' to rewrittenResponse */
        memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
        if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
          continue;
        }

        if (ids->cs && !ids->cs->muted) {
          if (du) {
#ifdef HAVE_DNS_OVER_HTTPS
            // DoH query
            du->response = std::string(response, responseLen);
            /* hand the DOHUnit pointer itself over to the DoH side through du->rsock */
            if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
              /* at this point we have the only remaining pointer on this
                 DOHUnit object since we did set ids->du to nullptr earlier,
                 except if we got the response before the pointer could be
                 released by the frontend */
              du->release();
            }
#endif /* HAVE_DNS_OVER_HTTPS */
            du = nullptr;
          }
          else {
            ComboAddress empty;
            empty.sin4.sin_family = 0;
            /* if ids->destHarvested is false, origDest holds the listening address.
               We don't want to use that as a source since it could be 0.0.0.0 for example. */
            sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
          }
        }

        ++g_stats.responses;
        if (ids->cs) {
          ++ids->cs->responses;
        }
        ++dss->responses;

        /* microseconds since the query was sent to the backend */
        double udiff = ids->sentTime.udiff();
        vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
                 isDoH ? " (https)": "", udiff);

        struct timespec ts;
        gettime(&ts);
        /* NOTE(review): the ring records 'got', the size received from the backend,
           not the final (possibly rewritten or encrypted) responseLen */
        g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);

        switch (cleartextDH.rcode) {
        case RCode::NXDomain:
          ++g_stats.frontendNXDomain;
          break;
        case RCode::ServFail:
          ++g_stats.servfailResponses;
          ++g_stats.frontendServFail;
          break;
        case RCode::NoError:
          ++g_stats.frontendNoError;
          break;
        }
        /* exponentially weighted moving average of the backend latency */
        dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;

        doLatencyStats(udiff);

        /* clear() keeps the capacity, so the buffer is reused for the next response */
        rewrittenResponse.clear();
      }
    }
    catch(const std::exception& e){
      vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
    }
  }
}
catch(const std::exception& e)
{
  errlog("UDP responder thread died because of exception: %s", e.what());
}
catch(const PDNSException& e)
{
  errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
}
catch(...)
{
  errlog("UDP responder thread died because of an exception: %s", "unknown");
}
686
687 bool DownstreamState::reconnect()
688 {
689 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
690 if (!tl.owns_lock()) {
691 /* we are already reconnecting */
692 return false;
693 }
694
695 connected = false;
696 for (auto& fd : sockets) {
697 if (fd != -1) {
698 if (sockets.size() > 1) {
699 std::lock_guard<std::mutex> lock(socketsLock);
700 mplexer->removeReadFD(fd);
701 }
702 /* shutdown() is needed to wake up recv() in the responderThread */
703 shutdown(fd, SHUT_RDWR);
704 close(fd);
705 fd = -1;
706 }
707 if (!IsAnyAddress(remote)) {
708 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
709 if (!IsAnyAddress(sourceAddr)) {
710 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
711 if (!sourceItfName.empty()) {
712 #ifdef SO_BINDTODEVICE
713 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
714 if (res != 0) {
715 infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
716 }
717 #endif
718 }
719
720 SBind(fd, sourceAddr);
721 }
722 try {
723 SConnect(fd, remote);
724 if (sockets.size() > 1) {
725 std::lock_guard<std::mutex> lock(socketsLock);
726 mplexer->addReadFD(fd, [](int, boost::any) {});
727 }
728 connected = true;
729 }
730 catch(const std::runtime_error& error) {
731 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
732 connected = false;
733 break;
734 }
735 }
736 }
737
738 /* if at least one (re-)connection failed, close all sockets */
739 if (!connected) {
740 for (auto& fd : sockets) {
741 if (fd != -1) {
742 if (sockets.size() > 1) {
743 std::lock_guard<std::mutex> lock(socketsLock);
744 mplexer->removeReadFD(fd);
745 }
746 /* shutdown() is needed to wake up recv() in the responderThread */
747 shutdown(fd, SHUT_RDWR);
748 close(fd);
749 fd = -1;
750 }
751 }
752 }
753
754 return connected;
755 }
756 void DownstreamState::hash()
757 {
758 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
759 auto w = weight;
760 WriteLock wl(&d_lock);
761 hashes.clear();
762 while (w > 0) {
763 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
764 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
765 hashes.insert(wshash);
766 --w;
767 }
768 }
769
770 void DownstreamState::setId(const boost::uuids::uuid& newId)
771 {
772 id = newId;
773 // compute hashes only if already done
774 if (!hashes.empty()) {
775 hash();
776 }
777 }
778
779 void DownstreamState::setWeight(int newWeight)
780 {
781 if (newWeight < 1) {
782 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
783 return ;
784 }
785 weight = newWeight;
786 if (!hashes.empty()) {
787 hash();
788 }
789 }
790
791 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
792 {
793 pthread_rwlock_init(&d_lock, nullptr);
794 id = getUniqueID();
795 threadStarted.clear();
796
797 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
798
799 sockets.resize(numberOfSockets);
800 for (auto& fd : sockets) {
801 fd = -1;
802 }
803
804 if (!IsAnyAddress(remote)) {
805 reconnect();
806 idStates.resize(g_maxOutstanding);
807 sw.start();
808 infolog("Added downstream server %s", remote.toStringWithPort());
809 }
810
811 }
812
813 std::mutex g_luamutex;
814 LuaContext g_lua;
815
816 GlobalStateHolder<ServerPolicy> g_policy;
817
818 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
819 {
820 for(auto& d : servers) {
821 if(d.second->isUp() && d.second->qps.check())
822 return d.second;
823 }
824 return leastOutstanding(servers, dq);
825 }
826
827 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
828 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
829 {
830 if (servers.size() == 1 && servers[0].second->isUp()) {
831 return servers[0].second;
832 }
833
834 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
835 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
836 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
837 poss.reserve(servers.size());
838 for(auto& d : servers) {
839 if(d.second->isUp()) {
840 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
841 }
842 }
843 if(poss.empty())
844 return shared_ptr<DownstreamState>();
845 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
846 return poss.begin()->second;
847 }
848
849 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
850 {
851 vector<pair<int, shared_ptr<DownstreamState>>> poss;
852 int sum = 0;
853 int max = std::numeric_limits<int>::max();
854
855 for(auto& d : servers) { // w=1, w=10 -> 1, 11
856 if(d.second->isUp()) {
857 // Don't overflow sum when adding high weights
858 if(d.second->weight > max - sum) {
859 sum = max;
860 } else {
861 sum += d.second->weight;
862 }
863
864 poss.push_back({sum, d.second});
865 }
866 }
867
868 // Catch poss & sum are empty to avoid SIGFPE
869 if(poss.empty())
870 return shared_ptr<DownstreamState>();
871
872 int r = val % sum;
873 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
874 if(p==poss.end())
875 return shared_ptr<DownstreamState>();
876 return p->second;
877 }
878
879 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
880 {
881 return valrandom(random(), servers, dq);
882 }
883
884 uint32_t g_hashperturb;
885 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
886 {
887 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
888 }
889
890 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
891 {
892 unsigned int qhash = dq->qname->hash(g_hashperturb);
893 unsigned int sel = std::numeric_limits<unsigned int>::max();
894 unsigned int min = std::numeric_limits<unsigned int>::max();
895 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
896
897 for (const auto& d: servers) {
898 if (d.second->isUp()) {
899 // make sure hashes have been computed
900 if (d.second->hashes.empty()) {
901 d.second->hash();
902 }
903 {
904 ReadLock rl(&(d.second->d_lock));
905 const auto& server = d.second;
906 // we want to keep track of the last hash
907 if (min > *(server->hashes.begin())) {
908 min = *(server->hashes.begin());
909 first = server;
910 }
911
912 auto hash_it = server->hashes.lower_bound(qhash);
913 if (hash_it != server->hashes.end()) {
914 if (*hash_it < sel) {
915 sel = *hash_it;
916 ret = server;
917 }
918 }
919 }
920 }
921 }
922 if (ret != nullptr) {
923 return ret;
924 }
925 if (first != nullptr) {
926 return first;
927 }
928 return shared_ptr<DownstreamState>();
929 }
930
931 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
932 {
933 NumberedServerVector poss;
934
935 for(auto& d : servers) {
936 if(d.second->isUp()) {
937 poss.push_back(d);
938 }
939 }
940
941 const auto *res=&poss;
942 if(poss.empty() && !g_roundrobinFailOnNoServer)
943 res = &servers;
944
945 if(res->empty())
946 return shared_ptr<DownstreamState>();
947
948 static unsigned int counter;
949
950 return (*res)[(counter++) % res->size()].second;
951 }
952
953 ComboAddress g_serverControl{"127.0.0.1:5199"};
954
955 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
956 {
957 std::shared_ptr<ServerPool> pool;
958 pools_t::iterator it = pools.find(poolName);
959 if (it != pools.end()) {
960 pool = it->second;
961 }
962 else {
963 if (!poolName.empty())
964 vinfolog("Creating pool %s", poolName);
965 pool = std::make_shared<ServerPool>();
966 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
967 }
968 return pool;
969 }
970
971 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
972 {
973 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
974 if (!poolName.empty()) {
975 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
976 } else {
977 vinfolog("Setting default pool server selection policy to %s", policy->name);
978 }
979 pool->policy = policy;
980 }
981
982 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
983 {
984 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
985 if (!poolName.empty()) {
986 vinfolog("Adding server to pool %s", poolName);
987 } else {
988 vinfolog("Adding server to default pool");
989 }
990 pool->addServer(server);
991 }
992
993 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
994 {
995 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
996
997 if (!poolName.empty()) {
998 vinfolog("Removing server from pool %s", poolName);
999 }
1000 else {
1001 vinfolog("Removing server from default pool");
1002 }
1003
1004 pool->removeServer(server);
1005 }
1006
1007 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
1008 {
1009 pools_t::const_iterator it = pools.find(poolName);
1010
1011 if (it == pools.end()) {
1012 throw std::out_of_range("No pool named " + poolName);
1013 }
1014
1015 return it->second;
1016 }
1017
1018 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
1019 {
1020 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
1021 return pool->getServers();
1022 }
1023
1024 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
1025 {
1026 string result;
1027
1028 std::vector<std::string> addrs;
1029 stringtok(addrs, spoofContent, " ,");
1030
1031 if (addrs.size() == 1) {
1032 try {
1033 ComboAddress spoofAddr(spoofContent);
1034 SpoofAction sa({spoofAddr});
1035 sa(&dq, &result);
1036 }
1037 catch(const PDNSException &e) {
1038 SpoofAction sa(spoofContent); // CNAME then
1039 sa(&dq, &result);
1040 }
1041 } else {
1042 std::vector<ComboAddress> cas;
1043 for (const auto& addr : addrs) {
1044 try {
1045 cas.push_back(ComboAddress(addr));
1046 }
1047 catch (...) {
1048 }
1049 }
1050 SpoofAction sa(cas);
1051 sa(&dq, &result);
1052 }
1053 }
1054
1055 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
1056 {
1057 switch(action) {
1058 case DNSAction::Action::Allow:
1059 return true;
1060 break;
1061 case DNSAction::Action::Drop:
1062 ++g_stats.ruleDrop;
1063 drop = true;
1064 return true;
1065 break;
1066 case DNSAction::Action::Nxdomain:
1067 dq.dh->rcode = RCode::NXDomain;
1068 dq.dh->qr=true;
1069 ++g_stats.ruleNXDomain;
1070 return true;
1071 break;
1072 case DNSAction::Action::Refused:
1073 dq.dh->rcode = RCode::Refused;
1074 dq.dh->qr=true;
1075 ++g_stats.ruleRefused;
1076 return true;
1077 break;
1078 case DNSAction::Action::ServFail:
1079 dq.dh->rcode = RCode::ServFail;
1080 dq.dh->qr=true;
1081 ++g_stats.ruleServFail;
1082 return true;
1083 break;
1084 case DNSAction::Action::Spoof:
1085 spoofResponseFromString(dq, ruleresult);
1086 return true;
1087 break;
1088 case DNSAction::Action::Truncate:
1089 dq.dh->tc = true;
1090 dq.dh->qr = true;
1091 return true;
1092 break;
1093 case DNSAction::Action::HeaderModify:
1094 return true;
1095 break;
1096 case DNSAction::Action::Pool:
1097 dq.poolname=ruleresult;
1098 return true;
1099 break;
1100 case DNSAction::Action::NoRecurse:
1101 dq.dh->rd = false;
1102 return true;
1103 break;
1104 /* non-terminal actions follow */
1105 case DNSAction::Action::Delay:
1106 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1107 break;
1108 case DNSAction::Action::None:
1109 /* fall-through */
1110 case DNSAction::Action::NoOp:
1111 break;
1112 }
1113
1114 /* false means that we don't stop the processing */
1115 return false;
1116 }
1117
1118
1119 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const struct timespec& now)
1120 {
1121 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1122
1123 if(g_qcount.enabled) {
1124 string qname = (*dq.qname).toLogString();
1125 bool countQuery{true};
1126 if(g_qcount.filter) {
1127 std::lock_guard<std::mutex> lock(g_luamutex);
1128 std::tie (countQuery, qname) = g_qcount.filter(&dq);
1129 }
1130
1131 if(countQuery) {
1132 WriteLock wl(&g_qcount.queryLock);
1133 if(!g_qcount.records.count(qname)) {
1134 g_qcount.records[qname] = 0;
1135 }
1136 g_qcount.records[qname]++;
1137 }
1138 }
1139
1140 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1141 auto updateBlockStats = [&got]() {
1142 ++g_stats.dynBlocked;
1143 got->second.blocks++;
1144 };
1145
1146 if(now < got->second.until) {
1147 DNSAction::Action action = got->second.action;
1148 if (action == DNSAction::Action::None) {
1149 action = g_dynBlockAction;
1150 }
1151 switch (action) {
1152 case DNSAction::Action::NoOp:
1153 /* do nothing */
1154 break;
1155
1156 case DNSAction::Action::Nxdomain:
1157 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1158 updateBlockStats();
1159
1160 dq.dh->rcode = RCode::NXDomain;
1161 dq.dh->qr=true;
1162 return true;
1163
1164 case DNSAction::Action::Refused:
1165 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1166 updateBlockStats();
1167
1168 dq.dh->rcode = RCode::Refused;
1169 dq.dh->qr = true;
1170 return true;
1171
1172 case DNSAction::Action::Truncate:
1173 if(!dq.tcp) {
1174 updateBlockStats();
1175 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1176 dq.dh->tc = true;
1177 dq.dh->qr = true;
1178 return true;
1179 }
1180 else {
1181 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1182 }
1183 break;
1184 case DNSAction::Action::NoRecurse:
1185 updateBlockStats();
1186 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1187 dq.dh->rd = false;
1188 return true;
1189 default:
1190 updateBlockStats();
1191 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1192 return false;
1193 }
1194 }
1195 }
1196
1197 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1198 auto updateBlockStats = [&got]() {
1199 ++g_stats.dynBlocked;
1200 got->blocks++;
1201 };
1202
1203 if(now < got->until) {
1204 DNSAction::Action action = got->action;
1205 if (action == DNSAction::Action::None) {
1206 action = g_dynBlockAction;
1207 }
1208 switch (action) {
1209 case DNSAction::Action::NoOp:
1210 /* do nothing */
1211 break;
1212 case DNSAction::Action::Nxdomain:
1213 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1214 updateBlockStats();
1215
1216 dq.dh->rcode = RCode::NXDomain;
1217 dq.dh->qr=true;
1218 return true;
1219 case DNSAction::Action::Refused:
1220 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1221 updateBlockStats();
1222
1223 dq.dh->rcode = RCode::Refused;
1224 dq.dh->qr=true;
1225 return true;
1226 case DNSAction::Action::Truncate:
1227 if(!dq.tcp) {
1228 updateBlockStats();
1229
1230 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1231 dq.dh->tc = true;
1232 dq.dh->qr = true;
1233 return true;
1234 }
1235 else {
1236 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1237 }
1238 break;
1239 case DNSAction::Action::NoRecurse:
1240 updateBlockStats();
1241 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1242 dq.dh->rd = false;
1243 return true;
1244 default:
1245 updateBlockStats();
1246 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
1247 return false;
1248 }
1249 }
1250 }
1251
1252 DNSAction::Action action=DNSAction::Action::None;
1253 string ruleresult;
1254 bool drop = false;
1255 for(const auto& lr : *holders.rulactions) {
1256 if(lr.d_rule->matches(&dq)) {
1257 lr.d_rule->d_matches++;
1258 action=(*lr.d_action)(&dq, &ruleresult);
1259 if (processRulesResult(action, dq, ruleresult, drop)) {
1260 break;
1261 }
1262 }
1263 }
1264
1265 if (drop) {
1266 return false;
1267 }
1268
1269 return true;
1270 }
1271
1272 ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck)
1273 {
1274 ssize_t result;
1275
1276 if (ss->sourceItf == 0) {
1277 result = send(sd, request, requestLen, 0);
1278 }
1279 else {
1280 struct msghdr msgh;
1281 struct iovec iov;
1282 cmsgbuf_aligned cbuf;
1283 ComboAddress remote(ss->remote);
1284 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1285 addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
1286 result = sendmsg(sd, &msgh, 0);
1287 }
1288
1289 if (result == -1) {
1290 int savederrno = errno;
1291 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1292
1293 /* This might sound silly, but on Linux send() might fail with EINVAL
1294 if the interface the socket was bound to doesn't exist anymore.
1295 We don't want to reconnect the real socket if the healthcheck failed,
1296 because it's not using the same socket.
1297 */
1298 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1299 ss->reconnect();
1300 }
1301 }
1302
1303 return result;
1304 }
1305
1306 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1307 {
1308 if (msgh->msg_flags & MSG_TRUNC) {
1309 /* message was too large for our buffer */
1310 vinfolog("Dropping message too large for our buffer");
1311 ++g_stats.nonCompliantQueries;
1312 return false;
1313 }
1314
1315 if(!holders.acl->match(remote)) {
1316 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1317 ++g_stats.aclDrops;
1318 return false;
1319 }
1320
1321 cs.queries++;
1322 ++g_stats.queries;
1323
1324 if (HarvestDestinationAddress(msgh, &dest)) {
1325 /* we don't get the port, only the address */
1326 dest.sin4.sin_port = cs.local.sin4.sin_port;
1327 }
1328 else {
1329 dest.sin4.sin_family = 0;
1330 }
1331
1332 return true;
1333 }
1334
1335 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1336 {
1337 if (cs.dnscryptCtx) {
1338 #ifdef HAVE_DNSCRYPT
1339 vector<uint8_t> response;
1340 uint16_t decryptedQueryLen = 0;
1341
1342 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1343
1344 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1345
1346 if (!decrypted) {
1347 if (response.size() > 0) {
1348 return response;
1349 }
1350 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1351 }
1352
1353 len = decryptedQueryLen;
1354 #endif /* HAVE_DNSCRYPT */
1355 }
1356 return boost::none;
1357 }
1358
1359 bool checkQueryHeaders(const struct dnsheader* dh)
1360 {
1361 if (dh->qr) { // don't respond to responses
1362 ++g_stats.nonCompliantQueries;
1363 return false;
1364 }
1365
1366 if (dh->qdcount == 0) {
1367 ++g_stats.emptyQueries;
1368 return false;
1369 }
1370
1371 if (dh->rd) {
1372 ++g_stats.rdQueries;
1373 }
1374
1375 return true;
1376 }
1377
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Fills 'outMsg' so that 'response' is sent to 'remote' by a later sendmmsg() call.
   'iov' and 'cbuf' end up referenced by outMsg, so they must stay valid until then.
   When 'dest' is known (non-zero family), a control message selecting it as the
   source address is attached; otherwise no control data is sent.
   'cs' is not used. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, cmsgbuf_aligned* cbuf)
{
  outMsg.msg_len = 0;
  fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destKnown = (dest.sin4.sin_family != 0);
  if (destKnown) {
    addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
  }
  else {
    outMsg.msg_hdr.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1392
1393 /* self-generated responses or cache hits */
1394 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1395 {
1396 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1397
1398 #ifdef HAVE_PROTOBUF
1399 dr.uniqueId = dq.uniqueId;
1400 #endif
1401 dr.qTag = dq.qTag;
1402 dr.delayMsec = dq.delayMsec;
1403
1404 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1405 return false;
1406 }
1407
1408 /* in case a rule changed it */
1409 dq.delayMsec = dr.delayMsec;
1410
1411 #ifdef HAVE_DNSCRYPT
1412 if (!cs.muted) {
1413 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1414 return false;
1415 }
1416 }
1417 #endif /* HAVE_DNSCRYPT */
1418
1419 if (cacheHit) {
1420 ++g_stats.cacheHits;
1421 }
1422
1423 switch (dr.dh->rcode) {
1424 case RCode::NXDomain:
1425 ++g_stats.frontendNXDomain;
1426 break;
1427 case RCode::ServFail:
1428 ++g_stats.frontendServFail;
1429 break;
1430 case RCode::NoError:
1431 ++g_stats.frontendNoError;
1432 break;
1433 }
1434
1435 doLatencyStats(0); // we're not going to measure this
1436 return true;
1437 }
1438
/* Core processing of a parsed query. It is non-static and called by processUDPQuery()
   below; presumably the other frontends use it too (TCP/DoH) — TODO confirm.
   Steps, in order:
   - apply the rings, query counting, dynamic blocks and query rules
     (applyRulesToQuery()). A query turned into a response by them is finalized
     and answered directly;
   - select a backend with the pool's policy, or the global one when the pool has none;
   - look up the packet cache and answer directly on a hit. When ECS applies and
     zero-scope is usable, the ECS-less key is tried first;
   - add ECS / XPF as configured and hand the query over to the selected backend.
   Returns:
   - Drop: nothing to send;
   - SendAnswer: dq now holds the response;
   - PassToBackend: selectedBackend has been set.
   Exceptions derived from std::exception are logged and turned into Drop. */
ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
{
  // read before the try block so that the catch handler can log it
  const uint16_t queryId = ntohs(dq.dh->id);

  try {
    /* this timestamp is only used by applyRulesToQuery() (rings insertion and
       dynamic block expiry), so the default gettime() is used here, without the
       'true' argument passed by processUDPQuery() for queryRealTime */
    struct timespec now;
    gettime(&now);

    if (!applyRulesToQuery(holders, dq, now)) {
      return ProcessQueryResult::Drop;
    }

    if(dq.dh->qr) { // something turned it into a response
      fixUpQueryTurnedResponse(dq, dq.origFlags);

      if (!prepareOutgoingResponse(holders, cs, dq, false)) {
        return ProcessQueryResult::Drop;
      }

      ++g_stats.selfAnswered;
      ++cs.responses;
      return ProcessQueryResult::SendAnswer;
    }

    // throws std::out_of_range if dq.poolname does not exist; caught below
    std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, dq.poolname);
    dq.packetCache = serverPool->packetCache;
    // the pool's own policy if it has one, the global one otherwise. Taken by value,
    // presumably so it stays valid if the pool's policy is replaced concurrently — confirm
    auto policy = *(holders.policy);
    if (serverPool->policy != nullptr) {
      policy = *(serverPool->policy);
    }
    auto servers = serverPool->getServers();
    if (policy.isLua) {
      // Lua policies are serialized through the global Lua mutex
      std::lock_guard<std::mutex> lock(g_luamutex);
      selectedBackend = policy.policy(servers, &dq);
    }
    else {
      selectedBackend = policy.policy(servers, &dq);
    }

    uint16_t cachedResponseSize = dq.size;
    // when no backend was selected, the cache lookups below are passed g_staleCacheEntriesTTL instead of 0
    uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;

    if (dq.packetCache && !dq.skipCache) {
      dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
    }

    if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
      // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
      // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
      // ECS option, which would make it unsuitable for the zero-scope feature.
      if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
        if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
          dq.len = cachedResponseSize;

          if (!prepareOutgoingResponse(holders, cs, dq, true)) {
            return ProcessQueryResult::Drop;
          }

          return ProcessQueryResult::SendAnswer;
        }

        if (!dq.subnet) {
          /* there was no existing ECS on the query, enable the zero-scope feature */
          dq.useZeroScope = true;
        }
      }

      if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
        vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
        return ProcessQueryResult::Drop;
      }
    }

    // regular cache lookup, done after any ECS insertion above
    if (dq.packetCache && !dq.skipCache) {
      if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
        dq.len = cachedResponseSize;

        if (!prepareOutgoingResponse(holders, cs, dq, true)) {
          return ProcessQueryResult::Drop;
        }

        return ProcessQueryResult::SendAnswer;
      }
      ++g_stats.cacheMisses;
    }

    // no backend and no usable cache entry: ServFail or drop, depending on g_servFailOnNoPolicy
    if(!selectedBackend) {
      ++g_stats.noPolicy;

      vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toLogString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
      if (g_servFailOnNoPolicy) {
        restoreFlags(dq.dh, dq.origFlags);

        dq.dh->rcode = RCode::ServFail;
        dq.dh->qr = true;

        if (!prepareOutgoingResponse(holders, cs, dq, false)) {
          return ProcessQueryResult::Drop;
        }
        // no response-only statistics counter to update.
        return ProcessQueryResult::SendAnswer;
      }

      return ProcessQueryResult::Drop;
    }

    if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
      addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
    }

    selectedBackend->queries++;
    return ProcessQueryResult::PassToBackend;
  }
  catch(const std::exception& e){
    vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
  }
  return ProcessQueryResult::Drop;
}
1560
/* Handles one query received over UDP on 'cs'.
   Parameters:
   - 'msgh': the header the query was received with. It is used for the MSG_TRUNC
     check and to harvest the destination address into 'dest'.
   - 'query'/'len': the payload.
   - 'queryBufferSize': the full size of the buffer holding the payload. It can be
     larger than 'len', leaving room to grow the query or to build a response in place.
   Responses:
   - When 'responsesVect' is not null (recvmmsg()/sendmmsg() mode), immediate,
     non-delayed responses are queued into responsesVect[*queuedResponses] using
     'respIOV'/'respCBuf', and *queuedResponses is incremented so the caller can
     send them in batch.
   - Otherwise responses are sent right away.
   Queries passed to a backend are recorded in one of its IDState slots, then
   forwarded with their DNS ID rewritten to that slot's index.
   Exceptions derived from std::exception are logged and the query is dropped. */
static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, cmsgbuf_aligned* respCBuf)
{
  assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
  // declared outside the try block so that the catch handler can log it
  uint16_t queryId = 0;

  try {
    if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
      return;
    }

    /* we need an accurate ("real") value for the response and
       to store into the IDS, but not for insertion into the
       rings for example */
    struct timespec queryRealTime;
    gettime(&queryRealTime, true);

    std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
    auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
    if (dnsCryptResponse) {
      // checkDNSCryptQuery() produced a response to send back as-is instead of a decrypted query
      sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
      return;
    }

    struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
    queryId = ntohs(dh->id);

    if (!checkQueryHeaders(dh)) {
      return;
    }

    uint16_t qtype, qclass;
    unsigned int consumed = 0;
    DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
    // the harvested destination address is used as the local address when known, the listening one otherwise
    DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
    dq.dnsCryptQuery = std::move(dnsCryptQuery);
    std::shared_ptr<DownstreamState> ss{nullptr};
    auto result = processQuery(dq, cs, holders, ss);

    if (result == ProcessQueryResult::Drop) {
      return;
    }

    if (result == ProcessQueryResult::SendAnswer) {
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
      // batch mode: queue non-delayed responses for the caller's sendmmsg()
      if (dq.delayMsec == 0 && responsesVect != nullptr) {
        queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
        (*queuedResponses)++;
        return;
      }
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
      /* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
      sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, dest, *dq.remote);
      return;
    }

    if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
      return;
    }

    // pick the next IDState slot of the backend, cycling over all of them
    unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
    IDState* ids = &ss->idStates[idOffset];
    ids->age = 0;
    DOHUnit* du = nullptr;

    /* that means that the state was in use, possibly with an allocated
       DOHUnit that we will need to handle, but we can't touch it before
       confirming that we now own this state */
    if (ids->isInUse()) {
      du = ids->du;
    }

    /* we atomically replace the value, we now own this state */
    if (!ids->markAsUsed()) {
      /* the state was not in use.
         we reset 'du' because it might have still been in use when we read it. */
      du = nullptr;
      ++ss->outstanding;
    }
    else {
      /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
         to handle it because it's about to be overwritten. */
      ids->du = nullptr;
      ++ss->reuseds;
      ++g_stats.downstreamTimeouts;
      handleDOHTimeout(du);
    }

    ids->cs = &cs;
    ids->origFD = cs.udpFD;
    // keep the client's DNS ID, since the ID sent to the backend is replaced below
    ids->origID = dh->id;
    // qname is moved into the state: dq.qname must not be used after this point
    setIDStateFromDNSQuestion(*ids, dq, std::move(qname));

    /* If we couldn't harvest the real dest addr, still
       write down the listening addr since it will be useful
       (especially if it's not an 'any' one).
       We need to keep track of which one it is since we may
       want to use the real but not the listening addr to reply.
    */
    if (dest.sin4.sin_family != 0) {
      ids->origDest = dest;
      ids->destHarvested = true;
    }
    else {
      ids->origDest = cs.local;
      ids->destHarvested = false;
    }

    // the backend sees the slot index as the DNS ID
    dh->id = idOffset;

    int fd = pickBackendSocketForSending(ss);
    ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);

    if(ret < 0) {
      ++ss->sendErrors;
      ++g_stats.downstreamSendErrors;
    }

    vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toLogString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
  }
  catch(const std::exception& e){
    vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
  }
}
1684
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* UDP client loop used when g_udpVectorSize > 1. It receives up to g_udpVectorSize
   queries per recvmmsg() call and processes each of them. The immediate responses
   collected along the way are then sent in a single sendmmsg() call. Never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[s_maxPacketCacheEntrySize];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
    /* used by HarvestDestinationAddress */
    cmsgbuf_aligned cbuf;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* go now */
  for (;;) {
    /* (re)initialize the receive structures before every call, because:
       - the IO vector is also used to send the vector of responses, to avoid
         having to copy the data around, so queueResponse() may have changed it;
       - the kernel overwrites msg_namelen and msg_controllen on return;
       - the receive size must be s_udpIncomingBufferSize, the largest incoming
         payload we accept, and not the full size of the buffer. The rest of the
         buffer is headroom for EDNS/ECS and self-generated responses, and the
         single-message path enforces the same limit. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
      fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, &recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", stringerror());
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (static_cast<size_t>(got) < sizeof(struct dnsheader)) {
        ++g_stats.nonCompliantQueries;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, &recvData[msgIdx].cbuf);
    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */
    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, stringerror());
      }
    }
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1765
1766 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1767 static void udpClientThread(ClientState* cs)
1768 try
1769 {
1770 setThreadName("dnsdist/udpClie");
1771 LocalHolders holders;
1772
1773 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1774 if (g_udpVectorSize > 1) {
1775 MultipleMessagesUDPClientThread(cs, holders);
1776
1777 }
1778 else
1779 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1780 {
1781 char packet[s_maxPacketCacheEntrySize];
1782 /* the actual buffer is larger because:
1783 - we may have to add EDNS and/or ECS
1784 - we use it for self-generated responses (from rule or cache)
1785 but we only accept incoming payloads up to that size
1786 */
1787 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1788 struct msghdr msgh;
1789 struct iovec iov;
1790 /* used by HarvestDestinationAddress */
1791 cmsgbuf_aligned cbuf;
1792
1793 ComboAddress remote;
1794 ComboAddress dest;
1795 remote.sin4.sin_family = cs->local.sin4.sin_family;
1796 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, s_udpIncomingBufferSize, &remote);
1797
1798 for(;;) {
1799 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1800
1801 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1802 ++g_stats.nonCompliantQueries;
1803 continue;
1804 }
1805
1806 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), sizeof(packet), nullptr, nullptr, nullptr, nullptr);
1807 }
1808 }
1809 }
1810 catch(const std::exception &e)
1811 {
1812 errlog("UDP client thread died because of exception: %s", e.what());
1813 }
1814 catch(const PDNSException &e)
1815 {
1816 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1817 }
1818 catch(...)
1819 {
1820 errlog("UDP client thread died because of an exception: %s", "unknown");
1821 }
1822
/* Returns a random 16-bit DNS query ID. It uses libsodium's generator when
   available and random() otherwise. */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  const auto value = randombytes_random();
#else
  const auto value = random();
#endif
  return static_cast<uint16_t>(value % 65536);
}
1831
1832 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1833 try
1834 {
1835 DNSName checkName = ds->checkName;
1836 uint16_t checkType = ds->checkType.getCode();
1837 uint16_t checkClass = ds->checkClass;
1838 dnsheader checkHeader;
1839 memset(&checkHeader, 0, sizeof(checkHeader));
1840
1841 checkHeader.qdcount = htons(1);
1842 checkHeader.id = getRandomDNSID();
1843
1844 checkHeader.rd = true;
1845 if (ds->setCD) {
1846 checkHeader.cd = true;
1847 }
1848
1849 if (ds->checkFunction) {
1850 std::lock_guard<std::mutex> lock(g_luamutex);
1851 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1852 checkName = std::get<0>(ret);
1853 checkType = std::get<1>(ret);
1854 checkClass = std::get<2>(ret);
1855 }
1856
1857 vector<uint8_t> packet;
1858 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1859 dnsheader * requestHeader = dpw.getHeader();
1860 *requestHeader = checkHeader;
1861
1862 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1863 sock.setNonBlocking();
1864 if (!IsAnyAddress(ds->sourceAddr)) {
1865 sock.setReuseAddr();
1866 if (!ds->sourceItfName.empty()) {
1867 #ifdef SO_BINDTODEVICE
1868 int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
1869 if (res != 0 && g_verboseHealthChecks) {
1870 infolog("Error setting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
1871 }
1872 #endif
1873 }
1874 sock.bind(ds->sourceAddr);
1875 }
1876 sock.connect(ds->remote);
1877 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1878 if (sent < 0) {
1879 int ret = errno;
1880 if (g_verboseHealthChecks)
1881 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1882 return false;
1883 }
1884
1885 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1886 if(ret < 0 || !ret) { // error, timeout, both are down!
1887 if (ret < 0) {
1888 ret = errno;
1889 if (g_verboseHealthChecks)
1890 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1891 }
1892 else {
1893 if (g_verboseHealthChecks)
1894 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1895 }
1896 return false;
1897 }
1898
1899 string reply;
1900 ComboAddress from;
1901 sock.recvFrom(reply, from);
1902
1903 /* we are using a connected socket but hey.. */
1904 if (from != ds->remote) {
1905 if (g_verboseHealthChecks)
1906 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1907 return false;
1908 }
1909
1910 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1911
1912 if (reply.size() < sizeof(*responseHeader)) {
1913 if (g_verboseHealthChecks)
1914 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1915 return false;
1916 }
1917
1918 if (responseHeader->id != requestHeader->id) {
1919 if (g_verboseHealthChecks)
1920 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1921 return false;
1922 }
1923
1924 if (!responseHeader->qr) {
1925 if (g_verboseHealthChecks)
1926 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1927 return false;
1928 }
1929
1930 if (responseHeader->rcode == RCode::ServFail) {
1931 if (g_verboseHealthChecks)
1932 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1933 return false;
1934 }
1935
1936 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1937 if (g_verboseHealthChecks)
1938 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1939 return false;
1940 }
1941
1942 uint16_t receivedType;
1943 uint16_t receivedClass;
1944 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1945
1946 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1947 if (g_verboseHealthChecks)
1948 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1949 return false;
1950 }
1951
1952 return true;
1953 }
1954 catch(const std::exception& e)
1955 {
1956 if (g_verboseHealthChecks)
1957 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1958 return false;
1959 }
1960 catch(...)
1961 {
1962 if (g_verboseHealthChecks)
1963 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1964 return false;
1965 }
1966
/* Passed to the TCPClientCollection constructor in main() and used by
   checkFileDescriptorsLimits() when estimating the number of descriptors
   this configuration may need. */
uint64_t g_maxTCPClientThreads = 10;
/* Number of maintThread() iterations (the thread sleeps 1 second per
   iteration) between two purges of expired packet cache entries. */
std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* Used by maintThread() to compute the 'upTo' argument of purgeExpired():
   getMaxEntries() * (100 - g_cacheCleaningPercentage) / 100. */
std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1970
1971 void maintThread()
1972 {
1973 setThreadName("dnsdist/main");
1974 int interval = 1;
1975 size_t counter = 0;
1976 int32_t secondsToWaitLog = 0;
1977
1978 for(;;) {
1979 sleep(interval);
1980
1981 {
1982 std::lock_guard<std::mutex> lock(g_luamutex);
1983 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1984 if(f) {
1985 try {
1986 (*f)();
1987 secondsToWaitLog = 0;
1988 }
1989 catch(std::exception &e) {
1990 if (secondsToWaitLog <= 0) {
1991 infolog("Error during execution of maintenance function: %s", e.what());
1992 secondsToWaitLog = 61;
1993 }
1994 secondsToWaitLog -= interval;
1995 }
1996 }
1997 }
1998
1999 counter++;
2000 if (counter >= g_cacheCleaningDelay) {
2001 /* keep track, for each cache, of whether we should keep
2002 expired entries */
2003 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
2004
2005 /* gather all caches actually used by at least one pool, and see
2006 if something prevents us from cleaning the expired entries */
2007 auto localPools = g_pools.getLocal();
2008 for (const auto& entry : *localPools) {
2009 auto& pool = entry.second;
2010
2011 auto packetCache = pool->packetCache;
2012 if (!packetCache) {
2013 continue;
2014 }
2015
2016 auto pair = caches.insert({packetCache, false});
2017 auto& iter = pair.first;
2018 /* if we need to keep stale data for this cache (ie, not clear
2019 expired entries when at least one pool using this cache
2020 has all its backends down) */
2021 if (packetCache->keepStaleData() && iter->second == false) {
2022 /* so far all pools had at least one backend up */
2023 if (pool->countServers(true) == 0) {
2024 iter->second = true;
2025 }
2026 }
2027 }
2028
2029 for (auto pair : caches) {
2030 /* shall we keep expired entries ? */
2031 if (pair.second == true) {
2032 continue;
2033 }
2034 auto& packetCache = pair.first;
2035 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
2036 packetCache->purgeExpired(upTo);
2037 }
2038 counter = 0;
2039 }
2040
2041 // ponder pruning g_dynblocks of expired entries here
2042 }
2043 }
2044
2045 static void secPollThread()
2046 {
2047 setThreadName("dnsdist/secpoll");
2048
2049 for (;;) {
2050 try {
2051 doSecPoll(g_secPollSuffix);
2052 }
2053 catch(...) {
2054 }
2055 sleep(g_secPollInterval);
2056 }
2057 }
2058
2059 static void healthChecksThread()
2060 {
2061 setThreadName("dnsdist/healthC");
2062
2063 int interval = 1;
2064
2065 for(;;) {
2066 sleep(interval);
2067
2068 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
2069 g_tcpclientthreads->addTCPClientThread();
2070
2071 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
2072 for(auto& dss : *states) {
2073 if(++dss->lastCheck < dss->checkInterval)
2074 continue;
2075 dss->lastCheck = 0;
2076 if(dss->availability==DownstreamState::Availability::Auto) {
2077 bool newState=upCheck(dss);
2078 if (newState) {
2079 /* check succeeded */
2080 dss->currentCheckFailures = 0;
2081
2082 if (!dss->upStatus) {
2083 /* we were marked as down */
2084 dss->consecutiveSuccessfulChecks++;
2085 if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
2086 /* if we need more than one successful check to rise
2087 and we didn't reach the threshold yet,
2088 let's stay down */
2089 newState = false;
2090 }
2091 }
2092 }
2093 else {
2094 /* check failed */
2095 dss->consecutiveSuccessfulChecks = 0;
2096
2097 if (dss->upStatus) {
2098 /* we are currently up */
2099 dss->currentCheckFailures++;
2100 if (dss->currentCheckFailures < dss->maxCheckFailures) {
2101 /* we need more than one failure to be marked as down,
2102 and we did not reach the threshold yet, let's stay down */
2103 newState = true;
2104 }
2105 }
2106 }
2107
2108 if(newState != dss->upStatus) {
2109 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2110
2111 if (newState && !dss->connected) {
2112 newState = dss->reconnect();
2113
2114 if (dss->connected && !dss->threadStarted.test_and_set()) {
2115 dss->tid = thread(responderThread, dss);
2116 }
2117 }
2118
2119 dss->upStatus = newState;
2120 dss->currentCheckFailures = 0;
2121 dss->consecutiveSuccessfulChecks = 0;
2122 if (g_snmpAgent && g_snmpTrapsEnabled) {
2123 g_snmpAgent->sendBackendStatusChangeTrap(dss);
2124 }
2125 }
2126 }
2127
2128 auto delta = dss->sw.udiffAndSet()/1000000.0;
2129 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2130 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2131 dss->prev.queries.store(dss->queries.load());
2132 dss->prev.reuseds.store(dss->reuseds.load());
2133
2134 for(IDState& ids : dss->idStates) { // timeouts
2135 int64_t usageIndicator = ids.usageIndicator;
2136 if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
2137 /* We mark the state as unused as soon as possible
2138 to limit the risk of racing with the
2139 responder thread.
2140 */
2141 auto oldDU = ids.du;
2142
2143 if (!ids.tryMarkUnused(usageIndicator)) {
2144 /* this state has been altered in the meantime,
2145 don't go anywhere near it */
2146 continue;
2147 }
2148 ids.du = nullptr;
2149 handleDOHTimeout(oldDU);
2150 ids.age = 0;
2151 dss->reuseds++;
2152 --dss->outstanding;
2153 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2154 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2155 dss->remote.toStringWithPort(), dss->name,
2156 ids.qname.toLogString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2157
2158 struct timespec ts;
2159 gettime(&ts);
2160
2161 struct dnsheader fake;
2162 memset(&fake, 0, sizeof(fake));
2163 fake.id = ids.origID;
2164
2165 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2166 }
2167 }
2168 }
2169 }
2170 }
2171
2172 static void bindAny(int af, int sock)
2173 {
2174 __attribute__((unused)) int one = 1;
2175
2176 #ifdef IP_FREEBIND
2177 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2178 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", stringerror());
2179 #endif
2180
2181 #ifdef IP_BINDANY
2182 if (af == AF_INET)
2183 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2184 warnlog("Warning: IP_BINDANY setsockopt failed: %s", stringerror());
2185 #endif
2186 #ifdef IPV6_BINDANY
2187 if (af == AF_INET6)
2188 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2189 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", stringerror());
2190 #endif
2191 #ifdef SO_BINDANY
2192 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2193 warnlog("Warning: SO_BINDANY setsockopt failed: %s", stringerror());
2194 #endif
2195 }
2196
2197 static void dropGroupPrivs(gid_t gid)
2198 {
2199 if (gid) {
2200 if (setgid(gid) == 0) {
2201 if (setgroups(0, NULL) < 0) {
2202 warnlog("Warning: Unable to drop supplementary gids: %s", stringerror());
2203 }
2204 }
2205 else {
2206 warnlog("Warning: Unable to set group ID to %d: %s", gid, stringerror());
2207 }
2208 }
2209 }
2210
2211 static void dropUserPrivs(uid_t uid)
2212 {
2213 if(uid) {
2214 if(setuid(uid) < 0) {
2215 warnlog("Warning: Unable to set user ID to %d: %s", uid, stringerror());
2216 }
2217 }
2218 }
2219
2220 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2221 {
2222 /* stdin, stdout, stderr */
2223 size_t requiredFDsCount = 3;
2224 auto backends = g_dstates.getLocal();
2225 /* UDP sockets to backends */
2226 size_t backendUDPSocketsCount = 0;
2227 for (const auto& backend : *backends) {
2228 backendUDPSocketsCount += backend->sockets.size();
2229 }
2230 requiredFDsCount += backendUDPSocketsCount;
2231 /* TCP sockets to backends */
2232 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2233 /* listening sockets */
2234 requiredFDsCount += udpBindsCount;
2235 requiredFDsCount += tcpBindsCount;
2236 /* max TCP connections currently served */
2237 requiredFDsCount += g_maxTCPClientThreads;
2238 /* max pipes for communicating between TCP acceptors and client threads */
2239 requiredFDsCount += (g_maxTCPClientThreads * 2);
2240 /* max TCP queued connections */
2241 requiredFDsCount += g_maxTCPQueuedConnections;
2242 /* DelayPipe pipe */
2243 requiredFDsCount += 2;
2244 /* syslog socket */
2245 requiredFDsCount++;
2246 /* webserver main socket */
2247 requiredFDsCount++;
2248 /* console main socket */
2249 requiredFDsCount++;
2250 /* carbon export */
2251 requiredFDsCount++;
2252 /* history file */
2253 requiredFDsCount++;
2254 struct rlimit rl;
2255 getrlimit(RLIMIT_NOFILE, &rl);
2256 if (rl.rlim_cur <= requiredFDsCount) {
2257 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2258 #ifdef HAVE_SYSTEMD
2259 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2260 #else
2261 warnlog("You can increase this value by using ulimit.");
2262 #endif
2263 }
2264 }
2265
2266 static void setUpLocalBind(std::unique_ptr<ClientState>& cs)
2267 {
2268 /* skip some warnings if there is an identical UDP context */
2269 bool warn = cs->tcp == false || cs->tlsFrontend != nullptr || cs->dohFrontend != nullptr;
2270 int& fd = cs->tcp == false ? cs->udpFD : cs->tcpFD;
2271 (void) warn;
2272
2273 fd = SSocket(cs->local.sin4.sin_family, cs->tcp == false ? SOCK_DGRAM : SOCK_STREAM, 0);
2274
2275 if (cs->tcp) {
2276 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
2277 #ifdef TCP_DEFER_ACCEPT
2278 SSetsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2279 #endif
2280 if (cs->fastOpenQueueSize > 0) {
2281 #ifdef TCP_FASTOPEN
2282 SSetsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, cs->fastOpenQueueSize);
2283 #else
2284 if (warn) {
2285 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2286 }
2287 #endif
2288 }
2289 }
2290
2291 if(cs->local.sin4.sin_family == AF_INET6) {
2292 SSetsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2293 }
2294
2295 bindAny(cs->local.sin4.sin_family, fd);
2296
2297 if(!cs->tcp && IsAnyAddress(cs->local)) {
2298 int one=1;
2299 setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2300 #ifdef IPV6_RECVPKTINFO
2301 setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2302 #endif
2303 }
2304
2305 if (cs->reuseport) {
2306 #ifdef SO_REUSEPORT
2307 SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
2308 #else
2309 if (warn) {
2310 /* no need to warn again if configured but support is not available, we already did for UDP */
2311 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2312 }
2313 #endif
2314 }
2315
2316 if (!cs->tcp) {
2317 if (cs->local.isIPv4()) {
2318 try {
2319 setSocketIgnorePMTU(cs->udpFD);
2320 }
2321 catch(const std::exception& e) {
2322 warnlog("Failed to set IP_MTU_DISCOVER on UDP server socket for local address '%s': %s", cs->local.toStringWithPort(), e.what());
2323 }
2324 }
2325 }
2326
2327 const std::string& itf = cs->interface;
2328 if (!itf.empty()) {
2329 #ifdef SO_BINDTODEVICE
2330 int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2331 if (res != 0) {
2332 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), stringerror());
2333 }
2334 #else
2335 if (warn) {
2336 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2337 }
2338 #endif
2339 }
2340
2341 #ifdef HAVE_EBPF
2342 if (g_defaultBPFFilter) {
2343 cs->attachFilter(g_defaultBPFFilter);
2344 vinfolog("Attaching default BPF Filter to %s frontend %s", (!cs->tcp ? "UDP" : "TCP"), cs->local.toStringWithPort());
2345 }
2346 #endif /* HAVE_EBPF */
2347
2348 if (cs->tlsFrontend != nullptr) {
2349 if (!cs->tlsFrontend->setupTLS()) {
2350 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2351 _exit(EXIT_FAILURE);
2352 }
2353 }
2354
2355 if (cs->dohFrontend != nullptr) {
2356 cs->dohFrontend->setup();
2357 }
2358
2359 SBind(fd, cs->local);
2360
2361 if (cs->tcp) {
2362 SListen(cs->tcpFD, SOMAXCONN);
2363 if (cs->tlsFrontend != nullptr) {
2364 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2365 }
2366 else if (cs->dohFrontend != nullptr) {
2367 warnlog("Listening on %s for DoH", cs->local.toStringWithPort());
2368 }
2369 else if (cs->dnscryptCtx != nullptr) {
2370 warnlog("Listening on %s for DNSCrypt", cs->local.toStringWithPort());
2371 }
2372 else {
2373 warnlog("Listening on %s", cs->local.toStringWithPort());
2374 }
2375 }
2376
2377 cs->ready = true;
2378 }
2379
/* Options collected from the command line by main(). */
struct
{
  /* -l/--local (trimmed); when non-empty, they replace the plain DNS
     frontends of the configuration */
  std::vector<std::string> locals;
  /* positional arguments outside of client mode: backend addresses added
     to the default pool */
  std::vector<std::string> remotes;
  /* --check-config */
  bool checkConfig = false;
  /* -c/--client */
  bool beClient = false;
  /* --supervised */
  bool beSupervised = false;
  /* -e/--execute */
  std::string command;
  /* -C/--config; main() sets the default value before parsing options */
  std::string config;
  /* -u/--uid */
  std::string uid;
  /* -g/--gid */
  std::string gid;
} g_cmdLine;

/* Set to true by main() once the list of frontends is final, right before
   they are bound. */
std::atomic<bool> g_configurationDone{false};
2394
/* Print the command-line help to stdout. The -k/--setkey entry is only
   listed when built with libsodium. */
static void usage()
{
  cout << endl;
  cout << "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
          "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
          "[-v,--verbose] [--check-config] [--version]\n"
          "\n"
          "-a,--acl netmask Add this netmask to the ACL\n"
          "-C,--config file Load configuration from 'file'\n"
          "-c,--client Operate as a client, connect to dnsdist. This reads\n"
          " controlSocket from your configuration file, but also\n"
          " accepts an IP:PORT argument\n";
#ifdef HAVE_LIBSODIUM
  cout << "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
          " is similar to setting setKey in the configuration file.\n"
          " NOTE: this will leak this key in your shell's history\n"
          " and in the systems running process list.\n";
#endif
  cout << "--check-config Validate the configuration file and exit. The exit-code\n"
          " reflects the validation, 0 is OK, 1 means an error.\n"
          " Any errors are printed as well.\n"
          "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
          "-g,--gid gid Change the process group ID after binding sockets\n"
          "-h,--help Display this helpful message\n"
          "-l,--local address Listen on this local address\n"
          "--supervised Don't open a console, I'm supervised\n"
          " (use with e.g. systemd and daemontools)\n"
          "--disable-syslog Don't log to syslog, only to stdout\n"
          " (use with e.g. systemd)\n"
          "-u,--uid uid Change the process user ID after binding sockets\n"
          "-v,--verbose Enable verbose mode\n"
          "-V,--version Show dnsdist version information and exit\n";
}
2428
2429 int main(int argc, char** argv)
2430 try
2431 {
2432 size_t udpBindsCount = 0;
2433 size_t tcpBindsCount = 0;
2434 rl_attempted_completion_function = my_completion;
2435 rl_completion_append_character = 0;
2436
2437 signal(SIGPIPE, SIG_IGN);
2438 signal(SIGCHLD, SIG_IGN);
2439 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2440
2441 #ifdef HAVE_LIBSODIUM
2442 if (sodium_init() == -1) {
2443 cerr<<"Unable to initialize crypto library"<<endl;
2444 exit(EXIT_FAILURE);
2445 }
2446 g_hashperturb=randombytes_uniform(0xffffffff);
2447 srandom(randombytes_uniform(0xffffffff));
2448 #else
2449 {
2450 struct timeval tv;
2451 gettimeofday(&tv, 0);
2452 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2453 g_hashperturb=random();
2454 }
2455
2456 #endif
2457 ComboAddress clientAddress = ComboAddress();
2458 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2459 struct option longopts[]={
2460 {"acl", required_argument, 0, 'a'},
2461 {"check-config", no_argument, 0, 1},
2462 {"client", no_argument, 0, 'c'},
2463 {"config", required_argument, 0, 'C'},
2464 {"disable-syslog", no_argument, 0, 2},
2465 {"execute", required_argument, 0, 'e'},
2466 {"gid", required_argument, 0, 'g'},
2467 {"help", no_argument, 0, 'h'},
2468 {"local", required_argument, 0, 'l'},
2469 {"setkey", required_argument, 0, 'k'},
2470 {"supervised", no_argument, 0, 3},
2471 {"uid", required_argument, 0, 'u'},
2472 {"verbose", no_argument, 0, 'v'},
2473 {"version", no_argument, 0, 'V'},
2474 {0,0,0,0}
2475 };
2476 int longindex=0;
2477 string optstring;
2478 for(;;) {
2479 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2480 if(c==-1)
2481 break;
2482 switch(c) {
2483 case 1:
2484 g_cmdLine.checkConfig=true;
2485 break;
2486 case 2:
2487 g_syslog=false;
2488 break;
2489 case 3:
2490 g_cmdLine.beSupervised=true;
2491 break;
2492 case 'C':
2493 g_cmdLine.config=optarg;
2494 break;
2495 case 'c':
2496 g_cmdLine.beClient=true;
2497 break;
2498 case 'e':
2499 g_cmdLine.command=optarg;
2500 break;
2501 case 'g':
2502 g_cmdLine.gid=optarg;
2503 break;
2504 case 'h':
2505 cout<<"dnsdist "<<VERSION<<endl;
2506 usage();
2507 cout<<"\n";
2508 exit(EXIT_SUCCESS);
2509 break;
2510 case 'a':
2511 optstring=optarg;
2512 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2513 break;
2514 case 'k':
2515 #ifdef HAVE_LIBSODIUM
2516 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2517 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2518 exit(EXIT_FAILURE);
2519 }
2520 #else
2521 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2522 exit(EXIT_FAILURE);
2523 #endif
2524 break;
2525 case 'l':
2526 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2527 break;
2528 case 'u':
2529 g_cmdLine.uid=optarg;
2530 break;
2531 case 'v':
2532 g_verbose=true;
2533 break;
2534 case 'V':
2535 #ifdef LUAJIT_VERSION
2536 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2537 #else
2538 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2539 #endif
2540 cout<<"Enabled features: ";
2541 #ifdef HAVE_CDB
2542 cout<<"cdb ";
2543 #endif
2544 #ifdef HAVE_DNS_OVER_TLS
2545 cout<<"dns-over-tls(";
2546 #ifdef HAVE_GNUTLS
2547 cout<<"gnutls";
2548 #ifdef HAVE_LIBSSL
2549 cout<<" ";
2550 #endif
2551 #endif
2552 #ifdef HAVE_LIBSSL
2553 cout<<"openssl";
2554 #endif
2555 cout<<") ";
2556 #endif
2557 #ifdef HAVE_DNS_OVER_HTTPS
2558 cout<<"dns-over-https(DOH) ";
2559 #endif
2560 #ifdef HAVE_DNSCRYPT
2561 cout<<"dnscrypt ";
2562 #endif
2563 #ifdef HAVE_EBPF
2564 cout<<"ebpf ";
2565 #endif
2566 #ifdef HAVE_FSTRM
2567 cout<<"fstrm ";
2568 #endif
2569 #ifdef HAVE_LIBCRYPTO
2570 cout<<"ipcipher ";
2571 #endif
2572 #ifdef HAVE_LIBSODIUM
2573 cout<<"libsodium ";
2574 #endif
2575 #ifdef HAVE_LMDB
2576 cout<<"lmdb ";
2577 #endif
2578 #ifdef HAVE_PROTOBUF
2579 cout<<"protobuf ";
2580 #endif
2581 #ifdef HAVE_RE2
2582 cout<<"re2 ";
2583 #endif
2584 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2585 cout<<"recvmmsg/sendmmsg ";
2586 #endif
2587 #ifdef HAVE_NET_SNMP
2588 cout<<"snmp ";
2589 #endif
2590 #ifdef HAVE_SYSTEMD
2591 cout<<"systemd";
2592 #endif
2593 cout<<endl;
2594 exit(EXIT_SUCCESS);
2595 break;
2596 case '?':
2597 //getopt_long printed an error message.
2598 usage();
2599 exit(EXIT_FAILURE);
2600 break;
2601 }
2602 }
2603
2604 argc-=optind;
2605 argv+=optind;
2606 for(auto p = argv; *p; ++p) {
2607 if(g_cmdLine.beClient) {
2608 clientAddress = ComboAddress(*p, 5199);
2609 } else {
2610 g_cmdLine.remotes.push_back(*p);
2611 }
2612 }
2613
2614 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2615
2616 g_policy.setState(leastOutstandingPol);
2617 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2618 setupLua(true, g_cmdLine.config);
2619 if (clientAddress != ComboAddress())
2620 g_serverControl = clientAddress;
2621 doClient(g_serverControl, g_cmdLine.command);
2622 _exit(EXIT_SUCCESS);
2623 }
2624
2625 auto acl = g_ACL.getCopy();
2626 if(acl.empty()) {
2627 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2628 acl.addMask(addr);
2629 g_ACL.setState(acl);
2630 }
2631
2632 auto consoleACL = g_consoleACL.getCopy();
2633 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2634 consoleACL.addMask(mask);
2635 }
2636 g_consoleACL.setState(consoleACL);
2637
2638 if (g_cmdLine.checkConfig) {
2639 setupLua(true, g_cmdLine.config);
2640 // No exception was thrown
2641 infolog("Configuration '%s' OK!", g_cmdLine.config);
2642 _exit(EXIT_SUCCESS);
2643 }
2644
2645 auto todo=setupLua(false, g_cmdLine.config);
2646
2647 auto localPools = g_pools.getCopy();
2648 {
2649 bool precompute = false;
2650 if (g_policy.getLocal()->name == "chashed") {
2651 precompute = true;
2652 } else {
2653 for (const auto& entry: localPools) {
2654 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2655 precompute = true;
2656 break ;
2657 }
2658 }
2659 }
2660 if (precompute) {
2661 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2662 // pre compute hashes
2663 auto backends = g_dstates.getLocal();
2664 for (auto& backend: *backends) {
2665 backend->hash();
2666 }
2667 }
2668 }
2669
2670 if (!g_cmdLine.locals.empty()) {
2671 for (auto it = g_frontends.begin(); it != g_frontends.end(); ) {
2672 /* DoH, DoT and DNSCrypt frontends are separate */
2673 if ((*it)->dohFrontend == nullptr && (*it)->tlsFrontend == nullptr && (*it)->dnscryptCtx == nullptr) {
2674 it = g_frontends.erase(it);
2675 }
2676 else {
2677 ++it;
2678 }
2679 }
2680
2681 for(const auto& loc : g_cmdLine.locals) {
2682 /* UDP */
2683 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), false, false, 0, "", {})));
2684 /* TCP */
2685 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress(loc, 53), true, false, 0, "", {})));
2686 }
2687 }
2688
2689 if (g_frontends.empty()) {
2690 /* UDP */
2691 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), false, false, 0, "", {})));
2692 /* TCP */
2693 g_frontends.push_back(std::unique_ptr<ClientState>(new ClientState(ComboAddress("127.0.0.1", 53), true, false, 0, "", {})));
2694 }
2695
2696 g_configurationDone = true;
2697
2698 for(auto& frontend : g_frontends) {
2699 setUpLocalBind(frontend);
2700
2701 if (frontend->tcp == false) {
2702 ++udpBindsCount;
2703 }
2704 else {
2705 ++tcpBindsCount;
2706 }
2707 }
2708
2709 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2710
2711 vector<string> vec;
2712 std::string acls;
2713 g_ACL.getLocal()->toStringVector(&vec);
2714 for(const auto& s : vec) {
2715 if (!acls.empty())
2716 acls += ", ";
2717 acls += s;
2718 }
2719 infolog("ACL allowing queries from: %s", acls.c_str());
2720 vec.clear();
2721 acls.clear();
2722 g_consoleACL.getLocal()->toStringVector(&vec);
2723 for (const auto& entry : vec) {
2724 if (!acls.empty()) {
2725 acls += ", ";
2726 }
2727 acls += entry;
2728 }
2729 infolog("Console ACL allowing connections from: %s", acls.c_str());
2730
2731 #ifdef HAVE_LIBSODIUM
2732 if (g_consoleEnabled && g_consoleKey.empty()) {
2733 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2734 }
2735 #endif
2736
2737 uid_t newgid=0;
2738 gid_t newuid=0;
2739
2740 if(!g_cmdLine.gid.empty())
2741 newgid = strToGID(g_cmdLine.gid.c_str());
2742
2743 if(!g_cmdLine.uid.empty())
2744 newuid = strToUID(g_cmdLine.uid.c_str());
2745
2746 dropGroupPrivs(newgid);
2747 dropUserPrivs(newuid);
2748 try {
2749 /* we might still have capabilities remaining,
2750 for example if we have been started as root
2751 without --uid or --gid (please don't do that)
2752 or as an unprivileged user with ambient
2753 capabilities like CAP_NET_BIND_SERVICE.
2754 */
2755 dropCapabilities(g_capabilitiesToRetain);
2756 }
2757 catch(const std::exception& e) {
2758 warnlog("%s", e.what());
2759 }
2760
2761 /* this need to be done _after_ dropping privileges */
2762 g_delay = new DelayPipe<DelayedPacket>();
2763
2764 if (g_snmpAgent) {
2765 g_snmpAgent->run();
2766 }
2767
2768 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2769
2770 for(auto& t : todo)
2771 t();
2772
2773 localPools = g_pools.getCopy();
2774 /* create the default pool no matter what */
2775 createPoolIfNotExists(localPools, "");
2776 if(g_cmdLine.remotes.size()) {
2777 for(const auto& address : g_cmdLine.remotes) {
2778 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2779 addServerToPool(localPools, "", ret);
2780 if (ret->connected && !ret->threadStarted.test_and_set()) {
2781 ret->tid = thread(responderThread, ret);
2782 }
2783 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2784 }
2785 }
2786 g_pools.setState(localPools);
2787
2788 if(g_dstates.getLocal()->empty()) {
2789 errlog("No downstream servers defined: all packets will get dropped");
2790 // you might define them later, but you need to know
2791 }
2792
2793 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2794
2795 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2796 if(dss->availability==DownstreamState::Availability::Auto) {
2797 bool newState=upCheck(dss);
2798 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2799 dss->upStatus = newState;
2800 }
2801 }
2802
2803 for(auto& cs : g_frontends) {
2804 if (cs->dohFrontend != nullptr) {
2805 #ifdef HAVE_DNS_OVER_HTTPS
2806 std::thread t1(dohThread, cs.get());
2807 if (!cs->cpus.empty()) {
2808 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2809 }
2810 t1.detach();
2811 #endif /* HAVE_DNS_OVER_HTTPS */
2812 continue;
2813 }
2814 if (cs->udpFD >= 0) {
2815 thread t1(udpClientThread, cs.get());
2816 if (!cs->cpus.empty()) {
2817 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2818 }
2819 t1.detach();
2820 }
2821 else if (cs->tcpFD >= 0) {
2822 thread t1(tcpAcceptorThread, cs.get());
2823 if (!cs->cpus.empty()) {
2824 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2825 }
2826 t1.detach();
2827 }
2828 }
2829
2830 thread carbonthread(carbonDumpThread);
2831 carbonthread.detach();
2832
2833 thread stattid(maintThread);
2834 stattid.detach();
2835
2836 thread healththread(healthChecksThread);
2837
2838 if (!g_secPollSuffix.empty()) {
2839 thread secpollthread(secPollThread);
2840 secpollthread.detach();
2841 }
2842
2843 if(g_cmdLine.beSupervised) {
2844 #ifdef HAVE_SYSTEMD
2845 sd_notify(0, "READY=1");
2846 #endif
2847 healththread.join();
2848 }
2849 else {
2850 healththread.detach();
2851 doConsole();
2852 }
2853 _exit(EXIT_SUCCESS);
2854
2855 }
2856 catch(const LuaContext::ExecutionErrorException& e) {
2857 try {
2858 errlog("Fatal Lua error: %s", e.what());
2859 std::rethrow_if_nested(e);
2860 } catch(const std::exception& ne) {
2861 errlog("Details: %s", ne.what());
2862 }
2863 catch(PDNSException &ae)
2864 {
2865 errlog("Fatal pdns error: %s", ae.reason);
2866 }
2867 _exit(EXIT_FAILURE);
2868 }
2869 catch(std::exception &e)
2870 {
2871 errlog("Fatal error: %s", e.what());
2872 _exit(EXIT_FAILURE);
2873 }
2874 catch(PDNSException &ae)
2875 {
2876 errlog("Fatal pdns error: %s", ae.reason);
2877 _exit(EXIT_FAILURE);
2878 }
2879
2880 uint64_t getLatencyCount(const std::string&)
2881 {
2882 return g_stats.responses + g_stats.selfAnswered + g_stats.cacheHits;
2883 }