]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
cleanup code a bit, add statistics
authorbert hubert <bert.hubert@netherlabs.nl>
Tue, 25 Jun 2013 13:30:38 +0000 (15:30 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Tue, 25 Jun 2013 13:30:38 +0000 (15:30 +0200)
pdns/dnsdist.cc

index 755d61d6c965b8a0848698ceb6b5d0308346070a..63ed1cc932e7e1d48062c200ae53c03fe10189df 100644 (file)
@@ -41,13 +41,18 @@ namespace po = boost::program_options;
 po::variables_map g_vm;
 
 bool g_verbose;
-AtomicCounter g_pos, g_timeouts;
+AtomicCounter g_pos;
+
+void RuntimeError(const boost::format& fmt)
+{
+  throw runtime_error(fmt.str());
+}
 
 int Socket(int family, int type, int flags)
 {
   int ret = socket(family, type, flags);
   if(ret < 0)
-    throw runtime_error((boost::format("creating socket of type %d: %s") % family % strerror(errno)).str());
+    RuntimeError(boost::format("creating socket of type %d: %s") % family % strerror(errno));
   return ret;
 }
 
@@ -55,7 +60,7 @@ int Connect(int sockfd, const ComboAddress& remote)
 {
   int ret = connect(sockfd, (struct sockaddr*)&remote, remote.getSocklen());
   if(ret < 0)
-    throw runtime_error((boost::format("connecting socket to %s: %s") % remote.toStringWithPort() % strerror(errno)).str());
+    RuntimeError(boost::format("connecting socket to %s: %s") % remote.toStringWithPort() % strerror(errno));
   return ret;
 }
 
@@ -63,7 +68,7 @@ int Bind(int sockfd, const ComboAddress& local)
 {
   int ret = bind(sockfd, (struct sockaddr*)&local, local.getSocklen());
   if(ret < 0)
-    throw runtime_error((boost::format("binding socket to %s: %s") % local.toStringWithPort() % strerror(errno)).str());
+    RuntimeError(boost::format("binding socket to %s: %s") % local.toStringWithPort() % strerror(errno));
   return ret;
 }
 
@@ -84,26 +89,29 @@ int Bind(int sockfd, const ComboAddress& local)
 
 struct IDState
 {
+  int origFD;  // set to <0 to indicate this state is empty
   uint16_t origID;
   ComboAddress origRemote;
-  int origFD;
+  bool used;
 };
 
-struct SocketState
+struct DownstreamState
 {
-  int fd;
+  int fd;            
   pthread_t tid;
   ComboAddress remote;
   vector<IDState> idStates;
   AtomicCounter idOffset;
+  AtomicCounter sendErrors;
 };
 
-SocketState* g_socketstates;
+DownstreamState* g_dstates;
 unsigned int g_numremotes;
 
+// listens on a dedicated socket, lobs answers from downstream servers to original requestors
 void* responderThread(void *p)
 {
-  SocketState* state = (SocketState*)p;
+  DownstreamState* state = (DownstreamState*)p;
   if(g_verbose)
     cout << "Added downstream server "<<state->remote.toStringWithPort()<<endl;
   char packet[65536];
@@ -133,7 +141,7 @@ struct ClientState
   int fd;
 };
 
-// listens to incoming queries, sends out to downstream servers
+// listens to incoming queries, sends out to downstream servers, noting the intended return path 
 void* clientThread(void* p)
 {
   ClientState* cs = (ClientState*) p;
@@ -150,30 +158,58 @@ void* clientThread(void* p)
 
   for(;;) {
     len = recvfrom(cs->fd, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
-    if(len < 0)
+    if(len < 0) 
       continue;
     
-    SocketState& ss = g_socketstates[(g_pos++) % g_numremotes];
+    /* right now, this is our simple round robin downstream selector */
+    DownstreamState& ss = g_dstates[(g_pos++) % g_numremotes]; 
     unsigned int idOffset = ss.idOffset++;
     IDState* ids = &ss.idStates[idOffset];
     ids->origFD = cs->fd;
     ids->origID = dh->id;
     ids->origRemote = remote;
+    ids->used = true;
     dh->id = idOffset;
     
-    send(ss.fd, packet, len, 0);
+    len = send(ss.fd, packet, len, 0);
+    if(len < 0) 
+      ss.sendErrors++;
+
     if(g_verbose)
       cout<<"Got query from "<<remote.toStringWithPort()<<",relayed to "<<ss.remote.toStringWithPort()<<endl;
   }
   return 0;
 }
 
+void* statThread(void*)
+{
+  int interval =g_vm["stats-interval"].as<int>();
+  if(!interval)
+    return 0;
+
+  for(;;) {
+    sleep(interval);
+    unsigned int outstanding=0;
+    for(unsigned int n=0; n < g_numremotes; ++n) {
+      const DownstreamState& dss = g_dstates[n];
+      for(unsigned int i=0 ; i < 65536; ++i) {
+       const IDState& ids = dss.idStates[i];
+       if(ids.used && ids.origFD >=0)
+         outstanding++;
+      }
+    }
+    cout<<outstanding<<" outstanding queries"<<endl;
+  }
+  return 0;
+}
+
 int main(int argc, char** argv)
 try
 {
   po::options_description desc("Allowed options"), hidden, alloptions;
   desc.add_options()
     ("help,h", "produce help message")
+    ("stats-interval,s", po::value<int>()->default_value(5), "produce statistics output every n seconds")
     ("local", po::value<vector<string> >(), "Listen on which address")
     ("verbose,v", "be verbose");
     
@@ -203,22 +239,23 @@ try
   vector<string> remotes = g_vm["remotes"].as<vector<string> >();
 
   g_numremotes = remotes.size();
-  g_socketstates = new SocketState[g_numremotes];
+  g_dstates = new DownstreamState[g_numremotes];
   int pos=0;
   BOOST_FOREACH(const string& remote, remotes) {
-    SocketState& ss = g_socketstates[pos++];
+    DownstreamState& dss = g_dstates[pos++];
  
-    ss.remote = ComboAddress(remote, 53);
+    dss.remote = ComboAddress(remote, 53);
     
-    ss.fd = Socket(ss.remote.sin4.sin_family, SOCK_DGRAM, 0);
-    Connect(ss.fd, ss.remote);
+    dss.fd = Socket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0);
+    Connect(dss.fd, dss.remote);
 
-    ss.idStates.resize(65536);
-    BOOST_FOREACH(IDState& ids, ss.idStates) {
+    dss.idStates.resize(65536);
+    BOOST_FOREACH(IDState& ids, dss.idStates) {
       ids.origFD = -1;
+      ids.used = false;
     }
 
-    pthread_create(&ss.tid, 0, responderThread, (void*)&ss);
+    pthread_create(&dss.tid, 0, responderThread, (void*)&dss);
   }
 
   pthread_t tid;
@@ -241,6 +278,9 @@ try
     pthread_create(&tid, 0, clientThread, (void*) cs);
   }
 
+  pthread_t stattid;
+  pthread_create(&stattid, 0, statThread, 0);
+
   void* status;
   pthread_join(tid, &status);