]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
Merge pull request #4062 from pieterlexis/dnsname-toLogString
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 PowerDNS Versatile Database Driven Nameserver
3 Copyright (C) 2003 - 2016 PowerDNS.COM BV
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License version 2
7 as published by the Free Software Foundation
8
9 Additionally, the license of this program contains a special
10 exception which allows to distribute the program in binary form when
11 it is linked against OpenSSL.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <netdb.h>
28 #include <sys/stat.h>
29 #include <unistd.h>
30
31 #include "ws-recursor.hh"
32 #include <pthread.h>
33 #include "recpacketcache.hh"
34 #include "utility.hh"
35 #include "dns_random.hh"
36 #include "opensslsigners.hh"
37 #include <iostream>
38 #include <errno.h>
39 #include <boost/static_assert.hpp>
40 #include <map>
41 #include <set>
42 #include "recursor_cache.hh"
43 #include "cachecleaner.hh"
44 #include <stdio.h>
45 #include <signal.h>
46 #include <stdlib.h>
47 #include "misc.hh"
48 #include "mtasker.hh"
49 #include <utility>
50 #include "arguments.hh"
51 #include "syncres.hh"
52 #include <fcntl.h>
53 #include <fstream>
54 #include "sortlist.hh"
55 extern SortList g_sortlist;
56 #include "sstuff.hh"
57 #include <boost/tuple/tuple.hpp>
58 #include <boost/tuple/tuple_comparison.hpp>
59 #include <boost/shared_array.hpp>
60 #include <boost/function.hpp>
61 #include <boost/algorithm/string.hpp>
62 #ifdef MALLOC_TRACE
63 #include "malloctrace.hh"
64 #endif
65 #include <netinet/tcp.h>
66 #include "dnsparser.hh"
67 #include "dnswriter.hh"
68 #include "dnsrecords.hh"
69 #include "zoneparser-tng.hh"
70 #include "rec_channel.hh"
71 #include "logger.hh"
72 #include "iputils.hh"
73 #include "mplexer.hh"
74 #include "config.h"
75 #include "lua-recursor4.hh"
76 #include "version.hh"
77 #include "responsestats.hh"
78 #include "secpoll-recursor.hh"
79 #include "dnsname.hh"
80 #include "filterpo.hh"
81 #include "rpzloader.hh"
82 #include "validate-recursor.hh"
83 #include "rec-lua-conf.hh"
84 #include "ednsoptions.hh"
85 #include "gettime.hh"
86
87 #ifdef HAVE_PROTOBUF
88 #include <boost/uuid/uuid.hpp>
89 #include <boost/uuid/uuid_generators.hpp>
90 #include "dnsmessage.pb.h"
91 #endif
92
93 #ifdef HAVE_SYSTEMD
94 #include <systemd/sd-daemon.h>
95 #endif
96
// Per-thread FD multiplexer; the async send/receive helpers in this file register read/write callbacks on it.
97 __thread FDMultiplexer* t_fdm;
// Per-thread id; printed at the start of the per-query log line.
98 __thread unsigned int t_id;
// NOTE(review): not referenced in this part of the file; the name suggests a per-client TCP connection cap.
99 unsigned int g_maxTCPPerClient;
// Timeout handed to every MT->waitEvent() call for outgoing network I/O (milliseconds, per the name).
100 unsigned int g_networkTimeoutMsec;
101 uint64_t g_latencyStatSize;
// When set, SERVFAILs caused by ImmediateServFailException are logged at Notice level.
102 bool g_logCommonErrors;
// When set, ANY questions received over UDP are answered with TC=1.
103 bool g_anyToTcp;
// g_udpTruncationThreshold caps the size of UDP answers built in startDoResolve().
104 uint16_t g_udpTruncationThreshold, g_outgoingEDNSBufsize;
// Per-thread Lua engine; callers test t_pdl->get() before using it.
105 __thread shared_ptr<RecursorLua4>* t_pdl;
// When set, the query name is lowercased (after the answer packet writer captured the original case).
106 bool g_lowercaseOutgoing;
107
// Per-thread rings of remote addresses. updateResponseStats() feeds
// t_servfailremotes and t_largeanswerremotes; t_remotes is filled elsewhere.
108 __thread addrringbuf_t* t_remotes, *t_servfailremotes, *t_largeanswerremotes;
109
// Per-thread rings of recent (qname, qtype) pairs: every question goes into
// t_queryring, SERVFAIL'd ones into t_servfailqueryring.
110 __thread boost::circular_buffer<pair<DNSName, uint16_t> >* t_queryring, *t_servfailqueryring;
// Per-thread trace regex: questions whose name matches are resolved with tracing enabled.
111 __thread shared_ptr<Regex>* t_traceRegex;
112
113 #ifdef HAVE_PROTOBUF
// NOTE(review): presumably produces the per-query UUIDs used as protobuf message ids; the use is not visible here.
114 __thread boost::uuids::random_generator* t_uuidGenerator;
115 #endif
116
// NOTE(review): not used in this part of the file; the names suggest the
// netmasks/domains for which EDNS Client Subnet is sent — confirm.
117 NetmaskGroup g_ednssubnets;
118 SuffixMatchNode g_ednsdomains;
119
120 RecursorControlChannel s_rcc; // only active in thread 0
121
122 // for communicating with our threads
// One set of pipe descriptors per thread.
// NOTE(review): the field names suggest a pipe into the thread
// (writeToThread/readToThread) and one back out of it
// (writeFromThread/readFromThread); the setup code is not visible here.
123 struct ThreadPipeSet
124 {
125 int writeToThread;
126 int readToThread;
127 int writeFromThread;
128 int readFromThread;
129 };
130
// Indexed per thread (see ThreadPipeSet).
131 vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
132
133 SyncRes::domainmap_t* g_initialDomainMap; // new threads need this to be set up
134
135 #include "namespaces.hh"
136
// Per-thread record cache and packet cache.
137 __thread MemRecursorCache* t_RC;
138 __thread RecursorPacketCache* t_packetCache;
// Process-wide counters (servFails, nxDomains, noErrors, policyDrops, spoofCount, ...) bumped throughout this file.
139 RecursorStats g_stats;
// When false, every incoming question is logged at Warning level.
140 bool g_quiet;
141
142 bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
143
// NOTE(review): per the names, the per-thread allow-from ACL and the template it is copied from.
144 __thread NetmaskGroup* t_allowFrom;
145 static NetmaskGroup* g_initialAllowFrom; // new threads need to be set up with this
146
// NOTE(review): presumably the set of netmasks never to send queries to (cf. DONT_QUERY) — confirm.
147 NetmaskGroup* g_dontQuery;
148 string s_programname="pdns_recursor";
149
150 typedef vector<int> tcpListenSockets_t;
151 tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
152 int g_tcpTimeout;
153 unsigned int g_maxMThreads;
154 __thread struct timeval g_now; // timestamp, updated (too) frequently
155 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
156 listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
157 set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
158
// Per-thread MTasker; waitEvent()/sendEvent() suspend and wake the mthreads doing async I/O.
159 __thread MT_t* MT; // the big MTasker
160
161 unsigned int g_numThreads, g_numWorkerThreads;
162
// Comma-separated netmask list: IPv4 loopback, RFC 1918 private ranges,
// shared address space (100.64.0.0/10), link-local, plus IPv6 loopback,
// unique-local (fc00::/7) and link-local (fe80::/10).
163 #define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
164 // Bad Nets taken from both:
165 // http://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
166 // and
167 // http://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
168 // where such a network may not be considered a valid destination
169 #define BAD_NETS "0.0.0.0/8, 192.0.0.0/24, 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24, 240.0.0.0/4, ::/96, ::ffff:0:0/96, 100::/64, 2001:db8::/32"
// Both lists joined into one comma-separated string literal (adjacent literals concatenate).
// NOTE(review): by its name, the default "don't query" netmask list; the consumer is not visible here.
170 #define DONT_QUERY LOCAL_NETS ", " BAD_NETS
171
172 //! used to send information to a newborn mthread
173 struct DNSComboWriter {
174 DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now),
175 d_tcp(false), d_socket(-1)
176 {}
177 MOADNSParser d_mdp;
178 void setRemote(const ComboAddress* sa)
179 {
180 d_remote=*sa;
181 }
182
183 void setLocal(const ComboAddress& sa)
184 {
185 d_local=sa;
186 }
187
188
189 void setSocket(int sock)
190 {
191 d_socket=sock;
192 }
193
194 string getRemote() const
195 {
196 return d_remote.toString();
197 }
198
199 struct timeval d_now;
200 ComboAddress d_remote, d_local;
201 #ifdef HAVE_PROTOBUF
202 boost::uuids::uuid d_uuid;
203 Netmask d_ednssubnet;
204 #endif
205 bool d_tcp;
206 int d_socket;
207 int d_tag{0};
208 string d_query;
209 shared_ptr<TCPConnection> d_tcpConnection;
210 vector<pair<uint16_t, string> > d_ednsOpts;
211 std::vector<std::string> d_policyTags;
212 };
213
214
215 ArgvMap &arg()
216 {
217 static ArgvMap theArg;
218 return theArg;
219 }
220
221
222 void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
223
224 // -1 is error, 0 is timeout, 1 is success
225 int asendtcp(const string& data, Socket* sock)
226 {
227 PacketID pident;
228 pident.sock=sock;
229 pident.outMSG=data;
230
231 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
232 string packet;
233
234 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
235
236 if(!ret || ret==-1) { // timeout
237 t_fdm->removeWriteFD(sock->getHandle());
238 }
239 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
240 return -1;
241 }
242 return ret;
243 }
244
245 void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
246
247 // -1 is error, 0 is timeout, 1 is success
248 int arecvtcp(string& data, size_t len, Socket* sock, bool incompleteOkay)
249 {
250 data.clear();
251 PacketID pident;
252 pident.sock=sock;
253 pident.inNeeded=len;
254 pident.inIncompleteOkay=incompleteOkay;
255 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
256
257 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
258 if(!ret || ret==-1) { // timeout
259 t_fdm->removeReadFD(sock->getHandle());
260 }
261 else if(data.empty()) {// error, EOF or other
262 return -1;
263 }
264
265 return ret;
266 }
267
268 void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
269 {
270 PacketID pident=*any_cast<PacketID>(&var);
271 char resp[512];
272 ssize_t ret=recv(fd, resp, sizeof(resp), 0);
273 t_fdm->removeReadFD(fd);
274 if(ret >= 0) {
275 string data(resp, (size_t) ret);
276 MT->sendEvent(pident, &data);
277 }
278 else {
279 string empty;
280 MT->sendEvent(pident, &empty);
281 // cerr<<"Had some kind of error: "<<ret<<", "<<strerror(errno)<<endl;
282 }
283 }
284 string GenUDPQueryResponse(const ComboAddress& dest, const string& query)
285 {
286 Socket s(dest.sin4.sin_family, SOCK_DGRAM);
287 s.setNonBlocking();
288 ComboAddress local = getQueryLocalAddress(dest.sin4.sin_family, 0);
289
290 s.bind(local);
291 s.connect(dest);
292 s.send(query);
293
294 PacketID pident;
295 pident.sock=&s;
296 pident.type=0;
297 t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
298
299 string data;
300
301 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
302
303 if(!ret || ret==-1) { // timeout
304 t_fdm->removeReadFD(s.getHandle());
305 }
306 else if(data.empty()) {// error, EOF or other
307 // we could special case this
308 return data;
309 }
310 return data;
311 }
312
313
314 vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
315 const ComboAddress g_local4("0.0.0.0"), g_local6("::");
316
317 //! pick a random query local address
318 ComboAddress getQueryLocalAddress(int family, uint16_t port)
319 {
320 ComboAddress ret;
321 if(family==AF_INET) {
322 if(g_localQueryAddresses4.empty())
323 ret = g_local4;
324 else
325 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
326 ret.sin4.sin_port = htons(port);
327 }
328 else {
329 if(g_localQueryAddresses6.empty())
330 ret = g_local6;
331 else
332 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
333
334 ret.sin6.sin6_port = htons(port);
335 }
336 return ret;
337 }
338
339 void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
340
341 void setSocketBuffer(int fd, int optname, uint32_t size)
342 {
343 uint32_t psize=0;
344 socklen_t len=sizeof(psize);
345
346 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
347 L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
348 return;
349 }
350
351 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
352 L<<Logger::Error<<"Unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
353 }
354
355
356 static void setSocketReceiveBuffer(int fd, uint32_t size)
357 {
358 setSocketBuffer(fd, SO_RCVBUF, size);
359 }
360
361 static void setSocketSendBuffer(int fd, uint32_t size)
362 {
363 setSocketBuffer(fd, SO_SNDBUF, size);
364 }
365
366
367 // you can ask this class for a UDP socket to send a query from
368 // this socket is not yours, don't even think about deleting it
369 // but after you call 'returnSocket' on it, don't assume anything anymore
370 class UDPClientSocks
371 {
372 unsigned int d_numsocks;
373 public:
374 UDPClientSocks() : d_numsocks(0)
375 {
376 }
377
378 typedef set<int> socks_t;
379 socks_t d_socks;
380
381 // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
382 int getSocket(const ComboAddress& toaddr, int* fd)
383 {
384 *fd=makeClientSocket(toaddr.sin4.sin_family);
385 if(*fd < 0) // temporary error - receive exception otherwise
386 return -2;
387
388 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
389 int err = errno;
390 // returnSocket(*fd);
391 closesocket(*fd);
392 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
393 return -2;
394 return -1;
395 }
396
397 d_socks.insert(*fd);
398 d_numsocks++;
399 return 0;
400 }
401
402 void returnSocket(int fd)
403 {
404 socks_t::iterator i=d_socks.find(fd);
405 if(i==d_socks.end()) {
406 throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
407 }
408 returnSocketLocked(i);
409 }
410
411 // return a socket to the pool, or simply erase it
412 void returnSocketLocked(socks_t::iterator& i)
413 {
414 if(i==d_socks.end()) {
415 throw PDNSException("Trying to return a socket not in the pool");
416 }
417 try {
418 t_fdm->removeReadFD(*i);
419 }
420 catch(FDMultiplexerException& e) {
421 // we sometimes return a socket that has not yet been assigned to t_fdm
422 }
423 closesocket(*i);
424
425 d_socks.erase(i++);
426 --d_numsocks;
427 }
428
429 // returns -1 for errors which might go away, throws for ones that won't
430 static int makeClientSocket(int family)
431 {
432 int ret=socket(family, SOCK_DGRAM, 0 ); // turns out that setting CLO_EXEC and NONBLOCK from here is not a performance win on Linux (oddly enough)
433
434 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
435 return ret;
436
437 if(ret<0)
438 throw PDNSException("Making a socket for resolver (family = "+std::to_string(family)+"): "+stringerror());
439
440 // setCloseOnExec(ret); // we're not going to exec
441
442 int tries=10;
443 ComboAddress sin;
444 while(--tries) {
445 uint16_t port;
446
447 if(tries==1) // fall back to kernel 'random'
448 port = 0;
449 else
450 port = 1025 + dns_random(64510);
451
452 sin=getQueryLocalAddress(family, port); // does htons for us
453
454 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
455 break;
456 }
457 if(!tries)
458 throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
459
460 setNonBlocking(ret);
461 return ret;
462 }
463 };
464
465 static __thread UDPClientSocks* t_udpclientsocks;
466
467 /* these two functions are used by LWRes */
468 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
469 int asendto(const char *data, size_t len, int flags,
470 const ComboAddress& toaddr, uint16_t id, const DNSName& domain, uint16_t qtype, int* fd)
471 {
472
473 PacketID pident;
474 pident.domain = domain;
475 pident.remote = toaddr;
476 pident.type = qtype;
477
478 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
479 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
480
481 for(; chain.first != chain.second; chain.first++) {
482 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
483 /*
484 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
485 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
486 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
487 */
488 chain.first->key.chain.insert(id); // we can chain
489 *fd=-1; // gets used in waitEvent / sendEvent later on
490 return 1;
491 }
492 }
493
494 int ret=t_udpclientsocks->getSocket(toaddr, fd);
495 if(ret < 0)
496 return ret;
497
498 pident.fd=*fd;
499 pident.id=id;
500
501 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
502 ret = send(*fd, data, len, 0);
503
504 int tmp = errno;
505
506 if(ret < 0)
507 t_udpclientsocks->returnSocket(*fd);
508
509 errno = tmp; // this is for logging purposes only
510 return ret;
511 }
512
513 // -1 is error, 0 is timeout, 1 is success
514 int arecvfrom(char *data, size_t len, int flags, const ComboAddress& fromaddr, size_t *d_len,
515 uint16_t id, const DNSName& domain, uint16_t qtype, int fd, struct timeval* now)
516 {
517 static optional<unsigned int> nearMissLimit;
518 if(!nearMissLimit)
519 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
520
521 PacketID pident;
522 pident.fd=fd;
523 pident.id=id;
524 pident.domain=domain;
525 pident.type = qtype;
526 pident.remote=fromaddr;
527
528 string packet;
529 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
530
531 if(ret > 0) {
532 if(packet.empty()) // means "error"
533 return -1;
534
535 *d_len=packet.size();
536 memcpy(data,packet.c_str(),min(len,*d_len));
537 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
538 L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
539 g_stats.spoofCount++;
540 return -1;
541 }
542 }
543 else {
544 if(fd >= 0)
545 t_udpclientsocks->returnSocket(fd);
546 }
547 return ret;
548 }
549
550
551 string s_pidfname;
552 static void writePid(void)
553 {
554 if(!::arg().mustDo("write-pid"))
555 return;
556 ofstream of(s_pidfname.c_str(), std::ios_base::app);
557 if(of)
558 of<< Utility::getpid() <<endl;
559 else
560 L<<Logger::Error<<"Writing pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
561 }
562
563 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
564 tcpClientCounts_t __thread* t_tcpClientCounts;
565
566 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : d_remote(addr), d_fd(fd)
567 {
568 ++s_currentConnections;
569 (*t_tcpClientCounts)[d_remote]++;
570 }
571
572 TCPConnection::~TCPConnection()
573 {
574 if(closesocket(d_fd) < 0)
575 unixDie("closing socket for TCPConnection");
576 if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
577 t_tcpClientCounts->erase(d_remote);
578 --s_currentConnections;
579 }
580
581 AtomicCounter TCPConnection::s_currentConnections;
582 void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
583
584 // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
585 void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
586 {
587 if(packetsize > 1000 && t_largeanswerremotes)
588 t_largeanswerremotes->push_back(remote);
589 switch(res) {
590 case RCode::ServFail:
591 if(t_servfailremotes) {
592 t_servfailremotes->push_back(remote);
593 if(query) // packet cache
594 t_servfailqueryring->push_back(make_pair(*query, qtype));
595 }
596 g_stats.servFails++;
597 break;
598 case RCode::NXDomain:
599 g_stats.nxDomains++;
600 break;
601 case RCode::NoError:
602 g_stats.noErrors++;
603 break;
604 }
605 }
606
607 static string makeLoginfo(DNSComboWriter* dc)
608 try
609 {
610 return "("+dc->d_mdp.d_qname.toLogString()+"/"+DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)+" from "+(dc->d_remote.toString())+")";
611 }
612 catch(...)
613 {
614 return "Exception making error message for exception";
615 }
616
617 #ifdef HAVE_PROTOBUF
618 static void protobufUpdateMessage(PBDNSMessage& message, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id)
619 {
620 std::string* messageId = message.mutable_messageid();
621 messageId->resize(uniqueId.size());
622 std::copy(uniqueId.begin(), uniqueId.end(), messageId->begin());
623
624 message.set_socketfamily(remote.sin4.sin_family == AF_INET ? PBDNSMessage_SocketFamily_INET : PBDNSMessage_SocketFamily_INET6);
625 message.set_socketprotocol(tcp ? PBDNSMessage_SocketProtocol_TCP : PBDNSMessage_SocketProtocol_UDP);
626
627 if (local.sin4.sin_family == AF_INET) {
628 message.set_to(&local.sin4.sin_addr.s_addr, sizeof(local.sin4.sin_addr.s_addr));
629 }
630 else if (local.sin4.sin_family == AF_INET6) {
631 message.set_to(&local.sin6.sin6_addr.s6_addr, sizeof(local.sin6.sin6_addr.s6_addr));
632 }
633 if (remote.sin4.sin_family == AF_INET) {
634 message.set_from(&remote.sin4.sin_addr.s_addr, sizeof(remote.sin4.sin_addr.s_addr));
635 }
636 else if (remote.sin4.sin_family == AF_INET6) {
637 message.set_from(&remote.sin6.sin6_addr.s6_addr, sizeof(remote.sin6.sin6_addr.s6_addr));
638 }
639 if (!ednssubnet.empty()) {
640 const ComboAddress ca = ednssubnet.getNetwork();
641 if (ca.sin4.sin_family == AF_INET) {
642 message.set_originalrequestorsubnet(&ca.sin4.sin_addr.s_addr, sizeof(ca.sin4.sin_addr.s_addr));
643 }
644 else if (ca.sin4.sin_family == AF_INET6) {
645 message.set_originalrequestorsubnet(&ca.sin6.sin6_addr.s6_addr, sizeof(ca.sin6.sin6_addr.s6_addr));
646 }
647 }
648
649 struct timespec ts;
650 gettime(&ts, true);
651 message.set_timesec(ts.tv_sec);
652 message.set_timeusec(ts.tv_nsec / 1000);
653
654 message.set_id(ntohs(id));
655 }
656
657 static void protobufFillMessage(PBDNSMessage& message, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, const DNSName& qname, uint16_t qtype, uint16_t qclass)
658 {
659 protobufUpdateMessage(message, uniqueId, remote, local, ednssubnet, tcp, id);
660
661 PBDNSMessage_DNSQuestion* question = message.mutable_question();
662 question->set_qname(qname.toString());
663 question->set_qtype(qtype);
664 question->set_qclass(qclass);
665 }
666
667 static void protobufFillMessageFromDC(PBDNSMessage& message, const DNSComboWriter* dc)
668 {
669 protobufFillMessage(message, dc->d_uuid, dc->d_remote, dc->d_local, dc->d_ednssubnet, dc->d_tcp, dc->d_mdp.d_header.id, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
670 }
671
672 static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::string appliedPolicy, const std::vector<std::string>& policyTags)
673 {
674 PBDNSMessage message;
675 message.set_type(PBDNSMessage_Type_DNSQueryType);
676 message.set_inbytes(len);
677 protobufFillMessage(message, uniqueId, remote, local, ednssubnet, tcp, id, qname, qtype, qclass);
678
679 /* just fake it up for now */
680 PBDNSMessage_DNSResponse* response = message.mutable_response();
681 if (!appliedPolicy.empty()) {
682 response->set_appliedpolicy(appliedPolicy);
683 }
684 if (!policyTags.empty()) {
685 for(const auto tag : policyTags) {
686 response->add_tags(tag);
687 }
688 }
689
690 // cerr <<message.DebugString()<<endl;
691 std::string str;
692 message.SerializeToString(&str);
693 logger->queueData(str);
694 }
695
696 static void protobufFillResponseFromDC(PBDNSMessage& message, const DNSComboWriter* dc, size_t responseSize)
697 {
698 message.set_type(PBDNSMessage_Type_DNSResponseType);
699 message.set_inbytes(responseSize);
700 protobufFillMessageFromDC(message, dc);
701 }
702
703 static void protobufLogResponse(const std::shared_ptr<RemoteLogger>& logger, const PBDNSMessage& message)
704 {
705 // cerr <<message.DebugString()<<endl;
706 std::string str;
707 message.SerializeToString(&str);
708 logger->queueData(str);
709 }
710 #endif
711
712 void startDoResolve(void *p)
713 {
714 DNSComboWriter* dc=(DNSComboWriter *)p;
715 try {
716 t_queryring->push_back(make_pair(dc->d_mdp.d_qname, dc->d_mdp.d_qtype));
717
718 uint32_t maxanswersize= dc->d_tcp ? 65535 : min((uint16_t) 512, g_udpTruncationThreshold);
719 EDNSOpts edo;
720 bool haveEDNS=false;
721 if(getEDNSOpts(dc->d_mdp, &edo)) {
722 if(!dc->d_tcp)
723 maxanswersize = min(edo.d_packetsize, g_udpTruncationThreshold);
724 dc->d_ednsOpts = edo.d_options;
725 haveEDNS=true;
726 }
727 vector<DNSRecord> ret;
728 vector<uint8_t> packet;
729
730 auto luaconfsLocal = g_luaconfs.getLocal();
731 std::string appliedPolicy;
732 #ifdef HAVE_PROTOBUF
733 PBDNSMessage pbMessage;
734 PBDNSMessage_DNSResponse* protobufResponse = pbMessage.mutable_response();
735 #endif
736
737 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
738
739 pw.getHeader()->aa=0;
740 pw.getHeader()->ra=1;
741 pw.getHeader()->qr=1;
742 pw.getHeader()->tc=0;
743 pw.getHeader()->id=dc->d_mdp.d_header.id;
744 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
745 pw.getHeader()->cd=dc->d_mdp.d_header.cd;
746
747 // DO NOT MOVE THIS CODE UP - DNSPacketWriter needs to get the original-cased version
748 if (g_lowercaseOutgoing)
749 dc->d_mdp.d_qname = DNSName(toLower(dc->d_mdp.d_qname.toString()));
750
751 uint32_t minTTL=std::numeric_limits<uint32_t>::max();
752
753 SyncRes sr(dc->d_now);
754 bool DNSSECOK=false;
755 if(t_pdl) {
756 sr.setLuaEngine(*t_pdl);
757 sr.d_requestor=dc->d_remote;
758 }
759
760 if(g_dnssecmode != DNSSECMode::Off) {
761 sr.d_doDNSSEC=true;
762
763 // Does the requestor want DNSSEC records?
764 if(edo.d_Z & EDNSOpts::DNSSECOK) {
765 DNSSECOK=true;
766 g_stats.dnssecQueries++;
767 }
768 } else {
769 // Ignore the client-set CD flag
770 pw.getHeader()->cd=0;
771 }
772
773 bool tracedQuery=false; // we could consider letting Lua know about this too
774 bool variableAnswer = false;
775
776 int res;
777 DNSFilterEngine::Policy dfepol;
778 DNSRecord spoofed;
779 if(dc->d_mdp.d_qtype==QType::ANY && !dc->d_tcp && g_anyToTcp) {
780 pw.getHeader()->tc = 1;
781 res = 0;
782 variableAnswer = true;
783 goto sendit;
784 }
785
786 if(t_traceRegex->get() && (*t_traceRegex)->match(dc->d_mdp.d_qname.toString())) {
787 sr.setLogMode(SyncRes::Store);
788 tracedQuery=true;
789 }
790
791
792 if(!g_quiet || tracedQuery) {
793 L<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
794 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote();
795 #ifdef HAVE_PROTOBUF
796 if(!dc->d_ednssubnet.empty()) {
797 L<<" (ecs "<<dc->d_ednssubnet.toString()<<")";
798 }
799 #endif
800 L<<endl;
801 }
802
803 sr.setId(MT->getTid());
804 if(!dc->d_mdp.d_header.rd)
805 sr.setCacheOnly();
806
807
808 // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
809
810 dfepol = luaconfsLocal->dfe.getQueryPolicy(dc->d_mdp.d_qname, dc->d_remote);
811
812 switch(dfepol.d_kind) {
813 case DNSFilterEngine::PolicyKind::NoAction:
814 break;
815 case DNSFilterEngine::PolicyKind::Drop:
816 g_stats.policyDrops++;
817 delete dc;
818 dc=0;
819 return;
820 case DNSFilterEngine::PolicyKind::NXDOMAIN:
821 res=RCode::NXDomain;
822 appliedPolicy=dfepol.d_name;
823 goto haveAnswer;
824
825 case DNSFilterEngine::PolicyKind::NODATA:
826 res=RCode::NoError;
827 appliedPolicy=dfepol.d_name;
828 goto haveAnswer;
829
830 case DNSFilterEngine::PolicyKind::Custom:
831 res=RCode::NoError;
832 spoofed.d_name=dc->d_mdp.d_qname;
833 spoofed.d_type=dfepol.d_custom->getType();
834 spoofed.d_ttl = dfepol.d_ttl;
835 spoofed.d_class = 1;
836 spoofed.d_content = dfepol.d_custom;
837 spoofed.d_place = DNSResourceRecord::ANSWER;
838 ret.push_back(spoofed);
839 appliedPolicy=dfepol.d_name;
840 goto haveAnswer;
841
842
843 case DNSFilterEngine::PolicyKind::Truncate:
844 if(!dc->d_tcp) {
845 res=RCode::NoError;
846 pw.getHeader()->tc=1;
847 appliedPolicy=dfepol.d_name;
848 goto haveAnswer;
849 }
850 break;
851 }
852
853
854 if(!t_pdl->get() || !(*t_pdl)->preresolve(dc->d_remote, dc->d_local, dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_tcp, ret, dc->d_ednsOpts.empty() ? 0 : &dc->d_ednsOpts, dc->d_tag, &appliedPolicy, &dc->d_policyTags, res, &variableAnswer)) {
855 try {
856 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
857 }
858 catch(ImmediateServFailException &e) {
859 if(g_logCommonErrors)
860 L<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during resolve of '"<<dc->d_mdp.d_qname<<"' because: "<<e.reason<<endl;
861 res = RCode::ServFail;
862 }
863
864 dfepol = luaconfsLocal->dfe.getPostPolicy(ret);
865 switch(dfepol.d_kind) {
866 case DNSFilterEngine::PolicyKind::NoAction:
867 break;
868 case DNSFilterEngine::PolicyKind::Drop:
869 g_stats.policyDrops++;
870 delete dc;
871 dc=0;
872 return;
873 case DNSFilterEngine::PolicyKind::NXDOMAIN:
874 ret.clear();
875 res=RCode::NXDomain;
876 appliedPolicy=dfepol.d_name;
877 goto haveAnswer;
878
879 case DNSFilterEngine::PolicyKind::NODATA:
880 ret.clear();
881 res=RCode::NoError;
882 appliedPolicy=dfepol.d_name;
883 goto haveAnswer;
884
885 case DNSFilterEngine::PolicyKind::Truncate:
886 if(!dc->d_tcp) {
887 ret.clear();
888 res=RCode::NoError;
889 pw.getHeader()->tc=1;
890 appliedPolicy=dfepol.d_name;
891 goto haveAnswer;
892 }
893 break;
894
895 case DNSFilterEngine::PolicyKind::Custom:
896 ret.clear();
897 res=RCode::NoError;
898 spoofed.d_name=dc->d_mdp.d_qname;
899 spoofed.d_type=dfepol.d_custom->getType();
900 spoofed.d_ttl = dfepol.d_ttl;
901 spoofed.d_class = 1;
902 spoofed.d_content = dfepol.d_custom;
903 spoofed.d_place = DNSResourceRecord::ANSWER;
904 ret.push_back(spoofed);
905 appliedPolicy=dfepol.d_name;
906 goto haveAnswer;
907 }
908
909 if(t_pdl->get()) {
910 if(res == RCode::NoError) {
911 auto i=ret.cbegin();
912 for(; i!= ret.cend(); ++i)
913 if(i->d_type == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
914 break;
915 if(i == ret.cend())
916 (*t_pdl)->nodata(dc->d_remote, dc->d_local, dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_tcp, ret, res, &variableAnswer);
917 }
918 else if(res == RCode::NXDomain)
919 (*t_pdl)->nxdomain(dc->d_remote, dc->d_local, dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_tcp, ret, res, &variableAnswer);
920
921
922 (*t_pdl)->postresolve(dc->d_remote, dc->d_local, dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_tcp, ret, &appliedPolicy, &dc->d_policyTags, res, &variableAnswer);
923 }
924 }
925 haveAnswer:;
926 if(res == PolicyDecision::DROP) {
927 g_stats.policyDrops++;
928 delete dc;
929 dc=0;
930 return;
931 }
932 if(tracedQuery || res == PolicyDecision::PASS || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
933 {
934 string trace(sr.getTrace());
935 if(!trace.empty()) {
936 vector<string> lines;
937 boost::split(lines, trace, boost::is_any_of("\n"));
938 for(const string& line : lines) {
939 if(!line.empty())
940 L<<Logger::Warning<< line << endl;
941 }
942 }
943 }
944
945 if(res == PolicyDecision::PASS) { // XXX what does this MEAN? Why servfail on PASS?
946 pw.getHeader()->rcode=RCode::ServFail;
947 // no commit here, because no record
948 g_stats.servFails++;
949 }
950 else {
951 pw.getHeader()->rcode=res;
952
953 // Does the validation mode or query demand validation?
954 if(g_dnssecmode == DNSSECMode::ValidateAll || g_dnssecmode==DNSSECMode::ValidateForLog || (dc->d_mdp.d_header.ad && g_dnssecmode==DNSSECMode::Process)) {
955 try {
956 if(sr.doLog()) {
957 L<<Logger::Warning<<"Starting validation of answer to "<<dc->d_mdp.d_qname<<" for "<<dc->d_remote.toStringWithPort()<<endl;
958 }
959
960 auto state=validateRecords(ret);
961 if(state == Secure) {
962 if(sr.doLog()) {
963 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<" for "<<dc->d_remote.toStringWithPort()<<" validates correctly"<<endl;
964 }
965
966 // Is the query source interested in the value of the ad-bit?
967 if (dc->d_mdp.d_header.ad)
968 pw.getHeader()->ad=1;
969 }
970 else if(state == Insecure) {
971 if(sr.doLog()) {
972 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<" for "<<dc->d_remote.toStringWithPort()<<" validates as Insecure"<<endl;
973 }
974
975 pw.getHeader()->ad=0;
976 }
977 else if(state == Bogus) {
978 if(sr.doLog() || g_dnssecmode == DNSSECMode::ValidateForLog) {
979 L<<Logger::Warning<<"Answer to "<<dc->d_mdp.d_qname<<" for "<<dc->d_remote.toStringWithPort()<<" validates as Bogus"<<endl;
980 }
981
982 // Does the query or validation mode sending out a SERVFAIL on validation errors?
983 if(!pw.getHeader()->cd && (g_dnssecmode == DNSSECMode::ValidateAll || dc->d_mdp.d_header.ad)) {
984 if(sr.doLog()) {
985 L<<Logger::Warning<<"Sending out SERVFAIL for "<<dc->d_mdp.d_qname<<" because recursor or query demands it for Bogus results"<<endl;
986 }
987
988 pw.getHeader()->rcode=RCode::ServFail;
989 goto sendit;
990 } else {
991 if(sr.doLog()) {
992 L<<Logger::Warning<<"Not sending out SERVFAIL for "<<dc->d_mdp.d_qname<<" Bogus validation since neither config nor query demands this"<<endl;
993 }
994 }
995 }
996 }
997 catch(ImmediateServFailException &e) {
998 if(g_logCommonErrors)
999 L<<Logger::Notice<<"Sending SERVFAIL to "<<dc->getRemote()<<" during validation of '"<<dc->d_mdp.d_qname<<"' because: "<<e.reason<<endl;
1000 pw.getHeader()->rcode=RCode::ServFail;
1001 goto sendit;
1002 }
1003 }
1004
1005 if(ret.size()) {
1006 orderAndShuffle(ret);
1007 if(auto sl = luaconfsLocal->sortlist.getOrderCmp(dc->d_remote)) {
1008 sort(ret.begin(), ret.end(), *sl);
1009 variableAnswer=true;
1010 }
1011 }
1012 if(haveEDNS) {
1013 ret.push_back(makeOpt(edo.d_packetsize, 0, edo.d_Z));
1014 }
1015
1016 for(auto i=ret.cbegin(); i!=ret.cend(); ++i) {
1017 if(!DNSSECOK && (i->d_type == QType::RRSIG || i->d_type==QType::NSEC || i->d_type==QType::NSEC3))
1018 continue;
1019 pw.startRecord(i->d_name, i->d_type, i->d_ttl, i->d_class, i->d_place);
1020 if(i->d_type != QType::OPT) // their TTL ain't real
1021 minTTL = min(minTTL, i->d_ttl);
1022 i->d_content->toPacket(pw);
1023 if(pw.size() > maxanswersize) {
1024 pw.rollback();
1025 if(i->d_place==DNSResourceRecord::ANSWER) // only truncate if we actually omitted parts of the answer
1026 {
1027 pw.getHeader()->tc=1;
1028 pw.truncate();
1029 }
1030 goto sendit; // need to jump over pw.commit
1031 }
1032 #ifdef HAVE_PROTOBUF
1033 if(luaconfsLocal->protobufServer && protobufResponse && (i->d_type == QType::A || i->d_type == QType::AAAA)) {
1034 PBDNSMessage_DNSResponse_DNSRR* pbRR = protobufResponse->add_rrs();
1035 if(pbRR) {
1036 pbRR->set_name(i->d_name.toString());
1037 pbRR->set_type(i->d_type);
1038 pbRR->set_class_(i->d_class);
1039 pbRR->set_ttl(i->d_ttl);
1040 if (i->d_type == QType::A) {
1041 const ARecordContent& arc = dynamic_cast<const ARecordContent&>(*(i->d_content));
1042 ComboAddress data = arc.getCA();
1043 pbRR->set_rdata(&data.sin4.sin_addr.s_addr, sizeof(data.sin4.sin_addr.s_addr));
1044 }
1045 else if (i->d_type == QType::AAAA) {
1046 const AAAARecordContent& arc = dynamic_cast<const AAAARecordContent&>(*(i->d_content));
1047 ComboAddress data = arc.getCA();
1048 pbRR->set_rdata(&data.sin6.sin6_addr.s6_addr, sizeof(data.sin6.sin6_addr.s6_addr));
1049 }
1050 }
1051 }
1052 #endif
1053 }
1054 if(ret.size())
1055 pw.commit();
1056 }
1057 sendit:;
1058
1059 g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
1060 updateResponseStats(res, dc->d_remote, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
1061 #ifdef HAVE_PROTOBUF
1062 if (luaconfsLocal->protobufServer) {
1063 if (protobufResponse) {
1064 protobufResponse->set_rcode(pw.getHeader()->rcode);
1065 if (!appliedPolicy.empty()) {
1066 protobufResponse->set_appliedpolicy(appliedPolicy);
1067 }
1068 if (!dc->d_policyTags.empty()) {
1069 for(const auto tag : dc->d_policyTags) {
1070 protobufResponse->add_tags(tag);
1071 }
1072 }
1073 }
1074 protobufFillResponseFromDC(pbMessage, dc, packet.size());
1075 protobufLogResponse(luaconfsLocal->protobufServer, pbMessage);
1076 }
1077 #endif
1078 if(!dc->d_tcp) {
1079 struct msghdr msgh;
1080 struct iovec iov;
1081 char cbuf[256];
1082 fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote);
1083 msgh.msg_control=NULL;
1084
1085 if(g_fromtosockets.count(dc->d_socket)) {
1086 addCMsgSrcAddr(&msgh, cbuf, &dc->d_local, 0);
1087 }
1088 if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors)
1089 L<<Logger::Warning<<"Sending UDP reply to client "<<dc->d_remote.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
1090 if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
1091 #ifdef HAVE_PROTOBUF
1092 if (luaconfsLocal->protobufServer) {
1093 t_packetCache->insertResponsePacket(dc->d_tag, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_query,
1094 string((const char*)&*packet.begin(), packet.size()),
1095 g_now.tv_sec,
1096 pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
1097 min(minTTL,SyncRes::s_packetcachettl),
1098 &pbMessage);
1099 }
1100 else {
1101 #endif
1102 t_packetCache->insertResponsePacket(dc->d_tag, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_query,
1103 string((const char*)&*packet.begin(), packet.size()),
1104 g_now.tv_sec,
1105 pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
1106 min(minTTL,SyncRes::s_packetcachettl));
1107 #ifdef HAVE_PROTOBUF
1108 }
1109 #endif
1110 }
1111 // else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
1112 }
1113 else {
1114 char buf[2];
1115 buf[0]=packet.size()/256;
1116 buf[1]=packet.size()%256;
1117
1118 Utility::iovec iov[2];
1119
1120 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
1121 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
1122
1123 int ret=Utility::writev(dc->d_socket, iov, 2);
1124 bool hadError=true;
1125
1126 if(ret == 0)
1127 L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
1128 else if(ret < 0 )
1129 L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
1130 else if((unsigned int)ret != 2 + packet.size())
1131 L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
1132 else
1133 hadError=false;
1134
1135 // update tcp connection status, either by closing or moving to 'BYTE0'
1136
1137 if(hadError) {
1138 // no need to remove us from FDM, we weren't there
1139 dc->d_socket = -1;
1140 }
1141 else {
1142 dc->d_tcpConnection->state=TCPConnection::BYTE0;
1143 Utility::gettimeofday(&g_now, 0); // needs to be updated
1144 t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
1145 t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
1146 }
1147 }
1148
1149 if(!g_quiet) {
1150 L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
1151 L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
1152 sr.d_totUsec/1000.0<<" ms, "<<
1153 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
1154 }
1155
1156 sr.d_outqueries ? t_RC->cacheMisses++ : t_RC->cacheHits++;
1157 float spent=makeFloat(sr.d_now-dc->d_now);
1158 if(spent < 0.001)
1159 g_stats.answers0_1++;
1160 else if(spent < 0.010)
1161 g_stats.answers1_10++;
1162 else if(spent < 0.1)
1163 g_stats.answers10_100++;
1164 else if(spent < 1.0)
1165 g_stats.answers100_1000++;
1166 else
1167 g_stats.answersSlow++;
1168
1169 uint64_t newLat=(uint64_t)(spent*1000000);
1170 newLat = min(newLat,(uint64_t)(((uint64_t) g_networkTimeoutMsec)*1000)); // outliers of several minutes exist..
1171 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + (float)newLat/g_latencyStatSize;
1172 // no worries, we do this for packet cache hits elsewhere
1173 // cout<<dc->d_mdp.d_qname<<"\t"<<MT->getUsec()<<"\t"<<sr.d_outqueries<<endl;
1174 delete dc;
1175 dc=0;
1176 }
1177 catch(PDNSException &ae) {
1178 L<<Logger::Error<<"startDoResolve problem "<<makeLoginfo(dc)<<": "<<ae.reason<<endl;
1179 delete dc;
1180 }
1181 catch(MOADNSException& e) {
1182 L<<Logger::Error<<"DNS parser error "<<makeLoginfo(dc) <<": "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
1183 delete dc;
1184 }
1185 catch(std::exception& e) {
1186 L<<Logger::Error<<"STL error "<< makeLoginfo(dc)<<": "<<e.what()<<endl;
1187 delete dc;
1188 }
1189 catch(...) {
1190 L<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
1191 }
1192
1193 g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
1194 }
1195
1196 void makeControlChannelSocket(int processNum=-1)
1197 {
1198 string sockname=::arg()["socket-dir"]+"/"+s_programname;
1199 if(processNum >= 0)
1200 sockname += "."+std::to_string(processNum);
1201 sockname+=".controlsocket";
1202 s_rcc.listen(sockname);
1203
1204 int sockowner = -1;
1205 int sockgroup = -1;
1206
1207 if (!::arg().isEmpty("socket-group"))
1208 sockgroup=::arg().asGid("socket-group");
1209 if (!::arg().isEmpty("socket-owner"))
1210 sockowner=::arg().asUid("socket-owner");
1211
1212 if (sockgroup > -1 || sockowner > -1) {
1213 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
1214 unixDie("Failed to chown control socket");
1215 }
1216 }
1217
1218 // do mode change if socket-mode is given
1219 if(!::arg().isEmpty("socket-mode")) {
1220 mode_t sockmode=::arg().asMode("socket-mode");
1221 if(chmod(sockname.c_str(), sockmode) < 0) {
1222 unixDie("Failed to chmod control socket");
1223 }
1224 }
1225 }
1226
1227 static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass, Netmask* ednssubnet)
1228 {
1229 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
1230 size_t questionLen = question.length();
1231 unsigned int consumed=0;
1232 *dnsname=DNSName(question.c_str(), questionLen, sizeof(dnsheader), false, qtype, qclass, &consumed);
1233
1234 size_t pos= sizeof(dnsheader)+consumed+4;
1235 /* at least OPT root label (1), type (2), class (2) and ttl (4) + OPT RR rdlen (2)
1236 = 11 */
1237 if(ntohs(dh->arcount) == 1 && questionLen > pos + 11) { // this code can extract one (1) EDNS Subnet option
1238 /* OPT root label (1) followed by type (2) */
1239 if(question.at(pos)==0 && question.at(pos+1)==0 && question.at(pos+2)==QType::OPT) {
1240 char* ecsStart = nullptr;
1241 size_t ecsLen = 0;
1242 int res = getEDNSOption((char*)question.c_str()+pos+9, questionLen - pos - 9, EDNSOptionCode::ECS, &ecsStart, &ecsLen);
1243 if (res == 0 && ecsLen > 4) {
1244 EDNSSubnetOpts eso;
1245 if(getEDNSSubnetOptsFromString(ecsStart + 4, ecsLen - 4, &eso)) {
1246 *ednssubnet=eso.source;
1247 }
1248 }
1249 }
1250 }
1251 }
1252
1253 void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1254 {
1255 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
1256
1257 if(conn->state==TCPConnection::BYTE0) {
1258 ssize_t bytes=recv(conn->getFD(), conn->data, 2, 0);
1259 if(bytes==1)
1260 conn->state=TCPConnection::BYTE1;
1261 if(bytes==2) {
1262 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1263 conn->bytesread=0;
1264 conn->state=TCPConnection::GETQUESTION;
1265 }
1266 if(!bytes || bytes < 0) {
1267 t_fdm->removeReadFD(fd);
1268 return;
1269 }
1270 }
1271 else if(conn->state==TCPConnection::BYTE1) {
1272 ssize_t bytes=recv(conn->getFD(), conn->data+1, 1, 0);
1273 if(bytes==1) {
1274 conn->state=TCPConnection::GETQUESTION;
1275 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
1276 conn->bytesread=0;
1277 }
1278 if(!bytes || bytes < 0) {
1279 if(g_logCommonErrors)
1280 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected after first byte"<<endl;
1281 t_fdm->removeReadFD(fd);
1282 return;
1283 }
1284 }
1285 else if(conn->state==TCPConnection::GETQUESTION) {
1286 ssize_t bytes=recv(conn->getFD(), conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
1287 if(!bytes || bytes < 0 || bytes > std::numeric_limits<std::uint16_t>::max()) {
1288 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected while reading question body"<<endl;
1289 t_fdm->removeReadFD(fd);
1290 return;
1291 }
1292 conn->bytesread+=(uint16_t)bytes;
1293 if(conn->bytesread==conn->qlen) {
1294 t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
1295
1296 DNSComboWriter* dc=0;
1297 try {
1298 dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
1299 }
1300 catch(MOADNSException &mde) {
1301 g_stats.clientParseError++;
1302 if(g_logCommonErrors)
1303 L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toString() <<endl;
1304 return;
1305 }
1306 dc->d_tcpConnection = conn; // carry the torch
1307 dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
1308 dc->d_tcp=true;
1309 dc->setRemote(&conn->d_remote);
1310 ComboAddress dest;
1311 memset(&dest, 0, sizeof(dest));
1312 dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
1313 socklen_t len = dest.getSocklen();
1314 getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
1315 dc->setLocal(dest);
1316 #ifdef HAVE_PROTOBUF
1317 auto luaconfsLocal = g_luaconfs.getLocal();
1318
1319 if(luaconfsLocal->protobufServer) {
1320 dc->d_uuid = (*t_uuidGenerator)();
1321
1322 try {
1323 DNSName qname;
1324 uint16_t qtype;
1325 uint16_t qclass;
1326 Netmask ednssubnet;
1327 const struct dnsheader* dh = (const struct dnsheader*) conn->data;
1328
1329 getQNameAndSubnet(std::string(conn->data, conn->qlen), &qname, &qtype, &qclass, &ednssubnet);
1330 dc->d_ednssubnet = ednssubnet;
1331
1332 protobufLogQuery(luaconfsLocal->protobufServer, dc->d_uuid, dest, conn->d_remote, ednssubnet, true, dh->id, conn->qlen, qname, qtype, qclass, std::string(), std::vector<std::string>());
1333 }
1334 catch(std::exception& e) {
1335 if(g_logCommonErrors)
1336 L<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
1337 }
1338 }
1339 #endif
1340 if(dc->d_mdp.d_header.qr) {
1341 delete dc;
1342 g_stats.ignoredCount++;
1343 L<<Logger::Error<<"Ignoring answer from TCP client "<< conn->d_remote.toString() <<" on server socket!"<<endl;
1344 return;
1345 }
1346 if(dc->d_mdp.d_header.opcode) {
1347 delete dc;
1348 g_stats.ignoredCount++;
1349 L<<Logger::Error<<"Ignoring non-query opcode from TCP client "<< conn->d_remote.toString() <<" on server socket!"<<endl;
1350 return;
1351 }
1352 else {
1353 ++g_stats.qcounter;
1354 ++g_stats.tcpqcounter;
1355 MT->makeThread(startDoResolve, dc); // deletes dc, will set state to BYTE0 again
1356 return;
1357 }
1358 }
1359 }
1360 }
1361
1362 //! Handle new incoming TCP connection
1363 void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
1364 {
1365 ComboAddress addr;
1366 socklen_t addrlen=sizeof(addr);
1367 int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
1368 if(newsock>=0) {
1369 if(MT->numProcesses() > g_maxMThreads) {
1370 g_stats.overCapacityDrops++;
1371 closesocket(newsock);
1372 return;
1373 }
1374
1375 if(t_remotes)
1376 t_remotes->push_back(addr);
1377 if(t_allowFrom && !t_allowFrom->match(&addr)) {
1378 if(!g_quiet)
1379 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
1380
1381 g_stats.unauthorizedTCP++;
1382 closesocket(newsock);
1383 return;
1384 }
1385 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
1386 g_stats.tcpClientOverflow++;
1387 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
1388 return;
1389 }
1390
1391 setNonBlocking(newsock);
1392 shared_ptr<TCPConnection> tc(new TCPConnection(newsock, addr));
1393 tc->state=TCPConnection::BYTE0;
1394
1395 t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
1396
1397 struct timeval now;
1398 Utility::gettimeofday(&now, 0);
1399 t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
1400 }
1401 }
1402
// Process one UDP query: answer it straight from the packet cache when
// possible, otherwise start an mthread running startDoResolve for it.
//
// @param question  raw query packet
// @param fromaddr  client address the query came from
// @param destaddr  local address the query was received on
// @param tv        receive timestamp; when tv.tv_sec is non-zero and the query
//                  is more than 1000 ms old it is dropped (tooOldDrops)
// @param fd        socket a packet-cache answer is sent on
// @return          always 0; the string* return type lets handleNewUDPQuestion()
//                  run this through distributeAsyncFunction()
//
// NOTE(review): `dh` points into `question` and is dereferenced (ipfilter,
// protobuf logging) without a length check in this function, so callers must
// make sure the packet holds at least a full dnsheader. Exceptions from the
// DNSComboWriter constructor near the end are not caught here either.
1403 string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, struct timeval tv, int fd)
1404 {
1405 gettimeofday(&g_now, 0);
1406 struct timeval diff = g_now - tv;
1407 double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
1408
// queueing delay in milliseconds; only enforced when the kernel supplied a timestamp
1409 if(tv.tv_sec && delta > 1000.0) {
1410 g_stats.tooOldDrops++;
1411 return 0;
1412 }
1413
1414 ++g_stats.qcounter;
1415 if(fromaddr.sin4.sin_family==AF_INET6)
1416 g_stats.ipv6qcounter++;
1417
1418 string response;
1419 const struct dnsheader* dh = (struct dnsheader*)question.c_str();
1420 unsigned int ctag=0;
1421 bool needECS = false;
1422 std::vector<std::string> policyTags;
1423 #ifdef HAVE_PROTOBUF
1424 boost::uuids::uuid uniqueId;
1425 PBDNSMessage pbMessage;
1426 auto luaconfsLocal = g_luaconfs.getLocal();
1427 if (luaconfsLocal->protobufServer) {
1428 needECS = true;
1429 uniqueId = (*t_uuidGenerator)();
1430 }
1431 #endif
1432 Netmask ednssubnet;
1433 try {
1434 DNSName qname;
1435 uint16_t qtype=0;
1436 uint16_t qclass=0;
1437 uint32_t age;
1438 #ifdef MALLOC_TRACE
1439 /*
1440 static uint64_t last=0;
1441 if(!last)
1442 g_mtracer->clearAllocators();
1443 cout<<g_mtracer->getAllocs()-last<<" "<<g_mtracer->getNumOut()<<" -- BEGIN TRACE"<<endl;
1444 last=g_mtracer->getAllocs();
1445 cout<<g_mtracer->topAllocatorsString()<<endl;
1446 g_mtracer->clearAllocators();
1447 */
1448 #endif
1449
// The question is only parsed when something needs qname/qtype/ECS: protobuf
// logging (needECS) or the Lua gettag() hook, whose result becomes the
// packet-cache tag. Parse or hook failures fall back to tag 0.
1450 if(needECS || (t_pdl->get() && (*t_pdl)->d_gettag)) {
1451 try {
1452 getQNameAndSubnet(question, &qname, &qtype, &qclass, &ednssubnet);
1453
1454 if(t_pdl->get() && (*t_pdl)->d_gettag) {
1455 try {
1456 ctag=(*t_pdl)->gettag(fromaddr, ednssubnet, destaddr, qname, qtype, &policyTags);
1457 }
1458 catch(std::exception& e) {
1459 if(g_logCommonErrors)
1460 L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
1461 }
1462 }
1463 }
1464 catch(std::exception& e)
1465 {
1466 if(g_logCommonErrors)
1467 L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
1468 }
1469 }
1470
// Packet-cache lookup keyed on (ctag, question). With protobuf enabled the
// query is logged first and a cached protobuf response is re-emitted on a hit.
// Note the if/else braces below are split across the #ifdef/#endif lines.
1471 bool cacheHit = false;
1472 #ifdef HAVE_PROTOBUF
1473 if(luaconfsLocal->protobufServer) {
1474 protobufLogQuery(luaconfsLocal->protobufServer, uniqueId, fromaddr, destaddr, ednssubnet, false, dh->id, question.size(), qname, qtype, qclass, std::string(), policyTags);
1475
1476 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, g_now.tv_sec, &response, &age, &pbMessage));
1477 if (cacheHit) {
1478 protobufUpdateMessage(pbMessage, uniqueId, fromaddr, destaddr, ednssubnet, false, dh->id);
1479 protobufLogResponse(luaconfsLocal->protobufServer, pbMessage);
1480 }
1481 }
1482 else {
1483 #endif
1484 cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, g_now.tv_sec, &response, &age));
1485 #ifdef HAVE_PROTOBUF
1486 }
1487 #endif
// Cache hit: age the stored answer, send it from destaddr when the socket
// supports per-packet source selection, update stats and finish here.
1488 if (cacheHit) {
1489 if(!g_quiet)
1490 L<<Logger::Notice<<t_id<< " question answered from packet cache tag="<<ctag<<" from "<<fromaddr.toString()<<endl;
1491
1492 g_stats.packetCacheHits++;
1493 SyncRes::s_queries++;
1494 ageDNSPacket(response, age);
1495 struct msghdr msgh;
1496 struct iovec iov;
1497 char cbuf[256];
1498 fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)response.c_str(), response.length(), const_cast<ComboAddress*>(&fromaddr));
1499 msgh.msg_control=NULL;
1500
1501 if(g_fromtosockets.count(fd)) {
1502 addCMsgSrcAddr(&msgh, cbuf, &destaddr, 0);
1503 }
1504 if(sendmsg(fd, &msgh, 0) < 0 && g_logCommonErrors)
1505 L<<Logger::Warning<<"Sending UDP reply to client "<<fromaddr.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
1506
1507 if(response.length() >= sizeof(struct dnsheader)) {
1508 struct dnsheader dh;
1509 memcpy(&dh, response.c_str(), sizeof(dh));
1510 updateResponseStats(dh.rcode, fromaddr, response.length(), 0, 0);
1511 }
1512 g_stats.avgLatencyUsec=(1-1.0/g_latencyStatSize)*g_stats.avgLatencyUsec + 0.0; // we assume 0 usec
1513 return 0;
1514 }
1515 }
1516 catch(std::exception& e) {
1517 L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
1518 return 0;
1519 }
1520
// Not answered from cache: the Lua ipfilter hook may still drop the query.
1521 if(t_pdl->get()) {
1522 if((*t_pdl)->ipfilter(fromaddr, destaddr, *dh)) {
1523 if(!g_quiet)
1524 L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<fromaddr.toStringWithPort()<<" based on policy"<<endl;
1525 g_stats.policyDrops++;
1526 return 0;
1527 }
1528 }
1529
1530 if(MT->numProcesses() > g_maxMThreads) {
1531 if(!g_quiet)
1532 L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED question from "<<fromaddr.toStringWithPort()<<", over capacity"<<endl;
1533
1534 g_stats.overCapacityDrops++;
1535 return 0;
1536 }
1537
// Ownership of dc passes to the new mthread (startDoResolve deletes it).
1538 DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
1539 dc->setSocket(fd);
1540 dc->d_tag=ctag;
1541 dc->d_query = question;
1542 dc->setRemote(&fromaddr);
1543 dc->setLocal(destaddr);
1544 dc->d_tcp=false;
1545 dc->d_policyTags = policyTags;
1546 #ifdef HAVE_PROTOBUF
1547 dc->d_uuid = uniqueId;
1548 dc->d_ednssubnet = ednssubnet;
1549 #endif
1550
1551 MT->makeThread(startDoResolve, (void*) dc); // deletes dc
1552 return 0;
1553 }
1554
1555
1556 void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
1557 {
1558 ssize_t len;
1559 char data[1500];
1560 ComboAddress fromaddr;
1561 struct msghdr msgh;
1562 struct iovec iov;
1563 char cbuf[256];
1564
1565 fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
1566 fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), data, sizeof(data), &fromaddr);
1567
1568 for(;;)
1569 if((len=recvmsg(fd, &msgh, 0)) >= 0) {
1570 if(t_remotes)
1571 t_remotes->push_back(fromaddr);
1572
1573 if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
1574 if(!g_quiet)
1575 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
1576
1577 g_stats.unauthorizedUDP++;
1578 return;
1579 }
1580 BOOST_STATIC_ASSERT(offsetof(sockaddr_in, sin_port) == offsetof(sockaddr_in6, sin6_port));
1581 if(!fromaddr.sin4.sin_port) { // also works for IPv6
1582 if(!g_quiet)
1583 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toStringWithPort()<<", can't deal with port 0"<<endl;
1584
1585 g_stats.clientParseError++; // not quite the best place to put it, but needs to go somewhere
1586 return;
1587 }
1588 try {
1589 dnsheader* dh=(dnsheader*)data;
1590
1591 if(dh->qr) {
1592 g_stats.ignoredCount++;
1593 if(g_logCommonErrors)
1594 L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
1595 }
1596 else if(dh->opcode) {
1597 g_stats.ignoredCount++;
1598 if(g_logCommonErrors)
1599 L<<Logger::Error<<"Ignoring non-query opcode "<<dh->opcode<<" from "<<fromaddr.toString()<<" on server socket!"<<endl;
1600 }
1601 else {
1602 string question(data, (size_t)len);
1603 struct timeval tv={0,0};
1604 HarvestTimestamp(&msgh, &tv);
1605 ComboAddress dest;
1606 memset(&dest, 0, sizeof(dest)); // this makes sure we igore this address if not returned by recvmsg above
1607 auto loc = rplookup(g_listenSocketsAddresses, fd);
1608 if(HarvestDestinationAddress(&msgh, &dest)) {
1609 // but.. need to get port too
1610 if(loc)
1611 dest.sin4.sin_port = loc->sin4.sin_port;
1612 }
1613 else {
1614 if(loc) {
1615 dest = *loc;
1616 }
1617 else {
1618 dest.sin4.sin_family = fromaddr.sin4.sin_family;
1619 socklen_t slen = dest.getSocklen();
1620 getsockname(fd, (sockaddr*)&dest, &slen); // if this fails, we're ok with it
1621 }
1622 }
1623 if(g_weDistributeQueries)
1624 distributeAsyncFunction(question, boost::bind(doProcessUDPQuestion, question, fromaddr, dest, tv, fd));
1625 else
1626 doProcessUDPQuestion(question, fromaddr, dest, tv, fd);
1627 }
1628 }
1629 catch(MOADNSException& mde) {
1630 g_stats.clientParseError++;
1631 if(g_logCommonErrors)
1632 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
1633 }
1634 catch(std::runtime_error& e) {
1635 g_stats.clientParseError++;
1636 if(g_logCommonErrors)
1637 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<e.what()<<endl;
1638 }
1639 }
1640 else {
1641 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
1642 if(errno == EAGAIN)
1643 g_stats.noPacketError++;
1644 break;
1645 }
1646 }
1647
1648
1649 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
1650 deferredAdd_t deferredAdd;
1651
1652 void makeTCPServerSockets()
1653 {
1654 int fd;
1655 vector<string>locals;
1656 stringtok(locals,::arg()["local-address"]," ,");
1657
1658 if(locals.empty())
1659 throw PDNSException("No local address specified");
1660
1661 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
1662 ServiceTuple st;
1663 st.port=::arg().asNum("local-port");
1664 parseService(*i, st);
1665
1666 ComboAddress sin;
1667
1668 memset((char *)&sin,0, sizeof(sin));
1669 sin.sin4.sin_family = AF_INET;
1670 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
1671 sin.sin6.sin6_family = AF_INET6;
1672 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
1673 throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
1674 }
1675
1676 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
1677 if(fd<0)
1678 throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
1679
1680 setCloseOnExec(fd);
1681
1682 int tmp=1;
1683 if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
1684 L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
1685 exit(1);
1686 }
1687 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
1688 L<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
1689 }
1690
1691 #ifdef TCP_DEFER_ACCEPT
1692 if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
1693 if(i==locals.begin())
1694 L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
1695 }
1696 #endif
1697
1698 if( ::arg().mustDo("non-local-bind") )
1699 Utility::setBindAny(AF_INET, fd);
1700
1701 #ifdef SO_REUSEPORT
1702 if(::arg().mustDo("reuseport")) {
1703 int one=1;
1704 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
1705 throw PDNSException("SO_REUSEPORT: "+stringerror());
1706 }
1707 #endif
1708
1709 sin.sin4.sin_port = htons(st.port);
1710 socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
1711 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
1712 throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
1713
1714 setNonBlocking(fd);
1715 setSocketSendBuffer(fd, 65000);
1716 listen(fd, 128);
1717 deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
1718 g_tcpListenSockets.push_back(fd);
1719 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
1720 // - fd is not that which we know here, but returned from accept()
1721 if(sin.sin4.sin_family == AF_INET)
1722 L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
1723 else
1724 L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
1725 }
1726 }
1727
1728 void makeUDPServerSockets()
1729 {
1730 int one=1;
1731 vector<string>locals;
1732 stringtok(locals,::arg()["local-address"]," ,");
1733
1734 if(locals.empty())
1735 throw PDNSException("No local address specified");
1736
1737 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
1738 ServiceTuple st;
1739 st.port=::arg().asNum("local-port");
1740 parseService(*i, st);
1741
1742 ComboAddress sin;
1743
1744 memset(&sin, 0, sizeof(sin));
1745 sin.sin4.sin_family = AF_INET;
1746 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
1747 sin.sin6.sin6_family = AF_INET6;
1748 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
1749 throw PDNSException("Unable to resolve local address for UDP server on '"+ st.host +"'");
1750 }
1751
1752 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
1753 if(fd < 0) {
1754 throw PDNSException("Making a UDP server socket for resolver: "+netstringerror());
1755 }
1756 if (!setSocketTimestamps(fd))
1757 L<<Logger::Warning<<"Unable to enable timestamp reporting for socket"<<endl;
1758
1759 if(IsAnyAddress(sin)) {
1760 if(sin.sin4.sin_family == AF_INET)
1761 if(!setsockopt(fd, IPPROTO_IP, GEN_IP_PKTINFO, &one, sizeof(one))) // linux supports this, so why not - might fail on other systems
1762 g_fromtosockets.insert(fd);
1763 #ifdef IPV6_RECVPKTINFO
1764 if(sin.sin4.sin_family == AF_INET6)
1765 if(!setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)))
1766 g_fromtosockets.insert(fd);
1767 #endif
1768 if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) < 0) {
1769 L<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(errno)<<endl;
1770 }
1771 }
1772 if( ::arg().mustDo("non-local-bind") )
1773 Utility::setBindAny(AF_INET6, fd);
1774
1775 setCloseOnExec(fd);
1776
1777 setSocketReceiveBuffer(fd, 250000);
1778 sin.sin4.sin_port = htons(st.port);
1779
1780
1781 #ifdef SO_REUSEPORT
1782 if(::arg().mustDo("reuseport")) {
1783 int one=1;
1784 if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
1785 throw PDNSException("SO_REUSEPORT: "+stringerror());
1786 }
1787 #endif
1788 socklen_t socklen=sin.getSocklen();
1789 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
1790 throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
1791
1792 setNonBlocking(fd);
1793
1794 deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
1795 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
1796 if(sin.sin4.sin_family == AF_INET)
1797 L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
1798 else
1799 L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
1800 }
1801 }
1802
1803
1804 void daemonize(void)
1805 {
1806 if(fork())
1807 exit(0); // bye bye
1808
1809 setsid();
1810
1811 int i=open("/dev/null",O_RDWR); /* open stdin */
1812 if(i < 0)
1813 L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
1814 else {
1815 dup2(i,0); /* stdin */
1816 dup2(i,1); /* stderr */
1817 dup2(i,2); /* stderr */
1818 close(i);
1819 }
1820 }
1821
1822 AtomicCounter counter;
1823 bool statsWanted;
1824
1825 void usr1Handler(int)
1826 {
1827 statsWanted=true;
1828 }
1829
1830 void usr2Handler(int)
1831 {
1832 g_quiet= !g_quiet;
1833 SyncRes::setDefaultLogMode(g_quiet ? SyncRes::LogNone : SyncRes::Log);
1834 ::arg().set("quiet")=g_quiet ? "" : "no";
1835 }
1836
1837 void doStats(void)
1838 {
1839 static time_t lastOutputTime;
1840 static uint64_t lastQueryCount;
1841
1842 uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
1843 uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
1844
1845 if(g_stats.qcounter && (cacheHits + cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {
1846 L<<Logger::Notice<<"stats: "<<g_stats.qcounter<<" questions, "<<
1847 broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
1848 broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
1849 (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;
1850
1851 L<<Logger::Notice<<"stats: throttle map: "
1852 << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
1853 << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl;
1854 L<<Logger::Notice<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
1855 L<<Logger::Notice<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
1856 <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
1857 L<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
1858 broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
1859
1860 //L<<Logger::Notice<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
1861 //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
1862
1863 L<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
1864 " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
1865
1866 time_t now = time(0);
1867 if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
1868 L<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
1869 }
1870 lastOutputTime = now;
1871 lastQueryCount = SyncRes::s_queries;
1872 }
1873 else if(statsWanted)
1874 L<<Logger::Notice<<"stats: no stats yet!"<<endl;
1875
1876 statsWanted=false;
1877 }
1878
1879 static void houseKeeping(void *)
1880 {
1881 static __thread time_t last_stat, last_rootupdate, last_prune, last_secpoll;
1882 static __thread int cleanCounter=0;
1883 static __thread bool s_running; // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
1884 try {
1885 if(s_running)
1886 return;
1887 s_running=true;
1888
1889 struct timeval now;
1890 Utility::gettimeofday(&now, 0);
1891
1892 if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
1893 DTime dt;
1894 dt.setTimeval(now);
1895 t_RC->doPrune(); // this function is local to a thread, so fine anyhow
1896 t_packetCache->doPruneTo(::arg().asNum("max-packetcache-entries") / g_numWorkerThreads);
1897
1898 pruneCollection(t_sstorage->negcache, ::arg().asNum("max-cache-entries") / (g_numWorkerThreads * 10), 200);
1899
1900 if(!((cleanCounter++)%40)) { // this is a full scan!
1901 time_t limit=now.tv_sec-300;
1902 for(SyncRes::nsspeeds_t::iterator i = t_sstorage->nsSpeeds.begin() ; i!= t_sstorage->nsSpeeds.end(); )
1903 if(i->second.stale(limit))
1904 t_sstorage->nsSpeeds.erase(i++);
1905 else
1906 ++i;
1907 }
1908 last_prune=time(0);
1909 }
1910
1911 if(now.tv_sec - last_rootupdate > 7200) {
1912 SyncRes sr(now);
1913 sr.setDoEDNS0(true);
1914 vector<DNSRecord> ret;
1915
1916 sr.setNoCache();
1917 int res=-1;
1918 try {
1919 res=sr.beginResolve(DNSName("."), QType(QType::NS), 1, ret);
1920 }
1921 catch(PDNSException& e)
1922 {
1923 L<<Logger::Error<<"Failed to update . records, got an exception: "<<e.reason<<endl;
1924 }
1925
1926 catch(std::exception& e)
1927 {
1928 L<<Logger::Error<<"Failed to update . records, got an exception: "<<e.what()<<endl;
1929 }
1930
1931 catch(...)
1932 {
1933 L<<Logger::Error<<"Failed to update . records, got an exception"<<endl;
1934 }
1935 if(!res) {
1936 L<<Logger::Notice<<"Refreshed . records"<<endl;
1937 last_rootupdate=now.tv_sec;
1938 }
1939 else
1940 L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
1941 }
1942
1943 if(!t_id) {
1944 if(now.tv_sec - last_stat >= 1800) {
1945 doStats();
1946 last_stat=time(0);
1947 }
1948
1949 if(now.tv_sec - last_secpoll >= 3600) {
1950 try {
1951 doSecPoll(&last_secpoll);
1952 }
1953 catch(...) {}
1954 }
1955 }
1956 s_running=false;
1957 }
1958 catch(PDNSException& ae)
1959 {
1960 s_running=false;
1961 L<<Logger::Error<<"Fatal error in housekeeping thread: "<<ae.reason<<endl;
1962 throw;
1963 }
1964 }
1965
1966 void makeThreadPipes()
1967 {
1968 for(unsigned int n=0; n < g_numThreads; ++n) {
1969 struct ThreadPipeSet tps;
1970 int fd[2];
1971 if(pipe(fd) < 0)
1972 unixDie("Creating pipe for inter-thread communications");
1973
1974 tps.readToThread = fd[0];
1975 tps.writeToThread = fd[1];
1976
1977 if(pipe(fd) < 0)
1978 unixDie("Creating pipe for inter-thread communications");
1979 tps.readFromThread = fd[0];
1980 tps.writeFromThread = fd[1];
1981
1982 g_pipes.push_back(tps);
1983 }
1984 }
1985
1986 struct ThreadMSG
1987 {
1988 pipefunc_t func;
1989 bool wantAnswer;
1990 };
1991
1992 void broadcastFunction(const pipefunc_t& func, bool skipSelf)
1993 {
1994 unsigned int n = 0;
1995 for(ThreadPipeSet& tps : g_pipes)
1996 {
1997 if(n++ == t_id) {
1998 if(!skipSelf)
1999 func(); // don't write to ourselves!
2000 continue;
2001 }
2002
2003 ThreadMSG* tmsg = new ThreadMSG();
2004 tmsg->func = func;
2005 tmsg->wantAnswer = true;
2006 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2007 delete tmsg;
2008 unixDie("write to thread pipe returned wrong size or error");
2009 }
2010
2011 string* resp;
2012 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2013 unixDie("read from thread pipe returned wrong size or error");
2014
2015 if(resp) {
2016 // cerr <<"got response: " << *resp << endl;
2017 delete resp;
2018 }
2019 }
2020 }
2021
2022 static uint32_t g_disthashseed;
2023 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
2024 {
2025 unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
2026 unsigned int target = 1 + (hash % (g_pipes.size()-1));
2027
2028 if(target == t_id) {
2029 func();
2030 return;
2031 }
2032 ThreadPipeSet& tps = g_pipes[target];
2033 ThreadMSG* tmsg = new ThreadMSG();
2034 tmsg->func = func;
2035 tmsg->wantAnswer = false;
2036
2037 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2038 delete tmsg;
2039 unixDie("write to thread pipe returned wrong size or error");
2040 }
2041 }
2042
2043 void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
2044 {
2045 ThreadMSG* tmsg;
2046
2047 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
2048 unixDie("read from thread pipe returned wrong size or error");
2049 }
2050
2051 void *resp=0;
2052 try {
2053 resp = tmsg->func();
2054 }
2055 catch(std::exception& e) {
2056 if(g_logCommonErrors)
2057 L<<Logger::Error<<"PIPE function we executed created exception: "<<e.what()<<endl; // but what if they wanted an answer.. we send 0
2058 }
2059 catch(PDNSException& e) {
2060 if(g_logCommonErrors)
2061 L<<Logger::Error<<"PIPE function we executed created PDNS exception: "<<e.reason<<endl; // but what if they wanted an answer.. we send 0
2062 }
2063 if(tmsg->wantAnswer)
2064 if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp))
2065 unixDie("write to thread pipe returned wrong size or error");
2066
2067 delete tmsg;
2068 }
2069
2070 template<class T> void *voider(const boost::function<T*()>& func)
2071 {
2072 return func();
2073 }
2074
2075 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
2076 {
2077 a.insert(a.end(), b.begin(), b.end());
2078 return a;
2079 }
2080
/* Append all (name, number) pairs from 'b' to the end of 'a', preserving
   order, and return 'a'. */
vector<pair<string, uint16_t> >& operator+=(vector<pair<string, uint16_t> >&a, const vector<pair<string, uint16_t> >& b)
{
  // Take the count up front so the number of copied elements is fixed before
  // 'a' starts growing.
  const size_t count = b.size();
  for(size_t i = 0; i < count; ++i)
    a.push_back(b[i]);
  return a;
}
2086
2087 vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a, const vector<pair<DNSName, uint16_t> >& b)
2088 {
2089 a.insert(a.end(), b.begin(), b.end());
2090 return a;
2091 }
2092
2093
2094 template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
2095 {
2096 unsigned int n = 0;
2097 T ret=T();
2098 for(ThreadPipeSet& tps : g_pipes)
2099 {
2100 if(n++ == t_id) {
2101 if(!skipSelf) {
2102 T* resp = (T*)func(); // don't write to ourselves!
2103 if(resp) {
2104 //~ cerr <<"got direct: " << *resp << endl;
2105 ret += *resp;
2106 delete resp;
2107 }
2108 }
2109 continue;
2110 }
2111
2112 ThreadMSG* tmsg = new ThreadMSG();
2113 tmsg->func = boost::bind(voider<T>, func);
2114 tmsg->wantAnswer = true;
2115
2116 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
2117 delete tmsg;
2118 unixDie("write to thread pipe returned wrong size or error");
2119 }
2120
2121 T* resp;
2122 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
2123 unixDie("read from thread pipe returned wrong size or error");
2124
2125 if(resp) {
2126 //~ cerr <<"got response: " << *resp << endl;
2127 ret += *resp;
2128 delete resp;
2129 }
2130 }
2131 return ret;
2132 }
2133
2134 template string broadcastAccFunction(const boost::function<string*()>& fun, bool skipSelf); // explicit instantiation
2135 template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun, bool skipSelf); // explicit instantiation
2136 template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun, bool skipSelf); // explicit instantiation
2137 template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun, bool skipSelf); // explicit instantiation
2138
2139 void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
2140 {
2141 string remote;
2142 string msg=s_rcc.recv(&remote);
2143 RecursorControlParser rcp;
2144 RecursorControlParser::func_t* command;
2145
2146 string answer=rcp.getAnswer(msg, &command);
2147
2148 // If we are inside a chroot, we need to strip
2149 if (!arg()["chroot"].empty()) {
2150 size_t len = arg()["chroot"].length();
2151 remote = remote.substr(len);
2152 }
2153
2154 try {
2155 s_rcc.send(answer, &remote);
2156 command();
2157 }
2158 catch(std::exception& e) {
2159 L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
2160 }
2161 catch(PDNSException& ae) {
2162 L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
2163 }
2164 }
2165
2166 void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
2167 {
2168 PacketID* pident=any_cast<PacketID>(&var);
2169 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
2170
2171 shared_array<char> buffer(new char[pident->inNeeded]);
2172
2173 ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
2174 if(ret > 0) {
2175 pident->inMSG.append(&buffer[0], &buffer[ret]);
2176 pident->inNeeded-=(size_t)ret;
2177 if(!pident->inNeeded || pident->inIncompleteOkay) {
2178 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
2179 PacketID pid=*pident;
2180 string msg=pident->inMSG;
2181
2182 t_fdm->removeReadFD(fd);
2183 MT->sendEvent(pid, &msg);
2184 }
2185 else {
2186 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
2187 }
2188 }
2189 else {
2190 PacketID tmp=*pident;
2191 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
2192 string empty;
2193 MT->sendEvent(tmp, &empty); // this conveys error status
2194 }
2195 }
2196
2197 void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
2198 {
2199 PacketID* pid=any_cast<PacketID>(&var);
2200 ssize_t ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
2201 if(ret > 0) {
2202 pid->outPos+=(ssize_t)ret;
2203 if(pid->outPos==pid->outMSG.size()) {
2204 PacketID tmp=*pid;
2205 t_fdm->removeWriteFD(fd);
2206 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
2207 }
2208 }
2209 else { // error or EOF
2210 PacketID tmp(*pid);
2211 t_fdm->removeWriteFD(fd);
2212 string sent;
2213 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
2214 }
2215 }
2216
2217 // resend event to everybody chained onto it
2218 void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
2219 {
2220 if(iter->key.chain.empty())
2221 return;
2222 // cerr<<"doResends called!\n";
2223 for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
2224 resend.fd=-1;
2225 resend.id=*i;
2226 // cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
2227
2228 MT->sendEvent(resend, &content);
2229 g_stats.chainResends++;
2230 }
2231 }
2232
2233 void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
2234 {
2235 PacketID pid=any_cast<PacketID>(var);
2236 ssize_t len;
2237 char data[g_outgoingEDNSBufsize];
2238 ComboAddress fromaddr;
2239 socklen_t addrlen=sizeof(fromaddr);
2240
2241 len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
2242
2243 if(len < (ssize_t) sizeof(dnsheader)) {
2244 if(len < 0)
2245 ; // cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
2246 else {
2247 g_stats.serverParseError++;
2248 if(g_logCommonErrors)
2249 L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
2250 ": packet smaller than DNS header"<<endl;
2251 }
2252
2253 t_udpclientsocks->returnSocket(fd);
2254 string empty;
2255
2256 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
2257 if(iter != MT->d_waiters.end())
2258 doResends(iter, pid, empty);
2259
2260 MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
2261 return;
2262 }
2263
2264 dnsheader dh;
2265 memcpy(&dh, data, sizeof(dh));
2266
2267 PacketID pident;
2268 pident.remote=fromaddr;
2269 pident.id=dh.id;
2270 pident.fd=fd;
2271
2272 if(!dh.qr && g_logCommonErrors) {
2273 L<<Logger::Notice<<"Not taking data from question on outgoing socket from "<< fromaddr.toStringWithPort() <<endl;
2274 }
2275
2276 if(!dh.qdcount || // UPC, Nominum, very old BIND on FormErr, NSD
2277 !dh.qr) { // one weird server
2278 pident.domain.clear();
2279 pident.type = 0;
2280 }
2281 else {
2282 try {
2283 if(len > 12)
2284 pident.domain=DNSName(data, len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
2285 }
2286 catch(std::exception& e) {
2287 g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
2288 L<<Logger::Warning<<"Error in packet from remote nameserver "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
2289 return;
2290 }
2291 }
2292 string packet;
2293 packet.assign(data, len);
2294
2295 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
2296 if(iter != MT->d_waiters.end()) {
2297 doResends(iter, pident, packet);
2298 }
2299
2300 retryWithName:
2301
2302 if(!MT->sendEvent(pident, &packet)) {
2303 // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
2304 for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
2305 if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote && mthread->key.type == pident.type &&
2306 pident.domain == mthread->key.domain) {
2307 mthread->key.nearMisses++;
2308 }
2309
2310 // be a bit paranoid here since we're weakening our matching
2311 if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
2312 pident.id == mthread->key.id && mthread->key.remote == pident.remote) {
2313 // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
2314 pident.domain = mthread->key.domain;
2315 pident.type = mthread->key.type;
2316 goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
2317 }
2318 }
2319 g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
2320 if(g_logCommonErrors) {
2321 L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<< (pident.domain.empty() ? "<empty>" : pident.domain.toString())<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
2322 }
2323 }
2324 else if(fd >= 0) {
2325 t_udpclientsocks->returnSocket(fd);
2326 }
2327 }
2328
2329 FDMultiplexer* getMultiplexer()
2330 {
2331 FDMultiplexer* ret;
2332 for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
2333 i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
2334 try {
2335 ret=i->second();
2336 return ret;
2337 }
2338 catch(FDMultiplexerException &fe) {
2339 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
2340 }
2341 catch(...) {
2342 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
2343 }
2344 }
2345 L<<Logger::Error<<"No working multiplexer found!"<<endl;
2346 exit(1);
2347 }
2348
2349
2350 string* doReloadLuaScript()
2351 {
2352 string fname= ::arg()["lua-dns-script"];
2353 try {
2354 if(fname.empty()) {
2355 t_pdl->reset();
2356 L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
2357 return new string("unloaded\n");
2358 }
2359 else {
2360 *t_pdl = shared_ptr<RecursorLua4>(new RecursorLua4(fname));
2361 }
2362 }
2363 catch(std::exception& e) {
2364 L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
2365 return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
2366 }
2367
2368 L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
2369 return new string("(re)loaded '"+fname+"'\n");
2370 }
2371
2372 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
2373 {
2374 if(begin != end)
2375 ::arg().set("lua-dns-script") = *begin;
2376
2377 return broadcastAccFunction<string>(doReloadLuaScript);
2378 }
2379
2380 string* pleaseUseNewTraceRegex(const std::string& newRegex)
2381 try
2382 {
2383 if(newRegex.empty()) {
2384 t_traceRegex->reset();
2385 return new string("unset\n");
2386 }
2387 else {
2388 (*t_traceRegex) = shared_ptr<Regex>(new Regex(newRegex));
2389 return new string("ok\n");
2390 }
2391 }
2392 catch(PDNSException& ae)
2393 {
2394 return new string(ae.reason+"\n");
2395 }
2396
2397 string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
2398 {
2399 return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
2400 }
2401
2402 static void checkLinuxIPv6Limits()
2403 {
2404 #ifdef __linux__
2405 string line;
2406 if(readFileIfThere("/proc/sys/net/ipv6/route/max_size", &line)) {
2407 int lim=std::stoi(line);
2408 if(lim < 16384) {
2409 L<<Logger::Error<<"If using IPv6, please raise sysctl net.ipv6.route.max_size, currently set to "<<lim<<" which is < 16384"<<endl;
2410 }
2411 }
2412 #endif
2413 }
2414 static void checkOrFixFDS()
2415 {
2416 unsigned int availFDs=getFilenumLimit();
2417 unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
2418
2419 if(wantFDs > availFDs) {
2420 unsigned int hardlimit= getFilenumLimit(true);
2421 if(hardlimit >= wantFDs) {
2422 setFilenumLimit(wantFDs);
2423 L<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
2424 }
2425 else {
2426 int newval = (hardlimit - 25) / g_numWorkerThreads;
2427 L<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
2428 g_maxMThreads = newval;
2429 setFilenumLimit(hardlimit);
2430 }
2431 }
2432 }
2433
2434 void* recursorThread(void*);
2435
2436 void* pleaseSupplantACLs(NetmaskGroup *ng)
2437 {
2438 t_allowFrom = ng;
2439 return 0;
2440 }
2441
2442 int g_argc;
2443 char** g_argv;
2444
2445 void parseACLs()
2446 {
2447 static bool l_initialized;
2448
2449 if(l_initialized) { // only reload configuration file on second call
2450 string configname=::arg()["config-dir"]+"/recursor.conf";
2451 cleanSlashes(configname);
2452
2453 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
2454 throw runtime_error("Unable to re-parse configuration file '"+configname+"'");
2455 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
2456 ::arg().preParseFile(configname.c_str(), "include-dir");
2457 ::arg().preParse(g_argc, g_argv, "include-dir");
2458
2459 // then process includes
2460 std::vector<std::string> extraConfigs;
2461 ::arg().gatherIncludes(extraConfigs);
2462
2463 for(const std::string& fn : extraConfigs) {
2464 if(!::arg().preParseFile(fn.c_str(), "allow-from-file", ::arg()["allow-from-file"]))
2465 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
2466 if(!::arg().preParseFile(fn.c_str(), "allow-from", ::arg()["allow-from"]))
2467 throw runtime_error("Unable to re-parse configuration file include '"+fn+"'");
2468 }
2469
2470 ::arg().preParse(g_argc, g_argv, "allow-from-file");
2471 ::arg().preParse(g_argc, g_argv, "allow-from");
2472 }
2473
2474 NetmaskGroup* oldAllowFrom = t_allowFrom, *allowFrom=new NetmaskGroup;
2475
2476 if(!::arg()["allow-from-file"].empty()) {
2477 string line;
2478 ifstream ifs(::arg()["allow-from-file"].c_str());
2479 if(!ifs) {
2480 delete allowFrom;
2481 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
2482 }
2483
2484 string::size_type pos;
2485 while(getline(ifs,line)) {
2486 pos=line.find('#');
2487 if(pos!=string::npos)
2488 line.resize(pos);
2489 trim(line);
2490 if(line.empty())
2491 continue;
2492
2493 allowFrom->addMask(line);
2494 }
2495 L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
2496 }
2497 else if(!::arg()["allow-from"].empty()) {
2498 vector<string> ips;
2499 stringtok(ips, ::arg()["allow-from"], ", ");
2500
2501 L<<Logger::Warning<<"Only allowing queries from: ";
2502 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
2503 allowFrom->addMask(*i);
2504 if(i!=ips.begin())
2505 L<<Logger::Warning<<", ";
2506 L<<Logger::Warning<<*i;
2507 }
2508 L<<Logger::Warning<<endl;
2509 }
2510 else {
2511 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
2512 L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
2513 delete allowFrom;
2514 allowFrom = 0;
2515 }
2516
2517 g_initialAllowFrom = allowFrom;
2518 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
2519 delete oldAllowFrom;
2520
2521 l_initialized = true;
2522 }
2523
2524 boost::optional<Netmask> getEDNSSubnetMask(const ComboAddress& local, const DNSName&dn, const ComboAddress& rem)
2525 {
2526 if(local.sin4.sin_family != AF_INET || local.sin4.sin_addr.s_addr) { // detect unset 'requestor'
2527 if(g_ednsdomains.check(dn) || g_ednssubnets.match(rem)) {
2528 int bits =local.sin4.sin_family == AF_INET ? 24 : 56;
2529 ComboAddress trunc(local);
2530 trunc.truncate(bits);
2531 return boost::optional<Netmask>(Netmask(trunc, bits));
2532 }
2533 }
2534 return boost::optional<Netmask>();
2535 }
2536
2537 void parseEDNSSubnetWhitelist(const std::string& wlist)
2538 {
2539 vector<string> parts;
2540 stringtok(parts, wlist, ",; ");
2541 for(const auto& a : parts) {
2542 try {
2543 Netmask nm(a);
2544 g_ednssubnets.addMask(nm);
2545 }
2546 catch(...) {
2547 g_ednsdomains.add(DNSName(a));
2548 }
2549 }
2550 }
2551
2552 SuffixMatchNode g_delegationOnly;
2553 static void setupDelegationOnly()
2554 {
2555 vector<string> parts;
2556 stringtok(parts, ::arg()["delegation-only"], ", \t");
2557 for(const auto& p : parts) {
2558 g_delegationOnly.add(DNSName(p));
2559 }
2560 }
2561
2562 int serviceMain(int argc, char*argv[])
2563 {
2564 L.setName(s_programname);
2565 L.setLoglevel((Logger::Urgency)(6)); // info and up
2566 L.disableSyslog(::arg().mustDo("disable-syslog"));
2567
2568 if(!::arg()["logging-facility"].empty()) {
2569 int val=logFacilityToLOG(::arg().asNum("logging-facility") );
2570 if(val >= 0)
2571 theL().setFacility(val);
2572 else
2573 L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
2574 }
2575
2576 showProductVersion();
2577 seedRandom(::arg()["entropy-source"]);
2578
2579 g_disthashseed=dns_random(0xffffffff);
2580
2581 loadRecursorLuaConfig(::arg()["lua-config-file"]);
2582
2583 parseACLs();
2584 sortPublicSuffixList();
2585
2586 if(!::arg()["dont-query"].empty()) {
2587 g_dontQuery=new NetmaskGroup;
2588 vector<string> ips;
2589 stringtok(ips, ::arg()["dont-query"], ", ");
2590 ips.push_back("0.0.0.0");
2591 ips.push_back("::");
2592
2593 L<<Logger::Warning<<"Will not send queries to: ";
2594 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
2595 g_dontQuery->addMask(*i);
2596 if(i!=ips.begin())
2597 L<<Logger::Warning<<", ";
2598 L<<Logger::Warning<<*i;
2599 }
2600 L<<Logger::Warning<<endl;
2601 }
2602
2603 g_quiet=::arg().mustDo("quiet");
2604
2605 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
2606 if(g_weDistributeQueries) {
2607 L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
2608 }
2609
2610 setupDelegationOnly();
2611 g_outgoingEDNSBufsize=::arg().asNum("edns-outgoing-bufsize");
2612
2613 if(::arg()["dnssec"]=="off")
2614 g_dnssecmode=DNSSECMode::Off;
2615 else if(::arg()["dnssec"]=="process-no-validate")
2616 g_dnssecmode=DNSSECMode::ProcessNoValidate;
2617 else if(::arg()["dnssec"]=="process")
2618 g_dnssecmode=DNSSECMode::Process;
2619 else if(::arg()["dnssec"]=="validate")
2620 g_dnssecmode=DNSSECMode::ValidateAll;
2621 else if(::arg()["dnssec"]=="log-fail")
2622 g_dnssecmode=DNSSECMode::ValidateForLog;
2623 else {
2624 L<<Logger::Error<<"Unknown DNSSEC mode "<<::arg()["dnssec"]<<endl;
2625 exit(1);
2626 }
2627
2628 if(::arg()["trace"]=="fail") {
2629 SyncRes::setDefaultLogMode(SyncRes::Store);
2630 }
2631 else if(::arg().mustDo("trace")) {
2632 SyncRes::setDefaultLogMode(SyncRes::Log);
2633 ::arg().set("quiet")="no";
2634 g_quiet=false;
2635 g_dnssecLOG=true;
2636 }
2637
2638 SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
2639
2640 checkLinuxIPv6Limits();
2641 try {
2642 vector<string> addrs;
2643 if(!::arg()["query-local-address6"].empty()) {
2644 SyncRes::s_doIPv6=true;
2645 L<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
2646
2647 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
2648 for(const string& addr : addrs) {
2649 g_localQueryAddresses6.push_back(ComboAddress(addr));
2650 }
2651 }
2652 else {
2653 L<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
2654 }
2655 addrs.clear();
2656 stringtok(addrs, ::arg()["query-local-address"], ", ;");
2657 for(const string& addr : addrs) {
2658 g_localQueryAddresses4.push_back(ComboAddress(addr));
2659 }
2660 }
2661 catch(std::exception& e) {
2662 L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
2663 exit(99);
2664 }
2665
2666 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
2667
2668 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
2669 SyncRes::s_maxcachettl=::arg().asNum("max-cache-ttl");
2670 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
2671 // Cap the packetcache-servfail-ttl to the packetcache-ttl
2672 uint32_t packetCacheServFailTTL = ::arg().asNum("packetcache-servfail-ttl");
2673 SyncRes::s_packetcacheservfailttl=(packetCacheServFailTTL > SyncRes::s_packetcachettl) ? SyncRes::s_packetcachettl : packetCacheServFailTTL;
2674 SyncRes::s_serverdownmaxfails=::arg().asNum("server-down-max-fails");
2675 SyncRes::s_serverdownthrottletime=::arg().asNum("server-down-throttle-time");
2676 SyncRes::s_serverID=::arg()["server-id"];
2677 SyncRes::s_maxqperq=::arg().asNum("max-qperq");
2678 SyncRes::s_maxtotusec=1000*::arg().asNum("max-total-msec");
2679 SyncRes::s_rootNXTrust = ::arg().mustDo( "root-nx-trust");
2680 if(SyncRes::s_serverID.empty()) {
2681 char tmp[128];
2682 gethostname(tmp, sizeof(tmp)-1);
2683 SyncRes::s_serverID=tmp;
2684 }
2685
2686 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
2687
2688 g_initialDomainMap = parseAuthAndForwards();
2689
2690 g_latencyStatSize=::arg().asNum("latency-statistic-size");
2691
2692 g_logCommonErrors=::arg().mustDo("log-common-errors");
2693
2694 g_anyToTcp = ::arg().mustDo("any-to-tcp");
2695 g_udpTruncationThreshold = ::arg().asNum("udp-truncation-threshold");
2696
2697 g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
2698
2699 makeUDPServerSockets();
2700 makeTCPServerSockets();
2701
2702 parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
2703
2704 int forks;
2705 for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
2706 if(!fork()) // we are child
2707 break;
2708 }
2709
2710 if(::arg().mustDo("daemon")) {
2711 L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
2712 L.toConsole(Logger::Critical);
2713 daemonize();
2714 }
2715 signal(SIGUSR1,usr1Handler);
2716 signal(SIGUSR2,usr2Handler);
2717 signal(SIGPIPE,SIG_IGN);
2718 g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
2719 g_numWorkerThreads = ::arg().asNum("threads");
2720 g_maxMThreads = ::arg().asNum("max-mthreads");
2721 checkOrFixFDS();
2722
2723 openssl_thread_setup();
2724 openssl_seed();
2725
2726 int newgid=0;
2727 if(!::arg()["setgid"].empty())
2728 newgid=Utility::makeGidNumeric(::arg()["setgid"]);
2729 int newuid=0;
2730 if(!::arg()["setuid"].empty())
2731 newuid=Utility::makeUidNumeric(::arg()["setuid"]);
2732
2733 Utility::dropGroupPrivs(newuid, newgid);
2734
2735 if (!::arg()["chroot"].empty()) {
2736 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
2737 L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
2738 exit(1);
2739 }
2740 else
2741 L<<Logger::Error<<"Chrooted to '"<<::arg()["chroot"]<<"'"<<endl;
2742 }
2743
2744 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
2745 if(!s_pidfname.empty())
2746 unlink(s_pidfname.c_str()); // remove possible old pid file
2747 writePid();
2748
2749 makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
2750
2751 Utility::dropUserPrivs(newuid);
2752
2753 makeThreadPipes();
2754
2755 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
2756 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
2757
2758 if(g_numThreads == 1) {
2759 L<<Logger::Warning<<"Operating unthreaded"<<endl;
2760 #ifdef HAVE_SYSTEMD
2761 sd_notify(0, "READY=1");
2762 #endif
2763 recursorThread(0);
2764 }
2765 else {
2766 pthread_t tid;
2767 L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
2768 for(unsigned int n=0; n < g_numThreads; ++n) {
2769 pthread_create(&tid, 0, recursorThread, (void*)(long)n);
2770 }
2771 void* res;
2772 #ifdef HAVE_SYSTEMD
2773 sd_notify(0, "READY=1");
2774 #endif
2775 pthread_join(tid, &res);
2776 }
2777 return 0;
2778 }
2779
2780 void* recursorThread(void* ptr)
2781 try
2782 {
2783 t_id=(int) (long) ptr;
2784 SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
2785 t_sstorage->domainmap = g_initialDomainMap;
2786 t_allowFrom = g_initialAllowFrom;
2787 t_udpclientsocks = new UDPClientSocks();
2788 t_tcpClientCounts = new tcpClientCounts_t();
2789 primeHints();
2790
2791 t_packetCache = new RecursorPacketCache();
2792
2793 #ifdef HAVE_PROTOBUF
2794 t_uuidGenerator = new boost::uuids::random_generator();
2795 #endif
2796 L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
2797
2798 t_pdl = new shared_ptr<RecursorLua4>();
2799
2800 try {
2801 if(!::arg()["lua-dns-script"].empty()) {
2802 *t_pdl = shared_ptr<RecursorLua4>(new RecursorLua4(::arg()["lua-dns-script"]));
2803 L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
2804 }
2805 }
2806 catch(std::exception &e) {
2807 L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
2808 _exit(99);
2809 }
2810
2811 t_traceRegex = new shared_ptr<Regex>();
2812 unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
2813 if(ringsize) {
2814 t_remotes = new addrringbuf_t();
2815 if(g_weDistributeQueries) // if so, only 1 thread does recvfrom
2816 t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries"));
2817 else
2818 t_remotes->set_capacity(ringsize);
2819 t_servfailremotes = new addrringbuf_t();
2820 t_servfailremotes->set_capacity(ringsize);
2821 t_largeanswerremotes = new addrringbuf_t();
2822 t_largeanswerremotes->set_capacity(ringsize);
2823
2824 t_queryring = new boost::circular_buffer<pair<DNSName, uint16_t> >();
2825 t_queryring->set_capacity(ringsize);
2826 t_servfailqueryring = new boost::circular_buffer<pair<DNSName, uint16_t> >();
2827 t_servfailqueryring->set_capacity(ringsize);
2828 }
2829
2830 MT=new MTasker<PacketID,string>(::arg().asNum("stack-size"));
2831
2832 PacketID pident;
2833
2834 t_fdm=getMultiplexer();
2835 if(!t_id) {
2836 if(::arg().mustDo("webserver")) {
2837 L<<Logger::Warning << "Enabling web server" << endl;
2838 try {
2839 new RecursorWebServer(t_fdm);
2840 }
2841 catch(PDNSException &e) {
2842 L<<Logger::Error<<"Exception: "<<e.reason<<endl;
2843 exit(99);
2844 }
2845 }
2846 L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
2847 }
2848
2849 t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
2850
2851 if(!g_weDistributeQueries || !t_id) // if we distribute queries, only t_id = 0 listens
2852 for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)
2853 t_fdm->addReadFD(i->first, i->second);
2854
2855 if(!t_id) {
2856 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
2857 }
2858
2859 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
2860
2861 bool listenOnTCP(true);
2862
2863 time_t last_carbon=0;
2864 time_t carbonInterval=::arg().asNum("carbon-interval");
2865 counter=AtomicCounter(0); // used to periodically execute certain tasks
2866 for(;;) {
2867 while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
2868
2869 if(!(counter%500)) {
2870 MT->makeThread(houseKeeping, 0);
2871 }
2872
2873 if(!(counter%55)) {
2874 typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
2875 expired_t expired=t_fdm->getTimeouts(g_now);
2876
2877 for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
2878 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
2879 if(g_logCommonErrors)
2880 L<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toString() <<endl;
2881 t_fdm->removeReadFD(i->first);
2882 }
2883 }
2884
2885 counter++;
2886
2887 if(!t_id && statsWanted) {
2888 doStats();
2889 }
2890
2891 Utility::gettimeofday(&g_now, 0);
2892
2893 if(!t_id && (g_now.tv_sec - last_carbon >= carbonInterval)) {
2894 MT->makeThread(doCarbonDump, 0);
2895 last_carbon = g_now.tv_sec;
2896 }
2897
2898 t_fdm->run(&g_now);
2899 // 'run' updates g_now for us
2900
2901 if(!g_weDistributeQueries || !t_id) { // if pdns distributes queries, only tid 0 should do this
2902 if(listenOnTCP) {
2903 if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
2904 for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
2905 t_fdm->removeReadFD(*i);
2906 listenOnTCP=false;
2907 }
2908 }
2909 else {
2910 if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
2911 for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
2912 t_fdm->addReadFD(*i, handleNewTCPQuestion);
2913 listenOnTCP=true;
2914 }
2915 }
2916 }
2917 }
2918 }
2919 catch(PDNSException &ae) {
2920 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
2921 return 0;
2922 }
2923 catch(std::exception &e) {
2924 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
2925 return 0;
2926 }
2927 catch(...) {
2928 L<<Logger::Error<<"any other exception in main: "<<endl;
2929 return 0;
2930 }
2931
2932
2933 int main(int argc, char **argv)
2934 {
2935 g_argc = argc;
2936 g_argv = argv;
2937 g_stats.startupTime=time(0);
2938 versionSetProduct(ProductRecursor);
2939 reportBasicTypes();
2940 reportOtherTypes();
2941
2942 int ret = EXIT_SUCCESS;
2943
2944 try {
2945 ::arg().set("stack-size","stack size per mthread")="200000";
2946 ::arg().set("soa-minimum-ttl","Don't change")="0";
2947 ::arg().set("no-shuffle","Don't change")="off";
2948 ::arg().set("local-port","port to listen on")="53";
2949 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
2950 ::arg().setSwitch("non-local-bind", "Enable binding to non-local addresses by using FREEBIND / BINDANY socket options")="no";
2951 ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
2952 ::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
2953 ::arg().set("daemon","Operate as a daemon")="no";
2954 ::arg().setSwitch("write-pid","Write a PID file")="yes";
2955 ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="4";
2956 ::arg().set("disable-syslog","Disable logging to syslog, useful when running inside a supervisor that logs stdout")="no";
2957 ::arg().set("log-common-errors","If we should log rather common errors")="yes";
2958 ::arg().set("chroot","switch to chroot jail")="";
2959 ::arg().set("setgid","If set, change group id to this gid for more security")="";
2960 ::arg().set("setuid","If set, change user id to this uid for more security")="";
2961 ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500";
2962 ::arg().set("threads", "Launch this number of threads")="2";
2963 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs!
2964 ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
2965 ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = "";
2966 ::arg().set("api-key", "Static pre-shared authentication key for access to the REST API") = "";
2967 ::arg().set("api-logfile", "Location of the server logfile (used by the REST API)") = "/var/log/pdns.log";
2968 ::arg().set("api-readonly", "Disallow data modification through the REST API when set") = "no";
2969 ::arg().setSwitch("webserver", "Start a webserver (for REST API)") = "no";
2970 ::arg().set("webserver-address", "IP Address of webserver to listen on") = "127.0.0.1";
2971 ::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
2972 ::arg().set("webserver-password", "Password required for accessing the webserver") = "";
2973 ::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="0.0.0.0/0,::/0";
2974 ::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
2975 ::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server")="";
2976 ::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
2977 ::arg().set("quiet","Suppress logging of questions and answers")="";
2978 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
2979 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
2980 ::arg().set("socket-owner","Owner of socket")="";
2981 ::arg().set("socket-group","Group of socket")="";
2982 ::arg().set("socket-mode", "Permissions for socket")="";
2983
2984 ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+" when unset and not chrooted" )="";
2985 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
2986 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
2987 ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
2988 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
2989 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
2990 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
2991 ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
2992 ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
2993 ::arg().set("hint-file", "If set, load root hints from this file")="";
2994 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
2995 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
2996 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
2997 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
2998 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
2999 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
3000 ::arg().set("server-id", "Returned when queried for 'server.id' TXT or NSID, defaults to hostname")="";
3001 ::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
3002 ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
3003 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
3004 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
3005 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
3006 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=DONT_QUERY;
3007 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
3008 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
3009 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
3010 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
3011 ::arg().set("lua-config-file", "More powerful configuration options")="";
3012
3013 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
3014 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
3015 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
3016 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
3017 ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
3018 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
3019 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="";
3020 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
3021 ::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
3022 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
3023 ::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
3024 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads")="";
3025 ::arg().setSwitch( "root-nx-trust", "If set, believe that an NXDOMAIN from the root means the TLD does not exist")="no";
3026 ::arg().setSwitch( "any-to-tcp","Answer ANY queries with tc=1, shunting to TCP" )="no";
3027 ::arg().setSwitch( "lowercase-outgoing","Force outgoing questions to lowercase")="no";
3028 ::arg().set("udp-truncation-threshold", "Maximum UDP response size before we truncate")="1680";
3029 ::arg().set("edns-outgoing-bufsize", "Outgoing EDNS buffer size")="1680";
3030 ::arg().set("minimum-ttl-override", "Set under adverse conditions, a minimum TTL")="0";
3031 ::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
3032 ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
3033
3034 ::arg().set("include-dir","Include *.conf files from this directory")="";
3035 ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
3036
3037 ::arg().setSwitch("reuseport","Enable SO_REUSEPORT allowing multiple recursors processes to listen to 1 address")="no";
3038
3039 ::arg().setCmd("help","Provide a helpful message");
3040 ::arg().setCmd("version","Print version string");
3041 ::arg().setCmd("config","Output blank configuration");
3042 L.toConsole(Logger::Info);
3043 ::arg().laxParse(argc,argv); // do a lax parse
3044
3045 string configname=::arg()["config-dir"]+"/recursor.conf";
3046 if(::arg()["config-name"]!="") {
3047 configname=::arg()["config-dir"]+"/recursor-"+::arg()["config-name"]+".conf";
3048 s_programname+="-"+::arg()["config-name"];
3049 }
3050 cleanSlashes(configname);
3051
3052 if(::arg().mustDo("config")) {
3053 cout<<::arg().configstring()<<endl;
3054 exit(0);
3055 }
3056
3057 if(!::arg().file(configname.c_str()))
3058 L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
3059
3060 ::arg().parse(argc,argv);
3061
3062 if( !::arg()["chroot"].empty() && !::arg()["api-config-dir"].empty() && !::arg().mustDo("api-readonly") ) {
3063 L<<Logger::Error<<"Using chroot and a writable API is not possible"<<endl;
3064 exit(EXIT_FAILURE);
3065 }
3066
3067 if (::arg()["socket-dir"].empty()) {
3068 if (::arg()["chroot"].empty())
3069 ::arg().set("socket-dir") = LOCALSTATEDIR;
3070 else
3071 ::arg().set("socket-dir") = "/";
3072 }
3073
3074 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
3075
3076 if(::arg().asNum("threads")==1)
3077 ::arg().set("pdns-distributes-queries")="no";
3078
3079 if(::arg().mustDo("help")) {
3080 cout<<"syntax:"<<endl<<endl;
3081 cout<<::arg().helpstring(::arg()["help"])<<endl;
3082 exit(0);
3083 }
3084 if(::arg().mustDo("version")) {
3085 showProductVersion();
3086 showBuildConfiguration();
3087 exit(0);
3088 }
3089
3090 Logger::Urgency logUrgency = (Logger::Urgency)::arg().asNum("loglevel");
3091
3092 if (logUrgency < Logger::Error)
3093 logUrgency = Logger::Error;
3094 if(!g_quiet && logUrgency < Logger::Info) { // Logger::Info=6, Logger::Debug=7
3095 logUrgency = Logger::Info; // if you do --quiet=no, you need Info to also see the query log
3096 }
3097 L.setLoglevel(logUrgency);
3098 L.toConsole(logUrgency);
3099
3100 serviceMain(argc, argv);
3101 }
3102 catch(PDNSException &ae) {
3103 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
3104 ret=EXIT_FAILURE;
3105 }
3106 catch(std::exception &e) {
3107 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
3108 ret=EXIT_FAILURE;
3109 }
3110 catch(...) {
3111 L<<Logger::Error<<"any other exception in main: "<<endl;
3112 ret=EXIT_FAILURE;
3113 }
3114
3115 return ret;
3116 }