]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
Revert "Bail out when no Context library is available"
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <netdb.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29 #ifdef HAVE_BOOST_CONTAINER_FLAT_SET_HPP
30 #include <boost/container/flat_set.hpp>
31 #endif
32 #include "ws-recursor.hh"
33 #include <thread>
34 #include "threadname.hh"
35 #include "recpacketcache.hh"
36 #include "utility.hh"
37 #include "dns_random.hh"
38 #ifdef HAVE_LIBSODIUM
39 #include <sodium.h>
40 #endif
41 #include "opensslsigners.hh"
42 #include <iostream>
43 #include <errno.h>
44 #include <boost/static_assert.hpp>
45 #include <map>
46 #include <set>
47 #include "recursor_cache.hh"
48 #include "cachecleaner.hh"
49 #include <stdio.h>
50 #include <signal.h>
51 #include <stdlib.h>
52 #include "misc.hh"
53 #include "mtasker.hh"
54 #include <utility>
55 #include "arguments.hh"
56 #include "syncres.hh"
57 #include <fcntl.h>
58 #include <fstream>
59 #include "sortlist.hh"
60 #include "sstuff.hh"
61 #include <boost/tuple/tuple.hpp>
62 #include <boost/tuple/tuple_comparison.hpp>
63 #include <boost/shared_array.hpp>
64 #include <boost/function.hpp>
65 #include <boost/algorithm/string.hpp>
66 #ifdef MALLOC_TRACE
67 #include "malloctrace.hh"
68 #endif
69 #include <netinet/tcp.h>
70 #include "capabilities.hh"
71 #include "dnsparser.hh"
72 #include "dnswriter.hh"
73 #include "dnsrecords.hh"
74 #include "zoneparser-tng.hh"
75 #include "rec_channel.hh"
76 #include "logger.hh"
77 #include "iputils.hh"
78 #include "mplexer.hh"
79 #include "config.h"
80 #include "lua-recursor4.hh"
81 #include "version.hh"
82 #include "responsestats.hh"
83 #include "secpoll-recursor.hh"
84 #include "dnsname.hh"
85 #include "filterpo.hh"
86 #include "rpzloader.hh"
87 #include "validate-recursor.hh"
88 #include "rec-lua-conf.hh"
89 #include "ednsoptions.hh"
90 #include "gettime.hh"
91 #include "pubsuffix.hh"
92 #ifdef NOD_ENABLED
93 #include "nod.hh"
94 #endif /* NOD_ENABLED */
95
96 #include "rec-protobuf.hh"
97 #include "rec-snmp.hh"
98
99 #ifdef HAVE_SYSTEMD
100 #include <systemd/sd-daemon.h>
101 #endif
102
103 #include "namespaces.hh"
104
105 #ifdef HAVE_PROTOBUF
106 #include "uuid-utils.hh"
107 #endif /* HAVE_PROTOBUF */
108
109 #include "xpf.hh"
110
// Number of open incoming TCP connections per client. Incremented by the
// TCPConnection constructor and decremented by its destructor. The comparator
// is ComboAddress::addressOnlyLessThan, so entries are presumably keyed on the
// IP address regardless of the source port.
typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;

// Per-thread state: every recursor thread has its own instance of these.
static thread_local std::shared_ptr<RecursorLua4> t_pdl;
// Index of this thread in s_threadInfos; 0 for the handler thread (and some helper threads).
static thread_local unsigned int t_id = 0;
static thread_local std::shared_ptr<Regex> t_traceRegex;
static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
#ifdef HAVE_PROTOBUF
// Running protobuf loggers. The *Generation values record the LuaConfigItems
// generation they were started for, so a configuration reload can be detected.
static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
static thread_local uint64_t t_protobufServersGeneration;
static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
static thread_local uint64_t t_outgoingProtobufServersGeneration;
#endif /* HAVE_PROTOBUF */

#ifdef HAVE_FSTRM
// Running dnstap framestream loggers and the configuration generation they belong to.
static thread_local std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> t_frameStreamServers{nullptr};
static thread_local uint64_t t_frameStreamServersGeneration;
#endif /* HAVE_FSTRM */

thread_local std::unique_ptr<MT_t> MT; // the big MTasker
thread_local std::unique_ptr<MemRecursorCache> t_RC;
thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
// This thread's FD multiplexer (non-owning pointer).
thread_local FDMultiplexer* t_fdm{nullptr};
// Rings of recently seen remotes / queries, used for statistics.
thread_local std::unique_ptr<addrringbuf_t> t_remotes, t_servfailremotes, t_largeanswerremotes, t_bogusremotes;
thread_local std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > > t_queryring, t_servfailqueryring, t_bogusqueryring;
thread_local std::shared_ptr<NetmaskGroup> t_allowFrom;
#ifdef NOD_ENABLED
thread_local std::shared_ptr<nod::NODDB> t_nodDBp;
thread_local std::shared_ptr<nod::UniqueResponseDB> t_udrDBp;
#endif /* NOD_ENABLED */
__thread struct timeval g_now; // timestamp, updated (too) frequently

// List of (file descriptor, callback) pairs. Presumably these are registered
// with a thread's multiplexer once it has started — confirm against the users.
typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
143
// for communicating with our threads
// effectively readonly after startup
struct RecThreadInfo
{
  // Pipe file descriptors used to talk to / from this thread.
  // All default to -1 (presumably "not opened").
  struct ThreadPipeSet
  {
    int writeToThread{-1};
    int readToThread{-1};
    int writeFromThread{-1};
    int readFromThread{-1};
    int writeQueriesToThread{-1}; // this one is non-blocking
    int readQueriesToThread{-1};
  };

  /* FDs of the TCP sockets this thread is listening on.
     These FDs are also in deferredAdds when we have one socket per
     listener, and in g_deferredAdds otherwise. */
  std::set<int> tcpSockets;
  /* FDs of the listening sockets if we have one socket per listener
     (with reuseport); otherwise all listeners share the same FDs and
     g_deferredAdds is used instead */
  deferredAdd_t deferredAdds;
  struct ThreadPipeSet pipes;
  std::thread thread;
  // this thread's MTasker (non-owning)
  MT_t* mt{nullptr};
  uint64_t numberOfDistributedQueries{0};
  /* handle the web server, carbon, statistics and the control channel */
  bool isHandler{false};
  /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
  bool isListener{false};
  /* process queries */
  bool isWorker{false};
};
178
/* first we have the handler thread, t_id == 0 (some other
   helper threads like SNMP might have t_id == 0 as well)
   then the distributor threads if any
   and finally the workers */
static std::vector<RecThreadInfo> s_threadInfos;
/* without reuseport, all listeners share the same sockets */
static deferredAdd_t g_deferredAdds;

typedef vector<int> tcpListenSockets_t;
typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now

// Wildcard addresses used by getQueryLocalAddress() when no query-local-address is configured.
static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
// Candidate local addresses for outgoing queries; getQueryLocalAddress() picks one at random.
static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
static AtomicCounter counter;
static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads need this to be set up
static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new threads need to be set up with this
static NetmaskGroup g_XPFAcl;
static size_t g_tcpMaxQueriesPerConn;
static size_t s_maxUDPQueriesPerRound;
static uint64_t g_latencyStatSize;
static uint32_t g_disthashseed;
static unsigned int g_maxTCPPerClient;
static unsigned int g_maxMThreads;
static unsigned int g_numDistributorThreads;
static unsigned int g_numWorkerThreads;
static int g_tcpTimeout;
static uint16_t g_udpTruncationThreshold;
static uint16_t g_xpfRRCode{0};
static std::atomic<bool> statsWanted;
static std::atomic<bool> g_quiet;
static bool g_logCommonErrors;
static bool g_anyToTcp;
static bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
static bool g_reusePort{false};
static bool g_gettagNeedsEDNSOptions{false};
static time_t g_statisticsInterval;
static bool g_useIncomingECS;
static bool g_useKernelTimestamp;
std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
#ifdef NOD_ENABLED
static bool g_nodEnabled;
static DNSName g_nodLookupDomain;
static bool g_nodLog;
static SuffixMatchNode g_nodDomainWL;
static std::string g_nod_pbtag;
static bool g_udrEnabled;
static bool g_udrLog;
static std::string g_udr_pbtag;
#endif /* NOD_ENABLED */
// Outgoing UDP source ports: makeClientSocket() draws a random port in
// [s_minUdpSourcePort, s_maxUdpSourcePort], skipping s_avoidUdpSourcePorts.
#ifdef HAVE_BOOST_CONTAINER_FLAT_SET_HPP
static boost::container::flat_set<uint16_t> s_avoidUdpSourcePorts;
#else
static std::set<uint16_t> s_avoidUdpSourcePorts;
#endif
static uint16_t s_minUdpSourcePort;
static uint16_t s_maxUdpSourcePort;
static double s_balancingFactor;

RecursorControlChannel s_rcc; // only active in the handler thread
RecursorStats g_stats;
string s_programname="pdns_recursor";
// Path writePid() appends the process id to.
string s_pidfname;
bool g_lowercaseOutgoing;
// Timeout passed to MT->waitEvent() for network I/O, in milliseconds.
unsigned int g_networkTimeoutMsec;
unsigned int g_numThreads;
uint16_t g_outgoingEDNSBufsize;
bool g_logRPZChanges{false};

// Used in the Syncres to not throttle certain servers
GlobalStateHolder<SuffixMatchNode> g_dontThrottleNames;
GlobalStateHolder<NetmaskGroup> g_dontThrottleNetmasks;
252
// Local / private IPv4 and IPv6 networks, as a comma-separated netmask list.
#define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
// The same list as LOCAL_NETS with every entry negated ('!').
#define LOCAL_NETS_INVERSE "!127.0.0.0/8, !10.0.0.0/8, !100.64.0.0/10, !169.254.0.0/16, !192.168.0.0/16, !172.16.0.0/12, !::1/128, !fc00::/7, !fe80::/10"
// Bad Nets taken from both:
// http://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
// and
// http://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
// keeping the networks that are not to be considered valid destinations
#define BAD_NETS "0.0.0.0/8, 192.0.0.0/24, 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24, 240.0.0.0/4, ::/96, ::ffff:0:0/96, 100::/64, 2001:db8::/32"
// LOCAL_NETS followed by BAD_NETS, as a single comma-separated list.
#define DONT_QUERY LOCAL_NETS ", " BAD_NETS
262
263 //! used to send information to a newborn mthread
264 struct DNSComboWriter {
265 DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query)
266 {
267 }
268
269 DNSComboWriter(const std::string& query, const struct timeval& now, std::vector<std::string>&& policyTags, LuaContext::LuaObject&& data): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_data(std::move(data))
270 {
271 }
272
273 void setRemote(const ComboAddress& sa)
274 {
275 d_remote=sa;
276 }
277
278 void setSource(const ComboAddress& sa)
279 {
280 d_source=sa;
281 }
282
283 void setLocal(const ComboAddress& sa)
284 {
285 d_local=sa;
286 }
287
288 void setDestination(const ComboAddress& sa)
289 {
290 d_destination=sa;
291 }
292
293 void setSocket(int sock)
294 {
295 d_socket=sock;
296 }
297
298 string getRemote() const
299 {
300 if (d_source == d_remote) {
301 return d_source.toStringWithPort();
302 }
303 return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")";
304 }
305
306 MOADNSParser d_mdp;
307 struct timeval d_now;
308 /* Remote client, might differ from d_source
309 in case of XPF, in which case d_source holds
310 the IP of the client and d_remote of the proxy
311 */
312 ComboAddress d_remote;
313 ComboAddress d_source;
314 /* Destination address, might differ from
315 d_destination in case of XPF, in which case
316 d_destination holds the IP of the proxy and
317 d_local holds our own. */
318 ComboAddress d_local;
319 ComboAddress d_destination;
320 #ifdef HAVE_PROTOBUF
321 boost::uuids::uuid d_uuid;
322 string d_requestorId;
323 string d_deviceId;
324 string d_deviceName;
325 struct timeval d_kernelTimestamp{0,0};
326 #endif
327 std::string d_query;
328 std::vector<std::string> d_policyTags;
329 LuaContext::LuaObject d_data;
330 EDNSSubnetOpts d_ednssubnet;
331 shared_ptr<TCPConnection> d_tcpConnection;
332 int d_socket;
333 unsigned int d_tag{0};
334 uint32_t d_qhash{0};
335 uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
336 uint16_t d_ecsBegin{0};
337 uint16_t d_ecsEnd{0};
338 bool d_variable{false};
339 bool d_ecsFound{false};
340 bool d_ecsParsed{false};
341 bool d_tcp;
342 };
343
344 MT_t* getMT()
345 {
346 return MT ? MT.get() : nullptr;
347 }
348
349 ArgvMap &arg()
350 {
351 static ArgvMap theArg;
352 return theArg;
353 }
354
355 unsigned int getRecursorThreadId()
356 {
357 return t_id;
358 }
359
360 int getMTaskerTID()
361 {
362 return MT->getTid();
363 }
364
365 static bool isDistributorThread()
366 {
367 if (t_id == 0) {
368 return false;
369 }
370
371 return g_weDistributeQueries && s_threadInfos.at(t_id).isListener;
372 }
373
374 static bool isHandlerThread()
375 {
376 if (t_id == 0) {
377 return true;
378 }
379
380 return s_threadInfos.at(t_id).isHandler;
381 }
382
383 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
384
385 // -1 is error, 0 is timeout, 1 is success
386 int asendtcp(const string& data, Socket* sock)
387 {
388 PacketID pident;
389 pident.sock=sock;
390 pident.outMSG=data;
391
392 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
393 string packet;
394
395 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
396
397 if(!ret || ret==-1) { // timeout
398 t_fdm->removeWriteFD(sock->getHandle());
399 }
400 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
401 return -1;
402 }
403 return ret;
404 }
405
406 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
407
408 // -1 is error, 0 is timeout, 1 is success
409 int arecvtcp(string& data, size_t len, Socket* sock, bool incompleteOkay)
410 {
411 data.clear();
412 PacketID pident;
413 pident.sock=sock;
414 pident.inNeeded=len;
415 pident.inIncompleteOkay=incompleteOkay;
416 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
417
418 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
419 if(!ret || ret==-1) { // timeout
420 t_fdm->removeReadFD(sock->getHandle());
421 }
422 else if(data.empty()) {// error, EOF or other
423 return -1;
424 }
425
426 return ret;
427 }
428
429 static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
430 {
431 PacketID pident=*any_cast<PacketID>(&var);
432 char resp[512];
433 ComboAddress fromaddr;
434 socklen_t addrlen=sizeof(fromaddr);
435
436 ssize_t ret=recvfrom(fd, resp, sizeof(resp), 0, (sockaddr *)&fromaddr, &addrlen);
437 if (fromaddr != pident.remote) {
438 g_log<<Logger::Notice<<"Response received from the wrong remote host ("<<fromaddr.toStringWithPort()<<" instead of "<<pident.remote.toStringWithPort()<<"), discarding"<<endl;
439
440 }
441
442 t_fdm->removeReadFD(fd);
443 if(ret >= 0) {
444 string data(resp, (size_t) ret);
445 MT->sendEvent(pident, &data);
446 }
447 else {
448 string empty;
449 MT->sendEvent(pident, &empty);
450 // cerr<<"Had some kind of error: "<<ret<<", "<<strerror(errno)<<endl;
451 }
452 }
453 string GenUDPQueryResponse(const ComboAddress& dest, const string& query)
454 {
455 Socket s(dest.sin4.sin_family, SOCK_DGRAM);
456 s.setNonBlocking();
457 ComboAddress local = getQueryLocalAddress(dest.sin4.sin_family, 0);
458
459 s.bind(local);
460 s.connect(dest);
461 s.send(query);
462
463 PacketID pident;
464 pident.sock=&s;
465 pident.remote=dest;
466 pident.type=0;
467 t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
468
469 string data;
470
471 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
472
473 if(!ret || ret==-1) { // timeout
474 t_fdm->removeReadFD(s.getHandle());
475 }
476 else if(data.empty()) {// error, EOF or other
477 // we could special case this
478 return data;
479 }
480 return data;
481 }
482
483 //! pick a random query local address
484 ComboAddress getQueryLocalAddress(int family, uint16_t port)
485 {
486 ComboAddress ret;
487 if(family==AF_INET) {
488 if(g_localQueryAddresses4.empty())
489 ret = g_local4;
490 else
491 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
492 ret.sin4.sin_port = htons(port);
493 }
494 else {
495 if(g_localQueryAddresses6.empty())
496 ret = g_local6;
497 else
498 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
499
500 ret.sin6.sin6_port = htons(port);
501 }
502 return ret;
503 }
504
505 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
506
507 static void setSocketBuffer(int fd, int optname, uint32_t size)
508 {
509 uint32_t psize=0;
510 socklen_t len=sizeof(psize);
511
512 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
513 g_log<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
514 return;
515 }
516
517 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
518 g_log<<Logger::Error<<"Unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
519 }
520
521
522 static void setSocketReceiveBuffer(int fd, uint32_t size)
523 {
524 setSocketBuffer(fd, SO_RCVBUF, size);
525 }
526
527 static void setSocketSendBuffer(int fd, uint32_t size)
528 {
529 setSocketBuffer(fd, SO_SNDBUF, size);
530 }
531
532
533 // you can ask this class for a UDP socket to send a query from
534 // this socket is not yours, don't even think about deleting it
535 // but after you call 'returnSocket' on it, don't assume anything anymore
536 class UDPClientSocks
537 {
538 unsigned int d_numsocks;
539 public:
540 UDPClientSocks() : d_numsocks(0)
541 {
542 }
543
544 // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
545 int getSocket(const ComboAddress& toaddr, int* fd)
546 {
547 *fd=makeClientSocket(toaddr.sin4.sin_family);
548 if(*fd < 0) // temporary error - receive exception otherwise
549 return -2;
550
551 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
552 int err = errno;
553 try {
554 closesocket(*fd);
555 }
556 catch(const PDNSException& e) {
557 g_log<<Logger::Error<<"Error closing UDP socket after connect() failed: "<<e.reason<<endl;
558 }
559
560 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
561 return -2;
562 return -1;
563 }
564
565 d_numsocks++;
566 return 0;
567 }
568
569 // return a socket to the pool, or simply erase it
570 void returnSocket(int fd)
571 {
572 try {
573 t_fdm->removeReadFD(fd);
574 }
575 catch(const FDMultiplexerException& e) {
576 // we sometimes return a socket that has not yet been assigned to t_fdm
577 }
578
579 try {
580 closesocket(fd);
581 }
582 catch(const PDNSException& e) {
583 g_log<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
584 }
585
586 --d_numsocks;
587 }
588
589 private:
590
591 // returns -1 for errors which might go away, throws for ones that won't
592 static int makeClientSocket(int family)
593 {
594 int ret=socket(family, SOCK_DGRAM, 0 ); // turns out that setting CLO_EXEC and NONBLOCK from here is not a performance win on Linux (oddly enough)
595
596 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
597 return ret;
598
599 if(ret<0)
600 throw PDNSException("Making a socket for resolver (family = "+std::to_string(family)+"): "+stringerror());
601
602 // setCloseOnExec(ret); // we're not going to exec
603
604 int tries=10;
605 ComboAddress sin;
606 while(--tries) {
607 uint16_t port;
608
609 if(tries==1) // fall back to kernel 'random'
610 port = 0;
611 else {
612 do {
613 port = s_minUdpSourcePort + dns_random(s_maxUdpSourcePort - s_minUdpSourcePort + 1);
614 }
615 while (s_avoidUdpSourcePorts.count(port));
616 }
617
618 sin=getQueryLocalAddress(family, port); // does htons for us
619
620 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
621 break;
622 }
623
624 if(!tries) {
625 closesocket(ret);
626 throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
627 }
628
629 try {
630 setReceiveSocketErrors(ret, family);
631 setNonBlocking(ret);
632 }
633 catch(...) {
634 closesocket(ret);
635 throw;
636 }
637
638 return ret;
639 }
640 };
641
642 static thread_local std::unique_ptr<UDPClientSocks> t_udpclientsocks;
643
644 /* these two functions are used by LWRes */
645 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
646 int asendto(const char *data, size_t len, int flags,
647 const ComboAddress& toaddr, uint16_t id, const DNSName& domain, uint16_t qtype, int* fd)
648 {
649
650 PacketID pident;
651 pident.domain = domain;
652 pident.remote = toaddr;
653 pident.type = qtype;
654
655 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
656 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
657
658 for(; chain.first != chain.second; chain.first++) {
659 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
660 /*
661 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
662 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
663 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
664 */
665 chain.first->key.chain.insert(id); // we can chain
666 *fd=-1; // gets used in waitEvent / sendEvent later on
667 return 1;
668 }
669 }
670
671 int ret=t_udpclientsocks->getSocket(toaddr, fd);
672 if(ret < 0)
673 return ret;
674
675 pident.fd=*fd;
676 pident.id=id;
677
678 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
679 ret = send(*fd, data, len, 0);
680
681 int tmp = errno;
682
683 if(ret < 0)
684 t_udpclientsocks->returnSocket(*fd);
685
686 errno = tmp; // this is for logging purposes only
687 return ret;
688 }
689
690 // -1 is error, 0 is timeout, 1 is success
691 int arecvfrom(std::string& packet, int flags, const ComboAddress& fromaddr, size_t *d_len,
692 uint16_t id, const DNSName& domain, uint16_t qtype, int fd, struct timeval* now)
693 {
694 static optional<unsigned int> nearMissLimit;
695 if(!nearMissLimit)
696 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
697
698 PacketID pident;
699 pident.fd=fd;
700 pident.id=id;
701 pident.domain=domain;
702 pident.type = qtype;
703 pident.remote=fromaddr;
704
705 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
706
707 /* -1 means error, 0 means timeout, 1 means a result from handleUDPServerResponse() which might still be an error */
708 if(ret > 0) {
709 /* handleUDPServerResponse() will close the socket for us no matter what */
710 if(packet.empty()) // means "error"
711 return -1;
712
713 *d_len=packet.size();
714
715 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
716 g_log<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
717 g_stats.spoofCount++;
718 return -1;
719 }
720 }
721 else {
722 /* getting there means error or timeout, it's up to us to close the socket */
723 if(fd >= 0)
724 t_udpclientsocks->returnSocket(fd);
725 }
726 return ret;
727 }
728
729 static void writePid(void)
730 {
731 if(!::arg().mustDo("write-pid"))
732 return;
733 ofstream of(s_pidfname.c_str(), std::ios_base::app);
734 if(of)
735 of<< Utility::getpid() <<endl;
736 else
737 g_log<<Logger::Error<<"Writing pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
738 }
739
740 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
741 {
742 ++s_currentConnections;
743 (*t_tcpClientCounts)[d_remote]++;
744 }
745
746 TCPConnection::~TCPConnection()
747 {
748 try {
749 if(closesocket(d_fd) < 0)
750 g_log<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
751 }
752 catch(const PDNSException& e) {
753 g_log<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
754 }
755
756 if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
757 t_tcpClientCounts->erase(d_remote);
758 --s_currentConnections;
759 }
760
761 AtomicCounter TCPConnection::s_currentConnections;
762
763 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
764
765 // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
766 static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
767 {
768 if(packetsize > 1000 && t_largeanswerremotes)
769 t_largeanswerremotes->push_back(remote);
770 switch(res) {
771 case RCode::ServFail:
772 if(t_servfailremotes) {
773 t_servfailremotes->push_back(remote);
774 if(query && t_servfailqueryring) // packet cache
775 t_servfailqueryring->push_back(make_pair(*query, qtype));
776 }
777 g_stats.servFails++;
778 break;
779 case RCode::NXDomain:
780 g_stats.nxDomains++;
781 break;
782 case RCode::NoError:
783 g_stats.noErrors++;
784 break;
785 }
786 }
787
788 static string makeLoginfo(const std::unique_ptr<DNSComboWriter>& dc)
789 try
790 {
791 return "("+dc->d_mdp.d_qname.toLogString()+"/"+DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)+" from "+(dc->getRemote())+")";
792 }
793 catch(...)
794 {
795 return "Exception making error message for exception";
796 }
797
798 #ifdef HAVE_PROTOBUF
799 static void protobufLogQuery(uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName)
800 {
801 if (!t_protobufServers) {
802 return;
803 }
804
805 Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
806 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
807 RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
808 message.setServerIdentity(SyncRes::s_serverID);
809 message.setEDNSSubnet(ednssubnet, ednssubnet.isIpv4() ? maskV4 : maskV6);
810 message.setRequestorId(requestorId);
811 message.setDeviceId(deviceId);
812 message.setDeviceName(deviceName);
813
814 if (!policyTags.empty()) {
815 message.setPolicyTags(policyTags);
816 }
817
818 // cerr <<message.toDebugString()<<endl;
819 std::string str;
820 message.serialize(str);
821
822 for (auto& server : *t_protobufServers) {
823 server->queueData(str);
824 }
825 }
826
827 static void protobufLogResponse(const RecProtoBufMessage& message)
828 {
829 if (!t_protobufServers) {
830 return;
831 }
832
833 // cerr <<message.toDebugString()<<endl;
834 std::string str;
835 message.serialize(str);
836
837 for (auto& server : *t_protobufServers) {
838 server->queueData(str);
839 }
840 }
841 #endif
842
843 /**
844 * Chases the CNAME provided by the PolicyCustom RPZ policy.
845 *
846 * @param spoofed: The DNSRecord that was created by the policy, should already be added to ret
847 * @param qtype: The QType of the original query
848 * @param sr: A SyncRes
849 * @param res: An integer that will contain the RCODE of the lookup we do
850 * @param ret: A vector of DNSRecords where the result of the CNAME chase should be appended to
851 */
852 static void handleRPZCustom(const DNSRecord& spoofed, const QType& qtype, SyncRes& sr, int& res, vector<DNSRecord>& ret)
853 {
854 if (spoofed.d_type == QType::CNAME) {
855 bool oldWantsRPZ = sr.getWantsRPZ();
856 sr.setWantsRPZ(false);
857 vector<DNSRecord> ans;
858 res = sr.beginResolve(DNSName(spoofed.d_content->getZoneRepresentation()), qtype, QClass::IN, ans);
859 for (const auto& rec : ans) {
860 if(rec.d_place == DNSResourceRecord::ANSWER) {
861 ret.push_back(rec);
862 }
863 }
864 // Reset the RPZ state of the SyncRes
865 sr.setWantsRPZ(oldWantsRPZ);
866 }
867 }
868
869 static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, uint32_t ttlCap, const uint16_t maxAnswerSize)
870 {
871 pw.startRecord(rec.d_name, rec.d_type, (rec.d_ttl > ttlCap ? ttlCap : rec.d_ttl), rec.d_class, rec.d_place);
872
873 if(rec.d_type != QType::OPT) // their TTL ain't real
874 minTTL = min(minTTL, rec.d_ttl);
875
876 rec.d_content->toPacket(pw);
877 if(pw.size() > static_cast<size_t>(maxAnswerSize)) {
878 pw.rollback();
879 if(rec.d_place != DNSResourceRecord::ADDITIONAL) {
880 pw.getHeader()->tc=1;
881 pw.truncate();
882 }
883 return false;
884 }
885
886 return true;
887 }
888
889 #ifdef HAVE_PROTOBUF
890 static std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> startProtobufServers(const ProtobufExportConfig& config)
891 {
892 auto result = std::make_shared<std::vector<std::unique_ptr<RemoteLogger>>>();
893
894 for (const auto& server : config.servers) {
895 try {
896 auto logger = make_unique<RemoteLogger>(server, config.timeout, 100*config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
897 logger->setLogQueries(config.logQueries);
898 logger->setLogResponses(config.logResponses);
899 result->emplace_back(std::move(logger));
900 }
901 catch(const std::exception& e) {
902 g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.what()<<endl;
903 }
904 catch(const PDNSException& e) {
905 g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.reason<<endl;
906 }
907 }
908
909 return result;
910 }
911
912 static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
913 {
914 if (!luaconfsLocal->protobufExportConfig.enabled) {
915 if (t_protobufServers) {
916 for (auto& server : *t_protobufServers) {
917 server->stop();
918 }
919 t_protobufServers.reset();
920 }
921
922 return false;
923 }
924
925 /* if the server was not running, or if it was running according to a
926 previous configuration */
927 if (!t_protobufServers ||
928 t_protobufServersGeneration < luaconfsLocal->generation) {
929
930 if (t_protobufServers) {
931 for (auto& server : *t_protobufServers) {
932 server->stop();
933 }
934 }
935 t_protobufServers.reset();
936
937 t_protobufServers = startProtobufServers(luaconfsLocal->protobufExportConfig);
938 t_protobufServersGeneration = luaconfsLocal->generation;
939 }
940
941 return true;
942 }
943
944 static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
945 {
946 if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) {
947 if (t_outgoingProtobufServers) {
948 for (auto& server : *t_outgoingProtobufServers) {
949 server->stop();
950 }
951 }
952 t_outgoingProtobufServers.reset();
953
954 return false;
955 }
956
957 /* if the server was not running, or if it was running according to a
958 previous configuration */
959 if (!t_outgoingProtobufServers ||
960 t_outgoingProtobufServersGeneration < luaconfsLocal->generation) {
961
962 if (t_outgoingProtobufServers) {
963 for (auto& server : *t_outgoingProtobufServers) {
964 server->stop();
965 }
966 }
967 t_outgoingProtobufServers.reset();
968
969 t_outgoingProtobufServers = startProtobufServers(luaconfsLocal->outgoingProtobufExportConfig);
970 t_outgoingProtobufServersGeneration = luaconfsLocal->generation;
971 }
972
973 return true;
974 }
975
976 #ifdef HAVE_FSTRM
977
978 static std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> startFrameStreamServers(const FrameStreamExportConfig& config)
979 {
980 auto result = std::make_shared<std::vector<std::unique_ptr<FrameStreamLogger>>>();
981
982 for (const auto& server : config.servers) {
983 try {
984 std::unordered_map<string,unsigned> options;
985 options["bufferHint"] = config.bufferHint;
986 options["flushTimeout"] = config.flushTimeout;
987 options["inputQueueSize"] = config.inputQueueSize;
988 options["outputQueueSize"] = config.outputQueueSize;
989 options["queueNotifyThreshold"] = config.queueNotifyThreshold;
990 options["reopenInterval"] = config.reopenInterval;
991 FrameStreamLogger *fsl = nullptr;
992 try {
993 ComboAddress address(server);
994 fsl = new FrameStreamLogger(address.sin4.sin_family, address.toStringWithPort(), true, options);
995 }
996 catch (const PDNSException& e) {
997 fsl = new FrameStreamLogger(AF_UNIX, server, true, options);
998 }
999 fsl->setLogQueries(config.logQueries);
1000 fsl->setLogResponses(config.logResponses);
1001 result->emplace_back(fsl);
1002 }
1003 catch(const std::exception& e) {
1004 g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
1005 }
1006 catch(const PDNSException& e) {
1007 g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
1008 }
1009 }
1010
1011 return result;
1012 }
1013
1014 static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
1015 {
1016 if (!luaconfsLocal->frameStreamExportConfig.enabled) {
1017 if (t_frameStreamServers) {
1018 // dt's take care of cleanup
1019 t_frameStreamServers.reset();
1020 }
1021
1022 return false;
1023 }
1024
1025 /* if the server was not running, or if it was running according to a
1026 previous configuration */
1027 if (!t_frameStreamServers ||
1028 t_frameStreamServersGeneration < luaconfsLocal->generation) {
1029
1030 if (t_frameStreamServers) {
1031 // dt's take care of cleanup
1032 t_frameStreamServers.reset();
1033 }
1034
1035 t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
1036 t_frameStreamServersGeneration = luaconfsLocal->generation;
1037 }
1038
1039 return true;
1040 }
1041 #endif /* HAVE_FSTRM */
1042 #endif /* HAVE_PROTOBUF */
1043
1044 #ifdef NOD_ENABLED
1045 static bool nodCheckNewDomain(const DNSName& dname)
1046 {
1047 static const QType qt(QType::A);
1048 static const uint16_t qc(QClass::IN);
1049 bool ret = false;
1050 // First check the (sub)domain isn't whitelisted for NOD purposes
1051 if (!g_nodDomainWL.check(dname)) {
1052 // Now check the NODDB (note this is probablistic so can have FNs/FPs)
1053 if (t_nodDBp && t_nodDBp->isNewDomain(dname)) {
1054 if (g_nodLog) {
1055 // This should probably log to a dedicated log file
1056 g_log<<Logger::Notice<<"Newly observed domain nod="<<dname.toLogString()<<endl;
1057 }
1058 if (!(g_nodLookupDomain.isRoot())) {
1059 // Send a DNS A query to <domain>.g_nodLookupDomain
1060 DNSName qname = dname;
1061 vector<DNSRecord> dummy;
1062 qname += g_nodLookupDomain;
1063 directResolve(qname, qt, qc, dummy);
1064 }
1065 ret = true;
1066 }
1067 }
1068 return ret;
1069 }
1070
1071 static void nodAddDomain(const DNSName& dname)
1072 {
1073 // Don't bother adding domains on the nod whitelist
1074 if (!g_nodDomainWL.check(dname)) {
1075 if (t_nodDBp) {
1076 // This keeps the nod info up to date
1077 t_nodDBp->addDomain(dname);
1078 }
1079 }
1080 }
1081
1082 static bool udrCheckUniqueDNSRecord(const DNSName& dname, uint16_t qtype, const DNSRecord& record)
1083 {
1084 bool ret = false;
1085 if (record.d_place == DNSResourceRecord::ANSWER ||
1086 record.d_place == DNSResourceRecord::ADDITIONAL) {
1087 // Create a string that represent a triplet of (qname, qtype and RR[type, name, content])
1088 std::stringstream ss;
1089 ss << dname.toDNSStringLC() << ":" << qtype << ":" << qtype << ":" << record.d_type << ":" << record.d_name.toDNSStringLC() << ":" << record.d_content->getZoneRepresentation();
1090 if (t_udrDBp && t_udrDBp->isUniqueResponse(ss.str())) {
1091 if (g_udrLog) {
1092 // This should also probably log to a dedicated file.
1093 g_log<<Logger::Notice<<"Unique response observed: qname="<<dname.toLogString()<<" qtype="<<QType(qtype).getName()<< " rrtype=" << QType(record.d_type).getName() << " rrname=" << record.d_name.toLogString() << " rrcontent=" << record.d_content->getZoneRepresentation() << endl;
1094 }
1095 ret = true;
1096 }
1097 }
1098 return ret;
1099 }
1100 #endif /* NOD_ENABLED */
1101
/* Resolves a single client query and sends the answer back to the client.

   p must point to a heap-allocated DNSComboWriter. This function takes
   ownership of it: it is wrapped in a std::unique_ptr immediately.

   Outline:
   - EDNS handling:
     - versions other than 0 get BADVERS;
     - the UDP answer size comes from the advertised buffer size;
     - incoming ECS and NSID are processed.
   - Set up protobuf / dnstap export and the SyncRes instance: DNSSEC mode,
     Lua engine, query source and ECS.
   - Before resolution, run in order:
     - the Lua prerpz hook;
     - the RPZ query policy;
     - the Lua preresolve hook.
   - Resolve with SyncRes::beginResolve. Then apply, in order:
     - NSDNAME/NSIP policies hit during resolution (res == -2);
     - the Lua nodata/nxdomain/postresolve hooks;
     - the RPZ post-resolution policy.
   - Interpret the DNSSEC validation state. A Bogus answer becomes SERVFAIL
     when CD is not set and either the configuration (ValidateAll) or the
     query (AD or DO) demands it.
   - Write the records into the packet and add an OPT RR when the query had
     EDNS. Then send:
     - over UDP with sendmsg, also inserting the packet into the packet cache
       when the answer is not variable;
     - over TCP with a 2-byte length prefix plus writev, then either re-arm
       the connection for the next query or mark it closed.
   - Update latency and answer statistics.

   Early returns:
   - a Drop policy, or res == PolicyDecision::DROP, returns without sending
     anything.
   Exceptions raised while handling the query are caught and logged here.
*/
1102 static void startDoResolve(void *p)
1103 {
1104 auto dc=std::unique_ptr<DNSComboWriter>(reinterpret_cast<DNSComboWriter*>(p));
1105 try {
// Record the question in t_queryring, when it is allocated
1106 if (t_queryring)
1107 t_queryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
1108
// TCP answers may use up to 65535 bytes. UDP answers default to 512 bytes,
// capped by g_udpTruncationThreshold; EDNS may raise this below.
1109 uint16_t maxanswersize = dc->d_tcp ? 65535 : min(static_cast<uint16_t>(512), g_udpTruncationThreshold);
1110 EDNSOpts edo;
1111 std::vector<pair<uint16_t, string> > ednsOpts;
1112 bool variableAnswer = dc->d_variable;
1113 bool haveEDNS=false;
1114 #ifdef NOD_ENABLED
1115 bool hasUDR = false;
1116 #endif /* NOD_ENABLED */
1117 DNSPacketWriter::optvect_t returnedEdnsOptions; // Here we stuff all the options for the return packet
1118 uint8_t ednsExtRCode = 0;
1119 if(getEDNSOpts(dc->d_mdp, &edo)) {
1120 haveEDNS=true;
1121 if (edo.d_version != 0) {
1122 ednsExtRCode = ERCode::BADVERS;
1123 }
1124
1125 if(!dc->d_tcp) {
1126 /* rfc6891 6.2.3:
1127 "Values lower than 512 MUST be treated as equal to 512."
1128 */
1129 maxanswersize = min(static_cast<uint16_t>(edo.d_packetsize >= 512 ? edo.d_packetsize : 512), g_udpTruncationThreshold);
1130 }
1131 ednsOpts = edo.d_options;
1132 maxanswersize -= 11; // EDNS header size
1133
1134 for (const auto& o : edo.d_options) {
1135 if (o.first == EDNSOptionCode::ECS && g_useIncomingECS && !dc->d_ecsParsed) {
1136 dc->d_ecsFound = getEDNSSubnetOptsFromString(o.second, &dc->d_ednssubnet);
1137 } else if (o.first == EDNSOptionCode::NSID) {
1138 const static string mode_server_id = ::arg()["server-id"];
1139 if(mode_server_id != "disabled" && !mode_server_id.empty() &&
1140 maxanswersize > (2 + 2 + mode_server_id.size())) {
1141 returnedEdnsOptions.push_back(make_pair(EDNSOptionCode::NSID, mode_server_id));
1142 variableAnswer = true; // Can't packetcache an answer with NSID
1143 // Option Code and Option Length are both 2
1144 maxanswersize -= 2 + 2 + mode_server_id.size();
1145 }
1146 }
1147 }
1148 }
1149 /* perhaps there was no EDNS or no ECS but by now we looked */
1150 dc->d_ecsParsed = true;
1151 vector<DNSRecord> ret;
1152 vector<uint8_t> packet;
1153
1154 auto luaconfsLocal = g_luaconfs.getLocal();
1155 // Used to tell syncres later on if we should apply NSDNAME and NSIP RPZ triggers for this query
1156 bool wantsRPZ(true);
1157 boost::optional<RecProtoBufMessage> pbMessage(boost::none);
1158 bool logResponse = false;
// When protobuf export is enabled, prepare the response message. The requestor
// address is masked with protobufMaskV4 / protobufMaskV6 before being logged.
1159 #ifdef HAVE_PROTOBUF
1160 if (checkProtobufExport(luaconfsLocal)) {
1161 logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
1162 Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1163 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
1164 pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response, dc->d_uuid, &requestor, &dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass, dc->d_mdp.d_header.id, dc->d_tcp, 0);
1165 pbMessage->setServerIdentity(SyncRes::s_serverID);
1166 pbMessage->setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1167 }
1168 #endif /* HAVE_PROTOBUF */
1169
1170 #ifdef HAVE_FSTRM
1171 checkFrameStreamExport(luaconfsLocal);
1172 #endif
1173
// Response header: aa cleared, ra/qr set; id, rd and cd copied from the query
1174 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
1175
1176 pw.getHeader()->aa=0;
1177 pw.getHeader()->ra=1;
1178 pw.getHeader()->qr=1;
1179 pw.getHeader()->tc=0;
1180 pw.getHeader()->id=dc->d_mdp.d_header.id;
1181 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
1182 pw.getHeader()->cd=dc->d_mdp.d_header.cd;
1183
1184 /* This is the lowest TTL seen in the records of the response,
1185 so we can't cache it for longer than this value.
1186 If we have a TTL cap, this value can't be larger than the
1187 cap no matter what. */
1188 uint32_t minTTL = dc->d_ttlCap;
1189
1190 SyncRes sr(dc->d_now);
1191
1192 bool DNSSECOK=false;
1193 if(t_pdl) {
1194 sr.setLuaEngine(t_pdl);
1195 }
1196 if(g_dnssecmode != DNSSECMode::Off) {
1197 sr.setDoDNSSEC(true);
1198
1199 // Does the requestor want DNSSEC records?
1200 if(edo.d_extFlags & EDNSOpts::DNSSECOK) {
1201 DNSSECOK=true;
1202 g_stats.dnssecQueries++;
1203 }
1204 if (dc->d_mdp.d_header.cd) {
1205 /* Per rfc6840 section 5.9, "When processing a request with
1206 the Checking Disabled (CD) bit set, a resolver SHOULD attempt
1207 to return all response data, even data that has failed DNSSEC
1208 validation. */
1209 ++g_stats.dnssecCheckDisabledQueries;
1210 }
1211 if (dc->d_mdp.d_header.ad) {
1212 /* Per rfc6840 section 5.7, "the AD bit in a query as a signal
1213 indicating that the requester understands and is interested in the
1214 value of the AD bit in the response. This allows a requester to
1215 indicate that it understands the AD bit without also requesting
1216 DNSSEC data via the DO bit. */
1217 ++g_stats.dnssecAuthenticDataQueries;
1218 }
1219 } else {
1220 // Ignore the client-set CD flag
1221 pw.getHeader()->cd=0;
1222 }
1223 sr.setDNSSECValidationRequested(g_dnssecmode == DNSSECMode::ValidateAll || g_dnssecmode==DNSSECMode::ValidateForLog || ((dc->d_mdp.d_header.ad || DNSSECOK) && g_dnssecmode==DNSSECMode::Process));
1224
1225 #ifdef HAVE_PROTOBUF
1226 sr.setInitialRequestId(dc->d_uuid);
1227 sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
1228 #endif
1229 #ifdef HAVE_FSTRM
1230 sr.setFrameStreamServers(t_frameStreamServers);
1231 #endif
1232 sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
1233
1234 bool tracedQuery=false; // we could consider letting Lua know about this too
1235 bool shouldNotValidate = false;
1236
1237 /* preresolve expects res (dq.rcode) to be set to RCode::NoError by default */
1238 int res = RCode::NoError;
1239 DNSFilterEngine::Policy appliedPolicy;
1240 std::vector<DNSRecord> spoofed;
1241 RecursorLua4::DNSQuestion dq(dc->d_source, dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ, logResponse);
1242 dq.ednsFlags = &edo.d_extFlags;
1243 dq.ednsOptions = &ednsOpts;
1244 dq.tag = dc->d_tag;
1245 dq.discardedPolicies = &sr.d_discardedPolicies;
1246 dq.policyTags = &dc->d_policyTags;
1247 dq.appliedPolicy = &appliedPolicy;
1248 dq.currentRecords = &ret;
1249 dq.dh = &dc->d_mdp.d_header;
1250 dq.data = dc->d_data;
1251 #ifdef HAVE_PROTOBUF
1252 dq.requestorId = dc->d_requestorId;
1253 dq.deviceId = dc->d_deviceId;
1254 dq.deviceName = dc->d_deviceName;
1255 #endif
1256
// Unsupported EDNS version: skip resolution and send BADVERS right away
1257 if(ednsExtRCode != 0) {
1258 goto sendit;
1259 }
1260
// With g_anyToTcp set, ANY over UDP is answered with TC=1 so that the client
// retries over TCP
1261 if(dc->d_mdp.d_qtype==QType::ANY && !dc->d_tcp && g_anyToTcp) {
1262 pw.getHeader()->tc = 1;
1263 res = 0;
1264 variableAnswer = true;
1265 goto sendit;
1266 }
1267
1268 if(t_traceRegex && t_traceRegex->match(dc->d_mdp.d_qname.toString())) {
1269 sr.setLogMode(SyncRes::Store);
1270 tracedQuery=true;
1271 }
1272
1273
1274 if(!g_quiet || tracedQuery) {
1275 g_log<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
1276 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote();
1277 if(!dc->d_ednssubnet.source.empty()) {
1278 g_log<<" (ecs "<<dc->d_ednssubnet.source.toString()<<")";
1279 }
1280 g_log<<endl;
1281 }
1282
1283 sr.setId(MT->getTid());
1284 if(!dc->d_mdp.d_header.rd)
1285 sr.setCacheOnly();
1286
1287 if (t_pdl) {
1288 t_pdl->prerpz(dq, res);
1289 }
1290
1291 // Check if the query has a policy attached to it
1292 if (wantsRPZ) {
1293 appliedPolicy = luaconfsLocal->dfe.getQueryPolicy(dc->d_mdp.d_qname, dc->d_source, sr.d_discardedPolicies);
1294 }
1295
1296 // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
1297 if(!t_pdl || !t_pdl->preresolve(dq, res)) {
1298
1299 sr.setWantsRPZ(wantsRPZ);
1300 if(wantsRPZ) {
1301 switch(appliedPolicy.d_kind) {
1302 case DNSFilterEngine::PolicyKind::NoAction:
1303 break;
1304 case DNSFilterEngine::PolicyKind::Drop:
1305 g_stats.policyDrops++;
1306 g_stats.policyResults[appliedPolicy.d_kind]++;
1307 return;
1308 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1309 g_stats.policyResults[appliedPolicy.d_kind]++;
1310 res=RCode::NXDomain;
1311 goto haveAnswer;
1312 case DNSFilterEngine::PolicyKind::NODATA:
1313 g_stats.policyResults[appliedPolicy.d_kind]++;
1314 res=RCode::NoError;
1315 goto haveAnswer;
1316 case DNSFilterEngine::PolicyKind::Custom:
1317 g_stats.policyResults[appliedPolicy.d_kind]++;
1318 res=RCode::NoError;
1319 spoofed=appliedPolicy.getCustomRecords(dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1320 for (const auto& dr : spoofed) {
1321 ret.push_back(dr);
1322 handleRPZCustom(dr, QType(dc->d_mdp.d_qtype), sr, res, ret);
1323 }
1324 goto haveAnswer;
1325 case DNSFilterEngine::PolicyKind::Truncate:
1326 if(!dc->d_tcp) {
1327 g_stats.policyResults[appliedPolicy.d_kind]++;
1328 res=RCode::NoError;
1329 pw.getHeader()->tc=1;
1330 goto haveAnswer;
1331 }
1332 break;
1333 }
1334 }
1335
1336 // Query got not handled for QNAME Policy reasons, now actually go out to find an answer
1337 try {
1338 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
1339 shouldNotValidate = sr.wasOutOfBand();
1340 }
1341 catch(ImmediateServFailException &e) {
1342 if(g_logCommonErrors)
1343 g_log<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during resolve of '"<<dc->d_mdp.d_qname<<"' because: "<<e.reason<<endl;
1344 res = RCode::ServFail;
1345 }
1346
1347 dq.validationState = sr.getValidationState();
1348
1349 // During lookup, an NSDNAME or NSIP trigger was hit in RPZ
// NOTE(review): unlike the pre-resolve switch above, policyResults is
// incremented here before the switch, so a Truncate policy that is not applied
// (TCP) is still counted. Confirm this is intended.
1350 if (res == -2) { // XXX This block should be macro'd, it is repeated post-resolve.
1351 appliedPolicy = sr.d_appliedPolicy;
1352 g_stats.policyResults[appliedPolicy.d_kind]++;
1353 switch(appliedPolicy.d_kind) {
1354 case DNSFilterEngine::PolicyKind::NoAction: // This can never happen
1355 throw PDNSException("NoAction policy returned while a NSDNAME or NSIP trigger was hit");
1356 case DNSFilterEngine::PolicyKind::Drop:
1357 g_stats.policyDrops++;
1358 return;
1359 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1360 ret.clear();
1361 res=RCode::NXDomain;
1362 goto haveAnswer;
1363
1364 case DNSFilterEngine::PolicyKind::NODATA:
1365 ret.clear();
1366 res=RCode::NoError;
1367 goto haveAnswer;
1368
1369 case DNSFilterEngine::PolicyKind::Truncate:
1370 if(!dc->d_tcp) {
1371 ret.clear();
1372 res=RCode::NoError;
1373 pw.getHeader()->tc=1;
1374 goto haveAnswer;
1375 }
1376 break;
1377
1378 case DNSFilterEngine::PolicyKind::Custom:
1379 ret.clear();
1380 res=RCode::NoError;
1381 spoofed=appliedPolicy.getCustomRecords(dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1382 for (const auto& dr : spoofed) {
1383 ret.push_back(dr);
1384 handleRPZCustom(dr, QType(dc->d_mdp.d_qtype), sr, res, ret);
1385 }
1386 goto haveAnswer;
1387 }
1388 }
1389
1390 if (wantsRPZ) {
1391 appliedPolicy = luaconfsLocal->dfe.getPostPolicy(ret, sr.d_discardedPolicies);
1392 }
1393
// Lua hooks: nodata (NoError without a record of the queried type in ANSWER),
// nxdomain, then postresolve. If any of them returns true, DNSSEC validation
// of the result is skipped.
1394 if(t_pdl) {
1395 if(res == RCode::NoError) {
1396 auto i=ret.cbegin();
1397 for(; i!= ret.cend(); ++i)
1398 if(i->d_type == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
1399 break;
1400 if(i == ret.cend() && t_pdl->nodata(dq, res))
1401 shouldNotValidate = true;
1402
1403 }
1404 else if(res == RCode::NXDomain && t_pdl->nxdomain(dq, res))
1405 shouldNotValidate = true;
1406
1407 if(t_pdl->postresolve(dq, res))
1408 shouldNotValidate = true;
1409 }
1410
// NOTE(review): policyResults is incremented before the switch, so NoAction
// results are counted here too, unlike in the pre-resolve switch. Confirm
// this is intended.
1411 if (wantsRPZ) { //XXX This block is repeated, see above
1412 g_stats.policyResults[appliedPolicy.d_kind]++;
1413 switch(appliedPolicy.d_kind) {
1414 case DNSFilterEngine::PolicyKind::NoAction:
1415 break;
1416 case DNSFilterEngine::PolicyKind::Drop:
1417 g_stats.policyDrops++;
1418 return;
1419 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1420 ret.clear();
1421 res=RCode::NXDomain;
1422 goto haveAnswer;
1423
1424 case DNSFilterEngine::PolicyKind::NODATA:
1425 ret.clear();
1426 res=RCode::NoError;
1427 goto haveAnswer;
1428
1429 case DNSFilterEngine::PolicyKind::Truncate:
1430 if(!dc->d_tcp) {
1431 ret.clear();
1432 res=RCode::NoError;
1433 pw.getHeader()->tc=1;
1434 goto haveAnswer;
1435 }
1436 break;
1437
1438 case DNSFilterEngine::PolicyKind::Custom:
1439 ret.clear();
1440 res=RCode::NoError;
1441 spoofed=appliedPolicy.getCustomRecords(dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1442 for (const auto& dr : spoofed) {
1443 ret.push_back(dr);
1444 handleRPZCustom(dr, QType(dc->d_mdp.d_qtype), sr, res, ret);
1445 }
1446 goto haveAnswer;
1447 }
1448 }
1449 }
// Reached after resolution, after a policy decision, or directly when the Lua
// preresolve hook handled the query. res and ret hold the outcome.
1450 haveAnswer:;
1451 if(res == PolicyDecision::DROP) {
1452 g_stats.policyDrops++;
1453 return;
1454 }
// Dump the stored SyncRes trace for traced queries and for failures
1455 if(tracedQuery || res == -1 || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
1456 {
1457 string trace(sr.getTrace());
1458 if(!trace.empty()) {
1459 vector<string> lines;
1460 boost::split(lines, trace, boost::is_any_of("\n"));
1461 for(const string& line : lines) {
1462 if(!line.empty())
1463 g_log<<Logger::Warning<< line << endl;
1464 }
1465 }
1466 }
1467
1468 if(res == -1) {
1469 pw.getHeader()->rcode=RCode::ServFail;
1470 // no commit here, because no record
1471 g_stats.servFails++;
1472 }
1473 else {
1474 pw.getHeader()->rcode=res;
1475
1476 // Does the validation mode or query demand validation?
1477 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1478 try {
1479 if(sr.doLog()) {
1480 g_log<<Logger::Warning<<"Starting validation of answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<endl;
1481 }
1482
1483 auto state = sr.getValidationState();
1484
1485 if(state == Secure) {
1486 if(sr.doLog()) {
1487 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates correctly"<<endl;
1488 }
1489
1490 // Is the query source interested in the value of the ad-bit?
1491 if (dc->d_mdp.d_header.ad || DNSSECOK)
1492 pw.getHeader()->ad=1;
1493 }
1494 else if(state == Insecure) {
1495 if(sr.doLog()) {
1496 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates as Insecure"<<endl;
1497 }
1498
1499 pw.getHeader()->ad=0;
1500 }
1501 else if(state == Bogus) {
1502 if(t_bogusremotes)
1503 t_bogusremotes->push_back(dc->d_source);
1504 if(t_bogusqueryring)
1505 t_bogusqueryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
1506 if(g_dnssecLogBogus || sr.doLog() || g_dnssecmode == DNSSECMode::ValidateForLog) {
1507 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates as Bogus"<<endl;
1508 }
1509
1510 // Does the query or validation mode sending out a SERVFAIL on validation errors?
1511 if(!pw.getHeader()->cd && (g_dnssecmode == DNSSECMode::ValidateAll || dc->d_mdp.d_header.ad || DNSSECOK)) {
1512 if(sr.doLog()) {
1513 g_log<<Logger::Warning<<"Sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" because recursor or query demands it for Bogus results"<<endl;
1514 }
1515
1516 pw.getHeader()->rcode=RCode::ServFail;
1517 goto sendit;
1518 } else {
1519 if(sr.doLog()) {
1520 g_log<<Logger::Warning<<"Not sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" Bogus validation since neither config nor query demands this"<<endl;
1521 }
1522 }
1523 }
1524 }
1525 catch(ImmediateServFailException &e) {
1526 if(g_logCommonErrors)
1527 g_log<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during validation of '"<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<"' because: "<<e.reason<<endl;
1528 pw.getHeader()->rcode=RCode::ServFail;
1529 goto sendit;
1530 }
1531 }
1532
// Shuffle the records, then apply this client's sortlist, if any. A sorted
// answer is marked variable.
1533 if(ret.size()) {
1534 orderAndShuffle(ret);
1535 if(auto sl = luaconfsLocal->sortlist.getOrderCmp(dc->d_source)) {
1536 stable_sort(ret.begin(), ret.end(), *sl);
1537 variableAnswer=true;
1538 }
1539 }
1540
// Without the DO bit:
// - NSEC3 records are always left out;
// - RRSIG/NSEC records are kept only when they sit in the ANSWER section and
//   were asked for (qtype equals their type, or ANY).
1541 bool needCommit = false;
1542 for(auto i=ret.cbegin(); i!=ret.cend(); ++i) {
1543 if( ! DNSSECOK &&
1544 ( i->d_type == QType::NSEC3 ||
1545 (
1546 ( i->d_type == QType::RRSIG || i->d_type==QType::NSEC ) &&
1547 (
1548 ( dc->d_mdp.d_qtype != i->d_type && dc->d_mdp.d_qtype != QType::ANY ) ||
1549 i->d_place != DNSResourceRecord::ANSWER
1550 )
1551 )
1552 )
1553 ) {
1554 continue;
1555 }
1556
// Stop at the first record addRecordToPacket rejects (presumably because it
// does not fit in maxanswersize). needCommit is then false, so pw.commit()
// below is skipped.
1557 if (!addRecordToPacket(pw, *i, minTTL, dc->d_ttlCap, maxanswersize)) {
1558 needCommit = false;
1559 break;
1560 }
1561 needCommit = true;
1562
1563 #ifdef NOD_ENABLED
1564 bool udr = false;
1565 if (g_udrEnabled) {
1566 udr = udrCheckUniqueDNSRecord(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, *i);
1567 if (!hasUDR && udr)
1568 hasUDR = true;
1569 }
1570 #endif /* NOD ENABLED */
1571
1572 #ifdef HAVE_PROTOBUF
1573 if (t_protobufServers) {
1574 #ifdef NOD_ENABLED
1575 pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes, udr);
1576 #else
1577 pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes);
1578 #endif /* NOD_ENABLED */
1579 }
1580 #endif
1581 }
1582 if(needCommit)
1583 pw.commit();
1584 }
// Also reached via goto for:
// - BADVERS;
// - TC for ANY over UDP;
// - SERVFAIL for Bogus answers or failures during validation.
1585 sendit:;
1586
// The answer did not depend on the client subnet: echo ECS back with scope 0
1587 if(g_useIncomingECS && dc->d_ecsFound && !sr.wasVariable() && !variableAnswer) {
1588 // cerr<<"Stuffing in a 0 scope because answer is static"<<endl;
1589 EDNSSubnetOpts eo;
1590 eo.source = dc->d_ednssubnet.source;
1591 ComboAddress sa;
1592 sa.reset();
1593 sa.sin4.sin_family = eo.source.getNetwork().sin4.sin_family;
1594 eo.scope = Netmask(sa, 0);
1595
1596 returnedEdnsOptions.push_back(make_pair(EDNSOptionCode::ECS, makeEDNSSubnetOptsString(eo)));
1597 }
1598
1599 if (haveEDNS) {
1600 /* we try to add the EDNS OPT RR even for truncated answers,
1601 as rfc6891 states:
1602 "The minimal response MUST be the DNS header, question section, and an
1603 OPT record. This MUST also occur when a truncated response (using
1604 the DNS header's TC bit) is returned."
1605 */
1606 pw.addOpt(512, ednsExtRCode, DNSSECOK ? EDNSOpts::DNSSECOK : 0, returnedEdnsOptions);
1607 pw.commit();
1608 }
1609
1610 g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
1611 updateResponseStats(res, dc->d_source, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1612 #ifdef NOD_ENABLED
1613 bool nod = false;
1614 if (g_nodEnabled) {
1615 if (nodCheckNewDomain(dc->d_mdp.d_qname))
1616 nod = true;
1617 }
1618 #endif /* NOD_ENABLED */
1619 #ifdef HAVE_PROTOBUF
1620 if (t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && (!appliedPolicy.d_name || appliedPolicy.d_name->empty()) && dc->d_policyTags.empty())) {
1621 pbMessage->setBytes(packet.size());
1622 pbMessage->setResponseCode(pw.getHeader()->rcode);
1623 if (appliedPolicy.d_name) {
1624 pbMessage->setAppliedPolicy(*appliedPolicy.d_name);
1625 pbMessage->setAppliedPolicyType(appliedPolicy.d_type);
1626 }
1627 pbMessage->setPolicyTags(dc->d_policyTags);
1628 if (g_useKernelTimestamp && dc->d_kernelTimestamp.tv_sec) {
1629 pbMessage->setQueryTime(dc->d_kernelTimestamp.tv_sec, dc->d_kernelTimestamp.tv_usec);
1630 }
1631 else {
1632 pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
1633 }
1634 pbMessage->setRequestorId(dq.requestorId);
1635 pbMessage->setDeviceId(dq.deviceId);
1636 pbMessage->setDeviceName(dq.deviceName);
1637 #ifdef NOD_ENABLED
1638 if (g_nodEnabled) {
1639 if (nod) {
1640 pbMessage->setNOD(true);
1641 pbMessage->addPolicyTag(g_nod_pbtag);
1642 }
1643 if (hasUDR) {
1644 pbMessage->addPolicyTag(g_udr_pbtag);
1645 }
1646 }
1647 #endif /* NOD_ENABLED */
1648 protobufLogResponse(*pbMessage);
// The NOD/UDR markers are removed again after logging. pbMessage may
// afterwards be stored in the packet cache below.
1649 #ifdef NOD_ENABLED
1650 if (g_nodEnabled) {
1651 pbMessage->setNOD(false);
1652 pbMessage->clearUDR();
1653 if (nod)
1654 pbMessage->removePolicyTag(g_nod_pbtag);
1655 if (hasUDR)
1656 pbMessage->removePolicyTag(g_udr_pbtag);
1657 }
1658 #endif /* NOD_ENABLED */
1659 }
1660 #endif
// UDP:
// - send with sendmsg, setting the source address when the socket is in
//   g_fromtosockets;
// - then insert the answer into the packet cache unless it is variable.
1661 if(!dc->d_tcp) {
1662 struct msghdr msgh;
1663 struct iovec iov;
1664 cmsgbuf_aligned cbuf;
1665 fillMSGHdr(&msgh, &iov, &cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote);
1666 msgh.msg_control=NULL;
1667
1668 if(g_fromtosockets.count(dc->d_socket)) {
1669 addCMsgSrcAddr(&msgh, &cbuf, &dc->d_local, 0);
1670 }
1671 if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors)
1672 g_log<<Logger::Warning<<"Sending UDP reply to client "<<dc->getRemote()<<" failed with: "<<strerror(errno)<<endl;
1673
1674 if(variableAnswer || sr.wasVariable()) {
1675 g_stats.variableResponses++;
1676 }
1677 if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
1678 t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, std::move(dc->d_query), dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
1679 string((const char*)&*packet.begin(), packet.size()),
1680 g_now.tv_sec,
1681 pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
1682 min(minTTL,SyncRes::s_packetcachettl),
1683 dq.validationState,
1684 dc->d_ecsBegin,
1685 dc->d_ecsEnd,
1686 std::move(pbMessage));
1687 }
1688 // else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
1689 }
1690 else {
// TCP: a 2-byte big-endian length prefix followed by the packet, in one writev.
// On success, the connection is either re-armed for the next query (BYTE0
// state, read timeout of g_tcpTimeout seconds) or closed once
// g_tcpMaxQueriesPerConn is reached.
1691 char buf[2];
1692 buf[0]=packet.size()/256;
1693 buf[1]=packet.size()%256;
1694
1695 Utility::iovec iov[2];
1696
1697 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
1698 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
1699
1700 int wret=Utility::writev(dc->d_socket, iov, 2);
1701 bool hadError=true;
1702
1703 if(wret == 0)
1704 g_log<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
1705 else if(wret < 0 )
1706 g_log<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
1707 else if((unsigned int)wret != 2 + packet.size())
1708 g_log<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
1709 else
1710 hadError=false;
1711
1712 // update tcp connection status, either by closing or moving to 'BYTE0'
1713
1714 if(hadError) {
1715 // no need to remove us from FDM, we weren't there
1716 dc->d_socket = -1;
1717 }
1718 else {
1719 dc->d_tcpConnection->queriesCount++;
1720 if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) {
1721 dc->d_socket = -1;
1722 }
1723 else {
1724 dc->d_tcpConnection->state=TCPConnection::BYTE0;
1725 Utility::gettimeofday(&g_now, 0); // needs to be updated
1726 struct timeval ttd = g_now;
1727 ttd.tv_sec += g_tcpTimeout;
1728
1729 t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
1730 }
1731 }
1732 }
// Statistics, all derived from the total time spent on this query:
// - answer latency buckets;
// - cache hit/miss;
// - moving latency averages.
// ourtime (msec) is the total time minus the time spent waiting on the network.
1733 float spent=makeFloat(sr.getNow()-dc->d_now);
1734 if(!g_quiet) {
1735 g_log<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
1736 g_log<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
1737 sr.d_totUsec/1000.0<<" netw ms, "<< spent*1000.0<<" tot ms, "<<
1738 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<< res;
1739
1740 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1741 g_log<< ", dnssec="<<vStates[sr.getValidationState()];
1742 }
1743
1744 g_log<<endl;
1745
1746 }
1747
1748 if (sr.d_outqueries || sr.d_authzonequeries) {
1749 t_RC->cacheMisses++;
1750 }
1751 else {
1752 t_RC->cacheHits++;
1753 }
1754
1755 if(spent < 0.001)
1756 g_stats.answers0_1++;
1757 else if(spent < 0.010)
1758 g_stats.answers1_10++;
1759 else if(spent < 0.1)
1760 g_stats.answers10_100++;
1761 else if(spent < 1.0)
1762 g_stats.answers100_1000++;
1763 else
1764 g_stats.answersSlow++;
1765
1766 uint64_t newLat=(uint64_t)(spent*1000000);
1767 newLat = min(newLat,(uint64_t)(((uint64_t) g_networkTimeoutMsec)*1000)); // outliers of several minutes exist..
1768 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + (float)newLat/g_latencyStatSize;
1769 // no worries, we do this for packet cache hits elsewhere
1770
1771 auto ourtime = 1000.0*spent-sr.d_totUsec/1000.0; // in msec
1772 if(ourtime < 1)
1773 g_stats.ourtime0_1++;
1774 else if(ourtime < 2)
1775 g_stats.ourtime1_2++;
1776 else if(ourtime < 4)
1777 g_stats.ourtime2_4++;
1778 else if(ourtime < 8)
1779 g_stats.ourtime4_8++;
1780 else if(ourtime < 16)
1781 g_stats.ourtime8_16++;
1782 else if(ourtime < 32)
1783 g_stats.ourtime16_32++;
1784 else {
1785 // cerr<<"SLOW: "<<ourtime<<"ms -> "<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<endl;
1786 g_stats.ourtimeSlow++;
1787 }
1788 if(ourtime >= 0.0) {
1789 newLat=ourtime*1000; // usec
1790 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + (float)newLat/g_latencyStatSize;
1791 }
1792 // cout<<dc->d_mdp.d_qname<<"\t"<<MT->getUsec()<<"\t"<<sr.d_outqueries<<endl;
1793 }
1794 catch(PDNSException &ae) {
1795 g_log<<Logger::Error<<"startDoResolve problem "<<makeLoginfo(dc)<<": "<<ae.reason<<endl;
1796 }
1797 catch(const MOADNSException &mde) {
1798 g_log<<Logger::Error<<"DNS parser error "<<makeLoginfo(dc) <<": "<<dc->d_mdp.d_qname<<", "<<mde.what()<<endl;
1799 }
1800 catch(std::exception& e) {
1801 g_log<<Logger::Error<<"STL error "<< makeLoginfo(dc)<<": "<<e.what();
1802
1803 // Luawrapper nests the exception from Lua, so we unnest it here
1804 try {
1805 std::rethrow_if_nested(e);
1806 } catch(const std::exception& ne) {
1807 g_log<<". Extra info: "<<ne.what();
1808 } catch(...) {}
1809
1810 g_log<<endl;
1811 }
1812 catch(...) {
1813 g_log<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
1814 }
1815
// Track the highest MTasker stack usage seen so far
1816 g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
1817 }
1818
1819 static void makeControlChannelSocket(int processNum=-1)
1820 {
1821 string sockname=::arg()["socket-dir"]+"/"+s_programname;
1822 if(processNum >= 0)
1823 sockname += "."+std::to_string(processNum);
1824 sockname+=".controlsocket";
1825 s_rcc.listen(sockname);
1826
1827 int sockowner = -1;
1828 int sockgroup = -1;
1829
1830 if (!::arg().isEmpty("socket-group"))
1831 sockgroup=::arg().asGid("socket-group");
1832 if (!::arg().isEmpty("socket-owner"))
1833 sockowner=::arg().asUid("socket-owner");
1834
1835 if (sockgroup > -1 || sockowner > -1) {
1836 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
1837 unixDie("Failed to chown control socket");
1838 }
1839 }
1840
1841 // do mode change if socket-mode is given
1842 if(!::arg().isEmpty("socket-mode")) {
1843 mode_t sockmode=::arg().asMode("socket-mode");
1844 if(chmod(sockname.c_str(), sockmode) < 0) {
1845 unixDie("Failed to chmod control socket");
1846 }
1847 }
1848 }
1849
/* Parse the question (qname, qtype, qclass) of a raw query, then walk the records that
   follow it looking for an EDNS OPT record (to extract the ECS option) and/or an XPF record.

   - *dnsname, *qtype and *qclass receive the parsed question.
   - ECS is only looked for when ednssubnet is non-null. When a valid ECS option is found,
     *ednssubnet is filled and foundECS is set to true.
   - When options is non-null, every EDNS option of the OPT record is parsed into *options and
     ECS is taken from there. Otherwise only the ECS option itself is extracted.
   - XPF is only looked for when xpfSource is non-null and g_xpfRRCode is non-zero. foundXPF
     then receives the result of parseXPFPayload() (which fills *xpfSource / xpfDest).
   foundECS/foundXPF are not reset here; the caller must initialise them.
   The walk stops at the first record whose owner name is not the root, on truncation, or
   once everything that was looked for has been found.
   question.at() throws std::out_of_range on an out-of-bounds read, and the DNSName constructor
   presumably throws on a malformed name, so callers are expected to catch std::exception. */
static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
                              bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options,
                              bool& foundXPF, ComboAddress* xpfSource, ComboAddress* xpfDest)
{
  const bool lookForXPF = xpfSource != nullptr && g_xpfRRCode != 0;
  const bool lookForECS = ednssubnet != nullptr;
  const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(question.c_str());
  size_t questionLen = question.length();
  unsigned int consumed=0;
  *dnsname=DNSName(question.c_str(), questionLen, sizeof(dnsheader), false, qtype, qclass, &consumed);

  // skip header + qname + qtype/qclass (4 bytes). NOTE(review): this assumes the query has
  // no answer/authority records, so the additional section starts right after the question.
  size_t pos= sizeof(dnsheader)+consumed+4;
  // smallest record we are willing to look at: root owner name (1 byte) + fixed record header
  const size_t headerSize = /* root */ 1 + sizeof(dnsrecordheader);
  const uint16_t arcount = ntohs(dh->arcount);

  for (uint16_t arpos = 0; arpos < arcount && questionLen > (pos + headerSize) && ((lookForECS && !foundECS) || (lookForXPF && !foundXPF)); arpos++) {
    // both OPT and XPF records are expected to be owned by the root name
    if (question.at(pos) != 0) {
      /* not an OPT or a XPF, bye. */
      return;
    }

    pos += 1;
    const dnsrecordheader* drh = reinterpret_cast<const dnsrecordheader*>(&question.at(pos));
    pos += sizeof(dnsrecordheader);
    // pos now points at the record's RDATA

    if (pos >= questionLen) {
      return;
    }

    /* OPT root label (1) followed by type (2) */
    if(lookForECS && ntohs(drh->d_type) == QType::OPT) {
      if (!options) {
        // fast path: only the ECS option is wanted
        char* ecsStart = nullptr;
        size_t ecsLen = 0;
        /* we need to pass the record len */
        int res = getEDNSOption(const_cast<char*>(reinterpret_cast<const char*>(&question.at(pos - sizeof(drh->d_clen)))), questionLen - pos + sizeof(drh->d_clen), EDNSOptionCode::ECS, &ecsStart, &ecsLen);
        // ecsStart/ecsLen include the 4-byte option code + option length, which are skipped here
        if (res == 0 && ecsLen > 4) {
          EDNSSubnetOpts eso;
          if(getEDNSSubnetOptsFromString(ecsStart + 4, ecsLen - 4, &eso)) {
            *ednssubnet=eso;
            foundECS = true;
          }
        }
      }
      else {
        // the caller (e.g. a gettag hook) wants every EDNS option
        /* we need to pass the record len */
        int res = getEDNSOptions(reinterpret_cast<const char*>(&question.at(pos -sizeof(drh->d_clen))), questionLen - pos + (sizeof(drh->d_clen)), *options);
        if (res == 0) {
          const auto& it = options->find(EDNSOptionCode::ECS);
          if (it != options->end() && !it->second.values.empty() && it->second.values.at(0).content != nullptr && it->second.values.at(0).size > 0) {
            EDNSSubnetOpts eso;
            if(getEDNSSubnetOptsFromString(it->second.values.at(0).content, it->second.values.at(0).size, &eso)) {
              *ednssubnet=eso;
              foundECS = true;
            }
          }
        }
      }
    }
    else if (lookForXPF && ntohs(drh->d_type) == g_xpfRRCode && ntohs(drh->d_class) == QClass::IN && drh->d_ttl == 0) {
      // refuse an XPF record whose declared RDATA length runs past the end of the packet
      if ((questionLen - pos) < ntohs(drh->d_clen)) {
        return;
      }

      foundXPF = parseXPFPayload(reinterpret_cast<const char*>(&question.at(pos)), ntohs(drh->d_clen), *xpfSource, xpfDest);
    }

    // skip this record's RDATA to reach the next one
    pos += ntohs(drh->d_clen);
  }
}
1920
/* Read callback for an accepted TCP client connection, driven by conn->state:
     BYTE0       -> read the 2-byte big-endian length prefix (may arrive as 1 + 1 bytes)
     BYTE1       -> read the second byte of the length prefix
     GETQUESTION -> read conn->qlen bytes of query, possibly across several callbacks
   Once the whole query has been read:
   - the fd is removed from the read multiplexer;
   - the query is parsed into a DNSComboWriter and optionally inspected (ECS/XPF/gettag);
   - it is logged over protobuf if configured and filtered (ipfilter, QR bit, opcode, qdcount);
   - an accepted query is handed to a new MThread running startDoResolve.
   On EOF or a read error the fd is removed from the multiplexer and we simply return. */
static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
{
  shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);

  if(conn->state==TCPConnection::BYTE0) {
    // NOTE(review): assumes conn->data already holds at least 2 bytes here — presumably
    // ensured by TCPConnection's constructor and by whoever resets the state; confirm.
    ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0);
    if(bytes==1)
      conn->state=TCPConnection::BYTE1;
    if(bytes==2) {
      // 16-bit big-endian length prefix of DNS over TCP
      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
      conn->data.resize(conn->qlen);
      conn->bytesread=0;
      conn->state=TCPConnection::GETQUESTION;
    }
    // 0 = orderly shutdown by the peer, < 0 = error (including EAGAIN)
    if(!bytes || bytes < 0) {
      t_fdm->removeReadFD(fd);
      return;
    }
  }
  else if(conn->state==TCPConnection::BYTE1) {
    ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0);
    if(bytes==1) {
      conn->state=TCPConnection::GETQUESTION;
      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
      conn->data.resize(conn->qlen);
      conn->bytesread=0;
    }
    if(!bytes || bytes < 0) {
      if(g_logCommonErrors)
        g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected after first byte"<<endl;
      t_fdm->removeReadFD(fd);
      return;
    }
  }
  else if(conn->state==TCPConnection::GETQUESTION) {
    // read as much of the remaining query as is available
    ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
    if(!bytes || bytes < 0 || bytes > std::numeric_limits<std::uint16_t>::max()) {
      if(g_logCommonErrors) {
        g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected while reading question body"<<endl;
      }
      t_fdm->removeReadFD(fd);
      return;
    }
    conn->bytesread+=(uint16_t)bytes;
    if(conn->bytesread==conn->qlen) {
      t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read

      std::unique_ptr<DNSComboWriter> dc;
      try {
        dc=std::unique_ptr<DNSComboWriter>(new DNSComboWriter(conn->data, g_now));
      }
      catch(const MOADNSException &mde) {
        g_stats.clientParseError++;
        if(g_logCommonErrors)
          g_log<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
        return;
      }
      dc->d_tcpConnection = conn; // carry the torch
      dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
      dc->d_tcp=true;
      dc->setRemote(conn->d_remote);
      dc->setSource(conn->d_remote);
      ComboAddress dest;
      dest.reset();
      dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
      socklen_t len = dest.getSocklen();
      getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
      dc->setLocal(dest);
      dc->setDestination(dest);
      DNSName qname;
      uint16_t qtype=0;
      uint16_t qclass=0;
      bool needECS = false;
      bool needXPF = g_XPFAcl.match(conn->d_remote);
      string requestorId;
      string deviceId;
      string deviceName;
      bool logQuery = false;
#ifdef HAVE_PROTOBUF
      auto luaconfsLocal = g_luaconfs.getLocal();
      if (checkProtobufExport(luaconfsLocal)) {
        needECS = true;
      }
      logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
#endif /* HAVE_PROTOBUF */

#ifdef HAVE_FSTRM
      // NOTE(review): luaconfsLocal is only declared under HAVE_PROTOBUF above
      checkFrameStreamExport(luaconfsLocal);
#endif

      // only pay for parsing the question/EDNS here when something actually needs it
      if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {

        try {
          EDNSOptionViewMap ednsOptions;
          bool xpfFound = false;
          // NOTE(review): d_ecsParsed is set before parsing, so it stays true even if
          // getQNameAndSubnet throws; the UDP path only sets it after a successful parse.
          dc->d_ecsParsed = true;
          dc->d_ecsFound = false;
          getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
                            dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
                            xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? &dc->d_destination : nullptr);

          if(t_pdl) {
            try {
              if (t_pdl->d_gettag_ffi) {
                dc->d_tag = t_pdl->gettag_ffi(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_ttlCap, dc->d_variable, logQuery);
              }
              else if (t_pdl->d_gettag) {
                dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName);
              }
            }
            catch(const std::exception& e)  {
              if(g_logCommonErrors)
                g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
            }
          }
        }
        catch(const std::exception& e)
        {
          if(g_logCommonErrors)
            g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
        }
      }

      const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);

#ifdef HAVE_PROTOBUF
      if(t_protobufServers || t_outgoingProtobufServers) {
        dc->d_requestorId = requestorId;
        dc->d_deviceId = deviceId;
        dc->d_deviceName = deviceName;
        dc->d_uuid = getUniqueID();
      }

      if(t_protobufServers) {
        try {

          if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
            protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName);
          }
        }
        catch(std::exception& e) {
          if(g_logCommonErrors)
            g_log<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
        }
      }
#endif
      // Lua ipfilter: a true result means drop the query
      if(t_pdl) {
        if(t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh)) {
          if(!g_quiet)
            g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<<endl;
          g_stats.policyDrops++;
          return;
        }
      }

      if(dc->d_mdp.d_header.qr) {
        g_stats.ignoredCount++;
        if(g_logCommonErrors) {
          g_log<<Logger::Error<<"Ignoring answer from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
        }
        return;
      }
      if(dc->d_mdp.d_header.opcode) {
        g_stats.ignoredCount++;
        if(g_logCommonErrors) {
          g_log<<Logger::Error<<"Ignoring non-query opcode from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
        }
        return;
      }
      else if (dh->qdcount == 0) {
        g_stats.emptyQueriesCount++;
        if(g_logCommonErrors) {
          g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<< dc->getRemote() <<" on server socket!"<<endl;
        }
        return;
      }
      else {
        ++g_stats.qcounter;
        ++g_stats.tcpqcounter;
        // ownership of dc passes to the new MThread
        MT->makeThread(startDoResolve, dc.release()); // deletes dc, will set state to BYTE0 again
        return;
      }
    }
  }
}
2106
2107 //! Handle new incoming TCP connection
2108 static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
2109 {
2110 ComboAddress addr;
2111 socklen_t addrlen=sizeof(addr);
2112 int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
2113 if(newsock>=0) {
2114 if(MT->numProcesses() > g_maxMThreads) {
2115 g_stats.overCapacityDrops++;
2116 try {
2117 closesocket(newsock);
2118 }
2119 catch(const PDNSException& e) {
2120 g_log<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
2121 }
2122 return;
2123 }
2124
2125 if(t_remotes)
2126 t_remotes->push_back(addr);
2127 if(t_allowFrom && !t_allowFrom->match(&addr)) {
2128 if(!g_quiet)
2129 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
2130
2131 g_stats.unauthorizedTCP++;
2132 try {
2133 closesocket(newsock);
2134 }
2135 catch(const PDNSException& e) {
2136 g_log<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
2137 }
2138 return;
2139 }
2140 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
2141 g_stats.tcpClientOverflow++;
2142 try {
2143 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
2144 }
2145 catch(const PDNSException& e) {
2146 g_log<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
2147 }
2148 return;
2149 }
2150
2151 setNonBlocking(newsock);
2152 std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
2153 tc->state=TCPConnection::BYTE0;
2154
2155 struct timeval ttd;
2156 Utility::gettimeofday(&ttd, 0);
2157 ttd.tv_sec += g_tcpTimeout;
2158
2159 t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
2160 }
2161 }
2162
/* Process one UDP query, either on the receiving thread or on the thread it was distributed to.
   Steps:
   1. Drop the query if it waited more than 1000 ms since its kernel receive timestamp
      (only when a timestamp is available, i.e. tv.tv_sec != 0).
   2. When ECS/XPF/gettag need it, parse qname/ECS/XPF and run the Lua gettag(_ffi) hook,
      which sets the packet-cache tag, policy tags and related fields.
   3. Log the query over protobuf if configured.
   4. Look up the packet cache; on a hit, send the cached response directly and return.
   5. Apply the Lua ipfilter and the MThread capacity limit.
   6. Hand the query to a new MThread running startDoResolve.
   @param question  raw query; handleNewUDPQuestion has already checked it holds a full dnsheader
   @param fromaddr  address the datagram came from
   @param destaddr  local address the datagram was received on
   @param tv        kernel receive timestamp, or {0,0} when unavailable
   @param fd        socket to send the answer on
   @return always a null pointer. NOTE(review): the string* return type presumably only exists
           to fit distributeAsyncFunction's callback signature — confirm. */
static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, struct timeval tv, int fd)
{
  gettimeofday(&g_now, 0);
  if (tv.tv_sec) {
    struct timeval diff = g_now - tv;
    double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);

    // too stale to be worth answering (delta is in milliseconds)
    if(delta > 1000.0) {
      g_stats.tooOldDrops++;
      return nullptr;
    }
  }

  ++g_stats.qcounter;
  if(fromaddr.sin4.sin_family==AF_INET6)
    g_stats.ipv6qcounter++;

  string response;
  const struct dnsheader* dh = (struct dnsheader*)question.c_str();
  unsigned int ctag=0;
  uint32_t qhash = 0;
  bool needECS = false;
  bool needXPF = g_XPFAcl.match(fromaddr);
  std::vector<std::string> policyTags;
  LuaContext::LuaObject data;
  // source/destination start as the socket addresses and may be replaced by XPF values
  ComboAddress source = fromaddr;
  ComboAddress destination = destaddr;
  string requestorId;
  string deviceId;
  string deviceName;
  bool logQuery = false;
#ifdef HAVE_PROTOBUF
  boost::uuids::uuid uniqueId;
  auto luaconfsLocal = g_luaconfs.getLocal();
  if (checkProtobufExport(luaconfsLocal)) {
    uniqueId = getUniqueID();
    needECS = true;
  } else if (checkOutgoingProtobufExport(luaconfsLocal)) {
    uniqueId = getUniqueID();
  }
  logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
  bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
#endif
#ifdef HAVE_FSTRM
  // NOTE(review): luaconfsLocal is only declared under HAVE_PROTOBUF above
  checkFrameStreamExport(luaconfsLocal);
#endif
  EDNSSubnetOpts ednssubnet;
  bool ecsFound = false;
  bool ecsParsed = false;
  uint16_t ecsBegin = 0;
  uint16_t ecsEnd = 0;
  uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
  bool variable = false;
  try {
    DNSName qname;
    uint16_t qtype=0;
    uint16_t qclass=0;
    uint32_t age;
    bool qnameParsed=false;
#ifdef MALLOC_TRACE
    /*
    static uint64_t last=0;
    if(!last)
      g_mtracer->clearAllocators();
    cout<<g_mtracer->getAllocs()-last<<" "<<g_mtracer->getNumOut()<<" -- BEGIN TRACE"<<endl;
    last=g_mtracer->getAllocs();
    cout<<g_mtracer->topAllocatorsString()<<endl;
    g_mtracer->clearAllocators();
    */
#endif

    // only pay for parsing the question/EDNS here when something actually needs it
    if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag || t_pdl->d_gettag_ffi))) {
      try {
        EDNSOptionViewMap ednsOptions;
        bool xpfFound = false;

        ecsFound = false;

        getQNameAndSubnet(question, &qname, &qtype, &qclass,
                          ecsFound, &ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
                          xpfFound, needXPF ? &source : nullptr, needXPF ? &destination : nullptr);

        qnameParsed = true;
        ecsParsed = true;

        if(t_pdl) {
          try {
            if (t_pdl->d_gettag_ffi) {
              ctag = t_pdl->gettag_ffi(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, deviceName, ttlCap, variable, logQuery);
            }
            else if (t_pdl->d_gettag) {
              ctag = t_pdl->gettag(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, deviceName);
            }
          }
          catch(const std::exception& e)  {
            if(g_logCommonErrors)
              g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
          }
        }
      }
      catch(const std::exception& e)
      {
        if(g_logCommonErrors)
          g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
      }
    }

    bool cacheHit = false;
    boost::optional<RecProtoBufMessage> pbMessage(boost::none);
#ifdef HAVE_PROTOBUF
    if (t_protobufServers) {
      pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
      pbMessage->setServerIdentity(SyncRes::s_serverID);
      if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && policyTags.empty())) {
        protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId, deviceName);
      }
    }
#endif /* HAVE_PROTOBUF */

    /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
       but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
       as cacheable we would cache it with a wrong tag, so better safe than sorry. */
    vState valState;
    // the two overloads differ in whether qtype/qclass are inputs (already parsed) or outputs
    if (qnameParsed) {
      cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &valState, &qhash, &ecsBegin, &ecsEnd, pbMessage ? &(*pbMessage) : nullptr));
    }
    else {
      cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, &qtype, &qclass, g_now.tv_sec, &response, &age, &valState, &qhash, &ecsBegin, &ecsEnd, pbMessage ? &(*pbMessage) : nullptr));
    }

    if (cacheHit) {
      if(valState == Bogus) {
        if(t_bogusremotes)
          t_bogusremotes->push_back(source);
        if(t_bogusqueryring)
          t_bogusqueryring->push_back(make_pair(qname, qtype));
      }

#ifdef HAVE_PROTOBUF
      if(t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) {
        Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
        const ComboAddress& requestor = requestorNM.getMaskedNetwork();
        pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
        pbMessage->setEDNSSubnet(ednssubnet.source, ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
        if (g_useKernelTimestamp && tv.tv_sec) {
          pbMessage->setQueryTime(tv.tv_sec, tv.tv_usec);
        }
        else {
          pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
        }
        pbMessage->setRequestorId(requestorId);
        pbMessage->setDeviceId(deviceId);
        pbMessage->setDeviceName(deviceName);
        protobufLogResponse(*pbMessage);
      }
#endif /* HAVE_PROTOBUF */
      if(!g_quiet)
        g_log<<Logger::Notice<<t_id<< " question answered from packet cache tag="<<ctag<<" from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<endl;

      g_stats.packetCacheHits++;
      SyncRes::s_queries++;
      ageDNSPacket(response, age);
      struct msghdr msgh;
      struct iovec iov;
      cmsgbuf_aligned cbuf;
      fillMSGHdr(&msgh, &iov, &cbuf, 0, (char*)response.c_str(), response.length(), const_cast<ComboAddress*>(&fromaddr));
      msgh.msg_control=NULL;

      // on sockets bound to the any-address, answer from the address the query was sent to
      if(g_fromtosockets.count(fd)) {
        addCMsgSrcAddr(&msgh, &cbuf, &destaddr, 0);
      }
      if(sendmsg(fd, &msgh, 0) < 0 && g_logCommonErrors)
        g_log<<Logger::Warning<<"Sending UDP reply to client "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<" failed with: "<<strerror(errno)<<endl;

      if(response.length() >= sizeof(struct dnsheader)) {
        struct dnsheader tmpdh;
        memcpy(&tmpdh, response.c_str(), sizeof(tmpdh));
        updateResponseStats(tmpdh.rcode, source, response.length(), 0, 0);
      }
      g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + 0.0; // we assume 0 usec
      g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + 0.0; // we assume 0 usec
      return 0;
    }
  }
  catch(std::exception& e) {
    g_log<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
    return 0;
  }

  // Lua ipfilter: a true result means drop the query
  if(t_pdl) {
    if(t_pdl->ipfilter(source, destination, *dh)) {
      if(!g_quiet)
	g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<" based on policy"<<endl;
      g_stats.policyDrops++;
      return 0;
    }
  }

  if(MT->numProcesses() > g_maxMThreads) {
    if(!g_quiet)
      g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<", over capacity"<<endl;

    g_stats.overCapacityDrops++;
    return 0;
  }

  // cache miss: carry everything computed so far over to the resolving MThread
  auto dc = std::unique_ptr<DNSComboWriter>(new DNSComboWriter(question, g_now, std::move(policyTags), std::move(data)));
  dc->setSocket(fd);
  dc->d_tag=ctag;
  dc->d_qhash=qhash;
  dc->setRemote(fromaddr);
  dc->setSource(source);
  dc->setLocal(destaddr);
  dc->setDestination(destination);
  dc->d_tcp=false;
  dc->d_ecsFound = ecsFound;
  dc->d_ecsParsed = ecsParsed;
  dc->d_ecsBegin = ecsBegin;
  dc->d_ecsEnd = ecsEnd;
  dc->d_ednssubnet = ednssubnet;
  dc->d_ttlCap = ttlCap;
  dc->d_variable = variable;
#ifdef HAVE_PROTOBUF
  if (t_protobufServers || t_outgoingProtobufServers) {
    dc->d_uuid = std::move(uniqueId);
  }
  dc->d_requestorId = requestorId;
  dc->d_deviceId = deviceId;
  dc->d_deviceName = deviceName;
  dc->d_kernelTimestamp = tv;
#endif

  MT->makeThread(startDoResolve, (void*) dc.release()); // deletes dc
  return 0;
}
2398
2399
2400 static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
2401 {
2402 ssize_t len;
2403 static const size_t maxIncomingQuerySize = 512;
2404 static thread_local std::string data;
2405 ComboAddress fromaddr;
2406 struct msghdr msgh;
2407 struct iovec iov;
2408 cmsgbuf_aligned cbuf;
2409 bool firstQuery = true;
2410
2411 for(size_t queriesCounter = 0; queriesCounter < s_maxUDPQueriesPerRound; queriesCounter++) {
2412 data.resize(maxIncomingQuerySize);
2413 fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
2414 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), &data[0], data.size(), &fromaddr);
2415
2416 if((len=recvmsg(fd, &msgh, 0)) >= 0) {
2417
2418 firstQuery = false;
2419
2420 if (static_cast<size_t>(len) < sizeof(dnsheader)) {
2421 g_stats.ignoredCount++;
2422 if (!g_quiet) {
2423 g_log<<Logger::Error<<"Ignoring too-short ("<<std::to_string(len)<<") query from "<<fromaddr.toString()<<endl;
2424 }
2425 return;
2426 }
2427
2428 if (msgh.msg_flags & MSG_TRUNC) {
2429 g_stats.truncatedDrops++;
2430 if (!g_quiet) {
2431 g_log<<Logger::Error<<"Ignoring truncated query from "<<fromaddr.toString()<<endl;
2432 }
2433 return;
2434 }
2435
2436 if(t_remotes) {
2437 t_remotes->push_back(fromaddr);
2438 }
2439
2440 if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
2441 if(!g_quiet) {
2442 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
2443 }
2444
2445 g_stats.unauthorizedUDP++;
2446 return;
2447 }
2448 BOOST_STATIC_ASSERT(offsetof(sockaddr_in, sin_port) == offsetof(sockaddr_in6, sin6_port));
2449 if(!fromaddr.sin4.sin_port) { // also works for IPv6
2450 if(!g_quiet) {
2451 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toStringWithPort()<<", can't deal with port 0"<<endl;
2452 }
2453
2454 g_stats.clientParseError++; // not quite the best place to put it, but needs to go somewhere
2455 return;
2456 }
2457
2458 try {
2459 data.resize(static_cast<size_t>(len));
2460 dnsheader* dh=(dnsheader*)&data[0];
2461
2462 if(dh->qr) {
2463 g_stats.ignoredCount++;
2464 if(g_logCommonErrors) {
2465 g_log<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
2466 }
2467 }
2468 else if(dh->opcode) {
2469 g_stats.ignoredCount++;
2470 if(g_logCommonErrors) {
2471 g_log<<Logger::Error<<"Ignoring non-query opcode "<<dh->opcode<<" from "<<fromaddr.toString()<<" on server socket!"<<endl;
2472 }
2473 }
2474 else if (dh->qdcount == 0) {
2475 g_stats.emptyQueriesCount++;
2476 if(g_logCommonErrors) {
2477 g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<<fromaddr.toString()<<" on server socket!"<<endl;
2478 }
2479 }
2480 else {
2481 struct timeval tv={0,0};
2482 HarvestTimestamp(&msgh, &tv);
2483 ComboAddress dest;
2484 dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
2485 auto loc = rplookup(g_listenSocketsAddresses, fd);
2486 if(HarvestDestinationAddress(&msgh, &dest)) {
2487 // but.. need to get port too
2488 if(loc) {
2489 dest.sin4.sin_port = loc->sin4.sin_port;
2490 }
2491 }
2492 else {
2493 if(loc) {
2494 dest = *loc;
2495 }
2496 else {
2497 dest.sin4.sin_family = fromaddr.sin4.sin_family;
2498 socklen_t slen = dest.getSocklen();
2499 getsockname(fd, (sockaddr*)&dest, &slen); // if this fails, we're ok with it
2500 }
2501 }
2502
2503 if(g_weDistributeQueries) {
2504 distributeAsyncFunction(data, boost::bind(doProcessUDPQuestion, data, fromaddr, dest, tv, fd));
2505 }
2506 else {
2507 ++s_threadInfos[t_id].numberOfDistributedQueries;
2508 doProcessUDPQuestion(data, fromaddr, dest, tv, fd);
2509 }
2510 }
2511 }
2512 catch(const MOADNSException &mde) {
2513 g_stats.clientParseError++;
2514 if(g_logCommonErrors) {
2515 g_log<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
2516 }
2517 }
2518 catch(const std::runtime_error& e) {
2519 g_stats.clientParseError++;
2520 if(g_logCommonErrors) {
2521 g_log<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<e.what()<<endl;
2522 }
2523 }
2524 }
2525 else {
2526 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
2527 if(firstQuery && errno == EAGAIN) {
2528 g_stats.noPacketError++;
2529 }
2530
2531 break;
2532 }
2533 }
2534 }
2535
2536 static void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets)
2537 {
2538 int fd;
2539 vector<string>locals;
2540 stringtok(locals,::arg()["local-address"]," ,");
2541
2542 if(locals.empty())
2543 throw PDNSException("No local address specified");
2544
2545 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2546 ServiceTuple st;
2547 st.port=::arg().asNum("local-port");
2548 parseService(*i, st);
2549
2550 ComboAddress sin;
2551
2552 sin.reset();
2553 sin.sin4.sin_family = AF_INET;
2554 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2555 sin.sin6.sin6_family = AF_INET6;
2556 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2557 throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
2558 }
2559
2560 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
2561 if(fd<0)
2562 throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
2563
2564 setCloseOnExec(fd);
2565
2566 int tmp=1;
2567 if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
2568 g_log<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
2569 exit(1);
2570 }
2571 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
2572 g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
2573 }
2574
2575 #ifdef TCP_DEFER_ACCEPT
2576 if(setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
2577 if(i==locals.begin())
2578 g_log<<Logger::Info<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
2579 }
2580 #endif
2581
2582 if( ::arg().mustDo("non-local-bind") )
2583 Utility::setBindAny(AF_INET, fd);
2584
2585 #ifdef SO_REUSEPORT
2586 if(g_reusePort) {
2587 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(tmp)) < 0)
2588 throw PDNSException("SO_REUSEPORT: "+stringerror());
2589 }
2590 #endif
2591
2592 if (::arg().asNum("tcp-fast-open") > 0) {
2593 #ifdef TCP_FASTOPEN
2594 int fastOpenQueueSize = ::arg().asNum("tcp-fast-open");
2595 if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &fastOpenQueueSize, sizeof fastOpenQueueSize) < 0) {
2596 g_log<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(errno)<<endl;
2597 }
2598 #else
2599 g_log<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
2600 #endif
2601 }
2602
2603 sin.sin4.sin_port = htons(st.port);
2604 socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
2605 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
2606 throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
2607
2608 setNonBlocking(fd);
2609 setSocketSendBuffer(fd, 65000);
2610 listen(fd, 128);
2611 deferredAdds.push_back(make_pair(fd, handleNewTCPQuestion));
2612 tcpSockets.insert(fd);
2613
2614 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
2615 // - fd is not that which we know here, but returned from accept()
2616 if(sin.sin4.sin_family == AF_INET)
2617 g_log<<Logger::Info<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
2618 else
2619 g_log<<Logger::Info<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2620 }
2621 }
2622
2623 static void makeUDPServerSockets(deferredAdd_t& deferredAdds)
2624 {
2625 int one=1;
2626 vector<string>locals;
2627 stringtok(locals,::arg()["local-address"]," ,");
2628
2629 if(locals.empty())
2630 throw PDNSException("No local address specified");
2631
2632 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2633 ServiceTuple st;
2634 st.port=::arg().asNum("local-port");
2635 parseService(*i, st);
2636
2637 ComboAddress sin;
2638
2639 sin.reset();
2640 sin.sin4.sin_family = AF_INET;
2641 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2642 sin.sin6.sin6_family = AF_INET6;
2643 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2644 throw PDNSException("Unable to resolve local address for UDP server on '"+ st.host +"'");
2645 }
2646
2647 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
2648 if(fd < 0) {
2649 throw PDNSException("Making a UDP server socket for resolver: "+netstringerror());
2650 }
2651 if (!setSocketTimestamps(fd))
2652 g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2653
2654 if(IsAnyAddress(sin)) {
2655 if(sin.sin4.sin_family == AF_INET)
2656 if(!setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one))) // linux supports this, so why not - might fail on other systems
2657 g_fromtosockets.insert(fd);
2658 #ifdef IPV6_RECVPKTINFO
2659 if(sin.sin4.sin_family == AF_INET6)
2660 if(!setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)))
2661 g_fromtosockets.insert(fd);
2662 #endif
2663 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) < 0) {
2664 g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
2665 }
2666 }
2667 if( ::arg().mustDo("non-local-bind") )
2668 Utility::setBindAny(AF_INET6, fd);
2669
2670 setCloseOnExec(fd);
2671
2672 setSocketReceiveBuffer(fd, 250000);
2673 sin.sin4.sin_port = htons(st.port);
2674
2675
2676 #ifdef SO_REUSEPORT
2677 if(g_reusePort) {
2678 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
2679 throw PDNSException("SO_REUSEPORT: "+stringerror());
2680 }
2681 #endif
2682
2683 if (sin.isIPv4()) {
2684 try {
2685 setSocketIgnorePMTU(fd);
2686 }
2687 catch(const std::exception& e) {
2688 g_log<<Logger::Warning<<"Failed to set IP_MTU_DISCOVER on UDP server socket: "<<e.what()<<endl;
2689 }
2690 }
2691
2692 socklen_t socklen=sin.getSocklen();
2693 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
2694 throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
2695
2696 setNonBlocking(fd);
2697
2698 deferredAdds.push_back(make_pair(fd, handleNewUDPQuestion));
2699 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
2700 if(sin.sin4.sin_family == AF_INET)
2701 g_log<<Logger::Info<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
2702 else
2703 g_log<<Logger::Info<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2704 }
2705 }
2706
2707 static void daemonize(void)
2708 {
2709 if(fork())
2710 exit(0); // bye bye
2711
2712 setsid();
2713
2714 int i=open("/dev/null",O_RDWR); /* open stdin */
2715 if(i < 0)
2716 g_log<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
2717 else {
2718 dup2(i,0); /* stdin */
2719 dup2(i,1); /* stderr */
2720 dup2(i,2); /* stderr */
2721 close(i);
2722 }
2723 }
2724
2725 static void usr1Handler(int)
2726 {
2727 statsWanted=true;
2728 }
2729
2730 static void usr2Handler(int)
2731 {
2732 g_quiet= !g_quiet;
2733 SyncRes::setDefaultLogMode(g_quiet ? SyncRes::LogNone : SyncRes::Log);
2734 ::arg().set("quiet")=g_quiet ? "" : "no";
2735 }
2736
2737 static void doStats(void)
2738 {
2739 static time_t lastOutputTime;
2740 static uint64_t lastQueryCount;
2741
2742 uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
2743 uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
2744
2745 if(g_stats.qcounter && (cacheHits + cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {
2746 g_log<<Logger::Notice<<"stats: "<<g_stats.qcounter<<" questions, "<<
2747 broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
2748 broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
2749 (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;
2750
2751 g_log<<Logger::Notice<<"stats: throttle map: "
2752 << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
2753 << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl;
2754 g_log<<Logger::Notice<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
2755 g_log<<Logger::Notice<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
2756 <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
2757 g_log<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
2758 broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
2759
2760 //g_log<<Logger::Notice<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
2761 //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
2762
2763 g_log<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
2764 " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
2765
2766 size_t idx = 0;
2767 for (const auto& threadInfo : s_threadInfos) {
2768 if(threadInfo.isWorker) {
2769 g_log<<Logger::Notice<<"stats: thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
2770 ++idx;
2771 }
2772 }
2773
2774 time_t now = time(0);
2775 if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
2776 g_log<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
2777 }
2778 lastOutputTime = now;
2779 lastQueryCount = SyncRes::s_queries;
2780 }
2781 else if(statsWanted)
2782 g_log<<Logger::Notice<<"stats: no stats yet!"<<endl;
2783
2784 statsWanted=false;
2785 }
2786
2787 static void houseKeeping(void *)
2788 {
2789 static thread_local time_t last_rootupdate, last_prune, last_secpoll, last_trustAnchorUpdate{0};
2790 static thread_local int cleanCounter=0;
2791 static thread_local bool s_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
2792 auto luaconfsLocal = g_luaconfs.getLocal();
2793
2794 if (last_trustAnchorUpdate == 0 && !luaconfsLocal->trustAnchorFileInfo.fname.empty() && luaconfsLocal->trustAnchorFileInfo.interval != 0) {
2795 // Loading the Lua config file already "refreshed" the TAs
2796 last_trustAnchorUpdate = g_now.tv_sec + luaconfsLocal->trustAnchorFileInfo.interval * 3600;
2797 }
2798
2799 try {
2800 if(s_running) {
2801 return;
2802 }
2803 s_running=true;
2804
2805 struct timeval now;
2806 Utility::gettimeofday(&now, 0);
2807
2808 if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
2809 t_RC->doPrune(g_maxCacheEntries / g_numThreads); // this function is local to a thread, so fine anyhow
2810 t_packetCache->doPruneTo(g_maxPacketCacheEntries / g_numWorkerThreads);
2811
2812 SyncRes::pruneNegCache(g_maxCacheEntries / (g_numWorkerThreads * 10));
2813
2814 if(!((cleanCounter++)%40)) { // this is a full scan!
2815 time_t limit=now.tv_sec-300;
2816 SyncRes::pruneNSSpeeds(limit);
2817 }
2818 last_prune=time(0);
2819 }
2820
2821 if(now.tv_sec - last_rootupdate > 7200) {
2822 int res = SyncRes::getRootNS(g_now, nullptr);
2823 if (!res)
2824 last_rootupdate=now.tv_sec;
2825 }
2826
2827 if(isHandlerThread()) {
2828
2829 if(now.tv_sec - last_secpoll >= 3600) {
2830 try {
2831 doSecPoll(&last_secpoll);
2832 }
2833 catch(std::exception& e)
2834 {
2835 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.what()<<endl;
2836 }
2837 catch(PDNSException& e)
2838 {
2839 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2840 }
2841 catch(ImmediateServFailException &e)
2842 {
2843 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2844 }
2845 catch(...)
2846 {
2847 g_log<<Logger::Error<<"Exception while performing security poll"<<endl;
2848 }
2849 }
2850
2851 if (!luaconfsLocal->trustAnchorFileInfo.fname.empty() && luaconfsLocal->trustAnchorFileInfo.interval != 0 &&
2852 g_now.tv_sec - last_trustAnchorUpdate >= (luaconfsLocal->trustAnchorFileInfo.interval * 3600)) {
2853 g_log<<Logger::Debug<<"Refreshing Trust Anchors from file"<<endl;
2854 try {
2855 map<DNSName, dsmap_t> dsAnchors;
2856 if (updateTrustAnchorsFromFile(luaconfsLocal->trustAnchorFileInfo.fname, dsAnchors)) {
2857 g_luaconfs.modify([&dsAnchors](LuaConfigItems& lci) {
2858 lci.dsAnchors = dsAnchors;
2859 });
2860 }
2861 last_trustAnchorUpdate = now.tv_sec;
2862 } catch (const PDNSException &pe) {
2863 g_log<<Logger::Error<<"Unable to update Trust Anchors: "<<pe.reason<<endl;
2864 }
2865 }
2866 }
2867 s_running=false;
2868 }
2869 catch(PDNSException& ae)
2870 {
2871 s_running=false;
2872 g_log<<Logger::Error<<"Fatal error in housekeeping thread: "<<ae.reason<<endl;
2873 throw;
2874 }
2875 }
2876
2877 static void makeThreadPipes()
2878 {
2879 auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
2880 if (pipeBufferSize > 0) {
2881 g_log<<Logger::Info<<"Resizing the buffer of the distribution pipe to "<<pipeBufferSize<<endl;
2882 }
2883
2884 /* thread 0 is the handler / SNMP, we start at 1 */
2885 for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
2886 auto& threadInfos = s_threadInfos.at(n);
2887
2888 int fd[2];
2889 if(pipe(fd) < 0)
2890 unixDie("Creating pipe for inter-thread communications");
2891
2892 threadInfos.pipes.readToThread = fd[0];
2893 threadInfos.pipes.writeToThread = fd[1];
2894
2895 if(pipe(fd) < 0)
2896 unixDie("Creating pipe for inter-thread communications");
2897
2898 threadInfos.pipes.readFromThread = fd[0];
2899 threadInfos.pipes.writeFromThread = fd[1];
2900
2901 if(pipe(fd) < 0)
2902 unixDie("Creating pipe for inter-thread communications");
2903
2904 threadInfos.pipes.readQueriesToThread = fd[0];
2905 threadInfos.pipes.writeQueriesToThread = fd[1];
2906
2907 if (pipeBufferSize > 0) {
2908 if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
2909 g_log<<Logger::Warning<<"Error resizing the buffer of the distribution pipe for thread "<<n<<" to "<<pipeBufferSize<<": "<<strerror(errno)<<endl;
2910 auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
2911 if (existingSize > 0) {
2912 g_log<<Logger::Warning<<"The current size of the distribution pipe's buffer for thread "<<n<<" is "<<existingSize<<endl;
2913 }
2914 }
2915 }
2916
2917 if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
2918 unixDie("Making pipe for inter-thread communications non-blocking");
2919 }
2920 }
2921 }
2922
2923 struct ThreadMSG
2924 {
2925 pipefunc_t func;
2926 bool wantAnswer;
2927 };
2928
2929 void broadcastFunction(const pipefunc_t& func)
2930 {
2931 /* This function might be called by the worker with t_id 0 during startup
2932 for the initialization of ACLs and domain maps. After that it should only
2933 be called by the handler. */
2934
2935 if (s_threadInfos.empty() && isHandlerThread()) {
2936 /* the handler and distributors will call themselves below, but
2937 during startup we get called while s_threadInfos has not been
2938 populated yet to update the ACL or domain maps, so we need to
2939 handle that case.
2940 */
2941 func();
2942 }
2943
2944 unsigned int n = 0;
2945 for (const auto& threadInfo : s_threadInfos) {
2946 if(n++ == t_id) {
2947 func(); // don't write to ourselves!
2948 continue;
2949 }
2950
2951 ThreadMSG* tmsg = new ThreadMSG();
2952 tmsg->func = func;
2953 tmsg->wantAnswer = true;
2954 if(write(threadInfo.pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2955 delete tmsg;
2956
2957 unixDie("write to thread pipe returned wrong size or error");
2958 }
2959
2960 string* resp = nullptr;
2961 if(read(threadInfo.pipes.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2962 unixDie("read from thread pipe returned wrong size or error");
2963
2964 if(resp) {
2965 delete resp;
2966 resp = nullptr;
2967 }
2968 }
2969 }
2970
2971 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
2972 {
2973 auto& targetInfo = s_threadInfos[target];
2974 if(!targetInfo.isWorker) {
2975 g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
2976 exit(1);
2977 }
2978
2979 const auto& tps = targetInfo.pipes;
2980
2981 ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
2982 if (written > 0) {
2983 if (static_cast<size_t>(written) != sizeof(tmsg)) {
2984 delete tmsg;
2985 unixDie("write to thread pipe returned wrong size or error");
2986 }
2987 }
2988 else {
2989 int error = errno;
2990 if (error == EAGAIN || error == EWOULDBLOCK) {
2991 return false;
2992 } else {
2993 delete tmsg;
2994 unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
2995 }
2996 }
2997
2998 ++targetInfo.numberOfDistributedQueries;
2999
3000 return true;
3001 }
3002
3003 static unsigned int getWorkerLoad(size_t workerIdx)
3004 {
3005 const auto mt = s_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
3006 if (mt != nullptr) {
3007 return mt->numProcesses();
3008 }
3009 return 0;
3010 }
3011
3012 static unsigned int selectWorker(unsigned int hash)
3013 {
3014 if (s_balancingFactor == 0) {
3015 return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
3016 }
3017
3018 /* we start with one, representing the query we are currently handling */
3019 double currentLoad = 1;
3020 std::vector<unsigned int> load(g_numWorkerThreads);
3021 for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
3022 load[idx] = getWorkerLoad(idx);
3023 currentLoad += load[idx];
3024 // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
3025 }
3026
3027 double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
3028 // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
3029
3030 unsigned int worker = hash % g_numWorkerThreads;
3031 /* at least one server has to be at or below the average load */
3032 if (load[worker] > targetLoad) {
3033 ++g_stats.rebalancedQueries;
3034 do {
3035 // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
3036 worker = (worker + 1) % g_numWorkerThreads;
3037 }
3038 while(load[worker] > targetLoad);
3039 }
3040
3041 return /* skip handler */ 1 + g_numDistributorThreads + worker;
3042 }
3043
3044 // This function is only called by the distributor threads, when pdns-distributes-queries is set
3045 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
3046 {
3047 if (!isDistributorThread()) {
3048 g_log<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
3049 exit(1);
3050 }
3051
3052 unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
3053 unsigned int target = selectWorker(hash);
3054
3055 ThreadMSG* tmsg = new ThreadMSG();
3056 tmsg->func = func;
3057 tmsg->wantAnswer = false;
3058
3059 if (!trySendingQueryToWorker(target, tmsg)) {
3060 /* if this function failed but did not raise an exception, it means that the pipe
3061 was full, let's try another one */
3062 unsigned int newTarget = 0;
3063 do {
3064 newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads);
3065 } while (newTarget == target);
3066
3067 if (!trySendingQueryToWorker(newTarget, tmsg)) {
3068 g_stats.queryPipeFullDrops++;
3069 delete tmsg;
3070 }
3071 }
3072 }
3073
3074 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
3075 {
3076 ThreadMSG* tmsg = nullptr;
3077
3078 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
3079 unixDie("read from thread pipe returned wrong size or error");
3080 }
3081
3082 void *resp=0;
3083 try {
3084 resp = tmsg->func();
3085 }
3086 catch(std::exception& e) {
3087 if(g_logCommonErrors)
3088 g_log<<Logger::Error<<"PIPE function we executed created exception: "<<e.what()<<endl; // but what if they wanted an answer.. we send 0
3089 }
3090 catch(PDNSException& e) {
3091 if(g_logCommonErrors)
3092 g_log<<Logger::Error<<"PIPE function we executed created PDNS exception: "<<e.reason<<endl; // but what if they wanted an answer.. we send 0
3093 }
3094 if(tmsg->wantAnswer) {
3095 const auto& threadInfo = s_threadInfos.at(t_id);
3096 if(write(threadInfo.pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
3097 delete tmsg;
3098 unixDie("write to thread pipe returned wrong size or error");
3099 }
3100 }
3101
3102 delete tmsg;
3103 }
3104
3105 template<class T> void *voider(const boost::function<T*()>& func)
3106 {
3107 return func();
3108 }
3109
3110 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
3111 {
3112 a.insert(a.end(), b.begin(), b.end());
3113 return a;
3114 }
3115
/* Append every (name, number) entry of b to a, in order; used as the
   accumulator for broadcastAccFunction. */
vector<pair<string, uint16_t> >& operator+=(vector<pair<string, uint16_t> >&a, const vector<pair<string, uint16_t> >& b)
{
  a.reserve(a.size() + b.size());
  for (const auto& entry : b) {
    a.push_back(entry);
  }
  return a;
}
3121
3122 vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a, const vector<pair<DNSName, uint16_t> >& b)
3123 {
3124 a.insert(a.end(), b.begin(), b.end());
3125 return a;
3126 }
3127
3128
3129 /*
3130 This function should only be called by the handler to gather metrics, wipe the cache,
3131 reload the Lua script (not the Lua config) or change the current trace regex,
3132 and by the SNMP thread to gather metrics. */
3133 template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
3134 {
3135 if (!isHandlerThread()) {
3136 g_log<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
3137 exit(1);
3138 }
3139
3140 unsigned int n = 0;
3141 T ret=T();
3142 for (const auto& threadInfo : s_threadInfos) {
3143 if (n++ == t_id) {
3144 continue;
3145 }
3146
3147 const auto& tps = threadInfo.pipes;
3148 ThreadMSG* tmsg = new ThreadMSG();
3149 tmsg->func = boost::bind(voider<T>, func);
3150 tmsg->wantAnswer = true;
3151
3152 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
3153 delete tmsg;
3154 unixDie("write to thread pipe returned wrong size or error");
3155 }
3156
3157 T* resp = nullptr;
3158 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
3159 unixDie("read from thread pipe returned wrong size or error");
3160
3161 if(resp) {
3162 ret += *resp;
3163 delete resp;
3164 resp = nullptr;
3165 }
3166 }
3167 return ret;
3168 }
3169
3170 template string broadcastAccFunction(const boost::function<string*()>& fun); // explicit instantiation
3171 template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
3172 template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
3173 template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
3174 template ThreadTimes broadcastAccFunction(const boost::function<ThreadTimes*()>& fun);
3175
3176 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
3177 {
3178 try {
3179 string remote;
3180 string msg=s_rcc.recv(&remote);
3181 RecursorControlParser rcp;
3182 RecursorControlParser::func_t* command;
3183
3184 string answer=rcp.getAnswer(msg, &command);
3185
3186 // If we are inside a chroot, we need to strip
3187 if (!arg()["chroot"].empty()) {
3188 size_t len = arg()["chroot"].length();
3189 remote = remote.substr(len);
3190 }
3191
3192 s_rcc.send(answer, &remote);
3193 command();
3194 }
3195 catch(const std::exception& e) {
3196 g_log<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
3197 }
3198 catch(const PDNSException& ae) {
3199 g_log<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
3200 }
3201 }
3202
3203 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
3204 {
3205 PacketID* pident=any_cast<PacketID>(&var);
3206 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
3207
3208 shared_array<char> buffer(new char[pident->inNeeded]);
3209
3210 ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
3211 if(ret > 0) {
3212 pident->inMSG.append(&buffer[0], &buffer[ret]);
3213 pident->inNeeded-=(size_t)ret;
3214 if(!pident->inNeeded || pident->inIncompleteOkay) {
3215 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
3216 PacketID pid=*pident;
3217 string msg=pident->inMSG;
3218
3219 t_fdm->removeReadFD(fd);
3220 MT->sendEvent(pid, &msg);
3221 }
3222 else {
3223 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
3224 }
3225 }
3226 else {
3227 PacketID tmp=*pident;
3228 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
3229 string empty;
3230 MT->sendEvent(tmp, &empty); // this conveys error status
3231 }
3232 }
3233
3234 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
3235 {
3236 PacketID* pid=any_cast<PacketID>(&var);
3237 ssize_t ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
3238 if(ret > 0) {
3239 pid->outPos+=(ssize_t)ret;
3240 if(pid->outPos==pid->outMSG.size()) {
3241 PacketID tmp=*pid;
3242 t_fdm->removeWriteFD(fd);
3243 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
3244 }
3245 }
3246 else { // error or EOF
3247 PacketID tmp(*pid);
3248 t_fdm->removeWriteFD(fd);
3249 string sent;
3250 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
3251 }
3252 }
3253
3254 // resend event to everybody chained onto it
3255 static void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
3256 {
3257 if(iter->key.chain.empty())
3258 return;
3259 // cerr<<"doResends called!\n";
3260 for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
3261 resend.fd=-1;
3262 resend.id=*i;
3263 // cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
3264
3265 MT->sendEvent(resend, &content);
3266 g_stats.chainResends++;
3267 }
3268 }
3269
/* Multiplexer callback for an outgoing UDP socket: read one response from an
   authoritative server, build a PacketID key from it (remote, DNS id, fd and,
   when parsable, qname/qtype), and deliver the packet to the matching waiting
   mthread plus any chained duplicates. A packet shorter than a DNS header is
   delivered to the socket's registered PacketID as an empty string (error). */
static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
{
  PacketID pid=any_cast<PacketID>(var);
  ssize_t len;
  std::string packet;
  packet.resize(g_outgoingEDNSBufsize);
  ComboAddress fromaddr;
  socklen_t addrlen=sizeof(fromaddr);

  len=recvfrom(fd, &packet.at(0), packet.size(), 0, (sockaddr *)&fromaddr, &addrlen);

  // Too short to contain a DNS header (or recvfrom failed): release the socket and
  // signal failure to the waiter this fd was registered for.
  if(len < (ssize_t) sizeof(dnsheader)) {
    if(len < 0)
      ; //      cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
    else {
      g_stats.serverParseError++;
      if(g_logCommonErrors)
        g_log<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
          ": packet smaller than DNS header"<<endl;
    }

    t_udpclientsocks->returnSocket(fd);
    string empty;

    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
    if(iter != MT->d_waiters.end())
      doResends(iter, pid, empty);

    MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
    return;
  }

  packet.resize(len);
  dnsheader dh;
  memcpy(&dh, &packet.at(0), sizeof(dh));

  // Key used to find the waiter: who answered, with which DNS id, on which socket.
  PacketID pident;
  pident.remote=fromaddr;
  pident.id=dh.id;
  pident.fd=fd;

  if(!dh.qr && g_logCommonErrors) {
    g_log<<Logger::Notice<<"Not taking data from question on outgoing socket from "<< fromaddr.toStringWithPort()  <<endl;
  }

  // Responses without a question section (or with QR unset) get an empty qname/qtype
  // in the key; the weakened match in the retry loop below handles those.
  if(!dh.qdcount || // UPC, Nominum, very old BIND on FormErr, NSD
     !dh.qr) {      // one weird server
    pident.domain.clear();
    pident.type = 0;
  }
  else {
    try {
      if(len > 12)
        pident.domain=DNSName(&packet.at(0), len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
    }
    catch(std::exception& e) {
      g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
      g_log<<Logger::Warning<<"Error in packet from remote nameserver "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
      // NOTE(review): fd is not returned to t_udpclientsocks on this path and no
      // event is sent; presumably the waiter's timeout path reclaims it — confirm.
      return;
    }
  }

  MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
  if(iter != MT->d_waiters.end()) {
    doResends(iter, pident, packet);
  }

retryWithName:

  if(!MT->sendEvent(pident, &packet)) {
    /* we did not find a match for this response, something is wrong */

    // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
    for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
      // Same socket, server, qtype and qname but a different id: record it as a near miss.
      if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
         pident.domain == mthread->key.domain) {
        mthread->key.nearMisses++;
      }

      // be a bit paranoid here since we're weakening our matching
      if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
         pident.id  == mthread->key.id && mthread->key.remote == pident.remote) {
        // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
        pident.domain = mthread->key.domain;
        pident.type = mthread->key.type;
        goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
      }
    }
    g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
    if(g_logCommonErrors) {
      g_log<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<< (pident.domain.empty() ? "<empty>" : pident.domain.toString())<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
    }
  }
  else if(fd >= 0) {
    /* we either found a waiter (1) or encountered an issue (-1), it's up to us to clean the socket anyway */
    t_udpclientsocks->returnSocket(fd);
  }
}
3368
3369 FDMultiplexer* getMultiplexer()
3370 {
3371 FDMultiplexer* ret;
3372 for(const auto& i : FDMultiplexer::getMultiplexerMap()) {
3373 try {
3374 ret=i.second();
3375 return ret;
3376 }
3377 catch(FDMultiplexerException &fe) {
3378 g_log<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
3379 }
3380 catch(...) {
3381 g_log<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
3382 }
3383 }
3384 g_log<<Logger::Error<<"No working multiplexer found!"<<endl;
3385 exit(1);
3386 }
3387
3388
3389 static string* doReloadLuaScript()
3390 {
3391 string fname= ::arg()["lua-dns-script"];
3392 try {
3393 if(fname.empty()) {
3394 t_pdl.reset();
3395 g_log<<Logger::Info<<t_id<<" Unloaded current lua script"<<endl;
3396 return new string("unloaded\n");
3397 }
3398 else {
3399 t_pdl = std::make_shared<RecursorLua4>();
3400 t_pdl->loadFile(fname);
3401 }
3402 }
3403 catch(std::exception& e) {
3404 g_log<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
3405 return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
3406 }
3407
3408 g_log<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
3409 return new string("(re)loaded '"+fname+"'\n");
3410 }
3411
3412 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
3413 {
3414 if(begin != end)
3415 ::arg().set("lua-dns-script") = *begin;
3416
3417 return broadcastAccFunction<string>(doReloadLuaScript);
3418 }
3419
3420 static string* pleaseUseNewTraceRegex(const std::string& newRegex)
3421 try
3422 {
3423 if(newRegex.empty()) {
3424 t_traceRegex.reset();
3425 return new string("unset\n");
3426 }
3427 else {
3428 t_traceRegex = std::make_shared<Regex>(newRegex);
3429 return new string("ok\n");
3430 }
3431 }
3432 catch(PDNSException& ae)
3433 {
3434 return new string(ae.reason+"\n");
3435 }
3436
3437 string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
3438 {
3439 return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
3440 }
3441
3442 static void checkLinuxIPv6Limits()
3443 {
3444 #ifdef __linux__
3445 string line;
3446 if(readFileIfThere("/proc/sys/net/ipv6/route/max_size", &line)) {
3447 int lim=std::stoi(line);
3448 if(lim < 16384) {
3449 g_log<<Logger::Error<<"If using IPv6, please raise sysctl net.ipv6.route.max_size, currently set to "<<lim<<" which is < 16384"<<endl;
3450 }
3451 }
3452 #endif
3453 }
3454 static void checkOrFixFDS()
3455 {
3456 unsigned int availFDs=getFilenumLimit();
3457 unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
3458
3459 if(wantFDs > availFDs) {
3460 unsigned int hardlimit= getFilenumLimit(true);
3461 if(hardlimit >= wantFDs) {
3462 setFilenumLimit(wantFDs);
3463 g_log<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
3464 }
3465 else {
3466 int newval = (hardlimit - 25) / g_numWorkerThreads;
3467 g_log<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
3468 g_maxMThreads = newval;
3469 setFilenumLimit(hardlimit);
3470 }
3471 }
3472 }
3473
// Forward declaration: per-thread entry point, defined further down in this translation unit.
static void* recursorThread(unsigned int tid, const string& threadName);
3475
3476 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
3477 {
3478 t_allowFrom = ng;
3479 return nullptr;
3480 }
3481
// Copies of the process's command line; parseACLs() re-reads allow-from /
// include-dir overrides from them on reload. Presumably assigned in main() — confirm.
int g_argc;
char** g_argv;
3484
3485 void parseACLs()
3486 {
3487 static bool l_initialized;
3488
3489 if(l_initialized) { // only reload configuration file on second call
3490 string configname=::arg()["config-dir"]+"/recursor.conf";
3491 if(::arg()["config-name"]!="") {
3492 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
3493 }
3494 cleanSlashes(configname);
3495
3496 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
3497 throw runtime_error("Unable to re-parse configuration file '"+configname+"'");
3498 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
3499 ::arg().preParseFile(configname.c_str(), "include-dir");
3500 ::arg().preParse(g_argc, g_argv, "include-dir");
3501
3502 // then process includes
3503 std::vector<std::string> extraConfigs;
3504 ::arg().gatherIncludes(extraConfigs);
3505
3506 for(const std::string& fn : extraConfigs) {
3507 if(!::arg().preParseFile(fn.c_str(), "allow-from-file", ::arg()["allow-from-file"]))
3508 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
3509 if(!::arg().preParseFile(fn.c_str(), "allow-from", ::arg()["allow-from"]))
3510 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
3511 }
3512
3513 ::arg().preParse(g_argc, g_argv, "allow-from-file");
3514 ::arg().preParse(g_argc, g_argv, "allow-from");
3515 }
3516
3517 std::shared_ptr<NetmaskGroup> oldAllowFrom = t_allowFrom;
3518 std::shared_ptr<NetmaskGroup> allowFrom = std::make_shared<NetmaskGroup>();
3519
3520 if(!::arg()["allow-from-file"].empty()) {
3521 string line;
3522 ifstream ifs(::arg()["allow-from-file"].c_str());
3523 if(!ifs) {
3524 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
3525 }
3526
3527 string::size_type pos;
3528 while(getline(ifs,line)) {
3529 pos=line.find('#');
3530 if(pos!=string::npos)
3531 line.resize(pos);
3532 trim(line);
3533 if(line.empty())
3534 continue;
3535
3536 allowFrom->addMask(line);
3537 }
3538 g_log<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
3539 }
3540 else if(!::arg()["allow-from"].empty()) {
3541 vector<string> ips;
3542 stringtok(ips, ::arg()["allow-from"], ", ");
3543
3544 g_log<<Logger::Warning<<"Only allowing queries from: ";
3545 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
3546 allowFrom->addMask(*i);
3547 if(i!=ips.begin())
3548 g_log<<Logger::Warning<<", ";
3549 g_log<<Logger::Warning<<*i;
3550 }
3551 g_log<<Logger::Warning<<endl;
3552 }
3553 else {
3554 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
3555 g_log<<Logger::Warning<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
3556 allowFrom = nullptr;
3557 }
3558
3559 g_initialAllowFrom = allowFrom;
3560 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
3561 oldAllowFrom = nullptr;
3562
3563 l_initialized = true;
3564 }
3565
3566
3567 static void setupDelegationOnly()
3568 {
3569 vector<string> parts;
3570 stringtok(parts, ::arg()["delegation-only"], ", \t");
3571 for(const auto& p : parts) {
3572 SyncRes::addDelegationOnly(DNSName(p));
3573 }
3574 }
3575
3576 static std::map<unsigned int, std::set<int> > parseCPUMap()
3577 {
3578 std::map<unsigned int, std::set<int> > result;
3579
3580 const std::string value = ::arg()["cpu-map"];
3581
3582 if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
3583 g_log<<Logger::Warning<<"CPU mapping requested but not supported, skipping"<<endl;
3584 return result;
3585 }
3586
3587 std::vector<std::string> parts;
3588
3589 stringtok(parts, value, " \t");
3590
3591 for(const auto& part : parts) {
3592 if (part.find('=') == string::npos)
3593 continue;
3594
3595 try {
3596 auto headers = splitField(part, '=');
3597 trim(headers.first);
3598 trim(headers.second);
3599
3600 unsigned int threadId = pdns_stou(headers.first);
3601 std::vector<std::string> cpus;
3602
3603 stringtok(cpus, headers.second, ",");
3604
3605 for(const auto& cpu : cpus) {
3606 int cpuId = std::stoi(cpu);
3607
3608 result[threadId].insert(cpuId);
3609 }
3610 }
3611 catch(const std::exception& e) {
3612 g_log<<Logger::Error<<"Error parsing cpu-map entry '"<<part<<"': "<<e.what()<<endl;
3613 }
3614 }
3615
3616 return result;
3617 }
3618
3619 static void setCPUMap(const std::map<unsigned int, std::set<int> >& cpusMap, unsigned int n, pthread_t tid)
3620 {
3621 const auto& cpuMapping = cpusMap.find(n);
3622 if (cpuMapping != cpusMap.cend()) {
3623 int rc = mapThreadToCPUList(tid, cpuMapping->second);
3624 if (rc == 0) {
3625 g_log<<Logger::Info<<"CPU affinity for worker "<<n<<" has been set to CPU map:";
3626 for (const auto cpu : cpuMapping->second) {
3627 g_log<<Logger::Info<<" "<<cpu;
3628 }
3629 g_log<<Logger::Info<<endl;
3630 }
3631 else {
3632 g_log<<Logger::Warning<<"Error setting CPU affinity for worker "<<n<<" to CPU map:";
3633 for (const auto cpu : cpuMapping->second) {
3634 g_log<<Logger::Info<<" "<<cpu;
3635 }
3636 g_log<<Logger::Info<<strerror(rc)<<endl;
3637 }
3638 }
3639 }
3640
3641 #ifdef NOD_ENABLED
3642 static void setupNODThread()
3643 {
3644 if (g_nodEnabled) {
3645 uint32_t num_cells = ::arg().asNum("new-domain-db-size");
3646 t_nodDBp = std::make_shared<nod::NODDB>(num_cells);
3647 try {
3648 t_nodDBp->setCacheDir(::arg()["new-domain-history-dir"]);
3649 }
3650 catch (const PDNSException& e) {
3651 g_log<<Logger::Error<<"new-domain-history-dir (" << ::arg()["new-domain-history-dir"] << ") is not readable or does not exist"<<endl;
3652 _exit(1);
3653 }
3654 if (!t_nodDBp->init()) {
3655 g_log<<Logger::Error<<"Could not initialize domain tracking"<<endl;
3656 _exit(1);
3657 }
3658 std::thread t(nod::NODDB::startHousekeepingThread, t_nodDBp, std::this_thread::get_id());
3659 t.detach();
3660 g_nod_pbtag = ::arg()["new-domain-pb-tag"];
3661 }
3662 if (g_udrEnabled) {
3663 uint32_t num_cells = ::arg().asNum("unique-response-db-size");
3664 t_udrDBp = std::make_shared<nod::UniqueResponseDB>(num_cells);
3665 try {
3666 t_udrDBp->setCacheDir(::arg()["unique-response-history-dir"]);
3667 }
3668 catch (const PDNSException& e) {
3669 g_log<<Logger::Error<<"unique-response-history-dir (" << ::arg()["unique-response-history-dir"] << ") is not readable or does not exist"<<endl;
3670 _exit(1);
3671 }
3672 if (!t_udrDBp->init()) {
3673 g_log<<Logger::Error<<"Could not initialize unique response tracking"<<endl;
3674 _exit(1);
3675 }
3676 std::thread t(nod::UniqueResponseDB::startHousekeepingThread, t_udrDBp, std::this_thread::get_id());
3677 t.detach();
3678 g_udr_pbtag = ::arg()["unique-response-pb-tag"];
3679 }
3680 }
3681
3682 void parseNODWhitelist(const std::string& wlist)
3683 {
3684 vector<string> parts;
3685 stringtok(parts, wlist, ",; ");
3686 for(const auto& a : parts) {
3687 g_nodDomainWL.add(DNSName(a));
3688 }
3689 }
3690
3691 static void setupNODGlobal()
3692 {
3693 // Setup NOD subsystem
3694 g_nodEnabled = ::arg().mustDo("new-domain-tracking");
3695 g_nodLookupDomain = DNSName(::arg()["new-domain-lookup"]);
3696 g_nodLog = ::arg().mustDo("new-domain-log");
3697 parseNODWhitelist(::arg()["new-domain-whitelist"]);
3698
3699 // Setup Unique DNS Response subsystem
3700 g_udrEnabled = ::arg().mustDo("unique-response-tracking");
3701 g_udrLog = ::arg().mustDo("unique-response-log");
3702 }
3703 #endif /* NOD_ENABLED */
3704
3705 static int serviceMain(int argc, char*argv[])
3706 {
3707 g_log.setName(s_programname);
3708 g_log.disableSyslog(::arg().mustDo("disable-syslog"));
3709 g_log.setTimestamps(::arg().mustDo("log-timestamp"));
3710
3711 if(!::arg()["logging-facility"].empty()) {
3712 int val=logFacilityToLOG(::arg().asNum("logging-facility") );
3713 if(val >= 0)
3714 g_log.setFacility(val);
3715 else
3716 g_log<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
3717 }
3718
3719 showProductVersion();
3720
3721 g_disthashseed=dns_random(0xffffffff);
3722
3723 checkLinuxIPv6Limits();
3724 try {
3725 vector<string> addrs;
3726 if(!::arg()["query-local-address6"].empty()) {
3727 SyncRes::s_doIPv6=true;
3728 g_log<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
3729
3730 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
3731 for(const string& addr : addrs) {
3732 g_localQueryAddresses6.push_back(ComboAddress(addr));
3733 }
3734 }
3735 else {
3736 g_log<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
3737 }
3738 addrs.clear();
3739 stringtok(addrs, ::arg()["query-local-address"], ", ;");
3740 for(const string& addr : addrs) {
3741 g_localQueryAddresses4.push_back(ComboAddress(addr));
3742 }
3743 }
3744 catch(std::exception& e) {
3745 g_log<<Logger::Error<<"Assigning local query addresses: "<<e.what();
3746 exit(99);
3747 }
3748
3749 // keep this ABOVE loadRecursorLuaConfig!
3750 if(::arg()["dnssec"]=="off")
3751 g_dnssecmode=DNSSECMode::Off;
3752 else if(::arg()["dnssec"]=="process-no-validate")
3753 g_dnssecmode=DNSSECMode::ProcessNoValidate;
3754 else if(::arg()["dnssec"]=="process")
3755 g_dnssecmode=DNSSECMode::Process;
3756 else if(::arg()["dnssec"]=="validate")
3757 g_dnssecmode=DNSSECMode::ValidateAll;
3758 else if(::arg()["dnssec"]=="log-fail")
3759 g_dnssecmode=DNSSECMode::ValidateForLog;
3760 else {
3761 g_log<<Logger::Error<<"Unknown DNSSEC mode "<<::arg()["dnssec"]<<endl;
3762 exit(1);
3763 }
3764
3765 g_signatureInceptionSkew = ::arg().asNum("signature-inception-skew");
3766 if (g_signatureInceptionSkew < 0) {
3767 g_log<<Logger::Error<<"A negative value for 'signature-inception-skew' is not allowed"<<endl;
3768 exit(1);
3769 }
3770
3771 g_dnssecLogBogus = ::arg().mustDo("dnssec-log-bogus");
3772 g_maxNSEC3Iterations = ::arg().asNum("nsec3-max-iterations");
3773
3774 g_maxCacheEntries = ::arg().asNum("max-cache-entries");
3775 g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
3776
3777 luaConfigDelayedThreads delayedLuaThreads;
3778 try {
3779 loadRecursorLuaConfig(::arg()["lua-config-file"], delayedLuaThreads);
3780 }
3781 catch (PDNSException &e) {
3782 g_log<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
3783 exit(1);
3784 }
3785
3786 parseACLs();
3787 initPublicSuffixList(::arg()["public-suffix-list-file"]);
3788
3789 if(!::arg()["dont-query"].empty()) {
3790 vector<string> ips;
3791 stringtok(ips, ::arg()["dont-query"], ", ");
3792 ips.push_back("0.0.0.0");
3793 ips.push_back("::");
3794
3795 g_log<<Logger::Warning<<"Will not send queries to: ";
3796 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
3797 SyncRes::addDontQuery(*i);
3798 if(i!=ips.begin())
3799 g_log<<Logger::Warning<<", ";
3800 g_log<<Logger::Warning<<*i;
3801 }
3802 g_log<<Logger::Warning<<endl;
3803 }
3804
3805 g_quiet=::arg().mustDo("quiet");
3806
3807 /* this needs to be done before parseACLs(), which call broadcastFunction() */
3808 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
3809 if(g_weDistributeQueries) {
3810 g_log<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
3811 }
3812
3813 setupDelegationOnly();
3814 g_outgoingEDNSBufsize=::arg().asNum("edns-outgoing-bufsize");
3815
3816 if(::arg()["trace"]=="fail") {
3817 SyncRes::setDefaultLogMode(SyncRes::Store);
3818 }
3819 else if(::arg().mustDo("trace")) {
3820 SyncRes::setDefaultLogMode(SyncRes::Log);
3821 ::arg().set("quiet")="no";
3822 g_quiet=false;
3823 g_dnssecLOG=true;
3824 }
3825 string myHostname = getHostname();
3826 if (myHostname == "UNKNOWN"){
3827 g_log<<Logger::Warning<<"Unable to get the hostname, NSID and id.server values will be empty"<<endl;
3828 myHostname = "";
3829 }
3830
3831 SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
3832 SyncRes::s_minimumECSTTL = ::arg().asNum("ecs-minimum-ttl-override");
3833
3834 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
3835
3836 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
3837 SyncRes::s_maxbogusttl=::arg().asNum("max-cache-bogus-ttl");
3838 SyncRes::s_maxcachettl=max(::arg().asNum("max-cache-ttl"), 15);
3839 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
3840 // Cap the packetcache-servfail-ttl to the packetcache-ttl
3841 uint32_t packetCacheServFailTTL = ::arg().asNum("packetcache-servfail-ttl");
3842 SyncRes::s_packetcacheservfailttl=(packetCacheServFailTTL > SyncRes::s_packetcachettl) ? SyncRes::s_packetcachettl : packetCacheServFailTTL;
3843 SyncRes::s_serverdownmaxfails=::arg().asNum("server-down-max-fails");
3844 SyncRes::s_serverdownthrottletime=::arg().asNum("server-down-throttle-time");
3845 SyncRes::s_serverID=::arg()["server-id"];
3846 SyncRes::s_maxqperq=::arg().asNum("max-qperq");
3847 SyncRes::s_maxtotusec=1000*::arg().asNum("max-total-msec");
3848 SyncRes::s_maxdepth=::arg().asNum("max-recursion-depth");
3849 SyncRes::s_rootNXTrust = ::arg().mustDo( "root-nx-trust");
3850 if(SyncRes::s_serverID.empty()) {
3851 SyncRes::s_serverID = myHostname;
3852 }
3853
3854 SyncRes::s_ecsipv4limit = ::arg().asNum("ecs-ipv4-bits");
3855 SyncRes::s_ecsipv6limit = ::arg().asNum("ecs-ipv6-bits");
3856 SyncRes::clearECSStats();
3857 SyncRes::s_ecsipv4cachelimit = ::arg().asNum("ecs-ipv4-cache-bits");
3858 SyncRes::s_ecsipv6cachelimit = ::arg().asNum("ecs-ipv6-cache-bits");
3859 SyncRes::s_ecscachelimitttl = ::arg().asNum("ecs-cache-limit-ttl");
3860
3861 SyncRes::s_qnameminimization = ::arg().mustDo("qname-minimization");
3862
3863 if (!::arg().isEmpty("ecs-scope-zero-address")) {
3864 ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
3865 SyncRes::setECSScopeZeroAddress(Netmask(scopeZero, scopeZero.isIPv4() ? 32 : 128));
3866 }
3867 else {
3868 bool found = false;
3869 for (const auto& addr : g_localQueryAddresses4) {
3870 if (!IsAnyAddress(addr)) {
3871 SyncRes::setECSScopeZeroAddress(Netmask(addr, 32));
3872 found = true;
3873 break;
3874 }
3875 }
3876 if (!found) {
3877 for (const auto& addr : g_localQueryAddresses6) {
3878 if (!IsAnyAddress(addr)) {
3879 SyncRes::setECSScopeZeroAddress(Netmask(addr, 128));
3880 found = true;
3881 break;
3882 }
3883 }
3884 if (!found) {
3885 SyncRes::setECSScopeZeroAddress(Netmask("127.0.0.1/32"));
3886 }
3887 }
3888 }
3889
3890 SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
3891 SyncRes::parseEDNSSubnetAddFor(::arg()["ecs-add-for"]);
3892 g_useIncomingECS = ::arg().mustDo("use-incoming-edns-subnet");
3893
3894 g_XPFAcl.toMasks(::arg()["xpf-allow-from"]);
3895 g_xpfRRCode = ::arg().asNum("xpf-rr-code");
3896
3897 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
3898
3899 g_initialDomainMap = parseAuthAndForwards();
3900
3901 g_latencyStatSize=::arg().asNum("latency-statistic-size");
3902
3903 g_logCommonErrors=::arg().mustDo("log-common-errors");
3904 g_logRPZChanges = ::arg().mustDo("log-rpz-changes");
3905
3906 g_anyToTcp = ::arg().mustDo("any-to-tcp");
3907 g_udpTruncationThreshold = ::arg().asNum("udp-truncation-threshold");
3908
3909 g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
3910
3911 g_numDistributorThreads = ::arg().asNum("distributor-threads");
3912 g_numWorkerThreads = ::arg().asNum("threads");
3913 if (g_numWorkerThreads < 1) {
3914 g_log<<Logger::Warning<<"Asked to run with 0 threads, raising to 1 instead"<<endl;
3915 g_numWorkerThreads = 1;
3916 }
3917
3918 g_numThreads = g_numDistributorThreads + g_numWorkerThreads;
3919 g_maxMThreads = ::arg().asNum("max-mthreads");
3920
3921 g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
3922
3923 g_statisticsInterval = ::arg().asNum("statistics-interval");
3924
3925 {
3926 SuffixMatchNode dontThrottleNames;
3927 vector<string> parts;
3928 stringtok(parts, ::arg()["dont-throttle-names"]);
3929 for (const auto &p : parts) {
3930 dontThrottleNames.add(DNSName(p));
3931 }
3932 g_dontThrottleNames.setState(dontThrottleNames);
3933
3934 NetmaskGroup dontThrottleNetmasks;
3935 stringtok(parts, ::arg()["dont-throttle-netmasks"]);
3936 for (const auto &p : parts) {
3937 dontThrottleNetmasks.addMask(Netmask(p));
3938 }
3939 g_dontThrottleNetmasks.setState(dontThrottleNetmasks);
3940 }
3941
3942 s_balancingFactor = ::arg().asDouble("distribution-load-factor");
3943 if (s_balancingFactor != 0.0 && s_balancingFactor < 1.0) {
3944 s_balancingFactor = 0.0;
3945 g_log<<Logger::Warning<<"Asked to run with a distribution-load-factor below 1.0, disabling it instead"<<endl;
3946 }
3947
3948 #ifdef SO_REUSEPORT
3949 g_reusePort = ::arg().mustDo("reuseport");
3950 #endif
3951
3952 s_threadInfos.resize(g_numDistributorThreads + g_numWorkerThreads + /* handler */ 1);
3953
3954 if (g_reusePort) {
3955 if (g_weDistributeQueries) {
3956 /* first thread is the handler, then distributors */
3957 for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
3958 auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
3959 auto& tcpSockets = s_threadInfos.at(threadId).tcpSockets;
3960 makeUDPServerSockets(deferredAdds);
3961 makeTCPServerSockets(deferredAdds, tcpSockets);
3962 }
3963 }
3964 else {
3965 /* first thread is the handler, there is no distributor here and workers are accepting queries */
3966 for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
3967 auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
3968 auto& tcpSockets = s_threadInfos.at(threadId).tcpSockets;
3969 makeUDPServerSockets(deferredAdds);
3970 makeTCPServerSockets(deferredAdds, tcpSockets);
3971 }
3972 }
3973 }
3974 else {
3975 std::set<int> tcpSockets;
3976 /* we don't have reuseport so we can only open one socket per
3977 listening addr:port and everyone will listen on it */
3978 makeUDPServerSockets(g_deferredAdds);
3979 makeTCPServerSockets(g_deferredAdds, tcpSockets);
3980
3981 /* every listener (so distributor if g_weDistributeQueries, workers otherwise)
3982 needs to listen to the shared sockets */
3983 if (g_weDistributeQueries) {
3984 /* first thread is the handler, then distributors */
3985 for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
3986 s_threadInfos.at(threadId).tcpSockets = tcpSockets;
3987 }
3988 }
3989 else {
3990 /* first thread is the handler, there is no distributor here and workers are accepting queries */
3991 for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
3992 s_threadInfos.at(threadId).tcpSockets = tcpSockets;
3993 }
3994 }
3995 }
3996
3997 #ifdef NOD_ENABLED
3998 // Setup newly observed domain globals
3999 setupNODGlobal();
4000 #endif /* NOD_ENABLED */
4001
4002 int forks;
4003 for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
4004 if(!fork()) // we are child
4005 break;
4006 }
4007
4008 if(::arg().mustDo("daemon")) {
4009 g_log<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
4010 g_log.toConsole(Logger::Critical);
4011 daemonize();
4012 }
4013 signal(SIGUSR1,usr1Handler);
4014 signal(SIGUSR2,usr2Handler);
4015 signal(SIGPIPE,SIG_IGN);
4016
4017 checkOrFixFDS();
4018
4019 #ifdef HAVE_LIBSODIUM
4020 if (sodium_init() == -1) {
4021 g_log<<Logger::Error<<"Unable to initialize sodium crypto library"<<endl;
4022 exit(99);
4023 }
4024 #endif
4025
4026 openssl_thread_setup();
4027 openssl_seed();
4028 /* setup rng before chroot */
4029 dns_random_init();
4030
4031 if(::arg()["server-id"].empty()) {
4032 ::arg().set("server-id") = myHostname;
4033 }
4034
4035 int newgid=0;
4036 if(!::arg()["setgid"].empty())
4037 newgid = strToGID(::arg()["setgid"]);
4038 int newuid=0;
4039 if(!::arg()["setuid"].empty())
4040 newuid = strToUID(::arg()["setuid"]);
4041
4042 Utility::dropGroupPrivs(newuid, newgid);
4043
4044 if (!::arg()["chroot"].empty()) {
4045 #ifdef HAVE_SYSTEMD
4046 char *ns;
4047 ns = getenv("NOTIFY_SOCKET");
4048 if (ns != nullptr) {
4049 g_log<<Logger::Error<<"Unable to chroot when running from systemd. Please disable chroot= or set the 'Type' for this service to 'simple'"<<endl;
4050 exit(1);
4051 }
4052 #endif
4053 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
4054 g_log<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
4055 exit(1);
4056 }
4057 else
4058 g_log<<Logger::Info<<"Chrooted to '"<<::arg()["chroot"]<<"'"<<endl;
4059 }
4060
4061 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
4062 if(!s_pidfname.empty())
4063 unlink(s_pidfname.c_str()); // remove possible old pid file
4064 writePid();
4065
4066 makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
4067
4068 Utility::dropUserPrivs(newuid);
4069 try {
4070 /* we might still have capabilities remaining, for example if we have been started as root
4071 without --setuid (please don't do that) or as an unprivileged user with ambient capabilities
4072 like CAP_NET_BIND_SERVICE.
4073 */
4074 dropCapabilities();
4075 }
4076 catch(const std::exception& e) {
4077 g_log<<Logger::Warning<<e.what()<<endl;
4078 }
4079
4080 startLuaConfigDelayedThreads(delayedLuaThreads, g_luaconfs.getCopy().generation);
4081
4082 makeThreadPipes();
4083
4084 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
4085 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
4086 g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
4087 s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
4088
4089 g_useKernelTimestamp = ::arg().mustDo("protobuf-use-kernel-timestamp");
4090
4091 blacklistStats(StatComponent::API, ::arg()["stats-api-blacklist"]);
4092 blacklistStats(StatComponent::Carbon, ::arg()["stats-carbon-blacklist"]);
4093 blacklistStats(StatComponent::RecControl, ::arg()["stats-rec-control-blacklist"]);
4094 blacklistStats(StatComponent::SNMP, ::arg()["stats-snmp-blacklist"]);
4095
4096 if (::arg().mustDo("snmp-agent")) {
4097 g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
4098 g_snmpAgent->run();
4099 }
4100
4101 int port = ::arg().asNum("udp-source-port-min");
4102 if(port < 1024 || port > 65535){
4103 g_log<<Logger::Error<<"Unable to launch, udp-source-port-min is not a valid port number"<<endl;
4104 exit(99); // this isn't going to fix itself either
4105 }
4106 s_minUdpSourcePort = port;
4107 port = ::arg().asNum("udp-source-port-max");
4108 if(port < 1024 || port > 65535 || port < s_minUdpSourcePort){
4109 g_log<<Logger::Error<<"Unable to launch, udp-source-port-max is not a valid port number or is smaller than udp-source-port-min"<<endl;
4110 exit(99); // this isn't going to fix itself either
4111 }
4112 s_maxUdpSourcePort = port;
4113 std::vector<string> parts {};
4114 stringtok(parts, ::arg()["udp-source-port-avoid"], ", ");
4115 for (const auto &part : parts)
4116 {
4117 port = std::stoi(part);
4118 if(port < 1024 || port > 65535){
4119 g_log<<Logger::Error<<"Unable to launch, udp-source-port-avoid contains an invalid port number: "<<part<<endl;
4120 exit(99); // this isn't going to fix itself either
4121 }
4122 s_avoidUdpSourcePorts.insert(port);
4123 }
4124
4125 unsigned int currentThreadId = 1;
4126 const auto cpusMap = parseCPUMap();
4127
4128 if(g_numThreads == 1) {
4129 g_log<<Logger::Warning<<"Operating unthreaded"<<endl;
4130 #ifdef HAVE_SYSTEMD
4131 sd_notify(0, "READY=1");
4132 #endif
4133
4134 /* This thread handles the web server, carbon, statistics and the control channel */
4135 auto& handlerInfos = s_threadInfos.at(0);
4136 handlerInfos.isHandler = true;
4137 handlerInfos.thread = std::thread(recursorThread, 0, "main");
4138
4139 setCPUMap(cpusMap, currentThreadId, pthread_self());
4140
4141 auto& infos = s_threadInfos.at(currentThreadId);
4142 infos.isListener = true;
4143 infos.isWorker = true;
4144 recursorThread(currentThreadId++, "worker");
4145 }
4146 else {
4147
4148 if (g_weDistributeQueries) {
4149 g_log<<Logger::Warning<<"Launching "<< g_numDistributorThreads <<" distributor threads"<<endl;
4150 for(unsigned int n=0; n < g_numDistributorThreads; ++n) {
4151 auto& infos = s_threadInfos.at(currentThreadId);
4152 infos.isListener = true;
4153 infos.thread = std::thread(recursorThread, currentThreadId++, "distr");
4154
4155 setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
4156 }
4157 }
4158
4159 g_log<<Logger::Warning<<"Launching "<< g_numWorkerThreads <<" worker threads"<<endl;
4160
4161 for(unsigned int n=0; n < g_numWorkerThreads; ++n) {
4162 auto& infos = s_threadInfos.at(currentThreadId);
4163 infos.isListener = g_weDistributeQueries ? false : true;
4164 infos.isWorker = true;
4165 infos.thread = std::thread(recursorThread, currentThreadId++, "worker");
4166
4167 setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
4168 }
4169
4170 #ifdef HAVE_SYSTEMD
4171 sd_notify(0, "READY=1");
4172 #endif
4173
4174 /* This thread handles the web server, carbon, statistics and the control channel */
4175 auto& infos = s_threadInfos.at(0);
4176 infos.isHandler = true;
4177 infos.thread = std::thread(recursorThread, 0, "web+stat");
4178
4179 s_threadInfos.at(0).thread.join();
4180 }
4181 return 0;
4182 }
4183
4184 static void* recursorThread(unsigned int n, const string& threadName)
4185 try
4186 {
4187 t_id=n;
4188 auto& threadInfo = s_threadInfos.at(t_id);
4189
4190 static string threadPrefix = "pdns-r/";
4191 setThreadName(threadPrefix + threadName);
4192
4193 SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
4194 SyncRes::setDomainMap(g_initialDomainMap);
4195 t_allowFrom = g_initialAllowFrom;
4196 t_udpclientsocks = std::unique_ptr<UDPClientSocks>(new UDPClientSocks());
4197 t_tcpClientCounts = std::unique_ptr<tcpClientCounts_t>(new tcpClientCounts_t());
4198 primeHints();
4199
4200 t_packetCache = std::unique_ptr<RecursorPacketCache>(new RecursorPacketCache());
4201
4202 g_log<<Logger::Warning<<"Done priming cache with root hints"<<endl;
4203
4204 #ifdef NOD_ENABLED
4205 if (threadInfo.isWorker)
4206 setupNODThread();
4207 #endif /* NOD_ENABLED */
4208
4209 /* the listener threads handle TCP queries */
4210 if(threadInfo.isWorker || threadInfo.isListener) {
4211 try {
4212 if(!::arg()["lua-dns-script"].empty()) {
4213 t_pdl = std::make_shared<RecursorLua4>();
4214 t_pdl->loadFile(::arg()["lua-dns-script"]);
4215 g_log<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
4216 }
4217 }
4218 catch(std::exception &e) {
4219 g_log<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
4220 _exit(99);
4221 }
4222 }
4223
4224 unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
4225 if(ringsize) {
4226 t_remotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4227 if(g_weDistributeQueries)
4228 t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / g_numDistributorThreads);
4229 else
4230 t_remotes->set_capacity(ringsize);
4231 t_servfailremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4232 t_servfailremotes->set_capacity(ringsize);
4233 t_bogusremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4234 t_bogusremotes->set_capacity(ringsize);
4235 t_largeanswerremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4236 t_largeanswerremotes->set_capacity(ringsize);
4237 t_timeouts = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4238 t_timeouts->set_capacity(ringsize);
4239
4240 t_queryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4241 t_queryring->set_capacity(ringsize);
4242 t_servfailqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4243 t_servfailqueryring->set_capacity(ringsize);
4244 t_bogusqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4245 t_bogusqueryring->set_capacity(ringsize);
4246 }
4247
4248 MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
4249 threadInfo.mt = MT.get();
4250
4251 #ifdef HAVE_PROTOBUF
4252 /* start protobuf export threads if needed */
4253 auto luaconfsLocal = g_luaconfs.getLocal();
4254 checkProtobufExport(luaconfsLocal);
4255 checkOutgoingProtobufExport(luaconfsLocal);
4256 #endif /* HAVE_PROTOBUF */
4257 #ifdef HAVE_FSTRM
4258 checkFrameStreamExport(luaconfsLocal);
4259 #endif
4260
4261 PacketID pident;
4262
4263 t_fdm=getMultiplexer();
4264
4265 if(threadInfo.isHandler) {
4266 if(::arg().mustDo("webserver")) {
4267 g_log<<Logger::Warning << "Enabling web server" << endl;
4268 try {
4269 new RecursorWebServer(t_fdm);
4270 }
4271 catch(PDNSException &e) {
4272 g_log<<Logger::Error<<"Exception: "<<e.reason<<endl;
4273 exit(99);
4274 }
4275 }
4276 g_log<<Logger::Info<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
4277 }
4278 else {
4279
4280 t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
4281 t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
4282
4283 if (threadInfo.isListener) {
4284 if (g_reusePort) {
4285 /* then every listener has its own FDs */
4286 for(const auto deferred : threadInfo.deferredAdds) {
4287 t_fdm->addReadFD(deferred.first, deferred.second);
4288 }
4289 }
4290 else {
4291 /* otherwise all listeners are listening on the same ones */
4292 for(const auto deferred : g_deferredAdds) {
4293 t_fdm->addReadFD(deferred.first, deferred.second);
4294 }
4295 }
4296 }
4297 }
4298
4299 registerAllStats();
4300
4301 if(threadInfo.isHandler) {
4302 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
4303 }
4304
4305 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
4306
4307 bool listenOnTCP(true);
4308
4309 time_t last_stat = 0;
4310 time_t last_carbon=0, last_lua_maintenance=0;
4311 time_t carbonInterval=::arg().asNum("carbon-interval");
4312 time_t luaMaintenanceInterval=::arg().asNum("lua-maintenance-interval");
4313 counter.store(0); // used to periodically execute certain tasks
4314 for(;;) {
4315 while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
4316
4317 if(!(counter%500)) {
4318 MT->makeThread(houseKeeping, 0);
4319 }
4320
4321 if(!(counter%55)) {
4322 typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
4323 expired_t expired=t_fdm->getTimeouts(g_now);
4324
4325 for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
4326 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
4327 if(g_logCommonErrors)
4328 g_log<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toStringWithPort() <<endl;
4329 t_fdm->removeReadFD(i->first);
4330 }
4331 }
4332
4333 counter++;
4334
4335 if(threadInfo.isHandler) {
4336 if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
4337 doStats();
4338 last_stat = g_now.tv_sec;
4339 }
4340
4341 Utility::gettimeofday(&g_now, 0);
4342
4343 if((g_now.tv_sec - last_carbon) >= carbonInterval) {
4344 MT->makeThread(doCarbonDump, 0);
4345 last_carbon = g_now.tv_sec;
4346 }
4347 }
4348 if (t_pdl != nullptr) {
4349 // lua-dns-script directive is present, call the maintenance callback if needed
4350 /* remember that the listener threads handle TCP queries */
4351 if (threadInfo.isWorker || threadInfo.isListener) {
4352 // Only on threads processing queries
4353 if(g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
4354 t_pdl->maintenance();
4355 last_lua_maintenance = g_now.tv_sec;
4356 }
4357 }
4358 }
4359
4360 t_fdm->run(&g_now);
4361 // 'run' updates g_now for us
4362
4363 if(threadInfo.isListener) {
4364 if(listenOnTCP) {
4365 if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
4366 for(const auto fd : threadInfo.tcpSockets) {
4367 t_fdm->removeReadFD(fd);
4368 }
4369 listenOnTCP=false;
4370 }
4371 }
4372 else {
4373 if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
4374 for(const auto fd : threadInfo.tcpSockets) {
4375 t_fdm->addReadFD(fd, handleNewTCPQuestion);
4376 }
4377 listenOnTCP=true;
4378 }
4379 }
4380 }
4381 }
4382 }
4383 catch(PDNSException &ae) {
4384 g_log<<Logger::Error<<"Exception: "<<ae.reason<<endl;
4385 return 0;
4386 }
4387 catch(std::exception &e) {
4388 g_log<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
4389 return 0;
4390 }
4391 catch(...) {
4392 g_log<<Logger::Error<<"any other exception in main: "<<endl;
4393 return 0;
4394 }
4395
4396
4397 int main(int argc, char **argv)
4398 {
4399 g_argc = argc;
4400 g_argv = argv;
4401 g_stats.startupTime=time(0);
4402 Utility::srandom();
4403 versionSetProduct(ProductRecursor);
4404 reportBasicTypes();
4405 reportOtherTypes();
4406
4407 int ret = EXIT_SUCCESS;
4408
4409 try {
4410 ::arg().set("stack-size","stack size per mthread")="200000";
4411 ::arg().set("soa-minimum-ttl","Don't change")="0";
4412 ::arg().set("no-shuffle","Don't change")="off";
4413 ::arg().set("local-port","port to listen on")="53";
4414 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
4415 ::arg().setSwitch("non-local-bind", "Enable binding to non-local addresses by using FREEBIND / BINDANY socket options")="no";
4416 ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
4417 ::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
4418 ::arg().set("dnssec-log-bogus", "Log DNSSEC bogus validations")="no";
4419 ::arg().set("signature-inception-skew", "Allow the signature inception to be off by this number of seconds")="60";
4420 ::arg().set("daemon","Operate as a daemon")="no";
4421 ::arg().setSwitch("write-pid","Write a PID file")="yes";
4422 ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="6";
4423 ::arg().set("disable-syslog","Disable logging to syslog, useful when running inside a supervisor that logs stdout")="no";
4424 ::arg().set("log-timestamp","Print timestamps in log lines, useful to disable when running with a tool that timestamps stdout already")="yes";
4425 ::arg().set("log-common-errors","If we should log rather common errors")="no";
4426 ::arg().set("chroot","switch to chroot jail")="";
4427 ::arg().set("setgid","If set, change group id to this gid for more security"
4428 #ifdef HAVE_SYSTEMD
4429 #define SYSTEMD_SETID_MSG ". When running inside systemd, use the User and Group settings in the unit-file!"
4430 SYSTEMD_SETID_MSG
4431 #endif
4432 )="";
4433 ::arg().set("setuid","If set, change user id to this uid for more security"
4434 #ifdef HAVE_SYSTEMD
4435 SYSTEMD_SETID_MSG
4436 #endif
4437 )="";
4438 ::arg().set("network-timeout", "Wait this number of milliseconds for network i/o")="1500";
4439 ::arg().set("threads", "Launch this number of threads")="2";
4440 ::arg().set("distributor-threads", "Launch this number of distributor threads, distributing queries to other threads")="0";
4441 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs!
4442 ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
4443 ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = "";
4444 ::arg().set("api-key", "Static pre-shared authentication key for access to the REST API") = "";
4445 ::arg().setSwitch("webserver", "Start a webserver (for REST API)") = "no";
4446 ::arg().set("webserver-address", "IP Address of webserver to listen on") = "127.0.0.1";
4447 ::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
4448 ::arg().set("webserver-password", "Password required for accessing the webserver") = "";
4449 ::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="127.0.0.1,::1";
4450 ::arg().set("webserver-loglevel", "Amount of logging in the webserver (none, normal, detailed)") = "normal";
4451 ::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
4452 ::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server IP address")="";
4453 ::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
4454 ::arg().set("carbon-namespace", "If set overwrites the first part of the carbon string")="pdns";
4455 ::arg().set("carbon-instance", "If set overwrites the the instance name default")="recursor";
4456
4457 ::arg().set("statistics-interval", "Number of seconds between printing of recursor statistics, 0 to disable")="1800";
4458 ::arg().set("quiet","Suppress logging of questions and answers")="";
4459 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
4460 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
4461 ::arg().set("socket-owner","Owner of socket")="";
4462 ::arg().set("socket-group","Group of socket")="";
4463 ::arg().set("socket-mode", "Permissions for socket")="";
4464
4465 ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+"/pdns-recursor when unset and not chrooted" )="";
4466 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
4467 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
4468 ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
4469 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
4470 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
4471 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
4472 ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
4473 ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
4474 ::arg().set("dont-throttle-names", "Do not throttle nameservers with this name or suffix")="";
4475 ::arg().set("dont-throttle-netmasks", "Do not throttle nameservers with this IP netmask")="";
4476 ::arg().set("hint-file", "If set, load root hints from this file")="";
4477 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
4478 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
4479 ::arg().set("max-cache-bogus-ttl", "maximum number of seconds to keep a Bogus (positive or negative) cached entry in memory")="3600";
4480 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
4481 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
4482 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
4483 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
4484 ::arg().set("server-id", "Returned when queried for 'id.server' TXT or NSID, defaults to hostname, set custom or 'disabled'")="";
4485 ::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
4486 ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
4487 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
4488 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
4489 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
4490 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=DONT_QUERY;
4491 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
4492 ::arg().set("max-tcp-queries-per-connection", "If set, maximum number of TCP queries in a TCP connection")="0";
4493 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
4494 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
4495 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
4496 ::arg().set("lua-config-file", "More powerful configuration options")="";
4497 ::arg().setSwitch("allow-trust-anchor-query", "Allow queries for trustanchor.server CH TXT and negativetrustanchor.server CH TXT")="no";
4498
4499 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
4500 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
4501 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
4502 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
4503 ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
4504 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
4505 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="yes";
4506 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
4507 ::arg().set("lua-maintenance-interval", "Number of seconds between calls to the lua user defined maintenance() function")="1";
4508 ::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
4509 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
4510 ::arg().set("ecs-ipv4-bits", "Number of bits of IPv4 address to pass for EDNS Client Subnet")="24";
4511 ::arg().set("ecs-ipv4-cache-bits", "Maximum number of bits of IPv4 mask to cache ECS response")="24";
4512 ::arg().set("ecs-ipv6-bits", "Number of bits of IPv6 address to pass for EDNS Client Subnet")="56";
4513 ::arg().set("ecs-ipv6-cache-bits", "Maximum number of bits of IPv6 mask to cache ECS response")="56";
4514 ::arg().set("ecs-minimum-ttl-override", "Set under adverse conditions, a minimum TTL for records in ECS-specific answers")="0";
4515 ::arg().set("ecs-cache-limit-ttl", "Minimum TTL to cache ECS response")="0";
4516 ::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
4517 ::arg().set("ecs-add-for", "List of client netmasks for which EDNS Client Subnet will be added")="0.0.0.0/0, ::/0, " LOCAL_NETS_INVERSE;
4518 ::arg().set("ecs-scope-zero-address", "Address to send to whitelisted authoritative servers for incoming queries with ECS prefix-length source of 0")="";
4519 ::arg().setSwitch( "use-incoming-edns-subnet", "Pass along received EDNS Client Subnet information")="no";
4520 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads")="yes";
4521 ::arg().setSwitch( "root-nx-trust", "If set, believe that an NXDOMAIN from the root means the TLD does not exist")="yes";
4522 ::arg().setSwitch( "any-to-tcp","Answer ANY queries with tc=1, shunting to TCP" )="no";
4523 ::arg().setSwitch( "lowercase-outgoing","Force outgoing questions to lowercase")="no";
4524 ::arg().setSwitch("gettag-needs-edns-options", "If EDNS Options should be extracted before calling the gettag() hook")="no";
4525 ::arg().set("udp-truncation-threshold", "Maximum UDP response size before we truncate")="1232";
4526 ::arg().set("edns-outgoing-bufsize", "Outgoing EDNS buffer size")="1232";
4527 ::arg().set("minimum-ttl-override", "Set under adverse conditions, a minimum TTL")="0";
4528 ::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
4529 ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
4530 ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
4531 ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
4532 ::arg().set("protobuf-use-kernel-timestamp", "Compute the latency of queries in protobuf messages by using the timestamp set by the kernel when the query was received (when available)")="";
4533 ::arg().set("distribution-pipe-buffer-size", "Size in bytes of the internal buffer of the pipe used by the distributor to pass incoming queries to a worker thread")="0";
4534
4535 ::arg().set("include-dir","Include *.conf files from this directory")="";
4536 ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
4537
4538 ::arg().setSwitch("reuseport","Enable SO_REUSEPORT allowing multiple recursors processes to listen to 1 address")="no";
4539
4540 ::arg().setSwitch("snmp-agent", "If set, register as an SNMP agent")="no";
4541 ::arg().set("snmp-master-socket", "If set and snmp-agent is set, the socket to use to register to the SNMP master")="";
4542
4543 std::string defaultBlacklistedStats = "cache-bytes, packetcache-bytes, special-memory-usage";
4544 for (size_t idx = 0; idx < 32; idx++) {
4545 defaultBlacklistedStats += ", ecs-v4-response-bits-" + std::to_string(idx + 1);
4546 }
4547 for (size_t idx = 0; idx < 128; idx++) {
4548 defaultBlacklistedStats += ", ecs-v6-response-bits-" + std::to_string(idx + 1);
4549 }
4550 ::arg().set("stats-api-blacklist", "List of statistics that are disabled when retrieving the complete list of statistics via the API")=defaultBlacklistedStats;
4551 ::arg().set("stats-carbon-blacklist", "List of statistics that are prevented from being exported via Carbon")=defaultBlacklistedStats;
4552 ::arg().set("stats-rec-control-blacklist", "List of statistics that are prevented from being exported via rec_control get-all")=defaultBlacklistedStats;
4553 ::arg().set("stats-snmp-blacklist", "List of statistics that are prevented from being exported via SNMP")=defaultBlacklistedStats;
4554
4555 ::arg().set("tcp-fast-open", "Enable TCP Fast Open support on the listening sockets, using the supplied numerical value as the queue size")="0";
4556 ::arg().set("nsec3-max-iterations", "Maximum number of iterations allowed for an NSEC3 record")="2500";
4557
4558 ::arg().set("cpu-map", "Thread to CPU mapping, space separated thread-id=cpu1,cpu2..cpuN pairs")="";
4559
4560 ::arg().setSwitch("log-rpz-changes", "Log additions and removals to RPZ zones at Info level")="no";
4561
4562 ::arg().set("xpf-allow-from","XPF information is only processed from these subnets")="";
4563 ::arg().set("xpf-rr-code","XPF option code to use")="0";
4564
4565 ::arg().set("udp-source-port-min", "Minimum UDP port to bind on")="1024";
4566 ::arg().set("udp-source-port-max", "Maximum UDP port to bind on")="65535";
4567 ::arg().set("udp-source-port-avoid", "List of comma separated UDP port number to avoid")="11211";
4568 ::arg().set("rng", "Specify random number generator to use. Valid values are auto,sodium,openssl,getrandom,arc4random,urandom.")="auto";
4569 ::arg().set("public-suffix-list-file", "Path to the Public Suffix List file, if any")="";
4570 ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
4571 ::arg().setSwitch("qname-minimization", "Use Query Name Minimization")="no";
4572 #ifdef NOD_ENABLED
4573 ::arg().set("new-domain-tracking", "Track newly observed domains (i.e. never seen before).")="no";
4574 ::arg().set("new-domain-log", "Log newly observed domains.")="yes";
4575 ::arg().set("new-domain-lookup", "Perform a DNS lookup newly observed domains as a subdomain of the configured domain")="";
4576 ::arg().set("new-domain-history-dir", "Persist new domain tracking data here to persist between restarts")=string(NODCACHEDIR)+"/nod";
4577 ::arg().set("new-domain-whitelist", "List of domains (and implicitly all subdomains) which will never be considered a new domain")="";
4578 ::arg().set("new-domain-db-size", "Size of the DB used to track new domains in terms of number of cells. Defaults to 67108864")="67108864";
4579 ::arg().set("new-domain-pb-tag", "If protobuf is configured, the tag to use for messages containing newly observed domains. Defaults to 'pdns-nod'")="pdns-nod";
4580 ::arg().set("unique-response-tracking", "Track unique responses (tuple of query name, type and RR).")="no";
4581 ::arg().set("unique-response-log", "Log unique responses")="yes";
4582 ::arg().set("unique-response-history-dir", "Persist unique response tracking data here to persist between restarts")=string(NODCACHEDIR)+"/udr";
4583 ::arg().set("unique-response-db-size", "Size of the DB used to track unique responses in terms of number of cells. Defaults to 67108864")="67108864";
4584 ::arg().set("unique-response-pb-tag", "If protobuf is configured, the tag to use for messages containing unique DNS responses. Defaults to 'pdns-udr'")="pdns-udr";
4585 #endif /* NOD_ENABLED */
4586 ::arg().setCmd("help","Provide a helpful message");
4587 ::arg().setCmd("version","Print version string");
4588 ::arg().setCmd("config","Output blank configuration");
4589 g_log.toConsole(Logger::Info);
4590 ::arg().laxParse(argc,argv); // do a lax parse
4591
4592 string configname=::arg()["config-dir"]+"/recursor.conf";
4593 if(::arg()["config-name"]!="") {
4594 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
4595 s_programname+="-"+::arg()["config-name"];
4596 }
4597 cleanSlashes(configname);
4598
4599 if(!::arg().getCommands().empty()) {
4600 cerr<<"Fatal: non-option on the command line, perhaps a '--setting=123' statement missed the '='?"<<endl;
4601 exit(99);
4602 }
4603
4604 if(::arg().mustDo("config")) {
4605 cout<<::arg().configstring()<<endl;
4606 exit(0);
4607 }
4608
4609 if(!::arg().file(configname.c_str()))
4610 g_log<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
4611
4612 ::arg().parse(argc,argv);
4613
4614 if( !::arg()["chroot"].empty() && !::arg()["api-config-dir"].empty() ) {
4615 g_log<<Logger::Error<<"Using chroot and enabling the API is not possible"<<endl;
4616 exit(EXIT_FAILURE);
4617 }
4618
4619 if (::arg()["socket-dir"].empty()) {
4620 if (::arg()["chroot"].empty())
4621 ::arg().set("socket-dir") = std::string(LOCALSTATEDIR) + "/pdns-recursor";
4622 else
4623 ::arg().set("socket-dir") = "/";
4624 }
4625
4626 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
4627
4628 if(::arg().asNum("threads")==1) {
4629 if (::arg().mustDo("pdns-distributes-queries")) {
4630 g_log<<Logger::Warning<<"Only one thread, no need to distribute queries ourselves"<<endl;
4631 ::arg().set("pdns-distributes-queries")="no";
4632 }
4633 }
4634
4635 if(::arg().mustDo("pdns-distributes-queries") && ::arg().asNum("distributor-threads") <= 0) {
4636 g_log<<Logger::Warning<<"Asked to run with pdns-distributes-queries set but no distributor threads, raising to 1"<<endl;
4637 ::arg().set("distributor-threads")="1";
4638 }
4639
4640 if (!::arg().mustDo("pdns-distributes-queries")) {
4641 ::arg().set("distributor-threads")="0";
4642 }
4643
4644 if(::arg().mustDo("help")) {
4645 cout<<"syntax:"<<endl<<endl;
4646 cout<<::arg().helpstring(::arg()["help"])<<endl;
4647 exit(0);
4648 }
4649 if(::arg().mustDo("version")) {
4650 showProductVersion();
4651 showBuildConfiguration();
4652 exit(0);
4653 }
4654
4655 Logger::Urgency logUrgency = (Logger::Urgency)::arg().asNum("loglevel");
4656
4657 if (logUrgency < Logger::Error)
4658 logUrgency = Logger::Error;
4659 if(!g_quiet && logUrgency < Logger::Info) { // Logger::Info=6, Logger::Debug=7
4660 logUrgency = Logger::Info; // if you do --quiet=no, you need Info to also see the query log
4661 }
4662 g_log.setLoglevel(logUrgency);
4663 g_log.toConsole(logUrgency);
4664
4665 serviceMain(argc, argv);
4666 }
4667 catch(PDNSException &ae) {
4668 g_log<<Logger::Error<<"Exception: "<<ae.reason<<endl;
4669 ret=EXIT_FAILURE;
4670 }
4671 catch(std::exception &e) {
4672 g_log<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
4673 ret=EXIT_FAILURE;
4674 }
4675 catch(...) {
4676 g_log<<Logger::Error<<"any other exception in main: "<<endl;
4677 ret=EXIT_FAILURE;
4678 }
4679
4680 return ret;
4681 }