]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
implement pdns-distributes-queries to make powerdns distribute queries itself
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 PowerDNS Versatile Database Driven Nameserver
3 Copyright (C) 2003 - 2010 PowerDNS.COM BV
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License version 2
7 as published by the Free Software Foundation
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #ifndef WIN32
20 # include <netdb.h>
21 # include <sys/stat.h>
22 # include <unistd.h>
23 #else
24 #include "ntservice.hh"
25 #include "recursorservice.hh"
26 #endif // WIN32
27
28 #include <boost/foreach.hpp>
29 #include <pthread.h>
30 #include "recpacketcache.hh"
31 #include "utility.hh"
32 #include "dns_random.hh"
33 #include <iostream>
34 #include <errno.h>
35 #include <map>
36 #include <set>
37 #include "recursor_cache.hh"
38 #include <stdio.h>
39 #include <signal.h>
40 #include <stdlib.h>
41 #include "misc.hh"
42 #include "mtasker.hh"
43 #include <utility>
44 #include "arguments.hh"
45 #include "syncres.hh"
46 #include <fcntl.h>
47 #include <fstream>
48 #include "sstuff.hh"
49 #include <boost/tuple/tuple.hpp>
50 #include <boost/tuple/tuple_comparison.hpp>
51 #include <boost/shared_array.hpp>
52 #include <boost/lexical_cast.hpp>
53 #include <boost/function.hpp>
54 #include <boost/algorithm/string.hpp>
55 #include <netinet/tcp.h>
56 #include "dnsparser.hh"
57 #include "dnswriter.hh"
58 #include "dnsrecords.hh"
59 #include "zoneparser-tng.hh"
60 #include "rec_channel.hh"
61 #include "logger.hh"
62 #include "iputils.hh"
63 #include "mplexer.hh"
64 #include "config.h"
65 #include "lua-pdns-recursor.hh"
66
// Only compiled when RECURSOR is not defined; presumably code shared with the
// authoritative server expects a global StatBag named S there -- confirm.
#ifndef RECURSOR
#include "statbag.hh"
StatBag S;
#endif

__thread FDMultiplexer* t_fdm;            // per-thread event multiplexer (fd readiness callbacks)
__thread unsigned int t_id;               // per-thread id, printed as a prefix in query logging
unsigned int g_maxTCPPerClient;           // max concurrent TCP connections per client address; 0 disables the limit
unsigned int g_networkTimeoutMsec;        // timeout handed to MT->waitEvent for outgoing network I/O
bool g_logCommonErrors;                   // gates logging of routine client-side errors
__thread shared_ptr<PowerDNSLua>* t_pdl;  // per-thread Lua hooks; may hold a null pointer, then no hooks run
__thread RemoteKeeper* t_remotes;         // per-thread keeper that receives every client address (addRemote)

RecursorControlChannel s_rcc; // only active in thread 0
81
// for communicating with our threads
// Pipe file descriptors for one worker thread: a pipe carrying messages *to*
// the thread and one carrying replies *from* it, each with its read and write
// end. NOTE(review): creation and use of these fds happen outside this chunk.
struct ThreadPipeSet
{
  int writeToThread;
  int readToThread;
  int writeFromThread;
  int readFromThread;
};
90
// One ThreadPipeSet per worker thread. NOTE(review): presumably indexed by t_id -- confirm.
vector<ThreadPipeSet> g_pipes; // effectively readonly after startup

SyncRes::domainmap_t* g_initialDomainMap; // new threads need this to be set up
94
95 #include "namespaces.hh"
96
__thread MemRecursorCache* t_RC;             // per-thread record cache; also carries the cacheHits/cacheMisses counters
__thread RecursorPacketCache* t_packetCache; // per-thread cache of complete UDP answer packets
RecursorStats g_stats;
bool g_quiet;                                // when true, per-query logging is suppressed

bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets and hands UDP questions out via distributeAsyncFunction

static __thread NetmaskGroup* t_allowFrom;   // allow-from ACL; a null pointer disables the check
static NetmaskGroup* g_initialAllowFrom; // new thread needs to be set up with this

NetmaskGroup* g_dontQuery;
string s_programname="pdns_recursor";

typedef vector<int> tcpListenSockets_t;
tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
int g_tcpTimeout;                            // read timeout (setReadTTD) applied to client TCP connections
unsigned int g_maxMThreads;                  // above this many running mthreads new queries are dropped (overCapacityDrops)
struct timeval g_now; // timestamp, updated (too) frequently
map<int, ComboAddress> g_listenSocketsAddresses; // UDP listen fd -> local address; is shared across all threads right now

__thread MT_t* MT; // the big MTasker

unsigned int g_numThreads;

// NOTE(review): presumably the default allow-from list (loopback, RFC1918, link-local) -- confirm where it is used.
#define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fe80::/10"
122
123 //! used to send information to a newborn mthread
124 struct DNSComboWriter {
125 DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now),
126 d_tcp(false), d_socket(-1)
127 {}
128 MOADNSParser d_mdp;
129 void setRemote(const ComboAddress* sa)
130 {
131 d_remote=*sa;
132 }
133
134 void setSocket(int sock)
135 {
136 d_socket=sock;
137 }
138
139 string getRemote() const
140 {
141 return d_remote.toString();
142 }
143
144 struct timeval d_now;
145 ComboAddress d_remote;
146 bool d_tcp;
147 int d_socket;
148 };
149
150
151 ArgvMap &arg()
152 {
153 static ArgvMap theArg;
154 return theArg;
155 }
156
157
158 void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
159
160 // -1 is error, 0 is timeout, 1 is success
161 int asendtcp(const string& data, Socket* sock)
162 {
163 PacketID pident;
164 pident.sock=sock;
165 pident.outMSG=data;
166
167 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
168 string packet;
169
170 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
171
172 if(!ret || ret==-1) { // timeout
173 t_fdm->removeWriteFD(sock->getHandle());
174 }
175 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
176 return -1;
177 }
178 return ret;
179 }
180
181 void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
182
183 // -1 is error, 0 is timeout, 1 is success
184 int arecvtcp(string& data, int len, Socket* sock)
185 {
186 data.clear();
187 PacketID pident;
188 pident.sock=sock;
189 pident.inNeeded=len;
190 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
191
192 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
193 if(!ret || ret==-1) { // timeout
194 t_fdm->removeReadFD(sock->getHandle());
195 }
196 else if(data.empty()) {// error, EOF or other
197 return -1;
198 }
199
200 return ret;
201 }
202
203 vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
204 const ComboAddress g_local4("0.0.0.0"), g_local6("::");
205
206 //! pick a random query local address
207 ComboAddress getQueryLocalAddress(int family, uint16_t port)
208 {
209 ComboAddress ret;
210 if(family==AF_INET) {
211 if(g_localQueryAddresses4.empty())
212 ret = g_local4;
213 else
214 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
215 ret.sin4.sin_port = htons(port);
216 }
217 else {
218 if(g_localQueryAddresses6.empty())
219 ret = g_local6;
220 else
221 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
222
223 ret.sin6.sin6_port = htons(port);
224 }
225 return ret;
226 }
227
228 void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
229
230 void setSocketBuffer(int fd, int optname, uint32_t size)
231 {
232 uint32_t psize=0;
233 socklen_t len=sizeof(psize);
234
235 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
236 L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
237 return;
238 }
239
240 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
241 L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
242 }
243
244
245 static void setSocketReceiveBuffer(int fd, uint32_t size)
246 {
247 setSocketBuffer(fd, SO_RCVBUF, size);
248 }
249
250 static void setSocketSendBuffer(int fd, uint32_t size)
251 {
252 setSocketBuffer(fd, SO_SNDBUF, size);
253 }
254
255
256 // you can ask this class for a UDP socket to send a query from
257 // this socket is not yours, don't even think about deleting it
258 // but after you call 'returnSocket' on it, don't assume anything anymore
259 class UDPClientSocks
260 {
261 unsigned int d_numsocks;
262 unsigned int d_maxsocks;
263 public:
264 UDPClientSocks() : d_numsocks(0), d_maxsocks(5000)
265 {
266 }
267
268 typedef set<int> socks_t;
269 socks_t d_socks;
270
271 // returning -1 means: temporary OS error (ie, out of files), -2 means OS error
272 int getSocket(const ComboAddress& toaddr, int* fd)
273 {
274 *fd=makeClientSocket(toaddr.sin4.sin_family);
275 if(*fd < 0) // temporary error - receive exception otherwise
276 return -1;
277
278 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
279 int err = errno;
280 // returnSocket(*fd);
281 Utility::closesocket(*fd);
282 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
283 return -2;
284 return -1;
285 }
286
287 d_socks.insert(*fd);
288 d_numsocks++;
289 return 0;
290 }
291
292 void returnSocket(int fd)
293 {
294 socks_t::iterator i=d_socks.find(fd);
295 if(i==d_socks.end()) {
296 throw AhuException("Trying to return a socket (fd="+lexical_cast<string>(fd)+") not in the pool");
297 }
298 returnSocketLocked(i);
299 }
300
301 // return a socket to the pool, or simply erase it
302 void returnSocketLocked(socks_t::iterator& i)
303 {
304 if(i==d_socks.end()) {
305 throw AhuException("Trying to return a socket not in the pool");
306 }
307 try {
308 t_fdm->removeReadFD(*i);
309 }
310 catch(FDMultiplexerException& e) {
311 // we sometimes return a socket that has not yet been assigned to t_fdm
312 }
313 Utility::closesocket(*i);
314
315 d_socks.erase(i++);
316 --d_numsocks;
317 }
318
319 // returns -1 for errors which might go away, throws for ones that won't
320 static int makeClientSocket(int family)
321 {
322 int ret=(int)socket(family, SOCK_DGRAM, 0);
323 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
324 return ret;
325
326 if(ret<0)
327 throw AhuException("Making a socket for resolver: "+stringerror());
328
329
330 int tries=10;
331 while(--tries) {
332 uint16_t port;
333
334 if(tries==1) // fall back to kernel 'random'
335 port = 0;
336 else
337 port = 1025 + dns_random(64510);
338
339 ComboAddress sin=getQueryLocalAddress(family, port); // does htons for us
340
341 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
342 break;
343 }
344 if(!tries)
345 throw AhuException("Resolver binding to local query client socket: "+stringerror());
346
347 Utility::setNonBlocking(ret);
348 return ret;
349 }
350 };
351
352 static __thread UDPClientSocks* t_udpclientsocks;
353
354 /* these two functions are used by LWRes */
355 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
356 int asendto(const char *data, int len, int flags,
357 const ComboAddress& toaddr, uint16_t id, const string& domain, uint16_t qtype, int* fd)
358 {
359
360 PacketID pident;
361 pident.domain = domain;
362 pident.remote = toaddr;
363 pident.type = qtype;
364
365 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
366 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
367
368 for(; chain.first != chain.second; chain.first++) {
369 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
370 /*
371 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
372 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
373 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
374 */
375 chain.first->key.chain.insert(id); // we can chain
376 *fd=-1; // gets used in waitEvent / sendEvent later on
377 return 1;
378 }
379 }
380
381 int ret=t_udpclientsocks->getSocket(toaddr, fd);
382 if(ret < 0)
383 return ret;
384
385 pident.fd=*fd;
386 pident.id=id;
387
388 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
389 ret = send(*fd, data, len, 0);
390
391 int tmp = errno;
392
393 if(ret < 0)
394 t_udpclientsocks->returnSocket(*fd);
395
396 errno = tmp; // this is for logging purposes only
397 return ret;
398 }
399
400 // -1 is error, 0 is timeout, 1 is success
401 int arecvfrom(char *data, int len, int flags, const ComboAddress& fromaddr, int *d_len,
402 uint16_t id, const string& domain, uint16_t qtype, int fd, struct timeval* now)
403 {
404 static optional<unsigned int> nearMissLimit;
405 if(!nearMissLimit)
406 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
407
408 PacketID pident;
409 pident.fd=fd;
410 pident.id=id;
411 pident.domain=domain;
412 pident.type = qtype;
413 pident.remote=fromaddr;
414
415 string packet;
416 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
417
418 if(ret > 0) {
419 if(packet.empty()) // means "error"
420 return -1;
421
422 *d_len=(int)packet.size();
423 memcpy(data,packet.c_str(),min(len,*d_len));
424 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
425 L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
426 g_stats.spoofCount++;
427 return -1;
428 }
429 }
430 else {
431 if(fd >= 0)
432 t_udpclientsocks->returnSocket(fd);
433 }
434 return ret;
435 }
436
437
438 string s_pidfname;
439 static void writePid(void)
440 {
441 ofstream of(s_pidfname.c_str(), ios_base::app);
442 if(of)
443 of<< Utility::getpid() <<endl;
444 else
445 L<<Logger::Error<<"Requested to write pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
446 }
447
448 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
449 tcpClientCounts_t __thread* t_tcpClientCounts;
450
451
452 void TCPConnection::closeAndCleanup(int fd, const ComboAddress& remote)
453 {
454 Utility::closesocket(fd);
455 if(!(*t_tcpClientCounts)[remote]--)
456 t_tcpClientCounts->erase(remote);
457 decCurrentConnections();
458 }
459 void TCPConnection::closeAndCleanup()
460 {
461 closeAndCleanup(fd, remote);
462 }
463
464 volatile unsigned int TCPConnection::s_currentConnections;
465 void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
466
467 void updateRcodeStats(int res)
468 {
469 switch(res) {
470 case RCode::ServFail:
471 g_stats.servFails++;
472 break;
473 case RCode::NXDomain:
474 g_stats.nxDomains++;
475 break;
476 case RCode::NoError:
477 g_stats.noErrors++;
478 break;
479 }
480 }
481
482 void startDoResolve(void *p)
483 {
484 DNSComboWriter* dc=(DNSComboWriter *)p;
485
486 try {
487 uint16_t maxudpsize=512;
488 EDNSOpts edo;
489 if(getEDNSOpts(dc->d_mdp, &edo)) {
490 maxudpsize=max(edo.d_packetsize, (uint16_t)1280);
491 }
492
493 vector<DNSResourceRecord> ret;
494 vector<uint8_t> packet;
495
496 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
497
498 pw.getHeader()->aa=0;
499 pw.getHeader()->ra=1;
500 pw.getHeader()->qr=1;
501 pw.getHeader()->tc=0;
502 pw.getHeader()->id=dc->d_mdp.d_header.id;
503 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
504
505 SyncRes sr(dc->d_now);
506 if(!g_quiet)
507 L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
508 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
509
510 sr.setId(MT->getTid());
511 if(!dc->d_mdp.d_header.rd)
512 sr.setCacheOnly();
513
514 int res;
515
516 bool variableAnswer = false;
517 // if there is a PowerDNSLua active, and it 'took' the query in preResolve, we don't launch beginResolve
518 if(!t_pdl->get() || !(*t_pdl)->preresolve(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer)) {
519 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
520
521 if(t_pdl->get()) {
522 if(res == RCode::NXDomain)
523 (*t_pdl)->nxdomain(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
524 }
525 }
526
527 uint32_t minTTL=numeric_limits<uint32_t>::max();
528 if(res<0) {
529 pw.getHeader()->rcode=RCode::ServFail;
530 // no commit here, because no record
531 g_stats.servFails++;
532 }
533 else {
534 pw.getHeader()->rcode=res;
535 updateRcodeStats(res);
536
537 if(ret.size()) {
538 shuffle(ret);
539
540 for(vector<DNSResourceRecord>::const_iterator i=ret.begin(); i!=ret.end(); ++i) {
541 pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, i->qclass, (DNSPacketWriter::Place)i->d_place);
542 minTTL = min(minTTL, i->ttl);
543 if(i->qtype.getCode() == QType::A) { // blast out A record w/o doing whole dnswriter thing
544 uint32_t ip=0;
545 IpToU32(i->content, &ip);
546 pw.xfr32BitInt(htonl(ip));
547 } else {
548 shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), i->qclass, i->content));
549 drc->toPacket(pw);
550 }
551 if(!dc->d_tcp && pw.size() > maxudpsize) {
552 pw.rollback();
553 if(i->d_place==DNSResourceRecord::ANSWER) // only truncate if we actually omitted parts of the answer
554 pw.getHeader()->tc=1;
555 goto sendit; // need to jump over pw.commit
556 }
557 }
558
559 pw.commit();
560 }
561 }
562 sendit:;
563 if(!dc->d_tcp) {
564 sendto(dc->d_socket, (const char*)&*packet.begin(), packet.size(), 0, (struct sockaddr *)(&dc->d_remote), dc->d_remote.getSocklen());
565 if(!SyncRes::s_nopacketcache && !variableAnswer ) {
566 t_packetCache->insertResponsePacket(string((const char*)&*packet.begin(), packet.size()), g_now.tv_sec,
567 min(minTTL,
568 (pw.getHeader()->rcode == RCode::ServFail) ? SyncRes::s_packetcacheservfailttl : SyncRes::s_packetcachettl
569 )
570 );
571 }
572 }
573 else {
574 char buf[2];
575 buf[0]=packet.size()/256;
576 buf[1]=packet.size()%256;
577
578 Utility::iovec iov[2];
579
580 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
581 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
582
583 int ret=Utility::writev(dc->d_socket, iov, 2);
584 bool hadError=true;
585
586 if(ret == 0)
587 L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
588 else if(ret < 0 )
589 L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
590 else if((unsigned int)ret != 2 + packet.size())
591 L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
592 else
593 hadError=false;
594
595 // update tcp connection status, either by closing or moving to 'BYTE0'
596
597 if(hadError) {
598 // no need to remove us from FDM, we weren't there
599 TCPConnection::closeAndCleanup(dc->d_socket, dc->d_remote);
600 dc->d_socket = -1;
601 }
602 else {
603 TCPConnection tc;
604 tc.fd=dc->d_socket;
605 tc.state=TCPConnection::BYTE0;
606 tc.remote=dc->d_remote;
607 Utility::gettimeofday(&g_now, 0); // needs to be updated
608 t_fdm->addReadFD(tc.fd, handleRunningTCPQuestion, tc);
609 t_fdm->setReadTTD(tc.fd, g_now, g_tcpTimeout);
610 }
611 }
612
613 if(!g_quiet) {
614 L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
615 L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
616 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
617 }
618
619 sr.d_outqueries ? t_RC->cacheMisses++ : t_RC->cacheHits++;
620 float spent=makeFloat(sr.d_now-dc->d_now);
621 if(spent < 0.001)
622 g_stats.answers0_1++;
623 else if(spent < 0.010)
624 g_stats.answers1_10++;
625 else if(spent < 0.1)
626 g_stats.answers10_100++;
627 else if(spent < 1.0)
628 g_stats.answers100_1000++;
629 else
630 g_stats.answersSlow++;
631
632 uint64_t newLat=(uint64_t)(spent*1000000);
633 if(newLat < 1000000) // outliers of several minutes exist..
634 g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
635
636 delete dc;
637 dc=0;
638 }
639 catch(AhuException &ae) {
640 L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
641 if(dc && dc->d_tcp && dc->d_socket >= 0)
642 TCPConnection::closeAndCleanup(dc->d_socket, dc->d_remote);
643 delete dc;
644 }
645 catch(MOADNSException& e) {
646 if(dc && dc->d_tcp && dc->d_socket >= 0)
647 TCPConnection::closeAndCleanup(dc->d_socket, dc->d_remote);
648
649 L<<Logger::Error<<"DNS parser error: "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
650 delete dc;
651 }
652 catch(std::exception& e) {
653 L<<Logger::Error<<"STL error: "<<e.what()<<endl;
654 if(dc && dc->d_tcp && dc->d_socket >= 0)
655 TCPConnection::closeAndCleanup(dc->d_socket, dc->d_remote);
656 delete dc;
657 }
658 catch(...) {
659 L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
660 }
661 }
662
663 void makeControlChannelSocket()
664 {
665 string sockname=::arg()["socket-dir"]+"/pdns_recursor.controlsocket";
666 s_rcc.listen(sockname);
667
668 #ifndef WIN32
669 int sockowner = -1;
670 int sockgroup = -1;
671
672 if (!::arg().isEmpty("socket-group"))
673 sockgroup=::arg().asGid("socket-group");
674 if (!::arg().isEmpty("socket-owner"))
675 sockowner=::arg().asUid("socket-owner");
676
677 if (sockgroup > -1 || sockowner > -1) {
678 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
679 unixDie("Failed to chown control socket");
680 }
681 }
682
683 // do mode change if socket-mode is given
684 if(!::arg().isEmpty("socket-mode")) {
685 mode_t sockmode=::arg().asMode("socket-mode");
686 chmod(sockname.c_str(), sockmode);
687 }
688 #endif
689 }
690
// Multiplexer read callback for an accepted client TCP connection.
// State machine:
//   BYTE0       - read the 2-byte big-endian length prefix (may arrive as 1 byte)
//   BYTE1       - read the second length byte
//   GETQUESTION - read qlen bytes of question body, possibly over several calls
// Once the body is complete, the connection is removed from the multiplexer
// and handed to a new mthread running startDoResolve, which re-arms it
// afterwards. A recv() result of 0 or <0 closes the connection in every
// state. NOTE(review): this includes a would-block (EAGAIN) result.
void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
{
  TCPConnection* conn=any_cast<TCPConnection>(&var);

  if(conn->state==TCPConnection::BYTE0) {
    int bytes=recv(conn->fd, conn->data, 2, 0);
    if(bytes==1)
      conn->state=TCPConnection::BYTE1;
    if(bytes==2) {
      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
      conn->bytesread=0;
      conn->state=TCPConnection::GETQUESTION;
    }
    if(!bytes || bytes < 0) {
      // copy first: conn is not to be used once removeReadFD has run
      TCPConnection tmp(*conn);
      t_fdm->removeReadFD(fd);
      tmp.closeAndCleanup();
      return;
    }
  }
  else if(conn->state==TCPConnection::BYTE1) {
    int bytes=recv(conn->fd, conn->data+1, 1, 0);
    if(bytes==1) {
      conn->state=TCPConnection::GETQUESTION;
      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
      conn->bytesread=0;
    }
    if(!bytes || bytes < 0) {
      if(g_logCommonErrors)
        L<<Logger::Error<<"TCP client "<< conn->remote.toString() <<" disconnected after first byte"<<endl;
      TCPConnection tmp(*conn);
      t_fdm->removeReadFD(fd);
      tmp.closeAndCleanup(); // conn loses validity here..
      return;
    }
  }
  else if(conn->state==TCPConnection::GETQUESTION) {
    // NOTE(review): a zero qlen makes this a 0-byte recv that returns 0 and is
    // then handled like a disconnect.
    int bytes=recv(conn->fd, conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
    if(!bytes || bytes < 0) {
      L<<Logger::Error<<"TCP client "<< conn->remote.toString() <<" disconnected while reading question body"<<endl;
      TCPConnection tmp(*conn);
      t_fdm->removeReadFD(fd);
      tmp.closeAndCleanup(); // conn loses validity here..

      return;
    }
    conn->bytesread+=bytes;
    if(conn->bytesread==conn->qlen) {
      // full question received: take a copy, then stop watching the fd while we resolve
      TCPConnection tconn(*conn);
      t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read

      DNSComboWriter* dc=0;
      try {
        dc=new DNSComboWriter(tconn.data, tconn.qlen, g_now);
      }
      catch(MOADNSException &mde) {
        g_stats.clientParseError++;
        if(g_logCommonErrors)
          L<<Logger::Error<<"Unable to parse packet from TCP client "<< tconn.remote.toString() <<endl;
        tconn.closeAndCleanup();
        return;
      }

      dc->setSocket(tconn.fd);
      dc->d_tcp=true;
      dc->setRemote(&tconn.remote);
      if(dc->d_mdp.d_header.qr) {
        // a response arriving on our server socket: refuse it and drop the connection
        delete dc;
        L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
        tconn.closeAndCleanup();
        return;
      }
      else {
        ++g_stats.qcounter;
        ++g_stats.tcpqcounter;
        MT->makeThread(startDoResolve, dc); // deletes dc, will set state to BYTE0 again
        return;
      }
    }
  }
}
772
773 //! Handle new incoming TCP connection
774 void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
775 {
776 ComboAddress addr;
777 socklen_t addrlen=sizeof(addr);
778 int newsock=(int)accept(fd, (struct sockaddr*)&addr, &addrlen);
779 if(newsock>0) {
780 if(MT->numProcesses() > g_maxMThreads) {
781 g_stats.overCapacityDrops++;
782 Utility::closesocket(newsock);
783 return;
784 }
785
786 t_remotes->addRemote(addr);
787 if(t_allowFrom && !t_allowFrom->match(&addr)) {
788 if(!g_quiet)
789 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
790
791 g_stats.unauthorizedTCP++;
792 Utility::closesocket(newsock);
793 return;
794 }
795 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
796 g_stats.tcpClientOverflow++;
797 Utility::closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
798 return;
799 }
800 (*t_tcpClientCounts)[addr]++;
801 Utility::setNonBlocking(newsock);
802 TCPConnection tc;
803 tc.fd=newsock;
804 tc.state=TCPConnection::BYTE0;
805 tc.remote=addr;
806 TCPConnection::incCurrentConnections();
807 t_fdm->addReadFD(tc.fd, handleRunningTCPQuestion, tc);
808
809 struct timeval now;
810 Utility::gettimeofday(&now, 0);
811 t_fdm->setReadTTD(tc.fd, now, g_tcpTimeout);
812 }
813 }
814
815 string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd)
816 {
817 ++g_stats.qcounter;
818
819 string response;
820 try {
821 uint32_t age;
822 if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) {
823 if(!g_quiet)
824 L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
825
826 g_stats.packetCacheHits++;
827 SyncRes::s_queries++;
828 ageDNSPacket(response, age);
829 sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
830 if(response.length() >= sizeof(struct dnsheader))
831 updateRcodeStats(((struct dnsheader*)response.c_str())->rcode);
832 g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
833 return 0;
834 }
835 }
836 catch(std::exception& e) {
837 L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
838 return 0;
839 }
840
841
842 if(MT->numProcesses() > g_maxMThreads) {
843 g_stats.overCapacityDrops++;
844 return 0;
845 }
846
847 DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
848 dc->setSocket(fd);
849 dc->setRemote(&fromaddr);
850
851 dc->d_tcp=false;
852 MT->makeThread(startDoResolve, (void*) dc); // deletes dc
853 return 0;
854 }
855
856 void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
857 {
858 int len;
859 char data[1500];
860 ComboAddress fromaddr;
861 socklen_t addrlen=sizeof(fromaddr);
862
863 if((len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
864 t_remotes->addRemote(fromaddr);
865
866 if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
867 if(!g_quiet)
868 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
869
870 g_stats.unauthorizedUDP++;
871 return;
872 }
873 try {
874 dnsheader* dh=(dnsheader*)data;
875
876 if(dh->qr) {
877 if(g_logCommonErrors)
878 L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
879 }
880 else {
881 string question(data, len);
882 if(g_weDistributeQueries)
883 distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd));
884 else
885 doProcessUDPQuestion(question, fromaddr, fd);
886 }
887 }
888 catch(MOADNSException& mde) {
889 g_stats.clientParseError++;
890 if(g_logCommonErrors)
891 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
892 }
893 }
894 else {
895 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
896 if(errno == EAGAIN)
897 g_stats.noPacketError++;
898 }
899 }
900
901
902 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
903 deferredAdd_t deferredAdd;
904
905 void makeTCPServerSockets()
906 {
907 int fd;
908 vector<string>locals;
909 stringtok(locals,::arg()["local-address"]," ,");
910
911 if(locals.empty())
912 throw AhuException("No local address specified");
913
914 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
915 ServiceTuple st;
916 st.port=::arg().asNum("local-port");
917 parseService(*i, st);
918
919 ComboAddress sin;
920
921 memset((char *)&sin,0, sizeof(sin));
922 sin.sin4.sin_family = AF_INET;
923 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
924 sin.sin6.sin6_family = AF_INET6;
925 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
926 throw AhuException("Unable to resolve local address for TCP server on '"+ st.host +"'");
927 }
928
929 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
930 if(fd<0)
931 throw AhuException("Making a TCP server socket for resolver: "+stringerror());
932
933 int tmp=1;
934 if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
935 L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
936 exit(1);
937 }
938
939 #ifdef TCP_DEFER_ACCEPT
940 if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
941 if(i==locals.begin())
942 L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
943 }
944 #endif
945
946 sin.sin4.sin_port = htons(st.port);
947 int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
948 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
949 throw AhuException("Binding TCP server socket for "+ st.host +": "+stringerror());
950
951 Utility::setNonBlocking(fd);
952 setSocketSendBuffer(fd, 65000);
953 listen(fd, 128);
954 deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
955 g_tcpListenSockets.push_back(fd);
956
957 if(sin.sin4.sin_family == AF_INET)
958 L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
959 else
960 L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
961 }
962 }
963
964
965
966 void makeUDPServerSockets()
967 {
968 vector<string>locals;
969 stringtok(locals,::arg()["local-address"]," ,");
970
971 if(locals.empty())
972 throw AhuException("No local address specified");
973
974 if(::arg()["local-address"]=="0.0.0.0") {
975 L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
976 }
977
978 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
979 ServiceTuple st;
980 st.port=::arg().asNum("local-port");
981 parseService(*i, st);
982
983 ComboAddress sin;
984
985 memset(&sin, 0, sizeof(sin));
986 sin.sin4.sin_family = AF_INET;
987 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
988 sin.sin6.sin6_family = AF_INET6;
989 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
990 throw AhuException("Unable to resolve local address for UDP server on '"+ st.host +"'");
991 }
992
993 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
994
995 if(fd < 0) {
996 throw AhuException("Making a UDP server socket for resolver: "+netstringerror());
997 }
998
999 setSocketReceiveBuffer(fd, 200000);
1000 sin.sin4.sin_port = htons(st.port);
1001
1002 int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
1003 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
1004 throw AhuException("Resolver binding to server socket on port "+ lexical_cast<string>(st.port) +" for "+ st.host+": "+stringerror());
1005
1006 Utility::setNonBlocking(fd);
1007
1008 deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
1009 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
1010 if(sin.sin4.sin_family == AF_INET)
1011 L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
1012 else
1013 L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
1014 }
1015 }
1016
1017
1018 #ifndef WIN32
1019 void daemonize(void)
1020 {
1021 if(fork())
1022 exit(0); // bye bye
1023
1024 setsid();
1025
1026 int i=open("/dev/null",O_RDWR); /* open stdin */
1027 if(i < 0)
1028 L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
1029 else {
1030 dup2(i,0); /* stdin */
1031 dup2(i,1); /* stderr */
1032 dup2(i,2); /* stderr */
1033 close(i);
1034 }
1035 }
1036 #endif
1037
// Event-loop iteration counter: reset and incremented in recursorThread(), where counter%500 and
// counter%55 schedule periodic work. NOTE(review): shared by all recursor threads, unsynchronized.
uint64_t counter = 0;
// Set by usr1Handler (SIGUSR1); thread 0 checks it and doStats() clears it.
bool statsWanted = false;
1040
1041 void usr1Handler(int)
1042 {
1043 statsWanted=true;
1044 }
1045
1046 void usr2Handler(int)
1047 {
1048 SyncRes::setLog(true);
1049 g_quiet=false;
1050 ::arg().set("quiet")="no";
1051
1052 }
1053
1054 void doStats(void)
1055 {
1056 static time_t lastOutputTime;
1057 static uint64_t lastQueryCount;
1058
1059 if(g_stats.qcounter && (t_RC->cacheHits + t_RC->cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) { // this only runs once thread 0 has had hits
1060 uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
1061 uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
1062
1063 L<<Logger::Warning<<"stats: "<<g_stats.qcounter<<" questions, "<<
1064 broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
1065 broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
1066 (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;
1067
1068 L<<Logger::Warning<<"stats: throttle map: "
1069 << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
1070 << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl;
1071 L<<Logger::Warning<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
1072 L<<Logger::Warning<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
1073 <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
1074 L<<Logger::Warning<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
1075 broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
1076
1077 //L<<Logger::Warning<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
1078 //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
1079
1080 L<<Logger::Warning<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
1081 " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
1082
1083 time_t now = time(0);
1084 if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
1085 L<<Logger::Warning<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
1086 }
1087 lastOutputTime = now;
1088 lastQueryCount = SyncRes::s_queries;
1089 }
1090 else if(statsWanted)
1091 L<<Logger::Warning<<"stats: no stats yet!"<<endl;
1092
1093 statsWanted=false;
1094 }
1095
1096 static void houseKeeping(void *)
1097 try
1098 {
1099 static __thread time_t last_stat, last_rootupdate, last_prune;
1100 static __thread int cleanCounter=0;
1101 struct timeval now;
1102 Utility::gettimeofday(&now, 0);
1103
1104 // clog<<"* "<<t_id<<" "<<(void*)&last_stat<<"\t"<<(unsigned int)last_stat<<endl;
1105
1106 if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
1107 DTime dt;
1108 dt.setTimeval(now);
1109 t_RC->doPrune(); // this function is local to a thread, so fine anyhow
1110 t_packetCache->doPruneTo(::arg().asNum("max-packetcache-entries") / g_numThreads);
1111
1112 {
1113 typedef SyncRes::negcache_t::nth_index<1>::type negcache_by_ttd_index_t;
1114 negcache_by_ttd_index_t& ttdindex=boost::multi_index::get<1>(t_sstorage->negcache);
1115 negcache_by_ttd_index_t::iterator i=ttdindex.lower_bound(now.tv_sec);
1116 ttdindex.erase(ttdindex.begin(), i);
1117 }
1118
1119 if(!((cleanCounter++)%40)) { // this is a full scan!
1120 time_t limit=now.tv_sec-300;
1121 for(SyncRes::nsspeeds_t::iterator i = t_sstorage->nsSpeeds.begin() ; i!= t_sstorage->nsSpeeds.end(); )
1122 if(i->second.stale(limit))
1123 t_sstorage->nsSpeeds.erase(i++);
1124 else
1125 ++i;
1126 }
1127 // L<<Logger::Warning<<"Spent "<<dt.udiff()/1000<<" msec cleaning"<<endl;
1128 last_prune=time(0);
1129 }
1130
1131 if(!t_id) {
1132 if(now.tv_sec - last_stat > 1800) {
1133 doStats();
1134 last_stat=time(0);
1135 }
1136 }
1137
1138 if(now.tv_sec - last_rootupdate > 7200) {
1139 SyncRes sr(now);
1140 sr.setDoEDNS0(true);
1141 vector<DNSResourceRecord> ret;
1142
1143 sr.setNoCache();
1144 int res=sr.beginResolve(".", QType(QType::NS), 1, ret);
1145 if(!res) {
1146 L<<Logger::Warning<<"Refreshed . records"<<endl;
1147 last_rootupdate=now.tv_sec;
1148 }
1149 else
1150 L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
1151 }
1152 }
1153 catch(AhuException& ae)
1154 {
1155 L<<Logger::Error<<"Fatal error: "<<ae.reason<<endl;
1156 throw;
1157 }
1158 ;
1159
1160 void makeThreadPipes()
1161 {
1162 for(unsigned int n=0; n < g_numThreads; ++n) {
1163 struct ThreadPipeSet tps;
1164 int fd[2];
1165 if(pipe(fd) < 0)
1166 unixDie("Creating pipe for inter-thread communications");
1167
1168 tps.readToThread = fd[0];
1169 tps.writeToThread = fd[1];
1170
1171 if(pipe(fd) < 0)
1172 unixDie("Creating pipe for inter-thread communications");
1173 tps.readFromThread = fd[0];
1174 tps.writeFromThread = fd[1];
1175
1176 g_pipes.push_back(tps);
1177 }
1178 }
1179
1180 struct ThreadMSG
1181 {
1182 pipefunc_t func;
1183 bool wantAnswer;
1184 };
1185
1186 void broadcastFunction(const pipefunc_t& func, bool skipSelf)
1187 {
1188 unsigned int n = 0;
1189 BOOST_FOREACH(ThreadPipeSet& tps, g_pipes)
1190 {
1191 if(n++ == t_id) {
1192 if(!skipSelf)
1193 func(); // don't write to ourselves!
1194 continue;
1195 }
1196
1197 ThreadMSG* tmsg = new ThreadMSG();
1198 tmsg->func = func;
1199 tmsg->wantAnswer = true;
1200 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1201 unixDie("write to thread pipe returned wrong size or error");
1202
1203 string* resp;
1204 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1205 unixDie("read from thread pipe returned wrong size or error");
1206
1207 if(resp) {
1208 // cerr <<"got response: " << *resp << endl;
1209 delete resp;
1210 }
1211 }
1212 }
1213 void distributeAsyncFunction(const pipefunc_t& func)
1214 {
1215 static unsigned int counter;
1216 unsigned int target = 1 + (++counter % (g_pipes.size()-1));
1217 // cerr<<"Sending to: "<<target<<endl;
1218 if(target == t_id) {
1219 func();
1220 return;
1221 }
1222 ThreadPipeSet& tps = g_pipes[target];
1223 ThreadMSG* tmsg = new ThreadMSG();
1224 tmsg->func = func;
1225 tmsg->wantAnswer = false;
1226
1227 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1228 unixDie("write to thread pipe returned wrong size or error");
1229
1230 }
1231
1232 void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
1233 {
1234 ThreadMSG* tmsg;
1235
1236 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
1237 unixDie("read from thread pipe returned wrong size or error");
1238 }
1239
1240 void *resp = tmsg->func();
1241 if(tmsg->wantAnswer)
1242 if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp))
1243 unixDie("write to thread pipe returned wrong size or error");
1244
1245 delete tmsg;
1246 }
1247
1248 template<class T> void *voider(const boost::function<T*()>& func)
1249 {
1250 return func();
1251 }
1252
1253 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
1254 {
1255 a.insert(a.end(), b.begin(), b.end());
1256 return a;
1257 }
1258
1259 template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
1260 {
1261 unsigned int n = 0;
1262 T ret=T();
1263 BOOST_FOREACH(ThreadPipeSet& tps, g_pipes)
1264 {
1265 if(n++ == t_id) {
1266 if(!skipSelf) {
1267 T* resp = (T*)func(); // don't write to ourselves!
1268 if(resp) {
1269 //~ cerr <<"got direct: " << *resp << endl;
1270 ret += *resp;
1271 delete resp;
1272 }
1273 }
1274 continue;
1275 }
1276
1277 ThreadMSG* tmsg = new ThreadMSG();
1278 tmsg->func = boost::bind(voider<T>, func);
1279 tmsg->wantAnswer = true;
1280
1281 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1282 unixDie("write to thread pipe returned wrong size or error");
1283
1284
1285 T* resp;
1286 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1287 unixDie("read from thread pipe returned wrong size or error");
1288
1289 if(resp) {
1290 //~ cerr <<"got response: " << *resp << endl;
1291 ret += *resp;
1292 delete resp;
1293 }
1294 }
1295 return ret;
1296 }
1297
1298 template string broadcastAccFunction(const boost::function<string*()>& fun, bool skipSelf); // explicit instantiation
1299 template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun, bool skipSelf); // explicit instantiation
1300 template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun, bool skipSelf); // explicit instantiation
1301
1302 void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
1303 {
1304 string remote;
1305 string msg=s_rcc.recv(&remote);
1306 RecursorControlParser rcp;
1307 RecursorControlParser::func_t* command;
1308 string answer=rcp.getAnswer(msg, &command);
1309 try {
1310 s_rcc.send(answer, &remote);
1311 command();
1312 }
1313 catch(std::exception& e) {
1314 L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
1315 }
1316 catch(AhuException& ae) {
1317 L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
1318 }
1319 }
1320
1321 void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
1322 {
1323 PacketID* pident=any_cast<PacketID>(&var);
1324 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
1325
1326 shared_array<char> buffer(new char[pident->inNeeded]);
1327
1328 int ret=recv(fd, buffer.get(), pident->inNeeded,0);
1329 if(ret > 0) {
1330 pident->inMSG.append(&buffer[0], &buffer[ret]);
1331 pident->inNeeded-=ret;
1332 if(!pident->inNeeded) {
1333 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
1334 PacketID pid=*pident;
1335 string msg=pident->inMSG;
1336
1337 t_fdm->removeReadFD(fd);
1338 MT->sendEvent(pid, &msg);
1339 }
1340 else {
1341 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
1342 }
1343 }
1344 else {
1345 PacketID tmp=*pident;
1346 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
1347 string empty;
1348 MT->sendEvent(tmp, &empty); // this conveys error status
1349 }
1350 }
1351
1352 void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
1353 {
1354 PacketID* pid=any_cast<PacketID>(&var);
1355 int ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
1356 if(ret > 0) {
1357 pid->outPos+=ret;
1358 if(pid->outPos==pid->outMSG.size()) {
1359 PacketID tmp=*pid;
1360 t_fdm->removeWriteFD(fd);
1361 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
1362 }
1363 }
1364 else { // error or EOF
1365 PacketID tmp(*pid);
1366 t_fdm->removeWriteFD(fd);
1367 string sent;
1368 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
1369 }
1370 }
1371
1372 // resend event to everybody chained onto it
1373 void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
1374 {
1375 if(iter->key.chain.empty())
1376 return;
1377 // cerr<<"doResends called!\n";
1378 for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
1379 resend.fd=-1;
1380 resend.id=*i;
1381 // cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
1382
1383 MT->sendEvent(resend, &content);
1384 g_stats.chainResends++;
1385 }
1386 }
1387
1388 void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
1389 {
1390 PacketID pid=any_cast<PacketID>(var);
1391 int len;
1392 char data[1500];
1393 ComboAddress fromaddr;
1394 socklen_t addrlen=sizeof(fromaddr);
1395
1396 len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
1397
1398 if(len < (int)sizeof(dnsheader)) {
1399 if(len < 0)
1400 ; // cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
1401 else {
1402 g_stats.serverParseError++;
1403 if(g_logCommonErrors)
1404 L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< sockAddrToString((struct sockaddr_in*) &fromaddr) <<
1405 ": packet smalller than DNS header"<<endl;
1406 }
1407
1408 t_udpclientsocks->returnSocket(fd);
1409 string empty;
1410
1411 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
1412 if(iter != MT->d_waiters.end())
1413 doResends(iter, pid, empty);
1414
1415 MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
1416 return;
1417 }
1418
1419 dnsheader dh;
1420 memcpy(&dh, data, sizeof(dh));
1421
1422 if(dh.qr) {
1423 PacketID pident;
1424 pident.remote=fromaddr;
1425 pident.id=dh.id;
1426 pident.fd=fd;
1427 if(!dh.qdcount) { // UPC, Nominum, very old BIND on FormErr, NSD
1428 pident.domain.clear();
1429 pident.type = 0;
1430 }
1431 else {
1432 try {
1433 pident.domain=questionExpand(data, len, pident.type); // don't copy this from above - we need to do the actual read
1434 }
1435 catch(std::exception& e) {
1436 g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
1437 L<<Logger::Warning<<"Error in packet from "<<sockAddrToString((struct sockaddr_in*) &fromaddr) << ": "<<e.what() << endl;
1438 return;
1439 }
1440 }
1441 string packet;
1442 packet.assign(data, len);
1443
1444 MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
1445 if(iter != MT->d_waiters.end()) {
1446 doResends(iter, pident, packet);
1447 }
1448
1449 retryWithName:
1450
1451 if(!MT->sendEvent(pident, &packet)) {
1452 // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
1453 for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
1454 if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote && mthread->key.type == pident.type &&
1455 pdns_iequals(pident.domain, mthread->key.domain)) {
1456 mthread->key.nearMisses++;
1457 }
1458
1459 // be a bit paranoid here since we're weakening our matching
1460 if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
1461 pident.id == mthread->key.id && mthread->key.remote == pident.remote) {
1462 // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
1463 pident.domain = mthread->key.domain;
1464 pident.type = mthread->key.type;
1465 goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
1466 }
1467 }
1468 g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
1469 if(g_logCommonErrors)
1470 L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<<pident.domain<<", "<<pident.type<<endl;
1471 }
1472 else if(fd >= 0) {
1473 t_udpclientsocks->returnSocket(fd);
1474 }
1475 }
1476 else
1477 L<<Logger::Warning<<"Ignoring question on outgoing socket from "<< sockAddrToString((struct sockaddr_in*) &fromaddr) <<endl;
1478 }
1479
1480 FDMultiplexer* getMultiplexer()
1481 {
1482 FDMultiplexer* ret;
1483 for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
1484 i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
1485 try {
1486 ret=i->second();
1487 return ret;
1488 }
1489 catch(FDMultiplexerException &fe) {
1490 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
1491 }
1492 catch(...) {
1493 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
1494 }
1495 }
1496 L<<Logger::Error<<"No working multiplexer found!"<<endl;
1497 exit(1);
1498 }
1499
1500
1501 void* doReloadLuaScript()
1502 {
1503 string fname= ::arg()["lua-dns-script"];
1504 try {
1505 if(fname.empty()) {
1506 t_pdl->reset();
1507 L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
1508 }
1509 else {
1510 *t_pdl = shared_ptr<PowerDNSLua>(new PowerDNSLua(fname));
1511 }
1512 }
1513 catch(std::exception& e) {
1514 L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
1515 }
1516
1517 L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
1518 return 0;
1519 }
1520
1521 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
1522 {
1523 if(begin != end)
1524 ::arg().set("lua-dns-script") = *begin;
1525
1526 broadcastFunction(doReloadLuaScript);
1527
1528 return "ok, reload/unload queued\n";
1529 }
1530
1531 void* recursorThread(void*);
1532
1533 void* pleaseSupplantACLs(NetmaskGroup *ng)
1534 {
1535 t_allowFrom = ng;
1536 return 0;
1537 }
1538
1539 void parseACLs()
1540 {
1541 static bool l_initialized;
1542
1543 if(l_initialized) { // only reload configuration file on second call
1544 string configname=::arg()["config-dir"]+"/recursor.conf";
1545 cleanSlashes(configname);
1546
1547 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
1548 L<<Logger::Warning<<"Unable to re-parse configuration file '"<<configname<<"'"<<endl;
1549
1550 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
1551 }
1552
1553 NetmaskGroup* oldAllowFrom = t_allowFrom, *allowFrom=new NetmaskGroup;
1554
1555 if(!::arg()["allow-from-file"].empty()) {
1556 string line;
1557 ifstream ifs(::arg()["allow-from-file"].c_str());
1558 if(!ifs) {
1559 delete allowFrom;
1560 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
1561 }
1562
1563 string::size_type pos;
1564 while(getline(ifs,line)) {
1565 pos=line.find('#');
1566 if(pos!=string::npos)
1567 line.resize(pos);
1568 trim(line);
1569 if(line.empty())
1570 continue;
1571
1572 allowFrom->addMask(line);
1573 }
1574 L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
1575 }
1576 else if(!::arg()["allow-from"].empty()) {
1577 vector<string> ips;
1578 stringtok(ips, ::arg()["allow-from"], ", ");
1579
1580 L<<Logger::Warning<<"Only allowing queries from: ";
1581 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1582 allowFrom->addMask(*i);
1583 if(i!=ips.begin())
1584 L<<Logger::Warning<<", ";
1585 L<<Logger::Warning<<*i;
1586 }
1587 L<<Logger::Warning<<endl;
1588 }
1589 else {
1590 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
1591 L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
1592 delete allowFrom;
1593 allowFrom = 0;
1594 }
1595
1596 g_initialAllowFrom = allowFrom;
1597 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
1598 delete oldAllowFrom;
1599
1600 l_initialized = true;
1601 }
1602
1603 int serviceMain(int argc, char*argv[])
1604 {
1605 L.setName("pdns_recursor");
1606
1607 L.setLoglevel((Logger::Urgency)(6)); // info and up
1608
1609 if(!::arg()["logging-facility"].empty()) {
1610 boost::optional<int> val=logFacilityToLOG(::arg().asNum("logging-facility") );
1611 if(val)
1612 theL().setFacility(*val);
1613 else
1614 L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
1615 }
1616
1617 L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2010 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
1618 #ifdef __GNUC__
1619 L<<", gcc "__VERSION__;
1620 #endif // add other compilers here
1621 #ifdef _MSC_VER
1622 L<<", MSVC "<<_MSC_VER;
1623 #endif
1624 L<<") starting up"<<endl;
1625
1626 L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
1627 "This is free software, and you are welcome to redistribute it "
1628 "according to the terms of the GPL version 2."<<endl;
1629
1630 L<<Logger::Warning<<"Operating in "<<(sizeof(unsigned long)*8) <<" bits mode"<<endl;
1631
1632 #if 0
1633 unsigned int maxFDs, curFDs;
1634 getFDLimits(curFDs, maxFDs);
1635 if(curFDs < 2048)
1636 L<<Logger::Warning<<"Only "<<curFDs<<" file descriptors available (out of: "<<maxFDs<<"), may not be suitable for high performance"<<endl;
1637 #endif
1638
1639 seedRandom(::arg()["entropy-source"]);
1640
1641 parseACLs();
1642
1643 if(!::arg()["dont-query"].empty()) {
1644 g_dontQuery=new NetmaskGroup;
1645 vector<string> ips;
1646 stringtok(ips, ::arg()["dont-query"], ", ");
1647 ips.push_back("0.0.0.0");
1648 ips.push_back("::");
1649
1650 L<<Logger::Warning<<"Will not send queries to: ";
1651 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1652 g_dontQuery->addMask(*i);
1653 if(i!=ips.begin())
1654 L<<Logger::Warning<<", ";
1655 L<<Logger::Warning<<*i;
1656 }
1657 L<<Logger::Warning<<endl;
1658 }
1659
1660 g_quiet=::arg().mustDo("quiet");
1661 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
1662 if(g_weDistributeQueries) {
1663 L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
1664 }
1665
1666 if(::arg().mustDo("trace")) {
1667 SyncRes::setLog(true);
1668 ::arg().set("quiet")="no";
1669 g_quiet=false;
1670 }
1671
1672 try {
1673 vector<string> addrs;
1674 if(!::arg()["query-local-address6"].empty()) {
1675 SyncRes::s_doIPv6=true;
1676 L<<Logger::Error<<"Enabling IPv6 transport for outgoing queries"<<endl;
1677
1678 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
1679 BOOST_FOREACH(const string& addr, addrs) {
1680 g_localQueryAddresses6.push_back(ComboAddress(addr));
1681 }
1682 }
1683 addrs.clear();
1684 stringtok(addrs, ::arg()["query-local-address"], ", ;");
1685 BOOST_FOREACH(const string& addr, addrs) {
1686 g_localQueryAddresses4.push_back(ComboAddress(addr));
1687 }
1688 }
1689 catch(std::exception& e) {
1690 L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
1691 exit(99);
1692 }
1693
1694 SyncRes::s_noEDNSPing = ::arg().mustDo("disable-edns-ping");
1695 SyncRes::s_noEDNS = ::arg().mustDo("disable-edns");
1696
1697 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
1698
1699 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
1700 SyncRes::s_maxcachettl=::arg().asNum("max-cache-ttl");
1701 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
1702 SyncRes::s_packetcacheservfailttl=::arg().asNum("packetcache-servfail-ttl");
1703 SyncRes::s_serverID=::arg()["server-id"];
1704 if(SyncRes::s_serverID.empty()) {
1705 char tmp[128];
1706 gethostname(tmp, sizeof(tmp)-1);
1707 SyncRes::s_serverID=tmp;
1708 }
1709
1710 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
1711
1712 g_initialDomainMap = parseAuthAndForwards();
1713
1714
1715 g_logCommonErrors=::arg().mustDo("log-common-errors");
1716
1717 makeUDPServerSockets();
1718 makeTCPServerSockets();
1719
1720 for(int forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
1721 if(!fork()) // we are child
1722 break;
1723 }
1724
1725 s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
1726 if(!s_pidfname.empty())
1727 unlink(s_pidfname.c_str()); // remove possible old pid file
1728
1729 #ifndef WIN32
1730 if(::arg().mustDo("daemon")) {
1731 L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
1732 L.toConsole(Logger::Critical);
1733 daemonize();
1734 }
1735 signal(SIGUSR1,usr1Handler);
1736 signal(SIGUSR2,usr2Handler);
1737 signal(SIGPIPE,SIG_IGN);
1738 writePid();
1739 #endif
1740 makeControlChannelSocket();
1741
1742 int newgid=0;
1743 if(!::arg()["setgid"].empty())
1744 newgid=Utility::makeGidNumeric(::arg()["setgid"]);
1745 int newuid=0;
1746 if(!::arg()["setuid"].empty())
1747 newuid=Utility::makeUidNumeric(::arg()["setuid"]);
1748
1749 if (!::arg()["chroot"].empty()) {
1750 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
1751 L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
1752 exit(1);
1753 }
1754 }
1755
1756 Utility::dropPrivs(newuid, newgid);
1757
1758
1759 g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
1760
1761 makeThreadPipes();
1762
1763 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
1764 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
1765 g_maxMThreads=::arg().asNum("max-mthreads");
1766
1767 if(g_numThreads == 1) {
1768 L<<Logger::Warning<<"Operating unthreaded"<<endl;
1769 recursorThread(0);
1770 }
1771 else {
1772 pthread_t tid;
1773 L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
1774 for(unsigned int n=0; n < g_numThreads; ++n) {
1775 pthread_create(&tid, 0, recursorThread, (void*)n);
1776 }
1777 void* res;
1778
1779
1780 pthread_join(tid, &res);
1781 }
1782 return 0;
1783 }
1784
1785 void* recursorThread(void* ptr)
1786 try
1787 {
1788 t_id=(int) (long) ptr;
1789 SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
1790 t_sstorage->domainmap = g_initialDomainMap;
1791 t_allowFrom = g_initialAllowFrom;
1792 t_udpclientsocks = new UDPClientSocks();
1793 t_tcpClientCounts = new tcpClientCounts_t();
1794 primeHints();
1795
1796 t_packetCache = new RecursorPacketCache();
1797
1798 L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
1799
1800 t_RC->d_followRFC2181=::arg().mustDo("auth-can-lower-ttl");
1801 t_pdl = new shared_ptr<PowerDNSLua>();
1802
1803 try {
1804 if(!::arg()["lua-dns-script"].empty()) {
1805 *t_pdl = shared_ptr<PowerDNSLua>(new PowerDNSLua(::arg()["lua-dns-script"]));
1806 L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
1807 }
1808
1809 }
1810 catch(std::exception &e) {
1811 L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
1812 exit(99);
1813 }
1814
1815 t_remotes = new RemoteKeeper();
1816 t_remotes->remotes.resize(::arg().asNum("remotes-ringbuffer-entries") / g_numThreads);
1817
1818 if(!t_remotes->remotes.empty())
1819 memset(&t_remotes->remotes[0], 0, t_remotes->remotes.size() * sizeof(RemoteKeeper::remotes_t::value_type));
1820
1821
1822 MT=new MTasker<PacketID,string>(::arg().asNum("stack-size"));
1823
1824 PacketID pident;
1825
1826 t_fdm=getMultiplexer();
1827 if(!t_id)
1828 L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
1829
1830 t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
1831
1832 if(!g_weDistributeQueries || !t_id) // if we distribute queries, only t_id = 0 listens
1833 for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)
1834 t_fdm->addReadFD(i->first, i->second);
1835
1836 if(!t_id) {
1837 t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
1838 }
1839
1840 unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
1841
1842 bool listenOnTCP(true);
1843
1844 counter=0; // used to periodically execute certain tasks
1845 for(;;) {
1846 while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
1847
1848 if(!(counter%500)) {
1849 MT->makeThread(houseKeeping, 0);
1850 }
1851
1852 if(!(counter%55)) {
1853 typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
1854 expired_t expired=t_fdm->getTimeouts(g_now);
1855
1856 for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
1857 TCPConnection conn=any_cast<TCPConnection>(i->second);
1858 if(g_logCommonErrors)
1859 L<<Logger::Warning<<"Timeout from remote TCP client "<< conn.remote.toString() <<endl;
1860 t_fdm->removeReadFD(i->first);
1861 conn.closeAndCleanup();
1862 }
1863 }
1864
1865 counter++;
1866
1867 if(!t_id && statsWanted) {
1868 doStats();
1869 }
1870
1871 Utility::gettimeofday(&g_now, 0);
1872 t_fdm->run(&g_now);
1873 // 'run' updates g_now for us
1874
1875 if(listenOnTCP) {
1876 if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
1877 for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1878 t_fdm->removeReadFD(*i);
1879 listenOnTCP=false;
1880 }
1881 }
1882 else {
1883 if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
1884 for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1885 t_fdm->addReadFD(*i, handleNewTCPQuestion);
1886 listenOnTCP=true;
1887 }
1888 }
1889 }
1890 }
1891 catch(AhuException &ae) {
1892 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
1893 return 0;
1894 }
1895 catch(std::exception &e) {
1896 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
1897 return 0;
1898 }
1899 catch(...) {
1900 L<<Logger::Error<<"any other exception in main: "<<endl;
1901 return 0;
1902 }
1903
1904 #ifdef WIN32
1905 void doWindowsServiceArguments(RecursorService& recursor)
1906 {
1907 if(::arg().mustDo( "register-service" )) {
1908 if ( !recursor.registerService( "The PowerDNS Recursor.", true )) {
1909 cerr << "Could not register service." << endl;
1910 exit( 99 );
1911 }
1912
1913 exit( 0 );
1914 }
1915
1916 if ( ::arg().mustDo( "unregister-service" )) {
1917 recursor.unregisterService();
1918 exit( 0 );
1919 }
1920 }
1921 #endif
1922
1923
1924 int main(int argc, char **argv)
1925 {
1926 g_stats.startupTime=time(0);
1927 reportBasicTypes();
1928
1929 int ret = EXIT_SUCCESS;
1930 #ifdef WIN32
1931 RecursorService service;
1932 WSADATA wsaData;
1933 if(WSAStartup( MAKEWORD( 2, 2 ), &wsaData )) {
1934 cerr<<"Unable to initialize winsock\n";
1935 exit(1);
1936 }
1937 #endif // WIN32
1938
1939 try {
1940 ::arg().set("stack-size","stack size per mthread")="200000";
1941 ::arg().set("soa-minimum-ttl","Don't change")="0";
1942 ::arg().set("soa-serial-offset","Don't change")="0";
1943 ::arg().set("no-shuffle","Don't change")="off";
1944 ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
1945 ::arg().set("local-port","port to listen on")="53";
1946 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
1947 ::arg().set("trace","if we should output heaps of logging")="off";
1948 ::arg().set("daemon","Operate as a daemon")="yes";
1949 ::arg().set("log-common-errors","If we should log rather common errors")="yes";
1950 ::arg().set("chroot","switch to chroot jail")="";
1951 ::arg().set("setgid","If set, change group id to this gid for more security")="";
1952 ::arg().set("setuid","If set, change user id to this uid for more security")="";
1953 ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500";
1954 ::arg().set("threads", "Launch this number of threads")="2";
1955 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1";
1956 #ifdef WIN32
1957 ::arg().set("quiet","Suppress logging of questions and answers")="off";
1958 ::arg().setSwitch( "register-service", "Register the service" )= "no";
1959 ::arg().setSwitch( "unregister-service", "Unregister the service" )= "no";
1960 ::arg().setSwitch( "ntservice", "Run as service" )= "no";
1961 ::arg().setSwitch( "use-ntlog", "Use the NT logging facilities" )= "yes";
1962 ::arg().setSwitch( "use-logfile", "Use a log file" )= "no";
1963 ::arg().setSwitch( "logfile", "Filename of the log file" )= "recursor.log";
1964 #else
1965 ::arg().set("quiet","Suppress logging of questions and answers")="";
1966 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
1967 #endif
1968 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
1969 #ifndef WIN32
1970 ::arg().set("socket-owner","Owner of socket")="";
1971 ::arg().set("socket-group","Group of socket")="";
1972 ::arg().set("socket-mode", "Permissions for socket")="";
1973 #endif
1974
1975 ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
1976 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
1977 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
1978 ::arg().set("query-local-address6","Source IPv6 address for sending queries")="";
1979 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
1980 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
1981 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
1982 ::arg().set("hint-file", "If set, load root hints from this file")="";
1983 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
1984 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
1985 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
1986 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
1987 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
1988 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
1989 ::arg().set("server-id", "Returned when queried for 'server.id' TXT or NSID, defaults to hostname")="";
1990 ::arg().set("remotes-ringbuffer-entries", "maximum number of packets to store statistics for")="0";
1991 ::arg().set("version-string", "string reported on version.pdns or version.bind")="PowerDNS Recursor "VERSION" $Id$";
1992 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
1993 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
1994 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
1995 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=LOCAL_NETS;
1996 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
1997 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
1998 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
1999 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
2000 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
2001 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
2002 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
2003 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
2004 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
2005 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="";
2006 ::arg().set("auth-can-lower-ttl", "If we follow RFC 2181 to the letter, an authoritative server can lower the TTL of NS records")="off";
2007 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
2008 ::arg().setSwitch( "ignore-rd-bit", "Assume each packet requires recursion, for compatability" )= "off";
2009 ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no";
2010 ::arg().setSwitch( "disable-edns", "Disable EDNS" )= "";
2011 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
2012 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no";
2013
2014
2015 ::arg().setCmd("help","Provide a helpful message");
2016 ::arg().setCmd("version","Print version string ("VERSION")");
2017 ::arg().setCmd("config","Output blank configuration");
2018 L.toConsole(Logger::Info);
2019 ::arg().laxParse(argc,argv); // do a lax parse
2020
2021 if(::arg().mustDo("config")) {
2022 cout<<::arg().configstring()<<endl;
2023 exit(0);
2024 }
2025
2026
2027 string configname=::arg()["config-dir"]+"/recursor.conf";
2028 cleanSlashes(configname);
2029
2030 if(!::arg().file(configname.c_str()))
2031 L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
2032
2033 ::arg().parse(argc,argv);
2034
2035 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
2036
2037 if(::arg().mustDo("help")) {
2038 cerr<<"syntax:"<<endl<<endl;
2039 cerr<<::arg().helpstring(::arg()["help"])<<endl;
2040 exit(99);
2041 }
2042 if(::arg().mustDo("version")) {
2043 cerr<<"version: "VERSION<<endl;
2044 exit(99);
2045 }
2046
2047 #ifndef WIN32
2048 serviceMain(argc, argv);
2049 #else
2050 doWindowsServiceArguments(service);
2051 L.toNTLog();
2052 RecursorService::instance()->start( argc, argv, ::arg().mustDo( "ntservice" ));
2053 #endif
2054
2055 }
2056 catch(AhuException &ae) {
2057 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
2058 ret=EXIT_FAILURE;
2059 }
2060 catch(std::exception &e) {
2061 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
2062 ret=EXIT_FAILURE;
2063 }
2064 catch(...) {
2065 L<<Logger::Error<<"any other exception in main: "<<endl;
2066 ret=EXIT_FAILURE;
2067 }
2068
2069 #ifdef WIN32
2070 WSACleanup();
2071 #endif // WIN32
2072
2073 return ret;
2074 }