From 7819971141a048d482e97db29a70107d2c56b109 Mon Sep 17 00:00:00 2001 From: Otto Moerbeek Date: Wed, 30 Aug 2023 12:34:34 +0200 Subject: [PATCH] Run seperate TCP threads. These threads listen and process incoming TCP queries TODO: test, validate reuseport behaviour, settings, docs --- pdns/recursordist/pdns_recursor.cc | 8 +- pdns/recursordist/rec-main.cc | 188 +++++++++++++++++------------ pdns/recursordist/rec-main.hh | 93 +++++++++++--- 3 files changed, 192 insertions(+), 97 deletions(-) diff --git a/pdns/recursordist/pdns_recursor.cc b/pdns/recursordist/pdns_recursor.cc index e6ef3bae63..d8da4bd7a4 100644 --- a/pdns/recursordist/pdns_recursor.cc +++ b/pdns/recursordist/pdns_recursor.cc @@ -2064,7 +2064,7 @@ void requestWipeCaches(const DNSName& canon) ThreadMSG* tmsg = new ThreadMSG(); // NOLINT: pointer owner tmsg->func = [=] { return pleaseWipeCaches(canon, true, 0xffff); }; tmsg->wantAnswer = false; - if (write(RecThreadInfo::info(0).pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: correct sizeof + if (write(RecThreadInfo::info(0).getPipes().writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: correct sizeof delete tmsg; // NOLINT: pointer owner unixDie("write to thread pipe returned wrong size or error"); @@ -2084,7 +2084,7 @@ bool expectProxyProtocol(const ComboAddress& from) // mappedSource: the address we assume the query is coming from. Differs from source if table based mapping has been applied static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, const ComboAddress& mappedSource, struct timeval tval, int fileDesc, std::vector& proxyProtocolValues, RecEventTrace& eventTrace) // NOLINT(readability-function-cognitive-complexity): https://github.com/PowerDNS/pdns/issues/12791 { - ++(RecThreadInfo::self().numberOfDistributedQueries); + RecThreadInfo::self().incNumberOfDistributedQueries(); gettimeofday(&g_now, nullptr); if (tval.tv_sec != 0) { struct timeval diff = g_now - tval; @@ -2691,7 +2691,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) _exit(1); } - const auto& tps = targetInfo.pipes; + const auto& tps = targetInfo.getPipes(); ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg)); // NOLINT: correct sizeof if (written > 0) { @@ -2714,7 +2714,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) static unsigned int getWorkerLoad(size_t workerIdx) { - const auto* multiThreader = RecThreadInfo::info(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + workerIdx).mt; + const auto* multiThreader = RecThreadInfo::info(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + workerIdx).getMT(); if (multiThreader != nullptr) { return multiThreader->numProcesses(); } diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc index 10469fb445..5a060f57e9 100644 --- a/pdns/recursordist/rec-main.cc +++ b/pdns/recursordist/rec-main.cc @@ -106,7 +106,8 @@ std::shared_ptr g_slogudpin; std::shared_ptr g_slogudpout; /* without reuseport, all listeners share the same sockets */ -deferredAdd_t g_deferredAdds; +static deferredAdd_t g_deferredAdds; +static deferredAdd_t g_deferredTCPAdds; /* first we have the handler thread, t_id == 0 (some other helper threads like SNMP might have t_id == 0 as well) @@ -120,6 +121,7 @@ thread_local std::unique_ptr t_proxyMapping; bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers unsigned int RecThreadInfo::s_numDistributorThreads; unsigned int RecThreadInfo::s_numWorkerThreads; +unsigned int RecThreadInfo::s_numTCPWorkerThreads; thread_local unsigned int RecThreadInfo::t_id; static std::map> parseCPUMap(Logr::log_t log) @@ -218,7 +220,6 @@ void RecThreadInfo::start(unsigned int tid, const string& tname, const std::map< int RecThreadInfo::runThreads(Logr::log_t log) { int ret = EXIT_SUCCESS; - unsigned int currentThreadId = 1; const auto cpusMap = parseCPUMap(log); if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) { @@ -226,72 +227,106 @@ int RecThreadInfo::runThreads(Logr::log_t log) log->info(Logr::Notice, "Operating with single distributor/worker thread")); /* This thread handles the web server, carbon, statistics and the control channel */ - auto& handlerInfo = RecThreadInfo::info(0); + unsigned int currentThreadId = 0; + auto& handlerInfo = RecThreadInfo::info(currentThreadId); handlerInfo.setHandler(); - handlerInfo.start(0, "web+stat", cpusMap, log); - auto& taskInfo = RecThreadInfo::info(2); - taskInfo.setTaskThread(); - taskInfo.start(2, "task", cpusMap, log); + handlerInfo.start(currentThreadId, "web+stat", cpusMap, log); + // We skip the single UDP worker thread 1, it's handled after the loop and taskthreads + currentThreadId = 2; + for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) { + auto& info = RecThreadInfo::info(currentThreadId); + info.setListener(); + info.setTCPListener(); + info.setWorker(); + info.start(currentThreadId, "tcpworker", cpusMap, log); + } + + for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) { + auto& taskInfo = RecThreadInfo::info(currentThreadId); + taskInfo.setTaskThread(); + taskInfo.start(currentThreadId, "task", cpusMap, log); + } + + currentThreadId = 1; auto& info = RecThreadInfo::info(currentThreadId); info.setListener(); + info.setTCPListener(); info.setWorker(); - RecThreadInfo::setThreadId(currentThreadId++); + RecThreadInfo::setThreadId(currentThreadId); recursorThread(); - handlerInfo.thread.join(); - if (handlerInfo.exitCode != 0) { - ret = handlerInfo.exitCode; - } - taskInfo.thread.join(); - if (taskInfo.exitCode != 0) { - ret = taskInfo.exitCode; + for (unsigned int thread = 0; thread < RecThreadInfo::numRecursorThreads(); thread++) { + if (thread == 1) { + continue; + } + auto& tInfo = RecThreadInfo::info(thread); + tInfo.thread.join(); + if (tInfo.exitCode != 0) { + ret = tInfo.exitCode; + } } } else { // Setup RecThreadInfo objects - unsigned int tmp = currentThreadId; + unsigned int currentThreadId = 1; if (RecThreadInfo::weDistributeQueries()) { - for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); ++thread) { - RecThreadInfo::info(tmp++).setListener(); + for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); thread++, currentThreadId++) { + RecThreadInfo::info(currentThreadId).setListener(); } } - for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); ++thread) { - auto& info = RecThreadInfo::info(tmp++); + for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) { + auto& info = RecThreadInfo::info(currentThreadId); info.setListener(!RecThreadInfo::weDistributeQueries()); info.setWorker(); } - for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); ++thread) { - auto& info = RecThreadInfo::info(tmp++); + for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) { + auto& info = RecThreadInfo::info(currentThreadId); + info.setListener(); + info.setTCPListener(); + info.setWorker(); + } + for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) { + auto& info = RecThreadInfo::info(currentThreadId); info.setTaskThread(); } // And now start the actual threads + currentThreadId = 1; if (RecThreadInfo::weDistributeQueries()) { SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl, log->info(Logr::Notice, "Launching distributor threads", "count", Logging::Loggable(RecThreadInfo::numDistributors()))); - for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); ++thread) { + for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); thread++, currentThreadId++) { auto& info = RecThreadInfo::info(currentThreadId); - info.start(currentThreadId++, "distr", cpusMap, log); + info.start(currentThreadId, "distr", cpusMap, log); } } SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl, log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numWorkers()))); - for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); ++thread) { + for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) { + auto& info = RecThreadInfo::info(currentThreadId); + info.start(currentThreadId, "worker", cpusMap, log); + } + + SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numTCPWorkers() << " tcpworker threads" << endl, + log->info(Logr::Notice, "Launching tcpworker threads", "count", Logging::Loggable(RecThreadInfo::numTCPWorkers()))); + + for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) { auto& info = RecThreadInfo::info(currentThreadId); - info.start(currentThreadId++, "worker", cpusMap, log); + info.start(currentThreadId, "tcpworker", cpusMap, log); } - for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); ++thread) { + for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) { auto& info = RecThreadInfo::info(currentThreadId); - info.start(currentThreadId++, "task", cpusMap, log); + info.start(currentThreadId, "task", cpusMap, log); } /* This thread handles the web server, carbon, statistics and the control channel */ - auto& info = RecThreadInfo::info(0); + currentThreadId = 0; + auto& info = RecThreadInfo::info(currentThreadId); info.setHandler(); - info.start(0, "web+stat", cpusMap, log); + info.start(currentThreadId, "web+stat", cpusMap, log); for (auto& tInfo : RecThreadInfo::infos()) { tInfo.thread.join(); @@ -1123,8 +1158,8 @@ static void doStats() size_t idx = 0; for (const auto& threadInfo : RecThreadInfo::infos()) { if (threadInfo.isWorker()) { - SLOG(g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl, - log->info(Logr::Info, "Queries handled by thread", "thread", Logging::Loggable(idx), "count", Logging::Loggable(threadInfo.numberOfDistributedQueries))); + SLOG(g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.getNumberOfDistributedQueries() << " queries" << endl, + log->info(Logr::Info, "Queries handled by thread", "thread", Logging::Loggable(idx), "count", Logging::Loggable(threadInfo.getNumberOfDistributedQueries()))); ++idx; } } @@ -1341,7 +1376,7 @@ void broadcastFunction(const pipefunc_t& func) } unsigned int thread = 0; - for (const auto& threadInfo : RecThreadInfo::infos()) { + for (auto& threadInfo : RecThreadInfo::infos()) { if (thread++ == RecThreadInfo::id()) { func(); // don't write to ourselves! continue; @@ -1350,14 +1385,14 @@ void broadcastFunction(const pipefunc_t& func) ThreadMSG* tmsg = new ThreadMSG(); // NOLINT: manual ownership handling tmsg->func = func; tmsg->wantAnswer = true; - if (write(threadInfo.pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: sizeof correct + if (write(threadInfo.getPipes().writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: sizeof correct delete tmsg; // NOLINT: manual ownership handling unixDie("write to thread pipe returned wrong size or error"); } string* resp = nullptr; - if (read(threadInfo.pipes.readFromThread, &resp, sizeof(resp)) != sizeof(resp)) { // NOLINT: sizeof correct + if (read(threadInfo.getPipes().readFromThread, &resp, sizeof(resp)) != sizeof(resp)) { // NOLINT: sizeof correct unixDie("read from thread pipe returned wrong size or error"); } @@ -1420,12 +1455,12 @@ T broadcastAccFunction(const std::function& func) unsigned int thread = 0; T ret = T(); - for (const auto& threadInfo : RecThreadInfo::infos()) { + for (auto& threadInfo : RecThreadInfo::infos()) { if (thread++ == RecThreadInfo::id()) { continue; } - const auto& tps = threadInfo.pipes; + const auto& tps = threadInfo.getPipes(); ThreadMSG* tmsg = new ThreadMSG(); // NOLINT: manual ownership handling tmsg->func = [func] { return voider(func); }; tmsg->wantAnswer = true; @@ -1720,50 +1755,45 @@ static void initDistribution(Logr::log_t log) g_reusePort = ::arg().mustDo("reuseport"); #endif - RecThreadInfo::infos().resize(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() + RecThreadInfo::numTaskThreads()); + RecThreadInfo::infos().resize(RecThreadInfo::numRecursorThreads()); if (g_reusePort) { + unsigned int threadNum = 1; if (RecThreadInfo::weDistributeQueries()) { /* first thread is the handler, then distributors */ - for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) { - auto& info = RecThreadInfo::info(threadId); - auto& deferredAdds = info.deferredAdds; - auto& tcpSockets = info.tcpSockets; + for (unsigned int i = 0; i < RecThreadInfo::numDistributors(); i++, threadNum++) { + auto& info = RecThreadInfo::info(threadNum); + auto& deferredAdds = info.getDeferredAdds(); makeUDPServerSockets(deferredAdds, log); - makeTCPServerSockets(deferredAdds, tcpSockets, log); } } else { /* first thread is the handler, there is no distributor here and workers are accepting queries */ - for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) { - auto& info = RecThreadInfo::info(threadId); - auto& deferredAdds = info.deferredAdds; - auto& tcpSockets = info.tcpSockets; + for (unsigned int i = 0; i < RecThreadInfo::numWorkers(); i++, threadNum++) { + auto& info = RecThreadInfo::info(threadNum); + auto& deferredAdds = info.getDeferredAdds(); makeUDPServerSockets(deferredAdds, log); - makeTCPServerSockets(deferredAdds, tcpSockets, log); } } + threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers(); + for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++, threadNum++) { + auto& info = RecThreadInfo::info(threadNum); + auto& deferredAdds = info.getDeferredAdds(); + auto& tcpSockets = info.getTCPSockets(); + makeTCPServerSockets(deferredAdds, tcpSockets, log); + } } else { std::set tcpSockets; /* we don't have reuseport so we can only open one socket per listening addr:port and everyone will listen on it */ makeUDPServerSockets(g_deferredAdds, log); - makeTCPServerSockets(g_deferredAdds, tcpSockets, log); + makeTCPServerSockets(g_deferredTCPAdds, tcpSockets, log); - /* every listener (so distributor if g_weDistributeQueries, workers otherwise) - needs to listen to the shared sockets */ - if (RecThreadInfo::weDistributeQueries()) { - /* first thread is the handler, then distributors */ - for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) { - RecThreadInfo::info(threadId).tcpSockets = tcpSockets; - } - } - else { - /* first thread is the handler, there is no distributor here and workers are accepting queries */ - for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) { - RecThreadInfo::info(threadId).tcpSockets = tcpSockets; - } + // TCP queries are handled by TCP workers + for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++) { + auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers()); + info.setTCPSockets(tcpSockets); } } } @@ -1938,7 +1968,7 @@ static void initSuffixMatchNodes([[maybe_unused]] Logr::log_t log) vector parts; stringtok(parts, ::arg()["dot-to-auth-names"], " ,"); #ifndef HAVE_DNS_OVER_TLS - if (parts.size()) { + if (!parts.empty()) { SLOG(g_log << Logger::Error << "dot-to-auth-names setting contains names, but Recursor was built without DNS over TLS support. Setting will be ignored." << endl, log->info(Logr::Error, "dot-to-auth-names setting contains names, but Recursor was built without DNS over TLS support. Setting will be ignored")); } @@ -2102,6 +2132,12 @@ static int serviceMain(Logr::log_t log) log->info(Logr::Warning, "Asked to run with 0 threads, raising to 1 instead")); RecThreadInfo::setNumWorkerThreads(1); } + RecThreadInfo::setNumTCPWorkerThreads(1); // XXX + if (RecThreadInfo::numTCPWorkers() < 1) { + SLOG(g_log << Logger::Warning << "Asked to run with 0 tcpthreads, raising to 1 instead" << endl, + log->info(Logr::Warning, "Asked to run with 0 tcpthreads, raising to 1 instead")); + RecThreadInfo::setNumTCPWorkerThreads(1); + } g_maxMThreads = ::arg().asNum("max-mthreads"); @@ -2241,7 +2277,7 @@ static void handlePipeRequest(int fileDesc, FDMultiplexer::funcparam_t& /* var * } } if (tmsg->wantAnswer) { - if (write(RecThreadInfo::self().pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) { + if (write(RecThreadInfo::self().getPipes().writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) { delete tmsg; // NOLINT: manual ownership handling unixDie("write to thread pipe returned wrong size or error"); } @@ -2262,9 +2298,8 @@ static void handleRCC(int fileDesc, FDMultiplexer::funcparam_t& /* var */) SLOG(g_log << Logger::Info << "Received rec_control command '" << msg << "' via controlsocket" << endl, log->info(Logr::Info, "Received rec_control command via control socket", "command", Logging::Loggable(msg))); - RecursorControlParser rcp; RecursorControlParser::func_t* command = nullptr; - auto answer = rcp.getAnswer(clientfd, msg, &command); + auto answer = RecursorControlParser::getAnswer(clientfd, msg, &command); g_rcc.send(clientfd, answer); command(); @@ -2293,7 +2328,6 @@ public: void runIfDue(struct timeval& now, const std::function& function) { if (last_run < now - period) { - // cerr << RecThreadInfo::id() << ' ' << name << ' ' << now.tv_sec << '.' << now.tv_usec << " running" << endl; function(); Utility::gettimeofday(&last_run); now = last_run; @@ -2567,10 +2601,10 @@ static void runLuaMaintenance(RecThreadInfo& threadInfo, time_t& last_lua_mainte static void runTCPMaintenance(RecThreadInfo& threadInfo, bool& listenOnTCP, unsigned int maxTcpClients) { - if (threadInfo.isListener()) { + if (threadInfo.isTCPListener()) { if (listenOnTCP) { if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections - for (const auto fileDesc : threadInfo.tcpSockets) { + for (const auto fileDesc : threadInfo.getTCPSockets()) { t_fdm->removeReadFD(fileDesc); } listenOnTCP = false; @@ -2578,7 +2612,7 @@ static void runTCPMaintenance(RecThreadInfo& threadInfo, bool& listenOnTCP, unsi } else { if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable - for (const auto fileDesc : threadInfo.tcpSockets) { + for (const auto fileDesc : threadInfo.getTCPSockets()) { t_fdm->addReadFD(fileDesc, handleNewTCPQuestion); } listenOnTCP = true; @@ -2741,7 +2775,7 @@ static void recursorThread() t_bogusqueryring->set_capacity(ringsize); } g_multiTasker = std::make_unique(::arg().asNum("stack-size"), ::arg().asNum("stack-cache-size")); - threadInfo.mt = g_multiTasker.get(); + threadInfo.setMT(g_multiTasker.get()); /* start protobuf export threads if needed */ auto luaconfsLocal = g_luaconfs.getLocal(); @@ -2756,7 +2790,7 @@ static void recursorThread() std::unique_ptr rws; - t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest); + t_fdm->addReadFD(threadInfo.getPipes().readToThread, handlePipeRequest); if (threadInfo.isHandler()) { if (::arg().mustDo("webserver")) { @@ -2775,18 +2809,18 @@ static void recursorThread() log->info(Logr::Info, "Enabled multiplexer", "name", Logging::Loggable(t_fdm->getName()))); } else { - t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest); + t_fdm->addReadFD(threadInfo.getPipes().readQueriesToThread, handlePipeRequest); if (threadInfo.isListener()) { if (g_reusePort) { /* then every listener has its own FDs */ - for (const auto& deferred : threadInfo.deferredAdds) { + for (const auto& deferred : threadInfo.getDeferredAdds()) { t_fdm->addReadFD(deferred.first, deferred.second); } } else { /* otherwise all listeners are listening on the same ones */ - for (const auto& deferred : g_deferredAdds) { + for (const auto& deferred : threadInfo.isTCPListener() ? g_deferredTCPAdds : g_deferredAdds) { t_fdm->addReadFD(deferred.first, deferred.second); } } @@ -3494,10 +3528,10 @@ static string* pleaseUseNewTraceRegex(const std::string& newRegex, int file) } t_traceRegex = std::make_shared(newRegex); t_tracefd = file; - return new string("ok\n"); + return new string("ok\n"); // NOLINT(cppcoreguidelines-owning-memory): it's the API } catch (const PDNSException& ae) { - return new string(ae.reason + "\n"); + return new string(ae.reason + "\n"); // NOLINT(cppcoreguidelines-owning-memory): it's the API } } diff --git a/pdns/recursordist/rec-main.hh b/pdns/recursordist/rec-main.hh index b048da71a7..d254274b33 100644 --- a/pdns/recursordist/rec-main.hh +++ b/pdns/recursordist/rec-main.hh @@ -279,7 +279,6 @@ extern std::set g_avoidUdpSourcePorts; /* without reuseport, all listeners share the same sockets */ typedef vector>> deferredAdd_t; -extern deferredAdd_t g_deferredAdds; typedef map tcpClientCounts_t; extern thread_local std::unique_ptr t_tcpClientCounts; @@ -346,9 +345,9 @@ public: return s_threadInfos.at(t_id); } - static RecThreadInfo& info(unsigned int i) + static RecThreadInfo& info(unsigned int index) { - return s_threadInfos.at(i); + return s_threadInfos.at(index); } static vector& infos() @@ -356,7 +355,7 @@ public: return s_threadInfos; } - bool isDistributor() const + [[nodiscard]] bool isDistributor() const { if (t_id == 0) { return false; @@ -364,7 +363,7 @@ public: return s_weDistributeQueries && listener; } - bool isHandler() const + [[nodiscard]] bool isHandler() const { if (t_id == 0) { return true; @@ -372,17 +371,21 @@ public: return handler; } - bool isWorker() const + [[nodiscard]] bool isWorker() const { return worker; } - bool isListener() const + [[nodiscard]] bool isListener() const { return listener; } + [[nodiscard]] bool isTCPListener() const + { + return tcplistener; + } - bool isTaskThread() const + [[nodiscard]] bool isTaskThread() const { return taskThread; } @@ -402,6 +405,11 @@ public: listener = flag; } + void setTCPListener(bool flag = true) + { + tcplistener = flag; + } + void setTaskThread() { taskThread = true; @@ -412,12 +420,12 @@ public: return t_id; } - static void setThreadId(unsigned int id) + static void setThreadId(unsigned int arg) { - t_id = id; + t_id = arg; } - std::string getName() const + [[nodiscard]] std::string getName() const { return name; } @@ -437,6 +445,11 @@ public: return s_numWorkerThreads; } + static unsigned int numTCPWorkers() + { + return s_numTCPWorkerThreads; + } + static unsigned int numDistributors() { return s_numDistributorThreads; @@ -457,6 +470,11 @@ public: s_numWorkerThreads = n; } + static void setNumTCPWorkerThreads(unsigned int n) + { + s_numTCPWorkerThreads = n; + } + static void setNumDistributorThreads(unsigned int n) { s_numDistributorThreads = n; @@ -464,17 +482,58 @@ public: static unsigned int numRecursorThreads() { - return numHandlers() + numDistributors() + numWorkers() + numTaskThreads(); + return numHandlers() + numDistributors() + numWorkers() + numTCPWorkers() + numTaskThreads(); } static int runThreads(Logr::log_t); static void makeThreadPipes(Logr::log_t); - void setExitCode(int e) + void setExitCode(int n) { - exitCode = e; + exitCode = n; } + std::set& getTCPSockets() + { + return tcpSockets; + } + + void setTCPSockets(std::set& socks) + { + tcpSockets = socks; + } + + deferredAdd_t& getDeferredAdds() + { + return deferredAdds; + } + + ThreadPipeSet& getPipes() + { + return pipes; + } + + [[nodiscard]] uint64_t getNumberOfDistributedQueries() const + { + return numberOfDistributedQueries; + } + + void incNumberOfDistributedQueries() + { + numberOfDistributedQueries++; + } + + MT_t* getMT() + { + return mt; + } + + void setMT(MT_t* theMT) + { + mt = theMT; + } + +private: // FD corresponding to TCP sockets this thread is listening on. // These FDs are also in deferredAdds when we have one socket per // listener, and in g_deferredAdds instead. @@ -488,8 +547,7 @@ public: MT_t* mt{nullptr}; uint64_t numberOfDistributedQueries{0}; -private: - void start(unsigned int id, const string& name, const std::map>& cpusMap, Logr::log_t); + void start(unsigned int theId, const string& name, const std::map>& cpusMap, Logr::log_t); std::string name; std::thread thread; @@ -499,6 +557,8 @@ private: bool handler{false}; // accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) bool listener{false}; + // accept incoming TCP queries (and distributes them to the workers if pdns-distributes-queries is set) + bool tcplistener{false}; // process queries bool worker{false}; // run async tasks: from TaskQueue and ZoneToCache @@ -509,6 +569,7 @@ private: static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers static unsigned int s_numDistributorThreads; static unsigned int s_numWorkerThreads; + static unsigned int s_numTCPWorkerThreads; }; struct ThreadMSG -- 2.47.2