]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Adopt code change suggestions from @rgacogne's review
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 1 Sep 2023 07:03:40 +0000 (09:03 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 13 Sep 2023 11:20:54 +0000 (13:20 +0200)
pdns/recursordist/pdns_recursor.cc
pdns/recursordist/rec-main.cc
pdns/recursordist/rec-main.hh
pdns/recursordist/rec-tcp.cc

index d8da4bd7a41b2e5a6a78e6e5c10b56f63cb9c00f..3eafd3f69c2d510e89aad371167386f462fe72ca 100644 (file)
@@ -2723,27 +2723,27 @@ static unsigned int getWorkerLoad(size_t workerIdx)
 
 static unsigned int selectWorker(unsigned int hash)
 {
-  assert(RecThreadInfo::numWorkers() != 0); // NOLINT: assert implementation
+  assert(RecThreadInfo::numUDPWorkers() != 0); // NOLINT: assert implementation
   if (g_balancingFactor == 0) {
-    return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numWorkers());
+    return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numUDPWorkers());
   }
 
   /* we start with one, representing the query we are currently handling */
   double currentLoad = 1;
-  std::vector<unsigned int> load(RecThreadInfo::numWorkers());
-  for (size_t idx = 0; idx < RecThreadInfo::numWorkers(); idx++) {
+  std::vector<unsigned int> load(RecThreadInfo::numUDPWorkers());
+  for (size_t idx = 0; idx < RecThreadInfo::numUDPWorkers(); idx++) {
     load[idx] = getWorkerLoad(idx);
     currentLoad += load[idx];
   }
 
-  double targetLoad = (currentLoad / RecThreadInfo::numWorkers()) * g_balancingFactor;
+  double targetLoad = (currentLoad / RecThreadInfo::numUDPWorkers()) * g_balancingFactor;
 
-  unsigned int worker = hash % RecThreadInfo::numWorkers();
+  unsigned int worker = hash % RecThreadInfo::numUDPWorkers();
   /* at least one server has to be at or below the average load */
   if (load[worker] > targetLoad) {
     ++t_Counters.at(rec::Counter::rebalancedQueries);
     do {
-      worker = (worker + 1) % RecThreadInfo::numWorkers();
+      worker = (worker + 1) % RecThreadInfo::numUDPWorkers();
     } while (load[worker] > targetLoad);
   }
 
@@ -2777,7 +2777,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
        was full, let's try another one */
     unsigned int newTarget = 0;
     do {
-      newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numWorkers());
+      newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numUDPWorkers());
     } while (newTarget == target);
 
     if (!trySendingQueryToWorker(newTarget, tmsg)) {
index 59ab27e7ed67f323fb0fb57ac3d76278e2980d41..997882f9cb8066e63ae73dbec4e05201e8ac37a3 100644 (file)
@@ -106,8 +106,8 @@ std::shared_ptr<Logr::Logger> g_slogudpin;
 std::shared_ptr<Logr::Logger> g_slogudpout;
 
 /* without reuseport, all listeners share the same sockets */
-static deferredAdd_t g_deferredAdds;
-static deferredAdd_t g_deferredTCPAdds;
+static deferredAdd_t s_deferredUDPadds;
+static deferredAdd_t s_deferredTCPadds;
 
 /* first we have the handler thread, t_id == 0 (some other
    helper threads like SNMP might have t_id == 0 as well)
@@ -120,7 +120,7 @@ thread_local std::unique_ptr<ProxyMapping> t_proxyMapping;
 
 bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
 unsigned int RecThreadInfo::s_numDistributorThreads;
-unsigned int RecThreadInfo::s_numWorkerThreads;
+unsigned int RecThreadInfo::s_numUDPWorkerThreads;
 unsigned int RecThreadInfo::s_numTCPWorkerThreads;
 thread_local unsigned int RecThreadInfo::t_id;
 
@@ -222,7 +222,7 @@ int RecThreadInfo::runThreads(Logr::log_t log)
   int ret = EXIT_SUCCESS;
   const auto cpusMap = parseCPUMap(log);
 
-  if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
+  if (RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers() == 1) {
     SLOG(g_log << Logger::Warning << "Operating with single distributor/worker thread" << endl,
          log->info(Logr::Notice, "Operating with single distributor/worker thread"));
 
@@ -236,7 +236,6 @@ int RecThreadInfo::runThreads(Logr::log_t log)
     currentThreadId = 2;
     for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
       auto& info = RecThreadInfo::info(currentThreadId);
-      info.setListener();
       info.setTCPListener();
       info.setWorker();
       info.start(currentThreadId, "tcpworker", cpusMap, log);
@@ -274,14 +273,13 @@ int RecThreadInfo::runThreads(Logr::log_t log)
         RecThreadInfo::info(currentThreadId).setListener();
       }
     }
-    for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) {
+    for (unsigned int thread = 0; thread < RecThreadInfo::numUDPWorkers(); thread++, currentThreadId++) {
       auto& info = RecThreadInfo::info(currentThreadId);
       info.setListener(!RecThreadInfo::weDistributeQueries());
       info.setWorker();
     }
     for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
       auto& info = RecThreadInfo::info(currentThreadId);
-      info.setListener();
       info.setTCPListener();
       info.setWorker();
     }
@@ -300,10 +298,10 @@ int RecThreadInfo::runThreads(Logr::log_t log)
         info.start(currentThreadId, "distr", cpusMap, log);
       }
     }
-    SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl,
-         log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numWorkers())));
+    SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numUDPWorkers() << " worker threads" << endl,
+         log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numUDPWorkers())));
 
-    for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) {
+    for (unsigned int thread = 0; thread < RecThreadInfo::numUDPWorkers(); thread++, currentThreadId++) {
       auto& info = RecThreadInfo::info(currentThreadId);
       info.start(currentThreadId, "worker", cpusMap, log);
     }
@@ -910,8 +908,8 @@ static void checkLinuxIPv6Limits([[maybe_unused]] Logr::log_t log)
 static void checkOrFixFDS(Logr::log_t log)
 {
   unsigned int availFDs = getFilenumLimit();
-  unsigned int wantFDs = g_maxMThreads * (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers()) + 25; // even healthier margin than before
-  wantFDs += (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers()) * TCPOutConnectionManager::s_maxIdlePerThread;
+  unsigned int wantFDs = g_maxMThreads * (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()) + 25; // even healthier margin than before
+  wantFDs += (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()) * TCPOutConnectionManager::s_maxIdlePerThread;
 
   if (wantFDs > availFDs) {
     unsigned int hardlimit = getFilenumLimit(true);
@@ -921,7 +919,7 @@ static void checkOrFixFDS(Logr::log_t log)
            log->info(Logr::Warning, "Raised soft limit on number of filedescriptors to match max-mthreads and threads settings", "limit", Logging::Loggable(wantFDs)));
     }
     else {
-      auto newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers());
+      auto newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers());
       SLOG(g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl,
            log->info(Logr::Warning, "Insufficient number of filedescriptors available for max-mthreads*threads setting! Reducing max-mthreads", "hardlimit", Logging::Loggable(hardlimit), "want", Logging::Loggable(wantFDs), "max-mthreads", Logging::Loggable(newval)));
       g_maxMThreads = newval;
@@ -1375,7 +1373,7 @@ void broadcastFunction(const pipefunc_t& func)
   }
 
   unsigned int thread = 0;
-  for (auto& threadInfo : RecThreadInfo::infos()) {
+  for (const auto& threadInfo : RecThreadInfo::infos()) {
     if (thread++ == RecThreadInfo::id()) {
       func(); // don't write to ourselves!
       continue;
@@ -1454,7 +1452,7 @@ T broadcastAccFunction(const std::function<T*()>& func)
 
   unsigned int thread = 0;
   T ret = T();
-  for (auto& threadInfo : RecThreadInfo::infos()) {
+  for (const auto& threadInfo : RecThreadInfo::infos()) {
     if (thread++ == RecThreadInfo::id()) {
       continue;
     }
@@ -1768,13 +1766,13 @@ static void initDistribution(Logr::log_t log)
     }
     else {
       /* first thread is the handler, there is no distributor here and workers are accepting queries */
-      for (unsigned int i = 0; i < RecThreadInfo::numWorkers(); i++, threadNum++) {
+      for (unsigned int i = 0; i < RecThreadInfo::numUDPWorkers(); i++, threadNum++) {
         auto& info = RecThreadInfo::info(threadNum);
         auto& deferredAdds = info.getDeferredAdds();
         makeUDPServerSockets(deferredAdds, log);
       }
     }
-    threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers();
+    threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers();
     for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++, threadNum++) {
       auto& info = RecThreadInfo::info(threadNum);
       auto& deferredAdds = info.getDeferredAdds();
@@ -1786,12 +1784,12 @@ static void initDistribution(Logr::log_t log)
     std::set<int> tcpSockets;
     /* we don't have reuseport so we can only open one socket per
        listening addr:port and everyone will listen on it */
-    makeUDPServerSockets(g_deferredAdds, log);
-    makeTCPServerSockets(g_deferredTCPAdds, tcpSockets, log);
+    makeUDPServerSockets(s_deferredUDPadds, log);
+    makeTCPServerSockets(s_deferredTCPadds, tcpSockets, log);
 
     // TCP queries are handled by TCP workers
     for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++) {
-      auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers());
+      auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers());
       info.setTCPSockets(tcpSockets);
     }
   }
@@ -2125,11 +2123,11 @@ static int serviceMain(Logr::log_t log)
   g_paddingOutgoing = ::arg().mustDo("edns-padding-out");
 
   RecThreadInfo::setNumDistributorThreads(::arg().asNum("distributor-threads"));
-  RecThreadInfo::setNumWorkerThreads(::arg().asNum("threads"));
-  if (RecThreadInfo::numWorkers() < 1) {
+  RecThreadInfo::setNumUDPWorkerThreads(::arg().asNum("threads"));
+  if (RecThreadInfo::numUDPWorkers() < 1) {
     SLOG(g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl,
          log->info(Logr::Warning, "Asked to run with 0 threads, raising to 1 instead"));
-    RecThreadInfo::setNumWorkerThreads(1);
+    RecThreadInfo::setNumUDPWorkerThreads(1);
   }
   RecThreadInfo::setNumTCPWorkerThreads(1); // XXX
   if (RecThreadInfo::numTCPWorkers() < 1) {
@@ -2748,7 +2746,7 @@ static void recursorThread()
       }
     }
 
-    unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numWorkers();
+    unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numUDPWorkers();
     if (ringsize != 0) {
       t_remotes = std::make_unique<addrringbuf_t>();
       if (RecThreadInfo::weDistributeQueries()) {
@@ -2819,7 +2817,7 @@ static void recursorThread()
         }
         else {
           /* otherwise all listeners are listening on the same ones */
-          for (const auto& deferred : threadInfo.isTCPListener() ? g_deferredTCPAdds : g_deferredAdds) {
+          for (const auto& deferred : threadInfo.isTCPListener() ? s_deferredTCPadds : s_deferredUDPadds) {
             t_fdm->addReadFD(deferred.first, deferred.second);
           }
         }
index d254274b33a99743620bbdb71b97f40d4b8f976c..daeb0dc77f4d3e3e63fd614152ba3ce7d43ccd02 100644 (file)
@@ -376,10 +376,13 @@ public:
     return worker;
   }
 
+  // UDP or TCP listener?
   [[nodiscard]] bool isListener() const
   {
     return listener;
   }
+
+  // A TCP-only listener?
   [[nodiscard]] bool isTCPListener() const
   {
     return tcplistener;
@@ -407,6 +410,7 @@ public:
 
   void setTCPListener(bool flag = true)
   {
+    setListener(flag);
     tcplistener = flag;
   }
 
@@ -440,9 +444,9 @@ public:
     return 1;
   }
 
-  static unsigned int numWorkers()
+  static unsigned int numUDPWorkers()
   {
-    return s_numWorkerThreads;
+    return s_numUDPWorkerThreads;
   }
 
   static unsigned int numTCPWorkers()
@@ -465,9 +469,9 @@ public:
     s_weDistributeQueries = flag;
   }
 
-  static void setNumWorkerThreads(unsigned int n)
+  static void setNumUDPWorkerThreads(unsigned int n)
   {
-    s_numWorkerThreads = n;
+    s_numUDPWorkerThreads = n;
   }
 
   static void setNumTCPWorkerThreads(unsigned int n)
@@ -482,7 +486,7 @@ public:
 
   static unsigned int numRecursorThreads()
   {
-    return numHandlers() + numDistributors() + numWorkers() + numTCPWorkers() + numTaskThreads();
+    return numHandlers() + numDistributors() + numUDPWorkers() + numTCPWorkers() + numTaskThreads();
   }
 
   static int runThreads(Logr::log_t);
@@ -508,7 +512,7 @@ public:
     return deferredAdds;
   }
 
-  ThreadPipeSet& getPipes()
+  const ThreadPipeSet& getPipes() const
   {
     return pipes;
   }
@@ -547,7 +551,7 @@ private:
   MT_t* mt{nullptr};
   uint64_t numberOfDistributedQueries{0};
 
-  void start(unsigned int theId, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap, Logr::log_t);
+  void start(unsigned int tid, const string& tname, const std::map<unsigned int, std::set<int>>& cpusMap, Logr::log_t);
 
   std::string name;
   std::thread thread;
@@ -568,7 +572,7 @@ private:
   static std::vector<RecThreadInfo> s_threadInfos;
   static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
   static unsigned int s_numDistributorThreads;
-  static unsigned int s_numWorkerThreads;
+  static unsigned int s_numUDPWorkerThreads;
   static unsigned int s_numTCPWorkerThreads;
 };
 
index e8b9da98c3797a9bebe13af52f33a96dbb6717f8..a5a921cd49a31f6000f2a051440a8e264ec1aa14 100644 (file)
@@ -56,7 +56,6 @@
 // And this approach was implemented in https://github.com/PowerDNS/pdns/pull/13195. The distributor
 // and worker thread(s) now no longe process TCP queries.
 
-
 size_t g_tcpMaxQueriesPerConn;
 unsigned int g_maxTCPPerClient;
 int g_tcpTimeout;