]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
Merge pull request #6867 from pieterlexis/rec-414-backports
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <netdb.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29
30 #include "recpacketcache.hh"
31 #include "ws-recursor.hh"
32 #include <pthread.h>
33 #include "utility.hh"
34 #include "dns_random.hh"
35 #ifdef HAVE_LIBSODIUM
36 #include <sodium.h>
37 #endif
38 #include "opensslsigners.hh"
39 #include <iostream>
40 #include <errno.h>
41 #include <boost/static_assert.hpp>
42 #include <map>
43 #include <set>
44 #include "recursor_cache.hh"
45 #include "cachecleaner.hh"
46 #include <stdio.h>
47 #include <signal.h>
48 #include <stdlib.h>
49 #include "misc.hh"
50 #include "mtasker.hh"
51 #include <utility>
52 #include "arguments.hh"
53 #include "syncres.hh"
54 #include <fcntl.h>
55 #include <fstream>
56 #include "sortlist.hh"
57 #include "sstuff.hh"
58 #include <boost/tuple/tuple.hpp>
59 #include <boost/tuple/tuple_comparison.hpp>
60 #include <boost/shared_array.hpp>
61 #include <boost/function.hpp>
62 #include <boost/algorithm/string.hpp>
63 #ifdef MALLOC_TRACE
64 #include "malloctrace.hh"
65 #endif
66 #include <netinet/tcp.h>
67 #include "dnsparser.hh"
68 #include "dnswriter.hh"
69 #include "dnsrecords.hh"
70 #include "zoneparser-tng.hh"
71 #include "rec_channel.hh"
72 #include "logger.hh"
73 #include "iputils.hh"
74 #include "mplexer.hh"
75 #include "config.h"
76 #include "lua-recursor4.hh"
77 #include "version.hh"
78 #include "responsestats.hh"
79 #include "secpoll-recursor.hh"
80 #include "dnsname.hh"
81 #include "filterpo.hh"
82 #include "rpzloader.hh"
83 #include "validate-recursor.hh"
84 #include "rec-lua-conf.hh"
85 #include "ednsoptions.hh"
86 #include "gettime.hh"
87
88 #include "rec-protobuf.hh"
89 #include "rec-snmp.hh"
90
91 #ifdef HAVE_SYSTEMD
92 #include <systemd/sd-daemon.h>
93 #endif
94
95 #include "namespaces.hh"
96
97 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
98
99 static thread_local std::shared_ptr<RecursorLua4> t_pdl;
100 static thread_local int t_id = -1;
101 static thread_local std::shared_ptr<Regex> t_traceRegex;
102 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
103
104 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
105 thread_local std::unique_ptr<MemRecursorCache> t_RC;
106 thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
107 thread_local FDMultiplexer* t_fdm{nullptr};
108 thread_local std::unique_ptr<addrringbuf_t> t_remotes, t_servfailremotes, t_largeanswerremotes;
109 thread_local std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > > t_queryring, t_servfailqueryring;
110 thread_local std::shared_ptr<NetmaskGroup> t_allowFrom;
111 #ifdef HAVE_PROTOBUF
112 thread_local std::unique_ptr<boost::uuids::random_generator> t_uuidGenerator;
113 #endif
114 __thread struct timeval g_now; // timestamp, updated (too) frequently
115
116 // for communicating with our threads
117 struct ThreadPipeSet
118 {
119 int writeToThread;
120 int readToThread;
121 int writeFromThread;
122 int readFromThread;
123 int writeQueriesToThread; // this one is non-blocking
124 int readQueriesToThread;
125 };
126
127 /* the TID of the thread handling the web server, carbon, statistics and the control channel */
128 static const int s_handlerThreadID = -1;
129 /* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries
130 to the other threads */
131 static const int s_distributorThreadID = 0;
132
133 typedef std::map<int, std::set<int>> tcpListenSockets_t;
134 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
135
136 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
137
138 static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
139 static vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
140 static tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
141 static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
142 static std::unordered_map<unsigned int, deferredAdd_t> deferredAdds;
143 static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
144 static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
145 static AtomicCounter counter;
146 static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads needs this to be setup
147 static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setup with this
148 static size_t g_tcpMaxQueriesPerConn;
149 static size_t s_maxUDPQueriesPerRound;
150 static uint64_t g_latencyStatSize;
151 static uint32_t g_disthashseed;
152 static unsigned int g_maxTCPPerClient;
153 static unsigned int g_networkTimeoutMsec;
154 static unsigned int g_maxMThreads;
155 static unsigned int g_numWorkerThreads;
156 static int g_tcpTimeout;
157 static uint16_t g_udpTruncationThreshold;
158 static std::atomic<bool> statsWanted;
159 static std::atomic<bool> g_quiet;
160 static bool g_logCommonErrors;
161 static bool g_anyToTcp;
162 static bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
163 static bool g_reusePort{false};
164 static bool g_useOneSocketPerThread;
165 static bool g_gettagNeedsEDNSOptions{false};
166 static time_t g_statisticsInterval;
167 static bool g_useIncomingECS;
168 std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
169
170 RecursorControlChannel s_rcc; // only active in thread 0
171 RecursorStats g_stats;
172 string s_programname="pdns_recursor";
173 string s_pidfname;
174 bool g_lowercaseOutgoing;
175 unsigned int g_numThreads;
176 uint16_t g_outgoingEDNSBufsize;
177 bool g_logRPZChanges{false};
178
179 #define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
180 // Bad Nets taken from both:
181 // http://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
182 // and
183 // http://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
184 // where such a network may not be considered a valid destination
185 #define BAD_NETS "0.0.0.0/8, 192.0.0.0/24, 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24, 240.0.0.0/4, ::/96, ::ffff:0:0/96, 100::/64, 2001:db8::/32"
186 #define DONT_QUERY LOCAL_NETS ", " BAD_NETS
187
188 //! used to send information to a newborn mthread
189 struct DNSComboWriter {
190 DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(true, data, len), d_now(now),
191 d_tcp(false), d_socket(-1)
192 {}
193 MOADNSParser d_mdp;
194 void setRemote(const ComboAddress* sa)
195 {
196 d_remote=*sa;
197 }
198
199 void setLocal(const ComboAddress& sa)
200 {
201 d_local=sa;
202 }
203
204
205 void setSocket(int sock)
206 {
207 d_socket=sock;
208 }
209
210 string getRemote() const
211 {
212 return d_remote.toString();
213 }
214
215 struct timeval d_now;
216 ComboAddress d_remote, d_local;
217 #ifdef HAVE_PROTOBUF
218 boost::uuids::uuid d_uuid;
219 string d_requestorId;
220 string d_deviceId;
221 #endif
222 EDNSSubnetOpts d_ednssubnet;
223 bool d_ecsFound{false};
224 bool d_ecsParsed{false};
225 bool d_tcp;
226 int d_socket;
227 unsigned int d_tag{0};
228 uint32_t d_qhash{0};
229 string d_query;
230 shared_ptr<TCPConnection> d_tcpConnection;
231 vector<pair<uint16_t, string> > d_ednsOpts;
232 std::vector<std::string> d_policyTags;
233 LuaContext::LuaObject d_data;
234 uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
235 bool d_variable{false};
236 };
237
238 MT_t* getMT()
239 {
240 return MT ? MT.get() : nullptr;
241 }
242
243 ArgvMap &arg()
244 {
245 static ArgvMap theArg;
246 return theArg;
247 }
248
249 int getRecursorThreadId()
250 {
251 return t_id;
252 }
253
254 int getMTaskerTID()
255 {
256 return MT->getTid();
257 }
258
259 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
260
261 // -1 is error, 0 is timeout, 1 is success
262 int asendtcp(const string& data, Socket* sock)
263 {
264 PacketID pident;
265 pident.sock=sock;
266 pident.outMSG=data;
267
268 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
269 string packet;
270
271 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
272
273 if(!ret || ret==-1) { // timeout
274 t_fdm->removeWriteFD(sock->getHandle());
275 }
276 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
277 return -1;
278 }
279 return ret;
280 }
281
282 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
283
284 // -1 is error, 0 is timeout, 1 is success
285 int arecvtcp(string& data, size_t len, Socket* sock, bool incompleteOkay)
286 {
287 data.clear();
288 PacketID pident;
289 pident.sock=sock;
290 pident.inNeeded=len;
291 pident.inIncompleteOkay=incompleteOkay;
292 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
293
294 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
295 if(!ret || ret==-1) { // timeout
296 t_fdm->removeReadFD(sock->getHandle());
297 }
298 else if(data.empty()) {// error, EOF or other
299 return -1;
300 }
301
302 return ret;
303 }
304
305 static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
306 {
307 PacketID pident=*any_cast<PacketID>(&var);
308 char resp[512];
309 ssize_t ret=recv(fd, resp, sizeof(resp), 0);
310 t_fdm->removeReadFD(fd);
311 if(ret >= 0) {
312 string data(resp, (size_t) ret);
313 MT->sendEvent(pident, &data);
314 }
315 else {
316 string empty;
317 MT->sendEvent(pident, &empty);
318 // cerr<<"Had some kind of error: "<<ret<<", "<<strerror(errno)<<endl;
319 }
320 }
321 string GenUDPQueryResponse(const ComboAddress& dest, const string& query)
322 {
323 Socket s(dest.sin4.sin_family, SOCK_DGRAM);
324 s.setNonBlocking();
325 ComboAddress local = getQueryLocalAddress(dest.sin4.sin_family, 0);
326
327 s.bind(local);
328 s.connect(dest);
329 s.send(query);
330
331 PacketID pident;
332 pident.sock=&s;
333 pident.type=0;
334 t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
335
336 string data;
337
338 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
339
340 if(!ret || ret==-1) { // timeout
341 t_fdm->removeReadFD(s.getHandle());
342 }
343 else if(data.empty()) {// error, EOF or other
344 // we could special case this
345 return data;
346 }
347 return data;
348 }
349
350 //! pick a random query local address
351 ComboAddress getQueryLocalAddress(int family, uint16_t port)
352 {
353 ComboAddress ret;
354 if(family==AF_INET) {
355 if(g_localQueryAddresses4.empty())
356 ret = g_local4;
357 else
358 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
359 ret.sin4.sin_port = htons(port);
360 }
361 else {
362 if(g_localQueryAddresses6.empty())
363 ret = g_local6;
364 else
365 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
366
367 ret.sin6.sin6_port = htons(port);
368 }
369 return ret;
370 }
371
372 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
373
374 static void setSocketBuffer(int fd, int optname, uint32_t size)
375 {
376 uint32_t psize=0;
377 socklen_t len=sizeof(psize);
378
379 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
380 L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
381 return;
382 }
383
384 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
385 L<<Logger::Error<<"Unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
386 }
387
388
389 static void setSocketReceiveBuffer(int fd, uint32_t size)
390 {
391 setSocketBuffer(fd, SO_RCVBUF, size);
392 }
393
394 static void setSocketSendBuffer(int fd, uint32_t size)
395 {
396 setSocketBuffer(fd, SO_SNDBUF, size);
397 }
398
399
400 // you can ask this class for a UDP socket to send a query from
401 // this socket is not yours, don't even think about deleting it
402 // but after you call 'returnSocket' on it, don't assume anything anymore
403 class UDPClientSocks
404 {
405 unsigned int d_numsocks;
406 public:
407 UDPClientSocks() : d_numsocks(0)
408 {
409 }
410
411 typedef set<int> socks_t;
412 socks_t d_socks;
413
414 // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
415 int getSocket(const ComboAddress& toaddr, int* fd)
416 {
417 *fd=makeClientSocket(toaddr.sin4.sin_family);
418 if(*fd < 0) // temporary error - receive exception otherwise
419 return -2;
420
421 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
422 int err = errno;
423 // returnSocket(*fd);
424 try {
425 closesocket(*fd);
426 }
427 catch(const PDNSException& e) {
428 L<<Logger::Error<<"Error closing UDP socket after connect() failed: "<<e.reason<<endl;
429 }
430
431 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
432 return -2;
433 return -1;
434 }
435
436 d_socks.insert(*fd);
437 d_numsocks++;
438 return 0;
439 }
440
441 void returnSocket(int fd)
442 {
443 socks_t::iterator i=d_socks.find(fd);
444 if(i==d_socks.end()) {
445 throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
446 }
447 returnSocketLocked(i);
448 }
449
450 // return a socket to the pool, or simply erase it
451 void returnSocketLocked(socks_t::iterator& i)
452 {
453 if(i==d_socks.end()) {
454 throw PDNSException("Trying to return a socket not in the pool");
455 }
456 try {
457 t_fdm->removeReadFD(*i);
458 }
459 catch(FDMultiplexerException& e) {
460 // we sometimes return a socket that has not yet been assigned to t_fdm
461 }
462 try {
463 closesocket(*i);
464 }
465 catch(const PDNSException& e) {
466 L<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
467 }
468
469 d_socks.erase(i++);
470 --d_numsocks;
471 }
472
473 // returns -1 for errors which might go away, throws for ones that won't
474 static int makeClientSocket(int family)
475 {
476 int ret=socket(family, SOCK_DGRAM, 0 ); // turns out that setting CLO_EXEC and NONBLOCK from here is not a performance win on Linux (oddly enough)
477
478 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
479 return ret;
480
481 if(ret<0)
482 throw PDNSException("Making a socket for resolver (family = "+std::to_string(family)+"): "+stringerror());
483
484 // setCloseOnExec(ret); // we're not going to exec
485
486 int tries=10;
487 ComboAddress sin;
488 while(--tries) {
489 uint16_t port;
490
491 if(tries==1) // fall back to kernel 'random'
492 port = 0;
493 else
494 port = 1025 + dns_random(64510);
495
496 sin=getQueryLocalAddress(family, port); // does htons for us
497
498 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
499 break;
500 }
501 if(!tries)
502 throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
503
504 setNonBlocking(ret);
505 return ret;
506 }
507 };
508
509 static thread_local std::unique_ptr<UDPClientSocks> t_udpclientsocks;
510
511 /* these two functions are used by LWRes */
512 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
513 int asendto(const char *data, size_t len, int flags,
514 const ComboAddress& toaddr, uint16_t id, const DNSName& domain, uint16_t qtype, int* fd)
515 {
516
517 PacketID pident;
518 pident.domain = domain;
519 pident.remote = toaddr;
520 pident.type = qtype;
521
522 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
523 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
524
525 for(; chain.first != chain.second; chain.first++) {
526 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
527 /*
528 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
529 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
530 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
531 */
532 chain.first->key.chain.insert(id); // we can chain
533 *fd=-1; // gets used in waitEvent / sendEvent later on
534 return 1;
535 }
536 }
537
538 int ret=t_udpclientsocks->getSocket(toaddr, fd);
539 if(ret < 0)
540 return ret;
541
542 pident.fd=*fd;
543 pident.id=id;
544
545 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
546 ret = send(*fd, data, len, 0);
547
548 int tmp = errno;
549
550 if(ret < 0)
551 t_udpclientsocks->returnSocket(*fd);
552
553 errno = tmp; // this is for logging purposes only
554 return ret;
555 }
556
557 // -1 is error, 0 is timeout, 1 is success
558 int arecvfrom(char *data, size_t len, int flags, const ComboAddress& fromaddr, size_t *d_len,
559 uint16_t id, const DNSName& domain, uint16_t qtype, int fd, struct timeval* now)
560 {
561 static optional<unsigned int> nearMissLimit;
562 if(!nearMissLimit)
563 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
564
565 PacketID pident;
566 pident.fd=fd;
567 pident.id=id;
568 pident.domain=domain;
569 pident.type = qtype;
570 pident.remote=fromaddr;
571
572 string packet;
573 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
574
575 if(ret > 0) {
576 if(packet.empty()) // means "error"
577 return -1;
578
579 *d_len=packet.size();
580 memcpy(data,packet.c_str(),min(len,*d_len));
581 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
582 L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
583 g_stats.spoofCount++;
584 return -1;
585 }
586 }
587 else {
588 if(fd >= 0)
589 t_udpclientsocks->returnSocket(fd);
590 }
591 return ret;
592 }
593
594 static void writePid(void)
595 {
596 if(!::arg().mustDo("write-pid"))
597 return;
598 ofstream of(s_pidfname.c_str(), std::ios_base::app);
599 if(of)
600 of<< Utility::getpid() <<endl;
601 else
602 L<<Logger::Error<<"Writing pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
603 }
604
605 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : d_remote(addr), d_fd(fd)
606 {
607 ++s_currentConnections;
608 (*t_tcpClientCounts)[d_remote]++;
609 }
610
611 TCPConnection::~TCPConnection()
612 {
613 try {
614 if(closesocket(d_fd) < 0)
615 L<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
616 }
617 catch(const PDNSException& e) {
618 L<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
619 }
620
621 if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
622 t_tcpClientCounts->erase(d_remote);
623 --s_currentConnections;
624 }
625
626 AtomicCounter TCPConnection::s_currentConnections;
627
628 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
629
630 // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
631 static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
632 {
633 if(packetsize > 1000 && t_largeanswerremotes)
634 t_largeanswerremotes->push_back(remote);
635 switch(res) {
636 case RCode::ServFail:
637 if(t_servfailremotes) {
638 t_servfailremotes->push_back(remote);
639 if(query && t_servfailqueryring) // packet cache
640 t_servfailqueryring->push_back(make_pair(*query, qtype));
641 }
642 g_stats.servFails++;
643 break;
644 case RCode::NXDomain:
645 g_stats.nxDomains++;
646 break;
647 case RCode::NoError:
648 g_stats.noErrors++;
649 break;
650 }
651 }
652
653 static string makeLoginfo(DNSComboWriter* dc)
654 try
655 {
656 return "("+dc->d_mdp.d_qname.toLogString()+"/"+DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)+" from "+(dc->d_remote.toString())+")";
657 }
658 catch(...)
659 {
660 return "Exception making error message for exception";
661 }
662
663 #ifdef HAVE_PROTOBUF
664 static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId)
665 {
666 Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
667 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
668 RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
669 message.setEDNSSubnet(ednssubnet, ednssubnet.isIpv4() ? maskV4 : maskV6);
670 message.setRequestorId(requestorId);
671 message.setDeviceId(deviceId);
672
673 if (!policyTags.empty()) {
674 message.setPolicyTags(policyTags);
675 }
676
677 // cerr <<message.toDebugString()<<endl;
678 std::string str;
679 message.serialize(str);
680 logger->queueData(str);
681 }
682
683 static void protobufLogResponse(const std::shared_ptr<RemoteLogger>& logger, const RecProtoBufMessage& message)
684 {
685 // cerr <<message.toDebugString()<<endl;
686 std::string str;
687 message.serialize(str);
688 logger->queueData(str);
689 }
690 #endif
691
692 /**
693 * Chases the CNAME provided by the PolicyCustom RPZ policy.
694 *
695 * @param spoofed: The DNSRecord that was created by the policy, should already be added to ret
696 * @param qtype: The QType of the original query
697 * @param sr: A SyncRes
698 * @param res: An integer that will contain the RCODE of the lookup we do
699 * @param ret: A vector of DNSRecords where the result of the CNAME chase should be appended to
700 */
701 static void handleRPZCustom(const DNSRecord& spoofed, const QType& qtype, SyncRes& sr, int& res, vector<DNSRecord>& ret)
702 {
703 if (spoofed.d_type == QType::CNAME) {
704 bool oldWantsRPZ = sr.getWantsRPZ();
705 sr.setWantsRPZ(false);
706 vector<DNSRecord> ans;
707 res = sr.beginResolve(DNSName(spoofed.d_content->getZoneRepresentation()), qtype, 1, ans);
708 for (const auto& rec : ans) {
709 if(rec.d_place == DNSResourceRecord::ANSWER) {
710 ret.push_back(rec);
711 }
712 }
713 // Reset the RPZ state of the SyncRes
714 sr.setWantsRPZ(oldWantsRPZ);
715 }
716 }
717
718 static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, uint32_t ttlCap, const uint16_t maxAnswerSize)
719 {
720 pw.startRecord(rec.d_name, rec.d_type, (rec.d_ttl > ttlCap ? ttlCap : rec.d_ttl), rec.d_class, rec.d_place);
721
722 if(rec.d_type != QType::OPT) // their TTL ain't real
723 minTTL = min(minTTL, rec.d_ttl);
724
725 rec.d_content->toPacket(pw);
726 if(pw.size() > static_cast<size_t>(maxAnswerSize)) {
727 pw.rollback();
728 if(rec.d_place != DNSResourceRecord::ADDITIONAL) {
729 pw.getHeader()->tc=1;
730 pw.truncate();
731 }
732 return false;
733 }
734
735 return true;
736 }
737
738 static void startDoResolve(void *p)
739 {
740 DNSComboWriter* dc=(DNSComboWriter *)p;
741 try {
742 if (t_queryring)
743 t_queryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
744
745 uint16_t maxanswersize = dc->d_tcp ? 65535 : min(static_cast<uint16_t>(512), g_udpTruncationThreshold);
746 EDNSOpts edo;
747 bool haveEDNS=false;
748 if(getEDNSOpts(dc->d_mdp, &edo)) {
749 if(!dc->d_tcp) {
750 /* rfc6891 6.2.3:
751 "Values lower than 512 MUST be treated as equal to 512."
752 */
753 maxanswersize = min(static_cast<uint16_t>(edo.d_packetsize >= 512 ? edo.d_packetsize : 512), g_udpTruncationThreshold);
754 }
755 dc->d_ednsOpts = edo.d_options;
756 haveEDNS=true;
757
758 if (g_useIncomingECS && !dc->d_ecsParsed) {
759 for (const auto& o : edo.d_options) {
760 if (o.first == EDNSOptionCode::ECS) {
761 dc->d_ecsFound = getEDNSSubnetOptsFromString(o.second, &dc->d_ednssubnet);
762 break;
763 }
764 }
765 }
766 }
767 /* perhaps there was no EDNS or no ECS but by now we looked */
768 dc->d_ecsParsed = true;
769 vector<DNSRecord> ret;
770 vector<uint8_t> packet;
771
772 auto luaconfsLocal = g_luaconfs.getLocal();
773 // Used to tell syncres later on if we should apply NSDNAME and NSIP RPZ triggers for this query
774 bool wantsRPZ(true);
775 RecProtoBufMessage pbMessage(RecProtoBufMessage::Response);
776 #ifdef HAVE_PROTOBUF
777 if (luaconfsLocal->protobufServer) {
778 Netmask requestorNM(dc->d_remote, dc->d_remote.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
779 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
780 pbMessage.update(dc->d_uuid, &requestor, &dc->d_local, dc->d_tcp, dc->d_mdp.d_header.id);
781 pbMessage.setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
782 pbMessage.setQuestion(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
783 }
784 #endif /* HAVE_PROTOBUF */
785
786 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
787
788 pw.getHeader()->aa=0;
789 pw.getHeader()->ra=1;
790 pw.getHeader()->qr=1;
791 pw.getHeader()->tc=0;
792 pw.getHeader()->id=dc->d_mdp.d_header.id;
793 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
794 pw.getHeader()->cd=dc->d_mdp.d_header.cd;
795
796 /* This is the lowest TTL seen in the records of the response,
797 so we can't cache it for longer than this value.
798 If we have a TTL cap, this value can't be larger than the
799 cap no matter what. */
800 uint32_t minTTL = dc->d_ttlCap;
801
802 SyncRes sr(dc->d_now);
803
804 bool DNSSECOK=false;
805 if(t_pdl) {
806 sr.setLuaEngine(t_pdl);
807 }
808 sr.d_requestor=dc->d_remote; // ECS needs this too
809 if(g_dnssecmode != DNSSECMode::Off) {
810 sr.setDoDNSSEC(true);
811
812 // Does the requestor want DNSSEC records?
813 if(edo.d_Z & EDNSOpts::DNSSECOK) {
814 DNSSECOK=true;
815 g_stats.dnssecQueries++;
816 }
817 } else {
818 // Ignore the client-set CD flag
819 pw.getHeader()->cd=0;
820 }
821 sr.setDNSSECValidationRequested(g_dnssecmode == DNSSECMode::ValidateAll || g_dnssecmode==DNSSECMode::ValidateForLog || ((dc->d_mdp.d_header.ad || DNSSECOK) && g_dnssecmode==DNSSECMode::Process));
822
823 #ifdef HAVE_PROTOBUF
824 sr.setInitialRequestId(dc->d_uuid);
825 #endif
826
827 if (g_useIncomingECS) {
828 sr.setIncomingECSFound(dc->d_ecsFound);
829 if (dc->d_ecsFound) {
830 sr.setIncomingECS(dc->d_ednssubnet);
831 }
832 }
833
834 bool tracedQuery=false; // we could consider letting Lua know about this too
835 bool variableAnswer = dc->d_variable;
836 bool shouldNotValidate = false;
837
838 /* preresolve expects res (dq.rcode) to be set to RCode::NoError by default */
839 int res = RCode::NoError;
840 DNSFilterEngine::Policy appliedPolicy;
841 DNSRecord spoofed;
842 RecursorLua4::DNSQuestion dq(dc->d_remote, dc->d_local, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ);
843 dq.ednsFlags = &edo.d_Z;
844 dq.ednsOptions = &dc->d_ednsOpts;
845 dq.tag = dc->d_tag;
846 dq.discardedPolicies = &sr.d_discardedPolicies;
847 dq.policyTags = &dc->d_policyTags;
848 dq.appliedPolicy = &appliedPolicy;
849 dq.currentRecords = &ret;
850 dq.dh = &dc->d_mdp.d_header;
851 dq.data = dc->d_data;
852 #ifdef HAVE_PROTOBUF
853 dq.requestorId = dc->d_requestorId;
854 dq.deviceId = dc->d_deviceId;
855 #endif
856
857 if(dc->d_mdp.d_qtype==QType::ANY && !dc->d_tcp && g_anyToTcp) {
858 pw.getHeader()->tc = 1;
859 res = 0;
860 variableAnswer = true;
861 goto sendit;
862 }
863
864 if(t_traceRegex && t_traceRegex->match(dc->d_mdp.d_qname.toString())) {
865 sr.setLogMode(SyncRes::Store);
866 tracedQuery=true;
867 }
868
869
870 if(!g_quiet || tracedQuery) {
871 L<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
872 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote();
873 if(!dc->d_ednssubnet.source.empty()) {
874 L<<" (ecs "<<dc->d_ednssubnet.source.toString()<<")";
875 }
876 L<<endl;
877 }
878
879 sr.setId(MT->getTid());
880 if(!dc->d_mdp.d_header.rd)
881 sr.setCacheOnly();
882
883 if (t_pdl) {
884 t_pdl->prerpz(dq, res);
885 }
886
887 // Check if the query has a policy attached to it
888 if (wantsRPZ) {
889 appliedPolicy = luaconfsLocal->dfe.getQueryPolicy(dc->d_mdp.d_qname, dc->d_remote, sr.d_discardedPolicies);
890 }
891
892 // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
893 if(!t_pdl || !t_pdl->preresolve(dq, res)) {
894
895 sr.setWantsRPZ(wantsRPZ);
896 if(wantsRPZ) {
897 switch(appliedPolicy.d_kind) {
898 case DNSFilterEngine::PolicyKind::NoAction:
899 break;
900 case DNSFilterEngine::PolicyKind::Drop:
901 g_stats.policyDrops++;
902 g_stats.policyResults[appliedPolicy.d_kind]++;
903 delete dc;
904 dc=0;
905 return;
906 case DNSFilterEngine::PolicyKind::NXDOMAIN:
907 g_stats.policyResults[appliedPolicy.d_kind]++;
908 res=RCode::NXDomain;
909 goto haveAnswer;
910 case DNSFilterEngine::PolicyKind::NODATA:
911 g_stats.policyResults[appliedPolicy.d_kind]++;
912 res=RCode::NoError;
913 goto haveAnswer;
914 case DNSFilterEngine::PolicyKind::Custom:
915 g_stats.policyResults[appliedPolicy.d_kind]++;
916 res=RCode::NoError;
917 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
918 ret.push_back(spoofed);
919 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
920 goto haveAnswer;
921 case DNSFilterEngine::PolicyKind::Truncate:
922 if(!dc->d_tcp) {
923 g_stats.policyResults[appliedPolicy.d_kind]++;
924 res=RCode::NoError;
925 pw.getHeader()->tc=1;
926 goto haveAnswer;
927 }
928 break;
929 }
930 }
931
932 // Query got not handled for QNAME Policy reasons, now actually go out to find an answer
933 try {
934 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
935 shouldNotValidate = sr.wasOutOfBand();
936 }
937 catch(ImmediateServFailException &e) {
938 if(g_logCommonErrors)
939 L<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during resolve of '"<<dc->d_mdp.d_qname<<"' because: "<<e.reason<<endl;
940 res = RCode::ServFail;
941 }
942
943 dq.validationState = sr.getValidationState();
944
945 // During lookup, an NSDNAME or NSIP trigger was hit in RPZ
946 if (res == -2) { // XXX This block should be macro'd, it is repeated post-resolve.
947 appliedPolicy = sr.d_appliedPolicy;
948 g_stats.policyResults[appliedPolicy.d_kind]++;
949 switch(appliedPolicy.d_kind) {
950 case DNSFilterEngine::PolicyKind::NoAction: // This can never happen
951 throw PDNSException("NoAction policy returned while a NSDNAME or NSIP trigger was hit");
952 case DNSFilterEngine::PolicyKind::Drop:
953 g_stats.policyDrops++;
954 delete dc;
955 dc=0;
956 return;
957 case DNSFilterEngine::PolicyKind::NXDOMAIN:
958 ret.clear();
959 res=RCode::NXDomain;
960 goto haveAnswer;
961
962 case DNSFilterEngine::PolicyKind::NODATA:
963 ret.clear();
964 res=RCode::NoError;
965 goto haveAnswer;
966
967 case DNSFilterEngine::PolicyKind::Truncate:
968 if(!dc->d_tcp) {
969 ret.clear();
970 res=RCode::NoError;
971 pw.getHeader()->tc=1;
972 goto haveAnswer;
973 }
974 break;
975
976 case DNSFilterEngine::PolicyKind::Custom:
977 ret.clear();
978 res=RCode::NoError;
979 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
980 ret.push_back(spoofed);
981 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
982 goto haveAnswer;
983 }
984 }
985
986 if (wantsRPZ) {
987 appliedPolicy = luaconfsLocal->dfe.getPostPolicy(ret, sr.d_discardedPolicies);
988 }
989
990 if(t_pdl) {
991 if(res == RCode::NoError) {
992 auto i=ret.cbegin();
993 for(; i!= ret.cend(); ++i)
994 if(i->d_type == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
995 break;
996 if(i == ret.cend() && t_pdl->nodata(dq, res))
997 shouldNotValidate = true;
998
999 }
1000 else if(res == RCode::NXDomain && t_pdl->nxdomain(dq, res))
1001 shouldNotValidate = true;
1002
1003 if(t_pdl->postresolve(dq, res))
1004 shouldNotValidate = true;
1005 }
1006
1007 if (wantsRPZ) { //XXX This block is repeated, see above
1008 g_stats.policyResults[appliedPolicy.d_kind]++;
1009 switch(appliedPolicy.d_kind) {
1010 case DNSFilterEngine::PolicyKind::NoAction:
1011 break;
1012 case DNSFilterEngine::PolicyKind::Drop:
1013 g_stats.policyDrops++;
1014 delete dc;
1015 dc=0;
1016 return;
1017 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1018 ret.clear();
1019 res=RCode::NXDomain;
1020 goto haveAnswer;
1021
1022 case DNSFilterEngine::PolicyKind::NODATA:
1023 ret.clear();
1024 res=RCode::NoError;
1025 goto haveAnswer;
1026
1027 case DNSFilterEngine::PolicyKind::Truncate:
1028 if(!dc->d_tcp) {
1029 ret.clear();
1030 res=RCode::NoError;
1031 pw.getHeader()->tc=1;
1032 goto haveAnswer;
1033 }
1034 break;
1035
1036 case DNSFilterEngine::PolicyKind::Custom:
1037 ret.clear();
1038 res=RCode::NoError;
1039 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
1040 ret.push_back(spoofed);
1041 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
1042 goto haveAnswer;
1043 }
1044 }
1045 }
1046 haveAnswer:;
1047 if(res == PolicyDecision::DROP) {
1048 g_stats.policyDrops++;
1049 delete dc;
1050 dc=0;
1051 return;
1052 }
1053 if(tracedQuery || res == -1 || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
1054 {
1055 string trace(sr.getTrace());
1056 if(!trace.empty()) {
1057 vector<string> lines;
1058 boost::split(lines, trace, boost::is_any_of("\n"));
1059 for(const string& line : lines) {
1060 if(!line.empty())
1061 L<<Logger::Warning<< line << endl;
1062 }
1063 }
1064 }
1065
1066 if(res == -1) {
1067 pw.getHeader()->rcode=RCode::ServFail;
1068 // no commit here, because no record
1069 g_stats.servFails++;
1070 }
1071 else {
1072 pw.getHeader()->rcode=res;
1073
1074 // Does the validation mode or query demand validation?
1075 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1076 try {
1077 if(sr.doLog()) {
1078 L<<Logger::Warning<<"Starting validation of answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<endl;
1079 }
1080
1081 auto state = sr.getValidationState();
1082
1083 if(state == Secure) {
1084 if(sr.doLog()) {
1085 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates correctly"<<endl;
1086 }
1087
1088 // Is the query source interested in the value of the ad-bit?
1089 if (dc->d_mdp.d_header.ad || DNSSECOK)
1090 pw.getHeader()->ad=1;
1091 }
1092 else if(state == Insecure) {
1093 if(sr.doLog()) {
1094 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates as Insecure"<<endl;
1095 }
1096
1097 pw.getHeader()->ad=0;
1098 }
1099 else if(state == Bogus) {
1100 if(g_dnssecLogBogus || sr.doLog() || g_dnssecmode == DNSSECMode::ValidateForLog) {
1101 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates as Bogus"<<endl;
1102 }
1103
1104 // Does the query or validation mode sending out a SERVFAIL on validation errors?
1105 if(!pw.getHeader()->cd && (g_dnssecmode == DNSSECMode::ValidateAll || dc->d_mdp.d_header.ad || DNSSECOK)) {
1106 if(sr.doLog()) {
1107 L<<Logger::Warning<<"Sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" because recursor or query demands it for Bogus results"<<endl;
1108 }
1109
1110 pw.getHeader()->rcode=RCode::ServFail;
1111 goto sendit;
1112 } else {
1113 if(sr.doLog()) {
1114 L<<Logger::Warning<<"Not sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" Bogus validation since neither config nor query demands this"<<endl;
1115 }
1116 }
1117 }
1118 }
1119 catch(ImmediateServFailException &e) {
1120 if(g_logCommonErrors)
1121 L<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during validation of '"<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<"' because: "<<e.reason<<endl;
1122 pw.getHeader()->rcode=RCode::ServFail;
1123 goto sendit;
1124 }
1125 }
1126
1127 if(ret.size()) {
1128 orderAndShuffle(ret);
1129 if(auto sl = luaconfsLocal->sortlist.getOrderCmp(dc->d_remote)) {
1130 stable_sort(ret.begin(), ret.end(), *sl);
1131 variableAnswer=true;
1132 }
1133 }
1134
1135 bool needCommit = false;
1136 for(auto i=ret.cbegin(); i!=ret.cend(); ++i) {
1137 if( ! DNSSECOK &&
1138 ( i->d_type == QType::NSEC3 ||
1139 (
1140 ( i->d_type == QType::RRSIG || i->d_type==QType::NSEC ) &&
1141 (
1142 ( dc->d_mdp.d_qtype != i->d_type && dc->d_mdp.d_qtype != QType::ANY ) ||
1143 i->d_place != DNSResourceRecord::ANSWER
1144 )
1145 )
1146 )
1147 ) {
1148 continue;
1149 }
1150
1151 if (!addRecordToPacket(pw, *i, minTTL, dc->d_ttlCap, maxanswersize)) {
1152 needCommit = false;
1153 break;
1154 }
1155 needCommit = true;
1156
1157 #ifdef HAVE_PROTOBUF
1158 if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) {
1159 pbMessage.addRR(*i);
1160 }
1161 #endif
1162 }
1163 if(needCommit)
1164 pw.commit();
1165 }
1166 sendit:;
1167
1168 if (haveEDNS) {
1169 /* we try to add the EDNS OPT RR even for truncated answers,
1170 as rfc6891 states:
1171 "The minimal response MUST be the DNS header, question section, and an
1172 OPT record. This MUST also occur when a truncated response (using
1173 the DNS header's TC bit) is returned."
1174 */
1175 if (addRecordToPacket(pw, makeOpt(edo.d_packetsize, 0, edo.d_Z), minTTL, dc->d_ttlCap, maxanswersize)) {
1176 pw.commit();
1177 }
1178 }
1179
1180 g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
1181 updateResponseStats(res, dc->d_remote, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1182 #ifdef HAVE_PROTOBUF
1183 if (luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
1184 pbMessage.setBytes(packet.size());
1185 pbMessage.setResponseCode(pw.getHeader()->rcode);
1186 if (appliedPolicy.d_name) {
1187 pbMessage.setAppliedPolicy(*appliedPolicy.d_name);
1188 pbMessage.setAppliedPolicyType(appliedPolicy.d_type);
1189 }
1190 pbMessage.setPolicyTags(dc->d_policyTags);
1191 pbMessage.setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
1192 pbMessage.setRequestorId(dq.requestorId);
1193 pbMessage.setDeviceId(dq.deviceId);
1194 protobufLogResponse(luaconfsLocal->protobufServer, pbMessage);
1195 }
1196 #endif
1197 if(!dc->d_tcp) {
1198 struct msghdr msgh;
1199 struct iovec iov;
1200 char cbuf[256];
1201 fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote);
1202 msgh.msg_control=NULL;
1203
1204 if(g_fromtosockets.count(dc->d_socket)) {
1205 addCMsgSrcAddr(&msgh, cbuf, &dc->d_local, 0);
1206 }
1207 if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors)
1208 L<<Logger::Warning<<"Sending UDP reply to client "<<dc->d_remote.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
1209
1210 if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
1211 t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
1212 string((const char*)&*packet.begin(), packet.size()),
1213 g_now.tv_sec,
1214 pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
1215 min(minTTL,SyncRes::s_packetcachettl),
1216 &pbMessage);
1217 }
1218 // else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
1219 }
1220 else {
1221 char buf[2];
1222 buf[0]=packet.size()/256;
1223 buf[1]=packet.size()%256;
1224
1225 Utility::iovec iov[2];
1226
1227 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
1228 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
1229
1230 int wret=Utility::writev(dc->d_socket, iov, 2);
1231 bool hadError=true;
1232
1233 if(wret == 0)
1234 L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
1235 else if(wret < 0 )
1236 L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
1237 else if((unsigned int)wret != 2 + packet.size())
1238 L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
1239 else
1240 hadError=false;
1241
1242 // update tcp connection status, either by closing or moving to 'BYTE0'
1243
1244 if(hadError) {
1245 // no need to remove us from FDM, we weren't there
1246 dc->d_socket = -1;
1247 }
1248 else {
1249 dc->d_tcpConnection->queriesCount++;
1250 if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) {
1251 dc->d_socket = -1;
1252 }
1253 else {
1254 dc->d_tcpConnection->state=TCPConnection::BYTE0;
1255 Utility::gettimeofday(&g_now, 0); // needs to be updated
1256 t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
1257 t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
1258 }
1259 }
1260 }
1261 float spent=makeFloat(sr.getNow()-dc->d_now);
1262 if(!g_quiet) {
1263 L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
1264 L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
1265 sr.d_totUsec/1000.0<<" netw ms, "<< spent*1000.0<<" tot ms, "<<
1266 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<< res;
1267
1268 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1269 L<< ", dnssec="<<vStates[sr.getValidationState()];
1270 }
1271
1272 L<<endl;
1273
1274 }
1275
1276 if (sr.d_outqueries || sr.d_authzonequeries) {
1277 t_RC->cacheMisses++;
1278 }
1279 else {
1280 t_RC->cacheHits++;
1281 }
1282
1283 if(spent < 0.001)
1284 g_stats.answers0_1++;
1285 else if(spent < 0.010)
1286 g_stats.answers1_10++;
1287 else if(spent < 0.1)
1288 g_stats.answers10_100++;
1289 else if(spent < 1.0)
1290 g_stats.answers100_1000++;
1291 else
1292 g_stats.answersSlow++;
1293
1294 uint64_t newLat=(uint64_t)(spent*1000000);
1295 newLat = min(newLat,(uint64_t)(((uint64_t) g_networkTimeoutMsec)*1000)); // outliers of several minutes exist..
1296 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + (float)newLat/g_latencyStatSize;
1297 // no worries, we do this for packet cache hits elsewhere
1298
1299 auto ourtime = 1000.0*spent-sr.d_totUsec/1000.0; // in msec
1300 if(ourtime < 1)
1301 g_stats.ourtime0_1++;
1302 else if(ourtime < 2)
1303 g_stats.ourtime1_2++;
1304 else if(ourtime < 4)
1305 g_stats.ourtime2_4++;
1306 else if(ourtime < 8)
1307 g_stats.ourtime4_8++;
1308 else if(ourtime < 16)
1309 g_stats.ourtime8_16++;
1310 else if(ourtime < 32)
1311 g_stats.ourtime16_32++;
1312 else {
1313 // cerr<<"SLOW: "<<ourtime<<"ms -> "<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<endl;
1314 g_stats.ourtimeSlow++;
1315 }
1316 if(ourtime >= 0.0) {
1317 newLat=ourtime*1000; // usec
1318 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + (float)newLat/g_latencyStatSize;
1319 }
1320 // cout<<dc->d_mdp.d_qname<<"\t"<<MT->getUsec()<<"\t"<<sr.d_outqueries<<endl;
1321 delete dc;
1322 dc=0;
1323 }
1324 catch(PDNSException &ae) {
1325 L<<Logger::Error<<"startDoResolve problem "<<makeLoginfo(dc)<<": "<<ae.reason<<endl;
1326 delete dc;
1327 }
1328 catch(MOADNSException& e) {
1329 L<<Logger::Error<<"DNS parser error "<<makeLoginfo(dc) <<": "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
1330 delete dc;
1331 }
1332 catch(std::exception& e) {
1333 L<<Logger::Error<<"STL error "<< makeLoginfo(dc)<<": "<<e.what();
1334
1335 // Luawrapper nests the exception from Lua, so we unnest it here
1336 try {
1337 std::rethrow_if_nested(e);
1338 } catch(const std::exception& ne) {
1339 L<<". Extra info: "<<ne.what();
1340 } catch(...) {}
1341
1342 L<<endl;
1343 delete dc;
1344 }
1345 catch(...) {
1346 L<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
1347 }
1348
1349 g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
1350 }
1351
1352 static void makeControlChannelSocket(int processNum=-1)
1353 {
1354 string sockname=::arg()["socket-dir"]+"/"+s_programname;
1355 if(processNum >= 0)
1356 sockname += "."+std::to_string(processNum);
1357 sockname+=".controlsocket";
1358 s_rcc.listen(sockname);
1359
1360 int sockowner = -1;
1361 int sockgroup = -1;
1362
1363 if (!::arg().isEmpty("socket-group"))
1364 sockgroup=::arg().asGid("socket-group");
1365 if (!::arg().isEmpty("socket-owner"))
1366 sockowner=::arg().asUid("socket-owner");
1367
1368 if (sockgroup > -1 || sockowner > -1) {
1369 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
1370 unixDie("Failed to chown control socket");
1371 }
1372 }
1373
1374 // do mode change if socket-mode is given
1375 if(!::arg().isEmpty("socket-mode")) {
1376 mode_t sockmode=::arg().asMode("socket-mode");
1377 if(chmod(sockname.c_str(), sockmode) < 0) {
1378 unixDie("Failed to chmod control socket");
1379 }
1380 }
1381 }
1382
1383 static bool getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass, EDNSSubnetOpts* ednssubnet, std::map<uint16_t, EDNSOptionView>* options)
1384 {
1385 bool found = false;
1386 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
1387 size_t questionLen = question.length();
1388 unsigned int consumed=0;
1389 *dnsname=DNSName(question.c_str(), questionLen, sizeof(dnsheader), false, qtype, qclass, &consumed);
1390
1391 size_t pos= sizeof(dnsheader)+consumed+4;
1392 /* at least OPT root label (1), type (2), class (2) and ttl (4) + OPT RR rdlen (2)
1393 = 11 */
1394 if(ntohs(dh->arcount) == 1 && questionLen > pos + 11) { // this code can extract one (1) EDNS Subnet option
1395 /* OPT root label (1) followed by type (2) */
1396 if(question.at(pos)==0 && question.at(pos+1)==0 && question.at(pos+2)==QType::OPT) {
1397 if (!options) {
1398 char* ecsStart = nullptr;
1399 size_t ecsLen = 0;
1400 int res = getEDNSOption((char*)question.c_str()+pos+9, questionLen - pos - 9, EDNSOptionCode::ECS, &ecsStart, &ecsLen);
1401 if (res == 0 && ecsLen > 4) {
1402 EDNSSubnetOpts eso;
1403 if(getEDNSSubnetOptsFromString(ecsStart + 4, ecsLen - 4, &eso)) {
1404 *ednssubnet=eso;
1405 found = true;
1406 }
1407 }
1408 }
1409 else {
1410 int res = getEDNSOptions((char*)question.c_str()+pos+9, questionLen - pos - 9, *options);
1411 if (res == 0) {
1412 const auto& it = options->find(EDNSOptionCode::ECS);
1413 if (it != options->end() && it->second.content != nullptr && it->second.size > 0) {
1414 EDNSSubnetOpts eso;
1415 if(getEDNSSubnetOptsFromString(it->second.content, it->second.size, &eso)) {
1416 *ednssubnet=eso;
1417 found = true;
1418 }
1419 }
1420 }
1421 }
1422 }
1423 }
1424 return found;
1425 }
1426
1427 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1428 {
1429 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
1430
1431 if(conn->state==TCPConnection::BYTE0) {
1432 ssize_t bytes=recv(conn->getFD(), conn->data, 2, 0);
1433 if(bytes==1)
1434 conn->state=TCPConnection::BYTE1;
1435 if(bytes==2) {
1436 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1437 conn->bytesread=0;
1438 conn->state=TCPConnection::GETQUESTION;
1439 }
1440 if(!bytes || bytes < 0) {
1441 t_fdm->removeReadFD(fd);
1442 return;
1443 }
1444 }
1445 else if(conn->state==TCPConnection::BYTE1) {
1446 ssize_t bytes=recv(conn->getFD(), conn->data+1, 1, 0);
1447 if(bytes==1) {
1448 conn->state=TCPConnection::GETQUESTION;
1449 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1450 conn->bytesread=0;
1451 }
1452 if(!bytes || bytes < 0) {
1453 if(g_logCommonErrors)
1454 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected after first byte"<<endl;
1455 t_fdm->removeReadFD(fd);
1456 return;
1457 }
1458 }
1459 else if(conn->state==TCPConnection::GETQUESTION) {
1460 ssize_t bytes=recv(conn->getFD(), conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
1461 if(!bytes || bytes < 0 || bytes > std::numeric_limits<std::uint16_t>::max()) {
1462 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected while reading question body"<<endl;
1463 t_fdm->removeReadFD(fd);
1464 return;
1465 }
1466 conn->bytesread+=(uint16_t)bytes;
1467 if(conn->bytesread==conn->qlen) {
1468 t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
1469
1470 DNSComboWriter* dc=nullptr;
1471 try {
1472 dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
1473 }
1474 catch(MOADNSException &mde) {
1475 g_stats.clientParseError++;
1476 if(g_logCommonErrors)
1477 L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toString() <<endl;
1478 return;
1479 }
1480 dc->d_tcpConnection = conn; // carry the torch
1481 dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
1482 dc->d_tcp=true;
1483 dc->setRemote(&conn->d_remote);
1484 ComboAddress dest;
1485 dest.reset();
1486 dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
1487 socklen_t len = dest.getSocklen();
1488 getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
1489 dc->setLocal(dest);
1490 DNSName qname;
1491 uint16_t qtype=0;
1492 uint16_t qclass=0;
1493 bool needECS = false;
1494 string requestorId;
1495 string deviceId;
1496 #ifdef HAVE_PROTOBUF
1497 auto luaconfsLocal = g_luaconfs.getLocal();
1498 if (luaconfsLocal->protobufServer) {
1499 needECS = true;
1500 }
1501 #endif
1502
1503 if(needECS || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
1504
1505 try {
1506 std::map<uint16_t, EDNSOptionView> ednsOptions;
1507 dc->d_ecsParsed = true;
1508 dc->d_ecsFound = getQNameAndSubnet(std::string(conn->data, conn->qlen), &qname, &qtype, &qclass, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
1509
1510 if(t_pdl) {
1511 try {
1512 if (t_pdl->d_gettag_ffi) {
1513 dc->d_tag = t_pdl->gettag_ffi(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable);
1514 }
1515 else if (t_pdl->d_gettag) {
1516 dc->d_tag = t_pdl->gettag(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId);
1517 }
1518 }
1519 catch(const std::exception& e) {
1520 if(g_logCommonErrors)
1521 L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
1522 }
1523 }
1524 }
1525 catch(const std::exception& e)
1526 {
1527 if(g_logCommonErrors)
1528 L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
1529 }
1530 }
1531 #ifdef HAVE_PROTOBUF
1532 if(luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
1533 dc->d_requestorId = requestorId;
1534 dc->d_deviceId = deviceId;
1535 dc->d_uuid = (*t_uuidGenerator)();
1536 }
1537
1538 if(luaconfsLocal->protobufServer) {
1539 try {
1540 const struct dnsheader* dh = (const struct dnsheader*) conn->data;
1541
1542 if (!luaconfsLocal->protobufTaggedOnly) {
1543 protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, conn->d_remote, dest, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
1544 }
1545 }
1546 catch(std::exception& e) {
1547 if(g_logCommonErrors)
1548 L<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
1549 }
1550 }
1551 #endif
1552 if(dc->d_mdp.d_header.qr) {
1553 delete dc;
1554 g_stats.ignoredCount++;
1555 L<<Logger::Error<<"Ignoring answer from TCP client "<< conn->d_remote.toString() <<" on server socket!"<<endl;
1556 return;
1557 }
1558 if(dc->d_mdp.d_header.opcode) {
1559 delete dc;
1560 g_stats.ignoredCount++;
1561 L<<Logger::Error<<"Ignoring non-query opcode from TCP client "<< conn->d_remote.toString() <<" on server socket!"<<endl;
1562 return;
1563 }
1564 else {
1565 ++g_stats.qcounter;
1566 ++g_stats.tcpqcounter;
1567 MT->makeThread(startDoResolve, dc); // deletes dc, will set state to BYTE0 again
1568 return;
1569 }
1570 }
1571 }
1572 }
1573
1574 //! Handle new incoming TCP connection
1575 static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
1576 {
1577 ComboAddress addr;
1578 socklen_t addrlen=sizeof(addr);
1579 int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
1580 if(newsock>=0) {
1581 if(MT->numProcesses() > g_maxMThreads) {
1582 g_stats.overCapacityDrops++;
1583 try {
1584 closesocket(newsock);
1585 }
1586 catch(const PDNSException& e) {
1587 L<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
1588 }
1589 return;
1590 }
1591
1592 if(t_remotes)
1593 t_remotes->push_back(addr);
1594 if(t_allowFrom && !t_allowFrom->match(&addr)) {
1595 if(!g_quiet)
1596 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
1597
1598 g_stats.unauthorizedTCP++;
1599 try {
1600 closesocket(newsock);
1601 }
1602 catch(const PDNSException& e) {
1603 L<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
1604 }
1605 return;
1606 }
1607 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
1608 g_stats.tcpClientOverflow++;
1609 try {
1610 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
1611 }
1612 catch(const PDNSException& e) {
1613 L<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
1614 }
1615 return;
1616 }
1617
1618 setNonBlocking(newsock);
1619 std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
1620 tc->state=TCPConnection::BYTE0;
1621
1622 t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
1623
1624 struct timeval now;
1625 Utility::gettimeofday(&now, 0);
1626 t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
1627 }
1628 }
1629
/* Processes one received UDP query.

   Flow:
     1. Drop the query (tooOldDrops) if it is more than 1000 ms old relative to
        `tv`; a zero tv.tv_sec disables this check.
     2. If protobuf logging or a Lua gettag/gettag_ffi hook needs it, parse the
        qname and ECS option (getQNameAndSubnet) and compute the packet-cache
        tag, policy tags, requestor/device ids, ttlCap and `variable`.
     3. Look the query up in the packet cache; on a hit, age the cached answer,
        send it straight back with sendmsg() and update the statistics.
     4. Otherwise apply the Lua ipfilter and the MThread capacity limit, build a
        DNSComboWriter carrying the state gathered above and hand it to
        startDoResolve in a new MThread (which deletes it).

   Every visible path returns 0 (a null string*); the answer is sent either
   here (packet cache hit) or by startDoResolve.

   NOTE(review): `dh` is dereferenced (dh->id, *dh) without checking that
   question.size() >= sizeof(dnsheader); presumably the caller rejects shorter
   packets — confirm. */
static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, struct timeval tv, int fd)
{
  gettimeofday(&g_now, 0);
  struct timeval diff = g_now - tv;
  double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);

  // Stale query: only checked when a receive timestamp was provided (tv_sec != 0).
  if(tv.tv_sec && delta > 1000.0) {
    g_stats.tooOldDrops++;
    return 0;
  }

  ++g_stats.qcounter;
  if(fromaddr.sin4.sin_family==AF_INET6)
    g_stats.ipv6qcounter++;

  string response;
  const struct dnsheader* dh = (struct dnsheader*)question.c_str();
  unsigned int ctag=0;
  uint32_t qhash = 0;
  bool needECS = false;
  std::vector<std::string> policyTags;
  LuaContext::LuaObject data;
  string requestorId;
  string deviceId;
#ifdef HAVE_PROTOBUF
  boost::uuids::uuid uniqueId;
  auto luaconfsLocal = g_luaconfs.getLocal();
  // Protobuf query logging needs the ECS source, so it forces the qname/ECS parse below.
  if (luaconfsLocal->protobufServer) {
    uniqueId = (*t_uuidGenerator)();
    needECS = true;
  } else if (luaconfsLocal->outgoingProtobufServer) {
    uniqueId = (*t_uuidGenerator)();
  }
#endif
  EDNSSubnetOpts ednssubnet;
  bool ecsFound = false;
  bool ecsParsed = false;
  // Only gettag_ffi can lower ttlCap or mark the answer as variable.
  uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
  bool variable = false;
  try {
    DNSName qname;
    uint16_t qtype=0;
    uint16_t qclass=0;
    uint32_t age;
    bool qnameParsed=false;
#ifdef MALLOC_TRACE
    /*
    static uint64_t last=0;
    if(!last)
      g_mtracer->clearAllocators();
    cout<<g_mtracer->getAllocs()-last<<" "<<g_mtracer->getNumOut()<<" -- BEGIN TRACE"<<endl;
    last=g_mtracer->getAllocs();
    cout<<g_mtracer->topAllocatorsString()<<endl;
    g_mtracer->clearAllocators();
    */
#endif

    // Parse the question only when protobuf or a Lua gettag hook needs it; a
    // parse or hook failure is logged and leaves ctag at 0.
    if(needECS || (t_pdl && (t_pdl->d_gettag || t_pdl->d_gettag_ffi))) {
      try {
        std::map<uint16_t, EDNSOptionView> ednsOptions;
        ecsFound = getQNameAndSubnet(question, &qname, &qtype, &qclass, &ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
        qnameParsed = true;
        ecsParsed = true;

        if(t_pdl) {
          try {
            if (t_pdl->d_gettag_ffi) {
              ctag = t_pdl->gettag_ffi(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable);
            }
            else if (t_pdl->d_gettag) {
              ctag=t_pdl->gettag(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId);
            }
          }
          catch(const std::exception& e)  {
            if(g_logCommonErrors)
              L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
          }
        }
      }
      catch(const std::exception& e)
      {
        if(g_logCommonErrors)
          L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
      }
    }

    bool cacheHit = false;
    RecProtoBufMessage pbMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
#ifdef HAVE_PROTOBUF
    if(luaconfsLocal->protobufServer) {
      if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
        protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, fromaddr, destaddr, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
      }
    }
#endif /* HAVE_PROTOBUF */

    /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
       but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
       as cacheable we would cache it with a wrong tag, so better safe than sorry. */
    // When the qname was already parsed above, the overload taking it is used.
    if (qnameParsed) {
      cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
    }
    else {
      cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
    }

    // Packet cache hit: answer directly from here without spawning an MThread.
    if (cacheHit) {
#ifdef HAVE_PROTOBUF
      if(luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage.getAppliedPolicy().empty() || !pbMessage.getPolicyTags().empty())) {
        Netmask requestorNM(fromaddr, fromaddr.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
        const ComboAddress& requestor = requestorNM.getMaskedNetwork();
        pbMessage.update(uniqueId, &requestor, &destaddr, false, dh->id);
        pbMessage.setEDNSSubnet(ednssubnet.source, ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
        pbMessage.setQueryTime(g_now.tv_sec, g_now.tv_usec);
        pbMessage.setRequestorId(requestorId);
        pbMessage.setDeviceId(deviceId);
        protobufLogResponse(luaconfsLocal->protobufServer, pbMessage);
      }
#endif /* HAVE_PROTOBUF */
      if(!g_quiet)
        L<<Logger::Notice<<t_id<< " question answered from packet cache tag="<<ctag<<" from "<<fromaddr.toString()<<endl;

      g_stats.packetCacheHits++;
      SyncRes::s_queries++;
      ageDNSPacket(response, age);
      struct msghdr msgh;
      struct iovec iov;
      char cbuf[256];
      fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)response.c_str(), response.length(), const_cast<ComboAddress*>(&fromaddr));
      msgh.msg_control=NULL;

      // For sockets listed in g_fromtosockets, set the source address explicitly.
      if(g_fromtosockets.count(fd)) {
        addCMsgSrcAddr(&msgh, cbuf, &destaddr, 0);
      }
      if(sendmsg(fd, &msgh, 0) < 0 && g_logCommonErrors)
        L<<Logger::Warning<<"Sending UDP reply to client "<<fromaddr.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;

      if(response.length() >= sizeof(struct dnsheader)) {
        struct dnsheader tmpdh;
        memcpy(&tmpdh, response.c_str(), sizeof(tmpdh));
        updateResponseStats(tmpdh.rcode, fromaddr, response.length(), 0, 0);
      }
      g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + 0.0; // we assume 0 usec
      g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + 0.0; // we assume 0 usec
      return 0;
    }
  }
  catch(std::exception& e) {
    L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
    return 0;
  }

  // Lua ipfilter gets the final say before any resolving work is started.
  if(t_pdl) {
    if(t_pdl->ipfilter(fromaddr, destaddr, *dh)) {
      if(!g_quiet)
        L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<fromaddr.toStringWithPort()<<" based on policy"<<endl;
      g_stats.policyDrops++;
      return 0;
    }
  }

  if(MT->numProcesses() > g_maxMThreads) {
    if(!g_quiet)
      L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<fromaddr.toStringWithPort()<<", over capacity"<<endl;

    g_stats.overCapacityDrops++;
    return 0;
  }

  // Not inside the try above: an exception from parsing the query here
  // propagates to the caller.
  DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
  dc->setSocket(fd);
  dc->d_tag=ctag;
  dc->d_qhash=qhash;
  dc->d_query = question;
  dc->setRemote(&fromaddr);
  dc->setLocal(destaddr);
  dc->d_tcp=false;
  dc->d_policyTags = policyTags;
  dc->d_data = data;
  dc->d_ecsFound = ecsFound;
  dc->d_ecsParsed = ecsParsed;
  dc->d_ednssubnet = ednssubnet;
  dc->d_ttlCap = ttlCap;
  dc->d_variable = variable;
#ifdef HAVE_PROTOBUF
  if (luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
    dc->d_uuid = uniqueId;
  }
  dc->d_requestorId = requestorId;
  dc->d_deviceId = deviceId;
#endif

  MT->makeThread(startDoResolve, (void*) dc); // deletes dc
  return 0;
}
1825
1826
1827 static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1828 {
1829 ssize_t len;
1830 char data[1500];
1831 ComboAddress fromaddr;
1832 struct msghdr msgh;
1833 struct iovec iov;
1834 char cbuf[256];
1835 bool firstQuery = true;
1836
1837 fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
1838 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), data, sizeof(data), &fromaddr);
1839
1840 for(size_t counter = 0; counter < s_maxUDPQueriesPerRound; counter++)
1841 if((len=recvmsg(fd, &msgh, 0)) >= 0) {
1842
1843 firstQuery = false;
1844
1845 if(t_remotes)
1846 t_remotes->push_back(fromaddr);
1847
1848 if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
1849 if(!g_quiet)
1850 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
1851
1852 g_stats.unauthorizedUDP++;
1853 return;
1854 }
1855 BOOST_STATIC_ASSERT(offsetof(sockaddr_in, sin_port) == offsetof(sockaddr_in6, sin6_port));
1856 if(!fromaddr.sin4.sin_port) { // also works for IPv6
1857 if(!g_quiet)
1858 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toStringWithPort()<<", can't deal with port 0"<<endl;
1859
1860 g_stats.clientParseError++; // not quite the best place to put it, but needs to go somewhere
1861 return;
1862 }
1863 try {
1864 dnsheader* dh=(dnsheader*)data;
1865
1866 if(dh->qr) {
1867 g_stats.ignoredCount++;
1868 if(g_logCommonErrors)
1869 L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
1870 }
1871 else if(dh->opcode) {
1872 g_stats.ignoredCount++;
1873 if(g_logCommonErrors)
1874 L<<Logger::Error<<"Ignoring non-query opcode "<<dh->opcode<<" from "<<fromaddr.toString()<<" on server socket!"<<endl;
1875 }
1876 else {
1877 string question(data, (size_t)len);
1878 struct timeval tv={0,0};
1879 HarvestTimestamp(&msgh, &tv);
1880 ComboAddress dest;
1881 dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
1882 auto loc = rplookup(g_listenSocketsAddresses, fd);
1883 if(HarvestDestinationAddress(&msgh, &dest)) {
1884 // but.. need to get port too
1885 if(loc)
1886 dest.sin4.sin_port = loc->sin4.sin_port;
1887 }
1888 else {
1889 if(loc) {
1890 dest = *loc;
1891 }
1892 else {
1893 dest.sin4.sin_family = fromaddr.sin4.sin_family;
1894 socklen_t slen = dest.getSocklen();
1895 getsockname(fd, (sockaddr*)&dest, &slen); // if this fails, we're ok with it
1896 }
1897 }
1898 if(g_weDistributeQueries)
1899 distributeAsyncFunction(question, boost::bind(doProcessUDPQuestion, question, fromaddr, dest, tv, fd));
1900 else
1901 doProcessUDPQuestion(question, fromaddr, dest, tv, fd);
1902 }
1903 }
1904 catch(MOADNSException& mde) {
1905 g_stats.clientParseError++;
1906 if(g_logCommonErrors)
1907 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
1908 }
1909 catch(std::runtime_error& e) {
1910 g_stats.clientParseError++;
1911 if(g_logCommonErrors)
1912 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<e.what()<<endl;
1913 }
1914 }
1915 else {
1916 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
1917 if(firstQuery && errno == EAGAIN)
1918 g_stats.noPacketError++;
1919
1920 break;
1921 }
1922 }
1923
1924 static void makeTCPServerSockets(unsigned int threadId)
1925 {
1926 int fd;
1927 vector<string>locals;
1928 stringtok(locals,::arg()["local-address"]," ,");
1929
1930 if(locals.empty())
1931 throw PDNSException("No local address specified");
1932
1933 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
1934 ServiceTuple st;
1935 st.port=::arg().asNum("local-port");
1936 parseService(*i, st);
1937
1938 ComboAddress sin;
1939
1940 sin.reset();
1941 sin.sin4.sin_family = AF_INET;
1942 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
1943 sin.sin6.sin6_family = AF_INET6;
1944 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
1945 throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
1946 }
1947
1948 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
1949 if(fd<0)
1950 throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
1951
1952 setCloseOnExec(fd);
1953
1954 int tmp=1;
1955 if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
1956 L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
1957 exit(1);
1958 }
1959 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
1960 L<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
1961 }
1962
1963 #ifdef TCP_DEFER_ACCEPT
1964 if(setsockopt(fd, SOL_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
1965 if(i==locals.begin())
1966 L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
1967 }
1968 #endif
1969
1970 if( ::arg().mustDo("non-local-bind") )
1971 Utility::setBindAny(AF_INET, fd);
1972
1973 #ifdef SO_REUSEPORT
1974 if(g_reusePort) {
1975 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(tmp)) < 0)
1976 throw PDNSException("SO_REUSEPORT: "+stringerror());
1977 }
1978 #endif
1979
1980 if (::arg().asNum("tcp-fast-open") > 0) {
1981 #ifdef TCP_FASTOPEN
1982 int fastOpenQueueSize = ::arg().asNum("tcp-fast-open");
1983 if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &fastOpenQueueSize, sizeof fastOpenQueueSize) < 0) {
1984 L<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(errno)<<endl;
1985 }
1986 #else
1987 L<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
1988 #endif
1989 }
1990
1991 sin.sin4.sin_port = htons(st.port);
1992 socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
1993 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
1994 throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
1995
1996 setNonBlocking(fd);
1997 setSocketSendBuffer(fd, 65000);
1998 listen(fd, 128);
1999 deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
2000 g_tcpListenSockets[threadId].insert(fd);
2001 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
2002 // - fd is not that which we know here, but returned from accept()
2003 if(sin.sin4.sin_family == AF_INET)
2004 L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
2005 else
2006 L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2007 }
2008 }
2009
2010 static void makeUDPServerSockets(unsigned int threadId)
2011 {
2012 int one=1;
2013 vector<string>locals;
2014 stringtok(locals,::arg()["local-address"]," ,");
2015
2016 if(locals.empty())
2017 throw PDNSException("No local address specified");
2018
2019 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2020 ServiceTuple st;
2021 st.port=::arg().asNum("local-port");
2022 parseService(*i, st);
2023
2024 ComboAddress sin;
2025
2026 sin.reset();
2027 sin.sin4.sin_family = AF_INET;
2028 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2029 sin.sin6.sin6_family = AF_INET6;
2030 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2031 throw PDNSException("Unable to resolve local address for UDP server on '"+ st.host +"'");
2032 }
2033
2034 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
2035 if(fd < 0) {
2036 throw PDNSException("Making a UDP server socket for resolver: "+netstringerror());
2037 }
2038 if (!setSocketTimestamps(fd))
2039 L<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2040
2041 if(IsAnyAddress(sin)) {
2042 if(sin.sin4.sin_family == AF_INET)
2043 if(!setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one))) // linux supports this, so why not - might fail on other systems
2044 g_fromtosockets.insert(fd);
2045 #ifdef IPV6_RECVPKTINFO
2046 if(sin.sin4.sin_family == AF_INET6)
2047 if(!setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)))
2048 g_fromtosockets.insert(fd);
2049 #endif
2050 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) < 0) {
2051 L<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
2052 }
2053 }
2054 if( ::arg().mustDo("non-local-bind") )
2055 Utility::setBindAny(AF_INET6, fd);
2056
2057 setCloseOnExec(fd);
2058
2059 setSocketReceiveBuffer(fd, 250000);
2060 sin.sin4.sin_port = htons(st.port);
2061
2062
2063 #ifdef SO_REUSEPORT
2064 if(g_reusePort) {
2065 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
2066 throw PDNSException("SO_REUSEPORT: "+stringerror());
2067 }
2068 #endif
2069 socklen_t socklen=sin.getSocklen();
2070 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
2071 throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
2072
2073 setNonBlocking(fd);
2074
2075 deferredAdds[threadId].push_back(make_pair(fd, handleNewUDPQuestion));
2076 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
2077 if(sin.sin4.sin_family == AF_INET)
2078 L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
2079 else
2080 L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2081 }
2082 }
2083
2084 static void daemonize(void)
2085 {
2086 if(fork())
2087 exit(0); // bye bye
2088
2089 setsid();
2090
2091 int i=open("/dev/null",O_RDWR); /* open stdin */
2092 if(i < 0)
2093 L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
2094 else {
2095 dup2(i,0); /* stdin */
2096 dup2(i,1); /* stderr */
2097 dup2(i,2); /* stderr */
2098 close(i);
2099 }
2100 }
2101
/* SIGUSR1 handler: only raises the statsWanted flag.
   doStats() consults and clears that flag when it runs; the printing is not done here. */
static void usr1Handler(int)
{
  statsWanted=true;
}
2106
2107 static void usr2Handler(int)
2108 {
2109 g_quiet= !g_quiet;
2110 SyncRes::setDefaultLogMode(g_quiet ? SyncRes::LogNone : SyncRes::Log);
2111 ::arg().set("quiet")=g_quiet ? "" : "no";
2112 }
2113
/*
 * Log a summary of the recursor's statistics at Notice level.
 *
 * Cache hit/miss totals and the various sizes are gathered from every thread through
 * broadcastAccFunction(). The summary is only logged once there have been incoming
 * questions, cache lookups, queries and outgoing queries. Otherwise "no stats yet!" is
 * logged, but only when statsWanted (raised by usr1Handler()) is set.
 * statsWanted is cleared on every call.
 *
 * The qps line uses the SyncRes::s_queries delta since the previous summary. It is
 * skipped until a previous summary with a non-zero query count exists, and when two
 * summaries fall within the same second.
 */
static void doStats(void)
{
  static time_t lastOutputTime;   // time of the previous summary (0 = none yet)
  static uint64_t lastQueryCount; // SyncRes::s_queries at the previous summary

  uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
  uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);

  // All divisors used below are guarded by this condition.
  if(g_stats.qcounter && (cacheHits + cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {
    L<<Logger::Notice<<"stats: "<<g_stats.qcounter<<" questions, "<<
      broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
      broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
      (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;

    L<<Logger::Notice<<"stats: throttle map: "
      << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
      << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl;
    // These two statements build a single log line: the first has no endl.
    L<<Logger::Notice<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
    L<<Logger::Notice<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
    L<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
      broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;

    //L<<Logger::Notice<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
    //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;

    L<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
      " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;

    time_t now = time(0);
    if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
      L<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
    }
    lastOutputTime = now;
    lastQueryCount = SyncRes::s_queries;
  }
  else if(statsWanted)
    L<<Logger::Notice<<"stats: no stats yet!"<<endl;

  statsWanted=false;
}
2155
2156 static void houseKeeping(void *)
2157 {
2158 static thread_local time_t last_rootupdate, last_prune, last_secpoll;
2159 static thread_local int cleanCounter=0;
2160 static thread_local bool s_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
2161 try {
2162 if(s_running)
2163 return;
2164 s_running=true;
2165
2166 struct timeval now;
2167 Utility::gettimeofday(&now, 0);
2168
2169 if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
2170 DTime dt;
2171 dt.setTimeval(now);
2172 t_RC->doPrune(g_maxCacheEntries / g_numThreads); // this function is local to a thread, so fine anyhow
2173 t_packetCache->doPruneTo(g_maxPacketCacheEntries / g_numWorkerThreads);
2174
2175 SyncRes::pruneNegCache(g_maxCacheEntries / (g_numWorkerThreads * 10));
2176
2177 if(!((cleanCounter++)%40)) { // this is a full scan!
2178 time_t limit=now.tv_sec-300;
2179 SyncRes::pruneNSSpeeds(limit);
2180 }
2181 last_prune=time(0);
2182 }
2183
2184 if(now.tv_sec - last_rootupdate > 7200) {
2185 int res = SyncRes::getRootNS(g_now, nullptr);
2186 if (!res)
2187 last_rootupdate=now.tv_sec;
2188 }
2189
2190 if(t_id == s_distributorThreadID) {
2191
2192 if(now.tv_sec - last_secpoll >= 3600) {
2193 try {
2194 doSecPoll(&last_secpoll);
2195 }
2196 catch(std::exception& e)
2197 {
2198 L<<Logger::Error<<"Exception while performing security poll: "<<e.what()<<endl;
2199 }
2200 catch(PDNSException& e)
2201 {
2202 L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2203 }
2204 catch(ImmediateServFailException &e)
2205 {
2206 L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2207 }
2208 catch(...)
2209 {
2210 L<<Logger::Error<<"Exception while performing security poll"<<endl;
2211 }
2212
2213 }
2214 }
2215 s_running=false;
2216 }
2217 catch(PDNSException& ae)
2218 {
2219 s_running=false;
2220 L<<Logger::Error<<"Fatal error in housekeeping thread: "<<ae.reason<<endl;
2221 throw;
2222 }
2223 }
2224
2225 static void makeThreadPipes()
2226 {
2227 for(unsigned int n=0; n < g_numThreads; ++n) {
2228 struct ThreadPipeSet tps;
2229 int fd[2];
2230 if(pipe(fd) < 0)
2231 unixDie("Creating pipe for inter-thread communications");
2232
2233 tps.readToThread = fd[0];
2234 tps.writeToThread = fd[1];
2235
2236 if(pipe(fd) < 0)
2237 unixDie("Creating pipe for inter-thread communications");
2238 tps.readFromThread = fd[0];
2239 tps.writeFromThread = fd[1];
2240
2241 if(pipe(fd) < 0)
2242 unixDie("Creating pipe for inter-thread communications");
2243 tps.readQueriesToThread = fd[0];
2244 tps.writeQueriesToThread = fd[1];
2245
2246 if (!setNonBlocking(tps.writeQueriesToThread)) {
2247 unixDie("Making pipe for inter-thread communications non-blocking");
2248 }
2249
2250 g_pipes.push_back(tps);
2251 }
2252 }
2253
/* Message passed, as a raw pointer, through the inter-thread pipes.
   The receiving thread (handlePipeRequest()) runs func and deletes the message. When
   wantAnswer is set, it also writes func's returned pointer back on its writeFromThread
   pipe. */
struct ThreadMSG
{
  pipefunc_t func;   // work to run on the receiving thread
  bool wantAnswer;   // true when the sender blocks waiting for func's result pointer
};
2259
2260 void broadcastFunction(const pipefunc_t& func)
2261 {
2262 /* This function might be called before t_id are set during startup
2263 for the initialization of ACLs and domain maps, but the default is the same
2264 than the handler thread */
2265 if (t_id != s_handlerThreadID) {
2266 L<<Logger::Error<<"broadcastFunction() has been called by a worker ("<<t_id<<")"<<endl;
2267 exit(1);
2268 }
2269
2270 /* the distributor will call itself below, but if we are the handler thread,
2271 call the function ourselves to update the ACL or domain maps for example */
2272 func();
2273
2274 int n = 0;
2275 for(ThreadPipeSet& tps : g_pipes)
2276 {
2277 if(n++ == t_id) {
2278 func(); // don't write to ourselves!
2279 continue;
2280 }
2281
2282 ThreadMSG* tmsg = new ThreadMSG();
2283 tmsg->func = func;
2284 tmsg->wantAnswer = true;
2285 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2286 delete tmsg;
2287 unixDie("write to thread pipe returned wrong size or error");
2288 }
2289
2290 string* resp = nullptr;
2291 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2292 unixDie("read from thread pipe returned wrong size or error");
2293
2294 if(resp) {
2295 delete resp;
2296 resp = nullptr;
2297 }
2298 }
2299 }
2300
2301 // This function is only called by the distributor thread, when pdns-distributes-queries is set
2302 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
2303 {
2304 if (t_id != s_distributorThreadID) {
2305 L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
2306 exit(1);
2307 }
2308
2309 unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
2310 unsigned int target = 1 + (hash % (g_pipes.size()-1));
2311
2312 if(target == static_cast<unsigned int>(s_distributorThreadID)) {
2313 L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
2314 exit(1);
2315 }
2316
2317 ThreadPipeSet& tps = g_pipes[target];
2318 ThreadMSG* tmsg = new ThreadMSG();
2319 tmsg->func = func;
2320 tmsg->wantAnswer = false;
2321
2322 ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
2323 if (written > 0) {
2324 if (static_cast<size_t>(written) != sizeof(tmsg)) {
2325 delete tmsg;
2326 unixDie("write to thread pipe returned wrong size or error");
2327 }
2328 }
2329 else {
2330 int error = errno;
2331 delete tmsg;
2332 if (error == EAGAIN || error == EWOULDBLOCK) {
2333 g_stats.queryPipeFullDrops++;
2334 } else {
2335 unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
2336 }
2337 }
2338 }
2339
2340 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
2341 {
2342 ThreadMSG* tmsg = nullptr;
2343
2344 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
2345 unixDie("read from thread pipe returned wrong size or error");
2346 }
2347
2348 void *resp=0;
2349 try {
2350 resp = tmsg->func();
2351 }
2352 catch(std::exception& e) {
2353 if(g_logCommonErrors)
2354 L<<Logger::Error<<"PIPE function we executed created exception: "<<e.what()<<endl; // but what if they wanted an answer.. we send 0
2355 }
2356 catch(PDNSException& e) {
2357 if(g_logCommonErrors)
2358 L<<Logger::Error<<"PIPE function we executed created PDNS exception: "<<e.reason<<endl; // but what if they wanted an answer.. we send 0
2359 }
2360 if(tmsg->wantAnswer) {
2361 if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
2362 delete tmsg;
2363 unixDie("write to thread pipe returned wrong size or error");
2364 }
2365 }
2366
2367 delete tmsg;
2368 }
2369
2370 template<class T> void *voider(const boost::function<T*()>& func)
2371 {
2372 return func();
2373 }
2374
2375 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
2376 {
2377 a.insert(a.end(), b.begin(), b.end());
2378 return a;
2379 }
2380
/* Append every (string, uint16_t) pair of b to the end of a, keeping order; returns a. */
vector<pair<string, uint16_t> >& operator+=(vector<pair<string, uint16_t> >&a, const vector<pair<string, uint16_t> >& b)
{
  for(const auto& entry : b)
    a.push_back(entry);
  return a;
}
2386
2387 vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a, const vector<pair<DNSName, uint16_t> >& b)
2388 {
2389 a.insert(a.end(), b.begin(), b.end());
2390 return a;
2391 }
2392
2393
2394 /*
2395 This function should only be called by the handler to gather metrics, wipe the cache,
2396 reload the Lua script (not the Lua config) or change the current trace regex,
2397 and by the SNMP thread to gather metrics. */
2398 template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
2399 {
2400 /* the SNMP thread uses id -1 too */
2401 if (t_id != s_handlerThreadID) {
2402 L<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
2403 exit(1);
2404 }
2405
2406 T ret=T();
2407 for(ThreadPipeSet& tps : g_pipes)
2408 {
2409 ThreadMSG* tmsg = new ThreadMSG();
2410 tmsg->func = boost::bind(voider<T>, func);
2411 tmsg->wantAnswer = true;
2412
2413 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2414 delete tmsg;
2415 unixDie("write to thread pipe returned wrong size or error");
2416 }
2417
2418 T* resp = nullptr;
2419 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2420 unixDie("read from thread pipe returned wrong size or error");
2421
2422 if(resp) {
2423 ret += *resp;
2424 delete resp;
2425 resp = nullptr;
2426 }
2427 }
2428 return ret;
2429 }
2430
// Explicit instantiations of broadcastAccFunction for the result types it is used with, so
// code that only sees its declaration (presumably in other translation units) can link.
template string broadcastAccFunction(const boost::function<string*()>& fun); // explicit instantiation
template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
2435
2436 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
2437 {
2438 string remote;
2439 string msg=s_rcc.recv(&remote);
2440 RecursorControlParser rcp;
2441 RecursorControlParser::func_t* command;
2442
2443 string answer=rcp.getAnswer(msg, &command);
2444
2445 // If we are inside a chroot, we need to strip
2446 if (!arg()["chroot"].empty()) {
2447 size_t len = arg()["chroot"].length();
2448 remote = remote.substr(len);
2449 }
2450
2451 try {
2452 s_rcc.send(answer, &remote);
2453 command();
2454 }
2455 catch(std::exception& e) {
2456 L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
2457 }
2458 catch(PDNSException& ae) {
2459 L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
2460 }
2461 }
2462
2463 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
2464 {
2465 PacketID* pident=any_cast<PacketID>(&var);
2466 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
2467
2468 shared_array<char> buffer(new char[pident->inNeeded]);
2469
2470 ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
2471 if(ret > 0) {
2472 pident->inMSG.append(&buffer[0], &buffer[ret]);
2473 pident->inNeeded-=(size_t)ret;
2474 if(!pident->inNeeded || pident->inIncompleteOkay) {
2475 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
2476 PacketID pid=*pident;
2477 string msg=pident->inMSG;
2478
2479 t_fdm->removeReadFD(fd);
2480 MT->sendEvent(pid, &msg);
2481 }
2482 else {
2483 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
2484 }
2485 }
2486 else {
2487 PacketID tmp=*pident;
2488 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
2489 string empty;
2490 MT->sendEvent(tmp, &empty); // this conveys error status
2491 }
2492 }
2493
2494 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
2495 {
2496 PacketID* pid=any_cast<PacketID>(&var);
2497 ssize_t ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
2498 if(ret > 0) {
2499 pid->outPos+=(ssize_t)ret;
2500 if(pid->outPos==pid->outMSG.size()) {
2501 PacketID tmp=*pid;
2502 t_fdm->removeWriteFD(fd);
2503 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
2504 }
2505 }
2506 else { // error or EOF
2507 PacketID tmp(*pid);
2508 t_fdm->removeWriteFD(fd);
2509 string sent;
2510 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
2511 }
2512 }
2513
// resend event to everybody chained onto it
/*
 * Deliver `content` to every waiter chained onto the waiter at `iter`.
 * For each id in iter->key.chain, `resend` is re-keyed with that id and fd -1 and passed
 * to MT->sendEvent(); g_stats.chainResends is incremented once per chained id.
 * `resend` is taken by value, so the caller's PacketID is left untouched.
 */
static void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
{
  if(iter->key.chain.empty())
    return;
  //  cerr<<"doResends called!\n";
  for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
    resend.fd=-1;
    resend.id=*i;
    //    cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;

    MT->sendEvent(resend, &content);
    g_stats.chainResends++;
  }
}
2529
/*
 * Multiplexer callback for an outgoing UDP socket: read one response and hand it to the
 * waiting task(s).
 *
 * - Read error, or datagram shorter than a DNS header: the socket is returned to
 *   t_udpclientsocks. Every waiter chained on `pid` (the PacketID registered with this fd)
 *   receives an empty string, and so does `pid` itself, which denotes an error. Only the
 *   too-short case increments g_stats.serverParseError.
 * - Otherwise a PacketID is built from the sender address, DNS id and fd. It also carries
 *   the question name/type when the packet has QR set and a non-zero qdcount. If that name
 *   fails to parse, the packet is counted, logged and dropped (the socket is not returned
 *   here).
 * - Chained waiters get the packet first, then the matching waiter. If none matches, all
 *   waiters are scanned:
 *     - exact fd/remote/type/name matches get nearMisses incremented;
 *     - a nameless, typeless answer whose id and remote match a waiter is retried under
 *       that waiter's name and type.
 *   If still unmatched, the packet counts as unexpected and is discarded.
 * - The socket is returned to t_udpclientsocks only when the event was delivered.
 */
static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
{
  PacketID pid=any_cast<PacketID>(var);
  ssize_t len;
  // NOTE(review): unless g_outgoingEDNSBufsize is a compile-time constant this is a
  // variable-length array, which is a GCC/Clang extension rather than standard C++.
  char data[g_outgoingEDNSBufsize];
  ComboAddress fromaddr;
  socklen_t addrlen=sizeof(fromaddr);

  len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);

  if(len < (ssize_t) sizeof(dnsheader)) {
    if(len < 0)
      ; //      cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
    else {
      g_stats.serverParseError++;
      if(g_logCommonErrors)
        L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
          ": packet smaller than DNS header"<<endl;
    }

    t_udpclientsocks->returnSocket(fd);
    string empty;

    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
    if(iter != MT->d_waiters.end())
      doResends(iter, pid, empty);

    MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
    return;
  }

  dnsheader dh;
  memcpy(&dh, data, sizeof(dh));

  PacketID pident;
  pident.remote=fromaddr;
  pident.id=dh.id;
  pident.fd=fd;

  // NOTE(review): despite the message, a question is not dropped here; it carries on below
  // with an empty name and type 0.
  if(!dh.qr && g_logCommonErrors) {
    L<<Logger::Notice<<"Not taking data from question on outgoing socket from "<< fromaddr.toStringWithPort()  <<endl;
  }

  if(!dh.qdcount || // UPC, Nominum, very old BIND on FormErr, NSD
     !dh.qr) {      // one weird server
    pident.domain.clear();
    pident.type = 0;
  }
  else {
    try {
      if(len > 12)
        pident.domain=DNSName(data, len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
    }
    catch(std::exception& e) {
      g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
      L<<Logger::Warning<<"Error in packet from remote nameserver "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
      return;
    }
  }
  string packet;
  packet.assign(data, len);

  MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
  if(iter != MT->d_waiters.end()) {
    doResends(iter, pident, packet);
  }

  // Re-entered at most once: after a retry pident.domain is non-empty, so the weakened
  // match below can no longer fire.
retryWithName:

  if(!MT->sendEvent(pident, &packet)) {
    // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
    for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
      if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
         pident.domain == mthread->key.domain) {
        mthread->key.nearMisses++;
      }

      // be a bit paranoid here since we're weakening our matching
      if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
         pident.id  == mthread->key.id && mthread->key.remote == pident.remote) {
        // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
        pident.domain = mthread->key.domain;
        pident.type = mthread->key.type;
        goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
      }
    }
    g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
    if(g_logCommonErrors) {
      L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<< (pident.domain.empty() ? "<empty>" : pident.domain.toString())<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
    }
  }
  else if(fd >= 0) {
    t_udpclientsocks->returnSocket(fd);
  }
}
2625
2626 FDMultiplexer* getMultiplexer()
2627 {
2628 FDMultiplexer* ret;
2629 for(const auto& i : FDMultiplexer::getMultiplexerMap()) {
2630 try {
2631 ret=i.second();
2632 return ret;
2633 }
2634 catch(FDMultiplexerException &fe) {
2635 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
2636 }
2637 catch(...) {
2638 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
2639 }
2640 }
2641 L<<Logger::Error<<"No working multiplexer found!"<<endl;
2642 exit(1);
2643 }
2644
2645
2646 static string* doReloadLuaScript()
2647 {
2648 string fname= ::arg()["lua-dns-script"];
2649 try {
2650 if(fname.empty()) {
2651 t_pdl.reset();
2652 L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
2653 return new string("unloaded\n");
2654 }
2655 else {
2656 t_pdl = std::make_shared<RecursorLua4>(fname);
2657 }
2658 }
2659 catch(std::exception& e) {
2660 L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
2661 return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
2662 }
2663
2664 L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
2665 return new string("(re)loaded '"+fname+"'\n");
2666 }
2667
2668 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
2669 {
2670 if(begin != end)
2671 ::arg().set("lua-dns-script") = *begin;
2672
2673 return broadcastAccFunction<string>(doReloadLuaScript);
2674 }
2675
2676 static string* pleaseUseNewTraceRegex(const std::string& newRegex)
2677 try
2678 {
2679 if(newRegex.empty()) {
2680 t_traceRegex.reset();
2681 return new string("unset\n");
2682 }
2683 else {
2684 t_traceRegex = std::make_shared<Regex>(newRegex);
2685 return new string("ok\n");
2686 }
2687 }
2688 catch(PDNSException& ae)
2689 {
2690 return new string(ae.reason+"\n");
2691 }
2692
2693 string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
2694 {
2695 return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
2696 }
2697
2698 static void checkLinuxIPv6Limits()
2699 {
2700 #ifdef __linux__
2701 string line;
2702 if(readFileIfThere("/proc/sys/net/ipv6/route/max_size", &line)) {
2703 int lim=std::stoi(line);
2704 if(lim < 16384) {
2705 L<<Logger::Error<<"If using IPv6, please raise sysctl net.ipv6.route.max_size, currently set to "<<lim<<" which is < 16384"<<endl;
2706 }
2707 }
2708 #endif
2709 }
2710 static void checkOrFixFDS()
2711 {
2712 unsigned int availFDs=getFilenumLimit();
2713 unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
2714
2715 if(wantFDs > availFDs) {
2716 unsigned int hardlimit= getFilenumLimit(true);
2717 if(hardlimit >= wantFDs) {
2718 setFilenumLimit(wantFDs);
2719 L<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
2720 }
2721 else {
2722 int newval = (hardlimit - 25) / g_numWorkerThreads;
2723 L<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
2724 g_maxMThreads = newval;
2725 setFilenumLimit(hardlimit);
2726 }
2727 }
2728 }
2729
// Forward declaration of the per-thread entry function. It is static, so its definition
// lives later in this translation unit.
static void* recursorThread(int tid, bool worker);
2731
2732 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
2733 {
2734 t_allowFrom = ng;
2735 return nullptr;
2736 }
2737
// Command-line arguments kept for parseACLs(), which re-applies command-line overrides
// through ::arg().preParse(g_argc, g_argv, ...) when re-reading the configuration.
// Presumably set from main()'s argc/argv — verify at the assignment site.
int g_argc;
char** g_argv;
2740
2741 void parseACLs()
2742 {
2743 static bool l_initialized;
2744
2745 if(l_initialized) { // only reload configuration file on second call
2746 string configname=::arg()["config-dir"]+"/recursor.conf";
2747 if(::arg()["config-name"]!="") {
2748 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
2749 }
2750 cleanSlashes(configname);
2751
2752 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
2753 throw runtime_error("Unable to re-parse configuration file '"+configname+"'");
2754 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
2755 ::arg().preParseFile(configname.c_str(), "include-dir");
2756 ::arg().preParse(g_argc, g_argv, "include-dir");
2757
2758 // then process includes
2759 std::vector<std::string> extraConfigs;
2760 ::arg().gatherIncludes(extraConfigs);
2761
2762 for(const std::string& fn : extraConfigs) {
2763 if(!::arg().preParseFile(fn.c_str(), "allow-from-file", ::arg()["allow-from-file"]))
2764 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
2765 if(!::arg().preParseFile(fn.c_str(), "allow-from", ::arg()["allow-from"]))
2766 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
2767 }
2768
2769 ::arg().preParse(g_argc, g_argv, "allow-from-file");
2770 ::arg().preParse(g_argc, g_argv, "allow-from");
2771 }
2772
2773 std::shared_ptr<NetmaskGroup> oldAllowFrom = t_allowFrom;
2774 std::shared_ptr<NetmaskGroup> allowFrom = std::make_shared<NetmaskGroup>();
2775
2776 if(!::arg()["allow-from-file"].empty()) {
2777 string line;
2778 ifstream ifs(::arg()["allow-from-file"].c_str());
2779 if(!ifs) {
2780 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
2781 }
2782
2783 string::size_type pos;
2784 while(getline(ifs,line)) {
2785 pos=line.find('#');
2786 if(pos!=string::npos)
2787 line.resize(pos);
2788 trim(line);
2789 if(line.empty())
2790 continue;
2791
2792 allowFrom->addMask(line);
2793 }
2794 L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
2795 }
2796 else if(!::arg()["allow-from"].empty()) {
2797 vector<string> ips;
2798 stringtok(ips, ::arg()["allow-from"], ", ");
2799
2800 L<<Logger::Warning<<"Only allowing queries from: ";
2801 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
2802 allowFrom->addMask(*i);
2803 if(i!=ips.begin())
2804 L<<Logger::Warning<<", ";
2805 L<<Logger::Warning<<*i;
2806 }
2807 L<<Logger::Warning<<endl;
2808 }
2809 else {
2810 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
2811 L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
2812 allowFrom = nullptr;
2813 }
2814
2815 g_initialAllowFrom = allowFrom;
2816 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
2817 oldAllowFrom = nullptr;
2818
2819 l_initialized = true;
2820 }
2821
2822
2823 static void setupDelegationOnly()
2824 {
2825 vector<string> parts;
2826 stringtok(parts, ::arg()["delegation-only"], ", \t");
2827 for(const auto& p : parts) {
2828 SyncRes::addDelegationOnly(DNSName(p));
2829 }
2830 }
2831
2832 static std::map<unsigned int, std::set<int> > parseCPUMap()
2833 {
2834 std::map<unsigned int, std::set<int> > result;
2835
2836 const std::string value = ::arg()["cpu-map"];
2837
2838 if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
2839 L<<Logger::Warning<<"CPU mapping requested but not supported, skipping"<<endl;
2840 return result;
2841 }
2842
2843 std::vector<std::string> parts;
2844
2845 stringtok(parts, value, " \t");
2846
2847 for(const auto& part : parts) {
2848 if (part.find('=') == string::npos)
2849 continue;
2850
2851 try {
2852 auto headers = splitField(part, '=');
2853 trim(headers.first);
2854 trim(headers.second);
2855
2856 unsigned int threadId = pdns_stou(headers.first);
2857 std::vector<std::string> cpus;
2858
2859 stringtok(cpus, headers.second, ",");
2860
2861 for(const auto& cpu : cpus) {
2862 int cpuId = std::stoi(cpu);
2863
2864 result[threadId].insert(cpuId);
2865 }
2866 }
2867 catch(const std::exception& e) {
2868 L<<Logger::Error<<"Error parsing cpu-map entry '"<<part<<"': "<<e.what()<<endl;
2869 }
2870 }
2871
2872 return result;
2873 }
2874
2875 static void setCPUMap(const std::map<unsigned int, std::set<int> >& cpusMap, unsigned int n, pthread_t tid)
2876 {
2877 const auto& cpuMapping = cpusMap.find(n);
2878 if (cpuMapping != cpusMap.cend()) {
2879 int rc = mapThreadToCPUList(tid, cpuMapping->second);
2880 if (rc == 0) {
2881 L<<Logger::Info<<"CPU affinity for worker "<<n<<" has been set to CPU map:";
2882 for (const auto cpu : cpuMapping->second) {
2883 L<<Logger::Info<<" "<<cpu;
2884 }
2885 L<<Logger::Info<<endl;
2886 }
2887 else {
2888 L<<Logger::Warning<<"Error setting CPU affinity for worker "<<n<<" to CPU map:";
2889 for (const auto cpu : cpuMapping->second) {
2890 L<<Logger::Info<<" "<<cpu;
2891 }
2892 L<<Logger::Info<<strerror(rc)<<endl;
2893 }
2894 }
2895 }
2896
2897 static int serviceMain(int argc, char*argv[])
2898 {
2899 L.setName(s_programname);
2900 L.disableSyslog(::arg().mustDo("disable-syslog"));
2901 L.setTimestamps(::arg().mustDo("log-timestamp"));
2902
2903 if(!::arg()["logging-facility"].empty()) {
2904 int val=logFacilityToLOG(::arg().asNum("logging-facility") );
2905 if(val >= 0)
2906 theL().setFacility(val);
2907 else
2908 L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
2909 }
2910
2911 showProductVersion();
2912 seedRandom(::arg()["entropy-source"]);
2913
2914 g_disthashseed=dns_random(0xffffffff);
2915
2916 checkLinuxIPv6Limits();
2917 try {
2918 vector<string> addrs;
2919 if(!::arg()["query-local-address6"].empty()) {
2920 SyncRes::s_doIPv6=true;
2921 L<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
2922
2923 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
2924 for(const string& addr : addrs) {
2925 g_localQueryAddresses6.push_back(ComboAddress(addr));
2926 }
2927 }
2928 else {
2929 L<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
2930 }
2931 addrs.clear();
2932 stringtok(addrs, ::arg()["query-local-address"], ", ;");
2933 for(const string& addr : addrs) {
2934 g_localQueryAddresses4.push_back(ComboAddress(addr));
2935 }
2936 }
2937 catch(std::exception& e) {
2938 L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
2939 exit(99);
2940 }
2941
2942 // keep this ABOVE loadRecursorLuaConfig!
2943 if(::arg()["dnssec"]=="off")
2944 g_dnssecmode=DNSSECMode::Off;
2945 else if(::arg()["dnssec"]=="process-no-validate")
2946 g_dnssecmode=DNSSECMode::ProcessNoValidate;
2947 else if(::arg()["dnssec"]=="process")
2948 g_dnssecmode=DNSSECMode::Process;
2949 else if(::arg()["dnssec"]=="validate")
2950 g_dnssecmode=DNSSECMode::ValidateAll;
2951 else if(::arg()["dnssec"]=="log-fail")
2952 g_dnssecmode=DNSSECMode::ValidateForLog;
2953 else {
2954 L<<Logger::Error<<"Unknown DNSSEC mode "<<::arg()["dnssec"]<<endl;
2955 exit(1);
2956 }
2957
2958 g_dnssecLogBogus = ::arg().mustDo("dnssec-log-bogus");
2959 g_maxNSEC3Iterations = ::arg().asNum("nsec3-max-iterations");
2960
2961 g_maxCacheEntries = ::arg().asNum("max-cache-entries");
2962 g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
2963
2964 try {
2965 loadRecursorLuaConfig(::arg()["lua-config-file"], ::arg().mustDo("daemon"));
2966 }
2967 catch (PDNSException &e) {
2968 L<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
2969 exit(1);
2970 }
2971
2972 parseACLs();
2973 sortPublicSuffixList();
2974
2975 if(!::arg()["dont-query"].empty()) {
2976 vector<string> ips;
2977 stringtok(ips, ::arg()["dont-query"], ", ");
2978 ips.push_back("0.0.0.0");
2979 ips.push_back("::");
2980
2981 L<<Logger::Warning<<"Will not send queries to: ";
2982 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
2983 SyncRes::addDontQuery(*i);
2984 if(i!=ips.begin())
2985 L<<Logger::Warning<<", ";
2986 L<<Logger::Warning<<*i;
2987 }
2988 L<<Logger::Warning<<endl;
2989 }
2990
2991 g_quiet=::arg().mustDo("quiet");
2992
2993 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
2994 if(g_weDistributeQueries) {
2995 L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
2996 }
2997
2998 setupDelegationOnly();
2999 g_outgoingEDNSBufsize=::arg().asNum("edns-outgoing-bufsize");
3000
3001 if(::arg()["trace"]=="fail") {
3002 SyncRes::setDefaultLogMode(SyncRes::Store);
3003 }
3004 else if(::arg().mustDo("trace")) {
3005 SyncRes::setDefaultLogMode(SyncRes::Log);
3006 ::arg().set("quiet")="no";
3007 g_quiet=false;
3008 g_dnssecLOG=true;
3009 }
3010
3011 SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
3012
3013 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
3014
3015 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
3016 SyncRes::s_maxcachettl=max(::arg().asNum("max-cache-ttl"), 15);
3017 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
3018 // Cap the packetcache-servfail-ttl to the packetcache-ttl
3019 uint32_t packetCacheServFailTTL = ::arg().asNum("packetcache-servfail-ttl");
3020 SyncRes::s_packetcacheservfailttl=(packetCacheServFailTTL > SyncRes::s_packetcachettl) ? SyncRes::s_packetcachettl : packetCacheServFailTTL;
3021 SyncRes::s_serverdownmaxfails=::arg().asNum("server-down-max-fails");
3022 SyncRes::s_serverdownthrottletime=::arg().asNum("server-down-throttle-time");
3023 SyncRes::s_serverID=::arg()["server-id"];
3024 SyncRes::s_maxqperq=::arg().asNum("max-qperq");
3025 SyncRes::s_maxtotusec=1000*::arg().asNum("max-total-msec");
3026 SyncRes::s_maxdepth=::arg().asNum("max-recursion-depth");
3027 SyncRes::s_rootNXTrust = ::arg().mustDo( "root-nx-trust");
3028 if(SyncRes::s_serverID.empty()) {
3029 char tmp[128];
3030 gethostname(tmp, sizeof(tmp)-1);
3031 SyncRes::s_serverID=tmp;
3032 }
3033
3034 SyncRes::s_ecsipv4limit = ::arg().asNum("ecs-ipv4-bits");
3035 SyncRes::s_ecsipv6limit = ::arg().asNum("ecs-ipv6-bits");
3036
3037 if (!::arg().isEmpty("ecs-scope-zero-address")) {
3038 ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
3039 SyncRes::setECSScopeZeroAddress(Netmask(scopeZero, scopeZero.isIPv4() ? 32 : 128));
3040 }
3041 else {
3042 bool found = false;
3043 for (const auto& addr : g_localQueryAddresses4) {
3044 if (!IsAnyAddress(addr)) {
3045 SyncRes::setECSScopeZeroAddress(Netmask(addr, 32));
3046 found = true;
3047 break;
3048 }
3049 }
3050 if (!found) {
3051 for (const auto& addr : g_localQueryAddresses6) {
3052 if (!IsAnyAddress(addr)) {
3053 SyncRes::setECSScopeZeroAddress(Netmask(addr, 128));
3054 found = true;
3055 break;
3056 }
3057 }
3058 if (!found) {
3059 SyncRes::setECSScopeZeroAddress(Netmask("127.0.0.1/32"));
3060 }
3061 }
3062 }
3063
3064 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
3065
3066 g_initialDomainMap = parseAuthAndForwards();
3067
3068 g_latencyStatSize=::arg().asNum("latency-statistic-size");
3069
3070 g_logCommonErrors=::arg().mustDo("log-common-errors");
3071 g_logRPZChanges = ::arg().mustDo("log-rpz-changes");
3072
3073 g_anyToTcp = ::arg().mustDo("any-to-tcp");
3074 g_udpTruncationThreshold = ::arg().asNum("udp-truncation-threshold");
3075
3076 g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
3077
3078 g_numWorkerThreads = ::arg().asNum("threads");
3079 if (g_numWorkerThreads < 1) {
3080 L<<Logger::Warning<<"Asked to run with 0 threads, raising to 1 instead"<<endl;
3081 g_numWorkerThreads = 1;
3082 }
3083
3084 g_numThreads = g_numWorkerThreads + g_weDistributeQueries;
3085 g_maxMThreads = ::arg().asNum("max-mthreads");
3086
3087 g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
3088
3089 g_statisticsInterval = ::arg().asNum("statistics-interval");
3090
3091 #ifdef SO_REUSEPORT
3092 g_reusePort = ::arg().mustDo("reuseport");
3093 #endif
3094
3095 g_useOneSocketPerThread = (!g_weDistributeQueries && g_reusePort);
3096
3097 if (g_useOneSocketPerThread) {
3098 for (unsigned int threadId = 0; threadId < g_numWorkerThreads; threadId++) {
3099 makeUDPServerSockets(threadId);
3100 makeTCPServerSockets(threadId);
3101 }
3102 }
3103 else {
3104 makeUDPServerSockets(0);
3105 makeTCPServerSockets(0);
3106
3107 if (!g_weDistributeQueries) {
3108 /* we are not distributing queries and we don't have reuseport,
3109 so every thread will be listening on all the TCP sockets */
3110 for (unsigned int threadId = 1; threadId < g_numWorkerThreads; threadId++) {
3111 g_tcpListenSockets[threadId] = g_tcpListenSockets[0];
3112 }
3113 }
3114 }
3115
3116 SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
3117 g_useIncomingECS = ::arg().mustDo("use-incoming-edns-subnet");
3118
3119 int forks;
3120 for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
3121 if(!fork()) // we are child
3122 break;
3123 }
3124
3125 if(::arg().mustDo("daemon")) {
3126 L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
3127 L.toConsole(Logger::Critical);
3128 daemonize();
3129 loadRecursorLuaConfig(::arg()["lua-config-file"], false);
3130 }
3131 signal(SIGUSR1,usr1Handler);
3132 signal(SIGUSR2,usr2Handler);
3133 signal(SIGPIPE,SIG_IGN);
3134
3135 checkOrFixFDS();
3136
3137 #ifdef HAVE_LIBSODIUM
3138 if (sodium_init() == -1) {
3139 L<<Logger::Error<<"Unable to initialize sodium crypto library"<<endl;
3140 exit(99);
3141 }
3142 #endif
3143
3144 openssl_thread_setup();
3145 openssl_seed();
3146
3147 int newgid=0;
3148 if(!::arg()["setgid"].empty())
3149 newgid=Utility::makeGidNumeric(::arg()["setgid"]);
3150 int newuid=0;
3151 if(!::arg()["setuid"].empty())
3152 newuid=Utility::makeUidNumeric(::arg()["setuid"]);
3153
3154 Utility::dropGroupPrivs(newuid, newgid);
3155
3156 if (!::arg()["chroot"].empty()) {
3157 #ifdef HAVE_SYSTEMD
3158 char *ns;
3159 ns = getenv("NOTIFY_SOCKET");
3160 if (ns != nullptr) {
3161 L<<Logger::Error<<"Unable to chroot when running from systemd. Please disable chroot= or set the 'Type' for this service to 'simple'"<<endl;
3162 exit(1);
3163 }
3164 #endif
3165 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
3166 L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
3167 exit(1);
3168 }
3169 else
3170 L<<Logger::Error<<"Chrooted to '"<<::arg()["chroot"]<<"'"<<endl;
3171 }
3172
3173 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
3174 if(!s_pidfname.empty())
3175 unlink(s_pidfname.c_str()); // remove possible old pid file
3176 writePid();
3177
3178 makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
3179
3180 Utility::dropUserPrivs(newuid);
3181
3182 makeThreadPipes();
3183
3184 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
3185 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
3186 g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
3187 s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
3188
3189 if (::arg().mustDo("snmp-agent")) {
3190 g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
3191 g_snmpAgent->run();
3192 }
3193
3194 /* This thread handles the web server, carbon, statistics and the control channel */
3195 std::thread handlerThread(recursorThread, s_handlerThreadID, false);
3196
3197 const auto cpusMap = parseCPUMap();
3198
3199 std::vector<std::thread> workers(g_numThreads);
3200 if(g_numThreads == 1) {
3201 L<<Logger::Warning<<"Operating unthreaded"<<endl;
3202 #ifdef HAVE_SYSTEMD
3203 sd_notify(0, "READY=1");
3204 #endif
3205 setCPUMap(cpusMap, 0, pthread_self());
3206 recursorThread(0, true);
3207 }
3208 else {
3209 L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
3210 for(unsigned int n=0; n < g_numThreads; ++n) {
3211 workers[n] = std::thread(recursorThread, n, true);
3212
3213 setCPUMap(cpusMap, n, workers[n].native_handle());
3214 }
3215 #ifdef HAVE_SYSTEMD
3216 sd_notify(0, "READY=1");
3217 #endif
3218 workers.back().join();
3219 }
3220 return 0;
3221 }
3222
3223 static void* recursorThread(int n, bool worker)
3224 try
3225 {
3226 t_id=n;
3227 SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
3228 SyncRes::setDomainMap(g_initialDomainMap);
3229 t_allowFrom = g_initialAllowFrom;
3230 t_udpclientsocks = std::unique_ptr<UDPClientSocks>(new UDPClientSocks());
3231 t_tcpClientCounts = std::unique_ptr<tcpClientCounts_t>(new tcpClientCounts_t());
3232 primeHints();
3233
3234 t_packetCache = std::unique_ptr<RecursorPacketCache>(new RecursorPacketCache());
3235
3236 #ifdef HAVE_PROTOBUF
3237 t_uuidGenerator = std::unique_ptr<boost::uuids::random_generator>(new boost::uuids::random_generator());
3238 #endif
3239 L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
3240
3241 if(worker && (!g_weDistributeQueries || t_id != s_distributorThreadID)) {
3242 try {
3243 if(!::arg()["lua-dns-script"].empty()) {
3244 t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
3245 L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
3246 }
3247 }
3248 catch(std::exception &e) {
3249 L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
3250 _exit(99);
3251 }
3252 }
3253
3254 unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
3255 if(ringsize) {
3256 t_remotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3257 if(g_weDistributeQueries) // if so, only 1 thread does recvfrom
3258 t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries"));
3259 else
3260 t_remotes->set_capacity(ringsize);
3261 t_servfailremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3262 t_servfailremotes->set_capacity(ringsize);
3263 t_largeanswerremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3264 t_largeanswerremotes->set_capacity(ringsize);
3265
3266 t_queryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
3267 t_queryring->set_capacity(ringsize);
3268 t_servfailqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
3269 t_servfailqueryring->set_capacity(ringsize);
3270 }
3271
3272 MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
3273
3274 PacketID pident;
3275
3276 t_fdm=getMultiplexer();
3277
3278 if(!worker) {
3279 if(::arg().mustDo("webserver")) {
3280 L<<Logger::Warning << "Enabling web server" << endl;
3281 try {
3282 new RecursorWebServer(t_fdm);
3283 }
3284 catch(PDNSException &e) {
3285 L<<Logger::Error<<"Exception: "<<e.reason<<endl;
3286 exit(99);
3287 }
3288 }
3289 L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
3290 }
3291 else {
3292 t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
3293 t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest);
3294
3295 if(g_useOneSocketPerThread) {
3296 for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
3297 t_fdm->addReadFD(i->first, i->second);
3298 }
3299 }
3300 else {
3301 if(!g_weDistributeQueries || t_id == s_distributorThreadID) { // if we distribute queries, only t_id = 0 listens
3302 for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
3303 t_fdm->addReadFD(i->first, i->second);
3304 }
3305 }
3306 }
3307 }
3308
3309 registerAllStats();
3310
3311 if(!worker) {
3312 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
3313 }
3314
3315 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
3316
3317 bool listenOnTCP(true);
3318
3319 time_t last_stat = 0;
3320 time_t last_carbon=0;
3321 time_t carbonInterval=::arg().asNum("carbon-interval");
3322 counter.store(0); // used to periodically execute certain tasks
3323 for(;;) {
3324 while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
3325
3326 if(!(counter%500)) {
3327 MT->makeThread(houseKeeping, 0);
3328 }
3329
3330 if(!(counter%55)) {
3331 typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
3332 expired_t expired=t_fdm->getTimeouts(g_now);
3333
3334 for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
3335 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
3336 if(g_logCommonErrors)
3337 L<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toString() <<endl;
3338 t_fdm->removeReadFD(i->first);
3339 }
3340 }
3341
3342 counter++;
3343
3344 if(!worker) {
3345 if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
3346 doStats();
3347 last_stat = g_now.tv_sec;
3348 }
3349
3350 Utility::gettimeofday(&g_now, 0);
3351
3352 if((g_now.tv_sec - last_carbon) >= carbonInterval) {
3353 MT->makeThread(doCarbonDump, 0);
3354 last_carbon = g_now.tv_sec;
3355 }
3356 }
3357
3358 t_fdm->run(&g_now);
3359 // 'run' updates g_now for us
3360
3361 if(worker && (!g_weDistributeQueries || t_id == s_distributorThreadID)) { // if pdns distributes queries, only tid 0 should do this
3362 if(listenOnTCP) {
3363 if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
3364 for(const auto fd : g_tcpListenSockets[t_id]) {
3365 t_fdm->removeReadFD(fd);
3366 }
3367 listenOnTCP=false;
3368 }
3369 }
3370 else {
3371 if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
3372 for(const auto fd : g_tcpListenSockets[t_id]) {
3373 t_fdm->addReadFD(fd, handleNewTCPQuestion);
3374 }
3375 listenOnTCP=true;
3376 }
3377 }
3378 }
3379 }
3380 }
3381 catch(PDNSException &ae) {
3382 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
3383 return 0;
3384 }
3385 catch(std::exception &e) {
3386 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
3387 return 0;
3388 }
3389 catch(...) {
3390 L<<Logger::Error<<"any other exception in main: "<<endl;
3391 return 0;
3392 }
3393
3394
3395 int main(int argc, char **argv)
3396 {
3397 g_argc = argc;
3398 g_argv = argv;
3399 g_stats.startupTime=time(0);
3400 versionSetProduct(ProductRecursor);
3401 reportBasicTypes();
3402 reportOtherTypes();
3403
3404 int ret = EXIT_SUCCESS;
3405
3406 try {
3407 ::arg().set("stack-size","stack size per mthread")="200000";
3408 ::arg().set("soa-minimum-ttl","Don't change")="0";
3409 ::arg().set("no-shuffle","Don't change")="off";
3410 ::arg().set("local-port","port to listen on")="53";
3411 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
3412 ::arg().setSwitch("non-local-bind", "Enable binding to non-local addresses by using FREEBIND / BINDANY socket options")="no";
3413 ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
3414 ::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
3415 ::arg().set("dnssec-log-bogus", "Log DNSSEC bogus validations")="no";
3416 ::arg().set("daemon","Operate as a daemon")="no";
3417 ::arg().setSwitch("write-pid","Write a PID file")="yes";
3418 ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="6";
3419 ::arg().set("disable-syslog","Disable logging to syslog, useful when running inside a supervisor that logs stdout")="no";
3420 ::arg().set("log-timestamp","Print timestamps in log lines, useful to disable when running with a tool that timestamps stdout already")="yes";
3421 ::arg().set("log-common-errors","If we should log rather common errors")="no";
3422 ::arg().set("chroot","switch to chroot jail")="";
3423 ::arg().set("setgid","If set, change group id to this gid for more security")="";
3424 ::arg().set("setuid","If set, change user id to this uid for more security")="";
3425 ::arg().set("network-timeout", "Wait this number of milliseconds for network i/o")="1500";
3426 ::arg().set("threads", "Launch this number of threads")="2";
3427 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs!
3428 ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
3429 ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = "";
3430 ::arg().set("api-key", "Static pre-shared authentication key for access to the REST API") = "";
3431 ::arg().set("api-logfile", "Location of the server logfile (used by the REST API)") = "/var/log/pdns.log";
3432 ::arg().set("api-readonly", "Disallow data modification through the REST API when set") = "no";
3433 ::arg().setSwitch("webserver", "Start a webserver (for REST API)") = "no";
3434 ::arg().set("webserver-address", "IP Address of webserver to listen on") = "127.0.0.1";
3435 ::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
3436 ::arg().set("webserver-password", "Password required for accessing the webserver") = "";
3437 ::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="127.0.0.1,::1";
3438 ::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
3439 ::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server IP address")="";
3440 ::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
3441 ::arg().set("statistics-interval", "Number of seconds between printing of recursor statistics, 0 to disable")="1800";
3442 ::arg().set("quiet","Suppress logging of questions and answers")="";
3443 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
3444 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
3445 ::arg().set("socket-owner","Owner of socket")="";
3446 ::arg().set("socket-group","Group of socket")="";
3447 ::arg().set("socket-mode", "Permissions for socket")="";
3448
3449 ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+" when unset and not chrooted" )="";
3450 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
3451 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
3452 ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
3453 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
3454 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
3455 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
3456 ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
3457 ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
3458 ::arg().set("hint-file", "If set, load root hints from this file")="";
3459 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
3460 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
3461 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
3462 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
3463 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
3464 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
3465 ::arg().set("server-id", "Returned when queried for 'id.server' TXT or NSID, defaults to hostname")="";
3466 ::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
3467 ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
3468 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
3469 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
3470 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
3471 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=DONT_QUERY;
3472 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
3473 ::arg().set("max-tcp-queries-per-connection", "If set, maximum number of TCP queries in a TCP connection")="0";
3474 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
3475 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
3476 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
3477 ::arg().set("lua-config-file", "More powerful configuration options")="";
3478
3479 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
3480 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
3481 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
3482 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
3483 ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
3484 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
3485 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="yes";
3486 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
3487 ::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
3488 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
3489 ::arg().set("ecs-ipv4-bits", "Number of bits of IPv4 address to pass for EDNS Client Subnet")="24";
3490 ::arg().set("ecs-ipv6-bits", "Number of bits of IPv6 address to pass for EDNS Client Subnet")="56";
3491 ::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
3492 ::arg().set("ecs-scope-zero-address", "Address to send to whitelisted authoritative servers for incoming queries with ECS prefix-length source of 0")="";
3493 ::arg().setSwitch( "use-incoming-edns-subnet", "Pass along received EDNS Client Subnet information")="no";
3494 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads")="yes";
3495 ::arg().setSwitch( "root-nx-trust", "If set, believe that an NXDOMAIN from the root means the TLD does not exist")="yes";
3496 ::arg().setSwitch( "any-to-tcp","Answer ANY queries with tc=1, shunting to TCP" )="no";
3497 ::arg().setSwitch( "lowercase-outgoing","Force outgoing questions to lowercase")="no";
3498 ::arg().setSwitch("gettag-needs-edns-options", "If EDNS Options should be extracted before calling the gettag() hook")="no";
3499 ::arg().set("udp-truncation-threshold", "Maximum UDP response size before we truncate")="1680";
3500 ::arg().set("edns-outgoing-bufsize", "Outgoing EDNS buffer size")="1680";
3501 ::arg().set("minimum-ttl-override", "Set under adverse conditions, a minimum TTL")="0";
3502 ::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
3503 ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
3504 ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
3505 ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
3506
3507 ::arg().set("include-dir","Include *.conf files from this directory")="";
3508 ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
3509
3510 ::arg().setSwitch("reuseport","Enable SO_REUSEPORT allowing multiple recursors processes to listen to 1 address")="no";
3511
3512 ::arg().setSwitch("snmp-agent", "If set, register as an SNMP agent")="no";
3513 ::arg().set("snmp-master-socket", "If set and snmp-agent is set, the socket to use to register to the SNMP master")="";
3514
3515 ::arg().set("tcp-fast-open", "Enable TCP Fast Open support on the listening sockets, using the supplied numerical value as the queue size")="0";
3516 ::arg().set("nsec3-max-iterations", "Maximum number of iterations allowed for an NSEC3 record")="2500";
3517
3518 ::arg().set("cpu-map", "Thread to CPU mapping, space separated thread-id=cpu1,cpu2..cpuN pairs")="";
3519
3520 ::arg().setSwitch("log-rpz-changes", "Log additions and removals to RPZ zones at Info level")="no";
3521
3522 ::arg().setCmd("help","Provide a helpful message");
3523 ::arg().setCmd("version","Print version string");
3524 ::arg().setCmd("config","Output blank configuration");
3525 L.toConsole(Logger::Info);
3526 ::arg().laxParse(argc,argv); // do a lax parse
3527
3528 string configname=::arg()["config-dir"]+"/recursor.conf";
3529 if(::arg()["config-name"]!="") {
3530 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
3531 s_programname+="-"+::arg()["config-name"];
3532 }
3533 cleanSlashes(configname);
3534
3535 if(::arg().mustDo("config")) {
3536 cout<<::arg().configstring()<<endl;
3537 exit(0);
3538 }
3539
3540 if(!::arg().file(configname.c_str()))
3541 L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
3542
3543 ::arg().parse(argc,argv);
3544
3545 if( !::arg()["chroot"].empty() && !::arg()["api-config-dir"].empty() && !::arg().mustDo("api-readonly") ) {
3546 L<<Logger::Error<<"Using chroot and a writable API is not possible"<<endl;
3547 exit(EXIT_FAILURE);
3548 }
3549
3550 if (::arg()["socket-dir"].empty()) {
3551 if (::arg()["chroot"].empty())
3552 ::arg().set("socket-dir") = LOCALSTATEDIR;
3553 else
3554 ::arg().set("socket-dir") = "/";
3555 }
3556
3557 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
3558
3559 if(::arg().asNum("threads")==1)
3560 ::arg().set("pdns-distributes-queries")="no";
3561
3562 if(::arg().mustDo("help")) {
3563 cout<<"syntax:"<<endl<<endl;
3564 cout<<::arg().helpstring(::arg()["help"])<<endl;
3565 exit(0);
3566 }
3567 if(::arg().mustDo("version")) {
3568 showProductVersion();
3569 showBuildConfiguration();
3570 exit(0);
3571 }
3572
3573 Logger::Urgency logUrgency = (Logger::Urgency)::arg().asNum("loglevel");
3574
3575 if (logUrgency < Logger::Error)
3576 logUrgency = Logger::Error;
3577 if(!g_quiet && logUrgency < Logger::Info) { // Logger::Info=6, Logger::Debug=7
3578 logUrgency = Logger::Info; // if you do --quiet=no, you need Info to also see the query log
3579 }
3580 L.setLoglevel(logUrgency);
3581 L.toConsole(logUrgency);
3582
3583 serviceMain(argc, argv);
3584 }
3585 catch(PDNSException &ae) {
3586 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
3587 ret=EXIT_FAILURE;
3588 }
3589 catch(std::exception &e) {
3590 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
3591 ret=EXIT_FAILURE;
3592 }
3593 catch(...) {
3594 L<<Logger::Error<<"any other exception in main: "<<endl;
3595 ret=EXIT_FAILURE;
3596 }
3597
3598 return ret;
3599 }