]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
dnsdist: add --version to --help
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50
51 #include "base64.hh"
52 #include "delaypipe.hh"
53 #include "dolog.hh"
54 #include "dnsname.hh"
55 #include "dnswriter.hh"
56 #include "ednsoptions.hh"
57 #include "gettime.hh"
58 #include "lock.hh"
59 #include "misc.hh"
60 #include "sodcrypto.hh"
61 #include "sstuff.hh"
62 #include "xpf.hh"
63
/* UUID generator declared thread_local, so every thread works on its own instance. */
64 thread_local boost::uuids::random_generator t_uuidGenerator;
65
66 /* Known sins:
67
68 Receiver is currently single threaded
69 not *that* bad actually, but now that we are thread safe, might want to scale
70 */
71
72 /* the Rulaction plan
73 Set of Rules, if one matches, it leads to an Action
74 Both rules and actions could conceivably be Lua based.
75 On the C++ side, both could be inherited from a class Rule and a class Action,
76 on the Lua side we can't do that. */
77
78 using std::atomic;
79 using std::thread;
/* Process-wide settings and state. Only the uses visible in this part of the
   file are described; the others are set or read elsewhere. */
80 bool g_verbose;
81
/* statistics counters, incremented all over this file (truncFail, responses, latency buckets, ...) */
82 struct DNSDistStats g_stats;
/* number of IDState slots allocated per downstream server (see the DownstreamState constructor) */
83 uint16_t g_maxOutstanding{10240};
84 bool g_console;
85 bool g_verboseHealthChecks{false};
86 uint32_t g_staleCacheEntriesTTL{0};
87 bool g_syslog{true};
88
89 GlobalStateHolder<NetmaskGroup> g_ACL;
90 string g_outputBuffer;
91
/* NOTE(review): the meaning of the tuple fields below is not visible in this part of the file */
92 vector<std::tuple<ComboAddress, bool, bool, int, string, std::set<int>>> g_locals;
93 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
94 #ifdef HAVE_DNSCRYPT
95 std::vector<std::tuple<ComboAddress,std::shared_ptr<DNSCryptContext>,bool, int, string, std::set<int> >> g_dnsCryptLocals;
96 #endif
97 #ifdef HAVE_EBPF
98 shared_ptr<BPFFilter> g_defaultBPFFilter;
99 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
100 #endif /* HAVE_EBPF */
101 vector<ClientState *> g_frontends;
102 GlobalStateHolder<pools_t> g_pools;
103 size_t g_udpVectorSize{1};
104
105 bool g_snmpEnabled{false};
106 bool g_snmpTrapsEnabled{false};
107 DNSDistSNMPAgent* g_snmpAgent{nullptr};
108
109 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
110 Then we have a bunch of connected sockets for talking to downstream servers.
111 We send directly to those sockets.
112
113 For the return path, per downstream server we have a thread that listens to responses.
114
115 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
116 there the original requestor and the original id. The new ID is the offset in the array.
117
118 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
119 original requestor.
120
121 IDs are assigned by atomic increments of the socket offset.
122 */
123
/* Rule/action lists. responderThread() takes a local view of g_resprulactions;
   the other lists are consumed outside the part of the file shown here. */
124 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
125 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
128
/* g_rings records queries (processQuery) and responses (responderThread) */
129 Rings g_rings;
/* optional per-name query counting, updated in processQuery when g_qcount.enabled is set */
130 QueryCount g_qcount;
131
132 GlobalStateHolder<servers_t> g_dstates;
133 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
134 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
/* action applied by processQuery when a matching dynamic block carries Action::None */
135 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
/* NOTE(review): units of the three timeouts are not visible here (presumably seconds) -- confirm */
136 int g_tcpRecvTimeout{2};
137 int g_tcpSendTimeout{2};
138 int g_udpTimeout{2};
139
140 bool g_servFailOnNoPolicy{false};
/* when set, responderThread() passes responses with the TC bit through truncateTC() */
141 bool g_truncateTC{false};
/* when set, fixUpResponse() copies the stored query name over the name in the response */
142 bool g_fixupCase{0};
143
144 static const size_t s_udpIncomingBufferSize{1500};
145
146 static void truncateTC(const char* packet, uint16_t* len)
147 try
148 {
149 unsigned int consumed;
150 DNSName qname(packet, *len, sizeof(dnsheader), false, 0, 0, &consumed);
151 *len=(uint16_t) (sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
152 struct dnsheader* dh =(struct dnsheader*)packet;
153 dh->ancount = dh->arcount = dh->nscount=0;
154 }
155 catch(...)
156 {
157 g_stats.truncFail++;
158 }
159
160 struct DelayedPacket
161 {
162 int fd;
163 string packet;
164 ComboAddress destination;
165 ComboAddress origDest;
166 void operator()()
167 {
168 ssize_t res;
169 if(origDest.sin4.sin_family == 0) {
170 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
171 }
172 else {
173 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
174 }
175 if (res == -1) {
176 int err = errno;
177 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
178 }
179 }
180 };
181
/* Queue for delayed responses; sendUDPResponse() only delays a packet when this is non-null. */
182 DelayPipe<DelayedPacket> * g_delay = 0;
183
184 void doLatencyStats(double udiff)
185 {
186 if(udiff < 1000) g_stats.latency0_1++;
187 else if(udiff < 10000) g_stats.latency1_10++;
188 else if(udiff < 50000) g_stats.latency10_50++;
189 else if(udiff < 100000) g_stats.latency50_100++;
190 else if(udiff < 1000000) g_stats.latency100_1000++;
191 else g_stats.latencySlow++;
192
193 auto doAvg = [](double& var, double n, double weight) {
194 var = (weight -1) * var/weight + n/weight;
195 };
196
197 doAvg(g_stats.latencyAvg100, udiff, 100);
198 doAvg(g_stats.latencyAvg1000, udiff, 1000);
199 doAvg(g_stats.latencyAvg10000, udiff, 10000);
200 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
201 }
202
203 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote)
204 {
205 uint16_t rqtype, rqclass;
206 unsigned int consumed;
207 DNSName rqname;
208 const struct dnsheader* dh = (struct dnsheader*) response;
209
210 if (responseLen < sizeof(dnsheader)) {
211 return false;
212 }
213
214 if (dh->qdcount == 0) {
215 if (dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) {
216 return true;
217 }
218 else {
219 g_stats.nonCompliantResponses++;
220 return false;
221 }
222 }
223
224 try {
225 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
226 }
227 catch(std::exception& e) {
228 if(responseLen > (ssize_t)sizeof(dnsheader))
229 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
230 g_stats.nonCompliantResponses++;
231 return false;
232 }
233
234 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
235 return false;
236 }
237
238 return true;
239 }
240
241 void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
242 {
243 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
244 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
245 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
246 uint16_t * flags = getFlagsFromDNSHeader(dh);
247 /* clear the flags we are about to restore */
248 *flags &= restoreFlagsMask;
249 /* only keep the flags we want to restore */
250 origFlags &= ~restoreFlagsMask;
251 /* set the saved flags as they were */
252 *flags |= origFlags;
253 }
254
255 bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom)
256 {
257 struct dnsheader* dh = (struct dnsheader*) *response;
258
259 if (*responseLen < sizeof(dnsheader)) {
260 return false;
261 }
262
263 restoreFlags(dh, origFlags);
264
265 if (*responseLen == sizeof(dnsheader)) {
266 return true;
267 }
268
269 if(g_fixupCase) {
270 string realname = qname.toDNSString();
271 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
272 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
273 }
274 }
275
276 if (ednsAdded || ecsAdded) {
277 char * optStart = NULL;
278 size_t optLen = 0;
279 bool last = false;
280
281 int res = locateEDNSOptRR(*response, *responseLen, &optStart, &optLen, &last);
282
283 if (res == 0) {
284 if (ednsAdded) {
285 /* we added the entire OPT RR,
286 therefore we need to remove it entirely */
287 if (last) {
288 /* simply remove the last AR */
289 *responseLen -= optLen;
290 uint16_t arcount = ntohs(dh->arcount);
291 arcount--;
292 dh->arcount = htons(arcount);
293 }
294 else {
295 /* Removing an intermediary RR could lead to compression error */
296 if (rewriteResponseWithoutEDNS(*response, *responseLen, rewrittenResponse) == 0) {
297 *responseLen = rewrittenResponse.size();
298 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
299 rewrittenResponse.reserve(*responseLen + addRoom);
300 }
301 *responseSize = rewrittenResponse.capacity();
302 *response = reinterpret_cast<char*>(rewrittenResponse.data());
303 }
304 else {
305 warnlog("Error rewriting content");
306 }
307 }
308 }
309 else {
310 /* the OPT RR was already present, but without ECS,
311 we need to remove the ECS option if any */
312 if (last) {
313 /* nothing after the OPT RR, we can simply remove the
314 ECS option */
315 size_t existingOptLen = optLen;
316 removeEDNSOptionFromOPT(optStart, &optLen, EDNSOptionCode::ECS);
317 *responseLen -= (existingOptLen - optLen);
318 }
319 else {
320 /* Removing an intermediary RR could lead to compression error */
321 if (rewriteResponseWithoutEDNSOption(*response, *responseLen, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
322 *responseLen = rewrittenResponse.size();
323 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
324 rewrittenResponse.reserve(*responseLen + addRoom);
325 }
326 *responseSize = rewrittenResponse.capacity();
327 *response = reinterpret_cast<char*>(rewrittenResponse.data());
328 }
329 else {
330 warnlog("Error rewriting content");
331 }
332 }
333 }
334 }
335 }
336
337 return true;
338 }
339
340 #ifdef HAVE_DNSCRYPT
341 bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
342 {
343 if (dnsCryptQuery) {
344 uint16_t encryptedResponseLen = 0;
345
346 /* save the original header before encrypting it in place */
347 if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
348 memcpy(dhCopy, *dh, sizeof(dnsheader));
349 *dh = dhCopy;
350 }
351
352 int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
353 if (res == 0) {
354 *responseLen = encryptedResponseLen;
355 } else {
356 /* dropping response */
357 vinfolog("Error encrypting the response, dropping.");
358 return false;
359 }
360 }
361 return true;
362 }
363 #endif
364
365 static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
366 {
367 if(delayMsec && g_delay) {
368 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
369 g_delay->submit(dp, delayMsec);
370 }
371 else {
372 ssize_t res;
373 if(origDest.sin4.sin_family == 0) {
374 res = sendto(origFD, response, responseLen, 0, (struct sockaddr*)&origRemote, origRemote.getSocklen());
375 }
376 else {
377 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
378 }
379 if (res == -1) {
380 int err = errno;
381 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
382 }
383 }
384
385 return true;
386 }
387
388
389 static int pickBackendSocketForSending(DownstreamState* state)
390 {
391 return state->sockets[state->socketsOffset++ % state->sockets.size()];
392 }
393
394 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
395 {
396 ready.clear();
397
398 if (state->sockets.size() == 1) {
399 ready.push_back(state->sockets[0]);
400 return ;
401 }
402
403 {
404 std::lock_guard<std::mutex> lock(state->socketsLock);
405 state->mplexer->getAvailableFDs(ready, -1);
406 }
407 }
408
409 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
/* Thread body for one downstream server (dss). It loops forever:
   - collect the backend sockets to read from,
   - receive one datagram per socket and use its DNS id as index into dss->idStates,
   - skip datagrams that are too short, have an out-of-range id, refer to a
     state without a client socket, or do not match the stored question,
   - run the response rules, undo our query modifications, insert the result
     in the packet cache when one is set, encrypt for DNSCrypt clients and send
     the response to the original requestor (unless the frontend is muted),
   - update statistics and the ring buffer, then release the state.
   Always returns 0; exceptions escaping the loop are logged by the handlers
   at the end and terminate the thread. */
410 void* responderThread(std::shared_ptr<DownstreamState> dss)
411 try {
412 auto localRespRulactions = g_resprulactions.getLocal();
413 #ifdef HAVE_DNSCRYPT
414 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
415 /* when the answer is encrypted in place, we need to get a copy
416 of the original header before encryption to fill the ring buffer */
417 dnsheader dhCopy;
418 #else
419 char packet[4096];
420 #endif
421 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
/* scratch buffer used by fixUpResponse() when the response has to be rebuilt */
422 vector<uint8_t> rewrittenResponse;
423
/* kept outside the loop so that the exception handler below can log it */
424 uint16_t queryId = 0;
425 std::vector<int> sockets;
426 sockets.reserve(dss->sockets.size());
427
428 for(;;) {
429 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
/* true between the decrement of dss->outstanding and the release of the state, see the handler below */
430 bool outstandingDecreased = false;
431 try {
432 pickBackendSocketsReadyForReceiving(dss, sockets);
433 for (const auto& fd : sockets) {
434 ssize_t got = recv(fd, packet, sizeof(packet), 0);
435 char * response = packet;
436 size_t responseSize = sizeof(packet);
437
/* also covers a failed recv(), which returns -1 */
438 if (got < (ssize_t) sizeof(dnsheader))
439 continue;
440
441 uint16_t responseLen = (uint16_t) got;
/* the id field is used as-is as the index into idStates */
442 queryId = dh->id;
443
444 if(queryId >= dss->idStates.size())
445 continue;
446
447 IDState* ids = &dss->idStates[queryId];
448 int origFD = ids->origFD;
449
450 if(origFD < 0) // duplicate
451 continue;
452
453 /* setting age to 0 to prevent the maintainer thread from
454 cleaning this IDS while we process the response.
455 We have already a copy of the origFD, so it would
456 mostly mess up the outstanding counter.
457 */
458 ids->age = 0;
459
460 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote)) {
461 continue;
462 }
463
464 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
465 outstandingDecreased = true;
466
467 if(dh->tc && g_truncateTC) {
468 truncateTC(response, &responseLen);
469 }
470
/* give the response the id the client used */
471 dh->id = ids->origID;
472
473 uint16_t addRoom = 0;
474 DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
475 #ifdef HAVE_PROTOBUF
476 dr.uniqueId = ids->uniqueId;
477 #endif
478 dr.qTag = ids->qTag;
479
/* a false return means the response rules asked for the response to be dropped */
480 if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
481 continue;
482 }
483
484 #ifdef HAVE_DNSCRYPT
/* ask fixUpResponse() to keep room for the DNSCrypt padding and MAC in a rewritten response */
485 if (ids->dnsCryptQuery) {
486 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
487 }
488 #endif
489 if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
490 continue;
491 }
492
493 if (ids->packetCache && !ids->skipCache) {
494 ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode, ids->tempFailureTTL);
495 }
496
497 if (ids->cs && !ids->cs->muted) {
498 #ifdef HAVE_DNSCRYPT
499 if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
500 continue;
501 }
502 #endif
503
504 ComboAddress empty;
505 empty.sin4.sin_family = 0;
506 /* if ids->destHarvested is false, origDest holds the listening address.
507 We don't want to use that as a source since it could be 0.0.0.0 for example. */
508 sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
509 }
510
511 g_stats.responses++;
512
513 double udiff = ids->sentTime.udiff();
514 vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
515
516 struct timespec ts;
517 gettime(&ts);
/* note: the size recorded here is the size as received (got), not responseLen */
518 g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote);
519
520 if(dh->rcode == RCode::ServFail)
521 g_stats.servfailResponses++;
522 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
523
524 doLatencyStats(udiff);
525
/* release the state only when it still refers to the same client socket */
526 if (ids->origFD == origFD) {
527 #ifdef HAVE_DNSCRYPT
528 ids->dnsCryptQuery = nullptr;
529 #endif
530 ids->origFD = -1;
531 outstandingDecreased = false;
532 }
533
534 rewrittenResponse.clear();
535 }
536 }
537 catch(const std::exception& e){
538 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
539 if (outstandingDecreased) {
540 /* so an exception was raised after we decreased the outstanding queries counter,
541 but before we could set ids->origFD to -1 (because we also set outstandingDecreased
542 to false then), meaning the IDS is still considered active and we will decrease the
543 counter again on a duplicate, or simply while reaping downstream timeouts, so let's
544 increase it back. */
545 dss->outstanding++;
546 }
547 }
548 }
549 return 0;
550 }
551 catch(const std::exception& e)
552 {
553 errlog("UDP responder thread died because of exception: %s", e.what());
554 return 0;
555 }
556 catch(const PDNSException& e)
557 {
558 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
559 return 0;
560 }
561 catch(...)
562 {
563 errlog("UDP responder thread died because of an exception: %s", "unknown");
564 return 0;
565 }
566
567 void DownstreamState::reconnect()
568 {
569 connected = false;
570 for (auto& fd : sockets) {
571 if (fd != -1) {
572 {
573 std::lock_guard<std::mutex> lock(socketsLock);
574 mplexer->removeReadFD(fd);
575 }
576 /* shutdown() is needed to wake up recv() in the responderThread */
577 shutdown(fd, SHUT_RDWR);
578 close(fd);
579 fd = -1;
580 }
581 if (!IsAnyAddress(remote)) {
582 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
583 if (!IsAnyAddress(sourceAddr)) {
584 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
585 SBind(fd, sourceAddr);
586 }
587 try {
588 SConnect(fd, remote);
589 {
590 std::lock_guard<std::mutex> lock(socketsLock);
591 mplexer->addReadFD(fd, [](int, boost::any) {});
592 }
593 connected = true;
594 }
595 catch(const std::runtime_error& error) {
596 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
597 connected = false;
598 break;
599 }
600 }
601 }
602
603 /* if at least one (re-)connection failed, close all sockets */
604 if (!connected) {
605 for (auto& fd : sockets) {
606 if (fd != -1) {
607 /* shutdown() is needed to wake up recv() in the responderThread */
608 shutdown(fd, SHUT_RDWR);
609 close(fd);
610 fd = -1;
611 }
612 }
613 }
614 }
615
616 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
617 {
618 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
619
620 sockets.resize(numberOfSockets);
621 for (auto& fd : sockets) {
622 fd = -1;
623 }
624
625 if (!IsAnyAddress(remote)) {
626 reconnect();
627 idStates.resize(g_maxOutstanding);
628 sw.start();
629 infolog("Added downstream server %s", remote.toStringWithPort());
630 }
631 }
632
/* lock taken around calls into Lua; in this part of the file it guards the g_qcount.filter call in processQuery() */
633 std::mutex g_luamutex;
634 LuaContext g_lua;
635
/* NOTE(review): presumably the default server selection policy; its use is outside this part of the file */
636 GlobalStateHolder<ServerPolicy> g_policy;
637
638 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
639 {
640 for(auto& d : servers) {
641 if(d.second->isUp() && d.second->qps.check())
642 return d.second;
643 }
644 return leastOutstanding(servers, dq);
645 }
646
647 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
648 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
649 {
650 if (servers.size() == 1 && servers[0].second->isUp()) {
651 return servers[0].second;
652 }
653
654 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
655 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
656 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
657 poss.reserve(servers.size());
658 for(auto& d : servers) {
659 if(d.second->isUp()) {
660 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
661 }
662 }
663 if(poss.empty())
664 return shared_ptr<DownstreamState>();
665 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
666 return poss.begin()->second;
667 }
668
669 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
670 {
671 vector<pair<int, shared_ptr<DownstreamState>>> poss;
672 int sum=0;
673 for(auto& d : servers) { // w=1, w=10 -> 1, 11
674 if(d.second->isUp()) {
675 sum+=d.second->weight;
676 poss.push_back({sum, d.second});
677 }
678 }
679
680 // Catch poss & sum are empty to avoid SIGFPE
681 if(poss.empty())
682 return shared_ptr<DownstreamState>();
683
684 int r = val % sum;
685 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
686 if(p==poss.end())
687 return shared_ptr<DownstreamState>();
688 return p->second;
689 }
690
691 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
692 {
693 return valrandom(random(), servers, dq);
694 }
695
/* value passed to DNSName::hash() by the whashed policy below */
696 uint32_t g_hashperturb;
697 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
698 {
699 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
700 }
701
702
703 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
704 {
705 NumberedServerVector poss;
706
707 for(auto& d : servers) {
708 if(d.second->isUp()) {
709 poss.push_back(d);
710 }
711 }
712
713 const auto *res=&poss;
714 if(poss.empty())
715 res = &servers;
716
717 if(res->empty())
718 return shared_ptr<DownstreamState>();
719
720 static unsigned int counter;
721
722 return (*res)[(counter++) % res->size()].second;
723 }
724
/* defaults to 127.0.0.1:5199 -- NOTE(review): presumably the control socket address, its use is outside this part of the file */
725 ComboAddress g_serverControl{"127.0.0.1:5199"};
726
727 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
728 {
729 std::shared_ptr<ServerPool> pool;
730 pools_t::iterator it = pools.find(poolName);
731 if (it != pools.end()) {
732 pool = it->second;
733 }
734 else {
735 if (!poolName.empty())
736 vinfolog("Creating pool %s", poolName);
737 pool = std::make_shared<ServerPool>();
738 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
739 }
740 return pool;
741 }
742
743 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
744 {
745 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
746 if (!poolName.empty()) {
747 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
748 } else {
749 vinfolog("Setting default pool server selection policy to %s", policy->name);
750 }
751 pool->policy = policy;
752 }
753
754 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
755 {
756 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
757 if (!poolName.empty()) {
758 vinfolog("Adding server to pool %s", poolName);
759 } else {
760 vinfolog("Adding server to default pool");
761 }
762 pool->addServer(server);
763 }
764
765 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
766 {
767 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
768
769 if (!poolName.empty()) {
770 vinfolog("Removing server from pool %s", poolName);
771 }
772 else {
773 vinfolog("Removing server from default pool");
774 }
775
776 pool->removeServer(server);
777 }
778
779 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
780 {
781 pools_t::const_iterator it = pools.find(poolName);
782
783 if (it == pools.end()) {
784 throw std::out_of_range("No pool named " + poolName);
785 }
786
787 return it->second;
788 }
789
790 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
791 {
792 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
793 return pool->getServers();
794 }
795
796 // goal in life - if you send us a reasonably normal packet, we'll get Z for you, otherwise 0
797 int getEDNSZ(const char* packet, unsigned int len)
798 try
799 {
800 struct dnsheader* dh =(struct dnsheader*)packet;
801
802 if(ntohs(dh->qdcount) != 1 || dh->ancount!=0 || ntohs(dh->arcount)!=1 || dh->nscount!=0)
803 return 0;
804
805 if (len <= sizeof(dnsheader))
806 return 0;
807
808 unsigned int consumed;
809 DNSName qname(packet, len, sizeof(dnsheader), false, 0, 0, &consumed);
810 size_t pos = consumed + DNS_TYPE_SIZE + DNS_CLASS_SIZE;
811 uint16_t qtype, qclass;
812
813 if (len <= (sizeof(dnsheader)+pos))
814 return 0;
815
816 DNSName aname(packet, len, sizeof(dnsheader)+pos, true, &qtype, &qclass, &consumed);
817
818 if(qtype!=QType::OPT || sizeof(dnsheader)+pos+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE+EDNS_EXTENDED_RCODE_SIZE+EDNS_VERSION_SIZE+1 >= len)
819 return 0;
820
821 uint8_t* z = (uint8_t*)packet+sizeof(dnsheader)+pos+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE+EDNS_EXTENDED_RCODE_SIZE+EDNS_VERSION_SIZE;
822 return 0x100 * (*z) + *(z+1);
823 }
824 catch(...)
825 {
826 return 0;
827 }
828
829 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
830 {
831 string result;
832
833 std::vector<std::string> addrs;
834 stringtok(addrs, spoofContent, " ,");
835
836 if (addrs.size() == 1) {
837 try {
838 ComboAddress spoofAddr(spoofContent);
839 SpoofAction sa({spoofAddr});
840 sa(&dq, &result);
841 }
842 catch(const PDNSException &e) {
843 SpoofAction sa(spoofContent); // CNAME then
844 sa(&dq, &result);
845 }
846 } else {
847 std::vector<ComboAddress> cas;
848 for (const auto& addr : addrs) {
849 try {
850 cas.push_back(ComboAddress(addr));
851 }
852 catch (...) {
853 }
854 }
855 SpoofAction sa(cas);
856 sa(&dq, &result);
857 }
858 }
859
/* Apply the dynamic blocks and the query rules to an incoming query.
   holders:   local views of the dynamic block tables and of the rule list
   dq:        the query; several branches turn it into a response in place by
              setting qr (plus rcode or tc) in dq.dh
   poolname:  set to the rule result when a rule returns Action::Pool
   delayMsec: set when a rule returns Action::Delay
   now:       timestamp used for the ring buffer and to check block expiry
   Returns false when the query has to be dropped (dynamic block resolving to
   a drop, or a rule returning Action::Drop), true in every other case. */
860 bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
861 {
862 g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.qtype,dq.len,*dq.dh);
863
/* optional per-name query counting; the filter may veto the count or change the key */
864 if(g_qcount.enabled) {
865 string qname = (*dq.qname).toString(".");
866 bool countQuery{true};
867 if(g_qcount.filter) {
868 std::lock_guard<std::mutex> lock(g_luamutex);
869 std::tie (countQuery, qname) = g_qcount.filter(dq);
870 }
871
872 if(countQuery) {
873 WriteLock wl(&g_qcount.queryLock);
874 if(!g_qcount.records.count(qname)) {
875 g_qcount.records[qname] = 0;
876 }
877 g_qcount.records[qname]++;
878 }
879 }
880
/* dynamic blocks keyed on the client address */
881 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
882 auto updateBlockStats = [&got]() {
883 g_stats.dynBlocked++;
884 got->second.blocks++;
885 };
886
/* the block only applies while it has not expired */
887 if(now < got->second.until) {
888 DNSAction::Action action = got->second.action;
/* a block without an action of its own uses the global default */
889 if (action == DNSAction::Action::None) {
890 action = g_dynBlockAction;
891 }
892 if (action == DNSAction::Action::Refused) {
893 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
894 updateBlockStats();
895
896 dq.dh->rcode = RCode::Refused;
897 dq.dh->qr=true;
898 return true;
899 }
900 else if (action == DNSAction::Action::Truncate) {
/* truncation is not applied to TCP queries: those fall through to the next checks */
901 if(!dq.tcp) {
902 updateBlockStats();
903 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
904 dq.dh->tc = true;
905 dq.dh->qr = true;
906 return true;
907 }
908 else {
909 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
910 }
911
912 }
913 else {
/* every other action results in a drop */
914 updateBlockStats();
915 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
916 return false;
917 }
918 }
919 }
920
/* dynamic blocks keyed on the query name, same logic as above */
921 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
922 auto updateBlockStats = [&got]() {
923 g_stats.dynBlocked++;
924 got->blocks++;
925 };
926
927 if(now < got->until) {
928 DNSAction::Action action = got->action;
929 if (action == DNSAction::Action::None) {
930 action = g_dynBlockAction;
931 }
932 if (action == DNSAction::Action::Refused) {
933 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
934 updateBlockStats();
935
936 dq.dh->rcode = RCode::Refused;
937 dq.dh->qr=true;
938 return true;
939 }
940 else if (action == DNSAction::Action::Truncate) {
941 if(!dq.tcp) {
942 updateBlockStats();
943
944 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
945 dq.dh->tc = true;
946 dq.dh->qr = true;
947 return true;
948 }
949 else {
950 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
951 }
952 }
953 else {
954 updateBlockStats();
955 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
956 return false;
957 }
958 }
959 }
960
/* regular rules, evaluated in order: the first matching rule with a terminal
   action ends the processing, Delay and None let the evaluation continue */
961 DNSAction::Action action=DNSAction::Action::None;
962 string ruleresult;
963 for(const auto& lr : *holders.rulactions) {
964 if(lr.d_rule->matches(&dq)) {
965 lr.d_rule->d_matches++;
966 action=(*lr.d_action)(&dq, &ruleresult);
967
968 switch(action) {
969 case DNSAction::Action::Allow:
970 return true;
971 break;
972 case DNSAction::Action::Drop:
973 g_stats.ruleDrop++;
974 return false;
975 break;
976 case DNSAction::Action::Nxdomain:
977 dq.dh->rcode = RCode::NXDomain;
978 dq.dh->qr=true;
979 g_stats.ruleNXDomain++;
980 return true;
981 break;
982 case DNSAction::Action::Refused:
983 dq.dh->rcode = RCode::Refused;
984 dq.dh->qr=true;
985 g_stats.ruleRefused++;
986 return true;
987 break;
988 case DNSAction::Action::ServFail:
989 dq.dh->rcode = RCode::ServFail;
990 dq.dh->qr=true;
991 g_stats.ruleServFail++;
992 return true;
993 break;
994 case DNSAction::Action::Spoof:
/* ruleresult carries the content to spoof */
995 spoofResponseFromString(dq, ruleresult);
996 return true;
997 break;
998 case DNSAction::Action::Truncate:
999 dq.dh->tc = true;
1000 dq.dh->qr = true;
1001 return true;
1002 break;
1003 case DNSAction::Action::HeaderModify:
1004 return true;
1005 break;
1006 case DNSAction::Action::Pool:
/* ruleresult carries the name of the pool to use */
1007 poolname=ruleresult;
1008 return true;
1009 break;
1010 /* non-terminal actions follow */
1011 case DNSAction::Action::Delay:
/* ruleresult carries the delay as a decimal string */
1012 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1013 break;
1014 case DNSAction::Action::None:
1015 break;
1016 }
1017 }
1018 }
1019
1020 return true;
1021 }
1022
1023 bool processResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, int* delayMsec)
1024 {
1025 DNSResponseAction::Action action=DNSResponseAction::Action::None;
1026 std::string ruleresult;
1027 for(const auto& lr : *localRespRulactions) {
1028 if(lr.d_rule->matches(&dr)) {
1029 lr.d_rule->d_matches++;
1030 action=(*lr.d_action)(&dr, &ruleresult);
1031 switch(action) {
1032 case DNSResponseAction::Action::Allow:
1033 return true;
1034 break;
1035 case DNSResponseAction::Action::Drop:
1036 return false;
1037 break;
1038 case DNSResponseAction::Action::HeaderModify:
1039 return true;
1040 break;
1041 case DNSResponseAction::Action::ServFail:
1042 dr.dh->rcode = RCode::ServFail;
1043 return true;
1044 break;
1045 /* non-terminal actions follow */
1046 case DNSResponseAction::Action::Delay:
1047 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1048 break;
1049 case DNSResponseAction::Action::None:
1050 break;
1051 }
1052 }
1053 }
1054
1055 return true;
1056 }
1057
1058 static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd, const char* request, const size_t requestLen)
1059 {
1060 ssize_t result;
1061
1062 if (ss->sourceItf == 0) {
1063 result = send(sd, request, requestLen, 0);
1064 }
1065 else {
1066 struct msghdr msgh;
1067 struct iovec iov;
1068 char cbuf[256];
1069 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &ss->remote);
1070 addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
1071 result = sendmsg(sd, &msgh, 0);
1072 }
1073
1074 if (result == -1) {
1075 int savederrno = errno;
1076 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1077
1078 /* This might sound silly, but on Linux send() might fail with EINVAL
1079 if the interface the socket was bound to doesn't exist anymore. */
1080 if (savederrno == EINVAL) {
1081 ss->reconnect();
1082 }
1083 }
1084
1085 return result;
1086 }
1087
1088 bool addXPF(DNSQuestion& dq, uint16_t optionCode)
1089 {
1090 std::string payload = generateXPFPayload(dq.tcp, *dq.remote, *dq.local);
1091 uint8_t root = '\0';
1092 dnsrecordheader drh;
1093 drh.d_type = htons(optionCode);
1094 drh.d_class = htons(QClass::IN);
1095 drh.d_ttl = 0;
1096 drh.d_clen = htons(payload.size());
1097 size_t recordHeaderLen = sizeof(root) + sizeof(drh);
1098
1099 size_t available = dq.size - dq.len;
1100
1101 if ((payload.size() + recordHeaderLen) > available) {
1102 return false;
1103 }
1104
1105 size_t pos = dq.len;
1106 memcpy(reinterpret_cast<char*>(dq.dh) + pos, &root, sizeof(root));
1107 pos += sizeof(root);
1108 memcpy(reinterpret_cast<char*>(dq.dh) + pos, &drh, sizeof(drh));
1109 pos += sizeof(drh);
1110 memcpy(reinterpret_cast<char*>(dq.dh) + pos, payload.data(), payload.size());
1111 pos += payload.size();
1112
1113 dq.len = pos;
1114
1115 dq.dh->arcount = htons(ntohs(dq.dh->arcount) + 1);
1116
1117 return true;
1118 }
1119
1120 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1121 {
1122 if (msgh->msg_flags & MSG_TRUNC) {
1123 /* message was too large for our buffer */
1124 vinfolog("Dropping message too large for our buffer");
1125 g_stats.nonCompliantQueries++;
1126 return false;
1127 }
1128
1129 if(!holders.acl->match(remote)) {
1130 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1131 g_stats.aclDrops++;
1132 return false;
1133 }
1134
1135 cs.queries++;
1136 g_stats.queries++;
1137
1138 if (HarvestDestinationAddress(msgh, &dest)) {
1139 /* we don't get the port, only the address */
1140 dest.sin4.sin_port = cs.local.sin4.sin_port;
1141 }
1142 else {
1143 dest.sin4.sin_family = 0;
1144 }
1145
1146 return true;
1147 }
1148
#ifdef HAVE_DNSCRYPT
/* When the listening socket has a DNSCrypt context, hand the query over to
   handleDNSCryptQuery() and, on success, set 'len' to the decrypted length.
   When handleDNSCryptQuery() reports a failure, any response it produced is
   sent back to the client and false is returned: the caller must stop
   processing. Returns true when there is no DNSCrypt context or when the
   query has been successfully handled. */
static bool checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, const ComboAddress& dest, const ComboAddress& remote, time_t now)
{
  if (!cs.dnscryptCtx) {
    /* plain DNS on this socket, nothing to do */
    return true;
  }

  vector<uint8_t> response;
  uint16_t decryptedQueryLen = 0;

  dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);

  const bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, false, now, response);

  if (decrypted) {
    len = decryptedQueryLen;
    return true;
  }

  if (!response.empty()) {
    sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(response.data()), static_cast<uint16_t>(response.size()), 0, dest, remote);
  }

  return false;
}
#endif /* HAVE_DNSCRYPT */
1172
1173 bool checkQueryHeaders(const struct dnsheader* dh)
1174 {
1175 if (dh->qr) { // don't respond to responses
1176 g_stats.nonCompliantQueries++;
1177 return false;
1178 }
1179
1180 if (dh->qdcount == 0) {
1181 g_stats.emptyQueries++;
1182 return false;
1183 }
1184
1185 if (dh->rd) {
1186 g_stats.rdQueries++;
1187 }
1188
1189 return true;
1190 }
1191
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Fill 'outMsg' so that 'response' (responseLen bytes) gets sent to 'remote'
   by a later sendmmsg() call. When 'dest' is known (family != 0) a control
   message built in 'cbuf' sets it as source address, otherwise no control
   data is attached. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, char* cbuf)
{
  outMsg.msg_len = 0;
  fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destKnown = (dest.sin4.sin_family != 0);
  if (destKnown) {
    addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
  }
  else {
    outMsg.msg_hdr.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1206
/* Handle one UDP query received on the listening socket described by 'cs'.

   - msgh: message header filled on reception, used for the truncation check
     and to harvest the destination address
   - remote: address of the client
   - dest: filled with the harvested destination address, family set to 0 when unknown
   - query / len: buffer holding the query and number of bytes received
   - queryBufferSize: total size of the 'query' buffer, used as the room available
     when the packet is modified or turned into a response
   - responsesVect / queuedResponses / respIOV / respCBuf: when responsesVect is not
     null, responses that need no delay are queued there instead of being sent
     right away, and *queuedResponses is incremented for each of them

   The query is either dropped, answered directly (rule, cache hit, ServFail when
   no backend is selected) or relayed to the backend picked by the policy after
   its state has been stored in one of the backend's IDState slots.
   std::exception thrown while processing are caught and logged. */
static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
{
  /* batching requires all of its companion parameters */
  assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
  /* kept outside of the try block so that it can be logged from the handler */
  uint16_t queryId = 0;

  try {
    if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
      return;
    }

    /* we need an accurate ("real") value for the response and
       to store into the IDS, but not for insertion into the
       rings for example */
    struct timespec queryRealTime;
    struct timespec now;
    gettime(&now);
    gettime(&queryRealTime, true);

#ifdef HAVE_DNSCRYPT
    std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;

    /* may update 'len' to the size of the decrypted query */
    if (!checkDNSCryptQuery(cs, query, len, dnsCryptQuery, dest, remote, queryRealTime.tv_sec)) {
      return;
    }
#endif

    struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
    queryId = ntohs(dh->id);

    if (!checkQueryHeaders(dh)) {
      return;
    }

    string poolname;
    int delayMsec = 0;
    /* remember the original flags, they are restored before answering ourselves
       and stored in the IDState when the query is relayed */
    const uint16_t * flags = getFlagsFromDNSHeader(dh);
    const uint16_t origFlags = *flags;
    uint16_t qtype, qclass;
    unsigned int consumed = 0;
    DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
    /* fall back to the listening address when the destination could not be harvested */
    DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);

    /* dynamic blocks and rules, false means drop */
    if (!processQuery(holders, dq, poolname, &delayMsec, now))
    {
      return;
    }

    if(dq.dh->qr) { // something turned it into a response
      restoreFlags(dh, origFlags);

      if (!cs.muted) {
        /* the response is built in place, in the query buffer */
        char* response = query;
        uint16_t responseLen = dq.len;

        DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, reinterpret_cast<dnsheader*>(response), dq.size, responseLen, false, &queryRealTime);
#ifdef HAVE_PROTOBUF
        dr.uniqueId = dq.uniqueId;
#endif
        dr.qTag = dq.qTag;

        if (!processResponse(holders.selfAnsweredRespRulactions, dr, &delayMsec)) {
          return;
        }

#ifdef HAVE_DNSCRYPT
        if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
          return;
        }
#endif
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
        /* only responses that do not have to be delayed can be batched */
        if (delayMsec == 0 && responsesVect != nullptr) {
          queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
          (*queuedResponses)++;
        }
        else
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
        {
          sendUDPResponse(cs.udpFD, response, responseLen, delayMsec, dest, remote);
        }

        g_stats.selfAnswered++;
        doLatencyStats(0); // we're not going to measure this
      }

      return;
    }

    /* select the pool, its packet cache and the backend */
    DownstreamState* ss = nullptr;
    std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
    std::shared_ptr<DNSDistPacketCache> packetCache = serverPool->packetCache;
    /* a policy set on the pool takes precedence over the global one */
    auto policy = *(holders.policy);
    if (serverPool->policy != nullptr) {
      policy = *(serverPool->policy);
    }
    auto servers = serverPool->getServers();
    if (policy.isLua) {
      /* Lua policies are called with the Lua mutex held */
      std::lock_guard<std::mutex> lock(g_luamutex);
      ss = policy.policy(servers, &dq).get();
    }
    else {
      ss = policy.policy(servers, &dq).get();
    }

    bool ednsAdded = false;
    bool ecsAdded = false;
    /* use the ECS setting of the selected backend, or the one of the pool when no backend was selected */
    if (dq.useECS && ((ss && ss->useECS) || (!ss && serverPool->getECS()))) {
      if (!handleEDNSClientSubnet(query, dq.size, consumed, &dq.len, &(ednsAdded), &(ecsAdded), remote, dq.ecsOverride, dq.ecsPrefixLength)) {
        vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote.toStringWithPort());
        return;
      }
    }

    uint32_t cacheKey = 0;
    if (packetCache && !dq.skipCache) {
      uint16_t cachedResponseSize = dq.size;
      /* stale entries are only accepted when no backend could be selected */
      uint32_t allowExpired = ss ? 0 : g_staleCacheEntriesTTL;
      if (packetCache->get(dq, consumed, dh->id, query, &cachedResponseSize, &cacheKey, allowExpired)) {
        DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, reinterpret_cast<dnsheader*>(query), dq.size, cachedResponseSize, false, &queryRealTime);
#ifdef HAVE_PROTOBUF
        dr.uniqueId = dq.uniqueId;
#endif
        dr.qTag = dq.qTag;

        if (!processResponse(holders.cacheHitRespRulactions, dr, &delayMsec)) {
          return;
        }

        if (!cs.muted) {
#ifdef HAVE_DNSCRYPT
          if (!encryptResponse(query, &cachedResponseSize, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
            return;
          }
#endif
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
          if (delayMsec == 0 && responsesVect != nullptr) {
            queueResponse(cs, query, cachedResponseSize, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
            (*queuedResponses)++;
          }
          else
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
          {
            sendUDPResponse(cs.udpFD, query, cachedResponseSize, delayMsec, dest, remote);
          }
        }

        g_stats.cacheHits++;
        doLatencyStats(0); // we're not going to measure this
        return;
      }
      g_stats.cacheMisses++;
    }

    /* the policy did not return any backend */
    if(!ss) {
      g_stats.noPolicy++;

      if (g_servFailOnNoPolicy && !cs.muted) {
        char* response = query;
        uint16_t responseLen = dq.len;
        restoreFlags(dh, origFlags);

        dq.dh->rcode = RCode::ServFail;
        dq.dh->qr = true;

        DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, reinterpret_cast<dnsheader*>(response), dq.size, responseLen, false, &queryRealTime);
#ifdef HAVE_PROTOBUF
        dr.uniqueId = dq.uniqueId;
#endif
        dr.qTag = dq.qTag;

        if (!processResponse(holders.selfAnsweredRespRulactions, dr, &delayMsec)) {
          return;
        }

#ifdef HAVE_DNSCRYPT
        if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
          return;
        }
#endif
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
        /* note that delayMsec is not taken into account on this path,
           the response is queued or sent with a delay of 0 */
        if (responsesVect != nullptr) {
          queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
          (*queuedResponses)++;
        }
        else
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
        {
          sendUDPResponse(cs.udpFD, response, responseLen, 0, dest, remote);
        }

        // no response-only statistics counter to update.
        doLatencyStats(0); // we're not going to measure this
      }
      vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toString(), QType(dq.qtype).getName(), remote.toStringWithPort());
      return;
    }

    /* the return value is not checked: the query is relayed even if the record did not fit */
    if (dq.addXPF && ss->xpfRRCode != 0) {
      addXPF(dq, ss->xpfRRCode);
    }

    ss->queries++;

    /* pick the next IDState slot of the backend, its index becomes the ID of the relayed query */
    unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
    IDState* ids = &ss->idStates[idOffset];
    ids->age = 0;

    if(ids->origFD < 0) // if we are reusing, no change in outstanding
      ss->outstanding++;
    else {
      /* the slot still held a query, which is counted as a downstream timeout */
      ss->reuseds++;
      g_stats.downstreamTimeouts++;
    }

    /* store what is needed to handle the response from the backend */
    ids->cs = &cs;
    ids->origFD = cs.udpFD;
    ids->origID = dh->id;
    ids->origRemote = remote;
    ids->sentTime.set(queryRealTime);
    ids->qname = qname;
    ids->qtype = dq.qtype;
    ids->qclass = dq.qclass;
    ids->delayMsec = delayMsec;
    ids->tempFailureTTL = dq.tempFailureTTL;
    ids->origFlags = origFlags;
    ids->cacheKey = cacheKey;
    ids->skipCache = dq.skipCache;
    ids->packetCache = packetCache;
    ids->ednsAdded = ednsAdded;
    ids->ecsAdded = ecsAdded;
    ids->qTag = dq.qTag;

    /* If we couldn't harvest the real dest addr, still
       write down the listening addr since it will be useful
       (especially if it's not an 'any' one).
       We need to keep track of which one it is since we may
       want to use the real but not the listening addr to reply.
    */
    if (dest.sin4.sin_family != 0) {
      ids->origDest = dest;
      ids->destHarvested = true;
    }
    else {
      ids->origDest = cs.local;
      ids->destHarvested = false;
    }
#ifdef HAVE_DNSCRYPT
    ids->dnsCryptQuery = dnsCryptQuery;
#endif
#ifdef HAVE_PROTOBUF
    ids->uniqueId = dq.uniqueId;
#endif

    /* the original ID has been saved in ids->origID above */
    dh->id = idOffset;

    int fd = pickBackendSocketForSending(ss);
    ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);

    /* a failure to send is only counted, the IDState slot stays in use */
    if(ret < 0) {
      ss->sendErrors++;
      g_stats.downstreamSendErrors++;
    }

    vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
  }
  catch(const std::exception& e){
    vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
  }
}
1475
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Endless receive loop reading up to g_udpVectorSize queries per recvmmsg()
   call on the UDP socket of 'cs'. Every query is passed to processUDPQuery(),
   and the responses that it queued are then sent with a single sendmmsg()
   call. This function does not return. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  /* everything needed to receive, and possibly answer, one message */
  struct MMReceiver
  {
    char packet[4096];
    /* used by HarvestDestinationAddress */
    char cbuf[256];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  std::unique_ptr<MMReceiver[]> recvData(new MMReceiver[vectSize]);
  std::unique_ptr<struct mmsghdr[]> msgVec(new struct mmsghdr[vectSize]);
  std::unique_ptr<struct mmsghdr[]> outMsgVec(new struct mmsghdr[vectSize]);

  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    MMReceiver& slot = recvData[idx];
    slot.remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &slot.iov, slot.cbuf, sizeof(slot.cbuf), slot.packet, s_udpIncomingBufferSize, &slot.remote);
  }

  /* go now */
  while (true) {

    /* reset the IO vector, since it's also used to send the vector of responses
       to avoid having to copy the data around */
    for (size_t idx = 0; idx < vectSize; idx++) {
      MMReceiver& slot = recvData[idx];
      slot.iov.iov_base = slot.packet;
      slot.iov.iov_len = sizeof(slot.packet);
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    const int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
      continue;
    }

    /* number of responses queued into outMsgVec by processUDPQuery() */
    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      MMReceiver& slot = recvData[msgIdx];
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      const unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = slot.remote;

      /* too short to even hold a DNS header */
      if (got < sizeof(struct dnsheader)) {
        g_stats.nonCompliantQueries++;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, slot.dest, slot.packet, static_cast<uint16_t>(got), sizeof(slot.packet), outMsgVec.get(), &msgsToSend, &slot.iov, slot.cbuf);
    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */
    const bool haveResponses = msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot);
    if (!haveResponses) {
      continue;
    }

    const int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

    if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
      vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
    }
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1556
1557 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1558 static void* udpClientThread(ClientState* cs)
1559 try
1560 {
1561 LocalHolders holders;
1562
1563 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1564 if (g_udpVectorSize > 1) {
1565 MultipleMessagesUDPClientThread(cs, holders);
1566
1567 }
1568 else
1569 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1570 {
1571 char packet[4096];
1572 /* the actual buffer is larger because:
1573 - we may have to add EDNS and/or ECS
1574 - we use it for self-generated responses (from rule or cache)
1575 but we only accept incoming payloads up to that size
1576 */
1577 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1578 struct msghdr msgh;
1579 struct iovec iov;
1580 /* used by HarvestDestinationAddress */
1581 char cbuf[256];
1582
1583 ComboAddress remote;
1584 ComboAddress dest;
1585 remote.sin4.sin_family = cs->local.sin4.sin_family;
1586 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1587
1588 for(;;) {
1589 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1590
1591 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1592 g_stats.nonCompliantQueries++;
1593 continue;
1594 }
1595
1596 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1597 }
1598 }
1599
1600 return nullptr;
1601 }
1602 catch(const std::exception &e)
1603 {
1604 errlog("UDP client thread died because of exception: %s", e.what());
1605 return nullptr;
1606 }
1607 catch(const PDNSException &e)
1608 {
1609 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1610 return nullptr;
1611 }
1612 catch(...)
1613 {
1614 errlog("UDP client thread died because of an exception: %s", "unknown");
1615 return nullptr;
1616 }
1617
1618 static bool upCheck(DownstreamState& ds)
1619 try
1620 {
1621 DNSName checkName = ds.checkName;
1622 uint16_t checkType = ds.checkType.getCode();
1623 uint16_t checkClass = ds.checkClass;
1624 dnsheader checkHeader;
1625 memset(&checkHeader, 0, sizeof(checkHeader));
1626
1627 checkHeader.qdcount = htons(1);
1628 #ifdef HAVE_LIBSODIUM
1629 checkHeader.id = randombytes_random() % 65536;
1630 #else
1631 checkHeader.id = random() % 65536;
1632 #endif
1633
1634 checkHeader.rd = true;
1635 if (ds.setCD) {
1636 checkHeader.cd = true;
1637 }
1638
1639
1640 if (ds.checkFunction) {
1641 std::lock_guard<std::mutex> lock(g_luamutex);
1642 auto ret = ds.checkFunction(checkName, checkType, checkClass, &checkHeader);
1643 checkName = std::get<0>(ret);
1644 checkType = std::get<1>(ret);
1645 checkClass = std::get<2>(ret);
1646 }
1647
1648 vector<uint8_t> packet;
1649 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1650 dnsheader * requestHeader = dpw.getHeader();
1651 *requestHeader = checkHeader;
1652
1653 Socket sock(ds.remote.sin4.sin_family, SOCK_DGRAM);
1654 sock.setNonBlocking();
1655 if (!IsAnyAddress(ds.sourceAddr)) {
1656 sock.setReuseAddr();
1657 sock.bind(ds.sourceAddr);
1658 }
1659 sock.connect(ds.remote);
1660 ssize_t sent = udpClientSendRequestToBackend(&ds, sock.getHandle(), (char*)&packet[0], packet.size());
1661 if (sent < 0) {
1662 int ret = errno;
1663 if (g_verboseHealthChecks)
1664 infolog("Error while sending a health check query to backend %s: %d", ds.getNameWithAddr(), ret);
1665 return false;
1666 }
1667
1668 int ret=waitForRWData(sock.getHandle(), true, 1, 0);
1669 if(ret < 0 || !ret) { // error, timeout, both are down!
1670 if (ret < 0) {
1671 ret = errno;
1672 if (g_verboseHealthChecks)
1673 infolog("Error while waiting for the health check response from backend %s: %d", ds.getNameWithAddr(), ret);
1674 }
1675 else {
1676 if (g_verboseHealthChecks)
1677 infolog("Timeout while waiting for the health check response from backend %s", ds.getNameWithAddr());
1678 }
1679 return false;
1680 }
1681
1682 string reply;
1683 sock.recvFrom(reply, ds.remote);
1684
1685 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1686
1687 if (reply.size() < sizeof(*responseHeader)) {
1688 if (g_verboseHealthChecks)
1689 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds.getNameWithAddr(), sizeof(*responseHeader));
1690 return false;
1691 }
1692
1693 if (responseHeader->id != requestHeader->id) {
1694 if (g_verboseHealthChecks)
1695 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds.getNameWithAddr(), requestHeader->id);
1696 return false;
1697 }
1698
1699 if (!responseHeader->qr) {
1700 if (g_verboseHealthChecks)
1701 infolog("Invalid health check response from backend %s, expecting QR to be set", ds.getNameWithAddr());
1702 return false;
1703 }
1704
1705 if (responseHeader->rcode == RCode::ServFail) {
1706 if (g_verboseHealthChecks)
1707 infolog("Backend %s responded to health check with ServFail", ds.getNameWithAddr());
1708 return false;
1709 }
1710
1711 if (ds.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1712 if (g_verboseHealthChecks)
1713 infolog("Backend %s responded to health check with %s while mustResolve is set", ds.getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1714 return false;
1715 }
1716
1717 uint16_t receivedType;
1718 uint16_t receivedClass;
1719 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1720
1721 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1722 if (g_verboseHealthChecks)
1723 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds.getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1724 return false;
1725 }
1726
1727 return true;
1728 }
1729 catch(const std::exception& e)
1730 {
1731 if (g_verboseHealthChecks)
1732 infolog("Error checking the health of backend %s: %s", ds.getNameWithAddr(), e.what());
1733 return false;
1734 }
1735 catch(...)
1736 {
1737 if (g_verboseHealthChecks)
1738 infolog("Unknown exception while checking the health of backend %s", ds.getNameWithAddr());
1739 return false;
1740 }
1741
/* maximum number of TCP client threads, also used to estimate the number of
   file descriptors needed in checkFileDescriptorsLimits() */
uint64_t g_maxTCPClientThreads{10};
/* number of iterations of the maintenance thread loop, which sleeps one second
   per iteration, between two cleanings of the packet caches */
std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* used by the maintenance thread to compute how many entries a packet cache
   is purged down to: maxEntries * (100 - percentage) / 100 */
std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1745
1746 void* maintThread()
1747 {
1748 int interval = 1;
1749 size_t counter = 0;
1750 int32_t secondsToWaitLog = 0;
1751
1752 for(;;) {
1753 sleep(interval);
1754
1755 {
1756 std::lock_guard<std::mutex> lock(g_luamutex);
1757 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1758 if(f) {
1759 try {
1760 (*f)();
1761 secondsToWaitLog = 0;
1762 }
1763 catch(std::exception &e) {
1764 if (secondsToWaitLog <= 0) {
1765 infolog("Error during execution of maintenance function: %s", e.what());
1766 secondsToWaitLog = 61;
1767 }
1768 secondsToWaitLog -= interval;
1769 }
1770 }
1771 }
1772
1773 counter++;
1774 if (counter >= g_cacheCleaningDelay) {
1775 auto localPools = g_pools.getLocal();
1776 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
1777 for (const auto& entry : *localPools) {
1778 packetCache = entry.second->packetCache;
1779 if (packetCache) {
1780 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1781 packetCache->purgeExpired(upTo);
1782 }
1783 }
1784 counter = 0;
1785 }
1786
1787 // ponder pruning g_dynblocks of expired entries here
1788 }
1789 return 0;
1790 }
1791
/* Health checks thread. Every second, in addition to the time taken by the checks:
   - a TCP client thread is added when more than one connection is queued and
     the maximum number of threads has not been reached
   - every backend whose availability is set to Auto is checked with upCheck()
     and its up / down status updated
   - the query load and drop rate of every backend are recomputed
   - the queries relayed over UDP that have been waiting for more than
     g_udpTimeout passes of this loop are marked as timed out
   Never returns in practice. */
void* healthChecksThread()
{
  int interval = 1;

  for(;;) {
    sleep(interval);

    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
      g_tcpclientthreads->addTCPClientThread();

    auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
    for(auto& dss : *states) {
      if(dss->availability==DownstreamState::Availability::Auto) {
        bool newState=upCheck(*dss);
        if (newState) {
          /* a successful check resets the count of consecutive failures */
          if (dss->currentCheckFailures != 0) {
            dss->currentCheckFailures = 0;
          }
        }
        else if (!newState && dss->upStatus) {
          /* a backend that is currently up is only marked down once
             maxCheckFailures consecutive checks have failed */
          dss->currentCheckFailures++;
          if (dss->currentCheckFailures < dss->maxCheckFailures) {
            newState = true;
          }
        }

        if(newState != dss->upStatus) {
          warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");

          /* the backend is coming up but its sockets are not connected yet:
             connect them and start the responder thread */
          if (newState && !dss->connected) {
            for (auto& fd : dss->sockets) {
              try {
                SConnect(fd, dss->remote);
                {
                  std::lock_guard<std::mutex> lock(dss->socketsLock);
                  dss->mplexer->addReadFD(fd, [](int, boost::any) {});
                }
                dss->connected = true;
              }
              catch(const std::runtime_error& error) {
                infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
                /* keep the backend down */
                newState = false;
                dss->connected = false;
              }
            }
            /* NOTE(review): 'connected' reflects the outcome for the last socket of the loop only */
            if (dss->connected) {
              dss->tid = thread(responderThread, dss);
            }
          }

          dss->upStatus = newState;
          dss->currentCheckFailures = 0;
          if (g_snmpAgent && g_snmpTrapsEnabled) {
            g_snmpAgent->sendBackendStatusChangeTrap(dss);
          }
        }
      }

      /* per-second rates since the previous pass, 'delta' being the elapsed time in seconds */
      auto delta = dss->sw.udiffAndSet()/1000000.0;
      dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
      dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
      dss->prev.queries.store(dss->queries.load());
      dss->prev.reuseds.store(dss->reuseds.load());

      for(IDState& ids : dss->idStates) { // timeouts
        /* the age of every slot in use is incremented on each pass */
        if(ids.origFD >=0 && ids.age++ > g_udpTimeout) {
          /* We set origFD to -1 as soon as possible
             to limit the risk of racing with the
             responder thread.
             The UDP client thread only checks origFD to
             know whether outstanding has to be incremented,
             so the sooner the better any way since we _will_
             decrement it.
          */
          ids.origFD = -1;
          ids.age = 0;
          dss->reuseds++;
          --dss->outstanding;
          g_stats.downstreamTimeouts++; // this is an 'actively' discovered timeout
          vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
                   dss->remote.toStringWithPort(), dss->name,
                   ids.qname.toString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());

          struct timespec ts;
          gettime(&ts);

          /* record the timeout in the rings with a header carrying only the
             original ID and the maximum unsigned int value as fifth argument */
          struct dnsheader fake;
          memset(&fake, 0, sizeof(fake));
          fake.id = ids.origID;

          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
        }
      }
    }
  }
  return 0;
}
1889
1890 static void bindAny(int af, int sock)
1891 {
1892 __attribute__((unused)) int one = 1;
1893
1894 #ifdef IP_FREEBIND
1895 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
1896 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
1897 #endif
1898
1899 #ifdef IP_BINDANY
1900 if (af == AF_INET)
1901 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
1902 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
1903 #endif
1904 #ifdef IPV6_BINDANY
1905 if (af == AF_INET6)
1906 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
1907 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
1908 #endif
1909 #ifdef SO_BINDANY
1910 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
1911 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
1912 #endif
1913 }
1914
1915 static void dropGroupPrivs(gid_t gid)
1916 {
1917 if (gid) {
1918 if (setgid(gid) == 0) {
1919 if (setgroups(0, NULL) < 0) {
1920 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
1921 }
1922 }
1923 else {
1924 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
1925 }
1926 }
1927 }
1928
1929 static void dropUserPrivs(uid_t uid)
1930 {
1931 if(uid) {
1932 if(setuid(uid) < 0) {
1933 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
1934 }
1935 }
1936 }
1937
1938 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
1939 {
1940 /* stdin, stdout, stderr */
1941 size_t requiredFDsCount = 3;
1942 auto backends = g_dstates.getLocal();
1943 /* UDP sockets to backends */
1944 size_t backendUDPSocketsCount = 0;
1945 for (const auto& backend : *backends) {
1946 backendUDPSocketsCount += backend->sockets.size();
1947 }
1948 requiredFDsCount += backendUDPSocketsCount;
1949 /* TCP sockets to backends */
1950 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
1951 /* listening sockets */
1952 requiredFDsCount += udpBindsCount;
1953 requiredFDsCount += tcpBindsCount;
1954 /* max TCP connections currently served */
1955 requiredFDsCount += g_maxTCPClientThreads;
1956 /* max pipes for communicating between TCP acceptors and client threads */
1957 requiredFDsCount += (g_maxTCPClientThreads * 2);
1958 /* max TCP queued connections */
1959 requiredFDsCount += g_maxTCPQueuedConnections;
1960 /* DelayPipe pipe */
1961 requiredFDsCount += 2;
1962 /* syslog socket */
1963 requiredFDsCount++;
1964 /* webserver main socket */
1965 requiredFDsCount++;
1966 /* console main socket */
1967 requiredFDsCount++;
1968 /* carbon export */
1969 requiredFDsCount++;
1970 /* history file */
1971 requiredFDsCount++;
1972 struct rlimit rl;
1973 getrlimit(RLIMIT_NOFILE, &rl);
1974 if (rl.rlim_cur <= requiredFDsCount) {
1975 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
1976 #ifdef HAVE_SYSTEMD
1977 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
1978 #else
1979 warnlog("You can increase this value by using ulimit.");
1980 #endif
1981 }
1982 }
1983
/* Values collected from the command line by main(). */
struct
{
  std::vector<std::string> locals;  /* addresses given with -l,--local */
  std::vector<std::string> remotes; /* positional arguments, when not running as a client */
  bool checkConfig = false;         /* --check-config */
  bool beClient = false;            /* -c,--client */
  bool beSupervised = false;        /* --supervised */
  std::string command;              /* -e,--execute */
  std::string config;               /* -C,--config */
  std::string uid;                  /* -u,--uid */
  std::string gid;                  /* -g,--gid */
} g_cmdLine;
1996
/* Starts out false; main() sets it to true once the configuration has been
   loaded and the list of local addresses has been finalized. */
std::atomic<bool> g_configurationDone(false);
1998
/* Print the command-line syntax summary and the description of every option
   to standard output. The -k,--setkey entry is only printed when built with
   libsodium support. */
static void usage()
{
  std::cout << std::endl;

  std::cout << "Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
            << "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
            << "[-v,--verbose] [--check-config] [--version]\n"
            << "\n";

  std::cout << "-a,--acl netmask Add this netmask to the ACL\n"
            << "-C,--config file Load configuration from 'file'\n"
            << "-c,--client Operate as a client, connect to dnsdist. This reads\n"
            << " controlSocket from your configuration file, but also\n"
            << " accepts an IP:PORT argument\n";

#ifdef HAVE_LIBSODIUM
  std::cout << "-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
            << " is similar to setting setKey in the configuration file.\n"
            << " NOTE: this will leak this key in your shell's history!\n";
#endif

  std::cout << "--check-config Validate the configuration file and exit. The exit-code\n"
            << " reflects the validation, 0 is OK, 1 means an error.\n"
            << " Any errors are printed as well.\n"
            << "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
            << "-g,--gid gid Change the process group ID after binding sockets\n"
            << "-h,--help Display this helpful message\n"
            << "-l,--local address Listen on this local address\n"
            << "--supervised Don't open a console, I'm supervised\n"
            << " (use with e.g. systemd and daemontools)\n"
            << "--disable-syslog Don't log to syslog, only to stdout\n"
            << " (use with e.g. systemd)\n"
            << "-u,--uid uid Change the process user ID after binding sockets\n"
            << "-v,--verbose Enable verbose mode\n"
            << "-V,--version Show dnsdist version information and exit\n";
}
2031
2032 int main(int argc, char** argv)
2033 try
2034 {
2035 size_t udpBindsCount = 0;
2036 size_t tcpBindsCount = 0;
2037 rl_attempted_completion_function = my_completion;
2038 rl_completion_append_character = 0;
2039
2040 signal(SIGPIPE, SIG_IGN);
2041 signal(SIGCHLD, SIG_IGN);
2042 openlog("dnsdist", LOG_PID, LOG_DAEMON);
2043 g_console=true;
2044
2045 #ifdef HAVE_LIBSODIUM
2046 if (sodium_init() == -1) {
2047 cerr<<"Unable to initialize crypto library"<<endl;
2048 exit(EXIT_FAILURE);
2049 }
2050 g_hashperturb=randombytes_uniform(0xffffffff);
2051 srandom(randombytes_uniform(0xffffffff));
2052 #else
2053 {
2054 struct timeval tv;
2055 gettimeofday(&tv, 0);
2056 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2057 g_hashperturb=random();
2058 }
2059
2060 #endif
2061 ComboAddress clientAddress = ComboAddress();
2062 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2063 struct option longopts[]={
2064 {"acl", required_argument, 0, 'a'},
2065 {"config", required_argument, 0, 'C'},
2066 {"check-config", 0, 0, 1},
2067 {"execute", required_argument, 0, 'e'},
2068 {"client", 0, 0, 'c'},
2069 {"gid", required_argument, 0, 'g'},
2070 #ifdef HAVE_LIBSODIUM
2071 {"setkey", required_argument, 0, 'k'},
2072 #endif
2073 {"local", required_argument, 0, 'l'},
2074 {"supervised", 0, 0, 's'},
2075 {"disable-syslog", 0, 0, 2},
2076 {"uid", required_argument, 0, 'u'},
2077 {"verbose", 0, 0, 'v'},
2078 {"version", 0, 0, 'V'},
2079 {"help", 0, 0, 'h'},
2080 {0,0,0,0}
2081 };
2082 int longindex=0;
2083 string optstring;
2084 for(;;) {
2085 #ifdef HAVE_LIBSODIUM
2086 int c=getopt_long(argc, argv, "a:hce:C:k:l:v:g:u:V", longopts, &longindex);
2087 #else
2088 int c=getopt_long(argc, argv, "a:hce:C:l:v:g:u:V", longopts, &longindex);
2089 #endif
2090 if(c==-1)
2091 break;
2092 switch(c) {
2093 case 1:
2094 g_cmdLine.checkConfig=true;
2095 break;
2096 case 2:
2097 g_syslog=false;
2098 break;
2099 case 'C':
2100 g_cmdLine.config=optarg;
2101 break;
2102 case 'c':
2103 g_cmdLine.beClient=true;
2104 break;
2105 case 'e':
2106 g_cmdLine.command=optarg;
2107 break;
2108 case 'g':
2109 g_cmdLine.gid=optarg;
2110 break;
2111 case 'h':
2112 cout<<"dnsdist "<<VERSION<<endl;
2113 usage();
2114 cout<<"\n";
2115 exit(EXIT_SUCCESS);
2116 break;
2117 case 'a':
2118 optstring=optarg;
2119 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2120 break;
2121 #ifdef HAVE_LIBSODIUM
2122 case 'k':
2123 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2124 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2125 exit(EXIT_FAILURE);
2126 }
2127 break;
2128 #endif
2129 case 'l':
2130 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2131 break;
2132 case 's':
2133 g_cmdLine.beSupervised=true;
2134 break;
2135 case 'u':
2136 g_cmdLine.uid=optarg;
2137 break;
2138 case 'v':
2139 g_verbose=true;
2140 break;
2141 case 'V':
2142 #ifdef LUAJIT_VERSION
2143 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2144 #else
2145 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2146 #endif
2147 cout<<"Enabled features: ";
2148 #ifdef HAVE_DNS_OVER_TLS
2149 cout<<"dns-over-tls(";
2150 #ifdef HAVE_GNUTLS
2151 cout<<"gnutls";
2152 #ifdef HAVE_LIBSSL
2153 cout<<" ";
2154 #endif
2155 #endif
2156 #ifdef HAVE_LIBSSL
2157 cout<<"openssl";
2158 #endif
2159 cout<<") ";
2160 #endif
2161 #ifdef HAVE_DNSCRYPT
2162 cout<<"dnscrypt ";
2163 #endif
2164 #ifdef HAVE_EBPF
2165 cout<<"ebpf ";
2166 #endif
2167 #ifdef HAVE_FSTRM
2168 cout<<"fstrm ";
2169 #endif
2170 #ifdef HAVE_LIBSODIUM
2171 cout<<"libsodium ";
2172 #endif
2173 #ifdef HAVE_PROTOBUF
2174 cout<<"protobuf ";
2175 #endif
2176 #ifdef HAVE_RE2
2177 cout<<"re2 ";
2178 #endif
2179 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2180 cout<<"recvmmsg/sendmmsg ";
2181 #endif
2182 #ifdef HAVE_NET_SNMP
2183 cout<<"snmp ";
2184 #endif
2185 #ifdef HAVE_SYSTEMD
2186 cout<<"systemd";
2187 #endif
2188 cout<<endl;
2189 exit(EXIT_SUCCESS);
2190 break;
2191 case '?':
2192 //getopt_long printed an error message.
2193 usage();
2194 exit(EXIT_FAILURE);
2195 break;
2196 }
2197 }
2198
2199 argc-=optind;
2200 argv+=optind;
2201 for(auto p = argv; *p; ++p) {
2202 if(g_cmdLine.beClient) {
2203 clientAddress = ComboAddress(*p, 5199);
2204 } else {
2205 g_cmdLine.remotes.push_back(*p);
2206 }
2207 }
2208
2209 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2210
2211 g_policy.setState(leastOutstandingPol);
2212 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2213 setupLua(true, g_cmdLine.config);
2214 if (clientAddress != ComboAddress())
2215 g_serverControl = clientAddress;
2216 doClient(g_serverControl, g_cmdLine.command);
2217 _exit(EXIT_SUCCESS);
2218 }
2219
2220 auto acl = g_ACL.getCopy();
2221 if(acl.empty()) {
2222 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2223 acl.addMask(addr);
2224 g_ACL.setState(acl);
2225 }
2226
2227 auto consoleACL = g_consoleACL.getCopy();
2228 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2229 consoleACL.addMask(mask);
2230 }
2231 g_consoleACL.setState(consoleACL);
2232
2233 if (g_cmdLine.checkConfig) {
2234 setupLua(true, g_cmdLine.config);
2235 // No exception was thrown
2236 infolog("Configuration '%s' OK!", g_cmdLine.config);
2237 _exit(EXIT_SUCCESS);
2238 }
2239
2240 auto todo=setupLua(false, g_cmdLine.config);
2241
2242 if(g_cmdLine.locals.size()) {
2243 g_locals.clear();
2244 for(auto loc : g_cmdLine.locals)
2245 g_locals.push_back(std::make_tuple(ComboAddress(loc, 53), true, false, 0, "", std::set<int>()));
2246 }
2247
2248 if(g_locals.empty())
2249 g_locals.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set<int>()));
2250
2251 g_configurationDone = true;
2252
2253 vector<ClientState*> toLaunch;
2254 for(const auto& local : g_locals) {
2255 ClientState* cs = new ClientState;
2256 cs->local= std::get<0>(local);
2257 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2258 if(cs->local.sin4.sin_family == AF_INET6) {
2259 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2260 }
2261 //if(g_vm.count("bind-non-local"))
2262 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2263
2264 // if (!setSocketTimestamps(cs->udpFD))
2265 // g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2266
2267
2268 if(IsAnyAddress(cs->local)) {
2269 int one=1;
2270 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2271 #ifdef IPV6_RECVPKTINFO
2272 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2273 #endif
2274 }
2275
2276 if (std::get<2>(local)) {
2277 #ifdef SO_REUSEPORT
2278 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2279 #else
2280 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2281 #endif
2282 }
2283
2284 const std::string& itf = std::get<4>(local);
2285 if (!itf.empty()) {
2286 #ifdef SO_BINDTODEVICE
2287 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2288 if (res != 0) {
2289 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2290 }
2291 #else
2292 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2293 #endif
2294 }
2295
2296 #ifdef HAVE_EBPF
2297 if (g_defaultBPFFilter) {
2298 cs->attachFilter(g_defaultBPFFilter);
2299 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs->local.toStringWithPort());
2300 }
2301 #endif /* HAVE_EBPF */
2302
2303 cs->cpus = std::get<5>(local);
2304
2305 SBind(cs->udpFD, cs->local);
2306 toLaunch.push_back(cs);
2307 g_frontends.push_back(cs);
2308 udpBindsCount++;
2309 }
2310
2311 for(const auto& local : g_locals) {
2312 if(!std::get<1>(local)) { // no TCP/IP
2313 warnlog("Not providing TCP/IP service on local address '%s'", std::get<0>(local).toStringWithPort());
2314 continue;
2315 }
2316 ClientState* cs = new ClientState;
2317 cs->local= std::get<0>(local);
2318
2319 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2320
2321 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2322 #ifdef TCP_DEFER_ACCEPT
2323 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2324 #endif
2325 if (std::get<3>(local) > 0) {
2326 #ifdef TCP_FASTOPEN
2327 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(local));
2328 #else
2329 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2330 #endif
2331 }
2332 if(cs->local.sin4.sin_family == AF_INET6) {
2333 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2334 }
2335 #ifdef SO_REUSEPORT
2336 /* no need to warn again if configured but support is not available, we already did for UDP */
2337 if (std::get<2>(local)) {
2338 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2339 }
2340 #endif
2341
2342 const std::string& itf = std::get<4>(local);
2343 if (!itf.empty()) {
2344 #ifdef SO_BINDTODEVICE
2345 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2346 if (res != 0) {
2347 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2348 }
2349 #else
2350 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2351 #endif
2352 }
2353
2354 #ifdef HAVE_EBPF
2355 if (g_defaultBPFFilter) {
2356 cs->attachFilter(g_defaultBPFFilter);
2357 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs->local.toStringWithPort());
2358 }
2359 #endif /* HAVE_EBPF */
2360
2361 // if(g_vm.count("bind-non-local"))
2362 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2363 SBind(cs->tcpFD, cs->local);
2364 SListen(cs->tcpFD, 64);
2365 warnlog("Listening on %s", cs->local.toStringWithPort());
2366
2367 toLaunch.push_back(cs);
2368 g_frontends.push_back(cs);
2369 tcpBindsCount++;
2370 }
2371
2372 #ifdef HAVE_DNSCRYPT
2373 for(auto& dcLocal : g_dnsCryptLocals) {
2374 ClientState* cs = new ClientState;
2375 cs->local = std::get<0>(dcLocal);
2376 cs->dnscryptCtx = std::get<1>(dcLocal);
2377 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2378 if(cs->local.sin4.sin_family == AF_INET6) {
2379 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2380 }
2381 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2382 if(IsAnyAddress(cs->local)) {
2383 int one=1;
2384 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2385 #ifdef IPV6_RECVPKTINFO
2386 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2387 #endif
2388 }
2389 if (std::get<2>(dcLocal)) {
2390 #ifdef SO_REUSEPORT
2391 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2392 #else
2393 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2394 #endif
2395 }
2396
2397 const std::string& itf = std::get<4>(dcLocal);
2398 if (!itf.empty()) {
2399 #ifdef SO_BINDTODEVICE
2400 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2401 if (res != 0) {
2402 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2403 }
2404 #else
2405 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2406 #endif
2407 }
2408
2409 #ifdef HAVE_EBPF
2410 if (g_defaultBPFFilter) {
2411 cs->attachFilter(g_defaultBPFFilter);
2412 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs->local.toStringWithPort());
2413 }
2414 #endif /* HAVE_EBPF */
2415 SBind(cs->udpFD, cs->local);
2416 toLaunch.push_back(cs);
2417 g_frontends.push_back(cs);
2418 udpBindsCount++;
2419
2420 cs = new ClientState;
2421 cs->local = std::get<0>(dcLocal);
2422 cs->dnscryptCtx = std::get<1>(dcLocal);
2423 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2424 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2425 #ifdef TCP_DEFER_ACCEPT
2426 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2427 #endif
2428 if (std::get<3>(dcLocal) > 0) {
2429 #ifdef TCP_FASTOPEN
2430 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(dcLocal));
2431 #else
2432 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2433 #endif
2434 }
2435
2436 #ifdef SO_REUSEPORT
2437 /* no need to warn again if configured but support is not available, we already did for UDP */
2438 if (std::get<2>(dcLocal)) {
2439 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2440 }
2441 #endif
2442
2443 if (!itf.empty()) {
2444 #ifdef SO_BINDTODEVICE
2445 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2446 if (res != 0) {
2447 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2448 }
2449 #else
2450 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2451 #endif
2452 }
2453
2454 if(cs->local.sin4.sin_family == AF_INET6) {
2455 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2456 }
2457 #ifdef HAVE_EBPF
2458 if (g_defaultBPFFilter) {
2459 cs->attachFilter(g_defaultBPFFilter);
2460 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs->local.toStringWithPort());
2461 }
2462 #endif /* HAVE_EBPF */
2463
2464 cs->cpus = std::get<5>(dcLocal);
2465
2466 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2467 SBind(cs->tcpFD, cs->local);
2468 SListen(cs->tcpFD, 64);
2469 warnlog("Listening on %s", cs->local.toStringWithPort());
2470 toLaunch.push_back(cs);
2471 g_frontends.push_back(cs);
2472 tcpBindsCount++;
2473 }
2474 #endif
2475
2476 for(auto& frontend : g_tlslocals) {
2477 ClientState* cs = new ClientState;
2478 cs->local = frontend->d_addr;
2479 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2480 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2481 #ifdef TCP_DEFER_ACCEPT
2482 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2483 #endif
2484 if (frontend->d_tcpFastOpenQueueSize > 0) {
2485 #ifdef TCP_FASTOPEN
2486 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, frontend->d_tcpFastOpenQueueSize);
2487 #else
2488 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2489 #endif
2490 }
2491 if (frontend->d_reusePort) {
2492 #ifdef SO_REUSEPORT
2493 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2494 #else
2495 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs.local.toStringWithPort());
2496 #endif
2497 }
2498 if(cs->local.sin4.sin_family == AF_INET6) {
2499 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2500 }
2501
2502 if (!frontend->d_interface.empty()) {
2503 #ifdef SO_BINDTODEVICE
2504 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, frontend->d_interface.c_str(), frontend->d_interface.length());
2505 if (res != 0) {
2506 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), strerror(errno));
2507 }
2508 #else
2509 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2510 #endif
2511 }
2512
2513 cs->cpus = frontend->d_cpus;
2514
2515 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2516 if (frontend->setupTLS()) {
2517 cs->tlsFrontend = frontend;
2518 SBind(cs->tcpFD, cs->local);
2519 SListen(cs->tcpFD, 64);
2520 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2521 toLaunch.push_back(cs);
2522 g_frontends.push_back(cs);
2523 tcpBindsCount++;
2524 }
2525 else {
2526 delete cs;
2527 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2528 _exit(EXIT_FAILURE);
2529 }
2530 }
2531
2532 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2533 vector<string> vec;
2534 std::string acls;
2535 g_ACL.getLocal()->toStringVector(&vec);
2536 for(const auto& s : vec) {
2537 if (!acls.empty())
2538 acls += ", ";
2539 acls += s;
2540 }
2541 infolog("ACL allowing queries from: %s", acls.c_str());
2542 vec.clear();
2543 acls.clear();
2544 g_consoleACL.getLocal()->toStringVector(&vec);
2545 for (const auto& entry : vec) {
2546 if (!acls.empty()) {
2547 acls += ", ";
2548 }
2549 acls += entry;
2550 }
2551 infolog("Console ACL allowing connections from: %s", acls.c_str());
2552
2553 uid_t newgid=0;
2554 gid_t newuid=0;
2555
2556 if(!g_cmdLine.gid.empty())
2557 newgid = strToGID(g_cmdLine.gid.c_str());
2558
2559 if(!g_cmdLine.uid.empty())
2560 newuid = strToUID(g_cmdLine.uid.c_str());
2561
2562 dropGroupPrivs(newgid);
2563 dropUserPrivs(newuid);
2564
2565 /* this need to be done _after_ dropping privileges */
2566 g_delay = new DelayPipe<DelayedPacket>();
2567
2568 if (g_snmpAgent) {
2569 g_snmpAgent->run();
2570 }
2571
2572 g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads, g_useTCPSinglePipe);
2573
2574 for(auto& t : todo)
2575 t();
2576
2577 auto localPools = g_pools.getCopy();
2578 /* create the default pool no matter what */
2579 createPoolIfNotExists(localPools, "");
2580 if(g_cmdLine.remotes.size()) {
2581 for(const auto& address : g_cmdLine.remotes) {
2582 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2583 addServerToPool(localPools, "", ret);
2584 if (ret->connected) {
2585 ret->tid = thread(responderThread, ret);
2586 }
2587 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2588 }
2589 }
2590 g_pools.setState(localPools);
2591
2592 if(g_dstates.getLocal()->empty()) {
2593 errlog("No downstream servers defined: all packets will get dropped");
2594 // you might define them later, but you need to know
2595 }
2596
2597 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2598
2599 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2600 if(dss->availability==DownstreamState::Availability::Auto) {
2601 bool newState=upCheck(*dss);
2602 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2603 dss->upStatus = newState;
2604 }
2605 }
2606
2607 for(auto& cs : toLaunch) {
2608 if (cs->udpFD >= 0) {
2609 thread t1(udpClientThread, cs);
2610 if (!cs->cpus.empty()) {
2611 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2612 }
2613 t1.detach();
2614 }
2615 else if (cs->tcpFD >= 0) {
2616 thread t1(tcpAcceptorThread, cs);
2617 if (!cs->cpus.empty()) {
2618 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2619 }
2620 t1.detach();
2621 }
2622 }
2623
2624 thread carbonthread(carbonDumpThread);
2625 carbonthread.detach();
2626
2627 thread stattid(maintThread);
2628 stattid.detach();
2629
2630 thread healththread(healthChecksThread);
2631
2632 if(g_cmdLine.beSupervised) {
2633 #ifdef HAVE_SYSTEMD
2634 sd_notify(0, "READY=1");
2635 #endif
2636 healththread.join();
2637 }
2638 else {
2639 healththread.detach();
2640 doConsole();
2641 }
2642 _exit(EXIT_SUCCESS);
2643
2644 }
2645 catch(const LuaContext::ExecutionErrorException& e) {
2646 try {
2647 errlog("Fatal Lua error: %s", e.what());
2648 std::rethrow_if_nested(e);
2649 } catch(const std::exception& ne) {
2650 errlog("Details: %s", ne.what());
2651 }
2652 catch(PDNSException &ae)
2653 {
2654 errlog("Fatal pdns error: %s", ae.reason);
2655 }
2656 _exit(EXIT_FAILURE);
2657 }
2658 catch(std::exception &e)
2659 {
2660 errlog("Fatal error: %s", e.what());
2661 _exit(EXIT_FAILURE);
2662 }
2663 catch(PDNSException &ae)
2664 {
2665 errlog("Fatal pdns error: %s", ae.reason);
2666 _exit(EXIT_FAILURE);
2667 }