]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/dnsdist.cc
rec: Don't shadow variables
[thirdparty/pdns.git] / pdns / dnsdist.cc
CommitLineData
24d5cb00 1/*
12471842
PL
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
df111b53 22#include "dnsdist.hh"
ca404e94 23#include "dnsdist-ecs.hh"
24d5cb00 24#include "sstuff.hh"
25#include "misc.hh"
24d5cb00 26#include <netinet/tcp.h>
2c5db3f7 27#include <limits>
64e4ebb4 28#include "dolog.hh"
424bdfb1
PD
29
30#if defined (__OpenBSD__)
31#include <readline/readline.h>
32#else
33#include <editline/readline.h>
34#endif
35
3c115e0f 36#include "dnsname.hh"
773470ca 37#include "dnswriter.hh"
6d01c80c 38#include "base64.hh"
3f25bce6 39#include <fstream>
7b3865cd 40#include "delaypipe.hh"
7cc68f53 41#include <unistd.h>
6d01c80c 42#include "sodcrypto.hh"
0940e4eb 43#include "dnsrulactions.hh"
a36ce055
RG
44#include <grp.h>
45#include <pwd.h>
0e41337b 46#include "lock.hh"
7cc68f53 47#include <getopt.h>
41408d3a 48#include <sys/resource.h>
886e2cf2 49#include "dnsdist-cache.hh"
85c7ca75 50#include "gettime.hh"
3f25bce6 51
6ab65223
PL
52#ifdef HAVE_SYSTEMD
53#include <systemd/sd-daemon.h>
54#endif
55
0beaa5c8
RG
56#ifdef HAVE_PROTOBUF
57thread_local boost::uuids::random_generator t_uuidGenerator;
58#endif
59
a40df301 60/* Known sins:
e48090d1 61
d12cea01 62 Receiver is currently single threaded
e5a14b2b 63 not *that* bad actually, but now that we are thread safe, might want to scale
a40df301 64*/
24d5cb00 65
0940e4eb 66/* the Rulaction plan
67 Set of Rules, if one matches, it leads to an Action
68 Both rules and actions could conceivably be Lua based.
69 On the C++ side, both could be inherited from a class Rule and a class Action,
70 on the Lua side we can't do that. */
71
64e4ebb4 72using std::atomic;
73using std::thread;
7730131a 74bool g_verbose;
b065f701 75
e48090d1 76struct DNSDistStats g_stats;
13fe35db 77uint16_t g_maxOutstanding{10240};
b076b34a 78bool g_console;
9e87dcb8 79bool g_verboseHealthChecks{false};
1ea747c0 80uint32_t g_staleCacheEntriesTTL{0};
bbfaaa6f 81bool g_syslog{true};
cffde2fd 82
83GlobalStateHolder<NetmaskGroup> g_ACL;
c9262563 84string g_outputBuffer;
f0e4dcba 85vector<std::tuple<ComboAddress, bool, bool, int, string, std::set<int>>> g_locals;
11e1e08b 86#ifdef HAVE_DNSCRYPT
f0e4dcba 87std::vector<std::tuple<ComboAddress,DnsCryptContext,bool, int, string, std::set<int>>> g_dnsCryptLocals;
11e1e08b 88#endif
87b515ed
RG
89#ifdef HAVE_EBPF
90shared_ptr<BPFFilter> g_defaultBPFFilter;
8429ad04 91std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
87b515ed 92#endif /* HAVE_EBPF */
963bef8d 93vector<ClientState *> g_frontends;
886e2cf2 94GlobalStateHolder<pools_t> g_pools;
0beaa5c8 95size_t g_udpVectorSize{1};
7c0860e1 96
9f4eb5cc
RG
97bool g_snmpEnabled{false};
98bool g_snmpTrapsEnabled{false};
99DNSDistSNMPAgent* g_snmpAgent{nullptr};
100
726ddf60 101/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
7730131a 102 Then we have a bunch of connected sockets for talking to downstream servers.
103 We send directly to those sockets.
24d5cb00 104
7730131a 105 For the return path, per downstream server we have a thread that listens to responses.
24d5cb00 106
7730131a 107 Per socket there is an array of 2^16 states, when we send out a packet downstream, we note
108 there the original requestor and the original id. The new ID is the offset in the array.
109
110 When an answer comes in on a socket, we look up the offset by the id, and lob it to the
111 original requestor.
112
113 IDs are assigned by atomic increments of the socket offset.
114 */
115
3f25bce6 116/* for our load balancing, we want to support:
117 Round-robin
118 Round-robin with basic uptime checks
119 Send to least loaded server (least outstanding)
120 Send it to the first server that is not overloaded
839f3021 121 Hashed weighted random
3f25bce6 122*/
123
773470ca 124/* Idea:
125 Multiple server groups, by default we load balance to the group with no name.
126 Each instance is either 'up', 'down' or 'auto', where 'auto' means that dnsdist
127 determines if the instance is up or not. Auto should be the default and very very good.
128
129 In addition, to each instance you can attach a QPS object with rate & burst, which will optionally
130 limit the amount of queries we send there.
131
132 If all downstreams are over QPS, we pick the fastest server */
133
0940e4eb 134GlobalStateHolder<vector<pair<std::shared_ptr<DNSRule>, std::shared_ptr<DNSAction> > > > g_rulactions;
8146444b 135GlobalStateHolder<vector<pair<std::shared_ptr<DNSRule>, std::shared_ptr<DNSResponseAction> > > > g_resprulactions;
cf48b0ce 136GlobalStateHolder<vector<pair<std::shared_ptr<DNSRule>, std::shared_ptr<DNSResponseAction> > > > g_cachehitresprulactions;
df111b53 137Rings g_rings;
786e4d8c 138QueryCount g_qcount;
7c0860e1 139
ecbe9133 140GlobalStateHolder<servers_t> g_dstates;
78ffa782 141GlobalStateHolder<NetmaskTree<DynBlock>> g_dynblockNMG;
71c94675 142GlobalStateHolder<SuffixMatchTree<DynBlock>> g_dynblockSMT;
dd46e5e3 143DNSAction::Action g_dynBlockAction = DNSAction::Action::Drop;
3f6d07a4
RG
144int g_tcpRecvTimeout{2};
145int g_tcpSendTimeout{2};
e0b5e49d 146int g_udpTimeout{2};
3f6d07a4 147
26a3cdb7 148bool g_servFailOnNoPolicy{false};
e72fbfc4 149bool g_truncateTC{false};
b29edbee 150bool g_fixupCase{0};
0beaa5c8
RG
151
152static const size_t s_udpIncomingBufferSize{1500};
153
b50ee135 154static void truncateTC(const char* packet, uint16_t* len)
6ad8b29a 155try
156{
157 unsigned int consumed;
ca404e94 158 DNSName qname(packet, *len, sizeof(dnsheader), false, 0, 0, &consumed);
a683e8bd 159 *len=(uint16_t) (sizeof(dnsheader)+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE);
6ad8b29a 160 struct dnsheader* dh =(struct dnsheader*)packet;
161 dh->ancount = dh->arcount = dh->nscount=0;
162}
163catch(...)
164{
165 g_stats.truncFail++;
166}
167
7b3865cd 168struct DelayedPacket
169{
170 int fd;
171 string packet;
172 ComboAddress destination;
68f3eb4d 173 ComboAddress origDest;
7b3865cd 174 void operator()()
175 {
20b019fa
RG
176 ssize_t res;
177 if(origDest.sin4.sin_family == 0) {
178 res = sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
179 }
180 else {
181 res = sendfromto(fd, packet.c_str(), packet.size(), 0, origDest, destination);
182 }
183 if (res == -1) {
184 int err = errno;
185 vinfolog("Error sending delayed response to %s: %s", destination.toStringWithPort(), strerror(err));
186 }
7b3865cd 187 }
188};
189
a36ce055 190DelayPipe<DelayedPacket> * g_delay = 0;
7b3865cd 191
daacd477 192static void doLatencyAverages(double udiff)
193{
194 auto doAvg = [](double& var, double n, double weight) {
195 var = (weight -1) * var/weight + n/weight;
196 };
197
198 doAvg(g_stats.latencyAvg100, udiff, 100);
199 doAvg(g_stats.latencyAvg1000, udiff, 1000);
200 doAvg(g_stats.latencyAvg10000, udiff, 10000);
201 doAvg(g_stats.latencyAvg1000000, udiff, 1000000);
202}
ca404e94 203
fcffc585
RG
204bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote)
205{
206 uint16_t rqtype, rqclass;
207 unsigned int consumed;
208 DNSName rqname;
209 const struct dnsheader* dh = (struct dnsheader*) response;
210
211 if (responseLen < sizeof(dnsheader)) {
212 return false;
213 }
214
c8c3d4e4
RG
215 if (dh->qdcount == 0) {
216 if (dh->rcode != RCode::NoError && dh->rcode != RCode::NXDomain) {
217 return true;
218 }
219 else {
220 g_stats.nonCompliantResponses++;
221 return false;
222 }
223 }
224
fcffc585
RG
225 try {
226 rqname=DNSName(response, responseLen, sizeof(dnsheader), false, &rqtype, &rqclass, &consumed);
227 }
228 catch(std::exception& e) {
229 if(responseLen > (ssize_t)sizeof(dnsheader))
230 infolog("Backend %s sent us a response with id %d that did not parse: %s", remote.toStringWithPort(), ntohs(dh->id), e.what());
231 g_stats.nonCompliantResponses++;
232 return false;
233 }
234
235 if (rqtype != qtype || rqclass != qclass || rqname != qname) {
236 return false;
237 }
238
239 return true;
240}
241
0f72fd5c 242void restoreFlags(struct dnsheader* dh, uint16_t origFlags)
fcffc585
RG
243{
244 static const uint16_t rdMask = 1 << FLAGS_RD_OFFSET;
245 static const uint16_t cdMask = 1 << FLAGS_CD_OFFSET;
246 static const uint16_t restoreFlagsMask = UINT16_MAX & ~(rdMask | cdMask);
0f72fd5c
RG
247 uint16_t * flags = getFlagsFromDNSHeader(dh);
248 /* clear the flags we are about to restore */
249 *flags &= restoreFlagsMask;
250 /* only keep the flags we want to restore */
251 origFlags &= ~restoreFlagsMask;
252 /* set the saved flags as they were */
253 *flags |= origFlags;
254}
255
ff73f02b 256bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom)
0f72fd5c 257{
fcffc585
RG
258 struct dnsheader* dh = (struct dnsheader*) *response;
259
260 if (*responseLen < sizeof(dnsheader)) {
261 return false;
262 }
263
c8c3d4e4
RG
264 restoreFlags(dh, origFlags);
265
266 if (*responseLen == sizeof(dnsheader)) {
267 return true;
268 }
269
fcffc585
RG
270 if(g_fixupCase) {
271 string realname = qname.toDNSString();
272 if (*responseLen >= (sizeof(dnsheader) + realname.length())) {
273 memcpy(*response + sizeof(dnsheader), realname.c_str(), realname.length());
274 }
275 }
276
ff73f02b
RG
277 if (ednsAdded || ecsAdded) {
278 char * optStart = NULL;
fcffc585
RG
279 size_t optLen = 0;
280 bool last = false;
281
282 int res = locateEDNSOptRR(*response, *responseLen, &optStart, &optLen, &last);
283
284 if (res == 0) {
ff73f02b
RG
285 if (ednsAdded) {
286 /* we added the entire OPT RR,
287 therefore we need to remove it entirely */
288 if (last) {
289 /* simply remove the last AR */
290 *responseLen -= optLen;
291 uint16_t arcount = ntohs(dh->arcount);
292 arcount--;
293 dh->arcount = htons(arcount);
294 }
295 else {
296 /* Removing an intermediary RR could lead to compression error */
297 if (rewriteResponseWithoutEDNS(*response, *responseLen, rewrittenResponse) == 0) {
298 *responseLen = rewrittenResponse.size();
299 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
300 rewrittenResponse.reserve(*responseLen + addRoom);
301 }
302 *responseSize = rewrittenResponse.capacity();
303 *response = reinterpret_cast<char*>(rewrittenResponse.data());
304 }
305 else {
306 warnlog("Error rewriting content");
307 }
308 }
fcffc585
RG
309 }
310 else {
ff73f02b
RG
311 /* the OPT RR was already present, but without ECS,
312 we need to remove the ECS option if any */
313 if (last) {
314 /* nothing after the OPT RR, we can simply remove the
315 ECS option */
316 size_t existingOptLen = optLen;
317 removeEDNSOptionFromOPT(optStart, &optLen, EDNSOptionCode::ECS);
318 *responseLen -= (existingOptLen - optLen);
fcffc585
RG
319 }
320 else {
ff73f02b
RG
321 /* Removing an intermediary RR could lead to compression error */
322 if (rewriteResponseWithoutEDNSOption(*response, *responseLen, EDNSOptionCode::ECS, rewrittenResponse) == 0) {
323 *responseLen = rewrittenResponse.size();
324 if (addRoom && (UINT16_MAX - *responseLen) > addRoom) {
325 rewrittenResponse.reserve(*responseLen + addRoom);
326 }
327 *responseSize = rewrittenResponse.capacity();
328 *response = reinterpret_cast<char*>(rewrittenResponse.data());
329 }
330 else {
331 warnlog("Error rewriting content");
332 }
fcffc585
RG
333 }
334 }
335 }
336 }
337
338 return true;
339}
340
#ifdef HAVE_DNSCRYPT
/* Encrypt `response` in place for a DNSCrypt client.
   Does nothing and returns true when dnsCryptQuery is null.
   When dh, *dh and dhCopy are all non-null, the current header is first copied to
   dhCopy and *dh is redirected to that copy, so the caller keeps access to the
   cleartext header after the in-place encryption.
   On success *responseLen becomes the encrypted length and true is returned; on
   failure the error is logged and false is returned (the response should be dropped). */
bool encryptResponse(char* response, uint16_t* responseLen, size_t responseSize, bool tcp, std::shared_ptr<DnsCryptQuery> dnsCryptQuery, dnsheader** dh, dnsheader* dhCopy)
{
  if (!dnsCryptQuery) {
    return true;
  }

  /* save the original header before encrypting it in place */
  const bool canSaveHeader = dh != nullptr && *dh != nullptr && dhCopy != nullptr;
  if (canSaveHeader) {
    memcpy(dhCopy, *dh, sizeof(dnsheader));
    *dh = dhCopy;
  }

  uint16_t encryptedResponseLen = 0;
  const int res = dnsCryptQuery->ctx->encryptResponse(response, *responseLen, responseSize, dnsCryptQuery, tcp, &encryptedResponseLen);
  if (res != 0) {
    /* dropping response */
    vinfolog("Error encrypting the response, dropping.");
    return false;
  }

  *responseLen = encryptedResponseLen;
  return true;
}
#endif
365
0f72fd5c
RG
366static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
367{
fcffc585
RG
368 if(delayMsec && g_delay) {
369 DelayedPacket dp{origFD, string(response,responseLen), origRemote, origDest};
370 g_delay->submit(dp, delayMsec);
371 }
372 else {
20b019fa
RG
373 ssize_t res;
374 if(origDest.sin4.sin_family == 0) {
375 res = sendto(origFD, response, responseLen, 0, (struct sockaddr*)&origRemote, origRemote.getSocklen());
376 }
377 else {
378 res = sendfromto(origFD, response, responseLen, 0, origDest, origRemote);
379 }
380 if (res == -1) {
381 int err = errno;
382 vinfolog("Error sending response to %s: %s", origRemote.toStringWithPort(), strerror(err));
383 }
fcffc585
RG
384 }
385
386 return true;
387}
388
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
/* Receive loop for one backend (state). For each datagram read from state->fd it:
   - uses the header id as an index into state->idStates to find the pending query;
   - checks the response matches that query (responseContentMatches);
   - decrements state->outstanding, optionally truncates TC answers (g_truncateTC)
     and restores the client's original query id;
   - runs the response rules (processResponse), then undoes query modifications
     (fixUpResponse);
   - inserts into the packet cache when one is attached and not skipped;
   - (DNSCrypt-)encrypts and sends the answer unless the frontend is muted;
   - updates statistics, the response ring and latency figures, and frees the slot.
   std::exception thrown while handling one packet is logged and the loop goes on;
   any other exception (e.g. PDNSException) ends the thread via the outer handlers. */
void* responderThread(std::shared_ptr<DownstreamState> state)
try {
  auto localRespRulactions = g_resprulactions.getLocal();
#ifdef HAVE_DNSCRYPT
  char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
  /* when the answer is encrypted in place, we need to get a copy
     of the original header before encryption to fill the ring buffer */
  dnsheader dhCopy;
#else
  char packet[4096];
#endif
  static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
  vector<uint8_t> rewrittenResponse;

  /* declared outside the loop so the per-packet error handler can log it */
  uint16_t queryId = 0;
  for(;;) {
    /* dh always starts pointing at the receive buffer; encryptResponse() may
       redirect it to dhCopy. Note that it keeps pointing at `packet` even when
       fixUpResponse() switches `response` to rewrittenResponse. */
    dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
    try {
      ssize_t got = recv(state->fd, packet, sizeof(packet), 0);
      char * response = packet;
      size_t responseSize = sizeof(packet);

      if (got < (ssize_t) sizeof(dnsheader))
        continue;

      uint16_t responseLen = (uint16_t) got;
      /* the id is used as-is (no ntohs) as the slot index; presumably the sending
         side stores the slot index in the id field the same way - confirm */
      queryId = dh->id;

      if(queryId >= state->idStates.size())
        continue;

      IDState* ids = &state->idStates[queryId];
      int origFD = ids->origFD;

      if(origFD < 0) // duplicate
        continue;

      /* setting age to 0 to prevent the maintainer thread from
         cleaning this IDS while we process the response.
         We have already a copy of the origFD, so it would
         mostly mess up the outstanding counter.
      */
      ids->age = 0;

      if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, state->remote)) {
        continue;
      }

      /* NOTE(review): from here on, the `continue` paths below (processResponse,
         fixUpResponse, encryptResponse failures) leave ids->origFD >= 0 after the
         counter was already decremented. If the maintainer thread (not visible
         here) decrements `outstanding` again when it times out such slots, the
         counter is decremented twice - confirm. */
      --state->outstanding; // you'd think an attacker could game this, but we're using connected socket

      if(dh->tc && g_truncateTC) {
        truncateTC(response, &responseLen);
      }

      /* give the client back the id it used in its query */
      dh->id = ids->origID;

      uint16_t addRoom = 0;
      DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
#ifdef HAVE_PROTOBUF
      dr.uniqueId = ids->uniqueId;
#endif
      /* response rules may drop the answer; they may also set a delay in ids->delayMsec */
      if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
        continue;
      }

#ifdef HAVE_DNSCRYPT
      /* ask fixUpResponse() to keep room for the DNSCrypt padding and MAC if it rewrites */
      if (ids->dnsCryptQuery) {
        addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
      }
#endif
      if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
        continue;
      }

      /* cached before encryption, i.e. in cleartext */
      if (ids->packetCache && !ids->skipCache) {
        ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode);
      }

      if (ids->cs && !ids->cs->muted) {
#ifdef HAVE_DNSCRYPT
        if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
          continue;
        }
#endif

        ComboAddress empty;
        empty.sin4.sin_family = 0;
        /* if ids->destHarvested is false, origDest holds the listening address.
           We don't want to use that as a source since it could be 0.0.0.0 for example. */
        sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
      }

      g_stats.responses++;

      double udiff = ids->sentTime.udiff();
      vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);

      {
        struct timespec ts;
        gettime(&ts);
        std::lock_guard<std::mutex> lock(g_rings.respMutex);
        /* the ring records the size as received from the backend (`got`), not the
           size of the answer finally sent to the client */
        g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, state->remote});
      }

      if(dh->rcode == RCode::ServFail)
        g_stats.servfailResponses++;
      /* exponentially-weighted moving average of this backend's latency (factor 1/128) */
      state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0;

      /* latency histogram, udiff is in microseconds */
      if(udiff < 1000) g_stats.latency0_1++;
      else if(udiff < 10000) g_stats.latency1_10++;
      else if(udiff < 50000) g_stats.latency10_50++;
      else if(udiff < 100000) g_stats.latency50_100++;
      else if(udiff < 1000000) g_stats.latency100_1000++;
      else g_stats.latencySlow++;

      doLatencyAverages(udiff);

      /* release the slot, unless it was reassigned to a new query meanwhile */
      if (ids->origFD == origFD) {
#ifdef HAVE_DNSCRYPT
        ids->dnsCryptQuery = nullptr;
#endif
        ids->origFD = -1;
      }

      rewrittenResponse.clear();
    }
    catch(std::exception& e){
      vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", state->remote.toStringWithPort(), queryId, e.what());
    }
  }
  return 0;
}
catch(const std::exception& e)
{
  errlog("UDP responder thread died because of exception: %s", e.what());
  return 0;
}
catch(const PDNSException& e)
{
  errlog("UDP responder thread died because of PowerDNS exception: %s", e.reason);
  return 0;
}
catch(...)
{
  errlog("UDP responder thread died because of an exception: %s", "unknown");
  return 0;
}
24d5cb00 537
b58f08e5 538void DownstreamState::reconnect()
3c115e0f 539{
b58f08e5
RG
540 connected = false;
541 if (fd != -1) {
542 /* shutdown() is needed to wake up recv() in the responderThread */
543 shutdown(fd, SHUT_RDWR);
544 close(fd);
545 fd = -1;
546 }
f99e3aaf
RG
547 if (!IsAnyAddress(remote)) {
548 fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
549 if (!IsAnyAddress(sourceAddr)) {
550 SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
551 SBind(fd, sourceAddr);
552 }
7565f4e6
RG
553 try {
554 SConnect(fd, remote);
555 connected = true;
556 }
557 catch(const std::runtime_error& error) {
558 infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
559 }
b58f08e5
RG
560 }
561}
562
563DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
564{
565 if (!IsAnyAddress(remote)) {
566 reconnect();
f99e3aaf
RG
567 idStates.resize(g_maxOutstanding);
568 sw.start();
569 infolog("Added downstream server %s", remote.toStringWithPort());
fbe2a2e0 570 }
3c115e0f 571}
572
773470ca 573std::mutex g_luamutex;
3f25bce6 574LuaContext g_lua;
3f25bce6 575
ecbe9133 576GlobalStateHolder<ServerPolicy> g_policy;
773470ca 577
497a6e3a 578shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const DNSQuestion* dq)
773470ca 579{
22b2b326 580 for(auto& d : servers) {
da4e7813 581 if(d.second->isUp() && d.second->qps.check())
582 return d.second;
773470ca 583 }
497a6e3a 584 return leastOutstanding(servers, dq);
2c5db3f7 585}
586
d1fba58e 587// get server with least outstanding queries, and within those, with the lowest order, and within those: the fastest
497a6e3a 588shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const DNSQuestion* dq)
c9262563 589{
886e2cf2
RG
590 if (servers.size() == 1 && servers[0].second->isUp()) {
591 return servers[0].second;
592 }
593
d1fba58e 594 vector<pair<tuple<int,int,double>, shared_ptr<DownstreamState>>> poss;
595 /* so you might wonder, why do we go through this trouble? The data on which we sort could change during the sort,
596 which would suck royally and could even lead to crashes. So first we snapshot on what we sort, and then we sort */
478ce172 597 poss.reserve(servers.size());
0e41337b 598 for(auto& d : servers) {
da4e7813 599 if(d.second->isUp()) {
d1fba58e 600 poss.push_back({make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), d.second});
c9262563 601 }
602 }
603 if(poss.empty())
604 return shared_ptr<DownstreamState>();
605 nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
606 return poss.begin()->second;
607}
608
497a6e3a 609shared_ptr<DownstreamState> valrandom(unsigned int val, const NumberedServerVector& servers, const DNSQuestion* dq)
c9262563 610{
611 vector<pair<int, shared_ptr<DownstreamState>>> poss;
612 int sum=0;
22b2b326 613 for(auto& d : servers) { // w=1, w=10 -> 1, 11
da4e7813 614 if(d.second->isUp()) {
615 sum+=d.second->weight;
616 poss.push_back({sum, d.second});
c9262563 617 }
618 }
d2894425
NJ
619
620 // Catch poss & sum are empty to avoid SIGFPE
621 if(poss.empty())
622 return shared_ptr<DownstreamState>();
623
a7f3108c 624 int r = val % sum;
af619119 625 auto p = upper_bound(poss.begin(), poss.end(),r, [](int r_, const decltype(poss)::value_type& a) { return r_ < a.first;});
c9262563 626 if(p==poss.end())
627 return shared_ptr<DownstreamState>();
628 return p->second;
629}
630
497a6e3a 631shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const DNSQuestion* dq)
a7f3108c 632{
497a6e3a 633 return valrandom(random(), servers, dq);
a7f3108c 634}
635
36e763fa 636uint32_t g_hashperturb;
497a6e3a 637shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
a7f3108c 638{
497a6e3a 639 return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
a7f3108c 640}
641
642
497a6e3a 643shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const DNSQuestion* dq)
5f504638 644{
da4e7813 645 NumberedServerVector poss;
c9262563 646
22b2b326 647 for(auto& d : servers) {
da4e7813 648 if(d.second->isUp()) {
5f504638 649 poss.push_back(d);
c9262563 650 }
5f504638 651 }
c9262563 652
ecbe9133 653 const auto *res=&poss;
5f504638 654 if(poss.empty())
ecbe9133 655 res = &servers;
c9262563 656
657 if(res->empty())
658 return shared_ptr<DownstreamState>();
659
660 static unsigned int counter;
661
da4e7813 662 return (*res)[(counter++) % res->size()].second;
5f504638 663}
664
c7b1d943
PL
665static void writepid(string pidfile) {
666 if (!pidfile.empty()) {
667 // Clean up possible stale file
668 unlink(pidfile.c_str());
669
670 // Write the pidfile
671 ofstream of(pidfile.c_str());
672 if (of) {
673 of << getpid();
674 } else {
675 errlog("Unable to write PID-file to '%s'.", pidfile);
676 }
677 of.close();
678 }
679}
680
b076b34a 681static void daemonize(void)
682{
683 if(fork())
684 _exit(0); // bye bye
c7b1d943
PL
685 /* We are child */
686
b076b34a 687 setsid();
688
689 int i=open("/dev/null",O_RDWR); /* open stdin */
690 if(i < 0)
691 ; // L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
692 else {
693 dup2(i,0); /* stdin */
694 dup2(i,1); /* stderr */
695 dup2(i,2); /* stderr */
696 close(i);
697 }
698}
699
e6841c9e 700ComboAddress g_serverControl{"127.0.0.1:5199"};
b076b34a 701
886e2cf2
RG
702std::shared_ptr<ServerPool> createPoolIfNotExists(pools_t& pools, const string& poolName)
703{
704 std::shared_ptr<ServerPool> pool;
705 pools_t::iterator it = pools.find(poolName);
706 if (it != pools.end()) {
707 pool = it->second;
708 }
709 else {
8f4f5186
RG
710 if (!poolName.empty())
711 vinfolog("Creating pool %s", poolName);
886e2cf2
RG
712 pool = std::make_shared<ServerPool>();
713 pools.insert(std::pair<std::string,std::shared_ptr<ServerPool> >(poolName, pool));
714 }
715 return pool;
716}
78c8047b 717
742c079a
RG
718void setPoolPolicy(pools_t& pools, const string& poolName, std::shared_ptr<ServerPolicy> policy)
719{
720 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
721 if (!poolName.empty()) {
722 vinfolog("Setting pool %s server selection policy to %s", poolName, policy->name);
723 } else {
724 vinfolog("Setting default pool server selection policy to %s", policy->name);
725 }
726 pool->policy = policy;
727}
728
886e2cf2 729void addServerToPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
22b2b326 730{
886e2cf2 731 std::shared_ptr<ServerPool> pool = createPoolIfNotExists(pools, poolName);
a683e8bd 732 unsigned int count = (unsigned int) pool->servers.size();
c218b223 733 if (!poolName.empty()) {
8f4f5186 734 vinfolog("Adding server to pool %s", poolName);
c218b223 735 } else {
8f4f5186 736 vinfolog("Adding server to default pool");
c218b223 737 }
886e2cf2 738 pool->servers.push_back(make_pair(++count, server));
d12cd8e9
RG
739 /* we need to reorder based on the server 'order' */
740 std::stable_sort(pool->servers.begin(), pool->servers.end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
741 return a.second->order < b.second->order;
742 });
743 /* and now we need to renumber for Lua (custom policies) */
744 size_t idx = 1;
af619119
RG
745 for (auto& serv : pool->servers) {
746 serv.first = idx++;
d12cd8e9 747 }
886e2cf2
RG
748}
749
750void removeServerFromPool(pools_t& pools, const string& poolName, std::shared_ptr<DownstreamState> server)
751{
8f4f5186 752 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
886e2cf2 753
c218b223 754 if (!poolName.empty()) {
8f4f5186 755 vinfolog("Removing server from pool %s", poolName);
c218b223 756 }
757 else {
8f4f5186 758 vinfolog("Removing server from default pool");
c218b223 759 }
886e2cf2 760
d12cd8e9
RG
761 size_t idx = 1;
762 bool found = false;
763 for (NumberedVector<shared_ptr<DownstreamState> >::iterator it = pool->servers.begin(); it != pool->servers.end();) {
764 if (found) {
765 /* we need to renumber the servers placed
766 after the removed one, for Lua (custom policies) */
767 it->first = idx++;
768 it++;
769 }
770 else if (it->second == server) {
771 it = pool->servers.erase(it);
772 found = true;
773 } else {
774 idx++;
775 it++;
886e2cf2
RG
776 }
777 }
778}
779
780std::shared_ptr<ServerPool> getPool(const pools_t& pools, const std::string& poolName)
781{
782 pools_t::const_iterator it = pools.find(poolName);
783
784 if (it == pools.end()) {
785 throw std::out_of_range("No pool named " + poolName);
786 }
787
788 return it->second;
789}
790
791const NumberedServerVector& getDownstreamCandidates(const pools_t& pools, const std::string& poolName)
792{
793 std::shared_ptr<ServerPool> pool = getPool(pools, poolName);
794 return pool->servers;
22b2b326 795}
c9262563 796
520eb5a0 797// goal in life - if you send us a reasonably normal packet, we'll get Z for you, otherwise 0
798int getEDNSZ(const char* packet, unsigned int len)
d08b1cdf 799try
520eb5a0 800{
801 struct dnsheader* dh =(struct dnsheader*)packet;
802
ca404e94 803 if(ntohs(dh->qdcount) != 1 || dh->ancount!=0 || ntohs(dh->arcount)!=1 || dh->nscount!=0)
520eb5a0 804 return 0;
ca404e94
RG
805
806 if (len <= sizeof(dnsheader))
807 return 0;
808
520eb5a0 809 unsigned int consumed;
ca404e94
RG
810 DNSName qname(packet, len, sizeof(dnsheader), false, 0, 0, &consumed);
811 size_t pos = consumed + DNS_TYPE_SIZE + DNS_CLASS_SIZE;
520eb5a0 812 uint16_t qtype, qclass;
813
ca404e94 814 if (len <= (sizeof(dnsheader)+pos))
520eb5a0 815 return 0;
816
ca404e94
RG
817 DNSName aname(packet, len, sizeof(dnsheader)+pos, true, &qtype, &qclass, &consumed);
818
819 if(qtype!=QType::OPT || sizeof(dnsheader)+pos+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE+EDNS_EXTENDED_RCODE_SIZE+EDNS_VERSION_SIZE+1 >= len)
820 return 0;
821
822 uint8_t* z = (uint8_t*)packet+sizeof(dnsheader)+pos+consumed+DNS_TYPE_SIZE+DNS_CLASS_SIZE+EDNS_EXTENDED_RCODE_SIZE+EDNS_VERSION_SIZE;
520eb5a0 823 return 0x100 * (*z) + *(z+1);
824}
d08b1cdf 825catch(...)
826{
827 return 0;
828}
0940e4eb 829
614239d0 830static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
7791f83a
RG
831{
832 string result;
614239d0
RG
833
834 std::vector<std::string> addrs;
835 stringtok(addrs, spoofContent, " ,");
836
837 if (addrs.size() == 1) {
838 try {
839 ComboAddress spoofAddr(spoofContent);
840 SpoofAction sa({spoofAddr});
841 sa(&dq, &result);
842 }
843 catch(const PDNSException &e) {
844 SpoofAction sa(spoofContent); // CNAME then
845 sa(&dq, &result);
846 }
847 } else {
848 std::vector<ComboAddress> cas;
849 for (const auto& addr : addrs) {
850 try {
851 cas.push_back(ComboAddress(addr));
852 }
853 catch (...) {
854 }
855 }
856 SpoofAction sa(cas);
6ca7a40a 857 sa(&dq, &result);
7791f83a
RG
858 }
859}
860
0beaa5c8 861bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
e91084ce
RG
862{
863 {
864 WriteLock wl(&g_rings.queryLock);
8146444b 865 g_rings.queryRing.push_back({now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh});
e91084ce
RG
866 }
867
786e4d8c
RS
868 if(g_qcount.enabled) {
869 string qname = (*dq.qname).toString(".");
870 bool countQuery{true};
871 if(g_qcount.filter) {
872 std::lock_guard<std::mutex> lock(g_luamutex);
873 std::tie (countQuery, qname) = g_qcount.filter(dq);
874 }
875
876 if(countQuery) {
877 WriteLock wl(&g_qcount.queryLock);
878 if(!g_qcount.records.count(qname)) {
879 g_qcount.records[qname] = 0;
880 }
881 g_qcount.records[qname]++;
882 }
883 }
884
0beaa5c8 885 if(auto got = holders.dynNMGBlock->lookup(*dq.remote)) {
701f690b 886 auto updateBlockStats = [&got]() {
887 g_stats.dynBlocked++;
888 got->second.blocks++;
889 };
890
e91084ce 891 if(now < got->second.until) {
7b925432
RG
892 DNSAction::Action action = got->second.action;
893 if (action == DNSAction::Action::None) {
894 action = g_dynBlockAction;
895 }
896 if (action == DNSAction::Action::Refused) {
dd46e5e3 897 vinfolog("Query from %s refused because of dynamic block", dq.remote->toStringWithPort());
701f690b 898 updateBlockStats();
8477236d 899
dd46e5e3
RG
900 dq.dh->rcode = RCode::Refused;
901 dq.dh->qr=true;
902 return true;
903 }
8477236d 904 else if (action == DNSAction::Action::Truncate) {
905 if(!dq.tcp) {
701f690b 906 updateBlockStats();
8477236d 907 vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
908 dq.dh->tc = true;
909 dq.dh->qr = true;
910 return true;
911 }
912 else {
913 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
914 }
915
c4f5aeff 916 }
dd46e5e3 917 else {
701f690b 918 updateBlockStats();
dd46e5e3
RG
919 vinfolog("Query from %s dropped because of dynamic block", dq.remote->toStringWithPort());
920 return false;
921 }
e91084ce
RG
922 }
923 }
924
0beaa5c8 925 if(auto got = holders.dynSMTBlock->lookup(*dq.qname)) {
701f690b 926 auto updateBlockStats = [&got]() {
927 g_stats.dynBlocked++;
928 got->blocks++;
929 };
930
71c94675 931 if(now < got->until) {
7b925432
RG
932 DNSAction::Action action = got->action;
933 if (action == DNSAction::Action::None) {
934 action = g_dynBlockAction;
935 }
936 if (action == DNSAction::Action::Refused) {
dd46e5e3 937 vinfolog("Query from %s for %s refused because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
701f690b 938 updateBlockStats();
8477236d 939
dd46e5e3
RG
940 dq.dh->rcode = RCode::Refused;
941 dq.dh->qr=true;
942 return true;
943 }
8477236d 944 else if (action == DNSAction::Action::Truncate) {
945 if(!dq.tcp) {
701f690b 946 updateBlockStats();
8477236d 947
948 vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
949 dq.dh->tc = true;
950 dq.dh->qr = true;
951 return true;
952 }
953 else {
954 vinfolog("Query from %s for %s over TCP *not* truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
955 }
c4f5aeff 956 }
dd46e5e3 957 else {
701f690b 958 updateBlockStats();
dd46e5e3
RG
959 vinfolog("Query from %s for %s dropped because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toString());
960 return false;
961 }
71c94675 962 }
963 }
964
e91084ce
RG
965 DNSAction::Action action=DNSAction::Action::None;
966 string ruleresult;
0beaa5c8 967 for(const auto& lr : *holders.rulactions) {
e91084ce
RG
968 if(lr.first->matches(&dq)) {
969 lr.first->d_matches++;
970 action=(*lr.second)(&dq, &ruleresult);
971
972 switch(action) {
973 case DNSAction::Action::Allow:
974 return true;
975 break;
976 case DNSAction::Action::Drop:
977 g_stats.ruleDrop++;
978 return false;
979 break;
980 case DNSAction::Action::Nxdomain:
981 dq.dh->rcode = RCode::NXDomain;
982 dq.dh->qr=true;
983 g_stats.ruleNXDomain++;
984 return true;
985 break;
dd46e5e3
RG
986 case DNSAction::Action::Refused:
987 dq.dh->rcode = RCode::Refused;
988 dq.dh->qr=true;
989 g_stats.ruleRefused++;
990 return true;
991 break;
e91084ce
RG
992 case DNSAction::Action::Spoof:
993 spoofResponseFromString(dq, ruleresult);
994 return true;
995 break;
c4f5aeff
RG
996 case DNSAction::Action::Truncate:
997 dq.dh->tc = true;
998 dq.dh->qr = true;
999 return true;
1000 break;
e91084ce
RG
1001 case DNSAction::Action::HeaderModify:
1002 return true;
1003 break;
1004 case DNSAction::Action::Pool:
1005 poolname=ruleresult;
1006 return true;
1007 break;
1008 /* non-terminal actions follow */
1009 case DNSAction::Action::Delay:
1010 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1011 break;
1012 case DNSAction::Action::None:
1013 break;
1014 }
1015 }
1016 }
1017
1018 return true;
1019}
1020
788c3243 1021bool processResponse(LocalStateHolder<vector<pair<std::shared_ptr<DNSRule>, std::shared_ptr<DNSResponseAction> > > >& localRespRulactions, DNSResponse& dr, int* delayMsec)
8146444b 1022{
788c3243 1023 DNSResponseAction::Action action=DNSResponseAction::Action::None;
8146444b
RG
1024 std::string ruleresult;
1025 for(const auto& lr : *localRespRulactions) {
1026 if(lr.first->matches(&dr)) {
1027 lr.first->d_matches++;
788c3243
RG
1028 action=(*lr.second)(&dr, &ruleresult);
1029 switch(action) {
1030 case DNSResponseAction::Action::Allow:
1031 return true;
1032 break;
1033 case DNSResponseAction::Action::Drop:
1034 return false;
1035 break;
1036 case DNSResponseAction::Action::HeaderModify:
1037 return true;
1038 break;
1039 /* non-terminal actions follow */
1040 case DNSResponseAction::Action::Delay:
1041 *delayMsec = static_cast<int>(pdns_stou(ruleresult)); // sorry
1042 break;
1043 case DNSResponseAction::Action::None:
1044 break;
1045 }
8146444b
RG
1046 }
1047 }
1048
1049 return true;
1050}
1051
fbe2a2e0
RG
1052static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd, const char* request, const size_t requestLen)
1053{
b58f08e5
RG
1054 ssize_t result;
1055
fbe2a2e0 1056 if (ss->sourceItf == 0) {
b58f08e5
RG
1057 result = send(sd, request, requestLen, 0);
1058 }
1059 else {
1060 struct msghdr msgh;
1061 struct iovec iov;
1062 char cbuf[256];
1063 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &ss->remote);
1064 addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
1065 result = sendmsg(sd, &msgh, 0);
fbe2a2e0
RG
1066 }
1067
b58f08e5
RG
1068 if (result == -1) {
1069 int savederrno = errno;
1070 vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
1071
1072 /* This might sound silly, but on Linux send() might fail with EINVAL
1073 if the interface the socket was bound to doesn't exist anymore. */
1074 if (savederrno == EINVAL) {
1075 ss->reconnect();
1076 }
fbe2a2e0
RG
1077 }
1078
b58f08e5 1079 return result;
fbe2a2e0
RG
1080}
1081
0beaa5c8 1082static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest)
24d5cb00 1083{
0beaa5c8
RG
1084 if (msgh->msg_flags & MSG_TRUNC) {
1085 /* message was too large for our buffer */
1086 vinfolog("Dropping message too large for our buffer");
1087 g_stats.nonCompliantQueries++;
1088 return false;
1089 }
a4652d55 1090
0beaa5c8
RG
1091 if(!holders.acl->match(remote)) {
1092 vinfolog("Query from %s dropped because of ACL", remote.toStringWithPort());
1093 g_stats.aclDrops++;
1094 return false;
2b3eefc3 1095 }
2b3eefc3 1096
0beaa5c8
RG
1097 cs.queries++;
1098 g_stats.queries++;
2b3eefc3 1099
0beaa5c8
RG
1100 if (HarvestDestinationAddress(msgh, &dest)) {
1101 /* we don't get the port, only the address */
1102 dest.sin4.sin_port = cs.local.sin4.sin_port;
1103 }
1104 else {
1105 dest.sin4.sin_family = 0;
2b3eefc3 1106 }
549d63c9 1107
0beaa5c8
RG
1108 return true;
1109}
1110
#ifdef HAVE_DNSCRYPT
/* For a DNSCrypt listener, decrypt query in place.
 * On success, len is set to the decrypted length and dnsCryptQuery keeps
 * the state needed to encrypt the answer later. Returns false when the
 * query could not be decrypted; any response produced by the DNSCrypt
 * handler is then sent straight back to remote.
 * Listeners without a DNSCrypt context are accepted untouched. */
static bool checkDNSCryptQuery(const ClientState& cs, const char* query, uint16_t& len, std::shared_ptr<DnsCryptQuery>& dnsCryptQuery, const ComboAddress& dest, const ComboAddress& remote)
{
  if (!cs.dnscryptCtx) {
    return true;
  }

  vector<uint8_t> reply;
  uint16_t plainLen = 0;
  dnsCryptQuery = std::make_shared<DnsCryptQuery>();

  const bool decrypted = handleDnsCryptQuery(cs.dnscryptCtx, const_cast<char*>(query), len, dnsCryptQuery, &plainLen, false, reply);
  if (decrypted) {
    len = plainLen;
    return true;
  }

  if (!reply.empty()) {
    sendUDPResponse(cs.udpFD, reinterpret_cast<char*>(reply.data()), static_cast<uint16_t>(reply.size()), 0, dest, remote);
  }
  return false;
}
#endif /* HAVE_DNSCRYPT */
2b3eefc3 1134
0beaa5c8
RG
1135bool checkQueryHeaders(const struct dnsheader* dh)
1136{
1137 if (dh->qr) { // don't respond to responses
1138 g_stats.nonCompliantQueries++;
1139 return false;
1140 }
2b3eefc3 1141
0beaa5c8
RG
1142 if (dh->qdcount == 0) {
1143 g_stats.emptyQueries++;
1144 return false;
1145 }
0ba5eecf 1146
0beaa5c8
RG
1147 if (dh->rd) {
1148 g_stats.rdQueries++;
1149 }
e91084ce 1150
0beaa5c8
RG
1151 return true;
1152}
963bef8d 1153
0beaa5c8
RG
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Prepare outMsg so that a later sendmmsg() sends response (responseLen
 * bytes) to remote. iov and cbuf are caller-provided storage for the I/O
 * vector and the control message. When dest has a known family, a control
 * message selecting it as source address is attached; otherwise no control
 * data is sent. */
static void queueResponse(const ClientState& cs, const char* response, uint16_t responseLen, const ComboAddress& dest, const ComboAddress& remote, struct mmsghdr& outMsg, struct iovec* iov, char* cbuf)
{
  struct msghdr& hdr = outMsg.msg_hdr;
  outMsg.msg_len = 0;
  fillMSGHdr(&hdr, iov, nullptr, 0, const_cast<char*>(response), responseLen, const_cast<ComboAddress*>(&remote));

  const bool destKnown = dest.sin4.sin_family != 0;
  if (destKnown) {
    addCMsgSrcAddr(&hdr, cbuf, &dest, 0);
  }
  else {
    hdr.msg_control = nullptr;
  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
43eeadc1 1168
0beaa5c8
RG
1169static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
1170{
1171 assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
1172 uint16_t queryId = 0;
2b3eefc3 1173
0beaa5c8
RG
1174 try {
1175 if (!isUDPQueryAcceptable(cs, holders, msgh, remote, dest)) {
1176 return;
1177 }
7cea4e39 1178
11e1e08b 1179#ifdef HAVE_DNSCRYPT
0beaa5c8 1180 std::shared_ptr<DnsCryptQuery> dnsCryptQuery = nullptr;
11e1e08b 1181
0beaa5c8
RG
1182 if (!checkDNSCryptQuery(cs, query, len, dnsCryptQuery, dest, remote)) {
1183 return;
1184 }
11e1e08b
RG
1185#endif
1186
0beaa5c8
RG
1187 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
1188 queryId = ntohs(dh->id);
11e1e08b 1189
0beaa5c8
RG
1190 if (!checkQueryHeaders(dh)) {
1191 return;
1192 }
2efd427d 1193
0beaa5c8
RG
1194 const uint16_t * flags = getFlagsFromDNSHeader(dh);
1195 const uint16_t origFlags = *flags;
1196 uint16_t qtype, qclass;
1197 unsigned int consumed = 0;
1198 DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
1199 DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false);
11e1e08b 1200
0beaa5c8
RG
1201 string poolname;
1202 int delayMsec = 0;
1203 /* we need an accurate ("real") value for the response and
1204 to store into the IDS, but not for insertion into the
1205 rings for example */
1206 struct timespec realTime;
1207 struct timespec now;
1208 gettime(&now);
1209 gettime(&realTime, true);
1210
1211 if (!processQuery(holders, dq, poolname, &delayMsec, now))
1212 {
1213 return;
1214 }
b63add03 1215
0beaa5c8
RG
1216 if(dq.dh->qr) { // something turned it into a response
1217 g_stats.selfAnswered++;
1218 restoreFlags(dh, origFlags);
1219
1220 if (!cs.muted) {
11e1e08b 1221 char* response = query;
497a6e3a 1222 uint16_t responseLen = dq.len;
11e1e08b 1223
fcffc585 1224#ifdef HAVE_DNSCRYPT
0beaa5c8
RG
1225 if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1226 return;
1227 }
11e1e08b 1228#endif
0beaa5c8
RG
1229#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1230 if (delayMsec == 0 && responsesVect != nullptr) {
1231 queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1232 (*queuedResponses)++;
1233 }
1234 else
1235#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1236 {
1237 sendUDPResponse(cs.udpFD, response, responseLen, delayMsec, dest, remote);
b5b93e0b 1238 }
22b2b326 1239 }
5f504638 1240
0beaa5c8
RG
1241 return;
1242 }
1243
1244 DownstreamState* ss = nullptr;
1245 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
1246 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
1247 auto policy = holders.policy->policy;
1248 if (serverPool->policy != nullptr) {
1249 policy = serverPool->policy->policy;
1250 }
1251 {
1252 std::lock_guard<std::mutex> lock(g_luamutex);
1253 ss = policy(serverPool->servers, &dq).get();
1254 packetCache = serverPool->packetCache;
1255 }
228e4fe8 1256
0beaa5c8
RG
1257 bool ednsAdded = false;
1258 bool ecsAdded = false;
1259 if (dq.useECS && ss && ss->useECS) {
1260 if (!handleEDNSClientSubnet(query, dq.size, consumed, &dq.len, &(ednsAdded), &(ecsAdded), remote, dq.ecsOverride, dq.ecsPrefixLength)) {
126dcf66 1261 vinfolog("Dropping query from %s because we couldn't insert the ECS value", remote.toStringWithPort());
0beaa5c8 1262 return;
886e2cf2 1263 }
0beaa5c8 1264 }
886e2cf2 1265
0beaa5c8
RG
1266 uint32_t cacheKey = 0;
1267 if (packetCache && !dq.skipCache) {
1268 uint16_t cachedResponseSize = dq.size;
1269 uint32_t allowExpired = ss ? 0 : g_staleCacheEntriesTTL;
1270 if (packetCache->get(dq, consumed, dh->id, query, &cachedResponseSize, &cacheKey, allowExpired)) {
1271 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, reinterpret_cast<dnsheader*>(query), dq.size, cachedResponseSize, false, &realTime);
cf48b0ce 1272#ifdef HAVE_PROTOBUF
0beaa5c8 1273 dr.uniqueId = dq.uniqueId;
cf48b0ce 1274#endif
0beaa5c8
RG
1275 if (!processResponse(holders.cacheHitRespRulactions, dr, &delayMsec)) {
1276 return;
1277 }
cf48b0ce 1278
0beaa5c8 1279 if (!cs.muted) {
fcffc585 1280#ifdef HAVE_DNSCRYPT
0beaa5c8
RG
1281 if (!encryptResponse(query, &cachedResponseSize, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1282 return;
1283 }
fcffc585 1284#endif
0beaa5c8
RG
1285#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1286 if (delayMsec == 0 && responsesVect != nullptr) {
1287 queueResponse(cs, query, cachedResponseSize, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1288 (*queuedResponses)++;
1289 }
1290 else
1291#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1292 {
1293 sendUDPResponse(cs.udpFD, query, cachedResponseSize, delayMsec, dest, remote);
b5b93e0b 1294 }
886e2cf2 1295 }
0beaa5c8
RG
1296
1297 g_stats.cacheHits++;
1298 g_stats.latency0_1++; // we're not going to measure this
1299 doLatencyAverages(0); // same
1300 return;
886e2cf2 1301 }
0beaa5c8
RG
1302 g_stats.cacheMisses++;
1303 }
886e2cf2 1304
0beaa5c8
RG
1305 if(!ss) {
1306 g_stats.noPolicy++;
26a3cdb7 1307
0beaa5c8
RG
1308 if (g_servFailOnNoPolicy && !cs.muted) {
1309 char* response = query;
1310 uint16_t responseLen = dq.len;
1311 restoreFlags(dh, origFlags);
26a3cdb7 1312
0beaa5c8
RG
1313 dq.dh->rcode = RCode::ServFail;
1314 dq.dh->qr = true;
26a3cdb7
RG
1315
1316#ifdef HAVE_DNSCRYPT
0beaa5c8
RG
1317 if (!encryptResponse(response, &responseLen, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
1318 return;
1319 }
26a3cdb7 1320#endif
0beaa5c8
RG
1321#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1322 if (responsesVect != nullptr) {
1323 queueResponse(cs, response, responseLen, dest, remote, responsesVect[*queuedResponses], respIOV, respCBuf);
1324 (*queuedResponses)++;
1325 }
1326 else
1327#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1328 {
1329 sendUDPResponse(cs.udpFD, response, responseLen, 0, dest, remote);
26a3cdb7 1330 }
1ea747c0 1331 }
0beaa5c8
RG
1332 vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "Dropped" : "ServFailed", dq.qname->toString(), QType(dq.qtype).getName(), remote.toStringWithPort());
1333 return;
1334 }
1ea747c0 1335
0beaa5c8 1336 ss->queries++;
11e1e08b 1337
0beaa5c8
RG
1338 unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
1339 IDState* ids = &ss->idStates[idOffset];
1340 ids->age = 0;
c9ba8478 1341
0beaa5c8
RG
1342 if(ids->origFD < 0) // if we are reusing, no change in outstanding
1343 ss->outstanding++;
1344 else {
1345 ss->reuseds++;
1346 g_stats.downstreamTimeouts++;
1347 }
0e41337b 1348
0beaa5c8
RG
1349 ids->cs = &cs;
1350 ids->origFD = cs.udpFD;
1351 ids->origID = dh->id;
1352 ids->origRemote = remote;
1353 ids->sentTime.set(realTime);
1354 ids->qname = qname;
1355 ids->qtype = dq.qtype;
1356 ids->qclass = dq.qclass;
1357 ids->delayMsec = delayMsec;
1358 ids->origFlags = origFlags;
1359 ids->cacheKey = cacheKey;
1360 ids->skipCache = dq.skipCache;
1361 ids->packetCache = packetCache;
1362 ids->ednsAdded = ednsAdded;
1363 ids->ecsAdded = ecsAdded;
1364
1365 /* If we couldn't harvest the real dest addr, still
1366 write down the listening addr since it will be useful
1367 (especially if it's not an 'any' one).
1368 We need to keep track of which one it is since we may
1369 want to use the real but not the listening addr to reply.
1370 */
1371 if (dest.sin4.sin_family != 0) {
1372 ids->origDest = dest;
1373 ids->destHarvested = true;
1374 }
1375 else {
1376 ids->origDest = cs.local;
1377 ids->destHarvested = false;
1378 }
11e1e08b 1379#ifdef HAVE_DNSCRYPT
0beaa5c8 1380 ids->dnsCryptQuery = dnsCryptQuery;
d8c19b98
RG
1381#endif
1382#ifdef HAVE_PROTOBUF
0beaa5c8 1383 ids->uniqueId = dq.uniqueId;
11e1e08b 1384#endif
c9ba8478 1385
0beaa5c8 1386 dh->id = idOffset;
ca404e94 1387
0beaa5c8 1388 ssize_t ret = udpClientSendRequestToBackend(ss, ss->fd, query, dq.len);
11e1e08b 1389
0beaa5c8
RG
1390 if(ret < 0) {
1391 ss->sendErrors++;
1392 g_stats.downstreamSendErrors++;
1393 }
ca404e94 1394
0beaa5c8
RG
1395 vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
1396 }
1397 catch(const std::exception& e){
1398 vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
1399 }
1400}
1401
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
/* Receive loop for listener cs based on recvmmsg()/sendmmsg().
 * Up to g_udpVectorSize queries are read per system call, and each one is
 * handed to processUDPQuery(). The answers that can be sent immediately
 * are then sent back with a single sendmmsg() call. Never returns. */
static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
{
  struct MMReceiver
  {
    char packet[4096];
    /* used by HarvestDestinationAddress */
    char cbuf[256];
    ComboAddress remote;
    ComboAddress dest;
    struct iovec iov;
  };
  const size_t vectSize = g_udpVectorSize;
  /* the actual buffer is larger because:
     - we may have to add EDNS and/or ECS
     - we use it for self-generated responses (from rule or cache)
     but we only accept incoming payloads up to that size
  */
  static_assert(s_udpIncomingBufferSize <= sizeof(MMReceiver::packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");

  auto recvData = std::unique_ptr<MMReceiver[]>(new MMReceiver[vectSize]);
  auto msgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);
  auto outMsgVec = std::unique_ptr<struct mmsghdr[]>(new struct mmsghdr[vectSize]);

  /* the address family is needed to compute the size of the source address */
  for (size_t idx = 0; idx < vectSize; idx++) {
    recvData[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
  }

  /* go now */
  for(;;) {

    /* (re)initialize the receive structures before every call:
       - the IO vector is also used to send the vector of responses, to
         avoid having to copy the data around, so it has to point back to
         our buffer, limited to s_udpIncomingBufferSize so that larger
         payloads are flagged with MSG_TRUNC and dropped;
       - recvmmsg() overwrites msg_namelen and msg_controllen with the
         sizes actually used, so they have to be reset to the full sizes. */
    for (size_t idx = 0; idx < vectSize; idx++) {
      fillMSGHdr(&msgVec[idx].msg_hdr, &recvData[idx].iov, recvData[idx].cbuf, sizeof(recvData[idx].cbuf), recvData[idx].packet, s_udpIncomingBufferSize, &recvData[idx].remote);
    }

    /* block until we have at least one message ready, but return
       as many as possible to save the syscall costs */
    int msgsGot = recvmmsg(cs->udpFD, msgVec.get(), vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);

    if (msgsGot <= 0) {
      vinfolog("Getting UDP messages via recvmmsg() failed with: %s", strerror(errno));
      continue;
    }

    unsigned int msgsToSend = 0;

    /* process the received messages */
    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
      unsigned int got = msgVec[msgIdx].msg_len;
      const ComboAddress& remote = recvData[msgIdx].remote;

      if (got < sizeof(struct dnsheader)) {
        g_stats.nonCompliantQueries++;
        continue;
      }

      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);

    }

    /* immediate (not delayed or sent to a backend) responses (mostly from a rule, dynamic block
       or the cache) can be sent in batch too */

    if (msgsToSend > 0 && msgsToSend <= static_cast<unsigned int>(msgsGot)) {
      int sent = sendmmsg(cs->udpFD, outMsgVec.get(), msgsToSend, 0);

      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
        vinfolog("Error sending responses with sendmmsg() (%d on %u): %s", sent, msgsToSend, strerror(errno));
      }
    }

  }
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1482
1483// listens to incoming queries, sends out to downstream servers, noting the intended return path
1484static void* udpClientThread(ClientState* cs)
1485try
1486{
1487 LocalHolders holders;
1488
1489#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
1490 if (g_udpVectorSize > 1) {
1491 MultipleMessagesUDPClientThread(cs, holders);
1492
1493 }
1494 else
1495#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
1496 {
1497 char packet[4096];
1498 /* the actual buffer is larger because:
1499 - we may have to add EDNS and/or ECS
1500 - we use it for self-generated responses (from rule or cache)
1501 but we only accept incoming payloads up to that size
1502 */
1503 static_assert(s_udpIncomingBufferSize <= sizeof(packet), "the incoming buffer size should not be larger than sizeof(MMReceiver::packet)");
1504 struct msghdr msgh;
1505 struct iovec iov;
1506 /* used by HarvestDestinationAddress */
1507 char cbuf[256];
1508
1509 ComboAddress remote;
1510 ComboAddress dest;
1511 remote.sin4.sin_family = cs->local.sin4.sin_family;
1512 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
1513
1514 for(;;) {
1515 ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
1516
1517 if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
1518 g_stats.nonCompliantQueries++;
1519 continue;
1520 }
1521
1522 processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
1523 }
1524 }
1525
1526 return nullptr;
24d5cb00 1527}
2b3eefc3 1528catch(const std::exception &e)
a4652d55 1529{
1530 errlog("UDP client thread died because of exception: %s", e.what());
0beaa5c8 1531 return nullptr;
a4652d55 1532}
2b3eefc3 1533catch(const PDNSException &e)
a4652d55 1534{
1535 errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
0beaa5c8 1536 return nullptr;
a4652d55 1537}
1538catch(...)
1539{
1540 errlog("UDP client thread died because of an exception: %s", "unknown");
0beaa5c8 1541 return nullptr;
a4652d55 1542}
24d5cb00 1543
fbe2a2e0 1544static bool upCheck(DownstreamState& ds)
773470ca 1545try
1546{
1547 vector<uint8_t> packet;
fbe2a2e0 1548 DNSPacketWriter dpw(packet, ds.checkName, ds.checkType.getCode());
fc01e0b1
RG
1549 dnsheader * requestHeader = dpw.getHeader();
1550 requestHeader->rd=true;
21830638
RG
1551 if (ds.setCD) {
1552 requestHeader->cd = true;
1553 }
773470ca 1554
fbe2a2e0 1555 Socket sock(ds.remote.sin4.sin_family, SOCK_DGRAM);
773470ca 1556 sock.setNonBlocking();
fbe2a2e0
RG
1557 if (!IsAnyAddress(ds.sourceAddr)) {
1558 sock.setReuseAddr();
1559 sock.bind(ds.sourceAddr);
1560 }
1561 sock.connect(ds.remote);
1562 ssize_t sent = udpClientSendRequestToBackend(&ds, sock.getHandle(), (char*)&packet[0], packet.size());
9e87dcb8
RG
1563 if (sent < 0) {
1564 int ret = errno;
1565 if (g_verboseHealthChecks)
fd23d9bb 1566 infolog("Error while sending a health check query to backend %s: %d", ds.getNameWithAddr(), ret);
fbe2a2e0 1567 return false;
9e87dcb8 1568 }
fbe2a2e0 1569
773470ca 1570 int ret=waitForRWData(sock.getHandle(), true, 1, 0);
9e87dcb8
RG
1571 if(ret < 0 || !ret) { // error, timeout, both are down!
1572 if (ret < 0) {
1573 ret = errno;
1574 if (g_verboseHealthChecks)
fd23d9bb 1575 infolog("Error while waiting for the health check response from backend %s: %d", ds.getNameWithAddr(), ret);
9e87dcb8
RG
1576 }
1577 else {
1578 if (g_verboseHealthChecks)
fd23d9bb 1579 infolog("Timeout while waiting for the health check response from backend %s", ds.getNameWithAddr());
9e87dcb8 1580 }
773470ca 1581 return false;
9e87dcb8
RG
1582 }
1583
773470ca 1584 string reply;
fbe2a2e0 1585 sock.recvFrom(reply, ds.remote);
773470ca 1586
231d3b6b 1587 const dnsheader * responseHeader = (const dnsheader *) reply.c_str();
fc01e0b1 1588
9e87dcb8
RG
1589 if (reply.size() < sizeof(*responseHeader)) {
1590 if (g_verboseHealthChecks)
fd23d9bb 1591 infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds.getNameWithAddr(), sizeof(*responseHeader));
fc01e0b1 1592 return false;
9e87dcb8 1593 }
fc01e0b1 1594
9e87dcb8
RG
1595 if (responseHeader->id != requestHeader->id) {
1596 if (g_verboseHealthChecks)
fd23d9bb 1597 infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds.getNameWithAddr(), requestHeader->id);
fc01e0b1 1598 return false;
9e87dcb8
RG
1599 }
1600
1601 if (!responseHeader->qr) {
1602 if (g_verboseHealthChecks)
fd23d9bb 1603 infolog("Invalid health check response from backend %s, expecting QR to be set", ds.getNameWithAddr());
fc01e0b1 1604 return false;
9e87dcb8
RG
1605 }
1606
1607 if (responseHeader->rcode == RCode::ServFail) {
1608 if (g_verboseHealthChecks)
fd23d9bb 1609 infolog("Backend %s responded to health check with ServFail", ds.getNameWithAddr());
fc01e0b1 1610 return false;
9e87dcb8
RG
1611 }
1612
1613 if (ds.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
1614 if (g_verboseHealthChecks)
fd23d9bb 1615 infolog("Backend %s responded to health check with %s while mustResolve is set", ds.getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
a6e02424 1616 return false;
9e87dcb8 1617 }
fc01e0b1 1618
773470ca 1619 // XXX fixme do bunch of checking here etc
1620 return true;
1621}
9e87dcb8
RG
1622catch(const std::exception& e)
1623{
1624 if (g_verboseHealthChecks)
fd23d9bb 1625 infolog("Error checking the health of backend %s: %s", ds.getNameWithAddr(), e.what());
9e87dcb8
RG
1626 return false;
1627}
773470ca 1628catch(...)
1629{
9e87dcb8 1630 if (g_verboseHealthChecks)
fd23d9bb 1631 infolog("Unknown exception while checking the health of backend %s", ds.getNameWithAddr());
773470ca 1632 return false;
1633}
726ddf60 1634
// Maximum number of TCP client threads (NOTE(review): consumer not visible in this chunk; presumably the TCP client thread pool).
uint64_t g_maxTCPClientThreads{10};
// maintThread purges the pools' packet caches once every g_cacheCleaningDelay passes of its one-second loop.
std::atomic<uint16_t> g_cacheCleaningDelay{60};
// maintThread passes maxEntries * (100 - g_cacheCleaningPercentage) / 100 to purgeExpired() on each purge.
std::atomic<uint16_t> g_cacheCleaningPercentage{100};
e41f8165 1638
3c115e0f 1639void* maintThread()
886e2cf2
RG
1640{
1641 int interval = 1;
1642 size_t counter = 0;
5f3ea719 1643 int32_t secondsToWaitLog = 0;
886e2cf2
RG
1644
1645 for(;;) {
1646 sleep(interval);
1647
069e59db 1648 {
1649 std::lock_guard<std::mutex> lock(g_luamutex);
5f3ea719
PL
1650 auto f = g_lua.readVariable<boost::optional<std::function<void()> > >("maintenance");
1651 if(f) {
1652 try {
1653 (*f)();
1654 secondsToWaitLog = 0;
1655 }
1656 catch(std::exception &e) {
1657 if (secondsToWaitLog <= 0) {
1658 infolog("Error during execution of maintenance function: %s", e.what());
1659 secondsToWaitLog = 61;
1660 }
1661 secondsToWaitLog -= interval;
1662 }
1663 }
069e59db 1664 }
886e2cf2
RG
1665
1666 counter++;
1667 if (counter >= g_cacheCleaningDelay) {
1668 const auto localPools = g_pools.getCopy();
e6557ec8 1669 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
886e2cf2 1670 for (const auto& entry : localPools) {
e6557ec8
RG
1671 {
1672 std::lock_guard<std::mutex> lock(g_luamutex);
1673 packetCache = entry.second->packetCache;
1674 }
1675 if (packetCache) {
f65ea0c2
RG
1676 size_t upTo = (packetCache->getMaxEntries()* (100 - g_cacheCleaningPercentage)) / 100;
1677 packetCache->purgeExpired(upTo);
886e2cf2
RG
1678 }
1679 }
1680 counter = 0;
1681 }
1682
1683 // ponder pruning g_dynblocks of expired entries here
1684 }
1685 return 0;
1686}
1687
1688void* healthChecksThread()
3ae86514 1689{
fcadd56e 1690 int interval = 1;
64e4ebb4 1691
3ae86514 1692 for(;;) {
1693 sleep(interval);
773470ca 1694
ded1985a 1695 if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
a9bf3ec4 1696 g_tcpclientthreads->addTCPClientThread();
3ae86514 1697
e5a14b2b 1698 for(auto& dss : g_dstates.getCopy()) { // this points to the actual shared_ptrs!
773470ca 1699 if(dss->availability==DownstreamState::Availability::Auto) {
2993d58d 1700 bool newState=upCheck(*dss);
1701 if (newState) {
1702 if (dss->currentCheckFailures != 0) {
1703 dss->currentCheckFailures = 0;
1704 }
1705 }
1706 else if (!newState && dss->upStatus) {
1707 dss->currentCheckFailures++;
1708 if (dss->currentCheckFailures < dss->maxCheckFailures) {
1709 newState = true;
1710 }
1711 }
1712
1713 if(newState != dss->upStatus) {
1714 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
7565f4e6
RG
1715
1716 if (newState && !dss->connected) {
1717 try {
1718 SConnect(dss->fd, dss->remote);
1719 dss->connected = true;
2717a92f 1720 dss->tid = thread(responderThread, dss);
7565f4e6
RG
1721 }
1722 catch(const std::runtime_error& error) {
1723 infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
b58f08e5
RG
1724 newState = false;
1725 dss->connected = false;
7565f4e6
RG
1726 }
1727 }
1728
2993d58d 1729 dss->upStatus = newState;
1730 dss->currentCheckFailures = 0;
9f4eb5cc
RG
1731 if (g_snmpAgent && g_snmpTrapsEnabled) {
1732 g_snmpAgent->sendBackendStatusChangeTrap(dss);
1733 }
2993d58d 1734 }
773470ca 1735 }
b076b34a 1736
773470ca 1737 auto delta = dss->sw.udiffAndSet()/1000000.0;
1738 dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
1739 dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
3c115e0f 1740 dss->prev.queries.store(dss->queries.load());
773470ca 1741 dss->prev.reuseds.store(dss->reuseds.load());
3c115e0f 1742
773470ca 1743 for(IDState& ids : dss->idStates) { // timeouts
e0b5e49d 1744 if(ids.origFD >=0 && ids.age++ > g_udpTimeout) {
51642fe3
RG
1745 /* We set origFD to -1 as soon as possible
1746 to limit the risk of racing with the
1747 responder thread.
1748 The UDP client thread only checks origFD to
1749 know whether outstanding has to be incremented,
1750 so the sooner the better any way since we _will_
1751 decrement it.
1752 */
1753 ids.origFD = -1;
64e4ebb4 1754 ids.age = 0;
3c115e0f 1755 dss->reuseds++;
1756 --dss->outstanding;
51642fe3 1757 g_stats.downstreamTimeouts++; // this is an 'actively' discovered timeout
c08a5092 1758 vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
1759 dss->remote.toStringWithPort(), dss->name,
1760 ids.qname.toString(), QType(ids.qtype).getName(), ids.origRemote.toStringWithPort());
51642fe3 1761
c2fbeb27 1762 struct timespec ts;
85c7ca75 1763 gettime(&ts);
c2fbeb27 1764
2d11d1b2 1765 struct dnsheader fake;
1766 memset(&fake, 0, sizeof(fake));
1767 fake.id = ids.origID;
c2fbeb27 1768
1769 std::lock_guard<std::mutex> lock(g_rings.respMutex);
1770 g_rings.respRing.push_back({ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote});
232f0877 1771 }
3ae86514 1772 }
1773 }
3ae86514 1774 }
1775 return 0;
1776}
1777
/* Key used for encrypted communication with the dnsdist console.
   main() fills it from the base64-decoded argument of -k/--setkey
   (only when built with libsodium); per the --help text this is similar
   to calling setKey in the configuration file. */
string g_key;
3ae86514 1779
6d01c80c 1780
1781void controlThread(int fd, ComboAddress local)
1782try
1783{
1784 ComboAddress client;
1785 int sock;
1786 warnlog("Accepting control connections on %s", local.toStringWithPort());
1787 while((sock=SAccept(fd, client)) >= 0) {
506bb661
RG
1788 if (g_logConsoleConnections) {
1789 warnlog("Got control connection from %s", client.toStringWithPort());
1790 }
1791
6d01c80c 1792 thread t(controlClientThread, sock, client);
1793 t.detach();
1794 }
1795}
1796catch(std::exception& e)
1797{
1798 close(fd);
1799 errlog("Control connection died: %s", e.what());
1800}
3f25bce6 1801
b076b34a 1802
3c115e0f 1803
c2a42f97 1804static void bindAny(int af, int sock)
1805{
1806 int one = 1;
1807
1808#ifdef IP_FREEBIND
1809 if (setsockopt(sock, IPPROTO_IP, IP_FREEBIND, &one, sizeof(one)) < 0)
1810 warnlog("Warning: IP_FREEBIND setsockopt failed: %s", strerror(errno));
1811#endif
1812
1813#ifdef IP_BINDANY
1814 if (af == AF_INET)
1815 if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &one, sizeof(one)) < 0)
1816 warnlog("Warning: IP_BINDANY setsockopt failed: %s", strerror(errno));
1817#endif
1818#ifdef IPV6_BINDANY
1819 if (af == AF_INET6)
1820 if (setsockopt(sock, IPPROTO_IPV6, IPV6_BINDANY, &one, sizeof(one)) < 0)
1821 warnlog("Warning: IPV6_BINDANY setsockopt failed: %s", strerror(errno));
1822#endif
1823#ifdef SO_BINDANY
1824 if (setsockopt(sock, SOL_SOCKET, SO_BINDANY, &one, sizeof(one)) < 0)
1825 warnlog("Warning: SO_BINDANY setsockopt failed: %s", strerror(errno));
1826#endif
1827}
1828
a36ce055
RG
1829static void dropGroupPrivs(gid_t gid)
1830{
1831 if (gid) {
1832 if (setgid(gid) == 0) {
1833 if (setgroups(0, NULL) < 0) {
1834 warnlog("Warning: Unable to drop supplementary gids: %s", strerror(errno));
1835 }
1836 }
1837 else {
1838 warnlog("Warning: Unable to set group ID to %d: %s", gid, strerror(errno));
1839 }
1840 }
1841}
1842
1843static void dropUserPrivs(uid_t uid)
1844{
1845 if(uid) {
1846 if(setuid(uid) < 0) {
1847 warnlog("Warning: Unable to set user ID to %d: %s", uid, strerror(errno));
1848 }
1849 }
1850}
1851
41408d3a
RG
1852static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCount)
1853{
1854 /* stdin, stdout, stderr */
1855 size_t requiredFDsCount = 3;
1856 size_t backendsCount = g_dstates.getCopy().size();
9fcd6adb 1857 /* listening sockets */
41408d3a
RG
1858 requiredFDsCount += udpBindsCount;
1859 requiredFDsCount += tcpBindsCount;
1860 /* max TCP connections currently served */
1861 requiredFDsCount += g_maxTCPClientThreads;
f2e29d04 1862 /* max pipes for communicating between TCP acceptors and client threads */
41408d3a
RG
1863 requiredFDsCount += (g_maxTCPClientThreads * 2);
1864 /* UDP sockets to backends */
1865 requiredFDsCount += backendsCount;
1866 /* TCP sockets to backends */
1867 requiredFDsCount += (backendsCount * g_maxTCPClientThreads);
1868 /* max TCP queued connections */
9fcd6adb 1869 requiredFDsCount += g_maxTCPQueuedConnections;
41408d3a
RG
1870 /* DelayPipe pipe */
1871 requiredFDsCount += 2;
1872 /* syslog socket */
1873 requiredFDsCount++;
1874 /* webserver main socket */
1875 requiredFDsCount++;
1876 /* console main socket */
1877 requiredFDsCount++;
1878 /* carbon export */
1879 requiredFDsCount++;
1880 /* history file */
1881 requiredFDsCount++;
1882 struct rlimit rl;
1883 getrlimit(RLIMIT_NOFILE, &rl);
9fcd6adb 1884 if (rl.rlim_cur <= requiredFDsCount) {
41408d3a
RG
1885 warnlog("Warning, this configuration can use more than %d file descriptors, web server and console connections not included, and the current limit is %d.", std::to_string(requiredFDsCount), std::to_string(rl.rlim_cur));
1886#ifdef HAVE_SYSTEMD
1887 warnlog("You can increase this value by using LimitNOFILE= in the systemd unit file or ulimit.");
1888#else
1889 warnlog("You can increase this value by using ulimit.");
1890#endif
1891 }
1892}
c2a42f97 1893
/* Settings gathered from the command line by main(). */
struct
{
  vector<string> locals;      // -l/--local; when non-empty, replaces the configured local addresses
  vector<string> remotes;     // positional arguments outside client mode: backend addresses
  bool checkConfig = false;   // --check-config
  bool beDaemon = false;      // -d/--daemon
  bool beClient = false;      // -c/--client
  bool beSupervised = false;  // --supervised
  string pidfile;             // -p/--pidfile
  string command;             // -e/--execute
  string config;              // -C/--config
  string uid;                 // -u/--uid
  string gid;                 // -g/--gid
} g_cmdLine;
520eb5a0 1908
/* Set to true by main() once the Lua configuration has been loaded and the
   list of local addresses (g_locals) has been finalized, just before the
   listening sockets are created. Atomic, presumably because it is read from
   other threads — NOTE(review): readers are outside this chunk, confirm. */
std::atomic<bool> g_configurationDone{false};
520eb5a0 1910
24d5cb00 1911int main(int argc, char** argv)
1912try
1913{
41408d3a
RG
1914 size_t udpBindsCount = 0;
1915 size_t tcpBindsCount = 0;
94721140 1916 rl_attempted_completion_function = my_completion;
1917 rl_completion_append_character = 0;
1918
726ddf60 1919 signal(SIGPIPE, SIG_IGN);
6d01c80c 1920 signal(SIGCHLD, SIG_IGN);
b076b34a 1921 openlog("dnsdist", LOG_PID, LOG_DAEMON);
a4652d55 1922 g_console=true;
6d01c80c 1923
0b62ec78 1924#ifdef HAVE_LIBSODIUM
6d01c80c 1925 if (sodium_init() == -1) {
1926 cerr<<"Unable to initialize crypto library"<<endl;
1927 exit(EXIT_FAILURE);
1928 }
7691e7df 1929 g_hashperturb=randombytes_uniform(0xffffffff);
1930 srandom(randombytes_uniform(0xffffffff));
1931#else
1932 {
1933 struct timeval tv;
1934 gettimeofday(&tv, 0);
1935 srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
1936 g_hashperturb=random();
1937 }
1938
0b62ec78 1939#endif
094b6aff 1940 ComboAddress clientAddress = ComboAddress();
11058bd6 1941 g_cmdLine.config=SYSCONFDIR "/dnsdist.conf";
7cc68f53 1942 struct option longopts[]={
8f2d5ec3 1943 {"acl", required_argument, 0, 'a'},
7cc68f53 1944 {"config", required_argument, 0, 'C'},
5efcfa63 1945 {"check-config", 0, 0, 1},
7cc68f53 1946 {"execute", required_argument, 0, 'e'},
4fdb62a4 1947 {"client", 0, 0, 'c'},
a36ce055 1948 {"gid", required_argument, 0, 'g'},
ddb14ec9
PL
1949#ifdef HAVE_LIBSODIUM
1950 {"setkey", required_argument, 0, 'k'},
1951#endif
7cc68f53 1952 {"local", required_argument, 0, 'l'},
98dd1cda 1953 {"daemon", 0, 0, 'd'},
c7b1d943 1954 {"pidfile", required_argument, 0, 'p'},
505ca3d1 1955 {"supervised", 0, 0, 's'},
bbfaaa6f 1956 {"disable-syslog", 0, 0, 2},
a36ce055 1957 {"uid", required_argument, 0, 'u'},
359d7ca4 1958 {"verbose", 0, 0, 'v'},
6306c282 1959 {"version", 0, 0, 'V'},
c7b1d943 1960 {"help", 0, 0, 'h'},
7cc68f53 1961 {0,0,0,0}
1962 };
1963 int longindex=0;
8f2d5ec3 1964 string optstring;
7cc68f53 1965 for(;;) {
ddb14ec9
PL
1966#ifdef HAVE_LIBSODIUM
1967 int c=getopt_long(argc, argv, "a:hcde:C:k:l:vp:g:u:V", longopts, &longindex);
1968#else
6306c282 1969 int c=getopt_long(argc, argv, "a:hcde:C:l:vp:g:u:V", longopts, &longindex);
ddb14ec9 1970#endif
7cc68f53 1971 if(c==-1)
1972 break;
1973 switch(c) {
5efcfa63
PL
1974 case 1:
1975 g_cmdLine.checkConfig=true;
1976 break;
bbfaaa6f
PL
1977 case 2:
1978 g_syslog=false;
1979 break;
7cc68f53 1980 case 'C':
1981 g_cmdLine.config=optarg;
1982 break;
1983 case 'c':
1984 g_cmdLine.beClient=true;
1985 break;
1986 case 'd':
98dd1cda 1987 g_cmdLine.beDaemon=true;
7cc68f53 1988 break;
1989 case 'e':
1990 g_cmdLine.command=optarg;
1991 break;
a36ce055
RG
1992 case 'g':
1993 g_cmdLine.gid=optarg;
1994 break;
7cc68f53 1995 case 'h':
6306c282
PL
1996 cout<<"dnsdist "<<VERSION<<endl;
1997 cout<<endl;
094b6aff 1998 cout<<"Syntax: dnsdist [-C,--config file] [-c,--client [IP[:PORT]]] [-d,--daemon]\n";
c7b1d943 1999 cout<<"[-p,--pidfile file] [-e,--execute cmd] [-h,--help] [-l,--local addr]\n";
c68aae2e 2000 cout<<"[-v,--verbose] [--check-config]\n";
7cc68f53 2001 cout<<"\n";
8f2d5ec3 2002 cout<<"-a,--acl netmask Add this netmask to the ACL\n";
7cc68f53 2003 cout<<"-C,--config file Load configuration from 'file'\n";
094b6aff
PL
2004 cout<<"-c,--client Operate as a client, connect to dnsdist. This reads\n";
2005 cout<<" controlSocket from your configuration file, but also\n";
2006 cout<<" accepts an IP:PORT argument\n";
ddb14ec9
PL
2007#ifdef HAVE_LIBSODIUM
2008 cout<<"-k,--setkey KEY Use KEY for encrypted communication to dnsdist. This\n";
2009 cout<<" is similar to setting setKey in the configuration file.\n";
2010 cout<<" NOTE: this will leak this key in your shell's history!\n";
2011#endif
c68aae2e
PL
2012 cout<<"--check-config Validate the configuration file and exit. The exit-code\n";
2013 cout<<" reflects the validation, 0 is OK, 1 means an error.\n";
2014 cout<<" Any errors are printed as well.\n";
98dd1cda 2015 cout<<"-d,--daemon Operate as a daemon\n";
7cc68f53 2016 cout<<"-e,--execute cmd Connect to dnsdist and execute 'cmd'\n";
a36ce055 2017 cout<<"-g,--gid gid Change the process group ID after binding sockets\n";
7cc68f53 2018 cout<<"-h,--help Display this helpful message\n";
2019 cout<<"-l,--local address Listen on this local address\n";
505ca3d1
PL
2020 cout<<"--supervised Don't open a console, I'm supervised\n";
2021 cout<<" (use with e.g. systemd and daemontools)\n";
bbfaaa6f
PL
2022 cout<<"--disable-syslog Don't log to syslog, only to stdout\n";
2023 cout<<" (use with e.g. systemd)\n";
c7b1d943 2024 cout<<"-p,--pidfile file Write a pidfile, works only with --daemon\n";
a36ce055 2025 cout<<"-u,--uid uid Change the process user ID after binding sockets\n";
359d7ca4 2026 cout<<"-v,--verbose Enable verbose mode\n";
7cc68f53 2027 cout<<"\n";
2028 exit(EXIT_SUCCESS);
2029 break;
8f2d5ec3 2030 case 'a':
2031 optstring=optarg;
2032 g_ACL.modify([optstring](NetmaskGroup& nmg) { nmg.addMask(optstring); });
2033 break;
ddb14ec9
PL
2034#ifdef HAVE_LIBSODIUM
2035 case 'k':
6f6b4d69 2036 if (B64Decode(string(optarg), g_key) < 0) {
ddb14ec9
PL
2037 cerr<<"Unable to decode key '"<<optarg<<"'."<<endl;
2038 exit(EXIT_FAILURE);
2039 }
2040 break;
2041#endif
7cc68f53 2042 case 'l':
6a363878 2043 g_cmdLine.locals.push_back(trim_copy(string(optarg)));
7cc68f53 2044 break;
c7b1d943
PL
2045 case 'p':
2046 g_cmdLine.pidfile=optarg;
2cd28293 2047 break;
505ca3d1
PL
2048 case 's':
2049 g_cmdLine.beSupervised=true;
2cd28293 2050 break;
a36ce055
RG
2051 case 'u':
2052 g_cmdLine.uid=optarg;
2053 break;
7cc68f53 2054 case 'v':
2055 g_verbose=true;
2056 break;
6306c282 2057 case 'V':
d4d796e5
PD
2058#ifdef LUAJIT_VERSION
2059 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<" ["<<LUAJIT_VERSION<<"])"<<endl;
2060#else
2061 cout<<"dnsdist "<<VERSION<<" ("<<LUA_RELEASE<<")"<<endl;
2062#endif
70829d97
PL
2063 cout<<"Enabled features: ";
2064#ifdef HAVE_DNSCRYPT
2065 cout<<"dnscrypt ";
2066#endif
0beaa5c8
RG
2067#ifdef HAVE_EBPF
2068 cout<<"ebpf ";
2069#endif
70829d97
PL
2070#ifdef HAVE_LIBSODIUM
2071 cout<<"libsodium ";
2072#endif
2073#ifdef HAVE_PROTOBUF
2074 cout<<"protobuf ";
2075#endif
2076#ifdef HAVE_RE2
2077 cout<<"re2 ";
2078#endif
0beaa5c8
RG
2079#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
2080 cout<<"recvmmsg/sendmmsg ";
2081#endif
2082#ifdef HAVE_NET_SNMP
2083 cout<<"snmp ";
2084#endif
70829d97
PL
2085#ifdef HAVE_SYSTEMD
2086 cout<<"systemd";
2087#endif
2088 cout<<endl;
6306c282
PL
2089 exit(EXIT_SUCCESS);
2090 break;
7cc68f53 2091 }
24d5cb00 2092 }
6ab65223 2093
7cc68f53 2094 argc-=optind;
2095 argv+=optind;
2096 for(auto p = argv; *p; ++p) {
094b6aff
PL
2097 if(g_cmdLine.beClient) {
2098 clientAddress = ComboAddress(*p, 5199);
2099 } else {
2100 g_cmdLine.remotes.push_back(*p);
2101 }
7cc68f53 2102 }
2103
cd29dcb1 2104 ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding};
2105
e5a14b2b 2106 g_policy.setState(leastOutstandingPol);
7cc68f53 2107 if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
2108 setupLua(true, g_cmdLine.config);
094b6aff
PL
2109 if (clientAddress != ComboAddress())
2110 g_serverControl = clientAddress;
7cc68f53 2111 doClient(g_serverControl, g_cmdLine.command);
e16fd59c 2112 _exit(EXIT_SUCCESS);
6d01c80c 2113 }
2e72cc0e 2114
8f133915 2115 auto acl = g_ACL.getCopy();
8f2d5ec3 2116 if(acl.empty()) {
2117 for(auto& addr : {"127.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "169.254.0.0/16", "192.168.0.0/16", "172.16.0.0/12", "::1/128", "fc00::/7", "fe80::/10"})
2118 acl.addMask(addr);
2119 g_ACL.setState(acl);
2120 }
8f133915 2121
5efcfa63
PL
2122 if (g_cmdLine.checkConfig) {
2123 setupLua(true, g_cmdLine.config);
2124 // No exception was thrown
2125 infolog("Configuration '%s' OK!", g_cmdLine.config);
d8c19b98 2126 _exit(EXIT_SUCCESS);
5efcfa63
PL
2127 }
2128
7cc68f53 2129 auto todo=setupLua(false, g_cmdLine.config);
2e72cc0e 2130
7cc68f53 2131 if(g_cmdLine.locals.size()) {
2e72cc0e 2132 g_locals.clear();
7cc68f53 2133 for(auto loc : g_cmdLine.locals)
f0e4dcba 2134 g_locals.push_back(std::make_tuple(ComboAddress(loc, 53), true, false, 0, "", std::set<int>()));
2e72cc0e 2135 }
2136
2137 if(g_locals.empty())
f0e4dcba 2138 g_locals.push_back(std::make_tuple(ComboAddress("127.0.0.1", 53), true, false, 0, "", std::set<int>()));
2e72cc0e 2139
e41f8165
RG
2140 g_configurationDone = true;
2141
2e72cc0e 2142 vector<ClientState*> toLaunch;
2143 for(const auto& local : g_locals) {
2144 ClientState* cs = new ClientState;
4c34246d 2145 cs->local= std::get<0>(local);
2e72cc0e 2146 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2147 if(cs->local.sin4.sin_family == AF_INET6) {
2148 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2149 }
7cc68f53 2150 //if(g_vm.count("bind-non-local"))
4c34246d 2151 bindAny(cs->local.sin4.sin_family, cs->udpFD);
549d63c9 2152
915b0c39
AT
2153 // if (!setSocketTimestamps(cs->udpFD))
2154 // L<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2155
549d63c9 2156
4c34246d 2157 if(IsAnyAddress(cs->local)) {
549d63c9 2158 int one=1;
2159 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
d98d3951 2160#ifdef IPV6_RECVPKTINFO
11e1e08b 2161 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
d98d3951 2162#endif
549d63c9 2163 }
9f67b883 2164
4c34246d 2165 if (std::get<2>(local)) {
5f084036 2166#ifdef SO_REUSEPORT
4c34246d 2167 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
5f084036
RG
2168#else
2169 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
4c34246d 2170#endif
5f084036 2171 }
549d63c9 2172
9f67b883
RG
2173 const std::string& itf = std::get<4>(local);
2174 if (!itf.empty()) {
2175#ifdef SO_BINDTODEVICE
2176 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2177 if (res != 0) {
2178 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2179 }
2180#else
2181 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2182#endif
2183 }
2184
87b515ed
RG
2185#ifdef HAVE_EBPF
2186 if (g_defaultBPFFilter) {
8429ad04 2187 cs->attachFilter(g_defaultBPFFilter);
87b515ed
RG
2188 vinfolog("Attaching default BPF Filter to UDP frontend %s", cs->local.toStringWithPort());
2189 }
2190#endif /* HAVE_EBPF */
2191
f0e4dcba
RG
2192 cs->cpus = std::get<5>(local);
2193
11e1e08b 2194 SBind(cs->udpFD, cs->local);
2e72cc0e 2195 toLaunch.push_back(cs);
963bef8d 2196 g_frontends.push_back(cs);
41408d3a 2197 udpBindsCount++;
2e72cc0e 2198 }
2199
a36ce055 2200 for(const auto& local : g_locals) {
4c34246d
RG
2201 if(!std::get<1>(local)) { // no TCP/IP
2202 warnlog("Not providing TCP/IP service on local address '%s'", std::get<0>(local).toStringWithPort());
a36ce055
RG
2203 continue;
2204 }
963bef8d 2205 ClientState* cs = new ClientState;
4c34246d 2206 cs->local= std::get<0>(local);
a36ce055
RG
2207
2208 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2209
2210 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2211#ifdef TCP_DEFER_ACCEPT
2212 SSetsockopt(cs->tcpFD, SOL_TCP,TCP_DEFER_ACCEPT, 1);
2213#endif
9e284f31 2214 if (std::get<3>(local) > 0) {
5f084036 2215#ifdef TCP_FASTOPEN
1654ece3 2216 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(local));
5f084036
RG
2217#else
2218 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(local).toStringWithPort());
a36ce055 2219#endif
5f084036 2220 }
a36ce055
RG
2221 if(cs->local.sin4.sin_family == AF_INET6) {
2222 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2223 }
4c34246d 2224#ifdef SO_REUSEPORT
5f084036 2225 /* no need to warn again if configured but support is not available, we already did for UDP */
4c34246d
RG
2226 if (std::get<2>(local)) {
2227 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2228 }
2229#endif
9f67b883
RG
2230
2231 const std::string& itf = std::get<4>(local);
2232 if (!itf.empty()) {
2233#ifdef SO_BINDTODEVICE
2234 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2235 if (res != 0) {
2236 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(local).toStringWithPort(), strerror(errno));
2237 }
2238#else
2239 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(local).toStringWithPort());
2240#endif
2241 }
2242
87b515ed
RG
2243#ifdef HAVE_EBPF
2244 if (g_defaultBPFFilter) {
8429ad04 2245 cs->attachFilter(g_defaultBPFFilter);
87b515ed
RG
2246 vinfolog("Attaching default BPF Filter to TCP frontend %s", cs->local.toStringWithPort());
2247 }
2248#endif /* HAVE_EBPF */
2249
a36ce055
RG
2250 // if(g_vm.count("bind-non-local"))
2251 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2252 SBind(cs->tcpFD, cs->local);
2253 SListen(cs->tcpFD, 64);
f0e4dcba 2254 warnlog("Listening on %s", cs->local.toStringWithPort());
a36ce055
RG
2255
2256 toLaunch.push_back(cs);
963bef8d 2257 g_frontends.push_back(cs);
41408d3a 2258 tcpBindsCount++;
a36ce055
RG
2259 }
2260
11e1e08b
RG
2261#ifdef HAVE_DNSCRYPT
2262 for(auto& dcLocal : g_dnsCryptLocals) {
2263 ClientState* cs = new ClientState;
4c34246d
RG
2264 cs->local = std::get<0>(dcLocal);
2265 cs->dnscryptCtx = &(std::get<1>(dcLocal));
11e1e08b
RG
2266 cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
2267 if(cs->local.sin4.sin_family == AF_INET6) {
2268 SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2269 }
2270 bindAny(cs->local.sin4.sin_family, cs->udpFD);
2271 if(IsAnyAddress(cs->local)) {
2272 int one=1;
2273 setsockopt(cs->udpFD, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one)); // linux supports this, so why not - might fail on other systems
2274#ifdef IPV6_RECVPKTINFO
2275 setsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
05510903
RG
2276#endif
2277 }
2278 if (std::get<2>(dcLocal)) {
2279#ifdef SO_REUSEPORT
2280 SSetsockopt(cs->udpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2281#else
2282 warnlog("SO_REUSEPORT has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
11e1e08b
RG
2283#endif
2284 }
9f67b883
RG
2285
2286 const std::string& itf = std::get<4>(dcLocal);
2287 if (!itf.empty()) {
2288#ifdef SO_BINDTODEVICE
2289 int res = setsockopt(cs->udpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2290 if (res != 0) {
2291 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2292 }
2293#else
2294 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2295#endif
2296 }
2297
87b515ed
RG
2298#ifdef HAVE_EBPF
2299 if (g_defaultBPFFilter) {
8429ad04 2300 cs->attachFilter(g_defaultBPFFilter);
87b515ed
RG
2301 vinfolog("Attaching default BPF Filter to UDP DNSCrypt frontend %s", cs->local.toStringWithPort());
2302 }
2303#endif /* HAVE_EBPF */
11e1e08b
RG
2304 SBind(cs->udpFD, cs->local);
2305 toLaunch.push_back(cs);
2306 g_frontends.push_back(cs);
41408d3a 2307 udpBindsCount++;
11e1e08b
RG
2308
2309 cs = new ClientState;
4c34246d
RG
2310 cs->local = std::get<0>(dcLocal);
2311 cs->dnscryptCtx = &(std::get<1>(dcLocal));
11e1e08b
RG
2312 cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
2313 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
2314#ifdef TCP_DEFER_ACCEPT
2315 SSetsockopt(cs->tcpFD, SOL_TCP,TCP_DEFER_ACCEPT, 1);
4c34246d 2316#endif
9e284f31 2317 if (std::get<3>(dcLocal) > 0) {
5f084036 2318#ifdef TCP_FASTOPEN
1654ece3 2319 SSetsockopt(cs->tcpFD, IPPROTO_TCP, TCP_FASTOPEN, std::get<3>(dcLocal));
5f084036
RG
2320#else
2321 warnlog("TCP Fast Open has been configured on local address '%s' but is not supported", std::get<0>(dcLocal).toStringWithPort());
9e284f31 2322#endif
5f084036 2323 }
9f67b883 2324
4c34246d 2325#ifdef SO_REUSEPORT
05510903 2326 /* no need to warn again if configured but support is not available, we already did for UDP */
4c34246d
RG
2327 if (std::get<2>(dcLocal)) {
2328 SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEPORT, 1);
2329 }
11e1e08b 2330#endif
9f67b883
RG
2331
2332 if (!itf.empty()) {
2333#ifdef SO_BINDTODEVICE
2334 int res = setsockopt(cs->tcpFD, SOL_SOCKET, SO_BINDTODEVICE, itf.c_str(), itf.length());
2335 if (res != 0) {
2336 warnlog("Error setting up the interface on local address '%s': %s", std::get<0>(dcLocal).toStringWithPort(), strerror(errno));
2337 }
2338#else
2339 warnlog("An interface has been configured on local address '%s' but SO_BINDTODEVICE is not supported", std::get<0>(dcLocal).toStringWithPort());
2340#endif
2341 }
2342
11e1e08b
RG
2343 if(cs->local.sin4.sin_family == AF_INET6) {
2344 SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
2345 }
87b515ed
RG
2346#ifdef HAVE_EBPF
2347 if (g_defaultBPFFilter) {
8429ad04 2348 cs->attachFilter(g_defaultBPFFilter);
87b515ed
RG
2349 vinfolog("Attaching default BPF Filter to TCP DNSCrypt frontend %s", cs->local.toStringWithPort());
2350 }
2351#endif /* HAVE_EBPF */
f0e4dcba
RG
2352
2353 cs->cpus = std::get<5>(dcLocal);
2354
11e1e08b
RG
2355 bindAny(cs->local.sin4.sin_family, cs->tcpFD);
2356 SBind(cs->tcpFD, cs->local);
2357 SListen(cs->tcpFD, 64);
2358 warnlog("Listening on %s", cs->local.toStringWithPort());
2359 toLaunch.push_back(cs);
2360 g_frontends.push_back(cs);
41408d3a 2361 tcpBindsCount++;
11e1e08b
RG
2362 }
2363#endif
2364
7cc68f53 2365 if(g_cmdLine.beDaemon) {
a4652d55 2366 g_console=false;
b076b34a 2367 daemonize();
c7b1d943 2368 writepid(g_cmdLine.pidfile);
a4652d55 2369 }
b076b34a 2370 else {
64e4ebb4 2371 vinfolog("Running in the foreground");
43ac546d 2372 warnlog("dnsdist %s comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to redistribute it according to the terms of the GPL version 2", VERSION);
3fa28388
RG
2373 vector<string> vec;
2374 std::string acls;
2375 g_ACL.getCopy().toStringVector(&vec);
2376 for(const auto& s : vec) {
2377 if (!acls.empty())
2378 acls += ", ";
2379 acls += s;
2380 }
2381 infolog("ACL allowing queries from: %s", acls.c_str());
b076b34a 2382 }
6d01c80c 2383
aac59883
RG
2384 uid_t newgid=0;
2385 gid_t newuid=0;
2386
2387 if(!g_cmdLine.gid.empty())
2388 newgid = strToGID(g_cmdLine.gid.c_str());
2389
2390 if(!g_cmdLine.uid.empty())
2391 newuid = strToUID(g_cmdLine.uid.c_str());
2392
2393 dropGroupPrivs(newgid);
2394 dropUserPrivs(newuid);
2395
a36ce055
RG
2396 /* this need to be done _after_ dropping privileges */
2397 g_delay = new DelayPipe<DelayedPacket>();
2398
9f4eb5cc
RG
2399 if (g_snmpAgent) {
2400 g_snmpAgent->run();
2401 }
2402
edbda1ad 2403 g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads, g_useTCPSinglePipe);
a9bf3ec4 2404
2e72cc0e 2405 for(auto& t : todo)
2406 t();
2407
886e2cf2 2408 auto localPools = g_pools.getCopy();
8f4f5186
RG
2409 /* create the default pool no matter what */
2410 createPoolIfNotExists(localPools, "");
7cc68f53 2411 if(g_cmdLine.remotes.size()) {
2412 for(const auto& address : g_cmdLine.remotes) {
c9262563 2413 auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
886e2cf2 2414 addServerToPool(localPools, "", ret);
7565f4e6 2415 if (ret->connected) {
2717a92f 2416 ret->tid = thread(responderThread, ret);
7565f4e6 2417 }
ecbe9133 2418 g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
6d01c80c 2419 }
2420 }
886e2cf2 2421 g_pools.setState(localPools);
6d01c80c 2422
e73ec7d3 2423 if(g_dstates.getCopy().empty()) {
2424 errlog("No downstream servers defined: all packets will get dropped");
2425 // you might define them later, but you need to know
2426 }
2427
41408d3a
RG
2428 checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
2429
e5a14b2b 2430 for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
773470ca 2431 if(dss->availability==DownstreamState::Availability::Auto) {
fbe2a2e0 2432 bool newState=upCheck(*dss);
a7940c06 2433 warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
773470ca 2434 dss->upStatus = newState;
2435 }
2436 }
b076b34a 2437
2e72cc0e 2438 for(auto& cs : toLaunch) {
a36ce055
RG
2439 if (cs->udpFD >= 0) {
2440 thread t1(udpClientThread, cs);
f0e4dcba
RG
2441 if (!cs->cpus.empty()) {
2442 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2443 }
a36ce055 2444 t1.detach();
652a7355 2445 }
a36ce055
RG
2446 else if (cs->tcpFD >= 0) {
2447 thread t1(tcpAcceptorThread, cs);
f0e4dcba
RG
2448 if (!cs->cpus.empty()) {
2449 mapThreadToCPUList(t1.native_handle(), cs->cpus);
2450 }
a36ce055 2451 t1.detach();
726ddf60 2452 }
24d5cb00 2453 }
7730131a 2454
42fae326 2455 thread carbonthread(carbonDumpThread);
2456 carbonthread.detach();
2457
3c115e0f 2458 thread stattid(maintThread);
886e2cf2 2459 stattid.detach();
6d01c80c 2460
886e2cf2
RG
2461 thread healththread(healthChecksThread);
2462
505ca3d1 2463 if(g_cmdLine.beDaemon || g_cmdLine.beSupervised) {
6ab65223
PL
2464#ifdef HAVE_SYSTEMD
2465 sd_notify(0, "READY=1");
2466#endif
886e2cf2 2467 healththread.join();
773470ca 2468 }
6d01c80c 2469 else {
886e2cf2 2470 healththread.detach();
505ca3d1 2471 doConsole();
3c115e0f 2472 }
9cf811d1 2473 _exit(EXIT_SUCCESS);
3c115e0f 2474
6d01c80c 2475}
3f5c3f1d
PD
2476catch(const LuaContext::ExecutionErrorException& e) {
2477 try {
2478 errlog("Fatal Lua error: %s", e.what());
2479 std::rethrow_if_nested(e);
2010ac95
RG
2480 } catch(const std::exception& ne) {
2481 errlog("Details: %s", ne.what());
3f5c3f1d
PD
2482 }
2483 catch(PDNSException &ae)
2484 {
2485 errlog("Fatal pdns error: %s", ae.reason);
2486 }
2487 _exit(EXIT_FAILURE);
2488}
24d5cb00 2489catch(std::exception &e)
2490{
6d01c80c 2491 errlog("Fatal error: %s", e.what());
4a966472 2492 _exit(EXIT_FAILURE);
24d5cb00 2493}
3f81d239 2494catch(PDNSException &ae)
7730131a 2495{
6d01c80c 2496 errlog("Fatal pdns error: %s", ae.reason);
4a966472 2497 _exit(EXIT_FAILURE);
7730131a 2498}