]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
Merge pull request #7941 from rgacogne/rec-records-from-gettag-ffi
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <netdb.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29 #ifdef HAVE_BOOST_CONTAINER_FLAT_SET_HPP
30 #include <boost/container/flat_set.hpp>
31 #endif
32 #include "ws-recursor.hh"
33 #include <thread>
34 #include "threadname.hh"
35 #include "recpacketcache.hh"
36 #include "utility.hh"
37 #include "dns_random.hh"
38 #ifdef HAVE_LIBSODIUM
39 #include <sodium.h>
40 #endif
41 #include "opensslsigners.hh"
42 #include <iostream>
43 #include <errno.h>
44 #include <boost/static_assert.hpp>
45 #include <map>
46 #include <set>
47 #include "recursor_cache.hh"
48 #include "cachecleaner.hh"
49 #include <stdio.h>
50 #include <signal.h>
51 #include <stdlib.h>
52 #include "misc.hh"
53 #include "mtasker.hh"
54 #include <utility>
55 #include "arguments.hh"
56 #include "syncres.hh"
57 #include <fcntl.h>
58 #include <fstream>
59 #include "sortlist.hh"
60 #include "sstuff.hh"
61 #include <boost/tuple/tuple.hpp>
62 #include <boost/tuple/tuple_comparison.hpp>
63 #include <boost/shared_array.hpp>
64 #include <boost/function.hpp>
65 #include <boost/algorithm/string.hpp>
66 #ifdef MALLOC_TRACE
67 #include "malloctrace.hh"
68 #endif
69 #include <netinet/tcp.h>
70 #include "capabilities.hh"
71 #include "dnsparser.hh"
72 #include "dnswriter.hh"
73 #include "dnsrecords.hh"
74 #include "zoneparser-tng.hh"
75 #include "rec_channel.hh"
76 #include "logger.hh"
77 #include "iputils.hh"
78 #include "mplexer.hh"
79 #include "config.h"
80 #include "lua-recursor4.hh"
81 #include "version.hh"
82 #include "responsestats.hh"
83 #include "secpoll-recursor.hh"
84 #include "dnsname.hh"
85 #include "filterpo.hh"
86 #include "rpzloader.hh"
87 #include "validate-recursor.hh"
88 #include "rec-lua-conf.hh"
89 #include "ednsoptions.hh"
90 #include "gettime.hh"
91 #include "pubsuffix.hh"
92 #ifdef NOD_ENABLED
93 #include "nod.hh"
94 #endif /* NOD_ENABLED */
95
96 #include "rec-protobuf.hh"
97 #include "rec-snmp.hh"
98
99 #ifdef HAVE_SYSTEMD
100 #include <systemd/sd-daemon.h>
101 #endif
102
103 #include "namespaces.hh"
104
105 #ifdef HAVE_PROTOBUF
106 #include "uuid-utils.hh"
107 #endif /* HAVE_PROTOBUF */
108
109 #include "xpf.hh"
110
111 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
112
113 static thread_local std::shared_ptr<RecursorLua4> t_pdl;
114 static thread_local unsigned int t_id = 0;
115 static thread_local std::shared_ptr<Regex> t_traceRegex;
116 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
117 #ifdef HAVE_PROTOBUF
118 static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
119 static thread_local uint64_t t_protobufServersGeneration;
120 static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
121 static thread_local uint64_t t_outgoingProtobufServersGeneration;
122 #endif /* HAVE_PROTOBUF */
123
124 #ifdef HAVE_FSTRM
125 static thread_local std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> t_frameStreamServers{nullptr};
126 static thread_local uint64_t t_frameStreamServersGeneration;
127 #endif /* HAVE_FSTRM */
128
129 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
130 thread_local std::unique_ptr<MemRecursorCache> t_RC;
131 thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
132 thread_local FDMultiplexer* t_fdm{nullptr};
133 thread_local std::unique_ptr<addrringbuf_t> t_remotes, t_servfailremotes, t_largeanswerremotes, t_bogusremotes;
134 thread_local std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > > t_queryring, t_servfailqueryring, t_bogusqueryring;
135 thread_local std::shared_ptr<NetmaskGroup> t_allowFrom;
136 #ifdef NOD_ENABLED
137 thread_local std::shared_ptr<nod::NODDB> t_nodDBp;
138 thread_local std::shared_ptr<nod::UniqueResponseDB> t_udrDBp;
139 #endif /* NOD_ENABLED */
140 __thread struct timeval g_now; // timestamp, updated (too) frequently
141
142 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
143
144 // for communicating with our threads
145 // effectively readonly after startup
146 struct RecThreadInfo
147 {
148 struct ThreadPipeSet
149 {
150 int writeToThread{-1};
151 int readToThread{-1};
152 int writeFromThread{-1};
153 int readFromThread{-1};
154 int writeQueriesToThread{-1}; // this one is non-blocking
155 int readQueriesToThread{-1};
156 };
157
158 /* FD corresponding to TCP sockets this thread is listening
159 on.
160 These FDs are also in deferredAdds when we have one
161 socket per listener, and in g_deferredAdds instead. */
162 std::set<int> tcpSockets;
163 /* FD corresponding to listening sockets if we have one socket per
164 listener (with reuseport), otherwise all listeners share the
165 same FD and g_deferredAdds is then used instead */
166 deferredAdd_t deferredAdds;
167 struct ThreadPipeSet pipes;
168 std::thread thread;
169 MT_t* mt{nullptr};
170 uint64_t numberOfDistributedQueries{0};
171 /* handle the web server, carbon, statistics and the control channel */
172 bool isHandler{false};
173 /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
174 bool isListener{false};
175 /* process queries */
176 bool isWorker{false};
177 };
178
179 /* first we have the handler thread, t_id == 0 (some other
180 helper threads like SNMP might have t_id == 0 as well)
181 then the distributor threads if any
182 and finally the workers */
183 static std::vector<RecThreadInfo> s_threadInfos;
184 /* without reuseport, all listeners share the same sockets */
185 static deferredAdd_t g_deferredAdds;
186
187 typedef vector<int> tcpListenSockets_t;
188 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
189
190 static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
191 static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
192 static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
193 static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
194 static AtomicCounter counter;
195 static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads needs this to be setup
196 static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setup with this
197 static NetmaskGroup g_XPFAcl;
198 static size_t g_tcpMaxQueriesPerConn;
199 static size_t s_maxUDPQueriesPerRound;
200 static uint64_t g_latencyStatSize;
201 static uint32_t g_disthashseed;
202 static unsigned int g_maxTCPPerClient;
203 static unsigned int g_maxMThreads;
204 static unsigned int g_numDistributorThreads;
205 static unsigned int g_numWorkerThreads;
206 static int g_tcpTimeout;
207 static uint16_t g_udpTruncationThreshold;
208 static uint16_t g_xpfRRCode{0};
209 static std::atomic<bool> statsWanted;
210 static std::atomic<bool> g_quiet;
211 static bool g_logCommonErrors;
212 static bool g_anyToTcp;
213 static bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
214 static bool g_reusePort{false};
215 static bool g_gettagNeedsEDNSOptions{false};
216 static time_t g_statisticsInterval;
217 static bool g_useIncomingECS;
218 static bool g_useKernelTimestamp;
219 std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
220 #ifdef NOD_ENABLED
221 static bool g_nodEnabled;
222 static DNSName g_nodLookupDomain;
223 static bool g_nodLog;
224 static SuffixMatchNode g_nodDomainWL;
225 static std::string g_nod_pbtag;
226 static bool g_udrEnabled;
227 static bool g_udrLog;
228 static std::string g_udr_pbtag;
229 #endif /* NOD_ENABLED */
230 #ifdef HAVE_BOOST_CONTAINER_FLAT_SET_HPP
231 static boost::container::flat_set<uint16_t> s_avoidUdpSourcePorts;
232 #else
233 static std::set<uint16_t> s_avoidUdpSourcePorts;
234 #endif
235 static uint16_t s_minUdpSourcePort;
236 static uint16_t s_maxUdpSourcePort;
237 static double s_balancingFactor;
238
239 RecursorControlChannel s_rcc; // only active in the handler thread
240 RecursorStats g_stats;
241 string s_programname="pdns_recursor";
242 string s_pidfname;
243 bool g_lowercaseOutgoing;
244 unsigned int g_networkTimeoutMsec;
245 unsigned int g_numThreads;
246 uint16_t g_outgoingEDNSBufsize;
247 bool g_logRPZChanges{false};
248
249 // Used in the Syncres to not throttle certain servers
250 GlobalStateHolder<SuffixMatchNode> g_dontThrottleNames;
251 GlobalStateHolder<NetmaskGroup> g_dontThrottleNetmasks;
252
253 #define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
254 #define LOCAL_NETS_INVERSE "!127.0.0.0/8, !10.0.0.0/8, !100.64.0.0/10, !169.254.0.0/16, !192.168.0.0/16, !172.16.0.0/12, !::1/128, !fc00::/7, !fe80::/10"
255 // Bad Nets taken from both:
256 // http://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
257 // and
258 // http://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
259 // where such a network may not be considered a valid destination
260 #define BAD_NETS "0.0.0.0/8, 192.0.0.0/24, 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24, 240.0.0.0/4, ::/96, ::ffff:0:0/96, 100::/64, 2001:db8::/32"
261 #define DONT_QUERY LOCAL_NETS ", " BAD_NETS
262
263 //! used to send information to a newborn mthread
264 struct DNSComboWriter {
265 DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query)
266 {
267 }
268
269 DNSComboWriter(const std::string& query, const struct timeval& now, std::vector<std::string>&& policyTags, LuaContext::LuaObject&& data, std::vector<DNSRecord>&& records): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_records(std::move(records)), d_data(std::move(data))
270 {
271 }
272
273 void setRemote(const ComboAddress& sa)
274 {
275 d_remote=sa;
276 }
277
278 void setSource(const ComboAddress& sa)
279 {
280 d_source=sa;
281 }
282
283 void setLocal(const ComboAddress& sa)
284 {
285 d_local=sa;
286 }
287
288 void setDestination(const ComboAddress& sa)
289 {
290 d_destination=sa;
291 }
292
293 void setSocket(int sock)
294 {
295 d_socket=sock;
296 }
297
298 string getRemote() const
299 {
300 if (d_source == d_remote) {
301 return d_source.toStringWithPort();
302 }
303 return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")";
304 }
305
306 MOADNSParser d_mdp;
307 struct timeval d_now;
308 /* Remote client, might differ from d_source
309 in case of XPF, in which case d_source holds
310 the IP of the client and d_remote of the proxy
311 */
312 ComboAddress d_remote;
313 ComboAddress d_source;
314 /* Destination address, might differ from
315 d_destination in case of XPF, in which case
316 d_destination holds the IP of the proxy and
317 d_local holds our own. */
318 ComboAddress d_local;
319 ComboAddress d_destination;
320 #ifdef HAVE_PROTOBUF
321 boost::uuids::uuid d_uuid;
322 string d_requestorId;
323 string d_deviceId;
324 string d_deviceName;
325 struct timeval d_kernelTimestamp{0,0};
326 #endif
327 std::string d_query;
328 std::vector<std::string> d_policyTags;
329 std::vector<DNSRecord> d_records;
330 LuaContext::LuaObject d_data;
331 EDNSSubnetOpts d_ednssubnet;
332 shared_ptr<TCPConnection> d_tcpConnection;
333 boost::optional<int> d_rcode{boost::none};
334 int d_socket;
335 unsigned int d_tag{0};
336 uint32_t d_qhash{0};
337 uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
338 uint16_t d_ecsBegin{0};
339 uint16_t d_ecsEnd{0};
340 bool d_variable{false};
341 bool d_ecsFound{false};
342 bool d_ecsParsed{false};
343 bool d_followCNAMERecords{false};
344 bool d_logResponse{false};
345 bool d_tcp;
346 };
347
348 MT_t* getMT()
349 {
350 return MT ? MT.get() : nullptr;
351 }
352
353 ArgvMap &arg()
354 {
355 static ArgvMap theArg;
356 return theArg;
357 }
358
359 unsigned int getRecursorThreadId()
360 {
361 return t_id;
362 }
363
364 int getMTaskerTID()
365 {
366 return MT->getTid();
367 }
368
369 static bool isDistributorThread()
370 {
371 if (t_id == 0) {
372 return false;
373 }
374
375 return g_weDistributeQueries && s_threadInfos.at(t_id).isListener;
376 }
377
378 static bool isHandlerThread()
379 {
380 if (t_id == 0) {
381 return true;
382 }
383
384 return s_threadInfos.at(t_id).isHandler;
385 }
386
387 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
388
389 // -1 is error, 0 is timeout, 1 is success
390 int asendtcp(const string& data, Socket* sock)
391 {
392 PacketID pident;
393 pident.sock=sock;
394 pident.outMSG=data;
395
396 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
397 string packet;
398
399 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
400
401 if(!ret || ret==-1) { // timeout
402 t_fdm->removeWriteFD(sock->getHandle());
403 }
404 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
405 return -1;
406 }
407 return ret;
408 }
409
410 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
411
412 // -1 is error, 0 is timeout, 1 is success
413 int arecvtcp(string& data, size_t len, Socket* sock, bool incompleteOkay)
414 {
415 data.clear();
416 PacketID pident;
417 pident.sock=sock;
418 pident.inNeeded=len;
419 pident.inIncompleteOkay=incompleteOkay;
420 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
421
422 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
423 if(!ret || ret==-1) { // timeout
424 t_fdm->removeReadFD(sock->getHandle());
425 }
426 else if(data.empty()) {// error, EOF or other
427 return -1;
428 }
429
430 return ret;
431 }
432
433 static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
434 {
435 PacketID pident=*any_cast<PacketID>(&var);
436 char resp[512];
437 ComboAddress fromaddr;
438 socklen_t addrlen=sizeof(fromaddr);
439
440 ssize_t ret=recvfrom(fd, resp, sizeof(resp), 0, (sockaddr *)&fromaddr, &addrlen);
441 if (fromaddr != pident.remote) {
442 g_log<<Logger::Notice<<"Response received from the wrong remote host ("<<fromaddr.toStringWithPort()<<" instead of "<<pident.remote.toStringWithPort()<<"), discarding"<<endl;
443
444 }
445
446 t_fdm->removeReadFD(fd);
447 if(ret >= 0) {
448 string data(resp, (size_t) ret);
449 MT->sendEvent(pident, &data);
450 }
451 else {
452 string empty;
453 MT->sendEvent(pident, &empty);
454 // cerr<<"Had some kind of error: "<<ret<<", "<<strerror(errno)<<endl;
455 }
456 }
457 string GenUDPQueryResponse(const ComboAddress& dest, const string& query)
458 {
459 Socket s(dest.sin4.sin_family, SOCK_DGRAM);
460 s.setNonBlocking();
461 ComboAddress local = getQueryLocalAddress(dest.sin4.sin_family, 0);
462
463 s.bind(local);
464 s.connect(dest);
465 s.send(query);
466
467 PacketID pident;
468 pident.sock=&s;
469 pident.remote=dest;
470 pident.type=0;
471 t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
472
473 string data;
474
475 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
476
477 if(!ret || ret==-1) { // timeout
478 t_fdm->removeReadFD(s.getHandle());
479 }
480 else if(data.empty()) {// error, EOF or other
481 // we could special case this
482 return data;
483 }
484 return data;
485 }
486
487 //! pick a random query local address
488 ComboAddress getQueryLocalAddress(int family, uint16_t port)
489 {
490 ComboAddress ret;
491 if(family==AF_INET) {
492 if(g_localQueryAddresses4.empty())
493 ret = g_local4;
494 else
495 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
496 ret.sin4.sin_port = htons(port);
497 }
498 else {
499 if(g_localQueryAddresses6.empty())
500 ret = g_local6;
501 else
502 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
503
504 ret.sin6.sin6_port = htons(port);
505 }
506 return ret;
507 }
508
509 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
510
511 static void setSocketBuffer(int fd, int optname, uint32_t size)
512 {
513 uint32_t psize=0;
514 socklen_t len=sizeof(psize);
515
516 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
517 g_log<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
518 return;
519 }
520
521 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
522 g_log<<Logger::Error<<"Unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
523 }
524
525
526 static void setSocketReceiveBuffer(int fd, uint32_t size)
527 {
528 setSocketBuffer(fd, SO_RCVBUF, size);
529 }
530
531 static void setSocketSendBuffer(int fd, uint32_t size)
532 {
533 setSocketBuffer(fd, SO_SNDBUF, size);
534 }
535
536
537 // you can ask this class for a UDP socket to send a query from
538 // this socket is not yours, don't even think about deleting it
539 // but after you call 'returnSocket' on it, don't assume anything anymore
540 class UDPClientSocks
541 {
542 unsigned int d_numsocks;
543 public:
544 UDPClientSocks() : d_numsocks(0)
545 {
546 }
547
548 // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
549 int getSocket(const ComboAddress& toaddr, int* fd)
550 {
551 *fd=makeClientSocket(toaddr.sin4.sin_family);
552 if(*fd < 0) // temporary error - receive exception otherwise
553 return -2;
554
555 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
556 int err = errno;
557 try {
558 closesocket(*fd);
559 }
560 catch(const PDNSException& e) {
561 g_log<<Logger::Error<<"Error closing UDP socket after connect() failed: "<<e.reason<<endl;
562 }
563
564 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
565 return -2;
566 return -1;
567 }
568
569 d_numsocks++;
570 return 0;
571 }
572
573 // return a socket to the pool, or simply erase it
574 void returnSocket(int fd)
575 {
576 try {
577 t_fdm->removeReadFD(fd);
578 }
579 catch(const FDMultiplexerException& e) {
580 // we sometimes return a socket that has not yet been assigned to t_fdm
581 }
582
583 try {
584 closesocket(fd);
585 }
586 catch(const PDNSException& e) {
587 g_log<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
588 }
589
590 --d_numsocks;
591 }
592
593 private:
594
595 // returns -1 for errors which might go away, throws for ones that won't
596 static int makeClientSocket(int family)
597 {
598 int ret=socket(family, SOCK_DGRAM, 0 ); // turns out that setting CLO_EXEC and NONBLOCK from here is not a performance win on Linux (oddly enough)
599
600 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
601 return ret;
602
603 if(ret<0)
604 throw PDNSException("Making a socket for resolver (family = "+std::to_string(family)+"): "+stringerror());
605
606 // setCloseOnExec(ret); // we're not going to exec
607
608 int tries=10;
609 ComboAddress sin;
610 while(--tries) {
611 uint16_t port;
612
613 if(tries==1) // fall back to kernel 'random'
614 port = 0;
615 else {
616 do {
617 port = s_minUdpSourcePort + dns_random(s_maxUdpSourcePort - s_minUdpSourcePort + 1);
618 }
619 while (s_avoidUdpSourcePorts.count(port));
620 }
621
622 sin=getQueryLocalAddress(family, port); // does htons for us
623
624 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
625 break;
626 }
627
628 if(!tries) {
629 closesocket(ret);
630 throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
631 }
632
633 try {
634 setReceiveSocketErrors(ret, family);
635 setNonBlocking(ret);
636 }
637 catch(...) {
638 closesocket(ret);
639 throw;
640 }
641
642 return ret;
643 }
644 };
645
646 static thread_local std::unique_ptr<UDPClientSocks> t_udpclientsocks;
647
648 /* these two functions are used by LWRes */
649 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
650 int asendto(const char *data, size_t len, int flags,
651 const ComboAddress& toaddr, uint16_t id, const DNSName& domain, uint16_t qtype, int* fd)
652 {
653
654 PacketID pident;
655 pident.domain = domain;
656 pident.remote = toaddr;
657 pident.type = qtype;
658
659 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
660 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
661
662 for(; chain.first != chain.second; chain.first++) {
663 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
664 /*
665 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
666 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
667 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
668 */
669 chain.first->key.chain.insert(id); // we can chain
670 *fd=-1; // gets used in waitEvent / sendEvent later on
671 return 1;
672 }
673 }
674
675 int ret=t_udpclientsocks->getSocket(toaddr, fd);
676 if(ret < 0)
677 return ret;
678
679 pident.fd=*fd;
680 pident.id=id;
681
682 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
683 ret = send(*fd, data, len, 0);
684
685 int tmp = errno;
686
687 if(ret < 0)
688 t_udpclientsocks->returnSocket(*fd);
689
690 errno = tmp; // this is for logging purposes only
691 return ret;
692 }
693
694 // -1 is error, 0 is timeout, 1 is success
695 int arecvfrom(std::string& packet, int flags, const ComboAddress& fromaddr, size_t *d_len,
696 uint16_t id, const DNSName& domain, uint16_t qtype, int fd, struct timeval* now)
697 {
698 static optional<unsigned int> nearMissLimit;
699 if(!nearMissLimit)
700 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
701
702 PacketID pident;
703 pident.fd=fd;
704 pident.id=id;
705 pident.domain=domain;
706 pident.type = qtype;
707 pident.remote=fromaddr;
708
709 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
710
711 /* -1 means error, 0 means timeout, 1 means a result from handleUDPServerResponse() which might still be an error */
712 if(ret > 0) {
713 /* handleUDPServerResponse() will close the socket for us no matter what */
714 if(packet.empty()) // means "error"
715 return -1;
716
717 *d_len=packet.size();
718
719 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
720 g_log<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
721 g_stats.spoofCount++;
722 return -1;
723 }
724 }
725 else {
726 /* getting there means error or timeout, it's up to us to close the socket */
727 if(fd >= 0)
728 t_udpclientsocks->returnSocket(fd);
729 }
730 return ret;
731 }
732
733 static void writePid(void)
734 {
735 if(!::arg().mustDo("write-pid"))
736 return;
737 ofstream of(s_pidfname.c_str(), std::ios_base::app);
738 if(of)
739 of<< Utility::getpid() <<endl;
740 else
741 g_log<<Logger::Error<<"Writing pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
742 }
743
744 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
745 {
746 ++s_currentConnections;
747 (*t_tcpClientCounts)[d_remote]++;
748 }
749
750 TCPConnection::~TCPConnection()
751 {
752 try {
753 if(closesocket(d_fd) < 0)
754 g_log<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
755 }
756 catch(const PDNSException& e) {
757 g_log<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
758 }
759
760 if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
761 t_tcpClientCounts->erase(d_remote);
762 --s_currentConnections;
763 }
764
765 AtomicCounter TCPConnection::s_currentConnections;
766
767 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
768
769 // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
770 static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
771 {
772 if(packetsize > 1000 && t_largeanswerremotes)
773 t_largeanswerremotes->push_back(remote);
774 switch(res) {
775 case RCode::ServFail:
776 if(t_servfailremotes) {
777 t_servfailremotes->push_back(remote);
778 if(query && t_servfailqueryring) // packet cache
779 t_servfailqueryring->push_back(make_pair(*query, qtype));
780 }
781 g_stats.servFails++;
782 break;
783 case RCode::NXDomain:
784 g_stats.nxDomains++;
785 break;
786 case RCode::NoError:
787 g_stats.noErrors++;
788 break;
789 }
790 }
791
792 static string makeLoginfo(const std::unique_ptr<DNSComboWriter>& dc)
793 try
794 {
795 return "("+dc->d_mdp.d_qname.toLogString()+"/"+DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)+" from "+(dc->getRemote())+")";
796 }
797 catch(...)
798 {
799 return "Exception making error message for exception";
800 }
801
802 #ifdef HAVE_PROTOBUF
803 static void protobufLogQuery(uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName)
804 {
805 if (!t_protobufServers) {
806 return;
807 }
808
809 Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
810 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
811 RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
812 message.setServerIdentity(SyncRes::s_serverID);
813 message.setEDNSSubnet(ednssubnet, ednssubnet.isIpv4() ? maskV4 : maskV6);
814 message.setRequestorId(requestorId);
815 message.setDeviceId(deviceId);
816 message.setDeviceName(deviceName);
817
818 if (!policyTags.empty()) {
819 message.setPolicyTags(policyTags);
820 }
821
822 // cerr <<message.toDebugString()<<endl;
823 std::string str;
824 message.serialize(str);
825
826 for (auto& server : *t_protobufServers) {
827 server->queueData(str);
828 }
829 }
830
831 static void protobufLogResponse(const RecProtoBufMessage& message)
832 {
833 if (!t_protobufServers) {
834 return;
835 }
836
837 // cerr <<message.toDebugString()<<endl;
838 std::string str;
839 message.serialize(str);
840
841 for (auto& server : *t_protobufServers) {
842 server->queueData(str);
843 }
844 }
845 #endif
846
847 /**
848 * Chases the CNAME provided by the PolicyCustom RPZ policy.
849 *
850 * @param spoofed: The DNSRecord that was created by the policy, should already be added to ret
851 * @param qtype: The QType of the original query
852 * @param sr: A SyncRes
853 * @param res: An integer that will contain the RCODE of the lookup we do
854 * @param ret: A vector of DNSRecords where the result of the CNAME chase should be appended to
855 */
856 static void handleRPZCustom(const DNSRecord& spoofed, const QType& qtype, SyncRes& sr, int& res, vector<DNSRecord>& ret)
857 {
858 if (spoofed.d_type == QType::CNAME) {
859 bool oldWantsRPZ = sr.getWantsRPZ();
860 sr.setWantsRPZ(false);
861 vector<DNSRecord> ans;
862 res = sr.beginResolve(DNSName(spoofed.d_content->getZoneRepresentation()), qtype, QClass::IN, ans);
863 for (const auto& rec : ans) {
864 if(rec.d_place == DNSResourceRecord::ANSWER) {
865 ret.push_back(rec);
866 }
867 }
868 // Reset the RPZ state of the SyncRes
869 sr.setWantsRPZ(oldWantsRPZ);
870 }
871 }
872
873 static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, uint32_t ttlCap, const uint16_t maxAnswerSize)
874 {
875 pw.startRecord(rec.d_name, rec.d_type, (rec.d_ttl > ttlCap ? ttlCap : rec.d_ttl), rec.d_class, rec.d_place);
876
877 if(rec.d_type != QType::OPT) // their TTL ain't real
878 minTTL = min(minTTL, rec.d_ttl);
879
880 rec.d_content->toPacket(pw);
881 if(pw.size() > static_cast<size_t>(maxAnswerSize)) {
882 pw.rollback();
883 if(rec.d_place != DNSResourceRecord::ADDITIONAL) {
884 pw.getHeader()->tc=1;
885 pw.truncate();
886 }
887 return false;
888 }
889
890 return true;
891 }
892
893 #ifdef HAVE_PROTOBUF
894 static std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> startProtobufServers(const ProtobufExportConfig& config)
895 {
896 auto result = std::make_shared<std::vector<std::unique_ptr<RemoteLogger>>>();
897
898 for (const auto& server : config.servers) {
899 try {
900 auto logger = make_unique<RemoteLogger>(server, config.timeout, 100*config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
901 logger->setLogQueries(config.logQueries);
902 logger->setLogResponses(config.logResponses);
903 result->emplace_back(std::move(logger));
904 }
905 catch(const std::exception& e) {
906 g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.what()<<endl;
907 }
908 catch(const PDNSException& e) {
909 g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.reason<<endl;
910 }
911 }
912
913 return result;
914 }
915
916 static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
917 {
918 if (!luaconfsLocal->protobufExportConfig.enabled) {
919 if (t_protobufServers) {
920 for (auto& server : *t_protobufServers) {
921 server->stop();
922 }
923 t_protobufServers.reset();
924 }
925
926 return false;
927 }
928
929 /* if the server was not running, or if it was running according to a
930 previous configuration */
931 if (!t_protobufServers ||
932 t_protobufServersGeneration < luaconfsLocal->generation) {
933
934 if (t_protobufServers) {
935 for (auto& server : *t_protobufServers) {
936 server->stop();
937 }
938 }
939 t_protobufServers.reset();
940
941 t_protobufServers = startProtobufServers(luaconfsLocal->protobufExportConfig);
942 t_protobufServersGeneration = luaconfsLocal->generation;
943 }
944
945 return true;
946 }
947
948 static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
949 {
950 if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) {
951 if (t_outgoingProtobufServers) {
952 for (auto& server : *t_outgoingProtobufServers) {
953 server->stop();
954 }
955 }
956 t_outgoingProtobufServers.reset();
957
958 return false;
959 }
960
961 /* if the server was not running, or if it was running according to a
962 previous configuration */
963 if (!t_outgoingProtobufServers ||
964 t_outgoingProtobufServersGeneration < luaconfsLocal->generation) {
965
966 if (t_outgoingProtobufServers) {
967 for (auto& server : *t_outgoingProtobufServers) {
968 server->stop();
969 }
970 }
971 t_outgoingProtobufServers.reset();
972
973 t_outgoingProtobufServers = startProtobufServers(luaconfsLocal->outgoingProtobufExportConfig);
974 t_outgoingProtobufServersGeneration = luaconfsLocal->generation;
975 }
976
977 return true;
978 }
979
980 #ifdef HAVE_FSTRM
981
982 static std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> startFrameStreamServers(const FrameStreamExportConfig& config)
983 {
984 auto result = std::make_shared<std::vector<std::unique_ptr<FrameStreamLogger>>>();
985
986 for (const auto& server : config.servers) {
987 try {
988 std::unordered_map<string,unsigned> options;
989 options["bufferHint"] = config.bufferHint;
990 options["flushTimeout"] = config.flushTimeout;
991 options["inputQueueSize"] = config.inputQueueSize;
992 options["outputQueueSize"] = config.outputQueueSize;
993 options["queueNotifyThreshold"] = config.queueNotifyThreshold;
994 options["reopenInterval"] = config.reopenInterval;
995 FrameStreamLogger *fsl = nullptr;
996 try {
997 ComboAddress address(server);
998 fsl = new FrameStreamLogger(address.sin4.sin_family, address.toStringWithPort(), true, options);
999 }
1000 catch (const PDNSException& e) {
1001 fsl = new FrameStreamLogger(AF_UNIX, server, true, options);
1002 }
1003 fsl->setLogQueries(config.logQueries);
1004 fsl->setLogResponses(config.logResponses);
1005 result->emplace_back(fsl);
1006 }
1007 catch(const std::exception& e) {
1008 g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
1009 }
1010 catch(const PDNSException& e) {
1011 g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
1012 }
1013 }
1014
1015 return result;
1016 }
1017
1018 static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
1019 {
1020 if (!luaconfsLocal->frameStreamExportConfig.enabled) {
1021 if (t_frameStreamServers) {
1022 // dt's take care of cleanup
1023 t_frameStreamServers.reset();
1024 }
1025
1026 return false;
1027 }
1028
1029 /* if the server was not running, or if it was running according to a
1030 previous configuration */
1031 if (!t_frameStreamServers ||
1032 t_frameStreamServersGeneration < luaconfsLocal->generation) {
1033
1034 if (t_frameStreamServers) {
1035 // dt's take care of cleanup
1036 t_frameStreamServers.reset();
1037 }
1038
1039 t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
1040 t_frameStreamServersGeneration = luaconfsLocal->generation;
1041 }
1042
1043 return true;
1044 }
1045 #endif /* HAVE_FSTRM */
1046 #endif /* HAVE_PROTOBUF */
1047
1048 #ifdef NOD_ENABLED
1049 static bool nodCheckNewDomain(const DNSName& dname)
1050 {
1051 static const QType qt(QType::A);
1052 static const uint16_t qc(QClass::IN);
1053 bool ret = false;
1054 // First check the (sub)domain isn't whitelisted for NOD purposes
1055 if (!g_nodDomainWL.check(dname)) {
1056 // Now check the NODDB (note this is probablistic so can have FNs/FPs)
1057 if (t_nodDBp && t_nodDBp->isNewDomain(dname)) {
1058 if (g_nodLog) {
1059 // This should probably log to a dedicated log file
1060 g_log<<Logger::Notice<<"Newly observed domain nod="<<dname.toLogString()<<endl;
1061 }
1062 if (!(g_nodLookupDomain.isRoot())) {
1063 // Send a DNS A query to <domain>.g_nodLookupDomain
1064 DNSName qname = dname;
1065 vector<DNSRecord> dummy;
1066 qname += g_nodLookupDomain;
1067 directResolve(qname, qt, qc, dummy);
1068 }
1069 ret = true;
1070 }
1071 }
1072 return ret;
1073 }
1074
1075 static void nodAddDomain(const DNSName& dname)
1076 {
1077 // Don't bother adding domains on the nod whitelist
1078 if (!g_nodDomainWL.check(dname)) {
1079 if (t_nodDBp) {
1080 // This keeps the nod info up to date
1081 t_nodDBp->addDomain(dname);
1082 }
1083 }
1084 }
1085
1086 static bool udrCheckUniqueDNSRecord(const DNSName& dname, uint16_t qtype, const DNSRecord& record)
1087 {
1088 bool ret = false;
1089 if (record.d_place == DNSResourceRecord::ANSWER ||
1090 record.d_place == DNSResourceRecord::ADDITIONAL) {
1091 // Create a string that represent a triplet of (qname, qtype and RR[type, name, content])
1092 std::stringstream ss;
1093 ss << dname.toDNSStringLC() << ":" << qtype << ":" << qtype << ":" << record.d_type << ":" << record.d_name.toDNSStringLC() << ":" << record.d_content->getZoneRepresentation();
1094 if (t_udrDBp && t_udrDBp->isUniqueResponse(ss.str())) {
1095 if (g_udrLog) {
1096 // This should also probably log to a dedicated file.
1097 g_log<<Logger::Notice<<"Unique response observed: qname="<<dname.toLogString()<<" qtype="<<QType(qtype).getName()<< " rrtype=" << QType(record.d_type).getName() << " rrname=" << record.d_name.toLogString() << " rrcontent=" << record.d_content->getZoneRepresentation() << endl;
1098 }
1099 ret = true;
1100 }
1101 }
1102 return ret;
1103 }
1104 #endif /* NOD_ENABLED */
1105
1106 int followCNAMERecords(vector<DNSRecord>& ret, const QType& qtype)
1107 {
1108 vector<DNSRecord> resolved;
1109 DNSName target;
1110 for(const DNSRecord& rr : ret) {
1111 if(rr.d_type == QType::CNAME) {
1112 auto rec = getRR<CNAMERecordContent>(rr);
1113 if(rec) {
1114 target=rec->getTarget();
1115 break;
1116 }
1117 }
1118 }
1119
1120 if(target.empty()) {
1121 return 0;
1122 }
1123
1124 int rcode = directResolve(target, qtype, QClass::IN, resolved);
1125
1126 for(DNSRecord& rr : resolved) {
1127 ret.push_back(std::move(rr));
1128 }
1129 return rcode;
1130 }
1131
1132 static void startDoResolve(void *p)
1133 {
1134 auto dc=std::unique_ptr<DNSComboWriter>(reinterpret_cast<DNSComboWriter*>(p));
1135 try {
1136 if (t_queryring)
1137 t_queryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
1138
1139 uint16_t maxanswersize = dc->d_tcp ? 65535 : min(static_cast<uint16_t>(512), g_udpTruncationThreshold);
1140 EDNSOpts edo;
1141 std::vector<pair<uint16_t, string> > ednsOpts;
1142 bool variableAnswer = dc->d_variable;
1143 bool haveEDNS=false;
1144 #ifdef NOD_ENABLED
1145 bool hasUDR = false;
1146 #endif /* NOD_ENABLED */
1147 DNSPacketWriter::optvect_t returnedEdnsOptions; // Here we stuff all the options for the return packet
1148 uint8_t ednsExtRCode = 0;
1149 if(getEDNSOpts(dc->d_mdp, &edo)) {
1150 haveEDNS=true;
1151 if (edo.d_version != 0) {
1152 ednsExtRCode = ERCode::BADVERS;
1153 }
1154
1155 if(!dc->d_tcp) {
1156 /* rfc6891 6.2.3:
1157 "Values lower than 512 MUST be treated as equal to 512."
1158 */
1159 maxanswersize = min(static_cast<uint16_t>(edo.d_packetsize >= 512 ? edo.d_packetsize : 512), g_udpTruncationThreshold);
1160 }
1161 ednsOpts = edo.d_options;
1162 maxanswersize -= 11; // EDNS header size
1163
1164 for (const auto& o : edo.d_options) {
1165 if (o.first == EDNSOptionCode::ECS && g_useIncomingECS && !dc->d_ecsParsed) {
1166 dc->d_ecsFound = getEDNSSubnetOptsFromString(o.second, &dc->d_ednssubnet);
1167 } else if (o.first == EDNSOptionCode::NSID) {
1168 const static string mode_server_id = ::arg()["server-id"];
1169 if(mode_server_id != "disabled" && !mode_server_id.empty() &&
1170 maxanswersize > (2 + 2 + mode_server_id.size())) {
1171 returnedEdnsOptions.push_back(make_pair(EDNSOptionCode::NSID, mode_server_id));
1172 variableAnswer = true; // Can't packetcache an answer with NSID
1173 // Option Code and Option Length are both 2
1174 maxanswersize -= 2 + 2 + mode_server_id.size();
1175 }
1176 }
1177 }
1178 }
1179 /* perhaps there was no EDNS or no ECS but by now we looked */
1180 dc->d_ecsParsed = true;
1181 vector<DNSRecord> ret;
1182 vector<uint8_t> packet;
1183
1184 auto luaconfsLocal = g_luaconfs.getLocal();
1185 // Used to tell syncres later on if we should apply NSDNAME and NSIP RPZ triggers for this query
1186 bool wantsRPZ(true);
1187 boost::optional<RecProtoBufMessage> pbMessage(boost::none);
1188 #ifdef HAVE_PROTOBUF
1189 if (checkProtobufExport(luaconfsLocal)) {
1190 Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1191 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
1192 pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response, dc->d_uuid, &requestor, &dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass, dc->d_mdp.d_header.id, dc->d_tcp, 0);
1193 pbMessage->setServerIdentity(SyncRes::s_serverID);
1194 pbMessage->setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1195 }
1196 #endif /* HAVE_PROTOBUF */
1197
1198 #ifdef HAVE_FSTRM
1199 checkFrameStreamExport(luaconfsLocal);
1200 #endif
1201
1202 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
1203
1204 pw.getHeader()->aa=0;
1205 pw.getHeader()->ra=1;
1206 pw.getHeader()->qr=1;
1207 pw.getHeader()->tc=0;
1208 pw.getHeader()->id=dc->d_mdp.d_header.id;
1209 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
1210 pw.getHeader()->cd=dc->d_mdp.d_header.cd;
1211
1212 /* This is the lowest TTL seen in the records of the response,
1213 so we can't cache it for longer than this value.
1214 If we have a TTL cap, this value can't be larger than the
1215 cap no matter what. */
1216 uint32_t minTTL = dc->d_ttlCap;
1217
1218 SyncRes sr(dc->d_now);
1219 sr.setId(MT->getTid());
1220
1221 bool DNSSECOK=false;
1222 if(t_pdl) {
1223 sr.setLuaEngine(t_pdl);
1224 }
1225 if(g_dnssecmode != DNSSECMode::Off) {
1226 sr.setDoDNSSEC(true);
1227
1228 // Does the requestor want DNSSEC records?
1229 if(edo.d_extFlags & EDNSOpts::DNSSECOK) {
1230 DNSSECOK=true;
1231 g_stats.dnssecQueries++;
1232 }
1233 if (dc->d_mdp.d_header.cd) {
1234 /* Per rfc6840 section 5.9, "When processing a request with
1235 the Checking Disabled (CD) bit set, a resolver SHOULD attempt
1236 to return all response data, even data that has failed DNSSEC
1237 validation. */
1238 ++g_stats.dnssecCheckDisabledQueries;
1239 }
1240 if (dc->d_mdp.d_header.ad) {
1241 /* Per rfc6840 section 5.7, "the AD bit in a query as a signal
1242 indicating that the requester understands and is interested in the
1243 value of the AD bit in the response. This allows a requester to
1244 indicate that it understands the AD bit without also requesting
1245 DNSSEC data via the DO bit. */
1246 ++g_stats.dnssecAuthenticDataQueries;
1247 }
1248 } else {
1249 // Ignore the client-set CD flag
1250 pw.getHeader()->cd=0;
1251 }
1252 sr.setDNSSECValidationRequested(g_dnssecmode == DNSSECMode::ValidateAll || g_dnssecmode==DNSSECMode::ValidateForLog || ((dc->d_mdp.d_header.ad || DNSSECOK) && g_dnssecmode==DNSSECMode::Process));
1253
1254 #ifdef HAVE_PROTOBUF
1255 sr.setInitialRequestId(dc->d_uuid);
1256 sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
1257 #endif
1258 #ifdef HAVE_FSTRM
1259 sr.setFrameStreamServers(t_frameStreamServers);
1260 #endif
1261 sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
1262
1263 bool tracedQuery=false; // we could consider letting Lua know about this too
1264 bool shouldNotValidate = false;
1265
1266 /* preresolve expects res (dq.rcode) to be set to RCode::NoError by default */
1267 int res = RCode::NoError;
1268
1269 DNSFilterEngine::Policy appliedPolicy;
1270 std::vector<DNSRecord> spoofed;
1271 RecursorLua4::DNSQuestion dq(dc->d_source, dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ, dc->d_logResponse);
1272 dq.ednsFlags = &edo.d_extFlags;
1273 dq.ednsOptions = &ednsOpts;
1274 dq.tag = dc->d_tag;
1275 dq.discardedPolicies = &sr.d_discardedPolicies;
1276 dq.policyTags = &dc->d_policyTags;
1277 dq.appliedPolicy = &appliedPolicy;
1278 dq.currentRecords = &ret;
1279 dq.dh = &dc->d_mdp.d_header;
1280 dq.data = dc->d_data;
1281 #ifdef HAVE_PROTOBUF
1282 dq.requestorId = dc->d_requestorId;
1283 dq.deviceId = dc->d_deviceId;
1284 dq.deviceName = dc->d_deviceName;
1285 #endif
1286
1287 if(ednsExtRCode != 0) {
1288 goto sendit;
1289 }
1290
1291 if(dc->d_mdp.d_qtype==QType::ANY && !dc->d_tcp && g_anyToTcp) {
1292 pw.getHeader()->tc = 1;
1293 res = 0;
1294 variableAnswer = true;
1295 goto sendit;
1296 }
1297
1298 if(t_traceRegex && t_traceRegex->match(dc->d_mdp.d_qname.toString())) {
1299 sr.setLogMode(SyncRes::Store);
1300 tracedQuery=true;
1301 }
1302
1303 if(!g_quiet || tracedQuery) {
1304 g_log<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
1305 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote();
1306 if(!dc->d_ednssubnet.source.empty()) {
1307 g_log<<" (ecs "<<dc->d_ednssubnet.source.toString()<<")";
1308 }
1309 g_log<<endl;
1310 }
1311
1312 if(!dc->d_mdp.d_header.rd) {
1313 sr.setCacheOnly();
1314 }
1315
1316 if (dc->d_rcode != boost::none) {
1317 /* we have a response ready to go, most likely from gettag_ffi */
1318 ret = std::move(dc->d_records);
1319 res = *dc->d_rcode;
1320 if (res == RCode::NoError && dc->d_followCNAMERecords) {
1321 res = followCNAMERecords(ret, QType(dc->d_mdp.d_qtype));
1322 }
1323 goto haveAnswer;
1324 }
1325
1326 if (t_pdl) {
1327 t_pdl->prerpz(dq, res);
1328 }
1329
1330 // Check if the query has a policy attached to it
1331 if (wantsRPZ) {
1332 appliedPolicy = luaconfsLocal->dfe.getQueryPolicy(dc->d_mdp.d_qname, dc->d_source, sr.d_discardedPolicies);
1333 }
1334
1335 // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
1336 if(!t_pdl || !t_pdl->preresolve(dq, res)) {
1337
1338 sr.setWantsRPZ(wantsRPZ);
1339 if(wantsRPZ) {
1340 switch(appliedPolicy.d_kind) {
1341 case DNSFilterEngine::PolicyKind::NoAction:
1342 break;
1343 case DNSFilterEngine::PolicyKind::Drop:
1344 g_stats.policyDrops++;
1345 g_stats.policyResults[appliedPolicy.d_kind]++;
1346 return;
1347 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1348 g_stats.policyResults[appliedPolicy.d_kind]++;
1349 res=RCode::NXDomain;
1350 goto haveAnswer;
1351 case DNSFilterEngine::PolicyKind::NODATA:
1352 g_stats.policyResults[appliedPolicy.d_kind]++;
1353 res=RCode::NoError;
1354 goto haveAnswer;
1355 case DNSFilterEngine::PolicyKind::Custom:
1356 g_stats.policyResults[appliedPolicy.d_kind]++;
1357 res=RCode::NoError;
1358 spoofed=appliedPolicy.getCustomRecords(dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1359 for (const auto& dr : spoofed) {
1360 ret.push_back(dr);
1361 handleRPZCustom(dr, QType(dc->d_mdp.d_qtype), sr, res, ret);
1362 }
1363 goto haveAnswer;
1364 case DNSFilterEngine::PolicyKind::Truncate:
1365 if(!dc->d_tcp) {
1366 g_stats.policyResults[appliedPolicy.d_kind]++;
1367 res=RCode::NoError;
1368 pw.getHeader()->tc=1;
1369 goto haveAnswer;
1370 }
1371 break;
1372 }
1373 }
1374
1375 // Query got not handled for QNAME Policy reasons, now actually go out to find an answer
1376 try {
1377 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
1378 shouldNotValidate = sr.wasOutOfBand();
1379 }
1380 catch(ImmediateServFailException &e) {
1381 if(g_logCommonErrors)
1382 g_log<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during resolve of '"<<dc->d_mdp.d_qname<<"' because: "<<e.reason<<endl;
1383 res = RCode::ServFail;
1384 }
1385
1386 dq.validationState = sr.getValidationState();
1387
1388 // During lookup, an NSDNAME or NSIP trigger was hit in RPZ
1389 if (res == -2) { // XXX This block should be macro'd, it is repeated post-resolve.
1390 appliedPolicy = sr.d_appliedPolicy;
1391 g_stats.policyResults[appliedPolicy.d_kind]++;
1392 switch(appliedPolicy.d_kind) {
1393 case DNSFilterEngine::PolicyKind::NoAction: // This can never happen
1394 throw PDNSException("NoAction policy returned while a NSDNAME or NSIP trigger was hit");
1395 case DNSFilterEngine::PolicyKind::Drop:
1396 g_stats.policyDrops++;
1397 return;
1398 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1399 ret.clear();
1400 res=RCode::NXDomain;
1401 goto haveAnswer;
1402
1403 case DNSFilterEngine::PolicyKind::NODATA:
1404 ret.clear();
1405 res=RCode::NoError;
1406 goto haveAnswer;
1407
1408 case DNSFilterEngine::PolicyKind::Truncate:
1409 if(!dc->d_tcp) {
1410 ret.clear();
1411 res=RCode::NoError;
1412 pw.getHeader()->tc=1;
1413 goto haveAnswer;
1414 }
1415 break;
1416
1417 case DNSFilterEngine::PolicyKind::Custom:
1418 ret.clear();
1419 res=RCode::NoError;
1420 spoofed=appliedPolicy.getCustomRecords(dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1421 for (const auto& dr : spoofed) {
1422 ret.push_back(dr);
1423 handleRPZCustom(dr, QType(dc->d_mdp.d_qtype), sr, res, ret);
1424 }
1425 goto haveAnswer;
1426 }
1427 }
1428
1429 if (wantsRPZ) {
1430 appliedPolicy = luaconfsLocal->dfe.getPostPolicy(ret, sr.d_discardedPolicies);
1431 }
1432
1433 if(t_pdl) {
1434 if(res == RCode::NoError) {
1435 auto i=ret.cbegin();
1436 for(; i!= ret.cend(); ++i)
1437 if(i->d_type == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
1438 break;
1439 if(i == ret.cend() && t_pdl->nodata(dq, res))
1440 shouldNotValidate = true;
1441
1442 }
1443 else if(res == RCode::NXDomain && t_pdl->nxdomain(dq, res))
1444 shouldNotValidate = true;
1445
1446 if(t_pdl->postresolve(dq, res))
1447 shouldNotValidate = true;
1448 }
1449
1450 if (wantsRPZ) { //XXX This block is repeated, see above
1451 g_stats.policyResults[appliedPolicy.d_kind]++;
1452 switch(appliedPolicy.d_kind) {
1453 case DNSFilterEngine::PolicyKind::NoAction:
1454 break;
1455 case DNSFilterEngine::PolicyKind::Drop:
1456 g_stats.policyDrops++;
1457 return;
1458 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1459 ret.clear();
1460 res=RCode::NXDomain;
1461 goto haveAnswer;
1462
1463 case DNSFilterEngine::PolicyKind::NODATA:
1464 ret.clear();
1465 res=RCode::NoError;
1466 goto haveAnswer;
1467
1468 case DNSFilterEngine::PolicyKind::Truncate:
1469 if(!dc->d_tcp) {
1470 ret.clear();
1471 res=RCode::NoError;
1472 pw.getHeader()->tc=1;
1473 goto haveAnswer;
1474 }
1475 break;
1476
1477 case DNSFilterEngine::PolicyKind::Custom:
1478 ret.clear();
1479 res=RCode::NoError;
1480 spoofed=appliedPolicy.getCustomRecords(dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1481 for (const auto& dr : spoofed) {
1482 ret.push_back(dr);
1483 handleRPZCustom(dr, QType(dc->d_mdp.d_qtype), sr, res, ret);
1484 }
1485 goto haveAnswer;
1486 }
1487 }
1488 }
1489 haveAnswer:;
1490 if(res == PolicyDecision::DROP) {
1491 g_stats.policyDrops++;
1492 return;
1493 }
1494 if(tracedQuery || res == -1 || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
1495 {
1496 string trace(sr.getTrace());
1497 if(!trace.empty()) {
1498 vector<string> lines;
1499 boost::split(lines, trace, boost::is_any_of("\n"));
1500 for(const string& line : lines) {
1501 if(!line.empty())
1502 g_log<<Logger::Warning<< line << endl;
1503 }
1504 }
1505 }
1506
1507 if(res == -1) {
1508 pw.getHeader()->rcode=RCode::ServFail;
1509 // no commit here, because no record
1510 g_stats.servFails++;
1511 }
1512 else {
1513 pw.getHeader()->rcode=res;
1514
1515 // Does the validation mode or query demand validation?
1516 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1517 try {
1518 if(sr.doLog()) {
1519 g_log<<Logger::Warning<<"Starting validation of answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<endl;
1520 }
1521
1522 auto state = sr.getValidationState();
1523
1524 if(state == Secure) {
1525 if(sr.doLog()) {
1526 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates correctly"<<endl;
1527 }
1528
1529 // Is the query source interested in the value of the ad-bit?
1530 if (dc->d_mdp.d_header.ad || DNSSECOK)
1531 pw.getHeader()->ad=1;
1532 }
1533 else if(state == Insecure) {
1534 if(sr.doLog()) {
1535 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates as Insecure"<<endl;
1536 }
1537
1538 pw.getHeader()->ad=0;
1539 }
1540 else if(state == Bogus) {
1541 if(t_bogusremotes)
1542 t_bogusremotes->push_back(dc->d_source);
1543 if(t_bogusqueryring)
1544 t_bogusqueryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
1545 if(g_dnssecLogBogus || sr.doLog() || g_dnssecmode == DNSSECMode::ValidateForLog) {
1546 g_log<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->getRemote()<<" validates as Bogus"<<endl;
1547 }
1548
1549 // Does the query or validation mode sending out a SERVFAIL on validation errors?
1550 if(!pw.getHeader()->cd && (g_dnssecmode == DNSSECMode::ValidateAll || dc->d_mdp.d_header.ad || DNSSECOK)) {
1551 if(sr.doLog()) {
1552 g_log<<Logger::Warning<<"Sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" because recursor or query demands it for Bogus results"<<endl;
1553 }
1554
1555 pw.getHeader()->rcode=RCode::ServFail;
1556 goto sendit;
1557 } else {
1558 if(sr.doLog()) {
1559 g_log<<Logger::Warning<<"Not sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" Bogus validation since neither config nor query demands this"<<endl;
1560 }
1561 }
1562 }
1563 }
1564 catch(ImmediateServFailException &e) {
1565 if(g_logCommonErrors)
1566 g_log<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during validation of '"<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<"' because: "<<e.reason<<endl;
1567 pw.getHeader()->rcode=RCode::ServFail;
1568 goto sendit;
1569 }
1570 }
1571
1572 if(ret.size()) {
1573 orderAndShuffle(ret);
1574 if(auto sl = luaconfsLocal->sortlist.getOrderCmp(dc->d_source)) {
1575 stable_sort(ret.begin(), ret.end(), *sl);
1576 variableAnswer=true;
1577 }
1578 }
1579
1580 bool needCommit = false;
1581 for(auto i=ret.cbegin(); i!=ret.cend(); ++i) {
1582 if( ! DNSSECOK &&
1583 ( i->d_type == QType::NSEC3 ||
1584 (
1585 ( i->d_type == QType::RRSIG || i->d_type==QType::NSEC ) &&
1586 (
1587 ( dc->d_mdp.d_qtype != i->d_type && dc->d_mdp.d_qtype != QType::ANY ) ||
1588 i->d_place != DNSResourceRecord::ANSWER
1589 )
1590 )
1591 )
1592 ) {
1593 continue;
1594 }
1595
1596 if (!addRecordToPacket(pw, *i, minTTL, dc->d_ttlCap, maxanswersize)) {
1597 needCommit = false;
1598 break;
1599 }
1600 needCommit = true;
1601
1602 #ifdef NOD_ENABLED
1603 bool udr = false;
1604 if (g_udrEnabled) {
1605 udr = udrCheckUniqueDNSRecord(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, *i);
1606 if (!hasUDR && udr)
1607 hasUDR = true;
1608 }
1609 #endif /* NOD ENABLED */
1610
1611 #ifdef HAVE_PROTOBUF
1612 if (t_protobufServers) {
1613 #ifdef NOD_ENABLED
1614 pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes, udr);
1615 #else
1616 pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes);
1617 #endif /* NOD_ENABLED */
1618 }
1619 #endif
1620 }
1621 if(needCommit)
1622 pw.commit();
1623 }
1624 sendit:;
1625
1626 if(g_useIncomingECS && dc->d_ecsFound && !sr.wasVariable() && !variableAnswer) {
1627 // cerr<<"Stuffing in a 0 scope because answer is static"<<endl;
1628 EDNSSubnetOpts eo;
1629 eo.source = dc->d_ednssubnet.source;
1630 ComboAddress sa;
1631 sa.reset();
1632 sa.sin4.sin_family = eo.source.getNetwork().sin4.sin_family;
1633 eo.scope = Netmask(sa, 0);
1634
1635 returnedEdnsOptions.push_back(make_pair(EDNSOptionCode::ECS, makeEDNSSubnetOptsString(eo)));
1636 }
1637
1638 if (haveEDNS) {
1639 /* we try to add the EDNS OPT RR even for truncated answers,
1640 as rfc6891 states:
1641 "The minimal response MUST be the DNS header, question section, and an
1642 OPT record. This MUST also occur when a truncated response (using
1643 the DNS header's TC bit) is returned."
1644 */
1645 pw.addOpt(512, ednsExtRCode, DNSSECOK ? EDNSOpts::DNSSECOK : 0, returnedEdnsOptions);
1646 pw.commit();
1647 }
1648
1649 g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
1650 updateResponseStats(res, dc->d_source, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1651 #ifdef NOD_ENABLED
1652 bool nod = false;
1653 if (g_nodEnabled) {
1654 if (nodCheckNewDomain(dc->d_mdp.d_qname))
1655 nod = true;
1656 }
1657 #endif /* NOD_ENABLED */
1658 #ifdef HAVE_PROTOBUF
1659 if (t_protobufServers && !(luaconfsLocal->protobufExportConfig.taggedOnly && (!appliedPolicy.d_name || appliedPolicy.d_name->empty()) && dc->d_policyTags.empty())) {
1660 pbMessage->setBytes(packet.size());
1661 pbMessage->setResponseCode(pw.getHeader()->rcode);
1662 if (appliedPolicy.d_name) {
1663 pbMessage->setAppliedPolicy(*appliedPolicy.d_name);
1664 pbMessage->setAppliedPolicyType(appliedPolicy.d_type);
1665 }
1666 pbMessage->setPolicyTags(dc->d_policyTags);
1667 if (g_useKernelTimestamp && dc->d_kernelTimestamp.tv_sec) {
1668 pbMessage->setQueryTime(dc->d_kernelTimestamp.tv_sec, dc->d_kernelTimestamp.tv_usec);
1669 }
1670 else {
1671 pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
1672 }
1673 pbMessage->setRequestorId(dq.requestorId);
1674 pbMessage->setDeviceId(dq.deviceId);
1675 pbMessage->setDeviceName(dq.deviceName);
1676 #ifdef NOD_ENABLED
1677 if (g_nodEnabled) {
1678 if (nod) {
1679 pbMessage->setNOD(true);
1680 pbMessage->addPolicyTag(g_nod_pbtag);
1681 }
1682 if (hasUDR) {
1683 pbMessage->addPolicyTag(g_udr_pbtag);
1684 }
1685 }
1686 #endif /* NOD_ENABLED */
1687 if (dc->d_logResponse) {
1688 protobufLogResponse(*pbMessage);
1689 }
1690 #ifdef NOD_ENABLED
1691 if (g_nodEnabled) {
1692 pbMessage->setNOD(false);
1693 pbMessage->clearUDR();
1694 if (nod)
1695 pbMessage->removePolicyTag(g_nod_pbtag);
1696 if (hasUDR)
1697 pbMessage->removePolicyTag(g_udr_pbtag);
1698 }
1699 #endif /* NOD_ENABLED */
1700 }
1701 #endif
1702 if(!dc->d_tcp) {
1703 struct msghdr msgh;
1704 struct iovec iov;
1705 cmsgbuf_aligned cbuf;
1706 fillMSGHdr(&msgh, &iov, &cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote);
1707 msgh.msg_control=NULL;
1708
1709 if(g_fromtosockets.count(dc->d_socket)) {
1710 addCMsgSrcAddr(&msgh, &cbuf, &dc->d_local, 0);
1711 }
1712 if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors)
1713 g_log<<Logger::Warning<<"Sending UDP reply to client "<<dc->getRemote()<<" failed with: "<<strerror(errno)<<endl;
1714
1715 if(variableAnswer || sr.wasVariable()) {
1716 g_stats.variableResponses++;
1717 }
1718 if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
1719 t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, std::move(dc->d_query), dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
1720 string((const char*)&*packet.begin(), packet.size()),
1721 g_now.tv_sec,
1722 pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
1723 min(minTTL,SyncRes::s_packetcachettl),
1724 dq.validationState,
1725 dc->d_ecsBegin,
1726 dc->d_ecsEnd,
1727 std::move(pbMessage));
1728 }
1729 // else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
1730 }
1731 else {
1732 char buf[2];
1733 buf[0]=packet.size()/256;
1734 buf[1]=packet.size()%256;
1735
1736 Utility::iovec iov[2];
1737
1738 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
1739 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
1740
1741 int wret=Utility::writev(dc->d_socket, iov, 2);
1742 bool hadError=true;
1743
1744 if(wret == 0)
1745 g_log<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
1746 else if(wret < 0 )
1747 g_log<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
1748 else if((unsigned int)wret != 2 + packet.size())
1749 g_log<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
1750 else
1751 hadError=false;
1752
1753 // update tcp connection status, either by closing or moving to 'BYTE0'
1754
1755 if(hadError) {
1756 // no need to remove us from FDM, we weren't there
1757 dc->d_socket = -1;
1758 }
1759 else {
1760 dc->d_tcpConnection->queriesCount++;
1761 if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) {
1762 dc->d_socket = -1;
1763 }
1764 else {
1765 dc->d_tcpConnection->state=TCPConnection::BYTE0;
1766 Utility::gettimeofday(&g_now, 0); // needs to be updated
1767 struct timeval ttd = g_now;
1768 ttd.tv_sec += g_tcpTimeout;
1769
1770 t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
1771 }
1772 }
1773 }
1774 float spent=makeFloat(sr.getNow()-dc->d_now);
1775 if(!g_quiet) {
1776 g_log<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
1777 g_log<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
1778 sr.d_totUsec/1000.0<<" netw ms, "<< spent*1000.0<<" tot ms, "<<
1779 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<< res;
1780
1781 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1782 g_log<< ", dnssec="<<vStates[sr.getValidationState()];
1783 }
1784
1785 g_log<<endl;
1786
1787 }
1788
1789 if (sr.d_outqueries || sr.d_authzonequeries) {
1790 t_RC->cacheMisses++;
1791 }
1792 else {
1793 t_RC->cacheHits++;
1794 }
1795
1796 if(spent < 0.001)
1797 g_stats.answers0_1++;
1798 else if(spent < 0.010)
1799 g_stats.answers1_10++;
1800 else if(spent < 0.1)
1801 g_stats.answers10_100++;
1802 else if(spent < 1.0)
1803 g_stats.answers100_1000++;
1804 else
1805 g_stats.answersSlow++;
1806
1807 uint64_t newLat=(uint64_t)(spent*1000000);
1808 newLat = min(newLat,(uint64_t)(((uint64_t) g_networkTimeoutMsec)*1000)); // outliers of several minutes exist..
1809 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + (float)newLat/g_latencyStatSize;
1810 // no worries, we do this for packet cache hits elsewhere
1811
1812 auto ourtime = 1000.0*spent-sr.d_totUsec/1000.0; // in msec
1813 if(ourtime < 1)
1814 g_stats.ourtime0_1++;
1815 else if(ourtime < 2)
1816 g_stats.ourtime1_2++;
1817 else if(ourtime < 4)
1818 g_stats.ourtime2_4++;
1819 else if(ourtime < 8)
1820 g_stats.ourtime4_8++;
1821 else if(ourtime < 16)
1822 g_stats.ourtime8_16++;
1823 else if(ourtime < 32)
1824 g_stats.ourtime16_32++;
1825 else {
1826 // cerr<<"SLOW: "<<ourtime<<"ms -> "<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<endl;
1827 g_stats.ourtimeSlow++;
1828 }
1829 if(ourtime >= 0.0) {
1830 newLat=ourtime*1000; // usec
1831 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + (float)newLat/g_latencyStatSize;
1832 }
1833 // cout<<dc->d_mdp.d_qname<<"\t"<<MT->getUsec()<<"\t"<<sr.d_outqueries<<endl;
1834 }
1835 catch(PDNSException &ae) {
1836 g_log<<Logger::Error<<"startDoResolve problem "<<makeLoginfo(dc)<<": "<<ae.reason<<endl;
1837 }
1838 catch(const MOADNSException &mde) {
1839 g_log<<Logger::Error<<"DNS parser error "<<makeLoginfo(dc) <<": "<<dc->d_mdp.d_qname<<", "<<mde.what()<<endl;
1840 }
1841 catch(std::exception& e) {
1842 g_log<<Logger::Error<<"STL error "<< makeLoginfo(dc)<<": "<<e.what();
1843
1844 // Luawrapper nests the exception from Lua, so we unnest it here
1845 try {
1846 std::rethrow_if_nested(e);
1847 } catch(const std::exception& ne) {
1848 g_log<<". Extra info: "<<ne.what();
1849 } catch(...) {}
1850
1851 g_log<<endl;
1852 }
1853 catch(...) {
1854 g_log<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
1855 }
1856
1857 g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
1858 }
1859
1860 static void makeControlChannelSocket(int processNum=-1)
1861 {
1862 string sockname=::arg()["socket-dir"]+"/"+s_programname;
1863 if(processNum >= 0)
1864 sockname += "."+std::to_string(processNum);
1865 sockname+=".controlsocket";
1866 s_rcc.listen(sockname);
1867
1868 int sockowner = -1;
1869 int sockgroup = -1;
1870
1871 if (!::arg().isEmpty("socket-group"))
1872 sockgroup=::arg().asGid("socket-group");
1873 if (!::arg().isEmpty("socket-owner"))
1874 sockowner=::arg().asUid("socket-owner");
1875
1876 if (sockgroup > -1 || sockowner > -1) {
1877 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
1878 unixDie("Failed to chown control socket");
1879 }
1880 }
1881
1882 // do mode change if socket-mode is given
1883 if(!::arg().isEmpty("socket-mode")) {
1884 mode_t sockmode=::arg().asMode("socket-mode");
1885 if(chmod(sockname.c_str(), sockmode) < 0) {
1886 unixDie("Failed to chmod control socket");
1887 }
1888 }
1889 }
1890
1891 static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
1892 bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options,
1893 bool& foundXPF, ComboAddress* xpfSource, ComboAddress* xpfDest)
1894 {
1895 const bool lookForXPF = xpfSource != nullptr && g_xpfRRCode != 0;
1896 const bool lookForECS = ednssubnet != nullptr;
1897 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(question.c_str());
1898 size_t questionLen = question.length();
1899 unsigned int consumed=0;
1900 *dnsname=DNSName(question.c_str(), questionLen, sizeof(dnsheader), false, qtype, qclass, &consumed);
1901
1902 size_t pos= sizeof(dnsheader)+consumed+4;
1903 const size_t headerSize = /* root */ 1 + sizeof(dnsrecordheader);
1904 const uint16_t arcount = ntohs(dh->arcount);
1905
1906 for (uint16_t arpos = 0; arpos < arcount && questionLen > (pos + headerSize) && ((lookForECS && !foundECS) || (lookForXPF && !foundXPF)); arpos++) {
1907 if (question.at(pos) != 0) {
1908 /* not an OPT or a XPF, bye. */
1909 return;
1910 }
1911
1912 pos += 1;
1913 const dnsrecordheader* drh = reinterpret_cast<const dnsrecordheader*>(&question.at(pos));
1914 pos += sizeof(dnsrecordheader);
1915
1916 if (pos >= questionLen) {
1917 return;
1918 }
1919
1920 /* OPT root label (1) followed by type (2) */
1921 if(lookForECS && ntohs(drh->d_type) == QType::OPT) {
1922 if (!options) {
1923 char* ecsStart = nullptr;
1924 size_t ecsLen = 0;
1925 /* we need to pass the record len */
1926 int res = getEDNSOption(const_cast<char*>(reinterpret_cast<const char*>(&question.at(pos - sizeof(drh->d_clen)))), questionLen - pos + sizeof(drh->d_clen), EDNSOptionCode::ECS, &ecsStart, &ecsLen);
1927 if (res == 0 && ecsLen > 4) {
1928 EDNSSubnetOpts eso;
1929 if(getEDNSSubnetOptsFromString(ecsStart + 4, ecsLen - 4, &eso)) {
1930 *ednssubnet=eso;
1931 foundECS = true;
1932 }
1933 }
1934 }
1935 else {
1936 /* we need to pass the record len */
1937 int res = getEDNSOptions(reinterpret_cast<const char*>(&question.at(pos -sizeof(drh->d_clen))), questionLen - pos + (sizeof(drh->d_clen)), *options);
1938 if (res == 0) {
1939 const auto& it = options->find(EDNSOptionCode::ECS);
1940 if (it != options->end() && !it->second.values.empty() && it->second.values.at(0).content != nullptr && it->second.values.at(0).size > 0) {
1941 EDNSSubnetOpts eso;
1942 if(getEDNSSubnetOptsFromString(it->second.values.at(0).content, it->second.values.at(0).size, &eso)) {
1943 *ednssubnet=eso;
1944 foundECS = true;
1945 }
1946 }
1947 }
1948 }
1949 }
1950 else if (lookForXPF && ntohs(drh->d_type) == g_xpfRRCode && ntohs(drh->d_class) == QClass::IN && drh->d_ttl == 0) {
1951 if ((questionLen - pos) < ntohs(drh->d_clen)) {
1952 return;
1953 }
1954
1955 foundXPF = parseXPFPayload(reinterpret_cast<const char*>(&question.at(pos)), ntohs(drh->d_clen), *xpfSource, xpfDest);
1956 }
1957
1958 pos += ntohs(drh->d_clen);
1959 }
1960 }
1961
// Read callback for an already accepted TCP client connection.
// The connection (a shared_ptr<TCPConnection>) travels in 'var'. The function is a small
// state machine driven by conn->state:
//   BYTE0       - nothing read yet, try to read the two length-prefix bytes
//   BYTE1       - first length byte read, read the second one
//   GETQUESTION - length known, keep reading until conn->qlen bytes have arrived
// Once a complete question is in conn->data it is wrapped in a DNSComboWriter, optionally
// run through the Lua gettag hooks and protobuf query logging, filtered, and finally handed
// to startDoResolve in a new MTasker thread.
// On read errors or a closed peer the descriptor is removed from the multiplexer and the
// function returns (presumably the connection is torn down when the last shared_ptr goes
// away - confirm against TCPConnection).
1962 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1963 {
1964 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
1965
1966 if(conn->state==TCPConnection::BYTE0) {
// try to get both length bytes in one go; NOTE(review): assumes conn->data holds at least 2 bytes here - confirm
1967 ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0);
1968 if(bytes==1)
1969 conn->state=TCPConnection::BYTE1;
1970 if(bytes==2) {
// the length prefix is big-endian: first byte is the high-order byte
1971 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1972 conn->data.resize(conn->qlen);
1973 conn->bytesread=0;
1974 conn->state=TCPConnection::GETQUESTION;
1975 }
// recv() returned 0 (peer closed) or a negative value (error): stop watching this descriptor
1976 if(!bytes || bytes < 0) {
1977 t_fdm->removeReadFD(fd);
1978 return;
1979 }
1980 }
1981 else if(conn->state==TCPConnection::BYTE1) {
// second half of the length prefix
1982 ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0);
1983 if(bytes==1) {
1984 conn->state=TCPConnection::GETQUESTION;
1985 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1986 conn->data.resize(conn->qlen);
1987 conn->bytesread=0;
1988 }
1989 if(!bytes || bytes < 0) {
1990 if(g_logCommonErrors)
1991 g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected after first byte"<<endl;
1992 t_fdm->removeReadFD(fd);
1993 return;
1994 }
1995 }
1996 else if(conn->state==TCPConnection::GETQUESTION) {
// read whatever is still missing from the question body
1997 ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
1998 if(!bytes || bytes < 0 || bytes > std::numeric_limits<std::uint16_t>::max()) {
1999 if(g_logCommonErrors) {
2000 g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected while reading question body"<<endl;
2001 }
2002 t_fdm->removeReadFD(fd);
2003 return;
2004 }
2005 conn->bytesread+=(uint16_t)bytes;
// only act once the whole question has arrived; otherwise wait for the next read event
2006 if(conn->bytesread==conn->qlen) {
2007 t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
2008
2009 std::unique_ptr<DNSComboWriter> dc;
2010 try {
2011 dc=std::unique_ptr<DNSComboWriter>(new DNSComboWriter(conn->data, g_now));
2012 }
2013 catch(const MOADNSException &mde) {
2014 g_stats.clientParseError++;
2015 if(g_logCommonErrors)
2016 g_log<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
2017 return;
2018 }
2019 dc->d_tcpConnection = conn; // carry the torch
2020 dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
2021 dc->d_tcp=true;
// remote and source start out identical; source may later be replaced by an XPF record
2022 dc->setRemote(conn->d_remote);
2023 dc->setSource(conn->d_remote);
// ask the socket for the local address this client connected to
2024 ComboAddress dest;
2025 dest.reset();
2026 dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
2027 socklen_t len = dest.getSocklen();
2028 getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
2029 dc->setLocal(dest);
2030 dc->setDestination(dest);
2031 DNSName qname;
2032 uint16_t qtype=0;
2033 uint16_t qclass=0;
2034 bool needECS = false;
// XPF records are only honoured for clients matching the XPF ACL
2035 bool needXPF = g_XPFAcl.match(conn->d_remote);
2036 string requestorId;
2037 string deviceId;
2038 string deviceName;
2039 bool logQuery = false;
2040 #ifdef HAVE_PROTOBUF
2041 auto luaconfsLocal = g_luaconfs.getLocal();
// protobuf export wants the EDNS client subnet, so force ECS parsing when it is enabled
2042 if (checkProtobufExport(luaconfsLocal)) {
2043 needECS = true;
2044 }
2045 logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
2046 dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
2047 #endif /* HAVE_PROTOBUF */
2048
2049 #ifdef HAVE_FSTRM
2050 checkFrameStreamExport(luaconfsLocal);
2051 #endif
2052
// pre-parse the question only when something actually needs the result
2053 if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
2054
2055 try {
2056 EDNSOptionViewMap ednsOptions;
2057 bool xpfFound = false;
2058 dc->d_ecsParsed = true;
2059 dc->d_ecsFound = false;
2060 getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
2061 dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
2062 xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? &dc->d_destination : nullptr);
2063
2064 if(t_pdl) {
2065 try {
// the FFI variant of gettag takes precedence over the plain one
2066 if (t_pdl->d_gettag_ffi) {
2067 dc->d_tag = t_pdl->gettag_ffi(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_records, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_rcode, dc->d_ttlCap, dc->d_variable, logQuery, dc->d_logResponse, dc->d_followCNAMERecords);
2068 }
2069 else if (t_pdl->d_gettag) {
2070 dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName);
2071 }
2072 }
// a failing hook is not fatal: the query simply proceeds without a tag from the hook
2073 catch(const std::exception& e) {
2074 if(g_logCommonErrors)
2075 g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
2076 }
2077 }
2078 }
2079 catch(const std::exception& e)
2080 {
2081 if(g_logCommonErrors)
2082 g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
2083 }
2084 }
2085
// view of the raw packet header, used for the id, the ipfilter hook and the qdcount check
2086 const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);
2087
2088 #ifdef HAVE_PROTOBUF
2089 if(t_protobufServers || t_outgoingProtobufServers) {
2090 dc->d_requestorId = requestorId;
2091 dc->d_deviceId = deviceId;
2092 dc->d_deviceName = deviceName;
2093 dc->d_uuid = getUniqueID();
2094 }
2095
2096 if(t_protobufServers) {
2097 try {
2098
// with taggedOnly set, queries without any policy tag are not logged
2099 if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
2100 protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName);
2101 }
2102 }
2103 catch(std::exception& e) {
2104 if(g_logCommonErrors)
2105 g_log<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
2106 }
2107 }
2108 #endif
// give the Lua ipfilter hook the chance to drop the question
2109 if(t_pdl) {
2110 if(t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh)) {
2111 if(!g_quiet)
2112 g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<<endl;
2113 g_stats.policyDrops++;
2114 return;
2115 }
2116 }
2117
// sanity checks on the header: responses, non-query opcodes and empty questions are ignored
2118 if(dc->d_mdp.d_header.qr) {
2119 g_stats.ignoredCount++;
2120 if(g_logCommonErrors) {
2121 g_log<<Logger::Error<<"Ignoring answer from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
2122 }
2123 return;
2124 }
2125 if(dc->d_mdp.d_header.opcode) {
2126 g_stats.ignoredCount++;
2127 if(g_logCommonErrors) {
2128 g_log<<Logger::Error<<"Ignoring non-query opcode from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
2129 }
2130 return;
2131 }
2132 else if (dh->qdcount == 0) {
2133 g_stats.emptyQueriesCount++;
2134 if(g_logCommonErrors) {
2135 g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<< dc->getRemote() <<" on server socket!"<<endl;
2136 }
2137 return;
2138 }
2139 else {
// accepted: count it and pass ownership of dc to the resolver thread
2140 ++g_stats.qcounter;
2141 ++g_stats.tcpqcounter;
2142 MT->makeThread(startDoResolve, dc.release()); // deletes dc, will set state to BYTE0 again
2143 return;
2144 }
2145 }
2146 }
2147 }
2148
2149 //! Handle new incoming TCP connection
2150 static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
2151 {
2152 ComboAddress addr;
2153 socklen_t addrlen=sizeof(addr);
2154 int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
2155 if(newsock>=0) {
2156 if(MT->numProcesses() > g_maxMThreads) {
2157 g_stats.overCapacityDrops++;
2158 try {
2159 closesocket(newsock);
2160 }
2161 catch(const PDNSException& e) {
2162 g_log<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
2163 }
2164 return;
2165 }
2166
2167 if(t_remotes)
2168 t_remotes->push_back(addr);
2169 if(t_allowFrom && !t_allowFrom->match(&addr)) {
2170 if(!g_quiet)
2171 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
2172
2173 g_stats.unauthorizedTCP++;
2174 try {
2175 closesocket(newsock);
2176 }
2177 catch(const PDNSException& e) {
2178 g_log<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
2179 }
2180 return;
2181 }
2182 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
2183 g_stats.tcpClientOverflow++;
2184 try {
2185 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
2186 }
2187 catch(const PDNSException& e) {
2188 g_log<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
2189 }
2190 return;
2191 }
2192
2193 setNonBlocking(newsock);
2194 std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
2195 tc->state=TCPConnection::BYTE0;
2196
2197 struct timeval ttd;
2198 Utility::gettimeofday(&ttd, 0);
2199 ttd.tv_sec += g_tcpTimeout;
2200
2201 t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
2202 }
2203 }
2204
// Processes a single UDP question that has already passed the basic header checks.
//   question - raw DNS packet
//   fromaddr - address the packet was received from
//   destaddr - local address the packet was sent to
//   tv       - receive timestamp of the packet; all-zero when none is available
//   fd       - socket the packet arrived on, also used to send a packet-cache answer
// Steps: drop questions older than one second, optionally pre-parse the question (ECS, XPF,
// Lua gettag hooks), try to answer from the packet cache, apply the Lua ipfilter hook and the
// capacity limit, and otherwise hand the question to startDoResolve in a new MTasker thread.
// Every visible return path yields a null pointer (presumably the string* return type only
// exists to fit the callback signature used by distributeAsyncFunction - confirm).
2205 static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, struct timeval tv, int fd)
2206 {
2207 gettimeofday(&g_now, 0);
2208 if (tv.tv_sec) {
// age of the question in milliseconds, measured against the receive timestamp
2209 struct timeval diff = g_now - tv;
2210 double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
2211
2212 if(delta > 1000.0) {
2213 g_stats.tooOldDrops++;
2214 return nullptr;
2215 }
2216 }
2217
2218 ++g_stats.qcounter;
2219 if(fromaddr.sin4.sin_family==AF_INET6)
2220 g_stats.ipv6qcounter++;
2221
2222 string response;
2223 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
2224 unsigned int ctag=0;
2225 uint32_t qhash = 0;
2226 bool needECS = false;
// XPF records are only honoured for senders matching the XPF ACL
2227 bool needXPF = g_XPFAcl.match(fromaddr);
2228 std::vector<std::string> policyTags;
2229 LuaContext::LuaObject data;
// source/destination start as the socket-level addresses and may be replaced by XPF data
2230 ComboAddress source = fromaddr;
2231 ComboAddress destination = destaddr;
2232 string requestorId;
2233 string deviceId;
2234 string deviceName;
2235 bool logQuery = false;
2236 bool logResponse = false;
2237 #ifdef HAVE_PROTOBUF
2238 boost::uuids::uuid uniqueId;
2239 auto luaconfsLocal = g_luaconfs.getLocal();
// a unique id is only generated when some protobuf export is active; ECS parsing is
// only forced for the incoming-query export
2240 if (checkProtobufExport(luaconfsLocal)) {
2241 uniqueId = getUniqueID();
2242 needECS = true;
2243 } else if (checkOutgoingProtobufExport(luaconfsLocal)) {
2244 uniqueId = getUniqueID();
2245 }
2246 logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
2247 logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
2248 #endif
2249 #ifdef HAVE_FSTRM
2250 checkFrameStreamExport(luaconfsLocal);
2251 #endif
2252 EDNSSubnetOpts ednssubnet;
2253 bool ecsFound = false;
2254 bool ecsParsed = false;
2255 uint16_t ecsBegin = 0;
2256 uint16_t ecsEnd = 0;
2257 std::vector<DNSRecord> records;
2258 boost::optional<int> rcode = boost::none;
2259 uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
2260 bool variable = false;
2261 bool followCNAMEs = false;
2262 try {
2263 DNSName qname;
2264 uint16_t qtype=0;
2265 uint16_t qclass=0;
2266 uint32_t age;
2267 bool qnameParsed=false;
2268 #ifdef MALLOC_TRACE
2269 /*
2270 static uint64_t last=0;
2271 if(!last)
2272 g_mtracer->clearAllocators();
2273 cout<<g_mtracer->getAllocs()-last<<" "<<g_mtracer->getNumOut()<<" -- BEGIN TRACE"<<endl;
2274 last=g_mtracer->getAllocs();
2275 cout<<g_mtracer->topAllocatorsString()<<endl;
2276 g_mtracer->clearAllocators();
2277 */
2278 #endif
2279
// pre-parse the question only when something actually needs the result
2280 if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag || t_pdl->d_gettag_ffi))) {
2281 try {
2282 EDNSOptionViewMap ednsOptions;
2283 bool xpfFound = false;
2284
2285 ecsFound = false;
2286
2287 getQNameAndSubnet(question, &qname, &qtype, &qclass,
2288 ecsFound, &ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
2289 xpfFound, needXPF ? &source : nullptr, needXPF ? &destination : nullptr);
2290
// only reached when getQNameAndSubnet() did not throw
2291 qnameParsed = true;
2292 ecsParsed = true;
2293
2294 if(t_pdl) {
2295 try {
// the FFI variant of gettag takes precedence over the plain one
2296 if (t_pdl->d_gettag_ffi) {
2297 ctag = t_pdl->gettag_ffi(source, ednssubnet.source, destination, qname, qtype, &policyTags, records, data, ednsOptions, false, requestorId, deviceId, deviceName, rcode, ttlCap, variable, logQuery, logResponse, followCNAMEs);
2298 }
2299 else if (t_pdl->d_gettag) {
2300 ctag = t_pdl->gettag(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, deviceName);
2301 }
2302 }
// a failing hook is not fatal: ctag keeps its initial value of 0
2303 catch(const std::exception& e) {
2304 if(g_logCommonErrors)
2305 g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
2306 }
2307 }
2308 }
2309 catch(const std::exception& e)
2310 {
2311 if(g_logCommonErrors)
2312 g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
2313 }
2314 }
2315
2316 bool cacheHit = false;
2317 boost::optional<RecProtoBufMessage> pbMessage(boost::none);
2318 #ifdef HAVE_PROTOBUF
2319 if (t_protobufServers) {
// response message prepared up front so that a packet-cache hit can fill it in
2320 pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
2321 pbMessage->setServerIdentity(SyncRes::s_serverID);
// with taggedOnly set, queries without any policy tag are not logged
2322 if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && policyTags.empty())) {
2323 protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId, deviceName);
2324 }
2325 }
2326 #endif /* HAVE_PROTOBUF */
2327
2328 /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
2329 but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
2330 as cacheable we would cache it with a wrong tag, so better safe than sorry. */
2331 vState valState;
// two lookup flavours: with an already parsed qname/qtype/qclass, or letting the cache
// parse the question and report qtype/qclass back through the pointers
2332 if (qnameParsed) {
2333 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &valState, &qhash, &ecsBegin, &ecsEnd, pbMessage ? &(*pbMessage) : nullptr));
2334 }
2335 else {
2336 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, &qtype, &qclass, g_now.tv_sec, &response, &age, &valState, &qhash, &ecsBegin, &ecsEnd, pbMessage ? &(*pbMessage) : nullptr));
2337 }
2338
2339 if (cacheHit) {
// keep track of clients and queries that hit Bogus answers
2340 if(valState == Bogus) {
2341 if(t_bogusremotes)
2342 t_bogusremotes->push_back(source);
2343 if(t_bogusqueryring)
2344 t_bogusqueryring->push_back(make_pair(qname, qtype));
2345 }
2346
2347 #ifdef HAVE_PROTOBUF
2348 if(t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) {
// the requestor address is masked with the configured prefix length before being logged
2349 Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
2350 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
2351 pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
2352 pbMessage->setEDNSSubnet(ednssubnet.source, ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
2353 if (g_useKernelTimestamp && tv.tv_sec) {
2354 pbMessage->setQueryTime(tv.tv_sec, tv.tv_usec);
2355 }
2356 else {
2357 pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
2358 }
2359 pbMessage->setRequestorId(requestorId);
2360 pbMessage->setDeviceId(deviceId);
2361 pbMessage->setDeviceName(deviceName);
2362 protobufLogResponse(*pbMessage);
2363 }
2364 #endif /* HAVE_PROTOBUF */
2365 if(!g_quiet)
2366 g_log<<Logger::Notice<<t_id<< " question answered from packet cache tag="<<ctag<<" from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<endl;
2367
2368 g_stats.packetCacheHits++;
2369 SyncRes::s_queries++;
// adjust the cached packet for the time it has spent in the cache
2370 ageDNSPacket(response, age);
2371 struct msghdr msgh;
2372 struct iovec iov;
2373 cmsgbuf_aligned cbuf;
2374 fillMSGHdr(&msgh, &iov, &cbuf, 0, (char*)response.c_str(), response.length(), const_cast<ComboAddress*>(&fromaddr));
2375 msgh.msg_control=NULL;
2376
// for sockets registered in g_fromtosockets, set the source address of the reply explicitly
2377 if(g_fromtosockets.count(fd)) {
2378 addCMsgSrcAddr(&msgh, &cbuf, &destaddr, 0);
2379 }
2380 if(sendmsg(fd, &msgh, 0) < 0 && g_logCommonErrors)
2381 g_log<<Logger::Warning<<"Sending UDP reply to client "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<" failed with: "<<strerror(errno)<<endl;
2382
2383 if(response.length() >= sizeof(struct dnsheader)) {
// copy the header out of the response before reading the rcode from it
2384 struct dnsheader tmpdh;
2385 memcpy(&tmpdh, response.c_str(), sizeof(tmpdh));
2386 updateResponseStats(tmpdh.rcode, source, response.length(), 0, 0);
2387 }
2388 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + 0.0; // we assume 0 usec
2389 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + 0.0; // we assume 0 usec
2390 return 0;
2391 }
2392 }
2393 catch(std::exception& e) {
2394 g_log<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
2395 return 0;
2396 }
2397
// no cache hit: give the Lua ipfilter hook the chance to drop the question
2398 if(t_pdl) {
2399 if(t_pdl->ipfilter(source, destination, *dh)) {
2400 if(!g_quiet)
2401 g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<" based on policy"<<endl;
2402 g_stats.policyDrops++;
2403 return 0;
2404 }
2405 }
2406
// refuse new work when more MTasker threads than g_maxMThreads are already running
2407 if(MT->numProcesses() > g_maxMThreads) {
2408 if(!g_quiet)
2409 g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<source.toStringWithPort()<<(source != fromaddr ? " (via "+fromaddr.toStringWithPort()+")" : "")<<", over capacity"<<endl;
2410
2411 g_stats.overCapacityDrops++;
2412 return 0;
2413 }
2414
// package everything gathered so far for the resolver thread
2415 auto dc = std::unique_ptr<DNSComboWriter>(new DNSComboWriter(question, g_now, std::move(policyTags), std::move(data), std::move(records)));
2416 dc->setSocket(fd);
2417 dc->d_tag=ctag;
2418 dc->d_qhash=qhash;
2419 dc->setRemote(fromaddr);
2420 dc->setSource(source);
2421 dc->setLocal(destaddr);
2422 dc->setDestination(destination);
2423 dc->d_tcp=false;
2424 dc->d_ecsFound = ecsFound;
2425 dc->d_ecsParsed = ecsParsed;
2426 dc->d_ecsBegin = ecsBegin;
2427 dc->d_ecsEnd = ecsEnd;
2428 dc->d_ednssubnet = ednssubnet;
2429 dc->d_ttlCap = ttlCap;
2430 dc->d_variable = variable;
2431 dc->d_followCNAMERecords = followCNAMEs;
2432 dc->d_rcode = rcode;
2433 dc->d_logResponse = logResponse;
2434 #ifdef HAVE_PROTOBUF
2435 if (t_protobufServers || t_outgoingProtobufServers) {
2436 dc->d_uuid = std::move(uniqueId);
2437 }
2438 dc->d_requestorId = requestorId;
2439 dc->d_deviceId = deviceId;
2440 dc->d_deviceName = deviceName;
2441 dc->d_kernelTimestamp = tv;
2442 #endif
2443
2444 MT->makeThread(startDoResolve, (void*) dc.release()); // deletes dc
2445 return 0;
2446 }
2447
2448
// Read callback for a listening UDP socket.
// Receives up to s_maxUDPQueriesPerRound datagrams from fd in one invocation. Each datagram
// is checked (minimum size, truncation, allow-from ACL, source port, header flags) and, when
// acceptable, passed to doProcessUDPQuestion, either directly or via distributeAsyncFunction
// when g_weDistributeQueries is set.
// Note that the size, truncation, ACL and port-0 rejections return from the function, which
// also ends the current batch, while the header-flag rejections just move on to the next
// datagram.
2449 static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
2450 {
2451 ssize_t len;
// largest datagram accepted; anything bigger is flagged MSG_TRUNC by recvmsg() and dropped below
2452 static const size_t maxIncomingQuerySize = 512;
// receive buffer, kept per thread and reused across calls
2453 static thread_local std::string data;
2454 ComboAddress fromaddr;
2455 struct msghdr msgh;
2456 struct iovec iov;
2457 cmsgbuf_aligned cbuf;
// stays true until the first datagram of this invocation has been received
2458 bool firstQuery = true;
2459
2460 for(size_t queriesCounter = 0; queriesCounter < s_maxUDPQueriesPerRound; queriesCounter++) {
// restore the buffer to full size, it is shrunk to the datagram length further down
2461 data.resize(maxIncomingQuerySize);
2462 fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
2463 fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), &data[0], data.size(), &fromaddr);
2464
2465 if((len=recvmsg(fd, &msgh, 0)) >= 0) {
2466
2467 firstQuery = false;
2468
2469 if (static_cast<size_t>(len) < sizeof(dnsheader)) {
2470 g_stats.ignoredCount++;
2471 if (!g_quiet) {
2472 g_log<<Logger::Error<<"Ignoring too-short ("<<std::to_string(len)<<") query from "<<fromaddr.toString()<<endl;
2473 }
2474 return;
2475 }
2476
2477 if (msgh.msg_flags & MSG_TRUNC) {
2478 g_stats.truncatedDrops++;
2479 if (!g_quiet) {
2480 g_log<<Logger::Error<<"Ignoring truncated query from "<<fromaddr.toString()<<endl;
2481 }
2482 return;
2483 }
2484
// remember the sender before any ACL decision is taken
2485 if(t_remotes) {
2486 t_remotes->push_back(fromaddr);
2487 }
2488
2489 if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
2490 if(!g_quiet) {
2491 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
2492 }
2493
2494 g_stats.unauthorizedUDP++;
2495 return;
2496 }
// the port lives at the same offset for IPv4 and IPv6, so one check covers both
2497 BOOST_STATIC_ASSERT(offsetof(sockaddr_in, sin_port) == offsetof(sockaddr_in6, sin6_port));
2498 if(!fromaddr.sin4.sin_port) { // also works for IPv6
2499 if(!g_quiet) {
2500 g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toStringWithPort()<<", can't deal with port 0"<<endl;
2501 }
2502
2503 g_stats.clientParseError++; // not quite the best place to put it, but needs to go somewhere
2504 return;
2505 }
2506
2507 try {
// shrink the buffer to the bytes actually received
2508 data.resize(static_cast<size_t>(len));
2509 dnsheader* dh=(dnsheader*)&data[0];
2510
// responses, non-query opcodes and empty questions are counted and skipped
2511 if(dh->qr) {
2512 g_stats.ignoredCount++;
2513 if(g_logCommonErrors) {
2514 g_log<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
2515 }
2516 }
2517 else if(dh->opcode) {
2518 g_stats.ignoredCount++;
2519 if(g_logCommonErrors) {
2520 g_log<<Logger::Error<<"Ignoring non-query opcode "<<dh->opcode<<" from "<<fromaddr.toString()<<" on server socket!"<<endl;
2521 }
2522 }
2523 else if (dh->qdcount == 0) {
2524 g_stats.emptyQueriesCount++;
2525 if(g_logCommonErrors) {
2526 g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<<fromaddr.toString()<<" on server socket!"<<endl;
2527 }
2528 }
2529 else {
// pull the receive timestamp (if any) out of the ancillary data; tv stays zero otherwise
2530 struct timeval tv={0,0};
2531 HarvestTimestamp(&msgh, &tv);
// work out which local address the datagram was sent to, in order of preference:
// ancillary data, the address recorded for this listening socket, getsockname()
2532 ComboAddress dest;
2533 dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
2534 auto loc = rplookup(g_listenSocketsAddresses, fd);
2535 if(HarvestDestinationAddress(&msgh, &dest)) {
2536 // but.. need to get port too
2537 if(loc) {
2538 dest.sin4.sin_port = loc->sin4.sin_port;
2539 }
2540 }
2541 else {
2542 if(loc) {
2543 dest = *loc;
2544 }
2545 else {
2546 dest.sin4.sin_family = fromaddr.sin4.sin_family;
2547 socklen_t slen = dest.getSocklen();
2548 getsockname(fd, (sockaddr*)&dest, &slen); // if this fails, we're ok with it
2549 }
2550 }
2551
// either hand the question to another thread or process it right here
2552 if(g_weDistributeQueries) {
2553 distributeAsyncFunction(data, boost::bind(doProcessUDPQuestion, data, fromaddr, dest, tv, fd));
2554 }
2555 else {
2556 ++s_threadInfos[t_id].numberOfDistributedQueries;
2557 doProcessUDPQuestion(data, fromaddr, dest, tv, fd);
2558 }
2559 }
2560 }
2561 catch(const MOADNSException &mde) {
2562 g_stats.clientParseError++;
2563 if(g_logCommonErrors) {
2564 g_log<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
2565 }
2566 }
2567 catch(const std::runtime_error& e) {
2568 g_stats.clientParseError++;
2569 if(g_logCommonErrors) {
2570 g_log<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<e.what()<<endl;
2571 }
2572 }
2573 }
2574 else {
2575 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
// EAGAIN on the very first read means we were woken up without a datagram being available
2576 if(firstQuery && errno == EAGAIN) {
2577 g_stats.noPacketError++;
2578 }
2579
// any receive failure ends the batch
2580 break;
2581 }
2582 }
2583 }
2584
2585 static void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets)
2586 {
2587 int fd;
2588 vector<string>locals;
2589 stringtok(locals,::arg()["local-address"]," ,");
2590
2591 if(locals.empty())
2592 throw PDNSException("No local address specified");
2593
2594 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2595 ServiceTuple st;
2596 st.port=::arg().asNum("local-port");
2597 parseService(*i, st);
2598
2599 ComboAddress sin;
2600
2601 sin.reset();
2602 sin.sin4.sin_family = AF_INET;
2603 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2604 sin.sin6.sin6_family = AF_INET6;
2605 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2606 throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
2607 }
2608
2609 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
2610 if(fd<0)
2611 throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
2612
2613 setCloseOnExec(fd);
2614
2615 int tmp=1;
2616 if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
2617 g_log<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
2618 exit(1);
2619 }
2620 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
2621 g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
2622 }
2623
2624 #ifdef TCP_DEFER_ACCEPT
2625 if(setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
2626 if(i==locals.begin())
2627 g_log<<Logger::Info<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
2628 }
2629 #endif
2630
2631 if( ::arg().mustDo("non-local-bind") )
2632 Utility::setBindAny(AF_INET, fd);
2633
2634 #ifdef SO_REUSEPORT
2635 if(g_reusePort) {
2636 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(tmp)) < 0)
2637 throw PDNSException("SO_REUSEPORT: "+stringerror());
2638 }
2639 #endif
2640
2641 if (::arg().asNum("tcp-fast-open") > 0) {
2642 #ifdef TCP_FASTOPEN
2643 int fastOpenQueueSize = ::arg().asNum("tcp-fast-open");
2644 if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &fastOpenQueueSize, sizeof fastOpenQueueSize) < 0) {
2645 g_log<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(errno)<<endl;
2646 }
2647 #else
2648 g_log<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
2649 #endif
2650 }
2651
2652 sin.sin4.sin_port = htons(st.port);
2653 socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
2654 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
2655 throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
2656
2657 setNonBlocking(fd);
2658 setSocketSendBuffer(fd, 65000);
2659 listen(fd, 128);
2660 deferredAdds.push_back(make_pair(fd, handleNewTCPQuestion));
2661 tcpSockets.insert(fd);
2662
2663 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
2664 // - fd is not that which we know here, but returned from accept()
2665 if(sin.sin4.sin_family == AF_INET)
2666 g_log<<Logger::Info<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
2667 else
2668 g_log<<Logger::Info<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2669 }
2670 }
2671
2672 static void makeUDPServerSockets(deferredAdd_t& deferredAdds)
2673 {
2674 int one=1;
2675 vector<string>locals;
2676 stringtok(locals,::arg()["local-address"]," ,");
2677
2678 if(locals.empty())
2679 throw PDNSException("No local address specified");
2680
2681 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2682 ServiceTuple st;
2683 st.port=::arg().asNum("local-port");
2684 parseService(*i, st);
2685
2686 ComboAddress sin;
2687
2688 sin.reset();
2689 sin.sin4.sin_family = AF_INET;
2690 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2691 sin.sin6.sin6_family = AF_INET6;
2692 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2693 throw PDNSException("Unable to resolve local address for UDP server on '"+ st.host +"'");
2694 }
2695
2696 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
2697 if(fd < 0) {
2698 throw PDNSException("Making a UDP server socket for resolver: "+netstringerror());
2699 }
2700 if (!setSocketTimestamps(fd))
2701 g_log<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2702
2703 if(IsAnyAddress(sin)) {
2704 if(sin.sin4.sin_family == AF_INET)
2705 if(!setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one))) // linux supports this, so why not - might fail on other systems
2706 g_fromtosockets.insert(fd);
2707 #ifdef IPV6_RECVPKTINFO
2708 if(sin.sin4.sin_family == AF_INET6)
2709 if(!setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)))
2710 g_fromtosockets.insert(fd);
2711 #endif
2712 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) < 0) {
2713 g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
2714 }
2715 }
2716 if( ::arg().mustDo("non-local-bind") )
2717 Utility::setBindAny(AF_INET6, fd);
2718
2719 setCloseOnExec(fd);
2720
2721 setSocketReceiveBuffer(fd, 250000);
2722 sin.sin4.sin_port = htons(st.port);
2723
2724
2725 #ifdef SO_REUSEPORT
2726 if(g_reusePort) {
2727 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
2728 throw PDNSException("SO_REUSEPORT: "+stringerror());
2729 }
2730 #endif
2731
2732 if (sin.isIPv4()) {
2733 try {
2734 setSocketIgnorePMTU(fd);
2735 }
2736 catch(const std::exception& e) {
2737 g_log<<Logger::Warning<<"Failed to set IP_MTU_DISCOVER on UDP server socket: "<<e.what()<<endl;
2738 }
2739 }
2740
2741 socklen_t socklen=sin.getSocklen();
2742 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
2743 throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
2744
2745 setNonBlocking(fd);
2746
2747 deferredAdds.push_back(make_pair(fd, handleNewUDPQuestion));
2748 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
2749 if(sin.sin4.sin_family == AF_INET)
2750 g_log<<Logger::Info<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
2751 else
2752 g_log<<Logger::Info<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2753 }
2754 }
2755
2756 static void daemonize(void)
2757 {
2758 if(fork())
2759 exit(0); // bye bye
2760
2761 setsid();
2762
2763 int i=open("/dev/null",O_RDWR); /* open stdin */
2764 if(i < 0)
2765 g_log<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
2766 else {
2767 dup2(i,0); /* stdin */
2768 dup2(i,1); /* stderr */
2769 dup2(i,2); /* stderr */
2770 close(i);
2771 }
2772 }
2773
2774 static void usr1Handler(int)
2775 {
2776 statsWanted=true;
2777 }
2778
2779 static void usr2Handler(int)
2780 {
2781 g_quiet= !g_quiet;
2782 SyncRes::setDefaultLogMode(g_quiet ? SyncRes::LogNone : SyncRes::Log);
2783 ::arg().set("quiet")=g_quiet ? "" : "no";
2784 }
2785
2786 static void doStats(void)
2787 {
2788 static time_t lastOutputTime;
2789 static uint64_t lastQueryCount;
2790
2791 uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
2792 uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
2793
2794 if(g_stats.qcounter && (cacheHits + cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {
2795 g_log<<Logger::Notice<<"stats: "<<g_stats.qcounter<<" questions, "<<
2796 broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
2797 broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
2798 (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;
2799
2800 g_log<<Logger::Notice<<"stats: throttle map: "
2801 << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
2802 << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl;
2803 g_log<<Logger::Notice<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
2804 g_log<<Logger::Notice<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
2805 <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
2806 g_log<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
2807 broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
2808
2809 //g_log<<Logger::Notice<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
2810 //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
2811
2812 g_log<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
2813 " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
2814
2815 size_t idx = 0;
2816 for (const auto& threadInfo : s_threadInfos) {
2817 if(threadInfo.isWorker) {
2818 g_log<<Logger::Notice<<"stats: thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
2819 ++idx;
2820 }
2821 }
2822
2823 time_t now = time(0);
2824 if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
2825 g_log<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
2826 }
2827 lastOutputTime = now;
2828 lastQueryCount = SyncRes::s_queries;
2829 }
2830 else if(statsWanted)
2831 g_log<<Logger::Notice<<"stats: no stats yet!"<<endl;
2832
2833 statsWanted=false;
2834 }
2835
2836 static void houseKeeping(void *)
2837 {
2838 static thread_local time_t last_rootupdate, last_prune, last_secpoll, last_trustAnchorUpdate{0};
2839 static thread_local int cleanCounter=0;
2840 static thread_local bool s_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
2841 auto luaconfsLocal = g_luaconfs.getLocal();
2842
2843 if (last_trustAnchorUpdate == 0 && !luaconfsLocal->trustAnchorFileInfo.fname.empty() && luaconfsLocal->trustAnchorFileInfo.interval != 0) {
2844 // Loading the Lua config file already "refreshed" the TAs
2845 last_trustAnchorUpdate = g_now.tv_sec + luaconfsLocal->trustAnchorFileInfo.interval * 3600;
2846 }
2847
2848 try {
2849 if(s_running) {
2850 return;
2851 }
2852 s_running=true;
2853
2854 struct timeval now;
2855 Utility::gettimeofday(&now, 0);
2856
2857 if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
2858 t_RC->doPrune(g_maxCacheEntries / g_numThreads); // this function is local to a thread, so fine anyhow
2859 t_packetCache->doPruneTo(g_maxPacketCacheEntries / g_numWorkerThreads);
2860
2861 SyncRes::pruneNegCache(g_maxCacheEntries / (g_numWorkerThreads * 10));
2862
2863 if(!((cleanCounter++)%40)) { // this is a full scan!
2864 time_t limit=now.tv_sec-300;
2865 SyncRes::pruneNSSpeeds(limit);
2866 }
2867 last_prune=time(0);
2868 }
2869
2870 if(now.tv_sec - last_rootupdate > 7200) {
2871 int res = SyncRes::getRootNS(g_now, nullptr);
2872 if (!res)
2873 last_rootupdate=now.tv_sec;
2874 }
2875
2876 if(isHandlerThread()) {
2877
2878 if(now.tv_sec - last_secpoll >= 3600) {
2879 try {
2880 doSecPoll(&last_secpoll);
2881 }
2882 catch(std::exception& e)
2883 {
2884 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.what()<<endl;
2885 }
2886 catch(PDNSException& e)
2887 {
2888 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2889 }
2890 catch(ImmediateServFailException &e)
2891 {
2892 g_log<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2893 }
2894 catch(...)
2895 {
2896 g_log<<Logger::Error<<"Exception while performing security poll"<<endl;
2897 }
2898 }
2899
2900 if (!luaconfsLocal->trustAnchorFileInfo.fname.empty() && luaconfsLocal->trustAnchorFileInfo.interval != 0 &&
2901 g_now.tv_sec - last_trustAnchorUpdate >= (luaconfsLocal->trustAnchorFileInfo.interval * 3600)) {
2902 g_log<<Logger::Debug<<"Refreshing Trust Anchors from file"<<endl;
2903 try {
2904 map<DNSName, dsmap_t> dsAnchors;
2905 if (updateTrustAnchorsFromFile(luaconfsLocal->trustAnchorFileInfo.fname, dsAnchors)) {
2906 g_luaconfs.modify([&dsAnchors](LuaConfigItems& lci) {
2907 lci.dsAnchors = dsAnchors;
2908 });
2909 }
2910 last_trustAnchorUpdate = now.tv_sec;
2911 } catch (const PDNSException &pe) {
2912 g_log<<Logger::Error<<"Unable to update Trust Anchors: "<<pe.reason<<endl;
2913 }
2914 }
2915 }
2916 s_running=false;
2917 }
2918 catch(PDNSException& ae)
2919 {
2920 s_running=false;
2921 g_log<<Logger::Error<<"Fatal error in housekeeping thread: "<<ae.reason<<endl;
2922 throw;
2923 }
2924 }
2925
2926 static void makeThreadPipes()
2927 {
2928 auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
2929 if (pipeBufferSize > 0) {
2930 g_log<<Logger::Info<<"Resizing the buffer of the distribution pipe to "<<pipeBufferSize<<endl;
2931 }
2932
2933 /* thread 0 is the handler / SNMP, we start at 1 */
2934 for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
2935 auto& threadInfos = s_threadInfos.at(n);
2936
2937 int fd[2];
2938 if(pipe(fd) < 0)
2939 unixDie("Creating pipe for inter-thread communications");
2940
2941 threadInfos.pipes.readToThread = fd[0];
2942 threadInfos.pipes.writeToThread = fd[1];
2943
2944 if(pipe(fd) < 0)
2945 unixDie("Creating pipe for inter-thread communications");
2946
2947 threadInfos.pipes.readFromThread = fd[0];
2948 threadInfos.pipes.writeFromThread = fd[1];
2949
2950 if(pipe(fd) < 0)
2951 unixDie("Creating pipe for inter-thread communications");
2952
2953 threadInfos.pipes.readQueriesToThread = fd[0];
2954 threadInfos.pipes.writeQueriesToThread = fd[1];
2955
2956 if (pipeBufferSize > 0) {
2957 if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
2958 g_log<<Logger::Warning<<"Error resizing the buffer of the distribution pipe for thread "<<n<<" to "<<pipeBufferSize<<": "<<strerror(errno)<<endl;
2959 auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
2960 if (existingSize > 0) {
2961 g_log<<Logger::Warning<<"The current size of the distribution pipe's buffer for thread "<<n<<" is "<<existingSize<<endl;
2962 }
2963 }
2964 }
2965
2966 if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
2967 unixDie("Making pipe for inter-thread communications non-blocking");
2968 }
2969 }
2970 }
2971
2972 struct ThreadMSG
2973 {
2974 pipefunc_t func;
2975 bool wantAnswer;
2976 };
2977
2978 void broadcastFunction(const pipefunc_t& func)
2979 {
2980 /* This function might be called by the worker with t_id 0 during startup
2981 for the initialization of ACLs and domain maps. After that it should only
2982 be called by the handler. */
2983
2984 if (s_threadInfos.empty() && isHandlerThread()) {
2985 /* the handler and distributors will call themselves below, but
2986 during startup we get called while s_threadInfos has not been
2987 populated yet to update the ACL or domain maps, so we need to
2988 handle that case.
2989 */
2990 func();
2991 }
2992
2993 unsigned int n = 0;
2994 for (const auto& threadInfo : s_threadInfos) {
2995 if(n++ == t_id) {
2996 func(); // don't write to ourselves!
2997 continue;
2998 }
2999
3000 ThreadMSG* tmsg = new ThreadMSG();
3001 tmsg->func = func;
3002 tmsg->wantAnswer = true;
3003 if(write(threadInfo.pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
3004 delete tmsg;
3005
3006 unixDie("write to thread pipe returned wrong size or error");
3007 }
3008
3009 string* resp = nullptr;
3010 if(read(threadInfo.pipes.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
3011 unixDie("read from thread pipe returned wrong size or error");
3012
3013 if(resp) {
3014 delete resp;
3015 resp = nullptr;
3016 }
3017 }
3018 }
3019
3020 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
3021 {
3022 auto& targetInfo = s_threadInfos[target];
3023 if(!targetInfo.isWorker) {
3024 g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
3025 exit(1);
3026 }
3027
3028 const auto& tps = targetInfo.pipes;
3029
3030 ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
3031 if (written > 0) {
3032 if (static_cast<size_t>(written) != sizeof(tmsg)) {
3033 delete tmsg;
3034 unixDie("write to thread pipe returned wrong size or error");
3035 }
3036 }
3037 else {
3038 int error = errno;
3039 if (error == EAGAIN || error == EWOULDBLOCK) {
3040 return false;
3041 } else {
3042 delete tmsg;
3043 unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
3044 }
3045 }
3046
3047 ++targetInfo.numberOfDistributedQueries;
3048
3049 return true;
3050 }
3051
3052 static unsigned int getWorkerLoad(size_t workerIdx)
3053 {
3054 const auto mt = s_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
3055 if (mt != nullptr) {
3056 return mt->numProcesses();
3057 }
3058 return 0;
3059 }
3060
3061 static unsigned int selectWorker(unsigned int hash)
3062 {
3063 if (s_balancingFactor == 0) {
3064 return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
3065 }
3066
3067 /* we start with one, representing the query we are currently handling */
3068 double currentLoad = 1;
3069 std::vector<unsigned int> load(g_numWorkerThreads);
3070 for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
3071 load[idx] = getWorkerLoad(idx);
3072 currentLoad += load[idx];
3073 // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
3074 }
3075
3076 double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
3077 // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
3078
3079 unsigned int worker = hash % g_numWorkerThreads;
3080 /* at least one server has to be at or below the average load */
3081 if (load[worker] > targetLoad) {
3082 ++g_stats.rebalancedQueries;
3083 do {
3084 // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
3085 worker = (worker + 1) % g_numWorkerThreads;
3086 }
3087 while(load[worker] > targetLoad);
3088 }
3089
3090 return /* skip handler */ 1 + g_numDistributorThreads + worker;
3091 }
3092
3093 // This function is only called by the distributor threads, when pdns-distributes-queries is set
3094 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
3095 {
3096 if (!isDistributorThread()) {
3097 g_log<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
3098 exit(1);
3099 }
3100
3101 unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
3102 unsigned int target = selectWorker(hash);
3103
3104 ThreadMSG* tmsg = new ThreadMSG();
3105 tmsg->func = func;
3106 tmsg->wantAnswer = false;
3107
3108 if (!trySendingQueryToWorker(target, tmsg)) {
3109 /* if this function failed but did not raise an exception, it means that the pipe
3110 was full, let's try another one */
3111 unsigned int newTarget = 0;
3112 do {
3113 newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads);
3114 } while (newTarget == target);
3115
3116 if (!trySendingQueryToWorker(newTarget, tmsg)) {
3117 g_stats.queryPipeFullDrops++;
3118 delete tmsg;
3119 }
3120 }
3121 }
3122
3123 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
3124 {
3125 ThreadMSG* tmsg = nullptr;
3126
3127 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
3128 unixDie("read from thread pipe returned wrong size or error");
3129 }
3130
3131 void *resp=0;
3132 try {
3133 resp = tmsg->func();
3134 }
3135 catch(std::exception& e) {
3136 if(g_logCommonErrors)
3137 g_log<<Logger::Error<<"PIPE function we executed created exception: "<<e.what()<<endl; // but what if they wanted an answer.. we send 0
3138 }
3139 catch(PDNSException& e) {
3140 if(g_logCommonErrors)
3141 g_log<<Logger::Error<<"PIPE function we executed created PDNS exception: "<<e.reason<<endl; // but what if they wanted an answer.. we send 0
3142 }
3143 if(tmsg->wantAnswer) {
3144 const auto& threadInfo = s_threadInfos.at(t_id);
3145 if(write(threadInfo.pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
3146 delete tmsg;
3147 unixDie("write to thread pipe returned wrong size or error");
3148 }
3149 }
3150
3151 delete tmsg;
3152 }
3153
3154 template<class T> void *voider(const boost::function<T*()>& func)
3155 {
3156 return func();
3157 }
3158
3159 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
3160 {
3161 a.insert(a.end(), b.begin(), b.end());
3162 return a;
3163 }
3164
/* Append every element of 'b' to 'a' and return 'a'. */
vector<pair<string, uint16_t> >& operator+=(vector<pair<string, uint16_t> >&a, const vector<pair<string, uint16_t> >& b)
{
  a.insert(a.end(), b.cbegin(), b.cend());
  return a;
}
3170
3171 vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a, const vector<pair<DNSName, uint16_t> >& b)
3172 {
3173 a.insert(a.end(), b.begin(), b.end());
3174 return a;
3175 }
3176
3177
3178 /*
3179 This function should only be called by the handler to gather metrics, wipe the cache,
3180 reload the Lua script (not the Lua config) or change the current trace regex,
3181 and by the SNMP thread to gather metrics. */
3182 template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
3183 {
3184 if (!isHandlerThread()) {
3185 g_log<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
3186 exit(1);
3187 }
3188
3189 unsigned int n = 0;
3190 T ret=T();
3191 for (const auto& threadInfo : s_threadInfos) {
3192 if (n++ == t_id) {
3193 continue;
3194 }
3195
3196 const auto& tps = threadInfo.pipes;
3197 ThreadMSG* tmsg = new ThreadMSG();
3198 tmsg->func = boost::bind(voider<T>, func);
3199 tmsg->wantAnswer = true;
3200
3201 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
3202 delete tmsg;
3203 unixDie("write to thread pipe returned wrong size or error");
3204 }
3205
3206 T* resp = nullptr;
3207 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
3208 unixDie("read from thread pipe returned wrong size or error");
3209
3210 if(resp) {
3211 ret += *resp;
3212 delete resp;
3213 resp = nullptr;
3214 }
3215 }
3216 return ret;
3217 }
3218
3219 template string broadcastAccFunction(const boost::function<string*()>& fun); // explicit instantiation
3220 template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
3221 template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
3222 template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
3223 template ThreadTimes broadcastAccFunction(const boost::function<ThreadTimes*()>& fun);
3224
3225 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
3226 {
3227 try {
3228 string remote;
3229 string msg=s_rcc.recv(&remote);
3230 RecursorControlParser rcp;
3231 RecursorControlParser::func_t* command;
3232
3233 string answer=rcp.getAnswer(msg, &command);
3234
3235 // If we are inside a chroot, we need to strip
3236 if (!arg()["chroot"].empty()) {
3237 size_t len = arg()["chroot"].length();
3238 remote = remote.substr(len);
3239 }
3240
3241 s_rcc.send(answer, &remote);
3242 command();
3243 }
3244 catch(const std::exception& e) {
3245 g_log<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
3246 }
3247 catch(const PDNSException& ae) {
3248 g_log<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
3249 }
3250 }
3251
3252 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
3253 {
3254 PacketID* pident=any_cast<PacketID>(&var);
3255 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
3256
3257 shared_array<char> buffer(new char[pident->inNeeded]);
3258
3259 ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
3260 if(ret > 0) {
3261 pident->inMSG.append(&buffer[0], &buffer[ret]);
3262 pident->inNeeded-=(size_t)ret;
3263 if(!pident->inNeeded || pident->inIncompleteOkay) {
3264 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
3265 PacketID pid=*pident;
3266 string msg=pident->inMSG;
3267
3268 t_fdm->removeReadFD(fd);
3269 MT->sendEvent(pid, &msg);
3270 }
3271 else {
3272 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
3273 }
3274 }
3275 else {
3276 PacketID tmp=*pident;
3277 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
3278 string empty;
3279 MT->sendEvent(tmp, &empty); // this conveys error status
3280 }
3281 }
3282
3283 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
3284 {
3285 PacketID* pid=any_cast<PacketID>(&var);
3286 ssize_t ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
3287 if(ret > 0) {
3288 pid->outPos+=(ssize_t)ret;
3289 if(pid->outPos==pid->outMSG.size()) {
3290 PacketID tmp=*pid;
3291 t_fdm->removeWriteFD(fd);
3292 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
3293 }
3294 }
3295 else { // error or EOF
3296 PacketID tmp(*pid);
3297 t_fdm->removeWriteFD(fd);
3298 string sent;
3299 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
3300 }
3301 }
3302
3303 // resend event to everybody chained onto it
3304 static void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
3305 {
3306 if(iter->key.chain.empty())
3307 return;
3308 // cerr<<"doResends called!\n";
3309 for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
3310 resend.fd=-1;
3311 resend.id=*i;
3312 // cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
3313
3314 MT->sendEvent(resend, &content);
3315 g_stats.chainResends++;
3316 }
3317 }
3318
3319 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
3320 {
3321 PacketID pid=any_cast<PacketID>(var);
3322 ssize_t len;
3323 std::string packet;
3324 packet.resize(g_outgoingEDNSBufsize);
3325 ComboAddress fromaddr;
3326 socklen_t addrlen=sizeof(fromaddr);
3327
3328 len=recvfrom(fd, &packet.at(0), packet.size(), 0, (sockaddr *)&fromaddr, &addrlen);
3329
3330 if(len < (ssize_t) sizeof(dnsheader)) {
3331 if(len < 0)
3332 ; // cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
3333 else {
3334 g_stats.serverParseError++;
3335 if(g_logCommonErrors)
3336 g_log<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
3337 ": packet smaller than DNS header"<<endl;
3338 }
3339
3340 t_udpclientsocks->returnSocket(fd);
3341 string empty;
3342
3343 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
3344 if(iter != MT->d_waiters.end())
3345 doResends(iter, pid, empty);
3346
3347 MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
3348 return;
3349 }
3350
3351 packet.resize(len);
3352 dnsheader dh;
3353 memcpy(&dh, &packet.at(0), sizeof(dh));
3354
3355 PacketID pident;
3356 pident.remote=fromaddr;
3357 pident.id=dh.id;
3358 pident.fd=fd;
3359
3360 if(!dh.qr && g_logCommonErrors) {
3361 g_log<<Logger::Notice<<"Not taking data from question on outgoing socket from "<< fromaddr.toStringWithPort() <<endl;
3362 }
3363
3364 if(!dh.qdcount || // UPC, Nominum, very old BIND on FormErr, NSD
3365 !dh.qr) { // one weird server
3366 pident.domain.clear();
3367 pident.type = 0;
3368 }
3369 else {
3370 try {
3371 if(len > 12)
3372 pident.domain=DNSName(&packet.at(0), len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
3373 }
3374 catch(std::exception& e) {
3375 g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
3376 g_log<<Logger::Warning<<"Error in packet from remote nameserver "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
3377 return;
3378 }
3379 }
3380
3381 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
3382 if(iter != MT->d_waiters.end()) {
3383 doResends(iter, pident, packet);
3384 }
3385
3386 retryWithName:
3387
3388 if(!MT->sendEvent(pident, &packet)) {
3389 /* we did not find a match for this response, something is wrong */
3390
3391 // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
3392 for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
3393 if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote && mthread->key.type == pident.type &&
3394 pident.domain == mthread->key.domain) {
3395 mthread->key.nearMisses++;
3396 }
3397
3398 // be a bit paranoid here since we're weakening our matching
3399 if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
3400 pident.id == mthread->key.id && mthread->key.remote == pident.remote) {
3401 // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
3402 pident.domain = mthread->key.domain;
3403 pident.type = mthread->key.type;
3404 goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
3405 }
3406 }
3407 g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
3408 if(g_logCommonErrors) {
3409 g_log<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<< (pident.domain.empty() ? "<empty>" : pident.domain.toString())<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
3410 }
3411 }
3412 else if(fd >= 0) {
3413 /* we either found a waiter (1) or encountered an issue (-1), it's up to us to clean the socket anyway */
3414 t_udpclientsocks->returnSocket(fd);
3415 }
3416 }
3417
3418 FDMultiplexer* getMultiplexer()
3419 {
3420 FDMultiplexer* ret;
3421 for(const auto& i : FDMultiplexer::getMultiplexerMap()) {
3422 try {
3423 ret=i.second();
3424 return ret;
3425 }
3426 catch(FDMultiplexerException &fe) {
3427 g_log<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
3428 }
3429 catch(...) {
3430 g_log<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
3431 }
3432 }
3433 g_log<<Logger::Error<<"No working multiplexer found!"<<endl;
3434 exit(1);
3435 }
3436
3437
3438 static string* doReloadLuaScript()
3439 {
3440 string fname= ::arg()["lua-dns-script"];
3441 try {
3442 if(fname.empty()) {
3443 t_pdl.reset();
3444 g_log<<Logger::Info<<t_id<<" Unloaded current lua script"<<endl;
3445 return new string("unloaded\n");
3446 }
3447 else {
3448 t_pdl = std::make_shared<RecursorLua4>();
3449 t_pdl->loadFile(fname);
3450 }
3451 }
3452 catch(std::exception& e) {
3453 g_log<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
3454 return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
3455 }
3456
3457 g_log<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
3458 return new string("(re)loaded '"+fname+"'\n");
3459 }
3460
3461 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
3462 {
3463 if(begin != end)
3464 ::arg().set("lua-dns-script") = *begin;
3465
3466 return broadcastAccFunction<string>(doReloadLuaScript);
3467 }
3468
3469 static string* pleaseUseNewTraceRegex(const std::string& newRegex)
3470 try
3471 {
3472 if(newRegex.empty()) {
3473 t_traceRegex.reset();
3474 return new string("unset\n");
3475 }
3476 else {
3477 t_traceRegex = std::make_shared<Regex>(newRegex);
3478 return new string("ok\n");
3479 }
3480 }
3481 catch(PDNSException& ae)
3482 {
3483 return new string(ae.reason+"\n");
3484 }
3485
3486 string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
3487 {
3488 return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
3489 }
3490
3491 static void checkLinuxIPv6Limits()
3492 {
3493 #ifdef __linux__
3494 string line;
3495 if(readFileIfThere("/proc/sys/net/ipv6/route/max_size", &line)) {
3496 int lim=std::stoi(line);
3497 if(lim < 16384) {
3498 g_log<<Logger::Error<<"If using IPv6, please raise sysctl net.ipv6.route.max_size, currently set to "<<lim<<" which is < 16384"<<endl;
3499 }
3500 }
3501 #endif
3502 }
3503 static void checkOrFixFDS()
3504 {
3505 unsigned int availFDs=getFilenumLimit();
3506 unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
3507
3508 if(wantFDs > availFDs) {
3509 unsigned int hardlimit= getFilenumLimit(true);
3510 if(hardlimit >= wantFDs) {
3511 setFilenumLimit(wantFDs);
3512 g_log<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
3513 }
3514 else {
3515 int newval = (hardlimit - 25) / g_numWorkerThreads;
3516 g_log<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
3517 g_maxMThreads = newval;
3518 setFilenumLimit(hardlimit);
3519 }
3520 }
3521 }
3522
3523 static void* recursorThread(unsigned int tid, const string& threadName);
3524
3525 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
3526 {
3527 t_allowFrom = ng;
3528 return nullptr;
3529 }
3530
3531 int g_argc;
3532 char** g_argv;
3533
3534 void parseACLs()
3535 {
3536 static bool l_initialized;
3537
3538 if(l_initialized) { // only reload configuration file on second call
3539 string configname=::arg()["config-dir"]+"/recursor.conf";
3540 if(::arg()["config-name"]!="") {
3541 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
3542 }
3543 cleanSlashes(configname);
3544
3545 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
3546 throw runtime_error("Unable to re-parse configuration file '"+configname+"'");
3547 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
3548 ::arg().preParseFile(configname.c_str(), "include-dir");
3549 ::arg().preParse(g_argc, g_argv, "include-dir");
3550
3551 // then process includes
3552 std::vector<std::string> extraConfigs;
3553 ::arg().gatherIncludes(extraConfigs);
3554
3555 for(const std::string& fn : extraConfigs) {
3556 if(!::arg().preParseFile(fn.c_str(), "allow-from-file", ::arg()["allow-from-file"]))
3557 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
3558 if(!::arg().preParseFile(fn.c_str(), "allow-from", ::arg()["allow-from"]))
3559 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
3560 }
3561
3562 ::arg().preParse(g_argc, g_argv, "allow-from-file");
3563 ::arg().preParse(g_argc, g_argv, "allow-from");
3564 }
3565
3566 std::shared_ptr<NetmaskGroup> oldAllowFrom = t_allowFrom;
3567 std::shared_ptr<NetmaskGroup> allowFrom = std::make_shared<NetmaskGroup>();
3568
3569 if(!::arg()["allow-from-file"].empty()) {
3570 string line;
3571 ifstream ifs(::arg()["allow-from-file"].c_str());
3572 if(!ifs) {
3573 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
3574 }
3575
3576 string::size_type pos;
3577 while(getline(ifs,line)) {
3578 pos=line.find('#');
3579 if(pos!=string::npos)
3580 line.resize(pos);
3581 trim(line);
3582 if(line.empty())
3583 continue;
3584
3585 allowFrom->addMask(line);
3586 }
3587 g_log<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
3588 }
3589 else if(!::arg()["allow-from"].empty()) {
3590 vector<string> ips;
3591 stringtok(ips, ::arg()["allow-from"], ", ");
3592
3593 g_log<<Logger::Warning<<"Only allowing queries from: ";
3594 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
3595 allowFrom->addMask(*i);
3596 if(i!=ips.begin())
3597 g_log<<Logger::Warning<<", ";
3598 g_log<<Logger::Warning<<*i;
3599 }
3600 g_log<<Logger::Warning<<endl;
3601 }
3602 else {
3603 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
3604 g_log<<Logger::Warning<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
3605 allowFrom = nullptr;
3606 }
3607
3608 g_initialAllowFrom = allowFrom;
3609 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
3610 oldAllowFrom = nullptr;
3611
3612 l_initialized = true;
3613 }
3614
3615
3616 static void setupDelegationOnly()
3617 {
3618 vector<string> parts;
3619 stringtok(parts, ::arg()["delegation-only"], ", \t");
3620 for(const auto& p : parts) {
3621 SyncRes::addDelegationOnly(DNSName(p));
3622 }
3623 }
3624
3625 static std::map<unsigned int, std::set<int> > parseCPUMap()
3626 {
3627 std::map<unsigned int, std::set<int> > result;
3628
3629 const std::string value = ::arg()["cpu-map"];
3630
3631 if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
3632 g_log<<Logger::Warning<<"CPU mapping requested but not supported, skipping"<<endl;
3633 return result;
3634 }
3635
3636 std::vector<std::string> parts;
3637
3638 stringtok(parts, value, " \t");
3639
3640 for(const auto& part : parts) {
3641 if (part.find('=') == string::npos)
3642 continue;
3643
3644 try {
3645 auto headers = splitField(part, '=');
3646 trim(headers.first);
3647 trim(headers.second);
3648
3649 unsigned int threadId = pdns_stou(headers.first);
3650 std::vector<std::string> cpus;
3651
3652 stringtok(cpus, headers.second, ",");
3653
3654 for(const auto& cpu : cpus) {
3655 int cpuId = std::stoi(cpu);
3656
3657 result[threadId].insert(cpuId);
3658 }
3659 }
3660 catch(const std::exception& e) {
3661 g_log<<Logger::Error<<"Error parsing cpu-map entry '"<<part<<"': "<<e.what()<<endl;
3662 }
3663 }
3664
3665 return result;
3666 }
3667
3668 static void setCPUMap(const std::map<unsigned int, std::set<int> >& cpusMap, unsigned int n, pthread_t tid)
3669 {
3670 const auto& cpuMapping = cpusMap.find(n);
3671 if (cpuMapping != cpusMap.cend()) {
3672 int rc = mapThreadToCPUList(tid, cpuMapping->second);
3673 if (rc == 0) {
3674 g_log<<Logger::Info<<"CPU affinity for worker "<<n<<" has been set to CPU map:";
3675 for (const auto cpu : cpuMapping->second) {
3676 g_log<<Logger::Info<<" "<<cpu;
3677 }
3678 g_log<<Logger::Info<<endl;
3679 }
3680 else {
3681 g_log<<Logger::Warning<<"Error setting CPU affinity for worker "<<n<<" to CPU map:";
3682 for (const auto cpu : cpuMapping->second) {
3683 g_log<<Logger::Info<<" "<<cpu;
3684 }
3685 g_log<<Logger::Info<<strerror(rc)<<endl;
3686 }
3687 }
3688 }
3689
3690 #ifdef NOD_ENABLED
3691 static void setupNODThread()
3692 {
3693 if (g_nodEnabled) {
3694 uint32_t num_cells = ::arg().asNum("new-domain-db-size");
3695 t_nodDBp = std::make_shared<nod::NODDB>(num_cells);
3696 try {
3697 t_nodDBp->setCacheDir(::arg()["new-domain-history-dir"]);
3698 }
3699 catch (const PDNSException& e) {
3700 g_log<<Logger::Error<<"new-domain-history-dir (" << ::arg()["new-domain-history-dir"] << ") is not readable or does not exist"<<endl;
3701 _exit(1);
3702 }
3703 if (!t_nodDBp->init()) {
3704 g_log<<Logger::Error<<"Could not initialize domain tracking"<<endl;
3705 _exit(1);
3706 }
3707 std::thread t(nod::NODDB::startHousekeepingThread, t_nodDBp, std::this_thread::get_id());
3708 t.detach();
3709 g_nod_pbtag = ::arg()["new-domain-pb-tag"];
3710 }
3711 if (g_udrEnabled) {
3712 uint32_t num_cells = ::arg().asNum("unique-response-db-size");
3713 t_udrDBp = std::make_shared<nod::UniqueResponseDB>(num_cells);
3714 try {
3715 t_udrDBp->setCacheDir(::arg()["unique-response-history-dir"]);
3716 }
3717 catch (const PDNSException& e) {
3718 g_log<<Logger::Error<<"unique-response-history-dir (" << ::arg()["unique-response-history-dir"] << ") is not readable or does not exist"<<endl;
3719 _exit(1);
3720 }
3721 if (!t_udrDBp->init()) {
3722 g_log<<Logger::Error<<"Could not initialize unique response tracking"<<endl;
3723 _exit(1);
3724 }
3725 std::thread t(nod::UniqueResponseDB::startHousekeepingThread, t_udrDBp, std::this_thread::get_id());
3726 t.detach();
3727 g_udr_pbtag = ::arg()["unique-response-pb-tag"];
3728 }
3729 }
3730
3731 void parseNODWhitelist(const std::string& wlist)
3732 {
3733 vector<string> parts;
3734 stringtok(parts, wlist, ",; ");
3735 for(const auto& a : parts) {
3736 g_nodDomainWL.add(DNSName(a));
3737 }
3738 }
3739
3740 static void setupNODGlobal()
3741 {
3742 // Setup NOD subsystem
3743 g_nodEnabled = ::arg().mustDo("new-domain-tracking");
3744 g_nodLookupDomain = DNSName(::arg()["new-domain-lookup"]);
3745 g_nodLog = ::arg().mustDo("new-domain-log");
3746 parseNODWhitelist(::arg()["new-domain-whitelist"]);
3747
3748 // Setup Unique DNS Response subsystem
3749 g_udrEnabled = ::arg().mustDo("unique-response-tracking");
3750 g_udrLog = ::arg().mustDo("unique-response-log");
3751 }
3752 #endif /* NOD_ENABLED */
3753
3754 static int serviceMain(int argc, char*argv[])
3755 {
3756 g_log.setName(s_programname);
3757 g_log.disableSyslog(::arg().mustDo("disable-syslog"));
3758 g_log.setTimestamps(::arg().mustDo("log-timestamp"));
3759
3760 if(!::arg()["logging-facility"].empty()) {
3761 int val=logFacilityToLOG(::arg().asNum("logging-facility") );
3762 if(val >= 0)
3763 g_log.setFacility(val);
3764 else
3765 g_log<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
3766 }
3767
3768 showProductVersion();
3769
3770 g_disthashseed=dns_random(0xffffffff);
3771
3772 checkLinuxIPv6Limits();
3773 try {
3774 vector<string> addrs;
3775 if(!::arg()["query-local-address6"].empty()) {
3776 SyncRes::s_doIPv6=true;
3777 g_log<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
3778
3779 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
3780 for(const string& addr : addrs) {
3781 g_localQueryAddresses6.push_back(ComboAddress(addr));
3782 }
3783 }
3784 else {
3785 g_log<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
3786 }
3787 addrs.clear();
3788 stringtok(addrs, ::arg()["query-local-address"], ", ;");
3789 for(const string& addr : addrs) {
3790 g_localQueryAddresses4.push_back(ComboAddress(addr));
3791 }
3792 }
3793 catch(std::exception& e) {
3794 g_log<<Logger::Error<<"Assigning local query addresses: "<<e.what();
3795 exit(99);
3796 }
3797
3798 // keep this ABOVE loadRecursorLuaConfig!
3799 if(::arg()["dnssec"]=="off")
3800 g_dnssecmode=DNSSECMode::Off;
3801 else if(::arg()["dnssec"]=="process-no-validate")
3802 g_dnssecmode=DNSSECMode::ProcessNoValidate;
3803 else if(::arg()["dnssec"]=="process")
3804 g_dnssecmode=DNSSECMode::Process;
3805 else if(::arg()["dnssec"]=="validate")
3806 g_dnssecmode=DNSSECMode::ValidateAll;
3807 else if(::arg()["dnssec"]=="log-fail")
3808 g_dnssecmode=DNSSECMode::ValidateForLog;
3809 else {
3810 g_log<<Logger::Error<<"Unknown DNSSEC mode "<<::arg()["dnssec"]<<endl;
3811 exit(1);
3812 }
3813
3814 g_signatureInceptionSkew = ::arg().asNum("signature-inception-skew");
3815 if (g_signatureInceptionSkew < 0) {
3816 g_log<<Logger::Error<<"A negative value for 'signature-inception-skew' is not allowed"<<endl;
3817 exit(1);
3818 }
3819
3820 g_dnssecLogBogus = ::arg().mustDo("dnssec-log-bogus");
3821 g_maxNSEC3Iterations = ::arg().asNum("nsec3-max-iterations");
3822
3823 g_maxCacheEntries = ::arg().asNum("max-cache-entries");
3824 g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
3825
3826 luaConfigDelayedThreads delayedLuaThreads;
3827 try {
3828 loadRecursorLuaConfig(::arg()["lua-config-file"], delayedLuaThreads);
3829 }
3830 catch (PDNSException &e) {
3831 g_log<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
3832 exit(1);
3833 }
3834
3835 parseACLs();
3836 initPublicSuffixList(::arg()["public-suffix-list-file"]);
3837
3838 if(!::arg()["dont-query"].empty()) {
3839 vector<string> ips;
3840 stringtok(ips, ::arg()["dont-query"], ", ");
3841 ips.push_back("0.0.0.0");
3842 ips.push_back("::");
3843
3844 g_log<<Logger::Warning<<"Will not send queries to: ";
3845 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
3846 SyncRes::addDontQuery(*i);
3847 if(i!=ips.begin())
3848 g_log<<Logger::Warning<<", ";
3849 g_log<<Logger::Warning<<*i;
3850 }
3851 g_log<<Logger::Warning<<endl;
3852 }
3853
3854 g_quiet=::arg().mustDo("quiet");
3855
3856 /* this needs to be done before parseACLs(), which call broadcastFunction() */
3857 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
3858 if(g_weDistributeQueries) {
3859 g_log<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
3860 }
3861
3862 setupDelegationOnly();
3863 g_outgoingEDNSBufsize=::arg().asNum("edns-outgoing-bufsize");
3864
3865 if(::arg()["trace"]=="fail") {
3866 SyncRes::setDefaultLogMode(SyncRes::Store);
3867 }
3868 else if(::arg().mustDo("trace")) {
3869 SyncRes::setDefaultLogMode(SyncRes::Log);
3870 ::arg().set("quiet")="no";
3871 g_quiet=false;
3872 g_dnssecLOG=true;
3873 }
3874 string myHostname = getHostname();
3875 if (myHostname == "UNKNOWN"){
3876 g_log<<Logger::Warning<<"Unable to get the hostname, NSID and id.server values will be empty"<<endl;
3877 myHostname = "";
3878 }
3879
3880 SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
3881 SyncRes::s_minimumECSTTL = ::arg().asNum("ecs-minimum-ttl-override");
3882
3883 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
3884
3885 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
3886 SyncRes::s_maxbogusttl=::arg().asNum("max-cache-bogus-ttl");
3887 SyncRes::s_maxcachettl=max(::arg().asNum("max-cache-ttl"), 15);
3888 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
3889 // Cap the packetcache-servfail-ttl to the packetcache-ttl
3890 uint32_t packetCacheServFailTTL = ::arg().asNum("packetcache-servfail-ttl");
3891 SyncRes::s_packetcacheservfailttl=(packetCacheServFailTTL > SyncRes::s_packetcachettl) ? SyncRes::s_packetcachettl : packetCacheServFailTTL;
3892 SyncRes::s_serverdownmaxfails=::arg().asNum("server-down-max-fails");
3893 SyncRes::s_serverdownthrottletime=::arg().asNum("server-down-throttle-time");
3894 SyncRes::s_serverID=::arg()["server-id"];
3895 SyncRes::s_maxqperq=::arg().asNum("max-qperq");
3896 SyncRes::s_maxtotusec=1000*::arg().asNum("max-total-msec");
3897 SyncRes::s_maxdepth=::arg().asNum("max-recursion-depth");
3898 SyncRes::s_rootNXTrust = ::arg().mustDo( "root-nx-trust");
3899 if(SyncRes::s_serverID.empty()) {
3900 SyncRes::s_serverID = myHostname;
3901 }
3902
3903 SyncRes::s_ecsipv4limit = ::arg().asNum("ecs-ipv4-bits");
3904 SyncRes::s_ecsipv6limit = ::arg().asNum("ecs-ipv6-bits");
3905 SyncRes::clearECSStats();
3906 SyncRes::s_ecsipv4cachelimit = ::arg().asNum("ecs-ipv4-cache-bits");
3907 SyncRes::s_ecsipv6cachelimit = ::arg().asNum("ecs-ipv6-cache-bits");
3908 SyncRes::s_ecscachelimitttl = ::arg().asNum("ecs-cache-limit-ttl");
3909
3910 SyncRes::s_qnameminimization = ::arg().mustDo("qname-minimization");
3911
3912 if (!::arg().isEmpty("ecs-scope-zero-address")) {
3913 ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
3914 SyncRes::setECSScopeZeroAddress(Netmask(scopeZero, scopeZero.isIPv4() ? 32 : 128));
3915 }
3916 else {
3917 bool found = false;
3918 for (const auto& addr : g_localQueryAddresses4) {
3919 if (!IsAnyAddress(addr)) {
3920 SyncRes::setECSScopeZeroAddress(Netmask(addr, 32));
3921 found = true;
3922 break;
3923 }
3924 }
3925 if (!found) {
3926 for (const auto& addr : g_localQueryAddresses6) {
3927 if (!IsAnyAddress(addr)) {
3928 SyncRes::setECSScopeZeroAddress(Netmask(addr, 128));
3929 found = true;
3930 break;
3931 }
3932 }
3933 if (!found) {
3934 SyncRes::setECSScopeZeroAddress(Netmask("127.0.0.1/32"));
3935 }
3936 }
3937 }
3938
3939 SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
3940 SyncRes::parseEDNSSubnetAddFor(::arg()["ecs-add-for"]);
3941 g_useIncomingECS = ::arg().mustDo("use-incoming-edns-subnet");
3942
3943 g_XPFAcl.toMasks(::arg()["xpf-allow-from"]);
3944 g_xpfRRCode = ::arg().asNum("xpf-rr-code");
3945
3946 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
3947
3948 g_initialDomainMap = parseAuthAndForwards();
3949
3950 g_latencyStatSize=::arg().asNum("latency-statistic-size");
3951
3952 g_logCommonErrors=::arg().mustDo("log-common-errors");
3953 g_logRPZChanges = ::arg().mustDo("log-rpz-changes");
3954
3955 g_anyToTcp = ::arg().mustDo("any-to-tcp");
3956 g_udpTruncationThreshold = ::arg().asNum("udp-truncation-threshold");
3957
3958 g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
3959
3960 g_numDistributorThreads = ::arg().asNum("distributor-threads");
3961 g_numWorkerThreads = ::arg().asNum("threads");
3962 if (g_numWorkerThreads < 1) {
3963 g_log<<Logger::Warning<<"Asked to run with 0 threads, raising to 1 instead"<<endl;
3964 g_numWorkerThreads = 1;
3965 }
3966
3967 g_numThreads = g_numDistributorThreads + g_numWorkerThreads;
3968 g_maxMThreads = ::arg().asNum("max-mthreads");
3969
3970 g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
3971
3972 g_statisticsInterval = ::arg().asNum("statistics-interval");
3973
3974 {
3975 SuffixMatchNode dontThrottleNames;
3976 vector<string> parts;
3977 stringtok(parts, ::arg()["dont-throttle-names"]);
3978 for (const auto &p : parts) {
3979 dontThrottleNames.add(DNSName(p));
3980 }
3981 g_dontThrottleNames.setState(dontThrottleNames);
3982
3983 NetmaskGroup dontThrottleNetmasks;
3984 stringtok(parts, ::arg()["dont-throttle-netmasks"]);
3985 for (const auto &p : parts) {
3986 dontThrottleNetmasks.addMask(Netmask(p));
3987 }
3988 g_dontThrottleNetmasks.setState(dontThrottleNetmasks);
3989 }
3990
3991 s_balancingFactor = ::arg().asDouble("distribution-load-factor");
3992 if (s_balancingFactor != 0.0 && s_balancingFactor < 1.0) {
3993 s_balancingFactor = 0.0;
3994 g_log<<Logger::Warning<<"Asked to run with a distribution-load-factor below 1.0, disabling it instead"<<endl;
3995 }
3996
3997 #ifdef SO_REUSEPORT
3998 g_reusePort = ::arg().mustDo("reuseport");
3999 #endif
4000
4001 s_threadInfos.resize(g_numDistributorThreads + g_numWorkerThreads + /* handler */ 1);
4002
4003 if (g_reusePort) {
4004 if (g_weDistributeQueries) {
4005 /* first thread is the handler, then distributors */
4006 for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
4007 auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
4008 auto& tcpSockets = s_threadInfos.at(threadId).tcpSockets;
4009 makeUDPServerSockets(deferredAdds);
4010 makeTCPServerSockets(deferredAdds, tcpSockets);
4011 }
4012 }
4013 else {
4014 /* first thread is the handler, there is no distributor here and workers are accepting queries */
4015 for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
4016 auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
4017 auto& tcpSockets = s_threadInfos.at(threadId).tcpSockets;
4018 makeUDPServerSockets(deferredAdds);
4019 makeTCPServerSockets(deferredAdds, tcpSockets);
4020 }
4021 }
4022 }
4023 else {
4024 std::set<int> tcpSockets;
4025 /* we don't have reuseport so we can only open one socket per
4026 listening addr:port and everyone will listen on it */
4027 makeUDPServerSockets(g_deferredAdds);
4028 makeTCPServerSockets(g_deferredAdds, tcpSockets);
4029
4030 /* every listener (so distributor if g_weDistributeQueries, workers otherwise)
4031 needs to listen to the shared sockets */
4032 if (g_weDistributeQueries) {
4033 /* first thread is the handler, then distributors */
4034 for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
4035 s_threadInfos.at(threadId).tcpSockets = tcpSockets;
4036 }
4037 }
4038 else {
4039 /* first thread is the handler, there is no distributor here and workers are accepting queries */
4040 for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
4041 s_threadInfos.at(threadId).tcpSockets = tcpSockets;
4042 }
4043 }
4044 }
4045
4046 #ifdef NOD_ENABLED
4047 // Setup newly observed domain globals
4048 setupNODGlobal();
4049 #endif /* NOD_ENABLED */
4050
4051 int forks;
4052 for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
4053 if(!fork()) // we are child
4054 break;
4055 }
4056
4057 if(::arg().mustDo("daemon")) {
4058 g_log<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
4059 g_log.toConsole(Logger::Critical);
4060 daemonize();
4061 }
4062 signal(SIGUSR1,usr1Handler);
4063 signal(SIGUSR2,usr2Handler);
4064 signal(SIGPIPE,SIG_IGN);
4065
4066 checkOrFixFDS();
4067
4068 #ifdef HAVE_LIBSODIUM
4069 if (sodium_init() == -1) {
4070 g_log<<Logger::Error<<"Unable to initialize sodium crypto library"<<endl;
4071 exit(99);
4072 }
4073 #endif
4074
4075 openssl_thread_setup();
4076 openssl_seed();
4077 /* setup rng before chroot */
4078 dns_random_init();
4079
4080 if(::arg()["server-id"].empty()) {
4081 ::arg().set("server-id") = myHostname;
4082 }
4083
4084 int newgid=0;
4085 if(!::arg()["setgid"].empty())
4086 newgid = strToGID(::arg()["setgid"]);
4087 int newuid=0;
4088 if(!::arg()["setuid"].empty())
4089 newuid = strToUID(::arg()["setuid"]);
4090
4091 Utility::dropGroupPrivs(newuid, newgid);
4092
4093 if (!::arg()["chroot"].empty()) {
4094 #ifdef HAVE_SYSTEMD
4095 char *ns;
4096 ns = getenv("NOTIFY_SOCKET");
4097 if (ns != nullptr) {
4098 g_log<<Logger::Error<<"Unable to chroot when running from systemd. Please disable chroot= or set the 'Type' for this service to 'simple'"<<endl;
4099 exit(1);
4100 }
4101 #endif
4102 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
4103 g_log<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
4104 exit(1);
4105 }
4106 else
4107 g_log<<Logger::Info<<"Chrooted to '"<<::arg()["chroot"]<<"'"<<endl;
4108 }
4109
4110 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
4111 if(!s_pidfname.empty())
4112 unlink(s_pidfname.c_str()); // remove possible old pid file
4113 writePid();
4114
4115 makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
4116
4117 Utility::dropUserPrivs(newuid);
4118 try {
4119 /* we might still have capabilities remaining, for example if we have been started as root
4120 without --setuid (please don't do that) or as an unprivileged user with ambient capabilities
4121 like CAP_NET_BIND_SERVICE.
4122 */
4123 dropCapabilities();
4124 }
4125 catch(const std::exception& e) {
4126 g_log<<Logger::Warning<<e.what()<<endl;
4127 }
4128
4129 startLuaConfigDelayedThreads(delayedLuaThreads, g_luaconfs.getCopy().generation);
4130
4131 makeThreadPipes();
4132
4133 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
4134 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
4135 g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
4136 s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
4137
4138 g_useKernelTimestamp = ::arg().mustDo("protobuf-use-kernel-timestamp");
4139
4140 blacklistStats(StatComponent::API, ::arg()["stats-api-blacklist"]);
4141 blacklistStats(StatComponent::Carbon, ::arg()["stats-carbon-blacklist"]);
4142 blacklistStats(StatComponent::RecControl, ::arg()["stats-rec-control-blacklist"]);
4143 blacklistStats(StatComponent::SNMP, ::arg()["stats-snmp-blacklist"]);
4144
4145 if (::arg().mustDo("snmp-agent")) {
4146 g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
4147 g_snmpAgent->run();
4148 }
4149
4150 int port = ::arg().asNum("udp-source-port-min");
4151 if(port < 1024 || port > 65535){
4152 g_log<<Logger::Error<<"Unable to launch, udp-source-port-min is not a valid port number"<<endl;
4153 exit(99); // this isn't going to fix itself either
4154 }
4155 s_minUdpSourcePort = port;
4156 port = ::arg().asNum("udp-source-port-max");
4157 if(port < 1024 || port > 65535 || port < s_minUdpSourcePort){
4158 g_log<<Logger::Error<<"Unable to launch, udp-source-port-max is not a valid port number or is smaller than udp-source-port-min"<<endl;
4159 exit(99); // this isn't going to fix itself either
4160 }
4161 s_maxUdpSourcePort = port;
4162 std::vector<string> parts {};
4163 stringtok(parts, ::arg()["udp-source-port-avoid"], ", ");
4164 for (const auto &part : parts)
4165 {
4166 port = std::stoi(part);
4167 if(port < 1024 || port > 65535){
4168 g_log<<Logger::Error<<"Unable to launch, udp-source-port-avoid contains an invalid port number: "<<part<<endl;
4169 exit(99); // this isn't going to fix itself either
4170 }
4171 s_avoidUdpSourcePorts.insert(port);
4172 }
4173
4174 unsigned int currentThreadId = 1;
4175 const auto cpusMap = parseCPUMap();
4176
4177 if(g_numThreads == 1) {
4178 g_log<<Logger::Warning<<"Operating unthreaded"<<endl;
4179 #ifdef HAVE_SYSTEMD
4180 sd_notify(0, "READY=1");
4181 #endif
4182
4183 /* This thread handles the web server, carbon, statistics and the control channel */
4184 auto& handlerInfos = s_threadInfos.at(0);
4185 handlerInfos.isHandler = true;
4186 handlerInfos.thread = std::thread(recursorThread, 0, "main");
4187
4188 setCPUMap(cpusMap, currentThreadId, pthread_self());
4189
4190 auto& infos = s_threadInfos.at(currentThreadId);
4191 infos.isListener = true;
4192 infos.isWorker = true;
4193 recursorThread(currentThreadId++, "worker");
4194 }
4195 else {
4196
4197 if (g_weDistributeQueries) {
4198 g_log<<Logger::Warning<<"Launching "<< g_numDistributorThreads <<" distributor threads"<<endl;
4199 for(unsigned int n=0; n < g_numDistributorThreads; ++n) {
4200 auto& infos = s_threadInfos.at(currentThreadId);
4201 infos.isListener = true;
4202 infos.thread = std::thread(recursorThread, currentThreadId++, "distr");
4203
4204 setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
4205 }
4206 }
4207
4208 g_log<<Logger::Warning<<"Launching "<< g_numWorkerThreads <<" worker threads"<<endl;
4209
4210 for(unsigned int n=0; n < g_numWorkerThreads; ++n) {
4211 auto& infos = s_threadInfos.at(currentThreadId);
4212 infos.isListener = g_weDistributeQueries ? false : true;
4213 infos.isWorker = true;
4214 infos.thread = std::thread(recursorThread, currentThreadId++, "worker");
4215
4216 setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
4217 }
4218
4219 #ifdef HAVE_SYSTEMD
4220 sd_notify(0, "READY=1");
4221 #endif
4222
4223 /* This thread handles the web server, carbon, statistics and the control channel */
4224 auto& infos = s_threadInfos.at(0);
4225 infos.isHandler = true;
4226 infos.thread = std::thread(recursorThread, 0, "web+stat");
4227
4228 s_threadInfos.at(0).thread.join();
4229 }
4230 return 0;
4231 }
4232
4233 static void* recursorThread(unsigned int n, const string& threadName)
4234 try
4235 {
4236 t_id=n;
4237 auto& threadInfo = s_threadInfos.at(t_id);
4238
4239 static string threadPrefix = "pdns-r/";
4240 setThreadName(threadPrefix + threadName);
4241
4242 SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
4243 SyncRes::setDomainMap(g_initialDomainMap);
4244 t_allowFrom = g_initialAllowFrom;
4245 t_udpclientsocks = std::unique_ptr<UDPClientSocks>(new UDPClientSocks());
4246 t_tcpClientCounts = std::unique_ptr<tcpClientCounts_t>(new tcpClientCounts_t());
4247 primeHints();
4248
4249 t_packetCache = std::unique_ptr<RecursorPacketCache>(new RecursorPacketCache());
4250
4251 g_log<<Logger::Warning<<"Done priming cache with root hints"<<endl;
4252
4253 #ifdef NOD_ENABLED
4254 if (threadInfo.isWorker)
4255 setupNODThread();
4256 #endif /* NOD_ENABLED */
4257
4258 /* the listener threads handle TCP queries */
4259 if(threadInfo.isWorker || threadInfo.isListener) {
4260 try {
4261 if(!::arg()["lua-dns-script"].empty()) {
4262 t_pdl = std::make_shared<RecursorLua4>();
4263 t_pdl->loadFile(::arg()["lua-dns-script"]);
4264 g_log<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
4265 }
4266 }
4267 catch(std::exception &e) {
4268 g_log<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
4269 _exit(99);
4270 }
4271 }
4272
4273 unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
4274 if(ringsize) {
4275 t_remotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4276 if(g_weDistributeQueries)
4277 t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / g_numDistributorThreads);
4278 else
4279 t_remotes->set_capacity(ringsize);
4280 t_servfailremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4281 t_servfailremotes->set_capacity(ringsize);
4282 t_bogusremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4283 t_bogusremotes->set_capacity(ringsize);
4284 t_largeanswerremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4285 t_largeanswerremotes->set_capacity(ringsize);
4286 t_timeouts = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
4287 t_timeouts->set_capacity(ringsize);
4288
4289 t_queryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4290 t_queryring->set_capacity(ringsize);
4291 t_servfailqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4292 t_servfailqueryring->set_capacity(ringsize);
4293 t_bogusqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
4294 t_bogusqueryring->set_capacity(ringsize);
4295 }
4296
4297 MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
4298 threadInfo.mt = MT.get();
4299
4300 #ifdef HAVE_PROTOBUF
4301 /* start protobuf export threads if needed */
4302 auto luaconfsLocal = g_luaconfs.getLocal();
4303 checkProtobufExport(luaconfsLocal);
4304 checkOutgoingProtobufExport(luaconfsLocal);
4305 #endif /* HAVE_PROTOBUF */
4306 #ifdef HAVE_FSTRM
4307 checkFrameStreamExport(luaconfsLocal);
4308 #endif
4309
4310 PacketID pident;
4311
4312 t_fdm=getMultiplexer();
4313
4314 if(threadInfo.isHandler) {
4315 if(::arg().mustDo("webserver")) {
4316 g_log<<Logger::Warning << "Enabling web server" << endl;
4317 try {
4318 new RecursorWebServer(t_fdm);
4319 }
4320 catch(PDNSException &e) {
4321 g_log<<Logger::Error<<"Exception: "<<e.reason<<endl;
4322 exit(99);
4323 }
4324 }
4325 g_log<<Logger::Info<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
4326 }
4327 else {
4328
4329 t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
4330 t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
4331
4332 if (threadInfo.isListener) {
4333 if (g_reusePort) {
4334 /* then every listener has its own FDs */
4335 for(const auto deferred : threadInfo.deferredAdds) {
4336 t_fdm->addReadFD(deferred.first, deferred.second);
4337 }
4338 }
4339 else {
4340 /* otherwise all listeners are listening on the same ones */
4341 for(const auto deferred : g_deferredAdds) {
4342 t_fdm->addReadFD(deferred.first, deferred.second);
4343 }
4344 }
4345 }
4346 }
4347
4348 registerAllStats();
4349
4350 if(threadInfo.isHandler) {
4351 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
4352 }
4353
4354 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
4355
4356 bool listenOnTCP(true);
4357
4358 time_t last_stat = 0;
4359 time_t last_carbon=0, last_lua_maintenance=0;
4360 time_t carbonInterval=::arg().asNum("carbon-interval");
4361 time_t luaMaintenanceInterval=::arg().asNum("lua-maintenance-interval");
4362 counter.store(0); // used to periodically execute certain tasks
4363 for(;;) {
4364 while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
4365
4366 if(!(counter%500)) {
4367 MT->makeThread(houseKeeping, 0);
4368 }
4369
4370 if(!(counter%55)) {
4371 typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
4372 expired_t expired=t_fdm->getTimeouts(g_now);
4373
4374 for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
4375 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
4376 if(g_logCommonErrors)
4377 g_log<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toStringWithPort() <<endl;
4378 t_fdm->removeReadFD(i->first);
4379 }
4380 }
4381
4382 counter++;
4383
4384 if(threadInfo.isHandler) {
4385 if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
4386 doStats();
4387 last_stat = g_now.tv_sec;
4388 }
4389
4390 Utility::gettimeofday(&g_now, 0);
4391
4392 if((g_now.tv_sec - last_carbon) >= carbonInterval) {
4393 MT->makeThread(doCarbonDump, 0);
4394 last_carbon = g_now.tv_sec;
4395 }
4396 }
4397 if (t_pdl != nullptr) {
4398 // lua-dns-script directive is present, call the maintenance callback if needed
4399 /* remember that the listener threads handle TCP queries */
4400 if (threadInfo.isWorker || threadInfo.isListener) {
4401 // Only on threads processing queries
4402 if(g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
4403 t_pdl->maintenance();
4404 last_lua_maintenance = g_now.tv_sec;
4405 }
4406 }
4407 }
4408
4409 t_fdm->run(&g_now);
4410 // 'run' updates g_now for us
4411
4412 if(threadInfo.isListener) {
4413 if(listenOnTCP) {
4414 if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
4415 for(const auto fd : threadInfo.tcpSockets) {
4416 t_fdm->removeReadFD(fd);
4417 }
4418 listenOnTCP=false;
4419 }
4420 }
4421 else {
4422 if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
4423 for(const auto fd : threadInfo.tcpSockets) {
4424 t_fdm->addReadFD(fd, handleNewTCPQuestion);
4425 }
4426 listenOnTCP=true;
4427 }
4428 }
4429 }
4430 }
4431 }
4432 catch(PDNSException &ae) {
4433 g_log<<Logger::Error<<"Exception: "<<ae.reason<<endl;
4434 return 0;
4435 }
4436 catch(std::exception &e) {
4437 g_log<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
4438 return 0;
4439 }
4440 catch(...) {
4441 g_log<<Logger::Error<<"any other exception in main: "<<endl;
4442 return 0;
4443 }
4444
4445
4446 int main(int argc, char **argv)
4447 {
4448 g_argc = argc;
4449 g_argv = argv;
4450 g_stats.startupTime=time(0);
4451 Utility::srandom();
4452 versionSetProduct(ProductRecursor);
4453 reportBasicTypes();
4454 reportOtherTypes();
4455
4456 int ret = EXIT_SUCCESS;
4457
4458 try {
4459 ::arg().set("stack-size","stack size per mthread")="200000";
4460 ::arg().set("soa-minimum-ttl","Don't change")="0";
4461 ::arg().set("no-shuffle","Don't change")="off";
4462 ::arg().set("local-port","port to listen on")="53";
4463 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
4464 ::arg().setSwitch("non-local-bind", "Enable binding to non-local addresses by using FREEBIND / BINDANY socket options")="no";
4465 ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
4466 ::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
4467 ::arg().set("dnssec-log-bogus", "Log DNSSEC bogus validations")="no";
4468 ::arg().set("signature-inception-skew", "Allow the signature inception to be off by this number of seconds")="60";
4469 ::arg().set("daemon","Operate as a daemon")="no";
4470 ::arg().setSwitch("write-pid","Write a PID file")="yes";
4471 ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="6";
4472 ::arg().set("disable-syslog","Disable logging to syslog, useful when running inside a supervisor that logs stdout")="no";
4473 ::arg().set("log-timestamp","Print timestamps in log lines, useful to disable when running with a tool that timestamps stdout already")="yes";
4474 ::arg().set("log-common-errors","If we should log rather common errors")="no";
4475 ::arg().set("chroot","switch to chroot jail")="";
4476 ::arg().set("setgid","If set, change group id to this gid for more security"
4477 #ifdef HAVE_SYSTEMD
4478 #define SYSTEMD_SETID_MSG ". When running inside systemd, use the User and Group settings in the unit-file!"
4479 SYSTEMD_SETID_MSG
4480 #endif
4481 )="";
4482 ::arg().set("setuid","If set, change user id to this uid for more security"
4483 #ifdef HAVE_SYSTEMD
4484 SYSTEMD_SETID_MSG
4485 #endif
4486 )="";
4487 ::arg().set("network-timeout", "Wait this number of milliseconds for network i/o")="1500";
4488 ::arg().set("threads", "Launch this number of threads")="2";
4489 ::arg().set("distributor-threads", "Launch this number of distributor threads, distributing queries to other threads")="0";
4490 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs!
4491 ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
4492 ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = "";
4493 ::arg().set("api-key", "Static pre-shared authentication key for access to the REST API") = "";
4494 ::arg().setSwitch("webserver", "Start a webserver (for REST API)") = "no";
4495 ::arg().set("webserver-address", "IP Address of webserver to listen on") = "127.0.0.1";
4496 ::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
4497 ::arg().set("webserver-password", "Password required for accessing the webserver") = "";
4498 ::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="127.0.0.1,::1";
4499 ::arg().set("webserver-loglevel", "Amount of logging in the webserver (none, normal, detailed)") = "normal";
4500 ::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
4501 ::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server IP address")="";
4502 ::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
4503 ::arg().set("carbon-namespace", "If set overwrites the first part of the carbon string")="pdns";
4504 ::arg().set("carbon-instance", "If set overwrites the the instance name default")="recursor";
4505
4506 ::arg().set("statistics-interval", "Number of seconds between printing of recursor statistics, 0 to disable")="1800";
4507 ::arg().set("quiet","Suppress logging of questions and answers")="";
4508 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
4509 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
4510 ::arg().set("socket-owner","Owner of socket")="";
4511 ::arg().set("socket-group","Group of socket")="";
4512 ::arg().set("socket-mode", "Permissions for socket")="";
4513
4514 ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+"/pdns-recursor when unset and not chrooted" )="";
4515 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
4516 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
4517 ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
4518 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
4519 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
4520 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
4521 ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
4522 ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
4523 ::arg().set("dont-throttle-names", "Do not throttle nameservers with this name or suffix")="";
4524 ::arg().set("dont-throttle-netmasks", "Do not throttle nameservers with this IP netmask")="";
4525 ::arg().set("hint-file", "If set, load root hints from this file")="";
4526 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
4527 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
4528 ::arg().set("max-cache-bogus-ttl", "maximum number of seconds to keep a Bogus (positive or negative) cached entry in memory")="3600";
4529 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
4530 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
4531 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
4532 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
4533 ::arg().set("server-id", "Returned when queried for 'id.server' TXT or NSID, defaults to hostname, set custom or 'disabled'")="";
4534 ::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
4535 ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
4536 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
4537 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
4538 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
4539 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=DONT_QUERY;
4540 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
4541 ::arg().set("max-tcp-queries-per-connection", "If set, maximum number of TCP queries in a TCP connection")="0";
4542 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
4543 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
4544 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
4545 ::arg().set("lua-config-file", "More powerful configuration options")="";
4546 ::arg().setSwitch("allow-trust-anchor-query", "Allow queries for trustanchor.server CH TXT and negativetrustanchor.server CH TXT")="no";
4547
4548 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
4549 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
4550 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
4551 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
4552 ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
4553 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
4554 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="yes";
4555 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
4556 ::arg().set("lua-maintenance-interval", "Number of seconds between calls to the lua user defined maintenance() function")="1";
4557 ::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
4558 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
4559 ::arg().set("ecs-ipv4-bits", "Number of bits of IPv4 address to pass for EDNS Client Subnet")="24";
4560 ::arg().set("ecs-ipv4-cache-bits", "Maximum number of bits of IPv4 mask to cache ECS response")="24";
4561 ::arg().set("ecs-ipv6-bits", "Number of bits of IPv6 address to pass for EDNS Client Subnet")="56";
4562 ::arg().set("ecs-ipv6-cache-bits", "Maximum number of bits of IPv6 mask to cache ECS response")="56";
4563 ::arg().set("ecs-minimum-ttl-override", "Set under adverse conditions, a minimum TTL for records in ECS-specific answers")="0";
4564 ::arg().set("ecs-cache-limit-ttl", "Minimum TTL to cache ECS response")="0";
4565 ::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
4566 ::arg().set("ecs-add-for", "List of client netmasks for which EDNS Client Subnet will be added")="0.0.0.0/0, ::/0, " LOCAL_NETS_INVERSE;
4567 ::arg().set("ecs-scope-zero-address", "Address to send to whitelisted authoritative servers for incoming queries with ECS prefix-length source of 0")="";
4568 ::arg().setSwitch( "use-incoming-edns-subnet", "Pass along received EDNS Client Subnet information")="no";
4569 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads")="yes";
4570 ::arg().setSwitch( "root-nx-trust", "If set, believe that an NXDOMAIN from the root means the TLD does not exist")="yes";
4571 ::arg().setSwitch( "any-to-tcp","Answer ANY queries with tc=1, shunting to TCP" )="no";
4572 ::arg().setSwitch( "lowercase-outgoing","Force outgoing questions to lowercase")="no";
4573 ::arg().setSwitch("gettag-needs-edns-options", "If EDNS Options should be extracted before calling the gettag() hook")="no";
4574 ::arg().set("udp-truncation-threshold", "Maximum UDP response size before we truncate")="1232";
4575 ::arg().set("edns-outgoing-bufsize", "Outgoing EDNS buffer size")="1232";
4576 ::arg().set("minimum-ttl-override", "Set under adverse conditions, a minimum TTL")="0";
4577 ::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
4578 ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
4579 ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
4580 ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
4581 ::arg().set("protobuf-use-kernel-timestamp", "Compute the latency of queries in protobuf messages by using the timestamp set by the kernel when the query was received (when available)")="";
4582 ::arg().set("distribution-pipe-buffer-size", "Size in bytes of the internal buffer of the pipe used by the distributor to pass incoming queries to a worker thread")="0";
4583
4584 ::arg().set("include-dir","Include *.conf files from this directory")="";
4585 ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
4586
4587 ::arg().setSwitch("reuseport","Enable SO_REUSEPORT allowing multiple recursors processes to listen to 1 address")="no";
4588
4589 ::arg().setSwitch("snmp-agent", "If set, register as an SNMP agent")="no";
4590 ::arg().set("snmp-master-socket", "If set and snmp-agent is set, the socket to use to register to the SNMP master")="";
4591
4592 std::string defaultBlacklistedStats = "cache-bytes, packetcache-bytes, special-memory-usage";
4593 for (size_t idx = 0; idx < 32; idx++) {
4594 defaultBlacklistedStats += ", ecs-v4-response-bits-" + std::to_string(idx + 1);
4595 }
4596 for (size_t idx = 0; idx < 128; idx++) {
4597 defaultBlacklistedStats += ", ecs-v6-response-bits-" + std::to_string(idx + 1);
4598 }
4599 ::arg().set("stats-api-blacklist", "List of statistics that are disabled when retrieving the complete list of statistics via the API")=defaultBlacklistedStats;
4600 ::arg().set("stats-carbon-blacklist", "List of statistics that are prevented from being exported via Carbon")=defaultBlacklistedStats;
4601 ::arg().set("stats-rec-control-blacklist", "List of statistics that are prevented from being exported via rec_control get-all")=defaultBlacklistedStats;
4602 ::arg().set("stats-snmp-blacklist", "List of statistics that are prevented from being exported via SNMP")=defaultBlacklistedStats;
4603
4604 ::arg().set("tcp-fast-open", "Enable TCP Fast Open support on the listening sockets, using the supplied numerical value as the queue size")="0";
4605 ::arg().set("nsec3-max-iterations", "Maximum number of iterations allowed for an NSEC3 record")="2500";
4606
4607 ::arg().set("cpu-map", "Thread to CPU mapping, space separated thread-id=cpu1,cpu2..cpuN pairs")="";
4608
4609 ::arg().setSwitch("log-rpz-changes", "Log additions and removals to RPZ zones at Info level")="no";
4610
4611 ::arg().set("xpf-allow-from","XPF information is only processed from these subnets")="";
4612 ::arg().set("xpf-rr-code","XPF option code to use")="0";
4613
4614 ::arg().set("udp-source-port-min", "Minimum UDP port to bind on")="1024";
4615 ::arg().set("udp-source-port-max", "Maximum UDP port to bind on")="65535";
4616 ::arg().set("udp-source-port-avoid", "List of comma separated UDP port number to avoid")="11211";
4617 ::arg().set("rng", "Specify random number generator to use. Valid values are auto,sodium,openssl,getrandom,arc4random,urandom.")="auto";
4618 ::arg().set("public-suffix-list-file", "Path to the Public Suffix List file, if any")="";
4619 ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
4620 ::arg().setSwitch("qname-minimization", "Use Query Name Minimization")="no";
4621 #ifdef NOD_ENABLED
4622 ::arg().set("new-domain-tracking", "Track newly observed domains (i.e. never seen before).")="no";
4623 ::arg().set("new-domain-log", "Log newly observed domains.")="yes";
4624 ::arg().set("new-domain-lookup", "Perform a DNS lookup newly observed domains as a subdomain of the configured domain")="";
4625 ::arg().set("new-domain-history-dir", "Persist new domain tracking data here to persist between restarts")=string(NODCACHEDIR)+"/nod";
4626 ::arg().set("new-domain-whitelist", "List of domains (and implicitly all subdomains) which will never be considered a new domain")="";
4627 ::arg().set("new-domain-db-size", "Size of the DB used to track new domains in terms of number of cells. Defaults to 67108864")="67108864";
4628 ::arg().set("new-domain-pb-tag", "If protobuf is configured, the tag to use for messages containing newly observed domains. Defaults to 'pdns-nod'")="pdns-nod";
4629 ::arg().set("unique-response-tracking", "Track unique responses (tuple of query name, type and RR).")="no";
4630 ::arg().set("unique-response-log", "Log unique responses")="yes";
4631 ::arg().set("unique-response-history-dir", "Persist unique response tracking data here to persist between restarts")=string(NODCACHEDIR)+"/udr";
4632 ::arg().set("unique-response-db-size", "Size of the DB used to track unique responses in terms of number of cells. Defaults to 67108864")="67108864";
4633 ::arg().set("unique-response-pb-tag", "If protobuf is configured, the tag to use for messages containing unique DNS responses. Defaults to 'pdns-udr'")="pdns-udr";
4634 #endif /* NOD_ENABLED */
4635 ::arg().setCmd("help","Provide a helpful message");
4636 ::arg().setCmd("version","Print version string");
4637 ::arg().setCmd("config","Output blank configuration");
4638 g_log.toConsole(Logger::Info);
4639 ::arg().laxParse(argc,argv); // do a lax parse
4640
4641 string configname=::arg()["config-dir"]+"/recursor.conf";
4642 if(::arg()["config-name"]!="") {
4643 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
4644 s_programname+="-"+::arg()["config-name"];
4645 }
4646 cleanSlashes(configname);
4647
4648 if(!::arg().getCommands().empty()) {
4649 cerr<<"Fatal: non-option on the command line, perhaps a '--setting=123' statement missed the '='?"<<endl;
4650 exit(99);
4651 }
4652
4653 if(::arg().mustDo("config")) {
4654 cout<<::arg().configstring()<<endl;
4655 exit(0);
4656 }
4657
4658 if(!::arg().file(configname.c_str()))
4659 g_log<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
4660
4661 ::arg().parse(argc,argv);
4662
4663 if( !::arg()["chroot"].empty() && !::arg()["api-config-dir"].empty() ) {
4664 g_log<<Logger::Error<<"Using chroot and enabling the API is not possible"<<endl;
4665 exit(EXIT_FAILURE);
4666 }
4667
4668 if (::arg()["socket-dir"].empty()) {
4669 if (::arg()["chroot"].empty())
4670 ::arg().set("socket-dir") = std::string(LOCALSTATEDIR) + "/pdns-recursor";
4671 else
4672 ::arg().set("socket-dir") = "/";
4673 }
4674
4675 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
4676
4677 if(::arg().asNum("threads")==1) {
4678 if (::arg().mustDo("pdns-distributes-queries")) {
4679 g_log<<Logger::Warning<<"Only one thread, no need to distribute queries ourselves"<<endl;
4680 ::arg().set("pdns-distributes-queries")="no";
4681 }
4682 }
4683
4684 if(::arg().mustDo("pdns-distributes-queries") && ::arg().asNum("distributor-threads") <= 0) {
4685 g_log<<Logger::Warning<<"Asked to run with pdns-distributes-queries set but no distributor threads, raising to 1"<<endl;
4686 ::arg().set("distributor-threads")="1";
4687 }
4688
4689 if (!::arg().mustDo("pdns-distributes-queries")) {
4690 ::arg().set("distributor-threads")="0";
4691 }
4692
4693 if(::arg().mustDo("help")) {
4694 cout<<"syntax:"<<endl<<endl;
4695 cout<<::arg().helpstring(::arg()["help"])<<endl;
4696 exit(0);
4697 }
4698 if(::arg().mustDo("version")) {
4699 showProductVersion();
4700 showBuildConfiguration();
4701 exit(0);
4702 }
4703
4704 Logger::Urgency logUrgency = (Logger::Urgency)::arg().asNum("loglevel");
4705
4706 if (logUrgency < Logger::Error)
4707 logUrgency = Logger::Error;
4708 if(!g_quiet && logUrgency < Logger::Info) { // Logger::Info=6, Logger::Debug=7
4709 logUrgency = Logger::Info; // if you do --quiet=no, you need Info to also see the query log
4710 }
4711 g_log.setLoglevel(logUrgency);
4712 g_log.toConsole(logUrgency);
4713
4714 serviceMain(argc, argv);
4715 }
4716 catch(PDNSException &ae) {
4717 g_log<<Logger::Error<<"Exception: "<<ae.reason<<endl;
4718 ret=EXIT_FAILURE;
4719 }
4720 catch(std::exception &e) {
4721 g_log<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
4722 ret=EXIT_FAILURE;
4723 }
4724 catch(...) {
4725 g_log<<Logger::Error<<"any other exception in main: "<<endl;
4726 ret=EXIT_FAILURE;
4727 }
4728
4729 return ret;
4730 }