From: Otto Moerbeek Date: Tue, 25 Jan 2022 14:11:20 +0000 (+0100) Subject: Introduce taskThread X-Git-Tag: auth-4.7.0-alpha1~19^2~9 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=048607b444a605e8caaadb1c22e9502011990b51;p=thirdparty%2Fpdns.git Introduce taskThread --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 10cca1503f..9014af9ca7 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -2187,7 +2187,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) destination = dest; } - if (RecThreadInfo::s_weDistributeQueries) { + if (RecThreadInfo::weDistributeQueries()) { std::string localdata = data; distributeAsyncFunction(data, [localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace]() mutable { return doProcessUDPQuestion(localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace); @@ -2353,7 +2353,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) static unsigned int getWorkerLoad(size_t workerIdx) { - const auto mt = RecThreadInfo::info(/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx).mt; + const auto mt = RecThreadInfo::info(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + workerIdx).mt; if (mt != nullptr) { return mt->numProcesses(); } @@ -2363,32 +2363,29 @@ static unsigned int getWorkerLoad(size_t workerIdx) static unsigned int selectWorker(unsigned int hash) { if (g_balancingFactor == 0) { - return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + (hash % RecThreadInfo::s_numWorkerThreads); + return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numWorkers()); } /* we start with one, representing the query we are currently handling */ double currentLoad = 1; - std::vector load(RecThreadInfo::s_numWorkerThreads); - for (size_t idx = 0; idx < RecThreadInfo::s_numWorkerThreads; idx++) { + std::vector load(RecThreadInfo::numWorkers()); + for (size_t idx = 0; idx < RecThreadInfo::numWorkers(); idx++) { load[idx] = getWorkerLoad(idx); currentLoad += load[idx]; - // cerr<<"load for worker "< targetLoad) { ++g_stats.rebalancedQueries; do { - // cerr<<"worker "< targetLoad); } - return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + worker; + return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + worker; } // This function is only called by the distributor threads, when pdns-distributes-queries is set @@ -2411,7 +2408,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func) was full, let's try another one */ unsigned int newTarget = 0; do { - newTarget = /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + dns_random(RecThreadInfo::s_numWorkerThreads); + newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numWorkers()); } while (newTarget == target); if (!trySendingQueryToWorker(newTarget, tmsg)) { diff --git a/pdns/rec_channel_rec.cc b/pdns/rec_channel_rec.cc index b7fc66ee14..9678bbeaec 100644 --- a/pdns/rec_channel_rec.cc +++ b/pdns/rec_channel_rec.cc @@ -1118,7 +1118,7 @@ static StatsMap toCPUStatsMap(const string& name) { const string pbasename = getPrometheusName(name); StatsMap entries; - for (unsigned int n = 0; n < RecThreadInfo::numThreads(); ++n) { + for (unsigned int n = 0; n < RecThreadInfo::numRecursorThreads(); ++n) { uint64_t tm = doGetThreadCPUMsec(n); std::string pname = pbasename + "{thread=\"" + std::to_string(n) + "\"}"; entries.emplace(name + "-thread-" + std::to_string(n), StatsMapEntry{pname, std::to_string(tm)}); diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc index 8bf704dc09..b00e42f9d4 100644 --- a/pdns/recursordist/rec-main.cc +++ b/pdns/recursordist/rec-main.cc @@ -629,8 +629,8 @@ static void checkLinuxIPv6Limits() static void checkOrFixFDS() { unsigned int availFDs = getFilenumLimit(); - unsigned int wantFDs = g_maxMThreads * RecThreadInfo::s_numWorkerThreads + 25; // even healthier margin then before - wantFDs += RecThreadInfo::s_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread; + unsigned int wantFDs = g_maxMThreads * RecThreadInfo::numWorkers() + 25; // even healthier margin then before + wantFDs += RecThreadInfo::numWorkers() * TCPOutConnectionManager::s_maxIdlePerThread; if (wantFDs > availFDs) { unsigned int hardlimit = getFilenumLimit(true); @@ -639,7 +639,7 @@ static void checkOrFixFDS() g_log << Logger::Warning << "Raised soft limit on number of filedescriptors to " << wantFDs << " to match max-mthreads and threads settings" << endl; } else { - int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::s_numWorkerThreads; + int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::numWorkers(); g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl; g_maxMThreads = newval; setFilenumLimit(hardlimit); @@ -697,7 +697,7 @@ static void makeThreadPipes() } /* thread 0 is the handler / SNMP, worker threads start at 1 */ - for (unsigned int n = 0; n <= RecThreadInfo::numThreads(); ++n) { + for (unsigned int n = 0; n < RecThreadInfo::numRecursorThreads(); ++n) { auto& threadInfo = RecThreadInfo::info(n); int fd[2]; @@ -1148,8 +1148,8 @@ static int serviceMain(int argc, char* argv[]) } /* this needs to be done before parseACLs(), which call broadcastFunction() */ - RecThreadInfo::s_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries"); - if (RecThreadInfo::s_weDistributeQueries) { + RecThreadInfo::setWeDistributeQueries(::arg().mustDo("pdns-distributes-queries")); + if (RecThreadInfo::weDistributeQueries()) { g_log << Logger::Warning << "PowerDNS Recursor itself will distribute queries over threads" << endl; } @@ -1323,11 +1323,11 @@ static int serviceMain(int argc, char* argv[]) } g_paddingTag = ::arg().asNum("edns-padding-tag"); - RecThreadInfo::s_numDistributorThreads = ::arg().asNum("distributor-threads"); - RecThreadInfo::s_numWorkerThreads = ::arg().asNum("threads"); - if (RecThreadInfo::s_numWorkerThreads < 1) { + RecThreadInfo::setNumDistributorThreads(::arg().asNum("distributor-threads")); + RecThreadInfo::setNumWorkerThreads(::arg().asNum("threads")); + if (RecThreadInfo::numWorkers() < 1) { g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl; - RecThreadInfo::s_numWorkerThreads = 1; + RecThreadInfo::setNumWorkerThreads(1); } g_maxMThreads = ::arg().asNum("max-mthreads"); @@ -1424,12 +1424,12 @@ static int serviceMain(int argc, char* argv[]) g_reusePort = ::arg().mustDo("reuseport"); #endif - RecThreadInfo::infos().resize(RecThreadInfo::numThreads() + /* handler */ 1); + RecThreadInfo::infos().resize(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() + RecThreadInfo::numTaskThreads()); if (g_reusePort) { - if (RecThreadInfo::s_weDistributeQueries) { + if (RecThreadInfo::weDistributeQueries()) { /* first thread is the handler, then distributors */ - for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) { + for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) { auto& info = RecThreadInfo::info(threadId); auto& deferredAdds = info.deferredAdds; auto& tcpSockets = info.tcpSockets; @@ -1439,7 +1439,7 @@ static int serviceMain(int argc, char* argv[]) } else { /* first thread is the handler, there is no distributor here and workers are accepting queries */ - for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) { + for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) { auto& info = RecThreadInfo::info(threadId); auto& deferredAdds = info.deferredAdds; auto& tcpSockets = info.tcpSockets; @@ -1457,15 +1457,15 @@ static int serviceMain(int argc, char* argv[]) /* every listener (so distributor if g_weDistributeQueries, workers otherwise) needs to listen to the shared sockets */ - if (RecThreadInfo::s_weDistributeQueries) { + if (RecThreadInfo::weDistributeQueries()) { /* first thread is the handler, then distributors */ - for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) { + for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) { RecThreadInfo::info(threadId).tcpSockets = tcpSockets; } } else { /* first thread is the handler, there is no distributor here and workers are accepting queries */ - for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) { + for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) { RecThreadInfo::info(threadId).tcpSockets = tcpSockets; } } @@ -1632,7 +1632,7 @@ static int serviceMain(int argc, char* argv[]) unsigned int currentThreadId = 1; const auto cpusMap = parseCPUMap(); - if (RecThreadInfo::numThreads() == 1) { + if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) { g_log << Logger::Warning << "Operating unthreaded" << endl; #ifdef HAVE_SYSTEMD sd_notify(0, "READY=1"); @@ -1642,6 +1642,9 @@ static int serviceMain(int argc, char* argv[]) auto& handlerInfo = RecThreadInfo::info(0); handlerInfo.setHandler(); handlerInfo.start(0, "web+stat"); + auto& taskInfo = RecThreadInfo::info(2); + taskInfo.setTaskThread(); + taskInfo.start(2, "tasks"); setCPUMap(cpusMap, currentThreadId, pthread_self()); @@ -1655,36 +1658,52 @@ static int serviceMain(int argc, char* argv[]) if (handlerInfo.exitCode != 0) { ret = handlerInfo.exitCode; } + taskInfo.thread.join(); + if (taskInfo.exitCode != 0) { + ret = taskInfo.exitCode; + } } else { - if (RecThreadInfo::s_weDistributeQueries) { - for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) { - RecThreadInfo::info(currentThreadId + n).setListener(); + // Setup RecThreadInfo objects + unsigned int tmp = currentThreadId; + if (RecThreadInfo::weDistributeQueries()) { + for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) { + RecThreadInfo::info(tmp++).setListener(); } } - for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) { - auto& info = RecThreadInfo::info(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n); - info.setListener(!RecThreadInfo::s_weDistributeQueries); + for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) { + auto& info = RecThreadInfo::info(tmp++); + info.setListener(!RecThreadInfo::weDistributeQueries()); info.setWorker(); } + for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) { + auto& info = RecThreadInfo::info(tmp++); + info.setTaskThread(); + } - if (RecThreadInfo::s_weDistributeQueries) { - g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl; - for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) { + if (RecThreadInfo::weDistributeQueries()) { + g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl; + for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) { auto& info = RecThreadInfo::info(currentThreadId); info.start(currentThreadId++, "distr"); setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one? } } - g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl; + g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl; - for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) { + for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) { auto& info = RecThreadInfo::info(currentThreadId); info.start(currentThreadId++, "worker"); setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one? } + for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) { + auto& info = RecThreadInfo::info(currentThreadId); + info.start(currentThreadId++, "taskThread"); + setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one? + } + #ifdef HAVE_SYSTEMD sd_notify(0, "READY=1"); #endif @@ -1789,7 +1808,7 @@ static void houseKeeping(void*) past = now; past.tv_sec -= 5; if (t_last_prune < past) { - t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads)); + t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::numWorkers() + RecThreadInfo::numDistributors())); time_t limit; if (!((t_cleanCounter++) % 40)) { // this is a full scan! @@ -1945,11 +1964,11 @@ void* recursorThread() } } - unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numWorkerThreads; + unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numWorkers(); if (ringsize) { t_remotes = std::make_unique(); - if (RecThreadInfo::s_weDistributeQueries) - t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numDistributorThreads); + if (RecThreadInfo::weDistributeQueries()) + t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numDistributors()); else t_remotes->set_capacity(ringsize); t_servfailremotes = std::make_unique(); diff --git a/pdns/recursordist/rec-main.hh b/pdns/recursordist/rec-main.hh index e4f79ae601..93d5c979eb 100644 --- a/pdns/recursordist/rec-main.hh +++ b/pdns/recursordist/rec-main.hh @@ -351,6 +351,11 @@ public: return listener; } + bool isTaskThread() const + { + return taskThread; + } + void setHandler() { handler = true; @@ -366,6 +371,11 @@ public: listener = flag; } + void setTaskThread() + { + taskThread = true; + } + void start(unsigned int id, const string& name) { thread = std::thread([id, name] { @@ -386,9 +396,54 @@ public: t_id = id; } - // FD corresponding to TCP sockets this thread is listening on. - // These FDs are also in deferredAdds when we have one socket per - // listener, and in g_deferredAdds instead. + static unsigned int numHandlers() + { + return 1; + } + + static unsigned int numTaskThreads() + { + return 1; + } + + static unsigned int numWorkers() + { + return s_numWorkerThreads; + } + + static unsigned int numDistributors() + { + return s_numDistributorThreads; + } + + static bool weDistributeQueries() + { + return s_weDistributeQueries; + } + + static void setWeDistributeQueries(bool flag) + { + s_weDistributeQueries = flag; + } + + static void setNumWorkerThreads(unsigned int n) + { + s_numWorkerThreads = n; + } + + static void setNumDistributorThreads(unsigned int n) + { + s_numDistributorThreads = n; + } + + static unsigned int numRecursorThreads() + { + return numHandlers() + numDistributors() + numWorkers() + numTaskThreads(); + } + + // FD corresponding to TCP sockets this thread is listening on. + // These FDs are also in deferredAdds when we have one socket per + // listener, and in g_deferredAdds instead. std::set tcpSockets; // FD corresponding to listening sockets if we have one socket per // listener (with reuseport), otherwise all listeners share the @@ -401,24 +456,21 @@ public: uint64_t numberOfDistributedQueries{0}; int exitCode{0}; - static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers - static unsigned int s_numDistributorThreads; - static unsigned int s_numWorkerThreads; - - static unsigned int numThreads() - { - return s_numDistributorThreads + s_numWorkerThreads; - } - private: - /* handle the web server, carbon, statistics and the control channel */ + // handle the web server, carbon, statistics and the control channel bool handler{false}; - /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */ + // accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) bool listener{false}; - /* process queries */ + // process queries bool worker{false}; + // run async tasks: from TastQueue and ZoneToCache + bool taskThread{false}; + static thread_local unsigned int t_id; static std::vector s_threadInfos; + static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers + static unsigned int s_numDistributorThreads; + static unsigned int s_numWorkerThreads; }; struct ThreadMSG