]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Refactor RecThreadInfo to be more than just a struct
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Tue, 25 Jan 2022 12:25:29 +0000 (13:25 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 4 Feb 2022 10:06:15 +0000 (11:06 +0100)
pdns/lua-recursor4.cc
pdns/pdns_recursor.cc
pdns/rec_channel_rec.cc
pdns/recursordist/rec-main.cc
pdns/recursordist/rec-main.hh
pdns/recursordist/rec-tcp.cc
pdns/recursordist/test-syncres_cc.cc
pdns/syncres.hh

index b346c3f4130ec22edf1c9237e97a8efd3bbfb9eb..4b49c68828a6163b7e12a7a7874f11157497f7c6 100644 (file)
@@ -422,8 +422,8 @@ void RecursorLua4::postPrepareContext()
     });
 
   d_lw->writeFunction("getRecursorThreadId", []() {
-      return getRecursorThreadId();
-    });
+    return RecThreadInfo::id();
+  });
 
   d_lw->writeFunction("sendCustomSNMPTrap", [](const std::string& str) {
       if (g_snmpAgent) {
index 5f0414013d1e6477e0c88050bf7ef0d8e16abf1f..625050fb68074f6b7b07eb21d8d7ca80e501bd8b 100644 (file)
@@ -43,7 +43,6 @@
 #endif /* NOD_ENABLED */
 
 thread_local std::shared_ptr<RecursorLua4> t_pdl;
-thread_local unsigned int t_id = 0;
 thread_local std::shared_ptr<Regex> t_traceRegex;
 thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
 thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
@@ -964,7 +963,7 @@ void startDoResolve(void* p)
     }
 
     if (!g_quiet || tracedQuery) {
-      g_log << Logger::Warning << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] " << (dc->d_tcp ? "TCP " : "") << "question for '" << dc->d_mdp.d_qname << "|"
+      g_log << Logger::Warning << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] " << (dc->d_tcp ? "TCP " : "") << "question for '" << dc->d_mdp.d_qname << "|"
             << QType(dc->d_mdp.d_qtype) << "' from " << dc->getRemote();
       if (!dc->d_ednssubnet.source.empty()) {
         g_log << " (ecs " << dc->d_ednssubnet.source.toString() << ")";
@@ -1555,7 +1554,7 @@ void startDoResolve(void* p)
     // Now it always uses an integral number of microseconds, except for averages, which use doubles
     uint64_t spentUsec = uSec(sr.getNow() - dc->d_now);
     if (!g_quiet) {
-      g_log << Logger::Error << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] answer to " << (dc->d_mdp.d_header.rd ? "" : "non-rd ") << "question '" << dc->d_mdp.d_qname << "|" << DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
+      g_log << Logger::Error << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] answer to " << (dc->d_mdp.d_header.rd ? "" : "non-rd ") << "question '" << dc->d_mdp.d_qname << "|" << DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
       g_log << "': " << ntohs(pw.getHeader()->ancount) << " answers, " << ntohs(pw.getHeader()->arcount) << " additional, took " << sr.d_outqueries << " packets, " << sr.d_totUsec / 1000.0 << " netw ms, " << spentUsec / 1000.0 << " tot ms, " << sr.d_throttledqueries << " throttled, " << sr.d_timeouts << " timeouts, " << sr.d_tcpoutqueries << "/" << sr.d_dotoutqueries << " tcp/dot connections, rcode=" << res;
 
       if (!shouldNotValidate && sr.isDNSSECValidationRequested()) {
@@ -1771,7 +1770,7 @@ bool expectProxyProtocol(const ComboAddress& from)
 
 static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, struct timeval tv, int fd, std::vector<ProxyProtocolValue>& proxyProtocolValues, RecEventTrace& eventTrace)
 {
-  ++g_threadInfos[t_id].numberOfDistributedQueries;
+  ++(RecThreadInfo::self().numberOfDistributedQueries);
   gettimeofday(&g_now, nullptr);
   if (tv.tv_sec) {
     struct timeval diff = g_now - tv;
@@ -1907,7 +1906,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
       eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
       if (cacheHit) {
         if (!g_quiet) {
-          g_log << Logger::Notice << t_id << " question answered from packet cache tag=" << ctag << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl;
+          g_log << Logger::Notice << RecThreadInfo::id() << " question answered from packet cache tag=" << ctag << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl;
         }
         struct msghdr msgh;
         struct iovec iov;
@@ -1952,7 +1951,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
     bool ipf = t_pdl->ipfilter(source, destination, *dh, eventTrace);
     if (ipf) {
       if (!g_quiet) {
-        g_log << Logger::Notice << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << " based on policy" << endl;
+        g_log << Logger::Notice << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << " based on policy" << endl;
       }
       g_stats.policyDrops++;
       return 0;
@@ -1970,7 +1969,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
     }
 
     if (!g_quiet) {
-      g_log << Logger::Notice << t_id << " got NOTIFY for " << qname.toLogString() << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl;
+      g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << endl;
     }
 
     requestWipeCaches(qname);
@@ -1984,7 +1983,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
 
   if (MT->numProcesses() > g_maxMThreads) {
     if (!g_quiet)
-      g_log << Logger::Notice << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << ", over capacity" << endl;
+      g_log << Logger::Notice << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED question from " << source.toStringWithPort() << (source != fromaddr ? " (via " + fromaddr.toStringWithPort() + ")" : "") << ", over capacity" << endl;
 
     g_stats.overCapacityDrops++;
     return 0;
@@ -2188,7 +2187,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
             destination = dest;
           }
 
-          if (g_weDistributeQueries) {
+          if (RecThreadInfo::s_weDistributeQueries) {
             std::string localdata = data;
             distributeAsyncFunction(data, [localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace]() mutable {
               return doProcessUDPQuestion(localdata, fromaddr, dest, source, destination, tv, fd, proxyProtocolValues, eventTrace);
@@ -2324,7 +2323,7 @@ void makeUDPServerSockets(deferredAdd_t& deferredAdds)
 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 {
   auto& targetInfo = g_threadInfos[target];
-  if (!targetInfo.isWorker) {
+  if (!targetInfo.isWorker()) {
     g_log << Logger::Error << "distributeAsyncFunction() tried to assign a query to a non-worker thread" << endl;
     _exit(1);
   }
@@ -2354,7 +2353,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 
 static unsigned int getWorkerLoad(size_t workerIdx)
 {
-  const auto mt = g_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
+  const auto mt = g_threadInfos[/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx].mt;
   if (mt != nullptr) {
     return mt->numProcesses();
   }
@@ -2364,39 +2363,39 @@ static unsigned int getWorkerLoad(size_t workerIdx)
 static unsigned int selectWorker(unsigned int hash)
 {
   if (g_balancingFactor == 0) {
-    return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+    return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + (hash % RecThreadInfo::s_numWorkerThreads);
   }
 
   /* we start with one, representing the query we are currently handling */
   double currentLoad = 1;
-  std::vector<unsigned int> load(g_numWorkerThreads);
-  for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+  std::vector<unsigned int> load(RecThreadInfo::s_numWorkerThreads);
+  for (size_t idx = 0; idx < RecThreadInfo::s_numWorkerThreads; idx++) {
     load[idx] = getWorkerLoad(idx);
     currentLoad += load[idx];
     // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
   }
 
-  double targetLoad = (currentLoad / g_numWorkerThreads) * g_balancingFactor;
+  double targetLoad = (currentLoad / RecThreadInfo::s_numWorkerThreads) * g_balancingFactor;
   // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
 
-  unsigned int worker = hash % g_numWorkerThreads;
+  unsigned int worker = hash % RecThreadInfo::s_numWorkerThreads;
   /* at least one server has to be at or below the average load */
   if (load[worker] > targetLoad) {
     ++g_stats.rebalancedQueries;
     do {
       // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
-      worker = (worker + 1) % g_numWorkerThreads;
+      worker = (worker + 1) % RecThreadInfo::s_numWorkerThreads;
     } while (load[worker] > targetLoad);
   }
 
-  return /* skip handler */ 1 + g_numDistributorThreads + worker;
+  return /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + worker;
 }
 
 // This function is only called by the distributor threads, when pdns-distributes-queries is set
 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
 {
-  if (!isDistributorThread()) {
-    g_log << Logger::Error << "distributeAsyncFunction() has been called by a worker (" << t_id << ")" << endl;
+  if (!RecThreadInfo::self().isDistributor()) {
+    g_log << Logger::Error << "distributeAsyncFunction() has been called by a worker (" << RecThreadInfo::id() << ")" << endl;
     _exit(1);
   }
 
@@ -2412,7 +2411,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
        was full, let's try another one */
     unsigned int newTarget = 0;
     do {
-      newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads);
+      newTarget = /* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + dns_random(RecThreadInfo::s_numWorkerThreads);
     } while (newTarget == target);
 
     if (!trySendingQueryToWorker(newTarget, tmsg)) {
index 1010369cf04e62de24f86c23b39107a89fc27845..b7fc66ee140ad949703a20f5755aa6d74385ad05 100644 (file)
@@ -1118,7 +1118,7 @@ static StatsMap toCPUStatsMap(const string& name)
 {
   const string pbasename = getPrometheusName(name);
   StatsMap entries;
-  for (unsigned int n = 0; n < g_numThreads; ++n) {
+  for (unsigned int n = 0; n < RecThreadInfo::numThreads(); ++n) {
     uint64_t tm = doGetThreadCPUMsec(n);
     std::string pname = pbasename + "{thread=\"" + std::to_string(n) + "\"}";
     entries.emplace(name + "-thread-" + std::to_string(n), StatsMapEntry{pname, std::to_string(tm)});
index f15ea7eb4ab2e7556e626328cb92b82f83353209..bcaf57fffa59cc39d57282deb993cc14547adebd 100644 (file)
@@ -34,7 +34,6 @@
 #include "validate-recursor.hh"
 #include "pubsuffix.hh"
 #include "opensslsigners.hh"
-#include "threadname.hh"
 #include "ws-recursor.hh"
 #include "rec-taskqueue.hh"
 #include "secpoll-recursor.hh"
@@ -78,9 +77,7 @@ thread_local std::shared_ptr<nod::UniqueResponseDB> t_udrDBp;
 #endif /* NOD_ENABLED */
 
 std::atomic<bool> statsWanted;
-unsigned int g_numWorkerThreads;
 uint32_t g_disthashseed;
-bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
 bool g_useIncomingECS;
 uint16_t g_xpfRRCode{0};
 NetmaskGroup g_proxyProtocolACL;
@@ -91,8 +88,6 @@ std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setu
 std::shared_ptr<NetmaskGroup> g_initialAllowNotifyFrom; // new threads need this to be setup
 std::shared_ptr<notifyset_t> g_initialAllowNotifyFor; // new threads need this to be setup
 bool g_logRPZChanges{false};
-unsigned int g_numDistributorThreads;
-unsigned int g_numThreads;
 static time_t s_statisticsInterval;
 bool g_addExtendedResolutionDNSErrors;
 static std::atomic<uint32_t> s_counter;
@@ -108,6 +103,11 @@ deferredAdd_t g_deferredAdds;
    and finally the workers */
 std::vector<RecThreadInfo> g_threadInfos;
 
+bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
+unsigned int RecThreadInfo::s_numDistributorThreads;
+unsigned int RecThreadInfo::s_numWorkerThreads;
+thread_local unsigned int RecThreadInfo::t_id;
+
 ArgvMap& arg()
 {
   static ArgvMap theArg;
@@ -629,8 +629,8 @@ static void checkLinuxIPv6Limits()
 static void checkOrFixFDS()
 {
   unsigned int availFDs = getFilenumLimit();
-  unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads + 25; // even healthier margin then before
-  wantFDs += g_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread;
+  unsigned int wantFDs = g_maxMThreads * RecThreadInfo::s_numWorkerThreads + 25; // even healthier margin then before
+  wantFDs += RecThreadInfo::s_numWorkerThreads * TCPOutConnectionManager::s_maxIdlePerThread;
 
   if (wantFDs > availFDs) {
     unsigned int hardlimit = getFilenumLimit(true);
@@ -639,7 +639,7 @@ static void checkOrFixFDS()
       g_log << Logger::Warning << "Raised soft limit on number of filedescriptors to " << wantFDs << " to match max-mthreads and threads settings" << endl;
     }
     else {
-      int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / g_numWorkerThreads;
+      int newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / RecThreadInfo::s_numWorkerThreads;
       g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl;
       g_maxMThreads = newval;
       setFilenumLimit(hardlimit);
@@ -697,7 +697,7 @@ static void makeThreadPipes()
   }
 
   /* thread 0 is the handler / SNMP, worker threads start at 1 */
-  for (unsigned int n = 0; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
+  for (unsigned int n = 0; n <= (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads); ++n) {
     auto& threadInfos = g_threadInfos.at(n);
 
     int fd[2];
@@ -783,7 +783,7 @@ static void doStats(void)
 
     size_t idx = 0;
     for (const auto& threadInfo : g_threadInfos) {
-      if (threadInfo.isWorker) {
+      if (threadInfo.isWorker()) {
         g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl;
         ++idx;
       }
@@ -936,7 +936,7 @@ void broadcastFunction(const pipefunc_t& func)
      for the initialization of ACLs and domain maps. After that it should only
      be called by the handler. */
 
-  if (g_threadInfos.empty() && isHandlerThread()) {
+  if (g_threadInfos.empty() && RecThreadInfo::id() == 0) {
     /* the handler and  distributors will call themselves below, but
        during startup we get called while g_threadInfos has not been
        populated yet to update the ACL or domain maps, so we need to
@@ -947,7 +947,7 @@ void broadcastFunction(const pipefunc_t& func)
 
   unsigned int n = 0;
   for (const auto& threadInfo : g_threadInfos) {
-    if (n++ == t_id) {
+    if (n++ ==  RecThreadInfo::id()) {
       func(); // don't write to ourselves!
       continue;
     }
@@ -997,15 +997,15 @@ static vector<pair<DNSName, uint16_t>>& operator+=(vector<pair<DNSName, uint16_t
 template <class T>
 T broadcastAccFunction(const boost::function<T*()>& func)
 {
-  if (!isHandlerThread()) {
-    g_log << Logger::Error << "broadcastAccFunction has been called by a worker (" << t_id << ")" << endl;
+  if (!RecThreadInfo::self().isHandler()) {
+    g_log << Logger::Error << "broadcastAccFunction has been called by a worker (" << RecThreadInfo::id() << ")" << endl;
     _exit(1);
   }
 
   unsigned int n = 0;
   T ret = T();
   for (const auto& threadInfo : g_threadInfos) {
-    if (n++ == t_id) {
+    if (n++ == RecThreadInfo::id()) {
       continue;
     }
 
@@ -1148,8 +1148,8 @@ static int serviceMain(int argc, char* argv[])
   }
 
   /* this needs to be done before parseACLs(), which call broadcastFunction() */
-  g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
-  if (g_weDistributeQueries) {
+  RecThreadInfo::s_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
+  if (RecThreadInfo::s_weDistributeQueries) {
     g_log << Logger::Warning << "PowerDNS Recursor itself will distribute queries over threads" << endl;
   }
 
@@ -1323,14 +1323,13 @@ static int serviceMain(int argc, char* argv[])
   }
   g_paddingTag = ::arg().asNum("edns-padding-tag");
 
-  g_numDistributorThreads = ::arg().asNum("distributor-threads");
-  g_numWorkerThreads = ::arg().asNum("threads");
-  if (g_numWorkerThreads < 1) {
+  RecThreadInfo::s_numDistributorThreads = ::arg().asNum("distributor-threads");
+  RecThreadInfo::s_numWorkerThreads = ::arg().asNum("threads");
+  if (RecThreadInfo::s_numWorkerThreads < 1) {
     g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl;
-    g_numWorkerThreads = 1;
+    RecThreadInfo::s_numWorkerThreads = 1;
   }
 
-  g_numThreads = g_numDistributorThreads + g_numWorkerThreads;
   g_maxMThreads = ::arg().asNum("max-mthreads");
 
   int64_t maxInFlight = ::arg().asNum("max-concurrent-requests-per-tcp-connection");
@@ -1425,12 +1424,12 @@ static int serviceMain(int argc, char* argv[])
   g_reusePort = ::arg().mustDo("reuseport");
 #endif
 
-  g_threadInfos.resize(g_numDistributorThreads + g_numWorkerThreads + /* handler */ 1);
+  g_threadInfos.resize(RecThreadInfo::s_numDistributorThreads + RecThreadInfo::s_numWorkerThreads + /* handler */ 1);
 
   if (g_reusePort) {
-    if (g_weDistributeQueries) {
+    if (RecThreadInfo::s_weDistributeQueries) {
       /* first thread is the handler, then distributors */
-      for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
+      for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
         auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds;
         auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets;
         makeUDPServerSockets(deferredAdds);
@@ -1439,7 +1438,7 @@ static int serviceMain(int argc, char* argv[])
     }
     else {
       /* first thread is the handler, there is no distributor here and workers are accepting queries */
-      for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
+      for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
         auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds;
         auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets;
         makeUDPServerSockets(deferredAdds);
@@ -1456,15 +1455,15 @@ static int serviceMain(int argc, char* argv[])
 
     /* every listener (so distributor if g_weDistributeQueries, workers otherwise)
        needs to listen to the shared sockets */
-    if (g_weDistributeQueries) {
+    if (RecThreadInfo::s_weDistributeQueries) {
       /* first thread is the handler, then distributors */
-      for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
+      for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
         g_threadInfos.at(threadId).tcpSockets = tcpSockets;
       }
     }
     else {
       /* first thread is the handler, there is no distributor here and workers are accepting queries */
-      for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
+      for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
         g_threadInfos.at(threadId).tcpSockets = tcpSockets;
       }
     }
@@ -1631,58 +1630,57 @@ static int serviceMain(int argc, char* argv[])
   unsigned int currentThreadId = 1;
   const auto cpusMap = parseCPUMap();
 
-  if (g_numThreads == 1) {
+  if (RecThreadInfo::numThreads() == 1) {
     g_log << Logger::Warning << "Operating unthreaded" << endl;
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
 #endif
 
     /* This thread handles the web server, carbon, statistics and the control channel */
-    auto& handlerInfos = g_threadInfos.at(0);
-    handlerInfos.isHandler = true;
-    handlerInfos.thread = std::thread(recursorThread, 0, "web+stat");
+    auto& handlerInfo = g_threadInfos.at(0);
+    handlerInfo.setHandler();
+    handlerInfo.start(0, "web+stat");
 
     setCPUMap(cpusMap, currentThreadId, pthread_self());
 
-    auto& infos = g_threadInfos.at(currentThreadId);
-    infos.isListener = true;
-    infos.isWorker = true;
-    recursorThread(currentThreadId++, "worker");
+    auto& info = g_threadInfos.at(currentThreadId);
+    info.setListener();
+    info.setWorker();
+    info.setThreadId(currentThreadId++);
+    recursorThread();
 
-    handlerInfos.thread.join();
-    if (handlerInfos.exitCode != 0) {
-      ret = handlerInfos.exitCode;
+    handlerInfo.thread.join();
+    if (handlerInfo.exitCode != 0) {
+      ret = handlerInfo.exitCode;
     }
   }
   else {
-
-    if (g_weDistributeQueries) {
-      for (unsigned int n = 0; n < g_numDistributorThreads; ++n) {
-        auto& infos = g_threadInfos.at(currentThreadId + n);
-        infos.isListener = true;
+    if (RecThreadInfo::s_weDistributeQueries) {
+      for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
+        g_threadInfos.at(currentThreadId + n).setListener();
       }
     }
-    for (unsigned int n = 0; n < g_numWorkerThreads; ++n) {
-      auto& infos = g_threadInfos.at(currentThreadId + (g_weDistributeQueries ? g_numDistributorThreads : 0) + n);
-      infos.isListener = !g_weDistributeQueries;
-      infos.isWorker = true;
+    for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
+      auto& info = g_threadInfos.at(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n);
+      info.setListener(!RecThreadInfo::s_weDistributeQueries);
+      info.setWorker();
     }
 
-    if (g_weDistributeQueries) {
-      g_log << Logger::Warning << "Launching " << g_numDistributorThreads << " distributor threads" << endl;
-      for (unsigned int n = 0; n < g_numDistributorThreads; ++n) {
-        auto& infos = g_threadInfos.at(currentThreadId);
-        infos.thread = std::thread(recursorThread, currentThreadId++, "distr");
-        setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
+    if (RecThreadInfo::s_weDistributeQueries) {
+      g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl;
+      for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
+        auto& info = g_threadInfos.at(currentThreadId);
+        info.start(currentThreadId++, "distr");
+        setCPUMap(cpusMap, currentThreadId, info.thread.native_handle());
       }
     }
 
-    g_log << Logger::Warning << "Launching " << g_numWorkerThreads << " worker threads" << endl;
+    g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl;
 
-    for (unsigned int n = 0; n < g_numWorkerThreads; ++n) {
-      auto& infos = g_threadInfos.at(currentThreadId);
-      infos.thread = std::thread(recursorThread, currentThreadId++, "worker");
-      setCPUMap(cpusMap, currentThreadId, infos.thread.native_handle());
+    for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
+      auto& info = g_threadInfos.at(currentThreadId);
+      info.start(currentThreadId++, "worker");
+      setCPUMap(cpusMap, currentThreadId, info.thread.native_handle());
     }
 
 #ifdef HAVE_SYSTEMD
@@ -1690,9 +1688,9 @@ static int serviceMain(int argc, char* argv[])
 #endif
 
     /* This thread handles the web server, carbon, statistics and the control channel */
-    auto& infos = g_threadInfos.at(0);
-    infos.isHandler = true;
-    infos.thread = std::thread(recursorThread, 0, "web+stat");
+    auto& info = g_threadInfos.at(0);
+    info.setHandler();
+    info.start(0, "web+stat");
 
     for (auto& ti : g_threadInfos) {
       ti.thread.join();
@@ -1726,8 +1724,7 @@ static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
       g_log << Logger::Error << "PIPE function we executed created PDNS exception: " << e.reason << endl; // but what if they wanted an answer.. we send 0
   }
   if (tmsg->wantAnswer) {
-    const auto& threadInfo = g_threadInfos.at(t_id);
-    if (write(threadInfo.pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
+    if (write(RecThreadInfo::self().pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
       delete tmsg;
       unixDie("write to thread pipe returned wrong size or error");
     }
@@ -1790,7 +1787,7 @@ static void houseKeeping(void*)
     past = now;
     past.tv_sec -= 5;
     if (t_last_prune < past) {
-      t_packetCache->doPruneTo(g_maxPacketCacheEntries / (g_numWorkerThreads + g_numDistributorThreads));
+      t_packetCache->doPruneTo(g_maxPacketCacheEntries / (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads));
 
       time_t limit;
       if (!((t_cleanCounter++) % 40)) { // this is a full scan!
@@ -1804,7 +1801,7 @@ static void houseKeeping(void*)
       Utility::gettimeofday(&t_last_prune, nullptr);
     }
 
-    if (isHandlerThread()) {
+    if (RecThreadInfo::self().isHandler()) {
       if (now.tv_sec - s_last_ZTC_prune > 60) {
         s_last_ZTC_prune = now.tv_sec;
         static map<DNSName, RecZoneToCache::State> ztcStates;
@@ -1813,7 +1810,6 @@ static void houseKeeping(void*)
           RecZoneToCache::ZoneToCache(ztc.second, ztcStates.at(ztc.first));
         }
       }
-
       if (now.tv_sec - s_last_RC_prune > 5) {
         g_recCache->doPrune(g_maxCacheEntries);
         g_negCache->prune(g_maxCacheEntries / 10);
@@ -1901,230 +1897,229 @@ static void houseKeeping(void*)
   }
 }
 
-void* recursorThread(unsigned int n, const string& threadName)
-try {
-  t_id = n;
-  auto& threadInfo = g_threadInfos.at(t_id);
-
-  static string threadPrefix = "pdns-r/";
-  setThreadName(threadPrefix + threadName);
-
-  SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
-  SyncRes::setDomainMap(g_initialDomainMap);
-  t_allowFrom = g_initialAllowFrom;
-  t_allowNotifyFrom = g_initialAllowNotifyFrom;
-  t_allowNotifyFor = g_initialAllowNotifyFor;
-  t_udpclientsocks = std::make_unique<UDPClientSocks>();
-  t_tcpClientCounts = std::make_unique<tcpClientCounts_t>();
-
-  if (threadInfo.isHandler) {
-    if (!primeHints()) {
-      threadInfo.exitCode = EXIT_FAILURE;
-      RecursorControlChannel::stop = 1;
-      g_log << Logger::Critical << "Priming cache failed, stopping" << endl;
-      return nullptr;
+void* recursorThread()
+{
+  try {
+    auto& threadInfo = RecThreadInfo::self();
+    {
+      SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
+      SyncRes::setDomainMap(g_initialDomainMap);
+      t_allowFrom = g_initialAllowFrom;
+      t_allowNotifyFrom = g_initialAllowNotifyFrom;
+      t_allowNotifyFor = g_initialAllowNotifyFor;
+      t_udpclientsocks = std::make_unique<UDPClientSocks>();
+      t_tcpClientCounts = std::make_unique<tcpClientCounts_t>();
+
+      if (threadInfo.isHandler()) {
+        if (!primeHints()) {
+          threadInfo.exitCode = EXIT_FAILURE;
+          RecursorControlChannel::stop = 1;
+          g_log << Logger::Critical << "Priming cache failed, stopping" << endl;
+          return nullptr;
+        }
+        g_log << Logger::Debug << "Done priming cache with root hints" << endl;
+      }
     }
-    g_log << Logger::Debug << "Done priming cache with root hints" << endl;
-  }
 
-  t_packetCache = std::make_unique<RecursorPacketCache>();
+    t_packetCache = std::make_unique<RecursorPacketCache>();
 
 #ifdef NOD_ENABLED
-  if (threadInfo.isWorker)
-    setupNODThread();
+    if (threadInfo.isWorker())
+      setupNODThread();
 #endif /* NOD_ENABLED */
 
-  /* the listener threads handle TCP queries */
-  if (threadInfo.isWorker || threadInfo.isListener) {
-    try {
-      if (!::arg()["lua-dns-script"].empty()) {
-        t_pdl = std::make_shared<RecursorLua4>();
-        t_pdl->loadFile(::arg()["lua-dns-script"]);
-        g_log << Logger::Warning << "Loaded 'lua' script from '" << ::arg()["lua-dns-script"] << "'" << endl;
+    /* the listener threads handle TCP queries */
+    if (threadInfo.isWorker() || threadInfo.isListener()) {
+      try {
+        if (!::arg()["lua-dns-script"].empty()) {
+          t_pdl = std::make_shared<RecursorLua4>();
+          t_pdl->loadFile(::arg()["lua-dns-script"]);
+          g_log << Logger::Warning << "Loaded 'lua' script from '" << ::arg()["lua-dns-script"] << "'" << endl;
+        }
+      }
+      catch (std::exception& e) {
+        g_log << Logger::Error << "Failed to load 'lua' script from '" << ::arg()["lua-dns-script"] << "': " << e.what() << endl;
+        _exit(99);
       }
     }
-    catch (std::exception& e) {
-      g_log << Logger::Error << "Failed to load 'lua' script from '" << ::arg()["lua-dns-script"] << "': " << e.what() << endl;
-      _exit(99);
-    }
-  }
 
-  unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
-  if (ringsize) {
-    t_remotes = std::make_unique<addrringbuf_t>();
-    if (g_weDistributeQueries)
-      t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / g_numDistributorThreads);
-    else
-      t_remotes->set_capacity(ringsize);
-    t_servfailremotes = std::make_unique<addrringbuf_t>();
-    t_servfailremotes->set_capacity(ringsize);
-    t_bogusremotes = std::make_unique<addrringbuf_t>();
-    t_bogusremotes->set_capacity(ringsize);
-    t_largeanswerremotes = std::make_unique<addrringbuf_t>();
-    t_largeanswerremotes->set_capacity(ringsize);
-    t_timeouts = std::make_unique<addrringbuf_t>();
-    t_timeouts->set_capacity(ringsize);
-
-    t_queryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
-    t_queryring->set_capacity(ringsize);
-    t_servfailqueryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
-    t_servfailqueryring->set_capacity(ringsize);
-    t_bogusqueryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
-    t_bogusqueryring->set_capacity(ringsize);
-  }
-  MT = std::make_unique<MT_t>(::arg().asNum("stack-size"));
-  threadInfo.mt = MT.get();
-
-  /* start protobuf export threads if needed */
-  auto luaconfsLocal = g_luaconfs.getLocal();
-  checkProtobufExport(luaconfsLocal);
-  checkOutgoingProtobufExport(luaconfsLocal);
+    unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numWorkerThreads;
+    if (ringsize) {
+      t_remotes = std::make_unique<addrringbuf_t>();
+      if (RecThreadInfo::s_weDistributeQueries)
+        t_remotes->set_capacity(::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::s_numDistributorThreads);
+      else
+        t_remotes->set_capacity(ringsize);
+      t_servfailremotes = std::make_unique<addrringbuf_t>();
+      t_servfailremotes->set_capacity(ringsize);
+      t_bogusremotes = std::make_unique<addrringbuf_t>();
+      t_bogusremotes->set_capacity(ringsize);
+      t_largeanswerremotes = std::make_unique<addrringbuf_t>();
+      t_largeanswerremotes->set_capacity(ringsize);
+      t_timeouts = std::make_unique<addrringbuf_t>();
+      t_timeouts->set_capacity(ringsize);
+
+      t_queryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
+      t_queryring->set_capacity(ringsize);
+      t_servfailqueryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
+      t_servfailqueryring->set_capacity(ringsize);
+      t_bogusqueryring = std::make_unique<boost::circular_buffer<pair<DNSName, uint16_t>>>();
+      t_bogusqueryring->set_capacity(ringsize);
+    }
+    MT = std::make_unique<MT_t>(::arg().asNum("stack-size"));
+    threadInfo.mt = MT.get();
+
+    /* start protobuf export threads if needed */
+    auto luaconfsLocal = g_luaconfs.getLocal();
+    checkProtobufExport(luaconfsLocal);
+    checkOutgoingProtobufExport(luaconfsLocal);
 #ifdef HAVE_FSTRM
-  checkFrameStreamExport(luaconfsLocal);
+    checkFrameStreamExport(luaconfsLocal);
 #endif
 
-  PacketID pident;
+    PacketID pident;
 
-  t_fdm = getMultiplexer();
+    t_fdm = getMultiplexer();
 
-  RecursorWebServer* rws = nullptr;
+    RecursorWebServer* rws = nullptr;
 
-  t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
+    t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
 
-  if (threadInfo.isHandler) {
-    if (::arg().mustDo("webserver")) {
-      g_log << Logger::Warning << "Enabling web server" << endl;
-      try {
-        rws = new RecursorWebServer(t_fdm);
-      }
-      catch (const PDNSException& e) {
-        g_log << Logger::Error << "Unable to start the internal web server: " << e.reason << endl;
-        _exit(99);
+    if (threadInfo.isHandler()) {
+      if (::arg().mustDo("webserver")) {
+        g_log << Logger::Warning << "Enabling web server" << endl;
+        try {
+          rws = new RecursorWebServer(t_fdm);
+        }
+        catch (const PDNSException& e) {
+          g_log << Logger::Error << "Unable to start the internal web server: " << e.reason << endl;
+          _exit(99);
+        }
       }
+      g_log << Logger::Info << "Enabled '" << t_fdm->getName() << "' multiplexer" << endl;
     }
-    g_log << Logger::Info << "Enabled '" << t_fdm->getName() << "' multiplexer" << endl;
-  }
-  else {
-    t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
+    else {
+      t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
 
-    if (threadInfo.isListener) {
-      if (g_reusePort) {
-        /* then every listener has its own FDs */
-        for (const auto& deferred : threadInfo.deferredAdds) {
-          t_fdm->addReadFD(deferred.first, deferred.second);
+      if (threadInfo.isListener()) {
+        if (g_reusePort) {
+          /* then every listener has its own FDs */
+          for (const auto& deferred : threadInfo.deferredAdds) {
+            t_fdm->addReadFD(deferred.first, deferred.second);
+          }
         }
-      }
-      else {
-        /* otherwise all listeners are listening on the same ones */
-        for (const auto& deferred : g_deferredAdds) {
-          t_fdm->addReadFD(deferred.first, deferred.second);
+        else {
+          /* otherwise all listeners are listening on the same ones */
+          for (const auto& deferred : g_deferredAdds) {
+            t_fdm->addReadFD(deferred.first, deferred.second);
+          }
         }
       }
     }
-  }
 
-  registerAllStats();
+    registerAllStats();
 
-  if (threadInfo.isHandler) {
-    t_fdm->addReadFD(g_rcc.d_fd, handleRCC); // control channel
-  }
+    if (threadInfo.isHandler()) {
+      t_fdm->addReadFD(g_rcc.d_fd, handleRCC); // control channel
+    }
 
-  unsigned int maxTcpClients = ::arg().asNum("max-tcp-clients");
+    unsigned int maxTcpClients = ::arg().asNum("max-tcp-clients");
 
-  bool listenOnTCP(true);
+    bool listenOnTCP(true);
 
-  time_t last_stat = 0;
-  time_t last_carbon = 0, last_lua_maintenance = 0;
-  time_t carbonInterval = ::arg().asNum("carbon-interval");
-  time_t luaMaintenanceInterval = ::arg().asNum("lua-maintenance-interval");
-  s_counter.store(0); // used to periodically execute certain tasks
+    time_t last_stat = 0;
+    time_t last_carbon = 0, last_lua_maintenance = 0;
+    time_t carbonInterval = ::arg().asNum("carbon-interval");
+    time_t luaMaintenanceInterval = ::arg().asNum("lua-maintenance-interval");
+    s_counter.store(0); // used to periodically execute certain tasks
 
-  while (!RecursorControlChannel::stop) {
-    while (MT->schedule(&g_now))
-      ; // MTasker letting the mthreads do their thing
+    while (!RecursorControlChannel::stop) {
+      while (MT->schedule(&g_now))
+        ; // MTasker letting the mthreads do their thing
 
-    // Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
-    // We want to call handler thread often, it gets scheduled about 2 times per second
-    if ((threadInfo.isHandler && s_counter % 11 == 0) || s_counter % 499 == 0) {
-      MT->makeThread(houseKeeping, 0);
-    }
+      // Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
+      // We want to call handler thread often, it gets scheduled about 2 times per second
+      if ((threadInfo.isHandler() && s_counter % 11 == 0) || s_counter % 499 == 0) {
+        MT->makeThread(houseKeeping, 0);
+      }
 
-    if (!(s_counter % 55)) {
-      typedef vector<pair<int, FDMultiplexer::funcparam_t>> expired_t;
-      expired_t expired = t_fdm->getTimeouts(g_now);
+      if (!(s_counter % 55)) {
+        typedef vector<pair<int, FDMultiplexer::funcparam_t>> expired_t;
+        expired_t expired = t_fdm->getTimeouts(g_now);
 
-      for (expired_t::iterator i = expired.begin(); i != expired.end(); ++i) {
-        shared_ptr<TCPConnection> conn = boost::any_cast<shared_ptr<TCPConnection>>(i->second);
-        if (g_logCommonErrors)
-          g_log << Logger::Warning << "Timeout from remote TCP client " << conn->d_remote.toStringWithPort() << endl;
-        t_fdm->removeReadFD(i->first);
+        for (expired_t::iterator i = expired.begin(); i != expired.end(); ++i) {
+          shared_ptr<TCPConnection> conn = boost::any_cast<shared_ptr<TCPConnection>>(i->second);
+          if (g_logCommonErrors)
+            g_log << Logger::Warning << "Timeout from remote TCP client " << conn->d_remote.toStringWithPort() << endl;
+          t_fdm->removeReadFD(i->first);
+        }
       }
-    }
 
-    s_counter++;
+      s_counter++;
 
-    if (threadInfo.isHandler) {
-      if (statsWanted || (s_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= s_statisticsInterval)) {
-        doStats();
-        last_stat = g_now.tv_sec;
-      }
+      if (threadInfo.isHandler()) {
+        if (statsWanted || (s_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= s_statisticsInterval)) {
+          doStats();
+          last_stat = g_now.tv_sec;
+        }
 
-      Utility::gettimeofday(&g_now, nullptr);
+        Utility::gettimeofday(&g_now, nullptr);
 
-      if ((g_now.tv_sec - last_carbon) >= carbonInterval) {
-        MT->makeThread(doCarbonDump, 0);
-        last_carbon = g_now.tv_sec;
+        if ((g_now.tv_sec - last_carbon) >= carbonInterval) {
+          MT->makeThread(doCarbonDump, 0);
+          last_carbon = g_now.tv_sec;
+        }
       }
-    }
-    if (t_pdl != nullptr) {
-      // lua-dns-script directive is present, call the maintenance callback if needed
-      /* remember that the listener threads handle TCP queries */
-      if (threadInfo.isWorker || threadInfo.isListener) {
-        // Only on threads processing queries
-        if (g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
-          t_pdl->maintenance();
-          last_lua_maintenance = g_now.tv_sec;
+      if (t_pdl != nullptr) {
+        // lua-dns-script directive is present, call the maintenance callback if needed
+        /* remember that the listener threads handle TCP queries */
+        if (threadInfo.isWorker() || threadInfo.isListener()) {
+          // Only on threads processing queries
+          if (g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
+            t_pdl->maintenance();
+            last_lua_maintenance = g_now.tv_sec;
+          }
         }
       }
-    }
 
-    t_fdm->run(&g_now);
-    // 'run' updates g_now for us
+      t_fdm->run(&g_now);
+      // 'run' updates g_now for us
 
-    if (threadInfo.isListener) {
-      if (listenOnTCP) {
-        if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
-          for (const auto fd : threadInfo.tcpSockets) {
-            t_fdm->removeReadFD(fd);
+      if (threadInfo.isListener()) {
+        if (listenOnTCP) {
+          if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
+            for (const auto fd : threadInfo.tcpSockets) {
+              t_fdm->removeReadFD(fd);
+            }
+            listenOnTCP = false;
           }
-          listenOnTCP = false;
         }
-      }
-      else {
-        if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
-          for (const auto fd : threadInfo.tcpSockets) {
-            t_fdm->addReadFD(fd, handleNewTCPQuestion);
+        else {
+          if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
+            for (const auto fd : threadInfo.tcpSockets) {
+              t_fdm->addReadFD(fd, handleNewTCPQuestion);
+            }
+            listenOnTCP = true;
           }
-          listenOnTCP = true;
         }
       }
     }
+    delete rws;
+    delete t_fdm;
+    return nullptr;
+  }
+  catch (PDNSException& ae) {
+    g_log << Logger::Error << "Exception: " << ae.reason << endl;
+    return nullptr;
+  }
+  catch (std::exception& e) {
+    g_log << Logger::Error << "STL Exception: " << e.what() << endl;
+    return nullptr;
+  }
+  catch (...) {
+    g_log << Logger::Error << "any other exception in main: " << endl;
+    return nullptr;
   }
-  delete rws;
-  delete t_fdm;
-  return 0;
-}
-catch (PDNSException& ae) {
-  g_log << Logger::Error << "Exception: " << ae.reason << endl;
-  return 0;
-}
-catch (std::exception& e) {
-  g_log << Logger::Error << "STL Exception: " << e.what() << endl;
-  return 0;
-}
-catch (...) {
-  g_log << Logger::Error << "any other exception in main: " << endl;
-  return 0;
 }
 
 int main(int argc, char** argv)
@@ -2516,25 +2511,25 @@ static RecursorControlChannel::Answer* doReloadLuaScript()
   try {
     if (fname.empty()) {
       t_pdl.reset();
-      g_log << Logger::Info << t_id << " Unloaded current lua script" << endl;
+      g_log << Logger::Info << RecThreadInfo::id() << " Unloaded current lua script" << endl;
       return new RecursorControlChannel::Answer{0, string("unloaded\n")};
     }
     else {
       t_pdl = std::make_shared<RecursorLua4>();
       int err = t_pdl->loadFile(fname);
       if (err != 0) {
-        string msg = std::to_string(t_id) + " Retaining current script, could not read '" + fname + "': " + stringerror(err);
+        string msg = std::to_string(RecThreadInfo::id()) + " Retaining current script, could not read '" + fname + "': " + stringerror(err);
         g_log << Logger::Error << msg << endl;
         return new RecursorControlChannel::Answer{1, msg + "\n"};
       }
     }
   }
   catch (std::exception& e) {
-    g_log << Logger::Error << t_id << " Retaining current script, error from '" << fname << "': " << e.what() << endl;
+    g_log << Logger::Error << RecThreadInfo::id() << " Retaining current script, error from '" << fname << "': " << e.what() << endl;
     return new RecursorControlChannel::Answer{1, string("retaining current script, error from '" + fname + "': " + e.what() + "\n")};
   }
 
-  g_log << Logger::Warning << t_id << " (Re)loaded lua script from '" << fname << "'" << endl;
+  g_log << Logger::Warning << RecThreadInfo::id() << " (Re)loaded lua script from '" << fname << "'" << endl;
   return new RecursorControlChannel::Answer{0, string("(re)loaded '" + fname + "'\n")};
 }
 
index a5328bd35a95ac5076815e1bb9c24f5b233494f2..e8554badd932e7a2b22d22cf342d6c5bca24865e 100644 (file)
@@ -35,6 +35,7 @@
 #include "syncres.hh"
 #include "rec-snmp.hh"
 #include "rec_channel.hh"
+#include "threadname.hh"
 
 #ifdef NOD_ENABLED
 #include "nod.hh"
@@ -199,7 +200,6 @@ extern thread_local std::shared_ptr<NetmaskGroup> t_allowFrom;
 extern thread_local std::shared_ptr<NetmaskGroup> t_allowNotifyFrom;
 extern thread_local std::shared_ptr<notifyset_t> t_allowNotifyFor;
 extern thread_local std::unique_ptr<UDPClientSocks> t_udpclientsocks;
-extern bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
 extern bool g_useIncomingECS;
 extern boost::optional<ComboAddress> g_dns64Prefix;
 extern DNSName g_dns64PrefixReverse;
@@ -208,8 +208,6 @@ extern bool g_addExtendedResolutionDNSErrors;
 extern uint16_t g_xpfRRCode;
 extern NetmaskGroup g_proxyProtocolACL;
 extern std::atomic<bool> g_statsWanted;
-extern unsigned int g_numDistributorThreads;
-extern unsigned int g_numWorkerThreads;
 extern uint32_t g_disthashseed;
 extern int g_argc;
 extern char** g_argv;
@@ -258,13 +256,6 @@ inline MT_t* getMT()
   return MT ? MT.get() : nullptr;
 }
 
-extern thread_local unsigned int t_id;
-
-inline unsigned int getRecursorThreadId()
-{
-  return t_id;
-}
-
 /* this function is called with both a string and a vector<uint8_t> representing a packet */
 template <class T>
 static bool sendResponseOverTCP(const std::unique_ptr<DNSComboWriter>& dc, const T& packet)
@@ -299,6 +290,14 @@ static bool sendResponseOverTCP(const std::unique_ptr<DNSComboWriter>& dc, const
   return hadError;
 }
 
+struct RecThreadInfo;
+/* first we have the handler thread, t_id == 0 (some other
+   helper threads like SNMP might have t_id == 0 as well)
+   then the distributor threads if any
+   and finally the workers */
+extern std::vector<RecThreadInfo> g_threadInfos;
+void* recursorThread();
+
 // for communicating with our threads
 // effectively readonly after startup
 struct RecThreadInfo
@@ -313,6 +312,74 @@ struct RecThreadInfo
     int readQueriesToThread{-1};
   };
 
+public:
+  static RecThreadInfo& self()
+  {
+    return g_threadInfos.at(t_id);
+  }
+
+  bool isDistributor() const
+  {
+    if (t_id == 0) {
+      return false;
+    }
+    return s_weDistributeQueries && listener;
+  }
+
+  bool isHandler() const
+  {
+    if (t_id == 0) {
+      return true;
+    }
+    return handler;
+  }
+
+  bool isWorker() const
+  {
+    return worker;
+  }
+
+  bool isListener() const
+  {
+    return listener;
+  }
+
+  void setHandler()
+  {
+    handler = true;
+  }
+
+  void setWorker()
+  {
+    worker = true;
+  }
+
+  void setListener(bool flag = true)
+  {
+    listener = flag;
+  }
+
+  void start(unsigned int id, const string& name)
+  {
+    thread = std::thread([id, name] {
+      t_id = id;
+      const string threadPrefix = "rec/";
+      setThreadName(threadPrefix + name);
+      recursorThread();
+    });
+    sleep(1);
+  }
+
+  static unsigned int id()
+  {
+    return t_id;
+  }
+
+  static void setThreadId(unsigned int id)
+  {
+    t_id = id;
+  }
+
   /* FD corresponding to TCP sockets this thread is listening
      on.
      These FDs are also in deferredAdds when we have one
@@ -327,12 +394,24 @@ struct RecThreadInfo
   MT_t* mt{nullptr};
   uint64_t numberOfDistributedQueries{0};
   int exitCode{0};
+
+  static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
+  static unsigned int s_numDistributorThreads;
+  static unsigned int s_numWorkerThreads;
+
+  static unsigned int numThreads()
+  {
+    return s_numDistributorThreads + s_numWorkerThreads;
+  }
+
+private:
   /* handle the web server, carbon, statistics and the control channel */
-  bool isHandler{false};
+  bool handler{false};
   /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
-  bool isListener{false};
+  bool listener{false};
   /* process queries */
-  bool isWorker{false};
+  bool worker{false};
+  static thread_local unsigned int t_id;
 };
 
 struct ThreadMSG
@@ -341,29 +420,7 @@ struct ThreadMSG
   bool wantAnswer;
 };
 
-/* first we have the handler thread, t_id == 0 (some other
-   helper threads like SNMP might have t_id == 0 as well)
-   then the distributor threads if any
-   and finally the workers */
-extern std::vector<RecThreadInfo> g_threadInfos;
-
-inline bool isDistributorThread()
-{
-  if (t_id == 0) {
-    return false;
-  }
-
-  return g_weDistributeQueries && g_threadInfos.at(t_id).isListener;
-}
 
-inline bool isHandlerThread()
-{
-  if (t_id == 0) {
-    return true;
-  }
-
-  return g_threadInfos.at(t_id).isHandler;
-}
 
 PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query);
 bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal);
@@ -397,7 +454,6 @@ void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets
 void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t&);
 
 void makeUDPServerSockets(deferredAdd_t& deferredAdds);
-void* recursorThread(unsigned int n, const string& threadName);
 
 #define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
 #define LOCAL_NETS_INVERSE "!127.0.0.0/8, !10.0.0.0/8, !100.64.0.0/10, !169.254.0.0/16, !192.168.0.0/16, !172.16.0.0/12, !::1/128, !fc00::/7, !fe80::/10"
index 14fd9ae1cef251b075d37bef3ae225a2715541dd..4e41c08be3dcf0bddd4e636ce4b4e9a5423ea701 100644 (file)
@@ -452,7 +452,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         bool ipf = t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh, dc->d_eventTrace);
         if (ipf) {
           if (!g_quiet) {
-            g_log << Logger::Notice << t_id << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED TCP question from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << " based on policy" << endl;
+            g_log << Logger::Notice << RecThreadInfo::id() << " [" << MT->getTid() << "/" << MT->numProcesses() << "] DROPPED TCP question from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << " based on policy" << endl;
           }
           g_stats.policyDrops++;
           return;
@@ -522,7 +522,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
 
           if (cacheHit) {
             if (!g_quiet) {
-              g_log << Logger::Notice << t_id << " TCP question answered from packet cache tag=" << dc->d_tag << " from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << endl;
+              g_log << Logger::Notice << RecThreadInfo::id() << " TCP question answered from packet cache tag=" << dc->d_tag << " from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << endl;
             }
 
             bool hadError = sendResponseOverTCP(dc, response);
@@ -551,7 +551,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
 
         if (dc->d_mdp.d_header.opcode == Opcode::Notify) {
           if (!g_quiet) {
-            g_log << Logger::Notice << t_id << " got NOTIFY for " << qname.toLogString() << " from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << endl;
+            g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << dc->d_source.toStringWithPort() << (dc->d_source != dc->d_remote ? " (via " + dc->d_remote.toStringWithPort() + ")" : "") << endl;
           }
 
           requestWipeCaches(qname);
index 7b5d47c61afd391ade2660cc5b9f929cb8830710..fef6a3bac4f8a8d483fa22b9a1a0a6f6a583a016 100644 (file)
@@ -16,7 +16,6 @@ GlobalStateHolder<NetmaskGroup> g_dontThrottleNetmasks;
 GlobalStateHolder<SuffixMatchNode> g_DoTToAuthNames;
 std::unique_ptr<MemRecursorCache> g_recCache;
 std::unique_ptr<NegCache> g_negCache;
-unsigned int g_numThreads = 1;
 bool g_lowercaseOutgoing = false;
 
 /* Fake some required functions we didn't want the trouble to
index e8e3845d933955b663558074498326fd692fbc84..dd777e00f9b4fbcbffc770bef99da264c8cecc90 100644 (file)
@@ -1171,7 +1171,6 @@ string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_
 void parseACLs();
 extern RecursorStats g_stats;
 extern unsigned int g_networkTimeoutMsec;
-extern unsigned int g_numThreads;
 extern uint16_t g_outgoingEDNSBufsize;
 extern std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
 extern bool g_lowercaseOutgoing;