]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: In single acceptor mode, merge the TCP acceptor and worker threads
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 1 Sep 2022 12:16:50 +0000 (14:16 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 30 Sep 2022 14:15:43 +0000 (16:15 +0200)
As we usually have one TCP worker thread only in single acceptor
mode, there is no need to pass the query via a pipe, improving
latency and CPU usage, and also saving a thread.

pdns/dnsdist-tcp.cc
pdns/dnsdist.cc
pdns/dnsdistdist/dnsdist-tcp.hh

index 631fcabddbea2058c69a36222184752b346a0195..2ac8bf88c822719d3a3c0269c4a0c97c2ad666e0 100644 (file)
@@ -127,16 +127,16 @@ std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstrea
   return downstream;
 }
 
-static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD);
+static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD, std::vector<ClientState*> tcpAcceptStates);
 
-TCPClientCollection::TCPClientCollection(size_t maxThreads): d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
+TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpAcceptStates): d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
 {
   for (size_t idx = 0; idx < maxThreads; idx++) {
-    addTCPClientThread();
+    addTCPClientThread(tcpAcceptStates);
   }
 }
 
-void TCPClientCollection::addTCPClientThread()
+void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
 {
   auto preparePipe = [](int fds[2], const std::string& type) -> bool {
     if (pipe(fds) < 0) {
@@ -200,7 +200,7 @@ void TCPClientCollection::addTCPClientThread()
        no need to worry about it */
     TCPWorkerThread worker(pipefds[1], crossProtocolQueriesFDs[1], crossProtocolResponsesFDs[1]);
     try {
-      std::thread t1(tcpClientThread, pipefds[0], crossProtocolQueriesFDs[0], crossProtocolResponsesFDs[0], crossProtocolResponsesFDs[1]);
+      std::thread t1(tcpClientThread, pipefds[0], crossProtocolQueriesFDs[0], crossProtocolResponsesFDs[0], crossProtocolResponsesFDs[1], tcpAcceptStates);
       t1.detach();
     }
     catch (const std::runtime_error& e) {
@@ -1261,7 +1261,17 @@ static void handleCrossProtocolResponse(int pipefd, FDMultiplexer::funcparam_t&
   }
 }
 
-static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD)
+struct TCPAcceptorParam
+{
+  ClientState& cs;
+  ComboAddress local;
+  LocalStateHolder<NetmaskGroup>& acl;
+  int socket{-1};
+};
+
+static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData);
+
+static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD, std::vector<ClientState*> tcpAcceptStates)
 {
   /* we get launched with a pipe on which we receive file descriptors from clients that we own
      from that point on */
@@ -1276,6 +1286,28 @@ static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int cros
     data.mplexer->addReadFD(crossProtocolQueriesPipeFD, handleCrossProtocolQuery, &data);
     data.mplexer->addReadFD(crossProtocolResponsesListenPipeFD, handleCrossProtocolResponse, &data);
 
+    /* only used in single acceptor mode for now */
+    auto acl = g_ACL.getLocal();
+    std::vector<TCPAcceptorParam> acceptParams;
+    acceptParams.reserve(tcpAcceptStates.size());
+
+    for (auto& state : tcpAcceptStates) {
+      acceptParams.emplace_back(TCPAcceptorParam{*state, state->local, acl, state->tcpFD});
+      for (const auto& [addr, socket] : state->d_additionalAddresses) {
+        acceptParams.emplace_back(TCPAcceptorParam{*state, addr, acl, socket});
+      }
+    }
+
+    auto acceptCallback = [&data](int socket, FDMultiplexer::funcparam_t& funcparam) {
+      auto acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
+      acceptNewConnection(*acceptorParam, &data);
+    };
+
+    for (size_t idx = 0; idx < acceptParams.size(); idx++) {
+      const auto& param = acceptParams.at(idx);
+      data.mplexer->addReadFD(param.socket, acceptCallback, &param);
+    }
+
     struct timeval now;
     gettimeofday(&now, nullptr);
     time_t lastTimeoutScan = now.tv_sec;
@@ -1366,15 +1398,7 @@ static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int cros
   }
 }
 
-struct TCPAcceptorParam
-{
-  ClientState& cs;
-  ComboAddress local;
-  LocalStateHolder<NetmaskGroup>& acl;
-  int socket{-1};
-};
-
-static void acceptNewConnection(const TCPAcceptorParam& param)
+static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData)
 {
   auto& cs = param.cs;
   auto& acl = param.acl;
@@ -1440,11 +1464,19 @@ static void acceptNewConnection(const TCPAcceptorParam& param)
     vinfolog("Got TCP connection from %s", remote.toStringWithPort());
 
     ci->remote = remote;
-    if (!g_tcpclientthreads->passConnectionToThread(std::move(ci))) {
-      if (tcpClientCountIncremented) {
-        decrementTCPClientCount(remote);
+    if (threadData == nullptr) {
+      if (!g_tcpclientthreads->passConnectionToThread(std::move(ci))) {
+        if (tcpClientCountIncremented) {
+          decrementTCPClientCount(remote);
+        }
       }
     }
+    else {
+      struct timeval now;
+      gettimeofday(&now, nullptr);
+      auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*ci), *threadData, now);
+      IncomingTCPConnectionState::handleIO(state, now);
+    }
   }
   catch (const std::exception& e) {
     errlog("While reading a TCP question: %s", e.what());
@@ -1458,6 +1490,7 @@ static void acceptNewConnection(const TCPAcceptorParam& param)
 /* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
    they will hand off to worker threads & spawn more of them if required
 */
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
 void tcpAcceptorThread(std::vector<ClientState*> states)
 {
   setThreadName("dnsdist/tcpAcce");
@@ -1475,13 +1508,13 @@ void tcpAcceptorThread(std::vector<ClientState*> states)
 
   if (params.size() == 1) {
     while (true) {
-      acceptNewConnection(params.at(0));
+      acceptNewConnection(params.at(0), nullptr);
     }
   }
   else {
     auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) {
       auto acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
-      acceptNewConnection(*acceptorParam);
+      acceptNewConnection(*acceptorParam, nullptr);
     };
 
     auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
@@ -1496,3 +1529,4 @@ void tcpAcceptorThread(std::vector<ClientState*> states)
     }
   }
 }
+#endif
index 958f571fe1f9fbbd9a1e723add1f6fb22cb61afb..d1e87bdf06ccc9cb808f543d3e4b7bc10c14e5ee 100644 (file)
@@ -2734,7 +2734,9 @@ int main(int argc, char** argv)
     /* we need to create the TCP worker threads before the
        acceptor ones, otherwise we might crash when processing
        the first TCP query */
-    g_tcpclientthreads = std::make_unique<TCPClientCollection>(*g_maxTCPClientThreads);
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
+    g_tcpclientthreads = std::make_unique<TCPClientCollection>(*g_maxTCPClientThreads, std::vector<ClientState*>());
+#endif
 
     initDoHWorkers();
 
@@ -2821,8 +2823,7 @@ int main(int argc, char** argv)
       udp.detach();
     }
     if (!tcpStates.empty()) {
-      thread tcp(tcpAcceptorThread, tcpStates);
-      tcp.detach();
+      g_tcpclientthreads = std::make_unique<TCPClientCollection>(1, tcpStates);
     }
 #endif /* USE_SINGLE_ACCEPTOR_THREAD */
     dnsdist::ServiceDiscovery::run();
index 1e896b473e073e8c002b24201b4d15e693fb9f14..b7b78582d17b1e3aa538c66090d3a5258c94a68c 100644 (file)
@@ -175,7 +175,7 @@ struct CrossProtocolQuery
 class TCPClientCollection
 {
 public:
-  TCPClientCollection(size_t maxThreads);
+  TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpStates);
 
   int getThread()
   {
@@ -249,7 +249,7 @@ public:
   }
 
 private:
-  void addTCPClientThread();
+  void addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates);
 
   struct TCPWorkerThread
   {