]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
add rough tcp support to dnsdist
authorbert hubert <bert.hubert@netherlabs.nl>
Wed, 26 Jun 2013 15:01:42 +0000 (17:01 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Wed, 26 Jun 2013 15:01:42 +0000 (17:01 +0200)
pdns/Makefile.am
pdns/dnsdist.cc

index 07e4418148fa4753ee0f32b604e9b13c61a5e054..1f79ad36983637f0b80347abfd8f04d76a0a77b6 100644 (file)
@@ -151,7 +151,7 @@ dnstcpbench_LDADD=$(BOOST_PROGRAM_OPTIONS_LIBS)
 
 dnsdist_SOURCES=dnsdist.cc sstuff.hh dnsparser.cc dnsparser.hh dnsrecords.cc dnswriter.cc dnslabeltext.cc dnswriter.hh \
        misc.cc misc.hh rcpgenerator.cc rcpgenerator.hh base64.cc base64.hh unix_utility.cc \
-       logger.cc statbag.cc qtype.cc sillyrecords.cc nsecrecords.cc base32.cc
+       logger.cc statbag.cc qtype.cc sillyrecords.cc nsecrecords.cc base32.cc epollmplexer.cc
 dnsdist_LDFLAGS=$(BOOST_PROGRAM_OPTIONS_LDFLAGS)
 dnsdist_LDADD=$(BOOST_PROGRAM_OPTIONS_LIBS)
 
index 24724146b063a1543ed97cdac558cb2186f24c3d..cd515395f407ae4d2ae347ab7edb97e7360c6f6f 100644 (file)
 #include "misc.hh"
 #include "statbag.hh"
 #include <netinet/tcp.h>
-#include <boost/array.hpp>
 #include <boost/program_options.hpp>
 #include <boost/foreach.hpp>
 #include <limits>
+#include "mplexer.hh"
 
 /* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220
    Added downstream server 8.8.8.8:53
@@ -71,7 +71,34 @@ int Bind(int sockfd, const ComboAddress& local)
   return ret;
 }
 
-/* the grand design. Per socket we listen on for incoming queries there is one thread.
+int Accept(int sockfd, ComboAddress& remote)
+{
+  socklen_t remlen = remote.getSocklen();
+
+  int ret = accept(sockfd, (struct sockaddr*)&remote, &remlen);
+  if(ret < 0)
+    RuntimeError(boost::format("accepting new connection on socket: %s") % strerror(errno));
+  return ret;
+}
+
+int Listen(int sockfd, int limit)
+{
+  int ret = listen(sockfd, limit);
+  if(ret < 0)
+    RuntimeError(boost::format("setting socket to listen: %s") % strerror(errno));
+  return ret;
+}
+
+int Setsockopt(int sockfd, int level, int opname, int value)
+{
+  int ret = setsockopt(sockfd, level, opname, &value, sizeof(value));
+  if(ret < 0)
+    RuntimeError(boost::format("setsockopt for level %d and opname %d to %d failed: %s") % level % opname % value % strerror(errno));
+  return ret;
+
+}
+
+/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
    Then we have a bunch of connected sockets for talking to downstream servers. 
    We send directly to those sockets.
 
@@ -88,11 +115,12 @@ int Bind(int sockfd, const ComboAddress& local)
 
 struct IDState
 {
+  IDState() : origFD(-1) {}
+
   int origFD;  // set to <0 to indicate this state is empty
   uint16_t origID;
   ComboAddress origRemote;
   AtomicCounter age;
-  bool used;
 };
 
 struct DownstreamState
@@ -100,6 +128,7 @@ struct DownstreamState
   int fd;            
   pthread_t tid;
   ComboAddress remote;
+
   vector<IDState> idStates;
   AtomicCounter idOffset;
   AtomicCounter sendErrors;
@@ -166,7 +195,7 @@ DownstreamState& getBestDownstream()
 }
 
 // listens to incoming queries, sends out to downstream servers, noting the intended return path 
-void* clientThread(void* p)
+void* udpClientThread(void* p)
 {
   ClientState* cs = (ClientState*) p;
   if(g_verbose)
@@ -201,7 +230,7 @@ void* clientThread(void* p)
     ids->age = AtomicCounter();
     ids->origID = dh->id;
     ids->origRemote = remote;
-    ids->used = true;
+
     dh->id = idOffset;
     
     len = send(ss.fd, packet, len, 0);
@@ -216,7 +245,7 @@ void* clientThread(void* p)
 
 void* statThread(void*)
 {
-  int interval =g_vm["stats-interval"].as<int>();
+  int interval = 1;
   if(!interval)
     return 0;
   uint32_t lastQueries=0;
@@ -250,13 +279,136 @@ void* statThread(void*)
   return 0;
 }
 
+/* TCP: the grand design. 
+   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops. 
+   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially 
+   we will not go there.
+
+   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup.
+   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
+   to guarantee performance.
+
+   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
+   So whenever an answer comes in, we know where it needs to go.
+
+   Let's start naively.
+*/
+
+int getTCPDownstream()
+{
+  DownstreamState& ds= getBestDownstream();
+  cout<<"TCP connecting to downstream "<<ds.remote.toStringWithPort()<<endl;
+  int sock = Socket(ds.remote.sin4.sin_family, SOCK_STREAM, 0);
+  Connect(sock, ds.remote);
+  return sock;
+}
+
+bool getMsgLen(int fd, uint16_t* len)
+{
+  uint16_t raw;
+  int ret = read(fd, &raw, 2);
+  if(ret != 2)
+    return false;
+  *len = ntohs(raw);
+  return true;
+}
+
+bool putMsgLen(int fd, uint16_t len)
+{
+  uint16_t raw = htons(len);
+  int ret = write(fd, &raw, 2);
+  return ret==2;
+}
+
+
+/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and 
+   they will initiate downstream connections to service them. An attempt is made to reuse existing
+   connections.
+
+   As it stands, this code is both dangerous (doesn't check for partial reads/writes) and inefficient  
+   (does split writes). 
+
+   In addition, we leak filedescriptors. 
+*/
+void* tcpClientThread(void* p)
+{
+  ClientState* cs = (ClientState*) p;
+
+  ComboAddress remote;
+  remote.sin4.sin_family = cs->local.sin4.sin_family;
+  
+  int ds = -1;
+  int client = -1;
+  for(;;) {
+    try {
+      client = Accept(cs->fd, remote);
+      if(g_verbose)
+       cout << "Got connection from "<<remote.toStringWithPort()<<endl;
+
+      if(ds == -1)
+       ds = getTCPDownstream();
+      else if(g_verbose)
+       cout <<"Reusing existing TCP connection"<<endl;
+
+      uint16_t qlen, rlen;
+      for(;;) {
+       if(!getMsgLen(client, &qlen))
+         break;
+       g_numQueries++;
+       char query[qlen];
+       int ret = read(client, query, qlen);
+      
+      retry:; 
+       if(!putMsgLen(ds, qlen)) {
+         if(g_verbose)
+           cout<<"Downstream connection died on us, getting a new one!"<<endl;
+         close(ds);
+         ds=getTCPDownstream();
+         goto retry;
+       }
+      
+      
+       ret = write(ds, query, qlen);
+      
+       if(!getMsgLen(ds, &rlen)) {
+         if(g_verbose)
+           cout<<"Downstream connection died on us phase 2, getting a new one!"<<endl;
+         close(ds);
+         ds=getTCPDownstream();
+         goto retry;
+       }
+
+       char answerbuffer[rlen];
+       ret = read(ds, answerbuffer, rlen);
+      
+       putMsgLen(client, rlen);
+       ret = write(client, answerbuffer, rlen);
+      }
+      if(g_verbose)
+       cout<<"Closing connection"<<endl;
+      close(client); 
+      client=-1;
+    }
+    catch(std::exception& e) 
+    {
+      if(client >= 0)
+       close(client);
+
+      continue;
+    } 
+  }
+
+  return 0;
+}
+
+
 int main(int argc, char** argv)
 try
 {
+  signal(SIGPIPE, SIG_IGN);
   po::options_description desc("Allowed options"), hidden, alloptions;
   desc.add_options()
     ("help,h", "produce help message")
-    ("stats-interval,s", po::value<int>()->default_value(5), "produce statistics output every n seconds")
     ("local", po::value<vector<string> >(), "Listen on which address")
     ("max-outstanding", po::value<uint16_t>()->default_value(65535), "maximum outstanding queries per downstream")
     ("verbose,v", "be verbose");
@@ -299,10 +451,6 @@ try
     Connect(dss.fd, dss.remote);
 
     dss.idStates.resize(g_maxOutstanding);
-    BOOST_FOREACH(IDState& ids, dss.idStates) {
-      ids.origFD = -1;
-      ids.used = false;
-    }
 
     pthread_create(&dss.tid, 0, responderThread, (void*)&dss);
   }
@@ -319,17 +467,32 @@ try
     cs->local= ComboAddress(local, 53);
     cs->fd = Socket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
     if(cs->local.sin4.sin_family == AF_INET6) {
-      int val = 1;
-      setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
+      Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
     }
+    Bind(cs->fd, cs->local);    
+    pthread_create(&tid, 0, udpClientThread, (void*) cs);
+  }
+
+  BOOST_FOREACH(const string& local, locals) {
+    ClientState* cs = new ClientState;
+    cs->local= ComboAddress(local, 53);
+
+    cs->fd = Socket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
+
+    Setsockopt(cs->fd, SOL_SOCKET, SO_REUSEADDR, 1);
+    Setsockopt(cs->fd, SOL_TCP,TCP_DEFER_ACCEPT, 1);
+    if(cs->local.sin4.sin_family == AF_INET6) {
+      Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
+    }
+
     Bind(cs->fd, cs->local);
-    
-    pthread_create(&tid, 0, clientThread, (void*) cs);
+    Listen(cs->fd, 64);
+    pthread_create(&tid, 0, tcpClientThread, (void*) cs);
   }
 
+
   pthread_t stattid;
   pthread_create(&stattid, 0, statThread, 0);
-
   void* status;
   pthread_join(tid, &status);