From: Otto Moerbeek Date: Fri, 1 Sep 2023 07:03:40 +0000 (+0200) Subject: Adopt code change suggestions from @rgacogne's review X-Git-Tag: rec-5.0.0-alpha2~58^2~5 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a808ca6966fbb1ff028409dbe451769118056b9b;p=thirdparty%2Fpdns.git Adopt code change suggestions from @rgacogne's review --- diff --git a/pdns/recursordist/pdns_recursor.cc b/pdns/recursordist/pdns_recursor.cc index d8da4bd7a4..3eafd3f69c 100644 --- a/pdns/recursordist/pdns_recursor.cc +++ b/pdns/recursordist/pdns_recursor.cc @@ -2723,27 +2723,27 @@ static unsigned int getWorkerLoad(size_t workerIdx) static unsigned int selectWorker(unsigned int hash) { - assert(RecThreadInfo::numWorkers() != 0); // NOLINT: assert implementation + assert(RecThreadInfo::numUDPWorkers() != 0); // NOLINT: assert implementation if (g_balancingFactor == 0) { - return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numWorkers()); + return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numUDPWorkers()); } /* we start with one, representing the query we are currently handling */ double currentLoad = 1; - std::vector load(RecThreadInfo::numWorkers()); - for (size_t idx = 0; idx < RecThreadInfo::numWorkers(); idx++) { + std::vector load(RecThreadInfo::numUDPWorkers()); + for (size_t idx = 0; idx < RecThreadInfo::numUDPWorkers(); idx++) { load[idx] = getWorkerLoad(idx); currentLoad += load[idx]; } - double targetLoad = (currentLoad / RecThreadInfo::numWorkers()) * g_balancingFactor; + double targetLoad = (currentLoad / RecThreadInfo::numUDPWorkers()) * g_balancingFactor; - unsigned int worker = hash % RecThreadInfo::numWorkers(); + unsigned int worker = hash % RecThreadInfo::numUDPWorkers(); /* at least one server has to be at or below the average load */ if (load[worker] > targetLoad) { ++t_Counters.at(rec::Counter::rebalancedQueries); do { - worker = (worker + 1) % RecThreadInfo::numWorkers(); + worker = (worker + 1) % RecThreadInfo::numUDPWorkers(); } while (load[worker] > targetLoad); } @@ -2777,7 +2777,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func) was full, let's try another one */ unsigned int newTarget = 0; do { - newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numWorkers()); + newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numUDPWorkers()); } while (newTarget == target); if (!trySendingQueryToWorker(newTarget, tmsg)) { diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc index 59ab27e7ed..997882f9cb 100644 --- a/pdns/recursordist/rec-main.cc +++ b/pdns/recursordist/rec-main.cc @@ -106,8 +106,8 @@ std::shared_ptr g_slogudpin; std::shared_ptr g_slogudpout; /* without reuseport, all listeners share the same sockets */ -static deferredAdd_t g_deferredAdds; -static deferredAdd_t g_deferredTCPAdds; +static deferredAdd_t s_deferredUDPadds; +static deferredAdd_t s_deferredTCPadds; /* first we have the handler thread, t_id == 0 (some other helper threads like SNMP might have t_id == 0 as well) @@ -120,7 +120,7 @@ thread_local std::unique_ptr t_proxyMapping; bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers unsigned int RecThreadInfo::s_numDistributorThreads; -unsigned int RecThreadInfo::s_numWorkerThreads; +unsigned int RecThreadInfo::s_numUDPWorkerThreads; unsigned int RecThreadInfo::s_numTCPWorkerThreads; thread_local unsigned int RecThreadInfo::t_id; @@ -222,7 +222,7 @@ int RecThreadInfo::runThreads(Logr::log_t log) int ret = EXIT_SUCCESS; const auto cpusMap = parseCPUMap(log); - if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) { + if (RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers() == 1) { SLOG(g_log << Logger::Warning << "Operating with single distributor/worker thread" << endl, log->info(Logr::Notice, "Operating with single distributor/worker thread")); @@ -236,7 +236,6 @@ int RecThreadInfo::runThreads(Logr::log_t log) currentThreadId = 2; for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) { auto& info = RecThreadInfo::info(currentThreadId); - info.setListener(); info.setTCPListener(); info.setWorker(); info.start(currentThreadId, "tcpworker", cpusMap, log); @@ -274,14 +273,13 @@ int RecThreadInfo::runThreads(Logr::log_t log) RecThreadInfo::info(currentThreadId).setListener(); } } - for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) { + for (unsigned int thread = 0; thread < RecThreadInfo::numUDPWorkers(); thread++, currentThreadId++) { auto& info = RecThreadInfo::info(currentThreadId); info.setListener(!RecThreadInfo::weDistributeQueries()); info.setWorker(); } for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) { auto& info = RecThreadInfo::info(currentThreadId); - info.setListener(); info.setTCPListener(); info.setWorker(); } @@ -300,10 +298,10 @@ int RecThreadInfo::runThreads(Logr::log_t log) info.start(currentThreadId, "distr", cpusMap, log); } } - SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl, - log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numWorkers()))); + SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numUDPWorkers() << " worker threads" << endl, + log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numUDPWorkers()))); - for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) { + for (unsigned int thread = 0; thread < RecThreadInfo::numUDPWorkers(); thread++, currentThreadId++) { auto& info = RecThreadInfo::info(currentThreadId); info.start(currentThreadId, "worker", cpusMap, log); } @@ -910,8 +908,8 @@ static void checkLinuxIPv6Limits([[maybe_unused]] Logr::log_t log) static void checkOrFixFDS(Logr::log_t log) { unsigned int availFDs = getFilenumLimit(); - unsigned int wantFDs = g_maxMThreads * (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers()) + 25; // even healthier margin than before - wantFDs += (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers()) * TCPOutConnectionManager::s_maxIdlePerThread; + unsigned int wantFDs = g_maxMThreads * (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()) + 25; // even healthier margin than before + wantFDs += (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()) * TCPOutConnectionManager::s_maxIdlePerThread; if (wantFDs > availFDs) { unsigned int hardlimit = getFilenumLimit(true); @@ -921,7 +919,7 @@ static void checkOrFixFDS(Logr::log_t log) log->info(Logr::Warning, "Raised soft limit on number of filedescriptors to match max-mthreads and threads settings", "limit", Logging::Loggable(wantFDs))); } else { - auto newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers()); + auto newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()); SLOG(g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl, log->info(Logr::Warning, "Insufficient number of filedescriptors available for max-mthreads*threads setting! Reducing max-mthreads", "hardlimit", Logging::Loggable(hardlimit), "want", Logging::Loggable(wantFDs), "max-mthreads", Logging::Loggable(newval))); g_maxMThreads = newval; @@ -1375,7 +1373,7 @@ void broadcastFunction(const pipefunc_t& func) } unsigned int thread = 0; - for (auto& threadInfo : RecThreadInfo::infos()) { + for (const auto& threadInfo : RecThreadInfo::infos()) { if (thread++ == RecThreadInfo::id()) { func(); // don't write to ourselves! continue; @@ -1454,7 +1452,7 @@ T broadcastAccFunction(const std::function& func) unsigned int thread = 0; T ret = T(); - for (auto& threadInfo : RecThreadInfo::infos()) { + for (const auto& threadInfo : RecThreadInfo::infos()) { if (thread++ == RecThreadInfo::id()) { continue; } @@ -1768,13 +1766,13 @@ static void initDistribution(Logr::log_t log) } else { /* first thread is the handler, there is no distributor here and workers are accepting queries */ - for (unsigned int i = 0; i < RecThreadInfo::numWorkers(); i++, threadNum++) { + for (unsigned int i = 0; i < RecThreadInfo::numUDPWorkers(); i++, threadNum++) { auto& info = RecThreadInfo::info(threadNum); auto& deferredAdds = info.getDeferredAdds(); makeUDPServerSockets(deferredAdds, log); } } - threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers(); + threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers(); for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++, threadNum++) { auto& info = RecThreadInfo::info(threadNum); auto& deferredAdds = info.getDeferredAdds(); @@ -1786,12 +1784,12 @@ static void initDistribution(Logr::log_t log) std::set tcpSockets; /* we don't have reuseport so we can only open one socket per listening addr:port and everyone will listen on it */ - makeUDPServerSockets(g_deferredAdds, log); - makeTCPServerSockets(g_deferredTCPAdds, tcpSockets, log); + makeUDPServerSockets(s_deferredUDPadds, log); + makeTCPServerSockets(s_deferredTCPadds, tcpSockets, log); // TCP queries are handled by TCP workers for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++) { - auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers()); + auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers()); info.setTCPSockets(tcpSockets); } } @@ -2125,11 +2123,11 @@ static int serviceMain(Logr::log_t log) g_paddingOutgoing = ::arg().mustDo("edns-padding-out"); RecThreadInfo::setNumDistributorThreads(::arg().asNum("distributor-threads")); - RecThreadInfo::setNumWorkerThreads(::arg().asNum("threads")); - if (RecThreadInfo::numWorkers() < 1) { + RecThreadInfo::setNumUDPWorkerThreads(::arg().asNum("threads")); + if (RecThreadInfo::numUDPWorkers() < 1) { SLOG(g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl, log->info(Logr::Warning, "Asked to run with 0 threads, raising to 1 instead")); - RecThreadInfo::setNumWorkerThreads(1); + RecThreadInfo::setNumUDPWorkerThreads(1); } RecThreadInfo::setNumTCPWorkerThreads(1); // XXX if (RecThreadInfo::numTCPWorkers() < 1) { @@ -2748,7 +2746,7 @@ static void recursorThread() } } - unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numWorkers(); + unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numUDPWorkers(); if (ringsize != 0) { t_remotes = std::make_unique(); if (RecThreadInfo::weDistributeQueries()) { @@ -2819,7 +2817,7 @@ static void recursorThread() } else { /* otherwise all listeners are listening on the same ones */ - for (const auto& deferred : threadInfo.isTCPListener() ? g_deferredTCPAdds : g_deferredAdds) { + for (const auto& deferred : threadInfo.isTCPListener() ? s_deferredTCPadds : s_deferredUDPadds) { t_fdm->addReadFD(deferred.first, deferred.second); } } diff --git a/pdns/recursordist/rec-main.hh b/pdns/recursordist/rec-main.hh index d254274b33..daeb0dc77f 100644 --- a/pdns/recursordist/rec-main.hh +++ b/pdns/recursordist/rec-main.hh @@ -376,10 +376,13 @@ public: return worker; } + // UDP or TCP listener? [[nodiscard]] bool isListener() const { return listener; } + + // A TCP-only listener? [[nodiscard]] bool isTCPListener() const { return tcplistener; @@ -407,6 +410,7 @@ public: void setTCPListener(bool flag = true) { + setListener(flag); tcplistener = flag; } @@ -440,9 +444,9 @@ public: return 1; } - static unsigned int numWorkers() + static unsigned int numUDPWorkers() { - return s_numWorkerThreads; + return s_numUDPWorkerThreads; } static unsigned int numTCPWorkers() @@ -465,9 +469,9 @@ public: s_weDistributeQueries = flag; } - static void setNumWorkerThreads(unsigned int n) + static void setNumUDPWorkerThreads(unsigned int n) { - s_numWorkerThreads = n; + s_numUDPWorkerThreads = n; } static void setNumTCPWorkerThreads(unsigned int n) @@ -482,7 +486,7 @@ public: static unsigned int numRecursorThreads() { - return numHandlers() + numDistributors() + numWorkers() + numTCPWorkers() + numTaskThreads(); + return numHandlers() + numDistributors() + numUDPWorkers() + numTCPWorkers() + numTaskThreads(); } static int runThreads(Logr::log_t); @@ -508,7 +512,7 @@ public: return deferredAdds; } - ThreadPipeSet& getPipes() + const ThreadPipeSet& getPipes() const { return pipes; } @@ -547,7 +551,7 @@ private: MT_t* mt{nullptr}; uint64_t numberOfDistributedQueries{0}; - void start(unsigned int theId, const string& name, const std::map>& cpusMap, Logr::log_t); + void start(unsigned int tid, const string& tname, const std::map>& cpusMap, Logr::log_t); std::string name; std::thread thread; @@ -568,7 +572,7 @@ private: static std::vector s_threadInfos; static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers static unsigned int s_numDistributorThreads; - static unsigned int s_numWorkerThreads; + static unsigned int s_numUDPWorkerThreads; static unsigned int s_numTCPWorkerThreads; }; diff --git a/pdns/recursordist/rec-tcp.cc b/pdns/recursordist/rec-tcp.cc index e8b9da98c3..a5a921cd49 100644 --- a/pdns/recursordist/rec-tcp.cc +++ b/pdns/recursordist/rec-tcp.cc @@ -56,7 +56,6 @@ // And this approach was implemented in https://github.com/PowerDNS/pdns/pull/13195. The distributor // and worker thread(s) now no longe process TCP queries. - size_t g_tcpMaxQueriesPerConn; unsigned int g_maxTCPPerClient; int g_tcpTimeout;