]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
53b6fc525687fc35cc9c885a549c17a434823fc1
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <netdb.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29 #ifdef HAVE_BOOST_CONTAINER_FLAT_SET_HPP
30 #include <boost/container/flat_set.hpp>
31 #endif
32 #include "ws-recursor.hh"
33 #include <thread>
34 #include "threadname.hh"
35 #include "recpacketcache.hh"
36 #include "utility.hh"
37 #include "dns_random.hh"
38 #ifdef HAVE_LIBSODIUM
39 #include <sodium.h>
40 #endif
41 #include "opensslsigners.hh"
42 #include <iostream>
43 #include <errno.h>
44 #include <boost/static_assert.hpp>
45 #include <map>
46 #include <set>
47 #include "recursor_cache.hh"
48 #include "cachecleaner.hh"
49 #include <stdio.h>
50 #include <signal.h>
51 #include <stdlib.h>
52 #include "misc.hh"
53 #include "mtasker.hh"
54 #include <utility>
55 #include "arguments.hh"
56 #include "syncres.hh"
57 #include <fcntl.h>
58 #include <fstream>
59 #include "sortlist.hh"
60 #include "sstuff.hh"
61 #include <boost/tuple/tuple.hpp>
62 #include <boost/tuple/tuple_comparison.hpp>
63 #include <boost/shared_array.hpp>
64 #include <boost/function.hpp>
65 #include <boost/algorithm/string.hpp>
66 #ifdef MALLOC_TRACE
67 #include "malloctrace.hh"
68 #endif
69 #include <netinet/tcp.h>
70 #include "capabilities.hh"
71 #include "dnsparser.hh"
72 #include "dnswriter.hh"
73 #include "dnsrecords.hh"
74 #include "zoneparser-tng.hh"
75 #include "rec_channel.hh"
76 #include "logger.hh"
77 #include "iputils.hh"
78 #include "mplexer.hh"
79 #include "config.h"
80 #include "lua-recursor4.hh"
81 #include "version.hh"
82 #include "responsestats.hh"
83 #include "secpoll-recursor.hh"
84 #include "dnsname.hh"
85 #include "filterpo.hh"
86 #include "rpzloader.hh"
87 #include "validate-recursor.hh"
88 #include "rec-lua-conf.hh"
89 #include "ednsoptions.hh"
90 #include "gettime.hh"
91 #include "proxy-protocol.hh"
92 #include "pubsuffix.hh"
93 #include "shuffle.hh"
94 #ifdef NOD_ENABLED
95 #include "nod.hh"
96 #endif /* NOD_ENABLED */
97
98 #include "rec-protobuf.hh"
99 #include "rec-snmp.hh"
100
101 #ifdef HAVE_SYSTEMD
102 #include <systemd/sd-daemon.h>
103 #endif
104
105 #include "namespaces.hh"
106
107 #ifdef HAVE_PROTOBUF
108 #include "uuid-utils.hh"
109 #endif /* HAVE_PROTOBUF */
110
111 #include "xpf.hh"
112
// Number of open incoming TCP connections per client. The comparator
// (ComboAddress::addressOnlyLessThan) looks at the address only, so the port is ignored.
typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;

// Per-thread Lua scripting context.
static thread_local std::shared_ptr<RecursorLua4> t_pdl;
// This thread's index into s_threadInfos; 0 is the handler thread.
static thread_local unsigned int t_id = 0;
// NOTE(review): presumably selects which queries get traced - confirm at its use site.
static thread_local std::shared_ptr<Regex> t_traceRegex;
// Incremented/decremented by the TCPConnection constructor/destructor.
static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
#ifdef HAVE_PROTOBUF
// Per-thread protobuf loggers, each paired with the Lua configuration generation they
// were started from (compared against LuaConfigItems::generation to detect changes).
static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
static thread_local uint64_t t_protobufServersGeneration;
// Same pair, for the outgoing-query protobuf export.
static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
static thread_local uint64_t t_outgoingProtobufServersGeneration;
#endif /* HAVE_PROTOBUF */

#ifdef HAVE_FSTRM
// Per-thread dnstap frame stream loggers and their configuration generation
// (NOTE(review): assumed to mirror the protobuf pairs above - confirm).
static thread_local std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> t_frameStreamServers{nullptr};
static thread_local uint64_t t_frameStreamServersGeneration;
#endif /* HAVE_FSTRM */

thread_local std::unique_ptr<MT_t> MT; // the big MTasker
// Record cache: a single process-wide instance (not thread_local).
std::unique_ptr<MemRecursorCache> s_RC;


thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
// Per-thread event multiplexer on which socket read/write callbacks are registered.
thread_local FDMultiplexer* t_fdm{nullptr};
// Per-thread rings of recent remotes and queries (filled e.g. by updateResponseStats()).
thread_local std::unique_ptr<addrringbuf_t> t_remotes, t_servfailremotes, t_largeanswerremotes, t_bogusremotes;
thread_local std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > > t_queryring, t_servfailqueryring, t_bogusqueryring;
thread_local std::shared_ptr<NetmaskGroup> t_allowFrom;
#ifdef NOD_ENABLED
thread_local std::shared_ptr<nod::NODDB> t_nodDBp;
thread_local std::shared_ptr<nod::UniqueResponseDB> t_udrDBp;
#endif /* NOD_ENABLED */
__thread struct timeval g_now; // timestamp, updated (too) frequently
145
// List of (fd, callback) pairs to register with a thread's multiplexer once it starts.
typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;

// for communicating with our threads
// effectively readonly after startup
struct RecThreadInfo
{
  // Pipe file descriptors used to talk to / hear from this thread (-1 when unset).
  struct ThreadPipeSet
  {
    int writeToThread{-1};
    int readToThread{-1};
    int writeFromThread{-1};
    int readFromThread{-1};
    int writeQueriesToThread{-1}; // this one is non-blocking
    int readQueriesToThread{-1};
  };

  /* FDs of the TCP sockets this thread is listening on.
     With one socket per listener (reuseport) these FDs are also in
     deferredAdds; otherwise they are in g_deferredAdds instead. */
  std::set<int> tcpSockets;
  /* FD corresponding to listening sockets if we have one socket per
     listener (with reuseport), otherwise all listeners share the
     same FD and g_deferredAdds is then used instead */
  deferredAdd_t deferredAdds;
  struct ThreadPipeSet pipes;
  std::thread thread;
  // NOTE(review): presumably this thread's MTasker - confirm where it is assigned.
  MT_t* mt{nullptr};
  uint64_t numberOfDistributedQueries{0};
  /* handle the web server, carbon, statistics and the control channel */
  bool isHandler{false};
  /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
  bool isListener{false};
  /* process queries */
  bool isWorker{false};
};
182
/* first we have the handler thread, t_id == 0 (some other
   helper threads like SNMP might have t_id == 0 as well)
   then the distributor threads if any
   and finally the workers */
static std::vector<RecThreadInfo> s_threadInfos;
/* without reuseport, all listeners share the same sockets */
static deferredAdd_t g_deferredAdds;

typedef vector<int> tcpListenSockets_t;
typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now

// Wildcard local addresses, used by getQueryLocalAddress() when no explicit
// query-local address is configured for the family.
static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
// Candidate local addresses for outgoing queries; getQueryLocalAddress() picks one at random.
static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
static AtomicCounter counter;
static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads need this to be set up
static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new threads need to be set up with this
static NetmaskGroup g_XPFAcl;
static NetmaskGroup g_proxyProtocolACL;
static boost::optional<ComboAddress> g_dns64Prefix{boost::none};
static DNSName g_dns64PrefixReverse;
static size_t g_proxyProtocolMaximumSize;
static size_t g_tcpMaxQueriesPerConn;
static size_t s_maxUDPQueriesPerRound;
static uint64_t g_latencyStatSize;
static uint32_t g_disthashseed;
static unsigned int g_maxTCPPerClient;
static unsigned int g_maxMThreads;
static unsigned int g_numDistributorThreads;
static unsigned int g_numWorkerThreads;
static int g_tcpTimeout;
static uint16_t g_udpTruncationThreshold;
static uint16_t g_xpfRRCode{0};
static std::atomic<bool> statsWanted;
static std::atomic<bool> g_quiet;
static bool g_logCommonErrors;
static bool g_anyToTcp;
static bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
static bool g_reusePort{false};
static bool g_gettagNeedsEDNSOptions{false};
static time_t g_statisticsInterval;
static bool g_useIncomingECS;
static bool g_useKernelTimestamp;
std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
#ifdef NOD_ENABLED
static bool g_nodEnabled;
static DNSName g_nodLookupDomain;
static bool g_nodLog;
static SuffixMatchNode g_nodDomainWL;
static std::string g_nod_pbtag;
static bool g_udrEnabled;
static bool g_udrLog;
static std::string g_udr_pbtag;
#endif /* NOD_ENABLED */
// Source ports UDPClientSocks::makeClientSocket() must never pick.
#ifdef HAVE_BOOST_CONTAINER_FLAT_SET_HPP
static boost::container::flat_set<uint16_t> s_avoidUdpSourcePorts;
#else
static std::set<uint16_t> s_avoidUdpSourcePorts;
#endif
// Inclusive range from which random outgoing UDP source ports are drawn.
static uint16_t s_minUdpSourcePort;
static uint16_t s_maxUdpSourcePort;
static double s_balancingFactor;

RecursorControlChannel s_rcc; // only active in the handler thread
RecursorStats g_stats;
string s_programname="pdns_recursor";
// Pid file path appended to by writePid().
string s_pidfname;
bool g_lowercaseOutgoing;
// Timeout passed to MTasker::waitEvent() for network I/O performed by mthreads.
unsigned int g_networkTimeoutMsec;
unsigned int g_numThreads;
uint16_t g_outgoingEDNSBufsize;
bool g_logRPZChanges{false};

// Used in the Syncres to not throttle certain servers
GlobalStateHolder<SuffixMatchNode> g_dontThrottleNames;
GlobalStateHolder<NetmaskGroup> g_dontThrottleNetmasks;

// Private, loopback, link-local and shared address space netmasks (and their negation).
#define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
#define LOCAL_NETS_INVERSE "!127.0.0.0/8, !10.0.0.0/8, !100.64.0.0/10, !169.254.0.0/16, !192.168.0.0/16, !172.16.0.0/12, !::1/128, !fc00::/7, !fe80::/10"
// Bad Nets taken from both:
// http://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
// and
// http://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
// where such a network may not be considered a valid destination
#define BAD_NETS "0.0.0.0/8, 192.0.0.0/24, 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24, 240.0.0.0/4, ::/96, ::ffff:0:0/96, 100::/64, 2001:db8::/32"
#define DONT_QUERY LOCAL_NETS ", " BAD_NETS
270
//! used to send information to a newborn mthread
/* Bundles a received query (raw bytes in d_query, parsed view in d_mdp) with its
   addressing information and per-query state. */
struct DNSComboWriter {
  DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query)
  {
  }

  // Same as above, additionally moving in the given policy tags, Lua data object and records.
  DNSComboWriter(const std::string& query, const struct timeval& now, std::unordered_set<std::string>&& policyTags, LuaContext::LuaObject&& data, std::vector<DNSRecord>&& records): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_records(std::move(records)), d_data(std::move(data))
  {
  }

  void setRemote(const ComboAddress& sa)
  {
    d_remote=sa;
  }

  void setSource(const ComboAddress& sa)
  {
    d_source=sa;
  }

  void setLocal(const ComboAddress& sa)
  {
    d_local=sa;
  }

  void setDestination(const ComboAddress& sa)
  {
    d_destination=sa;
  }

  void setSocket(int sock)
  {
    d_socket=sock;
  }

  // Returns d_source as "addr:port", followed by " (proxied by <d_remote>)" when d_remote differs.
  string getRemote() const
  {
    if (d_source == d_remote) {
      return d_source.toStringWithPort();
    }
    return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")";
  }

  std::vector<ProxyProtocolValue> d_proxyProtocolValues;
  MOADNSParser d_mdp;
  struct timeval d_now;
  /* Remote client, might differ from d_source
     in case of XPF, in which case d_source holds
     the IP of the client and d_remote of the proxy
     (getRemote() reports such a case as "proxied by")
  */
  ComboAddress d_remote;
  ComboAddress d_source;
  /* Our local address, might differ from
     d_destination in case of XPF, in which case
     d_destination holds the IP of the proxy and
     d_local holds our own. */
  ComboAddress d_local;
  ComboAddress d_destination;
#ifdef HAVE_PROTOBUF
  boost::uuids::uuid d_uuid;
  string d_requestorId;
  string d_deviceId;
  string d_deviceName;
  struct timeval d_kernelTimestamp{0,0};
#endif
  std::string d_query;
  std::unordered_set<std::string> d_policyTags;
  std::string d_routingTag;
  std::vector<DNSRecord> d_records;
  LuaContext::LuaObject d_data;
  EDNSSubnetOpts d_ednssubnet;
  shared_ptr<TCPConnection> d_tcpConnection;
  boost::optional<int> d_rcode{boost::none};
  int d_socket{-1}; // set via setSocket(); -1 until then
  unsigned int d_tag{0};
  uint32_t d_qhash{0};
  uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()}; // no cap by default
  uint16_t d_ecsBegin{0};
  uint16_t d_ecsEnd{0};
  bool d_variable{false};
  bool d_ecsFound{false};
  bool d_ecsParsed{false};
  bool d_followCNAMERecords{false};
  bool d_logResponse{false};
  bool d_tcp{false};
};
357
358 MT_t* getMT()
359 {
360 return MT ? MT.get() : nullptr;
361 }
362
363 ArgvMap &arg()
364 {
365 static ArgvMap theArg;
366 return theArg;
367 }
368
369 unsigned int getRecursorThreadId()
370 {
371 return t_id;
372 }
373
374 int getMTaskerTID()
375 {
376 return MT->getTid();
377 }
378
379 static bool isDistributorThread()
380 {
381 if (t_id == 0) {
382 return false;
383 }
384
385 return g_weDistributeQueries && s_threadInfos.at(t_id).isListener;
386 }
387
388 static bool isHandlerThread()
389 {
390 if (t_id == 0) {
391 return true;
392 }
393
394 return s_threadInfos.at(t_id).isHandler;
395 }
396
397 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
398
399 // -1 is error, 0 is timeout, 1 is success
400 int asendtcp(const string& data, Socket* sock)
401 {
402 PacketID pident;
403 pident.sock=sock;
404 pident.outMSG=data;
405
406 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
407 string packet;
408
409 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
410
411 if(!ret || ret==-1) { // timeout
412 t_fdm->removeWriteFD(sock->getHandle());
413 }
414 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
415 return -1;
416 }
417 return ret;
418 }
419
420 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
421
422 // -1 is error, 0 is timeout, 1 is success
423 int arecvtcp(string& data, size_t len, Socket* sock, bool incompleteOkay)
424 {
425 data.clear();
426 PacketID pident;
427 pident.sock=sock;
428 pident.inNeeded=len;
429 pident.inIncompleteOkay=incompleteOkay;
430 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
431
432 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
433 if(!ret || ret==-1) { // timeout
434 t_fdm->removeReadFD(sock->getHandle());
435 }
436 else if(data.empty()) {// error, EOF or other
437 return -1;
438 }
439
440 return ret;
441 }
442
443 static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
444 {
445 PacketID pident=*any_cast<PacketID>(&var);
446 char resp[512];
447 ComboAddress fromaddr;
448 socklen_t addrlen=sizeof(fromaddr);
449
450 ssize_t ret=recvfrom(fd, resp, sizeof(resp), 0, (sockaddr *)&fromaddr, &addrlen);
451 if (fromaddr != pident.remote) {
452 g_log<<Logger::Notice<<"Response received from the wrong remote host ("<<fromaddr.toStringWithPort()<<" instead of "<<pident.remote.toStringWithPort()<<"), discarding"<<endl;
453
454 }
455
456 t_fdm->removeReadFD(fd);
457 if(ret >= 0) {
458 string data(resp, (size_t) ret);
459 MT->sendEvent(pident, &data);
460 }
461 else {
462 string empty;
463 MT->sendEvent(pident, &empty);
464 // cerr<<"Had some kind of error: "<<ret<<", "<<stringerror()<<endl;
465 }
466 }
467 string GenUDPQueryResponse(const ComboAddress& dest, const string& query)
468 {
469 Socket s(dest.sin4.sin_family, SOCK_DGRAM);
470 s.setNonBlocking();
471 ComboAddress local = getQueryLocalAddress(dest.sin4.sin_family, 0);
472
473 s.bind(local);
474 s.connect(dest);
475 s.send(query);
476
477 PacketID pident;
478 pident.sock=&s;
479 pident.remote=dest;
480 pident.type=0;
481 t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
482
483 string data;
484
485 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
486
487 if(!ret || ret==-1) { // timeout
488 t_fdm->removeReadFD(s.getHandle());
489 }
490 else if(data.empty()) {// error, EOF or other
491 // we could special case this
492 return data;
493 }
494 return data;
495 }
496
497 //! pick a random query local address
498 ComboAddress getQueryLocalAddress(int family, uint16_t port)
499 {
500 ComboAddress ret;
501 if(family==AF_INET) {
502 if(g_localQueryAddresses4.empty())
503 ret = g_local4;
504 else
505 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
506 ret.sin4.sin_port = htons(port);
507 }
508 else {
509 if(g_localQueryAddresses6.empty())
510 ret = g_local6;
511 else
512 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
513
514 ret.sin6.sin6_port = htons(port);
515 }
516 return ret;
517 }
518
519 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
520
521 static void setSocketBuffer(int fd, int optname, uint32_t size)
522 {
523 uint32_t psize=0;
524 socklen_t len=sizeof(psize);
525
526 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
527 g_log<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
528 return;
529 }
530
531 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0) {
532 int err = errno;
533 g_log << Logger::Error << "Unable to raise socket buffer size to " << size << ": " << stringerror(err) << endl;
534 }
535 }
536
537
538 static void setSocketReceiveBuffer(int fd, uint32_t size)
539 {
540 setSocketBuffer(fd, SO_RCVBUF, size);
541 }
542
543 static void setSocketSendBuffer(int fd, uint32_t size)
544 {
545 setSocketBuffer(fd, SO_SNDBUF, size);
546 }
547
548
549 // you can ask this class for a UDP socket to send a query from
550 // this socket is not yours, don't even think about deleting it
551 // but after you call 'returnSocket' on it, don't assume anything anymore
552 class UDPClientSocks
553 {
554 unsigned int d_numsocks;
555 public:
556 UDPClientSocks() : d_numsocks(0)
557 {
558 }
559
560 // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
561 int getSocket(const ComboAddress& toaddr, int* fd)
562 {
563 *fd=makeClientSocket(toaddr.sin4.sin_family);
564 if(*fd < 0) // temporary error - receive exception otherwise
565 return -2;
566
567 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
568 int err = errno;
569 try {
570 closesocket(*fd);
571 }
572 catch(const PDNSException& e) {
573 g_log<<Logger::Error<<"Error closing UDP socket after connect() failed: "<<e.reason<<endl;
574 }
575
576 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
577 return -2;
578 return -1;
579 }
580
581 d_numsocks++;
582 return 0;
583 }
584
585 // return a socket to the pool, or simply erase it
586 void returnSocket(int fd)
587 {
588 try {
589 t_fdm->removeReadFD(fd);
590 }
591 catch(const FDMultiplexerException& e) {
592 // we sometimes return a socket that has not yet been assigned to t_fdm
593 }
594
595 try {
596 closesocket(fd);
597 }
598 catch(const PDNSException& e) {
599 g_log<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
600 }
601
602 --d_numsocks;
603 }
604
605 private:
606
607 // returns -1 for errors which might go away, throws for ones that won't
608 static int makeClientSocket(int family)
609 {
610 int ret=socket(family, SOCK_DGRAM, 0 ); // turns out that setting CLO_EXEC and NONBLOCK from here is not a performance win on Linux (oddly enough)
611
612 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
613 return ret;
614
615 if(ret<0)
616 throw PDNSException("Making a socket for resolver (family = "+std::to_string(family)+"): "+stringerror());
617
618 // setCloseOnExec(ret); // we're not going to exec
619
620 int tries=10;
621 ComboAddress sin;
622 while(--tries) {
623 uint16_t port;
624
625 if(tries==1) // fall back to kernel 'random'
626 port = 0;
627 else {
628 do {
629 port = s_minUdpSourcePort + dns_random(s_maxUdpSourcePort - s_minUdpSourcePort + 1);
630 }
631 while (s_avoidUdpSourcePorts.count(port));
632 }
633
634 sin=getQueryLocalAddress(family, port); // does htons for us
635
636 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
637 break;
638 }
639
640 if(!tries) {
641 closesocket(ret);
642 throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
643 }
644
645 try {
646 setReceiveSocketErrors(ret, family);
647 setNonBlocking(ret);
648 }
649 catch(...) {
650 closesocket(ret);
651 throw;
652 }
653
654 return ret;
655 }
656 };
657
658 static thread_local std::unique_ptr<UDPClientSocks> t_udpclientsocks;
659
660 /* these two functions are used by LWRes */
661 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
662 int asendto(const char *data, size_t len, int flags,
663 const ComboAddress& toaddr, uint16_t id, const DNSName& domain, uint16_t qtype, int* fd)
664 {
665
666 PacketID pident;
667 pident.domain = domain;
668 pident.remote = toaddr;
669 pident.type = qtype;
670
671 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
672 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
673
674 for(; chain.first != chain.second; chain.first++) {
675 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
676 /*
677 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
678 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
679 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
680 */
681 chain.first->key.chain.insert(id); // we can chain
682 *fd=-1; // gets used in waitEvent / sendEvent later on
683 return 1;
684 }
685 }
686
687 int ret=t_udpclientsocks->getSocket(toaddr, fd);
688 if(ret < 0)
689 return ret;
690
691 pident.fd=*fd;
692 pident.id=id;
693
694 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
695 ret = send(*fd, data, len, 0);
696
697 int tmp = errno;
698
699 if(ret < 0)
700 t_udpclientsocks->returnSocket(*fd);
701
702 errno = tmp; // this is for logging purposes only
703 return ret;
704 }
705
706 // -1 is error, 0 is timeout, 1 is success
707 int arecvfrom(std::string& packet, int flags, const ComboAddress& fromaddr, size_t *d_len,
708 uint16_t id, const DNSName& domain, uint16_t qtype, int fd, struct timeval* now)
709 {
710 static optional<unsigned int> nearMissLimit;
711 if(!nearMissLimit)
712 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
713
714 PacketID pident;
715 pident.fd=fd;
716 pident.id=id;
717 pident.domain=domain;
718 pident.type = qtype;
719 pident.remote=fromaddr;
720
721 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
722
723 /* -1 means error, 0 means timeout, 1 means a result from handleUDPServerResponse() which might still be an error */
724 if(ret > 0) {
725 /* handleUDPServerResponse() will close the socket for us no matter what */
726 if(packet.empty()) // means "error"
727 return -1;
728
729 *d_len=packet.size();
730
731 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
732 g_log<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
733 g_stats.spoofCount++;
734 return -1;
735 }
736 }
737 else {
738 /* getting there means error or timeout, it's up to us to close the socket */
739 if(fd >= 0)
740 t_udpclientsocks->returnSocket(fd);
741 }
742 return ret;
743 }
744
745 static void writePid(void)
746 {
747 if(!::arg().mustDo("write-pid"))
748 return;
749 ofstream of(s_pidfname.c_str(), std::ios_base::app);
750 if(of)
751 of<< Utility::getpid() <<endl;
752 else {
753 int err = errno;
754 g_log << Logger::Error << "Writing pid for " << Utility::getpid() << " to " << s_pidfname << " failed: "
755 << stringerror(err) << endl;
756 }
757 }
758
759 uint16_t TCPConnection::s_maxInFlight;
760
761 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
762 {
763 ++s_currentConnections;
764 (*t_tcpClientCounts)[d_remote]++;
765 }
766
767 TCPConnection::~TCPConnection()
768 {
769 try {
770 if(closesocket(d_fd) < 0)
771 g_log<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
772 }
773 catch(const PDNSException& e) {
774 g_log<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
775 }
776
777 if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
778 t_tcpClientCounts->erase(d_remote);
779 --s_currentConnections;
780 }
781
782 AtomicCounter TCPConnection::s_currentConnections;
783
784 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
785
786 // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
787 static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
788 {
789 if(packetsize > 1000 && t_largeanswerremotes)
790 t_largeanswerremotes->push_back(remote);
791 switch(res) {
792 case RCode::ServFail:
793 if(t_servfailremotes) {
794 t_servfailremotes->push_back(remote);
795 if(query && t_servfailqueryring) // packet cache
796 t_servfailqueryring->push_back(make_pair(*query, qtype));
797 }
798 g_stats.servFails++;
799 break;
800 case RCode::NXDomain:
801 g_stats.nxDomains++;
802 break;
803 case RCode::NoError:
804 g_stats.noErrors++;
805 break;
806 }
807 }
808
809 static string makeLoginfo(const std::unique_ptr<DNSComboWriter>& dc)
810 try
811 {
812 return "("+dc->d_mdp.d_qname.toLogString()+"/"+DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)+" from "+(dc->getRemote())+")";
813 }
814 catch(...)
815 {
816 return "Exception making error message for exception";
817 }
818
819 #ifdef HAVE_PROTOBUF
820 static void protobufLogQuery(uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName)
821 {
822 if (!t_protobufServers) {
823 return;
824 }
825
826 Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
827 ComboAddress requestor = requestorNM.getMaskedNetwork();
828 requestor.setPort(remote.getPort());
829 RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
830 message.setServerIdentity(SyncRes::s_serverID);
831 message.setEDNSSubnet(ednssubnet, ednssubnet.isIPv4() ? maskV4 : maskV6);
832 message.setRequestorId(requestorId);
833 message.setDeviceId(deviceId);
834 message.setDeviceName(deviceName);
835
836 if (!policyTags.empty()) {
837 message.setPolicyTags(policyTags);
838 }
839
840 // cerr <<message.toDebugString()<<endl;
841 std::string str;
842 message.serialize(str);
843
844 for (auto& server : *t_protobufServers) {
845 server->queueData(str);
846 }
847 }
848
849 static void protobufLogResponse(const RecProtoBufMessage& message)
850 {
851 if (!t_protobufServers) {
852 return;
853 }
854
855 // cerr <<message.toDebugString()<<endl;
856 std::string str;
857 message.serialize(str);
858
859 for (auto& server : *t_protobufServers) {
860 server->queueData(str);
861 }
862 }
863 #endif
864
865 /**
866 * Chases the CNAME provided by the PolicyCustom RPZ policy.
867 *
868 * @param spoofed: The DNSRecord that was created by the policy, should already be added to ret
869 * @param qtype: The QType of the original query
870 * @param sr: A SyncRes
871 * @param res: An integer that will contain the RCODE of the lookup we do
872 * @param ret: A vector of DNSRecords where the result of the CNAME chase should be appended to
873 */
874 static void handleRPZCustom(const DNSRecord& spoofed, const QType& qtype, SyncRes& sr, int& res, vector<DNSRecord>& ret)
875 {
876 if (spoofed.d_type == QType::CNAME) {
877 bool oldWantsRPZ = sr.getWantsRPZ();
878 sr.setWantsRPZ(false);
879 vector<DNSRecord> ans;
880 res = sr.beginResolve(DNSName(spoofed.d_content->getZoneRepresentation()), qtype, QClass::IN, ans);
881 for (const auto& rec : ans) {
882 if(rec.d_place == DNSResourceRecord::ANSWER) {
883 ret.push_back(rec);
884 }
885 }
886 // Reset the RPZ state of the SyncRes
887 sr.setWantsRPZ(oldWantsRPZ);
888 }
889 }
890
891 static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, uint32_t ttlCap, const uint16_t maxAnswerSize)
892 {
893 pw.startRecord(rec.d_name, rec.d_type, (rec.d_ttl > ttlCap ? ttlCap : rec.d_ttl), rec.d_class, rec.d_place);
894
895 if(rec.d_type != QType::OPT) // their TTL ain't real
896 minTTL = min(minTTL, rec.d_ttl);
897
898 rec.d_content->toPacket(pw);
899 if(pw.size() > static_cast<size_t>(maxAnswerSize)) {
900 pw.rollback();
901 if(rec.d_place != DNSResourceRecord::ADDITIONAL) {
902 pw.getHeader()->tc=1;
903 pw.truncate();
904 }
905 return false;
906 }
907
908 return true;
909 }
910
911 #ifdef HAVE_PROTOBUF
912 static std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> startProtobufServers(const ProtobufExportConfig& config)
913 {
914 auto result = std::make_shared<std::vector<std::unique_ptr<RemoteLogger>>>();
915
916 for (const auto& server : config.servers) {
917 try {
918 auto logger = make_unique<RemoteLogger>(server, config.timeout, 100*config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
919 logger->setLogQueries(config.logQueries);
920 logger->setLogResponses(config.logResponses);
921 result->emplace_back(std::move(logger));
922 }
923 catch(const std::exception& e) {
924 g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.what()<<endl;
925 }
926 catch(const PDNSException& e) {
927 g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.reason<<endl;
928 }
929 }
930
931 return result;
932 }
933
934 static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
935 {
936 if (!luaconfsLocal->protobufExportConfig.enabled) {
937 if (t_protobufServers) {
938 for (auto& server : *t_protobufServers) {
939 server->stop();
940 }
941 t_protobufServers.reset();
942 }
943
944 return false;
945 }
946
947 /* if the server was not running, or if it was running according to a
948 previous configuration */
949 if (!t_protobufServers ||
950 t_protobufServersGeneration < luaconfsLocal->generation) {
951
952 if (t_protobufServers) {
953 for (auto& server : *t_protobufServers) {
954 server->stop();
955 }
956 }
957 t_protobufServers.reset();
958
959 t_protobufServers = startProtobufServers(luaconfsLocal->protobufExportConfig);
960 t_protobufServersGeneration = luaconfsLocal->generation;
961 }
962
963 return true;
964 }
965
966 static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
967 {
968 if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) {
969 if (t_outgoingProtobufServers) {
970 for (auto& server : *t_outgoingProtobufServers) {
971 server->stop();
972 }
973 }
974 t_outgoingProtobufServers.reset();
975
976 return false;
977 }
978
979 /* if the server was not running, or if it was running according to a
980 previous configuration */
981 if (!t_outgoingProtobufServers ||
982 t_outgoingProtobufServersGeneration < luaconfsLocal->generation) {
983
984 if (t_outgoingProtobufServers) {
985 for (auto& server : *t_outgoingProtobufServers) {
986 server->stop();
987 }
988 }
989 t_outgoingProtobufServers.reset();
990
991 t_outgoingProtobufServers = startProtobufServers(luaconfsLocal->outgoingProtobufExportConfig);
992 t_outgoingProtobufServersGeneration = luaconfsLocal->generation;
993 }
994
995 return true;
996 }
997
998 #ifdef HAVE_FSTRM
999
1000 static std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> startFrameStreamServers(const FrameStreamExportConfig& config)
1001 {
1002 auto result = std::make_shared<std::vector<std::unique_ptr<FrameStreamLogger>>>();
1003
1004 for (const auto& server : config.servers) {
1005 try {
1006 std::unordered_map<string,unsigned> options;
1007 options["bufferHint"] = config.bufferHint;
1008 options["flushTimeout"] = config.flushTimeout;
1009 options["inputQueueSize"] = config.inputQueueSize;
1010 options["outputQueueSize"] = config.outputQueueSize;
1011 options["queueNotifyThreshold"] = config.queueNotifyThreshold;
1012 options["reopenInterval"] = config.reopenInterval;
1013 FrameStreamLogger *fsl = nullptr;
1014 try {
1015 ComboAddress address(server);
1016 fsl = new FrameStreamLogger(address.sin4.sin_family, address.toStringWithPort(), true, options);
1017 }
1018 catch (const PDNSException& e) {
1019 fsl = new FrameStreamLogger(AF_UNIX, server, true, options);
1020 }
1021 fsl->setLogQueries(config.logQueries);
1022 fsl->setLogResponses(config.logResponses);
1023 result->emplace_back(fsl);
1024 }
1025 catch(const std::exception& e) {
1026 g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
1027 }
1028 catch(const PDNSException& e) {
1029 g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
1030 }
1031 }
1032
1033 return result;
1034 }
1035
1036 static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
1037 {
1038 if (!luaconfsLocal->frameStreamExportConfig.enabled) {
1039 if (t_frameStreamServers) {
1040 // dt's take care of cleanup
1041 t_frameStreamServers.reset();
1042 }
1043
1044 return false;
1045 }
1046
1047 /* if the server was not running, or if it was running according to a
1048 previous configuration */
1049 if (!t_frameStreamServers ||
1050 t_frameStreamServersGeneration < luaconfsLocal->generation) {
1051
1052 if (t_frameStreamServers) {
1053 // dt's take care of cleanup
1054 t_frameStreamServers.reset();
1055 }
1056
1057 t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
1058 t_frameStreamServersGeneration = luaconfsLocal->generation;
1059 }
1060
1061 return true;
1062 }
1063 #endif /* HAVE_FSTRM */
1064 #endif /* HAVE_PROTOBUF */
1065
1066 #ifdef NOD_ENABLED
1067 static bool nodCheckNewDomain(const DNSName& dname)
1068 {
1069 static const QType qt(QType::A);
1070 static const uint16_t qc(QClass::IN);
1071 bool ret = false;
1072 // First check the (sub)domain isn't whitelisted for NOD purposes
1073 if (!g_nodDomainWL.check(dname)) {
1074 // Now check the NODDB (note this is probabilistic so can have FNs/FPs)
1075 if (t_nodDBp && t_nodDBp->isNewDomain(dname)) {
1076 if (g_nodLog) {
1077 // This should probably log to a dedicated log file
1078 g_log<<Logger::Notice<<"Newly observed domain nod="<<dname.toLogString()<<endl;
1079 }
1080 if (!(g_nodLookupDomain.isRoot())) {
1081 // Send a DNS A query to <domain>.g_nodLookupDomain
1082 DNSName qname = dname;
1083 vector<DNSRecord> dummy;
1084 qname += g_nodLookupDomain;
1085 directResolve(qname, qt, qc, dummy);
1086 }
1087 ret = true;
1088 }
1089 }
1090 return ret;
1091 }
1092
1093 static bool udrCheckUniqueDNSRecord(const DNSName& dname, uint16_t qtype, const DNSRecord& record)
1094 {
1095 bool ret = false;
1096 if (record.d_place == DNSResourceRecord::ANSWER ||
1097 record.d_place == DNSResourceRecord::ADDITIONAL) {
1098 // Create a string that represent a triplet of (qname, qtype and RR[type, name, content])
1099 std::stringstream ss;
1100 ss << dname.toDNSStringLC() << ":" << qtype << ":" << qtype << ":" << record.d_type << ":" << record.d_name.toDNSStringLC() << ":" << record.d_content->getZoneRepresentation();
1101 if (t_udrDBp && t_udrDBp->isUniqueResponse(ss.str())) {
1102 if (g_udrLog) {
1103 // This should also probably log to a dedicated file.
1104 g_log<<Logger::Notice<<"Unique response observed: qname="<<dname.toLogString()<<" qtype="<<QType(qtype).getName()<< " rrtype=" << QType(record.d_type).getName() << " rrname=" << record.d_name.toLogString() << " rrcontent=" << record.d_content->getZoneRepresentation() << endl;
1105 }
1106 ret = true;
1107 }
1108 }
1109 return ret;
1110 }
1111 #endif /* NOD_ENABLED */
1112
1113 int followCNAMERecords(vector<DNSRecord>& ret, const QType& qtype)
1114 {
1115 vector<DNSRecord> resolved;
1116 DNSName target;
1117 for(const DNSRecord& rr : ret) {
1118 if(rr.d_type == QType::CNAME) {
1119 auto rec = getRR<CNAMERecordContent>(rr);
1120 if(rec) {
1121 target=rec->getTarget();
1122 break;
1123 }
1124 }
1125 }
1126
1127 if(target.empty()) {
1128 return 0;
1129 }
1130
1131 int rcode = directResolve(target, qtype, QClass::IN, resolved);
1132
1133 for(DNSRecord& rr : resolved) {
1134 ret.push_back(std::move(rr));
1135 }
1136 return rcode;
1137 }
1138
1139 int getFakeAAAARecords(const DNSName& qname, ComboAddress prefix, vector<DNSRecord>& ret)
1140 {
1141 int rcode = directResolve(qname, QType(QType::A), QClass::IN, ret);
1142
1143 // Remove double CNAME records
1144 std::set<DNSName> seenCNAMEs;
1145 ret.erase(std::remove_if(
1146 ret.begin(),
1147 ret.end(),
1148 [&seenCNAMEs](DNSRecord& rr) {
1149 if (rr.d_type == QType::CNAME) {
1150 auto target = getRR<CNAMERecordContent>(rr);
1151 if (target == nullptr) {
1152 return false;
1153 }
1154 if (seenCNAMEs.count(target->getTarget()) > 0) {
1155 // We've had this CNAME before, remove it
1156 return true;
1157 }
1158 seenCNAMEs.insert(target->getTarget());
1159 }
1160 return false;
1161 }),
1162 ret.end());
1163
1164 bool seenA = false;
1165 for (DNSRecord& rr : ret) {
1166 if (rr.d_type == QType::A && rr.d_place == DNSResourceRecord::ANSWER) {
1167 if (auto rec = getRR<ARecordContent>(rr)) {
1168 ComboAddress ipv4(rec->getCA());
1169 memcpy(&prefix.sin6.sin6_addr.s6_addr[12], &ipv4.sin4.sin_addr.s_addr, sizeof(ipv4.sin4.sin_addr.s_addr));
1170 rr.d_content = std::make_shared<AAAARecordContent>(prefix);
1171 rr.d_type = QType::AAAA;
1172 }
1173 seenA = true;
1174 }
1175 }
1176
1177 if (seenA) {
1178 // We've seen an A in the ANSWER section, so there is no need to keep any
1179 // SOA in the AUTHORITY section as this is not a NODATA response.
1180 ret.erase(std::remove_if(
1181 ret.begin(),
1182 ret.end(),
1183 [](DNSRecord& rr) {
1184 return (rr.d_type == QType::SOA && rr.d_place == DNSResourceRecord::AUTHORITY);
1185 }),
1186 ret.end());
1187 }
1188 return rcode;
1189 }
1190
1191 int getFakePTRRecords(const DNSName& qname, vector<DNSRecord>& ret)
1192 {
1193 /* qname has a reverse ordered IPv6 address, need to extract the underlying IPv4 address from it
1194 and turn it into an IPv4 in-addr.arpa query */
1195 ret.clear();
1196 vector<string> parts = qname.getRawLabels();
1197
1198 if (parts.size() < 8) {
1199 return -1;
1200 }
1201
1202 string newquery;
1203 for (int n = 0; n < 4; ++n) {
1204 newquery +=
1205 std::to_string(stoll(parts[n*2], 0, 16) + 16*stoll(parts[n*2+1], 0, 16));
1206 newquery.append(1, '.');
1207 }
1208 newquery += "in-addr.arpa.";
1209
1210 DNSRecord rr;
1211 rr.d_name = qname;
1212 rr.d_type = QType::CNAME;
1213 rr.d_content = std::make_shared<CNAMERecordContent>(newquery);
1214 ret.push_back(rr);
1215
1216 int rcode = directResolve(DNSName(newquery), QType(QType::PTR), QClass::IN, ret);
1217
1218 return rcode;
1219 }
1220
1221 enum class PolicyResult : uint8_t { NoAction, HaveAnswer, Drop };
1222
1223 static PolicyResult handlePolicyHit(const DNSFilterEngine::Policy& appliedPolicy, const std::unique_ptr<DNSComboWriter>& dc, SyncRes& sr, int& res, vector<DNSRecord>& ret, DNSPacketWriter& pw)
1224 {
1225 /* don't account truncate actions for TCP queries, since they are not applied */
1226 if (appliedPolicy.d_kind != DNSFilterEngine::PolicyKind::Truncate || !dc->d_tcp) {
1227 ++g_stats.policyResults[appliedPolicy.d_kind];
1228 }
1229
1230 switch (appliedPolicy.d_kind) {
1231
1232 case DNSFilterEngine::PolicyKind::NoAction:
1233 return PolicyResult::NoAction;
1234
1235 case DNSFilterEngine::PolicyKind::Drop:
1236 ++g_stats.policyDrops;
1237 return PolicyResult::Drop;
1238
1239 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1240 ret.clear();
1241 res = RCode::NXDomain;
1242 return PolicyResult::HaveAnswer;
1243
1244 case DNSFilterEngine::PolicyKind::NODATA:
1245 ret.clear();
1246 res = RCode::NoError;
1247 return PolicyResult::HaveAnswer;
1248
1249 case DNSFilterEngine::PolicyKind::Truncate:
1250 if (!dc->d_tcp) {
1251 ret.clear();
1252 res = RCode::NoError;
1253 pw.getHeader()->tc = 1;
1254 return PolicyResult::HaveAnswer;
1255 }
1256 return PolicyResult::NoAction;
1257
1258 case DNSFilterEngine::PolicyKind::Custom:
1259 ret.clear();
1260 res = RCode::NoError;
1261 {
1262 auto spoofed = appliedPolicy.getCustomRecords(dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1263 for (auto& dr : spoofed) {
1264 ret.push_back(dr);
1265 handleRPZCustom(dr, QType(dc->d_mdp.d_qtype), sr, res, ret);
1266 }
1267 }
1268 return PolicyResult::HaveAnswer;
1269 }
1270
1271 return PolicyResult::NoAction;
1272 }
1273
1274 static void startDoResolve(void *p)
1275 {
1276 auto dc=std::unique_ptr<DNSComboWriter>(reinterpret_cast<DNSComboWriter*>(p));
1277 try {
1278 if (t_queryring)
1279 t_queryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
1280
1281 uint16_t maxanswersize = dc->d_tcp ? 65535 : min(static_cast<uint16_t>(512), g_udpTruncationThreshold);
1282 EDNSOpts edo;
1283 std::vector<pair<uint16_t, string> > ednsOpts;
1284 bool variableAnswer = dc->d_variable;
1285 bool haveEDNS=false;
1286 #ifdef NOD_ENABLED
1287 bool hasUDR = false;
1288 #endif /* NOD_ENABLED */
1289 DNSPacketWriter::optvect_t returnedEdnsOptions; // Here we stuff all the options for the return packet
1290 uint8_t ednsExtRCode = 0;
1291 if(getEDNSOpts(dc->d_mdp, &edo)) {
1292 haveEDNS=true;
1293 if (edo.d_version != 0) {
1294 ednsExtRCode = ERCode::BADVERS;
1295 }
1296
1297 if(!dc->d_tcp) {
1298 /* rfc6891 6.2.3:
1299 "Values lower than 512 MUST be treated as equal to 512."
1300 */
1301 maxanswersize = min(static_cast<uint16_t>(edo.d_packetsize >= 512 ? edo.d_packetsize : 512), g_udpTruncationThreshold);
1302 }
1303 ednsOpts = edo.d_options;
1304 maxanswersize -= 11; // EDNS header size
1305
1306 for (const auto& o : edo.d_options) {
1307 if (o.first == EDNSOptionCode::ECS && g_useIncomingECS && !dc->d_ecsParsed) {
1308 dc->d_ecsFound = getEDNSSubnetOptsFromString(o.second, &dc->d_ednssubnet);
1309 } else if (o.first == EDNSOptionCode::NSID) {
1310 const static string mode_server_id = ::arg()["server-id"];
1311 if(mode_server_id != "disabled" && !mode_server_id.empty() &&
1312 maxanswersize > (2 + 2 + mode_server_id.size())) {
1313 returnedEdnsOptions.push_back(make_pair(EDNSOptionCode::NSID, mode_server_id));
1314 variableAnswer = true; // Can't packetcache an answer with NSID
1315 // Option Code and Option Length are both 2
1316 maxanswersize -= 2 + 2 + mode_server_id.size();
1317 }
1318 }
1319 }
1320 }
1321 /* perhaps there was no EDNS or no ECS but by now we looked */
1322 dc->d_ecsParsed = true;
1323 vector<DNSRecord> ret;
1324 vector<uint8_t> packet;
1325
1326 auto luaconfsLocal = g_luaconfs.getLocal();
1327 // Used to tell syncres later on if we should apply NSDNAME and NSIP RPZ triggers for this query
1328 bool wantsRPZ(true);
1329 boost::optional<RecProtoBufMessage> pbMessage(boost::none);
1330 #ifdef HAVE_PROTOBUF
1331 if (checkProtobufExport(luaconfsLocal)) {
1332 Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1333 ComboAddress requestor = requestorNM.getMaskedNetwork();
1334 requestor.setPort(dc->d_source.getPort());
1335 pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response, dc->d_uuid, &requestor, &dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass, dc->d_mdp.d_header.id, dc->d_tcp, 0);
1336 pbMessage->setServerIdentity(SyncRes::s_serverID);
1337 pbMessage->setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIPv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1338 }
1339 #endif /* HAVE_PROTOBUF */
1340
1341 #ifdef HAVE_FSTRM
1342 checkFrameStreamExport(luaconfsLocal);
1343 #endif
1344
1345 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
1346
1347 pw.getHeader()->aa=0;
1348 pw.getHeader()->ra=1;
1349 pw.getHeader()->qr=1;
1350 pw.getHeader()->tc=0;
1351 pw.getHeader()->id=dc->d_mdp.d_header.id;
1352 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
1353 pw.getHeader()->cd=dc->d_mdp.d_header.cd;
1354
1355 /* This is the lowest TTL seen in the records of the response,
1356 so we can't cache it for longer than this value.
1357 If we have a TTL cap, this value can't be larger than the
1358 cap no matter what. */
1359 uint32_t minTTL = dc->d_ttlCap;
1360
1361 SyncRes sr(dc->d_now);
1362 sr.setId(MT->getTid());
1363
1364 bool DNSSECOK=false;
1365 if(t_pdl) {
1366 sr.setLuaEngine(t_pdl);
1367 }
1368 if(g_dnssecmode != DNSSECMode::Off) {
1369 sr.setDoDNSSEC(true);
1370
1371 // Does the requestor want DNSSEC records?
1372 if(edo.d_extFlags & EDNSOpts::DNSSECOK) {
1373 DNSSECOK=true;
1374 g_stats.dnssecQueries++;
1375 }
1376 if (dc->d_mdp.d_header.cd) {
1377 /* Per rfc6840 section 5.9, "When processing a request with
1378 the Checking Disabled (CD) bit set, a resolver SHOULD attempt
1379 to return all response data, even data that has failed DNSSEC
1380 validation. */
1381 ++g_stats.dnssecCheckDisabledQueries;
1382 }
1383 if (dc->d_mdp.d_header.ad) {
1384 /* Per rfc6840 section 5.7, "the AD bit in a query as a signal
1385 indicating that the requester understands and is interested in the
1386 value of the AD bit in the response. This allows a requester to
1387 indicate that it understands the AD bit without also requesting
1388 DNSSEC data via the DO bit. */
1389 ++g_stats.dnssecAuthenticDataQueries;
1390 }
1391 } else {
1392 // Ignore the client-set CD flag
1393 pw.getHeader()->cd=0;
1394 }
1395 sr.setDNSSECValidationRequested(g_dnssecmode == DNSSECMode::ValidateAll || g_dnssecmode==DNSSECMode::ValidateForLog || ((dc->d_mdp.d_header.ad || DNSSECOK) && g_dnssecmode==DNSSECMode::Process));
1396
1397 #ifdef HAVE_PROTOBUF
1398 sr.setInitialRequestId(dc->d_uuid);
1399 sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
1400 #endif
1401 #ifdef HAVE_FSTRM
1402 sr.setFrameStreamServers(t_frameStreamServers);
1403 #endif
1404 sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
1405
1406 bool tracedQuery=false; // we could consider letting Lua know about this too
1407 bool shouldNotValidate = false;
1408
1409 /* preresolve expects res (dq.rcode) to be set to RCode::NoError by default */
1410 int res = RCode::NoError;
1411
1412 DNSFilterEngine::Policy appliedPolicy;
1413 RecursorLua4::DNSQuestion dq(dc->d_source, dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ, dc->d_logResponse);
1414 dq.ednsFlags = &edo.d_extFlags;
1415 dq.ednsOptions = &ednsOpts;
1416 dq.tag = dc->d_tag;
1417 dq.discardedPolicies = &sr.d_discardedPolicies;
1418 dq.policyTags = &dc->d_policyTags;
1419 dq.appliedPolicy = &appliedPolicy;
1420 dq.currentRecords = &ret;
1421 dq.dh = &dc->d_mdp.d_header;
1422 dq.data = dc->d_data;
1423 #ifdef HAVE_PROTOBUF
1424 dq.requestorId = dc->d_requestorId;
1425 dq.deviceId = dc->d_deviceId;
1426 dq.deviceName = dc->d_deviceName;
1427 #endif
1428 dq.proxyProtocolValues = &dc->d_proxyProtocolValues;
1429
1430 if(ednsExtRCode != 0) {
1431 goto sendit;
1432 }
1433
1434 if(dc->d_mdp.d_qtype==QType::ANY && !dc->d_tcp && g_anyToTcp) {
1435 pw.getHeader()->tc = 1;
1436 res = 0;
1437 variableAnswer = true;
1438 goto sendit;
1439 }
1440
1441 if(t_traceRegex && t_traceRegex->match(dc->d_mdp.d_qname.toString())) {
1442 sr.setLogMode(SyncRes::Store);
1443 tracedQuery=true;
1444 }
1445
1446 if(!g_quiet || tracedQuery) {
1447 g_log<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
1448 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote();
1449 if(!dc->d_ednssubnet.source.empty()) {
1450 g_log<<" (ecs "<<dc->d_ednssubnet.source.toString()<<")";
1451 }
1452 g_log<<endl;
1453 }
1454
1455 if(!dc->d_mdp.d_header.rd) {
1456 sr.setCacheOnly();
1457 }
1458
1459 if (dc->d_rcode != boost::none) {
1460 /* we have a response ready to go, most likely from gettag_ffi */
1461 ret = std::move(dc->d_records);
1462 res = *dc->d_rcode;
1463 if (res == RCode::NoError && dc->d_followCNAMERecords) {
1464 res = followCNAMERecords(ret, QType(dc->d_mdp.d_qtype));
1465 }
1466 goto haveAnswer;
1467 }
1468
1469 if (t_pdl) {
1470 t_pdl->prerpz(dq, res);
1471 }
1472
1473 // Check if the query has a policy attached to it
1474 if (wantsRPZ && (appliedPolicy.d_type == DNSFilterEngine::PolicyType::None || appliedPolicy.d_kind == DNSFilterEngine::PolicyKind::NoAction)) {
1475 if (luaconfsLocal->dfe.getQueryPolicy(dc->d_mdp.d_qname, dc->d_source, sr.d_discardedPolicies, appliedPolicy)) {
1476 mergePolicyTags(dc->d_policyTags, appliedPolicy.getTags());
1477 }
1478 }
1479
1480 // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
1481 if (!t_pdl || !t_pdl->preresolve(dq, res)) {
1482
1483 if (!g_dns64PrefixReverse.empty() && dq.qtype == QType::PTR && dq.qname.isPartOf(g_dns64PrefixReverse)) {
1484 res = getFakePTRRecords(dq.qname, ret);
1485 goto haveAnswer;
1486 }
1487
1488 sr.setWantsRPZ(wantsRPZ);
1489 if (wantsRPZ && appliedPolicy.d_kind != DNSFilterEngine::PolicyKind::NoAction) {
1490 auto policyResult = handlePolicyHit(appliedPolicy, dc, sr, res, ret, pw);
1491 if (policyResult == PolicyResult::HaveAnswer) {
1492 goto haveAnswer;
1493 }
1494 else if (policyResult == PolicyResult::Drop) {
1495 return;
1496 }
1497 }
1498
1499 // Query got not handled for QNAME Policy reasons, now actually go out to find an answer
1500 try {
1501 sr.d_appliedPolicy = appliedPolicy;
1502 sr.d_policyTags = std::move(dc->d_policyTags);
1503
1504 if (!dc->d_routingTag.empty()) {
1505 sr.d_routingTag = dc->d_routingTag;
1506 }
1507
1508 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
1509 shouldNotValidate = sr.wasOutOfBand();
1510 }
1511 catch(const ImmediateServFailException &e) {
1512 if(g_logCommonErrors) {
1513 g_log<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during resolve of '"<<dc->d_mdp.d_qname<<"' because: "<<e.reason<<endl;
1514 }
1515 res = RCode::ServFail;
1516 }
1517 catch(const PolicyHitException& e) {
1518 res = -2;
1519 }
1520 dq.validationState = sr.getValidationState();
1521 appliedPolicy = sr.d_appliedPolicy;
1522 dc->d_policyTags = std::move(sr.d_policyTags);
1523
1524 // During lookup, an NSDNAME or NSIP trigger was hit in RPZ
1525 if (res == -2) { // XXX This block should be macro'd, it is repeated post-resolve.
1526 if (appliedPolicy.d_kind == DNSFilterEngine::PolicyKind::NoAction) {
1527 throw PDNSException("NoAction policy returned while a NSDNAME or NSIP trigger was hit");
1528 }
1529 auto policyResult = handlePolicyHit(appliedPolicy, dc, sr, res, ret, pw);
1530 if (policyResult == PolicyResult::HaveAnswer) {
1531 goto haveAnswer;
1532 }
1533 else if (policyResult == PolicyResult::Drop) {
1534 return;
1535 }
1536 }
1537
1538 if (wantsRPZ && (appliedPolicy.d_type == DNSFilterEngine::PolicyType::None || appliedPolicy.d_kind == DNSFilterEngine::PolicyKind::NoAction)) {
1539 if (luaconfsLocal->dfe.getPostPolicy(ret, sr.d_discardedPolicies, appliedPolicy)) {
1540 mergePolicyTags(dc->d_policyTags, appliedPolicy.getTags());
1541 }
1542 }
1543
1544 if (t_pdl || (g_dns64Prefix && dq.qtype == QType::AAAA && dq.validationState != Bogus)) {
1545 if (res == RCode::NoError) {
1546 auto i = ret.cbegin();
1547 for(; i!= ret.cend(); ++i) {
1548 if (i->d_type == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER) {
1549 break;
1550 }
1551 }
1552
1553 if (i == ret.cend()) {
1554 /* no record in the answer section, NODATA */
1555 if (t_pdl && t_pdl->nodata(dq, res)) {
1556 shouldNotValidate = true;
1557 }
1558 else if (g_dns64Prefix && dq.qtype == QType::AAAA && dq.validationState != Bogus) {
1559 res = getFakeAAAARecords(dq.qname, *g_dns64Prefix, ret);
1560 shouldNotValidate = true;
1561 }
1562 }
1563
1564 }
1565 else if(res == RCode::NXDomain && t_pdl && t_pdl->nxdomain(dq, res)) {
1566 shouldNotValidate = true;
1567 }
1568
1569 if (t_pdl && t_pdl->postresolve(dq, res)) {
1570 shouldNotValidate = true;
1571 }
1572 }
1573
1574 if (wantsRPZ) { //XXX This block is repeated, see above
1575
1576 auto policyResult = handlePolicyHit(appliedPolicy, dc, sr, res, ret, pw);
1577 if (policyResult == PolicyResult::HaveAnswer) {
1578 goto haveAnswer;
1579 }
1580 else if (policyResult == PolicyResult::Drop) {
1581 return;
1582 }
1583 }
1584 }
1585 haveAnswer:;
1586 if(res == PolicyDecision::DROP) {
1587 g_stats.policyDrops++;
1588 return;
1589 }
1590 if(tracedQuery || res == -1 || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
1591 {
1592 string trace(sr.getTrace());
1593 if(!trace.empty()) {
1594 vector<string> lines;
1595 boost::split(lines, trace, boost::is_any_of("\n"));
1596 for(const string& line : lines) {
1597 if(!line.empty())
1598 g_log<<Logger::Warning<< line << endl;
1599 }
1600 }
1601 }
1602
1603 if(res == -1) {
1604 pw.getHeader()->rcode=RCode::ServFail;
1605 // no commit here, because no record
1606 g_stats.servFails++;
1607 }
1608 else {
1609 pw.getHeader()->rcode=res;
1610
1611 // Does the validation mode or query demand validation?
1612 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1613 try {
1614 if(sr.doLog()) {
1615 g_log<<Logger::Warning<<"Starting validation of answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<endl;
1616 }
1617
1618 auto state = sr.getValidationState();
1619
1620 if(state == Secure) {
1621 if(sr.doLog()) {
1622 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates correctly"<<endl;
1623 }
1624
1625 // Is the query source interested in the value of the ad-bit?
1626 if (dc->d_mdp.d_header.ad || DNSSECOK)
1627 pw.getHeader()->ad=1;
1628 }
1629 else if(state == Insecure) {
1630 if(sr.doLog()) {
1631 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates as Insecure"<<endl;
1632 }
1633
1634 pw.getHeader()->ad=0;
1635 }
1636 else if(state == Bogus) {
1637 if(t_bogusremotes)
1638 t_bogusremotes->push_back(dc->d_source);
1639 if(t_bogusqueryring)
1640 t_bogusqueryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
1641 if(g_dnssecLogBogus || sr.doLog() || g_dnssecmode == DNSSECMode::ValidateForLog) {
1642 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates as Bogus"<<endl;
1643 }
1644
1645 // Does the query or validation mode sending out a SERVFAIL on validation errors?
1646 if(!pw.getHeader()->cd && (g_dnssecmode == DNSSECMode::ValidateAll || dc->d_mdp.d_header.ad || DNSSECOK)) {
1647 if(sr.doLog()) {
1648 g_log<<Logger::Warning<<"Sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" because recursor or query demands it for Bogus results"<<endl;
1649 }
1650
1651 pw.getHeader()->rcode=RCode::ServFail;
1652 goto sendit;
1653 } else {
1654 if(sr.doLog()) {
1655 g_log<<Logger::Warning<<"Not sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" Bogus validation since neither config nor query demands this"<<endl;
1656 }
1657 }
1658 }
1659 }
1660 catch(const ImmediateServFailException &e) {
1661 if(g_logCommonErrors)
1662 g_log<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during validation of '"<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<"' because: "<<e.reason<<endl;
1663 pw.getHeader()->rcode=RCode::ServFail;
1664 goto sendit;
1665 }
1666 }
1667
1668 if(ret.size()) {
1669 pdns::orderAndShuffle(ret);
1670 if(auto sl = luaconfsLocal->sortlist.getOrderCmp(dc->d_source)) {
1671 stable_sort(ret.begin(), ret.end(), *sl);
1672 variableAnswer=true;
1673 }
1674 }
1675
1676 bool needCommit = false;
1677 for(auto i=ret.cbegin(); i!=ret.cend(); ++i) {
1678 if( ! DNSSECOK &&
1679 ( i->d_type == QType::NSEC3 ||
1680 (
1681 ( i->d_type == QType::RRSIG || i->d_type==QType::NSEC ) &&
1682 (
1683 ( dc->d_mdp.d_qtype != i->d_type && dc->d_mdp.d_qtype != QType::ANY ) ||
1684 i->d_place != DNSResourceRecord::ANSWER
1685 )
1686 )
1687 )
1688 ) {
1689 continue;
1690 }
1691
1692 if (!addRecordToPacket(pw, *i, minTTL, dc->d_ttlCap, maxanswersize)) {
1693 needCommit = false;
1694 break;
1695 }
1696 needCommit = true;
1697
1698 #ifdef NOD_ENABLED
1699 bool udr = false;
1700 if (g_udrEnabled) {
1701 udr = udrCheckUniqueDNSRecord(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, *i);
1702 if (!hasUDR && udr)
1703 hasUDR = true;
1704 }
1705 #endif /* NOD ENABLED */
1706
1707 #ifdef HAVE_PROTOBUF
1708 if (t_protobufServers) {
1709 #ifdef NOD_ENABLED
1710 pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes, udr);
1711 #else
1712 pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes);
1713 #endif /* NOD_ENABLED */
1714 }
1715 #endif
1716 }
1717 if(needCommit)
1718 pw.commit();
1719 }
1720 sendit:;
1721
1722 if(g_useIncomingECS && dc->d_ecsFound && !sr.wasVariable() && !variableAnswer) {
1723 // cerr<<"Stuffing in a 0 scope because answer is static"<<endl;
1724 EDNSSubnetOpts eo;
1725 eo.source = dc->d_ednssubnet.source;
1726 ComboAddress sa;
1727 sa.reset();
1728 sa.sin4.sin_family = eo.source.getNetwork().sin4.sin_family;
1729 eo.scope = Netmask(sa, 0);
1730
1731 returnedEdnsOptions.push_back(make_pair(EDNSOptionCode::ECS, makeEDNSSubnetOptsString(eo)));
1732 }
1733
1734 if (haveEDNS) {
1735 /* we try to add the EDNS OPT RR even for truncated answers,
1736 as rfc6891 states:
1737 "The minimal response MUST be the DNS header, question section, and an
1738 OPT record. This MUST also occur when a truncated response (using
1739 the DNS header's TC bit) is returned."
1740 */
1741 pw.addOpt(512, ednsExtRCode, DNSSECOK ? EDNSOpts::DNSSECOK : 0, returnedEdnsOptions);
1742 pw.commit();
1743 }
1744
1745 g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
1746 updateResponseStats(res, dc->d_source, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1747 #ifdef NOD_ENABLED
1748 bool nod = false;
1749 if (g_nodEnabled) {
1750 if (nodCheckNewDomain(dc->d_mdp.d_qname))
1751 nod = true;
1752 }
1753 #endif /* NOD_ENABLED */
1754 #ifdef HAVE_PROTOBUF
1755 if (t_protobufServers && !(luaconfsLocal->protobufExportConfig.taggedOnly && appliedPolicy.getName().empty() && dc->d_policyTags.empty())) {
1756 pbMessage->setBytes(packet.size());
1757 pbMessage->setResponseCode(pw.getHeader()->rcode);
1758 if (!appliedPolicy.getName().empty()) {
1759 pbMessage->setAppliedPolicy(appliedPolicy.getName());
1760 pbMessage->setAppliedPolicyType(appliedPolicy.d_type);
1761 }
1762 pbMessage->setPolicyTags(dc->d_policyTags);
1763 if (g_useKernelTimestamp && dc->d_kernelTimestamp.tv_sec) {
1764 pbMessage->setQueryTime(dc->d_kernelTimestamp.tv_sec, dc->d_kernelTimestamp.tv_usec);
1765 }
1766 else {
1767 pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
1768 }
1769 pbMessage->setRequestorId(dq.requestorId);
1770 pbMessage->setDeviceId(dq.deviceId);
1771 pbMessage->setDeviceName(dq.deviceName);
1772 #ifdef NOD_ENABLED
1773 if (g_nodEnabled) {
1774 if (nod) {
1775 pbMessage->setNOD(true);
1776 pbMessage->addPolicyTag(g_nod_pbtag);
1777 }
1778 if (hasUDR) {
1779 pbMessage->addPolicyTag(g_udr_pbtag);
1780 }
1781 }
1782 #endif /* NOD_ENABLED */
1783 if (dc->d_logResponse) {
1784 protobufLogResponse(*pbMessage);
1785 }
1786 #ifdef NOD_ENABLED
1787 if (g_nodEnabled) {
1788 pbMessage->setNOD(false);
1789 pbMessage->clearUDR();
1790 if (nod)
1791 pbMessage->removePolicyTag(g_nod_pbtag);
1792 if (hasUDR)
1793 pbMessage->removePolicyTag(g_udr_pbtag);
1794 }
1795 #endif /* NOD_ENABLED */
1796 }
1797 #endif
1798 if(!dc->d_tcp) {
1799 struct msghdr msgh;
1800 struct iovec iov;
1801 cmsgbuf_aligned cbuf;
1802 fillMSGHdr(&msgh, &iov, &cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote);
1803 msgh.msg_control=NULL;
1804
1805 if(g_fromtosockets.count(dc->d_socket)) {
1806 addCMsgSrcAddr(&msgh, &cbuf, &dc->d_local, 0);
1807 }
1808 if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors) {
1809 int err = errno;
1810 g_log << Logger::Warning << "Sending UDP reply to client " << dc->getRemote() << " failed with: "
1811 << strerror(err) << endl;
1812 }
1813
1814 if(variableAnswer || sr.wasVariable()) {
1815 g_stats.variableResponses++;
1816 }
1817 if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
1818 t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, std::move(dc->d_query), dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
1819 string((const char*)&*packet.begin(), packet.size()),
1820 g_now.tv_sec,
1821 pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
1822 min(minTTL,SyncRes::s_packetcachettl),
1823 dq.validationState,
1824 dc->d_ecsBegin,
1825 dc->d_ecsEnd,
1826 std::move(pbMessage));
1827 }
1828 // else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
1829 }
1830 else {
1831 char buf[2];
1832 buf[0]=packet.size()/256;
1833 buf[1]=packet.size()%256;
1834
1835 Utility::iovec iov[2];
1836
1837 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
1838 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
1839
1840 int wret=Utility::writev(dc->d_socket, iov, 2);
1841 bool hadError=true;
1842
1843 if(wret == 0)
1844 g_log<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
1845 else if(wret < 0 ) {
1846 int err = errno;
1847 g_log << Logger::Error << "Error writing TCP answer to " << dc->getRemote() << ": " << strerror(err) << endl;
1848 } else if((unsigned int)wret != 2 + packet.size())
1849 g_log<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
1850 else
1851 hadError=false;
1852
1853 // update tcp connection status, closing if needed and doing the fd multiplexer accounting
1854 if (dc->d_tcpConnection->d_requestsInFlight > 0) {
1855 dc->d_tcpConnection->d_requestsInFlight--;
1856 }
1857
1858 // In the code below, we try to remove the fd from the set, but
1859 // we don't know if another mthread already did the remove, so we can get a
1860 // "Tried to remove unlisted fd" exception. Not that an inflight < limit test
1861 // will not work since we do not know if the other mthread got an error or not.
1862 if(hadError) {
1863 try {
1864 t_fdm->removeReadFD(dc->d_socket);
1865 }
1866 catch (FDMultiplexerException &) {
1867 }
1868 dc->d_socket = -1;
1869 }
1870 else {
1871 dc->d_tcpConnection->queriesCount++;
1872 if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) {
1873 try {
1874 t_fdm->removeReadFD(dc->d_socket);
1875 }
1876 catch (FDMultiplexerException &) {
1877 }
1878 dc->d_socket = -1;
1879 }
1880 else {
1881 Utility::gettimeofday(&g_now, 0); // needs to be updated
1882 struct timeval ttd = g_now;
1883 // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
1884 if (dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
1885 // A read error might have happened. If we add the fd back, it will most likely error again.
1886 // This is not a big issue, the next handleTCPClientReadable() will see another read error
1887 // and take action.
1888 ttd.tv_sec += g_tcpTimeout;
1889 t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
1890 } else {
1891 // fd might have been removed by read error code, so expect an exception
1892 try {
1893 t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout);
1894 }
1895 catch (FDMultiplexerException &) {
1896 }
1897 }
1898 }
1899 }
1900 }
1901 float spent=makeFloat(sr.getNow()-dc->d_now);
1902 if(!g_quiet) {
1903 g_log<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
1904 g_log<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
1905 sr.d_totUsec/1000.0<<" netw ms, "<< spent*1000.0<<" tot ms, "<<
1906 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<< res;
1907
1908 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1909 g_log<< ", dnssec="<<vStates[sr.getValidationState()];
1910 }
1911
1912 g_log<<endl;
1913
1914 }
1915
1916 if (sr.d_outqueries || sr.d_authzonequeries) {
1917 s_RC->cacheMisses++;
1918 }
1919 else {
1920 s_RC->cacheHits++;
1921 }
1922
1923 if(spent < 0.001)
1924 g_stats.answers0_1++;
1925 else if(spent < 0.010)
1926 g_stats.answers1_10++;
1927 else if(spent < 0.1)
1928 g_stats.answers10_100++;
1929 else if(spent < 1.0)
1930 g_stats.answers100_1000++;
1931 else
1932 g_stats.answersSlow++;
1933
1934 uint64_t newLat=(uint64_t)(spent*1000000);
1935 newLat = min(newLat,(uint64_t)(((uint64_t) g_networkTimeoutMsec)*1000)); // outliers of several minutes exist..
1936 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + (float)newLat/g_latencyStatSize;
1937 // no worries, we do this for packet cache hits elsewhere
1938
1939 auto ourtime = 1000.0*spent-sr.d_totUsec/1000.0; // in msec
1940 if(ourtime < 1)
1941 g_stats.ourtime0_1++;
1942 else if(ourtime < 2)
1943 g_stats.ourtime1_2++;
1944 else if(ourtime < 4)
1945 g_stats.ourtime2_4++;
1946 else if(ourtime < 8)
1947 g_stats.ourtime4_8++;
1948 else if(ourtime < 16)
1949 g_stats.ourtime8_16++;
1950 else if(ourtime < 32)
1951 g_stats.ourtime16_32++;
1952 else {
1953 // cerr<<"SLOW: "<<ourtime<<"ms -> "<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<endl;
1954 g_stats.ourtimeSlow++;
1955 }
1956 if(ourtime >= 0.0) {
1957 newLat=ourtime*1000; // usec
1958 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + (float)newLat/g_latencyStatSize;
1959 }
1960 // cout<<dc->d_mdp.d_qname<<"\t"<<MT->getUsec()<<"\t"<<sr.d_outqueries<<endl;
1961 }
1962 catch(PDNSException &ae) {
1963 g_log<<Logger::Error<<"startDoResolve problem "<<makeLoginfo(dc)<<": "<<ae.reason<<endl;
1964 }
1965 catch(const MOADNSException &mde) {
1966 g_log<<Logger::Error<<"DNS parser error "<<makeLoginfo(dc) <<": "<<dc->d_mdp.d_qname<<", "<<mde.what()<<endl;
1967 }
1968 catch(std::exception& e) {
1969 g_log<<Logger::Error<<"STL error "<< makeLoginfo(dc)<<": "<<e.what();
1970
1971 // Luawrapper nests the exception from Lua, so we unnest it here
1972 try {
1973 std::rethrow_if_nested(e);
1974 } catch(const std::exception& ne) {
1975 g_log<<". Extra info: "<<ne.what();
1976 } catch(...) {}
1977
1978 g_log<<endl;
1979 }
1980 catch(...) {
1981 g_log<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
1982 }
1983
1984 g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
1985 }
1986
1987 static void makeControlChannelSocket(int processNum=-1)
1988 {
1989 string sockname=::arg()["socket-dir"]+"/"+s_programname;
1990 if(processNum >= 0)
1991 sockname += "."+std::to_string(processNum);
1992 sockname+=".controlsocket";
1993 s_rcc.listen(sockname);
1994
1995 int sockowner = -1;
1996 int sockgroup = -1;
1997
1998 if (!::arg().isEmpty("socket-group"))
1999 sockgroup=::arg().asGid("socket-group");
2000 if (!::arg().isEmpty("socket-owner"))
2001 sockowner=::arg().asUid("socket-owner");
2002
2003 if (sockgroup > -1 || sockowner > -1) {
2004 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
2005 unixDie("Failed to chown control socket");
2006 }
2007 }
2008
2009 // do mode change if socket-mode is given
2010 if(!::arg().isEmpty("socket-mode")) {
2011 mode_t sockmode=::arg().asMode("socket-mode");
2012 if(chmod(sockname.c_str(), sockmode) < 0) {
2013 unixDie("Failed to chmod control socket");
2014 }
2015 }
2016 }
2017
2018 static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
2019 bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options,
2020 bool& foundXPF, ComboAddress* xpfSource, ComboAddress* xpfDest)
2021 {
2022 const bool lookForXPF = xpfSource != nullptr && g_xpfRRCode != 0;
2023 const bool lookForECS = ednssubnet != nullptr;
2024 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(question.c_str());
2025 size_t questionLen = question.length();
2026 unsigned int consumed=0;
2027 *dnsname=DNSName(question.c_str(), questionLen, sizeof(dnsheader), false, qtype, qclass, &consumed);
2028
2029 size_t pos= sizeof(dnsheader)+consumed+4;
2030 const size_t headerSize = /* root */ 1 + sizeof(dnsrecordheader);
2031 const uint16_t arcount = ntohs(dh->arcount);
2032
2033 for (uint16_t arpos = 0; arpos < arcount && questionLen > (pos + headerSize) && ((lookForECS && !foundECS) || (lookForXPF && !foundXPF)); arpos++) {
2034 if (question.at(pos) != 0) {
2035 /* not an OPT or a XPF, bye. */
2036 return;
2037 }
2038
2039 pos += 1;
2040 const dnsrecordheader* drh = reinterpret_cast<const dnsrecordheader*>(&question.at(pos));
2041 pos += sizeof(dnsrecordheader);
2042
2043 if (pos >= questionLen) {
2044 return;
2045 }
2046
2047 /* OPT root label (1) followed by type (2) */
2048 if(lookForECS && ntohs(drh->d_type) == QType::OPT) {
2049 if (!options) {
2050 char* ecsStart = nullptr;
2051 size_t ecsLen = 0;
2052 /* we need to pass the record len */
2053 int res = getEDNSOption(const_cast<char*>(reinterpret_cast<const char*>(&question.at(pos - sizeof(drh->d_clen)))), questionLen - pos + sizeof(drh->d_clen), EDNSOptionCode::ECS, &ecsStart, &ecsLen);
2054 if (res == 0 && ecsLen > 4) {
2055 EDNSSubnetOpts eso;
2056 if(getEDNSSubnetOptsFromString(ecsStart + 4, ecsLen - 4, &eso)) {
2057 *ednssubnet=eso;
2058 foundECS = true;
2059 }
2060 }
2061 }
2062 else {
2063 /* we need to pass the record len */
2064 int res = getEDNSOptions(reinterpret_cast<const char*>(&question.at(pos -sizeof(drh->d_clen))), questionLen - pos + (sizeof(drh->d_clen)), *options);
2065 if (res == 0) {
2066 const auto& it = options->find(EDNSOptionCode::ECS);
2067 if (it != options->end() && !it->second.values.empty() && it->second.values.at(0).content != nullptr && it->second.values.at(0).size > 0) {
2068 EDNSSubnetOpts eso;
2069 if(getEDNSSubnetOptsFromString(it->second.values.at(0).content, it->second.values.at(0).size, &eso)) {
2070 *ednssubnet=eso;
2071 foundECS = true;
2072 }
2073 }
2074 }
2075 }
2076 }
2077 else if (lookForXPF && ntohs(drh->d_type) == g_xpfRRCode && ntohs(drh->d_class) == QClass::IN && drh->d_ttl == 0) {
2078 if ((questionLen - pos) < ntohs(drh->d_clen)) {
2079 return;
2080 }
2081
2082 foundXPF = parseXPFPayload(reinterpret_cast<const char*>(&question.at(pos)), ntohs(drh->d_clen), *xpfSource, xpfDest);
2083 }
2084
2085 pos += ntohs(drh->d_clen);
2086 }
2087 }
2088
2089 static bool handleTCPReadResult(int fd, ssize_t bytes)
2090 {
2091 if (bytes == 0) {
2092 /* EOF */
2093 t_fdm->removeReadFD(fd);
2094 return false;
2095 }
2096 else if (bytes < 0) {
2097 if (errno != EAGAIN && errno != EWOULDBLOCK) {
2098 t_fdm->removeReadFD(fd);
2099 return false;
2100 }
2101 }
2102
2103 return true;
2104 }
2105
2106 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
2107 {
2108 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
2109
2110 if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
2111 ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
2112 if (bytes <= 0) {
2113 handleTCPReadResult(fd, bytes);
2114 return;
2115 }
2116
2117 conn->proxyProtocolGot += bytes;
2118 conn->data.resize(conn->proxyProtocolGot);
2119 ssize_t remaining = isProxyHeaderComplete(conn->data);
2120 if (remaining == 0) {
2121 if (g_logCommonErrors) {
2122 g_log<<Logger::Error<<"Unable to consume proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
2123 }
2124 ++g_stats.proxyProtocolInvalidCount;
2125 t_fdm->removeReadFD(fd);
2126 return;
2127 }
2128 else if (remaining < 0) {
2129 conn->proxyProtocolNeed = -remaining;
2130 conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
2131 return;
2132 }
2133 else {
2134 /* proxy header received */
2135 /* we ignore the TCP field for now, but we could properly set whether
2136 the connection was received over UDP or TCP if needed */
2137 bool tcp;
2138 bool proxy = false;
2139 size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
2140 if (used <= 0) {
2141 if (g_logCommonErrors) {
2142 g_log<<Logger::Error<<"Unable to parse proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
2143 }
2144 ++g_stats.proxyProtocolInvalidCount;
2145 t_fdm->removeReadFD(fd);
2146 return;
2147 }
2148 else if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
2149 if (g_logCommonErrors) {
2150 g_log<<Logger::Error<<"Proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping"<< endl;
2151 }
2152 ++g_stats.proxyProtocolInvalidCount;
2153 t_fdm->removeReadFD(fd);
2154 return;
2155 }
2156
2157 /* Now that we have retrieved the address of the client, as advertised by the proxy
2158 via the proxy protocol header, check that it is allowed by our ACL */
2159 /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
2160 if (t_allowFrom && !t_allowFrom->match(&conn->d_source)) {
2161 if (!g_quiet) {
2162 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<conn->d_source.toString()<<", address not matched by allow-from"<<endl;
2163 }
2164
2165 ++g_stats.unauthorizedTCP;
2166 t_fdm->removeReadFD(fd);
2167 return;
2168 }
2169
2170 conn->data.resize(2);
2171 conn->state = TCPConnection::BYTE0;
2172 }
2173 }
2174
2175 if (conn->state==TCPConnection::BYTE0) {
2176 ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0);
2177 if(bytes==1)
2178 conn->state=TCPConnection::BYTE1;
2179 if(bytes==2) {
2180 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
2181 conn->data.resize(conn->qlen);
2182 conn->bytesread=0;
2183 conn->state=TCPConnection::GETQUESTION;
2184 }
2185 if (bytes <= 0) {
2186 handleTCPReadResult(fd, bytes);
2187 return;
2188 }
2189 }
2190
2191 if (conn->state==TCPConnection::BYTE1) {
2192 ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0);
2193 if(bytes==1) {
2194 conn->state=TCPConnection::GETQUESTION;
2195 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
2196 conn->data.resize(conn->qlen);
2197 conn->bytesread=0;
2198 }
2199 if (bytes <= 0) {
2200 if (!handleTCPReadResult(fd, bytes)) {
2201 if(g_logCommonErrors) {
2202 g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected after first byte"<<endl;
2203 }
2204 }
2205 return;
2206 }
2207 }
2208
2209 if(conn->state==TCPConnection::GETQUESTION) {
2210 ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
2211 if (bytes <= 0) {
2212 if (!handleTCPReadResult(fd, bytes)) {
2213 if(g_logCommonErrors) {
2214 g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected while reading question body"<<endl;
2215 }
2216 }
2217 return;
2218 }
2219 else if (bytes > std::numeric_limits<std::uint16_t>::max()) {
2220 if(g_logCommonErrors) {
2221 g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" sent an invalid question size while reading question body"<<endl;
2222 }
2223 t_fdm->removeReadFD(fd);
2224 return;
2225 }
2226 conn->bytesread+=(uint16_t)bytes;
2227 if(conn->bytesread==conn->qlen) {
2228 conn->state = TCPConnection::BYTE0;
2229 std::unique_ptr<DNSComboWriter> dc;
2230 try {
2231 dc=std::unique_ptr<DNSComboWriter>(new DNSComboWriter(conn->data, g_now));
2232 }
2233 catch(const MOADNSException &mde) {
2234 g_stats.clientParseError++;
2235 if(g_logCommonErrors)
2236 g_log<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
2237 return;
2238 }
2239 dc->d_tcpConnection = conn; // carry the torch
2240 dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
2241 dc->d_tcp=true;
2242 dc->setRemote(conn->d_remote);
2243 dc->setSource(conn->d_source);
2244 ComboAddress dest;
2245 dest.reset();
2246 dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
2247 socklen_t len = dest.getSocklen();
2248 getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
2249 dc->setLocal(dest);
2250 dc->setDestination(conn->d_destination);
2251 /* we can't move this if we want to be able to access the values in
2252 all queries sent over this connection */
2253 dc->d_proxyProtocolValues = conn->proxyProtocolValues;
2254 DNSName qname;
2255 uint16_t qtype=0;
2256 uint16_t qclass=0;
2257 bool needECS = false;
2258 bool needXPF = g_XPFAcl.match(conn->d_remote);
2259 string requestorId;
2260 string deviceId;
2261 string deviceName;
2262 bool logQuery = false;
2263 #ifdef HAVE_PROTOBUF
2264 auto luaconfsLocal = g_luaconfs.getLocal();
2265 if (checkProtobufExport(luaconfsLocal)) {
2266 needECS = true;
2267 }
2268 logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
2269 dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
2270 #endif /* HAVE_PROTOBUF */
2271
2272 #ifdef HAVE_FSTRM
2273 checkFrameStreamExport(luaconfsLocal);
2274 #endif
2275
2276 if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
2277
2278 try {
2279 EDNSOptionViewMap ednsOptions;
2280 bool xpfFound = false;
2281 dc->d_ecsParsed = true;
2282 dc->d_ecsFound = false;
2283 getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
2284 dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
2285 xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? &dc->d_destination : nullptr);
2286
2287 if(t_pdl) {
2288 try {
2289 if (t_pdl->d_gettag_ffi) {
2290 dc->d_tag = t_pdl->gettag_ffi(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_records, dc->d_data, ednsOptions, true, dc->d_proxyProtocolValues, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_rcode, dc->d_ttlCap, dc->d_variable, logQuery, dc->d_logResponse, dc->d_followCNAMERecords);
2291 }
2292 else if (t_pdl->d_gettag) {
2293 dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_proxyProtocolValues);
2294 }
2295 }
2296 catch(const std::exception& e) {
2297 if(g_logCommonErrors)
2298 g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
2299 }
2300 }
2301 }
2302 catch(const std::exception& e)
2303 {
2304 if(g_logCommonErrors)
2305 g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
2306 }
2307 }
2308
2309 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);
2310
2311 #ifdef HAVE_PROTOBUF
2312 if(t_protobufServers || t_outgoingProtobufServers) {
2313 dc->d_requestorId = requestorId;
2314 dc->d_deviceId = deviceId;
2315 dc->d_deviceName = deviceName;
2316 dc->d_uuid = getUniqueID();
2317 }
2318
2319 if(t_protobufServers) {
2320 try {
2321
2322 if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
2323 protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName);
2324 }
2325 }
2326 catch(std::exception& e) {
2327 if(g_logCommonErrors)
2328 g_log<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
2329 }
2330 }
2331 #endif
2332 if(t_pdl) {
2333 if(t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh)) {
2334 if(!g_quiet)
2335 g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<<endl;
2336 g_stats.policyDrops++;
2337 return;
2338 }
2339 }
2340
2341 if(dc->d_mdp.d_header.qr) {
2342 g_stats.ignoredCount++;
2343 if(g_logCommonErrors) {
2344 g_log<<Logger::Error<<"Ignoring answer from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
2345 }
2346 return;
2347 }
2348 if(dc->d_mdp.d_header.opcode) {
2349 g_stats.ignoredCount++;
2350 if(g_logCommonErrors) {
2351 g_log<<Logger::Error<<"Ignoring non-query opcode from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
2352 }
2353 return;
2354 }
2355 else if (dh->qdcount == 0) {
2356 g_stats.emptyQueriesCount++;
2357 if(g_logCommonErrors) {
2358 g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<< dc->getRemote() <<" on server socket!"<<endl;
2359 }
2360 return;
2361 }
2362 else {
2363 ++g_stats.qcounter;
2364 ++g_stats.tcpqcounter;
2365 ++conn->d_requestsInFlight;
2366 if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
2367 t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
2368 } else {
2369 Utility::gettimeofday(&g_now, 0); // needed?
2370 struct timeval ttd = g_now;
2371 t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
2372 }
2373 MT->makeThread(startDoResolve, dc.release()); // deletes dc
2374 return;
2375 }
2376 }
2377 }
2378 }
2379
2380 static bool expectProxyProtocol(const ComboAddress& from)
2381 {
2382 return g_proxyProtocolACL.match(from);
2383 }
2384
2385 //! Handle new incoming TCP connection
2386 static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
2387 {
2388 ComboAddress addr;
2389 socklen_t addrlen=sizeof(addr);
2390 int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
2391 if(newsock>=0) {
2392 if(MT->numProcesses() > g_maxMThreads) {
2393 g_stats.overCapacityDrops++;
2394 try {
2395 closesocket(newsock);
2396 }
2397 catch(const PDNSException& e) {
2398 g_log<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
2399 }
2400 return;
2401 }
2402
2403 if(t_remotes) {
2404 t_remotes->push_back(addr);
2405 }
2406
2407 bool fromProxyProtocolSource = expectProxyProtocol(addr);
2408 if(t_allowFrom && !t_allowFrom->match(&addr) && !fromProxyProtocolSource) {
2409 if(!g_quiet)
2410 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address neither matched by allow-from nor proxy-protocol-from"<<endl;
2411
2412 g_stats.unauthorizedTCP++;
2413 try {
2414 closesocket(newsock);
2415 }
2416 catch(const PDNSException& e) {
2417 g_log<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
2418 }
2419 return;
2420 }
2421
2422 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
2423 g_stats.tcpClientOverflow++;
2424 try {
2425 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
2426 }
2427 catch(const PDNSException& e) {
2428 g_log<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
2429 }
2430 return;
2431 }
2432
2433 setNonBlocking(newsock);
2434 std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
2435 tc->d_source = addr;
2436 tc->d_destination.reset();
2437 tc->d_destination.sin4.sin_family = addr.sin4.sin_family;
2438 socklen_t len = tc->d_destination.getSocklen();
2439 getsockname(tc->getFD(), reinterpret_cast<sockaddr*>(&tc->d_destination), &len); // if this fails, we're ok with it
2440
2441 if (fromProxyProtocolSource) {
2442 tc->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
2443 tc->data.resize(tc->proxyProtocolNeed);
2444 tc->state = TCPConnection::PROXYPROTOCOLHEADER;
2445 }
2446 else {
2447 tc->state = TCPConnection::BYTE0;
2448 }
2449
2450 struct timeval ttd;
2451 Utility::gettimeofday(&ttd, 0);
2452 ttd.tv_sec += g_tcpTimeout;
2453
2454 t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
2455 }
2456 }
2457
2458 static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, struct timeval tv, int fd, std::vector<ProxyProtocolValue>& proxyProtocolValues)
2459 {
2460 gettimeofday(&g_now, 0);
2461 if (tv.tv_sec) {
2462 struct timeval diff = g_now - tv;
2463 double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
2464
2465 if(delta > 1000.0) {
2466 g_stats.tooOldDrops++;
2467 return nullptr;
2468 }
2469 }
2470
2471 ++g_stats.qcounter;
2472 if(fromaddr.sin4.sin_family==AF_INET6)
2473 g_stats.ipv6qcounter++;
2474
2475 string response;
2476 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
2477 unsigned int ctag=0;
2478 uint32_t qhash = 0;
2479 bool needECS = false;
2480 bool needXPF = g_XPFAcl.match(fromaddr);
2481 std::unordered_set<std::string> policyTags;
2482 LuaContext::LuaObject data;
2483 string requestorId;
2484 string deviceId;
2485 string deviceName;
2486 string routingTag;
2487 bool logQuery = false;
2488 bool logResponse = false;
2489 #ifdef HAVE_PROTOBUF
2490 boost::uuids::uuid uniqueId;
2491 auto luaconfsLocal = g_luaconfs.getLocal();
2492 if (checkProtobufExport(luaconfsLocal)) {
2493 uniqueId = getUniqueID();
2494 needECS = true;
2495 } else if (checkOutgoingProtobufExport(luaconfsLocal)) {
2496 uniqueId = getUniqueID();
2497 }
2498 logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
2499 logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
2500 #endif
2501 #ifdef HAVE_FSTRM
2502 checkFrameStreamExport(luaconfsLocal);
2503 #endif
2504 EDNSSubnetOpts ednssubnet;
2505 bool ecsFound = false;
2506 bool ecsParsed = false;
2507 uint16_t ecsBegin = 0;
2508 uint16_t ecsEnd = 0;
2509 std::vector<DNSRecord> records;
2510 boost::optional<int> rcode = boost::none;
2511 uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
2512 bool variable = false;
2513 bool followCNAMEs = false;
2514 try {
2515 DNSName qname;
2516 uint16_t qtype=0;
2517 uint16_t qclass=0;
2518 uint32_t age;
2519 bool qnameParsed=false;
2520 #ifdef MALLOC_TRACE
2521 /*
2522 static uint64_t last=0;
2523 if(!last)
2524 g_mtracer->clearAllocators();
2525 cout<<g_mtracer->getAllocs()-last<<" "<<g_mtracer->getNumOut()<<" -- BEGIN TRACE"<<endl;
2526 last=g_mtracer->getAllocs();
2527 cout<<g_mtracer->topAllocatorsString()<<endl;
2528 g_mtracer->clearAllocators();
2529 */
2530 #endif
2531
2532 if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag || t_pdl->d_gettag_ffi))) {
2533 try {
2534 EDNSOptionViewMap ednsOptions;
2535 bool xpfFound = false;
2536
2537 ecsFound = false;
2538
2539 getQNameAndSubnet(question, &qname, &qtype, &qclass,
2540 ecsFound, &ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
2541 xpfFound, needXPF ? &source : nullptr, needXPF ? &destination : nullptr);
2542
2543 qnameParsed = true;
2544 ecsParsed = true;
2545
2546 if(t_pdl) {
2547 try {
2548 if (t_pdl->d_gettag_ffi) {
2549 ctag = t_pdl->gettag_ffi(source, ednssubnet.source, destination, qname, qtype, &policyTags, records, data, ednsOptions, false, proxyProtocolValues, requestorId, deviceId, deviceName, routingTag, rcode, ttlCap, variable, logQuery, logResponse, followCNAMEs);
2550 }
2551 else if (t_pdl->d_gettag) {
2552 ctag = t_pdl->gettag(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, deviceName, routingTag, proxyProtocolValues);
2553 }
2554 }
2555 catch(const std::exception& e) {
2556 if(g_logCommonErrors)
2557 g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
2558 }
2559 }
2560 }
2561 catch(const std::exception& e)
2562 {
2563 if(g_logCommonErrors)
2564 g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
2565 }
2566 }
2567
2568 bool cacheHit = false;
2569 boost::optional<RecProtoBufMessage> pbMessage(boost::none);
2570 #ifdef HAVE_PROTOBUF
2571 if (t_protobufServers) {
2572 pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
2573 pbMessage->setServerIdentity(SyncRes::s_serverID);
2574 if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && policyTags.empty())) {
2575 protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId, deviceName);
2576 }
2577 }
2578 #endif /* HAVE_PROTOBUF */
2579
2580 /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
2581 but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
2582 as cacheable we would cache it with a wrong tag, so better safe than sorry. */
2583 vState valState;
2584 if (qnameParsed) {
2585 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &valState, &qhash, &ecsBegin, &ecsEnd, pbMessage ? &(*pbMessage) : nullptr));
2586 }
2587 else {
2588 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, &qtype, &qclass, g_now.tv_sec, &response, &age, &valState, &qhash, &ecsBegin, &ecsEnd, pbMessage ? &(*pbMessage) : nullptr));
2589 }
2590
2591 if (cacheHit) {
2592 if(valState == Bogus) {
2593 if(t_bogusremotes)
2594 t_bogusremotes->push_back(source);
2595 if(t_bogusqueryring)
2596 t_bogusqueryring->push_back(make_pair(qname, qtype));
2597 }
2598
2599 #ifdef HAVE_PROTOBUF
2600 if(t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) {
2601 Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
2602 ComboAddress requestor = requestorNM.getMaskedNetwork();
2603 requestor.setPort(source.getPort());
2604 pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
2605 pbMessage->setEDNSSubnet(ednssubnet.source, ednssubnet.source.isIPv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
2606 if (g_useKernelTimestamp && tv.tv_sec) {
2607 pbMessage->setQueryTime(tv.tv_sec, tv.tv_usec);
2608 }
2609 else {
2610 pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
2611 }
2612 pbMessage->setRequestorId(requestorId);
2613 pbMessage->setDeviceId(deviceId);
2614 pbMessage->setDeviceName(deviceName);
2615 protobufLogResponse(*pbMessage);
2616 }
2617 #endif /* HAVE_PROTOBUF */
2618 if(!g_quiet)
2619 g_log<<Logger::Notice<<t_id<< " question answered from packet cache tag="<<ctag<<" from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<endl;
2620
2621 g_stats.packetCacheHits++;
2622 SyncRes::s_queries++;
2623 ageDNSPacket(response, age);
2624 struct msghdr msgh;
2625 struct iovec iov;
2626 cmsgbuf_aligned cbuf;
2627 fillMSGHdr(&msgh, &iov, &cbuf, 0, (char*)response.c_str(), response.length(), const_cast<ComboAddress*>(&fromaddr));
2628 msgh.msg_control=NULL;
2629
2630 if(g_fromtosockets.count(fd)) {
2631 addCMsgSrcAddr(&msgh, &cbuf, &destaddr, 0);
2632 }
2633 if(sendmsg(fd, &msgh, 0) < 0 && g_logCommonErrors) {
2634 int err = errno;
2635 g_log << Logger::Warning << "Sending UDP reply to client " << source.toStringWithPort()
2636 << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << " failed with: "
2637 << strerror(err) << endl;
2638 }
2639 if(response.length() >= sizeof(struct dnsheader)) {
2640 struct dnsheader tmpdh;
2641 memcpy(&tmpdh, response.c_str(), sizeof(tmpdh));
2642 updateResponseStats(tmpdh.rcode, source, response.length(), 0, 0);
2643 }
2644 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + 0.0; // we assume 0 usec
2645 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + 0.0; // we assume 0 usec
2646 return 0;
2647 }
2648 }
2649 catch(std::exception& e) {
2650 if(g_logCommonErrors)
2651 g_log<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
2652 return 0;
2653 }
2654
2655 if(t_pdl) {
2656 if(t_pdl->ipfilter(source, destination, *dh)) {
2657 if(!g_quiet)
2658 g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<" based on policy"<<endl;
2659 g_stats.policyDrops++;
2660 return 0;
2661 }
2662 }
2663
2664 if(MT->numProcesses() > g_maxMThreads) {
2665 if(!g_quiet)
2666 g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<", over capacity"<<endl;
2667
2668 g_stats.overCapacityDrops++;
2669 return 0;
2670 }
2671
2672 auto dc = std::unique_ptr<DNSComboWriter>(new DNSComboWriter(question, g_now, std::move(policyTags), std::move(data), std::move(records)));
2673 dc->setSocket(fd);
2674 dc->d_tag=ctag;
2675 dc->d_qhash=qhash;
2676 dc->setRemote(fromaddr);
2677 dc->setSource(source);
2678 dc->setLocal(destaddr);
2679 dc->setDestination(destination);
2680 dc->d_tcp=false;
2681 dc->d_ecsFound = ecsFound;
2682 dc->d_ecsParsed = ecsParsed;
2683 dc->d_ecsBegin = ecsBegin;
2684 dc->d_ecsEnd = ecsEnd;
2685 dc->d_ednssubnet = ednssubnet;
2686 dc->d_ttlCap = ttlCap;
2687 dc->d_variable = variable;
2688 dc->d_followCNAMERecords = followCNAMEs;
2689 dc->d_rcode = rcode;
2690 dc->d_logResponse = logResponse;
2691 #ifdef HAVE_PROTOBUF
2692 if (t_protobufServers || t_outgoingProtobufServers) {
2693 dc->d_uuid = std::move(uniqueId);
2694 }
2695 dc->d_requestorId = requestorId;
2696 dc->d_deviceId = deviceId;
2697 dc->d_deviceName = deviceName;
2698 dc->d_kernelTimestamp = tv;
2699 #endif
2700 dc->d_proxyProtocolValues = std::move(proxyProtocolValues);
2701 dc->d_routingTag = std::move(routingTag);
2702
2703 MT->makeThread(startDoResolve, (void*) dc.release()); // deletes dc
2704 return 0;
2705 }
2706
2707
2708 static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
2709 {
2710 ssize_t len;
2711 static const size_t maxIncomingQuerySize = g_proxyProtocolACL.empty() ? 512 : (512 + g_proxyProtocolMaximumSize);
2712 static thread_local std::string data;
2713 ComboAddress fromaddr;
2714 ComboAddress source;
2715 ComboAddress destination;
2716 struct msghdr msgh;
2717 struct iovec iov;
2718 cmsgbuf_aligned cbuf;
2719 bool firstQuery = true;
2720 std::vector<ProxyProtocolValue> proxyProtocolValues;
2721
2722 for(size_t queriesCounter = 0; queriesCounter < s_maxUDPQueriesPerRound; queriesCounter++) {
2723 bool proxyProto = false;
2724 data.resize(maxIncomingQuerySize);
2725 fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
2726 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), &data[0], data.size(), &fromaddr);
2727
2728 if((len=recvmsg(fd, &msgh, 0)) >= 0) {
2729
2730 firstQuery = false;
2731
2732 if (msgh.msg_flags & MSG_TRUNC) {
2733 g_stats.truncatedDrops++;
2734 if (!g_quiet) {
2735 g_log<<Logger::Error<<"Ignoring truncated query from "<<fromaddr.toString()<<endl;
2736 }
2737 return;
2738 }
2739
2740 data.resize(static_cast<size_t>(len));
2741
2742 if (expectProxyProtocol(fromaddr)) {
2743 bool tcp;
2744 ssize_t used = parseProxyHeader(data, proxyProto, source, destination, tcp, proxyProtocolValues);
2745 if (used <= 0) {
2746 ++g_stats.proxyProtocolInvalidCount;
2747 if (!g_quiet) {
2748 g_log<<Logger::Error<<"Ignoring invalid proxy protocol ("<<std::to_string(len)<<", "<<std::to_string(used)<<") query from "<<fromaddr.toStringWithPort()<<endl;
2749 }
2750 return;
2751 }
2752 else if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
2753 if (g_quiet) {
2754 g_log<<Logger::Error<<"Proxy protocol header in UDP packet from "<< fromaddr.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping"<< endl;
2755 }
2756 ++g_stats.proxyProtocolInvalidCount;
2757 return;
2758 }
2759
2760 data.erase(0, used);
2761 }
2762 else if (len > 512) {
2763 /* we only allow UDP packets larger than 512 for those with a proxy protocol header */
2764 g_stats.truncatedDrops++;
2765 if (!g_quiet) {
2766 g_log<<Logger::Error<<"Ignoring truncated query from "<<fromaddr.toStringWithPort()<<endl;
2767 }
2768 return;
2769 }
2770
2771 if (data.size() < sizeof(dnsheader)) {
2772 g_stats.ignoredCount++;
2773 if (!g_quiet) {
2774 g_log<<Logger::Error<<"Ignoring too-short ("<<std::to_string(data.size())<<") query from "<<fromaddr.toString()<<endl;
2775 }
2776 return;
2777 }
2778
2779 if (!proxyProto) {
2780 source = fromaddr;
2781 }
2782
2783 if(t_remotes) {
2784 t_remotes->push_back(fromaddr);
2785 }
2786
2787 if(t_allowFrom && !t_allowFrom->match(&source)) {
2788 if(!g_quiet) {
2789 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<source.toString()<<", address not matched by allow-from"<<endl;
2790 }
2791
2792 g_stats.unauthorizedUDP++;
2793 return;
2794 }
2795
2796 BOOST_STATIC_ASSERT(offsetof(sockaddr_in, sin_port) == offsetof(sockaddr_in6, sin6_port));
2797 if(!fromaddr.sin4.sin_port) { // also works for IPv6
2798 if(!g_quiet) {
2799 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toStringWithPort()<<", can't deal with port 0"<<endl;
2800 }
2801
2802 g_stats.clientParseError++; // not quite the best place to put it, but needs to go somewhere
2803 return;
2804 }
2805
2806 try {
2807 dnsheader* dh=(dnsheader*)&data[0];
2808
2809 if(dh->qr) {
2810 g_stats.ignoredCount++;
2811 if(g_logCommonErrors) {
2812 g_log<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
2813 }
2814 }
2815 else if(dh->opcode) {
2816 g_stats.ignoredCount++;
2817 if(g_logCommonErrors) {
2818 g_log<<Logger::Error<<"Ignoring non-query opcode "<<dh->opcode<<" from "<<fromaddr.toString()<<" on server socket!"<<endl;
2819 }
2820 }
2821 else if (dh->qdcount == 0) {
2822 g_stats.emptyQueriesCount++;
2823 if(g_logCommonErrors) {
2824 g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<<fromaddr.toString()<<" on server socket!"<<endl;
2825 }
2826 }
2827 else {
2828 struct timeval tv={0,0};
2829 HarvestTimestamp(&msgh, &tv);
2830 ComboAddress dest;
2831 dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
2832 auto loc = rplookup(g_listenSocketsAddresses, fd);
2833 if(HarvestDestinationAddress(&msgh, &dest)) {
2834 // but.. need to get port too
2835 if(loc) {
2836 dest.sin4.sin_port = loc->sin4.sin_port;
2837 }
2838 }
2839 else {
2840 if(loc) {
2841 dest = *loc;
2842 }
2843 else {
2844 dest.sin4.sin_family = fromaddr.sin4.sin_family;
2845 socklen_t slen = dest.getSocklen();
2846 getsockname(fd, (sockaddr*)&dest, &slen); // if this fails, we're ok with it
2847 }
2848 }
2849 if (!proxyProto) {
2850 destination = dest;
2851 }
2852
2853 if(g_weDistributeQueries) {
2854 distributeAsyncFunction(data, boost::bind(doProcessUDPQuestion, data, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues));
2855 }
2856 else {
2857 ++s_threadInfos[t_id].numberOfDistributedQueries;
2858 doProcessUDPQuestion(data, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues);
2859 }
2860 }
2861 }
2862 catch(const MOADNSException &mde) {
2863 g_stats.clientParseError++;
2864 if(g_logCommonErrors) {
2865 g_log<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
2866 }
2867 }
2868 catch(const std::runtime_error& e) {
2869 g_stats.clientParseError++;
2870 if(g_logCommonErrors) {
2871 g_log<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<e.what()<<endl;
2872 }
2873 }
2874 }
2875 else {
2876 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
2877 if(firstQuery && errno == EAGAIN) {
2878 g_stats.noPacketError++;
2879 }
2880
2881 break;
2882 }
2883 }
2884 }
2885
2886 static void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets)
2887 {
2888 int fd;
2889 vector<string>locals;
2890 stringtok(locals,::arg()["local-address"]," ,");
2891
2892 if(locals.empty())
2893 throw PDNSException("No local address specified");
2894
2895 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2896 ServiceTuple st;
2897 st.port=::arg().asNum("local-port");
2898 parseService(*i, st);
2899
2900 ComboAddress sin;
2901
2902 sin.reset();
2903 sin.sin4.sin_family = AF_INET;
2904 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2905 sin.sin6.sin6_family = AF_INET6;
2906 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2907 throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
2908 }
2909
2910 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
2911 if(fd<0)
2912 throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
2913
2914 setCloseOnExec(fd);
2915
2916 int tmp=1;
2917 if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
2918 g_log<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
2919 exit(1);
2920 }
2921 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
2922 int err = errno;
2923 g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(err)<<endl;
2924 }
2925
2926 #ifdef TCP_DEFER_ACCEPT
2927 if(setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
2928 if(i==locals.begin())
2929 g_log<<Logger::Info<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
2930 }
2931 #endif
2932
2933 if( ::arg().mustDo("non-local-bind") )
2934 Utility::setBindAny(AF_INET, fd);
2935
2936 #ifdef SO_REUSEPORT
2937 if(g_reusePort) {
2938 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(tmp)) < 0)
2939 throw PDNSException("SO_REUSEPORT: "+stringerror());
2940 }
2941 #endif
2942
2943 if (::arg().asNum("tcp-fast-open") > 0) {
2944 #ifdef TCP_FASTOPEN
2945 int fastOpenQueueSize = ::arg().asNum("tcp-fast-open");
2946 if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &fastOpenQueueSize, sizeof fastOpenQueueSize) < 0) {
2947 int err = errno;
2948 g_log<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(err)<<endl;
2949 }
2950 #else
2951 g_log<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
2952 #endif
2953 }
2954
2955 sin.sin4.sin_port = htons(st.port);
2956 socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
2957 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
2958 throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
2959
2960 setNonBlocking(fd);
2961 setSocketSendBuffer(fd, 65000);
2962 listen(fd, 128);
2963 deferredAdds.push_back(make_pair(fd, handleNewTCPQuestion));
2964 tcpSockets.insert(fd);
2965
2966 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
2967 // - fd is not that which we know here, but returned from accept()
2968 if(sin.sin4.sin_family == AF_INET)
2969 g_log<<Logger::Info<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
2970 else
2971 g_log<<Logger::Info<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2972 }
2973 }
2974
2975 static void makeUDPServerSockets(deferredAdd_t& deferredAdds)
2976 {
2977 int one=1;
2978 vector<string>locals;
2979 stringtok(locals,::arg()["local-address"]," ,");
2980
2981 if(locals.empty())
2982 throw PDNSException("No local address specified");
2983
2984 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2985 ServiceTuple st;
2986 st.port=::arg().asNum("local-port");
2987 parseService(*i, st);
2988
2989 ComboAddress sin;
2990
2991 sin.reset();
2992 sin.sin4.sin_family = AF_INET;
2993 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2994 sin.sin6.sin6_family = AF_INET6;
2995 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2996 throw PDNSException("Unable to resolve local address for UDP server on '"+ st.host +"'");
2997 }
2998
2999 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
3000 if(fd < 0) {
3001 throw PDNSException("Making a UDP server socket for resolver: "+stringerror());
3002 }
3003 if (!setSocketTimestamps(fd))
3004 g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
3005
3006 if(IsAnyAddress(sin)) {
3007 if(sin.sin4.sin_family == AF_INET)
3008 if(!setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one))) // linux supports this, so why not - might fail on other systems
3009 g_fromtosockets.insert(fd);
3010 #ifdef IPV6_RECVPKTINFO
3011 if(sin.sin4.sin_family == AF_INET6)
3012 if(!setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)))
3013 g_fromtosockets.insert(fd);
3014 #endif
3015 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) < 0) {
3016 int err = errno;
3017 g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(err)<<endl;
3018 }
3019 }
3020 if( ::arg().mustDo("non-local-bind") )
3021 Utility::setBindAny(AF_INET6, fd);
3022
3023 setCloseOnExec(fd);
3024
3025 setSocketReceiveBuffer(fd, 250000);
3026 sin.sin4.sin_port = htons(st.port);
3027
3028
3029 #ifdef SO_REUSEPORT
3030 if(g_reusePort) {
3031 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
3032 throw PDNSException("SO_REUSEPORT: "+stringerror());
3033 }
3034 #endif
3035
3036 if (sin.isIPv4()) {
3037 try {
3038 setSocketIgnorePMTU(fd);
3039 }
3040 catch(const std::exception& e) {
3041 g_log<<Logger::Warning<<"Failed to set IP_MTU_DISCOVER on UDP server socket: "<<e.what()<<endl;
3042 }
3043 }
3044
3045 socklen_t socklen=sin.getSocklen();
3046 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
3047 throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
3048
3049 setNonBlocking(fd);
3050
3051 deferredAdds.push_back(make_pair(fd, handleNewUDPQuestion));
3052 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
3053 if(sin.sin4.sin_family == AF_INET)
3054 g_log<<Logger::Info<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
3055 else
3056 g_log<<Logger::Info<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
3057 }
3058 }
3059
3060 static void daemonize(void)
3061 {
3062 if(fork())
3063 exit(0); // bye bye
3064
3065 setsid();
3066
3067 int i=open("/dev/null",O_RDWR); /* open stdin */
3068 if(i < 0)
3069 g_log<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
3070 else {
3071 dup2(i,0); /* stdin */
3072 dup2(i,1); /* stderr */
3073 dup2(i,2); /* stderr */
3074 close(i);
3075 }
3076 }
3077
3078 static void termIntHandler(int)
3079 {
3080 doExit();
3081 }
3082
3083 static void usr1Handler(int)
3084 {
3085 statsWanted=true;
3086 }
3087
3088 static void usr2Handler(int)
3089 {
3090 g_quiet= !g_quiet;
3091 SyncRes::setDefaultLogMode(g_quiet ? SyncRes::LogNone : SyncRes::Log);
3092 ::arg().set("quiet")=g_quiet ? "" : "no";
3093 }
3094
3095 static void doStats(void)
3096 {
3097 static time_t lastOutputTime;
3098 static uint64_t lastQueryCount;
3099
3100 uint64_t cacheHits = s_RC->cacheHits;
3101 uint64_t cacheMisses = s_RC->cacheMisses;
3102 uint64_t cacheSize = s_RC->size();
3103 auto rc_stats = s_RC->stats();
3104 double r = rc_stats.second == 0 ? 0.0 : (100.0 * rc_stats.first / rc_stats.second);
3105
3106 if(g_stats.qcounter && (cacheHits + cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {
3107 g_log<<Logger::Notice<<"stats: "<<g_stats.qcounter<<" questions, "<<
3108 cacheSize << " cache entries, "<<
3109 broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
3110 (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;
3111 g_log << Logger::Notice<< "stats: cache contended/acquired " << rc_stats.first << '/' << rc_stats.second << " = " << r << '%' << endl;
3112
3113 g_log<<Logger::Notice<<"stats: throttle map: "
3114 << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
3115 << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<", failed ns: "
3116 << broadcastAccFunction<uint64_t>(pleaseGetFailedServersSize)<<", ednsmap: "
3117 <<broadcastAccFunction<uint64_t>(pleaseGetEDNSStatusesSize)<<endl;
3118 g_log<<Logger::Notice<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
3119 g_log<<Logger::Notice<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
3120 <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
3121 g_log<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
3122 broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
3123
3124 //g_log<<Logger::Notice<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
3125 //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
3126
3127 g_log<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
3128 " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
3129
3130 size_t idx = 0;
3131 for (const auto& threadInfo : s_threadInfos) {
3132 if(threadInfo.isWorker) {
3133 g_log<<Logger::Notice<<"stats: thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
3134 ++idx;
3135 }
3136 }
3137
3138 time_t now = time(0);
3139 if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
3140 g_log<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
3141 }
3142 lastOutputTime = now;
3143 lastQueryCount = SyncRes::s_queries;
3144 }
3145 else if(statsWanted)
3146 g_log<<Logger::Notice<<"stats: no stats yet!"<<endl;
3147
3148 statsWanted=false;
3149 }
3150
3151 static void houseKeeping(void *)
3152 {
3153 static thread_local time_t last_rootupdate, last_secpoll, last_trustAnchorUpdate{0}, last_RC_prune;
3154 static thread_local struct timeval last_prune;
3155
3156 static thread_local int cleanCounter=0;
3157 static thread_local bool s_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
3158 auto luaconfsLocal = g_luaconfs.getLocal();
3159
3160 if (last_trustAnchorUpdate == 0 && !luaconfsLocal->trustAnchorFileInfo.fname.empty() && luaconfsLocal->trustAnchorFileInfo.interval != 0) {
3161 // Loading the Lua config file already "refreshed" the TAs
3162 last_trustAnchorUpdate = g_now.tv_sec + luaconfsLocal->trustAnchorFileInfo.interval * 3600;
3163 }
3164
3165 try {
3166 if(s_running) {
3167 return;
3168 }
3169 s_running=true;
3170
3171 struct timeval now, past;
3172 Utility::gettimeofday(&now, nullptr);
3173 past = now;
3174 past.tv_sec -= 5;
3175 if (last_prune < past) {
3176 t_packetCache->doPruneTo(g_maxPacketCacheEntries / g_numWorkerThreads);
3177 SyncRes::pruneNegCache(g_maxCacheEntries / (g_numWorkerThreads * 10));
3178
3179 time_t limit;
3180 if(!((cleanCounter++)%40)) { // this is a full scan!
3181 limit=now.tv_sec-300;
3182 SyncRes::pruneNSSpeeds(limit);
3183 }
3184 limit = now.tv_sec - SyncRes::s_serverdownthrottletime * 10;
3185 SyncRes::pruneFailedServers(limit);
3186 limit = now.tv_sec - 2*3600;
3187 SyncRes::pruneEDNSStatuses(limit);
3188 SyncRes::pruneThrottledServers();
3189 Utility::gettimeofday(&last_prune, nullptr);
3190 }
3191
3192 if(isHandlerThread()) {
3193 if (now.tv_sec - last_RC_prune > 5) {
3194 s_RC->doPrune(g_maxCacheEntries);
3195 last_RC_prune = now.tv_sec;
3196 }
3197 // XXX !!! global
3198 if(now.tv_sec - last_rootupdate > 7200) {
3199 int res = SyncRes::getRootNS(g_now, nullptr);
3200 if (!res) {
3201 last_rootupdate=now.tv_sec;
3202 primeRootNSZones(g_dnssecmode != DNSSECMode::Off);
3203 }
3204 }
3205
3206 if(now.tv_sec - last_secpoll >= 3600) {
3207 try {
3208 doSecPoll(&last_secpoll);
3209 }
3210 catch(const std::exception& e)
3211 {
3212 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.what()<<endl;
3213 }
3214 catch(const PDNSException& e)
3215 {
3216 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
3217 }
3218 catch(const ImmediateServFailException &e)
3219 {
3220 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
3221 }
3222 catch(const PolicyHitException& e) {
3223 g_log<<Logger::Error<<"Policy hit while performing security poll"<<endl;
3224 }
3225 catch(...)
3226 {
3227 g_log<<Logger::Error<<"Exception while performing security poll"<<endl;
3228 }
3229 }
3230
3231 if (!luaconfsLocal->trustAnchorFileInfo.fname.empty() && luaconfsLocal->trustAnchorFileInfo.interval != 0 &&
3232 g_now.tv_sec - last_trustAnchorUpdate >= (luaconfsLocal->trustAnchorFileInfo.interval * 3600)) {
3233 g_log<<Logger::Debug<<"Refreshing Trust Anchors from file"<<endl;
3234 try {
3235 map<DNSName, dsmap_t> dsAnchors;
3236 if (updateTrustAnchorsFromFile(luaconfsLocal->trustAnchorFileInfo.fname, dsAnchors)) {
3237 g_luaconfs.modify([&dsAnchors](LuaConfigItems& lci) {
3238 lci.dsAnchors = dsAnchors;
3239 });
3240 }
3241 last_trustAnchorUpdate = now.tv_sec;
3242 } catch (const PDNSException &pe) {
3243 g_log<<Logger::Error<<"Unable to update Trust Anchors: "<<pe.reason<<endl;
3244 }
3245 }
3246 }
3247 s_running=false;
3248 }
3249 catch(PDNSException& ae)
3250 {
3251 s_running=false;
3252 g_log<<Logger::Error<<"Fatal error in housekeeping thread: "<<ae.reason<<endl;
3253 throw;
3254 }
3255 }
3256
3257 static void makeThreadPipes()
3258 {
3259 auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
3260 if (pipeBufferSize > 0) {
3261 g_log<<Logger::Info<<"Resizing the buffer of the distribution pipe to "<<pipeBufferSize<<endl;
3262 }
3263
3264 /* thread 0 is the handler / SNMP, we start at 1 */
3265 for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
3266 auto& threadInfos = s_threadInfos.at(n);
3267
3268 int fd[2];
3269 if(pipe(fd) < 0)
3270 unixDie("Creating pipe for inter-thread communications");
3271
3272 threadInfos.pipes.readToThread = fd[0];
3273 threadInfos.pipes.writeToThread = fd[1];
3274
3275 if(pipe(fd) < 0)
3276 unixDie("Creating pipe for inter-thread communications");
3277
3278 threadInfos.pipes.readFromThread = fd[0];
3279 threadInfos.pipes.writeFromThread = fd[1];
3280
3281 if(pipe(fd) < 0)
3282 unixDie("Creating pipe for inter-thread communications");
3283
3284 threadInfos.pipes.readQueriesToThread = fd[0];
3285 threadInfos.pipes.writeQueriesToThread = fd[1];
3286
3287 if (pipeBufferSize > 0) {
3288 if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
3289 int err = errno;
3290 g_log<<Logger::Warning<<"Error resizing the buffer of the distribution pipe for thread "<<n<<" to "<<pipeBufferSize<<": "<<strerror(err)<<endl;
3291 auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
3292 if (existingSize > 0) {
3293 g_log<<Logger::Warning<<"The current size of the distribution pipe's buffer for thread "<<n<<" is "<<existingSize<<endl;
3294 }
3295 }
3296 }
3297
3298 if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
3299 unixDie("Making pipe for inter-thread communications non-blocking");
3300 }
3301 }
3302 }
3303
3304 struct ThreadMSG
3305 {
3306 pipefunc_t func;
3307 bool wantAnswer;
3308 };
3309
3310 void broadcastFunction(const pipefunc_t& func)
3311 {
3312 /* This function might be called by the worker with t_id 0 during startup
3313 for the initialization of ACLs and domain maps. After that it should only
3314 be called by the handler. */
3315
3316 if (s_threadInfos.empty() && isHandlerThread()) {
3317 /* the handler and distributors will call themselves below, but
3318 during startup we get called while s_threadInfos has not been
3319 populated yet to update the ACL or domain maps, so we need to
3320 handle that case.
3321 */
3322 func();
3323 }
3324
3325 unsigned int n = 0;
3326 for (const auto& threadInfo : s_threadInfos) {
3327 if(n++ == t_id) {
3328 func(); // don't write to ourselves!
3329 continue;
3330 }
3331
3332 ThreadMSG* tmsg = new ThreadMSG();
3333 tmsg->func = func;
3334 tmsg->wantAnswer = true;
3335 if(write(threadInfo.pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
3336 delete tmsg;
3337
3338 unixDie("write to thread pipe returned wrong size or error");
3339 }
3340
3341 string* resp = nullptr;
3342 if(read(threadInfo.pipes.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
3343 unixDie("read from thread pipe returned wrong size or error");
3344
3345 if(resp) {
3346 delete resp;
3347 resp = nullptr;
3348 }
3349 }
3350 }
3351
3352 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
3353 {
3354 auto& targetInfo = s_threadInfos[target];
3355 if(!targetInfo.isWorker) {
3356 g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
3357 exit(1);
3358 }
3359
3360 const auto& tps = targetInfo.pipes;
3361
3362 ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
3363 if (written > 0) {
3364 if (static_cast<size_t>(written) != sizeof(tmsg)) {
3365 delete tmsg;
3366 unixDie("write to thread pipe returned wrong size or error");
3367 }
3368 }
3369 else {
3370 int error = errno;
3371 if (error == EAGAIN || error == EWOULDBLOCK) {
3372 return false;
3373 } else {
3374 delete tmsg;
3375 unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
3376 }
3377 }
3378
3379 ++targetInfo.numberOfDistributedQueries;
3380
3381 return true;
3382 }
3383
3384 static unsigned int getWorkerLoad(size_t workerIdx)
3385 {
3386 const auto mt = s_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
3387 if (mt != nullptr) {
3388 return mt->numProcesses();
3389 }
3390 return 0;
3391 }
3392
3393 static unsigned int selectWorker(unsigned int hash)
3394 {
3395 if (s_balancingFactor == 0) {
3396 return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
3397 }
3398
3399 /* we start with one, representing the query we are currently handling */
3400 double currentLoad = 1;
3401 std::vector<unsigned int> load(g_numWorkerThreads);
3402 for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
3403 load[idx] = getWorkerLoad(idx);
3404 currentLoad += load[idx];
3405 // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
3406 }
3407
3408 double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
3409 // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
3410
3411 unsigned int worker = hash % g_numWorkerThreads;
3412 /* at least one server has to be at or below the average load */
3413 if (load[worker] > targetLoad) {
3414 ++g_stats.rebalancedQueries;
3415 do {
3416 // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
3417 worker = (worker + 1) % g_numWorkerThreads;
3418 }
3419 while(load[worker] > targetLoad);
3420 }
3421
3422 return /* skip handler */ 1 + g_numDistributorThreads + worker;
3423 }
3424
3425 // This function is only called by the distributor threads, when pdns-distributes-queries is set
3426 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
3427 {
3428 if (!isDistributorThread()) {
3429 g_log<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
3430 exit(1);
3431 }
3432
3433 unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
3434 unsigned int target = selectWorker(hash);
3435
3436 ThreadMSG* tmsg = new ThreadMSG();
3437 tmsg->func = func;
3438 tmsg->wantAnswer = false;
3439
3440 if (!trySendingQueryToWorker(target, tmsg)) {
3441 /* if this function failed but did not raise an exception, it means that the pipe
3442 was full, let's try another one */
3443 unsigned int newTarget = 0;
3444 do {
3445 newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads);
3446 } while (newTarget == target);
3447
3448 if (!trySendingQueryToWorker(newTarget, tmsg)) {
3449 g_stats.queryPipeFullDrops++;
3450 delete tmsg;
3451 }
3452 }
3453 }
3454
3455 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
3456 {
3457 ThreadMSG* tmsg = nullptr;
3458
3459 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
3460 unixDie("read from thread pipe returned wrong size or error");
3461 }
3462
3463 void *resp=0;
3464 try {
3465 resp = tmsg->func();
3466 }
3467 catch(std::exception& e) {
3468 if(g_logCommonErrors)
3469 g_log<<Logger::Error<<"PIPE function we executed created exception: "<<e.what()<<endl; // but what if they wanted an answer.. we send 0
3470 }
3471 catch(PDNSException& e) {
3472 if(g_logCommonErrors)
3473 g_log<<Logger::Error<<"PIPE function we executed created PDNS exception: "<<e.reason<<endl; // but what if they wanted an answer.. we send 0
3474 }
3475 if(tmsg->wantAnswer) {
3476 const auto& threadInfo = s_threadInfos.at(t_id);
3477 if(write(threadInfo.pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
3478 delete tmsg;
3479 unixDie("write to thread pipe returned wrong size or error");
3480 }
3481 }
3482
3483 delete tmsg;
3484 }
3485
3486 template<class T> void *voider(const boost::function<T*()>& func)
3487 {
3488 return func();
3489 }
3490
3491 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
3492 {
3493 a.insert(a.end(), b.begin(), b.end());
3494 return a;
3495 }
3496
/* Appends a copy of every (name, port) pair of b, in order, to the end of a; returns a. */
vector<pair<string, uint16_t> >& operator+=(vector<pair<string, uint16_t> >&a, const vector<pair<string, uint16_t> >& b)
{
  const size_t count = b.size();
  for (size_t idx = 0; idx < count; ++idx) {
    a.push_back(b[idx]);
  }
  return a;
}
3502
3503 vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a, const vector<pair<DNSName, uint16_t> >& b)
3504 {
3505 a.insert(a.end(), b.begin(), b.end());
3506 return a;
3507 }
3508
3509
3510 /*
3511 This function should only be called by the handler to gather metrics, wipe the cache,
3512 reload the Lua script (not the Lua config) or change the current trace regex,
3513 and by the SNMP thread to gather metrics. */
3514 template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
3515 {
3516 if (!isHandlerThread()) {
3517 g_log<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
3518 exit(1);
3519 }
3520
3521 unsigned int n = 0;
3522 T ret=T();
3523 for (const auto& threadInfo : s_threadInfos) {
3524 if (n++ == t_id) {
3525 continue;
3526 }
3527
3528 const auto& tps = threadInfo.pipes;
3529 ThreadMSG* tmsg = new ThreadMSG();
3530 tmsg->func = boost::bind(voider<T>, func);
3531 tmsg->wantAnswer = true;
3532
3533 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
3534 delete tmsg;
3535 unixDie("write to thread pipe returned wrong size or error");
3536 }
3537
3538 T* resp = nullptr;
3539 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
3540 unixDie("read from thread pipe returned wrong size or error");
3541
3542 if(resp) {
3543 ret += *resp;
3544 delete resp;
3545 resp = nullptr;
3546 }
3547 }
3548 return ret;
3549 }
3550
3551 template string broadcastAccFunction(const boost::function<string*()>& fun); // explicit instantiation
3552 template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
3553 template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
3554 template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
3555 template ThreadTimes broadcastAccFunction(const boost::function<ThreadTimes*()>& fun);
3556
3557 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
3558 {
3559 try {
3560 string remote;
3561 string msg=s_rcc.recv(&remote);
3562 RecursorControlParser rcp;
3563 RecursorControlParser::func_t* command;
3564
3565 string answer=rcp.getAnswer(msg, &command);
3566
3567 // If we are inside a chroot, we need to strip
3568 if (!arg()["chroot"].empty()) {
3569 size_t len = arg()["chroot"].length();
3570 remote = remote.substr(len);
3571 }
3572
3573 s_rcc.send(answer, &remote);
3574 command();
3575 }
3576 catch(const std::exception& e) {
3577 g_log<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
3578 }
3579 catch(const PDNSException& ae) {
3580 g_log<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
3581 }
3582 }
3583
3584 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
3585 {
3586 PacketID* pident=any_cast<PacketID>(&var);
3587 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
3588
3589 shared_array<char> buffer(new char[pident->inNeeded]);
3590
3591 ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
3592 if(ret > 0) {
3593 pident->inMSG.append(&buffer[0], &buffer[ret]);
3594 pident->inNeeded-=(size_t)ret;
3595 if(!pident->inNeeded || pident->inIncompleteOkay) {
3596 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
3597 PacketID pid=*pident;
3598 string msg=pident->inMSG;
3599
3600 t_fdm->removeReadFD(fd);
3601 MT->sendEvent(pid, &msg);
3602 }
3603 else {
3604 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
3605 }
3606 }
3607 else {
3608 PacketID tmp=*pident;
3609 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
3610 string empty;
3611 MT->sendEvent(tmp, &empty); // this conveys error status
3612 }
3613 }
3614
3615 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
3616 {
3617 PacketID* pid=any_cast<PacketID>(&var);
3618 ssize_t ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
3619 if(ret > 0) {
3620 pid->outPos+=(ssize_t)ret;
3621 if(pid->outPos==pid->outMSG.size()) {
3622 PacketID tmp=*pid;
3623 t_fdm->removeWriteFD(fd);
3624 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
3625 }
3626 }
3627 else { // error or EOF
3628 PacketID tmp(*pid);
3629 t_fdm->removeWriteFD(fd);
3630 string sent;
3631 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
3632 }
3633 }
3634
3635 // resend event to everybody chained onto it
3636 static void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
3637 {
3638 if(iter->key.chain.empty())
3639 return;
3640 // cerr<<"doResends called!\n";
3641 for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
3642 resend.fd=-1;
3643 resend.id=*i;
3644 // cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
3645
3646 MT->sendEvent(resend, &content);
3647 g_stats.chainResends++;
3648 }
3649 }
3650
// Read callback for an outgoing UDP query socket. It receives one datagram from a remote
// nameserver, works out which waiting mthread it answers and wakes that mthread (plus any
// queries chained onto it) via MT->sendEvent(). An empty string passed to sendEvent()
// denotes an error. The socket is returned to t_udpclientsocks in the too-short path and
// whenever sendEvent() reports something other than "no waiter".
3651 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
3652 {
// key this socket was registered with (copied out of var)
3653 PacketID pid=any_cast<PacketID>(var);
3654 ssize_t len;
3655 std::string packet;
3656 packet.resize(g_outgoingEDNSBufsize);
3657 ComboAddress fromaddr;
3658 socklen_t addrlen=sizeof(fromaddr);
3659
// read at most g_outgoingEDNSBufsize bytes, the size the buffer was given above
3660 len=recvfrom(fd, &packet.at(0), packet.size(), 0, (sockaddr *)&fromaddr, &addrlen);
3661
// recvfrom() failed or the datagram cannot even hold a DNS header: count a parse error
// (only when something was received), give the socket back, and wake the waiter and its
// chained queries with an empty answer, which denotes an error.
3662 if(len < (ssize_t) sizeof(dnsheader)) {
3663 if(len < 0)
3664 ; // cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
3665 else {
3666 g_stats.serverParseError++;
3667 if(g_logCommonErrors)
3668 g_log<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
3669 ": packet smaller than DNS header"<<endl;
3670 }
3671
3672 t_udpclientsocks->returnSocket(fd);
3673 string empty;
3674
3675 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
3676 if(iter != MT->d_waiters.end())
3677 doResends(iter, pid, empty);
3678
3679 MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
3680 return;
3681 }
3682
// trim the buffer to what was received and copy the fixed-size header out of it
3683 packet.resize(len);
3684 dnsheader dh;
3685 memcpy(&dh, &packet.at(0), sizeof(dh));
3686
// Key derived from the response itself: source address, DNS id and receiving socket.
// The qname/qtype are filled in below when the response carries a question.
3687 PacketID pident;
3688 pident.remote=fromaddr;
3689 pident.id=dh.id;
3690 pident.fd=fd;
3691
// NOTE(review): despite this message, a packet without the QR bit is still processed
// below (with an empty name/type) rather than dropped.
3692 if(!dh.qr && g_logCommonErrors) {
3693 g_log<<Logger::Notice<<"Not taking data from question on outgoing socket from "<< fromaddr.toStringWithPort() <<endl;
3694 }
3695
// Without a question section (or without the QR bit) the packet cannot be matched on
// name/type; leave those empty and rely on the fallback matching further down.
3696 if(!dh.qdcount || // UPC, Nominum, very old BIND on FormErr, NSD
3697 !dh.qr) { // one weird server
3698 pident.domain.clear();
3699 pident.type = 0;
3700 }
3701 else {
3702 try {
// the question section starts right after the 12-byte header
3703 if(len > 12)
3704 pident.domain=DNSName(&packet.at(0), len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
3705 }
3706 catch(std::exception& e) {
// NOTE(review): this path neither returns the socket nor wakes the waiter; presumably
// the waiter's timeout handling reclaims both — confirm.
3707 g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
3708 g_log<<Logger::Warning<<"Error in packet from remote nameserver "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
3709 return;
3710 }
3711 }
3712
// deliver the same packet to every query chained onto the matching waiter
3713 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
3714 if(iter != MT->d_waiters.end()) {
3715 doResends(iter, pident, packet);
3716 }
3717
// Deliver to the waiter whose key matches exactly. If there is none, scan all waiters:
// count near misses (same fd, remote, type and name) and, when the response carried no
// question, fall back to matching on id + remote only, retrying with that waiter's
// name/type. The fallback fires at most once, since pident.domain is non-empty afterwards.
3718 retryWithName:
3719
3720 if(!MT->sendEvent(pident, &packet)) {
3721 /* we did not find a match for this response, something is wrong */
3722
3723 // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
3724 for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
3725 if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote && mthread->key.type == pident.type &&
3726 pident.domain == mthread->key.domain) {
3727 mthread->key.nearMisses++;
3728 }
3729
3730 // be a bit paranoid here since we're weakening our matching
3731 if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
3732 pident.id == mthread->key.id && mthread->key.remote == pident.remote) {
3733 // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
3734 pident.domain = mthread->key.domain;
3735 pident.type = mthread->key.type;
3736 goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
3737 }
3738 }
3739 g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
3740 if(g_logCommonErrors) {
3741 g_log<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<< (pident.domain.empty() ? "<empty>" : pident.domain.toString())<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
3742 }
3743 }
3744 else if(fd >= 0) {
3745 /* we either found a waiter (1) or encountered an issue (-1), it's up to us to clean the socket anyway */
3746 t_udpclientsocks->returnSocket(fd);
3747 }
3748 }
3749
3750 FDMultiplexer* getMultiplexer()
3751 {
3752 FDMultiplexer* ret;
3753 for(const auto& i : FDMultiplexer::getMultiplexerMap()) {
3754 try {
3755 ret=i.second();
3756 return ret;
3757 }
3758 catch(FDMultiplexerException &fe) {
3759 g_log<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
3760 }
3761 catch(...) {
3762 g_log<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
3763 }
3764 }
3765 g_log<<Logger::Error<<"No working multiplexer found!"<<endl;
3766 exit(1);
3767 }
3768
3769
3770 static string* doReloadLuaScript()
3771 {
3772 string fname= ::arg()["lua-dns-script"];
3773 try {
3774 if(fname.empty()) {
3775 t_pdl.reset();
3776 g_log<<Logger::Info<<t_id<<" Unloaded current lua script"<<endl;
3777 return new string("unloaded\n");
3778 }
3779 else {
3780 t_pdl = std::make_shared<RecursorLua4>();
3781 t_pdl->loadFile(fname);
3782 }
3783 }
3784 catch(std::exception& e) {
3785 g_log<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
3786 return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
3787 }
3788
3789 g_log<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
3790 return new string("(re)loaded '"+fname+"'\n");
3791 }
3792
3793 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
3794 {
3795 if(begin != end)
3796 ::arg().set("lua-dns-script") = *begin;
3797
3798 return broadcastAccFunction<string>(doReloadLuaScript);
3799 }
3800
3801 static string* pleaseUseNewTraceRegex(const std::string& newRegex)
3802 try
3803 {
3804 if(newRegex.empty()) {
3805 t_traceRegex.reset();
3806 return new string("unset\n");
3807 }
3808 else {
3809 t_traceRegex = std::make_shared<Regex>(newRegex);
3810 return new string("ok\n");
3811 }
3812 }
3813 catch(PDNSException& ae)
3814 {
3815 return new string(ae.reason+"\n");
3816 }
3817
3818 string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
3819 {
3820 return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
3821 }
3822
3823 static void checkLinuxIPv6Limits()
3824 {
3825 #ifdef __linux__
3826 string line;
3827 if(readFileIfThere("/proc/sys/net/ipv6/route/max_size", &line)) {
3828 int lim=std::stoi(line);
3829 if(lim < 16384) {
3830 g_log<<Logger::Error<<"If using IPv6, please raise sysctl net.ipv6.route.max_size, currently set to "<<lim<<" which is < 16384"<<endl;
3831 }
3832 }
3833 #endif
3834 }
3835 static void checkOrFixFDS()
3836 {
3837 unsigned int availFDs=getFilenumLimit();
3838 unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
3839
3840 if(wantFDs > availFDs) {
3841 unsigned int hardlimit= getFilenumLimit(true);
3842 if(hardlimit >= wantFDs) {
3843 setFilenumLimit(wantFDs);
3844 g_log<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
3845 }
3846 else {
3847 int newval = (hardlimit - 25) / g_numWorkerThreads;
3848 g_log<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
3849 g_maxMThreads = newval;
3850 setFilenumLimit(hardlimit);
3851 }
3852 }
3853 }
3854
// Forward declaration; being static, recursorThread is defined elsewhere in this
// translation unit (presumably the entry point run by each recursor thread).
3855 static void* recursorThread(unsigned int tid, const string& threadName);
3856
3857 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
3858 {
3859 t_allowFrom = ng;
3860 return nullptr;
3861 }
3862
// Command-line argument count and vector, presumably copied from main(); parseACLs()
// re-reads the command line through them when reloading ACL settings.
int g_argc = 0;
char** g_argv = nullptr;
3865
3866 void parseACLs()
3867 {
3868 static bool l_initialized;
3869
3870 if(l_initialized) { // only reload configuration file on second call
3871 string configname=::arg()["config-dir"]+"/recursor.conf";
3872 if(::arg()["config-name"]!="") {
3873 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
3874 }
3875 cleanSlashes(configname);
3876
3877 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
3878 throw runtime_error("Unable to re-parse configuration file '"+configname+"'");
3879 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
3880 ::arg().preParseFile(configname.c_str(), "include-dir");
3881 ::arg().preParse(g_argc, g_argv, "include-dir");
3882
3883 // then process includes
3884 std::vector<std::string> extraConfigs;
3885 ::arg().gatherIncludes(extraConfigs);
3886
3887 for(const std::string& fn : extraConfigs) {
3888 if(!::arg().preParseFile(fn.c_str(), "allow-from-file", ::arg()["allow-from-file"]))
3889 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
3890 if(!::arg().preParseFile(fn.c_str(), "allow-from", ::arg()["allow-from"]))
3891 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
3892 }
3893
3894 ::arg().preParse(g_argc, g_argv, "allow-from-file");
3895 ::arg().preParse(g_argc, g_argv, "allow-from");
3896 }
3897
3898 std::shared_ptr<NetmaskGroup> oldAllowFrom = t_allowFrom;
3899 std::shared_ptr<NetmaskGroup> allowFrom = std::make_shared<NetmaskGroup>();
3900
3901 if(!::arg()["allow-from-file"].empty()) {
3902 string line;
3903 ifstream ifs(::arg()["allow-from-file"].c_str());
3904 if(!ifs) {
3905 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
3906 }
3907
3908 string::size_type pos;
3909 while(getline(ifs,line)) {
3910 pos=line.find('#');
3911 if(pos!=string::npos)
3912 line.resize(pos);
3913 trim(line);
3914 if(line.empty())
3915 continue;
3916
3917 allowFrom->addMask(line);
3918 }
3919 g_log<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
3920 }
3921 else if(!::arg()["allow-from"].empty()) {
3922 vector<string> ips;
3923 stringtok(ips, ::arg()["allow-from"], ", ");
3924
3925 g_log<<Logger::Warning<<"Only allowing queries from: ";
3926 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
3927 allowFrom->addMask(*i);
3928 if(i!=ips.begin())
3929 g_log<<Logger::Warning<<", ";
3930 g_log<<Logger::Warning<<*i;
3931 }
3932 g_log<<Logger::Warning<<endl;
3933 }
3934 else {
3935 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
3936 g_log<<Logger::Warning<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
3937 allowFrom = nullptr;
3938 }
3939
3940 g_initialAllowFrom = allowFrom;
3941 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
3942 oldAllowFrom = nullptr;
3943
3944 l_initialized = true;
3945 }
3946
3947
3948 static void setupDelegationOnly()
3949 {
3950 vector<string> parts;
3951 stringtok(parts, ::arg()["delegation-only"], ", \t");
3952 for(const auto& p : parts) {
3953 SyncRes::addDelegationOnly(DNSName(p));
3954 }
3955 }
3956
3957 static std::map<unsigned int, std::set<int> > parseCPUMap()
3958 {
3959 std::map<unsigned int, std::set<int> > result;
3960
3961 const std::string value = ::arg()["cpu-map"];
3962
3963 if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
3964 g_log<<Logger::Warning<<"CPU mapping requested but not supported, skipping"<<endl;
3965 return result;
3966 }
3967
3968 std::vector<std::string> parts;
3969
3970 stringtok(parts, value, " \t");
3971
3972 for(const auto& part : parts) {
3973 if (part.find('=') == string::npos)
3974 continue;
3975
3976 try {
3977 auto headers = splitField(part, '=');
3978 trim(headers.first);
3979 trim(headers.second);
3980
3981 unsigned int threadId = pdns_stou(headers.first);
3982 std::vector<std::string> cpus;
3983
3984 stringtok(cpus, headers.second, ",");
3985
3986 for(const auto& cpu : cpus) {
3987 int cpuId = std::stoi(cpu);
3988
3989 result[threadId].insert(cpuId);
3990 }
3991 }
3992 catch(const std::exception& e) {
3993 g_log<<Logger::Error<<"Error parsing cpu-map entry '"<<part<<"': "<<e.what()<<endl;
3994 }
3995 }
3996
3997 return result;
3998 }
3999
4000 static void setCPUMap(const std::map<unsigned int, std::set<int> >& cpusMap, unsigned int n, pthread_t tid)
4001 {
4002 const auto& cpuMapping = cpusMap.find(n);
4003 if (cpuMapping != cpusMap.cend()) {
4004 int rc = mapThreadToCPUList(tid, cpuMapping->second);
4005 if (rc == 0) {
4006 g_log<<Logger::Info<<"CPU affinity for worker "<<n<<" has been set to CPU map:";
4007 for (const auto cpu : cpuMapping->second) {
4008 g_log<<Logger::Info<<" "<<cpu;
4009 }
4010 g_log<<Logger::Info<<endl;
4011 }
4012 else {
4013 g_log<<Logger::Warning<<"Error setting CPU affinity for worker "<<n<<" to CPU map:";
4014 for (const auto cpu : cpuMapping->second) {
4015 g_log<<Logger::Info<<" "<<cpu;
4016 }
4017 g_log<<Logger::Info<<strerror(rc)<<endl;
4018 }
4019 }
4020 }
4021
4022 #ifdef NOD_ENABLED
4023 static void setupNODThread()
4024 {
4025 if (g_nodEnabled) {
4026 uint32_t num_cells = ::arg().asNum("new-domain-db-size");
4027 t_nodDBp = std::make_shared<nod::NODDB>(num_cells);
4028 try {
4029 t_nodDBp->setCacheDir(::arg()["new-domain-history-dir"]);
4030 }
4031 catch (const PDNSException& e) {
4032 g_log<<Logger::Error<<"new-domain-history-dir (" << ::arg()["new-domain-history-dir"] << ") is not readable or does not exist"<<endl;
4033 _exit(1);
4034 }
4035 if (!t_nodDBp->init()) {
4036 g_log<<Logger::Error<<"Could not initialize domain tracking"<<endl;
4037 _exit(1);
4038 }
4039 std::thread t(nod::NODDB::startHousekeepingThread, t_nodDBp, std::this_thread::get_id());
4040 t.detach();
4041 g_nod_pbtag = ::arg()["new-domain-pb-tag"];
4042 }
4043 if (g_udrEnabled) {
4044 uint32_t num_cells = ::arg().asNum("unique-response-db-size");
4045 t_udrDBp = std::make_shared<nod::UniqueResponseDB>(num_cells);
4046 try {
4047 t_udrDBp->setCacheDir(::arg()["unique-response-history-dir"]);
4048 }
4049 catch (const PDNSException& e) {
4050 g_log<<Logger::Error<<"unique-response-history-dir (" << ::arg()["unique-response-history-dir"] << ") is not readable or does not exist"<<endl;
4051 _exit(1);
4052 }
4053 if (!t_udrDBp->init()) {
4054 g_log<<Logger::Error<<"Could not initialize unique response tracking"<<endl;
4055 _exit(1);
4056 }
4057 std::thread t(nod::UniqueResponseDB::startHousekeepingThread, t_udrDBp, std::this_thread::get_id());
4058 t.detach();
4059 g_udr_pbtag = ::arg()["unique-response-pb-tag"];
4060 }
4061 }
4062
4063 void parseNODWhitelist(const std::string& wlist)
4064 {
4065 vector<string> parts;
4066 stringtok(parts, wlist, ",; ");
4067 for(const auto& a : parts) {
4068 g_nodDomainWL.add(DNSName(a));
4069 }
4070 }
4071
4072 static void setupNODGlobal()
4073 {
4074 // Setup NOD subsystem
4075 g_nodEnabled = ::arg().mustDo("new-domain-tracking");
4076 g_nodLookupDomain = DNSName(::arg()["new-domain-lookup"]);
4077 g_nodLog = ::arg().mustDo("new-domain-log");
4078 parseNODWhitelist(::arg()["new-domain-whitelist"]);
4079
4080 // Setup Unique DNS Response subsystem
4081 g_udrEnabled = ::arg().mustDo("unique-response-tracking");
4082 g_udrLog = ::arg().mustDo("unique-response-log");
4083 }
4084 #endif /* NOD_ENABLED */
4085
4086 static void checkSocketDir(void)
4087 {
4088 struct stat st;
4089 string dir(::arg()["socket-dir"]);
4090 string msg;
4091
4092 if (stat(dir.c_str(), &st) == -1) {
4093 msg = "it does not exist or cannot access";
4094 }
4095 else if (!S_ISDIR(st.st_mode)) {
4096 msg = "it is not a directory";
4097 }
4098 else if (access(dir.c_str(), R_OK | W_OK | X_OK) != 0) {
4099 msg = "cannot read, write or search";
4100 } else {
4101 return;
4102 }
4103 g_log << Logger::Error << "Problem with socket directory " << dir << ": " << msg << "; see https://docs.powerdns.com/recursor/upgrade.html#x-to-4-3-0-or-master" << endl;
4104 _exit(1);
4105 }
4106
4107 static int serviceMain(int argc, char*argv[])
4108 {
4109 g_log.setName(s_programname);
4110 g_log.disableSyslog(::arg().mustDo("disable-syslog"));
4111 g_log.setTimestamps(::arg().mustDo("log-timestamp"));
4112
4113 if(!::arg()["logging-facility"].empty()) {
4114 int val=logFacilityToLOG(::arg().asNum("logging-facility") );
4115 if(val >= 0)
4116 g_log.setFacility(val);
4117 else
4118 g_log<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
4119 }
4120
4121 showProductVersion();
4122
4123 g_disthashseed=dns_random(0xffffffff);
4124
4125 checkLinuxIPv6Limits();
4126 try {
4127 vector<string> addrs;
4128 if(!::arg()["query-local-address6"].empty()) {
4129 SyncRes::s_doIPv6=true;
4130 g_log<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
4131
4132 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
4133 for(const string& addr : addrs) {
4134 g_localQueryAddresses6.push_back(ComboAddress(addr));
4135 }
4136 }
4137 else {
4138 g_log<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
4139 }
4140 addrs.clear();
4141 stringtok(addrs, ::arg()["query-local-address"], ", ;");
4142 for(const string& addr : addrs) {
4143 g_localQueryAddresses4.push_back(ComboAddress(addr));
4144 }
4145 }
4146 catch(std::exception& e) {
4147 g_log<<Logger::Error<<"Assigning local query addresses: "<<e.what();
4148 exit(99);
4149 }
4150
4151 // keep this ABOVE loadRecursorLuaConfig!
4152 if(::arg()["dnssec"]=="off")
4153 g_dnssecmode=DNSSECMode::Off;
4154 else if(::arg()["dnssec"]=="process-no-validate")
4155 g_dnssecmode=DNSSECMode::ProcessNoValidate;
4156 else if(::arg()["dnssec"]=="process")
4157 g_dnssecmode=DNSSECMode::Process;
4158 else if(::arg()["dnssec"]=="validate")
4159 g_dnssecmode=DNSSECMode::ValidateAll;
4160 else if(::arg()["dnssec"]=="log-fail")
4161 g_dnssecmode=DNSSECMode::ValidateForLog;
4162 else {
4163 g_log<<Logger::Error<<"Unknown DNSSEC mode "<<::arg()["dnssec"]<<endl;
4164 exit(1);
4165 }
4166
4167 g_signatureInceptionSkew = ::arg().asNum("signature-inception-skew");
4168 if (g_signatureInceptionSkew < 0) {
4169 g_log<<Logger::Error<<"A negative value for 'signature-inception-skew' is not allowed"<<endl;
4170 exit(1);
4171 }
4172
4173 g_dnssecLogBogus = ::arg().mustDo("dnssec-log-bogus");
4174 g_maxNSEC3Iterations = ::arg().asNum("nsec3-max-iterations");
4175
4176 g_maxCacheEntries = ::arg().asNum("max-cache-entries");
4177 g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
4178
4179 luaConfigDelayedThreads delayedLuaThreads;
4180 try {
4181 loadRecursorLuaConfig(::arg()["lua-config-file"], delayedLuaThreads);
4182 }
4183 catch (PDNSException &e) {
4184 g_log<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
4185 exit(1);
4186 }
4187
4188 parseACLs();
4189 initPublicSuffixList(::arg()["public-suffix-list-file"]);
4190
4191 if(!::arg()["dont-query"].empty()) {
4192 vector<string> ips;
4193 stringtok(ips, ::arg()["dont-query"], ", ");
4194 ips.push_back("0.0.0.0");
4195 ips.push_back("::");
4196
4197 g_log<<Logger::Warning<<"Will not send queries to: ";
4198 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
4199 SyncRes::addDontQuery(*i);
4200 if(i!=ips.begin())
4201 g_log<<Logger::Warning<<", ";
4202 g_log<<Logger::Warning<<*i;
4203 }
4204 g_log<<Logger::Warning<<endl;
4205 }
4206
4207 g_quiet=::arg().mustDo("quiet");
4208
4209 /* this needs to be done before parseACLs(), which call broadcastFunction() */
4210 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
4211 if(g_weDistributeQueries) {
4212 g_log<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
4213 }
4214
4215 setupDelegationOnly();
4216 g_outgoingEDNSBufsize=::arg().asNum("edns-outgoing-bufsize");
4217
4218 if(::arg()["trace"]=="fail") {
4219 SyncRes::setDefaultLogMode(SyncRes::Store);
4220 }
4221 else if(::arg().mustDo("trace")) {
4222 SyncRes::setDefaultLogMode(SyncRes::Log);
4223 ::arg().set("quiet")="no";
4224 g_quiet=false;
4225 g_dnssecLOG=true;
4226 }
4227 string myHostname = getHostname();
4228 if (myHostname == "UNKNOWN"){
4229 g_log<<Logger::Warning<<"Unable to get the hostname, NSID and id.server values will be empty"<<endl;
4230 myHostname = "";
4231 }
4232
4233 SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
4234 SyncRes::s_minimumECSTTL = ::arg().asNum("ecs-minimum-ttl-override");
4235
4236 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
4237
4238 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
4239 SyncRes::s_maxbogusttl=::arg().asNum("max-cache-bogus-ttl");
4240 SyncRes::s_maxcachettl=max(::arg().asNum("max-cache-ttl"), 15);
4241 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
4242 // Cap the packetcache-servfail-ttl to the packetcache-ttl
4243 uint32_t packetCacheServFailTTL = ::arg().asNum("packetcache-servfail-ttl");
4244 SyncRes::s_packetcacheservfailttl=(packetCacheServFailTTL > SyncRes::s_packetcachettl) ? SyncRes::s_packetcachettl : packetCacheServFailTTL;
4245 SyncRes::s_serverdownmaxfails=::arg().asNum("server-down-max-fails");
4246 SyncRes::s_serverdownthrottletime=::arg().asNum("server-down-throttle-time");
4247 SyncRes::s_serverID=::arg()["server-id"];
4248 SyncRes::s_maxqperq=::arg().asNum("max-qperq");
4249 SyncRes::s_maxtotusec=1000*::arg().asNum("max-total-msec");
4250 SyncRes::s_maxdepth=::arg().asNum("max-recursion-depth");
4251 SyncRes::s_rootNXTrust = ::arg().mustDo( "root-nx-trust");
4252 if(SyncRes::s_serverID.empty()) {
4253 SyncRes::s_serverID = myHostname;
4254 }
4255
4256 SyncRes::s_ecsipv4limit = ::arg().asNum("ecs-ipv4-bits");
4257 SyncRes::s_ecsipv6limit = ::arg().asNum("ecs-ipv6-bits");
4258 SyncRes::clearECSStats();
4259 SyncRes::s_ecsipv4cachelimit = ::arg().asNum("ecs-ipv4-cache-bits");
4260 SyncRes::s_ecsipv6cachelimit = ::arg().asNum("ecs-ipv6-cache-bits");
4261 SyncRes::s_ecscachelimitttl = ::arg().asNum("ecs-cache-limit-ttl");
4262
4263 SyncRes::s_qnameminimization = ::arg().mustDo("qname-minimization");
4264
4265 if (SyncRes::s_qnameminimization) {
4266 // With an empty cache, a rev ipv6 query with dnssec enabled takes
4267 // almost 100 queries. Default maxqperq is 60.
4268 SyncRes::s_maxqperq = std::max(SyncRes::s_maxqperq, static_cast<unsigned int>(100));
4269 }
4270
4271 SyncRes::s_hardenNXD = SyncRes::HardenNXD::DNSSEC;
4272 string value = ::arg()["nothing-below-nxdomain"];
4273 if (value == "yes") {
4274 SyncRes::s_hardenNXD = SyncRes::HardenNXD::Yes;
4275 } else if (value == "no") {
4276 SyncRes::s_hardenNXD = SyncRes::HardenNXD::No;
4277 } else if (value != "dnssec") {
4278 g_log << Logger::Error << "Unknown nothing-below-nxdomain mode: " << value << endl;
4279 exit(1);
4280 }
4281
4282 if (!::arg().isEmpty("ecs-scope-zero-address")) {
4283 ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
4284 SyncRes::setECSScopeZeroAddress(Netmask(scopeZero, scopeZero.isIPv4() ? 32 : 128));
4285 }
4286 else {
4287 bool found = false;
4288 for (const auto& addr : g_localQueryAddresses4) {
4289 if (!IsAnyAddress(addr)) {
4290 SyncRes::setECSScopeZeroAddress(Netmask(addr, 32));
4291 found = true;
4292 break;
4293 }
4294 }
4295 if (!found) {
4296 for (const auto& addr : g_localQueryAddresses6) {
4297 if (!IsAnyAddress(addr)) {
4298 SyncRes::setECSScopeZeroAddress(Netmask(addr, 128));
4299 found = true;
4300 break;
4301 }
4302 }
4303 if (!found) {
4304 SyncRes::setECSScopeZeroAddress(Netmask("127.0.0.1/32"));
4305 }
4306 }
4307 }
4308
4309 SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
4310 SyncRes::parseEDNSSubnetAddFor(::arg()["ecs-add-for"]);
4311 g_useIncomingECS = ::arg().mustDo("use-incoming-edns-subnet");
4312
4313 g_XPFAcl.toMasks(::arg()["xpf-allow-from"]);
4314 g_xpfRRCode = ::arg().asNum("xpf-rr-code");
4315
4316 g_proxyProtocolACL.toMasks(::arg()["proxy-protocol-from"]);
4317 g_proxyProtocolMaximumSize = ::arg().asNum("proxy-protocol-maximum-size");
4318
4319 if (!::arg()["dns64-prefix"].empty()) {
4320 try {
4321 auto dns64Prefix = Netmask(::arg()["dns64-prefix"]);
4322 if (dns64Prefix.getBits() != 96) {
4323 g_log << Logger::Error << "Invalid prefix for 'dns64-prefix', the current implementation only supports /96 prefixes: " << ::arg()["dns64-prefix"] << endl;
4324 exit(1);
4325 }
4326 g_dns64Prefix = dns64Prefix.getNetwork();
4327 g_dns64PrefixReverse = reverseNameFromIP(*g_dns64Prefix);
4328 /* /96 is 24 nibbles + 2 for "ip6.arpa." */
4329 while (g_dns64PrefixReverse.countLabels() > 26) {
4330 g_dns64PrefixReverse.chopOff();
4331 }
4332 }
4333 catch (const NetmaskException& ne) {
4334 g_log << Logger::Error << "Invalid prefix '" << ::arg()["dns64-prefix"] << "' for 'dns64-prefix': " << ne.reason << endl;
4335 exit(1);
4336 }
4337 }
4338
4339 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
4340
4341 g_initialDomainMap = parseAuthAndForwards();
4342
4343 g_latencyStatSize=::arg().asNum("latency-statistic-size");
4344
4345 g_logCommonErrors=::arg().mustDo("log-common-errors");
4346 g_logRPZChanges = ::arg().mustDo("log-rpz-changes");
4347
4348 g_anyToTcp = ::arg().mustDo("any-to-tcp");
4349 g_udpTruncationThreshold = ::arg().asNum("udp-truncation-threshold");
4350
4351 g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
4352
4353 g_numDistributorThreads = ::arg().asNum("distributor-threads");
4354 g_numWorkerThreads = ::arg().asNum("threads");
4355 if (g_numWorkerThreads < 1) {
4356 g_log<<Logger::Warning<<"Asked to run with 0 threads, raising to 1 instead"<<endl;
4357 g_numWorkerThreads = 1;
4358 }
4359
4360 g_numThreads = g_numDistributorThreads + g_numWorkerThreads;
4361 g_maxMThreads = ::arg().asNum("max-mthreads");
4362
4363
4364 int64_t maxInFlight = ::arg().asNum("max-concurrent-requests-per-tcp-connection");
4365 if (maxInFlight < 1 || maxInFlight > USHRT_MAX || maxInFlight >= g_maxMThreads) {
4366 g_log<<Logger::Warning<<"Asked to run with illegal max-concurrent-requests-per-tcp-connection, setting to default (10)"<<endl;
4367 TCPConnection::s_maxInFlight = 10;
4368 } else {
4369 TCPConnection::s_maxInFlight = maxInFlight;
4370 }
4371
4372
4373 g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
4374
4375 g_statisticsInterval = ::arg().asNum("statistics-interval");
4376
4377 {
4378 SuffixMatchNode dontThrottleNames;
4379 vector<string> parts;
4380 stringtok(parts, ::arg()["dont-throttle-names"], " ,");
4381 for (const auto &p : parts) {
4382 dontThrottleNames.add(DNSName(p));
4383 }
4384 g_dontThrottleNames.setState(std::move(dontThrottleNames));
4385
4386 NetmaskGroup dontThrottleNetmasks;
4387 stringtok(parts, ::arg()["dont-throttle-netmasks"], " ,");
4388 for (const auto &p : parts) {
4389 dontThrottleNetmasks.addMask(Netmask(p));
4390 }
4391 g_dontThrottleNetmasks.setState(std::move(dontThrottleNetmasks));
4392 }
4393
4394 s_balancingFactor = ::arg().asDouble("distribution-load-factor");
4395 if (s_balancingFactor != 0.0 && s_balancingFactor < 1.0) {
4396 s_balancingFactor = 0.0;
4397 g_log<<Logger::Warning<<"Asked to run with a distribution-load-factor below 1.0, disabling it instead"<<endl;
4398 }
4399
4400 #ifdef SO_REUSEPORT
4401 g_reusePort = ::arg().mustDo("reuseport");
4402 #endif
4403
4404 s_threadInfos.resize(g_numDistributorThreads + g_numWorkerThreads + /* handler */ 1);
4405
4406 if (g_reusePort) {
4407 if (g_weDistributeQueries) {
4408 /* first thread is the handler, then distributors */
4409 for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
4410 auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
4411 auto& tcpSockets = s_threadInfos.at(threadId).tcpSockets;
4412 makeUDPServerSockets(deferredAdds);
4413 makeTCPServerSockets(deferredAdds, tcpSockets);
4414 }
4415 }
4416 else {
4417 /* first thread is the handler, there is no distributor here and workers are accepting queries */
4418 for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
4419 auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
4420 auto& tcpSockets = s_threadInfos.at(threadId).tcpSockets;
4421 makeUDPServerSockets(deferredAdds);
4422 makeTCPServerSockets(deferredAdds, tcpSockets);
4423 }
4424 }
4425 }
4426 else {
4427 std::set<int> tcpSockets;
4428 /* we don't have reuseport so we can only open one socket per
4429 listening addr:port and everyone will listen on it */
4430 makeUDPServerSockets(g_deferredAdds);
4431 makeTCPServerSockets(g_deferredAdds, tcpSockets);
4432
4433 /* every listener (so distributor if g_weDistributeQueries, workers otherwise)
4434 needs to listen to the shared sockets */
4435 if (g_weDistributeQueries) {
4436 /* first thread is the handler, then distributors */
4437 for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
4438 s_threadInfos.at(threadId).tcpSockets = tcpSockets;
4439 }
4440 }
4441 else {
4442 /* first thread is the handler, there is no distributor here and workers are accepting queries */
4443 for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
4444 s_threadInfos.at(threadId).tcpSockets = tcpSockets;
4445 }
4446 }
4447 }
4448
4449 #ifdef NOD_ENABLED
4450 // Setup newly observed domain globals
4451 setupNODGlobal();
4452 #endif /* NOD_ENABLED */
4453
4454 int forks;
4455 for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
4456 if(!fork()) // we are child
4457 break;
4458 }
4459
4460 if(::arg().mustDo("daemon")) {
4461 g_log<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
4462 g_log.toConsole(Logger::Critical);
4463 daemonize();
4464 }
4465 if(Utility::getpid() == 1) {
4466 /* We are running as pid 1, register sigterm and sigint handler
4467
4468 The Linux kernel will handle SIGTERM and SIGINT for all processes, except PID 1.
4469 It assumes that the processes running as pid 1 is an "init" like system.
4470 For years, this was a safe assumption, but containers change that: in
4471 most (all?) container implementations, the application itself is running
4472 as pid 1. This means that sending signals to those applications, will not
4473 be handled by default. Results might be "your container not responding
4474 when asking it to stop", or "ctrl-c not working even when the app is
4475 running in the foreground inside a container".
4476
4477 So TL;DR: If we're running pid 1 (container), we should handle SIGTERM and SIGINT ourselves */
4478
4479 signal(SIGTERM,termIntHandler);
4480 signal(SIGINT,termIntHandler);
4481 }
4482
4483 signal(SIGUSR1,usr1Handler);
4484 signal(SIGUSR2,usr2Handler);
4485 signal(SIGPIPE,SIG_IGN);
4486
4487 checkOrFixFDS();
4488
4489 #ifdef HAVE_LIBSODIUM
4490 if (sodium_init() == -1) {
4491 g_log<<Logger::Error<<"Unable to initialize sodium crypto library"<<endl;
4492 exit(99);
4493 }
4494 #endif
4495
4496 openssl_thread_setup();
4497 openssl_seed();
4498 /* setup rng before chroot */
4499 dns_random_init();
4500
4501 if(::arg()["server-id"].empty()) {
4502 ::arg().set("server-id") = myHostname;
4503 }
4504
4505 int newgid=0;
4506 if(!::arg()["setgid"].empty())
4507 newgid = strToGID(::arg()["setgid"]);
4508 int newuid=0;
4509 if(!::arg()["setuid"].empty())
4510 newuid = strToUID(::arg()["setuid"]);
4511
4512 Utility::dropGroupPrivs(newuid, newgid);
4513
4514 if (!::arg()["chroot"].empty()) {
4515 #ifdef HAVE_SYSTEMD
4516 char *ns;
4517 ns = getenv("NOTIFY_SOCKET");
4518 if (ns != nullptr) {
4519 g_log<<Logger::Error<<"Unable to chroot when running from systemd. Please disable chroot= or set the 'Type' for this service to 'simple'"<<endl;
4520 exit(1);
4521 }
4522 #endif
4523 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
4524 int err = errno;
4525 g_log<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (err)<<", exiting"<<endl;
4526 exit(1);
4527 }
4528 else
4529 g_log<<Logger::Info<<"Chrooted to '"<<::arg()["chroot"]<<"'"<<endl;
4530 }
4531
4532 checkSocketDir();
4533
4534 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
4535 if(!s_pidfname.empty())
4536 unlink(s_pidfname.c_str()); // remove possible old pid file
4537 writePid();
4538
4539 makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
4540
4541 Utility::dropUserPrivs(newuid);
4542 try {
4543 /* we might still have capabilities remaining, for example if we have been started as root
4544 without --setuid (please don't do that) or as an unprivileged user with ambient capabilities
4545 like CAP_NET_BIND_SERVICE.
4546 */
4547 dropCapabilities();
4548 }
4549 catch(const std::exception& e) {
4550 g_log<<Logger::Warning<<e.what()<<endl;
4551 }
4552
4553 startLuaConfigDelayedThreads(delayedLuaThreads, g_luaconfs.getCopy().generation);
4554
4555 makeThreadPipes();
4556
4557 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
4558 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
4559 g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
4560 s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
4561
4562 g_useKernelTimestamp = ::arg().mustDo("protobuf-use-kernel-timestamp");
4563
4564 blacklistStats(StatComponent::API, ::arg()["stats-api-blacklist"]);
4565 blacklistStats(StatComponent::Carbon, ::arg()["stats-carbon-blacklist"]);
4566 blacklistStats(StatComponent::RecControl, ::arg()["stats-rec-control-blacklist"]);
4567 blacklistStats(StatComponent::SNMP, ::arg()["stats-snmp-blacklist"]);
4568
4569 if (::arg().mustDo("snmp-agent")) {
4570 g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
4571 g_snmpAgent->run();
4572 }
4573
4574 int port = ::arg().asNum("udp-source-port-min");
4575 if(port < 1024 || port > 65535){
4576 g_log<<Logger::Error<<"Unable to launch, udp-source-port-min is not a valid port number"<<endl;
4577 exit(99); // this isn't going to fix itself either
4578 }
4579 s_minUdpSourcePort = port;
4580 port = ::arg().asNum("udp-source-port-max");
4581 if(port < 1024 || port > 65535 || port < s_minUdpSourcePort){
4582 g_log<<Logger::Error<<"Unable to launch, udp-source-port-max is not a valid port number or is smaller than udp-source-port-min"<<endl;
4583 exit(99); // this isn't going to fix itself either
4584 }
4585 s_maxUdpSourcePort = port;
4586 std::vector<string> parts {};
4587 stringtok(parts, ::arg()["udp-source-port-avoid"], ", ");
4588 for (const auto &part : parts)
4589 {
4590 port = std::stoi(part);
4591 if(port < 1024 || port > 65535){
4592 g_log<<Logger::Error<<"Unable to launch, udp-source-port-avoid contains an invalid port number: "<<part<<endl;
4593 exit(99); // this isn't going to fix itself either
4594 }
4595 s_avoidUdpSourcePorts.insert(port);
4596 }
4597
4598 unsigned int currentThreadId = 1;
4599 const auto cpusMap = parseCPUMap();
4600
4601 if(g_numThreads == 1) {
4602 g_log<<Logger::Warning<<"Operating unthreaded"<<endl;
4603 #ifdef HAVE_SYSTEMD
4604 sd_notify(0, "READY=1");
4605 #endif
4606
4607 /* This thread handles the web server, carbon, statistics and the control channel */
4608 auto& handlerInfos = s_threadInfos.at(0);
4609 handlerInfos.isHandler = true;
4610 handlerInfos.thread = std::thread(recursorThread, 0, "main");
4611
4612 setCPUMap(cpusMap, currentThreadId, pthread_self());
4613
4614 auto& infos = s_threadInfos.at(currentThreadId);
4615 infos.isListener = true;
4616 infos.isWorker = true;
4617 recursorThread(currentThreadId++, "worker");
4618
4619 handlerInfos.thread.join();
4620 }
4621 else {
4622
4623
4624 if (g_weDistributeQueries) {
4625 for(unsigned int n=0; n < g_numDistributorThreads; ++n) {
4626 auto& infos = s_threadInfos.at(currentThreadId + n);
4627 infos.isListener = true;
4628 }
4629 }
4630 for(unsigned int n=0; n < g_numWorkerThreads; ++n) {
4631 auto& infos = s_threadInfos.at(currentThreadId + (g_weDistributeQueries ? g_numDistributorThreads : 0) + n);
4632 infos.isListener = !g_weDistributeQueries;
4633 infos.isWorker = true;
4634 }
4635
4636 if (g_weDistributeQueries) {
4637 g_log<<Logger::Warning<<"Launching "<< g_numDistributorThreads <<" distributor threads"<<endl;
4638 for(unsigned int n=0; n < g_numDistributorThreads; ++n) {
4639 auto& infos = s_threadInfos.at(currentThreadId);
4640 infos.thread = std::thread(recursorThread, currentThreadId++, "distr");
4641 setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
4642 }
4643 }
4644
4645 g_log<<Logger::Warning<<"Launching "<< g_numWorkerThreads <<" worker threads"<<endl;
4646
4647 for(unsigned int n=0; n < g_numWorkerThreads; ++n) {
4648 auto& infos = s_threadInfos.at(currentThreadId);
4649 infos.thread = std::thread(recursorThread, currentThreadId++, "worker");
4650 setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
4651 }
4652
4653 #ifdef HAVE_SYSTEMD
4654 sd_notify(0, "READY=1");
4655 #endif
4656
4657 /* This thread handles the web server, carbon, statistics and the control channel */
4658 auto& infos = s_threadInfos.at(0);
4659 infos.isHandler = true;
4660 infos.thread = std::thread(recursorThread, 0, "web+stat");
4661
4662 for (auto & ti : s_threadInfos) {
4663 ti.thread.join();
4664 }
4665 }
4666
4667 #ifdef HAVE_PROTOBUF
4668 google::protobuf::ShutdownProtobufLibrary();
4669 #endif /* HAVE_PROTOBUF */
4670 return 0;
4671 }
4672
4673 static void* recursorThread(unsigned int n, const string& threadName)
4674 try
4675 {
4676 t_id=n;
4677 auto& threadInfo = s_threadInfos.at(t_id);
4678
4679 static string threadPrefix = "pdns-r/";
4680 setThreadName(threadPrefix + threadName);
4681
4682 SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
4683 SyncRes::setDomainMap(g_initialDomainMap);
4684 t_allowFrom = g_initialAllowFrom;
4685 t_udpclientsocks = std::unique_ptr<UDPClientSocks>(new UDPClientSocks());
4686 t_tcpClientCounts = std::unique_ptr<tcpClientCounts_t>(new tcpClientCounts_t());
4687 primeHints();
4688
4689 t_packetCache = std::unique_ptr<RecursorPacketCache>(new RecursorPacketCache());
4690
4691 g_log<<Logger::Warning<<"Done priming cache with root hints"<<endl;
4692
4693 #ifdef NOD_ENABLED
4694 if (threadInfo.isWorker)
4695 setupNODThread();
4696 #endif /* NOD_ENABLED */
4697
4698 /* the listener threads handle TCP queries */
4699 if(threadInfo.isWorker || threadInfo.isListener) {
4700 try {
4701 if(!::arg()["lua-dns-script"].empty()) {
4702 t_pdl = std::make_shared<RecursorLua4>();
4703 t_pdl->loadFile(::arg()["lua-dns-script"]);
4704 g_log<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
4705 }
4706 }
4707 catch(std::exception &e) {
4708 g_log<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
4709 _exit(99);
4710 }
4711 }
4712
4713 unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
4714 if(ringsize) {
4715 t_remotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4716 if(g_weDistributeQueries)
4717 t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / g_numDistributorThreads);
4718 else
4719 t_remotes->set_capacity(ringsize);
4720 t_servfailremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4721 t_servfailremotes->set_capacity(ringsize);
4722 t_bogusremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4723 t_bogusremotes->set_capacity(ringsize);
4724 t_largeanswerremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4725 t_largeanswerremotes->set_capacity(ringsize);
4726 t_timeouts = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4727 t_timeouts->set_capacity(ringsize);
4728
4729 t_queryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4730 t_queryring->set_capacity(ringsize);
4731 t_servfailqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4732 t_servfailqueryring->set_capacity(ringsize);
4733 t_bogusqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4734 t_bogusqueryring->set_capacity(ringsize);
4735 }
4736
4737 MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
4738 threadInfo.mt = MT.get();
4739
4740 #ifdef HAVE_PROTOBUF
4741 /* start protobuf export threads if needed */
4742 auto luaconfsLocal = g_luaconfs.getLocal();
4743 checkProtobufExport(luaconfsLocal);
4744 checkOutgoingProtobufExport(luaconfsLocal);
4745 #endif /* HAVE_PROTOBUF */
4746 #ifdef HAVE_FSTRM
4747 checkFrameStreamExport(luaconfsLocal);
4748 #endif
4749
4750 PacketID pident;
4751
4752 t_fdm=getMultiplexer();
4753
4754 RecursorWebServer *rws = nullptr;
4755
4756 if(threadInfo.isHandler) {
4757 if(::arg().mustDo("webserver")) {
4758 g_log<<Logger::Warning << "Enabling web server" << endl;
4759 try {
4760 rws = new RecursorWebServer(t_fdm);
4761 }
4762 catch(PDNSException &e) {
4763 g_log<<Logger::Error<<"Exception: "<<e.reason<<endl;
4764 exit(99);
4765 }
4766 }
4767 g_log<<Logger::Info<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
4768 }
4769 else {
4770
4771 t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
4772 t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
4773
4774 if (threadInfo.isListener) {
4775 if (g_reusePort) {
4776 /* then every listener has its own FDs */
4777 for(const auto& deferred : threadInfo.deferredAdds) {
4778 t_fdm->addReadFD(deferred.first, deferred.second);
4779 }
4780 }
4781 else {
4782 /* otherwise all listeners are listening on the same ones */
4783 for(const auto& deferred : g_deferredAdds) {
4784 t_fdm->addReadFD(deferred.first, deferred.second);
4785 }
4786 }
4787 }
4788 }
4789
4790 registerAllStats();
4791
4792 if(threadInfo.isHandler) {
4793 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
4794 }
4795
4796 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
4797
4798 bool listenOnTCP(true);
4799
4800 time_t last_stat = 0;
4801 time_t last_carbon=0, last_lua_maintenance=0;
4802 time_t carbonInterval=::arg().asNum("carbon-interval");
4803 time_t luaMaintenanceInterval=::arg().asNum("lua-maintenance-interval");
4804 counter.store(0); // used to periodically execute certain tasks
4805
4806 while (!RecursorControlChannel::stop) {
4807 while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
4808
4809 if(!(counter%500)) {
4810 MT->makeThread(houseKeeping, 0);
4811 }
4812
4813 if(!(counter%55)) {
4814 typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
4815 expired_t expired=t_fdm->getTimeouts(g_now);
4816
4817 for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
4818 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
4819 if(g_logCommonErrors)
4820 g_log<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toStringWithPort() <<endl;
4821 t_fdm->removeReadFD(i->first);
4822 }
4823 }
4824
4825 counter++;
4826
4827 if(threadInfo.isHandler) {
4828 if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
4829 doStats();
4830 last_stat = g_now.tv_sec;
4831 }
4832
4833 Utility::gettimeofday(&g_now, 0);
4834
4835 if((g_now.tv_sec - last_carbon) >= carbonInterval) {
4836 MT->makeThread(doCarbonDump, 0);
4837 last_carbon = g_now.tv_sec;
4838 }
4839 }
4840 if (t_pdl != nullptr) {
4841 // lua-dns-script directive is present, call the maintenance callback if needed
4842 /* remember that the listener threads handle TCP queries */
4843 if (threadInfo.isWorker || threadInfo.isListener) {
4844 // Only on threads processing queries
4845 if(g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
4846 t_pdl->maintenance();
4847 last_lua_maintenance = g_now.tv_sec;
4848 }
4849 }
4850 }
4851
4852 t_fdm->run(&g_now);
4853 // 'run' updates g_now for us
4854
4855 if(threadInfo.isListener) {
4856 if(listenOnTCP) {
4857 if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
4858 for(const auto fd : threadInfo.tcpSockets) {
4859 t_fdm->removeReadFD(fd);
4860 }
4861 listenOnTCP=false;
4862 }
4863 }
4864 else {
4865 if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
4866 for(const auto fd : threadInfo.tcpSockets) {
4867 t_fdm->addReadFD(fd, handleNewTCPQuestion);
4868 }
4869 listenOnTCP=true;
4870 }
4871 }
4872 }
4873 }
4874 delete rws;
4875 delete t_fdm;
4876 return 0;
4877 }
4878 catch(PDNSException &ae) {
4879 g_log<<Logger::Error<<"Exception: "<<ae.reason<<endl;
4880 return 0;
4881 }
4882 catch(std::exception &e) {
4883 g_log<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
4884 return 0;
4885 }
4886 catch(...) {
4887 g_log<<Logger::Error<<"any other exception in main: "<<endl;
4888 return 0;
4889 }
4890
4891
4892 int main(int argc, char **argv)
4893 {
4894 g_argc = argc;
4895 g_argv = argv;
4896 g_stats.startupTime=time(0);
4897 Utility::srandom();
4898 versionSetProduct(ProductRecursor);
4899 reportBasicTypes();
4900 reportOtherTypes();
4901
4902 int ret = EXIT_SUCCESS;
4903
4904 try {
4905 ::arg().set("stack-size","stack size per mthread")="200000";
4906 ::arg().set("soa-minimum-ttl","Don't change")="0";
4907 ::arg().set("no-shuffle","Don't change")="off";
4908 ::arg().set("local-port","port to listen on")="53";
4909 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
4910 ::arg().setSwitch("non-local-bind", "Enable binding to non-local addresses by using FREEBIND / BINDANY socket options")="no";
4911 ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
4912 ::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
4913 ::arg().set("dnssec-log-bogus", "Log DNSSEC bogus validations")="no";
4914 ::arg().set("signature-inception-skew", "Allow the signature inception to be off by this number of seconds")="60";
4915 ::arg().set("daemon","Operate as a daemon")="no";
4916 ::arg().setSwitch("write-pid","Write a PID file")="yes";
4917 ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="6";
4918 ::arg().set("disable-syslog","Disable logging to syslog, useful when running inside a supervisor that logs stdout")="no";
4919 ::arg().set("log-timestamp","Print timestamps in log lines, useful to disable when running with a tool that timestamps stdout already")="yes";
4920 ::arg().set("log-common-errors","If we should log rather common errors")="no";
4921 ::arg().set("chroot","switch to chroot jail")="";
4922 ::arg().set("setgid","If set, change group id to this gid for more security"
4923 #ifdef HAVE_SYSTEMD
4924 #define SYSTEMD_SETID_MSG ". When running inside systemd, use the User and Group settings in the unit-file!"
4925 SYSTEMD_SETID_MSG
4926 #endif
4927 )="";
4928 ::arg().set("setuid","If set, change user id to this uid for more security"
4929 #ifdef HAVE_SYSTEMD
4930 SYSTEMD_SETID_MSG
4931 #endif
4932 )="";
4933 ::arg().set("network-timeout", "Wait this number of milliseconds for network i/o")="1500";
4934 ::arg().set("threads", "Launch this number of threads")="2";
4935 ::arg().set("distributor-threads", "Launch this number of distributor threads, distributing queries to other threads")="0";
4936 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs!
4937 ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
4938 ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = "";
4939 ::arg().set("api-key", "Static pre-shared authentication key for access to the REST API") = "";
4940 ::arg().setSwitch("webserver", "Start a webserver (for REST API)") = "no";
4941 ::arg().set("webserver-address", "IP Address of webserver to listen on") = "127.0.0.1";
4942 ::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
4943 ::arg().set("webserver-password", "Password required for accessing the webserver") = "";
4944 ::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="127.0.0.1,::1";
4945 ::arg().set("webserver-loglevel", "Amount of logging in the webserver (none, normal, detailed)") = "normal";
4946 ::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
4947 ::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server IP address")="";
4948 ::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
4949 ::arg().set("carbon-namespace", "If set overwrites the first part of the carbon string")="pdns";
4950 ::arg().set("carbon-instance", "If set overwrites the the instance name default")="recursor";
4951
4952 ::arg().set("statistics-interval", "Number of seconds between printing of recursor statistics, 0 to disable")="1800";
4953 ::arg().set("quiet","Suppress logging of questions and answers")="";
4954 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
4955 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
4956 ::arg().set("socket-owner","Owner of socket")="";
4957 ::arg().set("socket-group","Group of socket")="";
4958 ::arg().set("socket-mode", "Permissions for socket")="";
4959
4960 ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+"/pdns-recursor when unset and not chrooted" )="";
4961 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
4962 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
4963 ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
4964 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
4965 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
4966 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
4967 ::arg().set("max-concurrent-requests-per-tcp-connection", "Maximum number of requests handled concurrently per TCP connection") = "10";
4968 ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
4969 ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
4970 ::arg().set("dont-throttle-names", "Do not throttle nameservers with this name or suffix")="";
4971 ::arg().set("dont-throttle-netmasks", "Do not throttle nameservers with this IP netmask")="";
4972 ::arg().set("hint-file", "If set, load root hints from this file")="";
4973 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
4974 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
4975 ::arg().set("max-cache-bogus-ttl", "maximum number of seconds to keep a Bogus (positive or negative) cached entry in memory")="3600";
4976 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
4977 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
4978 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
4979 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
4980 ::arg().set("server-id", "Returned when queried for 'id.server' TXT or NSID, defaults to hostname, set custom or 'disabled'")="";
4981 ::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
4982 ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
4983 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
4984 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
4985 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
4986 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=DONT_QUERY;
4987 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
4988 ::arg().set("max-tcp-queries-per-connection", "If set, maximum number of TCP queries in a TCP connection")="0";
4989 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
4990 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
4991 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
4992 ::arg().set("lua-config-file", "More powerful configuration options")="";
4993 ::arg().setSwitch("allow-trust-anchor-query", "Allow queries for trustanchor.server CH TXT and negativetrustanchor.server CH TXT")="no";
4994
4995 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
4996 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
4997 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
4998 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
4999 ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
5000 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
5001 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="yes";
5002 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
5003 ::arg().set("lua-maintenance-interval", "Number of seconds between calls to the lua user defined maintenance() function")="1";
5004 ::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
5005 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
5006 ::arg().set("ecs-ipv4-bits", "Number of bits of IPv4 address to pass for EDNS Client Subnet")="24";
5007 ::arg().set("ecs-ipv4-cache-bits", "Maximum number of bits of IPv4 mask to cache ECS response")="24";
5008 ::arg().set("ecs-ipv6-bits", "Number of bits of IPv6 address to pass for EDNS Client Subnet")="56";
5009 ::arg().set("ecs-ipv6-cache-bits", "Maximum number of bits of IPv6 mask to cache ECS response")="56";
5010 ::arg().set("ecs-minimum-ttl-override", "Set under adverse conditions, a minimum TTL for records in ECS-specific answers")="0";
5011 ::arg().set("ecs-cache-limit-ttl", "Minimum TTL to cache ECS response")="0";
5012 ::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
5013 ::arg().set("ecs-add-for", "List of client netmasks for which EDNS Client Subnet will be added")="0.0.0.0/0, ::/0, " LOCAL_NETS_INVERSE;
5014 ::arg().set("ecs-scope-zero-address", "Address to send to whitelisted authoritative servers for incoming queries with ECS prefix-length source of 0")="";
5015 ::arg().setSwitch( "use-incoming-edns-subnet", "Pass along received EDNS Client Subnet information")="no";
5016 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads")="yes";
5017 ::arg().setSwitch( "root-nx-trust", "If set, believe that an NXDOMAIN from the root means the TLD does not exist")="yes";
5018 ::arg().setSwitch( "any-to-tcp","Answer ANY queries with tc=1, shunting to TCP" )="no";
5019 ::arg().setSwitch( "lowercase-outgoing","Force outgoing questions to lowercase")="no";
5020 ::arg().setSwitch("gettag-needs-edns-options", "If EDNS Options should be extracted before calling the gettag() hook")="no";
5021 ::arg().set("udp-truncation-threshold", "Maximum UDP response size before we truncate")="1232";
5022 ::arg().set("edns-outgoing-bufsize", "Outgoing EDNS buffer size")="1232";
5023 ::arg().set("minimum-ttl-override", "Set under adverse conditions, a minimum TTL")="0";
5024 ::arg().set("max-qperq", "Maximum outgoing queries per query")="60";
5025 ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
5026 ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
5027 ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
5028 ::arg().set("protobuf-use-kernel-timestamp", "Compute the latency of queries in protobuf messages by using the timestamp set by the kernel when the query was received (when available)")="";
5029 ::arg().set("distribution-pipe-buffer-size", "Size in bytes of the internal buffer of the pipe used by the distributor to pass incoming queries to a worker thread")="0";
5030
5031 ::arg().set("include-dir","Include *.conf files from this directory")="";
5032 ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
5033
5034 ::arg().setSwitch("reuseport","Enable SO_REUSEPORT allowing multiple recursors processes to listen to 1 address")="no";
5035
5036 ::arg().setSwitch("snmp-agent", "If set, register as an SNMP agent")="no";
5037 ::arg().set("snmp-master-socket", "If set and snmp-agent is set, the socket to use to register to the SNMP master")="";
5038
5039 std::string defaultBlacklistedStats = "cache-bytes, packetcache-bytes, special-memory-usage";
5040 for (size_t idx = 0; idx < 32; idx++) {
5041 defaultBlacklistedStats += ", ecs-v4-response-bits-" + std::to_string(idx + 1);
5042 }
5043 for (size_t idx = 0; idx < 128; idx++) {
5044 defaultBlacklistedStats += ", ecs-v6-response-bits-" + std::to_string(idx + 1);
5045 }
5046 ::arg().set("stats-api-blacklist", "List of statistics that are disabled when retrieving the complete list of statistics via the API")=defaultBlacklistedStats;
5047 ::arg().set("stats-carbon-blacklist", "List of statistics that are prevented from being exported via Carbon")=defaultBlacklistedStats;
5048 ::arg().set("stats-rec-control-blacklist", "List of statistics that are prevented from being exported via rec_control get-all")=defaultBlacklistedStats;
5049 ::arg().set("stats-snmp-blacklist", "List of statistics that are prevented from being exported via SNMP")=defaultBlacklistedStats;
5050
5051 ::arg().set("tcp-fast-open", "Enable TCP Fast Open support on the listening sockets, using the supplied numerical value as the queue size")="0";
5052 ::arg().set("nsec3-max-iterations", "Maximum number of iterations allowed for an NSEC3 record")="2500";
5053
5054 ::arg().set("cpu-map", "Thread to CPU mapping, space separated thread-id=cpu1,cpu2..cpuN pairs")="";
5055
5056 ::arg().setSwitch("log-rpz-changes", "Log additions and removals to RPZ zones at Info level")="no";
5057
5058 ::arg().set("xpf-allow-from","XPF information is only processed from these subnets")="";
5059 ::arg().set("xpf-rr-code","XPF option code to use")="0";
5060
5061 ::arg().set("proxy-protocol-from", "A Proxy Protocol header is only allowed from these subnets")="";
5062 ::arg().set("proxy-protocol-maximum-size", "The maximum size of a proxy protocol payload, including the TLV values")="512";
5063
5064 ::arg().set("dns64-prefix", "DNS64 prefix")="";
5065
5066 ::arg().set("udp-source-port-min", "Minimum UDP port to bind on")="1024";
5067 ::arg().set("udp-source-port-max", "Maximum UDP port to bind on")="65535";
5068 ::arg().set("udp-source-port-avoid", "List of comma separated UDP port number to avoid")="11211";
5069 ::arg().set("rng", "Specify random number generator to use. Valid values are auto,sodium,openssl,getrandom,arc4random,urandom.")="auto";
5070 ::arg().set("public-suffix-list-file", "Path to the Public Suffix List file, if any")="";
5071 ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
5072
5073 ::arg().setSwitch("qname-minimization", "Use Query Name Minimization")="yes";
5074 ::arg().setSwitch("nothing-below-nxdomain", "When an NXDOMAIN exists in cache for a name with fewer labels than the qname, send NXDOMAIN without doing a lookup (see RFC 8020)")="dnssec";
5075 ::arg().set("max-generate-steps", "Maximum number of $GENERATE steps when loading a zone from a file")="0";
5076 ::arg().set("cache-shards", "Number of shards in the record cache")="1024";
5077
5078 #ifdef NOD_ENABLED
5079 ::arg().set("new-domain-tracking", "Track newly observed domains (i.e. never seen before).")="no";
5080 ::arg().set("new-domain-log", "Log newly observed domains.")="yes";
5081 ::arg().set("new-domain-lookup", "Perform a DNS lookup newly observed domains as a subdomain of the configured domain")="";
5082 ::arg().set("new-domain-history-dir", "Persist new domain tracking data here to persist between restarts")=string(NODCACHEDIR)+"/nod";
5083 ::arg().set("new-domain-whitelist", "List of domains (and implicitly all subdomains) which will never be considered a new domain")="";
5084 ::arg().set("new-domain-db-size", "Size of the DB used to track new domains in terms of number of cells. Defaults to 67108864")="67108864";
5085 ::arg().set("new-domain-pb-tag", "If protobuf is configured, the tag to use for messages containing newly observed domains. Defaults to 'pdns-nod'")="pdns-nod";
5086 ::arg().set("unique-response-tracking", "Track unique responses (tuple of query name, type and RR).")="no";
5087 ::arg().set("unique-response-log", "Log unique responses")="yes";
5088 ::arg().set("unique-response-history-dir", "Persist unique response tracking data here to persist between restarts")=string(NODCACHEDIR)+"/udr";
5089 ::arg().set("unique-response-db-size", "Size of the DB used to track unique responses in terms of number of cells. Defaults to 67108864")="67108864";
5090 ::arg().set("unique-response-pb-tag", "If protobuf is configured, the tag to use for messages containing unique DNS responses. Defaults to 'pdns-udr'")="pdns-udr";
5091 #endif /* NOD_ENABLED */
5092 ::arg().setCmd("help","Provide a helpful message");
5093 ::arg().setCmd("version","Print version string");
5094 ::arg().setCmd("config","Output blank configuration");
5095 ::arg().setDefaults();
5096 g_log.toConsole(Logger::Info);
5097 ::arg().laxParse(argc,argv); // do a lax parse
5098
5099 string configname=::arg()["config-dir"]+"/recursor.conf";
5100 if(::arg()["config-name"]!="") {
5101 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
5102 s_programname+="-"+::arg()["config-name"];
5103 }
5104 cleanSlashes(configname);
5105
5106 if(!::arg().getCommands().empty()) {
5107 cerr<<"Fatal: non-option";
5108 if (::arg().getCommands().size() > 1) {
5109 cerr<<"s";
5110 }
5111 cerr<<" (";
5112 bool first = true;
5113 for (const auto& c : ::arg().getCommands()) {
5114 if (!first) {
5115 cerr<<", ";
5116 }
5117 first = false;
5118 cerr<<c;
5119 }
5120 cerr<<") on the command line, perhaps a '--setting=123' statement missed the '='?"<<endl;
5121 exit(99);
5122 }
5123
5124 if(::arg().mustDo("config")) {
5125 cout<<::arg().configstring(false, true);
5126 exit(0);
5127 }
5128
5129 if(!::arg().file(configname.c_str()))
5130 g_log<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
5131
5132 ::arg().parse(argc,argv);
5133
5134 if( !::arg()["chroot"].empty() && !::arg()["api-config-dir"].empty() ) {
5135 g_log<<Logger::Error<<"Using chroot and enabling the API is not possible"<<endl;
5136 exit(EXIT_FAILURE);
5137 }
5138
5139 if (::arg()["socket-dir"].empty()) {
5140 if (::arg()["chroot"].empty())
5141 ::arg().set("socket-dir") = std::string(LOCALSTATEDIR) + "/pdns-recursor";
5142 else
5143 ::arg().set("socket-dir") = "/";
5144 }
5145
5146 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
5147
5148 if(::arg().asNum("threads")==1) {
5149 if (::arg().mustDo("pdns-distributes-queries")) {
5150 g_log<<Logger::Warning<<"Only one thread, no need to distribute queries ourselves"<<endl;
5151 ::arg().set("pdns-distributes-queries")="no";
5152 }
5153 }
5154
5155 if(::arg().mustDo("pdns-distributes-queries") && ::arg().asNum("distributor-threads") <= 0) {
5156 g_log<<Logger::Warning<<"Asked to run with pdns-distributes-queries set but no distributor threads, raising to 1"<<endl;
5157 ::arg().set("distributor-threads")="1";
5158 }
5159
5160 if (!::arg().mustDo("pdns-distributes-queries")) {
5161 ::arg().set("distributor-threads")="0";
5162 }
5163
5164 if(::arg().mustDo("help")) {
5165 cout<<"syntax:"<<endl<<endl;
5166 cout<<::arg().helpstring(::arg()["help"])<<endl;
5167 exit(0);
5168 }
5169 if(::arg().mustDo("version")) {
5170 showProductVersion();
5171 showBuildConfiguration();
5172 exit(0);
5173 }
5174
5175 s_RC = std::unique_ptr<MemRecursorCache>(new MemRecursorCache(::arg().asNum("cache-shards")));
5176
5177 Logger::Urgency logUrgency = (Logger::Urgency)::arg().asNum("loglevel");
5178
5179 if (logUrgency < Logger::Error)
5180 logUrgency = Logger::Error;
5181 if(!g_quiet && logUrgency < Logger::Info) { // Logger::Info=6, Logger::Debug=7
5182 logUrgency = Logger::Info; // if you do --quiet=no, you need Info to also see the query log
5183 }
5184 g_log.setLoglevel(logUrgency);
5185 g_log.toConsole(logUrgency);
5186
5187 serviceMain(argc, argv);
5188 }
5189 catch(PDNSException &ae) {
5190 g_log<<Logger::Error<<"Exception: "<<ae.reason<<endl;
5191 ret=EXIT_FAILURE;
5192 }
5193 catch(std::exception &e) {
5194 g_log<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
5195 ret=EXIT_FAILURE;
5196 }
5197 catch(...) {
5198 g_log<<Logger::Error<<"any other exception in main: "<<endl;
5199 ret=EXIT_FAILURE;
5200 }
5201
5202 return ret;
5203 }