]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
move out generic socket tools from dnsdist, integrate TCP statistics
authorbert hubert <bert.hubert@netherlabs.nl>
Wed, 26 Jun 2013 18:41:00 +0000 (20:41 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Wed, 26 Jun 2013 18:41:00 +0000 (20:41 +0200)
pdns/Makefile.am
pdns/dnsdist.cc
pdns/iputils.cc [new file with mode: 0644]
pdns/iputils.hh

index a38c480c1da19e37af54d8cd373102efaf620ae6..b3276bc9ea926fd2cb9ef3584fae93f4b879f734 100644 (file)
@@ -150,7 +150,7 @@ dnstcpbench_LDADD=$(BOOST_PROGRAM_OPTIONS_LIBS)
 
 dnsdist_SOURCES=dnsdist.cc sstuff.hh dnsparser.cc dnsparser.hh dnsrecords.cc dnswriter.cc dnslabeltext.cc dnswriter.hh \
        misc.cc misc.hh rcpgenerator.cc rcpgenerator.hh base64.cc base64.hh unix_utility.cc \
-       logger.cc statbag.cc qtype.cc sillyrecords.cc nsecrecords.cc base32.cc epollmplexer.cc
+       logger.cc statbag.cc qtype.cc sillyrecords.cc nsecrecords.cc base32.cc epollmplexer.cc iputils.cc
 dnsdist_LDFLAGS=$(BOOST_PROGRAM_OPTIONS_LDFLAGS)
 dnsdist_LDADD=$(BOOST_PROGRAM_OPTIONS_LIBS)
 
index cd515395f407ae4d2ae347ab7edb97e7360c6f6f..e50fe386470d33222f5edc55c229d428edd3cad5 100644 (file)
@@ -22,7 +22,6 @@
 #include <boost/program_options.hpp>
 #include <boost/foreach.hpp>
 #include <limits>
-#include "mplexer.hh"
 
 /* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220
    Added downstream server 8.8.8.8:53
@@ -42,61 +41,6 @@ bool g_verbose;
 AtomicCounter g_pos, g_numQueries;
 uint16_t g_maxOutstanding;
 
-void RuntimeError(const boost::format& fmt)
-{
-  throw runtime_error(fmt.str());
-}
-
-int Socket(int family, int type, int flags)
-{
-  int ret = socket(family, type, flags);
-  if(ret < 0)
-    RuntimeError(boost::format("creating socket of type %d: %s") % family % strerror(errno));
-  return ret;
-}
-
-int Connect(int sockfd, const ComboAddress& remote)
-{
-  int ret = connect(sockfd, (struct sockaddr*)&remote, remote.getSocklen());
-  if(ret < 0)
-    RuntimeError(boost::format("connecting socket to %s: %s") % remote.toStringWithPort() % strerror(errno));
-  return ret;
-}
-
-int Bind(int sockfd, const ComboAddress& local)
-{
-  int ret = bind(sockfd, (struct sockaddr*)&local, local.getSocklen());
-  if(ret < 0)
-    RuntimeError(boost::format("binding socket to %s: %s") % local.toStringWithPort() % strerror(errno));
-  return ret;
-}
-
-int Accept(int sockfd, ComboAddress& remote)
-{
-  socklen_t remlen = remote.getSocklen();
-
-  int ret = accept(sockfd, (struct sockaddr*)&remote, &remlen);
-  if(ret < 0)
-    RuntimeError(boost::format("accepting new connection on socket: %s") % strerror(errno));
-  return ret;
-}
-
-int Listen(int sockfd, int limit)
-{
-  int ret = listen(sockfd, limit);
-  if(ret < 0)
-    RuntimeError(boost::format("setting socket to listen: %s") % strerror(errno));
-  return ret;
-}
-
-int Setsockopt(int sockfd, int level, int opname, int value)
-{
-  int ret = setsockopt(sockfd, level, opname, &value, sizeof(value));
-  if(ret < 0)
-    RuntimeError(boost::format("setsockopt for level %d and opname %d to %d failed: %s") % level % opname % value % strerror(errno));
-  return ret;
-
-}
 
 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
    Then we have a bunch of connected sockets for talking to downstream servers. 
@@ -177,7 +121,8 @@ void* responderThread(void *p)
 struct ClientState
 {
   ComboAddress local;
-  int fd;
+  int udpFD;
+  int tcpFD;
 };
 
 DownstreamState& getBestDownstream()
@@ -210,7 +155,7 @@ void* udpClientThread(void* p)
   int len;
 
   for(;;) {
-    len = recvfrom(cs->fd, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
+    len = recvfrom(cs->udpFD, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
     if(len < 0) 
       continue;
     g_numQueries++;
@@ -226,7 +171,7 @@ void* udpClientThread(void* p)
     else
       ss.reuseds++;
 
-    ids->origFD = cs->fd;
+    ids->origFD = cs->udpFD;
     ids->age = AtomicCounter();
     ids->origID = dh->id;
     ids->origRemote = remote;
@@ -258,7 +203,7 @@ void* statThread(void*)
     unsigned int outstanding=0;
     for(unsigned int n=0; n < g_numremotes; ++n) {
       DownstreamState& dss = g_dstates[n];
-      cout<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
+      cout<<'\t'<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
       outstanding += dss.outstanding;
       prev[n].queries = dss.queries;
 
@@ -269,8 +214,7 @@ void* statThread(void*)
          ids.origFD = -1;
          dss.reuseds++;
          --dss.outstanding;
-       }
-         
+       }         
       }
     }
     cout<<outstanding<<" outstanding queries, " <<(g_numQueries - lastQueries)/interval <<" qps"<<endl;
@@ -294,12 +238,12 @@ void* statThread(void*)
    Let's start naively.
 */
 
-int getTCPDownstream()
+int getTCPDownstream(DownstreamState** ds)
 {
-  DownstreamState& ds= getBestDownstream();
-  cout<<"TCP connecting to downstream "<<ds.remote.toStringWithPort()<<endl;
-  int sock = Socket(ds.remote.sin4.sin_family, SOCK_STREAM, 0);
-  Connect(sock, ds.remote);
+  *ds = &getBestDownstream();
+  cout<<"TCP connecting to downstream "<<(*ds)->remote.toStringWithPort()<<endl;
+  int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
+  SConnect(sock, (*ds)->remote);
   return sock;
 }
 
@@ -337,16 +281,17 @@ void* tcpClientThread(void* p)
   ComboAddress remote;
   remote.sin4.sin_family = cs->local.sin4.sin_family;
   
-  int ds = -1;
+  int dsock = -1;
   int client = -1;
+  DownstreamState *ds=0;
   for(;;) {
     try {
-      client = Accept(cs->fd, remote);
+      client = SAccept(cs->tcpFD, remote);
       if(g_verbose)
        cout << "Got connection from "<<remote.toStringWithPort()<<endl;
 
-      if(ds == -1)
-       ds = getTCPDownstream();
+      if(dsock == -1)
+       dsock = getTCPDownstream(&ds);
       else if(g_verbose)
        cout <<"Reusing existing TCP connection"<<endl;
 
@@ -355,37 +300,37 @@ void* tcpClientThread(void* p)
        if(!getMsgLen(client, &qlen))
          break;
        g_numQueries++;
+       ds->queries++;
        char query[qlen];
        int ret = read(client, query, qlen);
       
       retry:; 
-       if(!putMsgLen(ds, qlen)) {
+       if(!putMsgLen(dsock, qlen)) {
          if(g_verbose)
            cout<<"Downstream connection died on us, getting a new one!"<<endl;
-         close(ds);
-         ds=getTCPDownstream();
+         close(dsock);
+         dsock=getTCPDownstream(&ds);
          goto retry;
        }
       
+       ret = write(dsock, query, qlen);
       
-       ret = write(ds, query, qlen);
-      
-       if(!getMsgLen(ds, &rlen)) {
+       if(!getMsgLen(dsock, &rlen)) {
          if(g_verbose)
            cout<<"Downstream connection died on us phase 2, getting a new one!"<<endl;
-         close(ds);
-         ds=getTCPDownstream();
+         close(dsock);
+         dsock=getTCPDownstream(&ds);
          goto retry;
        }
 
        char answerbuffer[rlen];
-       ret = read(ds, answerbuffer, rlen);
+       ret = read(dsock, answerbuffer, rlen);
       
        putMsgLen(client, rlen);
        ret = write(client, answerbuffer, rlen);
       }
       if(g_verbose)
-       cout<<"Closing connection"<<endl;
+       cout<<"Closing client connection"<<endl;
       close(client); 
       client=-1;
     }
@@ -447,8 +392,8 @@ try
  
     dss.remote = ComboAddress(remote, 53);
     
-    dss.fd = Socket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0);
-    Connect(dss.fd, dss.remote);
+    dss.fd = SSocket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0);
+    SConnect(dss.fd, dss.remote);
 
     dss.idStates.resize(g_maxOutstanding);
 
@@ -465,11 +410,11 @@ try
   BOOST_FOREACH(const string& local, locals) {
     ClientState* cs = new ClientState;
     cs->local= ComboAddress(local, 53);
-    cs->fd = Socket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
+    cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
     if(cs->local.sin4.sin_family == AF_INET6) {
-      Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
+      SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
     }
-    Bind(cs->fd, cs->local);    
+    SBind(cs->udpFD, cs->local);    
     pthread_create(&tid, 0, udpClientThread, (void*) cs);
   }
 
@@ -477,20 +422,19 @@ try
     ClientState* cs = new ClientState;
     cs->local= ComboAddress(local, 53);
 
-    cs->fd = Socket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
+    cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
 
-    Setsockopt(cs->fd, SOL_SOCKET, SO_REUSEADDR, 1);
-    Setsockopt(cs->fd, SOL_TCP,TCP_DEFER_ACCEPT, 1);
+    SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
+    SSetsockopt(cs->tcpFD, SOL_TCP,TCP_DEFER_ACCEPT, 1);
     if(cs->local.sin4.sin_family == AF_INET6) {
-      Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
+      SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
     }
 
-    Bind(cs->fd, cs->local);
-    Listen(cs->fd, 64);
+    SBind(cs->tcpFD, cs->local);
+    SListen(cs->tcpFD, 64);
     pthread_create(&tid, 0, tcpClientThread, (void*) cs);
   }
 
-
   pthread_t stattid;
   pthread_create(&stattid, 0, statThread, 0);
   void* status;
diff --git a/pdns/iputils.cc b/pdns/iputils.cc
new file mode 100644 (file)
index 0000000..d9eb9b5
--- /dev/null
@@ -0,0 +1,60 @@
+#include "iputils.hh"
+/** these functions provide a very lightweight wrapper to the Berkeley sockets API. Errors -> exceptions! */
+
+static void RuntimeError(const boost::format& fmt)
+{
+  throw runtime_error(fmt.str());
+}
+
+
+int SSocket(int family, int type, int flags)
+{
+  int ret = socket(family, type, flags);
+  if(ret < 0)
+    RuntimeError(boost::format("creating socket of type %d: %s") % family % strerror(errno));
+  return ret;
+}
+
+int SConnect(int sockfd, const ComboAddress& remote)
+{
+  int ret = connect(sockfd, (struct sockaddr*)&remote, remote.getSocklen());
+  if(ret < 0)
+    RuntimeError(boost::format("connecting socket to %s: %s") % remote.toStringWithPort() % strerror(errno));
+  return ret;
+}
+
+int SBind(int sockfd, const ComboAddress& local)
+{
+  int ret = bind(sockfd, (struct sockaddr*)&local, local.getSocklen());
+  if(ret < 0)
+    RuntimeError(boost::format("binding socket to %s: %s") % local.toStringWithPort() % strerror(errno));
+  return ret;
+}
+
+int SAccept(int sockfd, ComboAddress& remote)
+{
+  socklen_t remlen = remote.getSocklen();
+
+  int ret = accept(sockfd, (struct sockaddr*)&remote, &remlen);
+  if(ret < 0)
+    RuntimeError(boost::format("accepting new connection on socket: %s") % strerror(errno));
+  return ret;
+}
+
+int SListen(int sockfd, int limit)
+{
+  int ret = listen(sockfd, limit);
+  if(ret < 0)
+    RuntimeError(boost::format("setting socket to listen: %s") % strerror(errno));
+  return ret;
+}
+
+int SSetsockopt(int sockfd, int level, int opname, int value)
+{
+  int ret = setsockopt(sockfd, level, opname, &value, sizeof(value));
+  if(ret < 0)
+    RuntimeError(boost::format("setsockopt for level %d and opname %d to %d failed: %s") % level % opname % value % strerror(errno));
+  return ret;
+}
+
+
index 8619eaeaa76177da072bd63140c40eb13f10f137..17e563dba8173864e3fa9eddaaec03cf84a57f8c 100644 (file)
@@ -360,11 +360,17 @@ public:
     return str.str();
   }
 
-
 private:
   typedef vector<Netmask> container_t;
-  container_t d_masks;
-  
+  container_t d_masks;  
 };
 
+
+int SSocket(int family, int type, int flags);
+int SConnect(int sockfd, const ComboAddress& remote);
+int SBind(int sockfd, const ComboAddress& local);
+int SAccept(int sockfd, ComboAddress& remote);
+int SListen(int sockfd, int limit);
+int SSetsockopt(int sockfd, int level, int opname, int value);
+
 #endif