]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/pdns_recursor.cc
2f04a574a4f8a05718650466a255d405f692dcb3
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
1 /*
2 PowerDNS Versatile Database Driven Nameserver
3 Copyright (C) 2003 - 2013 PowerDNS.COM BV
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License version 2
7 as published by the Free Software Foundation
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #include <netdb.h>
20 #include <sys/stat.h>
21 #include <unistd.h>
22 #include <boost/foreach.hpp>
23 #include "json_ws.hh"
24 #include <pthread.h>
25 #include "recpacketcache.hh"
26 #include "utility.hh"
27 #include "dns_random.hh"
28 #include <iostream>
29 #include <errno.h>
30 #include <map>
31 #include <set>
32 #include "recursor_cache.hh"
33 #include "cachecleaner.hh"
34 #include <stdio.h>
35 #include <signal.h>
36 #include <stdlib.h>
37 #include "misc.hh"
38 #include "mtasker.hh"
39 #include <utility>
40 #include "arguments.hh"
41 #include "syncres.hh"
42 #include <fcntl.h>
43 #include <fstream>
44 #include "sstuff.hh"
45 #include <boost/tuple/tuple.hpp>
46 #include <boost/tuple/tuple_comparison.hpp>
47 #include <boost/shared_array.hpp>
48 #include <boost/lexical_cast.hpp>
49 #include <boost/function.hpp>
50 #include <boost/algorithm/string.hpp>
51 #include <netinet/tcp.h>
52 #include "dnsparser.hh"
53 #include "dnswriter.hh"
54 #include "dnsrecords.hh"
55 #include "zoneparser-tng.hh"
56 #include "rec_channel.hh"
57 #include "logger.hh"
58 #include "iputils.hh"
59 #include "mplexer.hh"
60 #include "config.h"
61 #include "lua-recursor.hh"
62 #include "version.hh"
63
// Outside the recursor build, define a StatBag instance.
// NOTE(review): presumably required by shared code that references S — confirm.
#ifndef RECURSOR
#include "statbag.hh"
StatBag S;
#endif

// Variables marked __thread exist once per thread.
__thread FDMultiplexer* t_fdm;              // this thread's event multiplexer
__thread unsigned int t_id;                 // this thread's id, used as a log prefix
unsigned int g_maxTCPPerClient;             // max concurrent TCP connections per client address; 0 disables the limit
unsigned int g_networkTimeoutMsec;          // timeout handed to MT->waitEvent() for outgoing network I/O
bool g_logCommonErrors;                     // when set, routine client errors (parse failures, early disconnects, ...) are logged
__thread shared_ptr<RecursorLua>* t_pdl;    // this thread's Lua hooks; the shared_ptr may be empty (no Lua configured)
__thread RemoteKeeper* t_remotes;           // every client address seen by this thread is passed to addRemote()
__thread shared_ptr<Regex>* t_traceRegex;   // queries whose name matches get their resolution trace logged; may be empty

RecursorControlChannel s_rcc; // only active in thread 0
79
// for communicating with our threads
// NOTE(review): the field names suggest two pipes per thread, each as a write/read fd pair:
// one carrying data into the thread (*ToThread) and one carrying data back out (*FromThread).
// The fields are not used in this part of the file, so confirm against the users of g_pipes.
struct ThreadPipeSet
{
  int writeToThread;
  int readToThread;
  int writeFromThread;
  int readFromThread;
};
88
vector<ThreadPipeSet> g_pipes; // effectively readonly after startup

SyncRes::domainmap_t* g_initialDomainMap; // new threads needs this to be setup

#include "namespaces.hh"

__thread MemRecursorCache* t_RC;             // this thread's record cache (also carries the cacheHits/cacheMisses counters)
__thread RecursorPacketCache* t_packetCache; // this thread's cache of complete response packets
RecursorStats g_stats;                       // process-wide statistics counters
bool g_quiet;                                // when set, per-query log lines are suppressed

bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets

static __thread NetmaskGroup* t_allowFrom;   // this thread's allow-from ACL; a null pointer disables the check
static NetmaskGroup* g_initialAllowFrom; // new thread needs to be setup with this

NetmaskGroup* g_dontQuery;                   // NOTE(review): presumably networks we must never send queries to; not used in this part of the file
string s_programname="recursor";             // becomes part of the control socket file name

typedef vector<int> tcpListenSockets_t;
tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
int g_tcpTimeout;                            // idle timeout handed to setReadTTD() for client TCP connections (unit as FDMultiplexer expects — confirm)
unsigned int g_maxMThreads;                  // new queries are dropped while more mthreads than this are running
struct timeval g_now; // timestamp, updated (too) frequently
map<int, ComboAddress> g_listenSocketsAddresses; // is shared across all threads right now; maps a UDP listening fd to its bound address

__thread MT_t* MT; // the big MTasker

unsigned int g_numThreads;

// Loopback, RFC 1918, shared address space (100.64.0.0/10) and link-local networks, IPv4 and IPv6.
// NOTE(review): presumably the default allow-from list; it is not referenced in this part of the file.
#define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fe80::/10"
120
121 //! used to send information to a newborn mthread
122 struct DNSComboWriter {
123 DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now),
124 d_tcp(false), d_socket(-1)
125 {}
126 MOADNSParser d_mdp;
127 void setRemote(const ComboAddress* sa)
128 {
129 d_remote=*sa;
130 }
131
132 void setSocket(int sock)
133 {
134 d_socket=sock;
135 }
136
137 string getRemote() const
138 {
139 return d_remote.toString();
140 }
141
142 struct timeval d_now;
143 ComboAddress d_remote;
144 bool d_tcp;
145 int d_socket;
146 shared_ptr<TCPConnection> d_tcpConnection;
147 };
148
149
150 ArgvMap &arg()
151 {
152 static ArgvMap theArg;
153 return theArg;
154 }
155
156
157 void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
158
159 // -1 is error, 0 is timeout, 1 is success
160 int asendtcp(const string& data, Socket* sock)
161 {
162 PacketID pident;
163 pident.sock=sock;
164 pident.outMSG=data;
165
166 t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
167 string packet;
168
169 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
170
171 if(!ret || ret==-1) { // timeout
172 t_fdm->removeWriteFD(sock->getHandle());
173 }
174 else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
175 return -1;
176 }
177 return ret;
178 }
179
180 void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
181
182 // -1 is error, 0 is timeout, 1 is success
183 int arecvtcp(string& data, int len, Socket* sock)
184 {
185 data.clear();
186 PacketID pident;
187 pident.sock=sock;
188 pident.inNeeded=len;
189 t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
190
191 int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
192 if(!ret || ret==-1) { // timeout
193 t_fdm->removeReadFD(sock->getHandle());
194 }
195 else if(data.empty()) {// error, EOF or other
196 return -1;
197 }
198
199 return ret;
200 }
201
202 vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
203 const ComboAddress g_local4("0.0.0.0"), g_local6("::");
204
205 //! pick a random query local address
206 ComboAddress getQueryLocalAddress(int family, uint16_t port)
207 {
208 ComboAddress ret;
209 if(family==AF_INET) {
210 if(g_localQueryAddresses4.empty())
211 ret = g_local4;
212 else
213 ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
214 ret.sin4.sin_port = htons(port);
215 }
216 else {
217 if(g_localQueryAddresses6.empty())
218 ret = g_local6;
219 else
220 ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
221
222 ret.sin6.sin6_port = htons(port);
223 }
224 return ret;
225 }
226
227 void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
228
229 void setSocketBuffer(int fd, int optname, uint32_t size)
230 {
231 uint32_t psize=0;
232 socklen_t len=sizeof(psize);
233
234 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
235 L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
236 return;
237 }
238
239 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
240 L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
241 }
242
243
244 static void setSocketReceiveBuffer(int fd, uint32_t size)
245 {
246 setSocketBuffer(fd, SO_RCVBUF, size);
247 }
248
249 static void setSocketSendBuffer(int fd, uint32_t size)
250 {
251 setSocketBuffer(fd, SO_SNDBUF, size);
252 }
253
254
255 // you can ask this class for a UDP socket to send a query from
256 // this socket is not yours, don't even think about deleting it
257 // but after you call 'returnSocket' on it, don't assume anything anymore
258 class UDPClientSocks
259 {
260 unsigned int d_numsocks;
261 unsigned int d_maxsocks;
262 public:
263 UDPClientSocks() : d_numsocks(0), d_maxsocks(5000)
264 {
265 }
266
267 typedef set<int> socks_t;
268 socks_t d_socks;
269
270 // returning -1 means: temporary OS error (ie, out of files), -2 means OS error
271 int getSocket(const ComboAddress& toaddr, int* fd)
272 {
273 *fd=makeClientSocket(toaddr.sin4.sin_family);
274 if(*fd < 0) // temporary error - receive exception otherwise
275 return -1;
276
277 if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
278 int err = errno;
279 // returnSocket(*fd);
280 Utility::closesocket(*fd);
281 if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
282 return -2;
283 return -1;
284 }
285
286 d_socks.insert(*fd);
287 d_numsocks++;
288 return 0;
289 }
290
291 void returnSocket(int fd)
292 {
293 socks_t::iterator i=d_socks.find(fd);
294 if(i==d_socks.end()) {
295 throw PDNSException("Trying to return a socket (fd="+lexical_cast<string>(fd)+") not in the pool");
296 }
297 returnSocketLocked(i);
298 }
299
300 // return a socket to the pool, or simply erase it
301 void returnSocketLocked(socks_t::iterator& i)
302 {
303 if(i==d_socks.end()) {
304 throw PDNSException("Trying to return a socket not in the pool");
305 }
306 try {
307 t_fdm->removeReadFD(*i);
308 }
309 catch(FDMultiplexerException& e) {
310 // we sometimes return a socket that has not yet been assigned to t_fdm
311 }
312 Utility::closesocket(*i);
313
314 d_socks.erase(i++);
315 --d_numsocks;
316 }
317
318 // returns -1 for errors which might go away, throws for ones that won't
319 static int makeClientSocket(int family)
320 {
321 int ret=(int)socket(family, SOCK_DGRAM, 0);
322
323 if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
324 return ret;
325
326 if(ret<0)
327 throw PDNSException("Making a socket for resolver (family = "+lexical_cast<string>(family)+"): "+stringerror());
328
329 Utility::setCloseOnExec(ret);
330
331 int tries=10;
332 while(--tries) {
333 uint16_t port;
334
335 if(tries==1) // fall back to kernel 'random'
336 port = 0;
337 else
338 port = 1025 + dns_random(64510);
339
340 ComboAddress sin=getQueryLocalAddress(family, port); // does htons for us
341
342 if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
343 break;
344 }
345 if(!tries)
346 throw PDNSException("Resolver binding to local query client socket: "+stringerror());
347
348 Utility::setNonBlocking(ret);
349 return ret;
350 }
351 };
352
353 static __thread UDPClientSocks* t_udpclientsocks;
354
355 /* these two functions are used by LWRes */
356 // -2 is OS error, -1 is error that depends on the remote, > 0 is success
357 int asendto(const char *data, int len, int flags,
358 const ComboAddress& toaddr, uint16_t id, const string& domain, uint16_t qtype, int* fd)
359 {
360
361 PacketID pident;
362 pident.domain = domain;
363 pident.remote = toaddr;
364 pident.type = qtype;
365
366 // see if there is an existing outstanding request we can chain on to, using partial equivalence function
367 pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
368
369 for(; chain.first != chain.second; chain.first++) {
370 if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
371 /*
372 cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
373 cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
374 <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
375 */
376 chain.first->key.chain.insert(id); // we can chain
377 *fd=-1; // gets used in waitEvent / sendEvent later on
378 return 1;
379 }
380 }
381
382 int ret=t_udpclientsocks->getSocket(toaddr, fd);
383 if(ret < 0)
384 return ret;
385
386 pident.fd=*fd;
387 pident.id=id;
388
389 t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
390 ret = send(*fd, data, len, 0);
391
392 int tmp = errno;
393
394 if(ret < 0)
395 t_udpclientsocks->returnSocket(*fd);
396
397 errno = tmp; // this is for logging purposes only
398 return ret;
399 }
400
401 // -1 is error, 0 is timeout, 1 is success
402 int arecvfrom(char *data, int len, int flags, const ComboAddress& fromaddr, int *d_len,
403 uint16_t id, const string& domain, uint16_t qtype, int fd, struct timeval* now)
404 {
405 static optional<unsigned int> nearMissLimit;
406 if(!nearMissLimit)
407 nearMissLimit=::arg().asNum("spoof-nearmiss-max");
408
409 PacketID pident;
410 pident.fd=fd;
411 pident.id=id;
412 pident.domain=domain;
413 pident.type = qtype;
414 pident.remote=fromaddr;
415
416 string packet;
417 int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
418
419 if(ret > 0) {
420 if(packet.empty()) // means "error"
421 return -1;
422
423 *d_len=(int)packet.size();
424 memcpy(data,packet.c_str(),min(len,*d_len));
425 if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
426 L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
427 g_stats.spoofCount++;
428 return -1;
429 }
430 }
431 else {
432 if(fd >= 0)
433 t_udpclientsocks->returnSocket(fd);
434 }
435 return ret;
436 }
437
438
439 string s_pidfname;
440 static void writePid(void)
441 {
442 ofstream of(s_pidfname.c_str(), std::ios_base::app);
443 if(of)
444 of<< Utility::getpid() <<endl;
445 else
446 L<<Logger::Error<<"Requested to write pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
447 }
448
449 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
450 tcpClientCounts_t __thread* t_tcpClientCounts;
451
452 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : d_remote(addr), d_fd(fd)
453 {
454 ++s_currentConnections;
455 (*t_tcpClientCounts)[d_remote]++;
456 }
457
458 TCPConnection::~TCPConnection()
459 {
460 if(Utility::closesocket(d_fd) < 0)
461 unixDie("closing socket for TCPConnection");
462 if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
463 t_tcpClientCounts->erase(d_remote);
464 --s_currentConnections;
465 }
466
467 AtomicCounter TCPConnection::s_currentConnections;
468 void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
469
470 void updateRcodeStats(int res)
471 {
472 switch(res) {
473 case RCode::ServFail:
474 g_stats.servFails++;
475 break;
476 case RCode::NXDomain:
477 g_stats.nxDomains++;
478 break;
479 case RCode::NoError:
480 g_stats.noErrors++;
481 break;
482 }
483 }
484
485 void startDoResolve(void *p)
486 {
487 DNSComboWriter* dc=(DNSComboWriter *)p;
488 string loginfo="";
489
490 try {
491 loginfo=" (while setting loginfo)";
492 loginfo=" ("+dc->d_mdp.d_qname+"/"+lexical_cast<string>(dc->d_mdp.d_qtype)+" from "+(dc->d_remote.toString())+")";
493 uint32_t maxanswersize= dc->d_tcp ? 65535 : 512;
494 EDNSOpts edo;
495 if(getEDNSOpts(dc->d_mdp, &edo)) {
496 maxanswersize = min(edo.d_packetsize, (uint16_t) (dc->d_tcp ? 65535 : 1680));
497 }
498
499 vector<DNSResourceRecord> ret;
500 vector<uint8_t> packet;
501
502 DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
503
504 pw.getHeader()->aa=0;
505 pw.getHeader()->ra=1;
506 pw.getHeader()->qr=1;
507 pw.getHeader()->tc=0;
508 pw.getHeader()->id=dc->d_mdp.d_header.id;
509 pw.getHeader()->rd=dc->d_mdp.d_header.rd;
510
511 SyncRes sr(dc->d_now);
512 bool tracedQuery=false; // we could consider letting Lua know about this too
513 if(t_traceRegex->get() && (*t_traceRegex)->match(dc->d_mdp.d_qname)) {
514 sr.setLogMode(SyncRes::Store);
515 tracedQuery=true;
516 }
517
518 if(!g_quiet || tracedQuery)
519 L<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
520 <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
521
522 sr.setId(MT->getTid());
523 if(!dc->d_mdp.d_header.rd)
524 sr.setCacheOnly();
525
526 int res;
527
528 bool variableAnswer = false;
529 // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
530 if(!t_pdl->get() || !(*t_pdl)->preresolve(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer)) {
531 res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
532
533 if(t_pdl->get()) {
534 if(res == RCode::NoError) {
535 vector<DNSResourceRecord>::const_iterator i;
536 for(i=ret.begin(); i!=ret.end(); ++i)
537 if(i->qtype.getCode() == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
538 break;
539 if(i == ret.end())
540 (*t_pdl)->nodata(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
541 }
542 else if(res == RCode::NXDomain)
543 (*t_pdl)->nxdomain(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
544
545 (*t_pdl)->postresolve(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
546 }
547 }
548
549 uint32_t minTTL=std::numeric_limits<uint32_t>::max();
550
551 if(tracedQuery || res < 0 || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
552 {
553 string trace(sr.getTrace());
554 if(!trace.empty()) {
555 vector<string> lines;
556 boost::split(lines, trace, boost::is_any_of("\n"));
557 BOOST_FOREACH(const string& line, lines) {
558 if(!line.empty())
559 L<<Logger::Warning<< line << endl;
560 }
561 }
562 }
563
564 if(res < 0) {
565 pw.getHeader()->rcode=RCode::ServFail;
566 // no commit here, because no record
567 g_stats.servFails++;
568 }
569 else {
570 pw.getHeader()->rcode=res;
571 updateRcodeStats(res);
572
573 if(ret.size()) {
574 orderAndShuffle(ret);
575
576 for(vector<DNSResourceRecord>::const_iterator i=ret.begin(); i!=ret.end(); ++i) {
577 pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, i->qclass, (DNSPacketWriter::Place)i->d_place);
578 minTTL = min(minTTL, i->ttl);
579 if(i->qtype.getCode() == QType::A) { // blast out A record w/o doing whole dnswriter thing
580 uint32_t ip=0;
581 IpToU32(i->content, &ip);
582 pw.xfr32BitInt(htonl(ip));
583 } else {
584 shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), i->qclass, i->content));
585 drc->toPacket(pw);
586 }
587 if(pw.size() > maxanswersize) {
588 pw.rollback();
589 if(i->d_place==DNSResourceRecord::ANSWER) // only truncate if we actually omitted parts of the answer
590 pw.getHeader()->tc=1;
591 goto sendit; // need to jump over pw.commit
592 }
593 }
594
595 pw.commit();
596 }
597 }
598 sendit:;
599 if(!dc->d_tcp) {
600 sendto(dc->d_socket, (const char*)&*packet.begin(), packet.size(), 0, (struct sockaddr *)(&dc->d_remote), dc->d_remote.getSocklen());
601 if(!SyncRes::s_nopacketcache && !variableAnswer ) {
602 t_packetCache->insertResponsePacket(string((const char*)&*packet.begin(), packet.size()), g_now.tv_sec,
603 min(minTTL,
604 (pw.getHeader()->rcode == RCode::ServFail) ? SyncRes::s_packetcacheservfailttl : SyncRes::s_packetcachettl
605 )
606 );
607 }
608 }
609 else {
610 char buf[2];
611 buf[0]=packet.size()/256;
612 buf[1]=packet.size()%256;
613
614 Utility::iovec iov[2];
615
616 iov[0].iov_base=(void*)buf; iov[0].iov_len=2;
617 iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
618
619 int ret=Utility::writev(dc->d_socket, iov, 2);
620 bool hadError=true;
621
622 if(ret == 0)
623 L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
624 else if(ret < 0 )
625 L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
626 else if((unsigned int)ret != 2 + packet.size())
627 L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
628 else
629 hadError=false;
630
631 // update tcp connection status, either by closing or moving to 'BYTE0'
632
633 if(hadError) {
634 // no need to remove us from FDM, we weren't there
635 dc->d_socket = -1;
636 }
637 else {
638 dc->d_tcpConnection->state=TCPConnection::BYTE0;
639 Utility::gettimeofday(&g_now, 0); // needs to be updated
640 t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
641 t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
642 }
643 }
644
645 if(!g_quiet) {
646 L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
647 L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
648 sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
649 }
650
651 sr.d_outqueries ? t_RC->cacheMisses++ : t_RC->cacheHits++;
652 float spent=makeFloat(sr.d_now-dc->d_now);
653 if(spent < 0.001)
654 g_stats.answers0_1++;
655 else if(spent < 0.010)
656 g_stats.answers1_10++;
657 else if(spent < 0.1)
658 g_stats.answers10_100++;
659 else if(spent < 1.0)
660 g_stats.answers100_1000++;
661 else
662 g_stats.answersSlow++;
663
664 uint64_t newLat=(uint64_t)(spent*1000000);
665 if(newLat < 1000000) // outliers of several minutes exist..
666 g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
667
668 delete dc;
669 dc=0;
670 }
671 catch(PDNSException &ae) {
672 L<<Logger::Error<<"startDoResolve problem"<<loginfo<<": "<<ae.reason<<endl;
673 delete dc;
674 }
675 catch(MOADNSException& e) {
676 L<<Logger::Error<<"DNS parser error"<<loginfo<<": "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
677 delete dc;
678 }
679 catch(std::exception& e) {
680 L<<Logger::Error<<"STL error"<<loginfo<<": "<<e.what()<<endl;
681 delete dc;
682 }
683 catch(...) {
684 L<<Logger::Error<<"Any other exception in a resolver context"<<loginfo<<endl;
685 }
686
687 g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
688 }
689
690 void makeControlChannelSocket(int processNum=-1)
691 {
692 string sockname=::arg()["socket-dir"]+"/pdns_"+s_programname;
693 if(processNum >= 0)
694 sockname += "."+lexical_cast<string>(processNum);
695 sockname+=".controlsocket";
696 s_rcc.listen(sockname);
697
698 int sockowner = -1;
699 int sockgroup = -1;
700
701 if (!::arg().isEmpty("socket-group"))
702 sockgroup=::arg().asGid("socket-group");
703 if (!::arg().isEmpty("socket-owner"))
704 sockowner=::arg().asUid("socket-owner");
705
706 if (sockgroup > -1 || sockowner > -1) {
707 if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
708 unixDie("Failed to chown control socket");
709 }
710 }
711
712 // do mode change if socket-mode is given
713 if(!::arg().isEmpty("socket-mode")) {
714 mode_t sockmode=::arg().asMode("socket-mode");
715 chmod(sockname.c_str(), sockmode);
716 }
717 }
718
719 void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
720 {
721 shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
722
723 if(conn->state==TCPConnection::BYTE0) {
724 int bytes=recv(conn->getFD(), conn->data, 2, 0);
725 if(bytes==1)
726 conn->state=TCPConnection::BYTE1;
727 if(bytes==2) {
728 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
729 conn->bytesread=0;
730 conn->state=TCPConnection::GETQUESTION;
731 }
732 if(!bytes || bytes < 0) {
733 t_fdm->removeReadFD(fd);
734 return;
735 }
736 }
737 else if(conn->state==TCPConnection::BYTE1) {
738 int bytes=recv(conn->getFD(), conn->data+1, 1, 0);
739 if(bytes==1) {
740 conn->state=TCPConnection::GETQUESTION;
741 conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
742 conn->bytesread=0;
743 }
744 if(!bytes || bytes < 0) {
745 if(g_logCommonErrors)
746 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected after first byte"<<endl;
747 t_fdm->removeReadFD(fd);
748 return;
749 }
750 }
751 else if(conn->state==TCPConnection::GETQUESTION) {
752 int bytes=recv(conn->getFD(), conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
753 if(!bytes || bytes < 0) {
754 L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected while reading question body"<<endl;
755 t_fdm->removeReadFD(fd);
756 return;
757 }
758 conn->bytesread+=bytes;
759 if(conn->bytesread==conn->qlen) {
760 t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
761
762 DNSComboWriter* dc=0;
763 try {
764 dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
765 }
766 catch(MOADNSException &mde) {
767 g_stats.clientParseError++;
768 if(g_logCommonErrors)
769 L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toString() <<endl;
770 return;
771 }
772 dc->d_tcpConnection = conn; // carry the torch
773 dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
774 dc->d_tcp=true;
775 dc->setRemote(&conn->d_remote);
776 if(dc->d_mdp.d_header.qr) {
777 delete dc;
778 L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
779 return;
780 }
781 if(dc->d_mdp.d_header.opcode) {
782 delete dc;
783 L<<Logger::Error<<"Ignoring non-query opcode on server socket!"<<endl;
784 return;
785 }
786 else {
787 ++g_stats.qcounter;
788 ++g_stats.tcpqcounter;
789 MT->makeThread(startDoResolve, dc); // deletes dc, will set state to BYTE0 again
790 return;
791 }
792 }
793 }
794 }
795
796 //! Handle new incoming TCP connection
797 void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
798 {
799 ComboAddress addr;
800 socklen_t addrlen=sizeof(addr);
801 int newsock=(int)accept(fd, (struct sockaddr*)&addr, &addrlen);
802 if(newsock>0) {
803 if(MT->numProcesses() > g_maxMThreads) {
804 g_stats.overCapacityDrops++;
805 Utility::closesocket(newsock);
806 return;
807 }
808
809 t_remotes->addRemote(addr);
810 if(t_allowFrom && !t_allowFrom->match(&addr)) {
811 if(!g_quiet)
812 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
813
814 g_stats.unauthorizedTCP++;
815 Utility::closesocket(newsock);
816 return;
817 }
818 if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
819 g_stats.tcpClientOverflow++;
820 Utility::closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
821 return;
822 }
823
824 Utility::setNonBlocking(newsock);
825 shared_ptr<TCPConnection> tc(new TCPConnection(newsock, addr));
826 tc->state=TCPConnection::BYTE0;
827
828 t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
829
830 struct timeval now;
831 Utility::gettimeofday(&now, 0);
832 t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
833 }
834 }
835
836 string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd)
837 {
838 ++g_stats.qcounter;
839 if(fromaddr.sin4.sin_family==AF_INET6)
840 g_stats.ipv6qcounter++;
841
842 string response;
843 try {
844 uint32_t age;
845 if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) {
846 if(!g_quiet)
847 L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
848
849 g_stats.packetCacheHits++;
850 SyncRes::s_queries++;
851 ageDNSPacket(response, age);
852 sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
853 if(response.length() >= sizeof(struct dnsheader)) {
854 struct dnsheader dh;
855 memcpy(&dh, response.c_str(), sizeof(dh));
856 updateRcodeStats(dh.rcode);
857 }
858 g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
859 return 0;
860 }
861 }
862 catch(std::exception& e) {
863 L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
864 return 0;
865 }
866
867
868 if(MT->numProcesses() > g_maxMThreads) {
869 g_stats.overCapacityDrops++;
870 return 0;
871 }
872
873 DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
874 dc->setSocket(fd);
875 dc->setRemote(&fromaddr);
876
877 dc->d_tcp=false;
878 MT->makeThread(startDoResolve, (void*) dc); // deletes dc
879 return 0;
880 }
881
882 void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
883 {
884 int len;
885 char data[1500];
886 ComboAddress fromaddr;
887 socklen_t addrlen=sizeof(fromaddr);
888
889 if((len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
890 t_remotes->addRemote(fromaddr);
891
892 if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
893 if(!g_quiet)
894 L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
895
896 g_stats.unauthorizedUDP++;
897 return;
898 }
899 try {
900 dnsheader* dh=(dnsheader*)data;
901
902 if(dh->qr) {
903 if(g_logCommonErrors)
904 L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
905 }
906 else if(dh->opcode) {
907 if(g_logCommonErrors)
908 L<<Logger::Error<<"Ignoring non-query opcode "<<dh->opcode<<" from "<<fromaddr.toString()<<" on server socket!"<<endl;
909 }
910 else {
911 string question(data, len);
912 if(g_weDistributeQueries)
913 distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd));
914 else
915 doProcessUDPQuestion(question, fromaddr, fd);
916 }
917 }
918 catch(MOADNSException& mde) {
919 g_stats.clientParseError++;
920 if(g_logCommonErrors)
921 L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
922 }
923 }
924 else {
925 // cerr<<t_id<<" had error: "<<stringerror()<<endl;
926 if(errno == EAGAIN)
927 g_stats.noPacketError++;
928 }
929 }
930
931
// Listening sockets paired with their callbacks (handleNewTCPQuestion / handleNewUDPQuestion),
// collected by makeTCPServerSockets() and makeUDPServerSockets().
// NOTE(review): registration with a multiplexer happens outside this part of the file,
// presumably once each thread's t_fdm exists — confirm.
typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
deferredAdd_t deferredAdd;
934
935 void makeTCPServerSockets()
936 {
937 int fd;
938 vector<string>locals;
939 stringtok(locals,::arg()["local-address"]," ,");
940
941 if(locals.empty())
942 throw PDNSException("No local address specified");
943
944 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
945 ServiceTuple st;
946 st.port=::arg().asNum("local-port");
947 parseService(*i, st);
948
949 ComboAddress sin;
950
951 memset((char *)&sin,0, sizeof(sin));
952 sin.sin4.sin_family = AF_INET;
953 if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
954 sin.sin6.sin6_family = AF_INET6;
955 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
956 throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
957 }
958
959 fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
960 Utility::setCloseOnExec(fd);
961
962 if(fd<0)
963 throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
964
965 int tmp=1;
966 if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
967 L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
968 exit(1);
969 }
970
971 #ifdef TCP_DEFER_ACCEPT
972 if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
973 if(i==locals.begin())
974 L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
975 }
976 #endif
977
978 sin.sin4.sin_port = htons(st.port);
979 int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
980 if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
981 throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
982
983 Utility::setNonBlocking(fd);
984 setSocketSendBuffer(fd, 65000);
985 listen(fd, 128);
986 deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
987 g_tcpListenSockets.push_back(fd);
988
989 if(sin.sin4.sin_family == AF_INET)
990 L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
991 else
992 L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
993 }
994 }
995
996
997
998 void makeUDPServerSockets()
999 {
1000 vector<string>locals;
1001 stringtok(locals,::arg()["local-address"]," ,");
1002
1003 if(locals.empty())
1004 throw PDNSException("No local address specified");
1005
1006 if(::arg()["local-address"]=="0.0.0.0") {
1007 L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
1008 }
1009
1010 for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
1011 ServiceTuple st;
1012 st.port=::arg().asNum("local-port");
1013 parseService(*i, st);
1014
1015 ComboAddress sin;
1016
1017 memset(&sin, 0, sizeof(sin));
1018 sin.sin4.sin_family = AF_INET;
1019 if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
1020 sin.sin6.sin6_family = AF_INET6;
1021 if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
1022 throw PDNSException("Unable to resolve local address for UDP server on '"+ st.host +"'");
1023 }
1024
1025 int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
1026 Utility::setCloseOnExec(fd);
1027
1028 if(fd < 0) {
1029 throw PDNSException("Making a UDP server socket for resolver: "+netstringerror());
1030 }
1031
1032 setSocketReceiveBuffer(fd, 200000);
1033 sin.sin4.sin_port = htons(st.port);
1034
1035 int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
1036 if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
1037 throw PDNSException("Resolver binding to server socket on port "+ lexical_cast<string>(st.port) +" for "+ st.host+": "+stringerror());
1038
1039 Utility::setNonBlocking(fd);
1040
1041 deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
1042 g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
1043 if(sin.sin4.sin_family == AF_INET)
1044 L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
1045 else
1046 L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
1047 }
1048 }
1049
1050
1051 void daemonize(void)
1052 {
1053 if(fork())
1054 exit(0); // bye bye
1055
1056 setsid();
1057
1058 int i=open("/dev/null",O_RDWR); /* open stdin */
1059 if(i < 0)
1060 L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
1061 else {
1062 dup2(i,0); /* stdin */
1063 dup2(i,1); /* stderr */
1064 dup2(i,2); /* stderr */
1065 close(i);
1066 }
1067 }
1068
// Iteration counter of the main event loop in recursorThread(); used there to
// schedule periodic work (housekeeping, TCP timeout scans).
// NOTE(review): this is one global shared by every recursor thread, each of which
// resets and increments it without synchronisation - confirm this is intended.
uint64_t counter;
// Set by usr1Handler() (SIGUSR1) to request a statistics dump; cleared by doStats().
// NOTE(review): flags written from a signal handler are normally declared
// 'volatile sig_atomic_t' - consider it if no other file declares this as bool.
bool statsWanted;
1071
1072 void usr1Handler(int)
1073 {
1074 statsWanted=true;
1075 }
1076
1077 void usr2Handler(int)
1078 {
1079 SyncRes::setDefaultLogMode(SyncRes::Log);
1080 g_quiet=false;
1081 ::arg().set("quiet")="no";
1082
1083 }
1084
/** Log a block of statistics at Warning level.
 *
 *  Called from houseKeeping() on thread 0 (at most every 1800 seconds) and from
 *  recursorThread()'s loop on thread 0 when statsWanted is set (SIGUSR1).
 *  Cache, packet-cache and concurrency figures are summed over all threads
 *  with broadcastAccFunction(). Nothing is reported until this thread's record
 *  cache and the global query/outquery counters are non-zero; that guard is
 *  also what keeps the percentage divisions below off zero denominators
 *  (assuming the broadcast cache hit/miss totals include this thread's own).
 *  An explicit request before then only logs "no stats yet".
 *  Always clears statsWanted.
 */
void doStats(void)
{
  // values at the previous report, used for the qps average below
  static time_t lastOutputTime;
  static uint64_t lastQueryCount;

  if(g_stats.qcounter && (t_RC->cacheHits + t_RC->cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) { // this only runs once thread 0 has had hits
    uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
    uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);

    L<<Logger::Warning<<"stats: "<<g_stats.qcounter<<" questions, "<<
      broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
      broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
      (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl;

    L<<Logger::Warning<<"stats: throttle map: "
      << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
      << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl;
    // the first of the next two statements has no endl, so presumably both end
    // up on one log line - keep them together and in this order
    L<<Logger::Warning<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
    L<<Logger::Warning<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
    L<<Logger::Warning<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
      broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;

    //L<<Logger::Warning<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
    //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;

    L<<Logger::Warning<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
      " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;

    // queries per second since the previous report (skipped on the first report)
    time_t now = time(0);
    if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
      L<<Logger::Warning<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
    }
    lastOutputTime = now;
    lastQueryCount = SyncRes::s_queries;
  }
  else if(statsWanted)
    L<<Logger::Warning<<"stats: no stats yet!"<<endl;

  statsWanted=false;
}
1126
/** Periodic maintenance, started as an mthread from recursorThread()'s loop
 *  (every 500 iterations) on each recursor thread.
 *
 *  - More than (5 + t_id) seconds after the last prune: prune this thread's
 *    record cache, its packet cache (to its share of max-packetcache-entries)
 *    and its negative cache. Every 40th prune also drops nsSpeeds entries that
 *    are stale relative to a 300-second cutoff.
 *  - Thread 0 only: log statistics when more than 1800 seconds have passed.
 *  - More than 7200 seconds after the last success: re-resolve the root NS set
 *    without using the cache. On failure the timestamp is not advanced, so it
 *    is retried on the next run.
 *
 *  A PDNSException is logged as fatal and rethrown.
 */
static void houseKeeping(void *)
try
{
  // per-thread timestamps (seconds) of the last stats dump, root refresh and prune
  static __thread time_t last_stat, last_rootupdate, last_prune;
  static __thread int cleanCounter=0;
  struct timeval now;
  Utility::gettimeofday(&now, 0);

  // clog<<"* "<<t_id<<" "<<(void*)&last_stat<<"\t"<<(unsigned int)last_stat<<endl;

  // interval grows with t_id - presumably so threads don't all prune at once
  if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
    DTime dt;
    dt.setTimeval(now);
    t_RC->doPrune(); // this function is local to a thread, so fine anyhow
    t_packetCache->doPruneTo(::arg().asNum("max-packetcache-entries") / g_numThreads);

    pruneCollection(t_sstorage->negcache, ::arg().asNum("max-cache-entries") / (g_numThreads * 10), 200);

    if(!((cleanCounter++)%40)) { // this is a full scan!
      time_t limit=now.tv_sec-300;
      // erase(i++) advances the iterator before the element goes away; this
      // assumes nsspeeds_t is a node-based (map-like) container
      for(SyncRes::nsspeeds_t::iterator i = t_sstorage->nsSpeeds.begin() ; i!= t_sstorage->nsSpeeds.end(); )
        if(i->second.stale(limit))
          t_sstorage->nsSpeeds.erase(i++);
        else
          ++i;
    }
    // L<<Logger::Warning<<"Spent "<<dt.udiff()/1000<<" msec cleaning"<<endl;
    last_prune=time(0);
  }

  if(!t_id) {
    if(now.tv_sec - last_stat > 1800) {
      doStats();
      last_stat=time(0);
    }
  }

  if(now.tv_sec - last_rootupdate > 7200) {
    SyncRes sr(now);
    sr.setDoEDNS0(true);
    vector<DNSResourceRecord> ret;

    sr.setNoCache();
    int res=sr.beginResolve(".", QType(QType::NS), 1, ret);
    if(!res) {
      L<<Logger::Warning<<"Refreshed . records"<<endl;
      last_rootupdate=now.tv_sec;
    }
    else
      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
  }
}
catch(PDNSException& ae)
{
  L<<Logger::Error<<"Fatal error: "<<ae.reason<<endl;
  throw;
}
;
1185
1186 void makeThreadPipes()
1187 {
1188 for(unsigned int n=0; n < g_numThreads; ++n) {
1189 struct ThreadPipeSet tps;
1190 int fd[2];
1191 if(pipe(fd) < 0)
1192 unixDie("Creating pipe for inter-thread communications");
1193
1194 tps.readToThread = fd[0];
1195 tps.writeToThread = fd[1];
1196
1197 if(pipe(fd) < 0)
1198 unixDie("Creating pipe for inter-thread communications");
1199 tps.readFromThread = fd[0];
1200 tps.writeFromThread = fd[1];
1201
1202 g_pipes.push_back(tps);
1203 }
1204 }
1205
/** Message sent, by pointer, through a thread's readToThread pipe.
 *  handlePipeRequest() on the receiving thread runs 'func' and deletes the
 *  message; if 'wantAnswer' is set it also writes func's result back through
 *  that thread's writeFromThread pipe. */
struct ThreadMSG
{
  pipefunc_t func;  // work to run on the target thread
  bool wantAnswer;  // whether the sender reads func's result back
};
1211
1212 void broadcastFunction(const pipefunc_t& func, bool skipSelf)
1213 {
1214 unsigned int n = 0;
1215 BOOST_FOREACH(ThreadPipeSet& tps, g_pipes)
1216 {
1217 if(n++ == t_id) {
1218 if(!skipSelf)
1219 func(); // don't write to ourselves!
1220 continue;
1221 }
1222
1223 ThreadMSG* tmsg = new ThreadMSG();
1224 tmsg->func = func;
1225 tmsg->wantAnswer = true;
1226 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1227 unixDie("write to thread pipe returned wrong size or error");
1228
1229 string* resp;
1230 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1231 unixDie("read from thread pipe returned wrong size or error");
1232
1233 if(resp) {
1234 // cerr <<"got response: " << *resp << endl;
1235 delete resp;
1236 }
1237 }
1238 }
1239 void distributeAsyncFunction(const pipefunc_t& func)
1240 {
1241 static unsigned int counter;
1242 unsigned int target = 1 + (++counter % (g_pipes.size()-1));
1243 // cerr<<"Sending to: "<<target<<endl;
1244 if(target == t_id) {
1245 func();
1246 return;
1247 }
1248 ThreadPipeSet& tps = g_pipes[target];
1249 ThreadMSG* tmsg = new ThreadMSG();
1250 tmsg->func = func;
1251 tmsg->wantAnswer = false;
1252
1253 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1254 unixDie("write to thread pipe returned wrong size or error");
1255
1256 }
1257
1258 void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
1259 {
1260 ThreadMSG* tmsg;
1261
1262 if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
1263 unixDie("read from thread pipe returned wrong size or error");
1264 }
1265
1266 void *resp = tmsg->func();
1267 if(tmsg->wantAnswer)
1268 if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp))
1269 unixDie("write to thread pipe returned wrong size or error");
1270
1271 delete tmsg;
1272 }
1273
1274 template<class T> void *voider(const boost::function<T*()>& func)
1275 {
1276 return func();
1277 }
1278
1279 vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
1280 {
1281 a.insert(a.end(), b.begin(), b.end());
1282 return a;
1283 }
1284
1285 template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
1286 {
1287 unsigned int n = 0;
1288 T ret=T();
1289 BOOST_FOREACH(ThreadPipeSet& tps, g_pipes)
1290 {
1291 if(n++ == t_id) {
1292 if(!skipSelf) {
1293 T* resp = (T*)func(); // don't write to ourselves!
1294 if(resp) {
1295 //~ cerr <<"got direct: " << *resp << endl;
1296 ret += *resp;
1297 delete resp;
1298 }
1299 }
1300 continue;
1301 }
1302
1303 ThreadMSG* tmsg = new ThreadMSG();
1304 tmsg->func = boost::bind(voider<T>, func);
1305 tmsg->wantAnswer = true;
1306
1307 if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1308 unixDie("write to thread pipe returned wrong size or error");
1309
1310
1311 T* resp;
1312 if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1313 unixDie("read from thread pipe returned wrong size or error");
1314
1315 if(resp) {
1316 //~ cerr <<"got response: " << *resp << endl;
1317 ret += *resp;
1318 delete resp;
1319 }
1320 }
1321 return ret;
1322 }
1323
// Explicit instantiations of broadcastAccFunction for string, uint64_t and
// vector<ComboAddress> results. The template body exists only in this file, so
// presumably other translation units (e.g. the control-channel code) depend on
// these instantiations to link - confirm before removing any of them.
template string broadcastAccFunction(const boost::function<string*()>& fun, bool skipSelf); // explicit instantiation
template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun, bool skipSelf); // explicit instantiation
template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun, bool skipSelf); // explicit instantiation
1327
1328 void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
1329 {
1330 string remote;
1331 string msg=s_rcc.recv(&remote);
1332 RecursorControlParser rcp;
1333 RecursorControlParser::func_t* command;
1334
1335 string answer=rcp.getAnswer(msg, &command);
1336 try {
1337 s_rcc.send(answer, &remote);
1338 command();
1339 }
1340 catch(std::exception& e) {
1341 L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
1342 }
1343 catch(PDNSException& ae) {
1344 L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
1345 }
1346 }
1347
1348 void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
1349 {
1350 PacketID* pident=any_cast<PacketID>(&var);
1351 // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
1352
1353 shared_array<char> buffer(new char[pident->inNeeded]);
1354
1355 int ret=recv(fd, buffer.get(), pident->inNeeded,0);
1356 if(ret > 0) {
1357 pident->inMSG.append(&buffer[0], &buffer[ret]);
1358 pident->inNeeded-=ret;
1359 if(!pident->inNeeded) {
1360 // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
1361 PacketID pid=*pident;
1362 string msg=pident->inMSG;
1363
1364 t_fdm->removeReadFD(fd);
1365 MT->sendEvent(pid, &msg);
1366 }
1367 else {
1368 // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
1369 }
1370 }
1371 else {
1372 PacketID tmp=*pident;
1373 t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
1374 string empty;
1375 MT->sendEvent(tmp, &empty); // this conveys error status
1376 }
1377 }
1378
1379 void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
1380 {
1381 PacketID* pid=any_cast<PacketID>(&var);
1382 int ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
1383 if(ret > 0) {
1384 pid->outPos+=ret;
1385 if(pid->outPos==pid->outMSG.size()) {
1386 PacketID tmp=*pid;
1387 t_fdm->removeWriteFD(fd);
1388 MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
1389 }
1390 }
1391 else { // error or EOF
1392 PacketID tmp(*pid);
1393 t_fdm->removeWriteFD(fd);
1394 string sent;
1395 MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
1396 }
1397 }
1398
1399 // resend event to everybody chained onto it
1400 void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
1401 {
1402 if(iter->key.chain.empty())
1403 return;
1404 // cerr<<"doResends called!\n";
1405 for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
1406 resend.fd=-1;
1407 resend.id=*i;
1408 // cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
1409
1410 MT->sendEvent(resend, &content);
1411 g_stats.chainResends++;
1412 }
1413 }
1414
/** Multiplexer callback: an answer (or junk) arrived on an outgoing UDP socket.
 *
 *  - Shorter than a DNS header: counted as a parse error (unless recvfrom
 *    failed), the socket is returned, and the waiter plus its chained queries
 *    get an empty string (error).
 *  - Otherwise the waiter is identified by remote address, DNS id, fd and
 *    question name/type. Chained queries get the packet first (doResends), then
 *    the waiter itself.
 *  - If no waiter matches, near-miss counters of similar waiters are bumped. A
 *    reply without a question section may be matched by id+remote alone and
 *    retried (it only conveys an error); otherwise it counts as unexpected.
 *    The socket is returned only when an event was delivered.
 */
void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
{
  PacketID pid=any_cast<PacketID>(var);
  int len;
  char data[1500];
  ComboAddress fromaddr;
  socklen_t addrlen=sizeof(fromaddr);

  len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);

  if(len < (int)sizeof(dnsheader)) {
    if(len < 0)
      ; // cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
    else {
      g_stats.serverParseError++;
      if(g_logCommonErrors)
        L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
          ": packet smaller than DNS header"<<endl;
    }

    t_udpclientsocks->returnSocket(fd);
    string empty;

    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
    if(iter != MT->d_waiters.end())
      doResends(iter, pid, empty);

    MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
    return;
  }

  dnsheader dh;
  memcpy(&dh, data, sizeof(dh));

  // key for finding the waiting mthread: who answered, with which id, on which socket
  PacketID pident;
  pident.remote=fromaddr;
  pident.id=dh.id;
  pident.fd=fd;

  if(!dh.qr) {
    L<<Logger::Warning<<"Not taking data from question on outgoing socket from "<< fromaddr.toStringWithPort() <<endl;
  }

  if(!dh.qdcount || // UPC, Nominum, very old BIND on FormErr, NSD
     !dh.qr) { // one weird server
    pident.domain.clear();
    pident.type = 0;
  }
  else {
    try {
      pident.domain=questionExpand(data, len, pident.type); // don't copy this from above - we need to do the actual read
    }
    catch(std::exception& e) {
      g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
      L<<Logger::Warning<<"Error in packet from "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
      // NOTE(review): neither the waiter is woken nor the socket returned here;
      // presumably the query's timeout handles both - confirm
      return;
    }
  }
  string packet;
  packet.assign(data, len);

  MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
  if(iter != MT->d_waiters.end()) {
    doResends(iter, pident, packet);
  }

retryWithName:

  if(!MT->sendEvent(pident, &packet)) {
    // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
    for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
      // same socket, server and question but a different id: record a near miss
      if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote && mthread->key.type == pident.type &&
         pdns_iequals(pident.domain, mthread->key.domain)) {
        mthread->key.nearMisses++;
      }

      // be a bit paranoid here since we're weakening our matching
      if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type &&
         pident.id == mthread->key.id && mthread->key.remote == pident.remote) {
        // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
        pident.domain = mthread->key.domain;
        pident.type = mthread->key.type;
        goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
      }
    }
    g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
    if(g_logCommonErrors) {
      L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<<pident.domain<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
    }
  }
  else if(fd >= 0) {
    t_udpclientsocks->returnSocket(fd);
  }
}
1509
1510 FDMultiplexer* getMultiplexer()
1511 {
1512 FDMultiplexer* ret;
1513 for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
1514 i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
1515 try {
1516 ret=i->second();
1517 return ret;
1518 }
1519 catch(FDMultiplexerException &fe) {
1520 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
1521 }
1522 catch(...) {
1523 L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
1524 }
1525 }
1526 L<<Logger::Error<<"No working multiplexer found!"<<endl;
1527 exit(1);
1528 }
1529
1530
1531 string* doReloadLuaScript()
1532 {
1533 string fname= ::arg()["lua-dns-script"];
1534 try {
1535 if(fname.empty()) {
1536 t_pdl->reset();
1537 L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
1538 return new string("unloaded\n");
1539 }
1540 else {
1541 *t_pdl = shared_ptr<RecursorLua>(new RecursorLua(fname));
1542 }
1543 }
1544 catch(std::exception& e) {
1545 L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
1546 return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
1547 }
1548
1549 L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
1550 return new string("(re)loaded '"+fname+"'\n");
1551 }
1552
1553 string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
1554 {
1555 if(begin != end)
1556 ::arg().set("lua-dns-script") = *begin;
1557
1558 return broadcastAccFunction<string>(doReloadLuaScript);
1559 }
1560
1561 string* pleaseUseNewTraceRegex(const std::string& newRegex)
1562 try
1563 {
1564 if(newRegex.empty()) {
1565 t_traceRegex->reset();
1566 return new string("unset\n");
1567 }
1568 else {
1569 (*t_traceRegex) = shared_ptr<Regex>(new Regex(newRegex));
1570 return new string("ok\n");
1571 }
1572 }
1573 catch(PDNSException& ae)
1574 {
1575 return new string(ae.reason+"\n");
1576 }
1577
1578 string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
1579 {
1580 return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
1581 }
1582
1583
1584 void* recursorThread(void*);
1585
1586 void* pleaseSupplantACLs(NetmaskGroup *ng)
1587 {
1588 t_allowFrom = ng;
1589 return 0;
1590 }
1591
// Copy of main()'s argc/argv, kept so that parseACLs() can re-apply the
// command-line ACL settings when it re-reads the configuration file.
int g_argc;
char** g_argv;
1594
1595 void parseACLs()
1596 {
1597 static bool l_initialized;
1598
1599 if(l_initialized) { // only reload configuration file on second call
1600 string configname=::arg()["config-dir"]+"/recursor.conf";
1601 cleanSlashes(configname);
1602
1603 if(!::arg().preParseFile(configname.c_str(), "allow-from-file"))
1604 L<<Logger::Warning<<"Unable to re-parse configuration file '"<<configname<<"'"<<endl;
1605 ::arg().preParse(g_argc, g_argv, "allow-from-file");
1606 ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
1607 ::arg().preParse(g_argc, g_argv, "allow-from");
1608 }
1609
1610 NetmaskGroup* oldAllowFrom = t_allowFrom, *allowFrom=new NetmaskGroup;
1611
1612 if(!::arg()["allow-from-file"].empty()) {
1613 string line;
1614 ifstream ifs(::arg()["allow-from-file"].c_str());
1615 if(!ifs) {
1616 delete allowFrom;
1617 throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
1618 }
1619
1620 string::size_type pos;
1621 while(getline(ifs,line)) {
1622 pos=line.find('#');
1623 if(pos!=string::npos)
1624 line.resize(pos);
1625 trim(line);
1626 if(line.empty())
1627 continue;
1628
1629 allowFrom->addMask(line);
1630 }
1631 L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
1632 }
1633 else if(!::arg()["allow-from"].empty()) {
1634 vector<string> ips;
1635 stringtok(ips, ::arg()["allow-from"], ", ");
1636
1637 L<<Logger::Warning<<"Only allowing queries from: ";
1638 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1639 allowFrom->addMask(*i);
1640 if(i!=ips.begin())
1641 L<<Logger::Warning<<", ";
1642 L<<Logger::Warning<<*i;
1643 }
1644 L<<Logger::Warning<<endl;
1645 }
1646 else {
1647 if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
1648 L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
1649 delete allowFrom;
1650 allowFrom = 0;
1651 }
1652
1653 g_initialAllowFrom = allowFrom;
1654 broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
1655 delete oldAllowFrom;
1656
1657 l_initialized = true;
1658 }
1659
1660 int serviceMain(int argc, char*argv[])
1661 {
1662
1663
1664 L.setName(s_programname);
1665
1666 L.setLoglevel((Logger::Urgency)(6)); // info and up
1667
1668 if(!::arg()["logging-facility"].empty()) {
1669 int val=logFacilityToLOG(::arg().asNum("logging-facility") );
1670 if(val >= 0)
1671 theL().setFacility(val);
1672 else
1673 L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
1674 }
1675
1676 showProductVersion();
1677
1678 #if 0
1679 unsigned int maxFDs, curFDs;
1680 getFDLimits(curFDs, maxFDs);
1681 if(curFDs < 2048)
1682 L<<Logger::Warning<<"Only "<<curFDs<<" file descriptors available (out of: "<<maxFDs<<"), may not be suitable for high performance"<<endl;
1683 #endif
1684
1685 seedRandom(::arg()["entropy-source"]);
1686
1687 parseACLs();
1688
1689 if(!::arg()["dont-query"].empty()) {
1690 g_dontQuery=new NetmaskGroup;
1691 vector<string> ips;
1692 stringtok(ips, ::arg()["dont-query"], ", ");
1693 ips.push_back("0.0.0.0");
1694 ips.push_back("::");
1695
1696 L<<Logger::Warning<<"Will not send queries to: ";
1697 for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1698 g_dontQuery->addMask(*i);
1699 if(i!=ips.begin())
1700 L<<Logger::Warning<<", ";
1701 L<<Logger::Warning<<*i;
1702 }
1703 L<<Logger::Warning<<endl;
1704 }
1705
1706 g_quiet=::arg().mustDo("quiet");
1707 g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
1708 if(g_weDistributeQueries) {
1709 L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
1710 }
1711
1712 if(::arg()["trace"]=="fail") {
1713 SyncRes::setDefaultLogMode(SyncRes::Store);
1714 }
1715 else if(::arg().mustDo("trace")) {
1716 SyncRes::setDefaultLogMode(SyncRes::Log);
1717 ::arg().set("quiet")="no";
1718 g_quiet=false;
1719 }
1720
1721
1722 try {
1723 vector<string> addrs;
1724 if(!::arg()["query-local-address6"].empty()) {
1725 SyncRes::s_doIPv6=true;
1726 L<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
1727
1728 stringtok(addrs, ::arg()["query-local-address6"], ", ;");
1729 BOOST_FOREACH(const string& addr, addrs) {
1730 g_localQueryAddresses6.push_back(ComboAddress(addr));
1731 }
1732 }
1733 else {
1734 L<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
1735 }
1736 addrs.clear();
1737 stringtok(addrs, ::arg()["query-local-address"], ", ;");
1738 BOOST_FOREACH(const string& addr, addrs) {
1739 g_localQueryAddresses4.push_back(ComboAddress(addr));
1740 }
1741 }
1742 catch(std::exception& e) {
1743 L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
1744 exit(99);
1745 }
1746
1747 SyncRes::s_doAAAAAdditionalProcessing = ::arg().mustDo("aaaa-additional-processing");
1748 SyncRes::s_doAdditionalProcessing = ::arg().mustDo("additional-processing") | SyncRes::s_doAAAAAdditionalProcessing;
1749
1750 SyncRes::s_noEDNSPing = true; // ::arg().mustDo("disable-edns-ping");
1751 SyncRes::s_noEDNS = ::arg().mustDo("disable-edns");
1752 if(!SyncRes::s_noEDNS) {
1753 L<<Logger::Warning<<"Running in experimental EDNS mode - may cause problems"<<endl;
1754 }
1755
1756 SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
1757
1758 SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
1759 SyncRes::s_maxcachettl=::arg().asNum("max-cache-ttl");
1760 SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
1761 SyncRes::s_packetcacheservfailttl=::arg().asNum("packetcache-servfail-ttl");
1762 SyncRes::s_serverID=::arg()["server-id"];
1763 if(SyncRes::s_serverID.empty()) {
1764 char tmp[128];
1765 gethostname(tmp, sizeof(tmp)-1);
1766 SyncRes::s_serverID=tmp;
1767 }
1768
1769 g_networkTimeoutMsec = ::arg().asNum("network-timeout");
1770
1771 g_initialDomainMap = parseAuthAndForwards();
1772
1773
1774 g_logCommonErrors=::arg().mustDo("log-common-errors");
1775
1776 makeUDPServerSockets();
1777 makeTCPServerSockets();
1778
1779 int forks;
1780 for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
1781 if(!fork()) // we are child
1782 break;
1783 }
1784
1785 s_pidfname=::arg()["socket-dir"]+"/pdns_"+s_programname+".pid";
1786 if(!s_pidfname.empty())
1787 unlink(s_pidfname.c_str()); // remove possible old pid file
1788
1789 if(::arg().mustDo("daemon")) {
1790 L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
1791 L.toConsole(Logger::Critical);
1792 daemonize();
1793 }
1794 signal(SIGUSR1,usr1Handler);
1795 signal(SIGUSR2,usr2Handler);
1796 signal(SIGPIPE,SIG_IGN);
1797 writePid();
1798 makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
1799
1800 int newgid=0;
1801 if(!::arg()["setgid"].empty())
1802 newgid=Utility::makeGidNumeric(::arg()["setgid"]);
1803 int newuid=0;
1804 if(!::arg()["setuid"].empty())
1805 newuid=Utility::makeUidNumeric(::arg()["setuid"]);
1806
1807 if (!::arg()["chroot"].empty()) {
1808 if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
1809 L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
1810 exit(1);
1811 }
1812 }
1813
1814 Utility::dropPrivs(newuid, newgid);
1815 g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
1816
1817 makeThreadPipes();
1818
1819 g_tcpTimeout=::arg().asNum("client-tcp-timeout");
1820 g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
1821 g_maxMThreads=::arg().asNum("max-mthreads");
1822
1823 if(g_numThreads == 1) {
1824 L<<Logger::Warning<<"Operating unthreaded"<<endl;
1825 recursorThread(0);
1826 }
1827 else {
1828 pthread_t tid;
1829 L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
1830 for(unsigned int n=0; n < g_numThreads; ++n) {
1831 pthread_create(&tid, 0, recursorThread, (void*)(long)n);
1832 }
1833 void* res;
1834
1835
1836 pthread_join(tid, &res);
1837 }
1838 return 0;
1839 }
1840
/** Body of every recursor thread. serviceMain() also calls it directly when
 *  running unthreaded.
 *
 *  @param ptr  the thread id, passed by value inside the pointer (see serviceMain()).
 *
 *  Sets up the thread-local state: SyncRes storage, ACL, UDP client sockets,
 *  TCP client counts, root hints, packet cache, Lua script, trace regex,
 *  remotes ring buffer, MTasker and multiplexer. It then registers the file
 *  descriptors this thread serves and loops forever:
 *  - run the mthreads;
 *  - start houseKeeping every 500 iterations;
 *  - drop timed-out TCP clients every 55 iterations;
 *  - on thread 0, dump stats when requested;
 *  - wait for I/O.
 *  TCP listening is paused while more than max-tcp-clients connections are open.
 *
 *  An exception escaping the setup or the loop is logged and ends the thread
 *  (returns 0). A Lua script that fails to load at startup exits the process.
 */
void* recursorThread(void* ptr)
try
{
  t_id=(int) (long) ptr;
  SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
  t_sstorage->domainmap = g_initialDomainMap;
  t_allowFrom = g_initialAllowFrom;
  t_udpclientsocks = new UDPClientSocks();
  t_tcpClientCounts = new tcpClientCounts_t();
  primeHints();

  t_packetCache = new RecursorPacketCache();

  L<<Logger::Warning<<"Done priming cache with root hints"<<endl;

  t_pdl = new shared_ptr<RecursorLua>();

  try {
    if(!::arg()["lua-dns-script"].empty()) {
      *t_pdl = shared_ptr<RecursorLua>(new RecursorLua(::arg()["lua-dns-script"]));
      L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
    }

  }
  catch(std::exception &e) {
    L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
    exit(99);
  }

  t_traceRegex = new shared_ptr<Regex>();


  // each thread gets its share of the remotes ring buffer, zero-filled
  // NOTE(review): memset assumes the element type is trivially copyable - confirm
  t_remotes = new RemoteKeeper();
  t_remotes->remotes.resize(::arg().asNum("remotes-ringbuffer-entries") / g_numThreads);

  if(!t_remotes->remotes.empty())
    memset(&t_remotes->remotes[0], 0, t_remotes->remotes.size() * sizeof(RemoteKeeper::remotes_t::value_type));


  MT=new MTasker<PacketID,string>(::arg().asNum("stack-size"));

  PacketID pident;

  t_fdm=getMultiplexer();
  if(!t_id) {
    // thread 0 additionally hosts the optional JSON webserver
    if(::arg().mustDo("experimental-json-interface")) {
      L<<Logger::Warning << "Enabling JSON interface" << endl;
      try {
        new JWebserver(t_fdm);
      }
      catch(PDNSException &e) {
        L<<Logger::Error<<"Exception: "<<e.reason<<endl;
        exit(99);
      }
    }
    L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
  }

  // every thread services its own inter-thread message pipe
  t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);

  if(!g_weDistributeQueries || !t_id) // if we distribute queries, only t_id = 0 listens
    for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)
      t_fdm->addReadFD(i->first, i->second);

  if(!t_id) {
    t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
  }

  unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");

  bool listenOnTCP(true);

  // NOTE(review): 'counter' is a single global shared by all recursor threads and
  // updated here without synchronisation (a data race); a per-thread counter
  // would avoid that - confirm nothing else relies on the shared value
  counter=0; // used to periodically execute certain tasks
  for(;;) {
    while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing

    if(!(counter%500)) {
      MT->makeThread(houseKeeping, 0);
    }

    // drop TCP client connections whose multiplexer timeout has expired
    if(!(counter%55)) {
      typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
      expired_t expired=t_fdm->getTimeouts(g_now);

      for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
        shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
        if(g_logCommonErrors)
          L<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toString() <<endl;
        t_fdm->removeReadFD(i->first);
      }
    }

    counter++;

    // statsWanted is raised by the SIGUSR1 handler
    if(!t_id && statsWanted) {
      doStats();
    }

    Utility::gettimeofday(&g_now, 0);
    t_fdm->run(&g_now);
    // 'run' updates g_now for us

    // stop accepting TCP while above max-tcp-clients, resume once at or below it.
    // NOTE(review): with pdns-distributes-queries, threads other than 0 never
    // registered the listen sockets, yet this code removes/adds them on every
    // thread - confirm removeReadFD on an unregistered fd is harmless
    if(listenOnTCP) {
      if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
        for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
          t_fdm->removeReadFD(*i);
        listenOnTCP=false;
      }
    }
    else {
      if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
        for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
          t_fdm->addReadFD(*i, handleNewTCPQuestion);
        listenOnTCP=true;
      }
    }
  }
}
catch(PDNSException &ae) {
  L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
  return 0;
}
catch(std::exception &e) {
  L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
  return 0;
}
catch(...) {
  L<<Logger::Error<<"any other exception in main: "<<endl;
  return 0;
}
1971
1972
1973 int main(int argc, char **argv)
1974 {
1975 g_argc = argc;
1976 g_argv = argv;
1977 g_stats.startupTime=time(0);
1978 versionSetProduct("Recursor");
1979 reportBasicTypes();
1980 reportOtherTypes();
1981
1982 int ret = EXIT_SUCCESS;
1983
1984 try {
1985 ::arg().set("stack-size","stack size per mthread")="200000";
1986 ::arg().set("soa-minimum-ttl","Don't change")="0";
1987 ::arg().set("soa-serial-offset","Don't change")="0";
1988 ::arg().set("no-shuffle","Don't change")="off";
1989 ::arg().set("additional-processing","turn on to do additional processing")="off";
1990 ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
1991 ::arg().set("local-port","port to listen on")="53";
1992 ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
1993 ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
1994 ::arg().set("daemon","Operate as a daemon")="yes";
1995 ::arg().set("log-common-errors","If we should log rather common errors")="yes";
1996 ::arg().set("chroot","switch to chroot jail")="";
1997 ::arg().set("setgid","If set, change group id to this gid for more security")="";
1998 ::arg().set("setuid","If set, change user id to this uid for more security")="";
1999 ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500";
2000 ::arg().set("threads", "Launch this number of threads")="2";
2001 ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1";
2002 ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")="";
2003 ::arg().set( "experimental-logfile", "Filename of the log file for JSON parser" )= "/var/log/pdns.log";
2004 ::arg().setSwitch( "experimental-json-interface", "If we should run a JSON webserver") = "no";
2005 ::arg().set("quiet","Suppress logging of questions and answers")="";
2006 ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
2007 ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
2008 ::arg().set("socket-owner","Owner of socket")="";
2009 ::arg().set("socket-group","Group of socket")="";
2010 ::arg().set("socket-mode", "Permissions for socket")="";
2011
2012 ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
2013 ::arg().set("delegation-only","Which domains we only accept delegations from")="";
2014 ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
2015 ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
2016 ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
2017 ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
2018 ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
2019 ::arg().set("hint-file", "If set, load root hints from this file")="";
2020 ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
2021 ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
2022 ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
2023 ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
2024 ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
2025 ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
2026 ::arg().set("server-id", "Returned when queried for 'server.id' TXT or NSID, defaults to hostname")="";
2027 ::arg().set("remotes-ringbuffer-entries", "maximum number of packets to store statistics for")="0";
2028 ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
2029 ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
2030 ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
2031 ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
2032 ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=LOCAL_NETS;
2033 ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
2034 ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
2035 ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
2036 ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
2037 ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
2038 ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
2039 ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
2040 ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
2041 ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
2042 ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
2043 ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="";
2044 ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
2045 // ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing - EXPERIMENTAL, LEAVE DISABLED" )= "no";
2046 ::arg().setSwitch( "disable-edns", "Disable EDNS - EXPERIMENTAL, LEAVE DISABLED" )= "";
2047 ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
2048 ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no";
2049 ::arg().set("include-dir","Include *.conf files from this directory")="";
2050
2051 ::arg().setCmd("help","Provide a helpful message");
2052 ::arg().setCmd("version","Print version string");
2053 ::arg().setCmd("config","Output blank configuration");
2054 L.toConsole(Logger::Info);
2055 ::arg().laxParse(argc,argv); // do a lax parse
2056
2057 if(::arg()["config-name"]!="")
2058 s_programname+="-"+::arg()["config-name"];
2059
2060
2061 if(::arg().mustDo("config")) {
2062 cout<<::arg().configstring()<<endl;
2063 exit(0);
2064 }
2065
2066
2067 string configname=::arg()["config-dir"]+"/"+s_programname+".conf";
2068 cleanSlashes(configname);
2069
2070 if(!::arg().file(configname.c_str()))
2071 L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
2072
2073 ::arg().parse(argc,argv);
2074
2075 ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
2076
2077 if(::arg().mustDo("help")) {
2078 cerr<<"syntax:"<<endl<<endl;
2079 cerr<<::arg().helpstring(::arg()["help"])<<endl;
2080 exit(99);
2081 }
2082 if(::arg().mustDo("version")) {
2083 showProductVersion();
2084 showBuildConfiguration();
2085 exit(99);
2086 }
2087
2088 serviceMain(argc, argv);
2089 }
2090 catch(PDNSException &ae) {
2091 L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
2092 ret=EXIT_FAILURE;
2093 }
2094 catch(std::exception &e) {
2095 L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
2096 ret=EXIT_FAILURE;
2097 }
2098 catch(...) {
2099 L<<Logger::Error<<"any other exception in main: "<<endl;
2100 ret=EXIT_FAILURE;
2101 }
2102
2103 return ret;
2104 }