]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Move threadinfo vector to RecThreadInfo class
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Tue, 25 Jan 2022 13:23:08 +0000 (14:23 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 4 Feb 2022 10:06:16 +0000 (11:06 +0100)
pdns/pdns_recursor.cc
pdns/recursordist/rec-main.cc
pdns/recursordist/rec-main.hh

index 3314b0164b1e8c1affc4e4f67d907d4dd7d6cc1f..10cca1503f33a40d986f566cdd9dc7216b89f0c8 100644 (file)
@@ -1756,7 +1756,7 @@ void requestWipeCaches(const DNSName& canon)
   ThreadMSG* tmsg = new ThreadMSG();
   tmsg->func = [=] { return pleaseWipeCaches(canon, true, 0xffff); };
   tmsg->wantAnswer = false;
-  if (write(g_threadInfos[0].pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
+  if (write(RecThreadInfo::info(0).pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
     delete tmsg;
 
     unixDie("write to thread pipe returned wrong size or error");
@@ -2322,7 +2322,7 @@ void makeUDPServerSockets(deferredAdd_t& deferredAdds)
 
 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 {
-  auto& targetInfo = g_threadInfos[target];
+  auto& targetInfo = RecThreadInfo::info(target);
   if (!targetInfo.isWorker()) {
     g_log << Logger::Error << "distributeAsyncFunction() tried to assign a query to a non-worker thread" << endl;
     _exit(1);
@@ -2353,7 +2353,7 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 
 static unsigned int getWorkerLoad(size_t workerIdx)
 {
-  const auto mt = g_threadInfos[/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx].mt;
+  const auto mt = RecThreadInfo::info(/* skip handler */ 1 + RecThreadInfo::s_numDistributorThreads + workerIdx).mt;
   if (mt != nullptr) {
     return mt->numProcesses();
   }
index 97420d942259a9ea155243e46f8041929c9e5cad..8bf704dc09030c4532bbeba221e9d3c0fb89a995 100644 (file)
@@ -101,7 +101,7 @@ deferredAdd_t g_deferredAdds;
    helper threads like SNMP might have t_id == 0 as well)
    then the distributor threads if any
    and finally the workers */
-std::vector<RecThreadInfo> g_threadInfos;
+std::vector<RecThreadInfo> RecThreadInfo::s_threadInfos;
 
 bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
 unsigned int RecThreadInfo::s_numDistributorThreads;
@@ -697,15 +697,15 @@ static void makeThreadPipes()
   }
 
   /* thread 0 is the handler / SNMP, worker threads start at 1 */
-  for (unsigned int n = 0; n <= (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads); ++n) {
-    auto& threadInfos = g_threadInfos.at(n);
+  for (unsigned int n = 0; n <= RecThreadInfo::numThreads(); ++n) {
+    auto& threadInfo = RecThreadInfo::info(n);
 
     int fd[2];
     if (pipe(fd) < 0)
       unixDie("Creating pipe for inter-thread communications");
 
-    threadInfos.pipes.readToThread = fd[0];
-    threadInfos.pipes.writeToThread = fd[1];
+    threadInfo.pipes.readToThread = fd[0];
+    threadInfo.pipes.writeToThread = fd[1];
 
     // handler thread only gets first pipe, not the others
     if (n == 0) {
@@ -715,27 +715,27 @@ static void makeThreadPipes()
     if (pipe(fd) < 0)
       unixDie("Creating pipe for inter-thread communications");
 
-    threadInfos.pipes.readFromThread = fd[0];
-    threadInfos.pipes.writeFromThread = fd[1];
+    threadInfo.pipes.readFromThread = fd[0];
+    threadInfo.pipes.writeFromThread = fd[1];
 
     if (pipe(fd) < 0)
       unixDie("Creating pipe for inter-thread communications");
 
-    threadInfos.pipes.readQueriesToThread = fd[0];
-    threadInfos.pipes.writeQueriesToThread = fd[1];
+    threadInfo.pipes.readQueriesToThread = fd[0];
+    threadInfo.pipes.writeQueriesToThread = fd[1];
 
     if (pipeBufferSize > 0) {
-      if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
+      if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) {
         int err = errno;
         g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl;
-        auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
+        auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread);
         if (existingSize > 0) {
           g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl;
         }
       }
     }
 
-    if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
+    if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) {
       unixDie("Making pipe for inter-thread communications non-blocking");
     }
   }
@@ -782,7 +782,7 @@ static void doStats(void)
     g_log << Logger::Notice << "stats: " << pcSize << " packet cache entries, " << ratePercentage(pcHits, SyncRes::s_queries) << "% packet cache hits" << endl;
 
     size_t idx = 0;
-    for (const auto& threadInfo : g_threadInfos) {
+    for (const auto& threadInfo : RecThreadInfo::infos()) {
       if (threadInfo.isWorker()) {
         g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl;
         ++idx;
@@ -936,7 +936,7 @@ void broadcastFunction(const pipefunc_t& func)
      for the initialization of ACLs and domain maps. After that it should only
      be called by the handler. */
 
-  if (g_threadInfos.empty() && RecThreadInfo::id() == 0) {
+  if (RecThreadInfo::infos().empty() && RecThreadInfo::id() == 0) {
     /* the handler and  distributors will call themselves below, but
        during startup we get called while g_threadInfos has not been
        populated yet to update the ACL or domain maps, so we need to
@@ -946,7 +946,7 @@ void broadcastFunction(const pipefunc_t& func)
   }
 
   unsigned int n = 0;
-  for (const auto& threadInfo : g_threadInfos) {
+  for (const auto& threadInfo : RecThreadInfo::infos()) {
     if (n++ ==  RecThreadInfo::id()) {
       func(); // don't write to ourselves!
       continue;
@@ -1004,7 +1004,7 @@ T broadcastAccFunction(const boost::function<T*()>& func)
 
   unsigned int n = 0;
   T ret = T();
-  for (const auto& threadInfo : g_threadInfos) {
+  for (const auto& threadInfo : RecThreadInfo::infos()) {
     if (n++ == RecThreadInfo::id()) {
       continue;
     }
@@ -1424,14 +1424,15 @@ static int serviceMain(int argc, char* argv[])
   g_reusePort = ::arg().mustDo("reuseport");
 #endif
 
-  g_threadInfos.resize(RecThreadInfo::s_numDistributorThreads + RecThreadInfo::s_numWorkerThreads + /* handler */ 1);
+  RecThreadInfo::infos().resize(RecThreadInfo::numThreads() + /* handler */ 1);
 
   if (g_reusePort) {
     if (RecThreadInfo::s_weDistributeQueries) {
       /* first thread is the handler, then distributors */
       for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
-        auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds;
-        auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets;
+        auto& info =  RecThreadInfo::info(threadId);
+        auto& deferredAdds = info.deferredAdds;
+        auto& tcpSockets = info.tcpSockets;
         makeUDPServerSockets(deferredAdds);
         makeTCPServerSockets(deferredAdds, tcpSockets);
       }
@@ -1439,8 +1440,9 @@ static int serviceMain(int argc, char* argv[])
     else {
       /* first thread is the handler, there is no distributor here and workers are accepting queries */
       for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
-        auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds;
-        auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets;
+        auto& info =  RecThreadInfo::info(threadId);
+        auto& deferredAdds = info.deferredAdds;
+        auto& tcpSockets = info.tcpSockets;
         makeUDPServerSockets(deferredAdds);
         makeTCPServerSockets(deferredAdds, tcpSockets);
       }
@@ -1458,13 +1460,13 @@ static int serviceMain(int argc, char* argv[])
     if (RecThreadInfo::s_weDistributeQueries) {
       /* first thread is the handler, then distributors */
       for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
-        g_threadInfos.at(threadId).tcpSockets = tcpSockets;
+        RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
       }
     }
     else {
       /* first thread is the handler, there is no distributor here and workers are accepting queries */
       for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
-        g_threadInfos.at(threadId).tcpSockets = tcpSockets;
+        RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
       }
     }
   }
@@ -1637,13 +1639,13 @@ static int serviceMain(int argc, char* argv[])
 #endif
 
     /* This thread handles the web server, carbon, statistics and the control channel */
-    auto& handlerInfo = g_threadInfos.at(0);
+    auto& handlerInfo = RecThreadInfo::info(0);
     handlerInfo.setHandler();
     handlerInfo.start(0, "web+stat");
 
     setCPUMap(cpusMap, currentThreadId, pthread_self());
 
-    auto& info = g_threadInfos.at(currentThreadId);
+    auto& info = RecThreadInfo::info(currentThreadId);
     info.setListener();
     info.setWorker();
     info.setThreadId(currentThreadId++);
@@ -1657,11 +1659,11 @@ static int serviceMain(int argc, char* argv[])
   else {
     if (RecThreadInfo::s_weDistributeQueries) {
       for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
-        g_threadInfos.at(currentThreadId + n).setListener();
+        RecThreadInfo::info(currentThreadId + n).setListener();
       }
     }
     for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
-      auto& info = g_threadInfos.at(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n);
+      auto& info = RecThreadInfo::info(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n);
       info.setListener(!RecThreadInfo::s_weDistributeQueries);
       info.setWorker();
     }
@@ -1669,18 +1671,18 @@ static int serviceMain(int argc, char* argv[])
     if (RecThreadInfo::s_weDistributeQueries) {
       g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl;
       for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
-        auto& info = g_threadInfos.at(currentThreadId);
+        auto& info = RecThreadInfo::info(currentThreadId);
         info.start(currentThreadId++, "distr");
-        setCPUMap(cpusMap, currentThreadId, info.thread.native_handle());
+        setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
       }
     }
 
     g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl;
 
     for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
-      auto& info = g_threadInfos.at(currentThreadId);
+      auto& info = RecThreadInfo::info(currentThreadId);
       info.start(currentThreadId++, "worker");
-      setCPUMap(cpusMap, currentThreadId, info.thread.native_handle());
+      setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
     }
 
 #ifdef HAVE_SYSTEMD
@@ -1688,11 +1690,11 @@ static int serviceMain(int argc, char* argv[])
 #endif
 
     /* This thread handles the web server, carbon, statistics and the control channel */
-    auto& info = g_threadInfos.at(0);
+    auto& info = RecThreadInfo::info(0);
     info.setHandler();
     info.start(0, "web+stat");
 
-    for (auto& ti : g_threadInfos) {
+    for (auto& ti : RecThreadInfo::infos()) {
       ti.thread.join();
       if (ti.exitCode != 0) {
         ret = ti.exitCode;
index 49e7fd2d219f5ecdda7c9b011a26ebeb39d66961..e4f79ae6013a60df02660d87ca01fc5db5e7a155 100644 (file)
@@ -290,16 +290,13 @@ static bool sendResponseOverTCP(const std::unique_ptr<DNSComboWriter>& dc, const
   return hadError;
 }
 
-struct RecThreadInfo;
-/* first we have the handler thread, t_id == 0 (some other
-   helper threads like SNMP might have t_id == 0 as well)
-   then the distributor threads if any
-   and finally the workers */
-extern std::vector<RecThreadInfo> g_threadInfos;
 void* recursorThread();
 
-// for communicating with our threads
-// effectively readonly after startup
+// For communicating with our threads effectively readonly after
+// startup.
+// First we have the handler thread, t_id == 0 (some other helper
+// threads like SNMP might have t_id == 0 as well) then the
+// distributor threads if any and finally the workers
 struct RecThreadInfo
 {
   struct ThreadPipeSet
@@ -315,7 +312,17 @@ struct RecThreadInfo
 public:
   static RecThreadInfo& self()
   {
-    return g_threadInfos.at(t_id);
+    return s_threadInfos.at(t_id);
+  }
+
+  static RecThreadInfo& info(unsigned int i)
+  {
+    return s_threadInfos.at(i);
+  }
+
+  static vector<RecThreadInfo>& infos()
+  {
+    return s_threadInfos;
   }
 
   bool isDistributor() const
@@ -367,7 +374,6 @@ public:
       setThreadName(threadPrefix + name);
       recursorThread();
     });
-    sleep(1);
   }
 
   static unsigned int id()
@@ -380,15 +386,15 @@ public:
     t_id = id;
   }
 
-  /* FD corresponding to TCP sockets this thread is listening
-     on.
-     These FDs are also in deferredAdds when we have one
-     socket per listener, and in g_deferredAdds instead. */
+   // FD corresponding to TCP sockets this thread is listening on.
+   // These FDs are also in deferredAdds when we have one socket per
+   // listener, and in g_deferredAdds instead.
   std::set<int> tcpSockets;
-  /* FD corresponding to listening sockets if we have one socket per
-     listener (with reuseport), otherwise all listeners share the
-     same FD and g_deferredAdds is then used instead */
+  // FD corresponding to listening sockets if we have one socket per
+  // listener (with reuseport), otherwise all listeners share the
+  // same FD and g_deferredAdds is then used instead
   deferredAdd_t deferredAdds;
+
   struct ThreadPipeSet pipes;
   std::thread thread;
   MT_t* mt{nullptr};
@@ -412,6 +418,7 @@ private:
   /* process queries */
   bool worker{false};
   static thread_local unsigned int t_id;
+  static std::vector<RecThreadInfo> s_threadInfos;
 };
 
 struct ThreadMSG