]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Refactor recursorThread startup and join code.
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 26 Jan 2022 08:26:19 +0000 (09:26 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 4 Feb 2022 10:06:16 +0000 (11:06 +0100)
pdns/recursordist/rec-main.cc
pdns/recursordist/rec-main.hh

index 9f3a63db61bc30d2823caecddf46d8d6ed7ae970..79342bbf8afa9d8cac16b4581d59ed5340c859ce 100644 (file)
@@ -108,6 +108,229 @@ unsigned int RecThreadInfo::s_numDistributorThreads;
 unsigned int RecThreadInfo::s_numWorkerThreads;
 thread_local unsigned int RecThreadInfo::t_id;
 
+static std::map<unsigned int, std::set<int>> parseCPUMap()
+{
+  std::map<unsigned int, std::set<int>> result;
+
+  const std::string value = ::arg()["cpu-map"];
+
+  if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
+    g_log << Logger::Warning << "CPU mapping requested but not supported, skipping" << endl;
+    return result;
+  }
+
+  std::vector<std::string> parts;
+
+  stringtok(parts, value, " \t");
+
+  for (const auto& part : parts) {
+    if (part.find('=') == string::npos)
+      continue;
+
+    try {
+      auto headers = splitField(part, '=');
+      boost::trim(headers.first);
+      boost::trim(headers.second);
+
+      unsigned int threadId = pdns_stou(headers.first);
+      std::vector<std::string> cpus;
+
+      stringtok(cpus, headers.second, ",");
+
+      for (const auto& cpu : cpus) {
+        int cpuId = std::stoi(cpu);
+
+        result[threadId].insert(cpuId);
+      }
+    }
+    catch (const std::exception& e) {
+      g_log << Logger::Error << "Error parsing cpu-map entry '" << part << "': " << e.what() << endl;
+    }
+  }
+
+  return result;
+}
+
+static void setCPUMap(const std::map<unsigned int, std::set<int>>& cpusMap, unsigned int n, pthread_t tid)
+{
+  const auto& cpuMapping = cpusMap.find(n);
+  if (cpuMapping == cpusMap.cend()) {
+    return;
+  }
+  int rc = mapThreadToCPUList(tid, cpuMapping->second);
+  if (rc == 0) {
+    g_log << Logger::Info << "CPU affinity for thread " << n << " has been set to CPU map:";
+    for (const auto cpu : cpuMapping->second) {
+      g_log << Logger::Info << " " << cpu;
+    }
+    g_log << Logger::Info << endl;
+  }
+  else {
+    g_log << Logger::Warning << "Error setting CPU affinity for thread " << n << " to CPU map:";
+    for (const auto cpu : cpuMapping->second) {
+      g_log << Logger::Info << " " << cpu;
+    }
+    g_log << Logger::Info << ' ' << strerror(rc) << endl;
+  }
+}
+
+void RecThreadInfo::start(unsigned int id, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap)
+{
+  thread = std::thread([id, name] {
+    t_id = id;
+    const string threadPrefix = "rec/";
+    setThreadName(threadPrefix + name);
+    recursorThread();
+  });
+  setCPUMap(cpusMap, id, thread.native_handle());
+}
+
+int RecThreadInfo::runThreads()
+{
+  int ret = EXIT_SUCCESS;
+  unsigned int currentThreadId = 1;
+  const auto cpusMap = parseCPUMap();
+
+  if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
+    g_log << Logger::Warning << "Operating unthreaded" << endl;
+
+#ifdef HAVE_SYSTEMD
+    sd_notify(0, "READY=1");
+#endif
+
+    /* This thread handles the web server, carbon, statistics and the control channel */
+    auto& handlerInfo = RecThreadInfo::info(0);
+    handlerInfo.setHandler();
+    handlerInfo.start(0, "web+stat", cpusMap);
+    auto& taskInfo = RecThreadInfo::info(2);
+    taskInfo.setTaskThread();
+    taskInfo.start(2, "tasks", cpusMap);
+
+    auto& info = RecThreadInfo::info(currentThreadId);
+    info.setListener();
+    info.setWorker();
+    info.setThreadId(currentThreadId++);
+    recursorThread();
+
+    handlerInfo.thread.join();
+    if (handlerInfo.exitCode != 0) {
+      ret = handlerInfo.exitCode;
+    }
+    taskInfo.thread.join();
+    if (taskInfo.exitCode != 0) {
+      ret = taskInfo.exitCode;
+    }
+  }
+  else {
+    // Setup RecThreadInfo objects
+    unsigned int tmp = currentThreadId;
+    if (RecThreadInfo::weDistributeQueries()) {
+      for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
+        RecThreadInfo::info(tmp++).setListener();
+      }
+    }
+    for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
+      auto& info = RecThreadInfo::info(tmp++);
+      info.setListener(!RecThreadInfo::weDistributeQueries());
+      info.setWorker();
+    }
+    for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
+      auto& info = RecThreadInfo::info(tmp++);
+      info.setTaskThread();
+    }
+
+    // And now start the actual threads
+    if (RecThreadInfo::weDistributeQueries()) {
+      g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl;
+      for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
+        auto& info = RecThreadInfo::info(currentThreadId);
+        info.start(currentThreadId++, "distr", cpusMap);
+      }
+    }
+
+    g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl;
+
+    for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
+      auto& info = RecThreadInfo::info(currentThreadId);
+      info.start(currentThreadId++, "worker", cpusMap);
+    }
+
+    for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
+      auto& info = RecThreadInfo::info(currentThreadId);
+      info.start(currentThreadId++, "taskThread", cpusMap);
+    }
+
+#ifdef HAVE_SYSTEMD
+    sd_notify(0, "READY=1");
+#endif
+
+    /* This thread handles the web server, carbon, statistics and the control channel */
+    auto& info = RecThreadInfo::info(0);
+    info.setHandler();
+    info.start(0, "web+stat", cpusMap);
+
+    for (auto& ti : RecThreadInfo::infos()) {
+      ti.thread.join();
+      if (ti.exitCode != 0) {
+        ret = ti.exitCode;
+      }
+    }
+  }
+  return ret;
+}
+
+void RecThreadInfo::makeThreadPipes()
+{
+  auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
+  if (pipeBufferSize > 0) {
+    g_log << Logger::Info << "Resizing the buffer of the distribution pipe to " << pipeBufferSize << endl;
+  }
+
+  /* thread 0 is the handler / SNMP, worker threads start at 1 */
+  for (unsigned int n = 0; n < numRecursorThreads(); ++n) {
+    auto& threadInfo = info(n);
+
+    int fd[2];
+    if (pipe(fd) < 0)
+      unixDie("Creating pipe for inter-thread communications");
+
+    threadInfo.pipes.readToThread = fd[0];
+    threadInfo.pipes.writeToThread = fd[1];
+
+    // handler thread only gets first pipe, not the others
+    if (n == 0) {
+      continue;
+    }
+
+    if (pipe(fd) < 0)
+      unixDie("Creating pipe for inter-thread communications");
+
+    threadInfo.pipes.readFromThread = fd[0];
+    threadInfo.pipes.writeFromThread = fd[1];
+
+    if (pipe(fd) < 0)
+      unixDie("Creating pipe for inter-thread communications");
+
+    threadInfo.pipes.readQueriesToThread = fd[0];
+    threadInfo.pipes.writeQueriesToThread = fd[1];
+
+    if (pipeBufferSize > 0) {
+      if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) {
+        int err = errno;
+        g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl;
+        auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread);
+        if (existingSize > 0) {
+          g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl;
+        }
+      }
+    }
+
+    if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) {
+      unixDie("Making pipe for inter-thread communications non-blocking");
+    }
+  }
+}
+
 ArgvMap& arg()
 {
   static ArgvMap theArg;
@@ -426,71 +649,6 @@ static void writePid(void)
   }
 }
 
-static std::map<unsigned int, std::set<int>> parseCPUMap()
-{
-  std::map<unsigned int, std::set<int>> result;
-
-  const std::string value = ::arg()["cpu-map"];
-
-  if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
-    g_log << Logger::Warning << "CPU mapping requested but not supported, skipping" << endl;
-    return result;
-  }
-
-  std::vector<std::string> parts;
-
-  stringtok(parts, value, " \t");
-
-  for (const auto& part : parts) {
-    if (part.find('=') == string::npos)
-      continue;
-
-    try {
-      auto headers = splitField(part, '=');
-      boost::trim(headers.first);
-      boost::trim(headers.second);
-
-      unsigned int threadId = pdns_stou(headers.first);
-      std::vector<std::string> cpus;
-
-      stringtok(cpus, headers.second, ",");
-
-      for (const auto& cpu : cpus) {
-        int cpuId = std::stoi(cpu);
-
-        result[threadId].insert(cpuId);
-      }
-    }
-    catch (const std::exception& e) {
-      g_log << Logger::Error << "Error parsing cpu-map entry '" << part << "': " << e.what() << endl;
-    }
-  }
-
-  return result;
-}
-
-static void setCPUMap(const std::map<unsigned int, std::set<int>>& cpusMap, unsigned int n, pthread_t tid)
-{
-  const auto& cpuMapping = cpusMap.find(n);
-  if (cpuMapping != cpusMap.cend()) {
-    int rc = mapThreadToCPUList(tid, cpuMapping->second);
-    if (rc == 0) {
-      g_log << Logger::Info << "CPU affinity for worker " << n << " has been set to CPU map:";
-      for (const auto cpu : cpuMapping->second) {
-        g_log << Logger::Info << " " << cpu;
-      }
-      g_log << Logger::Info << endl;
-    }
-    else {
-      g_log << Logger::Warning << "Error setting CPU affinity for worker " << n << " to CPU map:";
-      for (const auto cpu : cpuMapping->second) {
-        g_log << Logger::Info << " " << cpu;
-      }
-      g_log << Logger::Info << strerror(rc) << endl;
-    }
-  }
-}
-
 static void checkSocketDir(void)
 {
   struct stat st;
@@ -689,57 +847,6 @@ static void loggerBackend(const Logging::Entry& entry)
   g_log << u << buf.str() << endl;
 }
 
-static void makeThreadPipes()
-{
-  auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
-  if (pipeBufferSize > 0) {
-    g_log << Logger::Info << "Resizing the buffer of the distribution pipe to " << pipeBufferSize << endl;
-  }
-
-  /* thread 0 is the handler / SNMP, worker threads start at 1 */
-  for (unsigned int n = 0; n < RecThreadInfo::numRecursorThreads(); ++n) {
-    auto& threadInfo = RecThreadInfo::info(n);
-
-    int fd[2];
-    if (pipe(fd) < 0)
-      unixDie("Creating pipe for inter-thread communications");
-
-    threadInfo.pipes.readToThread = fd[0];
-    threadInfo.pipes.writeToThread = fd[1];
-
-    // handler thread only gets first pipe, not the others
-    if (n == 0) {
-      continue;
-    }
-
-    if (pipe(fd) < 0)
-      unixDie("Creating pipe for inter-thread communications");
-
-    threadInfo.pipes.readFromThread = fd[0];
-    threadInfo.pipes.writeFromThread = fd[1];
-
-    if (pipe(fd) < 0)
-      unixDie("Creating pipe for inter-thread communications");
-
-    threadInfo.pipes.readQueriesToThread = fd[0];
-    threadInfo.pipes.writeQueriesToThread = fd[1];
-
-    if (pipeBufferSize > 0) {
-      if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) {
-        int err = errno;
-        g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl;
-        auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread);
-        if (existingSize > 0) {
-          g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl;
-        }
-      }
-    }
-
-    if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) {
-      unixDie("Making pipe for inter-thread communications non-blocking");
-    }
-  }
-}
 
 static int ratePercentage(uint64_t nom, uint64_t denom)
 {
@@ -1041,8 +1148,6 @@ template ThreadTimes broadcastAccFunction(const boost::function<ThreadTimes*()>&
 
 static int serviceMain(int argc, char* argv[])
 {
-  int ret = EXIT_SUCCESS;
-
   g_slogStructured = ::arg().mustDo("structured-logging");
 
   g_log.setName(g_programname);
@@ -1578,7 +1683,7 @@ static int serviceMain(int argc, char* argv[])
   startLuaConfigDelayedThreads(delayedLuaThreads, g_luaconfs.getCopy().generation);
   delayedLuaThreads.rpzPrimaryThreads.clear(); // no longer needed
 
-  makeThreadPipes();
+  RecThreadInfo::makeThreadPipes();
 
   g_tcpTimeout = ::arg().asNum("client-tcp-timeout");
   g_maxTCPPerClient = ::arg().asNum("max-tcp-per-client");
@@ -1629,99 +1734,7 @@ static int serviceMain(int argc, char* argv[])
     g_avoidUdpSourcePorts.insert(port);
   }
 
-  unsigned int currentThreadId = 1;
-  const auto cpusMap = parseCPUMap();
-
-  if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
-    g_log << Logger::Warning << "Operating unthreaded" << endl;
-#ifdef HAVE_SYSTEMD
-    sd_notify(0, "READY=1");
-#endif
-
-    /* This thread handles the web server, carbon, statistics and the control channel */
-    auto& handlerInfo = RecThreadInfo::info(0);
-    handlerInfo.setHandler();
-    handlerInfo.start(0, "web+stat");
-    auto& taskInfo = RecThreadInfo::info(2);
-    taskInfo.setTaskThread();
-    taskInfo.start(2, "tasks");
-
-    setCPUMap(cpusMap, currentThreadId, pthread_self());
-
-    auto& info = RecThreadInfo::info(currentThreadId);
-    info.setListener();
-    info.setWorker();
-    info.setThreadId(currentThreadId++);
-    recursorThread();
-
-    handlerInfo.thread.join();
-    if (handlerInfo.exitCode != 0) {
-      ret = handlerInfo.exitCode;
-    }
-    taskInfo.thread.join();
-    if (taskInfo.exitCode != 0) {
-      ret = taskInfo.exitCode;
-    }
-  }
-  else {
-    // Setup RecThreadInfo objects
-    unsigned int tmp = currentThreadId;
-    if (RecThreadInfo::weDistributeQueries()) {
-      for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
-        RecThreadInfo::info(tmp++).setListener();
-      }
-    }
-    for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
-      auto& info = RecThreadInfo::info(tmp++);
-      info.setListener(!RecThreadInfo::weDistributeQueries());
-      info.setWorker();
-    }
-    for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
-      auto& info = RecThreadInfo::info(tmp++);
-      info.setTaskThread();
-    }
-
-    if (RecThreadInfo::weDistributeQueries()) {
-      g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl;
-      for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
-        auto& info = RecThreadInfo::info(currentThreadId);
-        info.start(currentThreadId++, "distr");
-        setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
-      }
-    }
-
-    g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl;
-
-    for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
-      auto& info = RecThreadInfo::info(currentThreadId);
-      info.start(currentThreadId++, "worker");
-      setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
-    }
-
-    for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
-      auto& info = RecThreadInfo::info(currentThreadId);
-      info.start(currentThreadId++, "taskThread");
-      setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
-    }
-
-#ifdef HAVE_SYSTEMD
-    sd_notify(0, "READY=1");
-#endif
-
-    /* This thread handles the web server, carbon, statistics and the control channel */
-    auto& info = RecThreadInfo::info(0);
-    info.setHandler();
-    info.start(0, "web+stat");
-
-    for (auto& ti : RecThreadInfo::infos()) {
-      ti.thread.join();
-      if (ti.exitCode != 0) {
-        ret = ti.exitCode;
-      }
-    }
-  }
-
-  return ret;
+  return RecThreadInfo::runThreads();
 }
 
 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
@@ -1937,7 +1950,7 @@ void* recursorThread()
 
       if (threadInfo.isHandler()) {
         if (!primeHints()) {
-          threadInfo.exitCode = EXIT_FAILURE;
+          threadInfo.setExitCode(EXIT_FAILURE);
           RecursorControlChannel::stop = 1;
           g_log << Logger::Critical << "Priming cache failed, stopping" << endl;
           return nullptr;
index 93d5c979eb5082080d9fc36864e6b6297c836cb1..01d375e4ea4a5182914bcc72f9ac0d8746db4097 100644 (file)
@@ -376,16 +376,6 @@ public:
     taskThread = true;
   }
 
-  void start(unsigned int id, const string& name)
-  {
-    thread = std::thread([id, name] {
-      t_id = id;
-      const string threadPrefix = "rec/";
-      setThreadName(threadPrefix + name);
-      recursorThread();
-    });
-  }
-
   static unsigned int id()
   {
     return t_id;
@@ -441,6 +431,14 @@ public:
     return numHandlers() + numDistributors() + numWorkers() + numTaskThreads();
   }
 
+  static int runThreads();
+  static void makeThreadPipes();
+
+  void setExitCode(int e)
+  {
+    exitCode = e;
+  }
+
   // FD corresponding to TCP sockets this thread is listening on.
   // These FDs are also in deferredAdds when we have one socket per
   // listener, and in g_deferredAdds instead.
@@ -451,12 +449,15 @@ public:
   deferredAdd_t deferredAdds;
 
   struct ThreadPipeSet pipes;
-  std::thread thread;
   MT_t* mt{nullptr};
   uint64_t numberOfDistributedQueries{0};
-  int exitCode{0};
 
 private:
+  void start(unsigned int id, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap);
+
+  std::thread thread;
+  int exitCode{0};
+
   // handle the web server, carbon, statistics and the control channel
   bool handler{false};
   // accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set)