From: Remi Gacogne Date: Tue, 3 Jul 2018 07:33:46 +0000 (+0200) Subject: rec: Add support for several distributor threads X-Git-Tag: dnsdist-1.3.3~173^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b243ca3bd7308fe82ef500575477c1be3e7c2f90;p=thirdparty%2Fpdns.git rec: Add support for several distributor threads --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 1bd772c1b5..ab69e74c81 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -104,7 +104,7 @@ typedef map tcpClientCounts_t; static thread_local std::shared_ptr t_pdl; -static thread_local int t_id = -1; +static thread_local unsigned int t_id = 0; static thread_local std::shared_ptr t_traceRegex; static thread_local std::unique_ptr t_tcpClientCounts; #ifdef HAVE_PROTOBUF @@ -127,32 +127,50 @@ thread_local std::shared_ptr t_nodDBp; #endif /* NOD_ENABLED */ __thread struct timeval g_now; // timestamp, updated (too) frequently +typedef vector > > deferredAdd_t; + // for communicating with our threads -struct ThreadPipeSet -{ - int writeToThread; - int readToThread; - int writeFromThread; - int readFromThread; - int writeQueriesToThread; // this one is non-blocking - int readQueriesToThread; +// effectively readonly after startup +struct RecThreadInfo +{ + struct ThreadPipeSet + { + int writeToThread{-1}; + int readToThread{-1}; + int writeFromThread{-1}; + int readFromThread{-1}; + int writeQueriesToThread{-1}; // this one is non-blocking + int readQueriesToThread{-1}; + }; + + /* FD corresponding to listening sockets if we have one socket per + listener (with reuseport), otherwise all listeners share the + same FD and g_deferredAdds is then used instead */ + deferredAdd_t deferredAdds; + struct ThreadPipeSet pipes; + std::thread thread; + /* handle the web server, carbon, statistics and the control channel */ + bool isHandler{false}; + /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */ + bool isListener{false}; + /* process queries */ + bool isWorker{false}; }; -/* the TID of the thread handling the web server, carbon, statistics and the control channel */ -static const int s_handlerThreadID = -1; -/* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries - to the other threads */ -static const int s_distributorThreadID = 0; +/* first we have the handler thread, t_id == 0 (some other + helper threads like SNMP might have t_id == 0 as well) + then the distributor threads if any + and finally the workers */ +static std::vector s_threadInfos; +/* without reuseport, all listeners share the same sockets */ +static deferredAdd_t g_deferredAdds; typedef vector tcpListenSockets_t; typedef map listenSocketsAddresses_t; // is shared across all threads right now -typedef vector > > deferredAdd_t; static const ComboAddress g_local4("0.0.0.0"), g_local6("::"); -static vector g_pipes; // effectively readonly after startup static tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now -static std::unordered_map deferredAdds; static set g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism static vector g_localQueryAddresses4, g_localQueryAddresses6; static AtomicCounter counter; @@ -166,6 +184,7 @@ static uint32_t g_disthashseed; static unsigned int g_maxTCPPerClient; static unsigned int g_networkTimeoutMsec; static unsigned int g_maxMThreads; +static unsigned int g_numDistributorThreads; static unsigned int g_numWorkerThreads; static int g_tcpTimeout; static uint16_t g_udpTruncationThreshold; @@ -174,9 +193,8 @@ static std::atomic statsWanted; static std::atomic g_quiet; static bool g_logCommonErrors; static bool g_anyToTcp; -static bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets +static bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers static bool g_reusePort{false}; -static bool g_useOneSocketPerThread; static bool g_gettagNeedsEDNSOptions{false}; static time_t g_statisticsInterval; static bool g_useIncomingECS; @@ -195,7 +213,7 @@ static std::set s_avoidUdpSourcePorts; static uint16_t s_minUdpSourcePort; static uint16_t s_maxUdpSourcePort; -RecursorControlChannel s_rcc; // only active in thread 0 +RecursorControlChannel s_rcc; // only active in the handler thread RecursorStats g_stats; string s_programname="pdns_recursor"; string s_pidfname; @@ -310,6 +328,24 @@ int getMTaskerTID() return MT->getTid(); } +static bool isDistributorThread() +{ + if (t_id == 0) { + return false; + } + + return g_weDistributeQueries && s_threadInfos.at(t_id).isListener; +} + +static bool isHandlerThread() +{ + if (t_id == 0) { + return true; + } + + return s_threadInfos.at(t_id).isHandler; +} + static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var); // -1 is error, 0 is timeout, 1 is success @@ -2208,7 +2244,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) } } -static void makeTCPServerSockets(unsigned int threadId) +static void makeTCPServerSockets(deferredAdd_t& deferredAdds) { int fd; vectorlocals; @@ -2283,7 +2319,7 @@ static void makeTCPServerSockets(unsigned int threadId) setNonBlocking(fd); setSocketSendBuffer(fd, 65000); listen(fd, 128); - deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion)); + deferredAdds.push_back(make_pair(fd, handleNewTCPQuestion)); g_tcpListenSockets.push_back(fd); // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP: // - fd is not that which we know here, but returned from accept() @@ -2294,7 +2330,7 @@ static void makeTCPServerSockets(unsigned int threadId) } } -static void makeUDPServerSockets(unsigned int threadId) +static void makeUDPServerSockets(deferredAdd_t& deferredAdds) { int one=1; vectorlocals; @@ -2359,7 +2395,7 @@ static void makeUDPServerSockets(unsigned int threadId) setNonBlocking(fd); - deferredAdds[threadId].push_back(make_pair(fd, handleNewUDPQuestion)); + deferredAdds.push_back(make_pair(fd, handleNewUDPQuestion)); g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers if(sin.sin4.sin_family == AF_INET) g_log<= 3600) { try { @@ -2496,7 +2532,6 @@ static void houseKeeping(void *) { g_log<func = func; tmsg->wantAnswer = true; - if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { + if(write(threadInfo.pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { delete tmsg; + unixDie("write to thread pipe returned wrong size or error"); } string* resp = nullptr; - if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp)) + if(read(threadInfo.pipes.readFromThread, &resp, sizeof(resp)) != sizeof(resp)) unixDie("read from thread pipe returned wrong size or error"); if(resp) { @@ -2585,23 +2623,24 @@ void broadcastFunction(const pipefunc_t& func) } } -// This function is only called by the distributor thread, when pdns-distributes-queries is set +// This function is only called by the distributor threads, when pdns-distributes-queries is set void distributeAsyncFunction(const string& packet, const pipefunc_t& func) { - if (t_id != s_distributorThreadID) { + if (!isDistributorThread()) { g_log<(s_distributorThreadID)) { - g_log<func = func; tmsg->wantAnswer = false; @@ -2645,7 +2684,8 @@ static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var) g_log<wantAnswer) { - if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) { + const auto& threadInfo = s_threadInfos.at(t_id); + if(write(threadInfo.pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) { delete tmsg; unixDie("write to thread pipe returned wrong size or error"); } @@ -2684,15 +2724,19 @@ vector >& operator+=(vector >&a, and by the SNMP thread to gather metrics. */ template T broadcastAccFunction(const boost::function& func) { - /* the SNMP thread uses id -1 too */ - if (t_id != s_handlerThreadID) { + if (!isHandlerThread()) { g_log<func = boost::bind(voider, func); tmsg->wantAnswer = true; @@ -3015,7 +3059,7 @@ static void checkOrFixFDS() } } -static void* recursorThread(int tid, bool worker); +static void* recursorThread(unsigned int tid); static void* pleaseSupplantACLs(std::shared_ptr ng) { @@ -3317,9 +3361,10 @@ static int serviceMain(int argc, char*argv[]) g_quiet=::arg().mustDo("quiet"); + /* this needs to be done before parseACLs(), which call broadcastFunction() */ g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries"); if(g_weDistributeQueries) { - g_log< workers(g_numThreads); if(g_numThreads == 1) { g_log<(new addrringbuf_t()); - if(g_weDistributeQueries) // if so, only 1 thread does recvfrom - t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries")); + if(g_weDistributeQueries) + t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / g_numDistributorThreads); else t_remotes->set_capacity(ringsize); t_servfailremotes = std::unique_ptr(new addrringbuf_t()); @@ -3652,7 +3742,7 @@ try t_fdm=getMultiplexer(); - if(!worker) { + if(threadInfo.isHandler) { if(::arg().mustDo("webserver")) { g_log<getName() << "' multiplexer"<addReadFD(g_pipes[t_id].readToThread, handlePipeRequest); - t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest); - if(g_useOneSocketPerThread) { - for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) { - t_fdm->addReadFD(i->first, i->second); + t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest); + t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest); + + if (threadInfo.isListener) { + if (g_reusePort) { + /* then every listener has its own FDs */ + for(const auto deferred : threadInfo.deferredAdds) { + t_fdm->addReadFD(deferred.first, deferred.second); + } } - } - else { - if(!g_weDistributeQueries || t_id == s_distributorThreadID) { // if we distribute queries, only t_id = 0 listens - for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) { - t_fdm->addReadFD(i->first, i->second); + else { + /* otherwise all listeners are listening on the same ones */ + for(const auto deferred : g_deferredAdds) { + t_fdm->addReadFD(deferred.first, deferred.second); } } } @@ -3685,7 +3778,7 @@ try registerAllStats(); - if(!worker) { + if(threadInfo.isHandler) { t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel } @@ -3719,7 +3812,7 @@ try counter++; - if(!worker) { + if(threadInfo.isHandler) { if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) { doStats(); last_stat = g_now.tv_sec; @@ -3734,7 +3827,7 @@ try } if (t_pdl != nullptr) { // lua-dns-script directive is present, call the maintenance callback if needed - if (worker && (!g_weDistributeQueries || t_id != s_distributorThreadID)) { + if (threadInfo.isWorker) { // Only on threads processing queries if(g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) { t_pdl->maintenance(); @@ -3746,18 +3839,20 @@ try t_fdm->run(&g_now); // 'run' updates g_now for us - if(worker && (!g_weDistributeQueries || t_id == s_distributorThreadID)) { // if pdns distributes queries, only tid 0 should do this + if(threadInfo.isListener) { if(listenOnTCP) { if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections - for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i) - t_fdm->removeReadFD(*i); + for(const auto fd : g_tcpListenSockets) { + t_fdm->removeReadFD(fd); + } listenOnTCP=false; } } else { if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable - for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i) - t_fdm->addReadFD(*i, handleNewTCPQuestion); + for(const auto fd : g_tcpListenSockets) { + t_fdm->addReadFD(fd, handleNewTCPQuestion); + } listenOnTCP=true; } } @@ -3810,6 +3905,7 @@ int main(int argc, char **argv) ::arg().set("setuid","If set, change user id to this uid for more security")=""; ::arg().set("network-timeout", "Wait this number of milliseconds for network i/o")="1500"; ::arg().set("threads", "Launch this number of threads")="2"; + ::arg().set("distributor-threads", "Launch this number of distributor threads, distributing queries to other threads")="0"; ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; // if we un-experimental this, need to fix openssl rand seeding for multiple PIDs! ::arg().set("config-name","Name of this virtual configuration - will rename the binary image")=""; ::arg().set("api-config-dir", "Directory where REST API stores config and zones") = ""; @@ -3963,8 +4059,21 @@ int main(int argc, char **argv) ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]); - if(::arg().asNum("threads")==1) - ::arg().set("pdns-distributes-queries")="no"; + if(::arg().asNum("threads")==1) { + if (::arg().mustDo("pdns-distributes-queries")) { + g_log<