From: Otto Moerbeek Date: Tue, 25 Jan 2022 12:25:29 +0000 (+0100) Subject: Refactor RecThreadInfo to be more than just a struct X-Git-Tag: auth-4.7.0-alpha1~19^2~12 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=69b39198f7bc03e0ff53d450fa50f7d8c5ce7a65;p=thirdparty%2Fpdns.git Refactor RecThreadInfo to be more than just a struct --- diff --git a/pdns/lua-recursor4.cc b/pdns/lua-recursor4.cc index b346c3f413..4b49c68828 100644 --- a/pdns/lua-recursor4.cc +++ b/pdns/lua-recursor4.cc @@ -422,8 +422,8 @@ void RecursorLua4::postPrepareContext() }); d_lw->writeFunction("getRecursorThreadId", []() { - return getRecursorThreadId(); - }); + return RecThreadInfo::id(); + }); d_lw->writeFunction("sendCustomSNMPTrap", [](const std::string& str) { if (g_snmpAgent) { diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 5f0414013d..625050fb68 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -43,7 +43,6 @@ #endif /* NOD_ENABLED */ thread_local std::shared_ptr t_pdl; -thread_local unsigned int t_id = 0; thread_local std::shared_ptr t_traceRegex; thread_local std::shared_ptr>> t_protobufServers{nullptr}; thread_local std::shared_ptr>> t_outgoingProtobufServers{nullptr}; @@ -964,7 +963,7 @@ void startDoResolve(void* p) } if (!g_quiet || tracedQuery) { - g_log << Logger::Warning << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] " << (dc->d_tcp ? "TCP " : "") << "question for '" << dc->d_mdp.d_qname << "|" + g_log << Logger::Warning << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] " << (dc->d_tcp ? "TCP " : "") << "question for '" << dc->d_mdp.d_qname << "|" << QType(dc->d_mdp.d_qtype) << "' from " << dc->getRemote(); if (!dc->d_ednssubnet.source.empty()) { g_log << " (ecs " << dc->d_ednssubnet.source.toString() << ")"; @@ -1555,7 +1554,7 @@ void startDoResolve(void* p) // Now it always uses an integral number of microseconds, except for averages, which use doubles uint64_t spentUsec = uSec(sr.getNow() - dc->d_now); if (!g_quiet) { - g_log << Logger::Error << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] answer to " << (dc->d_mdp.d_header.rd ? "" : "non-rd ") << "question '" << dc->d_mdp.d_qname << "|" << DNSRecordContent::NumberToType(dc->d_mdp.d_qtype); + g_log << Logger::Error << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] answer to " << (dc->d_mdp.d_header.rd ? "" : "non-rd ") << "question '" << dc->d_mdp.d_qname << "|" << DNSRecordContent::NumberToType(dc->d_mdp.d_qtype); g_log << "': " << ntohs(pw.getHeader()->ancount) << " answers, " << ntohs(pw.getHeader()->arcount) << " additional, took " << sr.d_outqueries << " packets, " << sr.d_totUsec / 1000.0 << " netw ms, " << spentUsec / 1000.0 << " tot ms, " << sr.d_throttledqueries << " throttled, " << sr.d_timeouts << " timeouts, " << sr.d_tcpoutqueries << "/" << sr.d_dotoutqueries << " tcp/dot connections, rcode=" << res; if (!shouldNotValidate && sr.isDNSSECValidationRequested()) { @@ -1771,7 +1770,7 @@ bool expectProxyProtocol(const ComboAddress& from) static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, struct timeval tv, int fd, std::vector& proxyProtocolValues, RecEventTrace& eventTrace) { - ++g_threadInfos[t_id].numberOfDistributedQueries; + ++(RecThreadInfo::self().numberOfDistributedQueries); gettimeofday(&g_now, nullptr); if (tv.tv_sec) { struct timeval diff = g_now - tv; @@ -1907,7 +1906,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false); if (cacheHit) { if (!g_quiet) { - g_log << Logger::Notice << t_id << " question answered from packet cache tag=" << ctag << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl; + g_log << Logger::Notice << RecThreadInfo::id() << " question answered from packet cache tag=" << ctag << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl; } struct msghdr msgh; struct iovec iov; @@ -1952,7 +1951,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr bool ipf = t_pdl->ipfilter(source, destination, *dh, eventTrace); if (ipf) { if (!g_quiet) { - g_log << Logger::Notice << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << " based on policy" << endl; + g_log << Logger::Notice << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << " based on policy" << endl; } g_stats.policyDrops++; return 0; @@ -1970,7 +1969,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr } if (!g_quiet) { - g_log << Logger::Notice << t_id << " got NOTIFY for " << qname.toLogString() << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl; + g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl; } requestWipeCaches(qname); @@ -1984,7 +1983,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr if (MT->numProcesses() > g_maxMThreads) { if (!g_quiet) - g_log << Logger::Notice << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << ", over capacity" << endl; + g_log << Logger::Notice << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << ", over capacity" << endl; g_stats.overCapacityDrops++; return 0; @@ -2188,7 +2187,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) destination = dest; } - if (g_weDistributeQueries) { + if (RecThreadInfo::s_weDistributeQueries) { std::string localdata = data; distributeAsyncFunction(data, [localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace]() mutable { return doProcessUDPQuestion(localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace); @@ -2324,7 +2323,7 @@ void makeUDPServerSockets(deferredAdd_t& deferredAdds) static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) { auto& targetInfo = g_threadInfos[target]; - if (!targetInfo.isWorker) { + if (!targetInfo.isWorker()) { g_log << Logger::Error << "distributeAsyncFunction() tried to assign a query to a non-worker thread" << endl; _exit(1); } @@ -2354,7 +2353,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) static unsigned int getWorkerLoad(size_t workerIdx) { - const auto mt = g_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt; + const auto mt = g_threadInfos[/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx].mt; if (mt != nullptr) { return mt->numProcesses(); } @@ -2364,39 +2363,39 @@ static unsigned int getWorkerLoad(size_t workerIdx) static unsigned int selectWorker(unsigned int hash) { if (g_balancingFactor == 0) { - return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads); + return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + (hash % RecThreadInfo::s_numWorkerThreads); } /* we start with one, representing the query we are currently handling */ double currentLoad = 1; - std::vector load(g_numWorkerThreads); - for (size_t idx = 0; idx < g_numWorkerThreads; idx++) { + std::vector load(RecThreadInfo::s_numWorkerThreads); + for (size_t idx = 0; idx < RecThreadInfo::s_numWorkerThreads; idx++) { load[idx] = getWorkerLoad(idx); currentLoad += load[idx]; // cerr<<"load for worker "< targetLoad) { ++g_stats.rebalancedQueries; do { // cerr<<"worker "< targetLoad); } - return /* skip handler */ 1 + g_numDistributorThreads + worker; + return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + worker; } // This function is only called by the distributor threads, when pdns-distributes-queries is set void distributeAsyncFunction(const string& packet, const pipefunc_t& func) { - if (!isDistributorThread()) { - g_log << Logger::Error << "distributeAsyncFunction() has been called by a worker (" << t_id << ")" << endl; + if (!RecThreadInfo::self().isDistributor()) { + g_log << Logger::Error << "distributeAsyncFunction() has been called by a worker (" << RecThreadInfo::id() << ")" << endl; _exit(1); } @@ -2412,7 +2411,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func) was full, let's try another one */ unsigned int newTarget = 0; do { - newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads); + newTarget = /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + dns_random(RecThreadInfo::s_numWorkerThreads); } while (newTarget == target); if (!trySendingQueryToWorker(newTarget, tmsg)) { diff --git a/pdns/rec_channel_rec.cc b/pdns/rec_channel_rec.cc index 1010369cf0..b7fc66ee14 100644 --- a/pdns/rec_channel_rec.cc +++ b/pdns/rec_channel_rec.cc @@ -1118,7 +1118,7 @@ static StatsMap toCPUStatsMap(const string& name) { const string pbasename = getPrometheusName(name); StatsMap entries; - for (unsigned int n = 0; n < g_numThreads; ++n) { + for (unsigned int n = 0; n < RecThreadInfo::numThreads(); ++n) { uint64_t tm = doGetThreadCPUMsec(n); std::string pname = pbasename + "{thread=\"" + std::to_string(n) + "\"}"; entries.emplace(name + "-thread-" + std::to_string(n), StatsMapEntry{pname, std::to_string(tm)}); diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc index f15ea7eb4a..bcaf57fffa 100644 --- a/pdns/recursordist/rec-main.cc +++ b/pdns/recursordist/rec-main.cc @@ -34,7 +34,6 @@ #include "validate-recursor.hh" #include "pubsuffix.hh" #include "opensslsigners.hh" -#include "threadname.hh" #include "ws-recursor.hh" #include "rec-taskqueue.hh" #include "secpoll-recursor.hh" @@ -78,9 +77,7 @@ thread_local std::shared_ptr t_udrDBp; #endif /* NOD_ENABLED */ std::atomic statsWanted; -unsigned int g_numWorkerThreads; uint32_t g_disthashseed; -bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers bool g_useIncomingECS; uint16_t g_xpfRRCode{0}; NetmaskGroup g_proxyProtocolACL; @@ -91,8 +88,6 @@ std::shared_ptr g_initialAllowFrom; // new thread needs to be setu std::shared_ptr g_initialAllowNotifyFrom; // new threads need this to be setup std::shared_ptr g_initialAllowNotifyFor; // new threads need this to be setup bool g_logRPZChanges{false}; -unsigned int g_numDistributorThreads; -unsigned int g_numThreads; static time_t s_statisticsInterval; bool g_addExtendedResolutionDNSErrors; static std::atomic s_counter; @@ -108,6 +103,11 @@ deferredAdd_t g_deferredAdds; and finally the workers */ std::vector g_threadInfos; +bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers +unsigned int RecThreadInfo::s_numDistributorThreads; +unsigned int RecThreadInfo::s_numWorkerThreads; +thread_local unsigned int RecThreadInfo::t_id; + ArgvMap& arg() { static ArgvMap theArg; @@ -629,8 +629,8 @@ static void checkLinuxIPv6Limits() static void checkOrFixFDS() { unsigned int availFDs = getFilenumLimit(); - unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads + 25; // even healthier margin then before - wantFDs += g_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread; + unsigned int wantFDs = g_maxMThreads * RecThreadInfo::s_numWorkerThreads + 25; // even healthier margin then before + wantFDs += RecThreadInfo::s_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread; if (wantFDs > availFDs) { unsigned int hardlimit = getFilenumLimit(true); @@ -639,7 +639,7 @@ static void checkOrFixFDS() g_log << Logger::Warning << "Raised soft limit on number of filedescriptors to " << wantFDs << " to match max-mthreads and threads settings" << endl; } else { - int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / g_numWorkerThreads; + int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::s_numWorkerThreads; g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl; g_maxMThreads = newval; setFilenumLimit(hardlimit); @@ -697,7 +697,7 @@ static void makeThreadPipes() } /* thread 0 is the handler / SNMP, worker threads start at 1 */ - for (unsigned int n = 0; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) { + for (unsigned int n = 0; n <= (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads); ++n) { auto& threadInfos = g_threadInfos.at(n); int fd[2]; @@ -783,7 +783,7 @@ static void doStats(void) size_t idx = 0; for (const auto& threadInfo : g_threadInfos) { - if (threadInfo.isWorker) { + if (threadInfo.isWorker()) { g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl; ++idx; } @@ -936,7 +936,7 @@ void broadcastFunction(const pipefunc_t& func) for the initialization of ACLs and domain maps. After that it should only be called by the handler. */ - if (g_threadInfos.empty() && isHandlerThread()) { + if (g_threadInfos.empty() && RecThreadInfo::id() == 0) { /* the handler and distributors will call themselves below, but during startup we get called while g_threadInfos has not been populated yet to update the ACL or domain maps, so we need to @@ -947,7 +947,7 @@ void broadcastFunction(const pipefunc_t& func) unsigned int n = 0; for (const auto& threadInfo : g_threadInfos) { - if (n++ == t_id) { + if (n++ == RecThreadInfo::id()) { func(); // don't write to ourselves! continue; } @@ -997,15 +997,15 @@ static vector>& operator+=(vector T broadcastAccFunction(const boost::function& func) { - if (!isHandlerThread()) { - g_log << Logger::Error << "broadcastAccFunction has been called by a worker (" << t_id << ")" << endl; + if (!RecThreadInfo::self().isHandler()) { + g_log << Logger::Error << "broadcastAccFunction has been called by a worker (" << RecThreadInfo::id() << ")" << endl; _exit(1); } unsigned int n = 0; T ret = T(); for (const auto& threadInfo : g_threadInfos) { - if (n++ == t_id) { + if (n++ == RecThreadInfo::id()) { continue; } @@ -1148,8 +1148,8 @@ static int serviceMain(int argc, char* argv[]) } /* this needs to be done before parseACLs(), which call broadcastFunction() */ - g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries"); - if (g_weDistributeQueries) { + RecThreadInfo::s_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries"); + if (RecThreadInfo::s_weDistributeQueries) { g_log << Logger::Warning << "PowerDNS Recursor itself will distribute queries over threads" << endl; } @@ -1323,14 +1323,13 @@ static int serviceMain(int argc, char* argv[]) } g_paddingTag = ::arg().asNum("edns-padding-tag"); - g_numDistributorThreads = ::arg().asNum("distributor-threads"); - g_numWorkerThreads = ::arg().asNum("threads"); - if (g_numWorkerThreads < 1) { + RecThreadInfo::s_numDistributorThreads = ::arg().asNum("distributor-threads"); + RecThreadInfo::s_numWorkerThreads = ::arg().asNum("threads"); + if (RecThreadInfo::s_numWorkerThreads < 1) { g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl; - g_numWorkerThreads = 1; + RecThreadInfo::s_numWorkerThreads = 1; } - g_numThreads = g_numDistributorThreads + g_numWorkerThreads; g_maxMThreads = ::arg().asNum("max-mthreads"); int64_t maxInFlight = ::arg().asNum("max-concurrent-requests-per-tcp-connection"); @@ -1425,12 +1424,12 @@ static int serviceMain(int argc, char* argv[]) g_reusePort = ::arg().mustDo("reuseport"); #endif - g_threadInfos.resize(g_numDistributorThreads + g_numWorkerThreads + /* handler */ 1); + g_threadInfos.resize(RecThreadInfo::s_numDistributorThreads + RecThreadInfo::s_numWorkerThreads + /* handler */ 1); if (g_reusePort) { - if (g_weDistributeQueries) { + if (RecThreadInfo::s_weDistributeQueries) { /* first thread is the handler, then distributors */ - for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) { + for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) { auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds; auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets; makeUDPServerSockets(deferredAdds); @@ -1439,7 +1438,7 @@ static int serviceMain(int argc, char* argv[]) } else { /* first thread is the handler, there is no distributor here and workers are accepting queries */ - for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) { + for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) { auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds; auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets; makeUDPServerSockets(deferredAdds); @@ -1456,15 +1455,15 @@ static int serviceMain(int argc, char* argv[]) /* every listener (so distributor if g_weDistributeQueries, workers otherwise) needs to listen to the shared sockets */ - if (g_weDistributeQueries) { + if (RecThreadInfo::s_weDistributeQueries) { /* first thread is the handler, then distributors */ - for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) { + for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) { g_threadInfos.at(threadId).tcpSockets = tcpSockets; } } else { /* first thread is the handler, there is no distributor here and workers are accepting queries */ - for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) { + for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) { g_threadInfos.at(threadId).tcpSockets = tcpSockets; } } @@ -1631,58 +1630,57 @@ static int serviceMain(int argc, char* argv[]) unsigned int currentThreadId = 1; const auto cpusMap = parseCPUMap(); - if (g_numThreads == 1) { + if (RecThreadInfo::numThreads() == 1) { g_log << Logger::Warning << "Operating unthreaded" << endl; #ifdef HAVE_SYSTEMD sd_notify(0, "READY=1"); #endif /* This thread handles the web server, carbon, statistics and the control channel */ - auto& handlerInfos = g_threadInfos.at(0); - handlerInfos.isHandler = true; - handlerInfos.thread = std::thread(recursorThread, 0, "web+stat"); + auto& handlerInfo = g_threadInfos.at(0); + handlerInfo.setHandler(); + handlerInfo.start(0, "web+stat"); setCPUMap(cpusMap, currentThreadId, pthread_self()); - auto& infos = g_threadInfos.at(currentThreadId); - infos.isListener = true; - infos.isWorker = true; - recursorThread(currentThreadId++, "worker"); + auto& info = g_threadInfos.at(currentThreadId); + info.setListener(); + info.setWorker(); + info.setThreadId(currentThreadId++); + recursorThread(); - handlerInfos.thread.join(); - if (handlerInfos.exitCode != 0) { - ret = handlerInfos.exitCode; + handlerInfo.thread.join(); + if (handlerInfo.exitCode != 0) { + ret = handlerInfo.exitCode; } } else { - - if (g_weDistributeQueries) { - for (unsigned int n = 0; n < g_numDistributorThreads; ++n) { - auto& infos = g_threadInfos.at(currentThreadId + n); - infos.isListener = true; + if (RecThreadInfo::s_weDistributeQueries) { + for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) { + g_threadInfos.at(currentThreadId + n).setListener(); } } - for (unsigned int n = 0; n < g_numWorkerThreads; ++n) { - auto& infos = g_threadInfos.at(currentThreadId + (g_weDistributeQueries ? g_numDistributorThreads : 0) + n); - infos.isListener = !g_weDistributeQueries; - infos.isWorker = true; + for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) { + auto& info = g_threadInfos.at(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n); + info.setListener(!RecThreadInfo::s_weDistributeQueries); + info.setWorker(); } - if (g_weDistributeQueries) { - g_log << Logger::Warning << "Launching " << g_numDistributorThreads << " distributor threads" << endl; - for (unsigned int n = 0; n < g_numDistributorThreads; ++n) { - auto& infos = g_threadInfos.at(currentThreadId); - infos.thread = std::thread(recursorThread, currentThreadId++, "distr"); - setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle()); + if (RecThreadInfo::s_weDistributeQueries) { + g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl; + for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) { + auto& info = g_threadInfos.at(currentThreadId); + info.start(currentThreadId++, "distr"); + setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); } } - g_log << Logger::Warning << "Launching " << g_numWorkerThreads << " worker threads" << endl; + g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl; - for (unsigned int n = 0; n < g_numWorkerThreads; ++n) { - auto& infos = g_threadInfos.at(currentThreadId); - infos.thread = std::thread(recursorThread, currentThreadId++, "worker"); - setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle()); + for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) { + auto& info = g_threadInfos.at(currentThreadId); + info.start(currentThreadId++, "worker"); + setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); } #ifdef HAVE_SYSTEMD @@ -1690,9 +1688,9 @@ static int serviceMain(int argc, char* argv[]) #endif /* This thread handles the web server, carbon, statistics and the control channel */ - auto& infos = g_threadInfos.at(0); - infos.isHandler = true; - infos.thread = std::thread(recursorThread, 0, "web+stat"); + auto& info = g_threadInfos.at(0); + info.setHandler(); + info.start(0, "web+stat"); for (auto& ti : g_threadInfos) { ti.thread.join(); @@ -1726,8 +1724,7 @@ static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var) g_log << Logger::Error << "PIPE function we executed created PDNS exception: " << e.reason << endl; // but what if they wanted an answer.. we send 0 } if (tmsg->wantAnswer) { - const auto& threadInfo = g_threadInfos.at(t_id); - if (write(threadInfo.pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) { + if (write(RecThreadInfo::self().pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) { delete tmsg; unixDie("write to thread pipe returned wrong size or error"); } @@ -1790,7 +1787,7 @@ static void houseKeeping(void*) past = now; past.tv_sec -= 5; if (t_last_prune < past) { - t_packetCache->doPruneTo(g_maxPacketCacheEntries / (g_numWorkerThreads + g_numDistributorThreads)); + t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads)); time_t limit; if (!((t_cleanCounter++) % 40)) { // this is a full scan! @@ -1804,7 +1801,7 @@ static void houseKeeping(void*) Utility::gettimeofday(&t_last_prune, nullptr); } - if (isHandlerThread()) { + if (RecThreadInfo::self().isHandler()) { if (now.tv_sec - s_last_ZTC_prune > 60) { s_last_ZTC_prune = now.tv_sec; static map ztcStates; @@ -1813,7 +1810,6 @@ static void houseKeeping(void*) RecZoneToCache::ZoneToCache(ztc.second, ztcStates.at(ztc.first)); } } - if (now.tv_sec - s_last_RC_prune > 5) { g_recCache->doPrune(g_maxCacheEntries); g_negCache->prune(g_maxCacheEntries / 10); @@ -1901,230 +1897,229 @@ static void houseKeeping(void*) } } -void* recursorThread(unsigned int n, const string& threadName) -try { - t_id = n; - auto& threadInfo = g_threadInfos.at(t_id); - - static string threadPrefix = "pdns-r/"; - setThreadName(threadPrefix + threadName); - - SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so.. - SyncRes::setDomainMap(g_initialDomainMap); - t_allowFrom = g_initialAllowFrom; - t_allowNotifyFrom = g_initialAllowNotifyFrom; - t_allowNotifyFor = g_initialAllowNotifyFor; - t_udpclientsocks = std::make_unique(); - t_tcpClientCounts = std::make_unique(); - - if (threadInfo.isHandler) { - if (!primeHints()) { - threadInfo.exitCode = EXIT_FAILURE; - RecursorControlChannel::stop = 1; - g_log << Logger::Critical << "Priming cache failed, stopping" << endl; - return nullptr; +void* recursorThread() +{ + try { + auto& threadInfo = RecThreadInfo::self(); + { + SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so.. + SyncRes::setDomainMap(g_initialDomainMap); + t_allowFrom = g_initialAllowFrom; + t_allowNotifyFrom = g_initialAllowNotifyFrom; + t_allowNotifyFor = g_initialAllowNotifyFor; + t_udpclientsocks = std::make_unique(); + t_tcpClientCounts = std::make_unique(); + + if (threadInfo.isHandler()) { + if (!primeHints()) { + threadInfo.exitCode = EXIT_FAILURE; + RecursorControlChannel::stop = 1; + g_log << Logger::Critical << "Priming cache failed, stopping" << endl; + return nullptr; + } + g_log << Logger::Debug << "Done priming cache with root hints" << endl; + } } - g_log << Logger::Debug << "Done priming cache with root hints" << endl; - } - t_packetCache = std::make_unique(); + t_packetCache = std::make_unique(); #ifdef NOD_ENABLED - if (threadInfo.isWorker) - setupNODThread(); + if (threadInfo.isWorker()) + setupNODThread(); #endif /* NOD_ENABLED */ - /* the listener threads handle TCP queries */ - if (threadInfo.isWorker || threadInfo.isListener) { - try { - if (!::arg()["lua-dns-script"].empty()) { - t_pdl = std::make_shared(); - t_pdl->loadFile(::arg()["lua-dns-script"]); - g_log << Logger::Warning << "Loaded 'lua' script from '" << ::arg()["lua-dns-script"] << "'" << endl; + /* the listener threads handle TCP queries */ + if (threadInfo.isWorker() || threadInfo.isListener()) { + try { + if (!::arg()["lua-dns-script"].empty()) { + t_pdl = std::make_shared(); + t_pdl->loadFile(::arg()["lua-dns-script"]); + g_log << Logger::Warning << "Loaded 'lua' script from '" << ::arg()["lua-dns-script"] << "'" << endl; + } + } + catch (std::exception& e) { + g_log << Logger::Error << "Failed to load 'lua' script from '" << ::arg()["lua-dns-script"] << "': " << e.what() << endl; + _exit(99); } } - catch (std::exception& e) { - g_log << Logger::Error << "Failed to load 'lua' script from '" << ::arg()["lua-dns-script"] << "': " << e.what() << endl; - _exit(99); - } - } - unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads; - if (ringsize) { - t_remotes = std::make_unique(); - if (g_weDistributeQueries) - t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / g_numDistributorThreads); - else - t_remotes->set_capacity(ringsize); - t_servfailremotes = std::make_unique(); - t_servfailremotes->set_capacity(ringsize); - t_bogusremotes = std::make_unique(); - t_bogusremotes->set_capacity(ringsize); - t_largeanswerremotes = std::make_unique(); - t_largeanswerremotes->set_capacity(ringsize); - t_timeouts = std::make_unique(); - t_timeouts->set_capacity(ringsize); - - t_queryring = std::make_unique>>(); - t_queryring->set_capacity(ringsize); - t_servfailqueryring = std::make_unique>>(); - t_servfailqueryring->set_capacity(ringsize); - t_bogusqueryring = std::make_unique>>(); - t_bogusqueryring->set_capacity(ringsize); - } - MT = std::make_unique(::arg().asNum("stack-size")); - threadInfo.mt = MT.get(); - - /* start protobuf export threads if needed */ - auto luaconfsLocal = g_luaconfs.getLocal(); - checkProtobufExport(luaconfsLocal); - checkOutgoingProtobufExport(luaconfsLocal); + unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numWorkerThreads; + if (ringsize) { + t_remotes = std::make_unique(); + if (RecThreadInfo::s_weDistributeQueries) + t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numDistributorThreads); + else + t_remotes->set_capacity(ringsize); + t_servfailremotes = std::make_unique(); + t_servfailremotes->set_capacity(ringsize); + t_bogusremotes = std::make_unique(); + t_bogusremotes->set_capacity(ringsize); + t_largeanswerremotes = std::make_unique(); + t_largeanswerremotes->set_capacity(ringsize); + t_timeouts = std::make_unique(); + t_timeouts->set_capacity(ringsize); + + t_queryring = std::make_unique>>(); + t_queryring->set_capacity(ringsize); + t_servfailqueryring = std::make_unique>>(); + t_servfailqueryring->set_capacity(ringsize); + t_bogusqueryring = std::make_unique>>(); + t_bogusqueryring->set_capacity(ringsize); + } + MT = std::make_unique(::arg().asNum("stack-size")); + threadInfo.mt = MT.get(); + + /* start protobuf export threads if needed */ + auto luaconfsLocal = g_luaconfs.getLocal(); + checkProtobufExport(luaconfsLocal); + checkOutgoingProtobufExport(luaconfsLocal); #ifdef HAVE_FSTRM - checkFrameStreamExport(luaconfsLocal); + checkFrameStreamExport(luaconfsLocal); #endif - PacketID pident; + PacketID pident; - t_fdm = getMultiplexer(); + t_fdm = getMultiplexer(); - RecursorWebServer* rws = nullptr; + RecursorWebServer* rws = nullptr; - t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest); + t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest); - if (threadInfo.isHandler) { - if (::arg().mustDo("webserver")) { - g_log << Logger::Warning << "Enabling web server" << endl; - try { - rws = new RecursorWebServer(t_fdm); - } - catch (const PDNSException& e) { - g_log << Logger::Error << "Unable to start the internal web server: " << e.reason << endl; - _exit(99); + if (threadInfo.isHandler()) { + if (::arg().mustDo("webserver")) { + g_log << Logger::Warning << "Enabling web server" << endl; + try { + rws = new RecursorWebServer(t_fdm); + } + catch (const PDNSException& e) { + g_log << Logger::Error << "Unable to start the internal web server: " << e.reason << endl; + _exit(99); + } } + g_log << Logger::Info << "Enabled '" << t_fdm->getName() << "' multiplexer" << endl; } - g_log << Logger::Info << "Enabled '" << t_fdm->getName() << "' multiplexer" << endl; - } - else { - t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest); + else { + t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest); - if (threadInfo.isListener) { - if (g_reusePort) { - /* then every listener has its own FDs */ - for (const auto& deferred : threadInfo.deferredAdds) { - t_fdm->addReadFD(deferred.first, deferred.second); + if (threadInfo.isListener()) { + if (g_reusePort) { + /* then every listener has its own FDs */ + for (const auto& deferred : threadInfo.deferredAdds) { + t_fdm->addReadFD(deferred.first, deferred.second); + } } - } - else { - /* otherwise all listeners are listening on the same ones */ - for (const auto& deferred : g_deferredAdds) { - t_fdm->addReadFD(deferred.first, deferred.second); + else { + /* otherwise all listeners are listening on the same ones */ + for (const auto& deferred : g_deferredAdds) { + t_fdm->addReadFD(deferred.first, deferred.second); + } } } } - } - registerAllStats(); + registerAllStats(); - if (threadInfo.isHandler) { - t_fdm->addReadFD(g_rcc.d_fd, handleRCC); // control channel - } + if (threadInfo.isHandler()) { + t_fdm->addReadFD(g_rcc.d_fd, handleRCC); // control channel + } - unsigned int maxTcpClients = ::arg().asNum("max-tcp-clients"); + unsigned int maxTcpClients = ::arg().asNum("max-tcp-clients"); - bool listenOnTCP(true); + bool listenOnTCP(true); - time_t last_stat = 0; - time_t last_carbon = 0, last_lua_maintenance = 0; - time_t carbonInterval = ::arg().asNum("carbon-interval"); - time_t luaMaintenanceInterval = ::arg().asNum("lua-maintenance-interval"); - s_counter.store(0); // used to periodically execute certain tasks + time_t last_stat = 0; + time_t last_carbon = 0, last_lua_maintenance = 0; + time_t carbonInterval = ::arg().asNum("carbon-interval"); + time_t luaMaintenanceInterval = ::arg().asNum("lua-maintenance-interval"); + s_counter.store(0); // used to periodically execute certain tasks - while (!RecursorControlChannel::stop) { - while (MT->schedule(&g_now)) - ; // MTasker letting the mthreads do their thing + while (!RecursorControlChannel::stop) { + while (MT->schedule(&g_now)) + ; // MTasker letting the mthreads do their thing - // Use primes, it avoid not being scheduled in cases where the counter has a regular pattern. - // We want to call handler thread often, it gets scheduled about 2 times per second - if ((threadInfo.isHandler && s_counter % 11 == 0) || s_counter % 499 == 0) { - MT->makeThread(houseKeeping, 0); - } + // Use primes, it avoid not being scheduled in cases where the counter has a regular pattern. + // We want to call handler thread often, it gets scheduled about 2 times per second + if ((threadInfo.isHandler() && s_counter % 11 == 0) || s_counter % 499 == 0) { + MT->makeThread(houseKeeping, 0); + } - if (!(s_counter % 55)) { - typedef vector> expired_t; - expired_t expired = t_fdm->getTimeouts(g_now); + if (!(s_counter % 55)) { + typedef vector> expired_t; + expired_t expired = t_fdm->getTimeouts(g_now); - for (expired_t::iterator i = expired.begin(); i != expired.end(); ++i) { - shared_ptr conn = boost::any_cast>(i->second); - if (g_logCommonErrors) - g_log << Logger::Warning << "Timeout from remote TCP client " << conn->d_remote.toStringWithPort() << endl; - t_fdm->removeReadFD(i->first); + for (expired_t::iterator i = expired.begin(); i != expired.end(); ++i) { + shared_ptr conn = boost::any_cast>(i->second); + if (g_logCommonErrors) + g_log << Logger::Warning << "Timeout from remote TCP client " << conn->d_remote.toStringWithPort() << endl; + t_fdm->removeReadFD(i->first); + } } - } - s_counter++; + s_counter++; - if (threadInfo.isHandler) { - if (statsWanted || (s_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= s_statisticsInterval)) { - doStats(); - last_stat = g_now.tv_sec; - } + if (threadInfo.isHandler()) { + if (statsWanted || (s_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= s_statisticsInterval)) { + doStats(); + last_stat = g_now.tv_sec; + } - Utility::gettimeofday(&g_now, nullptr); + Utility::gettimeofday(&g_now, nullptr); - if ((g_now.tv_sec - last_carbon) >= carbonInterval) { - MT->makeThread(doCarbonDump, 0); - last_carbon = g_now.tv_sec; + if ((g_now.tv_sec - last_carbon) >= carbonInterval) { + MT->makeThread(doCarbonDump, 0); + last_carbon = g_now.tv_sec; + } } - } - if (t_pdl != nullptr) { - // lua-dns-script directive is present, call the maintenance callback if needed - /* remember that the listener threads handle TCP queries */ - if (threadInfo.isWorker || threadInfo.isListener) { - // Only on threads processing queries - if (g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) { - t_pdl->maintenance(); - last_lua_maintenance = g_now.tv_sec; + if (t_pdl != nullptr) { + // lua-dns-script directive is present, call the maintenance callback if needed + /* remember that the listener threads handle TCP queries */ + if (threadInfo.isWorker() || threadInfo.isListener()) { + // Only on threads processing queries + if (g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) { + t_pdl->maintenance(); + last_lua_maintenance = g_now.tv_sec; + } } } - } - t_fdm->run(&g_now); - // 'run' updates g_now for us + t_fdm->run(&g_now); + // 'run' updates g_now for us - if (threadInfo.isListener) { - if (listenOnTCP) { - if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections - for (const auto fd : threadInfo.tcpSockets) { - t_fdm->removeReadFD(fd); + if (threadInfo.isListener()) { + if (listenOnTCP) { + if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections + for (const auto fd : threadInfo.tcpSockets) { + t_fdm->removeReadFD(fd); + } + listenOnTCP = false; } - listenOnTCP = false; } - } - else { - if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable - for (const auto fd : threadInfo.tcpSockets) { - t_fdm->addReadFD(fd, handleNewTCPQuestion); + else { + if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable + for (const auto fd : threadInfo.tcpSockets) { + t_fdm->addReadFD(fd, handleNewTCPQuestion); + } + listenOnTCP = true; } - listenOnTCP = true; } } } + delete rws; + delete t_fdm; + return nullptr; + } + catch (PDNSException& ae) { + g_log << Logger::Error << "Exception: " << ae.reason << endl; + return nullptr; + } + catch (std::exception& e) { + g_log << Logger::Error << "STL Exception: " << e.what() << endl; + return nullptr; + } + catch (...) { + g_log << Logger::Error << "any other exception in main: " << endl; + return nullptr; } - delete rws; - delete t_fdm; - return 0; -} -catch (PDNSException& ae) { - g_log << Logger::Error << "Exception: " << ae.reason << endl; - return 0; -} -catch (std::exception& e) { - g_log << Logger::Error << "STL Exception: " << e.what() << endl; - return 0; -} -catch (...) { - g_log << Logger::Error << "any other exception in main: " << endl; - return 0; } int main(int argc, char** argv) @@ -2516,25 +2511,25 @@ static RecursorControlChannel::Answer* doReloadLuaScript() try { if (fname.empty()) { t_pdl.reset(); - g_log << Logger::Info << t_id << " Unloaded current lua script" << endl; + g_log << Logger::Info << RecThreadInfo::id() << " Unloaded current lua script" << endl; return new RecursorControlChannel::Answer{0, string("unloaded\n")}; } else { t_pdl = std::make_shared(); int err = t_pdl->loadFile(fname); if (err != 0) { - string msg = std::to_string(t_id) + " Retaining current script, could not read '" + fname + "': " + stringerror(err); + string msg = std::to_string(RecThreadInfo::id()) + " Retaining current script, could not read '" + fname + "': " + stringerror(err); g_log << Logger::Error << msg << endl; return new RecursorControlChannel::Answer{1, msg + "\n"}; } } } catch (std::exception& e) { - g_log << Logger::Error << t_id << " Retaining current script, error from '" << fname << "': " << e.what() << endl; + g_log << Logger::Error << RecThreadInfo::id() << " Retaining current script, error from '" << fname << "': " << e.what() << endl; return new RecursorControlChannel::Answer{1, string("retaining current script, error from '" + fname + "': " + e.what() + "\n")}; } - g_log << Logger::Warning << t_id << " (Re)loaded lua script from '" << fname << "'" << endl; + g_log << Logger::Warning << RecThreadInfo::id() << " (Re)loaded lua script from '" << fname << "'" << endl; return new RecursorControlChannel::Answer{0, string("(re)loaded '" + fname + "'\n")}; } diff --git a/pdns/recursordist/rec-main.hh b/pdns/recursordist/rec-main.hh index a5328bd35a..e8554badd9 100644 --- a/pdns/recursordist/rec-main.hh +++ b/pdns/recursordist/rec-main.hh @@ -35,6 +35,7 @@ #include "syncres.hh" #include "rec-snmp.hh" #include "rec_channel.hh" +#include "threadname.hh" #ifdef NOD_ENABLED #include "nod.hh" @@ -199,7 +200,6 @@ extern thread_local std::shared_ptr t_allowFrom; extern thread_local std::shared_ptr t_allowNotifyFrom; extern thread_local std::shared_ptr t_allowNotifyFor; extern thread_local std::unique_ptr t_udpclientsocks; -extern bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers extern bool g_useIncomingECS; extern boost::optional g_dns64Prefix; extern DNSName g_dns64PrefixReverse; @@ -208,8 +208,6 @@ extern bool g_addExtendedResolutionDNSErrors; extern uint16_t g_xpfRRCode; extern NetmaskGroup g_proxyProtocolACL; extern std::atomic g_statsWanted; -extern unsigned int g_numDistributorThreads; -extern unsigned int g_numWorkerThreads; extern uint32_t g_disthashseed; extern int g_argc; extern char** g_argv; @@ -258,13 +256,6 @@ inline MT_t* getMT() return MT ? MT.get() : nullptr; } -extern thread_local unsigned int t_id; - -inline unsigned int getRecursorThreadId() -{ - return t_id; -} - /* this function is called with both a string and a vector representing a packet */ template static bool sendResponseOverTCP(const std::unique_ptr& dc, const T& packet) @@ -299,6 +290,14 @@ static bool sendResponseOverTCP(const std::unique_ptr& dc, const return hadError; } +struct RecThreadInfo; +/* first we have the handler thread, t_id == 0 (some other + helper threads like SNMP might have t_id == 0 as well) + then the distributor threads if any + and finally the workers */ +extern std::vector g_threadInfos; +void* recursorThread(); + // for communicating with our threads // effectively readonly after startup struct RecThreadInfo @@ -313,6 +312,74 @@ struct RecThreadInfo int readQueriesToThread{-1}; }; +public: + static RecThreadInfo& self() + { + return g_threadInfos.at(t_id); + } + + bool isDistributor() const + { + if (t_id == 0) { + return false; + } + return s_weDistributeQueries && listener; + } + + bool isHandler() const + { + if (t_id == 0) { + return true; + } + return handler; + } + + bool isWorker() const + { + return worker; + } + + bool isListener() const + { + return listener; + } + + void setHandler() + { + handler = true; + } + + void setWorker() + { + worker = true; + } + + void setListener(bool flag = true) + { + listener = flag; + } + + void start(unsigned int id, const string& name) + { + thread = std::thread([id, name] { + t_id = id; + const string threadPrefix = "rec/"; + setThreadName(threadPrefix + name); + recursorThread(); + }); + sleep(1); + } + + static unsigned int id() + { + return t_id; + } + + static void setThreadId(unsigned int id) + { + t_id = id; + } + /* FD corresponding to TCP sockets this thread is listening on. These FDs are also in deferredAdds when we have one @@ -327,12 +394,24 @@ struct RecThreadInfo MT_t* mt{nullptr}; uint64_t numberOfDistributedQueries{0}; int exitCode{0}; + + static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers + static unsigned int s_numDistributorThreads; + static unsigned int s_numWorkerThreads; + + static unsigned int numThreads() + { + return s_numDistributorThreads + s_numWorkerThreads; + } + +private: /* handle the web server, carbon, statistics and the control channel */ - bool isHandler{false}; + bool handler{false}; /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */ - bool isListener{false}; + bool listener{false}; /* process queries */ - bool isWorker{false}; + bool worker{false}; + static thread_local unsigned int t_id; }; struct ThreadMSG @@ -341,29 +420,7 @@ struct ThreadMSG bool wantAnswer; }; -/* first we have the handler thread, t_id == 0 (some other - helper threads like SNMP might have t_id == 0 as well) - then the distributor threads if any - and finally the workers */ -extern std::vector g_threadInfos; - -inline bool isDistributorThread() -{ - if (t_id == 0) { - return false; - } - - return g_weDistributeQueries && g_threadInfos.at(t_id).isListener; -} -inline bool isHandlerThread() -{ - if (t_id == 0) { - return true; - } - - return g_threadInfos.at(t_id).isHandler; -} PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query); bool checkProtobufExport(LocalStateHolder& luaconfsLocal); @@ -397,7 +454,6 @@ void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set& tcpSockets void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t&); void makeUDPServerSockets(deferredAdd_t& deferredAdds); -void* recursorThread(unsigned int n, const string& threadName); #define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10" #define LOCAL_NETS_INVERSE "!127.0.0.0/8, !10.0.0.0/8, !100.64.0.0/10, !169.254.0.0/16, !192.168.0.0/16, !172.16.0.0/12, !::1/128, !fc00::/7, !fe80::/10" diff --git a/pdns/recursordist/rec-tcp.cc b/pdns/recursordist/rec-tcp.cc index 14fd9ae1ce..4e41c08be3 100644 --- a/pdns/recursordist/rec-tcp.cc +++ b/pdns/recursordist/rec-tcp.cc @@ -452,7 +452,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) bool ipf = t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh, dc->d_eventTrace); if (ipf) { if (!g_quiet) { - g_log << Logger::Notice << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED TCP question from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << " based on policy" << endl; + g_log << Logger::Notice << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED TCP question from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << " based on policy" << endl; } g_stats.policyDrops++; return; @@ -522,7 +522,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) if (cacheHit) { if (!g_quiet) { - g_log << Logger::Notice << t_id << " TCP question answered from packet cache tag=" << dc->d_tag << " from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << endl; + g_log << Logger::Notice << RecThreadInfo::id() << " TCP question answered from packet cache tag=" << dc->d_tag << " from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << endl; } bool hadError = sendResponseOverTCP(dc, response); @@ -551,7 +551,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) if (dc->d_mdp.d_header.opcode == Opcode::Notify) { if (!g_quiet) { - g_log << Logger::Notice << t_id << " got NOTIFY for " << qname.toLogString() << " from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << endl; + g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << endl; } requestWipeCaches(qname); diff --git a/pdns/recursordist/test-syncres_cc.cc b/pdns/recursordist/test-syncres_cc.cc index 7b5d47c61a..fef6a3bac4 100644 --- a/pdns/recursordist/test-syncres_cc.cc +++ b/pdns/recursordist/test-syncres_cc.cc @@ -16,7 +16,6 @@ GlobalStateHolder g_dontThrottleNetmasks; GlobalStateHolder g_DoTToAuthNames; std::unique_ptr g_recCache; std::unique_ptr g_negCache; -unsigned int g_numThreads = 1; bool g_lowercaseOutgoing = false; /* Fake some required functions we didn't want the trouble to diff --git a/pdns/syncres.hh b/pdns/syncres.hh index e8e3845d93..dd777e00f9 100644 --- a/pdns/syncres.hh +++ b/pdns/syncres.hh @@ -1171,7 +1171,6 @@ string doTraceRegex(vector::const_iterator begin, vector::const_ void parseACLs(); extern RecursorStats g_stats; extern unsigned int g_networkTimeoutMsec; -extern unsigned int g_numThreads; extern uint16_t g_outgoingEDNSBufsize; extern std::atomic g_maxCacheEntries, g_maxPacketCacheEntries; extern bool g_lowercaseOutgoing;