]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Run seperate TCP threads. These threads listen and process incoming TCP queries
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 30 Aug 2023 10:34:34 +0000 (12:34 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 13 Sep 2023 11:20:54 +0000 (13:20 +0200)
TODO: test, validate reuseport behaviour, settings, docs

pdns/recursordist/pdns_recursor.cc
pdns/recursordist/rec-main.cc
pdns/recursordist/rec-main.hh

index e6ef3bae63c2af0f43e1ab293a172a4fcdf23264..d8da4bd7a41b2e5a6a78e6e5c10b56f63cb9c00f 100644 (file)
@@ -2064,7 +2064,7 @@ void requestWipeCaches(const DNSName& canon)
   ThreadMSG* tmsg = new ThreadMSG(); // NOLINT: pointer owner
   tmsg->func = [=] { return pleaseWipeCaches(canon, true, 0xffff); };
   tmsg->wantAnswer = false;
-  if (write(RecThreadInfo::info(0).pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: correct sizeof
+  if (write(RecThreadInfo::info(0).getPipes().writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: correct sizeof
     delete tmsg; // NOLINT: pointer owner
 
     unixDie("write to thread pipe returned wrong size or error");
@@ -2084,7 +2084,7 @@ bool expectProxyProtocol(const ComboAddress& from)
 // mappedSource: the address we assume the query is coming from. Differs from source if table based mapping has been applied
 static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, const ComboAddress& mappedSource, struct timeval tval, int fileDesc, std::vector<ProxyProtocolValue>& proxyProtocolValues, RecEventTrace& eventTrace) // NOLINT(readability-function-cognitive-complexity): https://github.com/PowerDNS/pdns/issues/12791
 {
-  ++(RecThreadInfo::self().numberOfDistributedQueries);
+  RecThreadInfo::self().incNumberOfDistributedQueries();
   gettimeofday(&g_now, nullptr);
   if (tval.tv_sec != 0) {
     struct timeval diff = g_now - tval;
@@ -2691,7 +2691,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
     _exit(1);
   }
 
-  const auto& tps = targetInfo.pipes;
+  const auto& tps = targetInfo.getPipes();
 
   ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg)); // NOLINT: correct sizeof
   if (written > 0) {
@@ -2714,7 +2714,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 
 static unsigned int getWorkerLoad(size_t workerIdx)
 {
-  const auto* multiThreader = RecThreadInfo::info(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + workerIdx).mt;
+  const auto* multiThreader = RecThreadInfo::info(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + workerIdx).getMT();
   if (multiThreader != nullptr) {
     return multiThreader->numProcesses();
   }
index 10469fb445080a6669b3f087af6201f1aead4fc1..5a060f57e94ee4d79357527b1d229185342b6a9c 100644 (file)
@@ -106,7 +106,8 @@ std::shared_ptr<Logr::Logger> g_slogudpin;
 std::shared_ptr<Logr::Logger> g_slogudpout;
 
 /* without reuseport, all listeners share the same sockets */
-deferredAdd_t g_deferredAdds;
+static deferredAdd_t g_deferredAdds;
+static deferredAdd_t g_deferredTCPAdds;
 
 /* first we have the handler thread, t_id == 0 (some other
    helper threads like SNMP might have t_id == 0 as well)
@@ -120,6 +121,7 @@ thread_local std::unique_ptr<ProxyMapping> t_proxyMapping;
 bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
 unsigned int RecThreadInfo::s_numDistributorThreads;
 unsigned int RecThreadInfo::s_numWorkerThreads;
+unsigned int RecThreadInfo::s_numTCPWorkerThreads;
 thread_local unsigned int RecThreadInfo::t_id;
 
 static std::map<unsigned int, std::set<int>> parseCPUMap(Logr::log_t log)
@@ -218,7 +220,6 @@ void RecThreadInfo::start(unsigned int tid, const string& tname, const std::map<
 int RecThreadInfo::runThreads(Logr::log_t log)
 {
   int ret = EXIT_SUCCESS;
-  unsigned int currentThreadId = 1;
   const auto cpusMap = parseCPUMap(log);
 
   if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
@@ -226,72 +227,106 @@ int RecThreadInfo::runThreads(Logr::log_t log)
          log->info(Logr::Notice, "Operating with single distributor/worker thread"));
 
     /* This thread handles the web server, carbon, statistics and the control channel */
-    auto& handlerInfo = RecThreadInfo::info(0);
+    unsigned int currentThreadId = 0;
+    auto& handlerInfo = RecThreadInfo::info(currentThreadId);
     handlerInfo.setHandler();
-    handlerInfo.start(0, "web+stat", cpusMap, log);
-    auto& taskInfo = RecThreadInfo::info(2);
-    taskInfo.setTaskThread();
-    taskInfo.start(2, "task", cpusMap, log);
+    handlerInfo.start(currentThreadId, "web+stat", cpusMap, log);
 
+    // We skip the single UDP worker thread 1, it's handled after the loop and taskthreads
+    currentThreadId = 2;
+    for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
+      auto& info = RecThreadInfo::info(currentThreadId);
+      info.setListener();
+      info.setTCPListener();
+      info.setWorker();
+      info.start(currentThreadId, "tcpworker", cpusMap, log);
+    }
+
+    for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) {
+      auto& taskInfo = RecThreadInfo::info(currentThreadId);
+      taskInfo.setTaskThread();
+      taskInfo.start(currentThreadId, "task", cpusMap, log);
+    }
+
+    currentThreadId = 1;
     auto& info = RecThreadInfo::info(currentThreadId);
     info.setListener();
+    info.setTCPListener();
     info.setWorker();
-    RecThreadInfo::setThreadId(currentThreadId++);
+    RecThreadInfo::setThreadId(currentThreadId);
     recursorThread();
 
-    handlerInfo.thread.join();
-    if (handlerInfo.exitCode != 0) {
-      ret = handlerInfo.exitCode;
-    }
-    taskInfo.thread.join();
-    if (taskInfo.exitCode != 0) {
-      ret = taskInfo.exitCode;
+    for (unsigned int thread = 0; thread < RecThreadInfo::numRecursorThreads(); thread++) {
+      if (thread == 1) {
+        continue;
+      }
+      auto& tInfo = RecThreadInfo::info(thread);
+      tInfo.thread.join();
+      if (tInfo.exitCode != 0) {
+        ret = tInfo.exitCode;
+      }
     }
   }
   else {
     // Setup RecThreadInfo objects
-    unsigned int tmp = currentThreadId;
+    unsigned int currentThreadId = 1;
     if (RecThreadInfo::weDistributeQueries()) {
-      for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); ++thread) {
-        RecThreadInfo::info(tmp++).setListener();
+      for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); thread++, currentThreadId++) {
+        RecThreadInfo::info(currentThreadId).setListener();
       }
     }
-    for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); ++thread) {
-      auto& info = RecThreadInfo::info(tmp++);
+    for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) {
+      auto& info = RecThreadInfo::info(currentThreadId);
       info.setListener(!RecThreadInfo::weDistributeQueries());
       info.setWorker();
     }
-    for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); ++thread) {
-      auto& info = RecThreadInfo::info(tmp++);
+    for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
+      auto& info = RecThreadInfo::info(currentThreadId);
+      info.setListener();
+      info.setTCPListener();
+      info.setWorker();
+    }
+    for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) {
+      auto& info = RecThreadInfo::info(currentThreadId);
       info.setTaskThread();
     }
 
     // And now start the actual threads
+    currentThreadId = 1;
     if (RecThreadInfo::weDistributeQueries()) {
       SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl,
            log->info(Logr::Notice, "Launching distributor threads", "count", Logging::Loggable(RecThreadInfo::numDistributors())));
-      for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); ++thread) {
+      for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); thread++, currentThreadId++) {
         auto& info = RecThreadInfo::info(currentThreadId);
-        info.start(currentThreadId++, "distr", cpusMap, log);
+        info.start(currentThreadId, "distr", cpusMap, log);
       }
     }
     SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl,
          log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numWorkers())));
 
-    for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); ++thread) {
+    for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) {
+      auto& info = RecThreadInfo::info(currentThreadId);
+      info.start(currentThreadId, "worker", cpusMap, log);
+    }
+
+    SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numTCPWorkers() << " tcpworker threads" << endl,
+         log->info(Logr::Notice, "Launching tcpworker threads", "count", Logging::Loggable(RecThreadInfo::numTCPWorkers())));
+
+    for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
       auto& info = RecThreadInfo::info(currentThreadId);
-      info.start(currentThreadId++, "worker", cpusMap, log);
+      info.start(currentThreadId, "tcpworker", cpusMap, log);
     }
 
-    for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); ++thread) {
+    for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) {
       auto& info = RecThreadInfo::info(currentThreadId);
-      info.start(currentThreadId++, "task", cpusMap, log);
+      info.start(currentThreadId, "task", cpusMap, log);
     }
 
     /* This thread handles the web server, carbon, statistics and the control channel */
-    auto& info = RecThreadInfo::info(0);
+    currentThreadId = 0;
+    auto& info = RecThreadInfo::info(currentThreadId);
     info.setHandler();
-    info.start(0, "web+stat", cpusMap, log);
+    info.start(currentThreadId, "web+stat", cpusMap, log);
 
     for (auto& tInfo : RecThreadInfo::infos()) {
       tInfo.thread.join();
@@ -1123,8 +1158,8 @@ static void doStats()
     size_t idx = 0;
     for (const auto& threadInfo : RecThreadInfo::infos()) {
       if (threadInfo.isWorker()) {
-        SLOG(g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl,
-             log->info(Logr::Info, "Queries handled by thread", "thread", Logging::Loggable(idx), "count", Logging::Loggable(threadInfo.numberOfDistributedQueries)));
+        SLOG(g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.getNumberOfDistributedQueries() << " queries" << endl,
+             log->info(Logr::Info, "Queries handled by thread", "thread", Logging::Loggable(idx), "count", Logging::Loggable(threadInfo.getNumberOfDistributedQueries())));
         ++idx;
       }
     }
@@ -1341,7 +1376,7 @@ void broadcastFunction(const pipefunc_t& func)
   }
 
   unsigned int thread = 0;
-  for (const auto& threadInfo : RecThreadInfo::infos()) {
+  for (auto& threadInfo : RecThreadInfo::infos()) {
     if (thread++ == RecThreadInfo::id()) {
       func(); // don't write to ourselves!
       continue;
@@ -1350,14 +1385,14 @@ void broadcastFunction(const pipefunc_t& func)
     ThreadMSG* tmsg = new ThreadMSG(); // NOLINT: manual ownership handling
     tmsg->func = func;
     tmsg->wantAnswer = true;
-    if (write(threadInfo.pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: sizeof correct
+    if (write(threadInfo.getPipes().writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: sizeof correct
       delete tmsg; // NOLINT: manual ownership handling
 
       unixDie("write to thread pipe returned wrong size or error");
     }
 
     string* resp = nullptr;
-    if (read(threadInfo.pipes.readFromThread, &resp, sizeof(resp)) != sizeof(resp)) { // NOLINT: sizeof correct
+    if (read(threadInfo.getPipes().readFromThread, &resp, sizeof(resp)) != sizeof(resp)) { // NOLINT: sizeof correct
       unixDie("read from thread pipe returned wrong size or error");
     }
 
@@ -1420,12 +1455,12 @@ T broadcastAccFunction(const std::function<T*()>& func)
 
   unsigned int thread = 0;
   T ret = T();
-  for (const auto& threadInfo : RecThreadInfo::infos()) {
+  for (auto& threadInfo : RecThreadInfo::infos()) {
     if (thread++ == RecThreadInfo::id()) {
       continue;
     }
 
-    const auto& tps = threadInfo.pipes;
+    const auto& tps = threadInfo.getPipes();
     ThreadMSG* tmsg = new ThreadMSG(); // NOLINT: manual ownership handling
     tmsg->func = [func] { return voider<T>(func); };
     tmsg->wantAnswer = true;
@@ -1720,50 +1755,45 @@ static void initDistribution(Logr::log_t log)
   g_reusePort = ::arg().mustDo("reuseport");
 #endif
 
-  RecThreadInfo::infos().resize(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() + RecThreadInfo::numTaskThreads());
+  RecThreadInfo::infos().resize(RecThreadInfo::numRecursorThreads());
 
   if (g_reusePort) {
+    unsigned int threadNum = 1;
     if (RecThreadInfo::weDistributeQueries()) {
       /* first thread is the handler, then distributors */
-      for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) {
-        auto& info = RecThreadInfo::info(threadId);
-        auto& deferredAdds = info.deferredAdds;
-        auto& tcpSockets = info.tcpSockets;
+      for (unsigned int i = 0; i < RecThreadInfo::numDistributors(); i++, threadNum++) {
+        auto& info = RecThreadInfo::info(threadNum);
+        auto& deferredAdds = info.getDeferredAdds();
         makeUDPServerSockets(deferredAdds, log);
-        makeTCPServerSockets(deferredAdds, tcpSockets, log);
       }
     }
     else {
       /* first thread is the handler, there is no distributor here and workers are accepting queries */
-      for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) {
-        auto& info = RecThreadInfo::info(threadId);
-        auto& deferredAdds = info.deferredAdds;
-        auto& tcpSockets = info.tcpSockets;
+      for (unsigned int i = 0; i < RecThreadInfo::numWorkers(); i++, threadNum++) {
+        auto& info = RecThreadInfo::info(threadNum);
+        auto& deferredAdds = info.getDeferredAdds();
         makeUDPServerSockets(deferredAdds, log);
-        makeTCPServerSockets(deferredAdds, tcpSockets, log);
       }
     }
+    threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers();
+    for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++, threadNum++) {
+      auto& info = RecThreadInfo::info(threadNum);
+      auto& deferredAdds = info.getDeferredAdds();
+      auto& tcpSockets = info.getTCPSockets();
+      makeTCPServerSockets(deferredAdds, tcpSockets, log);
+    }
   }
   else {
     std::set<int> tcpSockets;
     /* we don't have reuseport so we can only open one socket per
        listening addr:port and everyone will listen on it */
     makeUDPServerSockets(g_deferredAdds, log);
-    makeTCPServerSockets(g_deferredAdds, tcpSockets, log);
+    makeTCPServerSockets(g_deferredTCPAdds, tcpSockets, log);
 
-    /* every listener (so distributor if g_weDistributeQueries, workers otherwise)
-       needs to listen to the shared sockets */
-    if (RecThreadInfo::weDistributeQueries()) {
-      /* first thread is the handler, then distributors */
-      for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) {
-        RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
-      }
-    }
-    else {
-      /* first thread is the handler, there is no distributor here and workers are accepting queries */
-      for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) {
-        RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
-      }
+    // TCP queries are handled by TCP workers
+    for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++) {
+      auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers());
+      info.setTCPSockets(tcpSockets);
     }
   }
 }
@@ -1938,7 +1968,7 @@ static void initSuffixMatchNodes([[maybe_unused]] Logr::log_t log)
     vector<string> parts;
     stringtok(parts, ::arg()["dot-to-auth-names"], " ,");
 #ifndef HAVE_DNS_OVER_TLS
-    if (parts.size()) {
+    if (!parts.empty()) {
       SLOG(g_log << Logger::Error << "dot-to-auth-names setting contains names, but Recursor was built without DNS over TLS support. Setting will be ignored." << endl,
            log->info(Logr::Error, "dot-to-auth-names setting contains names, but Recursor was built without DNS over TLS support. Setting will be ignored"));
     }
@@ -2102,6 +2132,12 @@ static int serviceMain(Logr::log_t log)
          log->info(Logr::Warning, "Asked to run with 0 threads, raising to 1 instead"));
     RecThreadInfo::setNumWorkerThreads(1);
   }
+  RecThreadInfo::setNumTCPWorkerThreads(1); // XXX
+  if (RecThreadInfo::numTCPWorkers() < 1) {
+    SLOG(g_log << Logger::Warning << "Asked to run with 0 tcpthreads, raising to 1 instead" << endl,
+         log->info(Logr::Warning, "Asked to run with 0 tcpthreads, raising to 1 instead"));
+    RecThreadInfo::setNumTCPWorkerThreads(1);
+  }
 
   g_maxMThreads = ::arg().asNum("max-mthreads");
 
@@ -2241,7 +2277,7 @@ static void handlePipeRequest(int fileDesc, FDMultiplexer::funcparam_t& /* var *
     }
   }
   if (tmsg->wantAnswer) {
-    if (write(RecThreadInfo::self().pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
+    if (write(RecThreadInfo::self().getPipes().writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
       delete tmsg; // NOLINT: manual ownership handling
       unixDie("write to thread pipe returned wrong size or error");
     }
@@ -2262,9 +2298,8 @@ static void handleRCC(int fileDesc, FDMultiplexer::funcparam_t& /* var */)
     SLOG(g_log << Logger::Info << "Received rec_control command '" << msg << "' via controlsocket" << endl,
          log->info(Logr::Info, "Received rec_control command via control socket", "command", Logging::Loggable(msg)));
 
-    RecursorControlParser rcp;
     RecursorControlParser::func_t* command = nullptr;
-    auto answer = rcp.getAnswer(clientfd, msg, &command);
+    auto answer = RecursorControlParser::getAnswer(clientfd, msg, &command);
 
     g_rcc.send(clientfd, answer);
     command();
@@ -2293,7 +2328,6 @@ public:
   void runIfDue(struct timeval& now, const std::function<void()>& function)
   {
     if (last_run < now - period) {
-      // cerr << RecThreadInfo::id() << ' ' << name << ' ' << now.tv_sec << '.' << now.tv_usec << " running" << endl;
       function();
       Utility::gettimeofday(&last_run);
       now = last_run;
@@ -2567,10 +2601,10 @@ static void runLuaMaintenance(RecThreadInfo& threadInfo, time_t& last_lua_mainte
 
 static void runTCPMaintenance(RecThreadInfo& threadInfo, bool& listenOnTCP, unsigned int maxTcpClients)
 {
-  if (threadInfo.isListener()) {
+  if (threadInfo.isTCPListener()) {
     if (listenOnTCP) {
       if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
-        for (const auto fileDesc : threadInfo.tcpSockets) {
+        for (const auto fileDesc : threadInfo.getTCPSockets()) {
           t_fdm->removeReadFD(fileDesc);
         }
         listenOnTCP = false;
@@ -2578,7 +2612,7 @@ static void runTCPMaintenance(RecThreadInfo& threadInfo, bool& listenOnTCP, unsi
     }
     else {
       if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
-        for (const auto fileDesc : threadInfo.tcpSockets) {
+        for (const auto fileDesc : threadInfo.getTCPSockets()) {
           t_fdm->addReadFD(fileDesc, handleNewTCPQuestion);
         }
         listenOnTCP = true;
@@ -2741,7 +2775,7 @@ static void recursorThread()
       t_bogusqueryring->set_capacity(ringsize);
     }
     g_multiTasker = std::make_unique<MT_t>(::arg().asNum("stack-size"), ::arg().asNum("stack-cache-size"));
-    threadInfo.mt = g_multiTasker.get();
+    threadInfo.setMT(g_multiTasker.get());
 
     /* start protobuf export threads if needed */
     auto luaconfsLocal = g_luaconfs.getLocal();
@@ -2756,7 +2790,7 @@ static void recursorThread()
 
     std::unique_ptr<RecursorWebServer> rws;
 
-    t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
+    t_fdm->addReadFD(threadInfo.getPipes().readToThread, handlePipeRequest);
 
     if (threadInfo.isHandler()) {
       if (::arg().mustDo("webserver")) {
@@ -2775,18 +2809,18 @@ static void recursorThread()
            log->info(Logr::Info, "Enabled multiplexer", "name", Logging::Loggable(t_fdm->getName())));
     }
     else {
-      t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
+      t_fdm->addReadFD(threadInfo.getPipes().readQueriesToThread, handlePipeRequest);
 
       if (threadInfo.isListener()) {
         if (g_reusePort) {
           /* then every listener has its own FDs */
-          for (const auto& deferred : threadInfo.deferredAdds) {
+          for (const auto& deferred : threadInfo.getDeferredAdds()) {
             t_fdm->addReadFD(deferred.first, deferred.second);
           }
         }
         else {
           /* otherwise all listeners are listening on the same ones */
-          for (const auto& deferred : g_deferredAdds) {
+          for (const auto& deferred : threadInfo.isTCPListener() ? g_deferredTCPAdds : g_deferredAdds) {
             t_fdm->addReadFD(deferred.first, deferred.second);
           }
         }
@@ -3494,10 +3528,10 @@ static string* pleaseUseNewTraceRegex(const std::string& newRegex, int file)
     }
     t_traceRegex = std::make_shared<Regex>(newRegex);
     t_tracefd = file;
-    return new string("ok\n");
+    return new string("ok\n"); // NOLINT(cppcoreguidelines-owning-memory): it's the API
   }
   catch (const PDNSException& ae) {
-    return new string(ae.reason + "\n");
+    return new string(ae.reason + "\n"); // NOLINT(cppcoreguidelines-owning-memory): it's the API
   }
 }
 
index b048da71a7ff4fa5e33350a7105cb3974b2d4583..d254274b33a99743620bbdb71b97f40d4b8f976c 100644 (file)
@@ -279,7 +279,6 @@ extern std::set<uint16_t> g_avoidUdpSourcePorts;
 
 /* without reuseport, all listeners share the same sockets */
 typedef vector<pair<int, std::function<void(int, boost::any&)>>> deferredAdd_t;
-extern deferredAdd_t g_deferredAdds;
 
 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
 extern thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
@@ -346,9 +345,9 @@ public:
     return s_threadInfos.at(t_id);
   }
 
-  static RecThreadInfo& info(unsigned int i)
+  static RecThreadInfo& info(unsigned int index)
   {
-    return s_threadInfos.at(i);
+    return s_threadInfos.at(index);
   }
 
   static vector<RecThreadInfo>& infos()
@@ -356,7 +355,7 @@ public:
     return s_threadInfos;
   }
 
-  bool isDistributor() const
+  [[nodiscard]] bool isDistributor() const
   {
     if (t_id == 0) {
       return false;
@@ -364,7 +363,7 @@ public:
     return s_weDistributeQueries && listener;
   }
 
-  bool isHandler() const
+  [[nodiscard]] bool isHandler() const
   {
     if (t_id == 0) {
       return true;
@@ -372,17 +371,21 @@ public:
     return handler;
   }
 
-  bool isWorker() const
+  [[nodiscard]] bool isWorker() const
   {
     return worker;
   }
 
-  bool isListener() const
+  [[nodiscard]] bool isListener() const
   {
     return listener;
   }
+  [[nodiscard]] bool isTCPListener() const
+  {
+    return tcplistener;
+  }
 
-  bool isTaskThread() const
+  [[nodiscard]] bool isTaskThread() const
   {
     return taskThread;
   }
@@ -402,6 +405,11 @@ public:
     listener = flag;
   }
 
+  void setTCPListener(bool flag = true)
+  {
+    tcplistener = flag;
+  }
+
   void setTaskThread()
   {
     taskThread = true;
@@ -412,12 +420,12 @@ public:
     return t_id;
   }
 
-  static void setThreadId(unsigned int id)
+  static void setThreadId(unsigned int arg)
   {
-    t_id = id;
+    t_id = arg;
   }
 
-  std::string getName() const
+  [[nodiscard]] std::string getName() const
   {
     return name;
   }
@@ -437,6 +445,11 @@ public:
     return s_numWorkerThreads;
   }
 
+  static unsigned int numTCPWorkers()
+  {
+    return s_numTCPWorkerThreads;
+  }
+
   static unsigned int numDistributors()
   {
     return s_numDistributorThreads;
@@ -457,6 +470,11 @@ public:
     s_numWorkerThreads = n;
   }
 
+  static void setNumTCPWorkerThreads(unsigned int n)
+  {
+    s_numTCPWorkerThreads = n;
+  }
+
   static void setNumDistributorThreads(unsigned int n)
   {
     s_numDistributorThreads = n;
@@ -464,17 +482,58 @@ public:
 
   static unsigned int numRecursorThreads()
   {
-    return numHandlers() + numDistributors() + numWorkers() + numTaskThreads();
+    return numHandlers() + numDistributors() + numWorkers() + numTCPWorkers() + numTaskThreads();
   }
 
   static int runThreads(Logr::log_t);
   static void makeThreadPipes(Logr::log_t);
 
-  void setExitCode(int e)
+  void setExitCode(int n)
   {
-    exitCode = e;
+    exitCode = n;
   }
 
+  std::set<int>& getTCPSockets()
+  {
+    return tcpSockets;
+  }
+
+  void setTCPSockets(std::set<int>& socks)
+  {
+    tcpSockets = socks;
+  }
+
+  deferredAdd_t& getDeferredAdds()
+  {
+    return deferredAdds;
+  }
+
+  ThreadPipeSet& getPipes()
+  {
+    return pipes;
+  }
+
+  [[nodiscard]] uint64_t getNumberOfDistributedQueries() const
+  {
+    return numberOfDistributedQueries;
+  }
+
+  void incNumberOfDistributedQueries()
+  {
+    numberOfDistributedQueries++;
+  }
+
+  MT_t* getMT()
+  {
+    return mt;
+  }
+
+  void setMT(MT_t* theMT)
+  {
+    mt = theMT;
+  }
+
+private:
   // FD corresponding to TCP sockets this thread is listening on.
   // These FDs are also in deferredAdds when we have one socket per
   // listener, and in g_deferredAdds instead.
@@ -488,8 +547,7 @@ public:
   MT_t* mt{nullptr};
   uint64_t numberOfDistributedQueries{0};
 
-private:
-  void start(unsigned int id, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap, Logr::log_t);
+  void start(unsigned int theId, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap, Logr::log_t);
 
   std::string name;
   std::thread thread;
@@ -499,6 +557,8 @@ private:
   bool handler{false};
   // accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set)
   bool listener{false};
+  // accept incoming TCP queries (and distributes them to the workers if pdns-distributes-queries is set)
+  bool tcplistener{false};
   // process queries
   bool worker{false};
   // run async tasks: from TaskQueue and ZoneToCache
@@ -509,6 +569,7 @@ private:
   static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
   static unsigned int s_numDistributorThreads;
   static unsigned int s_numWorkerThreads;
+  static unsigned int s_numTCPWorkerThreads;
 };
 
 struct ThreadMSG