return 0;
}
-void* statThread(void*)
-{
- int interval = 1;
- if(!interval)
- return 0;
- uint32_t lastQueries=0;
- vector<DownstreamState> prev;
- prev.resize(g_numdownstreams);
-
- for(;;) {
- sleep(interval);
-
- unsigned int outstanding=0;
- uint64_t numQueries=0;
- for(unsigned int n=0; n < g_numdownstreams; ++n) {
- DownstreamState& dss = g_dstates[n];
- if(g_verbose)
- cout<<'\t'<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
- outstanding += dss.outstanding;
- prev[n].queries = dss.queries;
- numQueries += dss.queries;
- for(unsigned int i=0 ; i < g_maxOutstanding; ++i) {
- IDState& ids = dss.idStates[i];
- if(ids.origFD >=0 && ids.age++ > 2) {
- ids.age = AtomicCounter();
- ids.origFD = -1;
- dss.reuseds++;
- --dss.outstanding;
- }
- }
- }
- if(g_verbose)
- cout<<outstanding<<" outstanding queries, " <<(numQueries - lastQueries)/interval <<" qps"<<endl;
- lastQueries=numQueries;
- }
- return 0;
-}
-
/* TCP: the grand design.
We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
return false;
}
+struct ConnectionInfo
+{
+ int fd;
+ ComboAddress remote;
+};
+
+void* tcpClientThread(void* p);
+class TCPClientCollection {
+ vector<int> d_tcpclientthreads;
+ AtomicCounter d_pos;
+public:
+ AtomicCounter d_queued, d_numthreads;
+
+ TCPClientCollection()
+ {
+ d_tcpclientthreads.reserve(1024);
+ }
+
+ int getThread()
+ {
+ int pos = d_pos++;
+ ++d_queued;
+ return d_tcpclientthreads[pos % d_numthreads];
+ }
+
+ // Should not be called simultaneously!
+ void addTCPClientThread()
+ {
+ if(g_verbose)
+ cout<<"Adding TCP Client thread"<<endl;
+ int pipefds[2];
+ if(pipe(pipefds) < 0)
+ unixDie("Creating pipe");
+ int *fd = new int(pipefds[0]);
+ d_tcpclientthreads.push_back(pipefds[1]);
+
+ pthread_t tid;
+ pthread_create(&tid, 0, tcpClientThread, (void*)fd);
+ ++d_numthreads;
+ }
+} g_tcpclientthreads;
+
-/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
- they will initiate downstream connections to service them. An attempt is made to reuse existing
- connections.
-*/
void* tcpClientThread(void* p)
{
- ClientState* cs = (ClientState*) p;
+ /* we get launched with a pipe on which we receive file descriptors from clients that we own
+ from that point on */
+ int pipefd = *(int*)p;
+ delete (int*)p;
- ComboAddress remote;
- remote.sin4.sin_family = cs->local.sin4.sin_family;
-
int dsock = -1;
- int client = -1;
DownstreamState *ds=0;
+
for(;;) {
- try {
- client = SAccept(cs->tcpFD, remote);
+ ConnectionInfo* citmp, ci;
+ readn2(pipefd, &citmp, sizeof(citmp));
+ --g_tcpclientthreads.d_queued;
+ ci=*citmp;
+ delete citmp;
+
+ if(dsock == -1)
+ dsock = getTCPDownstream(&ds);
+ else {
if(g_verbose)
- cout << "Got connection from "<<remote.toStringWithPort()<<endl;
-
- if(dsock == -1)
- dsock = getTCPDownstream(&ds);
- else {
- if(g_verbose)
- cout <<"Reusing existing TCP connection"<<endl;
- }
+ cout <<"Reusing existing TCP connection"<<endl;
+ }
- uint16_t qlen, rlen;
- for(;;) {
- if(!getMsgLen(client, &qlen))
+ uint16_t qlen, rlen;
+ try {
+ for(;;) {
+ if(!getMsgLen(ci.fd, &qlen))
break;
-
+
ds->queries++;
+ ds->outstanding++;
char query[qlen];
- readn2(client, query, qlen);
-
+ readn2(ci.fd, query, qlen);
+
retry:;
if(!putMsgLen(dsock, qlen)) {
if(g_verbose)
char answerbuffer[rlen];
readn2(dsock, answerbuffer, rlen);
- putMsgLen(client, rlen);
- writen2(client, answerbuffer, rlen);
+ putMsgLen(ci.fd, rlen);
+ writen2(ci.fd, answerbuffer, rlen);
}
- if(g_verbose)
- cout<<"Closing client connection"<<endl;
- close(client);
- client=-1;
}
- catch(std::exception& e)
- {
- if(client >= 0)
- close(client);
+ catch(...){}
+ if(g_verbose)
+ cout<<"Closing client connection"<<endl;
+ close(ci.fd);
+ ci.fd=-1;
+ --ds->outstanding;
+ }
+ return 0;
+}
- continue;
- }
+
+/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
+ they will hand off to worker threads & spawn more of them if required
+*/
+void* tcpAcceptorThread(void* p)
+{
+ ClientState* cs = (ClientState*) p;
+
+ ComboAddress remote;
+ remote.sin4.sin_family = cs->local.sin4.sin_family;
+
+ g_tcpclientthreads.addTCPClientThread();
+
+ for(;;) {
+ try {
+ ConnectionInfo* ci = new ConnectionInfo;
+ ci->fd = SAccept(cs->tcpFD, remote);
+ if(g_verbose)
+ cout << "Got connection from "<<remote.toStringWithPort()<<endl;
+
+ ci->remote = remote;
+ writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci));
+ }
+ catch(...){}
}
return 0;
}
+void* statThread(void*)
+{
+ int interval = 1;
+ if(!interval)
+ return 0;
+ uint32_t lastQueries=0;
+ vector<DownstreamState> prev;
+ prev.resize(g_numdownstreams);
+
+ for(;;) {
+ sleep(interval);
+
+ if(g_tcpclientthreads.d_queued > 1 && g_tcpclientthreads.d_numthreads < 10)
+ g_tcpclientthreads.addTCPClientThread();
+
+ unsigned int outstanding=0;
+ uint64_t numQueries=0;
+ for(unsigned int n=0; n < g_numdownstreams; ++n) {
+ DownstreamState& dss = g_dstates[n];
+ if(g_verbose)
+ cout<<'\t'<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
+ outstanding += dss.outstanding;
+ prev[n].queries = dss.queries;
+ numQueries += dss.queries;
+ for(unsigned int i=0 ; i < g_maxOutstanding; ++i) {
+ IDState& ids = dss.idStates[i];
+ if(ids.origFD >=0 && ids.age++ > 2) {
+ ids.age = AtomicCounter();
+ ids.origFD = -1;
+ dss.reuseds++;
+ --dss.outstanding;
+ }
+ }
+ }
+ if(g_verbose)
+ cout<<outstanding<<" outstanding queries, " <<(numQueries - lastQueries)/interval <<" qps"<<endl;
+ lastQueries=numQueries;
+ }
+ return 0;
+}
+
+
int main(int argc, char** argv)
try
{
SBind(cs->tcpFD, cs->local);
SListen(cs->tcpFD, 64);
- pthread_create(&tid, 0, tcpClientThread, (void*) cs);
+ pthread_create(&tid, 0, tcpAcceptorThread, (void*) cs);
}
pthread_t stattid;