]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdist.cc
6a442ac57d400fd9201f4d71ec84e2284cbd7114
[thirdparty/pdns.git] / pdns / dnsdist.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #include "dnsdist.hh"
23 #include "dnsdist-ecs.hh"
24 #include "sstuff.hh"
25 #include "misc.hh"
26 #include <netinet/tcp.h>
27 #include <limits>
28 #include "dolog.hh"
29
30 #if defined (__OpenBSD__)
31 #include <readline/readline.h>
32 #else
33 #include <editline/readline.h>
34 #endif
35
36 #include "dnsname.hh"
37 #include "dnswriter.hh"
38 #include "base64.hh"
39 #include <fstream>
40 #include "delaypipe.hh"
41 #include <unistd.h>
42 #include "sodcrypto.hh"
43 #include "dnsrulactions.hh"
44 #include <grp.h>
45 #include <pwd.h>
46 #include "lock.hh"
47 #include <getopt.h>
48 #include <sys/resource.h>
49 #include "dnsdist-cache.hh"
50 #include "gettime.hh"
51
52 #ifdef HAVE_SYSTEMD
53 #include <systemd/sd-daemon.h>
54 #endif
55
#ifdef HAVE_PROTOBUF
/* One UUID generator per thread (thread_local), so threads never share it.
   Presumably used to generate the per-query uniqueId carried for protobuf
   logging — confirm at the call sites. */
thread_local boost::uuids::random_generator t_uuidGenerator;
#endif
59
/* Known sins:

   The receiver is currently single-threaded.
   Not *that* bad actually, but now that we are thread-safe, we might want to scale.
*/
65
66 /* the Rulaction plan
67 Set of Rules, if one matches, it leads to an Action
68 Both rules and actions could conceivably be Lua based.
69 On the C++ side, both could be inherited from a class Rule and a class Action,
70 on the Lua side we can't do that. */
71
using std::atomic;
using std::thread;
bool g_verbose;

/* Global statistics counters, updated from the query and response paths. */
struct DNSDistStats g_stats;
/* Number of in-flight query slots (IDStates) allocated per downstream server
   (see the DownstreamState constructor). */
uint16_t g_maxOutstanding{10240};
bool g_console;
bool g_verboseHealthChecks{false};
uint32_t g_staleCacheEntriesTTL{0};
bool g_syslog{true};

/* Presumably the ACL for incoming queries — confirm where it is enforced. */
GlobalStateHolder<NetmaskGroup> g_ACL;
string g_outputBuffer;
/* Local listening addresses. NOTE(review): the meaning of the tuple fields is
   defined by the configuration code, not visible here. */
vector<std::tuple<ComboAddress, bool, bool, int, string, std::set<int>>> g_locals;
#ifdef HAVE_DNSCRYPT
std::vector<std::tuple<ComboAddress,DnsCryptContext,bool, int, string, std::set<int>>> g_dnsCryptLocals;
#endif
#ifdef HAVE_EBPF
shared_ptr<BPFFilter> g_defaultBPFFilter;
std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
#endif /* HAVE_EBPF */
vector<ClientState *> g_frontends;
/* Server pools by name; the empty name is the default pool. */
GlobalStateHolder<pools_t> g_pools;
size_t g_udpVectorSize{1};

bool g_snmpEnabled{false};
bool g_snmpTrapsEnabled{false};
DNSDistSNMPAgent* g_snmpAgent{nullptr};
100
101 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
102 Then we have a bunch of connected sockets for talking to downstream servers.
103 We send directly to those sockets.
104
105 For the return path, per downstream server we have a thread that listens to responses.
106
   Per downstream server there is an array of g_maxOutstanding states (idStates). When we send out a packet
   downstream, we note there the original requestor and the original ID; the new ID is the offset in the array.
109
110 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
111 original requestor.
112
113 IDs are assigned by atomic increments of the socket offset.
114 */
115
116 /* for our load balancing, we want to support:
117 Round-robin
118 Round-robin with basic uptime checks
119 Send to least loaded server (least outstanding)
120 Send it to the first server that is not overloaded
121 Hashed weighted random
122 */
123
124 /* Idea:
125 Multiple server groups, by default we load balance to the group with no name.
126 Each instance is either 'up', 'down' or 'auto', where 'auto' means that dnsdist
127 determines if the instance is up or not. Auto should be the default and very very good.
128
   In addition, you can attach to each instance a QPS object with a rate and a burst, which will optionally
   limit the number of queries we send there.

   If all downstreams are over their QPS limit, we pick the fastest server. */
133
/* Query rules and their actions, evaluated in order by processQuery()
   (presumably through LocalHolders::rulactions — confirm). */
GlobalStateHolder<vector<pair<std::shared_ptr<DNSRule>, std::shared_ptr<DNSAction> > > > g_rulactions;
/* Response rules, evaluated in order by processResponse() on backend answers. */
GlobalStateHolder<vector<pair<std::shared_ptr<DNSRule>, std::shared_ptr<DNSResponseAction> > > > g_resprulactions;
/* Response rules for cache hits. NOTE(review): not used in this part of the
   file — confirm where they are applied. */
GlobalStateHolder<vector<pair<std::shared_ptr<DNSRule>, std::shared_ptr<DNSResponseAction> > > > g_cachehitresprulactions;
/* Query and response rings, filled by processQuery() and responderThread(). */
Rings g_rings;
QueryCount g_qcount;

GlobalStateHolder<servers_t> g_dstates;
/* Dynamic blocks keyed by client netmask and by qname suffix, checked in
   processQuery() (presumably through LocalHolders::dynNMGBlock/dynSMTBlock). */
GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
/* Action used by a dynamic block whose own action is DNSAction::Action::None. */
DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
/* Timeouts, presumably in seconds — confirm against their users. */
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
int g_udpTimeout{2};

/* Presumably: answer SERVFAIL instead of dropping when no server is available — confirm. */
bool g_servFailOnNoPolicy{false};
/* When set, TC=1 backend responses are cut down to header + question (truncateTC()). */
bool g_truncateTC{false};
/* When set, fixUpResponse() writes the query's qname back over the response's qname. */
bool g_fixupCase{0};

static const size_t s_udpIncomingBufferSize{1500};
153
154 static void truncateTC(const char* packet, uint16_t* len)
155 try
156 {
157 unsigned int consumed;
158 DNSName qname(packet, *len, sizeof(dnsheader), false, 0, 0, &consumed);
159 *len=(uint16_t) (sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
160 struct dnsheader* dh =(struct dnsheader*)packet;
161 dh->ancount = dh->arcount = dh->nscount=0;
162 }
163 catch(...)
164 {
165 g_stats.truncFail++;
166 }
167
168 struct DelayedPacket
169 {
170 int fd;
171 string packet;
172 ComboAddress destination;
173 ComboAddress origDest;
174 void operator()()
175 {
176 ssize_t res;
177 if(origDest.sin4.sin_family == 0) {
178 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
179 }
180 else {
181 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
182 }
183 if (res == -1) {
184 int err = errno;
185 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
186 }
187 }
188 };
189
190 DelayPipe<DelayedPacket> * g_delay = 0;
191
192 static void doLatencyAverages(double udiff)
193 {
194 auto doAvg = [](double& var, double n, double weight) {
195 var = (weight -1) * var/weight + n/weight;
196 };
197
198 doAvg(g_stats.latencyAvg100, udiff, 100);
199 doAvg(g_stats.latencyAvg1000, udiff, 1000);
200 doAvg(g_stats.latencyAvg10000, udiff, 10000);
201 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
202 }
203
204 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote)
205 {
206 uint16_t rqtype, rqclass;
207 unsigned int consumed;
208 DNSName rqname;
209 const struct dnsheader* dh = (struct dnsheader*) response;
210
211 if (responseLen < sizeof(dnsheader)) {
212 return false;
213 }
214
215 if (dh->qdcount == 0) {
216 if (dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) {
217 return true;
218 }
219 else {
220 g_stats.nonCompliantResponses++;
221 return false;
222 }
223 }
224
225 try {
226 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
227 }
228 catch(std::exception& e) {
229 if(responseLen > (ssize_t)sizeof(dnsheader))
230 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
231 g_stats.nonCompliantResponses++;
232 return false;
233 }
234
235 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
236 return false;
237 }
238
239 return true;
240 }
241
242 void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
243 {
244 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
245 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
246 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
247 uint16_t * flags = getFlagsFromDNSHeader(dh);
248 /* clear the flags we are about to restore */
249 *flags &= restoreFlagsMask;
250 /* only keep the flags we want to restore */
251 origFlags &= ~restoreFlagsMask;
252 /* set the saved flags as they were */
253 *flags |= origFlags;
254 }
255
256 bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom)
257 {
258 struct dnsheader* dh = (struct dnsheader*) *response;
259
260 if (*responseLen < sizeof(dnsheader)) {
261 return false;
262 }
263
264 restoreFlags(dh, origFlags);
265
266 if (*responseLen == sizeof(dnsheader)) {
267 return true;
268 }
269
270 if(g_fixupCase) {
271 string realname = qname.toDNSString();
272 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
273 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
274 }
275 }
276
277 if (ednsAdded || ecsAdded) {
278 char * optStart = NULL;
279 size_t optLen = 0;
280 bool last = false;
281
282 int res = locateEDNSOptRR(*response, *responseLen, &optStart, &optLen, &last);
283
284 if (res == 0) {
285 if (ednsAdded) {
286 /* we added the entire OPT RR,
287 therefore we need to remove it entirely */
288 if (last) {
289 /* simply remove the last AR */
290 *responseLen -= optLen;
291 uint16_t arcount = ntohs(dh->arcount);
292 arcount--;
293 dh->arcount = htons(arcount);
294 }
295 else {
296 /* Removing an intermediary RR could lead to compression error */
297 if (rewriteResponseWithoutEDNS(*response, *responseLen, rewrittenResponse) == 0) {
298 *responseLen = rewrittenResponse.size();
299 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
300 rewrittenResponse.reserve(*responseLen + addRoom);
301 }
302 *responseSize = rewrittenResponse.capacity();
303 *response = reinterpret_cast<char*>(rewrittenResponse.data());
304 }
305 else {
306 warnlog("Error rewriting content");
307 }
308 }
309 }
310 else {
311 /* the OPT RR was already present, but without ECS,
312 we need to remove the ECS option if any */
313 if (last) {
314 /* nothing after the OPT RR, we can simply remove the
315 ECS option */
316 size_t existingOptLen = optLen;
317 removeEDNSOptionFromOPT(optStart, &optLen, EDNSOptionCode::ECS);
318 *responseLen -= (existingOptLen - optLen);
319 }
320 else {
321 /* Removing an intermediary RR could lead to compression error */
322 if (rewriteResponseWithoutEDNSOption(*response, *responseLen, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
323 *responseLen = rewrittenResponse.size();
324 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
325 rewrittenResponse.reserve(*responseLen + addRoom);
326 }
327 *responseSize = rewrittenResponse.capacity();
328 *response = reinterpret_cast<char*>(rewrittenResponse.data());
329 }
330 else {
331 warnlog("Error rewriting content");
332 }
333 }
334 }
335 }
336 }
337
338 return true;
339 }
340
#ifdef HAVE_DNSCRYPT
/* Encrypt a response in place for a DNSCrypt client.
   Returns true without doing anything when dnsCryptQuery is null (plain DNS).
   Encryption overwrites the header, so when dh, *dh and dhCopy are all
   non-null, *dh is first copied into dhCopy and redirected to it. Callers can
   then still read the clear-text header afterwards.
   On success *responseLen becomes the encrypted length. On failure the error
   is logged and false is returned: the response must be dropped. */
bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DnsCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    return true;
  }

  /* save the original header before encrypting it in place */
  const bool canSaveHeader = dh != nullptr && *dh != nullptr && dhCopy != nullptr;
  if (canSaveHeader) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedLen = 0;
  if (dnsCryptQuery->ctx->encryptResponse(response, *responseLen, responseSize, dnsCryptQuery, tcp, &encryptedLen) != 0) {
    /* dropping response */
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedLen;
  return true;
}
#endif
365
366 static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
367 {
368 if(delayMsec && g_delay) {
369 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
370 g_delay->submit(dp, delayMsec);
371 }
372 else {
373 ssize_t res;
374 if(origDest.sin4.sin_family == 0) {
375 res = sendto(origFD, response, responseLen, 0, (struct sockaddr*)&origRemote, origRemote.getSocklen());
376 }
377 else {
378 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
379 }
380 if (res == -1) {
381 int err = errno;
382 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
383 }
384 }
385
386 return true;
387 }
388
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
/* Per-backend UDP responder thread.
   Loops forever reading responses from state->fd. Each response is matched to
   the in-flight query slot (IDState) indexed by its DNS ID, and then:
   - checked against the stored question;
   - run through the response rules;
   - cleaned of dnsdist's own changes (fixUpResponse());
   - cached if requested;
   - encrypted for DNSCrypt clients;
   - sent back to the original client unless the frontend is muted.
   Statistics, the response ring and latency figures are then updated and the
   slot is released.
   A std::exception while handling one packet is logged and the loop goes on.
   Anything escaping the loop ends the thread (logged). Returns 0. */
void* responderThread(std::shared_ptr<DownstreamState> state)
try {
  auto localRespRulactions = g_resprulactions.getLocal();
#ifdef HAVE_DNSCRYPT
  char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
  /* when the answer is encrypted in place, we need to get a copy
     of the original header before encryption to fill the ring buffer */
  dnsheader dhCopy;
#else
  char packet[4096];
#endif
  static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
  /* reused across iterations; filled by fixUpResponse() when a response has
     to be rewritten, in which case `response` points into it */
  vector<uint8_t> rewrittenResponse;

  uint16_t queryId = 0;
  for(;;) {
    dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
    try {
      ssize_t got = recv(state->fd, packet, sizeof(packet), 0);
      char * response = packet;
      size_t responseSize = sizeof(packet);

      if (got < (ssize_t) sizeof(dnsheader))
        continue;

      uint16_t responseLen = (uint16_t) got;
      /* the DNS ID is used as-is (no ntohs()) as the index of this query's
         IDState slot — presumably matching how the query path assigned it */
      queryId = dh->id;

      if(queryId >= state->idStates.size())
        continue;

      IDState* ids = &state->idStates[queryId];
      int origFD = ids->origFD;

      if(origFD < 0) // duplicate
        continue;

      /* setting age to 0 to prevent the maintainer thread from
         cleaning this IDS while we process the response.
         We have already a copy of the origFD, so it would
         mostly mess up the outstanding counter.
      */
      ids->age = 0;

      if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, state->remote)) {
        continue;
      }

      --state->outstanding;  // you'd think an attacker could game this, but we're using connected socket

      if(dh->tc && g_truncateTC) {
        truncateTC(response, &responseLen);
      }

      /* give the client back the ID it used */
      dh->id = ids->origID;

      uint16_t addRoom = 0;
      DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
#ifdef HAVE_PROTOBUF
      dr.uniqueId = ids->uniqueId;
#endif
      /* NOTE(review): on this and the later `continue` paths the slot is not
         released (ids->origFD stays set) although `outstanding` was already
         decremented, so a duplicate answer with the same ID would be processed
         and decremented again. */
      if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
        continue;
      }

#ifdef HAVE_DNSCRYPT
      /* keep room in a rewritten buffer for the DNSCrypt padding and MAC */
      if (ids->dnsCryptQuery) {
        addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
      }
#endif
      if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
        continue;
      }

      /* cached before encryption, i.e. in clear text */
      if (ids->packetCache && !ids->skipCache) {
        ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode);
      }

      if (ids->cs && !ids->cs->muted) {
#ifdef HAVE_DNSCRYPT
        /* may redirect dh to dhCopy, preserving the clear-text header */
        if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
          continue;
        }
#endif

        ComboAddress empty;
        empty.sin4.sin_family = 0;
        /* if ids->destHarvested is false, origDest holds the listening address.
           We don't want to use that as a source since it could be 0.0.0.0 for example. */
        sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
      }

      g_stats.responses++;

      double udiff = ids->sentTime.udiff();
      vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);

      {
        struct timespec ts;
        gettime(&ts);
        std::lock_guard<std::mutex> lock(g_rings.respMutex);
        /* size recorded is `got` (as received from the backend), not the final responseLen */
        g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, state->remote});
      }

      if(dh->rcode == RCode::ServFail)
        g_stats.servfailResponses++;
      /* exponentially weighted moving average, new sample weighted 1/128 */
      state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0;

      /* latency histogram buckets, boundaries in microseconds */
      if(udiff < 1000) g_stats.latency0_1++;
      else if(udiff < 10000) g_stats.latency1_10++;
      else if(udiff < 50000) g_stats.latency10_50++;
      else if(udiff < 100000) g_stats.latency50_100++;
      else if(udiff < 1000000) g_stats.latency100_1000++;
      else g_stats.latencySlow++;

      doLatencyAverages(udiff);

      /* release the slot, unless ids->origFD no longer holds the value read
         above (presumably the slot was reused for a new query meanwhile) */
      if (ids->origFD == origFD) {
#ifdef HAVE_DNSCRYPT
        ids->dnsCryptQuery = nullptr;
#endif
        ids->origFD = -1;
      }

      rewrittenResponse.clear();
    }
    /* NOTE(review): only std::exception is caught per packet; the separate
       outer handler suggests PDNSException does not derive from it, in which
       case a PDNSException here terminates the whole thread. */
    catch(std::exception& e){
      vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", state->remote.toStringWithPort(), queryId, e.what());
    }
  }
  return 0;
}
catch(const std::exception& e)
{
  errlog("UDP responder thread died because of exception: %s", e.what());
  return 0;
}
catch(const PDNSException& e)
{
  errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
  return 0;
}
catch(...)
{
  errlog("UDP responder thread died because of an exception: %s", "unknown");
  return 0;
}
537
538 void DownstreamState::reconnect()
539 {
540 connected = false;
541 if (fd != -1) {
542 /* shutdown() is needed to wake up recv() in the responderThread */
543 shutdown(fd, SHUT_RDWR);
544 close(fd);
545 fd = -1;
546 }
547 if (!IsAnyAddress(remote)) {
548 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
549 if (!IsAnyAddress(sourceAddr)) {
550 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
551 SBind(fd, sourceAddr);
552 }
553 try {
554 SConnect(fd, remote);
555 connected = true;
556 }
557 catch(const std::runtime_error& error) {
558 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
559 }
560 }
561 }
562
563 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
564 {
565 if (!IsAnyAddress(remote)) {
566 reconnect();
567 idStates.resize(g_maxOutstanding);
568 sw.start();
569 infolog("Added downstream server %s", remote.toStringWithPort());
570 }
571 }
572
/* Presumably serialises use of the Lua state g_lua; processQuery() holds it
   while calling g_qcount.filter. */
std::mutex g_luamutex;
LuaContext g_lua;

/* Global server selection policy. */
GlobalStateHolder<ServerPolicy> g_policy;
577
578 shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
579 {
580 for(auto& d : servers) {
581 if(d.second->isUp() && d.second->qps.check())
582 return d.second;
583 }
584 return leastOutstanding(servers, dq);
585 }
586
587 // get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
588 shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
589 {
590 if (servers.size() == 1 && servers[0].second->isUp()) {
591 return servers[0].second;
592 }
593
594 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
595 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
596 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
597 poss.reserve(servers.size());
598 for(auto& d : servers) {
599 if(d.second->isUp()) {
600 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
601 }
602 }
603 if(poss.empty())
604 return shared_ptr<DownstreamState>();
605 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
606 return poss.begin()->second;
607 }
608
609 shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
610 {
611 vector<pair<int, shared_ptr<DownstreamState>>> poss;
612 int sum=0;
613 for(auto& d : servers) { // w=1, w=10 -> 1, 11
614 if(d.second->isUp()) {
615 sum+=d.second->weight;
616 poss.push_back({sum, d.second});
617 }
618 }
619
620 // Catch poss & sum are empty to avoid SIGFPE
621 if(poss.empty())
622 return shared_ptr<DownstreamState>();
623
624 int r = val % sum;
625 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
626 if(p==poss.end())
627 return shared_ptr<DownstreamState>();
628 return p->second;
629 }
630
631 shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
632 {
633 return valrandom(random(), servers, dq);
634 }
635
636 uint32_t g_hashperturb;
637 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
638 {
639 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
640 }
641
642
643 shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
644 {
645 NumberedServerVector poss;
646
647 for(auto& d : servers) {
648 if(d.second->isUp()) {
649 poss.push_back(d);
650 }
651 }
652
653 const auto *res=&poss;
654 if(poss.empty())
655 res = &servers;
656
657 if(res->empty())
658 return shared_ptr<DownstreamState>();
659
660 static unsigned int counter;
661
662 return (*res)[(counter++) % res->size()].second;
663 }
664
665 static void writepid(string pidfile) {
666 if (!pidfile.empty()) {
667 // Clean up possible stale file
668 unlink(pidfile.c_str());
669
670 // Write the pidfile
671 ofstream of(pidfile.c_str());
672 if (of) {
673 of << getpid();
674 } else {
675 errlog("Unable to write PID-file to '%s'.", pidfile);
676 }
677 of.close();
678 }
679 }
680
681 static void daemonize(void)
682 {
683 if(fork())
684 _exit(0); // bye bye
685 /* We are child */
686
687 setsid();
688
689 int i=open("/dev/null",O_RDWR); /* open stdin */
690 if(i < 0)
691 ; // L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
692 else {
693 dup2(i,0); /* stdin */
694 dup2(i,1); /* stderr */
695 dup2(i,2); /* stderr */
696 close(i);
697 }
698 }
699
/* Default address of the control (console) server — presumably overridable
   from the configuration; confirm. */
ComboAddress g_serverControl{"127.0.0.1:5199"};
701
702 std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
703 {
704 std::shared_ptr<ServerPool> pool;
705 pools_t::iterator it = pools.find(poolName);
706 if (it != pools.end()) {
707 pool = it->second;
708 }
709 else {
710 if (!poolName.empty())
711 vinfolog("Creating pool %s", poolName);
712 pool = std::make_shared<ServerPool>();
713 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
714 }
715 return pool;
716 }
717
718 void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
719 {
720 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
721 if (!poolName.empty()) {
722 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
723 } else {
724 vinfolog("Setting default pool server selection policy to %s", policy->name);
725 }
726 pool->policy = policy;
727 }
728
729 void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
730 {
731 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
732 unsigned int count = (unsigned int) pool->servers.size();
733 if (!poolName.empty()) {
734 vinfolog("Adding server to pool %s", poolName);
735 } else {
736 vinfolog("Adding server to default pool");
737 }
738 pool->servers.push_back(make_pair(++count, server));
739 /* we need to reorder based on the server 'order' */
740 std::stable_sort(pool->servers.begin(), pool->servers.end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
741 return a.second->order < b.second->order;
742 });
743 /* and now we need to renumber for Lua (custom policies) */
744 size_t idx = 1;
745 for (auto& serv : pool->servers) {
746 serv.first = idx++;
747 }
748 }
749
750 void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
751 {
752 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
753
754 if (!poolName.empty()) {
755 vinfolog("Removing server from pool %s", poolName);
756 }
757 else {
758 vinfolog("Removing server from default pool");
759 }
760
761 size_t idx = 1;
762 bool found = false;
763 for (NumberedVector<shared_ptr<DownstreamState> >::iterator it = pool->servers.begin(); it != pool->servers.end();) {
764 if (found) {
765 /* we need to renumber the servers placed
766 after the removed one, for Lua (custom policies) */
767 it->first = idx++;
768 it++;
769 }
770 else if (it->second == server) {
771 it = pool->servers.erase(it);
772 found = true;
773 } else {
774 idx++;
775 it++;
776 }
777 }
778 }
779
780 std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
781 {
782 pools_t::const_iterator it = pools.find(poolName);
783
784 if (it == pools.end()) {
785 throw std::out_of_range("No pool named " + poolName);
786 }
787
788 return it->second;
789 }
790
791 const NumberedServerVector& getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
792 {
793 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
794 return pool->servers;
795 }
796
797 // goal in life - if you send us a reasonably normal packet, we'll get Z for you, otherwise 0
798 int getEDNSZ(const char* packet, unsigned int len)
799 try
800 {
801 struct dnsheader* dh =(struct dnsheader*)packet;
802
803 if(ntohs(dh->qdcount) != 1 || dh->ancount!=0 || ntohs(dh->arcount)!=1 || dh->nscount!=0)
804 return 0;
805
806 if (len <= sizeof(dnsheader))
807 return 0;
808
809 unsigned int consumed;
810 DNSName qname(packet, len, sizeof(dnsheader), false, 0, 0, &consumed);
811 size_t pos = consumed + DNS_TYPE_SIZE + DNS_CLASS_SIZE;
812 uint16_t qtype, qclass;
813
814 if (len <= (sizeof(dnsheader)+pos))
815 return 0;
816
817 DNSName aname(packet, len, sizeof(dnsheader)+pos, true, &qtype, &qclass, &consumed);
818
819 if(qtype!=QType::OPT || sizeof(dnsheader)+pos+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE+EDNS_EXTENDED_RCODE_SIZE+EDNS_VERSION_SIZE+1 >= len)
820 return 0;
821
822 uint8_t* z = (uint8_t*)packet+sizeof(dnsheader)+pos+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE+EDNS_EXTENDED_RCODE_SIZE+EDNS_VERSION_SIZE;
823 return 0x100 * (*z) + *(z+1);
824 }
825 catch(...)
826 {
827 return 0;
828 }
829
830 static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
831 {
832 string result;
833
834 std::vector<std::string> addrs;
835 stringtok(addrs, spoofContent, " ,");
836
837 if (addrs.size() == 1) {
838 try {
839 ComboAddress spoofAddr(spoofContent);
840 SpoofAction sa({spoofAddr});
841 sa(&dq, &result);
842 }
843 catch(const PDNSException &e) {
844 SpoofAction sa(spoofContent); // CNAME then
845 sa(&dq, &result);
846 }
847 } else {
848 std::vector<ComboAddress> cas;
849 for (const auto& addr : addrs) {
850 try {
851 cas.push_back(ComboAddress(addr));
852 }
853 catch (...) {
854 }
855 }
856 SpoofAction sa(cas);
857 sa(&dq, &result);
858 }
859 }
860
/* Run the query-path checks for one incoming query, in this order:
   1. record it in the query ring;
   2. if query counting is enabled, count it under a name that the optional
      g_qcount.filter (called with g_luamutex held) may change or veto;
   3. apply the client-netmask dynamic block, then the qname-suffix one;
   4. evaluate the query rules in order, stopping at the first terminal action.
   Returns false when the query must be dropped. Otherwise returns true; in
   that case dq.dh may have been turned into a response (qr set, with rcode/tc
   set or a spoofed answer). In addition, `poolname` may have been set by a Pool
   action and *delayMsec by a Delay action. Presumably the caller tells
   "answer now" from "forward" by dq.dh->qr — confirm. */
bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
{
  {
    WriteLock wl(&g_rings.queryLock);
    g_rings.queryRing.push_back({now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh});
  }

  if(g_qcount.enabled) {
    string qname = (*dq.qname).toString(".");
    bool countQuery{true};
    if(g_qcount.filter) {
      std::lock_guard<std::mutex> lock(g_luamutex);
      std::tie (countQuery, qname) = g_qcount.filter(dq);
    }

    if(countQuery) {
      WriteLock wl(&g_qcount.queryLock);
      /* NOTE(review): the explicit zero-initialisation is redundant if records is
         a std::map-like container (operator[] value-initialises) — confirm its type */
      if(!g_qcount.records.count(qname)) {
        g_qcount.records[qname] = 0;
      }
      g_qcount.records[qname]++;
    }
  }

  /* dynamic block keyed by client address; an entry only applies while `now`
     is before its `until` time, and action None means "use g_dynBlockAction" */
  if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
    auto updateBlockStats = [&got]() {
      g_stats.dynBlocked++;
      got->second.blocks++;
    };

    if(now < got->second.until) {
      DNSAction::Action action = got->second.action;
      if (action == DNSAction::Action::None) {
        action = g_dynBlockAction;
      }
      if (action == DNSAction::Action::Refused) {
        vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
        updateBlockStats();

        dq.dh->rcode = RCode::Refused;
        dq.dh->qr=true;
        return true;
      }
      else if (action == DNSAction::Action::Truncate) {
        if(!dq.tcp) {
          updateBlockStats();
          vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
          dq.dh->tc = true;
          dq.dh->qr = true;
          return true;
        }
        else {
          /* truncating makes no sense over TCP: the query is let through to
             the following checks and not counted as blocked */
          vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
        }

      }
      else {
        /* any other action drops the query */
        updateBlockStats();
        vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
        return false;
      }
    }
  }

  /* dynamic block keyed by query name suffix, same semantics as above */
  if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
    auto updateBlockStats = [&got]() {
      g_stats.dynBlocked++;
      got->blocks++;
    };

    if(now < got->until) {
      DNSAction::Action action = got->action;
      if (action == DNSAction::Action::None) {
        action = g_dynBlockAction;
      }
      if (action == DNSAction::Action::Refused) {
        vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
        updateBlockStats();

        dq.dh->rcode = RCode::Refused;
        dq.dh->qr=true;
        return true;
      }
      else if (action == DNSAction::Action::Truncate) {
        if(!dq.tcp) {
          updateBlockStats();

          vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
          dq.dh->tc = true;
          dq.dh->qr = true;
          return true;
        }
        else {
          vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
        }
      }
      else {
        updateBlockStats();
        vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
        return false;
      }
    }
  }

  /* query rules: every matching rule has its counter incremented and its
     action run; the first terminal action ends processing. ruleresult carries
     an action's textual result (spoof content, pool name, delay). */
  DNSAction::Action action=DNSAction::Action::None;
  string ruleresult;
  for(const auto& lr : *holders.rulactions) {
    if(lr.first->matches(&dq)) {
      lr.first->d_matches++;
      action=(*lr.second)(&dq, &ruleresult);

      switch(action) {
      case DNSAction::Action::Allow:
        return true;
        break;
      case DNSAction::Action::Drop:
        g_stats.ruleDrop++;
        return false;
        break;
      case DNSAction::Action::Nxdomain:
        dq.dh->rcode = RCode::NXDomain;
        dq.dh->qr=true;
        g_stats.ruleNXDomain++;
        return true;
        break;
      case DNSAction::Action::Refused:
        dq.dh->rcode = RCode::Refused;
        dq.dh->qr=true;
        g_stats.ruleRefused++;
        return true;
        break;
      case DNSAction::Action::Spoof:
        spoofResponseFromString(dq, ruleresult);
        return true;
        break;
      case DNSAction::Action::Truncate:
        /* unlike the dynamic blocks above, applied regardless of dq.tcp */
        dq.dh->tc = true;
        dq.dh->qr = true;
        return true;
        break;
      case DNSAction::Action::HeaderModify:
        return true;
        break;
      case DNSAction::Action::Pool:
        poolname=ruleresult;
        return true;
        break;
        /* non-terminal actions follow */
      case DNSAction::Action::Delay:
        *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
        break;
      case DNSAction::Action::None:
        break;
      }
    }
  }

  return true;
}
1020
1021 bool processResponse(LocalStateHolder<vector<pair<std::shared_ptr<DNSRule>, std::shared_ptr<DNSResponseAction> > > >& localRespRulactions, DNSResponse& dr, int* delayMsec)
1022 {
1023 DNSResponseAction::Action action=DNSResponseAction::Action::None;
1024 std::string ruleresult;
1025 for(const auto& lr : *localRespRulactions) {
1026 if(lr.first->matches(&dr)) {
1027 lr.first->d_matches++;
1028 action=(*lr.second)(&dr, &ruleresult);
1029 switch(action) {
1030 case DNSResponseAction::Action::Allow:
1031 return true;
1032 break;
1033 case DNSResponseAction::Action::Drop:
1034 return false;
1035 break;
1036 case DNSResponseAction::Action::HeaderModify:
1037 return true;
1038 break;
1039 /* non-terminal actions follow */
1040 case DNSResponseAction::Action::Delay:
1041 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1042 break;
1043 case DNSResponseAction::Action::None:
1044 break;
1045 }
1046 }
1047 }
1048
1049 return true;
1050 }
1051
1052 static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd, const char* request, const size_t requestLen)
1053 {
1054 ssize_t result;
1055
1056 if (ss->sourceItf == 0) {
1057 result = send(sd, request, requestLen, 0);
1058 }
1059 else {
1060 struct msghdr msgh;
1061 struct iovec iov;
1062 char cbuf[256];
1063 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &ss->remote);
1064 addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
1065 result = sendmsg(sd, &msgh, 0);
1066 }
1067
1068 if (result == -1) {
1069 int savederrno = errno;
1070 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1071
1072 /* This might sound silly, but on Linux send() might fail with EINVAL
1073 if the interface the socket was bound to doesn't exist anymore. */
1074 if (savederrno == EINVAL) {
1075 ss->reconnect();
1076 }
1077 }
1078
1079 return result;
1080 }
1081
1082 static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
1083 {
1084 if (msgh->msg_flags & MSG_TRUNC) {
1085 /* message was too large for our buffer */
1086 vinfolog("Dropping message too large for our buffer");
1087 g_stats.nonCompliantQueries++;
1088 return false;
1089 }
1090
1091 if(!holders.acl->match(remote)) {
1092 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1093 g_stats.aclDrops++;
1094 return false;
1095 }
1096
1097 cs.queries++;
1098 g_stats.queries++;
1099
1100 if (HarvestDestinationAddress(msgh, &dest)) {
1101 /* we don't get the port, only the address */
1102 dest.sin4.sin_port = cs.local.sin4.sin_port;
1103 }
1104 else {
1105 dest.sin4.sin_family = 0;
1106 }
1107
1108 return true;
1109 }
1110
#ifdef HAVE_DNSCRYPT
/* For a DNSCrypt frontend (cs.dnscryptCtx set), decrypt 'query' in place.
   'dnsCryptQuery' receives a freshly allocated DnsCryptQuery describing the
   exchange. On success 'len' is updated to the decrypted length and true is
   returned. On failure, any response produced by handleDnsCryptQuery() is sent
   back to 'remote' (from 'dest') and false is returned.
   Non-DNSCrypt frontends are left untouched and true is returned. */
static bool checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DnsCryptQuery>& dnsCryptQuery, const ComboAddress& dest, const ComboAddress& remote)
{
  if (!cs.dnscryptCtx) {
    return true;
  }

  dnsCryptQuery = std::make_shared<DnsCryptQuery>();

  std::vector<uint8_t> response;
  uint16_t decryptedQueryLen = 0;
  const bool decrypted = handleDnsCryptQuery(cs.dnscryptCtx, const_cast<char*>(query), len, dnsCryptQuery, &decryptedQueryLen, false, response);

  if (decrypted) {
    len = decryptedQueryLen;
    return true;
  }

  if (!response.empty()) {
    sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(response.data()), static_cast<uint16_t>(response.size()), 0, dest, remote);
  }
  return false;
}
#endif /* HAVE_DNSCRYPT */
1134
1135 bool checkQueryHeaders(const struct dnsheader* dh)
1136 {
1137 if (dh->qr) { // don't respond to responses
1138 g_stats.nonCompliantQueries++;
1139 return false;
1140 }
1141
1142 if (dh->qdcount == 0) {
1143 g_stats.emptyQueries++;
1144 return false;
1145 }
1146
1147 if (dh->rd) {
1148 g_stats.rdQueries++;
1149 }
1150
1151 return true;
1152 }
1153
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Prepare 'outMsg' so that a later sendmmsg() sends the 'responseLen' bytes of
   'response' to 'remote', using 'iov' as the I/O vector. When the query's
   destination address is known (dest.sin4.sin_family != 0) it is attached as
   source address ancillary data, built in 'cbuf'; otherwise no control data is
   attached. 'cs' is not used. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, char* cbuf)
{
  struct msghdr& header = outMsg.msg_hdr;

  outMsg.msg_len = 0;
  fillMSGHdr(&header, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destinationKnown = (dest.sin4.sin_family != 0);
  if (!destinationKnown) {
    header.msg_control = nullptr;
    return;
  }

  addCMsgSrcAddr(&header, cbuf, &dest, 0);
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1168
1169 static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
1170 {
1171 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1172 uint16_t queryId = 0;
1173
1174 try {
1175 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1176 return;
1177 }
1178
1179 #ifdef HAVE_DNSCRYPT
1180 std::shared_ptr<DnsCryptQuery> dnsCryptQuery = nullptr;
1181
1182 if (!checkDNSCryptQuery(cs, query, len, dnsCryptQuery, dest, remote)) {
1183 return;
1184 }
1185 #endif
1186
1187 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1188 queryId = ntohs(dh->id);
1189
1190 if (!checkQueryHeaders(dh)) {
1191 return;
1192 }
1193
1194 const uint16_t * flags = getFlagsFromDNSHeader(dh);
1195 const uint16_t origFlags = *flags;
1196 uint16_t qtype, qclass;
1197 unsigned int consumed = 0;
1198 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1199 DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false);
1200
1201 string poolname;
1202 int delayMsec = 0;
1203 /* we need an accurate ("real") value for the response and
1204 to store into the IDS, but not for insertion into the
1205 rings for example */
1206 struct timespec realTime;
1207 struct timespec now;
1208 gettime(&now);
1209 gettime(&realTime, true);
1210
1211 if (!processQuery(holders, dq, poolname, &delayMsec, now))
1212 {
1213 return;
1214 }
1215
1216 if(dq.dh->qr) { // something turned it into a response
1217 g_stats.selfAnswered++;
1218 restoreFlags(dh, origFlags);
1219
1220 if (!cs.muted) {
1221 char* response = query;
1222 uint16_t responseLen = dq.len;
1223
1224 #ifdef HAVE_DNSCRYPT
1225 if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1226 return;
1227 }
1228 #endif
1229 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1230 if (delayMsec == 0 && responsesVect != nullptr) {
1231 queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1232 (*queuedResponses)++;
1233 }
1234 else
1235 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1236 {
1237 sendUDPResponse(cs.udpFD, response, responseLen, delayMsec, dest, remote);
1238 }
1239 }
1240
1241 return;
1242 }
1243
1244 DownstreamState* ss = nullptr;
1245 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
1246 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
1247 auto policy = holders.policy->policy;
1248 if (serverPool->policy != nullptr) {
1249 policy = serverPool->policy->policy;
1250 }
1251 {
1252 std::lock_guard<std::mutex> lock(g_luamutex);
1253 ss = policy(serverPool->servers, &dq).get();
1254 packetCache = serverPool->packetCache;
1255 }
1256
1257 bool ednsAdded = false;
1258 bool ecsAdded = false;
1259 if (dq.useECS && ss && ss->useECS) {
1260 if (!handleEDNSClientSubnet(query, dq.size, consumed, &dq.len, &(ednsAdded), &(ecsAdded), remote, dq.ecsOverride, dq.ecsPrefixLength)) {
1261 vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote.toStringWithPort());
1262 return;
1263 }
1264 }
1265
1266 uint32_t cacheKey = 0;
1267 if (packetCache && !dq.skipCache) {
1268 uint16_t cachedResponseSize = dq.size;
1269 uint32_t allowExpired = ss ? 0 : g_staleCacheEntriesTTL;
1270 if (packetCache->get(dq, consumed, dh->id, query, &cachedResponseSize, &cacheKey, allowExpired)) {
1271 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, reinterpret_cast<dnsheader*>(query), dq.size, cachedResponseSize, false, &realTime);
1272 #ifdef HAVE_PROTOBUF
1273 dr.uniqueId = dq.uniqueId;
1274 #endif
1275 if (!processResponse(holders.cacheHitRespRulactions, dr, &delayMsec)) {
1276 return;
1277 }
1278
1279 if (!cs.muted) {
1280 #ifdef HAVE_DNSCRYPT
1281 if (!encryptResponse(query, &cachedResponseSize, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1282 return;
1283 }
1284 #endif
1285 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1286 if (delayMsec == 0 && responsesVect != nullptr) {
1287 queueResponse(cs, query, cachedResponseSize, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1288 (*queuedResponses)++;
1289 }
1290 else
1291 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1292 {
1293 sendUDPResponse(cs.udpFD, query, cachedResponseSize, delayMsec, dest, remote);
1294 }
1295 }
1296
1297 g_stats.cacheHits++;
1298 g_stats.latency0_1++; // we're not going to measure this
1299 doLatencyAverages(0); // same
1300 return;
1301 }
1302 g_stats.cacheMisses++;
1303 }
1304
1305 if(!ss) {
1306 g_stats.noPolicy++;
1307
1308 if (g_servFailOnNoPolicy && !cs.muted) {
1309 char* response = query;
1310 uint16_t responseLen = dq.len;
1311 restoreFlags(dh, origFlags);
1312
1313 dq.dh->rcode = RCode::ServFail;
1314 dq.dh->qr = true;
1315
1316 #ifdef HAVE_DNSCRYPT
1317 if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1318 return;
1319 }
1320 #endif
1321 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1322 if (responsesVect != nullptr) {
1323 queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1324 (*queuedResponses)++;
1325 }
1326 else
1327 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1328 {
1329 sendUDPResponse(cs.udpFD, response, responseLen, 0, dest, remote);
1330 }
1331 }
1332 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "Dropped" : "ServFailed", dq.qname->toString(), QType(dq.qtype).getName(), remote.toStringWithPort());
1333 return;
1334 }
1335
1336 ss->queries++;
1337
1338 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1339 IDState* ids = &ss->idStates[idOffset];
1340 ids->age = 0;
1341
1342 if(ids->origFD < 0) // if we are reusing, no change in outstanding
1343 ss->outstanding++;
1344 else {
1345 ss->reuseds++;
1346 g_stats.downstreamTimeouts++;
1347 }
1348
1349 ids->cs = &cs;
1350 ids->origFD = cs.udpFD;
1351 ids->origID = dh->id;
1352 ids->origRemote = remote;
1353 ids->sentTime.set(realTime);
1354 ids->qname = qname;
1355 ids->qtype = dq.qtype;
1356 ids->qclass = dq.qclass;
1357 ids->delayMsec = delayMsec;
1358 ids->origFlags = origFlags;
1359 ids->cacheKey = cacheKey;
1360 ids->skipCache = dq.skipCache;
1361 ids->packetCache = packetCache;
1362 ids->ednsAdded = ednsAdded;
1363 ids->ecsAdded = ecsAdded;
1364
1365 /* If we couldn't harvest the real dest addr, still
1366 write down the listening addr since it will be useful
1367 (especially if it's not an 'any' one).
1368 We need to keep track of which one it is since we may
1369 want to use the real but not the listening addr to reply.
1370 */
1371 if (dest.sin4.sin_family != 0) {
1372 ids->origDest = dest;
1373 ids->destHarvested = true;
1374 }
1375 else {
1376 ids->origDest = cs.local;
1377 ids->destHarvested = false;
1378 }
1379 #ifdef HAVE_DNSCRYPT
1380 ids->dnsCryptQuery = dnsCryptQuery;
1381 #endif
1382 #ifdef HAVE_PROTOBUF
1383 ids->uniqueId = dq.uniqueId;
1384 #endif
1385
1386 dh->id = idOffset;
1387
1388 ssize_t ret = udpClientSendRequestToBackend(ss, ss->fd, query, dq.len);
1389
1390 if(ret < 0) {
1391 ss->sendErrors++;
1392 g_stats.downstreamSendErrors++;
1393 }
1394
1395 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1396 }
1397 catch(const std::exception& e){
1398 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1399 }
1400 }
1401
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Receive loop used when g_udpVectorSize > 1: up to g_udpVectorSize queries
   are read at once with recvmmsg(), each one is handed to processUDPQuery(),
   and the immediate answers it queued are sent back with a single sendmmsg()
   call. Never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[4096];
    /* used by HarvestDestinationAddress */
    char cbuf[256];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* 'remote' is the address buffer given to fillMSGHdr(); start it with the
     family of the listening socket */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
  }

  /* go now */
  for(;;) {

    /* (Re)initialize every receive header before each recvmmsg() call:
       - the IO vector is also used to send the vector of responses, to avoid
         having to copy the data around, so it has to be pointed back to the
         packet buffer. It is limited to s_udpIncomingBufferSize (the previous
         reset used sizeof(packet), silently lifting that limit), keeping the
         rest of the buffer for EDNS/ECS and self-generated responses;
       - msg_controllen and msg_namelen are value-result fields that the kernel
         overwrote with the sizes used by the previous message. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (got < sizeof(struct dnsheader)) {
        g_stats.nonCompliantQueries++;
        continue;
      }

      /* truncated datagrams (MSG_TRUNC) are rejected by processUDPQuery()
         before 'got' is used */
      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1482
1483 // listens to incoming queries, sends out to downstream servers, noting the intended return path
1484 static void* udpClientThread(ClientState* cs)
1485 try
1486 {
1487 LocalHolders holders;
1488
1489 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1490 if (g_udpVectorSize > 1) {
1491 MultipleMessagesUDPClientThread(cs, holders);
1492
1493 }
1494 else
1495 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1496 {
1497 char packet[4096];
1498 /* the actual buffer is larger because:
1499 - we may have to add EDNS and/or ECS
1500 - we use it for self-generated responses (from rule or cache)
1501 but we only accept incoming payloads up to that size
1502 */
1503 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1504 struct msghdr msgh;
1505 struct iovec iov;
1506 /* used by HarvestDestinationAddress */
1507 char cbuf[256];
1508
1509 ComboAddress remote;
1510 ComboAddress dest;
1511 remote.sin4.sin_family = cs->local.sin4.sin_family;
1512 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1513
1514 for(;;) {
1515 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1516
1517 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1518 g_stats.nonCompliantQueries++;
1519 continue;
1520 }
1521
1522 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1523 }
1524 }
1525
1526 return nullptr;
1527 }
1528 catch(const std::exception &e)
1529 {
1530 errlog("UDP client thread died because of exception: %s", e.what());
1531 return nullptr;
1532 }
1533 catch(const PDNSException &e)
1534 {
1535 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
1536 return nullptr;
1537 }
1538 catch(...)
1539 {
1540 errlog("UDP client thread died because of an exception: %s", "unknown");
1541 return nullptr;
1542 }
1543
1544 static bool upCheck(DownstreamState& ds)
1545 try
1546 {
1547 vector<uint8_t> packet;
1548 DNSPacketWriter dpw(packet, ds.checkName, ds.checkType.getCode());
1549 dnsheader * requestHeader = dpw.getHeader();
1550 requestHeader->rd=true;
1551 if (ds.setCD) {
1552 requestHeader->cd = true;
1553 }
1554
1555 Socket sock(ds.remote.sin4.sin_family, SOCK_DGRAM);
1556 sock.setNonBlocking();
1557 if (!IsAnyAddress(ds.sourceAddr)) {
1558 sock.setReuseAddr();
1559 sock.bind(ds.sourceAddr);
1560 }
1561 sock.connect(ds.remote);
1562 ssize_t sent = udpClientSendRequestToBackend(&ds, sock.getHandle(), (char*)&packet[0], packet.size());
1563 if (sent < 0) {
1564 int ret = errno;
1565 if (g_verboseHealthChecks)
1566 infolog("Error while sending a health check query to backend %s: %d", ds.getNameWithAddr(), ret);
1567 return false;
1568 }
1569
1570 int ret=waitForRWData(sock.getHandle(), true, 1, 0);
1571 if(ret < 0 || !ret) { // error, timeout, both are down!
1572 if (ret < 0) {
1573 ret = errno;
1574 if (g_verboseHealthChecks)
1575 infolog("Error while waiting for the health check response from backend %s: %d", ds.getNameWithAddr(), ret);
1576 }
1577 else {
1578 if (g_verboseHealthChecks)
1579 infolog("Timeout while waiting for the health check response from backend %s", ds.getNameWithAddr());
1580 }
1581 return false;
1582 }
1583
1584 string reply;
1585 sock.recvFrom(reply, ds.remote);
1586
1587 const dnsheader * responseHeader = (const dnsheader *) reply.c_str();
1588
1589 if (reply.size() < sizeof(*responseHeader)) {
1590 if (g_verboseHealthChecks)
1591 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds.getNameWithAddr(), sizeof(*responseHeader));
1592 return false;
1593 }
1594
1595 if (responseHeader->id != requestHeader->id) {
1596 if (g_verboseHealthChecks)
1597 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds.getNameWithAddr(), requestHeader->id);
1598 return false;
1599 }
1600
1601 if (!responseHeader->qr) {
1602 if (g_verboseHealthChecks)
1603 infolog("Invalid health check response from backend %s, expecting QR to be set", ds.getNameWithAddr());
1604 return false;
1605 }
1606
1607 if (responseHeader->rcode == RCode::ServFail) {
1608 if (g_verboseHealthChecks)
1609 infolog("Backend %s responded to health check with ServFail", ds.getNameWithAddr());
1610 return false;
1611 }
1612
1613 if (ds.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1614 if (g_verboseHealthChecks)
1615 infolog("Backend %s responded to health check with %s while mustResolve is set", ds.getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
1616 return false;
1617 }
1618
1619 // XXX fixme do bunch of checking here etc
1620 return true;
1621 }
1622 catch(const std::exception& e)
1623 {
1624 if (g_verboseHealthChecks)
1625 infolog("Error checking the health of backend %s: %s", ds.getNameWithAddr(), e.what());
1626 return false;
1627 }
1628 catch(...)
1629 {
1630 if (g_verboseHealthChecks)
1631 infolog("Unknown exception while checking the health of backend %s", ds.getNameWithAddr());
1632 return false;
1633 }
1634
/* Maximum number of TCP client threads (presumably enforced by the TCP thread
   pool). checkFileDescriptorsLimits() also counts it as the number of TCP
   connections that may be served at once. */
uint64_t g_maxTCPClientThreads = 10;
/* Number of maintThread() iterations, one per sleep(1), between two purges of
   expired packet cache entries. */
std::atomic<uint16_t> g_cacheCleaningDelay(60);
/* Used by maintThread() to compute the 'upTo' argument passed to
   purgeExpired(): maxEntries * (100 - g_cacheCleaningPercentage) / 100. */
std::atomic<uint16_t> g_cacheCleaningPercentage(100);
1638
1639 void* maintThread()
1640 {
1641 int interval = 1;
1642 size_t counter = 0;
1643 int32_t secondsToWaitLog = 0;
1644
1645 for(;;) {
1646 sleep(interval);
1647
1648 {
1649 std::lock_guard<std::mutex> lock(g_luamutex);
1650 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1651 if(f) {
1652 try {
1653 (*f)();
1654 secondsToWaitLog = 0;
1655 }
1656 catch(std::exception &e) {
1657 if (secondsToWaitLog <= 0) {
1658 infolog("Error during execution of maintenance function: %s", e.what());
1659 secondsToWaitLog = 61;
1660 }
1661 secondsToWaitLog -= interval;
1662 }
1663 }
1664 }
1665
1666 counter++;
1667 if (counter >= g_cacheCleaningDelay) {
1668 const auto localPools = g_pools.getCopy();
1669 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
1670 for (const auto& entry : localPools) {
1671 {
1672 std::lock_guard<std::mutex> lock(g_luamutex);
1673 packetCache = entry.second->packetCache;
1674 }
1675 if (packetCache) {
1676 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1677 packetCache->purgeExpired(upTo);
1678 }
1679 }
1680 counter = 0;
1681 }
1682
1683 // ponder pruning g_dynblocks of expired entries here
1684 }
1685 return 0;
1686 }
1687
1688 void* healthChecksThread()
1689 {
1690 int interval = 1;
1691
1692 for(;;) {
1693 sleep(interval);
1694
1695 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
1696 g_tcpclientthreads->addTCPClientThread();
1697
1698 for(auto& dss : g_dstates.getCopy()) { // this points to the actual shared_ptrs!
1699 if(dss->availability==DownstreamState::Availability::Auto) {
1700 bool newState=upCheck(*dss);
1701 if (newState) {
1702 if (dss->currentCheckFailures != 0) {
1703 dss->currentCheckFailures = 0;
1704 }
1705 }
1706 else if (!newState && dss->upStatus) {
1707 dss->currentCheckFailures++;
1708 if (dss->currentCheckFailures < dss->maxCheckFailures) {
1709 newState = true;
1710 }
1711 }
1712
1713 if(newState != dss->upStatus) {
1714 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
1715
1716 if (newState && !dss->connected) {
1717 try {
1718 SConnect(dss->fd, dss->remote);
1719 dss->connected = true;
1720 dss->tid = thread(responderThread, dss);
1721 }
1722 catch(const std::runtime_error& error) {
1723 infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
1724 newState = false;
1725 dss->connected = false;
1726 }
1727 }
1728
1729 dss->upStatus = newState;
1730 dss->currentCheckFailures = 0;
1731 if (g_snmpAgent && g_snmpTrapsEnabled) {
1732 g_snmpAgent->sendBackendStatusChangeTrap(dss);
1733 }
1734 }
1735 }
1736
1737 auto delta = dss->sw.udiffAndSet()/1000000.0;
1738 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
1739 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
1740 dss->prev.queries.store(dss->queries.load());
1741 dss->prev.reuseds.store(dss->reuseds.load());
1742
1743 for(IDState& ids : dss->idStates) { // timeouts
1744 if(ids.origFD >=0 && ids.age++ > g_udpTimeout) {
1745 /* We set origFD to -1 as soon as possible
1746 to limit the risk of racing with the
1747 responder thread.
1748 The UDP client thread only checks origFD to
1749 know whether outstanding has to be incremented,
1750 so the sooner the better any way since we _will_
1751 decrement it.
1752 */
1753 ids.origFD = -1;
1754 ids.age = 0;
1755 dss->reuseds++;
1756 --dss->outstanding;
1757 g_stats.downstreamTimeouts++; // this is an 'actively' discovered timeout
1758 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1759 dss->remote.toStringWithPort(), dss->name,
1760 ids.qname.toString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
1761
1762 struct timespec ts;
1763 gettime(&ts);
1764
1765 struct dnsheader fake;
1766 memset(&fake, 0, sizeof(fake));
1767 fake.id = ids.origID;
1768
1769 std::lock_guard<std::mutex> lock(g_rings.respMutex);
1770 g_rings.respRing.push_back({ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote});
1771 }
1772 }
1773 }
1774 }
1775 return 0;
1776 }
1777
/* Presumably the key used for encrypted console communication (cf. the
   --setkey option / setKey) — TODO confirm; it is not used in this section. */
std::string g_key;
1779
1780
1781 void controlThread(int fd, ComboAddress local)
1782 try
1783 {
1784 ComboAddress client;
1785 int sock;
1786 warnlog("Accepting control connections on %s", local.toStringWithPort());
1787 while((sock=SAccept(fd, client)) >= 0) {
1788 if (g_logConsoleConnections) {
1789 warnlog("Got control connection from %s", client.toStringWithPort());
1790 }
1791
1792 thread t(controlClientThread, sock, client);
1793 t.detach();
1794 }
1795 }
1796 catch(std::exception& e)
1797 {
1798 close(fd);
1799 errlog("Control connection died: %s", e.what());
1800 }
1801
1802
1803
1804 static void bindAny(int af, int sock)
1805 {
1806 int one = 1;
1807
1808 #ifdef IP_FREEBIND
1809 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
1810 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
1811 #endif
1812
1813 #ifdef IP_BINDANY
1814 if (af == AF_INET)
1815 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
1816 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
1817 #endif
1818 #ifdef IPV6_BINDANY
1819 if (af == AF_INET6)
1820 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
1821 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
1822 #endif
1823 #ifdef SO_BINDANY
1824 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
1825 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
1826 #endif
1827 }
1828
1829 static void dropGroupPrivs(gid_t gid)
1830 {
1831 if (gid) {
1832 if (setgid(gid) == 0) {
1833 if (setgroups(0, NULL) < 0) {
1834 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
1835 }
1836 }
1837 else {
1838 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
1839 }
1840 }
1841 }
1842
1843 static void dropUserPrivs(uid_t uid)
1844 {
1845 if(uid) {
1846 if(setuid(uid) < 0) {
1847 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
1848 }
1849 }
1850 }
1851
1852 static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
1853 {
1854 /* stdin, stdout, stderr */
1855 size_t requiredFDsCount = 3;
1856 size_t backendsCount = g_dstates.getCopy().size();
1857 /* listening sockets */
1858 requiredFDsCount += udpBindsCount;
1859 requiredFDsCount += tcpBindsCount;
1860 /* max TCP connections currently served */
1861 requiredFDsCount += g_maxTCPClientThreads;
1862 /* max pipes for communicating between TCP acceptors and client threads */
1863 requiredFDsCount += (g_maxTCPClientThreads * 2);
1864 /* UDP sockets to backends */
1865 requiredFDsCount += backendsCount;
1866 /* TCP sockets to backends */
1867 requiredFDsCount += (backendsCount * g_maxTCPClientThreads);
1868 /* max TCP queued connections */
1869 requiredFDsCount += g_maxTCPQueuedConnections;
1870 /* DelayPipe pipe */
1871 requiredFDsCount += 2;
1872 /* syslog socket */
1873 requiredFDsCount++;
1874 /* webserver main socket */
1875 requiredFDsCount++;
1876 /* console main socket */
1877 requiredFDsCount++;
1878 /* carbon export */
1879 requiredFDsCount++;
1880 /* history file */
1881 requiredFDsCount++;
1882 struct rlimit rl;
1883 getrlimit(RLIMIT_NOFILE, &rl);
1884 if (rl.rlim_cur <= requiredFDsCount) {
1885 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
1886 #ifdef HAVE_SYSTEMD
1887 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
1888 #else
1889 warnlog("You can increase this value by using ulimit.");
1890 #endif
1891 }
1892 }
1893
/* Settings collected from the command line by main()'s getopt loop
   (configuration file, client/daemon/supervised modes, pidfile, uid/gid, ...). */
struct
{
  std::vector<std::string> locals;
  std::vector<std::string> remotes;
  bool checkConfig = false;
  bool beDaemon = false;
  bool beClient = false;
  bool beSupervised = false;
  std::string pidfile;
  std::string command;
  std::string config;
  std::string uid;
  std::string gid;
} g_cmdLine;
1908
/* Presumably set to true once the configuration has been fully loaded
   (not written in this section) — TODO confirm against its writers. */
std::atomic<bool> g_configurationDone(false);
1910
1911 int main(int argc, char** argv)
1912 try
1913 {
1914 size_t udpBindsCount = 0;
1915 size_t tcpBindsCount = 0;
1916 rl_attempted_completion_function = my_completion;
1917 rl_completion_append_character = 0;
1918
1919 signal(SIGPIPE, SIG_IGN);
1920 signal(SIGCHLD, SIG_IGN);
1921 openlog("dnsdist", LOG_PID, LOG_DAEMON);
1922 g_console=true;
1923
1924 #ifdef HAVE_LIBSODIUM
1925 if (sodium_init() == -1) {
1926 cerr<<"Unable to initialize crypto library"<<endl;
1927 exit(EXIT_FAILURE);
1928 }
1929 g_hashperturb=randombytes_uniform(0xffffffff);
1930 srandom(randombytes_uniform(0xffffffff));
1931 #else
1932 {
1933 struct timeval tv;
1934 gettimeofday(&tv, 0);
1935 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
1936 g_hashperturb=random();
1937 }
1938
1939 #endif
1940 ComboAddress clientAddress = ComboAddress();
1941 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
1942 struct option longopts[]={
1943 {"acl", required_argument, 0, 'a'},
1944 {"config", required_argument, 0, 'C'},
1945 {"check-config", 0, 0, 1},
1946 {"execute", required_argument, 0, 'e'},
1947 {"client", 0, 0, 'c'},
1948 {"gid", required_argument, 0, 'g'},
1949 #ifdef HAVE_LIBSODIUM
1950 {"setkey", required_argument, 0, 'k'},
1951 #endif
1952 {"local", required_argument, 0, 'l'},
1953 {"daemon", 0, 0, 'd'},
1954 {"pidfile", required_argument, 0, 'p'},
1955 {"supervised", 0, 0, 's'},
1956 {"disable-syslog", 0, 0, 2},
1957 {"uid", required_argument, 0, 'u'},
1958 {"verbose", 0, 0, 'v'},
1959 {"version", 0, 0, 'V'},
1960 {"help", 0, 0, 'h'},
1961 {0,0,0,0}
1962 };
1963 int longindex=0;
1964 string optstring;
1965 for(;;) {
1966 #ifdef HAVE_LIBSODIUM
1967 int c=getopt_long(argc, argv, "a:hcde:C:k:l:vp:g:u:V", longopts, &longindex);
1968 #else
1969 int c=getopt_long(argc, argv, "a:hcde:C:l:vp:g:u:V", longopts, &longindex);
1970 #endif
1971 if(c==-1)
1972 break;
1973 switch(c) {
1974 case 1:
1975 g_cmdLine.checkConfig=true;
1976 break;
1977 case 2:
1978 g_syslog=false;
1979 break;
1980 case 'C':
1981 g_cmdLine.config=optarg;
1982 break;
1983 case 'c':
1984 g_cmdLine.beClient=true;
1985 break;
1986 case 'd':
1987 g_cmdLine.beDaemon=true;
1988 break;
1989 case 'e':
1990 g_cmdLine.command=optarg;
1991 break;
1992 case 'g':
1993 g_cmdLine.gid=optarg;
1994 break;
1995 case 'h':
1996 cout<<"dnsdist "<<VERSION<<endl;
1997 cout<<endl;
1998 cout<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]] [-d,--daemon]\n";
1999 cout<<"[-p,--pidfile file] [-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
2000 cout<<"[-v,--verbose] [--check-config]\n";
2001 cout<<"\n";
2002 cout<<"-a,--acl netmask Add this netmask to the ACL\n";
2003 cout<<"-C,--config file Load configuration from 'file'\n";
2004 cout<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2005 cout<<" controlSocket from your configuration file, but also\n";
2006 cout<<" accepts an IP:PORT argument\n";
2007 #ifdef HAVE_LIBSODIUM
2008 cout<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2009 cout<<" is similar to setting setKey in the configuration file.\n";
2010 cout<<" NOTE: this will leak this key in your shell's history!\n";
2011 #endif
2012 cout<<"--check-config Validate the configuration file and exit. The exit-code\n";
2013 cout<<" reflects the validation, 0 is OK, 1 means an error.\n";
2014 cout<<" Any errors are printed as well.\n";
2015 cout<<"-d,--daemon Operate as a daemon\n";
2016 cout<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
2017 cout<<"-g,--gid gid Change the process group ID after binding sockets\n";
2018 cout<<"-h,--help Display this helpful message\n";
2019 cout<<"-l,--local address Listen on this local address\n";
2020 cout<<"--supervised Don't open a console, I'm supervised\n";
2021 cout<<" (use with e.g. systemd and daemontools)\n";
2022 cout<<"--disable-syslog Don't log to syslog, only to stdout\n";
2023 cout<<" (use with e.g. systemd)\n";
2024 cout<<"-p,--pidfile file Write a pidfile, works only with --daemon\n";
2025 cout<<"-u,--uid uid Change the process user ID after binding sockets\n";
2026 cout<<"-v,--verbose Enable verbose mode\n";
2027 cout<<"\n";
2028 exit(EXIT_SUCCESS);
2029 break;
2030 case 'a':
2031 optstring=optarg;
2032 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2033 break;
2034 #ifdef HAVE_LIBSODIUM
2035 case 'k':
2036 if (B64Decode(string(optarg), g_key) < 0) {
2037 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2038 exit(EXIT_FAILURE);
2039 }
2040 break;
2041 #endif
2042 case 'l':
2043 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
2044 break;
2045 case 'p':
2046 g_cmdLine.pidfile=optarg;
2047 break;
2048 case 's':
2049 g_cmdLine.beSupervised=true;
2050 break;
2051 case 'u':
2052 g_cmdLine.uid=optarg;
2053 break;
2054 case 'v':
2055 g_verbose=true;
2056 break;
2057 case 'V':
2058 #ifdef LUAJIT_VERSION
2059 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2060 #else
2061 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2062 #endif
2063 cout<<"Enabled features: ";
2064 #ifdef HAVE_DNSCRYPT
2065 cout<<"dnscrypt ";
2066 #endif
2067 #ifdef HAVE_EBPF
2068 cout<<"ebpf ";
2069 #endif
2070 #ifdef HAVE_LIBSODIUM
2071 cout<<"libsodium ";
2072 #endif
2073 #ifdef HAVE_PROTOBUF
2074 cout<<"protobuf ";
2075 #endif
2076 #ifdef HAVE_RE2
2077 cout<<"re2 ";
2078 #endif
2079 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2080 cout<<"recvmmsg/sendmmsg ";
2081 #endif
2082 #ifdef HAVE_NET_SNMP
2083 cout<<"snmp ";
2084 #endif
2085 #ifdef HAVE_SYSTEMD
2086 cout<<"systemd";
2087 #endif
2088 cout<<endl;
2089 exit(EXIT_SUCCESS);
2090 break;
2091 }
2092 }
2093
2094 argc-=optind;
2095 argv+=optind;
2096 for(auto p = argv; *p; ++p) {
2097 if(g_cmdLine.beClient) {
2098 clientAddress = ComboAddress(*p, 5199);
2099 } else {
2100 g_cmdLine.remotes.push_back(*p);
2101 }
2102 }
2103
2104 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding};
2105
2106 g_policy.setState(leastOutstandingPol);
2107 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2108 setupLua(true, g_cmdLine.config);
2109 if (clientAddress != ComboAddress())
2110 g_serverControl = clientAddress;
2111 doClient(g_serverControl, g_cmdLine.command);
2112 _exit(EXIT_SUCCESS);
2113 }
2114
2115 auto acl = g_ACL.getCopy();
2116 if(acl.empty()) {
2117 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2118 acl.addMask(addr);
2119 g_ACL.setState(acl);
2120 }
2121
2122 if (g_cmdLine.checkConfig) {
2123 setupLua(true, g_cmdLine.config);
2124 // No exception was thrown
2125 infolog("Configuration '%s' OK!", g_cmdLine.config);
2126 _exit(EXIT_SUCCESS);
2127 }
2128
2129 auto todo=setupLua(false, g_cmdLine.config);
2130
2131 if(g_cmdLine.locals.size()) {
2132 g_locals.clear();
2133 for(auto loc : g_cmdLine.locals)
2134 g_locals.push_back(std::make_tuple(ComboAddress(loc, 53), true, false, 0, "", std::set<int>()));
2135 }
2136
2137 if(g_locals.empty())
2138 g_locals.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set<int>()));
2139
2140 g_configurationDone = true;
2141
2142 vector<ClientState*> toLaunch;
2143 for(const auto& local : g_locals) {
2144 ClientState* cs = new ClientState;
2145 cs->local= std::get<0>(local);
2146 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2147 if(cs->local.sin4.sin_family == AF_INET6) {
2148 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2149 }
2150 //if(g_vm.count("bind-non-local"))
2151 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2152
2153 // if (!setSocketTimestamps(cs->udpFD))
2154 // L<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2155
2156
2157 if(IsAnyAddress(cs->local)) {
2158 int one=1;
2159 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2160 #ifdef IPV6_RECVPKTINFO
2161 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2162 #endif
2163 }
2164
2165 if (std::get<2>(local)) {
2166 #ifdef SO_REUSEPORT
2167 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2168 #else
2169 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2170 #endif
2171 }
2172
2173 const std::string& itf = std::get<4>(local);
2174 if (!itf.empty()) {
2175 #ifdef SO_BINDTODEVICE
2176 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2177 if (res != 0) {
2178 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2179 }
2180 #else
2181 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2182 #endif
2183 }
2184
2185 #ifdef HAVE_EBPF
2186 if (g_defaultBPFFilter) {
2187 cs->attachFilter(g_defaultBPFFilter);
2188 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs->local.toStringWithPort());
2189 }
2190 #endif /* HAVE_EBPF */
2191
2192 cs->cpus = std::get<5>(local);
2193
2194 SBind(cs->udpFD, cs->local);
2195 toLaunch.push_back(cs);
2196 g_frontends.push_back(cs);
2197 udpBindsCount++;
2198 }
2199
2200 for(const auto& local : g_locals) {
2201 if(!std::get<1>(local)) { // no TCP/IP
2202 warnlog("Not providing TCP/IP service on local address '%s'", std::get<0>(local).toStringWithPort());
2203 continue;
2204 }
2205 ClientState* cs = new ClientState;
2206 cs->local= std::get<0>(local);
2207
2208 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2209
2210 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2211 #ifdef TCP_DEFER_ACCEPT
2212 SSetsockopt(cs->tcpFD, SOL_TCP,TCP_DEFER_ACCEPT, 1);
2213 #endif
2214 if (std::get<3>(local) > 0) {
2215 #ifdef TCP_FASTOPEN
2216 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(local));
2217 #else
2218 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
2219 #endif
2220 }
2221 if(cs->local.sin4.sin_family == AF_INET6) {
2222 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2223 }
2224 #ifdef SO_REUSEPORT
2225 /* no need to warn again if configured but support is not available, we already did for UDP */
2226 if (std::get<2>(local)) {
2227 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2228 }
2229 #endif
2230
2231 const std::string& itf = std::get<4>(local);
2232 if (!itf.empty()) {
2233 #ifdef SO_BINDTODEVICE
2234 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2235 if (res != 0) {
2236 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2237 }
2238 #else
2239 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2240 #endif
2241 }
2242
2243 #ifdef HAVE_EBPF
2244 if (g_defaultBPFFilter) {
2245 cs->attachFilter(g_defaultBPFFilter);
2246 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs->local.toStringWithPort());
2247 }
2248 #endif /* HAVE_EBPF */
2249
2250 // if(g_vm.count("bind-non-local"))
2251 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2252 SBind(cs->tcpFD, cs->local);
2253 SListen(cs->tcpFD, 64);
2254 warnlog("Listening on %s", cs->local.toStringWithPort());
2255
2256 toLaunch.push_back(cs);
2257 g_frontends.push_back(cs);
2258 tcpBindsCount++;
2259 }
2260
2261 #ifdef HAVE_DNSCRYPT
2262 for(auto& dcLocal : g_dnsCryptLocals) {
2263 ClientState* cs = new ClientState;
2264 cs->local = std::get<0>(dcLocal);
2265 cs->dnscryptCtx = &(std::get<1>(dcLocal));
2266 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2267 if(cs->local.sin4.sin_family == AF_INET6) {
2268 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2269 }
2270 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2271 if(IsAnyAddress(cs->local)) {
2272 int one=1;
2273 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2274 #ifdef IPV6_RECVPKTINFO
2275 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
2276 #endif
2277 }
2278 if (std::get<2>(dcLocal)) {
2279 #ifdef SO_REUSEPORT
2280 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2281 #else
2282 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2283 #endif
2284 }
2285
2286 const std::string& itf = std::get<4>(dcLocal);
2287 if (!itf.empty()) {
2288 #ifdef SO_BINDTODEVICE
2289 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2290 if (res != 0) {
2291 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2292 }
2293 #else
2294 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2295 #endif
2296 }
2297
2298 #ifdef HAVE_EBPF
2299 if (g_defaultBPFFilter) {
2300 cs->attachFilter(g_defaultBPFFilter);
2301 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs->local.toStringWithPort());
2302 }
2303 #endif /* HAVE_EBPF */
2304 SBind(cs->udpFD, cs->local);
2305 toLaunch.push_back(cs);
2306 g_frontends.push_back(cs);
2307 udpBindsCount++;
2308
2309 cs = new ClientState;
2310 cs->local = std::get<0>(dcLocal);
2311 cs->dnscryptCtx = &(std::get<1>(dcLocal));
2312 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2313 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2314 #ifdef TCP_DEFER_ACCEPT
2315 SSetsockopt(cs->tcpFD, SOL_TCP,TCP_DEFER_ACCEPT, 1);
2316 #endif
2317 if (std::get<3>(dcLocal) > 0) {
2318 #ifdef TCP_FASTOPEN
2319 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(dcLocal));
2320 #else
2321 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
2322 #endif
2323 }
2324
2325 #ifdef SO_REUSEPORT
2326 /* no need to warn again if configured but support is not available, we already did for UDP */
2327 if (std::get<2>(dcLocal)) {
2328 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2329 }
2330 #endif
2331
2332 if (!itf.empty()) {
2333 #ifdef SO_BINDTODEVICE
2334 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2335 if (res != 0) {
2336 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2337 }
2338 #else
2339 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2340 #endif
2341 }
2342
2343 if(cs->local.sin4.sin_family == AF_INET6) {
2344 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2345 }
2346 #ifdef HAVE_EBPF
2347 if (g_defaultBPFFilter) {
2348 cs->attachFilter(g_defaultBPFFilter);
2349 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs->local.toStringWithPort());
2350 }
2351 #endif /* HAVE_EBPF */
2352
2353 cs->cpus = std::get<5>(dcLocal);
2354
2355 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2356 SBind(cs->tcpFD, cs->local);
2357 SListen(cs->tcpFD, 64);
2358 warnlog("Listening on %s", cs->local.toStringWithPort());
2359 toLaunch.push_back(cs);
2360 g_frontends.push_back(cs);
2361 tcpBindsCount++;
2362 }
2363 #endif
2364
2365 if(g_cmdLine.beDaemon) {
2366 g_console=false;
2367 daemonize();
2368 writepid(g_cmdLine.pidfile);
2369 }
2370 else {
2371 vinfolog("Running in the foreground");
2372 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
2373 vector<string> vec;
2374 std::string acls;
2375 g_ACL.getCopy().toStringVector(&vec);
2376 for(const auto& s : vec) {
2377 if (!acls.empty())
2378 acls += ", ";
2379 acls += s;
2380 }
2381 infolog("ACL allowing queries from: %s", acls.c_str());
2382 }
2383
2384 uid_t newgid=0;
2385 gid_t newuid=0;
2386
2387 if(!g_cmdLine.gid.empty())
2388 newgid = strToGID(g_cmdLine.gid.c_str());
2389
2390 if(!g_cmdLine.uid.empty())
2391 newuid = strToUID(g_cmdLine.uid.c_str());
2392
2393 dropGroupPrivs(newgid);
2394 dropUserPrivs(newuid);
2395
2396 /* this need to be done _after_ dropping privileges */
2397 g_delay = new DelayPipe<DelayedPacket>();
2398
2399 if (g_snmpAgent) {
2400 g_snmpAgent->run();
2401 }
2402
2403 g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads, g_useTCPSinglePipe);
2404
2405 for(auto& t : todo)
2406 t();
2407
2408 auto localPools = g_pools.getCopy();
2409 /* create the default pool no matter what */
2410 createPoolIfNotExists(localPools, "");
2411 if(g_cmdLine.remotes.size()) {
2412 for(const auto& address : g_cmdLine.remotes) {
2413 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
2414 addServerToPool(localPools, "", ret);
2415 if (ret->connected) {
2416 ret->tid = thread(responderThread, ret);
2417 }
2418 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
2419 }
2420 }
2421 g_pools.setState(localPools);
2422
2423 if(g_dstates.getCopy().empty()) {
2424 errlog("No downstream servers defined: all packets will get dropped");
2425 // you might define them later, but you need to know
2426 }
2427
2428 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2429
2430 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
2431 if(dss->availability==DownstreamState::Availability::Auto) {
2432 bool newState=upCheck(*dss);
2433 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
2434 dss->upStatus = newState;
2435 }
2436 }
2437
2438 for(auto& cs : toLaunch) {
2439 if (cs->udpFD >= 0) {
2440 thread t1(udpClientThread, cs);
2441 if (!cs->cpus.empty()) {
2442 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2443 }
2444 t1.detach();
2445 }
2446 else if (cs->tcpFD >= 0) {
2447 thread t1(tcpAcceptorThread, cs);
2448 if (!cs->cpus.empty()) {
2449 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2450 }
2451 t1.detach();
2452 }
2453 }
2454
2455 thread carbonthread(carbonDumpThread);
2456 carbonthread.detach();
2457
2458 thread stattid(maintThread);
2459 stattid.detach();
2460
2461 thread healththread(healthChecksThread);
2462
2463 if(g_cmdLine.beDaemon || g_cmdLine.beSupervised) {
2464 #ifdef HAVE_SYSTEMD
2465 sd_notify(0, "READY=1");
2466 #endif
2467 healththread.join();
2468 }
2469 else {
2470 healththread.detach();
2471 doConsole();
2472 }
2473 _exit(EXIT_SUCCESS);
2474
2475 }
2476 catch(const LuaContext::ExecutionErrorException& e) {
2477 try {
2478 errlog("Fatal Lua error: %s", e.what());
2479 std::rethrow_if_nested(e);
2480 } catch(const std::exception& e) {
2481 errlog("Details: %s", e.what());
2482 }
2483 catch(PDNSException &ae)
2484 {
2485 errlog("Fatal pdns error: %s", ae.reason);
2486 }
2487 _exit(EXIT_FAILURE);
2488 }
2489 catch(std::exception &e)
2490 {
2491 errlog("Fatal error: %s", e.what());
2492 _exit(EXIT_FAILURE);
2493 }
2494 catch(PDNSException &ae)
2495 {
2496 errlog("Fatal pdns error: %s", ae.reason);
2497 _exit(EXIT_FAILURE);
2498 }