]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Introduce taskThread
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Tue, 25 Jan 2022 14:11:20 +0000 (15:11 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 4 Feb 2022 10:06:16 +0000 (11:06 +0100)
pdns/pdns_recursor.cc
pdns/rec_channel_rec.cc
pdns/recursordist/rec-main.cc
pdns/recursordist/rec-main.hh

index 10cca1503f33a40d986f566cdd9dc7216b89f0c8..9014af9ca743187fd3c30227c7c1db33ad37300f 100644 (file)
@@ -2187,7 +2187,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
             destination = dest;
           }
 
-          if (RecThreadInfo::s_weDistributeQueries) {
+          if (RecThreadInfo::weDistributeQueries()) {
             std::string localdata = data;
             distributeAsyncFunction(data, [localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace]() mutable {
               return doProcessUDPQuestion(localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace);
@@ -2353,7 +2353,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 
 static unsigned int getWorkerLoad(size_t workerIdx)
 {
-  const auto mt = RecThreadInfo::info(/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx).mt;
+  const auto mt = RecThreadInfo::info(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + workerIdx).mt;
   if (mt != nullptr) {
     return mt->numProcesses();
   }
@@ -2363,32 +2363,29 @@ static unsigned int getWorkerLoad(size_t workerIdx)
 static unsigned int selectWorker(unsigned int hash)
 {
   if (g_balancingFactor == 0) {
-    return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + (hash % RecThreadInfo::s_numWorkerThreads);
+    return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numWorkers());
   }
 
   /* we start with one, representing the query we are currently handling */
   double currentLoad = 1;
-  std::vector<unsigned int> load(RecThreadInfo::s_numWorkerThreads);
-  for (size_t idx = 0; idx < RecThreadInfo::s_numWorkerThreads; idx++) {
+  std::vector<unsigned int> load(RecThreadInfo::numWorkers());
+  for (size_t idx = 0; idx < RecThreadInfo::numWorkers(); idx++) {
     load[idx] = getWorkerLoad(idx);
     currentLoad += load[idx];
-    // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
   }
 
-  double targetLoad = (currentLoad / RecThreadInfo::s_numWorkerThreads) * g_balancingFactor;
-  // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+  double targetLoad = (currentLoad / RecThreadInfo::numWorkers()) * g_balancingFactor;
 
-  unsigned int worker = hash % RecThreadInfo::s_numWorkerThreads;
+  unsigned int worker = hash % RecThreadInfo::numWorkers();
   /* at least one server has to be at or below the average load */
   if (load[worker] > targetLoad) {
     ++g_stats.rebalancedQueries;
     do {
-      // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
-      worker = (worker + 1) % RecThreadInfo::s_numWorkerThreads;
+      worker = (worker + 1) % RecThreadInfo::numWorkers();
     } while (load[worker] > targetLoad);
   }
 
-  return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + worker;
+  return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + worker;
 }
 
 // This function is only called by the distributor threads, when pdns-distributes-queries is set
@@ -2411,7 +2408,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
        was full, let's try another one */
     unsigned int newTarget = 0;
     do {
-      newTarget = /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + dns_random(RecThreadInfo::s_numWorkerThreads);
+      newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numWorkers());
     } while (newTarget == target);
 
     if (!trySendingQueryToWorker(newTarget, tmsg)) {
index b7fc66ee140ad949703a20f5755aa6d74385ad05..9678bbeaecda05dc59c08a870cf96d9dc65727a3 100644 (file)
@@ -1118,7 +1118,7 @@ static StatsMap toCPUStatsMap(const string& name)
 {
   const string pbasename = getPrometheusName(name);
   StatsMap entries;
-  for (unsigned int n = 0; n < RecThreadInfo::numThreads(); ++n) {
+  for (unsigned int n = 0; n < RecThreadInfo::numRecursorThreads(); ++n) {
     uint64_t tm = doGetThreadCPUMsec(n);
     std::string pname = pbasename + "{thread=\"" + std::to_string(n) + "\"}";
     entries.emplace(name + "-thread-" + std::to_string(n), StatsMapEntry{pname, std::to_string(tm)});
index 8bf704dc09030c4532bbeba221e9d3c0fb89a995..b00e42f9d41a6d4dd63a1ae836e89699079b7709 100644 (file)
@@ -629,8 +629,8 @@ static void checkLinuxIPv6Limits()
 static void checkOrFixFDS()
 {
   unsigned int availFDs = getFilenumLimit();
-  unsigned int wantFDs = g_maxMThreads * RecThreadInfo::s_numWorkerThreads + 25; // even healthier margin then before
-  wantFDs += RecThreadInfo::s_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread;
+  unsigned int wantFDs = g_maxMThreads * RecThreadInfo::numWorkers() + 25; // even healthier margin then before
+  wantFDs += RecThreadInfo::numWorkers() * TCPOutConnectionManager::s_maxIdlePerThread;
 
   if (wantFDs > availFDs) {
     unsigned int hardlimit = getFilenumLimit(true);
@@ -639,7 +639,7 @@ static void checkOrFixFDS()
       g_log << Logger::Warning << "Raised soft limit on number of filedescriptors to " << wantFDs << " to match max-mthreads and threads settings" << endl;
     }
     else {
-      int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::s_numWorkerThreads;
+      int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::numWorkers();
       g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl;
       g_maxMThreads = newval;
       setFilenumLimit(hardlimit);
@@ -697,7 +697,7 @@ static void makeThreadPipes()
   }
 
   /* thread 0 is the handler / SNMP, worker threads start at 1 */
-  for (unsigned int n = 0; n <= RecThreadInfo::numThreads(); ++n) {
+  for (unsigned int n = 0; n < RecThreadInfo::numRecursorThreads(); ++n) {
     auto& threadInfo = RecThreadInfo::info(n);
 
     int fd[2];
@@ -1148,8 +1148,8 @@ static int serviceMain(int argc, char* argv[])
   }
 
   /* this needs to be done before parseACLs(), which call broadcastFunction() */
-  RecThreadInfo::s_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
-  if (RecThreadInfo::s_weDistributeQueries) {
+  RecThreadInfo::setWeDistributeQueries(::arg().mustDo("pdns-distributes-queries"));
+  if (RecThreadInfo::weDistributeQueries()) {
     g_log << Logger::Warning << "PowerDNS Recursor itself will distribute queries over threads" << endl;
   }
 
@@ -1323,11 +1323,11 @@ static int serviceMain(int argc, char* argv[])
   }
   g_paddingTag = ::arg().asNum("edns-padding-tag");
 
-  RecThreadInfo::s_numDistributorThreads = ::arg().asNum("distributor-threads");
-  RecThreadInfo::s_numWorkerThreads = ::arg().asNum("threads");
-  if (RecThreadInfo::s_numWorkerThreads < 1) {
+  RecThreadInfo::setNumDistributorThreads(::arg().asNum("distributor-threads"));
+  RecThreadInfo::setNumWorkerThreads(::arg().asNum("threads"));
+  if (RecThreadInfo::numWorkers() < 1) {
     g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl;
-    RecThreadInfo::s_numWorkerThreads = 1;
+    RecThreadInfo::setNumWorkerThreads(1);
   }
 
   g_maxMThreads = ::arg().asNum("max-mthreads");
@@ -1424,12 +1424,12 @@ static int serviceMain(int argc, char* argv[])
   g_reusePort = ::arg().mustDo("reuseport");
 #endif
 
-  RecThreadInfo::infos().resize(RecThreadInfo::numThreads() + /* handler */ 1);
+  RecThreadInfo::infos().resize(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() + RecThreadInfo::numTaskThreads());
 
   if (g_reusePort) {
-    if (RecThreadInfo::s_weDistributeQueries) {
+    if (RecThreadInfo::weDistributeQueries()) {
       /* first thread is the handler, then distributors */
-      for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
+      for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) {
         auto& info =  RecThreadInfo::info(threadId);
         auto& deferredAdds = info.deferredAdds;
         auto& tcpSockets = info.tcpSockets;
@@ -1439,7 +1439,7 @@ static int serviceMain(int argc, char* argv[])
     }
     else {
       /* first thread is the handler, there is no distributor here and workers are accepting queries */
-      for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
+      for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) {
         auto& info =  RecThreadInfo::info(threadId);
         auto& deferredAdds = info.deferredAdds;
         auto& tcpSockets = info.tcpSockets;
@@ -1457,15 +1457,15 @@ static int serviceMain(int argc, char* argv[])
 
     /* every listener (so distributor if g_weDistributeQueries, workers otherwise)
        needs to listen to the shared sockets */
-    if (RecThreadInfo::s_weDistributeQueries) {
+    if (RecThreadInfo::weDistributeQueries()) {
       /* first thread is the handler, then distributors */
-      for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
+      for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) {
         RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
       }
     }
     else {
       /* first thread is the handler, there is no distributor here and workers are accepting queries */
-      for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
+      for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) {
         RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
       }
     }
@@ -1632,7 +1632,7 @@ static int serviceMain(int argc, char* argv[])
   unsigned int currentThreadId = 1;
   const auto cpusMap = parseCPUMap();
 
-  if (RecThreadInfo::numThreads() == 1) {
+  if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
     g_log << Logger::Warning << "Operating unthreaded" << endl;
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
@@ -1642,6 +1642,9 @@ static int serviceMain(int argc, char* argv[])
     auto& handlerInfo = RecThreadInfo::info(0);
     handlerInfo.setHandler();
     handlerInfo.start(0, "web+stat");
+    auto& taskInfo = RecThreadInfo::info(2);
+    taskInfo.setTaskThread();
+    taskInfo.start(2, "tasks");
 
     setCPUMap(cpusMap, currentThreadId, pthread_self());
 
@@ -1655,36 +1658,52 @@ static int serviceMain(int argc, char* argv[])
     if (handlerInfo.exitCode != 0) {
       ret = handlerInfo.exitCode;
     }
+    taskInfo.thread.join();
+    if (taskInfo.exitCode != 0) {
+      ret = taskInfo.exitCode;
+    }
   }
   else {
-    if (RecThreadInfo::s_weDistributeQueries) {
-      for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
-        RecThreadInfo::info(currentThreadId + n).setListener();
+    // Setup RecThreadInfo objects
+    unsigned int tmp = currentThreadId;
+    if (RecThreadInfo::weDistributeQueries()) {
+      for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
+        RecThreadInfo::info(tmp++).setListener();
       }
     }
-    for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
-      auto& info = RecThreadInfo::info(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n);
-      info.setListener(!RecThreadInfo::s_weDistributeQueries);
+    for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
+      auto& info = RecThreadInfo::info(tmp++);
+      info.setListener(!RecThreadInfo::weDistributeQueries());
       info.setWorker();
     }
+    for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
+      auto& info = RecThreadInfo::info(tmp++);
+      info.setTaskThread();
+    }
 
-    if (RecThreadInfo::s_weDistributeQueries) {
-      g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl;
-      for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
+    if (RecThreadInfo::weDistributeQueries()) {
+      g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl;
+      for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
         auto& info = RecThreadInfo::info(currentThreadId);
         info.start(currentThreadId++, "distr");
         setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
       }
     }
 
-    g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl;
+    g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl;
 
-    for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
+    for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
       auto& info = RecThreadInfo::info(currentThreadId);
       info.start(currentThreadId++, "worker");
       setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
     }
 
+    for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
+      auto& info = RecThreadInfo::info(currentThreadId);
+      info.start(currentThreadId++, "taskThread");
+      setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
+    }
+
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
 #endif
@@ -1789,7 +1808,7 @@ static void houseKeeping(void*)
     past = now;
     past.tv_sec -= 5;
     if (t_last_prune < past) {
-      t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads));
+      t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::numWorkers() + RecThreadInfo::numDistributors()));
 
       time_t limit;
       if (!((t_cleanCounter++) % 40)) { // this is a full scan!
@@ -1945,11 +1964,11 @@ void* recursorThread()
       }
     }
 
-    unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numWorkerThreads;
+    unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numWorkers();
     if (ringsize) {
       t_remotes = std::make_unique<addrringbuf_t>();
-      if (RecThreadInfo::s_weDistributeQueries)
-        t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numDistributorThreads);
+      if (RecThreadInfo::weDistributeQueries())
+        t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numDistributors());
       else
         t_remotes->set_capacity(ringsize);
       t_servfailremotes = std::make_unique<addrringbuf_t>();
index e4f79ae6013a60df02660d87ca01fc5db5e7a155..93d5c979eb5082080d9fc36864e6b6297c836cb1 100644 (file)
@@ -351,6 +351,11 @@ public:
     return listener;
   }
 
+  bool isTaskThread() const
+  {
+    return taskThread;
+  }
+
   void setHandler()
   {
     handler = true;
@@ -366,6 +371,11 @@ public:
     listener = flag;
   }
 
+  void setTaskThread()
+  {
+    taskThread = true;
+  }
+
   void start(unsigned int id, const string& name)
   {
     thread = std::thread([id, name] {
@@ -386,9 +396,54 @@ public:
     t_id = id;
   }
 
-   // FD corresponding to TCP sockets this thread is listening on.
-   // These FDs are also in deferredAdds when we have one socket per
-   // listener, and in g_deferredAdds instead.
+  static unsigned int numHandlers()
+  {
+    return 1;
+  }
+
+  static unsigned int numTaskThreads()
+  {
+    return 1;
+  }
+
+  static unsigned int numWorkers()
+  {
+    return s_numWorkerThreads;
+  }
+
+  static unsigned int numDistributors()
+  {
+    return s_numDistributorThreads;
+  }
+
+  static bool weDistributeQueries()
+  {
+    return s_weDistributeQueries;
+  }
+
+  static void setWeDistributeQueries(bool flag)
+  {
+    s_weDistributeQueries = flag;
+  }
+
+  static void setNumWorkerThreads(unsigned int n)
+  {
+    s_numWorkerThreads = n;
+  }
+
+  static void setNumDistributorThreads(unsigned int n)
+  {
+    s_numDistributorThreads = n;
+  }
+
+  static unsigned int numRecursorThreads()
+  {
+    return numHandlers() + numDistributors() + numWorkers() + numTaskThreads();
+  }
+
+  // FD corresponding to TCP sockets this thread is listening on.
+  // These FDs are also in deferredAdds when we have one socket per
+  // listener, and in g_deferredAdds instead.
   std::set<int> tcpSockets;
   // FD corresponding to listening sockets if we have one socket per
   // listener (with reuseport), otherwise all listeners share the
@@ -401,24 +456,21 @@ public:
   uint64_t numberOfDistributedQueries{0};
   int exitCode{0};
 
-  static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
-  static unsigned int s_numDistributorThreads;
-  static unsigned int s_numWorkerThreads;
-
-  static unsigned int numThreads()
-  {
-    return s_numDistributorThreads + s_numWorkerThreads;
-  }
-
 private:
-  /* handle the web server, carbon, statistics and the control channel */
+  // handle the web server, carbon, statistics and the control channel
   bool handler{false};
-  /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
+  // accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set)
   bool listener{false};
-  /* process queries */
+  // process queries
   bool worker{false};
+  // run async tasks: from TastQueue and ZoneToCache
+  bool taskThread{false};
+
   static thread_local unsigned int t_id;
   static std::vector<RecThreadInfo> s_threadInfos;
+  static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
+  static unsigned int s_numDistributorThreads;
+  static unsigned int s_numWorkerThreads;
 };
 
 struct ThreadMSG