]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
rec: Every thread listen on all TCP sockets without reuseport/distributes
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <netdb.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29
30 #include "recpacketcache.hh"
31 #include "ws-recursor.hh"
32 #include <pthread.h>
33 #include "utility.hh"
34 #include "dns_random.hh"
35 #ifdef HAVE_LIBSODIUM
36 #include <sodium.h>
37 #endif
38 #include "opensslsigners.hh"
39 #include <iostream>
40 #include <errno.h>
41 #include <boost/static_assert.hpp>
42 #include <map>
43 #include <set>
44 #include "recursor_cache.hh"
45 #include "cachecleaner.hh"
46 #include <stdio.h>
47 #include <signal.h>
48 #include <stdlib.h>
49 #include "misc.hh"
50 #include "mtasker.hh"
51 #include <utility>
52 #include "arguments.hh"
53 #include "syncres.hh"
54 #include <fcntl.h>
55 #include <fstream>
56 #include "sortlist.hh"
57 #include "sstuff.hh"
58 #include <boost/tuple/tuple.hpp>
59 #include <boost/tuple/tuple_comparison.hpp>
60 #include <boost/shared_array.hpp>
61 #include <boost/function.hpp>
62 #include <boost/algorithm/string.hpp>
63 #ifdef MALLOC_TRACE
64 #include "malloctrace.hh"
65 #endif
66 #include <netinet/tcp.h>
67 #include "dnsparser.hh"
68 #include "dnswriter.hh"
69 #include "dnsrecords.hh"
70 #include "zoneparser-tng.hh"
71 #include "rec_channel.hh"
72 #include "logger.hh"
73 #include "iputils.hh"
74 #include "mplexer.hh"
75 #include "config.h"
76 #include "lua-recursor4.hh"
77 #include "version.hh"
78 #include "responsestats.hh"
79 #include "secpoll-recursor.hh"
80 #include "dnsname.hh"
81 #include "filterpo.hh"
82 #include "rpzloader.hh"
83 #include "validate-recursor.hh"
84 #include "rec-lua-conf.hh"
85 #include "ednsoptions.hh"
86 #include "gettime.hh"
87
88 #include "rec-protobuf.hh"
89 #include "rec-snmp.hh"
90
91 #ifdef HAVE_SYSTEMD
92 #include <systemd/sd-daemon.h>
93 #endif
94
95 #include "namespaces.hh"
96
97 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
98
99 static thread_local std::shared_ptr<RecursorLua4> t_pdl;
100 static thread_local int t_id = -1;
101 static thread_local std::shared_ptr<Regex> t_traceRegex;
102 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
103
104 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
105 thread_local std::unique_ptr<MemRecursorCache> t_RC;
106 thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
107 thread_local FDMultiplexer* t_fdm{nullptr};
108 thread_local std::unique_ptr<addrringbuf_t> t_remotes, t_servfailremotes, t_largeanswerremotes;
109 thread_local std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > > t_queryring, t_servfailqueryring;
110 thread_local std::shared_ptr<NetmaskGroup> t_allowFrom;
111 #ifdef HAVE_PROTOBUF
112 thread_local std::unique_ptr<boost::uuids::random_generator> t_uuidGenerator;
113 #endif
114 __thread struct timeval g_now; // timestamp, updated (too) frequently
115
// for communicating with our threads
// Every member starts out as -1 so that a set which has not been (fully)
// wired up yet never carries an indeterminate descriptor value.
struct ThreadPipeSet
{
  int writeToThread{-1};
  int readToThread{-1};
  int writeFromThread{-1};
  int readFromThread{-1};
  int writeQueriesToThread{-1}; // this one is non-blocking
  int readQueriesToThread{-1};
};
126
127 /* the TID of the thread handling the web server, carbon, statistics and the control channel */
128 static const int s_handlerThreadID = -1;
129 /* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries
130 to the other threads */
131 static const int s_distributorThreadID = 0;
132
133 typedef std::map<int, std::set<int>> tcpListenSockets_t;
134 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
135
136 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
137
138 static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
139 static vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
140 static tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
141 static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
142 static std::unordered_map<unsigned int, deferredAdd_t> deferredAdds;
143 static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
144 static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
145 static AtomicCounter counter;
146 static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads needs this to be setup
147 static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setup with this
148 static size_t g_tcpMaxQueriesPerConn;
149 static uint64_t g_latencyStatSize;
150 static uint32_t g_disthashseed;
151 static unsigned int g_maxTCPPerClient;
152 static unsigned int g_networkTimeoutMsec;
153 static unsigned int g_maxMThreads;
154 static unsigned int g_numWorkerThreads;
155 static int g_tcpTimeout;
156 static uint16_t g_udpTruncationThreshold;
157 static std::atomic<bool> statsWanted;
158 static std::atomic<bool> g_quiet;
159 static bool g_logCommonErrors;
160 static bool g_anyToTcp;
161 static bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
162 static bool g_reusePort{false};
163 static bool g_useOneSocketPerThread;
164 static bool g_gettagNeedsEDNSOptions{false};
165 static time_t g_statisticsInterval;
166 static bool g_useIncomingECS;
167 std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
168
169 RecursorControlChannel s_rcc; // only active in thread 0
170 RecursorStats g_stats;
171 string s_programname="pdns_recursor";
172 string s_pidfname;
173 bool g_lowercaseOutgoing;
174 unsigned int g_numThreads;
175 uint16_t g_outgoingEDNSBufsize;
176 bool g_logRPZChanges{false};
177
178 #define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
179 // Bad Nets taken from both:
180 // http://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
181 // and
182 // http://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
183 // where such a network may not be considered a valid destination
184 #define BAD_NETS "0.0.0.0/8, 192.0.0.0/24, 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24, 240.0.0.0/4, ::/96, ::ffff:0:0/96, 100::/64, 2001:db8::/32"
185 #define DONT_QUERY LOCAL_NETS ", " BAD_NETS
186
187 //! used to send information to a newborn mthread
188 struct DNSComboWriter {
189 DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(true, data, len), d_now(now),
190 d_tcp(false), d_socket(-1)
191 {}
192 MOADNSParser d_mdp;
193 void setRemote(const ComboAddress* sa)
194 {
195 d_remote=*sa;
196 }
197
198 void setLocal(const ComboAddress& sa)
199 {
200 d_local=sa;
201 }
202
203
204 void setSocket(int sock)
205 {
206 d_socket=sock;
207 }
208
209 string getRemote() const
210 {
211 return d_remote.toString();
212 }
213
214 struct timeval d_now;
215 ComboAddress d_remote, d_local;
216 #ifdef HAVE_PROTOBUF
217 boost::uuids::uuid d_uuid;
218 string d_requestorId;
219 string d_deviceId;
220 #endif
221 EDNSSubnetOpts d_ednssubnet;
222 bool d_ecsFound{false};
223 bool d_ecsParsed{false};
224 bool d_tcp;
225 int d_socket;
226 unsigned int d_tag{0};
227 uint32_t d_qhash{0};
228 string d_query;
229 shared_ptr<TCPConnection> d_tcpConnection;
230 vector<pair<uint16_t, string> > d_ednsOpts;
231 std::vector<std::string> d_policyTags;
232 LuaContext::LuaObject d_data;
233 uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
234 bool d_variable{false};
235 };
236
237 MT_t* getMT()
238 {
239 return MT ? MT.get() : nullptr;
240 }
241
242 ArgvMap &arg()
243 {
244 static ArgvMap theArg;
245 return theArg;
246 }
247
248 unsigned int getRecursorThreadId()
249 {
250 return static_cast<unsigned int>(t_id);
251 }
252
253 int getMTaskerTID()
254 {
255 return MT->getTid();
256 }
257
258 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
259
260 // -1 is error, 0 is timeout, 1 is success
261 int asendtcp(const string& data, Socket* sock)
262 {
263 PacketID pident;
264 pident.sock=sock;
265 pident.outMSG=data;
266
267 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
268 string packet;
269
270 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
271
272 if(!ret || ret==-1) { // timeout
273 t_fdm->removeWriteFD(sock->getHandle());
274 }
275 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
276 return -1;
277 }
278 return ret;
279 }
280
281 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
282
283 // -1 is error, 0 is timeout, 1 is success
284 int arecvtcp(string& data, size_t len, Socket* sock, bool incompleteOkay)
285 {
286 data.clear();
287 PacketID pident;
288 pident.sock=sock;
289 pident.inNeeded=len;
290 pident.inIncompleteOkay=incompleteOkay;
291 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
292
293 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
294 if(!ret || ret==-1) { // timeout
295 t_fdm->removeReadFD(sock->getHandle());
296 }
297 else if(data.empty()) {// error, EOF or other
298 return -1;
299 }
300
301 return ret;
302 }
303
304 static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
305 {
306 PacketID pident=*any_cast<PacketID>(&var);
307 char resp[512];
308 ssize_t ret=recv(fd, resp, sizeof(resp), 0);
309 t_fdm->removeReadFD(fd);
310 if(ret >= 0) {
311 string data(resp, (size_t) ret);
312 MT->sendEvent(pident, &data);
313 }
314 else {
315 string empty;
316 MT->sendEvent(pident, &empty);
317 // cerr<<"Had some kind of error: "<<ret<<", "<<strerror(errno)<<endl;
318 }
319 }
320 string GenUDPQueryResponse(const ComboAddress& dest, const string& query)
321 {
322 Socket s(dest.sin4.sin_family, SOCK_DGRAM);
323 s.setNonBlocking();
324 ComboAddress local = getQueryLocalAddress(dest.sin4.sin_family, 0);
325
326 s.bind(local);
327 s.connect(dest);
328 s.send(query);
329
330 PacketID pident;
331 pident.sock=&s;
332 pident.type=0;
333 t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
334
335 string data;
336
337 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
338
339 if(!ret || ret==-1) { // timeout
340 t_fdm->removeReadFD(s.getHandle());
341 }
342 else if(data.empty()) {// error, EOF or other
343 // we could special case this
344 return data;
345 }
346 return data;
347 }
348
349 //! pick a random query local address
350 ComboAddress getQueryLocalAddress(int family, uint16_t port)
351 {
352 ComboAddress ret;
353 if(family==AF_INET) {
354 if(g_localQueryAddresses4.empty())
355 ret = g_local4;
356 else
357 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
358 ret.sin4.sin_port = htons(port);
359 }
360 else {
361 if(g_localQueryAddresses6.empty())
362 ret = g_local6;
363 else
364 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
365
366 ret.sin6.sin6_port = htons(port);
367 }
368 return ret;
369 }
370
371 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
372
373 static void setSocketBuffer(int fd, int optname, uint32_t size)
374 {
375 uint32_t psize=0;
376 socklen_t len=sizeof(psize);
377
378 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
379 L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
380 return;
381 }
382
383 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
384 L<<Logger::Error<<"Unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
385 }
386
387
388 static void setSocketReceiveBuffer(int fd, uint32_t size)
389 {
390 setSocketBuffer(fd, SO_RCVBUF, size);
391 }
392
393 static void setSocketSendBuffer(int fd, uint32_t size)
394 {
395 setSocketBuffer(fd, SO_SNDBUF, size);
396 }
397
398
399 // you can ask this class for a UDP socket to send a query from
400 // this socket is not yours, don't even think about deleting it
401 // but after you call 'returnSocket' on it, don't assume anything anymore
402 class UDPClientSocks
403 {
404 unsigned int d_numsocks;
405 public:
406 UDPClientSocks() : d_numsocks(0)
407 {
408 }
409
410 typedef set<int> socks_t;
411 socks_t d_socks;
412
413 // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
414 int getSocket(const ComboAddress& toaddr, int* fd)
415 {
416 *fd=makeClientSocket(toaddr.sin4.sin_family);
417 if(*fd < 0) // temporary error - receive exception otherwise
418 return -2;
419
420 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
421 int err = errno;
422 // returnSocket(*fd);
423 try {
424 closesocket(*fd);
425 }
426 catch(const PDNSException& e) {
427 L<<Logger::Error<<"Error closing UDP socket after connect() failed: "<<e.reason<<endl;
428 }
429
430 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
431 return -2;
432 return -1;
433 }
434
435 d_socks.insert(*fd);
436 d_numsocks++;
437 return 0;
438 }
439
440 void returnSocket(int fd)
441 {
442 socks_t::iterator i=d_socks.find(fd);
443 if(i==d_socks.end()) {
444 throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
445 }
446 returnSocketLocked(i);
447 }
448
449 // return a socket to the pool, or simply erase it
450 void returnSocketLocked(socks_t::iterator& i)
451 {
452 if(i==d_socks.end()) {
453 throw PDNSException("Trying to return a socket not in the pool");
454 }
455 try {
456 t_fdm->removeReadFD(*i);
457 }
458 catch(FDMultiplexerException& e) {
459 // we sometimes return a socket that has not yet been assigned to t_fdm
460 }
461 try {
462 closesocket(*i);
463 }
464 catch(const PDNSException& e) {
465 L<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
466 }
467
468 d_socks.erase(i++);
469 --d_numsocks;
470 }
471
472 // returns -1 for errors which might go away, throws for ones that won't
473 static int makeClientSocket(int family)
474 {
475 int ret=socket(family, SOCK_DGRAM, 0 ); // turns out that setting CLO_EXEC and NONBLOCK from here is not a performance win on Linux (oddly enough)
476
477 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
478 return ret;
479
480 if(ret<0)
481 throw PDNSException("Making a socket for resolver (family = "+std::to_string(family)+"): "+stringerror());
482
483 // setCloseOnExec(ret); // we're not going to exec
484
485 int tries=10;
486 ComboAddress sin;
487 while(--tries) {
488 uint16_t port;
489
490 if(tries==1) // fall back to kernel 'random'
491 port = 0;
492 else
493 port = 1025 + dns_random(64510);
494
495 sin=getQueryLocalAddress(family, port); // does htons for us
496
497 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
498 break;
499 }
500 if(!tries)
501 throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
502
503 setNonBlocking(ret);
504 return ret;
505 }
506 };
507
508 static thread_local std::unique_ptr<UDPClientSocks> t_udpclientsocks;
509
510 /* these two functions are used by LWRes */
511 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
512 int asendto(const char *data, size_t len, int flags,
513 const ComboAddress& toaddr, uint16_t id, const DNSName& domain, uint16_t qtype, int* fd)
514 {
515
516 PacketID pident;
517 pident.domain = domain;
518 pident.remote = toaddr;
519 pident.type = qtype;
520
521 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
522 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
523
524 for(; chain.first != chain.second; chain.first++) {
525 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
526 /*
527 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
528 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
529 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
530 */
531 chain.first->key.chain.insert(id); // we can chain
532 *fd=-1; // gets used in waitEvent / sendEvent later on
533 return 1;
534 }
535 }
536
537 int ret=t_udpclientsocks->getSocket(toaddr, fd);
538 if(ret < 0)
539 return ret;
540
541 pident.fd=*fd;
542 pident.id=id;
543
544 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
545 ret = send(*fd, data, len, 0);
546
547 int tmp = errno;
548
549 if(ret < 0)
550 t_udpclientsocks->returnSocket(*fd);
551
552 errno = tmp; // this is for logging purposes only
553 return ret;
554 }
555
556 // -1 is error, 0 is timeout, 1 is success
557 int arecvfrom(char *data, size_t len, int flags, const ComboAddress& fromaddr, size_t *d_len,
558 uint16_t id, const DNSName& domain, uint16_t qtype, int fd, struct timeval* now)
559 {
560 static optional<unsigned int> nearMissLimit;
561 if(!nearMissLimit)
562 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
563
564 PacketID pident;
565 pident.fd=fd;
566 pident.id=id;
567 pident.domain=domain;
568 pident.type = qtype;
569 pident.remote=fromaddr;
570
571 string packet;
572 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
573
574 if(ret > 0) {
575 if(packet.empty()) // means "error"
576 return -1;
577
578 *d_len=packet.size();
579 memcpy(data,packet.c_str(),min(len,*d_len));
580 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
581 L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
582 g_stats.spoofCount++;
583 return -1;
584 }
585 }
586 else {
587 if(fd >= 0)
588 t_udpclientsocks->returnSocket(fd);
589 }
590 return ret;
591 }
592
593 static void writePid(void)
594 {
595 if(!::arg().mustDo("write-pid"))
596 return;
597 ofstream of(s_pidfname.c_str(), std::ios_base::app);
598 if(of)
599 of<< Utility::getpid() <<endl;
600 else
601 L<<Logger::Error<<"Writing pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
602 }
603
604 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : d_remote(addr), d_fd(fd)
605 {
606 ++s_currentConnections;
607 (*t_tcpClientCounts)[d_remote]++;
608 }
609
610 TCPConnection::~TCPConnection()
611 {
612 try {
613 if(closesocket(d_fd) < 0)
614 L<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
615 }
616 catch(const PDNSException& e) {
617 L<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
618 }
619
620 if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
621 t_tcpClientCounts->erase(d_remote);
622 --s_currentConnections;
623 }
624
625 AtomicCounter TCPConnection::s_currentConnections;
626
627 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
628
629 // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
630 static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
631 {
632 if(packetsize > 1000 && t_largeanswerremotes)
633 t_largeanswerremotes->push_back(remote);
634 switch(res) {
635 case RCode::ServFail:
636 if(t_servfailremotes) {
637 t_servfailremotes->push_back(remote);
638 if(query && t_servfailqueryring) // packet cache
639 t_servfailqueryring->push_back(make_pair(*query, qtype));
640 }
641 g_stats.servFails++;
642 break;
643 case RCode::NXDomain:
644 g_stats.nxDomains++;
645 break;
646 case RCode::NoError:
647 g_stats.noErrors++;
648 break;
649 }
650 }
651
652 static string makeLoginfo(DNSComboWriter* dc)
653 try
654 {
655 return "("+dc->d_mdp.d_qname.toLogString()+"/"+DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)+" from "+(dc->d_remote.toString())+")";
656 }
657 catch(...)
658 {
659 return "Exception making error message for exception";
660 }
661
662 #ifdef HAVE_PROTOBUF
663 static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId)
664 {
665 Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
666 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
667 RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
668 message.setEDNSSubnet(ednssubnet, ednssubnet.isIpv4() ? maskV4 : maskV6);
669 message.setRequestorId(requestorId);
670 message.setDeviceId(deviceId);
671
672 if (!policyTags.empty()) {
673 message.setPolicyTags(policyTags);
674 }
675
676 // cerr <<message.toDebugString()<<endl;
677 std::string str;
678 message.serialize(str);
679 logger->queueData(str);
680 }
681
682 static void protobufLogResponse(const std::shared_ptr<RemoteLogger>& logger, const RecProtoBufMessage& message)
683 {
684 // cerr <<message.toDebugString()<<endl;
685 std::string str;
686 message.serialize(str);
687 logger->queueData(str);
688 }
689 #endif
690
691 /**
692 * Chases the CNAME provided by the PolicyCustom RPZ policy.
693 *
694 * @param spoofed: The DNSRecord that was created by the policy, should already be added to ret
695 * @param qtype: The QType of the original query
696 * @param sr: A SyncRes
697 * @param res: An integer that will contain the RCODE of the lookup we do
698 * @param ret: A vector of DNSRecords where the result of the CNAME chase should be appended to
699 */
700 static void handleRPZCustom(const DNSRecord& spoofed, const QType& qtype, SyncRes& sr, int& res, vector<DNSRecord>& ret)
701 {
702 if (spoofed.d_type == QType::CNAME) {
703 bool oldWantsRPZ = sr.getWantsRPZ();
704 sr.setWantsRPZ(false);
705 vector<DNSRecord> ans;
706 res = sr.beginResolve(DNSName(spoofed.d_content->getZoneRepresentation()), qtype, 1, ans);
707 for (const auto& rec : ans) {
708 if(rec.d_place == DNSResourceRecord::ANSWER) {
709 ret.push_back(rec);
710 }
711 }
712 // Reset the RPZ state of the SyncRes
713 sr.setWantsRPZ(oldWantsRPZ);
714 }
715 }
716
717 static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, uint32_t ttlCap, const uint16_t maxAnswerSize)
718 {
719 pw.startRecord(rec.d_name, rec.d_type, (rec.d_ttl > ttlCap ? ttlCap : rec.d_ttl), rec.d_class, rec.d_place);
720
721 if(rec.d_type != QType::OPT) // their TTL ain't real
722 minTTL = min(minTTL, rec.d_ttl);
723
724 rec.d_content->toPacket(pw);
725 if(pw.size() > static_cast<size_t>(maxAnswerSize)) {
726 pw.rollback();
727 if(rec.d_place != DNSResourceRecord::ADDITIONAL) {
728 pw.getHeader()->tc=1;
729 pw.truncate();
730 }
731 return false;
732 }
733
734 return true;
735 }
736
737 static void startDoResolve(void *p)
738 {
739 DNSComboWriter* dc=(DNSComboWriter *)p;
740 try {
741 if (t_queryring)
742 t_queryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
743
744 uint16_t maxanswersize = dc->d_tcp ? 65535 : min(static_cast<uint16_t>(512), g_udpTruncationThreshold);
745 EDNSOpts edo;
746 bool haveEDNS=false;
747 if(getEDNSOpts(dc->d_mdp, &edo)) {
748 if(!dc->d_tcp) {
749 /* rfc6891 6.2.3:
750 "Values lower than 512 MUST be treated as equal to 512."
751 */
752 maxanswersize = min(static_cast<uint16_t>(edo.d_packetsize >= 512 ? edo.d_packetsize : 512), g_udpTruncationThreshold);
753 }
754 dc->d_ednsOpts = edo.d_options;
755 haveEDNS=true;
756
757 if (g_useIncomingECS && !dc->d_ecsParsed) {
758 for (const auto& o : edo.d_options) {
759 if (o.first == EDNSOptionCode::ECS) {
760 dc->d_ecsFound = getEDNSSubnetOptsFromString(o.second, &dc->d_ednssubnet);
761 break;
762 }
763 }
764 }
765 }
766 /* perhaps there was no EDNS or no ECS but by now we looked */
767 dc->d_ecsParsed = true;
768 vector<DNSRecord> ret;
769 vector<uint8_t> packet;
770
771 auto luaconfsLocal = g_luaconfs.getLocal();
772 // Used to tell syncres later on if we should apply NSDNAME and NSIP RPZ triggers for this query
773 bool wantsRPZ(true);
774 RecProtoBufMessage pbMessage(RecProtoBufMessage::Response);
775 #ifdef HAVE_PROTOBUF
776 if (luaconfsLocal->protobufServer) {
777 Netmask requestorNM(dc->d_remote, dc->d_remote.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
778 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
779 pbMessage.update(dc->d_uuid, &requestor, &dc->d_local, dc->d_tcp, dc->d_mdp.d_header.id);
780 pbMessage.setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
781 pbMessage.setQuestion(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
782 }
783 #endif /* HAVE_PROTOBUF */
784
785 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
786
787 pw.getHeader()->aa=0;
788 pw.getHeader()->ra=1;
789 pw.getHeader()->qr=1;
790 pw.getHeader()->tc=0;
791 pw.getHeader()->id=dc->d_mdp.d_header.id;
792 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
793 pw.getHeader()->cd=dc->d_mdp.d_header.cd;
794
795 /* This is the lowest TTL seen in the records of the response,
796 so we can't cache it for longer than this value.
797 If we have a TTL cap, this value can't be larger than the
798 cap no matter what. */
799 uint32_t minTTL = dc->d_ttlCap;
800
801 SyncRes sr(dc->d_now);
802
803 bool DNSSECOK=false;
804 if(t_pdl) {
805 sr.setLuaEngine(t_pdl);
806 }
807 sr.d_requestor=dc->d_remote; // ECS needs this too
808 if(g_dnssecmode != DNSSECMode::Off) {
809 sr.setDoDNSSEC(true);
810
811 // Does the requestor want DNSSEC records?
812 if(edo.d_Z & EDNSOpts::DNSSECOK) {
813 DNSSECOK=true;
814 g_stats.dnssecQueries++;
815 }
816 } else {
817 // Ignore the client-set CD flag
818 pw.getHeader()->cd=0;
819 }
820 sr.setDNSSECValidationRequested(g_dnssecmode == DNSSECMode::ValidateAll || g_dnssecmode==DNSSECMode::ValidateForLog || ((dc->d_mdp.d_header.ad || DNSSECOK) && g_dnssecmode==DNSSECMode::Process));
821
822 #ifdef HAVE_PROTOBUF
823 sr.setInitialRequestId(dc->d_uuid);
824 #endif
825
826 if (g_useIncomingECS) {
827 sr.setIncomingECSFound(dc->d_ecsFound);
828 if (dc->d_ecsFound) {
829 sr.setIncomingECS(dc->d_ednssubnet);
830 }
831 }
832
833 bool tracedQuery=false; // we could consider letting Lua know about this too
834 bool variableAnswer = dc->d_variable;
835 bool shouldNotValidate = false;
836
837 /* preresolve expects res (dq.rcode) to be set to RCode::NoError by default */
838 int res = RCode::NoError;
839 DNSFilterEngine::Policy appliedPolicy;
840 DNSRecord spoofed;
841 RecursorLua4::DNSQuestion dq(dc->d_remote, dc->d_local, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ);
842 dq.ednsFlags = &edo.d_Z;
843 dq.ednsOptions = &dc->d_ednsOpts;
844 dq.tag = dc->d_tag;
845 dq.discardedPolicies = &sr.d_discardedPolicies;
846 dq.policyTags = &dc->d_policyTags;
847 dq.appliedPolicy = &appliedPolicy;
848 dq.currentRecords = &ret;
849 dq.dh = &dc->d_mdp.d_header;
850 dq.data = dc->d_data;
851 #ifdef HAVE_PROTOBUF
852 dq.requestorId = dc->d_requestorId;
853 dq.deviceId = dc->d_deviceId;
854 #endif
855
856 if(dc->d_mdp.d_qtype==QType::ANY && !dc->d_tcp && g_anyToTcp) {
857 pw.getHeader()->tc = 1;
858 res = 0;
859 variableAnswer = true;
860 goto sendit;
861 }
862
863 if(t_traceRegex && t_traceRegex->match(dc->d_mdp.d_qname.toString())) {
864 sr.setLogMode(SyncRes::Store);
865 tracedQuery=true;
866 }
867
868
869 if(!g_quiet || tracedQuery) {
870 L<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
871 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote();
872 if(!dc->d_ednssubnet.source.empty()) {
873 L<<" (ecs "<<dc->d_ednssubnet.source.toString()<<")";
874 }
875 L<<endl;
876 }
877
878 sr.setId(MT->getTid());
879 if(!dc->d_mdp.d_header.rd)
880 sr.setCacheOnly();
881
882 if (t_pdl) {
883 t_pdl->prerpz(dq, res);
884 }
885
886 // Check if the query has a policy attached to it
887 if (wantsRPZ) {
888 appliedPolicy = luaconfsLocal->dfe.getQueryPolicy(dc->d_mdp.d_qname, dc->d_remote, sr.d_discardedPolicies);
889 }
890
891 // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
892 if(!t_pdl || !t_pdl->preresolve(dq, res)) {
893
894 sr.setWantsRPZ(wantsRPZ);
895 if(wantsRPZ) {
896 switch(appliedPolicy.d_kind) {
897 case DNSFilterEngine::PolicyKind::NoAction:
898 break;
899 case DNSFilterEngine::PolicyKind::Drop:
900 g_stats.policyDrops++;
901 g_stats.policyResults[appliedPolicy.d_kind]++;
902 delete dc;
903 dc=0;
904 return;
905 case DNSFilterEngine::PolicyKind::NXDOMAIN:
906 g_stats.policyResults[appliedPolicy.d_kind]++;
907 res=RCode::NXDomain;
908 goto haveAnswer;
909 case DNSFilterEngine::PolicyKind::NODATA:
910 g_stats.policyResults[appliedPolicy.d_kind]++;
911 res=RCode::NoError;
912 goto haveAnswer;
913 case DNSFilterEngine::PolicyKind::Custom:
914 g_stats.policyResults[appliedPolicy.d_kind]++;
915 res=RCode::NoError;
916 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
917 ret.push_back(spoofed);
918 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
919 goto haveAnswer;
920 case DNSFilterEngine::PolicyKind::Truncate:
921 if(!dc->d_tcp) {
922 g_stats.policyResults[appliedPolicy.d_kind]++;
923 res=RCode::NoError;
924 pw.getHeader()->tc=1;
925 goto haveAnswer;
926 }
927 break;
928 }
929 }
930
931 // Query got not handled for QNAME Policy reasons, now actually go out to find an answer
932 try {
933 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
934 shouldNotValidate = sr.wasOutOfBand();
935 }
936 catch(ImmediateServFailException &e) {
937 if(g_logCommonErrors)
938 L<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during resolve of '"<<dc->d_mdp.d_qname<<"' because: "<<e.reason<<endl;
939 res = RCode::ServFail;
940 }
941
942 dq.validationState = sr.getValidationState();
943
944 // During lookup, an NSDNAME or NSIP trigger was hit in RPZ
945 if (res == -2) { // XXX This block should be macro'd, it is repeated post-resolve.
946 appliedPolicy = sr.d_appliedPolicy;
947 g_stats.policyResults[appliedPolicy.d_kind]++;
948 switch(appliedPolicy.d_kind) {
949 case DNSFilterEngine::PolicyKind::NoAction: // This can never happen
950 throw PDNSException("NoAction policy returned while a NSDNAME or NSIP trigger was hit");
951 case DNSFilterEngine::PolicyKind::Drop:
952 g_stats.policyDrops++;
953 delete dc;
954 dc=0;
955 return;
956 case DNSFilterEngine::PolicyKind::NXDOMAIN:
957 ret.clear();
958 res=RCode::NXDomain;
959 goto haveAnswer;
960
961 case DNSFilterEngine::PolicyKind::NODATA:
962 ret.clear();
963 res=RCode::NoError;
964 goto haveAnswer;
965
966 case DNSFilterEngine::PolicyKind::Truncate:
967 if(!dc->d_tcp) {
968 ret.clear();
969 res=RCode::NoError;
970 pw.getHeader()->tc=1;
971 goto haveAnswer;
972 }
973 break;
974
975 case DNSFilterEngine::PolicyKind::Custom:
976 ret.clear();
977 res=RCode::NoError;
978 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
979 ret.push_back(spoofed);
980 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
981 goto haveAnswer;
982 }
983 }
984
985 if (wantsRPZ) {
986 appliedPolicy = luaconfsLocal->dfe.getPostPolicy(ret, sr.d_discardedPolicies);
987 }
988
989 if(t_pdl) {
990 if(res == RCode::NoError) {
991 auto i=ret.cbegin();
992 for(; i!= ret.cend(); ++i)
993 if(i->d_type == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
994 break;
995 if(i == ret.cend() && t_pdl->nodata(dq, res))
996 shouldNotValidate = true;
997
998 }
999 else if(res == RCode::NXDomain && t_pdl->nxdomain(dq, res))
1000 shouldNotValidate = true;
1001
1002 if(t_pdl->postresolve(dq, res))
1003 shouldNotValidate = true;
1004 }
1005
1006 if (wantsRPZ) { //XXX This block is repeated, see above
1007 g_stats.policyResults[appliedPolicy.d_kind]++;
1008 switch(appliedPolicy.d_kind) {
1009 case DNSFilterEngine::PolicyKind::NoAction:
1010 break;
1011 case DNSFilterEngine::PolicyKind::Drop:
1012 g_stats.policyDrops++;
1013 delete dc;
1014 dc=0;
1015 return;
1016 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1017 ret.clear();
1018 res=RCode::NXDomain;
1019 goto haveAnswer;
1020
1021 case DNSFilterEngine::PolicyKind::NODATA:
1022 ret.clear();
1023 res=RCode::NoError;
1024 goto haveAnswer;
1025
1026 case DNSFilterEngine::PolicyKind::Truncate:
1027 if(!dc->d_tcp) {
1028 ret.clear();
1029 res=RCode::NoError;
1030 pw.getHeader()->tc=1;
1031 goto haveAnswer;
1032 }
1033 break;
1034
1035 case DNSFilterEngine::PolicyKind::Custom:
1036 ret.clear();
1037 res=RCode::NoError;
1038 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
1039 ret.push_back(spoofed);
1040 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
1041 goto haveAnswer;
1042 }
1043 }
1044 }
1045 haveAnswer:;
1046 if(res == PolicyDecision::DROP) {
1047 g_stats.policyDrops++;
1048 delete dc;
1049 dc=0;
1050 return;
1051 }
1052 if(tracedQuery || res == -1 || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
1053 {
1054 string trace(sr.getTrace());
1055 if(!trace.empty()) {
1056 vector<string> lines;
1057 boost::split(lines, trace, boost::is_any_of("\n"));
1058 for(const string& line : lines) {
1059 if(!line.empty())
1060 L<<Logger::Warning<< line << endl;
1061 }
1062 }
1063 }
1064
1065 if(res == -1) {
1066 pw.getHeader()->rcode=RCode::ServFail;
1067 // no commit here, because no record
1068 g_stats.servFails++;
1069 }
1070 else {
1071 pw.getHeader()->rcode=res;
1072
1073 // Does the validation mode or query demand validation?
1074 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1075 try {
1076 if(sr.doLog()) {
1077 L<<Logger::Warning<<"Starting validation of answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<endl;
1078 }
1079
1080 auto state = sr.getValidationState();
1081
1082 if(state == Secure) {
1083 if(sr.doLog()) {
1084 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates correctly"<<endl;
1085 }
1086
1087 // Is the query source interested in the value of the ad-bit?
1088 if (dc->d_mdp.d_header.ad || DNSSECOK)
1089 pw.getHeader()->ad=1;
1090 }
1091 else if(state == Insecure) {
1092 if(sr.doLog()) {
1093 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates as Insecure"<<endl;
1094 }
1095
1096 pw.getHeader()->ad=0;
1097 }
1098 else if(state == Bogus) {
1099 if(g_dnssecLogBogus || sr.doLog() || g_dnssecmode == DNSSECMode::ValidateForLog) {
1100 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates as Bogus"<<endl;
1101 }
1102
1103 // Does the query or validation mode sending out a SERVFAIL on validation errors?
1104 if(!pw.getHeader()->cd && (g_dnssecmode == DNSSECMode::ValidateAll || dc->d_mdp.d_header.ad || DNSSECOK)) {
1105 if(sr.doLog()) {
1106 L<<Logger::Warning<<"Sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" because recursor or query demands it for Bogus results"<<endl;
1107 }
1108
1109 pw.getHeader()->rcode=RCode::ServFail;
1110 goto sendit;
1111 } else {
1112 if(sr.doLog()) {
1113 L<<Logger::Warning<<"Not sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" Bogus validation since neither config nor query demands this"<<endl;
1114 }
1115 }
1116 }
1117 }
1118 catch(ImmediateServFailException &e) {
1119 if(g_logCommonErrors)
1120 L<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during validation of '"<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<"' because: "<<e.reason<<endl;
1121 pw.getHeader()->rcode=RCode::ServFail;
1122 goto sendit;
1123 }
1124 }
1125
1126 if(ret.size()) {
1127 orderAndShuffle(ret);
1128 if(auto sl = luaconfsLocal->sortlist.getOrderCmp(dc->d_remote)) {
1129 stable_sort(ret.begin(), ret.end(), *sl);
1130 variableAnswer=true;
1131 }
1132 }
1133
1134 bool needCommit = false;
1135 for(auto i=ret.cbegin(); i!=ret.cend(); ++i) {
1136 if( ! DNSSECOK &&
1137 ( i->d_type == QType::NSEC3 ||
1138 (
1139 ( i->d_type == QType::RRSIG || i->d_type==QType::NSEC ) &&
1140 (
1141 ( dc->d_mdp.d_qtype != i->d_type && dc->d_mdp.d_qtype != QType::ANY ) ||
1142 i->d_place != DNSResourceRecord::ANSWER
1143 )
1144 )
1145 )
1146 ) {
1147 continue;
1148 }
1149
1150 if (!addRecordToPacket(pw, *i, minTTL, dc->d_ttlCap, maxanswersize)) {
1151 needCommit = false;
1152 break;
1153 }
1154 needCommit = true;
1155
1156 #ifdef HAVE_PROTOBUF
1157 if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) {
1158 pbMessage.addRR(*i);
1159 }
1160 #endif
1161 }
1162 if(needCommit)
1163 pw.commit();
1164 }
1165 sendit:;
1166
1167 if (haveEDNS) {
1168 /* we try to add the EDNS OPT RR even for truncated answers,
1169 as rfc6891 states:
1170 "The minimal response MUST be the DNS header, question section, and an
1171 OPT record. This MUST also occur when a truncated response (using
1172 the DNS header's TC bit) is returned."
1173 */
1174 if (addRecordToPacket(pw, makeOpt(edo.d_packetsize, 0, edo.d_Z), minTTL, dc->d_ttlCap, maxanswersize)) {
1175 pw.commit();
1176 }
1177 }
1178
1179 g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
1180 updateResponseStats(res, dc->d_remote, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1181 #ifdef HAVE_PROTOBUF
1182 if (luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
1183 pbMessage.setBytes(packet.size());
1184 pbMessage.setResponseCode(pw.getHeader()->rcode);
1185 if (appliedPolicy.d_name) {
1186 pbMessage.setAppliedPolicy(*appliedPolicy.d_name);
1187 pbMessage.setAppliedPolicyType(appliedPolicy.d_type);
1188 }
1189 pbMessage.setPolicyTags(dc->d_policyTags);
1190 pbMessage.setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
1191 pbMessage.setRequestorId(dq.requestorId);
1192 pbMessage.setDeviceId(dq.deviceId);
1193 protobufLogResponse(luaconfsLocal->protobufServer, pbMessage);
1194 }
1195 #endif
1196 if(!dc->d_tcp) {
1197 struct msghdr msgh;
1198 struct iovec iov;
1199 char cbuf[256];
1200 fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote);
1201 msgh.msg_control=NULL;
1202
1203 if(g_fromtosockets.count(dc->d_socket)) {
1204 addCMsgSrcAddr(&msgh, cbuf, &dc->d_local, 0);
1205 }
1206 if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors)
1207 L<<Logger::Warning<<"Sending UDP reply to client "<<dc->d_remote.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
1208
1209 if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
1210 t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
1211 string((const char*)&*packet.begin(), packet.size()),
1212 g_now.tv_sec,
1213 pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
1214 min(minTTL,SyncRes::s_packetcachettl),
1215 &pbMessage);
1216 }
1217 // else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
1218 }
1219 else {
1220 char buf[2];
1221 buf[0]=packet.size()/256;
1222 buf[1]=packet.size()%256;
1223
1224 Utility::iovec iov[2];
1225
1226 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
1227 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
1228
1229 int wret=Utility::writev(dc->d_socket, iov, 2);
1230 bool hadError=true;
1231
1232 if(wret == 0)
1233 L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
1234 else if(wret < 0 )
1235 L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
1236 else if((unsigned int)wret != 2 + packet.size())
1237 L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
1238 else
1239 hadError=false;
1240
1241 // update tcp connection status, either by closing or moving to 'BYTE0'
1242
1243 if(hadError) {
1244 // no need to remove us from FDM, we weren't there
1245 dc->d_socket = -1;
1246 }
1247 else {
1248 dc->d_tcpConnection->queriesCount++;
1249 if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) {
1250 dc->d_socket = -1;
1251 }
1252 else {
1253 dc->d_tcpConnection->state=TCPConnection::BYTE0;
1254 Utility::gettimeofday(&g_now, 0); // needs to be updated
1255 t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
1256 t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
1257 }
1258 }
1259 }
1260 float spent=makeFloat(sr.getNow()-dc->d_now);
1261 if(!g_quiet) {
1262 L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
1263 L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
1264 sr.d_totUsec/1000.0<<" netw ms, "<< spent*1000.0<<" tot ms, "<<
1265 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<< res;
1266
1267 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1268 L<< ", dnssec="<<vStates[sr.getValidationState()];
1269 }
1270
1271 L<<endl;
1272
1273 }
1274
1275 if (sr.d_outqueries || sr.d_authzonequeries) {
1276 t_RC->cacheMisses++;
1277 }
1278 else {
1279 t_RC->cacheHits++;
1280 }
1281
1282 if(spent < 0.001)
1283 g_stats.answers0_1++;
1284 else if(spent < 0.010)
1285 g_stats.answers1_10++;
1286 else if(spent < 0.1)
1287 g_stats.answers10_100++;
1288 else if(spent < 1.0)
1289 g_stats.answers100_1000++;
1290 else
1291 g_stats.answersSlow++;
1292
1293 uint64_t newLat=(uint64_t)(spent*1000000);
1294 newLat = min(newLat,(uint64_t)(((uint64_t) g_networkTimeoutMsec)*1000)); // outliers of several minutes exist..
1295 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + (float)newLat/g_latencyStatSize;
1296 // no worries, we do this for packet cache hits elsewhere
1297
1298 auto ourtime = 1000.0*spent-sr.d_totUsec/1000.0; // in msec
1299 if(ourtime < 1)
1300 g_stats.ourtime0_1++;
1301 else if(ourtime < 2)
1302 g_stats.ourtime1_2++;
1303 else if(ourtime < 4)
1304 g_stats.ourtime2_4++;
1305 else if(ourtime < 8)
1306 g_stats.ourtime4_8++;
1307 else if(ourtime < 16)
1308 g_stats.ourtime8_16++;
1309 else if(ourtime < 32)
1310 g_stats.ourtime16_32++;
1311 else {
1312 // cerr<<"SLOW: "<<ourtime<<"ms -> "<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<endl;
1313 g_stats.ourtimeSlow++;
1314 }
1315 if(ourtime >= 0.0) {
1316 newLat=ourtime*1000; // usec
1317 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + (float)newLat/g_latencyStatSize;
1318 }
1319 // cout<<dc->d_mdp.d_qname<<"\t"<<MT->getUsec()<<"\t"<<sr.d_outqueries<<endl;
1320 delete dc;
1321 dc=0;
1322 }
1323 catch(PDNSException &ae) {
1324 L<<Logger::Error<<"startDoResolve problem "<<makeLoginfo(dc)<<": "<<ae.reason<<endl;
1325 delete dc;
1326 }
1327 catch(MOADNSException& e) {
1328 L<<Logger::Error<<"DNS parser error "<<makeLoginfo(dc) <<": "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
1329 delete dc;
1330 }
1331 catch(std::exception& e) {
1332 L<<Logger::Error<<"STL error "<< makeLoginfo(dc)<<": "<<e.what();
1333
1334 // Luawrapper nests the exception from Lua, so we unnest it here
1335 try {
1336 std::rethrow_if_nested(e);
1337 } catch(const std::exception& ne) {
1338 L<<". Extra info: "<<ne.what();
1339 } catch(...) {}
1340
1341 L<<endl;
1342 delete dc;
1343 }
1344 catch(...) {
1345 L<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
1346 }
1347
1348 g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
1349 }
1350
1351 static void makeControlChannelSocket(int processNum=-1)
1352 {
1353 string sockname=::arg()["socket-dir"]+"/"+s_programname;
1354 if(processNum >= 0)
1355 sockname += "."+std::to_string(processNum);
1356 sockname+=".controlsocket";
1357 s_rcc.listen(sockname);
1358
1359 int sockowner = -1;
1360 int sockgroup = -1;
1361
1362 if (!::arg().isEmpty("socket-group"))
1363 sockgroup=::arg().asGid("socket-group");
1364 if (!::arg().isEmpty("socket-owner"))
1365 sockowner=::arg().asUid("socket-owner");
1366
1367 if (sockgroup > -1 || sockowner > -1) {
1368 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
1369 unixDie("Failed to chown control socket");
1370 }
1371 }
1372
1373 // do mode change if socket-mode is given
1374 if(!::arg().isEmpty("socket-mode")) {
1375 mode_t sockmode=::arg().asMode("socket-mode");
1376 if(chmod(sockname.c_str(), sockmode) < 0) {
1377 unixDie("Failed to chmod control socket");
1378 }
1379 }
1380 }
1381
1382 static bool getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass, EDNSSubnetOpts* ednssubnet, std::map<uint16_t, EDNSOptionView>* options)
1383 {
1384 bool found = false;
1385 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
1386 size_t questionLen = question.length();
1387 unsigned int consumed=0;
1388 *dnsname=DNSName(question.c_str(), questionLen, sizeof(dnsheader), false, qtype, qclass, &consumed);
1389
1390 size_t pos= sizeof(dnsheader)+consumed+4;
1391 /* at least OPT root label (1), type (2), class (2) and ttl (4) + OPT RR rdlen (2)
1392 = 11 */
1393 if(ntohs(dh->arcount) == 1 && questionLen > pos + 11) { // this code can extract one (1) EDNS Subnet option
1394 /* OPT root label (1) followed by type (2) */
1395 if(question.at(pos)==0 && question.at(pos+1)==0 && question.at(pos+2)==QType::OPT) {
1396 if (!options) {
1397 char* ecsStart = nullptr;
1398 size_t ecsLen = 0;
1399 int res = getEDNSOption((char*)question.c_str()+pos+9, questionLen - pos - 9, EDNSOptionCode::ECS, &ecsStart, &ecsLen);
1400 if (res == 0 && ecsLen > 4) {
1401 EDNSSubnetOpts eso;
1402 if(getEDNSSubnetOptsFromString(ecsStart + 4, ecsLen - 4, &eso)) {
1403 *ednssubnet=eso;
1404 found = true;
1405 }
1406 }
1407 }
1408 else {
1409 int res = getEDNSOptions((char*)question.c_str()+pos+9, questionLen - pos - 9, *options);
1410 if (res == 0) {
1411 const auto& it = options->find(EDNSOptionCode::ECS);
1412 if (it != options->end() && it->second.content != nullptr && it->second.size > 0) {
1413 EDNSSubnetOpts eso;
1414 if(getEDNSSubnetOptsFromString(it->second.content, it->second.size, &eso)) {
1415 *ednssubnet=eso;
1416 found = true;
1417 }
1418 }
1419 }
1420 }
1421 }
1422 }
1423 return found;
1424 }
1425
1426 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1427 {
1428 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
1429
1430 if(conn->state==TCPConnection::BYTE0) {
1431 ssize_t bytes=recv(conn->getFD(), conn->data, 2, 0);
1432 if(bytes==1)
1433 conn->state=TCPConnection::BYTE1;
1434 if(bytes==2) {
1435 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1436 conn->bytesread=0;
1437 conn->state=TCPConnection::GETQUESTION;
1438 }
1439 if(!bytes || bytes < 0) {
1440 t_fdm->removeReadFD(fd);
1441 return;
1442 }
1443 }
1444 else if(conn->state==TCPConnection::BYTE1) {
1445 ssize_t bytes=recv(conn->getFD(), conn->data+1, 1, 0);
1446 if(bytes==1) {
1447 conn->state=TCPConnection::GETQUESTION;
1448 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1449 conn->bytesread=0;
1450 }
1451 if(!bytes || bytes < 0) {
1452 if(g_logCommonErrors)
1453 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected after first byte"<<endl;
1454 t_fdm->removeReadFD(fd);
1455 return;
1456 }
1457 }
1458 else if(conn->state==TCPConnection::GETQUESTION) {
1459 ssize_t bytes=recv(conn->getFD(), conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
1460 if(!bytes || bytes < 0 || bytes > std::numeric_limits<std::uint16_t>::max()) {
1461 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected while reading question body"<<endl;
1462 t_fdm->removeReadFD(fd);
1463 return;
1464 }
1465 conn->bytesread+=(uint16_t)bytes;
1466 if(conn->bytesread==conn->qlen) {
1467 t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
1468
1469 DNSComboWriter* dc=nullptr;
1470 try {
1471 dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
1472 }
1473 catch(MOADNSException &mde) {
1474 g_stats.clientParseError++;
1475 if(g_logCommonErrors)
1476 L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toString() <<endl;
1477 return;
1478 }
1479 dc->d_tcpConnection = conn; // carry the torch
1480 dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
1481 dc->d_tcp=true;
1482 dc->setRemote(&conn->d_remote);
1483 ComboAddress dest;
1484 dest.reset();
1485 dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
1486 socklen_t len = dest.getSocklen();
1487 getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
1488 dc->setLocal(dest);
1489 DNSName qname;
1490 uint16_t qtype=0;
1491 uint16_t qclass=0;
1492 bool needECS = false;
1493 string requestorId;
1494 string deviceId;
1495 #ifdef HAVE_PROTOBUF
1496 auto luaconfsLocal = g_luaconfs.getLocal();
1497 if (luaconfsLocal->protobufServer) {
1498 needECS = true;
1499 }
1500 #endif
1501
1502 if(needECS || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
1503
1504 try {
1505 std::map<uint16_t, EDNSOptionView> ednsOptions;
1506 dc->d_ecsParsed = true;
1507 dc->d_ecsFound = getQNameAndSubnet(std::string(conn->data, conn->qlen), &qname, &qtype, &qclass, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
1508
1509 if(t_pdl) {
1510 try {
1511 if (t_pdl->d_gettag_ffi) {
1512 dc->d_tag = t_pdl->gettag_ffi(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable);
1513 }
1514 else if (t_pdl->d_gettag) {
1515 dc->d_tag = t_pdl->gettag(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId);
1516 }
1517 }
1518 catch(const std::exception& e) {
1519 if(g_logCommonErrors)
1520 L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
1521 }
1522 }
1523 }
1524 catch(const std::exception& e)
1525 {
1526 if(g_logCommonErrors)
1527 L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
1528 }
1529 }
1530 #ifdef HAVE_PROTOBUF
1531 if(luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
1532 dc->d_requestorId = requestorId;
1533 dc->d_deviceId = deviceId;
1534 dc->d_uuid = (*t_uuidGenerator)();
1535 }
1536
1537 if(luaconfsLocal->protobufServer) {
1538 try {
1539 const struct dnsheader* dh = (const struct dnsheader*) conn->data;
1540
1541 if (!luaconfsLocal->protobufTaggedOnly) {
1542 protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, conn->d_remote, dest, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
1543 }
1544 }
1545 catch(std::exception& e) {
1546 if(g_logCommonErrors)
1547 L<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
1548 }
1549 }
1550 #endif
1551 if(dc->d_mdp.d_header.qr) {
1552 delete dc;
1553 g_stats.ignoredCount++;
1554 L<<Logger::Error<<"Ignoring answer from TCP client "<< conn->d_remote.toString() <<" on server socket!"<<endl;
1555 return;
1556 }
1557 if(dc->d_mdp.d_header.opcode) {
1558 delete dc;
1559 g_stats.ignoredCount++;
1560 L<<Logger::Error<<"Ignoring non-query opcode from TCP client "<< conn->d_remote.toString() <<" on server socket!"<<endl;
1561 return;
1562 }
1563 else {
1564 ++g_stats.qcounter;
1565 ++g_stats.tcpqcounter;
1566 MT->makeThread(startDoResolve, dc); // deletes dc, will set state to BYTE0 again
1567 return;
1568 }
1569 }
1570 }
1571 }
1572
1573 //! Handle new incoming TCP connection
1574 static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
1575 {
1576 ComboAddress addr;
1577 socklen_t addrlen=sizeof(addr);
1578 int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
1579 if(newsock>=0) {
1580 if(MT->numProcesses() > g_maxMThreads) {
1581 g_stats.overCapacityDrops++;
1582 try {
1583 closesocket(newsock);
1584 }
1585 catch(const PDNSException& e) {
1586 L<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
1587 }
1588 return;
1589 }
1590
1591 if(t_remotes)
1592 t_remotes->push_back(addr);
1593 if(t_allowFrom && !t_allowFrom->match(&addr)) {
1594 if(!g_quiet)
1595 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
1596
1597 g_stats.unauthorizedTCP++;
1598 try {
1599 closesocket(newsock);
1600 }
1601 catch(const PDNSException& e) {
1602 L<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
1603 }
1604 return;
1605 }
1606 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
1607 g_stats.tcpClientOverflow++;
1608 try {
1609 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
1610 }
1611 catch(const PDNSException& e) {
1612 L<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
1613 }
1614 return;
1615 }
1616
1617 setNonBlocking(newsock);
1618 std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
1619 tc->state=TCPConnection::BYTE0;
1620
1621 t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
1622
1623 struct timeval now;
1624 Utility::gettimeofday(&now, 0);
1625 t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
1626 }
1627 }
1628
1629 static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, struct timeval tv, int fd)
1630 {
1631 gettimeofday(&g_now, 0);
1632 struct timeval diff = g_now - tv;
1633 double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
1634
1635 if(tv.tv_sec && delta > 1000.0) {
1636 g_stats.tooOldDrops++;
1637 return 0;
1638 }
1639
1640 ++g_stats.qcounter;
1641 if(fromaddr.sin4.sin_family==AF_INET6)
1642 g_stats.ipv6qcounter++;
1643
1644 string response;
1645 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
1646 unsigned int ctag=0;
1647 uint32_t qhash = 0;
1648 bool needECS = false;
1649 std::vector<std::string> policyTags;
1650 LuaContext::LuaObject data;
1651 string requestorId;
1652 string deviceId;
1653 #ifdef HAVE_PROTOBUF
1654 boost::uuids::uuid uniqueId;
1655 auto luaconfsLocal = g_luaconfs.getLocal();
1656 if (luaconfsLocal->protobufServer) {
1657 uniqueId = (*t_uuidGenerator)();
1658 needECS = true;
1659 } else if (luaconfsLocal->outgoingProtobufServer) {
1660 uniqueId = (*t_uuidGenerator)();
1661 }
1662 #endif
1663 EDNSSubnetOpts ednssubnet;
1664 bool ecsFound = false;
1665 bool ecsParsed = false;
1666 uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
1667 bool variable = false;
1668 try {
1669 DNSName qname;
1670 uint16_t qtype=0;
1671 uint16_t qclass=0;
1672 uint32_t age;
1673 bool qnameParsed=false;
1674 #ifdef MALLOC_TRACE
1675 /*
1676 static uint64_t last=0;
1677 if(!last)
1678 g_mtracer->clearAllocators();
1679 cout<<g_mtracer->getAllocs()-last<<" "<<g_mtracer->getNumOut()<<" -- BEGIN TRACE"<<endl;
1680 last=g_mtracer->getAllocs();
1681 cout<<g_mtracer->topAllocatorsString()<<endl;
1682 g_mtracer->clearAllocators();
1683 */
1684 #endif
1685
1686 if(needECS || (t_pdl && (t_pdl->d_gettag || t_pdl->d_gettag_ffi))) {
1687 try {
1688 std::map<uint16_t, EDNSOptionView> ednsOptions;
1689 ecsFound = getQNameAndSubnet(question, &qname, &qtype, &qclass, &ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
1690 qnameParsed = true;
1691 ecsParsed = true;
1692
1693 if(t_pdl) {
1694 try {
1695 if (t_pdl->d_gettag_ffi) {
1696 ctag = t_pdl->gettag_ffi(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable);
1697 }
1698 else if (t_pdl->d_gettag) {
1699 ctag=t_pdl->gettag(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId);
1700 }
1701 }
1702 catch(const std::exception& e) {
1703 if(g_logCommonErrors)
1704 L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
1705 }
1706 }
1707 }
1708 catch(const std::exception& e)
1709 {
1710 if(g_logCommonErrors)
1711 L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
1712 }
1713 }
1714
1715 bool cacheHit = false;
1716 RecProtoBufMessage pbMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
1717 #ifdef HAVE_PROTOBUF
1718 if(luaconfsLocal->protobufServer) {
1719 if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
1720 protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, fromaddr, destaddr, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
1721 }
1722 }
1723 #endif /* HAVE_PROTOBUF */
1724
1725 /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
1726 but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
1727 as cacheable we would cache it with a wrong tag, so better safe than sorry. */
1728 if (qnameParsed) {
1729 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
1730 }
1731 else {
1732 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
1733 }
1734
1735 if (cacheHit) {
1736 #ifdef HAVE_PROTOBUF
1737 if(luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage.getAppliedPolicy().empty() || !pbMessage.getPolicyTags().empty())) {
1738 Netmask requestorNM(fromaddr, fromaddr.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1739 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
1740 pbMessage.update(uniqueId, &requestor, &destaddr, false, dh->id);
1741 pbMessage.setEDNSSubnet(ednssubnet.source, ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1742 pbMessage.setQueryTime(g_now.tv_sec, g_now.tv_usec);
1743 pbMessage.setRequestorId(requestorId);
1744 pbMessage.setDeviceId(deviceId);
1745 protobufLogResponse(luaconfsLocal->protobufServer, pbMessage);
1746 }
1747 #endif /* HAVE_PROTOBUF */
1748 if(!g_quiet)
1749 L<<Logger::Notice<<t_id<< " question answered from packet cache tag="<<ctag<<" from "<<fromaddr.toString()<<endl;
1750
1751 g_stats.packetCacheHits++;
1752 SyncRes::s_queries++;
1753 ageDNSPacket(response, age);
1754 struct msghdr msgh;
1755 struct iovec iov;
1756 char cbuf[256];
1757 fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)response.c_str(), response.length(), const_cast<ComboAddress*>(&fromaddr));
1758 msgh.msg_control=NULL;
1759
1760 if(g_fromtosockets.count(fd)) {
1761 addCMsgSrcAddr(&msgh, cbuf, &destaddr, 0);
1762 }
1763 if(sendmsg(fd, &msgh, 0) < 0 && g_logCommonErrors)
1764 L<<Logger::Warning<<"Sending UDP reply to client "<<fromaddr.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
1765
1766 if(response.length() >= sizeof(struct dnsheader)) {
1767 struct dnsheader tmpdh;
1768 memcpy(&tmpdh, response.c_str(), sizeof(tmpdh));
1769 updateResponseStats(tmpdh.rcode, fromaddr, response.length(), 0, 0);
1770 }
1771 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + 0.0; // we assume 0 usec
1772 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + 0.0; // we assume 0 usec
1773 return 0;
1774 }
1775 }
1776 catch(std::exception& e) {
1777 L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
1778 return 0;
1779 }
1780
1781 if(t_pdl) {
1782 if(t_pdl->ipfilter(fromaddr, destaddr, *dh)) {
1783 if(!g_quiet)
1784 L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<fromaddr.toStringWithPort()<<" based on policy"<<endl;
1785 g_stats.policyDrops++;
1786 return 0;
1787 }
1788 }
1789
1790 if(MT->numProcesses() > g_maxMThreads) {
1791 if(!g_quiet)
1792 L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<fromaddr.toStringWithPort()<<", over capacity"<<endl;
1793
1794 g_stats.overCapacityDrops++;
1795 return 0;
1796 }
1797
1798 DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
1799 dc->setSocket(fd);
1800 dc->d_tag=ctag;
1801 dc->d_qhash=qhash;
1802 dc->d_query = question;
1803 dc->setRemote(&fromaddr);
1804 dc->setLocal(destaddr);
1805 dc->d_tcp=false;
1806 dc->d_policyTags = policyTags;
1807 dc->d_data = data;
1808 dc->d_ecsFound = ecsFound;
1809 dc->d_ecsParsed = ecsParsed;
1810 dc->d_ednssubnet = ednssubnet;
1811 dc->d_ttlCap = ttlCap;
1812 dc->d_variable = variable;
1813 #ifdef HAVE_PROTOBUF
1814 if (luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
1815 dc->d_uuid = uniqueId;
1816 }
1817 dc->d_requestorId = requestorId;
1818 dc->d_deviceId = deviceId;
1819 #endif
1820
1821 MT->makeThread(startDoResolve, (void*) dc); // deletes dc
1822 return 0;
1823 }
1824
1825
1826 static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1827 {
1828 ssize_t len;
1829 char data[1500];
1830 ComboAddress fromaddr;
1831 struct msghdr msgh;
1832 struct iovec iov;
1833 char cbuf[256];
1834 bool firstQuery = true;
1835
1836 fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
1837 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), data, sizeof(data), &fromaddr);
1838
1839 for(;;)
1840 if((len=recvmsg(fd, &msgh, 0)) >= 0) {
1841
1842 firstQuery = false;
1843
1844 if(t_remotes)
1845 t_remotes->push_back(fromaddr);
1846
1847 if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
1848 if(!g_quiet)
1849 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
1850
1851 g_stats.unauthorizedUDP++;
1852 return;
1853 }
1854 BOOST_STATIC_ASSERT(offsetof(sockaddr_in, sin_port) == offsetof(sockaddr_in6, sin6_port));
1855 if(!fromaddr.sin4.sin_port) { // also works for IPv6
1856 if(!g_quiet)
1857 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toStringWithPort()<<", can't deal with port 0"<<endl;
1858
1859 g_stats.clientParseError++; // not quite the best place to put it, but needs to go somewhere
1860 return;
1861 }
1862 try {
1863 dnsheader* dh=(dnsheader*)data;
1864
1865 if(dh->qr) {
1866 g_stats.ignoredCount++;
1867 if(g_logCommonErrors)
1868 L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
1869 }
1870 else if(dh->opcode) {
1871 g_stats.ignoredCount++;
1872 if(g_logCommonErrors)
1873 L<<Logger::Error<<"Ignoring non-query opcode "<<dh->opcode<<" from "<<fromaddr.toString()<<" on server socket!"<<endl;
1874 }
1875 else {
1876 string question(data, (size_t)len);
1877 struct timeval tv={0,0};
1878 HarvestTimestamp(&msgh, &tv);
1879 ComboAddress dest;
1880 dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
1881 auto loc = rplookup(g_listenSocketsAddresses, fd);
1882 if(HarvestDestinationAddress(&msgh, &dest)) {
1883 // but.. need to get port too
1884 if(loc)
1885 dest.sin4.sin_port = loc->sin4.sin_port;
1886 }
1887 else {
1888 if(loc) {
1889 dest = *loc;
1890 }
1891 else {
1892 dest.sin4.sin_family = fromaddr.sin4.sin_family;
1893 socklen_t slen = dest.getSocklen();
1894 getsockname(fd, (sockaddr*)&dest, &slen); // if this fails, we're ok with it
1895 }
1896 }
1897 if(g_weDistributeQueries)
1898 distributeAsyncFunction(question, boost::bind(doProcessUDPQuestion, question, fromaddr, dest, tv, fd));
1899 else
1900 doProcessUDPQuestion(question, fromaddr, dest, tv, fd);
1901 }
1902 }
1903 catch(MOADNSException& mde) {
1904 g_stats.clientParseError++;
1905 if(g_logCommonErrors)
1906 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
1907 }
1908 catch(std::runtime_error& e) {
1909 g_stats.clientParseError++;
1910 if(g_logCommonErrors)
1911 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<e.what()<<endl;
1912 }
1913 }
1914 else {
1915 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
1916 if(firstQuery && errno == EAGAIN)
1917 g_stats.noPacketError++;
1918
1919 break;
1920 }
1921 }
1922
1923 static void makeTCPServerSockets(unsigned int threadId)
1924 {
1925 int fd;
1926 vector<string>locals;
1927 stringtok(locals,::arg()["local-address"]," ,");
1928
1929 if(locals.empty())
1930 throw PDNSException("No local address specified");
1931
1932 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
1933 ServiceTuple st;
1934 st.port=::arg().asNum("local-port");
1935 parseService(*i, st);
1936
1937 ComboAddress sin;
1938
1939 sin.reset();
1940 sin.sin4.sin_family = AF_INET;
1941 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
1942 sin.sin6.sin6_family = AF_INET6;
1943 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
1944 throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
1945 }
1946
1947 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
1948 if(fd<0)
1949 throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
1950
1951 setCloseOnExec(fd);
1952
1953 int tmp=1;
1954 if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
1955 L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
1956 exit(1);
1957 }
1958 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
1959 L<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
1960 }
1961
1962 #ifdef TCP_DEFER_ACCEPT
1963 if(setsockopt(fd, SOL_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
1964 if(i==locals.begin())
1965 L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
1966 }
1967 #endif
1968
1969 if( ::arg().mustDo("non-local-bind") )
1970 Utility::setBindAny(AF_INET, fd);
1971
1972 #ifdef SO_REUSEPORT
1973 if(g_reusePort) {
1974 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(tmp)) < 0)
1975 throw PDNSException("SO_REUSEPORT: "+stringerror());
1976 }
1977 #endif
1978
1979 if (::arg().asNum("tcp-fast-open") > 0) {
1980 #ifdef TCP_FASTOPEN
1981 int fastOpenQueueSize = ::arg().asNum("tcp-fast-open");
1982 if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &fastOpenQueueSize, sizeof fastOpenQueueSize) < 0) {
1983 L<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(errno)<<endl;
1984 }
1985 #else
1986 L<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
1987 #endif
1988 }
1989
1990 sin.sin4.sin_port = htons(st.port);
1991 socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
1992 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
1993 throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
1994
1995 setNonBlocking(fd);
1996 setSocketSendBuffer(fd, 65000);
1997 listen(fd, 128);
1998 deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
1999 g_tcpListenSockets[threadId].insert(fd);
2000 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
2001 // - fd is not that which we know here, but returned from accept()
2002 if(sin.sin4.sin_family == AF_INET)
2003 L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
2004 else
2005 L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2006 }
2007 }
2008
2009 static void makeUDPServerSockets(unsigned int threadId)
2010 {
2011 int one=1;
2012 vector<string>locals;
2013 stringtok(locals,::arg()["local-address"]," ,");
2014
2015 if(locals.empty())
2016 throw PDNSException("No local address specified");
2017
2018 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2019 ServiceTuple st;
2020 st.port=::arg().asNum("local-port");
2021 parseService(*i, st);
2022
2023 ComboAddress sin;
2024
2025 sin.reset();
2026 sin.sin4.sin_family = AF_INET;
2027 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2028 sin.sin6.sin6_family = AF_INET6;
2029 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2030 throw PDNSException("Unable to resolve local address for UDP server on '"+ st.host +"'");
2031 }
2032
2033 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
2034 if(fd < 0) {
2035 throw PDNSException("Making a UDP server socket for resolver: "+netstringerror());
2036 }
2037 if (!setSocketTimestamps(fd))
2038 L<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2039
2040 if(IsAnyAddress(sin)) {
2041 if(sin.sin4.sin_family == AF_INET)
2042 if(!setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one))) // linux supports this, so why not - might fail on other systems
2043 g_fromtosockets.insert(fd);
2044 #ifdef IPV6_RECVPKTINFO
2045 if(sin.sin4.sin_family == AF_INET6)
2046 if(!setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)))
2047 g_fromtosockets.insert(fd);
2048 #endif
2049 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) < 0) {
2050 L<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
2051 }
2052 }
2053 if( ::arg().mustDo("non-local-bind") )
2054 Utility::setBindAny(AF_INET6, fd);
2055
2056 setCloseOnExec(fd);
2057
2058 setSocketReceiveBuffer(fd, 250000);
2059 sin.sin4.sin_port = htons(st.port);
2060
2061
2062 #ifdef SO_REUSEPORT
2063 if(g_reusePort) {
2064 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
2065 throw PDNSException("SO_REUSEPORT: "+stringerror());
2066 }
2067 #endif
2068 socklen_t socklen=sin.getSocklen();
2069 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
2070 throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
2071
2072 setNonBlocking(fd);
2073
2074 deferredAdds[threadId].push_back(make_pair(fd, handleNewUDPQuestion));
2075 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
2076 if(sin.sin4.sin_family == AF_INET)
2077 L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
2078 else
2079 L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2080 }
2081 }
2082
2083 static void daemonize(void)
2084 {
2085 if(fork())
2086 exit(0); // bye bye
2087
2088 setsid();
2089
2090 int i=open("/dev/null",O_RDWR); /* open stdin */
2091 if(i < 0)
2092 L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
2093 else {
2094 dup2(i,0); /* stdin */
2095 dup2(i,1); /* stderr */
2096 dup2(i,2); /* stderr */
2097 close(i);
2098 }
2099 }
2100
2101 static void usr1Handler(int)
2102 {
2103 statsWanted=true;
2104 }
2105
2106 static void usr2Handler(int)
2107 {
2108 g_quiet= !g_quiet;
2109 SyncRes::setDefaultLogMode(g_quiet ? SyncRes::LogNone : SyncRes::Log);
2110 ::arg().set("quiet")=g_quiet ? "" : "no";
2111 }
2112
2113 static void doStats(void)
2114 {
2115 static time_t lastOutputTime;
2116 static uint64_t lastQueryCount;
2117
2118 uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
2119 uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
2120
2121 if(g_stats.qcounter && (cacheHits + cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {
2122 L<<Logger::Notice<<"stats: "<<g_stats.qcounter<<" questions, "<<
2123 broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
2124 broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
2125 (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;
2126
2127 L<<Logger::Notice<<"stats: throttle map: "
2128 << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
2129 << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl;
2130 L<<Logger::Notice<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
2131 L<<Logger::Notice<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
2132 <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
2133 L<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
2134 broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
2135
2136 //L<<Logger::Notice<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
2137 //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
2138
2139 L<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
2140 " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
2141
2142 time_t now = time(0);
2143 if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
2144 L<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
2145 }
2146 lastOutputTime = now;
2147 lastQueryCount = SyncRes::s_queries;
2148 }
2149 else if(statsWanted)
2150 L<<Logger::Notice<<"stats: no stats yet!"<<endl;
2151
2152 statsWanted=false;
2153 }
2154
2155 static void houseKeeping(void *)
2156 {
2157 static thread_local time_t last_rootupdate, last_prune, last_secpoll;
2158 static thread_local int cleanCounter=0;
2159 static thread_local bool s_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
2160 try {
2161 if(s_running)
2162 return;
2163 s_running=true;
2164
2165 struct timeval now;
2166 Utility::gettimeofday(&now, 0);
2167
2168 if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
2169 DTime dt;
2170 dt.setTimeval(now);
2171 t_RC->doPrune(g_maxCacheEntries / g_numThreads); // this function is local to a thread, so fine anyhow
2172 t_packetCache->doPruneTo(g_maxPacketCacheEntries / g_numWorkerThreads);
2173
2174 SyncRes::pruneNegCache(g_maxCacheEntries / (g_numWorkerThreads * 10));
2175
2176 if(!((cleanCounter++)%40)) { // this is a full scan!
2177 time_t limit=now.tv_sec-300;
2178 SyncRes::pruneNSSpeeds(limit);
2179 }
2180 last_prune=time(0);
2181 }
2182
2183 if(now.tv_sec - last_rootupdate > 7200) {
2184 int res = SyncRes::getRootNS(g_now, nullptr);
2185 if (!res)
2186 last_rootupdate=now.tv_sec;
2187 }
2188
2189 if(t_id == s_distributorThreadID) {
2190
2191 if(now.tv_sec - last_secpoll >= 3600) {
2192 try {
2193 doSecPoll(&last_secpoll);
2194 }
2195 catch(std::exception& e)
2196 {
2197 L<<Logger::Error<<"Exception while performing security poll: "<<e.what()<<endl;
2198 }
2199 catch(PDNSException& e)
2200 {
2201 L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2202 }
2203 catch(ImmediateServFailException &e)
2204 {
2205 L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2206 }
2207 catch(...)
2208 {
2209 L<<Logger::Error<<"Exception while performing security poll"<<endl;
2210 }
2211
2212 }
2213 }
2214 s_running=false;
2215 }
2216 catch(PDNSException& ae)
2217 {
2218 s_running=false;
2219 L<<Logger::Error<<"Fatal error in housekeeping thread: "<<ae.reason<<endl;
2220 throw;
2221 }
2222 }
2223
2224 static void makeThreadPipes()
2225 {
2226 for(unsigned int n=0; n < g_numThreads; ++n) {
2227 struct ThreadPipeSet tps;
2228 int fd[2];
2229 if(pipe(fd) < 0)
2230 unixDie("Creating pipe for inter-thread communications");
2231
2232 tps.readToThread = fd[0];
2233 tps.writeToThread = fd[1];
2234
2235 if(pipe(fd) < 0)
2236 unixDie("Creating pipe for inter-thread communications");
2237 tps.readFromThread = fd[0];
2238 tps.writeFromThread = fd[1];
2239
2240 if(pipe(fd) < 0)
2241 unixDie("Creating pipe for inter-thread communications");
2242 tps.readQueriesToThread = fd[0];
2243 tps.writeQueriesToThread = fd[1];
2244
2245 if (!setNonBlocking(tps.writeQueriesToThread)) {
2246 unixDie("Making pipe for inter-thread communications non-blocking");
2247 }
2248
2249 g_pipes.push_back(tps);
2250 }
2251 }
2252
2253 struct ThreadMSG
2254 {
2255 pipefunc_t func;
2256 bool wantAnswer;
2257 };
2258
2259 void broadcastFunction(const pipefunc_t& func)
2260 {
2261 /* This function might be called before t_id are set during startup
2262 for the initialization of ACLs and domain maps, but the default is the same
2263 than the handler thread */
2264 if (t_id != s_handlerThreadID) {
2265 L<<Logger::Error<<"broadcastFunction() has been called by a worker ("<<t_id<<")"<<endl;
2266 exit(1);
2267 }
2268
2269 /* the distributor will call itself below, but if we are the handler thread,
2270 call the function ourselves to update the ACL or domain maps for example */
2271 func();
2272
2273 int n = 0;
2274 for(ThreadPipeSet& tps : g_pipes)
2275 {
2276 if(n++ == t_id) {
2277 func(); // don't write to ourselves!
2278 continue;
2279 }
2280
2281 ThreadMSG* tmsg = new ThreadMSG();
2282 tmsg->func = func;
2283 tmsg->wantAnswer = true;
2284 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2285 delete tmsg;
2286 unixDie("write to thread pipe returned wrong size or error");
2287 }
2288
2289 string* resp = nullptr;
2290 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2291 unixDie("read from thread pipe returned wrong size or error");
2292
2293 if(resp) {
2294 delete resp;
2295 resp = nullptr;
2296 }
2297 }
2298 }
2299
2300 // This function is only called by the distributor thread, when pdns-distributes-queries is set
2301 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
2302 {
2303 if (t_id != s_distributorThreadID) {
2304 L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
2305 exit(1);
2306 }
2307
2308 unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
2309 unsigned int target = 1 + (hash % (g_pipes.size()-1));
2310
2311 if(target == static_cast<unsigned int>(s_distributorThreadID)) {
2312 L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
2313 exit(1);
2314 }
2315
2316 ThreadPipeSet& tps = g_pipes[target];
2317 ThreadMSG* tmsg = new ThreadMSG();
2318 tmsg->func = func;
2319 tmsg->wantAnswer = false;
2320
2321 ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
2322 if (written > 0) {
2323 if (static_cast<size_t>(written) != sizeof(tmsg)) {
2324 delete tmsg;
2325 unixDie("write to thread pipe returned wrong size or error");
2326 }
2327 }
2328 else {
2329 int error = errno;
2330 delete tmsg;
2331 if (error == EAGAIN || error == EWOULDBLOCK) {
2332 g_stats.queryPipeFullDrops++;
2333 } else {
2334 unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
2335 }
2336 }
2337 }
2338
2339 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
2340 {
2341 ThreadMSG* tmsg = nullptr;
2342
2343 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
2344 unixDie("read from thread pipe returned wrong size or error");
2345 }
2346
2347 void *resp=0;
2348 try {
2349 resp = tmsg->func();
2350 }
2351 catch(std::exception& e) {
2352 if(g_logCommonErrors)
2353 L<<Logger::Error<<"PIPE function we executed created exception: "<<e.what()<<endl; // but what if they wanted an answer.. we send 0
2354 }
2355 catch(PDNSException& e) {
2356 if(g_logCommonErrors)
2357 L<<Logger::Error<<"PIPE function we executed created PDNS exception: "<<e.reason<<endl; // but what if they wanted an answer.. we send 0
2358 }
2359 if(tmsg->wantAnswer) {
2360 if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
2361 delete tmsg;
2362 unixDie("write to thread pipe returned wrong size or error");
2363 }
2364 }
2365
2366 delete tmsg;
2367 }
2368
2369 template<class T> void *voider(const boost::function<T*()>& func)
2370 {
2371 return func();
2372 }
2373
2374 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
2375 {
2376 a.insert(a.end(), b.begin(), b.end());
2377 return a;
2378 }
2379
/* Appends every (string, number) pair of `b` to the end of `a` and returns `a`. */
vector<pair<string, uint16_t> >& operator+=(vector<pair<string, uint16_t> >&a, const vector<pair<string, uint16_t> >& b)
{
  for(const auto& entry : b) {
    a.push_back(entry);
  }
  return a;
}
2385
2386 vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a, const vector<pair<DNSName, uint16_t> >& b)
2387 {
2388 a.insert(a.end(), b.begin(), b.end());
2389 return a;
2390 }
2391
2392
2393 /*
2394 This function should only be called by the handler to gather metrics, wipe the cache,
2395 reload the Lua script (not the Lua config) or change the current trace regex,
2396 and by the SNMP thread to gather metrics. */
2397 template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
2398 {
2399 /* the SNMP thread uses id -1 too */
2400 if (t_id != s_handlerThreadID) {
2401 L<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
2402 exit(1);
2403 }
2404
2405 T ret=T();
2406 for(ThreadPipeSet& tps : g_pipes)
2407 {
2408 ThreadMSG* tmsg = new ThreadMSG();
2409 tmsg->func = boost::bind(voider<T>, func);
2410 tmsg->wantAnswer = true;
2411
2412 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2413 delete tmsg;
2414 unixDie("write to thread pipe returned wrong size or error");
2415 }
2416
2417 T* resp = nullptr;
2418 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2419 unixDie("read from thread pipe returned wrong size or error");
2420
2421 if(resp) {
2422 ret += *resp;
2423 delete resp;
2424 resp = nullptr;
2425 }
2426 }
2427 return ret;
2428 }
2429
2430 template string broadcastAccFunction(const boost::function<string*()>& fun); // explicit instantiation
2431 template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
2432 template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
2433 template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
2434
2435 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
2436 {
2437 string remote;
2438 string msg=s_rcc.recv(&remote);
2439 RecursorControlParser rcp;
2440 RecursorControlParser::func_t* command;
2441
2442 string answer=rcp.getAnswer(msg, &command);
2443
2444 // If we are inside a chroot, we need to strip
2445 if (!arg()["chroot"].empty()) {
2446 size_t len = arg()["chroot"].length();
2447 remote = remote.substr(len);
2448 }
2449
2450 try {
2451 s_rcc.send(answer, &remote);
2452 command();
2453 }
2454 catch(std::exception& e) {
2455 L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
2456 }
2457 catch(PDNSException& ae) {
2458 L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
2459 }
2460 }
2461
2462 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
2463 {
2464 PacketID* pident=any_cast<PacketID>(&var);
2465 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
2466
2467 shared_array<char> buffer(new char[pident->inNeeded]);
2468
2469 ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
2470 if(ret > 0) {
2471 pident->inMSG.append(&buffer[0], &buffer[ret]);
2472 pident->inNeeded-=(size_t)ret;
2473 if(!pident->inNeeded || pident->inIncompleteOkay) {
2474 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
2475 PacketID pid=*pident;
2476 string msg=pident->inMSG;
2477
2478 t_fdm->removeReadFD(fd);
2479 MT->sendEvent(pid, &msg);
2480 }
2481 else {
2482 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
2483 }
2484 }
2485 else {
2486 PacketID tmp=*pident;
2487 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
2488 string empty;
2489 MT->sendEvent(tmp, &empty); // this conveys error status
2490 }
2491 }
2492
2493 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
2494 {
2495 PacketID* pid=any_cast<PacketID>(&var);
2496 ssize_t ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
2497 if(ret > 0) {
2498 pid->outPos+=(ssize_t)ret;
2499 if(pid->outPos==pid->outMSG.size()) {
2500 PacketID tmp=*pid;
2501 t_fdm->removeWriteFD(fd);
2502 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
2503 }
2504 }
2505 else { // error or EOF
2506 PacketID tmp(*pid);
2507 t_fdm->removeWriteFD(fd);
2508 string sent;
2509 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
2510 }
2511 }
2512
2513 // resend event to everybody chained onto it
2514 static void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
2515 {
2516 if(iter->key.chain.empty())
2517 return;
2518 // cerr<<"doResends called!\n";
2519 for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
2520 resend.fd=-1;
2521 resend.id=*i;
2522 // cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
2523
2524 MT->sendEvent(resend, &content);
2525 g_stats.chainResends++;
2526 }
2527 }
2528
2529 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
2530 {
2531 PacketID pid=any_cast<PacketID>(var);
2532 ssize_t len;
2533 char data[g_outgoingEDNSBufsize];
2534 ComboAddress fromaddr;
2535 socklen_t addrlen=sizeof(fromaddr);
2536
2537 len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
2538
2539 if(len < (ssize_t) sizeof(dnsheader)) {
2540 if(len < 0)
2541 ; // cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
2542 else {
2543 g_stats.serverParseError++;
2544 if(g_logCommonErrors)
2545 L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
2546 ": packet smaller than DNS header"<<endl;
2547 }
2548
2549 t_udpclientsocks->returnSocket(fd);
2550 string empty;
2551
2552 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
2553 if(iter != MT->d_waiters.end())
2554 doResends(iter, pid, empty);
2555
2556 MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
2557 return;
2558 }
2559
2560 dnsheader dh;
2561 memcpy(&dh, data, sizeof(dh));
2562
2563 PacketID pident;
2564 pident.remote=fromaddr;
2565 pident.id=dh.id;
2566 pident.fd=fd;
2567
2568 if(!dh.qr && g_logCommonErrors) {
2569 L<<Logger::Notice<<"Not taking data from question on outgoing socket from "<< fromaddr.toStringWithPort() <<endl;
2570 }
2571
2572 if(!dh.qdcount || // UPC, Nominum, very old BIND on FormErr, NSD
2573 !dh.qr) { // one weird server
2574 pident.domain.clear();
2575 pident.type = 0;
2576 }
2577 else {
2578 try {
2579 if(len > 12)
2580 pident.domain=DNSName(data, len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
2581 }
2582 catch(std::exception& e) {
2583 g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
2584 L<<Logger::Warning<<"Error in packet from remote nameserver "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
2585 return;
2586 }
2587 }
2588 string packet;
2589 packet.assign(data, len);
2590
2591 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
2592 if(iter != MT->d_waiters.end()) {
2593 doResends(iter, pident, packet);
2594 }
2595
2596 retryWithName:
2597
2598 if(!MT->sendEvent(pident, &packet)) {
2599 // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
2600 for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
2601 if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote && mthread->key.type == pident.type &&
2602 pident.domain == mthread->key.domain) {
2603 mthread->key.nearMisses++;
2604 }
2605
2606 // be a bit paranoid here since we're weakening our matching
2607 if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
2608 pident.id == mthread->key.id && mthread->key.remote == pident.remote) {
2609 // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
2610 pident.domain = mthread->key.domain;
2611 pident.type = mthread->key.type;
2612 goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
2613 }
2614 }
2615 g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
2616 if(g_logCommonErrors) {
2617 L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<< (pident.domain.empty() ? "<empty>" : pident.domain.toString())<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
2618 }
2619 }
2620 else if(fd >= 0) {
2621 t_udpclientsocks->returnSocket(fd);
2622 }
2623 }
2624
2625 FDMultiplexer* getMultiplexer()
2626 {
2627 FDMultiplexer* ret;
2628 for(const auto& i : FDMultiplexer::getMultiplexerMap()) {
2629 try {
2630 ret=i.second();
2631 return ret;
2632 }
2633 catch(FDMultiplexerException &fe) {
2634 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
2635 }
2636 catch(...) {
2637 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
2638 }
2639 }
2640 L<<Logger::Error<<"No working multiplexer found!"<<endl;
2641 exit(1);
2642 }
2643
2644
2645 static string* doReloadLuaScript()
2646 {
2647 string fname= ::arg()["lua-dns-script"];
2648 try {
2649 if(fname.empty()) {
2650 t_pdl.reset();
2651 L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
2652 return new string("unloaded\n");
2653 }
2654 else {
2655 t_pdl = std::make_shared<RecursorLua4>(fname);
2656 }
2657 }
2658 catch(std::exception& e) {
2659 L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
2660 return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
2661 }
2662
2663 L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
2664 return new string("(re)loaded '"+fname+"'\n");
2665 }
2666
2667 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
2668 {
2669 if(begin != end)
2670 ::arg().set("lua-dns-script") = *begin;
2671
2672 return broadcastAccFunction<string>(doReloadLuaScript);
2673 }
2674
2675 static string* pleaseUseNewTraceRegex(const std::string& newRegex)
2676 try
2677 {
2678 if(newRegex.empty()) {
2679 t_traceRegex.reset();
2680 return new string("unset\n");
2681 }
2682 else {
2683 t_traceRegex = std::make_shared<Regex>(newRegex);
2684 return new string("ok\n");
2685 }
2686 }
2687 catch(PDNSException& ae)
2688 {
2689 return new string(ae.reason+"\n");
2690 }
2691
2692 string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
2693 {
2694 return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
2695 }
2696
2697 static void checkLinuxIPv6Limits()
2698 {
2699 #ifdef __linux__
2700 string line;
2701 if(readFileIfThere("/proc/sys/net/ipv6/route/max_size", &line)) {
2702 int lim=std::stoi(line);
2703 if(lim < 16384) {
2704 L<<Logger::Error<<"If using IPv6, please raise sysctl net.ipv6.route.max_size, currently set to "<<lim<<" which is < 16384"<<endl;
2705 }
2706 }
2707 #endif
2708 }
2709 static void checkOrFixFDS()
2710 {
2711 unsigned int availFDs=getFilenumLimit();
2712 unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
2713
2714 if(wantFDs > availFDs) {
2715 unsigned int hardlimit= getFilenumLimit(true);
2716 if(hardlimit >= wantFDs) {
2717 setFilenumLimit(wantFDs);
2718 L<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
2719 }
2720 else {
2721 int newval = (hardlimit - 25) / g_numWorkerThreads;
2722 L<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
2723 g_maxMThreads = newval;
2724 setFilenumLimit(hardlimit);
2725 }
2726 }
2727 }
2728
2729 static void* recursorThread(int tid, bool worker);
2730
2731 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
2732 {
2733 t_allowFrom = ng;
2734 return nullptr;
2735 }
2736
2737 int g_argc;
2738 char** g_argv;
2739
2740 void parseACLs()
2741 {
2742 static bool l_initialized;
2743
2744 if(l_initialized) { // only reload configuration file on second call
2745 string configname=::arg()["config-dir"]+"/recursor.conf";
2746 if(::arg()["config-name"]!="") {
2747 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
2748 }
2749 cleanSlashes(configname);
2750
2751 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
2752 throw runtime_error("Unable to re-parse configuration file '"+configname+"'");
2753 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
2754 ::arg().preParseFile(configname.c_str(), "include-dir");
2755 ::arg().preParse(g_argc, g_argv, "include-dir");
2756
2757 // then process includes
2758 std::vector<std::string> extraConfigs;
2759 ::arg().gatherIncludes(extraConfigs);
2760
2761 for(const std::string& fn : extraConfigs) {
2762 if(!::arg().preParseFile(fn.c_str(), "allow-from-file", ::arg()["allow-from-file"]))
2763 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
2764 if(!::arg().preParseFile(fn.c_str(), "allow-from", ::arg()["allow-from"]))
2765 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
2766 }
2767
2768 ::arg().preParse(g_argc, g_argv, "allow-from-file");
2769 ::arg().preParse(g_argc, g_argv, "allow-from");
2770 }
2771
2772 std::shared_ptr<NetmaskGroup> oldAllowFrom = t_allowFrom;
2773 std::shared_ptr<NetmaskGroup> allowFrom = std::make_shared<NetmaskGroup>();
2774
2775 if(!::arg()["allow-from-file"].empty()) {
2776 string line;
2777 ifstream ifs(::arg()["allow-from-file"].c_str());
2778 if(!ifs) {
2779 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
2780 }
2781
2782 string::size_type pos;
2783 while(getline(ifs,line)) {
2784 pos=line.find('#');
2785 if(pos!=string::npos)
2786 line.resize(pos);
2787 trim(line);
2788 if(line.empty())
2789 continue;
2790
2791 allowFrom->addMask(line);
2792 }
2793 L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
2794 }
2795 else if(!::arg()["allow-from"].empty()) {
2796 vector<string> ips;
2797 stringtok(ips, ::arg()["allow-from"], ", ");
2798
2799 L<<Logger::Warning<<"Only allowing queries from: ";
2800 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
2801 allowFrom->addMask(*i);
2802 if(i!=ips.begin())
2803 L<<Logger::Warning<<", ";
2804 L<<Logger::Warning<<*i;
2805 }
2806 L<<Logger::Warning<<endl;
2807 }
2808 else {
2809 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
2810 L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
2811 allowFrom = nullptr;
2812 }
2813
2814 g_initialAllowFrom = allowFrom;
2815 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
2816 oldAllowFrom = nullptr;
2817
2818 l_initialized = true;
2819 }
2820
2821
2822 static void setupDelegationOnly()
2823 {
2824 vector<string> parts;
2825 stringtok(parts, ::arg()["delegation-only"], ", \t");
2826 for(const auto& p : parts) {
2827 SyncRes::addDelegationOnly(DNSName(p));
2828 }
2829 }
2830
2831 static std::map<unsigned int, std::set<int> > parseCPUMap()
2832 {
2833 std::map<unsigned int, std::set<int> > result;
2834
2835 const std::string value = ::arg()["cpu-map"];
2836
2837 if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
2838 L<<Logger::Warning<<"CPU mapping requested but not supported, skipping"<<endl;
2839 return result;
2840 }
2841
2842 std::vector<std::string> parts;
2843
2844 stringtok(parts, value, " \t");
2845
2846 for(const auto& part : parts) {
2847 if (part.find('=') == string::npos)
2848 continue;
2849
2850 try {
2851 auto headers = splitField(part, '=');
2852 trim(headers.first);
2853 trim(headers.second);
2854
2855 unsigned int threadId = pdns_stou(headers.first);
2856 std::vector<std::string> cpus;
2857
2858 stringtok(cpus, headers.second, ",");
2859
2860 for(const auto& cpu : cpus) {
2861 int cpuId = std::stoi(cpu);
2862
2863 result[threadId].insert(cpuId);
2864 }
2865 }
2866 catch(const std::exception& e) {
2867 L<<Logger::Error<<"Error parsing cpu-map entry '"<<part<<"': "<<e.what()<<endl;
2868 }
2869 }
2870
2871 return result;
2872 }
2873
2874 static void setCPUMap(const std::map<unsigned int, std::set<int> >& cpusMap, unsigned int n, pthread_t tid)
2875 {
2876 const auto& cpuMapping = cpusMap.find(n);
2877 if (cpuMapping != cpusMap.cend()) {
2878 int rc = mapThreadToCPUList(tid, cpuMapping->second);
2879 if (rc == 0) {
2880 L<<Logger::Info<<"CPU affinity for worker "<<n<<" has been set to CPU map:";
2881 for (const auto cpu : cpuMapping->second) {
2882 L<<Logger::Info<<" "<<cpu;
2883 }
2884 L<<Logger::Info<<endl;
2885 }
2886 else {
2887 L<<Logger::Warning<<"Error setting CPU affinity for worker "<<n<<" to CPU map:";
2888 for (const auto cpu : cpuMapping->second) {
2889 L<<Logger::Info<<" "<<cpu;
2890 }
2891 L<<Logger::Info<<strerror(rc)<<endl;
2892 }
2893 }
2894 }
2895
2896 static int serviceMain(int argc, char*argv[])
2897 {
2898 L.setName(s_programname);
2899 L.disableSyslog(::arg().mustDo("disable-syslog"));
2900 L.setTimestamps(::arg().mustDo("log-timestamp"));
2901
2902 if(!::arg()["logging-facility"].empty()) {
2903 int val=logFacilityToLOG(::arg().asNum("logging-facility") );
2904 if(val >= 0)
2905 theL().setFacility(val);
2906 else
2907 L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
2908 }
2909
2910 showProductVersion();
2911 seedRandom(::arg()["entropy-source"]);
2912
2913 g_disthashseed=dns_random(0xffffffff);
2914
2915 checkLinuxIPv6Limits();
2916 try {
2917 vector<string> addrs;
2918 if(!::arg()["query-local-address6"].empty()) {
2919 SyncRes::s_doIPv6=true;
2920 L<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
2921
2922 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
2923 for(const string& addr : addrs) {
2924 g_localQueryAddresses6.push_back(ComboAddress(addr));
2925 }
2926 }
2927 else {
2928 L<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
2929 }
2930 addrs.clear();
2931 stringtok(addrs, ::arg()["query-local-address"], ", ;");
2932 for(const string& addr : addrs) {
2933 g_localQueryAddresses4.push_back(ComboAddress(addr));
2934 }
2935 }
2936 catch(std::exception& e) {
2937 L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
2938 exit(99);
2939 }
2940
2941 // keep this ABOVE loadRecursorLuaConfig!
2942 if(::arg()["dnssec"]=="off")
2943 g_dnssecmode=DNSSECMode::Off;
2944 else if(::arg()["dnssec"]=="process-no-validate")
2945 g_dnssecmode=DNSSECMode::ProcessNoValidate;
2946 else if(::arg()["dnssec"]=="process")
2947 g_dnssecmode=DNSSECMode::Process;
2948 else if(::arg()["dnssec"]=="validate")
2949 g_dnssecmode=DNSSECMode::ValidateAll;
2950 else if(::arg()["dnssec"]=="log-fail")
2951 g_dnssecmode=DNSSECMode::ValidateForLog;
2952 else {
2953 L<<Logger::Error<<"Unknown DNSSEC mode "<<::arg()["dnssec"]<<endl;
2954 exit(1);
2955 }
2956
2957 g_dnssecLogBogus = ::arg().mustDo("dnssec-log-bogus");
2958 g_maxNSEC3Iterations = ::arg().asNum("nsec3-max-iterations");
2959
2960 g_maxCacheEntries = ::arg().asNum("max-cache-entries");
2961 g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
2962
2963 try {
2964 loadRecursorLuaConfig(::arg()["lua-config-file"], ::arg().mustDo("daemon"));
2965 }
2966 catch (PDNSException &e) {
2967 L<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
2968 exit(1);
2969 }
2970
2971 parseACLs();
2972 sortPublicSuffixList();
2973
2974 if(!::arg()["dont-query"].empty()) {
2975 vector<string> ips;
2976 stringtok(ips, ::arg()["dont-query"], ", ");
2977 ips.push_back("0.0.0.0");
2978 ips.push_back("::");
2979
2980 L<<Logger::Warning<<"Will not send queries to: ";
2981 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
2982 SyncRes::addDontQuery(*i);
2983 if(i!=ips.begin())
2984 L<<Logger::Warning<<", ";
2985 L<<Logger::Warning<<*i;
2986 }
2987 L<<Logger::Warning<<endl;
2988 }
2989
2990 g_quiet=::arg().mustDo("quiet");
2991
2992 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
2993 if(g_weDistributeQueries) {
2994 L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
2995 }
2996
2997 setupDelegationOnly();
2998 g_outgoingEDNSBufsize=::arg().asNum("edns-outgoing-bufsize");
2999
3000 if(::arg()["trace"]=="fail") {
3001 SyncRes::setDefaultLogMode(SyncRes::Store);
3002 }
3003 else if(::arg().mustDo("trace")) {
3004 SyncRes::setDefaultLogMode(SyncRes::Log);
3005 ::arg().set("quiet")="no";
3006 g_quiet=false;
3007 g_dnssecLOG=true;
3008 }
3009
3010 SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
3011
3012 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
3013
3014 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
3015 SyncRes::s_maxcachettl=max(::arg().asNum("max-cache-ttl"), 15);
3016 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
3017 // Cap the packetcache-servfail-ttl to the packetcache-ttl
3018 uint32_t packetCacheServFailTTL = ::arg().asNum("packetcache-servfail-ttl");
3019 SyncRes::s_packetcacheservfailttl=(packetCacheServFailTTL > SyncRes::s_packetcachettl) ? SyncRes::s_packetcachettl : packetCacheServFailTTL;
3020 SyncRes::s_serverdownmaxfails=::arg().asNum("server-down-max-fails");
3021 SyncRes::s_serverdownthrottletime=::arg().asNum("server-down-throttle-time");
3022 SyncRes::s_serverID=::arg()["server-id"];
3023 SyncRes::s_maxqperq=::arg().asNum("max-qperq");
3024 SyncRes::s_maxtotusec=1000*::arg().asNum("max-total-msec");
3025 SyncRes::s_maxdepth=::arg().asNum("max-recursion-depth");
3026 SyncRes::s_rootNXTrust = ::arg().mustDo( "root-nx-trust");
3027 if(SyncRes::s_serverID.empty()) {
3028 char tmp[128];
3029 gethostname(tmp, sizeof(tmp)-1);
3030 SyncRes::s_serverID=tmp;
3031 }
3032
3033 SyncRes::s_ecsipv4limit = ::arg().asNum("ecs-ipv4-bits");
3034 SyncRes::s_ecsipv6limit = ::arg().asNum("ecs-ipv6-bits");
3035
3036 if (!::arg().isEmpty("ecs-scope-zero-address")) {
3037 ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
3038 SyncRes::setECSScopeZeroAddress(Netmask(scopeZero, scopeZero.isIPv4() ? 32 : 128));
3039 }
3040 else {
3041 bool found = false;
3042 for (const auto& addr : g_localQueryAddresses4) {
3043 if (!IsAnyAddress(addr)) {
3044 SyncRes::setECSScopeZeroAddress(Netmask(addr, 32));
3045 found = true;
3046 break;
3047 }
3048 }
3049 if (!found) {
3050 for (const auto& addr : g_localQueryAddresses6) {
3051 if (!IsAnyAddress(addr)) {
3052 SyncRes::setECSScopeZeroAddress(Netmask(addr, 128));
3053 found = true;
3054 break;
3055 }
3056 }
3057 if (!found) {
3058 SyncRes::setECSScopeZeroAddress(Netmask("127.0.0.1/32"));
3059 }
3060 }
3061 }
3062
3063 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
3064
3065 g_initialDomainMap = parseAuthAndForwards();
3066
3067 g_latencyStatSize=::arg().asNum("latency-statistic-size");
3068
3069 g_logCommonErrors=::arg().mustDo("log-common-errors");
3070 g_logRPZChanges = ::arg().mustDo("log-rpz-changes");
3071
3072 g_anyToTcp = ::arg().mustDo("any-to-tcp");
3073 g_udpTruncationThreshold = ::arg().asNum("udp-truncation-threshold");
3074
3075 g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
3076
3077 g_numWorkerThreads = ::arg().asNum("threads");
3078 if (g_numWorkerThreads < 1) {
3079 L<<Logger::Warning<<"Asked to run with 0 threads, raising to 1 instead"<<endl;
3080 g_numWorkerThreads = 1;
3081 }
3082
3083 g_numThreads = g_numWorkerThreads + g_weDistributeQueries;
3084 g_maxMThreads = ::arg().asNum("max-mthreads");
3085
3086 g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
3087
3088 g_statisticsInterval = ::arg().asNum("statistics-interval");
3089
3090 #ifdef SO_REUSEPORT
3091 g_reusePort = ::arg().mustDo("reuseport");
3092 #endif
3093
3094 g_useOneSocketPerThread = (!g_weDistributeQueries && g_reusePort);
3095
3096 if (g_useOneSocketPerThread) {
3097 for (unsigned int threadId = 0; threadId < g_numWorkerThreads; threadId++) {
3098 makeUDPServerSockets(threadId);
3099 makeTCPServerSockets(threadId);
3100 }
3101 }
3102 else {
3103 makeUDPServerSockets(0);
3104 makeTCPServerSockets(0);
3105
3106 if (!g_weDistributeQueries) {
3107 /* we are not distributing queries and we don't have reuseport,
3108 so every thread will be listening on all the TCP sockets */
3109 for (unsigned int threadId = 1; threadId < g_numWorkerThreads; threadId++) {
3110 g_tcpListenSockets[threadId] = g_tcpListenSockets[0];
3111 }
3112 }
3113 }
3114
3115 SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
3116 g_useIncomingECS = ::arg().mustDo("use-incoming-edns-subnet");
3117
3118 int forks;
3119 for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
3120 if(!fork()) // we are child
3121 break;
3122 }
3123
3124 if(::arg().mustDo("daemon")) {
3125 L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
3126 L.toConsole(Logger::Critical);
3127 daemonize();
3128 loadRecursorLuaConfig(::arg()["lua-config-file"], false);
3129 }
3130 signal(SIGUSR1,usr1Handler);
3131 signal(SIGUSR2,usr2Handler);
3132 signal(SIGPIPE,SIG_IGN);
3133
3134 checkOrFixFDS();
3135
3136 #ifdef HAVE_LIBSODIUM
3137 if (sodium_init() == -1) {
3138 L<<Logger::Error<<"Unable to initialize sodium crypto library"<<endl;
3139 exit(99);
3140 }
3141 #endif
3142
3143 openssl_thread_setup();
3144 openssl_seed();
3145
3146 int newgid=0;
3147 if(!::arg()["setgid"].empty())
3148 newgid=Utility::makeGidNumeric(::arg()["setgid"]);
3149 int newuid=0;
3150 if(!::arg()["setuid"].empty())
3151 newuid=Utility::makeUidNumeric(::arg()["setuid"]);
3152
3153 Utility::dropGroupPrivs(newuid, newgid);
3154
3155 if (!::arg()["chroot"].empty()) {
3156 #ifdef HAVE_SYSTEMD
3157 char *ns;
3158 ns = getenv("NOTIFY_SOCKET");
3159 if (ns != nullptr) {
3160 L<<Logger::Error<<"Unable to chroot when running from systemd. Please disable chroot= or set the 'Type' for this service to 'simple'"<<endl;
3161 exit(1);
3162 }
3163 #endif
3164 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
3165 L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
3166 exit(1);
3167 }
3168 else
3169 L<<Logger::Error<<"Chrooted to '"<<::arg()["chroot"]<<"'"<<endl;
3170 }
3171
3172 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
3173 if(!s_pidfname.empty())
3174 unlink(s_pidfname.c_str()); // remove possible old pid file
3175 writePid();
3176
3177 makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
3178
3179 Utility::dropUserPrivs(newuid);
3180
3181 makeThreadPipes();
3182
3183 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
3184 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
3185 g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
3186
3187 if (::arg().mustDo("snmp-agent")) {
3188 g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
3189 g_snmpAgent->run();
3190 }
3191
3192 /* This thread handles the web server, carbon, statistics and the control channel */
3193 std::thread handlerThread(recursorThread, s_handlerThreadID, false);
3194
3195 const auto cpusMap = parseCPUMap();
3196
3197 std::vector<std::thread> workers(g_numThreads);
3198 if(g_numThreads == 1) {
3199 L<<Logger::Warning<<"Operating unthreaded"<<endl;
3200 #ifdef HAVE_SYSTEMD
3201 sd_notify(0, "READY=1");
3202 #endif
3203 setCPUMap(cpusMap, 0, pthread_self());
3204 recursorThread(0, true);
3205 }
3206 else {
3207 L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
3208 for(unsigned int n=0; n < g_numThreads; ++n) {
3209 workers[n] = std::thread(recursorThread, n, true);
3210
3211 setCPUMap(cpusMap, n, workers[n].native_handle());
3212 }
3213 #ifdef HAVE_SYSTEMD
3214 sd_notify(0, "READY=1");
3215 #endif
3216 workers.back().join();
3217 }
3218 return 0;
3219 }
3220
/* Body of every recursor thread.
 *
 *  n      - thread id; stored in t_id and used to index g_pipes, deferredAdds
 *           and g_tcpListenSockets
 *  worker - true for the threads launched as workers by serviceMain(), false
 *           for the handler thread (web server, carbon, statistics and the
 *           control channel)
 *
 * Sets up the per-thread state (caches, Lua script, statistics rings, MTasker,
 * multiplexer), registers the file descriptors this thread has to watch and
 * then runs the event loop forever. The function only returns (0) after an
 * exception escaped from the set-up or the loop and has been logged.
 */
3221 static void* recursorThread(int n, bool worker)
3222 try
3223 {
3224 t_id=n;
3225 SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
3226 SyncRes::setDomainMap(g_initialDomainMap);
// start with the ACL that parseACLs() stored in g_initialAllowFrom
3227 t_allowFrom = g_initialAllowFrom;
3228 t_udpclientsocks = std::unique_ptr<UDPClientSocks>(new UDPClientSocks());
3229 t_tcpClientCounts = std::unique_ptr<tcpClientCounts_t>(new tcpClientCounts_t());
3230 primeHints();
3231
3232 t_packetCache = std::unique_ptr<RecursorPacketCache>(new RecursorPacketCache());
3233
3234 #ifdef HAVE_PROTOBUF
3235 t_uuidGenerator = std::unique_ptr<boost::uuids::random_generator>(new boost::uuids::random_generator());
3236 #endif
3237 L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
3238
// a Lua script that fails to load is fatal: the whole process is ended with _exit(99)
3239 try {
3240 if(!::arg()["lua-dns-script"].empty()) {
3241 t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
3242 L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
3243 }
3244 }
3245 catch(std::exception &e) {
3246 L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
3247 _exit(99);
3248 }
3249
// The configured number of ring buffer entries is divided over the worker
// threads; when the result is 0 no ring buffers are created at all.
3250 unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
3251 if(ringsize) {
3252 t_remotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3253 if(g_weDistributeQueries) // if so, only 1 thread does recvfrom
3254 t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries"));
3255 else
3256 t_remotes->set_capacity(ringsize);
3257 t_servfailremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3258 t_servfailremotes->set_capacity(ringsize);
3259 t_largeanswerremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3260 t_largeanswerremotes->set_capacity(ringsize);
3261
3262 t_queryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
3263 t_queryring->set_capacity(ringsize);
3264 t_servfailqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
3265 t_servfailqueryring->set_capacity(ringsize);
3266 }
3267
// the mthread scheduler of this thread; 'stack-size' is the stack size per mthread
3268 MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
3269
3270 PacketID pident;
3271
3272 t_fdm=getMultiplexer();
3273
3274 if(!worker) {
// handler thread: optionally start the web server on this thread's multiplexer
3275 if(::arg().mustDo("webserver")) {
3276 L<<Logger::Warning << "Enabling web server" << endl;
3277 try {
// NOTE(review): the pointer is not kept or deleted here - presumably the web
// server is meant to live for the rest of the process; confirm
3278 new RecursorWebServer(t_fdm);
3279 }
3280 catch(PDNSException &e) {
3281 L<<Logger::Error<<"Exception: "<<e.reason<<endl;
3282 exit(99);
3283 }
3284 }
3285 L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
3286 }
3287 else {
// worker thread: watch both pipes of this thread
3288 t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
3289 t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest);
3290
// Register the deferred descriptor/callback pairs: this thread's own list when
// every thread has its own sockets, otherwise the list of thread 0.
3291 if(g_useOneSocketPerThread) {
3292 for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
3293 t_fdm->addReadFD(i->first, i->second);
3294 }
3295 }
3296 else {
3297 if(!g_weDistributeQueries || t_id == s_distributorThreadID) { // if we distribute queries, only t_id = 0 listens
3298 for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
3299 t_fdm->addReadFD(i->first, i->second);
3300 }
3301 }
3302 }
3303 }
3304
3305 registerAllStats();
3306
3307 if(!worker) {
3308 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
3309 }
3310
3311 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
3312
// whether the TCP listening sockets are currently registered with t_fdm
3313 bool listenOnTCP(true);
3314
3315 time_t last_stat = 0;
3316 time_t last_carbon=0;
3317 time_t carbonInterval=::arg().asNum("carbon-interval");
3318 counter.store(0); // used to periodically execute certain tasks
// main event loop, never left except through an exception
3319 for(;;) {
3320 while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
3321
// every 500th iteration: run houseKeeping in a new mthread
3322 if(!(counter%500)) {
3323 MT->makeThread(houseKeeping, 0);
3324 }
3325
// every 55th iteration: drop the descriptors the multiplexer reports as timed out
3326 if(!(counter%55)) {
3327 typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
3328 expired_t expired=t_fdm->getTimeouts(g_now);
3329
3330 for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
// assumes every descriptor with a timeout carries a TCPConnection as its
// parameter - any_cast throws otherwise; TODO confirm
3331 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
3332 if(g_logCommonErrors)
3333 L<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toString() <<endl;
3334 t_fdm->removeReadFD(i->first);
3335 }
3336 }
3337
3338 counter++;
3339
// handler thread only: periodic statistics and carbon export
3340 if(!worker) {
3341 if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
3342 doStats();
3343 last_stat = g_now.tv_sec;
3344 }
3345
3346 Utility::gettimeofday(&g_now, 0);
3347
3348 if((g_now.tv_sec - last_carbon) >= carbonInterval) {
3349 MT->makeThread(doCarbonDump, 0);
3350 last_carbon = g_now.tv_sec;
3351 }
3352 }
3353
3354 t_fdm->run(&g_now);
3355 // 'run' updates g_now for us
3356
// Stop watching the TCP listening sockets while there are more connections
// than 'max-tcp-clients', and watch them again once the count is back at or
// below the limit.
3357 if(worker && (!g_weDistributeQueries || t_id == s_distributorThreadID)) { // if pdns distributes queries, only tid 0 should do this
3358 if(listenOnTCP) {
3359 if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
3360 for(const auto fd : g_tcpListenSockets[t_id]) {
3361 t_fdm->removeReadFD(fd);
3362 }
3363 listenOnTCP=false;
3364 }
3365 }
3366 else {
3367 if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
3368 for(const auto fd : g_tcpListenSockets[t_id]) {
3369 t_fdm->addReadFD(fd, handleNewTCPQuestion);
3370 }
3371 listenOnTCP=true;
3372 }
3373 }
3374 }
3375 }
3376 }
// function-try-block handlers: anything escaping the body above is logged and
// ends this thread by returning 0
3377 catch(PDNSException &ae) {
3378 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
3379 return 0;
3380 }
3381 catch(std::exception &e) {
3382 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
3383 return 0;
3384 }
3385 catch(...) {
3386 L<<Logger::Error<<"any other exception in main: "<<endl;
3387 return 0;
3388 }
3389
3390
3391 int main(int argc, char **argv)
3392 {
3393 g_argc = argc;
3394 g_argv = argv;
3395 g_stats.startupTime=time(0);
3396 versionSetProduct(ProductRecursor);
3397 reportBasicTypes();
3398 reportOtherTypes();
3399
3400 int ret = EXIT_SUCCESS;
3401
3402 try {
3403 ::arg().set("stack-size","stack size per mthread")="200000";
3404 ::arg().set("soa-minimum-ttl","Don't change")="0";
3405 ::arg().set("no-shuffle","Don't change")="off";
3406 ::arg().set("local-port","port to listen on")="53";
3407 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
3408 ::arg().setSwitch("non-local-bind", "Enable binding to non-local addresses by using FREEBIND / BINDANY socket options")="no";
3409 ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
3410 ::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
3411 ::arg().set("dnssec-log-bogus", "Log DNSSEC bogus validations")="no";
3412 ::arg().set("daemon","Operate as a daemon")="no";
3413 ::arg().setSwitch("write-pid","Write a PID file")="yes";
3414 ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="6";
3415 ::arg().set("disable-syslog","Disable logging to syslog, useful when running inside a supervisor that logs stdout")="no";
3416 ::arg().set("log-timestamp","Print timestamps in log lines, useful to disable when running with a tool that timestamps stdout already")="yes";
3417 ::arg().set("log-common-errors","If we should log rather common errors")="no";
3418 ::arg().set("chroot","switch to chroot jail")="";
3419 ::arg().set("setgid","If set, change group id to this gid for more security")="";
3420 ::arg().set("setuid","If set, change user id to this uid for more security")="";
3421 ::arg().set("network-timeout", "Wait this number of milliseconds for network i/o")="1500";
3422 ::arg().set("threads", "Launch this number of threads")="2";
3423 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs!
3424 ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
3425 ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = "";
3426 ::arg().set("api-key", "Static pre-shared authentication key for access to the REST API") = "";
3427 ::arg().set("api-logfile", "Location of the server logfile (used by the REST API)") = "/var/log/pdns.log";
3428 ::arg().set("api-readonly", "Disallow data modification through the REST API when set") = "no";
3429 ::arg().setSwitch("webserver", "Start a webserver (for REST API)") = "no";
3430 ::arg().set("webserver-address", "IP Address of webserver to listen on") = "127.0.0.1";
3431 ::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
3432 ::arg().set("webserver-password", "Password required for accessing the webserver") = "";
3433 ::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="127.0.0.1,::1";
3434 ::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
3435 ::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server IP address")="";
3436 ::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
3437 ::arg().set("statistics-interval", "Number of seconds between printing of recursor statistics, 0 to disable")="1800";
3438 ::arg().set("quiet","Suppress logging of questions and answers")="";
3439 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
3440 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
3441 ::arg().set("socket-owner","Owner of socket")="";
3442 ::arg().set("socket-group","Group of socket")="";
3443 ::arg().set("socket-mode", "Permissions for socket")="";
3444
3445 ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+" when unset and not chrooted" )="";
3446 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
3447 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
3448 ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
3449 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
3450 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
3451 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
3452 ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
3453 ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
3454 ::arg().set("hint-file", "If set, load root hints from this file")="";
3455 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
3456 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
3457 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
3458 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
3459 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
3460 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
3461 ::arg().set("server-id", "Returned when queried for 'id.server' TXT or NSID, defaults to hostname")="";
3462 ::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
3463 ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
3464 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
3465 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
3466 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
3467 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=DONT_QUERY;
3468 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
3469 ::arg().set("max-tcp-queries-per-connection", "If set, maximum number of TCP queries in a TCP connection")="0";
3470 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
3471 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
3472 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
3473 ::arg().set("lua-config-file", "More powerful configuration options")="";
3474
3475 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
3476 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
3477 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
3478 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
3479 ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
3480 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
3481 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="yes";
3482 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
3483 ::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
3484 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
3485 ::arg().set("ecs-ipv4-bits", "Number of bits of IPv4 address to pass for EDNS Client Subnet")="24";
3486 ::arg().set("ecs-ipv6-bits", "Number of bits of IPv6 address to pass for EDNS Client Subnet")="56";
3487 ::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
3488 ::arg().set("ecs-scope-zero-address", "Address to send to whitelisted authoritative servers for incoming queries with ECS prefix-length source of 0")="";
3489 ::arg().setSwitch( "use-incoming-edns-subnet", "Pass along received EDNS Client Subnet information")="no";
3490 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads")="yes";
3491 ::arg().setSwitch( "root-nx-trust", "If set, believe that an NXDOMAIN from the root means the TLD does not exist")="yes";
3492 ::arg().setSwitch( "any-to-tcp","Answer ANY queries with tc=1, shunting to TCP" )="no";
3493 ::arg().setSwitch( "lowercase-outgoing","Force outgoing questions to lowercase")="no";
3494 ::arg().setSwitch("gettag-needs-edns-options", "If EDNS Options should be extracted before calling the gettag() hook")="no";
3495 ::arg().set("udp-truncation-threshold", "Maximum UDP response size before we truncate")="1680";
3496 ::arg().set("edns-outgoing-bufsize", "Outgoing EDNS buffer size")="1680";
3497 ::arg().set("minimum-ttl-override", "Set under adverse conditions, a minimum TTL")="0";
3498 ::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
3499 ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
3500 ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
3501
3502 ::arg().set("include-dir","Include *.conf files from this directory")="";
3503 ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
3504
3505 ::arg().setSwitch("reuseport","Enable SO_REUSEPORT allowing multiple recursors processes to listen to 1 address")="no";
3506
3507 ::arg().setSwitch("snmp-agent", "If set, register as an SNMP agent")="no";
3508 ::arg().set("snmp-master-socket", "If set and snmp-agent is set, the socket to use to register to the SNMP master")="";
3509
3510 ::arg().set("tcp-fast-open", "Enable TCP Fast Open support on the listening sockets, using the supplied numerical value as the queue size")="0";
3511 ::arg().set("nsec3-max-iterations", "Maximum number of iterations allowed for an NSEC3 record")="2500";
3512
3513 ::arg().set("cpu-map", "Thread to CPU mapping, space separated thread-id=cpu1,cpu2..cpuN pairs")="";
3514
3515 ::arg().setSwitch("log-rpz-changes", "Log additions and removals to RPZ zones at Info level")="no";
3516
3517 ::arg().setCmd("help","Provide a helpful message");
3518 ::arg().setCmd("version","Print version string");
3519 ::arg().setCmd("config","Output blank configuration");
3520 L.toConsole(Logger::Info);
3521 ::arg().laxParse(argc,argv); // do a lax parse
3522
3523 string configname=::arg()["config-dir"]+"/recursor.conf";
3524 if(::arg()["config-name"]!="") {
3525 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
3526 s_programname+="-"+::arg()["config-name"];
3527 }
3528 cleanSlashes(configname);
3529
3530 if(::arg().mustDo("config")) {
3531 cout<<::arg().configstring()<<endl;
3532 exit(0);
3533 }
3534
3535 if(!::arg().file(configname.c_str()))
3536 L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
3537
3538 ::arg().parse(argc,argv);
3539
3540 if( !::arg()["chroot"].empty() && !::arg()["api-config-dir"].empty() && !::arg().mustDo("api-readonly") ) {
3541 L<<Logger::Error<<"Using chroot and a writable API is not possible"<<endl;
3542 exit(EXIT_FAILURE);
3543 }
3544
3545 if (::arg()["socket-dir"].empty()) {
3546 if (::arg()["chroot"].empty())
3547 ::arg().set("socket-dir") = LOCALSTATEDIR;
3548 else
3549 ::arg().set("socket-dir") = "/";
3550 }
3551
3552 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
3553
3554 if(::arg().asNum("threads")==1)
3555 ::arg().set("pdns-distributes-queries")="no";
3556
3557 if(::arg().mustDo("help")) {
3558 cout<<"syntax:"<<endl<<endl;
3559 cout<<::arg().helpstring(::arg()["help"])<<endl;
3560 exit(0);
3561 }
3562 if(::arg().mustDo("version")) {
3563 showProductVersion();
3564 showBuildConfiguration();
3565 exit(0);
3566 }
3567
3568 Logger::Urgency logUrgency = (Logger::Urgency)::arg().asNum("loglevel");
3569
3570 if (logUrgency < Logger::Error)
3571 logUrgency = Logger::Error;
3572 if(!g_quiet && logUrgency < Logger::Info) { // Logger::Info=6, Logger::Debug=7
3573 logUrgency = Logger::Info; // if you do --quiet=no, you need Info to also see the query log
3574 }
3575 L.setLoglevel(logUrgency);
3576 L.toConsole(logUrgency);
3577
3578 serviceMain(argc, argv);
3579 }
3580 catch(PDNSException &ae) {
3581 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
3582 ret=EXIT_FAILURE;
3583 }
3584 catch(std::exception &e) {
3585 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
3586 ret=EXIT_FAILURE;
3587 }
3588 catch(...) {
3589 L<<Logger::Error<<"any other exception in main: "<<endl;
3590 ret=EXIT_FAILURE;
3591 }
3592
3593 return ret;
3594 }