]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
d92b1ffaeb7b15e69859c440bd30bce5538918c0
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <netdb.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29
30 #include "recpacketcache.hh"
31 #include "ws-recursor.hh"
32 #include <pthread.h>
33 #include "utility.hh"
34 #include "dns_random.hh"
35 #ifdef HAVE_LIBSODIUM
36 #include <sodium.h>
37 #endif
38 #include "opensslsigners.hh"
39 #include <iostream>
40 #include <errno.h>
41 #include <boost/static_assert.hpp>
42 #include <map>
43 #include <set>
44 #include "recursor_cache.hh"
45 #include "cachecleaner.hh"
46 #include <stdio.h>
47 #include <signal.h>
48 #include <stdlib.h>
49 #include "misc.hh"
50 #include "mtasker.hh"
51 #include <utility>
52 #include "arguments.hh"
53 #include "syncres.hh"
54 #include <fcntl.h>
55 #include <fstream>
56 #include "sortlist.hh"
57 #include "sstuff.hh"
58 #include <boost/tuple/tuple.hpp>
59 #include <boost/tuple/tuple_comparison.hpp>
60 #include <boost/shared_array.hpp>
61 #include <boost/function.hpp>
62 #include <boost/algorithm/string.hpp>
63 #ifdef MALLOC_TRACE
64 #include "malloctrace.hh"
65 #endif
66 #include <netinet/tcp.h>
67 #include "dnsparser.hh"
68 #include "dnswriter.hh"
69 #include "dnsrecords.hh"
70 #include "zoneparser-tng.hh"
71 #include "rec_channel.hh"
72 #include "logger.hh"
73 #include "iputils.hh"
74 #include "mplexer.hh"
75 #include "config.h"
76 #include "lua-recursor4.hh"
77 #include "version.hh"
78 #include "responsestats.hh"
79 #include "secpoll-recursor.hh"
80 #include "dnsname.hh"
81 #include "filterpo.hh"
82 #include "rpzloader.hh"
83 #include "validate-recursor.hh"
84 #include "rec-lua-conf.hh"
85 #include "ednsoptions.hh"
86 #include "gettime.hh"
87
88 #include "rec-protobuf.hh"
89 #include "rec-snmp.hh"
90
91 #ifdef HAVE_SYSTEMD
92 #include <systemd/sd-daemon.h>
93 #endif
94
95 #include "namespaces.hh"
96
// Counter keyed by client address; the comparator name suggests the port is ignored (confirm in iputils.hh).
// Used below by TCPConnection to count connections per remote address.
97 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
98
// Thread-local state private to this file.
99 static thread_local std::shared_ptr<RecursorLua4> t_pdl; // Lua engine; handed to SyncRes and asked for its hooks in startDoResolve()
100 static thread_local int t_id = -1; // value returned by getRecursorThreadId()
101 static thread_local std::shared_ptr<Regex> t_traceRegex; // when set, a matching qname switches SyncRes to SyncRes::Store logging
102 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts; // updated by the TCPConnection constructor and destructor
103
// Thread-local state with external linkage.
104 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
105 thread_local std::unique_ptr<MemRecursorCache> t_RC;
106 thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
107 thread_local FDMultiplexer* t_fdm{nullptr}; // multiplexer on which the code below registers read/write descriptors
108 thread_local std::unique_ptr<addrringbuf_t> t_remotes, t_servfailremotes, t_largeanswerremotes; // the last two are fed by updateResponseStats()
109 thread_local std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > > t_queryring, t_servfailqueryring; // (qname, qtype) pairs
110 thread_local std::shared_ptr<NetmaskGroup> t_allowFrom;
111 #ifdef HAVE_PROTOBUF
112 thread_local std::unique_ptr<boost::uuids::random_generator> t_uuidGenerator;
113 #endif
114 __thread struct timeval g_now; // timestamp, updated (too) frequently
115
116 // for communicating with our threads
// One set of file descriptors per thread (stored in g_pipes).
// NOTE(review): the names suggest three pipes, each with a write end and a read end:
// commands to the thread, replies from the thread, and queries to the thread - confirm at the creation site.
117 struct ThreadPipeSet
118 {
119 int writeToThread;
120 int readToThread;
121 int writeFromThread;
122 int readFromThread;
123 int writeQueriesToThread; // this one is non-blocking
124 int readQueriesToThread;
125 };
126
127 /* the TID of the thread handling the web server, carbon, statistics and the control channel */
128 static const int s_handlerThreadID = -1;
129 /* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries
130 to the other threads */
131 static const int s_distributorThreadID = 0;
132
133 typedef std::map<int, std::set<int>> tcpListenSockets_t;
134 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
135
// A list of (fd, callback) pairs; the callback has the shape of a multiplexer handler.
136 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
137
// Wildcard addresses used by getQueryLocalAddress() when no query-local address is configured.
138 static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
139 static vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
140 static tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
141 static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
142 static std::unordered_map<unsigned int, deferredAdd_t> deferredAdds;
143 static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
144 static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6; // getQueryLocalAddress() picks a random entry from these
145 static AtomicCounter counter;
146 static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads need this to be set up
147 static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new threads need to be set up with this
148 static size_t g_tcpMaxQueriesPerConn;
149 static size_t s_maxUDPQueriesPerRound;
150 static uint64_t g_latencyStatSize;
151 static uint32_t g_disthashseed;
152 static unsigned int g_maxTCPPerClient;
153 static unsigned int g_networkTimeoutMsec; // timeout passed to MT->waitEvent() by the asend*/arecv* helpers
154 static unsigned int g_maxMThreads;
155 static unsigned int g_numWorkerThreads;
156 static int g_tcpTimeout;
157 static uint16_t g_udpTruncationThreshold; // upper bound on the size of a UDP answer (see startDoResolve)
158 static std::atomic<bool> statsWanted;
159 static std::atomic<bool> g_quiet; // when true, per-question log lines are only written for traced queries
160 static bool g_logCommonErrors;
161 static bool g_anyToTcp; // when true, ANY queries over UDP get a truncated (TC=1) answer
162 static bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
163 static bool g_reusePort{false};
164 static bool g_useOneSocketPerThread;
165 static bool g_gettagNeedsEDNSOptions{false};
166 static time_t g_statisticsInterval;
167 static bool g_useIncomingECS; // when true, an EDNS Client Subnet option in the query is parsed and passed to SyncRes
168 std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
169
170 RecursorControlChannel s_rcc; // only active in thread 0
171 RecursorStats g_stats;
172 string s_programname="pdns_recursor";
173 string s_pidfname; // path that writePid() appends the process id to
174 bool g_lowercaseOutgoing;
175 unsigned int g_numThreads;
176 uint16_t g_outgoingEDNSBufsize;
177 bool g_logRPZChanges{false};
178
179 #define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
180 // Bad Nets taken from both:
181 // http://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
182 // and
183 // http://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
184 // where such a network may not be considered a valid destination
185 #define BAD_NETS "0.0.0.0/8, 192.0.0.0/24, 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24, 240.0.0.0/4, ::/96, ::ffff:0:0/96, 100::/64, 2001:db8::/32"
186 #define DONT_QUERY LOCAL_NETS ", " BAD_NETS
187
188 //! used to send information to a newborn mthread
189 struct DNSComboWriter {
190 DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(true, data, len), d_now(now),
191 d_tcp(false), d_socket(-1)
192 {}
193 MOADNSParser d_mdp;
194 void setRemote(const ComboAddress* sa)
195 {
196 d_remote=*sa;
197 }
198
199 void setLocal(const ComboAddress& sa)
200 {
201 d_local=sa;
202 }
203
204
205 void setSocket(int sock)
206 {
207 d_socket=sock;
208 }
209
210 string getRemote() const
211 {
212 return d_remote.toString();
213 }
214
215 struct timeval d_now;
216 ComboAddress d_remote, d_local;
217 #ifdef HAVE_PROTOBUF
218 boost::uuids::uuid d_uuid;
219 string d_requestorId;
220 string d_deviceId;
221 #endif
222 std::string d_query;
223 EDNSSubnetOpts d_ednssubnet;
224 bool d_ecsFound{false};
225 bool d_ecsParsed{false};
226 bool d_tcp;
227 int d_socket;
228 unsigned int d_tag{0};
229 uint32_t d_qhash{0};
230 shared_ptr<TCPConnection> d_tcpConnection;
231 vector<pair<uint16_t, string> > d_ednsOpts;
232 std::vector<std::string> d_policyTags;
233 LuaContext::LuaObject d_data;
234 uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
235 uint16_t d_ecsBegin{0};
236 uint16_t d_ecsEnd{0};
237 bool d_variable{false};
238 };
239
240 MT_t* getMT()
241 {
242 return MT ? MT.get() : nullptr;
243 }
244
245 ArgvMap &arg()
246 {
247 static ArgvMap theArg;
248 return theArg;
249 }
250
251 int getRecursorThreadId()
252 {
253 return t_id;
254 }
255
256 int getMTaskerTID()
257 {
258 return MT->getTid();
259 }
260
261 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
262
263 // -1 is error, 0 is timeout, 1 is success
264 int asendtcp(const string& data, Socket* sock)
265 {
266 PacketID pident;
267 pident.sock=sock;
268 pident.outMSG=data;
269
270 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
271 string packet;
272
273 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
274
275 if(!ret || ret==-1) { // timeout
276 t_fdm->removeWriteFD(sock->getHandle());
277 }
278 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
279 return -1;
280 }
281 return ret;
282 }
283
284 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
285
286 // -1 is error, 0 is timeout, 1 is success
287 int arecvtcp(string& data, size_t len, Socket* sock, bool incompleteOkay)
288 {
289 data.clear();
290 PacketID pident;
291 pident.sock=sock;
292 pident.inNeeded=len;
293 pident.inIncompleteOkay=incompleteOkay;
294 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
295
296 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
297 if(!ret || ret==-1) { // timeout
298 t_fdm->removeReadFD(sock->getHandle());
299 }
300 else if(data.empty()) {// error, EOF or other
301 return -1;
302 }
303
304 return ret;
305 }
306
307 static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
308 {
309 PacketID pident=*any_cast<PacketID>(&var);
310 char resp[512];
311 ssize_t ret=recv(fd, resp, sizeof(resp), 0);
312 t_fdm->removeReadFD(fd);
313 if(ret >= 0) {
314 string data(resp, (size_t) ret);
315 MT->sendEvent(pident, &data);
316 }
317 else {
318 string empty;
319 MT->sendEvent(pident, &empty);
320 // cerr<<"Had some kind of error: "<<ret<<", "<<strerror(errno)<<endl;
321 }
322 }
323 string GenUDPQueryResponse(const ComboAddress& dest, const string& query)
324 {
325 Socket s(dest.sin4.sin_family, SOCK_DGRAM);
326 s.setNonBlocking();
327 ComboAddress local = getQueryLocalAddress(dest.sin4.sin_family, 0);
328
329 s.bind(local);
330 s.connect(dest);
331 s.send(query);
332
333 PacketID pident;
334 pident.sock=&s;
335 pident.type=0;
336 t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
337
338 string data;
339
340 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
341
342 if(!ret || ret==-1) { // timeout
343 t_fdm->removeReadFD(s.getHandle());
344 }
345 else if(data.empty()) {// error, EOF or other
346 // we could special case this
347 return data;
348 }
349 return data;
350 }
351
352 //! pick a random query local address
353 ComboAddress getQueryLocalAddress(int family, uint16_t port)
354 {
355 ComboAddress ret;
356 if(family==AF_INET) {
357 if(g_localQueryAddresses4.empty())
358 ret = g_local4;
359 else
360 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
361 ret.sin4.sin_port = htons(port);
362 }
363 else {
364 if(g_localQueryAddresses6.empty())
365 ret = g_local6;
366 else
367 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
368
369 ret.sin6.sin6_port = htons(port);
370 }
371 return ret;
372 }
373
374 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
375
376 static void setSocketBuffer(int fd, int optname, uint32_t size)
377 {
378 uint32_t psize=0;
379 socklen_t len=sizeof(psize);
380
381 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
382 L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
383 return;
384 }
385
386 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
387 L<<Logger::Error<<"Unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
388 }
389
390
391 static void setSocketReceiveBuffer(int fd, uint32_t size)
392 {
393 setSocketBuffer(fd, SO_RCVBUF, size);
394 }
395
396 static void setSocketSendBuffer(int fd, uint32_t size)
397 {
398 setSocketBuffer(fd, SO_SNDBUF, size);
399 }
400
401
402 // you can ask this class for a UDP socket to send a query from
403 // this socket is not yours, don't even think about deleting it
404 // but after you call 'returnSocket' on it, don't assume anything anymore
405 class UDPClientSocks
406 {
407 unsigned int d_numsocks;
408 public:
409 UDPClientSocks() : d_numsocks(0)
410 {
411 }
412
413 typedef set<int> socks_t;
414 socks_t d_socks;
415
416 // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
417 int getSocket(const ComboAddress& toaddr, int* fd)
418 {
419 *fd=makeClientSocket(toaddr.sin4.sin_family);
420 if(*fd < 0) // temporary error - receive exception otherwise
421 return -2;
422
423 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
424 int err = errno;
425 // returnSocket(*fd);
426 try {
427 closesocket(*fd);
428 }
429 catch(const PDNSException& e) {
430 L<<Logger::Error<<"Error closing UDP socket after connect() failed: "<<e.reason<<endl;
431 }
432
433 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
434 return -2;
435 return -1;
436 }
437
438 d_socks.insert(*fd);
439 d_numsocks++;
440 return 0;
441 }
442
443 void returnSocket(int fd)
444 {
445 socks_t::iterator i=d_socks.find(fd);
446 if(i==d_socks.end()) {
447 throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
448 }
449 returnSocketLocked(i);
450 }
451
452 // return a socket to the pool, or simply erase it
453 void returnSocketLocked(socks_t::iterator& i)
454 {
455 if(i==d_socks.end()) {
456 throw PDNSException("Trying to return a socket not in the pool");
457 }
458 try {
459 t_fdm->removeReadFD(*i);
460 }
461 catch(FDMultiplexerException& e) {
462 // we sometimes return a socket that has not yet been assigned to t_fdm
463 }
464 try {
465 closesocket(*i);
466 }
467 catch(const PDNSException& e) {
468 L<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
469 }
470
471 d_socks.erase(i++);
472 --d_numsocks;
473 }
474
475 // returns -1 for errors which might go away, throws for ones that won't
476 static int makeClientSocket(int family)
477 {
478 int ret=socket(family, SOCK_DGRAM, 0 ); // turns out that setting CLO_EXEC and NONBLOCK from here is not a performance win on Linux (oddly enough)
479
480 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
481 return ret;
482
483 if(ret<0)
484 throw PDNSException("Making a socket for resolver (family = "+std::to_string(family)+"): "+stringerror());
485
486 // setCloseOnExec(ret); // we're not going to exec
487
488 int tries=10;
489 ComboAddress sin;
490 while(--tries) {
491 uint16_t port;
492
493 if(tries==1) // fall back to kernel 'random'
494 port = 0;
495 else
496 port = 1025 + dns_random(64510);
497
498 sin=getQueryLocalAddress(family, port); // does htons for us
499
500 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
501 break;
502 }
503 if(!tries)
504 throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
505
506 setNonBlocking(ret);
507 return ret;
508 }
509 };
510
511 static thread_local std::unique_ptr<UDPClientSocks> t_udpclientsocks;
512
513 /* these two functions are used by LWRes */
514 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
515 int asendto(const char *data, size_t len, int flags,
516 const ComboAddress& toaddr, uint16_t id, const DNSName& domain, uint16_t qtype, int* fd)
517 {
518
519 PacketID pident;
520 pident.domain = domain;
521 pident.remote = toaddr;
522 pident.type = qtype;
523
524 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
525 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
526
527 for(; chain.first != chain.second; chain.first++) {
528 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
529 /*
530 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
531 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
532 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
533 */
534 chain.first->key.chain.insert(id); // we can chain
535 *fd=-1; // gets used in waitEvent / sendEvent later on
536 return 1;
537 }
538 }
539
540 int ret=t_udpclientsocks->getSocket(toaddr, fd);
541 if(ret < 0)
542 return ret;
543
544 pident.fd=*fd;
545 pident.id=id;
546
547 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
548 ret = send(*fd, data, len, 0);
549
550 int tmp = errno;
551
552 if(ret < 0)
553 t_udpclientsocks->returnSocket(*fd);
554
555 errno = tmp; // this is for logging purposes only
556 return ret;
557 }
558
559 // -1 is error, 0 is timeout, 1 is success
560 int arecvfrom(char *data, size_t len, int flags, const ComboAddress& fromaddr, size_t *d_len,
561 uint16_t id, const DNSName& domain, uint16_t qtype, int fd, struct timeval* now)
562 {
563 static optional<unsigned int> nearMissLimit;
564 if(!nearMissLimit)
565 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
566
567 PacketID pident;
568 pident.fd=fd;
569 pident.id=id;
570 pident.domain=domain;
571 pident.type = qtype;
572 pident.remote=fromaddr;
573
574 string packet;
575 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
576
577 if(ret > 0) {
578 if(packet.empty()) // means "error"
579 return -1;
580
581 *d_len=packet.size();
582 memcpy(data,packet.c_str(),min(len,*d_len));
583 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
584 L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
585 g_stats.spoofCount++;
586 return -1;
587 }
588 }
589 else {
590 if(fd >= 0)
591 t_udpclientsocks->returnSocket(fd);
592 }
593 return ret;
594 }
595
596 static void writePid(void)
597 {
598 if(!::arg().mustDo("write-pid"))
599 return;
600 ofstream of(s_pidfname.c_str(), std::ios_base::app);
601 if(of)
602 of<< Utility::getpid() <<endl;
603 else
604 L<<Logger::Error<<"Writing pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
605 }
606
607 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : d_remote(addr), d_fd(fd)
608 {
609 ++s_currentConnections;
610 (*t_tcpClientCounts)[d_remote]++;
611 }
612
613 TCPConnection::~TCPConnection()
614 {
615 try {
616 if(closesocket(d_fd) < 0)
617 L<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
618 }
619 catch(const PDNSException& e) {
620 L<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
621 }
622
623 if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
624 t_tcpClientCounts->erase(d_remote);
625 --s_currentConnections;
626 }
627
628 AtomicCounter TCPConnection::s_currentConnections;
629
630 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
631
632 // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
633 static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
634 {
635 if(packetsize > 1000 && t_largeanswerremotes)
636 t_largeanswerremotes->push_back(remote);
637 switch(res) {
638 case RCode::ServFail:
639 if(t_servfailremotes) {
640 t_servfailremotes->push_back(remote);
641 if(query && t_servfailqueryring) // packet cache
642 t_servfailqueryring->push_back(make_pair(*query, qtype));
643 }
644 g_stats.servFails++;
645 break;
646 case RCode::NXDomain:
647 g_stats.nxDomains++;
648 break;
649 case RCode::NoError:
650 g_stats.noErrors++;
651 break;
652 }
653 }
654
655 static string makeLoginfo(DNSComboWriter* dc)
656 try
657 {
658 return "("+dc->d_mdp.d_qname.toLogString()+"/"+DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)+" from "+(dc->d_remote.toString())+")";
659 }
660 catch(...)
661 {
662 return "Exception making error message for exception";
663 }
664
665 #ifdef HAVE_PROTOBUF
666 static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId)
667 {
668 Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
669 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
670 RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
671 message.setServerIdentity(SyncRes::s_serverID);
672 message.setEDNSSubnet(ednssubnet, ednssubnet.isIpv4() ? maskV4 : maskV6);
673 message.setRequestorId(requestorId);
674 message.setDeviceId(deviceId);
675
676 if (!policyTags.empty()) {
677 message.setPolicyTags(policyTags);
678 }
679
680 // cerr <<message.toDebugString()<<endl;
681 std::string str;
682 message.serialize(str);
683 logger->queueData(str);
684 }
685
686 static void protobufLogResponse(const std::shared_ptr<RemoteLogger>& logger, const RecProtoBufMessage& message)
687 {
688 // cerr <<message.toDebugString()<<endl;
689 std::string str;
690 message.serialize(str);
691 logger->queueData(str);
692 }
693 #endif
694
695 /**
696 * Chases the CNAME provided by the PolicyCustom RPZ policy.
697 *
698 * @param spoofed: The DNSRecord that was created by the policy, should already be added to ret
699 * @param qtype: The QType of the original query
700 * @param sr: A SyncRes
701 * @param res: An integer that will contain the RCODE of the lookup we do
702 * @param ret: A vector of DNSRecords where the result of the CNAME chase should be appended to
703 */
704 static void handleRPZCustom(const DNSRecord& spoofed, const QType& qtype, SyncRes& sr, int& res, vector<DNSRecord>& ret)
705 {
706 if (spoofed.d_type == QType::CNAME) {
707 bool oldWantsRPZ = sr.getWantsRPZ();
708 sr.setWantsRPZ(false);
709 vector<DNSRecord> ans;
710 res = sr.beginResolve(DNSName(spoofed.d_content->getZoneRepresentation()), qtype, 1, ans);
711 for (const auto& rec : ans) {
712 if(rec.d_place == DNSResourceRecord::ANSWER) {
713 ret.push_back(rec);
714 }
715 }
716 // Reset the RPZ state of the SyncRes
717 sr.setWantsRPZ(oldWantsRPZ);
718 }
719 }
720
721 static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, uint32_t ttlCap, const uint16_t maxAnswerSize)
722 {
723 pw.startRecord(rec.d_name, rec.d_type, (rec.d_ttl > ttlCap ? ttlCap : rec.d_ttl), rec.d_class, rec.d_place);
724
725 if(rec.d_type != QType::OPT) // their TTL ain't real
726 minTTL = min(minTTL, rec.d_ttl);
727
728 rec.d_content->toPacket(pw);
729 if(pw.size() > static_cast<size_t>(maxAnswerSize)) {
730 pw.rollback();
731 if(rec.d_place != DNSResourceRecord::ADDITIONAL) {
732 pw.getHeader()->tc=1;
733 pw.truncate();
734 }
735 return false;
736 }
737
738 return true;
739 }
740
741 static void startDoResolve(void *p)
742 {
743 DNSComboWriter* dc=(DNSComboWriter *)p;
744 try {
745 if (t_queryring)
746 t_queryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
747
748 uint16_t maxanswersize = dc->d_tcp ? 65535 : min(static_cast<uint16_t>(512), g_udpTruncationThreshold);
749 EDNSOpts edo;
750 bool haveEDNS=false;
751 if(getEDNSOpts(dc->d_mdp, &edo)) {
752 if(!dc->d_tcp) {
753 /* rfc6891 6.2.3:
754 "Values lower than 512 MUST be treated as equal to 512."
755 */
756 maxanswersize = min(static_cast<uint16_t>(edo.d_packetsize >= 512 ? edo.d_packetsize : 512), g_udpTruncationThreshold);
757 }
758 dc->d_ednsOpts = edo.d_options;
759 haveEDNS=true;
760
761 if (g_useIncomingECS && !dc->d_ecsParsed) {
762 for (const auto& o : edo.d_options) {
763 if (o.first == EDNSOptionCode::ECS) {
764 dc->d_ecsFound = getEDNSSubnetOptsFromString(o.second, &dc->d_ednssubnet);
765 break;
766 }
767 }
768 }
769 }
770 /* perhaps there was no EDNS or no ECS but by now we looked */
771 dc->d_ecsParsed = true;
772 vector<DNSRecord> ret;
773 vector<uint8_t> packet;
774
775 auto luaconfsLocal = g_luaconfs.getLocal();
776 // Used to tell syncres later on if we should apply NSDNAME and NSIP RPZ triggers for this query
777 bool wantsRPZ(true);
778 RecProtoBufMessage pbMessage(RecProtoBufMessage::Response);
779 #ifdef HAVE_PROTOBUF
780 if (luaconfsLocal->protobufServer) {
781 Netmask requestorNM(dc->d_remote, dc->d_remote.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
782 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
783 pbMessage.setServerIdentity(SyncRes::s_serverID);
784 pbMessage.update(dc->d_uuid, &requestor, &dc->d_local, dc->d_tcp, dc->d_mdp.d_header.id);
785 pbMessage.setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
786 pbMessage.setQuestion(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
787 }
788 #endif /* HAVE_PROTOBUF */
789
790 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
791
792 pw.getHeader()->aa=0;
793 pw.getHeader()->ra=1;
794 pw.getHeader()->qr=1;
795 pw.getHeader()->tc=0;
796 pw.getHeader()->id=dc->d_mdp.d_header.id;
797 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
798 pw.getHeader()->cd=dc->d_mdp.d_header.cd;
799
800 /* This is the lowest TTL seen in the records of the response,
801 so we can't cache it for longer than this value.
802 If we have a TTL cap, this value can't be larger than the
803 cap no matter what. */
804 uint32_t minTTL = dc->d_ttlCap;
805
806 SyncRes sr(dc->d_now);
807
808 bool DNSSECOK=false;
809 if(t_pdl) {
810 sr.setLuaEngine(t_pdl);
811 }
812 sr.d_requestor=dc->d_remote; // ECS needs this too
813 if(g_dnssecmode != DNSSECMode::Off) {
814 sr.setDoDNSSEC(true);
815
816 // Does the requestor want DNSSEC records?
817 if(edo.d_Z & EDNSOpts::DNSSECOK) {
818 DNSSECOK=true;
819 g_stats.dnssecQueries++;
820 }
821 } else {
822 // Ignore the client-set CD flag
823 pw.getHeader()->cd=0;
824 }
825 sr.setDNSSECValidationRequested(g_dnssecmode == DNSSECMode::ValidateAll || g_dnssecmode==DNSSECMode::ValidateForLog || ((dc->d_mdp.d_header.ad || DNSSECOK) && g_dnssecmode==DNSSECMode::Process));
826
827 #ifdef HAVE_PROTOBUF
828 sr.setInitialRequestId(dc->d_uuid);
829 #endif
830
831 if (g_useIncomingECS) {
832 sr.setIncomingECSFound(dc->d_ecsFound);
833 if (dc->d_ecsFound) {
834 sr.setIncomingECS(dc->d_ednssubnet);
835 }
836 }
837
838 bool tracedQuery=false; // we could consider letting Lua know about this too
839 bool variableAnswer = dc->d_variable;
840 bool shouldNotValidate = false;
841
842 /* preresolve expects res (dq.rcode) to be set to RCode::NoError by default */
843 int res = RCode::NoError;
844 DNSFilterEngine::Policy appliedPolicy;
845 DNSRecord spoofed;
846 RecursorLua4::DNSQuestion dq(dc->d_remote, dc->d_local, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ);
847 dq.ednsFlags = &edo.d_Z;
848 dq.ednsOptions = &dc->d_ednsOpts;
849 dq.tag = dc->d_tag;
850 dq.discardedPolicies = &sr.d_discardedPolicies;
851 dq.policyTags = &dc->d_policyTags;
852 dq.appliedPolicy = &appliedPolicy;
853 dq.currentRecords = &ret;
854 dq.dh = &dc->d_mdp.d_header;
855 dq.data = dc->d_data;
856 #ifdef HAVE_PROTOBUF
857 dq.requestorId = dc->d_requestorId;
858 dq.deviceId = dc->d_deviceId;
859 #endif
860
861 if(dc->d_mdp.d_qtype==QType::ANY && !dc->d_tcp && g_anyToTcp) {
862 pw.getHeader()->tc = 1;
863 res = 0;
864 variableAnswer = true;
865 goto sendit;
866 }
867
868 if(t_traceRegex && t_traceRegex->match(dc->d_mdp.d_qname.toString())) {
869 sr.setLogMode(SyncRes::Store);
870 tracedQuery=true;
871 }
872
873
874 if(!g_quiet || tracedQuery) {
875 L<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
876 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote();
877 if(!dc->d_ednssubnet.source.empty()) {
878 L<<" (ecs "<<dc->d_ednssubnet.source.toString()<<")";
879 }
880 L<<endl;
881 }
882
883 sr.setId(MT->getTid());
884 if(!dc->d_mdp.d_header.rd)
885 sr.setCacheOnly();
886
887 if (t_pdl) {
888 t_pdl->prerpz(dq, res);
889 }
890
891 // Check if the query has a policy attached to it
892 if (wantsRPZ) {
893 appliedPolicy = luaconfsLocal->dfe.getQueryPolicy(dc->d_mdp.d_qname, dc->d_remote, sr.d_discardedPolicies);
894 }
895
896 // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
897 if(!t_pdl || !t_pdl->preresolve(dq, res)) {
898
899 sr.setWantsRPZ(wantsRPZ);
900 if(wantsRPZ) {
901 switch(appliedPolicy.d_kind) {
902 case DNSFilterEngine::PolicyKind::NoAction:
903 break;
904 case DNSFilterEngine::PolicyKind::Drop:
905 g_stats.policyDrops++;
906 g_stats.policyResults[appliedPolicy.d_kind]++;
907 delete dc;
908 dc=0;
909 return;
910 case DNSFilterEngine::PolicyKind::NXDOMAIN:
911 g_stats.policyResults[appliedPolicy.d_kind]++;
912 res=RCode::NXDomain;
913 goto haveAnswer;
914 case DNSFilterEngine::PolicyKind::NODATA:
915 g_stats.policyResults[appliedPolicy.d_kind]++;
916 res=RCode::NoError;
917 goto haveAnswer;
918 case DNSFilterEngine::PolicyKind::Custom:
919 g_stats.policyResults[appliedPolicy.d_kind]++;
920 res=RCode::NoError;
921 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
922 ret.push_back(spoofed);
923 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
924 goto haveAnswer;
925 case DNSFilterEngine::PolicyKind::Truncate:
926 if(!dc->d_tcp) {
927 g_stats.policyResults[appliedPolicy.d_kind]++;
928 res=RCode::NoError;
929 pw.getHeader()->tc=1;
930 goto haveAnswer;
931 }
932 break;
933 }
934 }
935
936 // Query got not handled for QNAME Policy reasons, now actually go out to find an answer
937 try {
938 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
939 shouldNotValidate = sr.wasOutOfBand();
940 }
941 catch(ImmediateServFailException &e) {
942 if(g_logCommonErrors)
943 L<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during resolve of '"<<dc->d_mdp.d_qname<<"' because: "<<e.reason<<endl;
944 res = RCode::ServFail;
945 }
946
947 dq.validationState = sr.getValidationState();
948
949 // During lookup, an NSDNAME or NSIP trigger was hit in RPZ
950 if (res == -2) { // XXX This block should be macro'd, it is repeated post-resolve.
951 appliedPolicy = sr.d_appliedPolicy;
952 g_stats.policyResults[appliedPolicy.d_kind]++;
953 switch(appliedPolicy.d_kind) {
954 case DNSFilterEngine::PolicyKind::NoAction: // This can never happen
955 throw PDNSException("NoAction policy returned while a NSDNAME or NSIP trigger was hit");
956 case DNSFilterEngine::PolicyKind::Drop:
957 g_stats.policyDrops++;
958 delete dc;
959 dc=0;
960 return;
961 case DNSFilterEngine::PolicyKind::NXDOMAIN:
962 ret.clear();
963 res=RCode::NXDomain;
964 goto haveAnswer;
965
966 case DNSFilterEngine::PolicyKind::NODATA:
967 ret.clear();
968 res=RCode::NoError;
969 goto haveAnswer;
970
971 case DNSFilterEngine::PolicyKind::Truncate:
972 if(!dc->d_tcp) {
973 ret.clear();
974 res=RCode::NoError;
975 pw.getHeader()->tc=1;
976 goto haveAnswer;
977 }
978 break;
979
980 case DNSFilterEngine::PolicyKind::Custom:
981 ret.clear();
982 res=RCode::NoError;
983 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
984 ret.push_back(spoofed);
985 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
986 goto haveAnswer;
987 }
988 }
989
990 if (wantsRPZ) {
991 appliedPolicy = luaconfsLocal->dfe.getPostPolicy(ret, sr.d_discardedPolicies);
992 }
993
994 if(t_pdl) {
995 if(res == RCode::NoError) {
996 auto i=ret.cbegin();
997 for(; i!= ret.cend(); ++i)
998 if(i->d_type == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
999 break;
1000 if(i == ret.cend() && t_pdl->nodata(dq, res))
1001 shouldNotValidate = true;
1002
1003 }
1004 else if(res == RCode::NXDomain && t_pdl->nxdomain(dq, res))
1005 shouldNotValidate = true;
1006
1007 if(t_pdl->postresolve(dq, res))
1008 shouldNotValidate = true;
1009 }
1010
1011 if (wantsRPZ) { //XXX This block is repeated, see above
1012 g_stats.policyResults[appliedPolicy.d_kind]++;
1013 switch(appliedPolicy.d_kind) {
1014 case DNSFilterEngine::PolicyKind::NoAction:
1015 break;
1016 case DNSFilterEngine::PolicyKind::Drop:
1017 g_stats.policyDrops++;
1018 delete dc;
1019 dc=0;
1020 return;
1021 case DNSFilterEngine::PolicyKind::NXDOMAIN:
1022 ret.clear();
1023 res=RCode::NXDomain;
1024 goto haveAnswer;
1025
1026 case DNSFilterEngine::PolicyKind::NODATA:
1027 ret.clear();
1028 res=RCode::NoError;
1029 goto haveAnswer;
1030
1031 case DNSFilterEngine::PolicyKind::Truncate:
1032 if(!dc->d_tcp) {
1033 ret.clear();
1034 res=RCode::NoError;
1035 pw.getHeader()->tc=1;
1036 goto haveAnswer;
1037 }
1038 break;
1039
1040 case DNSFilterEngine::PolicyKind::Custom:
1041 ret.clear();
1042 res=RCode::NoError;
1043 spoofed=appliedPolicy.getCustomRecord(dc->d_mdp.d_qname);
1044 ret.push_back(spoofed);
1045 handleRPZCustom(spoofed, QType(dc->d_mdp.d_qtype), sr, res, ret);
1046 goto haveAnswer;
1047 }
1048 }
1049 }
1050 haveAnswer:;
1051 if(res == PolicyDecision::DROP) {
1052 g_stats.policyDrops++;
1053 delete dc;
1054 dc=0;
1055 return;
1056 }
1057 if(tracedQuery || res == -1 || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
1058 {
1059 string trace(sr.getTrace());
1060 if(!trace.empty()) {
1061 vector<string> lines;
1062 boost::split(lines, trace, boost::is_any_of("\n"));
1063 for(const string& line : lines) {
1064 if(!line.empty())
1065 L<<Logger::Warning<< line << endl;
1066 }
1067 }
1068 }
1069
1070 if(res == -1) {
1071 pw.getHeader()->rcode=RCode::ServFail;
1072 // no commit here, because no record
1073 g_stats.servFails++;
1074 }
1075 else {
1076 pw.getHeader()->rcode=res;
1077
1078 // Does the validation mode or query demand validation?
1079 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1080 try {
1081 if(sr.doLog()) {
1082 L<<Logger::Warning<<"Starting validation of answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<endl;
1083 }
1084
1085 auto state = sr.getValidationState();
1086
1087 if(state == Secure) {
1088 if(sr.doLog()) {
1089 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates correctly"<<endl;
1090 }
1091
1092 // Is the query source interested in the value of the ad-bit?
1093 if (dc->d_mdp.d_header.ad || DNSSECOK)
1094 pw.getHeader()->ad=1;
1095 }
1096 else if(state == Insecure) {
1097 if(sr.doLog()) {
1098 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates as Insecure"<<endl;
1099 }
1100
1101 pw.getHeader()->ad=0;
1102 }
1103 else if(state == Bogus) {
1104 if(g_dnssecLogBogus || sr.doLog() || g_dnssecmode == DNSSECMode::ValidateForLog) {
1105 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" for "<<dc->d_remote.toStringWithPort()<<" validates as Bogus"<<endl;
1106 }
1107
1108 // Does the query or validation mode sending out a SERVFAIL on validation errors?
1109 if(!pw.getHeader()->cd && (g_dnssecmode == DNSSECMode::ValidateAll || dc->d_mdp.d_header.ad || DNSSECOK)) {
1110 if(sr.doLog()) {
1111 L<<Logger::Warning<<"Sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" because recursor or query demands it for Bogus results"<<endl;
1112 }
1113
1114 pw.getHeader()->rcode=RCode::ServFail;
1115 goto sendit;
1116 } else {
1117 if(sr.doLog()) {
1118 L<<Logger::Warning<<"Not sending out SERVFAIL for "<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<" Bogus validation since neither config nor query demands this"<<endl;
1119 }
1120 }
1121 }
1122 }
1123 catch(ImmediateServFailException &e) {
1124 if(g_logCommonErrors)
1125 L<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during validation of '"<<dc->d_mdp.d_qname<<"|"<<QType(dc->d_mdp.d_qtype).getName()<<"' because: "<<e.reason<<endl;
1126 pw.getHeader()->rcode=RCode::ServFail;
1127 goto sendit;
1128 }
1129 }
1130
1131 if(ret.size()) {
1132 orderAndShuffle(ret);
1133 if(auto sl = luaconfsLocal->sortlist.getOrderCmp(dc->d_remote)) {
1134 stable_sort(ret.begin(), ret.end(), *sl);
1135 variableAnswer=true;
1136 }
1137 }
1138
1139 bool needCommit = false;
1140 for(auto i=ret.cbegin(); i!=ret.cend(); ++i) {
1141 if( ! DNSSECOK &&
1142 ( i->d_type == QType::NSEC3 ||
1143 (
1144 ( i->d_type == QType::RRSIG || i->d_type==QType::NSEC ) &&
1145 (
1146 ( dc->d_mdp.d_qtype != i->d_type && dc->d_mdp.d_qtype != QType::ANY ) ||
1147 i->d_place != DNSResourceRecord::ANSWER
1148 )
1149 )
1150 )
1151 ) {
1152 continue;
1153 }
1154
1155 if (!addRecordToPacket(pw, *i, minTTL, dc->d_ttlCap, maxanswersize)) {
1156 needCommit = false;
1157 break;
1158 }
1159 needCommit = true;
1160
1161 #ifdef HAVE_PROTOBUF
1162 if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) {
1163 pbMessage.addRR(*i);
1164 }
1165 #endif
1166 }
1167 if(needCommit)
1168 pw.commit();
1169 }
1170 sendit:;
1171
1172 if (haveEDNS) {
1173 /* we try to add the EDNS OPT RR even for truncated answers,
1174 as rfc6891 states:
1175 "The minimal response MUST be the DNS header, question section, and an
1176 OPT record. This MUST also occur when a truncated response (using
1177 the DNS header's TC bit) is returned."
1178 */
1179 if (addRecordToPacket(pw, makeOpt(edo.d_packetsize, 0, edo.d_Z), minTTL, dc->d_ttlCap, maxanswersize)) {
1180 pw.commit();
1181 }
1182 }
1183
1184 g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
1185 updateResponseStats(res, dc->d_remote, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1186 #ifdef HAVE_PROTOBUF
1187 if (luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
1188 pbMessage.setBytes(packet.size());
1189 pbMessage.setResponseCode(pw.getHeader()->rcode);
1190 if (appliedPolicy.d_name) {
1191 pbMessage.setAppliedPolicy(*appliedPolicy.d_name);
1192 pbMessage.setAppliedPolicyType(appliedPolicy.d_type);
1193 }
1194 pbMessage.setPolicyTags(dc->d_policyTags);
1195 pbMessage.setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
1196 pbMessage.setRequestorId(dq.requestorId);
1197 pbMessage.setDeviceId(dq.deviceId);
1198 protobufLogResponse(luaconfsLocal->protobufServer, pbMessage);
1199 }
1200 #endif
1201 if(!dc->d_tcp) {
1202 struct msghdr msgh;
1203 struct iovec iov;
1204 char cbuf[256];
1205 fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote);
1206 msgh.msg_control=NULL;
1207
1208 if(g_fromtosockets.count(dc->d_socket)) {
1209 addCMsgSrcAddr(&msgh, cbuf, &dc->d_local, 0);
1210 }
1211 if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors)
1212 L<<Logger::Warning<<"Sending UDP reply to client "<<dc->d_remote.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
1213
1214 if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
1215 t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, dc->d_query, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
1216 string((const char*)&*packet.begin(), packet.size()),
1217 g_now.tv_sec,
1218 pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
1219 min(minTTL,SyncRes::s_packetcachettl),
1220 dc->d_ecsBegin,
1221 dc->d_ecsEnd,
1222 &pbMessage);
1223 }
1224 // else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
1225 }
1226 else {
1227 char buf[2];
1228 buf[0]=packet.size()/256;
1229 buf[1]=packet.size()%256;
1230
1231 Utility::iovec iov[2];
1232
1233 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
1234 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
1235
1236 int wret=Utility::writev(dc->d_socket, iov, 2);
1237 bool hadError=true;
1238
1239 if(wret == 0)
1240 L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
1241 else if(wret < 0 )
1242 L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
1243 else if((unsigned int)wret != 2 + packet.size())
1244 L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
1245 else
1246 hadError=false;
1247
1248 // update tcp connection status, either by closing or moving to 'BYTE0'
1249
1250 if(hadError) {
1251 // no need to remove us from FDM, we weren't there
1252 dc->d_socket = -1;
1253 }
1254 else {
1255 dc->d_tcpConnection->queriesCount++;
1256 if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) {
1257 dc->d_socket = -1;
1258 }
1259 else {
1260 dc->d_tcpConnection->state=TCPConnection::BYTE0;
1261 Utility::gettimeofday(&g_now, 0); // needs to be updated
1262 t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
1263 t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
1264 }
1265 }
1266 }
1267 float spent=makeFloat(sr.getNow()-dc->d_now);
1268 if(!g_quiet) {
1269 L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
1270 L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
1271 sr.d_totUsec/1000.0<<" netw ms, "<< spent*1000.0<<" tot ms, "<<
1272 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<< res;
1273
1274 if(!shouldNotValidate && sr.isDNSSECValidationRequested()) {
1275 L<< ", dnssec="<<vStates[sr.getValidationState()];
1276 }
1277
1278 L<<endl;
1279
1280 }
1281
1282 if (sr.d_outqueries || sr.d_authzonequeries) {
1283 t_RC->cacheMisses++;
1284 }
1285 else {
1286 t_RC->cacheHits++;
1287 }
1288
1289 if(spent < 0.001)
1290 g_stats.answers0_1++;
1291 else if(spent < 0.010)
1292 g_stats.answers1_10++;
1293 else if(spent < 0.1)
1294 g_stats.answers10_100++;
1295 else if(spent < 1.0)
1296 g_stats.answers100_1000++;
1297 else
1298 g_stats.answersSlow++;
1299
1300 uint64_t newLat=(uint64_t)(spent*1000000);
1301 newLat = min(newLat,(uint64_t)(((uint64_t) g_networkTimeoutMsec)*1000)); // outliers of several minutes exist..
1302 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + (float)newLat/g_latencyStatSize;
1303 // no worries, we do this for packet cache hits elsewhere
1304
1305 auto ourtime = 1000.0*spent-sr.d_totUsec/1000.0; // in msec
1306 if(ourtime < 1)
1307 g_stats.ourtime0_1++;
1308 else if(ourtime < 2)
1309 g_stats.ourtime1_2++;
1310 else if(ourtime < 4)
1311 g_stats.ourtime2_4++;
1312 else if(ourtime < 8)
1313 g_stats.ourtime4_8++;
1314 else if(ourtime < 16)
1315 g_stats.ourtime8_16++;
1316 else if(ourtime < 32)
1317 g_stats.ourtime16_32++;
1318 else {
1319 // cerr<<"SLOW: "<<ourtime<<"ms -> "<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<endl;
1320 g_stats.ourtimeSlow++;
1321 }
1322 if(ourtime >= 0.0) {
1323 newLat=ourtime*1000; // usec
1324 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + (float)newLat/g_latencyStatSize;
1325 }
1326 // cout<<dc->d_mdp.d_qname<<"\t"<<MT->getUsec()<<"\t"<<sr.d_outqueries<<endl;
1327 delete dc;
1328 dc=0;
1329 }
1330 catch(PDNSException &ae) {
1331 L<<Logger::Error<<"startDoResolve problem "<<makeLoginfo(dc)<<": "<<ae.reason<<endl;
1332 delete dc;
1333 }
1334 catch(MOADNSException& e) {
1335 L<<Logger::Error<<"DNS parser error "<<makeLoginfo(dc) <<": "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
1336 delete dc;
1337 }
1338 catch(std::exception& e) {
1339 L<<Logger::Error<<"STL error "<< makeLoginfo(dc)<<": "<<e.what();
1340
1341 // Luawrapper nests the exception from Lua, so we unnest it here
1342 try {
1343 std::rethrow_if_nested(e);
1344 } catch(const std::exception& ne) {
1345 L<<". Extra info: "<<ne.what();
1346 } catch(...) {}
1347
1348 L<<endl;
1349 delete dc;
1350 }
1351 catch(...) {
1352 L<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
1353 delete dc;
1354 }
1355
1356 g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
1357 }
1358
1359 static void makeControlChannelSocket(int processNum=-1)
1360 {
1361 string sockname=::arg()["socket-dir"]+"/"+s_programname;
1362 if(processNum >= 0)
1363 sockname += "."+std::to_string(processNum);
1364 sockname+=".controlsocket";
1365 s_rcc.listen(sockname);
1366
1367 int sockowner = -1;
1368 int sockgroup = -1;
1369
1370 if (!::arg().isEmpty("socket-group"))
1371 sockgroup=::arg().asGid("socket-group");
1372 if (!::arg().isEmpty("socket-owner"))
1373 sockowner=::arg().asUid("socket-owner");
1374
1375 if (sockgroup > -1 || sockowner > -1) {
1376 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
1377 unixDie("Failed to chown control socket");
1378 }
1379 }
1380
1381 // do mode change if socket-mode is given
1382 if(!::arg().isEmpty("socket-mode")) {
1383 mode_t sockmode=::arg().asMode("socket-mode");
1384 if(chmod(sockname.c_str(), sockmode) < 0) {
1385 unixDie("Failed to chmod control socket");
1386 }
1387 }
1388 }
1389
1390 static bool getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass, EDNSSubnetOpts* ednssubnet, std::map<uint16_t, EDNSOptionView>* options)
1391 {
1392 bool found = false;
1393 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
1394 size_t questionLen = question.length();
1395 unsigned int consumed=0;
1396 *dnsname=DNSName(question.c_str(), questionLen, sizeof(dnsheader), false, qtype, qclass, &consumed);
1397
1398 size_t pos= sizeof(dnsheader)+consumed+4;
1399 /* at least OPT root label (1), type (2), class (2) and ttl (4) + OPT RR rdlen (2)
1400 = 11 */
1401 if(ntohs(dh->arcount) == 1 && questionLen > pos + 11) { // this code can extract one (1) EDNS Subnet option
1402 /* OPT root label (1) followed by type (2) */
1403 if(question.at(pos)==0 && question.at(pos+1)==0 && question.at(pos+2)==QType::OPT) {
1404 if (!options) {
1405 char* ecsStart = nullptr;
1406 size_t ecsLen = 0;
1407 int res = getEDNSOption((char*)question.c_str()+pos+9, questionLen - pos - 9, EDNSOptionCode::ECS, &ecsStart, &ecsLen);
1408 if (res == 0 && ecsLen > 4) {
1409 EDNSSubnetOpts eso;
1410 if(getEDNSSubnetOptsFromString(ecsStart + 4, ecsLen - 4, &eso)) {
1411 *ednssubnet=eso;
1412 found = true;
1413 }
1414 }
1415 }
1416 else {
1417 int res = getEDNSOptions((char*)question.c_str()+pos+9, questionLen - pos - 9, *options);
1418 if (res == 0) {
1419 const auto& it = options->find(EDNSOptionCode::ECS);
1420 if (it != options->end() && it->second.content != nullptr && it->second.size > 0) {
1421 EDNSSubnetOpts eso;
1422 if(getEDNSSubnetOptsFromString(it->second.content, it->second.size, &eso)) {
1423 *ednssubnet=eso;
1424 found = true;
1425 }
1426 }
1427 }
1428 }
1429 }
1430 }
1431 return found;
1432 }
1433
1434 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1435 {
1436 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
1437
1438 if(conn->state==TCPConnection::BYTE0) {
1439 ssize_t bytes=recv(conn->getFD(), conn->data, 2, 0);
1440 if(bytes==1)
1441 conn->state=TCPConnection::BYTE1;
1442 if(bytes==2) {
1443 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1444 conn->bytesread=0;
1445 conn->state=TCPConnection::GETQUESTION;
1446 }
1447 if(!bytes || bytes < 0) {
1448 t_fdm->removeReadFD(fd);
1449 return;
1450 }
1451 }
1452 else if(conn->state==TCPConnection::BYTE1) {
1453 ssize_t bytes=recv(conn->getFD(), conn->data+1, 1, 0);
1454 if(bytes==1) {
1455 conn->state=TCPConnection::GETQUESTION;
1456 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1457 conn->bytesread=0;
1458 }
1459 if(!bytes || bytes < 0) {
1460 if(g_logCommonErrors)
1461 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected after first byte"<<endl;
1462 t_fdm->removeReadFD(fd);
1463 return;
1464 }
1465 }
1466 else if(conn->state==TCPConnection::GETQUESTION) {
1467 ssize_t bytes=recv(conn->getFD(), conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
1468 if(!bytes || bytes < 0 || bytes > std::numeric_limits<std::uint16_t>::max()) {
1469 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected while reading question body"<<endl;
1470 t_fdm->removeReadFD(fd);
1471 return;
1472 }
1473 conn->bytesread+=(uint16_t)bytes;
1474 if(conn->bytesread==conn->qlen) {
1475 t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
1476
1477 DNSComboWriter* dc=nullptr;
1478 try {
1479 dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
1480 }
1481 catch(MOADNSException &mde) {
1482 g_stats.clientParseError++;
1483 if(g_logCommonErrors)
1484 L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toString() <<endl;
1485 return;
1486 }
1487 dc->d_tcpConnection = conn; // carry the torch
1488 dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
1489 dc->d_tcp=true;
1490 dc->setRemote(&conn->d_remote);
1491 ComboAddress dest;
1492 dest.reset();
1493 dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
1494 socklen_t len = dest.getSocklen();
1495 getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
1496 dc->setLocal(dest);
1497 DNSName qname;
1498 uint16_t qtype=0;
1499 uint16_t qclass=0;
1500 bool needECS = false;
1501 string requestorId;
1502 string deviceId;
1503 #ifdef HAVE_PROTOBUF
1504 auto luaconfsLocal = g_luaconfs.getLocal();
1505 if (luaconfsLocal->protobufServer) {
1506 needECS = true;
1507 }
1508 #endif
1509
1510 if(needECS || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
1511
1512 try {
1513 std::map<uint16_t, EDNSOptionView> ednsOptions;
1514 dc->d_ecsParsed = true;
1515 dc->d_ecsFound = getQNameAndSubnet(std::string(conn->data, conn->qlen), &qname, &qtype, &qclass, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
1516
1517 if(t_pdl) {
1518 try {
1519 if (t_pdl->d_gettag_ffi) {
1520 dc->d_tag = t_pdl->gettag_ffi(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable);
1521 }
1522 else if (t_pdl->d_gettag) {
1523 dc->d_tag = t_pdl->gettag(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId);
1524 }
1525 }
1526 catch(const std::exception& e) {
1527 if(g_logCommonErrors)
1528 L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
1529 }
1530 }
1531 }
1532 catch(const std::exception& e)
1533 {
1534 if(g_logCommonErrors)
1535 L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
1536 }
1537 }
1538 #ifdef HAVE_PROTOBUF
1539 if(luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
1540 dc->d_requestorId = requestorId;
1541 dc->d_deviceId = deviceId;
1542 dc->d_uuid = (*t_uuidGenerator)();
1543 }
1544
1545 const struct dnsheader* dh = (const struct dnsheader*) conn->data;
1546 if(luaconfsLocal->protobufServer) {
1547 try {
1548
1549 if (!luaconfsLocal->protobufTaggedOnly) {
1550 protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, conn->d_remote, dest, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
1551 }
1552 }
1553 catch(std::exception& e) {
1554 if(g_logCommonErrors)
1555 L<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
1556 }
1557 }
1558 #endif
1559 if(t_pdl) {
1560 if(t_pdl->ipfilter(dc->d_remote, dc->d_local, *dh)) {
1561 delete dc;
1562 if(!g_quiet)
1563 L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<conn->d_remote.toStringWithPort()<<" based on policy"<<endl;
1564 g_stats.policyDrops++;
1565 return;
1566 }
1567 }
1568
1569 if(dc->d_mdp.d_header.qr) {
1570 delete dc;
1571 g_stats.ignoredCount++;
1572 L<<Logger::Error<<"Ignoring answer from TCP client "<< conn->d_remote.toString() <<" on server socket!"<<endl;
1573 return;
1574 }
1575 if(dc->d_mdp.d_header.opcode) {
1576 delete dc;
1577 g_stats.ignoredCount++;
1578 L<<Logger::Error<<"Ignoring non-query opcode from TCP client "<< conn->d_remote.toString() <<" on server socket!"<<endl;
1579 return;
1580 }
1581 else {
1582 ++g_stats.qcounter;
1583 ++g_stats.tcpqcounter;
1584 MT->makeThread(startDoResolve, dc); // deletes dc, will set state to BYTE0 again
1585 return;
1586 }
1587 }
1588 }
1589 }
1590
1591 //! Handle new incoming TCP connection
1592 static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
1593 {
1594 ComboAddress addr;
1595 socklen_t addrlen=sizeof(addr);
1596 int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
1597 if(newsock>=0) {
1598 if(MT->numProcesses() > g_maxMThreads) {
1599 g_stats.overCapacityDrops++;
1600 try {
1601 closesocket(newsock);
1602 }
1603 catch(const PDNSException& e) {
1604 L<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
1605 }
1606 return;
1607 }
1608
1609 if(t_remotes)
1610 t_remotes->push_back(addr);
1611 if(t_allowFrom && !t_allowFrom->match(&addr)) {
1612 if(!g_quiet)
1613 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
1614
1615 g_stats.unauthorizedTCP++;
1616 try {
1617 closesocket(newsock);
1618 }
1619 catch(const PDNSException& e) {
1620 L<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
1621 }
1622 return;
1623 }
1624 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
1625 g_stats.tcpClientOverflow++;
1626 try {
1627 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
1628 }
1629 catch(const PDNSException& e) {
1630 L<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
1631 }
1632 return;
1633 }
1634
1635 setNonBlocking(newsock);
1636 std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
1637 tc->state=TCPConnection::BYTE0;
1638
1639 t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
1640
1641 struct timeval now;
1642 Utility::gettimeofday(&now, 0);
1643 t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
1644 }
1645 }
1646
1647 static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, struct timeval tv, int fd)
1648 {
1649 gettimeofday(&g_now, 0);
1650 struct timeval diff = g_now - tv;
1651 double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
1652
1653 if(tv.tv_sec && delta > 1000.0) {
1654 g_stats.tooOldDrops++;
1655 return 0;
1656 }
1657
1658 ++g_stats.qcounter;
1659 if(fromaddr.sin4.sin_family==AF_INET6)
1660 g_stats.ipv6qcounter++;
1661
1662 string response;
1663 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
1664 unsigned int ctag=0;
1665 uint32_t qhash = 0;
1666 bool needECS = false;
1667 std::vector<std::string> policyTags;
1668 LuaContext::LuaObject data;
1669 string requestorId;
1670 string deviceId;
1671 #ifdef HAVE_PROTOBUF
1672 boost::uuids::uuid uniqueId;
1673 auto luaconfsLocal = g_luaconfs.getLocal();
1674 if (luaconfsLocal->protobufServer) {
1675 uniqueId = (*t_uuidGenerator)();
1676 needECS = true;
1677 } else if (luaconfsLocal->outgoingProtobufServer) {
1678 uniqueId = (*t_uuidGenerator)();
1679 }
1680 #endif
1681 EDNSSubnetOpts ednssubnet;
1682 bool ecsFound = false;
1683 bool ecsParsed = false;
1684 uint16_t ecsBegin = 0;
1685 uint16_t ecsEnd = 0;
1686 uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
1687 bool variable = false;
1688 try {
1689 DNSName qname;
1690 uint16_t qtype=0;
1691 uint16_t qclass=0;
1692 uint32_t age;
1693 bool qnameParsed=false;
1694 #ifdef MALLOC_TRACE
1695 /*
1696 static uint64_t last=0;
1697 if(!last)
1698 g_mtracer->clearAllocators();
1699 cout<<g_mtracer->getAllocs()-last<<" "<<g_mtracer->getNumOut()<<" -- BEGIN TRACE"<<endl;
1700 last=g_mtracer->getAllocs();
1701 cout<<g_mtracer->topAllocatorsString()<<endl;
1702 g_mtracer->clearAllocators();
1703 */
1704 #endif
1705
1706 if(needECS || (t_pdl && (t_pdl->d_gettag || t_pdl->d_gettag_ffi))) {
1707 try {
1708 std::map<uint16_t, EDNSOptionView> ednsOptions;
1709 ecsFound = getQNameAndSubnet(question, &qname, &qtype, &qclass, &ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
1710 qnameParsed = true;
1711 ecsParsed = true;
1712
1713 if(t_pdl) {
1714 try {
1715 if (t_pdl->d_gettag_ffi) {
1716 ctag = t_pdl->gettag_ffi(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable);
1717 }
1718 else if (t_pdl->d_gettag) {
1719 ctag=t_pdl->gettag(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId);
1720 }
1721 }
1722 catch(const std::exception& e) {
1723 if(g_logCommonErrors)
1724 L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
1725 }
1726 }
1727 }
1728 catch(const std::exception& e)
1729 {
1730 if(g_logCommonErrors)
1731 L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
1732 }
1733 }
1734
1735 bool cacheHit = false;
1736 RecProtoBufMessage pbMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
1737 #ifdef HAVE_PROTOBUF
1738 pbMessage.setServerIdentity(SyncRes::s_serverID);
1739 if(luaconfsLocal->protobufServer) {
1740 if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
1741 protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, fromaddr, destaddr, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
1742 }
1743 }
1744 #endif /* HAVE_PROTOBUF */
1745
1746 /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
1747 but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
1748 as cacheable we would cache it with a wrong tag, so better safe than sorry. */
1749 if (qnameParsed) {
1750 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &qhash, &ecsBegin, &ecsEnd, &pbMessage));
1751 }
1752 else {
1753 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, &qtype, &qclass, g_now.tv_sec, &response, &age, &qhash, &ecsBegin, &ecsEnd, &pbMessage));
1754 }
1755
1756 if (cacheHit) {
1757 #ifdef HAVE_PROTOBUF
1758 if(luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage.getAppliedPolicy().empty() || !pbMessage.getPolicyTags().empty())) {
1759 Netmask requestorNM(fromaddr, fromaddr.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1760 const ComboAddress& requestor = requestorNM.getMaskedNetwork();
1761 pbMessage.update(uniqueId, &requestor, &destaddr, false, dh->id);
1762 pbMessage.setEDNSSubnet(ednssubnet.source, ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
1763 pbMessage.setQueryTime(g_now.tv_sec, g_now.tv_usec);
1764 pbMessage.setRequestorId(requestorId);
1765 pbMessage.setDeviceId(deviceId);
1766 protobufLogResponse(luaconfsLocal->protobufServer, pbMessage);
1767 }
1768 #endif /* HAVE_PROTOBUF */
1769 if(!g_quiet)
1770 L<<Logger::Notice<<t_id<< " question answered from packet cache tag="<<ctag<<" from "<<fromaddr.toString()<<endl;
1771
1772 g_stats.packetCacheHits++;
1773 SyncRes::s_queries++;
1774 ageDNSPacket(response, age);
1775 struct msghdr msgh;
1776 struct iovec iov;
1777 char cbuf[256];
1778 fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)response.c_str(), response.length(), const_cast<ComboAddress*>(&fromaddr));
1779 msgh.msg_control=NULL;
1780
1781 if(g_fromtosockets.count(fd)) {
1782 addCMsgSrcAddr(&msgh, cbuf, &destaddr, 0);
1783 }
1784 if(sendmsg(fd, &msgh, 0) < 0 && g_logCommonErrors)
1785 L<<Logger::Warning<<"Sending UDP reply to client "<<fromaddr.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
1786
1787 if(response.length() >= sizeof(struct dnsheader)) {
1788 struct dnsheader tmpdh;
1789 memcpy(&tmpdh, response.c_str(), sizeof(tmpdh));
1790 updateResponseStats(tmpdh.rcode, fromaddr, response.length(), 0, 0);
1791 }
1792 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + 0.0; // we assume 0 usec
1793 g_stats.avgLatencyOursUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyOursUsec + 0.0; // we assume 0 usec
1794 return 0;
1795 }
1796 }
1797 catch(std::exception& e) {
1798 L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
1799 return 0;
1800 }
1801
1802 if(t_pdl) {
1803 if(t_pdl->ipfilter(fromaddr, destaddr, *dh)) {
1804 if(!g_quiet)
1805 L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<fromaddr.toStringWithPort()<<" based on policy"<<endl;
1806 g_stats.policyDrops++;
1807 return 0;
1808 }
1809 }
1810
1811 if(MT->numProcesses() > g_maxMThreads) {
1812 if(!g_quiet)
1813 L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<fromaddr.toStringWithPort()<<", over capacity"<<endl;
1814
1815 g_stats.overCapacityDrops++;
1816 return 0;
1817 }
1818
1819 DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
1820 dc->setSocket(fd);
1821 dc->d_tag=ctag;
1822 dc->d_qhash=qhash;
1823 dc->d_query = question;
1824 dc->setRemote(&fromaddr);
1825 dc->setLocal(destaddr);
1826 dc->d_tcp=false;
1827 dc->d_policyTags = policyTags;
1828 dc->d_data = data;
1829 dc->d_ecsFound = ecsFound;
1830 dc->d_ecsParsed = ecsParsed;
1831 dc->d_ecsBegin = ecsBegin;
1832 dc->d_ecsEnd = ecsEnd;
1833 dc->d_ednssubnet = ednssubnet;
1834 dc->d_ttlCap = ttlCap;
1835 dc->d_variable = variable;
1836 #ifdef HAVE_PROTOBUF
1837 if (luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
1838 dc->d_uuid = uniqueId;
1839 }
1840 dc->d_requestorId = requestorId;
1841 dc->d_deviceId = deviceId;
1842 #endif
1843
1844 MT->makeThread(startDoResolve, (void*) dc); // deletes dc
1845 return 0;
1846 }
1847
1848
1849 static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1850 {
1851 ssize_t len;
1852 char data[1500];
1853 ComboAddress fromaddr;
1854 struct msghdr msgh;
1855 struct iovec iov;
1856 char cbuf[256];
1857 bool firstQuery = true;
1858
1859 fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
1860 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), data, sizeof(data), &fromaddr);
1861
1862 for(size_t counter = 0; counter < s_maxUDPQueriesPerRound; counter++)
1863 if((len=recvmsg(fd, &msgh, 0)) >= 0) {
1864
1865 firstQuery = false;
1866
1867 if(t_remotes)
1868 t_remotes->push_back(fromaddr);
1869
1870 if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
1871 if(!g_quiet)
1872 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
1873
1874 g_stats.unauthorizedUDP++;
1875 return;
1876 }
1877 BOOST_STATIC_ASSERT(offsetof(sockaddr_in, sin_port) == offsetof(sockaddr_in6, sin6_port));
1878 if(!fromaddr.sin4.sin_port) { // also works for IPv6
1879 if(!g_quiet)
1880 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toStringWithPort()<<", can't deal with port 0"<<endl;
1881
1882 g_stats.clientParseError++; // not quite the best place to put it, but needs to go somewhere
1883 return;
1884 }
1885 try {
1886 dnsheader* dh=(dnsheader*)data;
1887
1888 if(dh->qr) {
1889 g_stats.ignoredCount++;
1890 if(g_logCommonErrors)
1891 L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
1892 }
1893 else if(dh->opcode) {
1894 g_stats.ignoredCount++;
1895 if(g_logCommonErrors)
1896 L<<Logger::Error<<"Ignoring non-query opcode "<<dh->opcode<<" from "<<fromaddr.toString()<<" on server socket!"<<endl;
1897 }
1898 else {
1899 string question(data, (size_t)len);
1900 struct timeval tv={0,0};
1901 HarvestTimestamp(&msgh, &tv);
1902 ComboAddress dest;
1903 dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
1904 auto loc = rplookup(g_listenSocketsAddresses, fd);
1905 if(HarvestDestinationAddress(&msgh, &dest)) {
1906 // but.. need to get port too
1907 if(loc)
1908 dest.sin4.sin_port = loc->sin4.sin_port;
1909 }
1910 else {
1911 if(loc) {
1912 dest = *loc;
1913 }
1914 else {
1915 dest.sin4.sin_family = fromaddr.sin4.sin_family;
1916 socklen_t slen = dest.getSocklen();
1917 getsockname(fd, (sockaddr*)&dest, &slen); // if this fails, we're ok with it
1918 }
1919 }
1920 if(g_weDistributeQueries)
1921 distributeAsyncFunction(question, boost::bind(doProcessUDPQuestion, question, fromaddr, dest, tv, fd));
1922 else
1923 doProcessUDPQuestion(question, fromaddr, dest, tv, fd);
1924 }
1925 }
1926 catch(MOADNSException& mde) {
1927 g_stats.clientParseError++;
1928 if(g_logCommonErrors)
1929 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
1930 }
1931 catch(std::runtime_error& e) {
1932 g_stats.clientParseError++;
1933 if(g_logCommonErrors)
1934 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<e.what()<<endl;
1935 }
1936 }
1937 else {
1938 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
1939 if(firstQuery && errno == EAGAIN)
1940 g_stats.noPacketError++;
1941
1942 break;
1943 }
1944 }
1945
1946 static void makeTCPServerSockets(unsigned int threadId)
1947 {
1948 int fd;
1949 vector<string>locals;
1950 stringtok(locals,::arg()["local-address"]," ,");
1951
1952 if(locals.empty())
1953 throw PDNSException("No local address specified");
1954
1955 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
1956 ServiceTuple st;
1957 st.port=::arg().asNum("local-port");
1958 parseService(*i, st);
1959
1960 ComboAddress sin;
1961
1962 sin.reset();
1963 sin.sin4.sin_family = AF_INET;
1964 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
1965 sin.sin6.sin6_family = AF_INET6;
1966 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
1967 throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
1968 }
1969
1970 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
1971 if(fd<0)
1972 throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
1973
1974 setCloseOnExec(fd);
1975
1976 int tmp=1;
1977 if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
1978 L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
1979 exit(1);
1980 }
1981 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
1982 L<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
1983 }
1984
1985 #ifdef TCP_DEFER_ACCEPT
1986 if(setsockopt(fd, SOL_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
1987 if(i==locals.begin())
1988 L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
1989 }
1990 #endif
1991
1992 if( ::arg().mustDo("non-local-bind") )
1993 Utility::setBindAny(AF_INET, fd);
1994
1995 #ifdef SO_REUSEPORT
1996 if(g_reusePort) {
1997 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(tmp)) < 0)
1998 throw PDNSException("SO_REUSEPORT: "+stringerror());
1999 }
2000 #endif
2001
2002 if (::arg().asNum("tcp-fast-open") > 0) {
2003 #ifdef TCP_FASTOPEN
2004 int fastOpenQueueSize = ::arg().asNum("tcp-fast-open");
2005 if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &fastOpenQueueSize, sizeof fastOpenQueueSize) < 0) {
2006 L<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(errno)<<endl;
2007 }
2008 #else
2009 L<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
2010 #endif
2011 }
2012
2013 sin.sin4.sin_port = htons(st.port);
2014 socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
2015 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
2016 throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
2017
2018 setNonBlocking(fd);
2019 setSocketSendBuffer(fd, 65000);
2020 listen(fd, 128);
2021 deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
2022 g_tcpListenSockets[threadId].insert(fd);
2023 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
2024 // - fd is not that which we know here, but returned from accept()
2025 if(sin.sin4.sin_family == AF_INET)
2026 L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
2027 else
2028 L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2029 }
2030 }
2031
2032 static void makeUDPServerSockets(unsigned int threadId)
2033 {
2034 int one=1;
2035 vector<string>locals;
2036 stringtok(locals,::arg()["local-address"]," ,");
2037
2038 if(locals.empty())
2039 throw PDNSException("No local address specified");
2040
2041 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
2042 ServiceTuple st;
2043 st.port=::arg().asNum("local-port");
2044 parseService(*i, st);
2045
2046 ComboAddress sin;
2047
2048 sin.reset();
2049 sin.sin4.sin_family = AF_INET;
2050 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
2051 sin.sin6.sin6_family = AF_INET6;
2052 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
2053 throw PDNSException("Unable to resolve local address for UDP server on '"+ st.host +"'");
2054 }
2055
2056 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
2057 if(fd < 0) {
2058 throw PDNSException("Making a UDP server socket for resolver: "+netstringerror());
2059 }
2060 if (!setSocketTimestamps(fd))
2061 L<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
2062
2063 if(IsAnyAddress(sin)) {
2064 if(sin.sin4.sin_family == AF_INET)
2065 if(!setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one))) // linux supports this, so why not - might fail on other systems
2066 g_fromtosockets.insert(fd);
2067 #ifdef IPV6_RECVPKTINFO
2068 if(sin.sin4.sin_family == AF_INET6)
2069 if(!setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)))
2070 g_fromtosockets.insert(fd);
2071 #endif
2072 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) < 0) {
2073 L<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
2074 }
2075 }
2076 if( ::arg().mustDo("non-local-bind") )
2077 Utility::setBindAny(AF_INET6, fd);
2078
2079 setCloseOnExec(fd);
2080
2081 setSocketReceiveBuffer(fd, 250000);
2082 sin.sin4.sin_port = htons(st.port);
2083
2084
2085 #ifdef SO_REUSEPORT
2086 if(g_reusePort) {
2087 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
2088 throw PDNSException("SO_REUSEPORT: "+stringerror());
2089 }
2090 #endif
2091 socklen_t socklen=sin.getSocklen();
2092 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
2093 throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
2094
2095 setNonBlocking(fd);
2096
2097 deferredAdds[threadId].push_back(make_pair(fd, handleNewUDPQuestion));
2098 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
2099 if(sin.sin4.sin_family == AF_INET)
2100 L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
2101 else
2102 L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
2103 }
2104 }
2105
2106 static void daemonize(void)
2107 {
2108 if(fork())
2109 exit(0); // bye bye
2110
2111 setsid();
2112
2113 int i=open("/dev/null",O_RDWR); /* open stdin */
2114 if(i < 0)
2115 L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
2116 else {
2117 dup2(i,0); /* stdin */
2118 dup2(i,1); /* stderr */
2119 dup2(i,2); /* stderr */
2120 close(i);
2121 }
2122 }
2123
2124 static void usr1Handler(int)
2125 {
2126 statsWanted=true;
2127 }
2128
2129 static void usr2Handler(int)
2130 {
2131 g_quiet= !g_quiet;
2132 SyncRes::setDefaultLogMode(g_quiet ? SyncRes::LogNone : SyncRes::Log);
2133 ::arg().set("quiet")=g_quiet ? "" : "no";
2134 }
2135
2136 static void doStats(void)
2137 {
2138 static time_t lastOutputTime;
2139 static uint64_t lastQueryCount;
2140
2141 uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
2142 uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
2143
2144 if(g_stats.qcounter && (cacheHits + cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {
2145 L<<Logger::Notice<<"stats: "<<g_stats.qcounter<<" questions, "<<
2146 broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
2147 broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
2148 (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;
2149
2150 L<<Logger::Notice<<"stats: throttle map: "
2151 << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
2152 << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl;
2153 L<<Logger::Notice<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
2154 L<<Logger::Notice<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
2155 <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
2156 L<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
2157 broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
2158
2159 //L<<Logger::Notice<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
2160 //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
2161
2162 L<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
2163 " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
2164
2165 time_t now = time(0);
2166 if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
2167 L<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
2168 }
2169 lastOutputTime = now;
2170 lastQueryCount = SyncRes::s_queries;
2171 }
2172 else if(statsWanted)
2173 L<<Logger::Notice<<"stats: no stats yet!"<<endl;
2174
2175 statsWanted=false;
2176 }
2177
2178 static void houseKeeping(void *)
2179 {
2180 static thread_local time_t last_rootupdate, last_prune, last_secpoll;
2181 static thread_local int cleanCounter=0;
2182 static thread_local bool s_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
2183 try {
2184 if(s_running)
2185 return;
2186 s_running=true;
2187
2188 struct timeval now;
2189 Utility::gettimeofday(&now, 0);
2190
2191 if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
2192 DTime dt;
2193 dt.setTimeval(now);
2194 t_RC->doPrune(g_maxCacheEntries / g_numThreads); // this function is local to a thread, so fine anyhow
2195 t_packetCache->doPruneTo(g_maxPacketCacheEntries / g_numWorkerThreads);
2196
2197 SyncRes::pruneNegCache(g_maxCacheEntries / (g_numWorkerThreads * 10));
2198
2199 if(!((cleanCounter++)%40)) { // this is a full scan!
2200 time_t limit=now.tv_sec-300;
2201 SyncRes::pruneNSSpeeds(limit);
2202 }
2203 last_prune=time(0);
2204 }
2205
2206 if(now.tv_sec - last_rootupdate > 7200) {
2207 int res = SyncRes::getRootNS(g_now, nullptr);
2208 if (!res)
2209 last_rootupdate=now.tv_sec;
2210 }
2211
2212 if(t_id == s_distributorThreadID) {
2213
2214 if(now.tv_sec - last_secpoll >= 3600) {
2215 try {
2216 doSecPoll(&last_secpoll);
2217 }
2218 catch(std::exception& e)
2219 {
2220 L<<Logger::Error<<"Exception while performing security poll: "<<e.what()<<endl;
2221 }
2222 catch(PDNSException& e)
2223 {
2224 L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2225 }
2226 catch(ImmediateServFailException &e)
2227 {
2228 L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
2229 }
2230 catch(...)
2231 {
2232 L<<Logger::Error<<"Exception while performing security poll"<<endl;
2233 }
2234
2235 }
2236 }
2237 s_running=false;
2238 }
2239 catch(PDNSException& ae)
2240 {
2241 s_running=false;
2242 L<<Logger::Error<<"Fatal error in housekeeping thread: "<<ae.reason<<endl;
2243 throw;
2244 }
2245 }
2246
2247 static void makeThreadPipes()
2248 {
2249 for(unsigned int n=0; n < g_numThreads; ++n) {
2250 struct ThreadPipeSet tps;
2251 int fd[2];
2252 if(pipe(fd) < 0)
2253 unixDie("Creating pipe for inter-thread communications");
2254
2255 tps.readToThread = fd[0];
2256 tps.writeToThread = fd[1];
2257
2258 if(pipe(fd) < 0)
2259 unixDie("Creating pipe for inter-thread communications");
2260 tps.readFromThread = fd[0];
2261 tps.writeFromThread = fd[1];
2262
2263 if(pipe(fd) < 0)
2264 unixDie("Creating pipe for inter-thread communications");
2265 tps.readQueriesToThread = fd[0];
2266 tps.writeQueriesToThread = fd[1];
2267
2268 if (!setNonBlocking(tps.writeQueriesToThread)) {
2269 unixDie("Making pipe for inter-thread communications non-blocking");
2270 }
2271
2272 g_pipes.push_back(tps);
2273 }
2274 }
2275
2276 struct ThreadMSG
2277 {
2278 pipefunc_t func;
2279 bool wantAnswer;
2280 };
2281
2282 void broadcastFunction(const pipefunc_t& func)
2283 {
2284 /* This function might be called before t_id are set during startup
2285 for the initialization of ACLs and domain maps, but the default is the same
2286 than the handler thread */
2287 if (t_id != s_handlerThreadID) {
2288 L<<Logger::Error<<"broadcastFunction() has been called by a worker ("<<t_id<<")"<<endl;
2289 exit(1);
2290 }
2291
2292 /* the distributor will call itself below, but if we are the handler thread,
2293 call the function ourselves to update the ACL or domain maps for example */
2294 func();
2295
2296 int n = 0;
2297 for(ThreadPipeSet& tps : g_pipes)
2298 {
2299 if(n++ == t_id) {
2300 func(); // don't write to ourselves!
2301 continue;
2302 }
2303
2304 ThreadMSG* tmsg = new ThreadMSG();
2305 tmsg->func = func;
2306 tmsg->wantAnswer = true;
2307 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2308 delete tmsg;
2309 unixDie("write to thread pipe returned wrong size or error");
2310 }
2311
2312 string* resp = nullptr;
2313 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2314 unixDie("read from thread pipe returned wrong size or error");
2315
2316 if(resp) {
2317 delete resp;
2318 resp = nullptr;
2319 }
2320 }
2321 }
2322
2323 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
2324 {
2325 if(target == static_cast<unsigned int>(s_distributorThreadID)) {
2326 L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
2327 exit(1);
2328 }
2329
2330 ThreadPipeSet& tps = g_pipes[target];
2331 ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
2332 if (written > 0) {
2333 if (static_cast<size_t>(written) != sizeof(tmsg)) {
2334 delete tmsg;
2335 unixDie("write to thread pipe returned wrong size or error");
2336 }
2337 }
2338 else {
2339 int error = errno;
2340 if (error == EAGAIN || error == EWOULDBLOCK) {
2341 /* the pipe is full, sorry */
2342 return false;
2343 } else {
2344 delete tmsg;
2345 unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
2346 }
2347 }
2348
2349 return true;
2350 }
2351
2352 // This function is only called by the distributor thread, when pdns-distributes-queries is set
2353 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
2354 {
2355 if (t_id != s_distributorThreadID) {
2356 L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
2357 exit(1);
2358 }
2359
2360 unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
2361 unsigned int target = 1 + (hash % (g_pipes.size()-1));
2362
2363 ThreadMSG* tmsg = new ThreadMSG();
2364 tmsg->func = func;
2365 tmsg->wantAnswer = false;
2366
2367 if (!trySendingQueryToWorker(target, tmsg)) {
2368 /* if this function failed but did not raise an exception, it means that the pipe
2369 was full, let's try another one */
2370 unsigned int newTarget = 0;
2371 do {
2372 newTarget = 1 + dns_random(g_pipes.size()-1);
2373 } while (newTarget == target);
2374
2375 if (!trySendingQueryToWorker(newTarget, tmsg)) {
2376 g_stats.queryPipeFullDrops++;
2377 delete tmsg;
2378 }
2379 }
2380 }
2381
2382 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
2383 {
2384 ThreadMSG* tmsg = nullptr;
2385
2386 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
2387 unixDie("read from thread pipe returned wrong size or error");
2388 }
2389
2390 void *resp=0;
2391 try {
2392 resp = tmsg->func();
2393 }
2394 catch(std::exception& e) {
2395 if(g_logCommonErrors)
2396 L<<Logger::Error<<"PIPE function we executed created exception: "<<e.what()<<endl; // but what if they wanted an answer.. we send 0
2397 }
2398 catch(PDNSException& e) {
2399 if(g_logCommonErrors)
2400 L<<Logger::Error<<"PIPE function we executed created PDNS exception: "<<e.reason<<endl; // but what if they wanted an answer.. we send 0
2401 }
2402 if(tmsg->wantAnswer) {
2403 if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
2404 delete tmsg;
2405 unixDie("write to thread pipe returned wrong size or error");
2406 }
2407 }
2408
2409 delete tmsg;
2410 }
2411
2412 template<class T> void *voider(const boost::function<T*()>& func)
2413 {
2414 return func();
2415 }
2416
2417 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
2418 {
2419 a.insert(a.end(), b.begin(), b.end());
2420 return a;
2421 }
2422
/* Appends all elements of 'b' to the end of 'a' and returns 'a'. */
vector<pair<string, uint16_t> >& operator+=(vector<pair<string, uint16_t> >&a, const vector<pair<string, uint16_t> >& b)
{
  a.insert(a.end(), b.cbegin(), b.cend());

  return a;
}
2428
2429 vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a, const vector<pair<DNSName, uint16_t> >& b)
2430 {
2431 a.insert(a.end(), b.begin(), b.end());
2432 return a;
2433 }
2434
2435
2436 /*
2437 This function should only be called by the handler to gather metrics, wipe the cache,
2438 reload the Lua script (not the Lua config) or change the current trace regex,
2439 and by the SNMP thread to gather metrics. */
2440 template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
2441 {
2442 /* the SNMP thread uses id -1 too */
2443 if (t_id != s_handlerThreadID) {
2444 L<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
2445 exit(1);
2446 }
2447
2448 T ret=T();
2449 for(ThreadPipeSet& tps : g_pipes)
2450 {
2451 ThreadMSG* tmsg = new ThreadMSG();
2452 tmsg->func = boost::bind(voider<T>, func);
2453 tmsg->wantAnswer = true;
2454
2455 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2456 delete tmsg;
2457 unixDie("write to thread pipe returned wrong size or error");
2458 }
2459
2460 T* resp = nullptr;
2461 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2462 unixDie("read from thread pipe returned wrong size or error");
2463
2464 if(resp) {
2465 ret += *resp;
2466 delete resp;
2467 resp = nullptr;
2468 }
2469 }
2470 return ret;
2471 }
2472
2473 template string broadcastAccFunction(const boost::function<string*()>& fun); // explicit instantiation
2474 template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
2475 template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
2476 template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
2477
2478 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
2479 {
2480 string remote;
2481 string msg=s_rcc.recv(&remote);
2482 RecursorControlParser rcp;
2483 RecursorControlParser::func_t* command;
2484
2485 string answer=rcp.getAnswer(msg, &command);
2486
2487 // If we are inside a chroot, we need to strip
2488 if (!arg()["chroot"].empty()) {
2489 size_t len = arg()["chroot"].length();
2490 remote = remote.substr(len);
2491 }
2492
2493 try {
2494 s_rcc.send(answer, &remote);
2495 command();
2496 }
2497 catch(std::exception& e) {
2498 L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
2499 }
2500 catch(PDNSException& ae) {
2501 L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
2502 }
2503 }
2504
2505 static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
2506 {
2507 PacketID* pident=any_cast<PacketID>(&var);
2508 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
2509
2510 shared_array<char> buffer(new char[pident->inNeeded]);
2511
2512 ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
2513 if(ret > 0) {
2514 pident->inMSG.append(&buffer[0], &buffer[ret]);
2515 pident->inNeeded-=(size_t)ret;
2516 if(!pident->inNeeded || pident->inIncompleteOkay) {
2517 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
2518 PacketID pid=*pident;
2519 string msg=pident->inMSG;
2520
2521 t_fdm->removeReadFD(fd);
2522 MT->sendEvent(pid, &msg);
2523 }
2524 else {
2525 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
2526 }
2527 }
2528 else {
2529 PacketID tmp=*pident;
2530 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
2531 string empty;
2532 MT->sendEvent(tmp, &empty); // this conveys error status
2533 }
2534 }
2535
2536 static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
2537 {
2538 PacketID* pid=any_cast<PacketID>(&var);
2539 ssize_t ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
2540 if(ret > 0) {
2541 pid->outPos+=(ssize_t)ret;
2542 if(pid->outPos==pid->outMSG.size()) {
2543 PacketID tmp=*pid;
2544 t_fdm->removeWriteFD(fd);
2545 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
2546 }
2547 }
2548 else { // error or EOF
2549 PacketID tmp(*pid);
2550 t_fdm->removeWriteFD(fd);
2551 string sent;
2552 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
2553 }
2554 }
2555
2556 // resend event to everybody chained onto it
2557 static void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
2558 {
2559 if(iter->key.chain.empty())
2560 return;
2561 // cerr<<"doResends called!\n";
2562 for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
2563 resend.fd=-1;
2564 resend.id=*i;
2565 // cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
2566
2567 MT->sendEvent(resend, &content);
2568 g_stats.chainResends++;
2569 }
2570 }
2571
2572 static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
2573 {
2574 PacketID pid=any_cast<PacketID>(var);
2575 ssize_t len;
2576 char data[g_outgoingEDNSBufsize];
2577 ComboAddress fromaddr;
2578 socklen_t addrlen=sizeof(fromaddr);
2579
2580 len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
2581
2582 if(len < (ssize_t) sizeof(dnsheader)) {
2583 if(len < 0)
2584 ; // cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
2585 else {
2586 g_stats.serverParseError++;
2587 if(g_logCommonErrors)
2588 L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
2589 ": packet smaller than DNS header"<<endl;
2590 }
2591
2592 t_udpclientsocks->returnSocket(fd);
2593 string empty;
2594
2595 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
2596 if(iter != MT->d_waiters.end())
2597 doResends(iter, pid, empty);
2598
2599 MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
2600 return;
2601 }
2602
2603 dnsheader dh;
2604 memcpy(&dh, data, sizeof(dh));
2605
2606 PacketID pident;
2607 pident.remote=fromaddr;
2608 pident.id=dh.id;
2609 pident.fd=fd;
2610
2611 if(!dh.qr && g_logCommonErrors) {
2612 L<<Logger::Notice<<"Not taking data from question on outgoing socket from "<< fromaddr.toStringWithPort() <<endl;
2613 }
2614
2615 if(!dh.qdcount || // UPC, Nominum, very old BIND on FormErr, NSD
2616 !dh.qr) { // one weird server
2617 pident.domain.clear();
2618 pident.type = 0;
2619 }
2620 else {
2621 try {
2622 if(len > 12)
2623 pident.domain=DNSName(data, len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
2624 }
2625 catch(std::exception& e) {
2626 g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
2627 L<<Logger::Warning<<"Error in packet from remote nameserver "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
2628 return;
2629 }
2630 }
2631 string packet;
2632 packet.assign(data, len);
2633
2634 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
2635 if(iter != MT->d_waiters.end()) {
2636 doResends(iter, pident, packet);
2637 }
2638
2639 retryWithName:
2640
2641 if(!MT->sendEvent(pident, &packet)) {
2642 // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
2643 for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
2644 if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote && mthread->key.type == pident.type &&
2645 pident.domain == mthread->key.domain) {
2646 mthread->key.nearMisses++;
2647 }
2648
2649 // be a bit paranoid here since we're weakening our matching
2650 if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
2651 pident.id == mthread->key.id && mthread->key.remote == pident.remote) {
2652 // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
2653 pident.domain = mthread->key.domain;
2654 pident.type = mthread->key.type;
2655 goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
2656 }
2657 }
2658 g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
2659 if(g_logCommonErrors) {
2660 L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<< (pident.domain.empty() ? "<empty>" : pident.domain.toString())<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
2661 }
2662 }
2663 else if(fd >= 0) {
2664 t_udpclientsocks->returnSocket(fd);
2665 }
2666 }
2667
2668 FDMultiplexer* getMultiplexer()
2669 {
2670 FDMultiplexer* ret;
2671 for(const auto& i : FDMultiplexer::getMultiplexerMap()) {
2672 try {
2673 ret=i.second();
2674 return ret;
2675 }
2676 catch(FDMultiplexerException &fe) {
2677 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
2678 }
2679 catch(...) {
2680 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
2681 }
2682 }
2683 L<<Logger::Error<<"No working multiplexer found!"<<endl;
2684 exit(1);
2685 }
2686
2687
2688 static string* doReloadLuaScript()
2689 {
2690 string fname= ::arg()["lua-dns-script"];
2691 try {
2692 if(fname.empty()) {
2693 t_pdl.reset();
2694 L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
2695 return new string("unloaded\n");
2696 }
2697 else {
2698 t_pdl = std::make_shared<RecursorLua4>(fname);
2699 }
2700 }
2701 catch(std::exception& e) {
2702 L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
2703 return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
2704 }
2705
2706 L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
2707 return new string("(re)loaded '"+fname+"'\n");
2708 }
2709
2710 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
2711 {
2712 if(begin != end)
2713 ::arg().set("lua-dns-script") = *begin;
2714
2715 return broadcastAccFunction<string>(doReloadLuaScript);
2716 }
2717
2718 static string* pleaseUseNewTraceRegex(const std::string& newRegex)
2719 try
2720 {
2721 if(newRegex.empty()) {
2722 t_traceRegex.reset();
2723 return new string("unset\n");
2724 }
2725 else {
2726 t_traceRegex = std::make_shared<Regex>(newRegex);
2727 return new string("ok\n");
2728 }
2729 }
2730 catch(PDNSException& ae)
2731 {
2732 return new string(ae.reason+"\n");
2733 }
2734
2735 string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
2736 {
2737 return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
2738 }
2739
2740 static void checkLinuxIPv6Limits()
2741 {
2742 #ifdef __linux__
2743 string line;
2744 if(readFileIfThere("/proc/sys/net/ipv6/route/max_size", &line)) {
2745 int lim=std::stoi(line);
2746 if(lim < 16384) {
2747 L<<Logger::Error<<"If using IPv6, please raise sysctl net.ipv6.route.max_size, currently set to "<<lim<<" which is < 16384"<<endl;
2748 }
2749 }
2750 #endif
2751 }
2752 static void checkOrFixFDS()
2753 {
2754 unsigned int availFDs=getFilenumLimit();
2755 unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
2756
2757 if(wantFDs > availFDs) {
2758 unsigned int hardlimit= getFilenumLimit(true);
2759 if(hardlimit >= wantFDs) {
2760 setFilenumLimit(wantFDs);
2761 L<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
2762 }
2763 else {
2764 int newval = (hardlimit - 25) / g_numWorkerThreads;
2765 L<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
2766 g_maxMThreads = newval;
2767 setFilenumLimit(hardlimit);
2768 }
2769 }
2770 }
2771
// Forward declaration: the thread body is defined further down, but
// serviceMain() needs the name in order to launch the threads.
static void* recursorThread(int tid, bool worker);
2773
2774 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
2775 {
2776 t_allowFrom = ng;
2777 return nullptr;
2778 }
2779
// Copies of the argc/argv handed to main(), kept so that parseACLs() can
// re-run the command line pre-parsing when the ACLs are reloaded.
int g_argc;
char** g_argv;
2782
2783 void parseACLs()
2784 {
2785 static bool l_initialized;
2786
2787 if(l_initialized) { // only reload configuration file on second call
2788 string configname=::arg()["config-dir"]+"/recursor.conf";
2789 if(::arg()["config-name"]!="") {
2790 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
2791 }
2792 cleanSlashes(configname);
2793
2794 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
2795 throw runtime_error("Unable to re-parse configuration file '"+configname+"'");
2796 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
2797 ::arg().preParseFile(configname.c_str(), "include-dir");
2798 ::arg().preParse(g_argc, g_argv, "include-dir");
2799
2800 // then process includes
2801 std::vector<std::string> extraConfigs;
2802 ::arg().gatherIncludes(extraConfigs);
2803
2804 for(const std::string& fn : extraConfigs) {
2805 if(!::arg().preParseFile(fn.c_str(), "allow-from-file", ::arg()["allow-from-file"]))
2806 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
2807 if(!::arg().preParseFile(fn.c_str(), "allow-from", ::arg()["allow-from"]))
2808 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
2809 }
2810
2811 ::arg().preParse(g_argc, g_argv, "allow-from-file");
2812 ::arg().preParse(g_argc, g_argv, "allow-from");
2813 }
2814
2815 std::shared_ptr<NetmaskGroup> oldAllowFrom = t_allowFrom;
2816 std::shared_ptr<NetmaskGroup> allowFrom = std::make_shared<NetmaskGroup>();
2817
2818 if(!::arg()["allow-from-file"].empty()) {
2819 string line;
2820 ifstream ifs(::arg()["allow-from-file"].c_str());
2821 if(!ifs) {
2822 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
2823 }
2824
2825 string::size_type pos;
2826 while(getline(ifs,line)) {
2827 pos=line.find('#');
2828 if(pos!=string::npos)
2829 line.resize(pos);
2830 trim(line);
2831 if(line.empty())
2832 continue;
2833
2834 allowFrom->addMask(line);
2835 }
2836 L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
2837 }
2838 else if(!::arg()["allow-from"].empty()) {
2839 vector<string> ips;
2840 stringtok(ips, ::arg()["allow-from"], ", ");
2841
2842 L<<Logger::Warning<<"Only allowing queries from: ";
2843 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
2844 allowFrom->addMask(*i);
2845 if(i!=ips.begin())
2846 L<<Logger::Warning<<", ";
2847 L<<Logger::Warning<<*i;
2848 }
2849 L<<Logger::Warning<<endl;
2850 }
2851 else {
2852 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
2853 L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
2854 allowFrom = nullptr;
2855 }
2856
2857 g_initialAllowFrom = allowFrom;
2858 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
2859 oldAllowFrom = nullptr;
2860
2861 l_initialized = true;
2862 }
2863
2864
2865 static void setupDelegationOnly()
2866 {
2867 vector<string> parts;
2868 stringtok(parts, ::arg()["delegation-only"], ", \t");
2869 for(const auto& p : parts) {
2870 SyncRes::addDelegationOnly(DNSName(p));
2871 }
2872 }
2873
2874 static std::map<unsigned int, std::set<int> > parseCPUMap()
2875 {
2876 std::map<unsigned int, std::set<int> > result;
2877
2878 const std::string value = ::arg()["cpu-map"];
2879
2880 if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
2881 L<<Logger::Warning<<"CPU mapping requested but not supported, skipping"<<endl;
2882 return result;
2883 }
2884
2885 std::vector<std::string> parts;
2886
2887 stringtok(parts, value, " \t");
2888
2889 for(const auto& part : parts) {
2890 if (part.find('=') == string::npos)
2891 continue;
2892
2893 try {
2894 auto headers = splitField(part, '=');
2895 trim(headers.first);
2896 trim(headers.second);
2897
2898 unsigned int threadId = pdns_stou(headers.first);
2899 std::vector<std::string> cpus;
2900
2901 stringtok(cpus, headers.second, ",");
2902
2903 for(const auto& cpu : cpus) {
2904 int cpuId = std::stoi(cpu);
2905
2906 result[threadId].insert(cpuId);
2907 }
2908 }
2909 catch(const std::exception& e) {
2910 L<<Logger::Error<<"Error parsing cpu-map entry '"<<part<<"': "<<e.what()<<endl;
2911 }
2912 }
2913
2914 return result;
2915 }
2916
2917 static void setCPUMap(const std::map<unsigned int, std::set<int> >& cpusMap, unsigned int n, pthread_t tid)
2918 {
2919 const auto& cpuMapping = cpusMap.find(n);
2920 if (cpuMapping != cpusMap.cend()) {
2921 int rc = mapThreadToCPUList(tid, cpuMapping->second);
2922 if (rc == 0) {
2923 L<<Logger::Info<<"CPU affinity for worker "<<n<<" has been set to CPU map:";
2924 for (const auto cpu : cpuMapping->second) {
2925 L<<Logger::Info<<" "<<cpu;
2926 }
2927 L<<Logger::Info<<endl;
2928 }
2929 else {
2930 L<<Logger::Warning<<"Error setting CPU affinity for worker "<<n<<" to CPU map:";
2931 for (const auto cpu : cpuMapping->second) {
2932 L<<Logger::Info<<" "<<cpu;
2933 }
2934 L<<Logger::Info<<strerror(rc)<<endl;
2935 }
2936 }
2937 }
2938
2939 static int serviceMain(int argc, char*argv[])
2940 {
2941 L.setName(s_programname);
2942 L.disableSyslog(::arg().mustDo("disable-syslog"));
2943 L.setTimestamps(::arg().mustDo("log-timestamp"));
2944
2945 if(!::arg()["logging-facility"].empty()) {
2946 int val=logFacilityToLOG(::arg().asNum("logging-facility") );
2947 if(val >= 0)
2948 theL().setFacility(val);
2949 else
2950 L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
2951 }
2952
2953 showProductVersion();
2954 seedRandom(::arg()["entropy-source"]);
2955
2956 g_disthashseed=dns_random(0xffffffff);
2957
2958 checkLinuxIPv6Limits();
2959 try {
2960 vector<string> addrs;
2961 if(!::arg()["query-local-address6"].empty()) {
2962 SyncRes::s_doIPv6=true;
2963 L<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
2964
2965 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
2966 for(const string& addr : addrs) {
2967 g_localQueryAddresses6.push_back(ComboAddress(addr));
2968 }
2969 }
2970 else {
2971 L<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
2972 }
2973 addrs.clear();
2974 stringtok(addrs, ::arg()["query-local-address"], ", ;");
2975 for(const string& addr : addrs) {
2976 g_localQueryAddresses4.push_back(ComboAddress(addr));
2977 }
2978 }
2979 catch(std::exception& e) {
2980 L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
2981 exit(99);
2982 }
2983
2984 // keep this ABOVE loadRecursorLuaConfig!
2985 if(::arg()["dnssec"]=="off")
2986 g_dnssecmode=DNSSECMode::Off;
2987 else if(::arg()["dnssec"]=="process-no-validate")
2988 g_dnssecmode=DNSSECMode::ProcessNoValidate;
2989 else if(::arg()["dnssec"]=="process")
2990 g_dnssecmode=DNSSECMode::Process;
2991 else if(::arg()["dnssec"]=="validate")
2992 g_dnssecmode=DNSSECMode::ValidateAll;
2993 else if(::arg()["dnssec"]=="log-fail")
2994 g_dnssecmode=DNSSECMode::ValidateForLog;
2995 else {
2996 L<<Logger::Error<<"Unknown DNSSEC mode "<<::arg()["dnssec"]<<endl;
2997 exit(1);
2998 }
2999
3000 g_signatureInceptionSkew = ::arg().asNum("signature-inception-skew");
3001 if (g_signatureInceptionSkew < 0) {
3002 L<<Logger::Error<<"A negative value for 'signature-inception-skew' is not allowed"<<endl;
3003 exit(1);
3004 }
3005
3006 g_dnssecLogBogus = ::arg().mustDo("dnssec-log-bogus");
3007 g_maxNSEC3Iterations = ::arg().asNum("nsec3-max-iterations");
3008
3009 g_maxCacheEntries = ::arg().asNum("max-cache-entries");
3010 g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
3011
3012 luaConfigDelayedThreads delayedLuaThreads;
3013 try {
3014 loadRecursorLuaConfig(::arg()["lua-config-file"], delayedLuaThreads);
3015 }
3016 catch (PDNSException &e) {
3017 L<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
3018 exit(1);
3019 }
3020
3021 parseACLs();
3022 sortPublicSuffixList();
3023
3024 if(!::arg()["dont-query"].empty()) {
3025 vector<string> ips;
3026 stringtok(ips, ::arg()["dont-query"], ", ");
3027 ips.push_back("0.0.0.0");
3028 ips.push_back("::");
3029
3030 L<<Logger::Warning<<"Will not send queries to: ";
3031 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
3032 SyncRes::addDontQuery(*i);
3033 if(i!=ips.begin())
3034 L<<Logger::Warning<<", ";
3035 L<<Logger::Warning<<*i;
3036 }
3037 L<<Logger::Warning<<endl;
3038 }
3039
3040 g_quiet=::arg().mustDo("quiet");
3041
3042 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
3043 if(g_weDistributeQueries) {
3044 L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
3045 }
3046
3047 setupDelegationOnly();
3048 g_outgoingEDNSBufsize=::arg().asNum("edns-outgoing-bufsize");
3049
3050 if(::arg()["trace"]=="fail") {
3051 SyncRes::setDefaultLogMode(SyncRes::Store);
3052 }
3053 else if(::arg().mustDo("trace")) {
3054 SyncRes::setDefaultLogMode(SyncRes::Log);
3055 ::arg().set("quiet")="no";
3056 g_quiet=false;
3057 g_dnssecLOG=true;
3058 }
3059
3060 SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
3061
3062 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
3063
3064 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
3065 SyncRes::s_maxcachettl=max(::arg().asNum("max-cache-ttl"), 15);
3066 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
3067 // Cap the packetcache-servfail-ttl to the packetcache-ttl
3068 uint32_t packetCacheServFailTTL = ::arg().asNum("packetcache-servfail-ttl");
3069 SyncRes::s_packetcacheservfailttl=(packetCacheServFailTTL > SyncRes::s_packetcachettl) ? SyncRes::s_packetcachettl : packetCacheServFailTTL;
3070 SyncRes::s_serverdownmaxfails=::arg().asNum("server-down-max-fails");
3071 SyncRes::s_serverdownthrottletime=::arg().asNum("server-down-throttle-time");
3072 SyncRes::s_serverID=::arg()["server-id"];
3073 SyncRes::s_maxqperq=::arg().asNum("max-qperq");
3074 SyncRes::s_maxtotusec=1000*::arg().asNum("max-total-msec");
3075 SyncRes::s_maxdepth=::arg().asNum("max-recursion-depth");
3076 SyncRes::s_rootNXTrust = ::arg().mustDo( "root-nx-trust");
3077 if(SyncRes::s_serverID.empty()) {
3078 char tmp[128];
3079 gethostname(tmp, sizeof(tmp)-1);
3080 SyncRes::s_serverID=tmp;
3081 }
3082
3083 SyncRes::s_ecsipv4limit = ::arg().asNum("ecs-ipv4-bits");
3084 SyncRes::s_ecsipv6limit = ::arg().asNum("ecs-ipv6-bits");
3085
3086 if (!::arg().isEmpty("ecs-scope-zero-address")) {
3087 ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
3088 SyncRes::setECSScopeZeroAddress(Netmask(scopeZero, scopeZero.isIPv4() ? 32 : 128));
3089 }
3090 else {
3091 bool found = false;
3092 for (const auto& addr : g_localQueryAddresses4) {
3093 if (!IsAnyAddress(addr)) {
3094 SyncRes::setECSScopeZeroAddress(Netmask(addr, 32));
3095 found = true;
3096 break;
3097 }
3098 }
3099 if (!found) {
3100 for (const auto& addr : g_localQueryAddresses6) {
3101 if (!IsAnyAddress(addr)) {
3102 SyncRes::setECSScopeZeroAddress(Netmask(addr, 128));
3103 found = true;
3104 break;
3105 }
3106 }
3107 if (!found) {
3108 SyncRes::setECSScopeZeroAddress(Netmask("127.0.0.1/32"));
3109 }
3110 }
3111 }
3112
3113 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
3114
3115 g_initialDomainMap = parseAuthAndForwards();
3116
3117 g_latencyStatSize=::arg().asNum("latency-statistic-size");
3118
3119 g_logCommonErrors=::arg().mustDo("log-common-errors");
3120 g_logRPZChanges = ::arg().mustDo("log-rpz-changes");
3121
3122 g_anyToTcp = ::arg().mustDo("any-to-tcp");
3123 g_udpTruncationThreshold = ::arg().asNum("udp-truncation-threshold");
3124
3125 g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
3126
3127 g_numWorkerThreads = ::arg().asNum("threads");
3128 if (g_numWorkerThreads < 1) {
3129 L<<Logger::Warning<<"Asked to run with 0 threads, raising to 1 instead"<<endl;
3130 g_numWorkerThreads = 1;
3131 }
3132
3133 g_numThreads = g_numWorkerThreads + g_weDistributeQueries;
3134 g_maxMThreads = ::arg().asNum("max-mthreads");
3135
3136 g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
3137
3138 g_statisticsInterval = ::arg().asNum("statistics-interval");
3139
3140 #ifdef SO_REUSEPORT
3141 g_reusePort = ::arg().mustDo("reuseport");
3142 #endif
3143
3144 g_useOneSocketPerThread = (!g_weDistributeQueries && g_reusePort);
3145
3146 if (g_useOneSocketPerThread) {
3147 for (unsigned int threadId = 0; threadId < g_numWorkerThreads; threadId++) {
3148 makeUDPServerSockets(threadId);
3149 makeTCPServerSockets(threadId);
3150 }
3151 }
3152 else {
3153 makeUDPServerSockets(0);
3154 makeTCPServerSockets(0);
3155
3156 if (!g_weDistributeQueries) {
3157 /* we are not distributing queries and we don't have reuseport,
3158 so every thread will be listening on all the TCP sockets */
3159 for (unsigned int threadId = 1; threadId < g_numWorkerThreads; threadId++) {
3160 g_tcpListenSockets[threadId] = g_tcpListenSockets[0];
3161 }
3162 }
3163 }
3164
3165 SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
3166 g_useIncomingECS = ::arg().mustDo("use-incoming-edns-subnet");
3167
3168 int forks;
3169 for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
3170 if(!fork()) // we are child
3171 break;
3172 }
3173
3174 if(::arg().mustDo("daemon")) {
3175 L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
3176 L.toConsole(Logger::Critical);
3177 daemonize();
3178 }
3179 signal(SIGUSR1,usr1Handler);
3180 signal(SIGUSR2,usr2Handler);
3181 signal(SIGPIPE,SIG_IGN);
3182
3183 checkOrFixFDS();
3184
3185 #ifdef HAVE_LIBSODIUM
3186 if (sodium_init() == -1) {
3187 L<<Logger::Error<<"Unable to initialize sodium crypto library"<<endl;
3188 exit(99);
3189 }
3190 #endif
3191
3192 openssl_thread_setup();
3193 openssl_seed();
3194
3195 int newgid=0;
3196 if(!::arg()["setgid"].empty())
3197 newgid=Utility::makeGidNumeric(::arg()["setgid"]);
3198 int newuid=0;
3199 if(!::arg()["setuid"].empty())
3200 newuid=Utility::makeUidNumeric(::arg()["setuid"]);
3201
3202 Utility::dropGroupPrivs(newuid, newgid);
3203
3204 if (!::arg()["chroot"].empty()) {
3205 #ifdef HAVE_SYSTEMD
3206 char *ns;
3207 ns = getenv("NOTIFY_SOCKET");
3208 if (ns != nullptr) {
3209 L<<Logger::Error<<"Unable to chroot when running from systemd. Please disable chroot= or set the 'Type' for this service to 'simple'"<<endl;
3210 exit(1);
3211 }
3212 #endif
3213 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
3214 L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
3215 exit(1);
3216 }
3217 else
3218 L<<Logger::Error<<"Chrooted to '"<<::arg()["chroot"]<<"'"<<endl;
3219 }
3220
3221 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
3222 if(!s_pidfname.empty())
3223 unlink(s_pidfname.c_str()); // remove possible old pid file
3224 writePid();
3225
3226 makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
3227
3228 Utility::dropUserPrivs(newuid);
3229
3230 startLuaConfigDelayedThreads(delayedLuaThreads);
3231
3232 makeThreadPipes();
3233
3234 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
3235 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
3236 g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
3237 s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
3238
3239 if (::arg().mustDo("snmp-agent")) {
3240 g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
3241 g_snmpAgent->run();
3242 }
3243
3244 /* This thread handles the web server, carbon, statistics and the control channel */
3245 std::thread handlerThread(recursorThread, s_handlerThreadID, false);
3246
3247 const auto cpusMap = parseCPUMap();
3248
3249 std::vector<std::thread> workers(g_numThreads);
3250 if(g_numThreads == 1) {
3251 L<<Logger::Warning<<"Operating unthreaded"<<endl;
3252 #ifdef HAVE_SYSTEMD
3253 sd_notify(0, "READY=1");
3254 #endif
3255 setCPUMap(cpusMap, 0, pthread_self());
3256 recursorThread(0, true);
3257 }
3258 else {
3259 L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
3260 for(unsigned int n=0; n < g_numThreads; ++n) {
3261 workers[n] = std::thread(recursorThread, n, true);
3262
3263 setCPUMap(cpusMap, n, workers[n].native_handle());
3264 }
3265 #ifdef HAVE_SYSTEMD
3266 sd_notify(0, "READY=1");
3267 #endif
3268 workers.back().join();
3269 }
3270 return 0;
3271 }
3272
3273 static void* recursorThread(int n, bool worker)
3274 try
3275 {
3276 t_id=n;
3277 SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
3278 SyncRes::setDomainMap(g_initialDomainMap);
3279 t_allowFrom = g_initialAllowFrom;
3280 t_udpclientsocks = std::unique_ptr<UDPClientSocks>(new UDPClientSocks());
3281 t_tcpClientCounts = std::unique_ptr<tcpClientCounts_t>(new tcpClientCounts_t());
3282 primeHints();
3283
3284 t_packetCache = std::unique_ptr<RecursorPacketCache>(new RecursorPacketCache());
3285
3286 #ifdef HAVE_PROTOBUF
3287 t_uuidGenerator = std::unique_ptr<boost::uuids::random_generator>(new boost::uuids::random_generator());
3288 #endif
3289 L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
3290
3291 if(worker) {
3292 try {
3293 if(!::arg()["lua-dns-script"].empty()) {
3294 t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
3295 L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
3296 }
3297 }
3298 catch(std::exception &e) {
3299 L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
3300 _exit(99);
3301 }
3302 }
3303
3304 unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
3305 if(ringsize) {
3306 t_remotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3307 if(g_weDistributeQueries) // if so, only 1 thread does recvfrom
3308 t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries"));
3309 else
3310 t_remotes->set_capacity(ringsize);
3311 t_servfailremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3312 t_servfailremotes->set_capacity(ringsize);
3313 t_largeanswerremotes = std::unique_ptr<addrringbuf_t>(new addrringbuf_t());
3314 t_largeanswerremotes->set_capacity(ringsize);
3315
3316 t_queryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
3317 t_queryring->set_capacity(ringsize);
3318 t_servfailqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
3319 t_servfailqueryring->set_capacity(ringsize);
3320 }
3321
3322 MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
3323
3324 PacketID pident;
3325
3326 t_fdm=getMultiplexer();
3327
3328 if(!worker) {
3329 if(::arg().mustDo("webserver")) {
3330 L<<Logger::Warning << "Enabling web server" << endl;
3331 try {
3332 new RecursorWebServer(t_fdm);
3333 }
3334 catch(PDNSException &e) {
3335 L<<Logger::Error<<"Exception: "<<e.reason<<endl;
3336 exit(99);
3337 }
3338 }
3339 L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
3340 }
3341 else {
3342 t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
3343 t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest);
3344
3345 if(g_useOneSocketPerThread) {
3346 for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
3347 t_fdm->addReadFD(i->first, i->second);
3348 }
3349 }
3350 else {
3351 if(!g_weDistributeQueries || t_id == s_distributorThreadID) { // if we distribute queries, only t_id = 0 listens
3352 for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
3353 t_fdm->addReadFD(i->first, i->second);
3354 }
3355 }
3356 }
3357 }
3358
3359 registerAllStats();
3360
3361 if(!worker) {
3362 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
3363 }
3364
3365 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
3366
3367 bool listenOnTCP(true);
3368
3369 time_t last_stat = 0;
3370 time_t last_carbon=0;
3371 time_t carbonInterval=::arg().asNum("carbon-interval");
3372 counter.store(0); // used to periodically execute certain tasks
3373 for(;;) {
3374 while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
3375
3376 if(!(counter%500)) {
3377 MT->makeThread(houseKeeping, 0);
3378 }
3379
3380 if(!(counter%55)) {
3381 typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
3382 expired_t expired=t_fdm->getTimeouts(g_now);
3383
3384 for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
3385 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
3386 if(g_logCommonErrors)
3387 L<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toString() <<endl;
3388 t_fdm->removeReadFD(i->first);
3389 }
3390 }
3391
3392 counter++;
3393
3394 if(!worker) {
3395 if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
3396 doStats();
3397 last_stat = g_now.tv_sec;
3398 }
3399
3400 Utility::gettimeofday(&g_now, 0);
3401
3402 if((g_now.tv_sec - last_carbon) >= carbonInterval) {
3403 MT->makeThread(doCarbonDump, 0);
3404 last_carbon = g_now.tv_sec;
3405 }
3406 }
3407
3408 t_fdm->run(&g_now);
3409 // 'run' updates g_now for us
3410
3411 if(worker && (!g_weDistributeQueries || t_id == s_distributorThreadID)) { // if pdns distributes queries, only tid 0 should do this
3412 if(listenOnTCP) {
3413 if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
3414 for(const auto fd : g_tcpListenSockets[t_id]) {
3415 t_fdm->removeReadFD(fd);
3416 }
3417 listenOnTCP=false;
3418 }
3419 }
3420 else {
3421 if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
3422 for(const auto fd : g_tcpListenSockets[t_id]) {
3423 t_fdm->addReadFD(fd, handleNewTCPQuestion);
3424 }
3425 listenOnTCP=true;
3426 }
3427 }
3428 }
3429 }
3430 }
3431 catch(PDNSException &ae) {
3432 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
3433 return 0;
3434 }
3435 catch(std::exception &e) {
3436 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
3437 return 0;
3438 }
3439 catch(...) {
3440 L<<Logger::Error<<"any other exception in main: "<<endl;
3441 return 0;
3442 }
3443
3444
3445 int main(int argc, char **argv)
3446 {
3447 g_argc = argc;
3448 g_argv = argv;
3449 g_stats.startupTime=time(0);
3450 versionSetProduct(ProductRecursor);
3451 reportBasicTypes();
3452 reportOtherTypes();
3453
3454 int ret = EXIT_SUCCESS;
3455
3456 try {
3457 ::arg().set("stack-size","stack size per mthread")="200000";
3458 ::arg().set("soa-minimum-ttl","Don't change")="0";
3459 ::arg().set("no-shuffle","Don't change")="off";
3460 ::arg().set("local-port","port to listen on")="53";
3461 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
3462 ::arg().setSwitch("non-local-bind", "Enable binding to non-local addresses by using FREEBIND / BINDANY socket options")="no";
3463 ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
3464 ::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
3465 ::arg().set("dnssec-log-bogus", "Log DNSSEC bogus validations")="no";
3466 ::arg().set("signature-inception-skew", "Allow the signture inception to be off by this number of seconds")="0";
3467 ::arg().set("daemon","Operate as a daemon")="no";
3468 ::arg().setSwitch("write-pid","Write a PID file")="yes";
3469 ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="6";
3470 ::arg().set("disable-syslog","Disable logging to syslog, useful when running inside a supervisor that logs stdout")="no";
3471 ::arg().set("log-timestamp","Print timestamps in log lines, useful to disable when running with a tool that timestamps stdout already")="yes";
3472 ::arg().set("log-common-errors","If we should log rather common errors")="no";
3473 ::arg().set("chroot","switch to chroot jail")="";
3474 ::arg().set("setgid","If set, change group id to this gid for more security")="";
3475 ::arg().set("setuid","If set, change user id to this uid for more security")="";
3476 ::arg().set("network-timeout", "Wait this number of milliseconds for network i/o")="1500";
3477 ::arg().set("threads", "Launch this number of threads")="2";
3478 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs!
3479 ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
3480 ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = "";
3481 ::arg().set("api-key", "Static pre-shared authentication key for access to the REST API") = "";
3482 ::arg().set("api-logfile", "Location of the server logfile (used by the REST API)") = "/var/log/pdns.log";
3483 ::arg().set("api-readonly", "Disallow data modification through the REST API when set") = "no";
3484 ::arg().setSwitch("webserver", "Start a webserver (for REST API)") = "no";
3485 ::arg().set("webserver-address", "IP Address of webserver to listen on") = "127.0.0.1";
3486 ::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
3487 ::arg().set("webserver-password", "Password required for accessing the webserver") = "";
3488 ::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="127.0.0.1,::1";
3489 ::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
3490 ::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server IP address")="";
3491 ::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
3492 ::arg().set("statistics-interval", "Number of seconds between printing of recursor statistics, 0 to disable")="1800";
3493 ::arg().set("quiet","Suppress logging of questions and answers")="";
3494 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
3495 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
3496 ::arg().set("socket-owner","Owner of socket")="";
3497 ::arg().set("socket-group","Group of socket")="";
3498 ::arg().set("socket-mode", "Permissions for socket")="";
3499
3500 ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+" when unset and not chrooted" )="";
3501 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
3502 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
3503 ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
3504 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
3505 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
3506 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
3507 ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
3508 ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
3509 ::arg().set("hint-file", "If set, load root hints from this file")="";
3510 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
3511 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
3512 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
3513 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
3514 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
3515 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
3516 ::arg().set("server-id", "Returned when queried for 'id.server' TXT or NSID, defaults to hostname")="";
3517 ::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
3518 ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
3519 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
3520 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
3521 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
3522 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=DONT_QUERY;
3523 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
3524 ::arg().set("max-tcp-queries-per-connection", "If set, maximum number of TCP queries in a TCP connection")="0";
3525 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
3526 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
3527 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
3528 ::arg().set("lua-config-file", "More powerful configuration options")="";
3529
3530 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
3531 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
3532 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
3533 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
3534 ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
3535 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
3536 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="yes";
3537 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
3538 ::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
3539 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
3540 ::arg().set("ecs-ipv4-bits", "Number of bits of IPv4 address to pass for EDNS Client Subnet")="24";
3541 ::arg().set("ecs-ipv6-bits", "Number of bits of IPv6 address to pass for EDNS Client Subnet")="56";
3542 ::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
3543 ::arg().set("ecs-scope-zero-address", "Address to send to whitelisted authoritative servers for incoming queries with ECS prefix-length source of 0")="";
3544 ::arg().setSwitch( "use-incoming-edns-subnet", "Pass along received EDNS Client Subnet information")="no";
3545 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads")="yes";
3546 ::arg().setSwitch( "root-nx-trust", "If set, believe that an NXDOMAIN from the root means the TLD does not exist")="yes";
3547 ::arg().setSwitch( "any-to-tcp","Answer ANY queries with tc=1, shunting to TCP" )="no";
3548 ::arg().setSwitch( "lowercase-outgoing","Force outgoing questions to lowercase")="no";
3549 ::arg().setSwitch("gettag-needs-edns-options", "If EDNS Options should be extracted before calling the gettag() hook")="no";
3550 ::arg().set("udp-truncation-threshold", "Maximum UDP response size before we truncate")="1680";
3551 ::arg().set("edns-outgoing-bufsize", "Outgoing EDNS buffer size")="1680";
3552 ::arg().set("minimum-ttl-override", "Set under adverse conditions, a minimum TTL")="0";
3553 ::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
3554 ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
3555 ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
3556 ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
3557
3558 ::arg().set("include-dir","Include *.conf files from this directory")="";
3559 ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
3560
3561 ::arg().setSwitch("reuseport","Enable SO_REUSEPORT allowing multiple recursors processes to listen to 1 address")="no";
3562
3563 ::arg().setSwitch("snmp-agent", "If set, register as an SNMP agent")="no";
3564 ::arg().set("snmp-master-socket", "If set and snmp-agent is set, the socket to use to register to the SNMP master")="";
3565
3566 ::arg().set("tcp-fast-open", "Enable TCP Fast Open support on the listening sockets, using the supplied numerical value as the queue size")="0";
3567 ::arg().set("nsec3-max-iterations", "Maximum number of iterations allowed for an NSEC3 record")="2500";
3568
3569 ::arg().set("cpu-map", "Thread to CPU mapping, space separated thread-id=cpu1,cpu2..cpuN pairs")="";
3570
3571 ::arg().setSwitch("log-rpz-changes", "Log additions and removals to RPZ zones at Info level")="no";
3572
3573 ::arg().setCmd("help","Provide a helpful message");
3574 ::arg().setCmd("version","Print version string");
3575 ::arg().setCmd("config","Output blank configuration");
3576 L.toConsole(Logger::Info);
3577 ::arg().laxParse(argc,argv); // do a lax parse
3578
3579 string configname=::arg()["config-dir"]+"/recursor.conf";
3580 if(::arg()["config-name"]!="") {
3581 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
3582 s_programname+="-"+::arg()["config-name"];
3583 }
3584 cleanSlashes(configname);
3585
3586 if(::arg().mustDo("config")) {
3587 cout<<::arg().configstring()<<endl;
3588 exit(0);
3589 }
3590
3591 if(!::arg().file(configname.c_str()))
3592 L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
3593
3594 ::arg().parse(argc,argv);
3595
3596 if( !::arg()["chroot"].empty() && !::arg()["api-config-dir"].empty() && !::arg().mustDo("api-readonly") ) {
3597 L<<Logger::Error<<"Using chroot and a writable API is not possible"<<endl;
3598 exit(EXIT_FAILURE);
3599 }
3600
3601 if (::arg()["socket-dir"].empty()) {
3602 if (::arg()["chroot"].empty())
3603 ::arg().set("socket-dir") = LOCALSTATEDIR;
3604 else
3605 ::arg().set("socket-dir") = "/";
3606 }
3607
3608 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
3609
3610 if(::arg().asNum("threads")==1)
3611 ::arg().set("pdns-distributes-queries")="no";
3612
3613 if(::arg().mustDo("help")) {
3614 cout<<"syntax:"<<endl<<endl;
3615 cout<<::arg().helpstring(::arg()["help"])<<endl;
3616 exit(0);
3617 }
3618 if(::arg().mustDo("version")) {
3619 showProductVersion();
3620 showBuildConfiguration();
3621 exit(0);
3622 }
3623
3624 Logger::Urgency logUrgency = (Logger::Urgency)::arg().asNum("loglevel");
3625
3626 if (logUrgency < Logger::Error)
3627 logUrgency = Logger::Error;
3628 if(!g_quiet && logUrgency < Logger::Info) { // Logger::Info=6, Logger::Debug=7
3629 logUrgency = Logger::Info; // if you do --quiet=no, you need Info to also see the query log
3630 }
3631 L.setLoglevel(logUrgency);
3632 L.toConsole(logUrgency);
3633
3634 serviceMain(argc, argv);
3635 }
3636 catch(PDNSException &ae) {
3637 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
3638 ret=EXIT_FAILURE;
3639 }
3640 catch(std::exception &e) {
3641 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
3642 ret=EXIT_FAILURE;
3643 }
3644 catch(...) {
3645 L<<Logger::Error<<"any other exception in main: "<<endl;
3646 ret=EXIT_FAILURE;
3647 }
3648
3649 return ret;
3650 }