]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
actually make our TCP support scale to multiple simultaneous clients, reuse threads...
authorbert hubert <bert.hubert@netherlabs.nl>
Thu, 27 Jun 2013 14:44:58 +0000 (16:44 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Thu, 27 Jun 2013 14:44:58 +0000 (16:44 +0200)
pdns/dnsdist.cc

index 4347e0fa515286c8776aa5d50343070272bf1b4a..67f935b85e18a7782ffd409e083c1345422bcd14 100644 (file)
@@ -188,44 +188,6 @@ void* udpClientThread(void* p)
   return 0;
 }
 
-void* statThread(void*)
-{
-  int interval = 1;
-  if(!interval)
-    return 0;
-  uint32_t lastQueries=0;
-  vector<DownstreamState> prev;
-  prev.resize(g_numdownstreams);
-
-  for(;;) {
-    sleep(interval);
-    
-    unsigned int outstanding=0;
-    uint64_t numQueries=0;
-    for(unsigned int n=0; n < g_numdownstreams; ++n) {
-      DownstreamState& dss = g_dstates[n];
-      if(g_verbose)
-       cout<<'\t'<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
-      outstanding += dss.outstanding;
-      prev[n].queries = dss.queries;
-      numQueries += dss.queries;
-      for(unsigned int i=0 ; i < g_maxOutstanding; ++i) {
-       IDState& ids = dss.idStates[i];
-       if(ids.origFD >=0 && ids.age++ > 2) {
-         ids.age = AtomicCounter();
-         ids.origFD = -1;
-         dss.reuseds++;
-         --dss.outstanding;
-       }         
-      }
-    }
-    if(g_verbose)
-      cout<<outstanding<<" outstanding queries, " <<(numQueries - lastQueries)/interval <<" qps"<<endl;
-    lastQueries=numQueries;
-  }
-  return 0;
-}
-
 /* TCP: the grand design. 
    We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops. 
    An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially 
@@ -275,43 +237,84 @@ catch(...) {
   return false;
 }
 
+struct ConnectionInfo
+{
+  int fd;
+  ComboAddress remote;
+};
+
+void* tcpClientThread(void* p);
+class TCPClientCollection {
+  vector<int> d_tcpclientthreads;
+  AtomicCounter d_pos;
+public:
+  AtomicCounter d_queued, d_numthreads;
+
+  TCPClientCollection()
+  {
+    d_tcpclientthreads.reserve(1024);
+  }
+
+  int getThread() 
+  {
+    int pos = d_pos++;
+    ++d_queued;
+    return d_tcpclientthreads[pos % d_numthreads];
+  }
+
+  // Should not be called simultaneously!
+  void addTCPClientThread()
+  {  
+    if(g_verbose)
+      cout<<"Adding TCP Client thread"<<endl;
+    int pipefds[2];
+    if(pipe(pipefds) < 0)
+      unixDie("Creating pipe");
+    int *fd = new int(pipefds[0]);
+    d_tcpclientthreads.push_back(pipefds[1]);
+    
+    pthread_t tid;
+    pthread_create(&tid, 0, tcpClientThread, (void*)fd);
+    ++d_numthreads;
+  }
+} g_tcpclientthreads;
+
 
-/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and 
-   they will initiate downstream connections to service them. An attempt is made to reuse existing
-   connections.
-*/
 void* tcpClientThread(void* p)
 {
-  ClientState* cs = (ClientState*) p;
+  /* we get launched with a pipe on which we receive file descriptors from clients that we own
+     from that point on */
+  int pipefd = *(int*)p;
+  delete (int*)p;
 
-  ComboAddress remote;
-  remote.sin4.sin_family = cs->local.sin4.sin_family;
-  
   int dsock = -1;
-  int client = -1;
   DownstreamState *ds=0;
+  
   for(;;) {
-    try {
-      client = SAccept(cs->tcpFD, remote);
+    ConnectionInfo* citmp, ci;
+    readn2(pipefd, &citmp, sizeof(citmp));
+    --g_tcpclientthreads.d_queued;
+    ci=*citmp;
+    delete citmp;
+     
+    if(dsock == -1)
+      dsock = getTCPDownstream(&ds);
+    else {
       if(g_verbose)
-       cout << "Got connection from "<<remote.toStringWithPort()<<endl;
-
-      if(dsock == -1)
-       dsock = getTCPDownstream(&ds);
-      else {
-       if(g_verbose)
-         cout <<"Reusing existing TCP connection"<<endl;
-      }
+       cout <<"Reusing existing TCP connection"<<endl;
+    }
 
-      uint16_t qlen, rlen;
-      for(;;) {
-       if(!getMsgLen(client, &qlen))
+    uint16_t qlen, rlen;
+    try {
+      for(;;) {      
+       if(!getMsgLen(ci.fd, &qlen))
          break;
-
+       
        ds->queries++;
+       ds->outstanding++;
        char query[qlen];
-       readn2(client, query, qlen);
-      
+       readn2(ci.fd, query, qlen);
+       
       retry:; 
        if(!putMsgLen(dsock, qlen)) {
          if(g_verbose)
@@ -334,27 +337,92 @@ void* tcpClientThread(void* p)
        char answerbuffer[rlen];
        readn2(dsock, answerbuffer, rlen);
       
-       putMsgLen(client, rlen);
-       writen2(client, answerbuffer, rlen);
+       putMsgLen(ci.fd, rlen);
+       writen2(ci.fd, answerbuffer, rlen);
       }
-      if(g_verbose)
-       cout<<"Closing client connection"<<endl;
-      close(client); 
-      client=-1;
     }
-    catch(std::exception& e) 
-    {
-      if(client >= 0)
-       close(client);
+    catch(...){}
+    if(g_verbose)
+      cout<<"Closing client connection"<<endl;
+    close(ci.fd); 
+    ci.fd=-1;
+    --ds->outstanding;
+  }
+  return 0;
+}
 
-      continue;
-    } 
+
+/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and 
+   they will hand off to worker threads & spawn more of them if required
+*/
+void* tcpAcceptorThread(void* p)
+{
+  ClientState* cs = (ClientState*) p;
+
+  ComboAddress remote;
+  remote.sin4.sin_family = cs->local.sin4.sin_family;
+  
+  g_tcpclientthreads.addTCPClientThread();
+
+  for(;;) {
+    try {
+      ConnectionInfo* ci = new ConnectionInfo;      
+      ci->fd = SAccept(cs->tcpFD, remote);
+      if(g_verbose)
+       cout << "Got connection from "<<remote.toStringWithPort()<<endl;
+      
+      ci->remote = remote;
+      writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci));
+    }
+    catch(...){}
   }
 
   return 0;
 }
 
 
+void* statThread(void*)
+{
+  int interval = 1;
+  if(!interval)
+    return 0;
+  uint32_t lastQueries=0;
+  vector<DownstreamState> prev;
+  prev.resize(g_numdownstreams);
+
+  for(;;) {
+    sleep(interval);
+    
+    if(g_tcpclientthreads.d_queued > 1 && g_tcpclientthreads.d_numthreads < 10)
+      g_tcpclientthreads.addTCPClientThread();
+
+    unsigned int outstanding=0;
+    uint64_t numQueries=0;
+    for(unsigned int n=0; n < g_numdownstreams; ++n) {
+      DownstreamState& dss = g_dstates[n];
+      if(g_verbose)
+       cout<<'\t'<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
+      outstanding += dss.outstanding;
+      prev[n].queries = dss.queries;
+      numQueries += dss.queries;
+      for(unsigned int i=0 ; i < g_maxOutstanding; ++i) {
+       IDState& ids = dss.idStates[i];
+       if(ids.origFD >=0 && ids.age++ > 2) {
+         ids.age = AtomicCounter();
+         ids.origFD = -1;
+         dss.reuseds++;
+         --dss.outstanding;
+       }         
+      }
+    }
+    if(g_verbose)
+      cout<<outstanding<<" outstanding queries, " <<(numQueries - lastQueries)/interval <<" qps"<<endl;
+    lastQueries=numQueries;
+  }
+  return 0;
+}
+
+
 int main(int argc, char** argv)
 try
 {
@@ -440,7 +508,7 @@ try
 
     SBind(cs->tcpFD, cs->local);
     SListen(cs->tcpFD, 64);
-    pthread_create(&tid, 0, tcpClientThread, (void*) cs);
+    pthread_create(&tid, 0, tcpAcceptorThread, (void*) cs);
   }
 
   pthread_t stattid;