From: Otto Moerbeek Date: Wed, 26 Jan 2022 08:26:19 +0000 (+0100) Subject: Refactor recursorThread startup and join code. X-Git-Tag: auth-4.7.0-alpha1~19^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3487974d120df19f5cf93d93d97db984bc8c8445;p=thirdparty%2Fpdns.git Refactor recursorThread startup and join code. --- diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc index 9f3a63db61..79342bbf8a 100644 --- a/pdns/recursordist/rec-main.cc +++ b/pdns/recursordist/rec-main.cc @@ -108,6 +108,229 @@ unsigned int RecThreadInfo::s_numDistributorThreads; unsigned int RecThreadInfo::s_numWorkerThreads; thread_local unsigned int RecThreadInfo::t_id; +static std::map> parseCPUMap() +{ + std::map> result; + + const std::string value = ::arg()["cpu-map"]; + + if (!value.empty() && !isSettingThreadCPUAffinitySupported()) { + g_log << Logger::Warning << "CPU mapping requested but not supported, skipping" << endl; + return result; + } + + std::vector parts; + + stringtok(parts, value, " \t"); + + for (const auto& part : parts) { + if (part.find('=') == string::npos) + continue; + + try { + auto headers = splitField(part, '='); + boost::trim(headers.first); + boost::trim(headers.second); + + unsigned int threadId = pdns_stou(headers.first); + std::vector cpus; + + stringtok(cpus, headers.second, ","); + + for (const auto& cpu : cpus) { + int cpuId = std::stoi(cpu); + + result[threadId].insert(cpuId); + } + } + catch (const std::exception& e) { + g_log << Logger::Error << "Error parsing cpu-map entry '" << part << "': " << e.what() << endl; + } + } + + return result; +} + +static void setCPUMap(const std::map>& cpusMap, unsigned int n, pthread_t tid) +{ + const auto& cpuMapping = cpusMap.find(n); + if (cpuMapping == cpusMap.cend()) { + return; + } + int rc = mapThreadToCPUList(tid, cpuMapping->second); + if (rc == 0) { + g_log << Logger::Info << "CPU affinity for thread " << n << " has been set to CPU map:"; + for (const auto cpu : cpuMapping->second) { + g_log << Logger::Info << " " << cpu; + } + g_log << Logger::Info << endl; + } + else { + g_log << Logger::Warning << "Error setting CPU affinity for thread " << n << " to CPU map:"; + for (const auto cpu : cpuMapping->second) { + g_log << Logger::Info << " " << cpu; + } + g_log << Logger::Info << ' ' << strerror(rc) << endl; + } +} + +void RecThreadInfo::start(unsigned int id, const string& name, const std::map>& cpusMap) +{ + thread = std::thread([id, name] { + t_id = id; + const string threadPrefix = "rec/"; + setThreadName(threadPrefix + name); + recursorThread(); + }); + setCPUMap(cpusMap, id, thread.native_handle()); +} + +int RecThreadInfo::runThreads() +{ + int ret = EXIT_SUCCESS; + unsigned int currentThreadId = 1; + const auto cpusMap = parseCPUMap(); + + if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) { + g_log << Logger::Warning << "Operating unthreaded" << endl; + +#ifdef HAVE_SYSTEMD + sd_notify(0, "READY=1"); +#endif + + /* This thread handles the web server, carbon, statistics and the control channel */ + auto& handlerInfo = RecThreadInfo::info(0); + handlerInfo.setHandler(); + handlerInfo.start(0, "web+stat", cpusMap); + auto& taskInfo = RecThreadInfo::info(2); + taskInfo.setTaskThread(); + taskInfo.start(2, "tasks", cpusMap); + + auto& info = RecThreadInfo::info(currentThreadId); + info.setListener(); + info.setWorker(); + info.setThreadId(currentThreadId++); + recursorThread(); + + handlerInfo.thread.join(); + if (handlerInfo.exitCode != 0) { + ret = handlerInfo.exitCode; + } + taskInfo.thread.join(); + if (taskInfo.exitCode != 0) { + ret = taskInfo.exitCode; + } + } + else { + // Setup RecThreadInfo objects + unsigned int tmp = currentThreadId; + if (RecThreadInfo::weDistributeQueries()) { + for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) { + RecThreadInfo::info(tmp++).setListener(); + } + } + for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) { + auto& info = RecThreadInfo::info(tmp++); + info.setListener(!RecThreadInfo::weDistributeQueries()); + info.setWorker(); + } + for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) { + auto& info = RecThreadInfo::info(tmp++); + info.setTaskThread(); + } + + // And now start the actual threads + if (RecThreadInfo::weDistributeQueries()) { + g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl; + for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) { + auto& info = RecThreadInfo::info(currentThreadId); + info.start(currentThreadId++, "distr", cpusMap); + } + } + + g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl; + + for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) { + auto& info = RecThreadInfo::info(currentThreadId); + info.start(currentThreadId++, "worker", cpusMap); + } + + for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) { + auto& info = RecThreadInfo::info(currentThreadId); + info.start(currentThreadId++, "taskThread", cpusMap); + } + +#ifdef HAVE_SYSTEMD + sd_notify(0, "READY=1"); +#endif + + /* This thread handles the web server, carbon, statistics and the control channel */ + auto& info = RecThreadInfo::info(0); + info.setHandler(); + info.start(0, "web+stat", cpusMap); + + for (auto& ti : RecThreadInfo::infos()) { + ti.thread.join(); + if (ti.exitCode != 0) { + ret = ti.exitCode; + } + } + } + return ret; +} + +void RecThreadInfo::makeThreadPipes() +{ + auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size"); + if (pipeBufferSize > 0) { + g_log << Logger::Info << "Resizing the buffer of the distribution pipe to " << pipeBufferSize << endl; + } + + /* thread 0 is the handler / SNMP, worker threads start at 1 */ + for (unsigned int n = 0; n < numRecursorThreads(); ++n) { + auto& threadInfo = info(n); + + int fd[2]; + if (pipe(fd) < 0) + unixDie("Creating pipe for inter-thread communications"); + + threadInfo.pipes.readToThread = fd[0]; + threadInfo.pipes.writeToThread = fd[1]; + + // handler thread only gets first pipe, not the others + if (n == 0) { + continue; + } + + if (pipe(fd) < 0) + unixDie("Creating pipe for inter-thread communications"); + + threadInfo.pipes.readFromThread = fd[0]; + threadInfo.pipes.writeFromThread = fd[1]; + + if (pipe(fd) < 0) + unixDie("Creating pipe for inter-thread communications"); + + threadInfo.pipes.readQueriesToThread = fd[0]; + threadInfo.pipes.writeQueriesToThread = fd[1]; + + if (pipeBufferSize > 0) { + if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) { + int err = errno; + g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl; + auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread); + if (existingSize > 0) { + g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl; + } + } + } + + if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) { + unixDie("Making pipe for inter-thread communications non-blocking"); + } + } +} + ArgvMap& arg() { static ArgvMap theArg; @@ -426,71 +649,6 @@ static void writePid(void) } } -static std::map> parseCPUMap() -{ - std::map> result; - - const std::string value = ::arg()["cpu-map"]; - - if (!value.empty() && !isSettingThreadCPUAffinitySupported()) { - g_log << Logger::Warning << "CPU mapping requested but not supported, skipping" << endl; - return result; - } - - std::vector parts; - - stringtok(parts, value, " \t"); - - for (const auto& part : parts) { - if (part.find('=') == string::npos) - continue; - - try { - auto headers = splitField(part, '='); - boost::trim(headers.first); - boost::trim(headers.second); - - unsigned int threadId = pdns_stou(headers.first); - std::vector cpus; - - stringtok(cpus, headers.second, ","); - - for (const auto& cpu : cpus) { - int cpuId = std::stoi(cpu); - - result[threadId].insert(cpuId); - } - } - catch (const std::exception& e) { - g_log << Logger::Error << "Error parsing cpu-map entry '" << part << "': " << e.what() << endl; - } - } - - return result; -} - -static void setCPUMap(const std::map>& cpusMap, unsigned int n, pthread_t tid) -{ - const auto& cpuMapping = cpusMap.find(n); - if (cpuMapping != cpusMap.cend()) { - int rc = mapThreadToCPUList(tid, cpuMapping->second); - if (rc == 0) { - g_log << Logger::Info << "CPU affinity for worker " << n << " has been set to CPU map:"; - for (const auto cpu : cpuMapping->second) { - g_log << Logger::Info << " " << cpu; - } - g_log << Logger::Info << endl; - } - else { - g_log << Logger::Warning << "Error setting CPU affinity for worker " << n << " to CPU map:"; - for (const auto cpu : cpuMapping->second) { - g_log << Logger::Info << " " << cpu; - } - g_log << Logger::Info << strerror(rc) << endl; - } - } -} - static void checkSocketDir(void) { struct stat st; @@ -689,57 +847,6 @@ static void loggerBackend(const Logging::Entry& entry) g_log << u << buf.str() << endl; } -static void makeThreadPipes() -{ - auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size"); - if (pipeBufferSize > 0) { - g_log << Logger::Info << "Resizing the buffer of the distribution pipe to " << pipeBufferSize << endl; - } - - /* thread 0 is the handler / SNMP, worker threads start at 1 */ - for (unsigned int n = 0; n < RecThreadInfo::numRecursorThreads(); ++n) { - auto& threadInfo = RecThreadInfo::info(n); - - int fd[2]; - if (pipe(fd) < 0) - unixDie("Creating pipe for inter-thread communications"); - - threadInfo.pipes.readToThread = fd[0]; - threadInfo.pipes.writeToThread = fd[1]; - - // handler thread only gets first pipe, not the others - if (n == 0) { - continue; - } - - if (pipe(fd) < 0) - unixDie("Creating pipe for inter-thread communications"); - - threadInfo.pipes.readFromThread = fd[0]; - threadInfo.pipes.writeFromThread = fd[1]; - - if (pipe(fd) < 0) - unixDie("Creating pipe for inter-thread communications"); - - threadInfo.pipes.readQueriesToThread = fd[0]; - threadInfo.pipes.writeQueriesToThread = fd[1]; - - if (pipeBufferSize > 0) { - if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) { - int err = errno; - g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl; - auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread); - if (existingSize > 0) { - g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl; - } - } - } - - if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) { - unixDie("Making pipe for inter-thread communications non-blocking"); - } - } -} static int ratePercentage(uint64_t nom, uint64_t denom) { @@ -1041,8 +1148,6 @@ template ThreadTimes broadcastAccFunction(const boost::function& static int serviceMain(int argc, char* argv[]) { - int ret = EXIT_SUCCESS; - g_slogStructured = ::arg().mustDo("structured-logging"); g_log.setName(g_programname); @@ -1578,7 +1683,7 @@ static int serviceMain(int argc, char* argv[]) startLuaConfigDelayedThreads(delayedLuaThreads, g_luaconfs.getCopy().generation); delayedLuaThreads.rpzPrimaryThreads.clear(); // no longer needed - makeThreadPipes(); + RecThreadInfo::makeThreadPipes(); g_tcpTimeout = ::arg().asNum("client-tcp-timeout"); g_maxTCPPerClient = ::arg().asNum("max-tcp-per-client"); @@ -1629,99 +1734,7 @@ static int serviceMain(int argc, char* argv[]) g_avoidUdpSourcePorts.insert(port); } - unsigned int currentThreadId = 1; - const auto cpusMap = parseCPUMap(); - - if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) { - g_log << Logger::Warning << "Operating unthreaded" << endl; -#ifdef HAVE_SYSTEMD - sd_notify(0, "READY=1"); -#endif - - /* This thread handles the web server, carbon, statistics and the control channel */ - auto& handlerInfo = RecThreadInfo::info(0); - handlerInfo.setHandler(); - handlerInfo.start(0, "web+stat"); - auto& taskInfo = RecThreadInfo::info(2); - taskInfo.setTaskThread(); - taskInfo.start(2, "tasks"); - - setCPUMap(cpusMap, currentThreadId, pthread_self()); - - auto& info = RecThreadInfo::info(currentThreadId); - info.setListener(); - info.setWorker(); - info.setThreadId(currentThreadId++); - recursorThread(); - - handlerInfo.thread.join(); - if (handlerInfo.exitCode != 0) { - ret = handlerInfo.exitCode; - } - taskInfo.thread.join(); - if (taskInfo.exitCode != 0) { - ret = taskInfo.exitCode; - } - } - else { - // Setup RecThreadInfo objects - unsigned int tmp = currentThreadId; - if (RecThreadInfo::weDistributeQueries()) { - for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) { - RecThreadInfo::info(tmp++).setListener(); - } - } - for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) { - auto& info = RecThreadInfo::info(tmp++); - info.setListener(!RecThreadInfo::weDistributeQueries()); - info.setWorker(); - } - for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) { - auto& info = RecThreadInfo::info(tmp++); - info.setTaskThread(); - } - - if (RecThreadInfo::weDistributeQueries()) { - g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl; - for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) { - auto& info = RecThreadInfo::info(currentThreadId); - info.start(currentThreadId++, "distr"); - setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one? - } - } - - g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl; - - for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) { - auto& info = RecThreadInfo::info(currentThreadId); - info.start(currentThreadId++, "worker"); - setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one? - } - - for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) { - auto& info = RecThreadInfo::info(currentThreadId); - info.start(currentThreadId++, "taskThread"); - setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one? - } - -#ifdef HAVE_SYSTEMD - sd_notify(0, "READY=1"); -#endif - - /* This thread handles the web server, carbon, statistics and the control channel */ - auto& info = RecThreadInfo::info(0); - info.setHandler(); - info.start(0, "web+stat"); - - for (auto& ti : RecThreadInfo::infos()) { - ti.thread.join(); - if (ti.exitCode != 0) { - ret = ti.exitCode; - } - } - } - - return ret; + return RecThreadInfo::runThreads(); } static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var) @@ -1937,7 +1950,7 @@ void* recursorThread() if (threadInfo.isHandler()) { if (!primeHints()) { - threadInfo.exitCode = EXIT_FAILURE; + threadInfo.setExitCode(EXIT_FAILURE); RecursorControlChannel::stop = 1; g_log << Logger::Critical << "Priming cache failed, stopping" << endl; return nullptr; diff --git a/pdns/recursordist/rec-main.hh b/pdns/recursordist/rec-main.hh index 93d5c979eb..01d375e4ea 100644 --- a/pdns/recursordist/rec-main.hh +++ b/pdns/recursordist/rec-main.hh @@ -376,16 +376,6 @@ public: taskThread = true; } - void start(unsigned int id, const string& name) - { - thread = std::thread([id, name] { - t_id = id; - const string threadPrefix = "rec/"; - setThreadName(threadPrefix + name); - recursorThread(); - }); - } - static unsigned int id() { return t_id; @@ -441,6 +431,14 @@ public: return numHandlers() + numDistributors() + numWorkers() + numTaskThreads(); } + static int runThreads(); + static void makeThreadPipes(); + + void setExitCode(int e) + { + exitCode = e; + } + // FD corresponding to TCP sockets this thread is listening on. // These FDs are also in deferredAdds when we have one socket per // listener, and in g_deferredAdds instead. @@ -451,12 +449,15 @@ public: deferredAdd_t deferredAdds; struct ThreadPipeSet pipes; - std::thread thread; MT_t* mt{nullptr}; uint64_t numberOfDistributedQueries{0}; - int exitCode{0}; private: + void start(unsigned int id, const string& name, const std::map>& cpusMap); + + std::thread thread; + int exitCode{0}; + // handle the web server, carbon, statistics and the control channel bool handler{false}; // accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set)