]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
dnsdist: Add DNSQuestion:getEDNSOptions() to access incoming EDNS options
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50
51 #include "base64.hh"
52 #include "delaypipe.hh"
53 #include "dolog.hh"
54 #include "dnsname.hh"
55 #include "dnsparser.hh"
56 #include "dnswriter.hh"
57 #include "ednsoptions.hh"
58 #include "gettime.hh"
59 #include "lock.hh"
60 #include "misc.hh"
61 #include "sodcrypto.hh"
62 #include "sstuff.hh"
63 #include "xpf.hh"
64
65 thread_local boost::uuids::random_generator t_uuidGenerator;
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
79 using std::atomic;
80 using std::thread;
81 bool g_verbose;
82
83 struct DNSDistStats g_stats;
84 MetricDefinitionStorage g_metricDefinitions;
85
86 uint16_t g_maxOutstanding{10240};
87 bool g_verboseHealthChecks{false};
88 uint32_t g_staleCacheEntriesTTL{0};
89 bool g_syslog{true};
90
91 GlobalStateHolder<NetmaskGroup> g_ACL;
92 string g_outputBuffer;
93
94 vector<std::tuple<ComboAddress, bool, bool, int, string, std::set<int>>> g_locals;
95 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
96 #ifdef HAVE_DNSCRYPT
97 std::vector<std::tuple<ComboAddress,std::shared_ptr<DNSCryptContext>,bool, int, string, std::set<int> >> g_dnsCryptLocals;
98 #endif
99 #ifdef HAVE_EBPF
100 shared_ptr<BPFFilter> g_defaultBPFFilter;
101 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
102 #endif /* HAVE_EBPF */
103 vector<ClientState *> g_frontends;
104 GlobalStateHolder<pools_t> g_pools;
105 size_t g_udpVectorSize{1};
106
107 bool g_snmpEnabled{false};
108 bool g_snmpTrapsEnabled{false};
109 DNSDistSNMPAgent* g_snmpAgent{nullptr};
110
111 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
112 Then we have a bunch of connected sockets for talking to downstream servers.
113 We send directly to those sockets.
114
115 For the return path, per downstream server we have a thread that listens to responses.
116
117 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
118 there the original requestor and the original id. The new ID is the offset in the array.
119
120 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
121 original requestor.
122
123 IDs are assigned by atomic increments of the socket offset.
124 */
125
126 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
128 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
129 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
130
131 Rings g_rings;
132 QueryCount g_qcount;
133
134 GlobalStateHolder<servers_t> g_dstates;
135 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
136 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
137 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
138 int g_tcpRecvTimeout{2};
139 int g_tcpSendTimeout{2};
140 int g_udpTimeout{2};
141
142 bool g_servFailOnNoPolicy{false};
143 bool g_truncateTC{false};
144 bool g_fixupCase{0};
145
146 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
147 try
148 {
149 bool hadEDNS = false;
150 uint16_t payloadSize = 0;
151 uint16_t z = 0;
152
153 if (g_addEDNSToSelfGeneratedResponses) {
154 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
155 }
156
157 *len=(uint16_t) (sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
158 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
159 dh->ancount = dh->arcount = dh->nscount = 0;
160
161 if (hadEDNS) {
162 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize);
163 }
164 }
165 catch(...)
166 {
167 g_stats.truncFail++;
168 }
169
170 struct DelayedPacket
171 {
172 int fd;
173 string packet;
174 ComboAddress destination;
175 ComboAddress origDest;
176 void operator()()
177 {
178 ssize_t res;
179 if(origDest.sin4.sin_family == 0) {
180 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
181 }
182 else {
183 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
184 }
185 if (res == -1) {
186 int err = errno;
187 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
188 }
189 }
190 };
191
192 DelayPipe<DelayedPacket> * g_delay = 0;
193
194 void doLatencyStats(double udiff)
195 {
196 if(udiff < 1000) g_stats.latency0_1++;
197 else if(udiff < 10000) g_stats.latency1_10++;
198 else if(udiff < 50000) g_stats.latency10_50++;
199 else if(udiff < 100000) g_stats.latency50_100++;
200 else if(udiff < 1000000) g_stats.latency100_1000++;
201 else g_stats.latencySlow++;
202
203 auto doAvg = [](double& var, double n, double weight) {
204 var = (weight -1) * var/weight + n/weight;
205 };
206
207 doAvg(g_stats.latencyAvg100, udiff, 100);
208 doAvg(g_stats.latencyAvg1000, udiff, 1000);
209 doAvg(g_stats.latencyAvg10000, udiff, 10000);
210 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
211 }
212
213 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
214 {
215 uint16_t rqtype, rqclass;
216 DNSName rqname;
217 const struct dnsheader* dh = (struct dnsheader*) response;
218
219 if (responseLen < sizeof(dnsheader)) {
220 return false;
221 }
222
223 if (dh->qdcount == 0) {
224 if (dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) {
225 return true;
226 }
227 else {
228 g_stats.nonCompliantResponses++;
229 return false;
230 }
231 }
232
233 try {
234 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
235 }
236 catch(std::exception& e) {
237 if(responseLen > (ssize_t)sizeof(dnsheader))
238 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
239 g_stats.nonCompliantResponses++;
240 return false;
241 }
242
243 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
244 return false;
245 }
246
247 return true;
248 }
249
250 void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
251 {
252 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
253 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
254 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
255 uint16_t * flags = getFlagsFromDNSHeader(dh);
256 /* clear the flags we are about to restore */
257 *flags &= restoreFlagsMask;
258 /* only keep the flags we want to restore */
259 origFlags &= ~restoreFlagsMask;
260 /* set the saved flags as they were */
261 *flags |= origFlags;
262 }
263
264 bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
265 {
266 restoreFlags(dq.dh, origFlags);
267
268 return addEDNSToQueryTurnedResponse(dq);
269 }
270
271 bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom)
272 {
273 struct dnsheader* dh = (struct dnsheader*) *response;
274
275 if (*responseLen < sizeof(dnsheader)) {
276 return false;
277 }
278
279 restoreFlags(dh, origFlags);
280
281 if (*responseLen == sizeof(dnsheader)) {
282 return true;
283 }
284
285 if(g_fixupCase) {
286 string realname = qname.toDNSString();
287 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
288 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
289 }
290 }
291
292 if (ednsAdded || ecsAdded) {
293 uint16_t optStart;
294 size_t optLen = 0;
295 bool last = false;
296
297 const std::string responseStr(*response, *responseLen);
298 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
299
300 if (res == 0) {
301 if (ednsAdded) {
302 /* we added the entire OPT RR,
303 therefore we need to remove it entirely */
304 if (last) {
305 /* simply remove the last AR */
306 *responseLen -= optLen;
307 uint16_t arcount = ntohs(dh->arcount);
308 arcount--;
309 dh->arcount = htons(arcount);
310 }
311 else {
312 /* Removing an intermediary RR could lead to compression error */
313 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
314 *responseLen = rewrittenResponse.size();
315 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
316 rewrittenResponse.reserve(*responseLen + addRoom);
317 }
318 *responseSize = rewrittenResponse.capacity();
319 *response = reinterpret_cast<char*>(rewrittenResponse.data());
320 }
321 else {
322 warnlog("Error rewriting content");
323 }
324 }
325 }
326 else {
327 /* the OPT RR was already present, but without ECS,
328 we need to remove the ECS option if any */
329 if (last) {
330 /* nothing after the OPT RR, we can simply remove the
331 ECS option */
332 size_t existingOptLen = optLen;
333 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
334 *responseLen -= (existingOptLen - optLen);
335 }
336 else {
337 /* Removing an intermediary RR could lead to compression error */
338 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
339 *responseLen = rewrittenResponse.size();
340 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
341 rewrittenResponse.reserve(*responseLen + addRoom);
342 }
343 *responseSize = rewrittenResponse.capacity();
344 *response = reinterpret_cast<char*>(rewrittenResponse.data());
345 }
346 else {
347 warnlog("Error rewriting content");
348 }
349 }
350 }
351 }
352 }
353
354 return true;
355 }
356
#ifdef HAVE_DNSCRYPT
/* Encrypt `response` in place when the query came in over DNSCrypt.

   Without a `dnsCryptQuery` this is a no-op returning true. Otherwise,
   if `dh`, `*dh` and `dhCopy` are all non-null, the cleartext header is
   first copied to `dhCopy` and `*dh` is redirected to that copy, since
   the buffer is about to be overwritten. On success `*responseLen` is set
   to the encrypted length and true is returned; on failure the response
   must be dropped and false is returned. */
bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    return true;
  }

  /* keep the cleartext header around before it gets encrypted */
  const bool canSaveHeader = dh != nullptr && *dh != nullptr && dhCopy != nullptr;
  if (canSaveHeader) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedResponseLen = 0;
  const int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
  if (res != 0) {
    /* dropping response */
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedResponseLen;
  return true;
}
#endif
381
382 static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
383 {
384 if(delayMsec && g_delay) {
385 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
386 g_delay->submit(dp, delayMsec);
387 }
388 else {
389 ssize_t res;
390 if(origDest.sin4.sin_family == 0) {
391 res = sendto(origFD, response, responseLen, 0, (struct sockaddr*)&origRemote, origRemote.getSocklen());
392 }
393 else {
394 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
395 }
396 if (res == -1) {
397 int err = errno;
398 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
399 }
400 }
401
402 return true;
403 }
404
405
406 static int pickBackendSocketForSending(DownstreamState* state)
407 {
408 return state->sockets[state->socketsOffset++ % state->sockets.size()];
409 }
410
411 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
412 {
413 ready.clear();
414
415 if (state->sockets.size() == 1) {
416 ready.push_back(state->sockets[0]);
417 return ;
418 }
419
420 {
421 std::lock_guard<std::mutex> lock(state->socketsLock);
422 state->mplexer->getAvailableFDs(ready, -1);
423 }
424 }
425
426 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
427 void* responderThread(std::shared_ptr<DownstreamState> dss)
428 try {
429 auto localRespRulactions = g_resprulactions.getLocal();
430 #ifdef HAVE_DNSCRYPT
431 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
432 /* when the answer is encrypted in place, we need to get a copy
433 of the original header before encryption to fill the ring buffer */
434 dnsheader dhCopy;
435 #else
436 char packet[4096];
437 #endif
438 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
439 vector<uint8_t> rewrittenResponse;
440
441 uint16_t queryId = 0;
442 std::vector<int> sockets;
443 sockets.reserve(dss->sockets.size());
444
445 for(;;) {
446 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
447 try {
448 pickBackendSocketsReadyForReceiving(dss, sockets);
449 for (const auto& fd : sockets) {
450 ssize_t got = recv(fd, packet, sizeof(packet), 0);
451 char * response = packet;
452 size_t responseSize = sizeof(packet);
453
454 if (got < (ssize_t) sizeof(dnsheader))
455 continue;
456
457 uint16_t responseLen = (uint16_t) got;
458 queryId = dh->id;
459
460 if(queryId >= dss->idStates.size())
461 continue;
462
463 IDState* ids = &dss->idStates[queryId];
464 int origFD = ids->origFD;
465
466 if(origFD < 0) // duplicate
467 continue;
468
469 /* setting age to 0 to prevent the maintainer thread from
470 cleaning this IDS while we process the response.
471 We have already a copy of the origFD, so it would
472 mostly mess up the outstanding counter.
473 */
474 ids->age = 0;
475
476 unsigned int consumed = 0;
477 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
478 continue;
479 }
480
481 int oldFD = ids->origFD.exchange(-1);
482 if (oldFD == origFD) {
483 /* we only decrement the outstanding counter if the value was not
484 altered in the meantime, which would mean that the state has been actively reused
485 and the other thread has not incremented the outstanding counter, so we don't
486 want it to be decremented twice. */
487 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
488 }
489
490 if(dh->tc && g_truncateTC) {
491 truncateTC(response, &responseLen, responseSize, consumed);
492 }
493
494 dh->id = ids->origID;
495
496 uint16_t addRoom = 0;
497 DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, consumed, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
498 #ifdef HAVE_PROTOBUF
499 dr.uniqueId = ids->uniqueId;
500 #endif
501 dr.qTag = ids->qTag;
502
503 if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
504 continue;
505 }
506
507 #ifdef HAVE_DNSCRYPT
508 if (ids->dnsCryptQuery) {
509 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
510 }
511 #endif
512 if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
513 continue;
514 }
515
516 if (ids->packetCache && !ids->skipCache) {
517 ids->packetCache->insert(ids->cacheKey, ids->subnet, ids->origFlags, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode, ids->tempFailureTTL);
518 }
519
520 if (ids->cs && !ids->cs->muted) {
521 #ifdef HAVE_DNSCRYPT
522 if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
523 continue;
524 }
525 #endif
526
527 ComboAddress empty;
528 empty.sin4.sin_family = 0;
529 /* if ids->destHarvested is false, origDest holds the listening address.
530 We don't want to use that as a source since it could be 0.0.0.0 for example. */
531 sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
532 }
533
534 g_stats.responses++;
535
536 double udiff = ids->sentTime.udiff();
537 vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
538
539 struct timespec ts;
540 gettime(&ts);
541 g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote);
542
543 if(dh->rcode == RCode::ServFail)
544 g_stats.servfailResponses++;
545 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
546
547 doLatencyStats(udiff);
548
549 /* if the FD is not -1, the state has been actively reused and we should
550 not alter anything */
551 if (ids->origFD == -1) {
552 #ifdef HAVE_DNSCRYPT
553 ids->dnsCryptQuery = nullptr;
554 #endif
555 }
556
557 rewrittenResponse.clear();
558 }
559 }
560 catch(const std::exception& e){
561 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
562 }
563 }
564 return 0;
565 }
566 catch(const std::exception& e)
567 {
568 errlog("UDP responder thread died because of exception: %s", e.what());
569 return 0;
570 }
571 catch(const PDNSException& e)
572 {
573 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
574 return 0;
575 }
576 catch(...)
577 {
578 errlog("UDP responder thread died because of an exception: %s", "unknown");
579 return 0;
580 }
581
582 bool DownstreamState::reconnect()
583 {
584 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
585 if (!tl.owns_lock()) {
586 /* we are already reconnecting */
587 return false;
588 }
589
590 connected = false;
591 for (auto& fd : sockets) {
592 if (fd != -1) {
593 if (sockets.size() > 1) {
594 std::lock_guard<std::mutex> lock(socketsLock);
595 mplexer->removeReadFD(fd);
596 }
597 /* shutdown() is needed to wake up recv() in the responderThread */
598 shutdown(fd, SHUT_RDWR);
599 close(fd);
600 fd = -1;
601 }
602 if (!IsAnyAddress(remote)) {
603 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
604 if (!IsAnyAddress(sourceAddr)) {
605 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
606 SBind(fd, sourceAddr);
607 }
608 try {
609 SConnect(fd, remote);
610 if (sockets.size() > 1) {
611 std::lock_guard<std::mutex> lock(socketsLock);
612 mplexer->addReadFD(fd, [](int, boost::any) {});
613 }
614 connected = true;
615 }
616 catch(const std::runtime_error& error) {
617 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
618 connected = false;
619 break;
620 }
621 }
622 }
623
624 /* if at least one (re-)connection failed, close all sockets */
625 if (!connected) {
626 for (auto& fd : sockets) {
627 if (fd != -1) {
628 if (sockets.size() > 1) {
629 std::lock_guard<std::mutex> lock(socketsLock);
630 mplexer->removeReadFD(fd);
631 }
632 /* shutdown() is needed to wake up recv() in the responderThread */
633 shutdown(fd, SHUT_RDWR);
634 close(fd);
635 fd = -1;
636 }
637 }
638 }
639
640 return connected;
641 }
642 void DownstreamState::hash()
643 {
644 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
645 auto w = weight;
646 WriteLock wl(&d_lock);
647 hashes.clear();
648 while (w > 0) {
649 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
650 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
651 hashes.insert(wshash);
652 --w;
653 }
654 }
655
656 void DownstreamState::setId(const boost::uuids::uuid& newId)
657 {
658 id = newId;
659 // compute hashes only if already done
660 if (!hashes.empty()) {
661 hash();
662 }
663 }
664
665 void DownstreamState::setWeight(int newWeight)
666 {
667 if (newWeight < 1) {
668 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
669 return ;
670 }
671 weight = newWeight;
672 if (!hashes.empty()) {
673 hash();
674 }
675 }
676
677 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
678 {
679 pthread_rwlock_init(&d_lock, nullptr);
680 id = t_uuidGenerator();
681 threadStarted.clear();
682
683 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
684
685 sockets.resize(numberOfSockets);
686 for (auto& fd : sockets) {
687 fd = -1;
688 }
689
690 if (!IsAnyAddress(remote)) {
691 reconnect();
692 idStates.resize(g_maxOutstanding);
693 sw.start();
694 infolog("Added downstream server %s", remote.toStringWithPort());
695 }
696
697 }
698
699 std::mutex g_luamutex;
700 LuaContext g_lua;
701
702 GlobalStateHolder<ServerPolicy> g_policy;
703
704 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
705 {
706 for(auto& d : servers) {
707 if(d.second->isUp() && d.second->qps.check())
708 return d.second;
709 }
710 return leastOutstanding(servers, dq);
711 }
712
713 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
714 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
715 {
716 if (servers.size() == 1 && servers[0].second->isUp()) {
717 return servers[0].second;
718 }
719
720 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
721 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
722 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
723 poss.reserve(servers.size());
724 for(auto& d : servers) {
725 if(d.second->isUp()) {
726 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
727 }
728 }
729 if(poss.empty())
730 return shared_ptr<DownstreamState>();
731 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
732 return poss.begin()->second;
733 }
734
735 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
736 {
737 vector<pair<int, shared_ptr<DownstreamState>>> poss;
738 int sum = 0;
739 int max = std::numeric_limits<int>::max();
740
741 for(auto& d : servers) { // w=1, w=10 -> 1, 11
742 if(d.second->isUp()) {
743 // Don't overflow sum when adding high weights
744 if(d.second->weight > max - sum) {
745 sum = max;
746 } else {
747 sum += d.second->weight;
748 }
749
750 poss.push_back({sum, d.second});
751 }
752 }
753
754 // Catch poss & sum are empty to avoid SIGFPE
755 if(poss.empty())
756 return shared_ptr<DownstreamState>();
757
758 int r = val % sum;
759 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
760 if(p==poss.end())
761 return shared_ptr<DownstreamState>();
762 return p->second;
763 }
764
765 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
766 {
767 return valrandom(random(), servers, dq);
768 }
769
770 uint32_t g_hashperturb;
771 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
772 {
773 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
774 }
775
776 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
777 {
778 unsigned int qhash = dq->qname->hash(g_hashperturb);
779 unsigned int sel = std::numeric_limits<unsigned int>::max();
780 unsigned int min = std::numeric_limits<unsigned int>::max();
781 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
782
783 for (const auto& d: servers) {
784 if (d.second->isUp()) {
785 // make sure hashes have been computed
786 if (d.second->hashes.empty()) {
787 d.second->hash();
788 }
789 {
790 ReadLock rl(&(d.second->d_lock));
791 const auto& server = d.second;
792 // we want to keep track of the last hash
793 if (min > *(server->hashes.begin())) {
794 min = *(server->hashes.begin());
795 first = server;
796 }
797
798 auto hash_it = server->hashes.lower_bound(qhash);
799 if (hash_it != server->hashes.end()) {
800 if (*hash_it < sel) {
801 sel = *hash_it;
802 ret = server;
803 }
804 }
805 }
806 }
807 }
808 if (ret != nullptr) {
809 return ret;
810 }
811 if (first != nullptr) {
812 return first;
813 }
814 return shared_ptr<DownstreamState>();
815 }
816
817 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
818 {
819 NumberedServerVector poss;
820
821 for(auto& d : servers) {
822 if(d.second->isUp()) {
823 poss.push_back(d);
824 }
825 }
826
827 const auto *res=&poss;
828 if(poss.empty())
829 res = &servers;
830
831 if(res->empty())
832 return shared_ptr<DownstreamState>();
833
834 static unsigned int counter;
835
836 return (*res)[(counter++) % res->size()].second;
837 }
838
839 ComboAddress g_serverControl{"127.0.0.1:5199"};
840
841 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
842 {
843 std::shared_ptr<ServerPool> pool;
844 pools_t::iterator it = pools.find(poolName);
845 if (it != pools.end()) {
846 pool = it->second;
847 }
848 else {
849 if (!poolName.empty())
850 vinfolog("Creating pool %s", poolName);
851 pool = std::make_shared<ServerPool>();
852 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
853 }
854 return pool;
855 }
856
857 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
858 {
859 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
860 if (!poolName.empty()) {
861 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
862 } else {
863 vinfolog("Setting default pool server selection policy to %s", policy->name);
864 }
865 pool->policy = policy;
866 }
867
868 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
869 {
870 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
871 if (!poolName.empty()) {
872 vinfolog("Adding server to pool %s", poolName);
873 } else {
874 vinfolog("Adding server to default pool");
875 }
876 pool->addServer(server);
877 }
878
879 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
880 {
881 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
882
883 if (!poolName.empty()) {
884 vinfolog("Removing server from pool %s", poolName);
885 }
886 else {
887 vinfolog("Removing server from default pool");
888 }
889
890 pool->removeServer(server);
891 }
892
893 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
894 {
895 pools_t::const_iterator it = pools.find(poolName);
896
897 if (it == pools.end()) {
898 throw std::out_of_range("No pool named " + poolName);
899 }
900
901 return it->second;
902 }
903
904 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
905 {
906 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
907 return pool->getServers();
908 }
909
910 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
911 {
912 string result;
913
914 std::vector<std::string> addrs;
915 stringtok(addrs, spoofContent, " ,");
916
917 if (addrs.size() == 1) {
918 try {
919 ComboAddress spoofAddr(spoofContent);
920 SpoofAction sa({spoofAddr});
921 sa(&dq, &result);
922 }
923 catch(const PDNSException &e) {
924 SpoofAction sa(spoofContent); // CNAME then
925 sa(&dq, &result);
926 }
927 } else {
928 std::vector<ComboAddress> cas;
929 for (const auto& addr : addrs) {
930 try {
931 cas.push_back(ComboAddress(addr));
932 }
933 catch (...) {
934 }
935 }
936 SpoofAction sa(cas);
937 sa(&dq, &result);
938 }
939 }
940
941 bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
942 {
943 g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.qtype,dq.len,*dq.dh);
944
945 if(g_qcount.enabled) {
946 string qname = (*dq.qname).toString(".");
947 bool countQuery{true};
948 if(g_qcount.filter) {
949 std::lock_guard<std::mutex> lock(g_luamutex);
950 std::tie (countQuery, qname) = g_qcount.filter(dq);
951 }
952
953 if(countQuery) {
954 WriteLock wl(&g_qcount.queryLock);
955 if(!g_qcount.records.count(qname)) {
956 g_qcount.records[qname] = 0;
957 }
958 g_qcount.records[qname]++;
959 }
960 }
961
962 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
963 auto updateBlockStats = [&got]() {
964 g_stats.dynBlocked++;
965 got->second.blocks++;
966 };
967
968 if(now < got->second.until) {
969 DNSAction::Action action = got->second.action;
970 if (action == DNSAction::Action::None) {
971 action = g_dynBlockAction;
972 }
973 switch (action) {
974 case DNSAction::Action::NoOp:
975 /* do nothing */
976 break;
977 case DNSAction::Action::Refused:
978 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
979 updateBlockStats();
980
981 dq.dh->rcode = RCode::Refused;
982 dq.dh->qr=true;
983 return true;
984
985 case DNSAction::Action::Truncate:
986 if(!dq.tcp) {
987 updateBlockStats();
988 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
989 dq.dh->tc = true;
990 dq.dh->qr = true;
991 return true;
992 }
993 else {
994 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
995 }
996 break;
997 default:
998 updateBlockStats();
999 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1000 return false;
1001 }
1002 }
1003 }
1004
1005 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1006 auto updateBlockStats = [&got]() {
1007 g_stats.dynBlocked++;
1008 got->blocks++;
1009 };
1010
1011 if(now < got->until) {
1012 DNSAction::Action action = got->action;
1013 if (action == DNSAction::Action::None) {
1014 action = g_dynBlockAction;
1015 }
1016 switch (action) {
1017 case DNSAction::Action::NoOp:
1018 /* do nothing */
1019 break;
1020 case DNSAction::Action::Refused:
1021 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1022 updateBlockStats();
1023
1024 dq.dh->rcode = RCode::Refused;
1025 dq.dh->qr=true;
1026 return true;
1027 case DNSAction::Action::Truncate:
1028 if(!dq.tcp) {
1029 updateBlockStats();
1030
1031 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1032 dq.dh->tc = true;
1033 dq.dh->qr = true;
1034 return true;
1035 }
1036 else {
1037 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1038 }
1039 break;
1040 default:
1041 updateBlockStats();
1042 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1043 return false;
1044 }
1045 }
1046 }
1047
1048 DNSAction::Action action=DNSAction::Action::None;
1049 string ruleresult;
1050 for(const auto& lr : *holders.rulactions) {
1051 if(lr.d_rule->matches(&dq)) {
1052 lr.d_rule->d_matches++;
1053 action=(*lr.d_action)(&dq, &ruleresult);
1054
1055 switch(action) {
1056 case DNSAction::Action::Allow:
1057 return true;
1058 break;
1059 case DNSAction::Action::Drop:
1060 g_stats.ruleDrop++;
1061 return false;
1062 break;
1063 case DNSAction::Action::Nxdomain:
1064 dq.dh->rcode = RCode::NXDomain;
1065 dq.dh->qr=true;
1066 g_stats.ruleNXDomain++;
1067 return true;
1068 break;
1069 case DNSAction::Action::Refused:
1070 dq.dh->rcode = RCode::Refused;
1071 dq.dh->qr=true;
1072 g_stats.ruleRefused++;
1073 return true;
1074 break;
1075 case DNSAction::Action::ServFail:
1076 dq.dh->rcode = RCode::ServFail;
1077 dq.dh->qr=true;
1078 g_stats.ruleServFail++;
1079 return true;
1080 break;
1081 case DNSAction::Action::Spoof:
1082 spoofResponseFromString(dq, ruleresult);
1083 return true;
1084 break;
1085 case DNSAction::Action::Truncate:
1086 dq.dh->tc = true;
1087 dq.dh->qr = true;
1088 return true;
1089 break;
1090 case DNSAction::Action::HeaderModify:
1091 return true;
1092 break;
1093 case DNSAction::Action::Pool:
1094 poolname=ruleresult;
1095 return true;
1096 break;
1097 /* non-terminal actions follow */
1098 case DNSAction::Action::Delay:
1099 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1100 break;
1101 case DNSAction::Action::None:
1102 /* fall-through */
1103 case DNSAction::Action::NoOp:
1104 break;
1105 }
1106 }
1107 }
1108
1109 return true;
1110 }
1111
1112 bool processResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, int* delayMsec)
1113 {
1114 DNSResponseAction::Action action=DNSResponseAction::Action::None;
1115 std::string ruleresult;
1116 for(const auto& lr : *localRespRulactions) {
1117 if(lr.d_rule->matches(&dr)) {
1118 lr.d_rule->d_matches++;
1119 action=(*lr.d_action)(&dr, &ruleresult);
1120 switch(action) {
1121 case DNSResponseAction::Action::Allow:
1122 return true;
1123 break;
1124 case DNSResponseAction::Action::Drop:
1125 return false;
1126 break;
1127 case DNSResponseAction::Action::HeaderModify:
1128 return true;
1129 break;
1130 case DNSResponseAction::Action::ServFail:
1131 dr.dh->rcode = RCode::ServFail;
1132 return true;
1133 break;
1134 /* non-terminal actions follow */
1135 case DNSResponseAction::Action::Delay:
1136 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1137 break;
1138 case DNSResponseAction::Action::None:
1139 break;
1140 }
1141 }
1142 }
1143
1144 return true;
1145 }
1146
1147 static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd, const char* request, const size_t requestLen, bool healthCheck=false)
1148 {
1149 ssize_t result;
1150
1151 if (ss->sourceItf == 0) {
1152 result = send(sd, request, requestLen, 0);
1153 }
1154 else {
1155 struct msghdr msgh;
1156 struct iovec iov;
1157 char cbuf[256];
1158 ComboAddress remote(ss->remote);
1159 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1160 addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
1161 result = sendmsg(sd, &msgh, 0);
1162 }
1163
1164 if (result == -1) {
1165 int savederrno = errno;
1166 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1167
1168 /* This might sound silly, but on Linux send() might fail with EINVAL
1169 if the interface the socket was bound to doesn't exist anymore.
1170 We don't want to reconnect the real socket if the healthcheck failed,
1171 because it's not using the same socket.
1172 */
1173 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1174 ss->reconnect();
1175 }
1176 }
1177
1178 return result;
1179 }
1180
1181 bool addXPF(DNSQuestion& dq, uint16_t optionCode)
1182 {
1183 std::string payload = generateXPFPayload(dq.tcp, *dq.remote, *dq.local);
1184 uint8_t root = '\0';
1185 dnsrecordheader drh;
1186 drh.d_type = htons(optionCode);
1187 drh.d_class = htons(QClass::IN);
1188 drh.d_ttl = 0;
1189 drh.d_clen = htons(payload.size());
1190 size_t recordHeaderLen = sizeof(root) + sizeof(drh);
1191
1192 size_t available = dq.size - dq.len;
1193
1194 if ((payload.size() + recordHeaderLen) > available) {
1195 return false;
1196 }
1197
1198 size_t pos = dq.len;
1199 memcpy(reinterpret_cast<char*>(dq.dh) + pos, &root, sizeof(root));
1200 pos += sizeof(root);
1201 memcpy(reinterpret_cast<char*>(dq.dh) + pos, &drh, sizeof(drh));
1202 pos += sizeof(drh);
1203 memcpy(reinterpret_cast<char*>(dq.dh) + pos, payload.data(), payload.size());
1204 pos += payload.size();
1205
1206 dq.len = pos;
1207
1208 dq.dh->arcount = htons(ntohs(dq.dh->arcount) + 1);
1209
1210 return true;
1211 }
1212
1213 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1214 {
1215 if (msgh->msg_flags & MSG_TRUNC) {
1216 /* message was too large for our buffer */
1217 vinfolog("Dropping message too large for our buffer");
1218 g_stats.nonCompliantQueries++;
1219 return false;
1220 }
1221
1222 if(!holders.acl->match(remote)) {
1223 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1224 g_stats.aclDrops++;
1225 return false;
1226 }
1227
1228 cs.queries++;
1229 g_stats.queries++;
1230
1231 if (HarvestDestinationAddress(msgh, &dest)) {
1232 /* we don't get the port, only the address */
1233 dest.sin4.sin_port = cs.local.sin4.sin_port;
1234 }
1235 else {
1236 dest.sin4.sin_family = 0;
1237 }
1238
1239 return true;
1240 }
1241
#ifdef HAVE_DNSCRYPT
/* If the frontend 'cs' has a DNSCrypt context, hands the query over to
   handleDNSCryptQuery(). When that call succeeds 'len' is updated to the
   decrypted query length and true is returned. When it fails, the response
   it generated, if any, is sent to the client and false is returned.
   Returns true right away, without touching anything, for frontends that
   have no DNSCrypt context. */
static bool checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, const ComboAddress& dest, const ComboAddress& remote, time_t now)
{
  if (!cs.dnscryptCtx) {
    return true;
  }

  vector<uint8_t> response;
  uint16_t decryptedQueryLen = 0;

  dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);

  if (handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, false, now, response)) {
    len = decryptedQueryLen;
    return true;
  }

  if (!response.empty()) {
    sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(response.data()), static_cast<uint16_t>(response.size()), 0, dest, remote);
  }

  return false;
}
#endif /* HAVE_DNSCRYPT */
1265
1266 bool checkQueryHeaders(const struct dnsheader* dh)
1267 {
1268 if (dh->qr) { // don't respond to responses
1269 g_stats.nonCompliantQueries++;
1270 return false;
1271 }
1272
1273 if (dh->qdcount == 0) {
1274 g_stats.emptyQueries++;
1275 return false;
1276 }
1277
1278 if (dh->rd) {
1279 g_stats.rdQueries++;
1280 }
1281
1282 return true;
1283 }
1284
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Fills 'outMsg' so that 'response' (of 'responseLen' bytes) is addressed to
   'remote', using 'iov' as IO vector. If 'dest' is known (its family is not 0)
   it is attached as source address, using 'cbuf' as control buffer, otherwise
   the message gets no control data. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, char* cbuf)
{
  struct msghdr& header = outMsg.msg_hdr;

  outMsg.msg_len = 0;
  fillMSGHdr(&header, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  if (dest.sin4.sin_family != 0) {
    addCMsgSrcAddr(&header, cbuf, &dest, 0);
  }
  else {
    header.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1299
/* Handles one UDP query received on the frontend 'cs'.
   'query' points to a buffer of 'queryBufferSize' bytes holding a query of 'len' bytes,
   received from 'remote'; 'dest' is filled by isUDPQueryAcceptable().
   The query is checked, run through processQuery() and then either:
   - answered right away if processQuery() turned it into a response,
   - answered from the packet cache of the selected pool,
   - answered with ServFail if no backend was selected and g_servFailOnNoPolicy is set,
   - or relayed to the backend selected by the policy, after its state has been saved
     in one of the backend's IDState slots.
   If 'responsesVect' is not null, 'queuedResponses', 'respIOV' and 'respCBuf' must not be
   null either (see the assert below); where recvmmsg()/sendmmsg() support is compiled in,
   responses without delay are then queued into responsesVect[*queuedResponses] instead of
   being sent right away.
   Exceptions derived from std::exception are caught and logged here. */
1300 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
1301 {
1302 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
/* kept outside of the try block so that it can be logged by the exception handler */
1303 uint16_t queryId = 0;
1304
1305 try {
1306 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1307 return;
1308 }
1309
1310 /* we need an accurate ("real") value for the response and
1311 to store into the IDS, but not for insertion into the
1312 rings for example */
1313 struct timespec queryRealTime;
1314 struct timespec now;
1315 gettime(&now);
1316 gettime(&queryRealTime, true);
1317
1318 #ifdef HAVE_DNSCRYPT
1319 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1320
1321 if (!checkDNSCryptQuery(cs, query, len, dnsCryptQuery, dest, remote, queryRealTime.tv_sec)) {
1322 return;
1323 }
1324 #endif
1325
1326 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1327 queryId = ntohs(dh->id);
1328
1329 if (!checkQueryHeaders(dh)) {
1330 return;
1331 }
1332
1333 string poolname;
1334 int delayMsec = 0;
1335 const uint16_t * flags = getFlagsFromDNSHeader(dh);
/* copy of the flags as received, before any rule gets a chance to alter the header */
1336 const uint16_t origFlags = *flags;
1337 uint16_t qtype, qclass;
1338 unsigned int consumed = 0;
1339 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
/* the local address is the harvested destination if we have one, the listening address otherwise */
1340 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1341
1342 if (!processQuery(holders, dq, poolname, &delayMsec, now))
1343 {
1344 return;
1345 }
1346
1347 if(dq.dh->qr) { // something turned it into a response
1348 fixUpQueryTurnedResponse(dq, origFlags);
1349
1350 if (!cs.muted) {
/* the response has been written in place, in the query buffer */
1351 char* response = query;
1352 uint16_t responseLen = dq.len;
1353
1354 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(response), dq.size, responseLen, false, &queryRealTime);
1355 #ifdef HAVE_PROTOBUF
1356 dr.uniqueId = dq.uniqueId;
1357 #endif
1358 dr.qTag = dq.qTag;
1359
1360 if (!processResponse(holders.selfAnsweredRespRulactions, dr, &delayMsec)) {
1361 return;
1362 }
1363
1364 #ifdef HAVE_DNSCRYPT
1365 if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1366 return;
1367 }
1368 #endif
1369 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* only responses without delay are queued, the other ones go through sendUDPResponse() */
1370 if (delayMsec == 0 && responsesVect != nullptr) {
1371 queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1372 (*queuedResponses)++;
1373 }
1374 else
1375 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1376 {
1377 sendUDPResponse(cs.udpFD, response, responseLen, delayMsec, dest, remote);
1378 }
1379
1380 g_stats.selfAnswered++;
1381 doLatencyStats(0); // we're not going to measure this
1382 }
1383
1384 return;
1385 }
1386
1387 DownstreamState* ss = nullptr;
1388 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
1389 std::shared_ptr<DNSDistPacketCache> packetCache = serverPool->packetCache;
/* the policy of the pool, if it has one, takes precedence over the global one */
1390 auto policy = *(holders.policy);
1391 if (serverPool->policy != nullptr) {
1392 policy = *(serverPool->policy);
1393 }
1394 auto servers = serverPool->getServers();
/* Lua policies are only called with g_luamutex held */
1395 if (policy.isLua) {
1396 std::lock_guard<std::mutex> lock(g_luamutex);
1397 ss = policy.policy(servers, &dq).get();
1398 }
1399 else {
1400 ss = policy.policy(servers, &dq).get();
1401 }
1402
1403 bool ednsAdded = false;
1404 bool ecsAdded = false;
/* ECS is handled if the query allows it and the selected backend wants it, or,
   when no backend was selected, if the pool wants it */
1405 if (dq.useECS && ((ss && ss->useECS) || (!ss && serverPool->getECS()))) {
1406 if (!handleEDNSClientSubnet(dq, &(ednsAdded), &(ecsAdded))) {
1407 vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote.toStringWithPort());
1408 return;
1409 }
1410 }
1411
1412 uint32_t cacheKey = 0;
1413 boost::optional<Netmask> subnet;
1414 if (packetCache && !dq.skipCache) {
1415 uint16_t cachedResponseSize = dq.size;
/* g_staleCacheEntriesTTL is only passed when no backend was selected */
1416 uint32_t allowExpired = ss ? 0 : g_staleCacheEntriesTTL;
1417 if (packetCache->get(dq, consumed, dh->id, query, &cachedResponseSize, &cacheKey, subnet, allowExpired)) {
1418 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(query), dq.size, cachedResponseSize, false, &queryRealTime);
1419 #ifdef HAVE_PROTOBUF
1420 dr.uniqueId = dq.uniqueId;
1421 #endif
1422 dr.qTag = dq.qTag;
1423
1424 if (!processResponse(holders.cacheHitRespRulactions, dr, &delayMsec)) {
1425 return;
1426 }
1427
1428 if (!cs.muted) {
1429 #ifdef HAVE_DNSCRYPT
1430 if (!encryptResponse(query, &cachedResponseSize, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1431 return;
1432 }
1433 #endif
1434 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1435 if (delayMsec == 0 && responsesVect != nullptr) {
1436 queueResponse(cs, query, cachedResponseSize, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1437 (*queuedResponses)++;
1438 }
1439 else
1440 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1441 {
1442 sendUDPResponse(cs.udpFD, query, cachedResponseSize, delayMsec, dest, remote);
1443 }
1444 }
1445
1446 g_stats.cacheHits++;
1447 doLatencyStats(0); // we're not going to measure this
1448 return;
1449 }
1450 g_stats.cacheMisses++;
1451 }
1452
/* no backend was selected by the policy: answer with ServFail if configured to do so,
   otherwise nothing is sent back */
1453 if(!ss) {
1454 g_stats.noPolicy++;
1455
1456 if (g_servFailOnNoPolicy && !cs.muted) {
1457 char* response = query;
1458 uint16_t responseLen = dq.len;
1459 restoreFlags(dh, origFlags);
1460
1461 dq.dh->rcode = RCode::ServFail;
1462 dq.dh->qr = true;
1463
1464 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(response), dq.size, responseLen, false, &queryRealTime);
1465 #ifdef HAVE_PROTOBUF
1466 dr.uniqueId = dq.uniqueId;
1467 #endif
1468 dr.qTag = dq.qTag;
1469
1470 if (!processResponse(holders.selfAnsweredRespRulactions, dr, &delayMsec)) {
1471 return;
1472 }
1473
1474 #ifdef HAVE_DNSCRYPT
1475 if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1476 return;
1477 }
1478 #endif
1479 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* NOTE(review): unlike the two other response paths, delayMsec is not used here,
   the response is queued or sent with a delay of 0 - confirm this is intended */
1480 if (responsesVect != nullptr) {
1481 queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1482 (*queuedResponses)++;
1483 }
1484 else
1485 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1486 {
1487 sendUDPResponse(cs.udpFD, response, responseLen, 0, dest, remote);
1488 }
1489
1490 // no response-only statistics counter to update.
1491 doLatencyStats(0); // we're not going to measure this
1492 }
1493 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toString(), QType(dq.qtype).getName(), remote.toStringWithPort());
1494 return;
1495 }
1496
/* the value returned by addXPF() is not checked, the query is relayed either way */
1497 if (dq.addXPF && ss->xpfRRCode != 0) {
1498 addXPF(dq, ss->xpfRRCode);
1499 }
1500
1501 ss->queries++;
1502
/* pick the next IDState slot of this backend, wrapping around at the end */
1503 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1504 IDState* ids = &ss->idStates[idOffset];
1505 ids->age = 0;
1506
/* take the slot by storing our socket descriptor in it: a negative previous value
   means that the slot was free, otherwise the query that was using it is counted
   as a downstream timeout */
1507 int oldFD = ids->origFD.exchange(cs.udpFD);
1508 if(oldFD < 0) {
1509 // if we are reusing, no change in outstanding
1510 ss->outstanding++;
1511 }
1512 else {
1513 ss->reuseds++;
1514 g_stats.downstreamTimeouts++;
1515 }
1516
1517 ids->cs = &cs;
1518 ids->origID = dh->id;
1519 ids->origRemote = remote;
1520 ids->sentTime.set(queryRealTime);
1521 ids->qname = qname;
1522 ids->qtype = dq.qtype;
1523 ids->qclass = dq.qclass;
1524 ids->delayMsec = delayMsec;
1525 ids->tempFailureTTL = dq.tempFailureTTL;
1526 ids->origFlags = origFlags;
1527 ids->cacheKey = cacheKey;
1528 ids->subnet = subnet;
1529 ids->skipCache = dq.skipCache;
1530 ids->packetCache = packetCache;
1531 ids->ednsAdded = ednsAdded;
1532 ids->ecsAdded = ecsAdded;
1533 ids->qTag = dq.qTag;
1534
1535 /* If we couldn't harvest the real dest addr, still
1536 write down the listening addr since it will be useful
1537 (especially if it's not an 'any' one).
1538 We need to keep track of which one it is since we may
1539 want to use the real but not the listening addr to reply.
1540 */
1541 if (dest.sin4.sin_family != 0) {
1542 ids->origDest = dest;
1543 ids->destHarvested = true;
1544 }
1545 else {
1546 ids->origDest = cs.local;
1547 ids->destHarvested = false;
1548 }
1549 #ifdef HAVE_DNSCRYPT
1550 ids->dnsCryptQuery = dnsCryptQuery;
1551 #endif
1552 #ifdef HAVE_PROTOBUF
1553 ids->uniqueId = dq.uniqueId;
1554 #endif
1555
/* the query is relayed with the index of the slot as ID, the ID chosen by the client
   has been saved in ids->origID above */
1556 dh->id = idOffset;
1557
1558 int fd = pickBackendSocketForSending(ss);
1559 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1560
/* a failure to send is only accounted for, the slot taken above is not released here */
1561 if(ret < 0) {
1562 ss->sendErrors++;
1563 g_stats.downstreamSendErrors++;
1564 }
1565
1566 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1567 }
1568 catch(const std::exception& e){
1569 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1570 }
1571 }
1572
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* UDP client loop receiving up to g_udpVectorSize queries at once with
   recvmmsg(), handing each of them over to processUDPQuery(), and sending
   the responses that were queued in the process with a single sendmmsg().
   This function does not return. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  /* everything needed to receive, and possibly answer, one message */
  struct MMReceiver
  {
    char packet[4096];
    /* used by HarvestDestinationAddress */
    char cbuf[256];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  std::unique_ptr<MMReceiver[]> recvData(new MMReceiver[vectSize]);
  std::unique_ptr<struct mmsghdr[]> msgVec(new struct mmsghdr[vectSize]);
  std::unique_ptr<struct mmsghdr[]> outMsgVec(new struct mmsghdr[vectSize]);

  /* initialize the structures needed to receive our messages */
  for (size_t idx = 0; idx < vectSize; idx++) {
    MMReceiver& slot = recvData[idx];
    slot.remote.sin4.sin_family = cs->local.sin4.sin_family;
    fillMSGHdr(&msgVec[idx].msg_hdr, &slot.iov, slot.cbuf, sizeof(slot.cbuf), slot.packet, s_udpIncomingBufferSize, &slot.remote);
  }

  /* go now */
  while (true) {

    /* reset the IO vector, since it's also used to send the vector of responses
       to avoid having to copy the data around */
    for (size_t idx = 0; idx < vectSize; idx++) {
      MMReceiver& slot = recvData[idx];
      slot.iov.iov_base = slot.packet;
      slot.iov.iov_len = sizeof(slot.packet);
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    const int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      MMReceiver& slot = recvData[msgIdx];
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      const unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = slot.remote;

      if (got < sizeof(struct dnsheader)) {
        /* not even a full DNS header */
        g_stats.nonCompliantQueries++;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, slot.dest, slot.packet, static_cast<uint16_t>(got), sizeof(slot.packet), outMsgVec.get(), &msgsToSend, &slot.iov, slot.cbuf);
    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */
    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      const int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
      }
    }
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1653
1654 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1655 static void* udpClientThread(ClientState* cs)
1656 try
1657 {
1658 LocalHolders holders;
1659
1660 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1661 if (g_udpVectorSize > 1) {
1662 MultipleMessagesUDPClientThread(cs, holders);
1663
1664 }
1665 else
1666 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1667 {
1668 char packet[4096];
1669 /* the actual buffer is larger because:
1670 - we may have to add EDNS and/or ECS
1671 - we use it for self-generated responses (from rule or cache)
1672 but we only accept incoming payloads up to that size
1673 */
1674 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1675 struct msghdr msgh;
1676 struct iovec iov;
1677 /* used by HarvestDestinationAddress */
1678 char cbuf[256];
1679
1680 ComboAddress remote;
1681 ComboAddress dest;
1682 remote.sin4.sin_family = cs->local.sin4.sin_family;
1683 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1684
1685 for(;;) {
1686 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1687
1688 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1689 g_stats.nonCompliantQueries++;
1690 continue;
1691 }
1692
1693 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1694 }
1695 }
1696
1697 return nullptr;
1698 }
1699 catch(const std::exception &e)
1700 {
1701 errlog("UDP client thread died because of exception: %s", e.what());
1702 return nullptr;
1703 }
1704 catch(const PDNSException &e)
1705 {
1706 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1707 return nullptr;
1708 }
1709 catch(...)
1710 {
1711 errlog("UDP client thread died because of an exception: %s", "unknown");
1712 return nullptr;
1713 }
1714
1715 static bool upCheck(DownstreamState& ds)
1716 try
1717 {
1718 DNSName checkName = ds.checkName;
1719 uint16_t checkType = ds.checkType.getCode();
1720 uint16_t checkClass = ds.checkClass;
1721 dnsheader checkHeader;
1722 memset(&checkHeader, 0, sizeof(checkHeader));
1723
1724 checkHeader.qdcount = htons(1);
1725 #ifdef HAVE_LIBSODIUM
1726 checkHeader.id = randombytes_random() % 65536;
1727 #else
1728 checkHeader.id = random() % 65536;
1729 #endif
1730
1731 checkHeader.rd = true;
1732 if (ds.setCD) {
1733 checkHeader.cd = true;
1734 }
1735
1736
1737 if (ds.checkFunction) {
1738 std::lock_guard<std::mutex> lock(g_luamutex);
1739 auto ret = ds.checkFunction(checkName, checkType, checkClass, &checkHeader);
1740 checkName = std::get<0>(ret);
1741 checkType = std::get<1>(ret);
1742 checkClass = std::get<2>(ret);
1743 }
1744
1745 vector<uint8_t> packet;
1746 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1747 dnsheader * requestHeader = dpw.getHeader();
1748 *requestHeader = checkHeader;
1749
1750 Socket sock(ds.remote.sin4.sin_family, SOCK_DGRAM);
1751 sock.setNonBlocking();
1752 if (!IsAnyAddress(ds.sourceAddr)) {
1753 sock.setReuseAddr();
1754 sock.bind(ds.sourceAddr);
1755 }
1756 sock.connect(ds.remote);
1757 ssize_t sent = udpClientSendRequestToBackend(&ds, sock.getHandle(), (char*)&packet[0], packet.size(), true);
1758 if (sent < 0) {
1759 int ret = errno;
1760 if (g_verboseHealthChecks)
1761 infolog("Error while sending a health check query to backend %s: %d", ds.getNameWithAddr(), ret);
1762 return false;
1763 }
1764
1765 int ret=waitForRWData(sock.getHandle(), true, 1, 0);
1766 if(ret < 0 || !ret) { // error, timeout, both are down!
1767 if (ret < 0) {
1768 ret = errno;
1769 if (g_verboseHealthChecks)
1770 infolog("Error while waiting for the health check response from backend %s: %d", ds.getNameWithAddr(), ret);
1771 }
1772 else {
1773 if (g_verboseHealthChecks)
1774 infolog("Timeout while waiting for the health check response from backend %s", ds.getNameWithAddr());
1775 }
1776 return false;
1777 }
1778
1779 string reply;
1780 ComboAddress from;
1781 sock.recvFrom(reply, from);
1782
1783 /* we are using a connected socket but hey.. */
1784 if (from != ds.remote) {
1785 if (g_verboseHealthChecks)
1786 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds.remote.toStringWithPort());
1787 return false;
1788 }
1789
1790 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1791
1792 if (reply.size() < sizeof(*responseHeader)) {
1793 if (g_verboseHealthChecks)
1794 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds.getNameWithAddr(), sizeof(*responseHeader));
1795 return false;
1796 }
1797
1798 if (responseHeader->id != requestHeader->id) {
1799 if (g_verboseHealthChecks)
1800 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds.getNameWithAddr(), requestHeader->id);
1801 return false;
1802 }
1803
1804 if (!responseHeader->qr) {
1805 if (g_verboseHealthChecks)
1806 infolog("Invalid health check response from backend %s, expecting QR to be set", ds.getNameWithAddr());
1807 return false;
1808 }
1809
1810 if (responseHeader->rcode == RCode::ServFail) {
1811 if (g_verboseHealthChecks)
1812 infolog("Backend %s responded to health check with ServFail", ds.getNameWithAddr());
1813 return false;
1814 }
1815
1816 if (ds.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1817 if (g_verboseHealthChecks)
1818 infolog("Backend %s responded to health check with %s while mustResolve is set", ds.getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1819 return false;
1820 }
1821
1822 uint16_t receivedType;
1823 uint16_t receivedClass;
1824 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1825
1826 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1827 if (g_verboseHealthChecks)
1828 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds.getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1829 return false;
1830 }
1831
1832 return true;
1833 }
1834 catch(const std::exception& e)
1835 {
1836 if (g_verboseHealthChecks)
1837 infolog("Error checking the health of backend %s: %s", ds.getNameWithAddr(), e.what());
1838 return false;
1839 }
1840 catch(...)
1841 {
1842 if (g_verboseHealthChecks)
1843 infolog("Unknown exception while checking the health of backend %s", ds.getNameWithAddr());
1844 return false;
1845 }
1846
/* NOTE(review): presumably the maximum number of TCP client threads; it is not used
   in this part of the file, confirm against the TCP code */
1847 uint64_t g_maxTCPClientThreads{10};
/* number of iterations of the maintThread() loop, which sleeps one second per iteration,
   between two purges of the expired entries of the packet caches */
1848 std::atomic<uint16_t> g_cacheCleaningDelay{60};
/* used by maintThread() to compute the 'upTo' value passed to purgeExpired():
   maxEntries * (100 - g_cacheCleaningPercentage) / 100 */
1849 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1850
1851 void* maintThread()
1852 {
1853 int interval = 1;
1854 size_t counter = 0;
1855 int32_t secondsToWaitLog = 0;
1856
1857 for(;;) {
1858 sleep(interval);
1859
1860 {
1861 std::lock_guard<std::mutex> lock(g_luamutex);
1862 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1863 if(f) {
1864 try {
1865 (*f)();
1866 secondsToWaitLog = 0;
1867 }
1868 catch(std::exception &e) {
1869 if (secondsToWaitLog <= 0) {
1870 infolog("Error during execution of maintenance function: %s", e.what());
1871 secondsToWaitLog = 61;
1872 }
1873 secondsToWaitLog -= interval;
1874 }
1875 }
1876 }
1877
1878 counter++;
1879 if (counter >= g_cacheCleaningDelay) {
1880 auto localPools = g_pools.getLocal();
1881 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
1882 for (const auto& entry : *localPools) {
1883 packetCache = entry.second->packetCache;
1884 if (packetCache) {
1885 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1886 packetCache->purgeExpired(upTo);
1887 }
1888 }
1889 counter = 0;
1890 }
1891
1892 // ponder pruning g_dynblocks of expired entries here
1893 }
1894 return 0;
1895 }
1896
1897 void* healthChecksThread()
1898 {
1899 int interval = 1;
1900
1901 for(;;) {
1902 sleep(interval);
1903
1904 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
1905 g_tcpclientthreads->addTCPClientThread();
1906
1907 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
1908 for(auto& dss : *states) {
1909 if(dss->availability==DownstreamState::Availability::Auto) {
1910 bool newState=upCheck(*dss);
1911 if (newState) {
1912 if (dss->currentCheckFailures != 0) {
1913 dss->currentCheckFailures = 0;
1914 }
1915 }
1916 else if (!newState && dss->upStatus) {
1917 dss->currentCheckFailures++;
1918 if (dss->currentCheckFailures < dss->maxCheckFailures) {
1919 newState = true;
1920 }
1921 }
1922
1923 if(newState != dss->upStatus) {
1924 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
1925
1926 if (newState && !dss->connected) {
1927 newState = dss->reconnect();
1928
1929 if (dss->connected && !dss->threadStarted.test_and_set()) {
1930 dss->tid = thread(responderThread, dss);
1931 }
1932 }
1933
1934 dss->upStatus = newState;
1935 dss->currentCheckFailures = 0;
1936 if (g_snmpAgent && g_snmpTrapsEnabled) {
1937 g_snmpAgent->sendBackendStatusChangeTrap(dss);
1938 }
1939 }
1940 }
1941
1942 auto delta = dss->sw.udiffAndSet()/1000000.0;
1943 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
1944 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
1945 dss->prev.queries.store(dss->queries.load());
1946 dss->prev.reuseds.store(dss->reuseds.load());
1947
1948 for(IDState& ids : dss->idStates) { // timeouts
1949 int origFD = ids.origFD;
1950 if(origFD >=0 && ids.age++ > g_udpTimeout) {
1951 /* We set origFD to -1 as soon as possible
1952 to limit the risk of racing with the
1953 responder thread.
1954 The UDP client thread only checks origFD to
1955 know whether outstanding has to be incremented,
1956 so the sooner the better any way since we _will_
1957 decrement it.
1958 */
1959 if (ids.origFD.exchange(-1) != origFD) {
1960 /* this state has been altered in the meantime,
1961 don't go anywhere near it */
1962 continue;
1963 }
1964 ids.age = 0;
1965 dss->reuseds++;
1966 --dss->outstanding;
1967 g_stats.downstreamTimeouts++; // this is an 'actively' discovered timeout
1968 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1969 dss->remote.toStringWithPort(), dss->name,
1970 ids.qname.toString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
1971
1972 struct timespec ts;
1973 gettime(&ts);
1974
1975 struct dnsheader fake;
1976 memset(&fake, 0, sizeof(fake));
1977 fake.id = ids.origID;
1978
1979 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
1980 }
1981 }
1982 }
1983 }
1984 return 0;
1985 }
1986
1987 static void bindAny(int af, int sock)
1988 {
1989 __attribute__((unused)) int one = 1;
1990
1991 #ifdef IP_FREEBIND
1992 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
1993 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
1994 #endif
1995
1996 #ifdef IP_BINDANY
1997 if (af == AF_INET)
1998 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
1999 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
2000 #endif
2001 #ifdef IPV6_BINDANY
2002 if (af == AF_INET6)
2003 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2004 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
2005 #endif
2006 #ifdef SO_BINDANY
2007 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2008 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
2009 #endif
2010 }
2011
2012 static void dropGroupPrivs(gid_t gid)
2013 {
2014 if (gid) {
2015 if (setgid(gid) == 0) {
2016 if (setgroups(0, NULL) < 0) {
2017 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
2018 }
2019 }
2020 else {
2021 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
2022 }
2023 }
2024 }
2025
2026 static void dropUserPrivs(uid_t uid)
2027 {
2028 if(uid) {
2029 if(setuid(uid) < 0) {
2030 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
2031 }
2032 }
2033 }
2034
2035 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2036 {
2037 /* stdin, stdout, stderr */
2038 size_t requiredFDsCount = 3;
2039 auto backends = g_dstates.getLocal();
2040 /* UDP sockets to backends */
2041 size_t backendUDPSocketsCount = 0;
2042 for (const auto& backend : *backends) {
2043 backendUDPSocketsCount += backend->sockets.size();
2044 }
2045 requiredFDsCount += backendUDPSocketsCount;
2046 /* TCP sockets to backends */
2047 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2048 /* listening sockets */
2049 requiredFDsCount += udpBindsCount;
2050 requiredFDsCount += tcpBindsCount;
2051 /* max TCP connections currently served */
2052 requiredFDsCount += g_maxTCPClientThreads;
2053 /* max pipes for communicating between TCP acceptors and client threads */
2054 requiredFDsCount += (g_maxTCPClientThreads * 2);
2055 /* max TCP queued connections */
2056 requiredFDsCount += g_maxTCPQueuedConnections;
2057 /* DelayPipe pipe */
2058 requiredFDsCount += 2;
2059 /* syslog socket */
2060 requiredFDsCount++;
2061 /* webserver main socket */
2062 requiredFDsCount++;
2063 /* console main socket */
2064 requiredFDsCount++;
2065 /* carbon export */
2066 requiredFDsCount++;
2067 /* history file */
2068 requiredFDsCount++;
2069 struct rlimit rl;
2070 getrlimit(RLIMIT_NOFILE, &rl);
2071 if (rl.rlim_cur <= requiredFDsCount) {
2072 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2073 #ifdef HAVE_SYSTEMD
2074 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2075 #else
2076 warnlog("You can increase this value by using ulimit.");
2077 #endif
2078 }
2079 }
2080
/* Settings collected from the command line by main(). */
struct
{
  /* addresses given with -l / --local */
  vector<string> locals;
  /* positional arguments, when not running as a client */
  vector<string> remotes;
  /* --check-config */
  bool checkConfig = false;
  /* -c / --client */
  bool beClient = false;
  /* --supervised */
  bool beSupervised = false;
  /* -e / --execute */
  string command;
  /* -C / --config, main() sets a default before parsing the options */
  string config;
  /* -u / --uid */
  string uid;
  /* -g / --gid */
  string gid;
} g_cmdLine;

/* set to true by main() once the configuration has been loaded and the
   list of local addresses to listen on is final */
std::atomic<bool> g_configurationDone{false};
2095
/* Print the command-line help to standard output.
   The description of -k / --setkey is only included when built with
   libsodium support (HAVE_LIBSODIUM). */
static void usage()
{
  cout<<endl;
  cout<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n"
        "[-e,--execute cmd] [-h,--help] [-l,--local addr]\n"
        "[-v,--verbose] [--check-config] [--version]\n"
        "\n"
        "-a,--acl netmask Add this netmask to the ACL\n"
        "-C,--config file Load configuration from 'file'\n"
        "-c,--client Operate as a client, connect to dnsdist. This reads\n"
        " controlSocket from your configuration file, but also\n"
        " accepts an IP:PORT argument\n";
#ifdef HAVE_LIBSODIUM
  cout<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n"
        " is similar to setting setKey in the configuration file.\n"
        " NOTE: this will leak this key in your shell's history\n"
        " and in the systems running process list.\n";
#endif
  cout<<"--check-config Validate the configuration file and exit. The exit-code\n"
        " reflects the validation, 0 is OK, 1 means an error.\n"
        " Any errors are printed as well.\n"
        "-e,--execute cmd Connect to dnsdist and execute 'cmd'\n"
        "-g,--gid gid Change the process group ID after binding sockets\n"
        "-h,--help Display this helpful message\n"
        "-l,--local address Listen on this local address\n"
        "--supervised Don't open a console, I'm supervised\n"
        " (use with e.g. systemd and daemontools)\n"
        "--disable-syslog Don't log to syslog, only to stdout\n"
        " (use with e.g. systemd)\n"
        "-u,--uid uid Change the process user ID after binding sockets\n"
        "-v,--verbose Enable verbose mode\n"
        "-V,--version Show dnsdist version information and exit\n";
}
2129
2130 int main(int argc, char** argv)
2131 try
2132 {
2133 size_t udpBindsCount = 0;
2134 size_t tcpBindsCount = 0;
2135 rl_attempted_completion_function = my_completion;
2136 rl_completion_append_character = 0;
2137
2138 signal(SIGPIPE, SIG_IGN);
2139 signal(SIGCHLD, SIG_IGN);
2140 openlog("dnsdist", LOG_PID, LOG_DAEMON);
2141
2142 #ifdef HAVE_LIBSODIUM
2143 if (sodium_init() == -1) {
2144 cerr<<"Unable to initialize crypto library"<<endl;
2145 exit(EXIT_FAILURE);
2146 }
2147 g_hashperturb=randombytes_uniform(0xffffffff);
2148 srandom(randombytes_uniform(0xffffffff));
2149 #else
2150 {
2151 struct timeval tv;
2152 gettimeofday(&tv, 0);
2153 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2154 g_hashperturb=random();
2155 }
2156
2157 #endif
2158 ComboAddress clientAddress = ComboAddress();
2159 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2160 struct option longopts[]={
2161 {"acl", required_argument, 0, 'a'},
2162 {"check-config", no_argument, 0, 1},
2163 {"client", no_argument, 0, 'c'},
2164 {"config", required_argument, 0, 'C'},
2165 {"disable-syslog", no_argument, 0, 2},
2166 {"execute", required_argument, 0, 'e'},
2167 {"gid", required_argument, 0, 'g'},
2168 {"help", no_argument, 0, 'h'},
2169 {"local", required_argument, 0, 'l'},
2170 {"setkey", required_argument, 0, 'k'},
2171 {"supervised", no_argument, 0, 3},
2172 {"uid", required_argument, 0, 'u'},
2173 {"verbose", no_argument, 0, 'v'},
2174 {"version", no_argument, 0, 'V'},
2175 {0,0,0,0}
2176 };
2177 int longindex=0;
2178 string optstring;
2179 for(;;) {
2180 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2181 if(c==-1)
2182 break;
2183 switch(c) {
2184 case 1:
2185 g_cmdLine.checkConfig=true;
2186 break;
2187 case 2:
2188 g_syslog=false;
2189 break;
2190 case 3:
2191 g_cmdLine.beSupervised=true;
2192 break;
2193 case 'C':
2194 g_cmdLine.config=optarg;
2195 break;
2196 case 'c':
2197 g_cmdLine.beClient=true;
2198 break;
2199 case 'e':
2200 g_cmdLine.command=optarg;
2201 break;
2202 case 'g':
2203 g_cmdLine.gid=optarg;
2204 break;
2205 case 'h':
2206 cout<<"dnsdist "<<VERSION<<endl;
2207 usage();
2208 cout<<"\n";
2209 exit(EXIT_SUCCESS);
2210 break;
2211 case 'a':
2212 optstring=optarg;
2213 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2214 break;
2215 case 'k':
2216 #ifdef HAVE_LIBSODIUM
2217 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2218 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2219 exit(EXIT_FAILURE);
2220 }
2221 #else
2222 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2223 exit(EXIT_FAILURE);
2224 #endif
2225 break;
2226 case 'l':
2227 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2228 break;
2229 case 'u':
2230 g_cmdLine.uid=optarg;
2231 break;
2232 case 'v':
2233 g_verbose=true;
2234 break;
2235 case 'V':
2236 #ifdef LUAJIT_VERSION
2237 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2238 #else
2239 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2240 #endif
2241 cout<<"Enabled features: ";
2242 #ifdef HAVE_DNS_OVER_TLS
2243 cout<<"dns-over-tls(";
2244 #ifdef HAVE_GNUTLS
2245 cout<<"gnutls";
2246 #ifdef HAVE_LIBSSL
2247 cout<<" ";
2248 #endif
2249 #endif
2250 #ifdef HAVE_LIBSSL
2251 cout<<"openssl";
2252 #endif
2253 cout<<") ";
2254 #endif
2255 #ifdef HAVE_DNSCRYPT
2256 cout<<"dnscrypt ";
2257 #endif
2258 #ifdef HAVE_EBPF
2259 cout<<"ebpf ";
2260 #endif
2261 #ifdef HAVE_FSTRM
2262 cout<<"fstrm ";
2263 #endif
2264 #ifdef HAVE_LIBSODIUM
2265 cout<<"libsodium ";
2266 #endif
2267 #ifdef HAVE_PROTOBUF
2268 cout<<"protobuf ";
2269 #endif
2270 #ifdef HAVE_RE2
2271 cout<<"re2 ";
2272 #endif
2273 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2274 cout<<"recvmmsg/sendmmsg ";
2275 #endif
2276 #ifdef HAVE_NET_SNMP
2277 cout<<"snmp ";
2278 #endif
2279 #ifdef HAVE_SYSTEMD
2280 cout<<"systemd";
2281 #endif
2282 cout<<endl;
2283 exit(EXIT_SUCCESS);
2284 break;
2285 case '?':
2286 //getopt_long printed an error message.
2287 usage();
2288 exit(EXIT_FAILURE);
2289 break;
2290 }
2291 }
2292
2293 argc-=optind;
2294 argv+=optind;
2295 for(auto p = argv; *p; ++p) {
2296 if(g_cmdLine.beClient) {
2297 clientAddress = ComboAddress(*p, 5199);
2298 } else {
2299 g_cmdLine.remotes.push_back(*p);
2300 }
2301 }
2302
2303 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2304
2305 g_policy.setState(leastOutstandingPol);
2306 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2307 setupLua(true, g_cmdLine.config);
2308 if (clientAddress != ComboAddress())
2309 g_serverControl = clientAddress;
2310 doClient(g_serverControl, g_cmdLine.command);
2311 _exit(EXIT_SUCCESS);
2312 }
2313
2314 auto acl = g_ACL.getCopy();
2315 if(acl.empty()) {
2316 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2317 acl.addMask(addr);
2318 g_ACL.setState(acl);
2319 }
2320
2321 auto consoleACL = g_consoleACL.getCopy();
2322 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2323 consoleACL.addMask(mask);
2324 }
2325 g_consoleACL.setState(consoleACL);
2326
2327 if (g_cmdLine.checkConfig) {
2328 setupLua(true, g_cmdLine.config);
2329 // No exception was thrown
2330 infolog("Configuration '%s' OK!", g_cmdLine.config);
2331 _exit(EXIT_SUCCESS);
2332 }
2333
2334 auto todo=setupLua(false, g_cmdLine.config);
2335
2336 auto localPools = g_pools.getCopy();
2337 {
2338 bool precompute = false;
2339 if (g_policy.getLocal()->name == "chashed") {
2340 precompute = true;
2341 } else {
2342 for (const auto& entry: localPools) {
2343 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2344 precompute = true;
2345 break ;
2346 }
2347 }
2348 }
2349 if (precompute) {
2350 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2351 // pre compute hashes
2352 auto backends = g_dstates.getLocal();
2353 for (auto& backend: *backends) {
2354 backend->hash();
2355 }
2356 }
2357 }
2358
2359 if(g_cmdLine.locals.size()) {
2360 g_locals.clear();
2361 for(auto loc : g_cmdLine.locals)
2362 g_locals.push_back(std::make_tuple(ComboAddress(loc, 53), true, false, 0, "", std::set<int>()));
2363 }
2364
2365 if(g_locals.empty())
2366 g_locals.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set<int>()));
2367
2368 g_configurationDone = true;
2369
2370 vector<ClientState*> toLaunch;
2371 for(const auto& local : g_locals) {
2372 ClientState* cs = new ClientState;
2373 cs->local= std::get<0>(local);
2374 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2375 if(cs->local.sin4.sin_family == AF_INET6) {
2376 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2377 }
2378 //if(g_vm.count("bind-non-local"))
2379 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2380
2381 // if (!setSocketTimestamps(cs->udpFD))
2382 // g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2383
2384
2385 if(IsAnyAddress(cs->local)) {
2386 int one=1;
2387 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2388 #ifdef IPV6_RECVPKTINFO
2389 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2390 #endif
2391 }
2392
2393 if (std::get<2>(local)) {
2394 #ifdef SO_REUSEPORT
2395 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2396 #else
2397 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2398 #endif
2399 }
2400
2401 const std::string& itf = std::get<4>(local);
2402 if (!itf.empty()) {
2403 #ifdef SO_BINDTODEVICE
2404 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2405 if (res != 0) {
2406 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2407 }
2408 #else
2409 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2410 #endif
2411 }
2412
2413 #ifdef HAVE_EBPF
2414 if (g_defaultBPFFilter) {
2415 cs->attachFilter(g_defaultBPFFilter);
2416 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs->local.toStringWithPort());
2417 }
2418 #endif /* HAVE_EBPF */
2419
2420 cs->cpus = std::get<5>(local);
2421
2422 SBind(cs->udpFD, cs->local);
2423 toLaunch.push_back(cs);
2424 g_frontends.push_back(cs);
2425 udpBindsCount++;
2426 }
2427
2428 for(const auto& local : g_locals) {
2429 if(!std::get<1>(local)) { // no TCP/IP
2430 warnlog("Not providing TCP/IP service on local address '%s'", std::get<0>(local).toStringWithPort());
2431 continue;
2432 }
2433 ClientState* cs = new ClientState;
2434 cs->local= std::get<0>(local);
2435
2436 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2437
2438 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2439 #ifdef TCP_DEFER_ACCEPT
2440 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2441 #endif
2442 if (std::get<3>(local) > 0) {
2443 #ifdef TCP_FASTOPEN
2444 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(local));
2445 #else
2446 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2447 #endif
2448 }
2449 if(cs->local.sin4.sin_family == AF_INET6) {
2450 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2451 }
2452 #ifdef SO_REUSEPORT
2453 /* no need to warn again if configured but support is not available, we already did for UDP */
2454 if (std::get<2>(local)) {
2455 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2456 }
2457 #endif
2458
2459 const std::string& itf = std::get<4>(local);
2460 if (!itf.empty()) {
2461 #ifdef SO_BINDTODEVICE
2462 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2463 if (res != 0) {
2464 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2465 }
2466 #else
2467 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2468 #endif
2469 }
2470
2471 #ifdef HAVE_EBPF
2472 if (g_defaultBPFFilter) {
2473 cs->attachFilter(g_defaultBPFFilter);
2474 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs->local.toStringWithPort());
2475 }
2476 #endif /* HAVE_EBPF */
2477
2478 // if(g_vm.count("bind-non-local"))
2479 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2480 SBind(cs->tcpFD, cs->local);
2481 SListen(cs->tcpFD, 64);
2482 warnlog("Listening on %s", cs->local.toStringWithPort());
2483
2484 toLaunch.push_back(cs);
2485 g_frontends.push_back(cs);
2486 tcpBindsCount++;
2487 }
2488
2489 #ifdef HAVE_DNSCRYPT
2490 for(auto& dcLocal : g_dnsCryptLocals) {
2491 ClientState* cs = new ClientState;
2492 cs->local = std::get<0>(dcLocal);
2493 cs->dnscryptCtx = std::get<1>(dcLocal);
2494 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2495 if(cs->local.sin4.sin_family == AF_INET6) {
2496 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2497 }
2498 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2499 if(IsAnyAddress(cs->local)) {
2500 int one=1;
2501 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2502 #ifdef IPV6_RECVPKTINFO
2503 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2504 #endif
2505 }
2506 if (std::get<2>(dcLocal)) {
2507 #ifdef SO_REUSEPORT
2508 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2509 #else
2510 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2511 #endif
2512 }
2513
2514 const std::string& itf = std::get<4>(dcLocal);
2515 if (!itf.empty()) {
2516 #ifdef SO_BINDTODEVICE
2517 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2518 if (res != 0) {
2519 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2520 }
2521 #else
2522 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2523 #endif
2524 }
2525
2526 #ifdef HAVE_EBPF
2527 if (g_defaultBPFFilter) {
2528 cs->attachFilter(g_defaultBPFFilter);
2529 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs->local.toStringWithPort());
2530 }
2531 #endif /* HAVE_EBPF */
2532 SBind(cs->udpFD, cs->local);
2533 toLaunch.push_back(cs);
2534 g_frontends.push_back(cs);
2535 udpBindsCount++;
2536
2537 cs = new ClientState;
2538 cs->local = std::get<0>(dcLocal);
2539 cs->dnscryptCtx = std::get<1>(dcLocal);
2540 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2541 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2542 #ifdef TCP_DEFER_ACCEPT
2543 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2544 #endif
2545 if (std::get<3>(dcLocal) > 0) {
2546 #ifdef TCP_FASTOPEN
2547 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(dcLocal));
2548 #else
2549 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2550 #endif
2551 }
2552
2553 #ifdef SO_REUSEPORT
2554 /* no need to warn again if configured but support is not available, we already did for UDP */
2555 if (std::get<2>(dcLocal)) {
2556 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2557 }
2558 #endif
2559
2560 if (!itf.empty()) {
2561 #ifdef SO_BINDTODEVICE
2562 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2563 if (res != 0) {
2564 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2565 }
2566 #else
2567 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2568 #endif
2569 }
2570
2571 if(cs->local.sin4.sin_family == AF_INET6) {
2572 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2573 }
2574 #ifdef HAVE_EBPF
2575 if (g_defaultBPFFilter) {
2576 cs->attachFilter(g_defaultBPFFilter);
2577 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs->local.toStringWithPort());
2578 }
2579 #endif /* HAVE_EBPF */
2580
2581 cs->cpus = std::get<5>(dcLocal);
2582
2583 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2584 SBind(cs->tcpFD, cs->local);
2585 SListen(cs->tcpFD, 64);
2586 warnlog("Listening on %s", cs->local.toStringWithPort());
2587 toLaunch.push_back(cs);
2588 g_frontends.push_back(cs);
2589 tcpBindsCount++;
2590 }
2591 #endif
2592
2593 for(auto& frontend : g_tlslocals) {
2594 ClientState* cs = new ClientState;
2595 cs->local = frontend->d_addr;
2596 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2597 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2598 #ifdef TCP_DEFER_ACCEPT
2599 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2600 #endif
2601 if (frontend->d_tcpFastOpenQueueSize > 0) {
2602 #ifdef TCP_FASTOPEN
2603 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, frontend->d_tcpFastOpenQueueSize);
2604 #else
2605 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2606 #endif
2607 }
2608 if (frontend->d_reusePort) {
2609 #ifdef SO_REUSEPORT
2610 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2611 #else
2612 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2613 #endif
2614 }
2615 if(cs->local.sin4.sin_family == AF_INET6) {
2616 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2617 }
2618
2619 if (!frontend->d_interface.empty()) {
2620 #ifdef SO_BINDTODEVICE
2621 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, frontend->d_interface.c_str(), frontend->d_interface.length());
2622 if (res != 0) {
2623 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), strerror(errno));
2624 }
2625 #else
2626 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2627 #endif
2628 }
2629
2630 cs->cpus = frontend->d_cpus;
2631
2632 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2633 if (frontend->setupTLS()) {
2634 cs->tlsFrontend = frontend;
2635 SBind(cs->tcpFD, cs->local);
2636 SListen(cs->tcpFD, 64);
2637 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2638 toLaunch.push_back(cs);
2639 g_frontends.push_back(cs);
2640 tcpBindsCount++;
2641 }
2642 else {
2643 delete cs;
2644 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2645 _exit(EXIT_FAILURE);
2646 }
2647 }
2648
2649 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2650
2651 vector<string> vec;
2652 std::string acls;
2653 g_ACL.getLocal()->toStringVector(&vec);
2654 for(const auto& s : vec) {
2655 if (!acls.empty())
2656 acls += ", ";
2657 acls += s;
2658 }
2659 infolog("ACL allowing queries from: %s", acls.c_str());
2660 vec.clear();
2661 acls.clear();
2662 g_consoleACL.getLocal()->toStringVector(&vec);
2663 for (const auto& entry : vec) {
2664 if (!acls.empty()) {
2665 acls += ", ";
2666 }
2667 acls += entry;
2668 }
2669 infolog("Console ACL allowing connections from: %s", acls.c_str());
2670
2671 #ifdef HAVE_LIBSODIUM
2672 if (g_consoleEnabled && g_consoleKey.empty()) {
2673 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2674 }
2675 #endif
2676
2677 uid_t newgid=0;
2678 gid_t newuid=0;
2679
2680 if(!g_cmdLine.gid.empty())
2681 newgid = strToGID(g_cmdLine.gid.c_str());
2682
2683 if(!g_cmdLine.uid.empty())
2684 newuid = strToUID(g_cmdLine.uid.c_str());
2685
2686 dropGroupPrivs(newgid);
2687 dropUserPrivs(newuid);
2688
2689 /* this need to be done _after_ dropping privileges */
2690 g_delay = new DelayPipe<DelayedPacket>();
2691
2692 if (g_snmpAgent) {
2693 g_snmpAgent->run();
2694 }
2695
2696 g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads, g_useTCPSinglePipe);
2697
2698 for(auto& t : todo)
2699 t();
2700
2701 localPools = g_pools.getCopy();
2702 /* create the default pool no matter what */
2703 createPoolIfNotExists(localPools, "");
2704 if(g_cmdLine.remotes.size()) {
2705 for(const auto& address : g_cmdLine.remotes) {
2706 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2707 addServerToPool(localPools, "", ret);
2708 if (ret->connected && !ret->threadStarted.test_and_set()) {
2709 ret->tid = thread(responderThread, ret);
2710 }
2711 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2712 }
2713 }
2714 g_pools.setState(localPools);
2715
2716 if(g_dstates.getLocal()->empty()) {
2717 errlog("No downstream servers defined: all packets will get dropped");
2718 // you might define them later, but you need to know
2719 }
2720
2721 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2722
2723 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2724 if(dss->availability==DownstreamState::Availability::Auto) {
2725 bool newState=upCheck(*dss);
2726 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2727 dss->upStatus = newState;
2728 }
2729 }
2730
2731 for(auto& cs : toLaunch) {
2732 if (cs->udpFD >= 0) {
2733 thread t1(udpClientThread, cs);
2734 if (!cs->cpus.empty()) {
2735 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2736 }
2737 t1.detach();
2738 }
2739 else if (cs->tcpFD >= 0) {
2740 thread t1(tcpAcceptorThread, cs);
2741 if (!cs->cpus.empty()) {
2742 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2743 }
2744 t1.detach();
2745 }
2746 }
2747
2748 thread carbonthread(carbonDumpThread);
2749 carbonthread.detach();
2750
2751 thread stattid(maintThread);
2752 stattid.detach();
2753
2754 thread healththread(healthChecksThread);
2755
2756 if(g_cmdLine.beSupervised) {
2757 #ifdef HAVE_SYSTEMD
2758 sd_notify(0, "READY=1");
2759 #endif
2760 healththread.join();
2761 }
2762 else {
2763 healththread.detach();
2764 doConsole();
2765 }
2766 _exit(EXIT_SUCCESS);
2767
2768 }
2769 catch(const LuaContext::ExecutionErrorException& e) {
2770 try {
2771 errlog("Fatal Lua error: %s", e.what());
2772 std::rethrow_if_nested(e);
2773 } catch(const std::exception& ne) {
2774 errlog("Details: %s", ne.what());
2775 }
2776 catch(PDNSException &ae)
2777 {
2778 errlog("Fatal pdns error: %s", ae.reason);
2779 }
2780 _exit(EXIT_FAILURE);
2781 }
2782 catch(std::exception &e)
2783 {
2784 errlog("Fatal error: %s", e.what());
2785 _exit(EXIT_FAILURE);
2786 }
2787 catch(PDNSException &ae)
2788 {
2789 errlog("Fatal pdns error: %s", ae.reason);
2790 _exit(EXIT_FAILURE);
2791 }