From: bert hubert Date: Wed, 26 Jun 2013 15:01:42 +0000 (+0200) Subject: add rough tcp support to dnsdist X-Git-Tag: rec-3.6.0-rc1~627 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=726ddf60d56b55599bfa64625426f8024d9e179d;p=thirdparty%2Fpdns.git add rough tcp support to dnsdist --- diff --git a/pdns/Makefile.am b/pdns/Makefile.am index 07e4418148..1f79ad3698 100644 --- a/pdns/Makefile.am +++ b/pdns/Makefile.am @@ -151,7 +151,7 @@ dnstcpbench_LDADD=$(BOOST_PROGRAM_OPTIONS_LIBS) dnsdist_SOURCES=dnsdist.cc sstuff.hh dnsparser.cc dnsparser.hh dnsrecords.cc dnswriter.cc dnslabeltext.cc dnswriter.hh \ misc.cc misc.hh rcpgenerator.cc rcpgenerator.hh base64.cc base64.hh unix_utility.cc \ - logger.cc statbag.cc qtype.cc sillyrecords.cc nsecrecords.cc base32.cc + logger.cc statbag.cc qtype.cc sillyrecords.cc nsecrecords.cc base32.cc epollmplexer.cc dnsdist_LDFLAGS=$(BOOST_PROGRAM_OPTIONS_LDFLAGS) dnsdist_LDADD=$(BOOST_PROGRAM_OPTIONS_LIBS) diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 24724146b0..cd515395f4 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -19,10 +19,10 @@ #include "misc.hh" #include "statbag.hh" #include -#include #include #include #include +#include "mplexer.hh" /* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220 Added downstream server 8.8.8.8:53 @@ -71,7 +71,34 @@ int Bind(int sockfd, const ComboAddress& local) return ret; } -/* the grand design. Per socket we listen on for incoming queries there is one thread. +int Accept(int sockfd, ComboAddress& remote) +{ + socklen_t remlen = remote.getSocklen(); + + int ret = accept(sockfd, (struct sockaddr*)&remote, &remlen); + if(ret < 0) + RuntimeError(boost::format("accepting new connection on socket: %s") % strerror(errno)); + return ret; +} + +int Listen(int sockfd, int limit) +{ + int ret = listen(sockfd, limit); + if(ret < 0) + RuntimeError(boost::format("setting socket to listen: %s") % strerror(errno)); + return ret; +} + +int Setsockopt(int sockfd, int level, int opname, int value) +{ + int ret = setsockopt(sockfd, level, opname, &value, sizeof(value)); + if(ret < 0) + RuntimeError(boost::format("setsockopt for level %d and opname %d to %d failed: %s") % level % opname % value % strerror(errno)); + return ret; + +} + +/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread. Then we have a bunch of connected sockets for talking to downstream servers. We send directly to those sockets. @@ -88,11 +115,12 @@ int Bind(int sockfd, const ComboAddress& local) struct IDState { + IDState() : origFD(-1) {} + int origFD; // set to <0 to indicate this state is empty uint16_t origID; ComboAddress origRemote; AtomicCounter age; - bool used; }; struct DownstreamState @@ -100,6 +128,7 @@ struct DownstreamState int fd; pthread_t tid; ComboAddress remote; + vector idStates; AtomicCounter idOffset; AtomicCounter sendErrors; @@ -166,7 +195,7 @@ DownstreamState& getBestDownstream() } // listens to incoming queries, sends out to downstream servers, noting the intended return path -void* clientThread(void* p) +void* udpClientThread(void* p) { ClientState* cs = (ClientState*) p; if(g_verbose) @@ -201,7 +230,7 @@ void* clientThread(void* p) ids->age = AtomicCounter(); ids->origID = dh->id; ids->origRemote = remote; - ids->used = true; + dh->id = idOffset; len = send(ss.fd, packet, len, 0); @@ -216,7 +245,7 @@ void* clientThread(void* p) void* statThread(void*) { - int interval =g_vm["stats-interval"].as(); + int interval = 1; if(!interval) return 0; uint32_t lastQueries=0; @@ -250,13 +279,136 @@ void* statThread(void*) return 0; } +/* TCP: the grand design. + We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops. + An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially + we will not go there. + + In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup. + This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections + to guarantee performance. + + So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue. + So whenever an answer comes in, we know where it needs to go. + + Let's start naively. +*/ + +int getTCPDownstream() +{ + DownstreamState& ds= getBestDownstream(); + cout<<"TCP connecting to downstream "<local.sin4.sin_family; + + int ds = -1; + int client = -1; + for(;;) { + try { + client = Accept(cs->fd, remote); + if(g_verbose) + cout << "Got connection from "<= 0) + close(client); + + continue; + } + } + + return 0; +} + + int main(int argc, char** argv) try { + signal(SIGPIPE, SIG_IGN); po::options_description desc("Allowed options"), hidden, alloptions; desc.add_options() ("help,h", "produce help message") - ("stats-interval,s", po::value()->default_value(5), "produce statistics output every n seconds") ("local", po::value >(), "Listen on which address") ("max-outstanding", po::value()->default_value(65535), "maximum outstanding queries per downstream") ("verbose,v", "be verbose"); @@ -299,10 +451,6 @@ try Connect(dss.fd, dss.remote); dss.idStates.resize(g_maxOutstanding); - BOOST_FOREACH(IDState& ids, dss.idStates) { - ids.origFD = -1; - ids.used = false; - } pthread_create(&dss.tid, 0, responderThread, (void*)&dss); } @@ -319,17 +467,32 @@ try cs->local= ComboAddress(local, 53); cs->fd = Socket(cs->local.sin4.sin_family, SOCK_DGRAM, 0); if(cs->local.sin4.sin_family == AF_INET6) { - int val = 1; - setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val)); + Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1); } + Bind(cs->fd, cs->local); + pthread_create(&tid, 0, udpClientThread, (void*) cs); + } + + BOOST_FOREACH(const string& local, locals) { + ClientState* cs = new ClientState; + cs->local= ComboAddress(local, 53); + + cs->fd = Socket(cs->local.sin4.sin_family, SOCK_STREAM, 0); + + Setsockopt(cs->fd, SOL_SOCKET, SO_REUSEADDR, 1); + Setsockopt(cs->fd, SOL_TCP,TCP_DEFER_ACCEPT, 1); + if(cs->local.sin4.sin_family == AF_INET6) { + Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1); + } + Bind(cs->fd, cs->local); - - pthread_create(&tid, 0, clientThread, (void*) cs); + Listen(cs->fd, 64); + pthread_create(&tid, 0, tcpClientThread, (void*) cs); } + pthread_t stattid; pthread_create(&stattid, 0, statThread, 0); - void* status; pthread_join(tid, &status);