]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
dnsdist: Set a correct EDNS OPT RR for self-generated answers
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50
51 #include "base64.hh"
52 #include "delaypipe.hh"
53 #include "dolog.hh"
54 #include "dnsname.hh"
55 #include "dnswriter.hh"
56 #include "ednsoptions.hh"
57 #include "gettime.hh"
58 #include "lock.hh"
59 #include "misc.hh"
60 #include "sodcrypto.hh"
61 #include "sstuff.hh"
62 #include "xpf.hh"
63
64 thread_local boost::uuids::random_generator t_uuidGenerator;
65
66 /* Known sins:
67
68 Receiver is currently single threaded
69 not *that* bad actually, but now that we are thread safe, might want to scale
70 */
71
72 /* the Rulaction plan
73 Set of Rules, if one matches, it leads to an Action
74 Both rules and actions could conceivably be Lua based.
75 On the C++ side, both could be inherited from a class Rule and a class Action,
76 on the Lua side we can't do that. */
77
78 using std::atomic;
79 using std::thread;
80 bool g_verbose;
81
82 struct DNSDistStats g_stats;
83 MetricDefinitionStorage g_metricDefinitions;
84
85 uint16_t g_maxOutstanding{10240};
86 bool g_verboseHealthChecks{false};
87 uint32_t g_staleCacheEntriesTTL{0};
88 bool g_syslog{true};
89
90 GlobalStateHolder<NetmaskGroup> g_ACL;
91 string g_outputBuffer;
92
93 vector<std::tuple<ComboAddress, bool, bool, int, string, std::set<int>>> g_locals;
94 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
95 #ifdef HAVE_DNSCRYPT
96 std::vector<std::tuple<ComboAddress,std::shared_ptr<DNSCryptContext>,bool, int, string, std::set<int> >> g_dnsCryptLocals;
97 #endif
98 #ifdef HAVE_EBPF
99 shared_ptr<BPFFilter> g_defaultBPFFilter;
100 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
101 #endif /* HAVE_EBPF */
102 vector<ClientState *> g_frontends;
103 GlobalStateHolder<pools_t> g_pools;
104 size_t g_udpVectorSize{1};
105
106 bool g_snmpEnabled{false};
107 bool g_snmpTrapsEnabled{false};
108 DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
125 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
128 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
129
130 Rings g_rings;
131 QueryCount g_qcount;
132
133 GlobalStateHolder<servers_t> g_dstates;
134 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
135 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
136 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
137 int g_tcpRecvTimeout{2};
138 int g_tcpSendTimeout{2};
139 int g_udpTimeout{2};
140
/* NOTE(review): not referenced in the visible part of this file; presumably makes
   dnsdist answer ServFail when the policy selects no backend -- confirm at the use site */
bool g_servFailOnNoPolicy{false};
/* when set, responderThread() passes backend responses that carry the TC bit
   through truncateTC() */
bool g_truncateTC{false};
/* when set, fixUpResponse() overwrites the bytes following the DNS header with
   the wire form of the name that was originally queried */
bool g_fixupCase{0};
144
145 static void truncateTC(char* packet, uint16_t* len, unsigned int consumed)
146 try
147 {
148 *len=(uint16_t) (sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
149 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
150 dh->ancount = dh->arcount = dh->nscount = 0;
151 }
152 catch(...)
153 {
154 g_stats.truncFail++;
155 }
156
157 struct DelayedPacket
158 {
159 int fd;
160 string packet;
161 ComboAddress destination;
162 ComboAddress origDest;
163 void operator()()
164 {
165 ssize_t res;
166 if(origDest.sin4.sin_family == 0) {
167 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
168 }
169 else {
170 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
171 }
172 if (res == -1) {
173 int err = errno;
174 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
175 }
176 }
177 };
178
/* pipe that sendUDPResponse() submits responses to when they have to be sent after
   a delay; null here, and sendUDPResponse() sends immediately while it is null */
DelayPipe<DelayedPacket> * g_delay = 0;
180
181 void doLatencyStats(double udiff)
182 {
183 if(udiff < 1000) g_stats.latency0_1++;
184 else if(udiff < 10000) g_stats.latency1_10++;
185 else if(udiff < 50000) g_stats.latency10_50++;
186 else if(udiff < 100000) g_stats.latency50_100++;
187 else if(udiff < 1000000) g_stats.latency100_1000++;
188 else g_stats.latencySlow++;
189
190 auto doAvg = [](double& var, double n, double weight) {
191 var = (weight -1) * var/weight + n/weight;
192 };
193
194 doAvg(g_stats.latencyAvg100, udiff, 100);
195 doAvg(g_stats.latencyAvg1000, udiff, 1000);
196 doAvg(g_stats.latencyAvg10000, udiff, 10000);
197 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
198 }
199
200 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
201 {
202 uint16_t rqtype, rqclass;
203 DNSName rqname;
204 const struct dnsheader* dh = (struct dnsheader*) response;
205
206 if (responseLen < sizeof(dnsheader)) {
207 return false;
208 }
209
210 if (dh->qdcount == 0) {
211 if (dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) {
212 return true;
213 }
214 else {
215 g_stats.nonCompliantResponses++;
216 return false;
217 }
218 }
219
220 try {
221 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
222 }
223 catch(std::exception& e) {
224 if(responseLen > (ssize_t)sizeof(dnsheader))
225 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
226 g_stats.nonCompliantResponses++;
227 return false;
228 }
229
230 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
231 return false;
232 }
233
234 return true;
235 }
236
237 void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
238 {
239 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
240 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
241 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
242 uint16_t * flags = getFlagsFromDNSHeader(dh);
243 /* clear the flags we are about to restore */
244 *flags &= restoreFlagsMask;
245 /* only keep the flags we want to restore */
246 origFlags &= ~restoreFlagsMask;
247 /* set the saved flags as they were */
248 *flags |= origFlags;
249 }
250
251 bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
252 {
253 restoreFlags(dq.dh, origFlags);
254
255 return addEDNSToQueryTurnedResponse(dq);
256 }
257
258 bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom)
259 {
260 struct dnsheader* dh = (struct dnsheader*) *response;
261
262 if (*responseLen < sizeof(dnsheader)) {
263 return false;
264 }
265
266 restoreFlags(dh, origFlags);
267
268 if (*responseLen == sizeof(dnsheader)) {
269 return true;
270 }
271
272 if(g_fixupCase) {
273 string realname = qname.toDNSString();
274 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
275 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
276 }
277 }
278
279 if (ednsAdded || ecsAdded) {
280 uint16_t optStart;
281 size_t optLen = 0;
282 bool last = false;
283
284 const std::string responseStr(*response, *responseLen);
285 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
286
287 if (res == 0) {
288 if (ednsAdded) {
289 /* we added the entire OPT RR,
290 therefore we need to remove it entirely */
291 if (last) {
292 /* simply remove the last AR */
293 *responseLen -= optLen;
294 uint16_t arcount = ntohs(dh->arcount);
295 arcount--;
296 dh->arcount = htons(arcount);
297 }
298 else {
299 /* Removing an intermediary RR could lead to compression error */
300 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
301 *responseLen = rewrittenResponse.size();
302 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
303 rewrittenResponse.reserve(*responseLen + addRoom);
304 }
305 *responseSize = rewrittenResponse.capacity();
306 *response = reinterpret_cast<char*>(rewrittenResponse.data());
307 }
308 else {
309 warnlog("Error rewriting content");
310 }
311 }
312 }
313 else {
314 /* the OPT RR was already present, but without ECS,
315 we need to remove the ECS option if any */
316 if (last) {
317 /* nothing after the OPT RR, we can simply remove the
318 ECS option */
319 size_t existingOptLen = optLen;
320 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
321 *responseLen -= (existingOptLen - optLen);
322 }
323 else {
324 /* Removing an intermediary RR could lead to compression error */
325 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
326 *responseLen = rewrittenResponse.size();
327 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
328 rewrittenResponse.reserve(*responseLen + addRoom);
329 }
330 *responseSize = rewrittenResponse.capacity();
331 *response = reinterpret_cast<char*>(rewrittenResponse.data());
332 }
333 else {
334 warnlog("Error rewriting content");
335 }
336 }
337 }
338 }
339 }
340
341 return true;
342 }
343
344 #ifdef HAVE_DNSCRYPT
345 bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
346 {
347 if (dnsCryptQuery) {
348 uint16_t encryptedResponseLen = 0;
349
350 /* save the original header before encrypting it in place */
351 if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
352 memcpy(dhCopy, *dh, sizeof(dnsheader));
353 *dh = dhCopy;
354 }
355
356 int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
357 if (res == 0) {
358 *responseLen = encryptedResponseLen;
359 } else {
360 /* dropping response */
361 vinfolog("Error encrypting the response, dropping.");
362 return false;
363 }
364 }
365 return true;
366 }
367 #endif
368
369 static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
370 {
371 if(delayMsec && g_delay) {
372 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
373 g_delay->submit(dp, delayMsec);
374 }
375 else {
376 ssize_t res;
377 if(origDest.sin4.sin_family == 0) {
378 res = sendto(origFD, response, responseLen, 0, (struct sockaddr*)&origRemote, origRemote.getSocklen());
379 }
380 else {
381 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
382 }
383 if (res == -1) {
384 int err = errno;
385 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
386 }
387 }
388
389 return true;
390 }
391
392
393 static int pickBackendSocketForSending(DownstreamState* state)
394 {
395 return state->sockets[state->socketsOffset++ % state->sockets.size()];
396 }
397
398 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
399 {
400 ready.clear();
401
402 if (state->sockets.size() == 1) {
403 ready.push_back(state->sockets[0]);
404 return ;
405 }
406
407 {
408 std::lock_guard<std::mutex> lock(state->socketsLock);
409 state->mplexer->getAvailableFDs(ready, -1);
410 }
411 }
412
413 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
414 void* responderThread(std::shared_ptr<DownstreamState> dss)
415 try {
416 auto localRespRulactions = g_resprulactions.getLocal();
417 #ifdef HAVE_DNSCRYPT
418 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
419 /* when the answer is encrypted in place, we need to get a copy
420 of the original header before encryption to fill the ring buffer */
421 dnsheader dhCopy;
422 #else
423 char packet[4096];
424 #endif
425 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
426 vector<uint8_t> rewrittenResponse;
427
428 uint16_t queryId = 0;
429 std::vector<int> sockets;
430 sockets.reserve(dss->sockets.size());
431
432 for(;;) {
433 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
434 try {
435 pickBackendSocketsReadyForReceiving(dss, sockets);
436 for (const auto& fd : sockets) {
437 ssize_t got = recv(fd, packet, sizeof(packet), 0);
438 char * response = packet;
439 size_t responseSize = sizeof(packet);
440
441 if (got < (ssize_t) sizeof(dnsheader))
442 continue;
443
444 uint16_t responseLen = (uint16_t) got;
445 queryId = dh->id;
446
447 if(queryId >= dss->idStates.size())
448 continue;
449
450 IDState* ids = &dss->idStates[queryId];
451 int origFD = ids->origFD;
452
453 if(origFD < 0) // duplicate
454 continue;
455
456 /* setting age to 0 to prevent the maintainer thread from
457 cleaning this IDS while we process the response.
458 We have already a copy of the origFD, so it would
459 mostly mess up the outstanding counter.
460 */
461 ids->age = 0;
462
463 unsigned int consumed = 0;
464 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
465 continue;
466 }
467
468 int oldFD = ids->origFD.exchange(-1);
469 if (oldFD == origFD) {
470 /* we only decrement the outstanding counter if the value was not
471 altered in the meantime, which would mean that the state has been actively reused
472 and the other thread has not incremented the outstanding counter, so we don't
473 want it to be decremented twice. */
474 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
475 }
476
477 if(dh->tc && g_truncateTC) {
478 truncateTC(response, &responseLen, consumed);
479 }
480
481 dh->id = ids->origID;
482
483 uint16_t addRoom = 0;
484 DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, consumed, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
485 #ifdef HAVE_PROTOBUF
486 dr.uniqueId = ids->uniqueId;
487 #endif
488 dr.qTag = ids->qTag;
489
490 if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
491 continue;
492 }
493
494 #ifdef HAVE_DNSCRYPT
495 if (ids->dnsCryptQuery) {
496 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
497 }
498 #endif
499 if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
500 continue;
501 }
502
503 if (ids->packetCache && !ids->skipCache) {
504 ids->packetCache->insert(ids->cacheKey, ids->subnet, ids->origFlags, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode, ids->tempFailureTTL);
505 }
506
507 if (ids->cs && !ids->cs->muted) {
508 #ifdef HAVE_DNSCRYPT
509 if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
510 continue;
511 }
512 #endif
513
514 ComboAddress empty;
515 empty.sin4.sin_family = 0;
516 /* if ids->destHarvested is false, origDest holds the listening address.
517 We don't want to use that as a source since it could be 0.0.0.0 for example. */
518 sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
519 }
520
521 g_stats.responses++;
522
523 double udiff = ids->sentTime.udiff();
524 vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
525
526 struct timespec ts;
527 gettime(&ts);
528 g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote);
529
530 if(dh->rcode == RCode::ServFail)
531 g_stats.servfailResponses++;
532 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
533
534 doLatencyStats(udiff);
535
536 /* if the FD is not -1, the state has been actively reused and we should
537 not alter anything */
538 if (ids->origFD == -1) {
539 #ifdef HAVE_DNSCRYPT
540 ids->dnsCryptQuery = nullptr;
541 #endif
542 }
543
544 rewrittenResponse.clear();
545 }
546 }
547 catch(const std::exception& e){
548 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
549 }
550 }
551 return 0;
552 }
553 catch(const std::exception& e)
554 {
555 errlog("UDP responder thread died because of exception: %s", e.what());
556 return 0;
557 }
558 catch(const PDNSException& e)
559 {
560 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
561 return 0;
562 }
563 catch(...)
564 {
565 errlog("UDP responder thread died because of an exception: %s", "unknown");
566 return 0;
567 }
568
569 bool DownstreamState::reconnect()
570 {
571 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
572 if (!tl.owns_lock()) {
573 /* we are already reconnecting */
574 return false;
575 }
576
577 connected = false;
578 for (auto& fd : sockets) {
579 if (fd != -1) {
580 if (sockets.size() > 1) {
581 std::lock_guard<std::mutex> lock(socketsLock);
582 mplexer->removeReadFD(fd);
583 }
584 /* shutdown() is needed to wake up recv() in the responderThread */
585 shutdown(fd, SHUT_RDWR);
586 close(fd);
587 fd = -1;
588 }
589 if (!IsAnyAddress(remote)) {
590 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
591 if (!IsAnyAddress(sourceAddr)) {
592 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
593 SBind(fd, sourceAddr);
594 }
595 try {
596 SConnect(fd, remote);
597 if (sockets.size() > 1) {
598 std::lock_guard<std::mutex> lock(socketsLock);
599 mplexer->addReadFD(fd, [](int, boost::any) {});
600 }
601 connected = true;
602 }
603 catch(const std::runtime_error& error) {
604 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
605 connected = false;
606 break;
607 }
608 }
609 }
610
611 /* if at least one (re-)connection failed, close all sockets */
612 if (!connected) {
613 for (auto& fd : sockets) {
614 if (fd != -1) {
615 if (sockets.size() > 1) {
616 std::lock_guard<std::mutex> lock(socketsLock);
617 mplexer->removeReadFD(fd);
618 }
619 /* shutdown() is needed to wake up recv() in the responderThread */
620 shutdown(fd, SHUT_RDWR);
621 close(fd);
622 fd = -1;
623 }
624 }
625 }
626
627 return connected;
628 }
629 void DownstreamState::hash()
630 {
631 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
632 auto w = weight;
633 WriteLock wl(&d_lock);
634 hashes.clear();
635 while (w > 0) {
636 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
637 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
638 hashes.insert(wshash);
639 --w;
640 }
641 }
642
643 void DownstreamState::setId(const boost::uuids::uuid& newId)
644 {
645 id = newId;
646 // compute hashes only if already done
647 if (!hashes.empty()) {
648 hash();
649 }
650 }
651
652 void DownstreamState::setWeight(int newWeight)
653 {
654 if (newWeight < 1) {
655 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
656 return ;
657 }
658 weight = newWeight;
659 if (!hashes.empty()) {
660 hash();
661 }
662 }
663
664 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
665 {
666 pthread_rwlock_init(&d_lock, nullptr);
667 id = t_uuidGenerator();
668 threadStarted.clear();
669
670 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
671
672 sockets.resize(numberOfSockets);
673 for (auto& fd : sockets) {
674 fd = -1;
675 }
676
677 if (!IsAnyAddress(remote)) {
678 reconnect();
679 idStates.resize(g_maxOutstanding);
680 sw.start();
681 infolog("Added downstream server %s", remote.toStringWithPort());
682 }
683
684 }
685
686 std::mutex g_luamutex;
687 LuaContext g_lua;
688
689 GlobalStateHolder<ServerPolicy> g_policy;
690
691 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
692 {
693 for(auto& d : servers) {
694 if(d.second->isUp() && d.second->qps.check())
695 return d.second;
696 }
697 return leastOutstanding(servers, dq);
698 }
699
700 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
701 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
702 {
703 if (servers.size() == 1 && servers[0].second->isUp()) {
704 return servers[0].second;
705 }
706
707 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
708 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
709 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
710 poss.reserve(servers.size());
711 for(auto& d : servers) {
712 if(d.second->isUp()) {
713 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
714 }
715 }
716 if(poss.empty())
717 return shared_ptr<DownstreamState>();
718 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
719 return poss.begin()->second;
720 }
721
722 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
723 {
724 vector<pair<int, shared_ptr<DownstreamState>>> poss;
725 int sum = 0;
726 int max = std::numeric_limits<int>::max();
727
728 for(auto& d : servers) { // w=1, w=10 -> 1, 11
729 if(d.second->isUp()) {
730 // Don't overflow sum when adding high weights
731 if(d.second->weight > max - sum) {
732 sum = max;
733 } else {
734 sum += d.second->weight;
735 }
736
737 poss.push_back({sum, d.second});
738 }
739 }
740
741 // Catch poss & sum are empty to avoid SIGFPE
742 if(poss.empty())
743 return shared_ptr<DownstreamState>();
744
745 int r = val % sum;
746 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
747 if(p==poss.end())
748 return shared_ptr<DownstreamState>();
749 return p->second;
750 }
751
752 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
753 {
754 return valrandom(random(), servers, dq);
755 }
756
/* perturbation value passed to every hash computed in this file: the qname hashes
   in whashed() / chashed() and the per-server hashes in DownstreamState::hash() */
uint32_t g_hashperturb;
758 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
759 {
760 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
761 }
762
763 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
764 {
765 std::map<unsigned int, shared_ptr<DownstreamState>> circle = {};
766 unsigned int qhash = dq->qname->hash(g_hashperturb);
767 unsigned int sel = 0, max = 0;
768 shared_ptr<DownstreamState> ret = nullptr, last = nullptr;
769
770 for (const auto& d: servers) {
771 if (d.second->isUp()) {
772 // make sure hashes have been computed
773 if (d.second->hashes.empty()) {
774 d.second->hash();
775 }
776 {
777 ReadLock rl(&(d.second->d_lock));
778 const auto& server = d.second;
779 // we want to keep track of the last hash
780 if (max < *(server->hashes.rbegin())) {
781 max = *(server->hashes.rbegin());
782 last = server;
783 }
784 auto hash_it = server->hashes.begin();
785 while (hash_it != server->hashes.end()
786 && *hash_it < qhash) {
787 if (*hash_it > sel) {
788 sel = *hash_it;
789 ret = server;
790 }
791 ++hash_it;
792 }
793 }
794 }
795 }
796 if (ret != nullptr) {
797 return ret;
798 }
799 if (last != nullptr) {
800 return last;
801 }
802 return shared_ptr<DownstreamState>();
803 }
804
805 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
806 {
807 NumberedServerVector poss;
808
809 for(auto& d : servers) {
810 if(d.second->isUp()) {
811 poss.push_back(d);
812 }
813 }
814
815 const auto *res=&poss;
816 if(poss.empty())
817 res = &servers;
818
819 if(res->empty())
820 return shared_ptr<DownstreamState>();
821
822 static unsigned int counter;
823
824 return (*res)[(counter++) % res->size()].second;
825 }
826
827 ComboAddress g_serverControl{"127.0.0.1:5199"};
828
829 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
830 {
831 std::shared_ptr<ServerPool> pool;
832 pools_t::iterator it = pools.find(poolName);
833 if (it != pools.end()) {
834 pool = it->second;
835 }
836 else {
837 if (!poolName.empty())
838 vinfolog("Creating pool %s", poolName);
839 pool = std::make_shared<ServerPool>();
840 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
841 }
842 return pool;
843 }
844
845 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
846 {
847 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
848 if (!poolName.empty()) {
849 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
850 } else {
851 vinfolog("Setting default pool server selection policy to %s", policy->name);
852 }
853 pool->policy = policy;
854 }
855
856 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
857 {
858 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
859 if (!poolName.empty()) {
860 vinfolog("Adding server to pool %s", poolName);
861 } else {
862 vinfolog("Adding server to default pool");
863 }
864 pool->addServer(server);
865 }
866
867 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
868 {
869 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
870
871 if (!poolName.empty()) {
872 vinfolog("Removing server from pool %s", poolName);
873 }
874 else {
875 vinfolog("Removing server from default pool");
876 }
877
878 pool->removeServer(server);
879 }
880
881 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
882 {
883 pools_t::const_iterator it = pools.find(poolName);
884
885 if (it == pools.end()) {
886 throw std::out_of_range("No pool named " + poolName);
887 }
888
889 return it->second;
890 }
891
892 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
893 {
894 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
895 return pool->getServers();
896 }
897
898 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
899 {
900 string result;
901
902 std::vector<std::string> addrs;
903 stringtok(addrs, spoofContent, " ,");
904
905 if (addrs.size() == 1) {
906 try {
907 ComboAddress spoofAddr(spoofContent);
908 SpoofAction sa({spoofAddr});
909 sa(&dq, &result);
910 }
911 catch(const PDNSException &e) {
912 SpoofAction sa(spoofContent); // CNAME then
913 sa(&dq, &result);
914 }
915 } else {
916 std::vector<ComboAddress> cas;
917 for (const auto& addr : addrs) {
918 try {
919 cas.push_back(ComboAddress(addr));
920 }
921 catch (...) {
922 }
923 }
924 SpoofAction sa(cas);
925 sa(&dq, &result);
926 }
927 }
928
929 bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
930 {
931 g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.qtype,dq.len,*dq.dh);
932
933 if(g_qcount.enabled) {
934 string qname = (*dq.qname).toString(".");
935 bool countQuery{true};
936 if(g_qcount.filter) {
937 std::lock_guard<std::mutex> lock(g_luamutex);
938 std::tie (countQuery, qname) = g_qcount.filter(dq);
939 }
940
941 if(countQuery) {
942 WriteLock wl(&g_qcount.queryLock);
943 if(!g_qcount.records.count(qname)) {
944 g_qcount.records[qname] = 0;
945 }
946 g_qcount.records[qname]++;
947 }
948 }
949
950 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
951 auto updateBlockStats = [&got]() {
952 g_stats.dynBlocked++;
953 got->second.blocks++;
954 };
955
956 if(now < got->second.until) {
957 DNSAction::Action action = got->second.action;
958 if (action == DNSAction::Action::None) {
959 action = g_dynBlockAction;
960 }
961 switch (action) {
962 case DNSAction::Action::NoOp:
963 /* do nothing */
964 break;
965 case DNSAction::Action::Refused:
966 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
967 updateBlockStats();
968
969 dq.dh->rcode = RCode::Refused;
970 dq.dh->qr=true;
971 return true;
972
973 case DNSAction::Action::Truncate:
974 if(!dq.tcp) {
975 updateBlockStats();
976 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
977 dq.dh->tc = true;
978 dq.dh->qr = true;
979 return true;
980 }
981 else {
982 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
983 }
984 break;
985 default:
986 updateBlockStats();
987 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
988 return false;
989 }
990 }
991 }
992
993 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
994 auto updateBlockStats = [&got]() {
995 g_stats.dynBlocked++;
996 got->blocks++;
997 };
998
999 if(now < got->until) {
1000 DNSAction::Action action = got->action;
1001 if (action == DNSAction::Action::None) {
1002 action = g_dynBlockAction;
1003 }
1004 switch (action) {
1005 case DNSAction::Action::NoOp:
1006 /* do nothing */
1007 break;
1008 case DNSAction::Action::Refused:
1009 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1010 updateBlockStats();
1011
1012 dq.dh->rcode = RCode::Refused;
1013 dq.dh->qr=true;
1014 return true;
1015 case DNSAction::Action::Truncate:
1016 if(!dq.tcp) {
1017 updateBlockStats();
1018
1019 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1020 dq.dh->tc = true;
1021 dq.dh->qr = true;
1022 return true;
1023 }
1024 else {
1025 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1026 }
1027 break;
1028 default:
1029 updateBlockStats();
1030 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1031 return false;
1032 }
1033 }
1034 }
1035
1036 DNSAction::Action action=DNSAction::Action::None;
1037 string ruleresult;
1038 for(const auto& lr : *holders.rulactions) {
1039 if(lr.d_rule->matches(&dq)) {
1040 lr.d_rule->d_matches++;
1041 action=(*lr.d_action)(&dq, &ruleresult);
1042
1043 switch(action) {
1044 case DNSAction::Action::Allow:
1045 return true;
1046 break;
1047 case DNSAction::Action::Drop:
1048 g_stats.ruleDrop++;
1049 return false;
1050 break;
1051 case DNSAction::Action::Nxdomain:
1052 dq.dh->rcode = RCode::NXDomain;
1053 dq.dh->qr=true;
1054 g_stats.ruleNXDomain++;
1055 return true;
1056 break;
1057 case DNSAction::Action::Refused:
1058 dq.dh->rcode = RCode::Refused;
1059 dq.dh->qr=true;
1060 g_stats.ruleRefused++;
1061 return true;
1062 break;
1063 case DNSAction::Action::ServFail:
1064 dq.dh->rcode = RCode::ServFail;
1065 dq.dh->qr=true;
1066 g_stats.ruleServFail++;
1067 return true;
1068 break;
1069 case DNSAction::Action::Spoof:
1070 spoofResponseFromString(dq, ruleresult);
1071 return true;
1072 break;
1073 case DNSAction::Action::Truncate:
1074 dq.dh->tc = true;
1075 dq.dh->qr = true;
1076 return true;
1077 break;
1078 case DNSAction::Action::HeaderModify:
1079 return true;
1080 break;
1081 case DNSAction::Action::Pool:
1082 poolname=ruleresult;
1083 return true;
1084 break;
1085 /* non-terminal actions follow */
1086 case DNSAction::Action::Delay:
1087 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1088 break;
1089 case DNSAction::Action::None:
1090 /* fall-through */
1091 case DNSAction::Action::NoOp:
1092 break;
1093 }
1094 }
1095 }
1096
1097 return true;
1098 }
1099
1100 bool processResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, int* delayMsec)
1101 {
1102 DNSResponseAction::Action action=DNSResponseAction::Action::None;
1103 std::string ruleresult;
1104 for(const auto& lr : *localRespRulactions) {
1105 if(lr.d_rule->matches(&dr)) {
1106 lr.d_rule->d_matches++;
1107 action=(*lr.d_action)(&dr, &ruleresult);
1108 switch(action) {
1109 case DNSResponseAction::Action::Allow:
1110 return true;
1111 break;
1112 case DNSResponseAction::Action::Drop:
1113 return false;
1114 break;
1115 case DNSResponseAction::Action::HeaderModify:
1116 return true;
1117 break;
1118 case DNSResponseAction::Action::ServFail:
1119 dr.dh->rcode = RCode::ServFail;
1120 return true;
1121 break;
1122 /* non-terminal actions follow */
1123 case DNSResponseAction::Action::Delay:
1124 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1125 break;
1126 case DNSResponseAction::Action::None:
1127 break;
1128 }
1129 }
1130 }
1131
1132 return true;
1133 }
1134
1135 static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd, const char* request, const size_t requestLen, bool healthCheck=false)
1136 {
1137 ssize_t result;
1138
1139 if (ss->sourceItf == 0) {
1140 result = send(sd, request, requestLen, 0);
1141 }
1142 else {
1143 struct msghdr msgh;
1144 struct iovec iov;
1145 char cbuf[256];
1146 ComboAddress remote(ss->remote);
1147 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1148 addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
1149 result = sendmsg(sd, &msgh, 0);
1150 }
1151
1152 if (result == -1) {
1153 int savederrno = errno;
1154 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1155
1156 /* This might sound silly, but on Linux send() might fail with EINVAL
1157 if the interface the socket was bound to doesn't exist anymore.
1158 We don't want to reconnect the real socket if the healthcheck failed,
1159 because it's not using the same socket.
1160 */
1161 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1162 ss->reconnect();
1163 }
1164 }
1165
1166 return result;
1167 }
1168
1169 bool addXPF(DNSQuestion& dq, uint16_t optionCode)
1170 {
1171 std::string payload = generateXPFPayload(dq.tcp, *dq.remote, *dq.local);
1172 uint8_t root = '\0';
1173 dnsrecordheader drh;
1174 drh.d_type = htons(optionCode);
1175 drh.d_class = htons(QClass::IN);
1176 drh.d_ttl = 0;
1177 drh.d_clen = htons(payload.size());
1178 size_t recordHeaderLen = sizeof(root) + sizeof(drh);
1179
1180 size_t available = dq.size - dq.len;
1181
1182 if ((payload.size() + recordHeaderLen) > available) {
1183 return false;
1184 }
1185
1186 size_t pos = dq.len;
1187 memcpy(reinterpret_cast<char*>(dq.dh) + pos, &root, sizeof(root));
1188 pos += sizeof(root);
1189 memcpy(reinterpret_cast<char*>(dq.dh) + pos, &drh, sizeof(drh));
1190 pos += sizeof(drh);
1191 memcpy(reinterpret_cast<char*>(dq.dh) + pos, payload.data(), payload.size());
1192 pos += payload.size();
1193
1194 dq.len = pos;
1195
1196 dq.dh->arcount = htons(ntohs(dq.dh->arcount) + 1);
1197
1198 return true;
1199 }
1200
1201 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1202 {
1203 if (msgh->msg_flags & MSG_TRUNC) {
1204 /* message was too large for our buffer */
1205 vinfolog("Dropping message too large for our buffer");
1206 g_stats.nonCompliantQueries++;
1207 return false;
1208 }
1209
1210 if(!holders.acl->match(remote)) {
1211 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1212 g_stats.aclDrops++;
1213 return false;
1214 }
1215
1216 cs.queries++;
1217 g_stats.queries++;
1218
1219 if (HarvestDestinationAddress(msgh, &dest)) {
1220 /* we don't get the port, only the address */
1221 dest.sin4.sin_port = cs.local.sin4.sin_port;
1222 }
1223 else {
1224 dest.sin4.sin_family = 0;
1225 }
1226
1227 return true;
1228 }
1229
1230 #ifdef HAVE_DNSCRYPT
1231 static bool checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, const ComboAddress& dest, const ComboAddress& remote, time_t now)
1232 {
1233 if (cs.dnscryptCtx) {
1234 vector<uint8_t> response;
1235 uint16_t decryptedQueryLen = 0;
1236
1237 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1238
1239 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, false, now, response);
1240
1241 if (!decrypted) {
1242 if (response.size() > 0) {
1243 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(response.data()), static_cast<uint16_t>(response.size()), 0, dest, remote);
1244 }
1245 return false;
1246 }
1247
1248 len = decryptedQueryLen;
1249 }
1250 return true;
1251 }
1252 #endif /* HAVE_DNSCRYPT */
1253
1254 bool checkQueryHeaders(const struct dnsheader* dh)
1255 {
1256 if (dh->qr) { // don't respond to responses
1257 g_stats.nonCompliantQueries++;
1258 return false;
1259 }
1260
1261 if (dh->qdcount == 0) {
1262 g_stats.emptyQueries++;
1263 return false;
1264 }
1265
1266 if (dh->rd) {
1267 g_stats.rdQueries++;
1268 }
1269
1270 return true;
1271 }
1272
1273 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1274 static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, char* cbuf)
1275 {
1276 outMsg.msg_len = 0;
1277 fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));
1278
1279 if (dest.sin4.sin_family == 0) {
1280 outMsg.msg_hdr.msg_control = nullptr;
1281 }
1282 else {
1283 addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
1284 }
1285 }
1286 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1287
/* Process one UDP query received on the listening socket described by cs.
   The datagram is first checked by isUDPQueryAcceptable(), optionally run
   through checkDNSCryptQuery(), and its header validated by checkQueryHeaders();
   the qname is then parsed and the query rules are applied via processQuery().
   From there the query is either:
   - dropped (one of the checks or rules above returned false),
   - answered directly when something set the QR bit, after applying
     holders.selfAnsweredRespRulactions,
   - answered from the pool's packet cache, after applying
     holders.cacheHitRespRulactions,
   - answered with ServFail, or dropped, when the policy selected no backend,
     depending on g_servFailOnNoPolicy,
   - or relayed to the selected backend, after the client's state has been
     saved into one of the backend's IDState slots.
   query points to a buffer of queryBufferSize bytes holding len bytes of query;
   the buffer is modified in place and reused for self-generated responses.
   When responsesVect is not null, responses that can be sent right away are
   queued into responsesVect[*queuedResponses] (incrementing *queuedResponses)
   instead of being sent; queuedResponses, respIOV and respCBuf must then be
   non-null as well, see the assert below.
   std::exception thrown during the processing are caught and logged. */
1288 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
1289 {
1290 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1291 uint16_t queryId = 0;
1292
1293 try {
1294 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1295 return;
1296 }
1297
1298 /* we need an accurate ("real") value for the response and
1299 to store into the IDS, but not for insertion into the
1300 rings for example */
1301 struct timespec queryRealTime;
1302 struct timespec now;
1303 gettime(&now);
1304 gettime(&queryRealTime, true);
1305
1306 #ifdef HAVE_DNSCRYPT
1307 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1308
1309 if (!checkDNSCryptQuery(cs, query, len, dnsCryptQuery, dest, remote, queryRealTime.tv_sec)) {
1310 return;
1311 }
1312 #endif
1313
1314 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1315 queryId = ntohs(dh->id);
1316
1317 if (!checkQueryHeaders(dh)) {
1318 return;
1319 }
1320
1321 string poolname;
1322 int delayMsec = 0;
/* keep a copy of the header flags as received, before the rules get a chance to alter them */
1323 const uint16_t * flags = getFlagsFromDNSHeader(dh);
1324 const uint16_t origFlags = *flags;
1325 uint16_t qtype, qclass;
1326 unsigned int consumed = 0;
1327 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
/* the local address is the harvested destination when we have one, the listening address otherwise */
1328 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1329
1330 if (!processQuery(holders, dq, poolname, &delayMsec, now))
1331 {
1332 return;
1333 }
1334
1335 if(dq.dh->qr) { // something turned it into a response
1336 fixUpQueryTurnedResponse(dq, origFlags);
1337
1338 if (!cs.muted) {
1339 char* response = query;
1340 uint16_t responseLen = dq.len;
1341
1342 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(response), dq.size, responseLen, false, &queryRealTime);
1343 #ifdef HAVE_PROTOBUF
1344 dr.uniqueId = dq.uniqueId;
1345 #endif
1346 dr.qTag = dq.qTag;
1347
1348 if (!processResponse(holders.selfAnsweredRespRulactions, dr, &delayMsec)) {
1349 return;
1350 }
1351
1352 #ifdef HAVE_DNSCRYPT
1353 if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1354 return;
1355 }
1356 #endif
1357 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* only responses that are not delayed can be queued for the batched send */
1358 if (delayMsec == 0 && responsesVect != nullptr) {
1359 queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1360 (*queuedResponses)++;
1361 }
1362 else
1363 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1364 {
1365 sendUDPResponse(cs.udpFD, response, responseLen, delayMsec, dest, remote);
1366 }
1367
1368 g_stats.selfAnswered++;
1369 doLatencyStats(0); // we're not going to measure this
1370 }
1371
1372 return;
1373 }
1374
/* still a query: get the pool, its packet cache and the load-balancing policy
   (a policy set on the pool takes precedence over holders.policy), then let the
   policy pick a backend. Lua policies are called while holding g_luamutex. */
1375 DownstreamState* ss = nullptr;
1376 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
1377 std::shared_ptr<DNSDistPacketCache> packetCache = serverPool->packetCache;
1378 auto policy = *(holders.policy);
1379 if (serverPool->policy != nullptr) {
1380 policy = *(serverPool->policy);
1381 }
1382 auto servers = serverPool->getServers();
1383 if (policy.isLua) {
1384 std::lock_guard<std::mutex> lock(g_luamutex);
1385 ss = policy.policy(servers, &dq).get();
1386 }
1387 else {
1388 ss = policy.policy(servers, &dq).get();
1389 }
1390
1391 bool ednsAdded = false;
1392 bool ecsAdded = false;
/* ECS is added when enabled for this query and for the selected backend, or for the pool when no backend was selected */
1393 if (dq.useECS && ((ss && ss->useECS) || (!ss && serverPool->getECS()))) {
1394 if (!handleEDNSClientSubnet(query, dq.size, consumed, &dq.len, &(ednsAdded), &(ecsAdded), dq.ecsSet ? dq.ecs.getNetwork() : remote, dq.ecsOverride, dq.ecsSet ? dq.ecs.getBits() : dq.ecsPrefixLength)) {
1395 vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote.toStringWithPort());
1396 return;
1397 }
1398 }
1399
1400 uint32_t cacheKey = 0;
1401 boost::optional<Netmask> subnet;
1402 if (packetCache && !dq.skipCache) {
1403 uint16_t cachedResponseSize = dq.size;
/* g_staleCacheEntriesTTL is only passed to the cache lookup when no backend was selected */
1404 uint32_t allowExpired = ss ? 0 : g_staleCacheEntriesTTL;
1405 if (packetCache->get(dq, consumed, dh->id, query, &cachedResponseSize, &cacheKey, subnet, allowExpired)) {
1406 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(query), dq.size, cachedResponseSize, false, &queryRealTime);
1407 #ifdef HAVE_PROTOBUF
1408 dr.uniqueId = dq.uniqueId;
1409 #endif
1410 dr.qTag = dq.qTag;
1411
1412 if (!processResponse(holders.cacheHitRespRulactions, dr, &delayMsec)) {
1413 return;
1414 }
1415
1416 if (!cs.muted) {
1417 #ifdef HAVE_DNSCRYPT
1418 if (!encryptResponse(query, &cachedResponseSize, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1419 return;
1420 }
1421 #endif
1422 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1423 if (delayMsec == 0 && responsesVect != nullptr) {
1424 queueResponse(cs, query, cachedResponseSize, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1425 (*queuedResponses)++;
1426 }
1427 else
1428 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1429 {
1430 sendUDPResponse(cs.udpFD, query, cachedResponseSize, delayMsec, dest, remote);
1431 }
1432 }
1433
1434 g_stats.cacheHits++;
1435 doLatencyStats(0); // we're not going to measure this
1436 return;
1437 }
1438 g_stats.cacheMisses++;
1439 }
1440
/* no cache hit, and the policy did not return any backend */
1441 if(!ss) {
1442 g_stats.noPolicy++;
1443
1444 if (g_servFailOnNoPolicy && !cs.muted) {
1445 char* response = query;
1446 uint16_t responseLen = dq.len;
1447 restoreFlags(dh, origFlags);
1448
1449 dq.dh->rcode = RCode::ServFail;
1450 dq.dh->qr = true;
1451
1452 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(response), dq.size, responseLen, false, &queryRealTime);
1453 #ifdef HAVE_PROTOBUF
1454 dr.uniqueId = dq.uniqueId;
1455 #endif
1456 dr.qTag = dq.qTag;
1457
1458 if (!processResponse(holders.selfAnsweredRespRulactions, dr, &delayMsec)) {
1459 return;
1460 }
1461
1462 #ifdef HAVE_DNSCRYPT
1463 if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1464 return;
1465 }
1466 #endif
/* NOTE(review): unlike the two other self-generated paths above, delayMsec is not
   looked at here: the response is queued or sent with a delay of 0 even if a
   response rule set a delay. Confirm this is intended. */
1467 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1468 if (responsesVect != nullptr) {
1469 queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1470 (*queuedResponses)++;
1471 }
1472 else
1473 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1474 {
1475 sendUDPResponse(cs.udpFD, response, responseLen, 0, dest, remote);
1476 }
1477
1478 // no response-only statistics counter to update.
1479 doLatencyStats(0); // we're not going to measure this
1480 }
1481 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toString(), QType(dq.qtype).getName(), remote.toStringWithPort());
1482 return;
1483 }
1484
/* the result of addXPF() is ignored: the query is relayed even if the record could not be added */
1485 if (dq.addXPF && ss->xpfRRCode != 0) {
1486 addXPF(dq, ss->xpfRRCode);
1487 }
1488
1489 ss->queries++;
1490
/* take the next IDState slot of this backend, wrapping around. A slot whose
   previous origFD was still >= 0 is being reused while in use, which is
   accounted as a downstream timeout. */
1491 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1492 IDState* ids = &ss->idStates[idOffset];
1493 ids->age = 0;
1494
1495 int oldFD = ids->origFD.exchange(cs.udpFD);
1496 if(oldFD < 0) {
1497 // if we are reusing, no change in outstanding
1498 ss->outstanding++;
1499 }
1500 else {
1501 ss->reuseds++;
1502 g_stats.downstreamTimeouts++;
1503 }
1504
1505 ids->cs = &cs;
1506 ids->origID = dh->id;
1507 ids->origRemote = remote;
1508 ids->sentTime.set(queryRealTime);
1509 ids->qname = qname;
1510 ids->qtype = dq.qtype;
1511 ids->qclass = dq.qclass;
1512 ids->delayMsec = delayMsec;
1513 ids->tempFailureTTL = dq.tempFailureTTL;
1514 ids->origFlags = origFlags;
1515 ids->cacheKey = cacheKey;
1516 ids->subnet = subnet;
1517 ids->skipCache = dq.skipCache;
1518 ids->packetCache = packetCache;
1519 ids->ednsAdded = ednsAdded;
1520 ids->ecsAdded = ecsAdded;
1521 ids->qTag = dq.qTag;
1522
1523 /* If we couldn't harvest the real dest addr, still
1524 write down the listening addr since it will be useful
1525 (especially if it's not an 'any' one).
1526 We need to keep track of which one it is since we may
1527 want to use the real but not the listening addr to reply.
1528 */
1529 if (dest.sin4.sin_family != 0) {
1530 ids->origDest = dest;
1531 ids->destHarvested = true;
1532 }
1533 else {
1534 ids->origDest = cs.local;
1535 ids->destHarvested = false;
1536 }
1537 #ifdef HAVE_DNSCRYPT
1538 ids->dnsCryptQuery = dnsCryptQuery;
1539 #endif
1540 #ifdef HAVE_PROTOBUF
1541 ids->uniqueId = dq.uniqueId;
1542 #endif
1543
/* the query is relayed with the slot index as its ID, the client's ID was saved in ids->origID above */
1544 dh->id = idOffset;
1545
1546 int fd = pickBackendSocketForSending(ss);
1547 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1548
/* a send error is only counted, the state stored in the slot is left as is */
1549 if(ret < 0) {
1550 ss->sendErrors++;
1551 g_stats.downstreamSendErrors++;
1552 }
1553
1554 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1555 }
1556 catch(const std::exception& e){
1557 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1558 }
1559 }
1560
1561 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1562 static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
1563 {
1564 struct MMReceiver
1565 {
1566 char packet[4096];
1567 /* used by HarvestDestinationAddress */
1568 char cbuf[256];
1569 ComboAddress remote;
1570 ComboAddress dest;
1571 struct iovec iov;
1572 };
1573 const size_t vectSize = g_udpVectorSize;
1574 /* the actual buffer is larger because:
1575 - we may have to add EDNS and/or ECS
1576 - we use it for self-generated responses (from rule or cache)
1577 but we only accept incoming payloads up to that size
1578 */
1579 static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1580
1581 auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
1582 auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1583 auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1584
1585 /* initialize the structures needed to receive our messages */
1586 for (size_t idx = 0; idx < vectSize; idx++) {
1587 recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
1588 fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
1589 }
1590
1591 /* go now */
1592 for(;;) {
1593
1594 /* reset the IO vector, since it's also used to send the vector of responses
1595 to avoid having to copy the data around */
1596 for (size_t idx = 0; idx < vectSize; idx++) {
1597 recvData[idx].iov.iov_base = recvData[idx].packet;
1598 recvData[idx].iov.iov_len = sizeof(recvData[idx].packet);
1599 }
1600
1601 /* block until we have at least one message ready, but return
1602 as many as possible to save the syscall costs */
1603 int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);
1604
1605 if (msgsGot <= 0) {
1606 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
1607 continue;
1608 }
1609
1610 unsigned int msgsToSend = 0;
1611
1612 /* process the received messages */
1613 for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
1614 const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
1615 unsigned int got = msgVec[msgIdx].msg_len;
1616 const ComboAddress& remote = recvData[msgIdx].remote;
1617
1618 if (got < sizeof(struct dnsheader)) {
1619 g_stats.nonCompliantQueries++;
1620 continue;
1621 }
1622
1623 processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
1624
1625 }
1626
1627 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1628 or the cache) can be sent in batch too */
1629
1630 if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
1631 int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);
1632
1633 if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
1634 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
1635 }
1636 }
1637
1638 }
1639 }
1640 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1641
1642 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1643 static void* udpClientThread(ClientState* cs)
1644 try
1645 {
1646 LocalHolders holders;
1647
1648 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1649 if (g_udpVectorSize > 1) {
1650 MultipleMessagesUDPClientThread(cs, holders);
1651
1652 }
1653 else
1654 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1655 {
1656 char packet[4096];
1657 /* the actual buffer is larger because:
1658 - we may have to add EDNS and/or ECS
1659 - we use it for self-generated responses (from rule or cache)
1660 but we only accept incoming payloads up to that size
1661 */
1662 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1663 struct msghdr msgh;
1664 struct iovec iov;
1665 /* used by HarvestDestinationAddress */
1666 char cbuf[256];
1667
1668 ComboAddress remote;
1669 ComboAddress dest;
1670 remote.sin4.sin_family = cs->local.sin4.sin_family;
1671 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1672
1673 for(;;) {
1674 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1675
1676 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1677 g_stats.nonCompliantQueries++;
1678 continue;
1679 }
1680
1681 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1682 }
1683 }
1684
1685 return nullptr;
1686 }
1687 catch(const std::exception &e)
1688 {
1689 errlog("UDP client thread died because of exception: %s", e.what());
1690 return nullptr;
1691 }
1692 catch(const PDNSException &e)
1693 {
1694 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1695 return nullptr;
1696 }
1697 catch(...)
1698 {
1699 errlog("UDP client thread died because of an exception: %s", "unknown");
1700 return nullptr;
1701 }
1702
1703 static bool upCheck(DownstreamState& ds)
1704 try
1705 {
1706 DNSName checkName = ds.checkName;
1707 uint16_t checkType = ds.checkType.getCode();
1708 uint16_t checkClass = ds.checkClass;
1709 dnsheader checkHeader;
1710 memset(&checkHeader, 0, sizeof(checkHeader));
1711
1712 checkHeader.qdcount = htons(1);
1713 #ifdef HAVE_LIBSODIUM
1714 checkHeader.id = randombytes_random() % 65536;
1715 #else
1716 checkHeader.id = random() % 65536;
1717 #endif
1718
1719 checkHeader.rd = true;
1720 if (ds.setCD) {
1721 checkHeader.cd = true;
1722 }
1723
1724
1725 if (ds.checkFunction) {
1726 std::lock_guard<std::mutex> lock(g_luamutex);
1727 auto ret = ds.checkFunction(checkName, checkType, checkClass, &checkHeader);
1728 checkName = std::get<0>(ret);
1729 checkType = std::get<1>(ret);
1730 checkClass = std::get<2>(ret);
1731 }
1732
1733 vector<uint8_t> packet;
1734 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1735 dnsheader * requestHeader = dpw.getHeader();
1736 *requestHeader = checkHeader;
1737
1738 Socket sock(ds.remote.sin4.sin_family, SOCK_DGRAM);
1739 sock.setNonBlocking();
1740 if (!IsAnyAddress(ds.sourceAddr)) {
1741 sock.setReuseAddr();
1742 sock.bind(ds.sourceAddr);
1743 }
1744 sock.connect(ds.remote);
1745 ssize_t sent = udpClientSendRequestToBackend(&ds, sock.getHandle(), (char*)&packet[0], packet.size(), true);
1746 if (sent < 0) {
1747 int ret = errno;
1748 if (g_verboseHealthChecks)
1749 infolog("Error while sending a health check query to backend %s: %d", ds.getNameWithAddr(), ret);
1750 return false;
1751 }
1752
1753 int ret=waitForRWData(sock.getHandle(), true, 1, 0);
1754 if(ret < 0 || !ret) { // error, timeout, both are down!
1755 if (ret < 0) {
1756 ret = errno;
1757 if (g_verboseHealthChecks)
1758 infolog("Error while waiting for the health check response from backend %s: %d", ds.getNameWithAddr(), ret);
1759 }
1760 else {
1761 if (g_verboseHealthChecks)
1762 infolog("Timeout while waiting for the health check response from backend %s", ds.getNameWithAddr());
1763 }
1764 return false;
1765 }
1766
1767 string reply;
1768 ComboAddress from;
1769 sock.recvFrom(reply, from);
1770
1771 /* we are using a connected socket but hey.. */
1772 if (from != ds.remote) {
1773 if (g_verboseHealthChecks)
1774 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds.remote.toStringWithPort());
1775 return false;
1776 }
1777
1778 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1779
1780 if (reply.size() < sizeof(*responseHeader)) {
1781 if (g_verboseHealthChecks)
1782 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds.getNameWithAddr(), sizeof(*responseHeader));
1783 return false;
1784 }
1785
1786 if (responseHeader->id != requestHeader->id) {
1787 if (g_verboseHealthChecks)
1788 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds.getNameWithAddr(), requestHeader->id);
1789 return false;
1790 }
1791
1792 if (!responseHeader->qr) {
1793 if (g_verboseHealthChecks)
1794 infolog("Invalid health check response from backend %s, expecting QR to be set", ds.getNameWithAddr());
1795 return false;
1796 }
1797
1798 if (responseHeader->rcode == RCode::ServFail) {
1799 if (g_verboseHealthChecks)
1800 infolog("Backend %s responded to health check with ServFail", ds.getNameWithAddr());
1801 return false;
1802 }
1803
1804 if (ds.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1805 if (g_verboseHealthChecks)
1806 infolog("Backend %s responded to health check with %s while mustResolve is set", ds.getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1807 return false;
1808 }
1809
1810 uint16_t receivedType;
1811 uint16_t receivedClass;
1812 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1813
1814 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1815 if (g_verboseHealthChecks)
1816 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds.getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1817 return false;
1818 }
1819
1820 return true;
1821 }
1822 catch(const std::exception& e)
1823 {
1824 if (g_verboseHealthChecks)
1825 infolog("Error checking the health of backend %s: %s", ds.getNameWithAddr(), e.what());
1826 return false;
1827 }
1828 catch(...)
1829 {
1830 if (g_verboseHealthChecks)
1831 infolog("Unknown exception while checking the health of backend %s", ds.getNameWithAddr());
1832 return false;
1833 }
1834
/* used by checkFileDescriptorsLimits(), multiplied by the number of backends,
   to account for the TCP sockets to backends */
1835 uint64_t g_maxTCPClientThreads{10};
/* number of iterations of the maintThread() loop, which sleeps one second per
   iteration, between two rounds of packet cache cleaning */
1836 std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* for each packet cache, maintThread() calls purgeExpired() with
   maxEntries * (100 - this value) / 100 */
1837 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1838
1839 void* maintThread()
1840 {
1841 int interval = 1;
1842 size_t counter = 0;
1843 int32_t secondsToWaitLog = 0;
1844
1845 for(;;) {
1846 sleep(interval);
1847
1848 {
1849 std::lock_guard<std::mutex> lock(g_luamutex);
1850 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1851 if(f) {
1852 try {
1853 (*f)();
1854 secondsToWaitLog = 0;
1855 }
1856 catch(std::exception &e) {
1857 if (secondsToWaitLog <= 0) {
1858 infolog("Error during execution of maintenance function: %s", e.what());
1859 secondsToWaitLog = 61;
1860 }
1861 secondsToWaitLog -= interval;
1862 }
1863 }
1864 }
1865
1866 counter++;
1867 if (counter >= g_cacheCleaningDelay) {
1868 auto localPools = g_pools.getLocal();
1869 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
1870 for (const auto& entry : *localPools) {
1871 packetCache = entry.second->packetCache;
1872 if (packetCache) {
1873 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1874 packetCache->purgeExpired(upTo);
1875 }
1876 }
1877 counter = 0;
1878 }
1879
1880 // ponder pruning g_dynblocks of expired entries here
1881 }
1882 return 0;
1883 }
1884
1885 void* healthChecksThread()
1886 {
1887 int interval = 1;
1888
1889 for(;;) {
1890 sleep(interval);
1891
1892 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
1893 g_tcpclientthreads->addTCPClientThread();
1894
1895 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
1896 for(auto& dss : *states) {
1897 if(dss->availability==DownstreamState::Availability::Auto) {
1898 bool newState=upCheck(*dss);
1899 if (newState) {
1900 if (dss->currentCheckFailures != 0) {
1901 dss->currentCheckFailures = 0;
1902 }
1903 }
1904 else if (!newState && dss->upStatus) {
1905 dss->currentCheckFailures++;
1906 if (dss->currentCheckFailures < dss->maxCheckFailures) {
1907 newState = true;
1908 }
1909 }
1910
1911 if(newState != dss->upStatus) {
1912 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
1913
1914 if (newState && !dss->connected) {
1915 newState = dss->reconnect();
1916
1917 if (dss->connected && !dss->threadStarted.test_and_set()) {
1918 dss->tid = thread(responderThread, dss);
1919 }
1920 }
1921
1922 dss->upStatus = newState;
1923 dss->currentCheckFailures = 0;
1924 if (g_snmpAgent && g_snmpTrapsEnabled) {
1925 g_snmpAgent->sendBackendStatusChangeTrap(dss);
1926 }
1927 }
1928 }
1929
1930 auto delta = dss->sw.udiffAndSet()/1000000.0;
1931 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
1932 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
1933 dss->prev.queries.store(dss->queries.load());
1934 dss->prev.reuseds.store(dss->reuseds.load());
1935
1936 for(IDState& ids : dss->idStates) { // timeouts
1937 int origFD = ids.origFD;
1938 if(origFD >=0 && ids.age++ > g_udpTimeout) {
1939 /* We set origFD to -1 as soon as possible
1940 to limit the risk of racing with the
1941 responder thread.
1942 The UDP client thread only checks origFD to
1943 know whether outstanding has to be incremented,
1944 so the sooner the better any way since we _will_
1945 decrement it.
1946 */
1947 if (ids.origFD.exchange(-1) != origFD) {
1948 /* this state has been altered in the meantime,
1949 don't go anywhere near it */
1950 continue;
1951 }
1952 ids.age = 0;
1953 dss->reuseds++;
1954 --dss->outstanding;
1955 g_stats.downstreamTimeouts++; // this is an 'actively' discovered timeout
1956 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1957 dss->remote.toStringWithPort(), dss->name,
1958 ids.qname.toString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
1959
1960 struct timespec ts;
1961 gettime(&ts);
1962
1963 struct dnsheader fake;
1964 memset(&fake, 0, sizeof(fake));
1965 fake.id = ids.origID;
1966
1967 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
1968 }
1969 }
1970 }
1971 }
1972 return 0;
1973 }
1974
1975 static void bindAny(int af, int sock)
1976 {
1977 __attribute__((unused)) int one = 1;
1978
1979 #ifdef IP_FREEBIND
1980 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
1981 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
1982 #endif
1983
1984 #ifdef IP_BINDANY
1985 if (af == AF_INET)
1986 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
1987 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
1988 #endif
1989 #ifdef IPV6_BINDANY
1990 if (af == AF_INET6)
1991 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
1992 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
1993 #endif
1994 #ifdef SO_BINDANY
1995 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
1996 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
1997 #endif
1998 }
1999
2000 static void dropGroupPrivs(gid_t gid)
2001 {
2002 if (gid) {
2003 if (setgid(gid) == 0) {
2004 if (setgroups(0, NULL) < 0) {
2005 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
2006 }
2007 }
2008 else {
2009 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
2010 }
2011 }
2012 }
2013
2014 static void dropUserPrivs(uid_t uid)
2015 {
2016 if(uid) {
2017 if(setuid(uid) < 0) {
2018 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
2019 }
2020 }
2021 }
2022
2023 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2024 {
2025 /* stdin, stdout, stderr */
2026 size_t requiredFDsCount = 3;
2027 auto backends = g_dstates.getLocal();
2028 /* UDP sockets to backends */
2029 size_t backendUDPSocketsCount = 0;
2030 for (const auto& backend : *backends) {
2031 backendUDPSocketsCount += backend->sockets.size();
2032 }
2033 requiredFDsCount += backendUDPSocketsCount;
2034 /* TCP sockets to backends */
2035 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2036 /* listening sockets */
2037 requiredFDsCount += udpBindsCount;
2038 requiredFDsCount += tcpBindsCount;
2039 /* max TCP connections currently served */
2040 requiredFDsCount += g_maxTCPClientThreads;
2041 /* max pipes for communicating between TCP acceptors and client threads */
2042 requiredFDsCount += (g_maxTCPClientThreads * 2);
2043 /* max TCP queued connections */
2044 requiredFDsCount += g_maxTCPQueuedConnections;
2045 /* DelayPipe pipe */
2046 requiredFDsCount += 2;
2047 /* syslog socket */
2048 requiredFDsCount++;
2049 /* webserver main socket */
2050 requiredFDsCount++;
2051 /* console main socket */
2052 requiredFDsCount++;
2053 /* carbon export */
2054 requiredFDsCount++;
2055 /* history file */
2056 requiredFDsCount++;
2057 struct rlimit rl;
2058 getrlimit(RLIMIT_NOFILE, &rl);
2059 if (rl.rlim_cur <= requiredFDsCount) {
2060 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2061 #ifdef HAVE_SYSTEMD
2062 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2063 #else
2064 warnlog("You can increase this value by using ulimit.");
2065 #endif
2066 }
2067 }
2068
/* Options gathered from the command line by main(). */
struct
{
  /* addresses passed with -l / --local */
  std::vector<std::string> locals;
  /* positional arguments, collected when not running as a client */
  std::vector<std::string> remotes;
  /* --check-config */
  bool checkConfig = false;
  /* -c / --client */
  bool beClient = false;
  /* --supervised */
  bool beSupervised = false;
  /* -e / --execute */
  std::string command;
  /* -C / --config; main() presets it to SYSCONFDIR "/dnsdist.conf" */
  std::string config;
  /* -u / --uid, kept as given on the command line */
  std::string uid;
  /* -g / --gid, kept as given on the command line */
  std::string gid;
} g_cmdLine;
2081
/* Starts out false; main() sets it to true after the configuration has
   been loaded and the list of local addresses has been finalized. */
std::atomic<bool> g_configurationDone(false);
2083
/* Print the command-line help to standard output. The -k/--setkey
   entry is only listed when built with libsodium support. */
static void usage()
{
  /* leading blank line; std::endl also flushes the stream */
  std::cout << std::endl;

  std::cout <<
    "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
    "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
    "[-v,--verbose] [--check-config] [--version]\n"
    "\n"
    "-a,--acl netmask Add this netmask to the ACL\n"
    "-C,--config file Load configuration from 'file'\n"
    "-c,--client Operate as a client, connect to dnsdist. This reads\n"
    " controlSocket from your configuration file, but also\n"
    " accepts an IP:PORT argument\n";

#ifdef HAVE_LIBSODIUM
  std::cout <<
    "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
    " is similar to setting setKey in the configuration file.\n"
    " NOTE: this will leak this key in your shell's history\n"
    " and in the systems running process list.\n";
#endif

  std::cout <<
    "--check-config Validate the configuration file and exit. The exit-code\n"
    " reflects the validation, 0 is OK, 1 means an error.\n"
    " Any errors are printed as well.\n"
    "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
    "-g,--gid gid Change the process group ID after binding sockets\n"
    "-h,--help Display this helpful message\n"
    "-l,--local address Listen on this local address\n"
    "--supervised Don't open a console, I'm supervised\n"
    " (use with e.g. systemd and daemontools)\n"
    "--disable-syslog Don't log to syslog, only to stdout\n"
    " (use with e.g. systemd)\n"
    "-u,--uid uid Change the process user ID after binding sockets\n"
    "-v,--verbose Enable verbose mode\n"
    "-V,--version Show dnsdist version information and exit\n";
}
2117
2118 int main(int argc, char** argv)
2119 try
2120 {
2121 size_t udpBindsCount = 0;
2122 size_t tcpBindsCount = 0;
2123 rl_attempted_completion_function = my_completion;
2124 rl_completion_append_character = 0;
2125
2126 signal(SIGPIPE, SIG_IGN);
2127 signal(SIGCHLD, SIG_IGN);
2128 openlog("dnsdist", LOG_PID, LOG_DAEMON);
2129
2130 #ifdef HAVE_LIBSODIUM
2131 if (sodium_init() == -1) {
2132 cerr<<"Unable to initialize crypto library"<<endl;
2133 exit(EXIT_FAILURE);
2134 }
2135 g_hashperturb=randombytes_uniform(0xffffffff);
2136 srandom(randombytes_uniform(0xffffffff));
2137 #else
2138 {
2139 struct timeval tv;
2140 gettimeofday(&tv, 0);
2141 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2142 g_hashperturb=random();
2143 }
2144
2145 #endif
2146 ComboAddress clientAddress = ComboAddress();
2147 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2148 struct option longopts[]={
2149 {"acl", required_argument, 0, 'a'},
2150 {"check-config", no_argument, 0, 1},
2151 {"client", no_argument, 0, 'c'},
2152 {"config", required_argument, 0, 'C'},
2153 {"disable-syslog", no_argument, 0, 2},
2154 {"execute", required_argument, 0, 'e'},
2155 {"gid", required_argument, 0, 'g'},
2156 {"help", no_argument, 0, 'h'},
2157 {"local", required_argument, 0, 'l'},
2158 {"setkey", required_argument, 0, 'k'},
2159 {"supervised", no_argument, 0, 3},
2160 {"uid", required_argument, 0, 'u'},
2161 {"verbose", no_argument, 0, 'v'},
2162 {"version", no_argument, 0, 'V'},
2163 {0,0,0,0}
2164 };
2165 int longindex=0;
2166 string optstring;
2167 for(;;) {
2168 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2169 if(c==-1)
2170 break;
2171 switch(c) {
2172 case 1:
2173 g_cmdLine.checkConfig=true;
2174 break;
2175 case 2:
2176 g_syslog=false;
2177 break;
2178 case 3:
2179 g_cmdLine.beSupervised=true;
2180 break;
2181 case 'C':
2182 g_cmdLine.config=optarg;
2183 break;
2184 case 'c':
2185 g_cmdLine.beClient=true;
2186 break;
2187 case 'e':
2188 g_cmdLine.command=optarg;
2189 break;
2190 case 'g':
2191 g_cmdLine.gid=optarg;
2192 break;
2193 case 'h':
2194 cout<<"dnsdist "<<VERSION<<endl;
2195 usage();
2196 cout<<"\n";
2197 exit(EXIT_SUCCESS);
2198 break;
2199 case 'a':
2200 optstring=optarg;
2201 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2202 break;
2203 case 'k':
2204 #ifdef HAVE_LIBSODIUM
2205 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2206 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2207 exit(EXIT_FAILURE);
2208 }
2209 #else
2210 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2211 exit(EXIT_FAILURE);
2212 #endif
2213 break;
2214 case 'l':
2215 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2216 break;
2217 case 'u':
2218 g_cmdLine.uid=optarg;
2219 break;
2220 case 'v':
2221 g_verbose=true;
2222 break;
2223 case 'V':
2224 #ifdef LUAJIT_VERSION
2225 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2226 #else
2227 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2228 #endif
2229 cout<<"Enabled features: ";
2230 #ifdef HAVE_DNS_OVER_TLS
2231 cout<<"dns-over-tls(";
2232 #ifdef HAVE_GNUTLS
2233 cout<<"gnutls";
2234 #ifdef HAVE_LIBSSL
2235 cout<<" ";
2236 #endif
2237 #endif
2238 #ifdef HAVE_LIBSSL
2239 cout<<"openssl";
2240 #endif
2241 cout<<") ";
2242 #endif
2243 #ifdef HAVE_DNSCRYPT
2244 cout<<"dnscrypt ";
2245 #endif
2246 #ifdef HAVE_EBPF
2247 cout<<"ebpf ";
2248 #endif
2249 #ifdef HAVE_FSTRM
2250 cout<<"fstrm ";
2251 #endif
2252 #ifdef HAVE_LIBSODIUM
2253 cout<<"libsodium ";
2254 #endif
2255 #ifdef HAVE_PROTOBUF
2256 cout<<"protobuf ";
2257 #endif
2258 #ifdef HAVE_RE2
2259 cout<<"re2 ";
2260 #endif
2261 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2262 cout<<"recvmmsg/sendmmsg ";
2263 #endif
2264 #ifdef HAVE_NET_SNMP
2265 cout<<"snmp ";
2266 #endif
2267 #ifdef HAVE_SYSTEMD
2268 cout<<"systemd";
2269 #endif
2270 cout<<endl;
2271 exit(EXIT_SUCCESS);
2272 break;
2273 case '?':
2274 //getopt_long printed an error message.
2275 usage();
2276 exit(EXIT_FAILURE);
2277 break;
2278 }
2279 }
2280
2281 argc-=optind;
2282 argv+=optind;
2283 for(auto p = argv; *p; ++p) {
2284 if(g_cmdLine.beClient) {
2285 clientAddress = ComboAddress(*p, 5199);
2286 } else {
2287 g_cmdLine.remotes.push_back(*p);
2288 }
2289 }
2290
2291 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2292
2293 g_policy.setState(leastOutstandingPol);
2294 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2295 setupLua(true, g_cmdLine.config);
2296 if (clientAddress != ComboAddress())
2297 g_serverControl = clientAddress;
2298 doClient(g_serverControl, g_cmdLine.command);
2299 _exit(EXIT_SUCCESS);
2300 }
2301
2302 auto acl = g_ACL.getCopy();
2303 if(acl.empty()) {
2304 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2305 acl.addMask(addr);
2306 g_ACL.setState(acl);
2307 }
2308
2309 auto consoleACL = g_consoleACL.getCopy();
2310 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2311 consoleACL.addMask(mask);
2312 }
2313 g_consoleACL.setState(consoleACL);
2314
2315 if (g_cmdLine.checkConfig) {
2316 setupLua(true, g_cmdLine.config);
2317 // No exception was thrown
2318 infolog("Configuration '%s' OK!", g_cmdLine.config);
2319 _exit(EXIT_SUCCESS);
2320 }
2321
2322 auto todo=setupLua(false, g_cmdLine.config);
2323
2324 auto localPools = g_pools.getCopy();
2325 {
2326 bool precompute = false;
2327 if (g_policy.getLocal()->name == "chashed") {
2328 precompute = true;
2329 } else {
2330 for (const auto& entry: localPools) {
2331 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2332 precompute = true;
2333 break ;
2334 }
2335 }
2336 }
2337 if (precompute) {
2338 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2339 // pre compute hashes
2340 auto backends = g_dstates.getLocal();
2341 for (auto& backend: *backends) {
2342 backend->hash();
2343 }
2344 }
2345 }
2346
2347 if(g_cmdLine.locals.size()) {
2348 g_locals.clear();
2349 for(auto loc : g_cmdLine.locals)
2350 g_locals.push_back(std::make_tuple(ComboAddress(loc, 53), true, false, 0, "", std::set<int>()));
2351 }
2352
2353 if(g_locals.empty())
2354 g_locals.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set<int>()));
2355
2356 g_configurationDone = true;
2357
2358 vector<ClientState*> toLaunch;
2359 for(const auto& local : g_locals) {
2360 ClientState* cs = new ClientState;
2361 cs->local= std::get<0>(local);
2362 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2363 if(cs->local.sin4.sin_family == AF_INET6) {
2364 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2365 }
2366 //if(g_vm.count("bind-non-local"))
2367 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2368
2369 // if (!setSocketTimestamps(cs->udpFD))
2370 // g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2371
2372
2373 if(IsAnyAddress(cs->local)) {
2374 int one=1;
2375 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2376 #ifdef IPV6_RECVPKTINFO
2377 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2378 #endif
2379 }
2380
2381 if (std::get<2>(local)) {
2382 #ifdef SO_REUSEPORT
2383 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2384 #else
2385 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2386 #endif
2387 }
2388
2389 const std::string& itf = std::get<4>(local);
2390 if (!itf.empty()) {
2391 #ifdef SO_BINDTODEVICE
2392 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2393 if (res != 0) {
2394 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2395 }
2396 #else
2397 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2398 #endif
2399 }
2400
2401 #ifdef HAVE_EBPF
2402 if (g_defaultBPFFilter) {
2403 cs->attachFilter(g_defaultBPFFilter);
2404 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs->local.toStringWithPort());
2405 }
2406 #endif /* HAVE_EBPF */
2407
2408 cs->cpus = std::get<5>(local);
2409
2410 SBind(cs->udpFD, cs->local);
2411 toLaunch.push_back(cs);
2412 g_frontends.push_back(cs);
2413 udpBindsCount++;
2414 }
2415
2416 for(const auto& local : g_locals) {
2417 if(!std::get<1>(local)) { // no TCP/IP
2418 warnlog("Not providing TCP/IP service on local address '%s'", std::get<0>(local).toStringWithPort());
2419 continue;
2420 }
2421 ClientState* cs = new ClientState;
2422 cs->local= std::get<0>(local);
2423
2424 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2425
2426 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2427 #ifdef TCP_DEFER_ACCEPT
2428 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2429 #endif
2430 if (std::get<3>(local) > 0) {
2431 #ifdef TCP_FASTOPEN
2432 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(local));
2433 #else
2434 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2435 #endif
2436 }
2437 if(cs->local.sin4.sin_family == AF_INET6) {
2438 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2439 }
2440 #ifdef SO_REUSEPORT
2441 /* no need to warn again if configured but support is not available, we already did for UDP */
2442 if (std::get<2>(local)) {
2443 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2444 }
2445 #endif
2446
2447 const std::string& itf = std::get<4>(local);
2448 if (!itf.empty()) {
2449 #ifdef SO_BINDTODEVICE
2450 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2451 if (res != 0) {
2452 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2453 }
2454 #else
2455 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2456 #endif
2457 }
2458
2459 #ifdef HAVE_EBPF
2460 if (g_defaultBPFFilter) {
2461 cs->attachFilter(g_defaultBPFFilter);
2462 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs->local.toStringWithPort());
2463 }
2464 #endif /* HAVE_EBPF */
2465
2466 // if(g_vm.count("bind-non-local"))
2467 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2468 SBind(cs->tcpFD, cs->local);
2469 SListen(cs->tcpFD, 64);
2470 warnlog("Listening on %s", cs->local.toStringWithPort());
2471
2472 toLaunch.push_back(cs);
2473 g_frontends.push_back(cs);
2474 tcpBindsCount++;
2475 }
2476
2477 #ifdef HAVE_DNSCRYPT
2478 for(auto& dcLocal : g_dnsCryptLocals) {
2479 ClientState* cs = new ClientState;
2480 cs->local = std::get<0>(dcLocal);
2481 cs->dnscryptCtx = std::get<1>(dcLocal);
2482 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2483 if(cs->local.sin4.sin_family == AF_INET6) {
2484 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2485 }
2486 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2487 if(IsAnyAddress(cs->local)) {
2488 int one=1;
2489 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2490 #ifdef IPV6_RECVPKTINFO
2491 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2492 #endif
2493 }
2494 if (std::get<2>(dcLocal)) {
2495 #ifdef SO_REUSEPORT
2496 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2497 #else
2498 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2499 #endif
2500 }
2501
2502 const std::string& itf = std::get<4>(dcLocal);
2503 if (!itf.empty()) {
2504 #ifdef SO_BINDTODEVICE
2505 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2506 if (res != 0) {
2507 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2508 }
2509 #else
2510 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2511 #endif
2512 }
2513
2514 #ifdef HAVE_EBPF
2515 if (g_defaultBPFFilter) {
2516 cs->attachFilter(g_defaultBPFFilter);
2517 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs->local.toStringWithPort());
2518 }
2519 #endif /* HAVE_EBPF */
2520 SBind(cs->udpFD, cs->local);
2521 toLaunch.push_back(cs);
2522 g_frontends.push_back(cs);
2523 udpBindsCount++;
2524
2525 cs = new ClientState;
2526 cs->local = std::get<0>(dcLocal);
2527 cs->dnscryptCtx = std::get<1>(dcLocal);
2528 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2529 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2530 #ifdef TCP_DEFER_ACCEPT
2531 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2532 #endif
2533 if (std::get<3>(dcLocal) > 0) {
2534 #ifdef TCP_FASTOPEN
2535 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(dcLocal));
2536 #else
2537 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2538 #endif
2539 }
2540
2541 #ifdef SO_REUSEPORT
2542 /* no need to warn again if configured but support is not available, we already did for UDP */
2543 if (std::get<2>(dcLocal)) {
2544 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2545 }
2546 #endif
2547
2548 if (!itf.empty()) {
2549 #ifdef SO_BINDTODEVICE
2550 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2551 if (res != 0) {
2552 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2553 }
2554 #else
2555 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2556 #endif
2557 }
2558
2559 if(cs->local.sin4.sin_family == AF_INET6) {
2560 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2561 }
2562 #ifdef HAVE_EBPF
2563 if (g_defaultBPFFilter) {
2564 cs->attachFilter(g_defaultBPFFilter);
2565 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs->local.toStringWithPort());
2566 }
2567 #endif /* HAVE_EBPF */
2568
2569 cs->cpus = std::get<5>(dcLocal);
2570
2571 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2572 SBind(cs->tcpFD, cs->local);
2573 SListen(cs->tcpFD, 64);
2574 warnlog("Listening on %s", cs->local.toStringWithPort());
2575 toLaunch.push_back(cs);
2576 g_frontends.push_back(cs);
2577 tcpBindsCount++;
2578 }
2579 #endif
2580
2581 for(auto& frontend : g_tlslocals) {
2582 ClientState* cs = new ClientState;
2583 cs->local = frontend->d_addr;
2584 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2585 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2586 #ifdef TCP_DEFER_ACCEPT
2587 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2588 #endif
2589 if (frontend->d_tcpFastOpenQueueSize > 0) {
2590 #ifdef TCP_FASTOPEN
2591 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, frontend->d_tcpFastOpenQueueSize);
2592 #else
2593 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2594 #endif
2595 }
2596 if (frontend->d_reusePort) {
2597 #ifdef SO_REUSEPORT
2598 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2599 #else
2600 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs.local.toStringWithPort());
2601 #endif
2602 }
2603 if(cs->local.sin4.sin_family == AF_INET6) {
2604 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2605 }
2606
2607 if (!frontend->d_interface.empty()) {
2608 #ifdef SO_BINDTODEVICE
2609 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, frontend->d_interface.c_str(), frontend->d_interface.length());
2610 if (res != 0) {
2611 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), strerror(errno));
2612 }
2613 #else
2614 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2615 #endif
2616 }
2617
2618 cs->cpus = frontend->d_cpus;
2619
2620 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2621 if (frontend->setupTLS()) {
2622 cs->tlsFrontend = frontend;
2623 SBind(cs->tcpFD, cs->local);
2624 SListen(cs->tcpFD, 64);
2625 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2626 toLaunch.push_back(cs);
2627 g_frontends.push_back(cs);
2628 tcpBindsCount++;
2629 }
2630 else {
2631 delete cs;
2632 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2633 _exit(EXIT_FAILURE);
2634 }
2635 }
2636
2637 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2638
2639 vector<string> vec;
2640 std::string acls;
2641 g_ACL.getLocal()->toStringVector(&vec);
2642 for(const auto& s : vec) {
2643 if (!acls.empty())
2644 acls += ", ";
2645 acls += s;
2646 }
2647 infolog("ACL allowing queries from: %s", acls.c_str());
2648 vec.clear();
2649 acls.clear();
2650 g_consoleACL.getLocal()->toStringVector(&vec);
2651 for (const auto& entry : vec) {
2652 if (!acls.empty()) {
2653 acls += ", ";
2654 }
2655 acls += entry;
2656 }
2657 infolog("Console ACL allowing connections from: %s", acls.c_str());
2658
2659 #ifdef HAVE_LIBSODIUM
2660 if (g_consoleEnabled && g_consoleKey.empty()) {
2661 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2662 }
2663 #endif
2664
2665 uid_t newgid=0;
2666 gid_t newuid=0;
2667
2668 if(!g_cmdLine.gid.empty())
2669 newgid = strToGID(g_cmdLine.gid.c_str());
2670
2671 if(!g_cmdLine.uid.empty())
2672 newuid = strToUID(g_cmdLine.uid.c_str());
2673
2674 dropGroupPrivs(newgid);
2675 dropUserPrivs(newuid);
2676
2677 /* this need to be done _after_ dropping privileges */
2678 g_delay = new DelayPipe<DelayedPacket>();
2679
2680 if (g_snmpAgent) {
2681 g_snmpAgent->run();
2682 }
2683
2684 g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads, g_useTCPSinglePipe);
2685
2686 for(auto& t : todo)
2687 t();
2688
2689 localPools = g_pools.getCopy();
2690 /* create the default pool no matter what */
2691 createPoolIfNotExists(localPools, "");
2692 if(g_cmdLine.remotes.size()) {
2693 for(const auto& address : g_cmdLine.remotes) {
2694 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2695 addServerToPool(localPools, "", ret);
2696 if (ret->connected && !ret->threadStarted.test_and_set()) {
2697 ret->tid = thread(responderThread, ret);
2698 }
2699 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2700 }
2701 }
2702 g_pools.setState(localPools);
2703
2704 if(g_dstates.getLocal()->empty()) {
2705 errlog("No downstream servers defined: all packets will get dropped");
2706 // you might define them later, but you need to know
2707 }
2708
2709 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2710
2711 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2712 if(dss->availability==DownstreamState::Availability::Auto) {
2713 bool newState=upCheck(*dss);
2714 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2715 dss->upStatus = newState;
2716 }
2717 }
2718
2719 for(auto& cs : toLaunch) {
2720 if (cs->udpFD >= 0) {
2721 thread t1(udpClientThread, cs);
2722 if (!cs->cpus.empty()) {
2723 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2724 }
2725 t1.detach();
2726 }
2727 else if (cs->tcpFD >= 0) {
2728 thread t1(tcpAcceptorThread, cs);
2729 if (!cs->cpus.empty()) {
2730 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2731 }
2732 t1.detach();
2733 }
2734 }
2735
2736 thread carbonthread(carbonDumpThread);
2737 carbonthread.detach();
2738
2739 thread stattid(maintThread);
2740 stattid.detach();
2741
2742 thread healththread(healthChecksThread);
2743
2744 if(g_cmdLine.beSupervised) {
2745 #ifdef HAVE_SYSTEMD
2746 sd_notify(0, "READY=1");
2747 #endif
2748 healththread.join();
2749 }
2750 else {
2751 healththread.detach();
2752 doConsole();
2753 }
2754 _exit(EXIT_SUCCESS);
2755
2756 }
2757 catch(const LuaContext::ExecutionErrorException& e) {
2758 try {
2759 errlog("Fatal Lua error: %s", e.what());
2760 std::rethrow_if_nested(e);
2761 } catch(const std::exception& ne) {
2762 errlog("Details: %s", ne.what());
2763 }
2764 catch(PDNSException &ae)
2765 {
2766 errlog("Fatal pdns error: %s", ae.reason);
2767 }
2768 _exit(EXIT_FAILURE);
2769 }
2770 catch(std::exception &e)
2771 {
2772 errlog("Fatal error: %s", e.what());
2773 _exit(EXIT_FAILURE);
2774 }
2775 catch(PDNSException &ae)
2776 {
2777 errlog("Fatal pdns error: %s", ae.reason);
2778 _exit(EXIT_FAILURE);
2779 }