]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
dnsdist: Add 'setSyslogFacility()'
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #include <fstream>
26 #include <getopt.h>
27 #include <grp.h>
28 #include <limits>
29 #include <netinet/tcp.h>
30 #include <pwd.h>
31 #include <sys/resource.h>
32 #include <unistd.h>
33
34 #if defined (__OpenBSD__) || defined(__NetBSD__)
35 #include <readline/readline.h>
36 #else
37 #include <editline/readline.h>
38 #endif
39
40 #ifdef HAVE_SYSTEMD
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "dnsdist.hh"
45 #include "dnsdist-cache.hh"
46 #include "dnsdist-console.hh"
47 #include "dnsdist-ecs.hh"
48 #include "dnsdist-lua.hh"
49 #include "dnsdist-rings.hh"
50 #include "dnsdist-secpoll.hh"
51 #include "dnsdist-xpf.hh"
52
53 #include "base64.hh"
54 #include "delaypipe.hh"
55 #include "dolog.hh"
56 #include "dnsname.hh"
57 #include "dnsparser.hh"
58 #include "dnswriter.hh"
59 #include "ednsoptions.hh"
60 #include "gettime.hh"
61 #include "lock.hh"
62 #include "misc.hh"
63 #include "sodcrypto.hh"
64 #include "sstuff.hh"
65 #include "threadname.hh"
66
67 /* Known sins:
68
69 Receiver is currently single threaded
70 not *that* bad actually, but now that we are thread safe, might want to scale
71 */
72
73 /* the Rulaction plan
74 Set of Rules, if one matches, it leads to an Action
75 Both rules and actions could conceivably be Lua based.
76 On the C++ side, both could be inherited from a class Rule and a class Action,
77 on the Lua side we can't do that. */
78
79 using std::atomic;
80 using std::thread;
81 bool g_verbose;
82
83 struct DNSDistStats g_stats;
84 MetricDefinitionStorage g_metricDefinitions;
85
86 uint16_t g_maxOutstanding{10240};
87 bool g_verboseHealthChecks{false};
88 uint32_t g_staleCacheEntriesTTL{0};
89 bool g_syslog{true};
90 bool g_allowEmptyResponse{false};
91
92 GlobalStateHolder<NetmaskGroup> g_ACL;
93 string g_outputBuffer;
94
95 vector<std::tuple<ComboAddress, bool, bool, int, string, std::set<int>>> g_locals;
96 std::vector<std::shared_ptr<TLSFrontend>> g_tlslocals;
97 std::vector<std::tuple<ComboAddress,std::shared_ptr<DNSCryptContext>,bool, int, string, std::set<int> >> g_dnsCryptLocals;
98 #ifdef HAVE_EBPF
99 shared_ptr<BPFFilter> g_defaultBPFFilter;
100 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
101 #endif /* HAVE_EBPF */
102 vector<ClientState *> g_frontends;
103 GlobalStateHolder<pools_t> g_pools;
104 size_t g_udpVectorSize{1};
105
106 bool g_snmpEnabled{false};
107 bool g_snmpTrapsEnabled{false};
108 DNSDistSNMPAgent* g_snmpAgent{nullptr};
109
110 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
111 Then we have a bunch of connected sockets for talking to downstream servers.
112 We send directly to those sockets.
113
114 For the return path, per downstream server we have a thread that listens to responses.
115
116 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
117 there the original requestor and the original id. The new ID is the offset in the array.
118
119 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
120 original requestor.
121
122 IDs are assigned by atomic increments of the socket offset.
123 */
124
125 GlobalStateHolder<vector<DNSDistRuleAction> > g_rulactions;
126 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_resprulactions;
127 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitresprulactions;
128 GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredresprulactions;
129
130 Rings g_rings;
131 QueryCount g_qcount;
132
133 GlobalStateHolder<servers_t> g_dstates;
134 GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
135 GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
136 DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
137 int g_tcpRecvTimeout{2};
138 int g_tcpSendTimeout{2};
139 int g_udpTimeout{2};
140
141 bool g_servFailOnNoPolicy{false};
142 bool g_truncateTC{false};
143 bool g_fixupCase{false};
144 bool g_preserveTrailingData{false};
145
146 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
147 try
148 {
149 bool hadEDNS = false;
150 uint16_t payloadSize = 0;
151 uint16_t z = 0;
152
153 if (g_addEDNSToSelfGeneratedResponses) {
154 hadEDNS = getEDNSUDPPayloadSizeAndZ(packet, *len, &payloadSize, &z);
155 }
156
157 *len=static_cast<uint16_t>(sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
158 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
159 dh->ancount = dh->arcount = dh->nscount = 0;
160
161 if (hadEDNS) {
162 addEDNS(dh, *len, responseSize, z & EDNS_HEADER_FLAG_DO, payloadSize, 0);
163 }
164 }
165 catch(...)
166 {
167 g_stats.truncFail++;
168 }
169
170 struct DelayedPacket
171 {
172 int fd;
173 string packet;
174 ComboAddress destination;
175 ComboAddress origDest;
176 void operator()()
177 {
178 ssize_t res;
179 if(origDest.sin4.sin_family == 0) {
180 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
181 }
182 else {
183 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
184 }
185 if (res == -1) {
186 int err = errno;
187 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
188 }
189 }
190 };
191
192 DelayPipe<DelayedPacket>* g_delay = nullptr;
193
194 void doLatencyStats(double udiff)
195 {
196 if(udiff < 1000) ++g_stats.latency0_1;
197 else if(udiff < 10000) ++g_stats.latency1_10;
198 else if(udiff < 50000) ++g_stats.latency10_50;
199 else if(udiff < 100000) ++g_stats.latency50_100;
200 else if(udiff < 1000000) ++g_stats.latency100_1000;
201 else ++g_stats.latencySlow;
202
203 auto doAvg = [](double& var, double n, double weight) {
204 var = (weight -1) * var/weight + n/weight;
205 };
206
207 doAvg(g_stats.latencyAvg100, udiff, 100);
208 doAvg(g_stats.latencyAvg1000, udiff, 1000);
209 doAvg(g_stats.latencyAvg10000, udiff, 10000);
210 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
211 }
212
213 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote, unsigned int& consumed)
214 {
215 if (responseLen < sizeof(dnsheader)) {
216 return false;
217 }
218
219 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(response);
220 if (dh->qdcount == 0) {
221 if ((dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) || g_allowEmptyResponse) {
222 return true;
223 }
224 else {
225 ++g_stats.nonCompliantResponses;
226 return false;
227 }
228 }
229
230 uint16_t rqtype, rqclass;
231 DNSName rqname;
232 try {
233 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
234 }
235 catch(const std::exception& e) {
236 if(responseLen > 0 && static_cast<size_t>(responseLen) > sizeof(dnsheader)) {
237 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
238 }
239 ++g_stats.nonCompliantResponses;
240 return false;
241 }
242
243 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
244 return false;
245 }
246
247 return true;
248 }
249
250 static void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
251 {
252 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
253 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
254 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
255 uint16_t * flags = getFlagsFromDNSHeader(dh);
256 /* clear the flags we are about to restore */
257 *flags &= restoreFlagsMask;
258 /* only keep the flags we want to restore */
259 origFlags &= ~restoreFlagsMask;
260 /* set the saved flags as they were */
261 *flags |= origFlags;
262 }
263
264 static bool fixUpQueryTurnedResponse(DNSQuestion& dq, const uint16_t origFlags)
265 {
266 restoreFlags(dq.dh, origFlags);
267
268 return addEDNSToQueryTurnedResponse(dq);
269 }
270
271 static bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom, bool* zeroScope)
272 {
273 if (*responseLen < sizeof(dnsheader)) {
274 return false;
275 }
276
277 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(*response);
278 restoreFlags(dh, origFlags);
279
280 if (*responseLen == sizeof(dnsheader)) {
281 return true;
282 }
283
284 if(g_fixupCase) {
285 string realname = qname.toDNSString();
286 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
287 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
288 }
289 }
290
291 if (ednsAdded || ecsAdded) {
292 uint16_t optStart;
293 size_t optLen = 0;
294 bool last = false;
295
296 const std::string responseStr(*response, *responseLen);
297 int res = locateEDNSOptRR(responseStr, &optStart, &optLen, &last);
298
299 if (res == 0) {
300 if (zeroScope) { // this finds if an EDNS Client Subnet scope was set, and if it is 0
301 size_t optContentStart = 0;
302 uint16_t optContentLen = 0;
303 /* we need at least 4 bytes after the option length (family: 2, source prefix-length: 1, scope prefix-length: 1) */
304 if (isEDNSOptionInOpt(responseStr, optStart, optLen, EDNSOptionCode::ECS, &optContentStart, &optContentLen) && optContentLen >= 4) {
305 /* see if the EDNS Client Subnet SCOPE PREFIX-LENGTH byte in position 3 is set to 0, which is the only thing
306 we care about. */
307 *zeroScope = responseStr.at(optContentStart + 3) == 0;
308 }
309 }
310
311 if (ednsAdded) {
312 /* we added the entire OPT RR,
313 therefore we need to remove it entirely */
314 if (last) {
315 /* simply remove the last AR */
316 *responseLen -= optLen;
317 uint16_t arcount = ntohs(dh->arcount);
318 arcount--;
319 dh->arcount = htons(arcount);
320 }
321 else {
322 /* Removing an intermediary RR could lead to compression error */
323 if (rewriteResponseWithoutEDNS(responseStr, rewrittenResponse) == 0) {
324 *responseLen = rewrittenResponse.size();
325 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
326 rewrittenResponse.reserve(*responseLen + addRoom);
327 }
328 *responseSize = rewrittenResponse.capacity();
329 *response = reinterpret_cast<char*>(rewrittenResponse.data());
330 }
331 else {
332 warnlog("Error rewriting content");
333 }
334 }
335 }
336 else {
337 /* the OPT RR was already present, but without ECS,
338 we need to remove the ECS option if any */
339 if (last) {
340 /* nothing after the OPT RR, we can simply remove the
341 ECS option */
342 size_t existingOptLen = optLen;
343 removeEDNSOptionFromOPT(*response + optStart, &optLen, EDNSOptionCode::ECS);
344 *responseLen -= (existingOptLen - optLen);
345 }
346 else {
347 /* Removing an intermediary RR could lead to compression error */
348 if (rewriteResponseWithoutEDNSOption(responseStr, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
349 *responseLen = rewrittenResponse.size();
350 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
351 rewrittenResponse.reserve(*responseLen + addRoom);
352 }
353 *responseSize = rewrittenResponse.capacity();
354 *response = reinterpret_cast<char*>(rewrittenResponse.data());
355 }
356 else {
357 warnlog("Error rewriting content");
358 }
359 }
360 }
361 }
362 }
363
364 return true;
365 }
366
367 #ifdef HAVE_DNSCRYPT
368 static bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DNSCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
369 {
370 if (dnsCryptQuery) {
371 uint16_t encryptedResponseLen = 0;
372
373 /* save the original header before encrypting it in place */
374 if (dh != nullptr && *dh != nullptr && dhCopy != nullptr) {
375 memcpy(dhCopy, *dh, sizeof(dnsheader));
376 *dh = dhCopy;
377 }
378
379 int res = dnsCryptQuery->encryptResponse(response, *responseLen, responseSize, tcp, &encryptedResponseLen);
380 if (res == 0) {
381 *responseLen = encryptedResponseLen;
382 } else {
383 /* dropping response */
384 vinfolog("Error encrypting the response, dropping.");
385 return false;
386 }
387 }
388 return true;
389 }
390 #endif /* HAVE_DNSCRYPT */
391
392 static bool applyRulesToResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr)
393 {
394 DNSResponseAction::Action action=DNSResponseAction::Action::None;
395 std::string ruleresult;
396 for(const auto& lr : *localRespRulactions) {
397 if(lr.d_rule->matches(&dr)) {
398 lr.d_rule->d_matches++;
399 action=(*lr.d_action)(&dr, &ruleresult);
400 switch(action) {
401 case DNSResponseAction::Action::Allow:
402 return true;
403 break;
404 case DNSResponseAction::Action::Drop:
405 return false;
406 break;
407 case DNSResponseAction::Action::HeaderModify:
408 return true;
409 break;
410 case DNSResponseAction::Action::ServFail:
411 dr.dh->rcode = RCode::ServFail;
412 return true;
413 break;
414 /* non-terminal actions follow */
415 case DNSResponseAction::Action::Delay:
416 dr.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
417 break;
418 case DNSResponseAction::Action::None:
419 break;
420 }
421 }
422 }
423
424 return true;
425 }
426
427 bool processResponse(char** response, uint16_t* responseLen, size_t* responseSize, LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, size_t addRoom, std::vector<uint8_t>& rewrittenResponse, bool muted)
428 {
429 if (!applyRulesToResponse(localRespRulactions, dr)) {
430 return false;
431 }
432
433 bool zeroScope = false;
434 if (!fixUpResponse(response, responseLen, responseSize, *dr.qname, dr.origFlags, dr.ednsAdded, dr.ecsAdded, rewrittenResponse, addRoom, dr.useZeroScope ? &zeroScope : nullptr)) {
435 return false;
436 }
437
438 if (dr.packetCache && !dr.skipCache) {
439 if (!dr.useZeroScope) {
440 /* if the query was not suitable for zero-scope, for
441 example because it had an existing ECS entry so the hash is
442 not really 'no ECS', so just insert it for the existing subnet
443 since:
444 - we don't have the correct hash for a non-ECS query
445 - inserting with hash computed before the ECS replacement but with
446 the subnet extracted _after_ the replacement would not work.
447 */
448 zeroScope = false;
449 }
450 // if zeroScope, pass the pre-ECS hash-key and do not pass the subnet to the cache
451 dr.packetCache->insert(zeroScope ? dr.cacheKeyNoECS : dr.cacheKey, zeroScope ? boost::none : dr.subnet, dr.origFlags, dr.dnssecOK, *dr.qname, dr.qtype, dr.qclass, *response, *responseLen, dr.tcp, dr.dh->rcode, dr.tempFailureTTL);
452 }
453
454 #ifdef HAVE_DNSCRYPT
455 if (!muted) {
456 if (!encryptResponse(*response, responseLen, *responseSize, dr.tcp, dr.dnsCryptQuery, nullptr, nullptr)) {
457 return false;
458 }
459 }
460 #endif /* HAVE_DNSCRYPT */
461
462 return true;
463 }
464
465 static bool sendUDPResponse(int origFD, const char* response, const uint16_t responseLen, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
466 {
467 if(delayMsec && g_delay) {
468 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
469 g_delay->submit(dp, delayMsec);
470 }
471 else {
472 ssize_t res;
473 if(origDest.sin4.sin_family == 0) {
474 res = sendto(origFD, response, responseLen, 0, reinterpret_cast<const struct sockaddr*>(&origRemote), origRemote.getSocklen());
475 }
476 else {
477 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
478 }
479 if (res == -1) {
480 int err = errno;
481 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
482 }
483 }
484
485 return true;
486 }
487
488
489 static int pickBackendSocketForSending(std::shared_ptr<DownstreamState>& state)
490 {
491 return state->sockets[state->socketsOffset++ % state->sockets.size()];
492 }
493
494 static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
495 {
496 ready.clear();
497
498 if (state->sockets.size() == 1) {
499 ready.push_back(state->sockets[0]);
500 return ;
501 }
502
503 {
504 std::lock_guard<std::mutex> lock(state->socketsLock);
505 state->mplexer->getAvailableFDs(ready, -1);
506 }
507 }
508
509 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
510 void responderThread(std::shared_ptr<DownstreamState> dss)
511 try {
512 setThreadName("dnsdist/respond");
513 auto localRespRulactions = g_resprulactions.getLocal();
514 char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
515 static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
516 /* when the answer is encrypted in place, we need to get a copy
517 of the original header before encryption to fill the ring buffer */
518 dnsheader cleartextDH;
519 vector<uint8_t> rewrittenResponse;
520
521 uint16_t queryId = 0;
522 std::vector<int> sockets;
523 sockets.reserve(dss->sockets.size());
524
525 for(;;) {
526 dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
527 try {
528 pickBackendSocketsReadyForReceiving(dss, sockets);
529 for (const auto& fd : sockets) {
530 ssize_t got = recv(fd, packet, sizeof(packet), 0);
531 char * response = packet;
532 size_t responseSize = sizeof(packet);
533
534 if (got < 0 || static_cast<size_t>(got) < sizeof(dnsheader))
535 continue;
536
537 uint16_t responseLen = static_cast<uint16_t>(got);
538 queryId = dh->id;
539
540 if(queryId >= dss->idStates.size())
541 continue;
542
543 IDState* ids = &dss->idStates[queryId];
544 int origFD = ids->origFD;
545
546 if(origFD < 0) // duplicate
547 continue;
548
549 /* setting age to 0 to prevent the maintainer thread from
550 cleaning this IDS while we process the response.
551 We have already a copy of the origFD, so it would
552 mostly mess up the outstanding counter.
553 */
554 ids->age = 0;
555
556 unsigned int consumed = 0;
557 if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote, consumed)) {
558 continue;
559 }
560
561 int oldFD = ids->origFD.exchange(-1);
562 if (oldFD == origFD) {
563 /* we only decrement the outstanding counter if the value was not
564 altered in the meantime, which would mean that the state has been actively reused
565 and the other thread has not incremented the outstanding counter, so we don't
566 want it to be decremented twice. */
567 --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket
568 }
569
570 if(dh->tc && g_truncateTC) {
571 truncateTC(response, &responseLen, responseSize, consumed);
572 }
573
574 dh->id = ids->origID;
575
576 uint16_t addRoom = 0;
577 DNSResponse dr = makeDNSResponseFromIDState(*ids, dh, sizeof(packet), responseLen, false);
578 if (dr.dnsCryptQuery) {
579 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
580 }
581
582 memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));
583 if (!processResponse(&response, &responseLen, &responseSize, localRespRulactions, dr, addRoom, rewrittenResponse, ids->cs && ids->cs->muted)) {
584 continue;
585 }
586
587 if (ids->cs && !ids->cs->muted) {
588 ComboAddress empty;
589 empty.sin4.sin_family = 0;
590 /* if ids->destHarvested is false, origDest holds the listening address.
591 We don't want to use that as a source since it could be 0.0.0.0 for example. */
592 sendUDPResponse(origFD, response, responseLen, dr.delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
593 }
594
595 ++g_stats.responses;
596
597 double udiff = ids->sentTime.udiff();
598 vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), dr.remote->toStringWithPort(), udiff);
599
600 struct timespec ts;
601 gettime(&ts);
602 g_rings.insertResponse(ts, *dr.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(got), cleartextDH, dss->remote);
603
604 switch (dh->rcode) {
605 case RCode::NXDomain:
606 ++g_stats.frontendNXDomain;
607 break;
608 case RCode::ServFail:
609 ++g_stats.servfailResponses;
610 ++g_stats.frontendServFail;
611 break;
612 case RCode::NoError:
613 ++g_stats.frontendNoError;
614 break;
615 }
616 dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
617
618 doLatencyStats(udiff);
619
620 rewrittenResponse.clear();
621 }
622 }
623 catch(const std::exception& e){
624 vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
625 }
626 }
627 }
628 catch(const std::exception& e)
629 {
630 errlog("UDP responder thread died because of exception: %s", e.what());
631 }
632 catch(const PDNSException& e)
633 {
634 errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
635 }
636 catch(...)
637 {
638 errlog("UDP responder thread died because of an exception: %s", "unknown");
639 }
640
641 bool DownstreamState::reconnect()
642 {
643 std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
644 if (!tl.owns_lock()) {
645 /* we are already reconnecting */
646 return false;
647 }
648
649 connected = false;
650 for (auto& fd : sockets) {
651 if (fd != -1) {
652 if (sockets.size() > 1) {
653 std::lock_guard<std::mutex> lock(socketsLock);
654 mplexer->removeReadFD(fd);
655 }
656 /* shutdown() is needed to wake up recv() in the responderThread */
657 shutdown(fd, SHUT_RDWR);
658 close(fd);
659 fd = -1;
660 }
661 if (!IsAnyAddress(remote)) {
662 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
663 if (!IsAnyAddress(sourceAddr)) {
664 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
665 SBind(fd, sourceAddr);
666 }
667 try {
668 SConnect(fd, remote);
669 if (sockets.size() > 1) {
670 std::lock_guard<std::mutex> lock(socketsLock);
671 mplexer->addReadFD(fd, [](int, boost::any) {});
672 }
673 connected = true;
674 }
675 catch(const std::runtime_error& error) {
676 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
677 connected = false;
678 break;
679 }
680 }
681 }
682
683 /* if at least one (re-)connection failed, close all sockets */
684 if (!connected) {
685 for (auto& fd : sockets) {
686 if (fd != -1) {
687 if (sockets.size() > 1) {
688 std::lock_guard<std::mutex> lock(socketsLock);
689 mplexer->removeReadFD(fd);
690 }
691 /* shutdown() is needed to wake up recv() in the responderThread */
692 shutdown(fd, SHUT_RDWR);
693 close(fd);
694 fd = -1;
695 }
696 }
697 }
698
699 return connected;
700 }
701 void DownstreamState::hash()
702 {
703 vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
704 auto w = weight;
705 WriteLock wl(&d_lock);
706 hashes.clear();
707 while (w > 0) {
708 std::string uuid = boost::str(boost::format("%s-%d") % id % w);
709 unsigned int wshash = burtleCI((const unsigned char*)uuid.c_str(), uuid.size(), g_hashperturb);
710 hashes.insert(wshash);
711 --w;
712 }
713 }
714
715 void DownstreamState::setId(const boost::uuids::uuid& newId)
716 {
717 id = newId;
718 // compute hashes only if already done
719 if (!hashes.empty()) {
720 hash();
721 }
722 }
723
724 void DownstreamState::setWeight(int newWeight)
725 {
726 if (newWeight < 1) {
727 errlog("Error setting server's weight: downstream weight value must be greater than 0.");
728 return ;
729 }
730 weight = newWeight;
731 if (!hashes.empty()) {
732 hash();
733 }
734 }
735
736 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
737 {
738 pthread_rwlock_init(&d_lock, nullptr);
739 id = getUniqueID();
740 threadStarted.clear();
741
742 mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
743
744 sockets.resize(numberOfSockets);
745 for (auto& fd : sockets) {
746 fd = -1;
747 }
748
749 if (!IsAnyAddress(remote)) {
750 reconnect();
751 idStates.resize(g_maxOutstanding);
752 sw.start();
753 infolog("Added downstream server %s", remote.toStringWithPort());
754 }
755
756 }
757
758 std::mutex g_luamutex;
759 LuaContext g_lua;
760
761 GlobalStateHolder<ServerPolicy> g_policy;
762
763 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
764 {
765 for(auto& d : servers) {
766 if(d.second->isUp() && d.second->qps.check())
767 return d.second;
768 }
769 return leastOutstanding(servers, dq);
770 }
771
772 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
773 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
774 {
775 if (servers.size() == 1 && servers[0].second->isUp()) {
776 return servers[0].second;
777 }
778
779 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
780 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
781 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
782 poss.reserve(servers.size());
783 for(auto& d : servers) {
784 if(d.second->isUp()) {
785 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
786 }
787 }
788 if(poss.empty())
789 return shared_ptr<DownstreamState>();
790 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
791 return poss.begin()->second;
792 }
793
794 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
795 {
796 vector<pair<int, shared_ptr<DownstreamState>>> poss;
797 int sum = 0;
798 int max = std::numeric_limits<int>::max();
799
800 for(auto& d : servers) { // w=1, w=10 -> 1, 11
801 if(d.second->isUp()) {
802 // Don't overflow sum when adding high weights
803 if(d.second->weight > max - sum) {
804 sum = max;
805 } else {
806 sum += d.second->weight;
807 }
808
809 poss.push_back({sum, d.second});
810 }
811 }
812
813 // Catch poss & sum are empty to avoid SIGFPE
814 if(poss.empty())
815 return shared_ptr<DownstreamState>();
816
817 int r = val % sum;
818 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
819 if(p==poss.end())
820 return shared_ptr<DownstreamState>();
821 return p->second;
822 }
823
824 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
825 {
826 return valrandom(random(), servers, dq);
827 }
828
829 uint32_t g_hashperturb;
830 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
831 {
832 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
833 }
834
835 shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const DNSQuestion* dq)
836 {
837 unsigned int qhash = dq->qname->hash(g_hashperturb);
838 unsigned int sel = std::numeric_limits<unsigned int>::max();
839 unsigned int min = std::numeric_limits<unsigned int>::max();
840 shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
841
842 for (const auto& d: servers) {
843 if (d.second->isUp()) {
844 // make sure hashes have been computed
845 if (d.second->hashes.empty()) {
846 d.second->hash();
847 }
848 {
849 ReadLock rl(&(d.second->d_lock));
850 const auto& server = d.second;
851 // we want to keep track of the last hash
852 if (min > *(server->hashes.begin())) {
853 min = *(server->hashes.begin());
854 first = server;
855 }
856
857 auto hash_it = server->hashes.lower_bound(qhash);
858 if (hash_it != server->hashes.end()) {
859 if (*hash_it < sel) {
860 sel = *hash_it;
861 ret = server;
862 }
863 }
864 }
865 }
866 }
867 if (ret != nullptr) {
868 return ret;
869 }
870 if (first != nullptr) {
871 return first;
872 }
873 return shared_ptr<DownstreamState>();
874 }
875
876 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
877 {
878 NumberedServerVector poss;
879
880 for(auto& d : servers) {
881 if(d.second->isUp()) {
882 poss.push_back(d);
883 }
884 }
885
886 const auto *res=&poss;
887 if(poss.empty())
888 res = &servers;
889
890 if(res->empty())
891 return shared_ptr<DownstreamState>();
892
893 static unsigned int counter;
894
895 return (*res)[(counter++) % res->size()].second;
896 }
897
898 ComboAddress g_serverControl{"127.0.0.1:5199"};
899
900 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
901 {
902 std::shared_ptr<ServerPool> pool;
903 pools_t::iterator it = pools.find(poolName);
904 if (it != pools.end()) {
905 pool = it->second;
906 }
907 else {
908 if (!poolName.empty())
909 vinfolog("Creating pool %s", poolName);
910 pool = std::make_shared<ServerPool>();
911 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
912 }
913 return pool;
914 }
915
916 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
917 {
918 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
919 if (!poolName.empty()) {
920 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
921 } else {
922 vinfolog("Setting default pool server selection policy to %s", policy->name);
923 }
924 pool->policy = policy;
925 }
926
927 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
928 {
929 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
930 if (!poolName.empty()) {
931 vinfolog("Adding server to pool %s", poolName);
932 } else {
933 vinfolog("Adding server to default pool");
934 }
935 pool->addServer(server);
936 }
937
938 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
939 {
940 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
941
942 if (!poolName.empty()) {
943 vinfolog("Removing server from pool %s", poolName);
944 }
945 else {
946 vinfolog("Removing server from default pool");
947 }
948
949 pool->removeServer(server);
950 }
951
952 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
953 {
954 pools_t::const_iterator it = pools.find(poolName);
955
956 if (it == pools.end()) {
957 throw std::out_of_range("No pool named " + poolName);
958 }
959
960 return it->second;
961 }
962
963 NumberedServerVector getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
964 {
965 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
966 return pool->getServers();
967 }
968
969 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
970 {
971 string result;
972
973 std::vector<std::string> addrs;
974 stringtok(addrs, spoofContent, " ,");
975
976 if (addrs.size() == 1) {
977 try {
978 ComboAddress spoofAddr(spoofContent);
979 SpoofAction sa({spoofAddr});
980 sa(&dq, &result);
981 }
982 catch(const PDNSException &e) {
983 SpoofAction sa(spoofContent); // CNAME then
984 sa(&dq, &result);
985 }
986 } else {
987 std::vector<ComboAddress> cas;
988 for (const auto& addr : addrs) {
989 try {
990 cas.push_back(ComboAddress(addr));
991 }
992 catch (...) {
993 }
994 }
995 SpoofAction sa(cas);
996 sa(&dq, &result);
997 }
998 }
999
1000 static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, const struct timespec& now)
1001 {
1002 g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.len, *dq.dh);
1003
1004 if(g_qcount.enabled) {
1005 string qname = (*dq.qname).toString(".");
1006 bool countQuery{true};
1007 if(g_qcount.filter) {
1008 std::lock_guard<std::mutex> lock(g_luamutex);
1009 std::tie (countQuery, qname) = g_qcount.filter(dq);
1010 }
1011
1012 if(countQuery) {
1013 WriteLock wl(&g_qcount.queryLock);
1014 if(!g_qcount.records.count(qname)) {
1015 g_qcount.records[qname] = 0;
1016 }
1017 g_qcount.records[qname]++;
1018 }
1019 }
1020
1021 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
1022 auto updateBlockStats = [&got]() {
1023 ++g_stats.dynBlocked;
1024 got->second.blocks++;
1025 };
1026
1027 if(now < got->second.until) {
1028 DNSAction::Action action = got->second.action;
1029 if (action == DNSAction::Action::None) {
1030 action = g_dynBlockAction;
1031 }
1032 switch (action) {
1033 case DNSAction::Action::NoOp:
1034 /* do nothing */
1035 break;
1036
1037 case DNSAction::Action::Nxdomain:
1038 vinfolog("Query from %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort());
1039 updateBlockStats();
1040
1041 dq.dh->rcode = RCode::NXDomain;
1042 dq.dh->qr=true;
1043 return true;
1044
1045 case DNSAction::Action::Refused:
1046 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
1047 updateBlockStats();
1048
1049 dq.dh->rcode = RCode::Refused;
1050 dq.dh->qr = true;
1051 return true;
1052
1053 case DNSAction::Action::Truncate:
1054 if(!dq.tcp) {
1055 updateBlockStats();
1056 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
1057 dq.dh->tc = true;
1058 dq.dh->qr = true;
1059 return true;
1060 }
1061 else {
1062 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1063 }
1064 break;
1065 case DNSAction::Action::NoRecurse:
1066 updateBlockStats();
1067 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1068 dq.dh->rd = false;
1069 return true;
1070 default:
1071 updateBlockStats();
1072 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
1073 return false;
1074 }
1075 }
1076 }
1077
1078 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
1079 auto updateBlockStats = [&got]() {
1080 ++g_stats.dynBlocked;
1081 got->blocks++;
1082 };
1083
1084 if(now < got->until) {
1085 DNSAction::Action action = got->action;
1086 if (action == DNSAction::Action::None) {
1087 action = g_dynBlockAction;
1088 }
1089 switch (action) {
1090 case DNSAction::Action::NoOp:
1091 /* do nothing */
1092 break;
1093 case DNSAction::Action::Nxdomain:
1094 vinfolog("Query from %s for %s turned into NXDomain because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1095 updateBlockStats();
1096
1097 dq.dh->rcode = RCode::NXDomain;
1098 dq.dh->qr=true;
1099 return true;
1100 case DNSAction::Action::Refused:
1101 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1102 updateBlockStats();
1103
1104 dq.dh->rcode = RCode::Refused;
1105 dq.dh->qr=true;
1106 return true;
1107 case DNSAction::Action::Truncate:
1108 if(!dq.tcp) {
1109 updateBlockStats();
1110
1111 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1112 dq.dh->tc = true;
1113 dq.dh->qr = true;
1114 return true;
1115 }
1116 else {
1117 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1118 }
1119 break;
1120 case DNSAction::Action::NoRecurse:
1121 updateBlockStats();
1122 vinfolog("Query from %s setting rd=0 because of dynamic block", dq.remote->toStringWithPort());
1123 dq.dh->rd = false;
1124 return true;
1125 default:
1126 updateBlockStats();
1127 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
1128 return false;
1129 }
1130 }
1131 }
1132
1133 DNSAction::Action action=DNSAction::Action::None;
1134 string ruleresult;
1135 for(const auto& lr : *holders.rulactions) {
1136 if(lr.d_rule->matches(&dq)) {
1137 lr.d_rule->d_matches++;
1138 action=(*lr.d_action)(&dq, &ruleresult);
1139
1140 switch(action) {
1141 case DNSAction::Action::Allow:
1142 return true;
1143 break;
1144 case DNSAction::Action::Drop:
1145 ++g_stats.ruleDrop;
1146 return false;
1147 break;
1148 case DNSAction::Action::Nxdomain:
1149 dq.dh->rcode = RCode::NXDomain;
1150 dq.dh->qr=true;
1151 ++g_stats.ruleNXDomain;
1152 return true;
1153 break;
1154 case DNSAction::Action::Refused:
1155 dq.dh->rcode = RCode::Refused;
1156 dq.dh->qr=true;
1157 ++g_stats.ruleRefused;
1158 return true;
1159 break;
1160 case DNSAction::Action::ServFail:
1161 dq.dh->rcode = RCode::ServFail;
1162 dq.dh->qr=true;
1163 ++g_stats.ruleServFail;
1164 return true;
1165 break;
1166 case DNSAction::Action::Spoof:
1167 spoofResponseFromString(dq, ruleresult);
1168 return true;
1169 break;
1170 case DNSAction::Action::Truncate:
1171 dq.dh->tc = true;
1172 dq.dh->qr = true;
1173 return true;
1174 break;
1175 case DNSAction::Action::HeaderModify:
1176 return true;
1177 break;
1178 case DNSAction::Action::Pool:
1179 poolname=ruleresult;
1180 return true;
1181 break;
1182 /* non-terminal actions follow */
1183 case DNSAction::Action::Delay:
1184 dq.delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1185 break;
1186 case DNSAction::Action::None:
1187 /* fall-through */
1188 case DNSAction::Action::NoOp:
1189 break;
1190 case DNSAction::Action::NoRecurse:
1191 dq.dh->rd = false;
1192 return true;
1193 break;
1194 }
1195 }
1196 }
1197
1198 return true;
1199 }
1200
1201 static ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const char* request, const size_t requestLen, bool healthCheck=false)
1202 {
1203 ssize_t result;
1204
1205 if (ss->sourceItf == 0) {
1206 result = send(sd, request, requestLen, 0);
1207 }
1208 else {
1209 struct msghdr msgh;
1210 struct iovec iov;
1211 char cbuf[256];
1212 ComboAddress remote(ss->remote);
1213 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &remote);
1214 addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
1215 result = sendmsg(sd, &msgh, 0);
1216 }
1217
1218 if (result == -1) {
1219 int savederrno = errno;
1220 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1221
1222 /* This might sound silly, but on Linux send() might fail with EINVAL
1223 if the interface the socket was bound to doesn't exist anymore.
1224 We don't want to reconnect the real socket if the healthcheck failed,
1225 because it's not using the same socket.
1226 */
1227 if (!healthCheck && (savederrno == EINVAL || savederrno == ENODEV)) {
1228 ss->reconnect();
1229 }
1230 }
1231
1232 return result;
1233 }
1234
1235 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1236 {
1237 if (msgh->msg_flags & MSG_TRUNC) {
1238 /* message was too large for our buffer */
1239 vinfolog("Dropping message too large for our buffer");
1240 ++g_stats.nonCompliantQueries;
1241 return false;
1242 }
1243
1244 if(!holders.acl->match(remote)) {
1245 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1246 ++g_stats.aclDrops;
1247 return false;
1248 }
1249
1250 cs.queries++;
1251 ++g_stats.queries;
1252
1253 if (HarvestDestinationAddress(msgh, &dest)) {
1254 /* we don't get the port, only the address */
1255 dest.sin4.sin_port = cs.local.sin4.sin_port;
1256 }
1257 else {
1258 dest.sin4.sin_family = 0;
1259 }
1260
1261 return true;
1262 }
1263
1264 boost::optional<std::vector<uint8_t>> checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
1265 {
1266 if (cs.dnscryptCtx) {
1267 #ifdef HAVE_DNSCRYPT
1268 vector<uint8_t> response;
1269 uint16_t decryptedQueryLen = 0;
1270
1271 dnsCryptQuery = std::make_shared<DNSCryptQuery>(cs.dnscryptCtx);
1272
1273 bool decrypted = handleDNSCryptQuery(const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, tcp, now, response);
1274
1275 if (!decrypted) {
1276 if (response.size() > 0) {
1277 return response;
1278 }
1279 throw std::runtime_error("Unable to decrypt DNSCrypt query, dropping.");
1280 }
1281
1282 len = decryptedQueryLen;
1283 #endif /* HAVE_DNSCRYPT */
1284 }
1285 return boost::none;
1286 }
1287
1288 bool checkQueryHeaders(const struct dnsheader* dh)
1289 {
1290 if (dh->qr) { // don't respond to responses
1291 ++g_stats.nonCompliantQueries;
1292 return false;
1293 }
1294
1295 if (dh->qdcount == 0) {
1296 ++g_stats.emptyQueries;
1297 return false;
1298 }
1299
1300 if (dh->rd) {
1301 ++g_stats.rdQueries;
1302 }
1303
1304 return true;
1305 }
1306
1307 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1308 static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, char* cbuf)
1309 {
1310 outMsg.msg_len = 0;
1311 fillMSGHdr(&outMsg.msg_hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));
1312
1313 if (dest.sin4.sin_family == 0) {
1314 outMsg.msg_hdr.msg_control = nullptr;
1315 }
1316 else {
1317 addCMsgSrcAddr(&outMsg.msg_hdr, cbuf, &dest, 0);
1318 }
1319 }
1320 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1321
1322 /* self-generated responses or cache hits */
1323 static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
1324 {
1325 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.consumed, dq.local, dq.remote, reinterpret_cast<dnsheader*>(dq.dh), dq.size, dq.len, dq.tcp, dq.queryTime);
1326
1327 #ifdef HAVE_PROTOBUF
1328 dr.uniqueId = dq.uniqueId;
1329 #endif
1330 dr.qTag = dq.qTag;
1331 dr.delayMsec = dq.delayMsec;
1332
1333 if (!applyRulesToResponse(cacheHit ? holders.cacheHitRespRulactions : holders.selfAnsweredRespRulactions, dr)) {
1334 return false;
1335 }
1336
1337 /* in case a rule changed it */
1338 dq.delayMsec = dr.delayMsec;
1339
1340 #ifdef HAVE_DNSCRYPT
1341 if (!cs.muted) {
1342 if (!encryptResponse(reinterpret_cast<char*>(dq.dh), &dq.len, dq.size, dq.tcp, dq.dnsCryptQuery, nullptr, nullptr)) {
1343 return false;
1344 }
1345 }
1346 #endif /* HAVE_DNSCRYPT */
1347
1348 if (cacheHit) {
1349 ++g_stats.cacheHits;
1350 }
1351
1352 switch (dr.dh->rcode) {
1353 case RCode::NXDomain:
1354 ++g_stats.frontendNXDomain;
1355 break;
1356 case RCode::ServFail:
1357 ++g_stats.frontendServFail;
1358 break;
1359 case RCode::NoError:
1360 ++g_stats.frontendNoError;
1361 break;
1362 }
1363
1364 doLatencyStats(0); // we're not going to measure this
1365 return true;
1366 }
1367
1368 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
1369 {
1370 const uint16_t queryId = ntohs(dq.dh->id);
1371
1372 try {
1373 /* we need an accurate ("real") value for the response and
1374 to store into the IDS, but not for insertion into the
1375 rings for example */
1376 struct timespec now;
1377 gettime(&now);
1378
1379 string poolname;
1380
1381 if (!applyRulesToQuery(holders, dq, poolname, now)) {
1382 return ProcessQueryResult::Drop;
1383 }
1384
1385 if(dq.dh->qr) { // something turned it into a response
1386 fixUpQueryTurnedResponse(dq, dq.origFlags);
1387
1388 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1389 return ProcessQueryResult::Drop;
1390 }
1391
1392 ++g_stats.selfAnswered;
1393 return ProcessQueryResult::SendAnswer;
1394 }
1395
1396 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
1397 dq.packetCache = serverPool->packetCache;
1398 auto policy = *(holders.policy);
1399 if (serverPool->policy != nullptr) {
1400 policy = *(serverPool->policy);
1401 }
1402 auto servers = serverPool->getServers();
1403 if (policy.isLua) {
1404 std::lock_guard<std::mutex> lock(g_luamutex);
1405 selectedBackend = policy.policy(servers, &dq);
1406 }
1407 else {
1408 selectedBackend = policy.policy(servers, &dq);
1409 }
1410
1411 uint16_t cachedResponseSize = dq.size;
1412 uint32_t allowExpired = selectedBackend ? 0 : g_staleCacheEntriesTTL;
1413
1414 if (dq.packetCache && !dq.skipCache) {
1415 dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
1416 }
1417
1418 if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
1419 // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
1420 if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
1421 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
1422 dq.len = cachedResponseSize;
1423
1424 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1425 return ProcessQueryResult::Drop;
1426 }
1427
1428 return ProcessQueryResult::SendAnswer;
1429 }
1430
1431 if (!dq.subnet) {
1432 /* there was no existing ECS on the query, enable the zero-scope feature */
1433 dq.useZeroScope = true;
1434 }
1435 }
1436
1437 if (!handleEDNSClientSubnet(dq, &(dq.ednsAdded), &(dq.ecsAdded), g_preserveTrailingData)) {
1438 vinfolog("Dropping query from %s because we couldn't insert the ECS value", dq.remote->toStringWithPort());
1439 return ProcessQueryResult::Drop;
1440 }
1441 }
1442
1443 if (dq.packetCache && !dq.skipCache) {
1444 if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKey, dq.subnet, dq.dnssecOK, allowExpired)) {
1445 dq.len = cachedResponseSize;
1446
1447 if (!prepareOutgoingResponse(holders, cs, dq, true)) {
1448 return ProcessQueryResult::Drop;
1449 }
1450
1451 return ProcessQueryResult::SendAnswer;
1452 }
1453 ++g_stats.cacheMisses;
1454 }
1455
1456 if(!selectedBackend) {
1457 ++g_stats.noPolicy;
1458
1459 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "ServFailed" : "Dropped", dq.qname->toString(), QType(dq.qtype).getName(), dq.remote->toStringWithPort());
1460 if (g_servFailOnNoPolicy) {
1461 restoreFlags(dq.dh, dq.origFlags);
1462
1463 dq.dh->rcode = RCode::ServFail;
1464 dq.dh->qr = true;
1465
1466 if (!prepareOutgoingResponse(holders, cs, dq, false)) {
1467 return ProcessQueryResult::Drop;
1468 }
1469 // no response-only statistics counter to update.
1470 return ProcessQueryResult::SendAnswer;
1471 }
1472
1473 return ProcessQueryResult::Drop;
1474 }
1475
1476 if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
1477 addXPF(dq, selectedBackend->xpfRRCode, g_preserveTrailingData);
1478 }
1479
1480 selectedBackend->queries++;
1481 return ProcessQueryResult::PassToBackend;
1482 }
1483 catch(const std::exception& e){
1484 vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.tcp ? "TCP" : "UDP"), dq.remote->toStringWithPort(), queryId, e.what());
1485 }
1486 return ProcessQueryResult::Drop;
1487 }
1488
1489 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
1490 {
1491 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1492 uint16_t queryId = 0;
1493
1494 try {
1495 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1496 return;
1497 }
1498
1499 /* we need an accurate ("real") value for the response and
1500 to store into the IDS, but not for insertion into the
1501 rings for example */
1502 struct timespec queryRealTime;
1503 gettime(&queryRealTime, true);
1504
1505 std::shared_ptr<DNSCryptQuery> dnsCryptQuery = nullptr;
1506 auto dnsCryptResponse = checkDNSCryptQuery(cs, query, len, dnsCryptQuery, queryRealTime.tv_sec, false);
1507 if (dnsCryptResponse) {
1508 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dnsCryptResponse->data()), static_cast<uint16_t>(dnsCryptResponse->size()), 0, dest, remote);
1509 return;
1510 }
1511
1512 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1513 queryId = ntohs(dh->id);
1514
1515 if (!checkQueryHeaders(dh)) {
1516 return;
1517 }
1518
1519 uint16_t qtype, qclass;
1520 unsigned int consumed = 0;
1521 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1522 DNSQuestion dq(&qname, qtype, qclass, consumed, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
1523 dq.dnsCryptQuery = std::move(dnsCryptQuery);
1524 std::shared_ptr<DownstreamState> ss{nullptr};
1525 auto result = processQuery(dq, cs, holders, ss);
1526
1527 if (result == ProcessQueryResult::Drop) {
1528 return;
1529 }
1530
1531 if (result == ProcessQueryResult::SendAnswer) {
1532 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1533 if (dq.delayMsec == 0 && responsesVect != nullptr) {
1534 queueResponse(cs, reinterpret_cast<char*>(dq.dh), dq.len, *dq.local, *dq.remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1535 (*queuedResponses)++;
1536 return;
1537 }
1538 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1539 sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(dq.dh), dq.len, dq.delayMsec, *dq.local, *dq.remote);
1540 return;
1541 }
1542
1543 if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
1544 return;
1545 }
1546
1547 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1548 IDState* ids = &ss->idStates[idOffset];
1549 ids->age = 0;
1550
1551 int oldFD = ids->origFD.exchange(cs.udpFD);
1552 if(oldFD < 0) {
1553 // if we are reusing, no change in outstanding
1554 ss->outstanding++;
1555 }
1556 else {
1557 ss->reuseds++;
1558 ++g_stats.downstreamTimeouts;
1559 }
1560
1561 ids->cs = &cs;
1562 ids->origID = dh->id;
1563 setIDStateFromDNSQuestion(*ids, dq, std::move(qname));
1564
1565 /* If we couldn't harvest the real dest addr, still
1566 write down the listening addr since it will be useful
1567 (especially if it's not an 'any' one).
1568 We need to keep track of which one it is since we may
1569 want to use the real but not the listening addr to reply.
1570 */
1571 if (dest.sin4.sin_family != 0) {
1572 ids->origDest = dest;
1573 ids->destHarvested = true;
1574 }
1575 else {
1576 ids->origDest = cs.local;
1577 ids->destHarvested = false;
1578 }
1579
1580 dh->id = idOffset;
1581
1582 int fd = pickBackendSocketForSending(ss);
1583 ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
1584
1585 if(ret < 0) {
1586 ss->sendErrors++;
1587 ++g_stats.downstreamSendErrors;
1588 }
1589
1590 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1591 }
1592 catch(const std::exception& e){
1593 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1594 }
1595 }
1596
1597 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1598 static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
1599 {
1600 struct MMReceiver
1601 {
1602 char packet[4096];
1603 /* used by HarvestDestinationAddress */
1604 char cbuf[256];
1605 ComboAddress remote;
1606 ComboAddress dest;
1607 struct iovec iov;
1608 };
1609 const size_t vectSize = g_udpVectorSize;
1610 /* the actual buffer is larger because:
1611 - we may have to add EDNS and/or ECS
1612 - we use it for self-generated responses (from rule or cache)
1613 but we only accept incoming payloads up to that size
1614 */
1615 static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1616
1617 auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
1618 auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1619 auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
1620
1621 /* initialize the structures needed to receive our messages */
1622 for (size_t idx = 0; idx < vectSize; idx++) {
1623 recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
1624 fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
1625 }
1626
1627 /* go now */
1628 for(;;) {
1629
1630 /* reset the IO vector, since it's also used to send the vector of responses
1631 to avoid having to copy the data around */
1632 for (size_t idx = 0; idx < vectSize; idx++) {
1633 recvData[idx].iov.iov_base = recvData[idx].packet;
1634 recvData[idx].iov.iov_len = sizeof(recvData[idx].packet);
1635 }
1636
1637 /* block until we have at least one message ready, but return
1638 as many as possible to save the syscall costs */
1639 int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);
1640
1641 if (msgsGot <= 0) {
1642 vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
1643 continue;
1644 }
1645
1646 unsigned int msgsToSend = 0;
1647
1648 /* process the received messages */
1649 for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
1650 const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
1651 unsigned int got = msgVec[msgIdx].msg_len;
1652 const ComboAddress& remote = recvData[msgIdx].remote;
1653
1654 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1655 ++g_stats.nonCompliantQueries;
1656 continue;
1657 }
1658
1659 processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
1660
1661 }
1662
1663 /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
1664 or the cache) can be sent in batch too */
1665
1666 if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
1667 int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);
1668
1669 if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
1670 vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
1671 }
1672 }
1673
1674 }
1675 }
1676 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1677
1678 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1679 static void udpClientThread(ClientState* cs)
1680 try
1681 {
1682 setThreadName("dnsdist/udpClie");
1683 LocalHolders holders;
1684
1685 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1686 if (g_udpVectorSize > 1) {
1687 MultipleMessagesUDPClientThread(cs, holders);
1688
1689 }
1690 else
1691 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1692 {
1693 char packet[4096];
1694 /* the actual buffer is larger because:
1695 - we may have to add EDNS and/or ECS
1696 - we use it for self-generated responses (from rule or cache)
1697 but we only accept incoming payloads up to that size
1698 */
1699 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1700 struct msghdr msgh;
1701 struct iovec iov;
1702 /* used by HarvestDestinationAddress */
1703 char cbuf[256];
1704
1705 ComboAddress remote;
1706 ComboAddress dest;
1707 remote.sin4.sin_family = cs->local.sin4.sin_family;
1708 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1709
1710 for(;;) {
1711 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1712
1713 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1714 ++g_stats.nonCompliantQueries;
1715 continue;
1716 }
1717
1718 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1719 }
1720 }
1721 }
1722 catch(const std::exception &e)
1723 {
1724 errlog("UDP client thread died because of exception: %s", e.what());
1725 }
1726 catch(const PDNSException &e)
1727 {
1728 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1729 }
1730 catch(...)
1731 {
1732 errlog("UDP client thread died because of an exception: %s", "unknown");
1733 }
1734
1735 uint16_t getRandomDNSID()
1736 {
1737 #ifdef HAVE_LIBSODIUM
1738 return (randombytes_random() % 65536);
1739 #else
1740 return (random() % 65536);
1741 #endif
1742 }
1743
1744 static bool upCheck(const shared_ptr<DownstreamState>& ds)
1745 try
1746 {
1747 DNSName checkName = ds->checkName;
1748 uint16_t checkType = ds->checkType.getCode();
1749 uint16_t checkClass = ds->checkClass;
1750 dnsheader checkHeader;
1751 memset(&checkHeader, 0, sizeof(checkHeader));
1752
1753 checkHeader.qdcount = htons(1);
1754 checkHeader.id = getRandomDNSID();
1755
1756 checkHeader.rd = true;
1757 if (ds->setCD) {
1758 checkHeader.cd = true;
1759 }
1760
1761 if (ds->checkFunction) {
1762 std::lock_guard<std::mutex> lock(g_luamutex);
1763 auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
1764 checkName = std::get<0>(ret);
1765 checkType = std::get<1>(ret);
1766 checkClass = std::get<2>(ret);
1767 }
1768
1769 vector<uint8_t> packet;
1770 DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
1771 dnsheader * requestHeader = dpw.getHeader();
1772 *requestHeader = checkHeader;
1773
1774 Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
1775 sock.setNonBlocking();
1776 if (!IsAnyAddress(ds->sourceAddr)) {
1777 sock.setReuseAddr();
1778 sock.bind(ds->sourceAddr);
1779 }
1780 sock.connect(ds->remote);
1781 ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
1782 if (sent < 0) {
1783 int ret = errno;
1784 if (g_verboseHealthChecks)
1785 infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
1786 return false;
1787 }
1788
1789 int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
1790 if(ret < 0 || !ret) { // error, timeout, both are down!
1791 if (ret < 0) {
1792 ret = errno;
1793 if (g_verboseHealthChecks)
1794 infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
1795 }
1796 else {
1797 if (g_verboseHealthChecks)
1798 infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
1799 }
1800 return false;
1801 }
1802
1803 string reply;
1804 ComboAddress from;
1805 sock.recvFrom(reply, from);
1806
1807 /* we are using a connected socket but hey.. */
1808 if (from != ds->remote) {
1809 if (g_verboseHealthChecks)
1810 infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
1811 return false;
1812 }
1813
1814 const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
1815
1816 if (reply.size() < sizeof(*responseHeader)) {
1817 if (g_verboseHealthChecks)
1818 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
1819 return false;
1820 }
1821
1822 if (responseHeader->id != requestHeader->id) {
1823 if (g_verboseHealthChecks)
1824 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
1825 return false;
1826 }
1827
1828 if (!responseHeader->qr) {
1829 if (g_verboseHealthChecks)
1830 infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
1831 return false;
1832 }
1833
1834 if (responseHeader->rcode == RCode::ServFail) {
1835 if (g_verboseHealthChecks)
1836 infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
1837 return false;
1838 }
1839
1840 if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1841 if (g_verboseHealthChecks)
1842 infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1843 return false;
1844 }
1845
1846 uint16_t receivedType;
1847 uint16_t receivedClass;
1848 DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
1849
1850 if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
1851 if (g_verboseHealthChecks)
1852 infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
1853 return false;
1854 }
1855
1856 return true;
1857 }
1858 catch(const std::exception& e)
1859 {
1860 if (g_verboseHealthChecks)
1861 infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
1862 return false;
1863 }
1864 catch(...)
1865 {
1866 if (g_verboseHealthChecks)
1867 infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
1868 return false;
1869 }
1870
1871 uint64_t g_maxTCPClientThreads{10};
1872 std::atomic<uint16_t> g_cacheCleaningDelay{60};
1873 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
1874
1875 void maintThread()
1876 {
1877 setThreadName("dnsdist/main");
1878 int interval = 1;
1879 size_t counter = 0;
1880 int32_t secondsToWaitLog = 0;
1881
1882 for(;;) {
1883 sleep(interval);
1884
1885 {
1886 std::lock_guard<std::mutex> lock(g_luamutex);
1887 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1888 if(f) {
1889 try {
1890 (*f)();
1891 secondsToWaitLog = 0;
1892 }
1893 catch(std::exception &e) {
1894 if (secondsToWaitLog <= 0) {
1895 infolog("Error during execution of maintenance function: %s", e.what());
1896 secondsToWaitLog = 61;
1897 }
1898 secondsToWaitLog -= interval;
1899 }
1900 }
1901 }
1902
1903 counter++;
1904 if (counter >= g_cacheCleaningDelay) {
1905 /* keep track, for each cache, of whether we should keep
1906 expired entries */
1907 std::map<std::shared_ptr<DNSDistPacketCache>, bool> caches;
1908
1909 /* gather all caches actually used by at least one pool, and see
1910 if something prevents us from cleaning the expired entries */
1911 auto localPools = g_pools.getLocal();
1912 for (const auto& entry : *localPools) {
1913 auto& pool = entry.second;
1914
1915 auto packetCache = pool->packetCache;
1916 if (!packetCache) {
1917 continue;
1918 }
1919
1920 auto pair = caches.insert({packetCache, false});
1921 auto& iter = pair.first;
1922 /* if we need to keep stale data for this cache (ie, not clear
1923 expired entries when at least one pool using this cache
1924 has all its backends down) */
1925 if (packetCache->keepStaleData() && iter->second == false) {
1926 /* so far all pools had at least one backend up */
1927 if (pool->countServers(true) == 0) {
1928 iter->second = true;
1929 }
1930 }
1931 }
1932
1933 for (auto pair : caches) {
1934 /* shall we keep expired entries ? */
1935 if (pair.second == true) {
1936 continue;
1937 }
1938 auto& packetCache = pair.first;
1939 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1940 packetCache->purgeExpired(upTo);
1941 }
1942 counter = 0;
1943 }
1944
1945 // ponder pruning g_dynblocks of expired entries here
1946 }
1947 }
1948
1949 static void secPollThread()
1950 {
1951 setThreadName("dnsdist/secpoll");
1952
1953 for (;;) {
1954 try {
1955 doSecPoll(g_secPollSuffix);
1956 }
1957 catch(...) {
1958 }
1959 sleep(g_secPollInterval);
1960 }
1961 }
1962
1963 static void healthChecksThread()
1964 {
1965 setThreadName("dnsdist/healthC");
1966
1967 int interval = 1;
1968
1969 for(;;) {
1970 sleep(interval);
1971
1972 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
1973 g_tcpclientthreads->addTCPClientThread();
1974
1975 auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
1976 for(auto& dss : *states) {
1977 if(++dss->lastCheck < dss->checkInterval)
1978 continue;
1979 dss->lastCheck = 0;
1980 if(dss->availability==DownstreamState::Availability::Auto) {
1981 bool newState=upCheck(dss);
1982 if (newState) {
1983 /* check succeeded */
1984 dss->currentCheckFailures = 0;
1985
1986 if (!dss->upStatus) {
1987 /* we were marked as down */
1988 dss->consecutiveSuccessfulChecks++;
1989 if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
1990 /* if we need more than one successful check to rise
1991 and we didn't reach the threshold yet,
1992 let's stay down */
1993 newState = false;
1994 }
1995 }
1996 }
1997 else {
1998 /* check failed */
1999 dss->consecutiveSuccessfulChecks = 0;
2000
2001 if (dss->upStatus) {
2002 /* we are currently up */
2003 dss->currentCheckFailures++;
2004 if (dss->currentCheckFailures < dss->maxCheckFailures) {
2005 /* we need more than one failure to be marked as down,
2006 and we did not reach the threshold yet, let's stay down */
2007 newState = true;
2008 }
2009 }
2010 }
2011
2012 if(newState != dss->upStatus) {
2013 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2014
2015 if (newState && !dss->connected) {
2016 newState = dss->reconnect();
2017
2018 if (dss->connected && !dss->threadStarted.test_and_set()) {
2019 dss->tid = thread(responderThread, dss);
2020 }
2021 }
2022
2023 dss->upStatus = newState;
2024 dss->currentCheckFailures = 0;
2025 dss->consecutiveSuccessfulChecks = 0;
2026 if (g_snmpAgent && g_snmpTrapsEnabled) {
2027 g_snmpAgent->sendBackendStatusChangeTrap(dss);
2028 }
2029 }
2030 }
2031
2032 auto delta = dss->sw.udiffAndSet()/1000000.0;
2033 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
2034 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
2035 dss->prev.queries.store(dss->queries.load());
2036 dss->prev.reuseds.store(dss->reuseds.load());
2037
2038 for(IDState& ids : dss->idStates) { // timeouts
2039 int origFD = ids.origFD;
2040 if(origFD >=0 && ids.age++ > g_udpTimeout) {
2041 /* We set origFD to -1 as soon as possible
2042 to limit the risk of racing with the
2043 responder thread.
2044 The UDP client thread only checks origFD to
2045 know whether outstanding has to be incremented,
2046 so the sooner the better any way since we _will_
2047 decrement it.
2048 */
2049 if (ids.origFD.exchange(-1) != origFD) {
2050 /* this state has been altered in the meantime,
2051 don't go anywhere near it */
2052 continue;
2053 }
2054 ids.age = 0;
2055 dss->reuseds++;
2056 --dss->outstanding;
2057 ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
2058 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
2059 dss->remote.toStringWithPort(), dss->name,
2060 ids.qname.toString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
2061
2062 struct timespec ts;
2063 gettime(&ts);
2064
2065 struct dnsheader fake;
2066 memset(&fake, 0, sizeof(fake));
2067 fake.id = ids.origID;
2068
2069 g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
2070 }
2071 }
2072 }
2073 }
2074 }
2075
2076 static void bindAny(int af, int sock)
2077 {
2078 __attribute__((unused)) int one = 1;
2079
2080 #ifdef IP_FREEBIND
2081 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
2082 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
2083 #endif
2084
2085 #ifdef IP_BINDANY
2086 if (af == AF_INET)
2087 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
2088 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
2089 #endif
2090 #ifdef IPV6_BINDANY
2091 if (af == AF_INET6)
2092 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
2093 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
2094 #endif
2095 #ifdef SO_BINDANY
2096 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
2097 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
2098 #endif
2099 }
2100
2101 static void dropGroupPrivs(gid_t gid)
2102 {
2103 if (gid) {
2104 if (setgid(gid) == 0) {
2105 if (setgroups(0, NULL) < 0) {
2106 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
2107 }
2108 }
2109 else {
2110 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
2111 }
2112 }
2113 }
2114
2115 static void dropUserPrivs(uid_t uid)
2116 {
2117 if(uid) {
2118 if(setuid(uid) < 0) {
2119 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
2120 }
2121 }
2122 }
2123
2124 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
2125 {
2126 /* stdin, stdout, stderr */
2127 size_t requiredFDsCount = 3;
2128 auto backends = g_dstates.getLocal();
2129 /* UDP sockets to backends */
2130 size_t backendUDPSocketsCount = 0;
2131 for (const auto& backend : *backends) {
2132 backendUDPSocketsCount += backend->sockets.size();
2133 }
2134 requiredFDsCount += backendUDPSocketsCount;
2135 /* TCP sockets to backends */
2136 requiredFDsCount += (backends->size() * g_maxTCPClientThreads);
2137 /* listening sockets */
2138 requiredFDsCount += udpBindsCount;
2139 requiredFDsCount += tcpBindsCount;
2140 /* max TCP connections currently served */
2141 requiredFDsCount += g_maxTCPClientThreads;
2142 /* max pipes for communicating between TCP acceptors and client threads */
2143 requiredFDsCount += (g_maxTCPClientThreads * 2);
2144 /* max TCP queued connections */
2145 requiredFDsCount += g_maxTCPQueuedConnections;
2146 /* DelayPipe pipe */
2147 requiredFDsCount += 2;
2148 /* syslog socket */
2149 requiredFDsCount++;
2150 /* webserver main socket */
2151 requiredFDsCount++;
2152 /* console main socket */
2153 requiredFDsCount++;
2154 /* carbon export */
2155 requiredFDsCount++;
2156 /* history file */
2157 requiredFDsCount++;
2158 struct rlimit rl;
2159 getrlimit(RLIMIT_NOFILE, &rl);
2160 if (rl.rlim_cur <= requiredFDsCount) {
2161 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
2162 #ifdef HAVE_SYSTEMD
2163 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
2164 #else
2165 warnlog("You can increase this value by using ulimit.");
2166 #endif
2167 }
2168 }
2169
2170 struct
2171 {
2172 vector<string> locals;
2173 vector<string> remotes;
2174 bool checkConfig{false};
2175 bool beClient{false};
2176 bool beSupervised{false};
2177 string command;
2178 string config;
2179 string uid;
2180 string gid;
2181 } g_cmdLine;
2182
2183 std::atomic<bool> g_configurationDone{false};
2184
2185 static void usage()
2186 {
2187 cout<<endl;
2188 cout<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]]\n";
2189 cout<<"[-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2190 cout<<"[-v,--verbose] [--check-config] [--version]\n";
2191 cout<<"\n";
2192 cout<<"-a,--acl netmask Add this netmask to the ACL\n";
2193 cout<<"-C,--config file Load configuration from 'file'\n";
2194 cout<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2195 cout<<" controlSocket from your configuration file, but also\n";
2196 cout<<" accepts an IP:PORT argument\n";
2197 #ifdef HAVE_LIBSODIUM
2198 cout<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2199 cout<<" is similar to setting setKey in the configuration file.\n";
2200 cout<<" NOTE: this will leak this key in your shell's history\n";
2201 cout<<" and in the systems running process list.\n";
2202 #endif
2203 cout<<"--check-config Validate the configuration file and exit. The exit-code\n";
2204 cout<<" reflects the validation, 0 is OK, 1 means an error.\n";
2205 cout<<" Any errors are printed as well.\n";
2206 cout<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2207 cout<<"-g,--gid gid Change the process group ID after binding sockets\n";
2208 cout<<"-h,--help Display this helpful message\n";
2209 cout<<"-l,--local address Listen on this local address\n";
2210 cout<<"--supervised Don't open a console, I'm supervised\n";
2211 cout<<" (use with e.g. systemd and daemontools)\n";
2212 cout<<"--disable-syslog Don't log to syslog, only to stdout\n";
2213 cout<<" (use with e.g. systemd)\n";
2214 cout<<"-u,--uid uid Change the process user ID after binding sockets\n";
2215 cout<<"-v,--verbose Enable verbose mode\n";
2216 cout<<"-V,--version Show dnsdist version information and exit\n";
2217 }
2218
2219 int main(int argc, char** argv)
2220 try
2221 {
2222 size_t udpBindsCount = 0;
2223 size_t tcpBindsCount = 0;
2224 rl_attempted_completion_function = my_completion;
2225 rl_completion_append_character = 0;
2226
2227 signal(SIGPIPE, SIG_IGN);
2228 signal(SIGCHLD, SIG_IGN);
2229 openlog("dnsdist", LOG_PID|LOG_NDELAY, LOG_DAEMON);
2230
2231 #ifdef HAVE_LIBSODIUM
2232 if (sodium_init() == -1) {
2233 cerr<<"Unable to initialize crypto library"<<endl;
2234 exit(EXIT_FAILURE);
2235 }
2236 g_hashperturb=randombytes_uniform(0xffffffff);
2237 srandom(randombytes_uniform(0xffffffff));
2238 #else
2239 {
2240 struct timeval tv;
2241 gettimeofday(&tv, 0);
2242 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
2243 g_hashperturb=random();
2244 }
2245
2246 #endif
2247 ComboAddress clientAddress = ComboAddress();
2248 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
2249 struct option longopts[]={
2250 {"acl", required_argument, 0, 'a'},
2251 {"check-config", no_argument, 0, 1},
2252 {"client", no_argument, 0, 'c'},
2253 {"config", required_argument, 0, 'C'},
2254 {"disable-syslog", no_argument, 0, 2},
2255 {"execute", required_argument, 0, 'e'},
2256 {"gid", required_argument, 0, 'g'},
2257 {"help", no_argument, 0, 'h'},
2258 {"local", required_argument, 0, 'l'},
2259 {"setkey", required_argument, 0, 'k'},
2260 {"supervised", no_argument, 0, 3},
2261 {"uid", required_argument, 0, 'u'},
2262 {"verbose", no_argument, 0, 'v'},
2263 {"version", no_argument, 0, 'V'},
2264 {0,0,0,0}
2265 };
2266 int longindex=0;
2267 string optstring;
2268 for(;;) {
2269 int c=getopt_long(argc, argv, "a:cC:e:g:hk:l:u:vV", longopts, &longindex);
2270 if(c==-1)
2271 break;
2272 switch(c) {
2273 case 1:
2274 g_cmdLine.checkConfig=true;
2275 break;
2276 case 2:
2277 g_syslog=false;
2278 break;
2279 case 3:
2280 g_cmdLine.beSupervised=true;
2281 break;
2282 case 'C':
2283 g_cmdLine.config=optarg;
2284 break;
2285 case 'c':
2286 g_cmdLine.beClient=true;
2287 break;
2288 case 'e':
2289 g_cmdLine.command=optarg;
2290 break;
2291 case 'g':
2292 g_cmdLine.gid=optarg;
2293 break;
2294 case 'h':
2295 cout<<"dnsdist "<<VERSION<<endl;
2296 usage();
2297 cout<<"\n";
2298 exit(EXIT_SUCCESS);
2299 break;
2300 case 'a':
2301 optstring=optarg;
2302 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2303 break;
2304 case 'k':
2305 #ifdef HAVE_LIBSODIUM
2306 if (B64Decode(string(optarg), g_consoleKey) < 0) {
2307 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2308 exit(EXIT_FAILURE);
2309 }
2310 #else
2311 cerr<<"dnsdist has been built without libsodium, -k/--setkey is unsupported."<<endl;
2312 exit(EXIT_FAILURE);
2313 #endif
2314 break;
2315 case 'l':
2316 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2317 break;
2318 case 'u':
2319 g_cmdLine.uid=optarg;
2320 break;
2321 case 'v':
2322 g_verbose=true;
2323 break;
2324 case 'V':
2325 #ifdef LUAJIT_VERSION
2326 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2327 #else
2328 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2329 #endif
2330 cout<<"Enabled features: ";
2331 #ifdef HAVE_DNS_OVER_TLS
2332 cout<<"dns-over-tls(";
2333 #ifdef HAVE_GNUTLS
2334 cout<<"gnutls";
2335 #ifdef HAVE_LIBSSL
2336 cout<<" ";
2337 #endif
2338 #endif
2339 #ifdef HAVE_LIBSSL
2340 cout<<"openssl";
2341 #endif
2342 cout<<") ";
2343 #endif
2344 #ifdef HAVE_DNSCRYPT
2345 cout<<"dnscrypt ";
2346 #endif
2347 #ifdef HAVE_EBPF
2348 cout<<"ebpf ";
2349 #endif
2350 #ifdef HAVE_FSTRM
2351 cout<<"fstrm ";
2352 #endif
2353 #ifdef HAVE_LIBCRYPTO
2354 cout<<"ipcipher ";
2355 #endif
2356 #ifdef HAVE_LIBSODIUM
2357 cout<<"libsodium ";
2358 #endif
2359 #ifdef HAVE_PROTOBUF
2360 cout<<"protobuf ";
2361 #endif
2362 #ifdef HAVE_RE2
2363 cout<<"re2 ";
2364 #endif
2365 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2366 cout<<"recvmmsg/sendmmsg ";
2367 #endif
2368 #ifdef HAVE_NET_SNMP
2369 cout<<"snmp ";
2370 #endif
2371 #ifdef HAVE_SYSTEMD
2372 cout<<"systemd";
2373 #endif
2374 cout<<endl;
2375 exit(EXIT_SUCCESS);
2376 break;
2377 case '?':
2378 //getopt_long printed an error message.
2379 usage();
2380 exit(EXIT_FAILURE);
2381 break;
2382 }
2383 }
2384
2385 argc-=optind;
2386 argv+=optind;
2387 for(auto p = argv; *p; ++p) {
2388 if(g_cmdLine.beClient) {
2389 clientAddress = ComboAddress(*p, 5199);
2390 } else {
2391 g_cmdLine.remotes.push_back(*p);
2392 }
2393 }
2394
2395 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding, false};
2396
2397 g_policy.setState(leastOutstandingPol);
2398 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2399 setupLua(true, g_cmdLine.config);
2400 if (clientAddress != ComboAddress())
2401 g_serverControl = clientAddress;
2402 doClient(g_serverControl, g_cmdLine.command);
2403 _exit(EXIT_SUCCESS);
2404 }
2405
2406 auto acl = g_ACL.getCopy();
2407 if(acl.empty()) {
2408 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2409 acl.addMask(addr);
2410 g_ACL.setState(acl);
2411 }
2412
2413 auto consoleACL = g_consoleACL.getCopy();
2414 for (const auto& mask : { "127.0.0.1/8", "::1/128" }) {
2415 consoleACL.addMask(mask);
2416 }
2417 g_consoleACL.setState(consoleACL);
2418
2419 if (g_cmdLine.checkConfig) {
2420 setupLua(true, g_cmdLine.config);
2421 // No exception was thrown
2422 infolog("Configuration '%s' OK!", g_cmdLine.config);
2423 _exit(EXIT_SUCCESS);
2424 }
2425
2426 auto todo=setupLua(false, g_cmdLine.config);
2427
2428 auto localPools = g_pools.getCopy();
2429 {
2430 bool precompute = false;
2431 if (g_policy.getLocal()->name == "chashed") {
2432 precompute = true;
2433 } else {
2434 for (const auto& entry: localPools) {
2435 if (entry.second->policy != nullptr && entry.second->policy->name == "chashed") {
2436 precompute = true;
2437 break ;
2438 }
2439 }
2440 }
2441 if (precompute) {
2442 vinfolog("Pre-computing hashes for consistent hash load-balancing policy");
2443 // pre compute hashes
2444 auto backends = g_dstates.getLocal();
2445 for (auto& backend: *backends) {
2446 backend->hash();
2447 }
2448 }
2449 }
2450
2451 if(g_cmdLine.locals.size()) {
2452 g_locals.clear();
2453 for(auto loc : g_cmdLine.locals)
2454 g_locals.push_back(std::make_tuple(ComboAddress(loc, 53), true, false, 0, "", std::set<int>()));
2455 }
2456
2457 if(g_locals.empty())
2458 g_locals.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set<int>()));
2459
2460 g_configurationDone = true;
2461
2462 vector<ClientState*> toLaunch;
2463 for(const auto& local : g_locals) {
2464 ClientState* cs = new ClientState;
2465 cs->local= std::get<0>(local);
2466 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2467 if(cs->local.sin4.sin_family == AF_INET6) {
2468 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2469 }
2470 //if(g_vm.count("bind-non-local"))
2471 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2472
2473 // if (!setSocketTimestamps(cs->udpFD))
2474 // g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2475
2476
2477 if(IsAnyAddress(cs->local)) {
2478 int one=1;
2479 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2480 #ifdef IPV6_RECVPKTINFO
2481 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2482 #endif
2483 }
2484
2485 if (std::get<2>(local)) {
2486 #ifdef SO_REUSEPORT
2487 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2488 #else
2489 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2490 #endif
2491 }
2492
2493 const std::string& itf = std::get<4>(local);
2494 if (!itf.empty()) {
2495 #ifdef SO_BINDTODEVICE
2496 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2497 if (res != 0) {
2498 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2499 }
2500 #else
2501 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2502 #endif
2503 }
2504
2505 #ifdef HAVE_EBPF
2506 if (g_defaultBPFFilter) {
2507 cs->attachFilter(g_defaultBPFFilter);
2508 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs->local.toStringWithPort());
2509 }
2510 #endif /* HAVE_EBPF */
2511
2512 cs->cpus = std::get<5>(local);
2513
2514 SBind(cs->udpFD, cs->local);
2515 toLaunch.push_back(cs);
2516 g_frontends.push_back(cs);
2517 udpBindsCount++;
2518 }
2519
2520 for(const auto& local : g_locals) {
2521 if(!std::get<1>(local)) { // no TCP/IP
2522 warnlog("Not providing TCP/IP service on local address '%s'", std::get<0>(local).toStringWithPort());
2523 continue;
2524 }
2525 ClientState* cs = new ClientState;
2526 cs->local= std::get<0>(local);
2527
2528 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2529
2530 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2531 #ifdef TCP_DEFER_ACCEPT
2532 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2533 #endif
2534 if (std::get<3>(local) > 0) {
2535 #ifdef TCP_FASTOPEN
2536 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(local));
2537 #else
2538 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2539 #endif
2540 }
2541 if(cs->local.sin4.sin_family == AF_INET6) {
2542 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2543 }
2544 #ifdef SO_REUSEPORT
2545 /* no need to warn again if configured but support is not available, we already did for UDP */
2546 if (std::get<2>(local)) {
2547 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2548 }
2549 #endif
2550
2551 const std::string& itf = std::get<4>(local);
2552 if (!itf.empty()) {
2553 #ifdef SO_BINDTODEVICE
2554 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2555 if (res != 0) {
2556 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2557 }
2558 #else
2559 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2560 #endif
2561 }
2562
2563 #ifdef HAVE_EBPF
2564 if (g_defaultBPFFilter) {
2565 cs->attachFilter(g_defaultBPFFilter);
2566 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs->local.toStringWithPort());
2567 }
2568 #endif /* HAVE_EBPF */
2569
2570 // if(g_vm.count("bind-non-local"))
2571 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2572 SBind(cs->tcpFD, cs->local);
2573 SListen(cs->tcpFD, 64);
2574 warnlog("Listening on %s", cs->local.toStringWithPort());
2575
2576 toLaunch.push_back(cs);
2577 g_frontends.push_back(cs);
2578 tcpBindsCount++;
2579 }
2580
2581 for(auto& dcLocal : g_dnsCryptLocals) {
2582 ClientState* cs = new ClientState;
2583 cs->local = std::get<0>(dcLocal);
2584 cs->dnscryptCtx = std::get<1>(dcLocal);
2585 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2586 if(cs->local.sin4.sin_family == AF_INET6) {
2587 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2588 }
2589 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2590 if(IsAnyAddress(cs->local)) {
2591 int one=1;
2592 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2593 #ifdef IPV6_RECVPKTINFO
2594 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2595 #endif
2596 }
2597 if (std::get<2>(dcLocal)) {
2598 #ifdef SO_REUSEPORT
2599 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2600 #else
2601 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2602 #endif
2603 }
2604
2605 const std::string& itf = std::get<4>(dcLocal);
2606 if (!itf.empty()) {
2607 #ifdef SO_BINDTODEVICE
2608 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2609 if (res != 0) {
2610 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2611 }
2612 #else
2613 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2614 #endif
2615 }
2616
2617 #ifdef HAVE_EBPF
2618 if (g_defaultBPFFilter) {
2619 cs->attachFilter(g_defaultBPFFilter);
2620 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs->local.toStringWithPort());
2621 }
2622 #endif /* HAVE_EBPF */
2623 SBind(cs->udpFD, cs->local);
2624 toLaunch.push_back(cs);
2625 g_frontends.push_back(cs);
2626 udpBindsCount++;
2627
2628 cs = new ClientState;
2629 cs->local = std::get<0>(dcLocal);
2630 cs->dnscryptCtx = std::get<1>(dcLocal);
2631 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2632 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2633 #ifdef TCP_DEFER_ACCEPT
2634 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2635 #endif
2636 if (std::get<3>(dcLocal) > 0) {
2637 #ifdef TCP_FASTOPEN
2638 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(dcLocal));
2639 #else
2640 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2641 #endif
2642 }
2643
2644 #ifdef SO_REUSEPORT
2645 /* no need to warn again if configured but support is not available, we already did for UDP */
2646 if (std::get<2>(dcLocal)) {
2647 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2648 }
2649 #endif
2650
2651 if (!itf.empty()) {
2652 #ifdef SO_BINDTODEVICE
2653 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2654 if (res != 0) {
2655 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2656 }
2657 #else
2658 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2659 #endif
2660 }
2661
2662 if(cs->local.sin4.sin_family == AF_INET6) {
2663 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2664 }
2665 #ifdef HAVE_EBPF
2666 if (g_defaultBPFFilter) {
2667 cs->attachFilter(g_defaultBPFFilter);
2668 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs->local.toStringWithPort());
2669 }
2670 #endif /* HAVE_EBPF */
2671
2672 cs->cpus = std::get<5>(dcLocal);
2673
2674 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2675 SBind(cs->tcpFD, cs->local);
2676 SListen(cs->tcpFD, 64);
2677 warnlog("Listening on %s", cs->local.toStringWithPort());
2678 toLaunch.push_back(cs);
2679 g_frontends.push_back(cs);
2680 tcpBindsCount++;
2681 }
2682
2683 for(auto& frontend : g_tlslocals) {
2684 ClientState* cs = new ClientState;
2685 cs->local = frontend->d_addr;
2686 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2687 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2688 #ifdef TCP_DEFER_ACCEPT
2689 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
2690 #endif
2691 if (frontend->d_tcpFastOpenQueueSize > 0) {
2692 #ifdef TCP_FASTOPEN
2693 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, frontend->d_tcpFastOpenQueueSize);
2694 #else
2695 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2696 #endif
2697 }
2698 if (frontend->d_reusePort) {
2699 #ifdef SO_REUSEPORT
2700 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2701 #else
2702 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", cs->local.toStringWithPort());
2703 #endif
2704 }
2705 if(cs->local.sin4.sin_family == AF_INET6) {
2706 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2707 }
2708
2709 if (!frontend->d_interface.empty()) {
2710 #ifdef SO_BINDTODEVICE
2711 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, frontend->d_interface.c_str(), frontend->d_interface.length());
2712 if (res != 0) {
2713 warnlog("Error setting up the interface on local address '%s': %s", cs->local.toStringWithPort(), strerror(errno));
2714 }
2715 #else
2716 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", cs->local.toStringWithPort());
2717 #endif
2718 }
2719
2720 cs->cpus = frontend->d_cpus;
2721
2722 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2723 if (frontend->setupTLS()) {
2724 cs->tlsFrontend = frontend;
2725 SBind(cs->tcpFD, cs->local);
2726 SListen(cs->tcpFD, 64);
2727 warnlog("Listening on %s for TLS", cs->local.toStringWithPort());
2728 toLaunch.push_back(cs);
2729 g_frontends.push_back(cs);
2730 tcpBindsCount++;
2731 }
2732 else {
2733 errlog("Error while setting up TLS on local address '%s', exiting", cs->local.toStringWithPort());
2734 delete cs;
2735 _exit(EXIT_FAILURE);
2736 }
2737 }
2738
2739 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2740
2741 vector<string> vec;
2742 std::string acls;
2743 g_ACL.getLocal()->toStringVector(&vec);
2744 for(const auto& s : vec) {
2745 if (!acls.empty())
2746 acls += ", ";
2747 acls += s;
2748 }
2749 infolog("ACL allowing queries from: %s", acls.c_str());
2750 vec.clear();
2751 acls.clear();
2752 g_consoleACL.getLocal()->toStringVector(&vec);
2753 for (const auto& entry : vec) {
2754 if (!acls.empty()) {
2755 acls += ", ";
2756 }
2757 acls += entry;
2758 }
2759 infolog("Console ACL allowing connections from: %s", acls.c_str());
2760
2761 #ifdef HAVE_LIBSODIUM
2762 if (g_consoleEnabled && g_consoleKey.empty()) {
2763 warnlog("Warning, the console has been enabled via 'controlSocket()' but no key has been set with 'setKey()' so all connections will fail until a key has been set");
2764 }
2765 #endif
2766
2767 uid_t newgid=0;
2768 gid_t newuid=0;
2769
2770 if(!g_cmdLine.gid.empty())
2771 newgid = strToGID(g_cmdLine.gid.c_str());
2772
2773 if(!g_cmdLine.uid.empty())
2774 newuid = strToUID(g_cmdLine.uid.c_str());
2775
2776 dropGroupPrivs(newgid);
2777 dropUserPrivs(newuid);
2778 try {
2779 /* we might still have capabilities remaining,
2780 for example if we have been started as root
2781 without --uid or --gid (please don't do that)
2782 or as an unprivileged user with ambient
2783 capabilities like CAP_NET_BIND_SERVICE.
2784 */
2785 dropCapabilities();
2786 }
2787 catch(const std::exception& e) {
2788 warnlog("%s", e.what());
2789 }
2790
2791 /* this need to be done _after_ dropping privileges */
2792 g_delay = new DelayPipe<DelayedPacket>();
2793
2794 if (g_snmpAgent) {
2795 g_snmpAgent->run();
2796 }
2797
2798 g_tcpclientthreads = std::unique_ptr<TCPClientCollection>(new TCPClientCollection(g_maxTCPClientThreads, g_useTCPSinglePipe));
2799
2800 for(auto& t : todo)
2801 t();
2802
2803 localPools = g_pools.getCopy();
2804 /* create the default pool no matter what */
2805 createPoolIfNotExists(localPools, "");
2806 if(g_cmdLine.remotes.size()) {
2807 for(const auto& address : g_cmdLine.remotes) {
2808 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2809 addServerToPool(localPools, "", ret);
2810 if (ret->connected && !ret->threadStarted.test_and_set()) {
2811 ret->tid = thread(responderThread, ret);
2812 }
2813 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2814 }
2815 }
2816 g_pools.setState(localPools);
2817
2818 if(g_dstates.getLocal()->empty()) {
2819 errlog("No downstream servers defined: all packets will get dropped");
2820 // you might define them later, but you need to know
2821 }
2822
2823 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2824
2825 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2826 if(dss->availability==DownstreamState::Availability::Auto) {
2827 bool newState=upCheck(dss);
2828 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2829 dss->upStatus = newState;
2830 }
2831 }
2832
2833 for(auto& cs : toLaunch) {
2834 if (cs->udpFD >= 0) {
2835 thread t1(udpClientThread, cs);
2836 if (!cs->cpus.empty()) {
2837 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2838 }
2839 t1.detach();
2840 }
2841 else if (cs->tcpFD >= 0) {
2842 thread t1(tcpAcceptorThread, cs);
2843 if (!cs->cpus.empty()) {
2844 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2845 }
2846 t1.detach();
2847 }
2848 }
2849
2850 thread carbonthread(carbonDumpThread);
2851 carbonthread.detach();
2852
2853 thread stattid(maintThread);
2854 stattid.detach();
2855
2856 thread healththread(healthChecksThread);
2857
2858 if (!g_secPollSuffix.empty()) {
2859 thread secpollthread(secPollThread);
2860 secpollthread.detach();
2861 }
2862
2863 if(g_cmdLine.beSupervised) {
2864 #ifdef HAVE_SYSTEMD
2865 sd_notify(0, "READY=1");
2866 #endif
2867 healththread.join();
2868 }
2869 else {
2870 healththread.detach();
2871 doConsole();
2872 }
2873 _exit(EXIT_SUCCESS);
2874
2875 }
2876 catch(const LuaContext::ExecutionErrorException& e) {
2877 try {
2878 errlog("Fatal Lua error: %s", e.what());
2879 std::rethrow_if_nested(e);
2880 } catch(const std::exception& ne) {
2881 errlog("Details: %s", ne.what());
2882 }
2883 catch(PDNSException &ae)
2884 {
2885 errlog("Fatal pdns error: %s", ae.reason);
2886 }
2887 _exit(EXIT_FAILURE);
2888 }
2889 catch(std::exception &e)
2890 {
2891 errlog("Fatal error: %s", e.what());
2892 _exit(EXIT_FAILURE);
2893 }
2894 catch(PDNSException &ae)
2895 {
2896 errlog("Fatal pdns error: %s", ae.reason);
2897 _exit(EXIT_FAILURE);
2898 }