From: bert hubert Date: Thu, 27 Jun 2013 14:44:58 +0000 (+0200) Subject: actually make our TCP support scale to multiple simultaneous clients, reuse threads... X-Git-Tag: rec-3.6.0-rc1~618 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=3ae86514ad673fa7de5261acb50eaf13a09cf968;p=thirdparty%2Fpdns.git actually make our TCP support scale to multiple simultaneous clients, reuse threads & downstream connections --- diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 4347e0fa51..67f935b85e 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -188,44 +188,6 @@ void* udpClientThread(void* p) return 0; } -void* statThread(void*) -{ - int interval = 1; - if(!interval) - return 0; - uint32_t lastQueries=0; - vector prev; - prev.resize(g_numdownstreams); - - for(;;) { - sleep(interval); - - unsigned int outstanding=0; - uint64_t numQueries=0; - for(unsigned int n=0; n < g_numdownstreams; ++n) { - DownstreamState& dss = g_dstates[n]; - if(g_verbose) - cout<<'\t'<=0 && ids.age++ > 2) { - ids.age = AtomicCounter(); - ids.origFD = -1; - dss.reuseds++; - --dss.outstanding; - } - } - } - if(g_verbose) - cout< d_tcpclientthreads; + AtomicCounter d_pos; +public: + AtomicCounter d_queued, d_numthreads; + + TCPClientCollection() + { + d_tcpclientthreads.reserve(1024); + } + + int getThread() + { + int pos = d_pos++; + ++d_queued; + return d_tcpclientthreads[pos % d_numthreads]; + } + + // Should not be called simultaneously! + void addTCPClientThread() + { + if(g_verbose) + cout<<"Adding TCP Client thread"<local.sin4.sin_family; - int dsock = -1; - int client = -1; DownstreamState *ds=0; + for(;;) { - try { - client = SAccept(cs->tcpFD, remote); + ConnectionInfo* citmp, ci; + readn2(pipefd, &citmp, sizeof(citmp)); + --g_tcpclientthreads.d_queued; + ci=*citmp; + delete citmp; + + if(dsock == -1) + dsock = getTCPDownstream(&ds); + else { if(g_verbose) - cout << "Got connection from "<queries++; + ds->outstanding++; char query[qlen]; - readn2(client, query, qlen); - + readn2(ci.fd, query, qlen); + retry:; if(!putMsgLen(dsock, qlen)) { if(g_verbose) @@ -334,27 +337,92 @@ void* tcpClientThread(void* p) char answerbuffer[rlen]; readn2(dsock, answerbuffer, rlen); - putMsgLen(client, rlen); - writen2(client, answerbuffer, rlen); + putMsgLen(ci.fd, rlen); + writen2(ci.fd, answerbuffer, rlen); } - if(g_verbose) - cout<<"Closing client connection"<= 0) - close(client); + catch(...){} + if(g_verbose) + cout<<"Closing client connection"<outstanding; + } + return 0; +} - continue; - } + +/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and + they will hand off to worker threads & spawn more of them if required +*/ +void* tcpAcceptorThread(void* p) +{ + ClientState* cs = (ClientState*) p; + + ComboAddress remote; + remote.sin4.sin_family = cs->local.sin4.sin_family; + + g_tcpclientthreads.addTCPClientThread(); + + for(;;) { + try { + ConnectionInfo* ci = new ConnectionInfo; + ci->fd = SAccept(cs->tcpFD, remote); + if(g_verbose) + cout << "Got connection from "<remote = remote; + writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci)); + } + catch(...){} } return 0; } +void* statThread(void*) +{ + int interval = 1; + if(!interval) + return 0; + uint32_t lastQueries=0; + vector prev; + prev.resize(g_numdownstreams); + + for(;;) { + sleep(interval); + + if(g_tcpclientthreads.d_queued > 1 && g_tcpclientthreads.d_numthreads < 10) + g_tcpclientthreads.addTCPClientThread(); + + unsigned int outstanding=0; + uint64_t numQueries=0; + for(unsigned int n=0; n < g_numdownstreams; ++n) { + DownstreamState& dss = g_dstates[n]; + if(g_verbose) + cout<<'\t'<=0 && ids.age++ > 2) { + ids.age = AtomicCounter(); + ids.origFD = -1; + dss.reuseds++; + --dss.outstanding; + } + } + } + if(g_verbose) + cout<tcpFD, cs->local); SListen(cs->tcpFD, 64); - pthread_create(&tid, 0, tcpClientThread, (void*) cs); + pthread_create(&tid, 0, tcpAcceptorThread, (void*) cs); } pthread_t stattid;