From: Otto Moerbeek Date: Tue, 25 Jan 2022 13:23:08 +0000 (+0100) Subject: Move threadinfo vector to RecThreadInfo class X-Git-Tag: auth-4.7.0-alpha1~19^2~10 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4f6e00dc93f2055ba29a4fdcc9266060d203b23a;p=thirdparty%2Fpdns.git Move threadinfo vector to RecThreadInfo class --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 3314b0164b..10cca1503f 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -1756,7 +1756,7 @@ void requestWipeCaches(const DNSName& canon) ThreadMSG* tmsg = new ThreadMSG(); tmsg->func = [=] { return pleaseWipeCaches(canon, true, 0xffff); }; tmsg->wantAnswer = false; - if (write(g_threadInfos[0].pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { + if (write(RecThreadInfo::info(0).pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { delete tmsg; unixDie("write to thread pipe returned wrong size or error"); @@ -2322,7 +2322,7 @@ void makeUDPServerSockets(deferredAdd_t& deferredAdds) static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) { - auto& targetInfo = g_threadInfos[target]; + auto& targetInfo = RecThreadInfo::info(target); if (!targetInfo.isWorker()) { g_log << Logger::Error << "distributeAsyncFunction() tried to assign a query to a non-worker thread" << endl; _exit(1); @@ -2353,7 +2353,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) static unsigned int getWorkerLoad(size_t workerIdx) { - const auto mt = g_threadInfos[/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx].mt; + const auto mt = RecThreadInfo::info(/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx).mt; if (mt != nullptr) { return mt->numProcesses(); } diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc index 97420d9422..8bf704dc09 100644 --- a/pdns/recursordist/rec-main.cc +++ b/pdns/recursordist/rec-main.cc @@ -101,7 +101,7 @@ deferredAdd_t g_deferredAdds; helper threads like SNMP might have t_id == 0 as well) then the distributor threads if any and finally the workers */ -std::vector g_threadInfos; +std::vector RecThreadInfo::s_threadInfos; bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers unsigned int RecThreadInfo::s_numDistributorThreads; @@ -697,15 +697,15 @@ static void makeThreadPipes() } /* thread 0 is the handler / SNMP, worker threads start at 1 */ - for (unsigned int n = 0; n <= (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads); ++n) { - auto& threadInfos = g_threadInfos.at(n); + for (unsigned int n = 0; n <= RecThreadInfo::numThreads(); ++n) { + auto& threadInfo = RecThreadInfo::info(n); int fd[2]; if (pipe(fd) < 0) unixDie("Creating pipe for inter-thread communications"); - threadInfos.pipes.readToThread = fd[0]; - threadInfos.pipes.writeToThread = fd[1]; + threadInfo.pipes.readToThread = fd[0]; + threadInfo.pipes.writeToThread = fd[1]; // handler thread only gets first pipe, not the others if (n == 0) { @@ -715,27 +715,27 @@ static void makeThreadPipes() if (pipe(fd) < 0) unixDie("Creating pipe for inter-thread communications"); - threadInfos.pipes.readFromThread = fd[0]; - threadInfos.pipes.writeFromThread = fd[1]; + threadInfo.pipes.readFromThread = fd[0]; + threadInfo.pipes.writeFromThread = fd[1]; if (pipe(fd) < 0) unixDie("Creating pipe for inter-thread communications"); - threadInfos.pipes.readQueriesToThread = fd[0]; - threadInfos.pipes.writeQueriesToThread = fd[1]; + threadInfo.pipes.readQueriesToThread = fd[0]; + threadInfo.pipes.writeQueriesToThread = fd[1]; if (pipeBufferSize > 0) { - if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) { + if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) { int err = errno; g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl; - auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread); + auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread); if (existingSize > 0) { g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl; } } } - if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) { + if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) { unixDie("Making pipe for inter-thread communications non-blocking"); } } @@ -782,7 +782,7 @@ static void doStats(void) g_log << Logger::Notice << "stats: " << pcSize << " packet cache entries, " << ratePercentage(pcHits, SyncRes::s_queries) << "% packet cache hits" << endl; size_t idx = 0; - for (const auto& threadInfo : g_threadInfos) { + for (const auto& threadInfo : RecThreadInfo::infos()) { if (threadInfo.isWorker()) { g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl; ++idx; @@ -936,7 +936,7 @@ void broadcastFunction(const pipefunc_t& func) for the initialization of ACLs and domain maps. After that it should only be called by the handler. */ - if (g_threadInfos.empty() && RecThreadInfo::id() == 0) { + if (RecThreadInfo::infos().empty() && RecThreadInfo::id() == 0) { /* the handler and distributors will call themselves below, but during startup we get called while g_threadInfos has not been populated yet to update the ACL or domain maps, so we need to @@ -946,7 +946,7 @@ void broadcastFunction(const pipefunc_t& func) } unsigned int n = 0; - for (const auto& threadInfo : g_threadInfos) { + for (const auto& threadInfo : RecThreadInfo::infos()) { if (n++ == RecThreadInfo::id()) { func(); // don't write to ourselves! continue; @@ -1004,7 +1004,7 @@ T broadcastAccFunction(const boost::function& func) unsigned int n = 0; T ret = T(); - for (const auto& threadInfo : g_threadInfos) { + for (const auto& threadInfo : RecThreadInfo::infos()) { if (n++ == RecThreadInfo::id()) { continue; } @@ -1424,14 +1424,15 @@ static int serviceMain(int argc, char* argv[]) g_reusePort = ::arg().mustDo("reuseport"); #endif - g_threadInfos.resize(RecThreadInfo::s_numDistributorThreads + RecThreadInfo::s_numWorkerThreads + /* handler */ 1); + RecThreadInfo::infos().resize(RecThreadInfo::numThreads() + /* handler */ 1); if (g_reusePort) { if (RecThreadInfo::s_weDistributeQueries) { /* first thread is the handler, then distributors */ for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) { - auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds; - auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets; + auto& info = RecThreadInfo::info(threadId); + auto& deferredAdds = info.deferredAdds; + auto& tcpSockets = info.tcpSockets; makeUDPServerSockets(deferredAdds); makeTCPServerSockets(deferredAdds, tcpSockets); } @@ -1439,8 +1440,9 @@ static int serviceMain(int argc, char* argv[]) else { /* first thread is the handler, there is no distributor here and workers are accepting queries */ for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) { - auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds; - auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets; + auto& info = RecThreadInfo::info(threadId); + auto& deferredAdds = info.deferredAdds; + auto& tcpSockets = info.tcpSockets; makeUDPServerSockets(deferredAdds); makeTCPServerSockets(deferredAdds, tcpSockets); } @@ -1458,13 +1460,13 @@ static int serviceMain(int argc, char* argv[]) if (RecThreadInfo::s_weDistributeQueries) { /* first thread is the handler, then distributors */ for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) { - g_threadInfos.at(threadId).tcpSockets = tcpSockets; + RecThreadInfo::info(threadId).tcpSockets = tcpSockets; } } else { /* first thread is the handler, there is no distributor here and workers are accepting queries */ for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) { - g_threadInfos.at(threadId).tcpSockets = tcpSockets; + RecThreadInfo::info(threadId).tcpSockets = tcpSockets; } } } @@ -1637,13 +1639,13 @@ static int serviceMain(int argc, char* argv[]) #endif /* This thread handles the web server, carbon, statistics and the control channel */ - auto& handlerInfo = g_threadInfos.at(0); + auto& handlerInfo = RecThreadInfo::info(0); handlerInfo.setHandler(); handlerInfo.start(0, "web+stat"); setCPUMap(cpusMap, currentThreadId, pthread_self()); - auto& info = g_threadInfos.at(currentThreadId); + auto& info = RecThreadInfo::info(currentThreadId); info.setListener(); info.setWorker(); info.setThreadId(currentThreadId++); @@ -1657,11 +1659,11 @@ static int serviceMain(int argc, char* argv[]) else { if (RecThreadInfo::s_weDistributeQueries) { for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) { - g_threadInfos.at(currentThreadId + n).setListener(); + RecThreadInfo::info(currentThreadId + n).setListener(); } } for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) { - auto& info = g_threadInfos.at(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n); + auto& info = RecThreadInfo::info(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n); info.setListener(!RecThreadInfo::s_weDistributeQueries); info.setWorker(); } @@ -1669,18 +1671,18 @@ static int serviceMain(int argc, char* argv[]) if (RecThreadInfo::s_weDistributeQueries) { g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl; for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) { - auto& info = g_threadInfos.at(currentThreadId); + auto& info = RecThreadInfo::info(currentThreadId); info.start(currentThreadId++, "distr"); - setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); + setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one? } } g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl; for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) { - auto& info = g_threadInfos.at(currentThreadId); + auto& info = RecThreadInfo::info(currentThreadId); info.start(currentThreadId++, "worker"); - setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); + setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one? } #ifdef HAVE_SYSTEMD @@ -1688,11 +1690,11 @@ static int serviceMain(int argc, char* argv[]) #endif /* This thread handles the web server, carbon, statistics and the control channel */ - auto& info = g_threadInfos.at(0); + auto& info = RecThreadInfo::info(0); info.setHandler(); info.start(0, "web+stat"); - for (auto& ti : g_threadInfos) { + for (auto& ti : RecThreadInfo::infos()) { ti.thread.join(); if (ti.exitCode != 0) { ret = ti.exitCode; diff --git a/pdns/recursordist/rec-main.hh b/pdns/recursordist/rec-main.hh index 49e7fd2d21..e4f79ae601 100644 --- a/pdns/recursordist/rec-main.hh +++ b/pdns/recursordist/rec-main.hh @@ -290,16 +290,13 @@ static bool sendResponseOverTCP(const std::unique_ptr& dc, const return hadError; } -struct RecThreadInfo; -/* first we have the handler thread, t_id == 0 (some other - helper threads like SNMP might have t_id == 0 as well) - then the distributor threads if any - and finally the workers */ -extern std::vector g_threadInfos; void* recursorThread(); -// for communicating with our threads -// effectively readonly after startup +// For communicating with our threads effectively readonly after +// startup. +// First we have the handler thread, t_id == 0 (some other helper +// threads like SNMP might have t_id == 0 as well) then the +// distributor threads if any and finally the workers struct RecThreadInfo { struct ThreadPipeSet @@ -315,7 +312,17 @@ struct RecThreadInfo public: static RecThreadInfo& self() { - return g_threadInfos.at(t_id); + return s_threadInfos.at(t_id); + } + + static RecThreadInfo& info(unsigned int i) + { + return s_threadInfos.at(i); + } + + static vector& infos() + { + return s_threadInfos; } bool isDistributor() const @@ -367,7 +374,6 @@ public: setThreadName(threadPrefix + name); recursorThread(); }); - sleep(1); } static unsigned int id() @@ -380,15 +386,15 @@ public: t_id = id; } - /* FD corresponding to TCP sockets this thread is listening - on. - These FDs are also in deferredAdds when we have one - socket per listener, and in g_deferredAdds instead. */ + // FD corresponding to TCP sockets this thread is listening on. + // These FDs are also in deferredAdds when we have one socket per + // listener, and in g_deferredAdds instead. std::set tcpSockets; - /* FD corresponding to listening sockets if we have one socket per - listener (with reuseport), otherwise all listeners share the - same FD and g_deferredAdds is then used instead */ + // FD corresponding to listening sockets if we have one socket per + // listener (with reuseport), otherwise all listeners share the + // same FD and g_deferredAdds is then used instead deferredAdd_t deferredAdds; + struct ThreadPipeSet pipes; std::thread thread; MT_t* mt{nullptr}; @@ -412,6 +418,7 @@ private: /* process queries */ bool worker{false}; static thread_local unsigned int t_id; + static std::vector s_threadInfos; }; struct ThreadMSG