]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add USE_SINGLE_ACCEPTOR_THREAD option
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 4 Apr 2022 15:51:24 +0000 (17:51 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 30 Sep 2022 14:15:42 +0000 (16:15 +0200)
pdns/dnsdist-tcp.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/docs/install.rst

index bf58871e6c4e317cfd700c731ec0d0b3d3338968..631fcabddbea2058c69a36222184752b346a0195 100644 (file)
@@ -1371,12 +1371,14 @@ struct TCPAcceptorParam
   ClientState& cs;
   ComboAddress local;
   LocalStateHolder<NetmaskGroup>& acl;
+  int socket{-1};
 };
 
-static void acceptNewConnection(int socket, TCPAcceptorParam& param)
+static void acceptNewConnection(const TCPAcceptorParam& param)
 {
   auto& cs = param.cs;
   auto& acl = param.acl;
+  int socket = param.socket;
   bool tcpClientCountIncremented = false;
   ComboAddress remote;
   remote.sin4.sin_family = param.local.sin4.sin_family;
@@ -1456,34 +1458,36 @@ static void acceptNewConnection(int socket, TCPAcceptorParam& param)
 /* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
    they will hand off to worker threads & spawn more of them if required
 */
-void tcpAcceptorThread(ClientState* cs)
+void tcpAcceptorThread(std::vector<ClientState*> states)
 {
   setThreadName("dnsdist/tcpAcce");
 
   auto acl = g_ACL.getLocal();
-  struct TCPAcceptorParam param{*cs, cs->local, acl};
+  std::vector<TCPAcceptorParam> params;
+  params.reserve(states.size());
 
-  if (cs->d_additionalAddresses.empty()) {
+  for (auto& state : states) {
+    params.emplace_back(TCPAcceptorParam{*state, state->local, acl, state->tcpFD});
+    for (const auto& [addr, socket] : state->d_additionalAddresses) {
+      params.emplace_back(TCPAcceptorParam{*state, addr, acl, socket});
+    }
+  }
+
+  if (params.size() == 1) {
     while (true) {
-      acceptNewConnection(cs->tcpFD, param);
+      acceptNewConnection(params.at(0));
     }
   }
   else {
     auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) {
-      auto acceptorParam = boost::any_cast<TCPAcceptorParam*>(funcparam);
-      acceptNewConnection(socket, *acceptorParam);
+      auto acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
+      acceptNewConnection(*acceptorParam);
     };
 
-    std::vector<TCPAcceptorParam> additionalParams;
     auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
-    mplexer->addReadFD(cs->tcpFD, acceptCallback, &param);
-    for (const auto& [addr, socket] : cs->d_additionalAddresses) {
-      additionalParams.emplace_back(TCPAcceptorParam{*cs, addr, acl});
-    }
-    size_t idx = 0;
-    for (const auto& [addr, socket] : cs->d_additionalAddresses) {
-      mplexer->addReadFD(socket, acceptCallback, &additionalParams.at(idx));
-      idx++;
+    for (size_t idx = 0; idx < params.size(); idx++) {
+      const auto& param = params.at(idx);
+      mplexer->addReadFD(param.socket, acceptCallback, &param);
     }
 
     struct timeval tv;
index 0002181e321fa9e533366ab88aed1767b6724cf0..958f571fe1f9fbbd9a1e723add1f6fb22cb61afb 100644 (file)
@@ -1736,7 +1736,7 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
 #endif /* DISABLE_RECVMMSG */
 
 // listens to incoming queries, sends out to downstream servers, noting the intended return path
-static void udpClientThread(ClientState* cs)
+static void udpClientThread(std::vector<ClientState*> states)
 {
   try {
     setThreadName("dnsdist/udpClie");
@@ -1744,7 +1744,7 @@ static void udpClientThread(ClientState* cs)
 #ifndef DISABLE_RECVMMSG
 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
     if (g_udpVectorSize > 1) {
-      MultipleMessagesUDPClientThread(cs, holders);
+      MultipleMessagesUDPClientThread(states.at(0), holders);
     }
     else
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
@@ -1755,49 +1755,84 @@ static void udpClientThread(ClientState* cs)
          - we use it for self-generated responses (from rule or cache)
          but we only accept incoming payloads up to that size
       */
+      struct UDPStateParam
+      {
+        ClientState* cs{nullptr};
+        size_t maxIncomingPacketSize{0};
+        int socket{-1};
+      };
       const size_t initialBufferSize = getInitialUDPPacketBufferSize();
-      const size_t maxIncomingPacketSize = getMaximumIncomingPacketSize(*cs);
       PacketBuffer packet(initialBufferSize);
 
       struct msghdr msgh;
       struct iovec iov;
-      /* used by HarvestDestinationAddress */
-      cmsgbuf_aligned cbuf;
-
       ComboAddress remote;
       ComboAddress dest;
-      remote.sin4.sin_family = cs->local.sin4.sin_family;
-      fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast<char*>(&packet.at(0)), maxIncomingPacketSize, &remote);
 
-      for(;;) {
+      auto handleOnePacket = [&packet, &iov, &holders, &msgh, &remote, &dest, initialBufferSize](const UDPStateParam& param) {
         packet.resize(initialBufferSize);
         iov.iov_base = &packet.at(0);
         iov.iov_len = packet.size();
 
-        ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
+        ssize_t got = recvmsg(param.socket, &msgh, 0);
 
         if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
           ++g_stats.nonCompliantQueries;
-          ++cs->nonCompliantQueries;
-          continue;
+          ++param.cs->nonCompliantQueries;
+          return;
         }
 
         packet.resize(static_cast<size_t>(got));
 
-        processUDPQuery(*cs, holders, &msgh, remote, dest, packet, nullptr, nullptr, nullptr, nullptr);
+        processUDPQuery(*param.cs, holders, &msgh, remote, dest, packet, nullptr, nullptr, nullptr, nullptr);
+      };
+
+      std::vector<UDPStateParam> params;
+      for (auto& state : states) {
+        const size_t maxIncomingPacketSize = getMaximumIncomingPacketSize(*state);
+        params.emplace_back(UDPStateParam{state, maxIncomingPacketSize, state->udpFD});
+      }
+
+      if (params.size() == 1) {
+        auto param = params.at(0);
+        remote.sin4.sin_family = param.cs->local.sin4.sin_family;
+        /* used by HarvestDestinationAddress */
+        cmsgbuf_aligned cbuf;
+        fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast<char*>(&packet.at(0)), param.maxIncomingPacketSize, &remote);
+        while (true) {
+          handleOnePacket(param);
+        }
+      }
+      else {
+        auto callback = [&remote, &msgh, &iov, &packet, &handleOnePacket, initialBufferSize](int socket, FDMultiplexer::funcparam_t& funcparam) {
+          auto param = boost::any_cast<const UDPStateParam*>(funcparam);
+          remote.sin4.sin_family = param->cs->local.sin4.sin_family;
+          packet.resize(initialBufferSize);
+          /* used by HarvestDestinationAddress */
+          cmsgbuf_aligned cbuf;
+          fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast<char*>(&packet.at(0)), param->maxIncomingPacketSize, &remote);
+          handleOnePacket(*param);
+        };
+        auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+        for (size_t idx = 0; idx < params.size(); idx++) {
+          const auto& param = params.at(idx);
+          mplexer->addReadFD(param.socket, callback, &param);
+        }
+
+        struct timeval tv;
+        while (true) {
+          mplexer->run(&tv);
+        }
       }
     }
   }
-  catch(const std::exception &e)
-  {
+  catch (const std::exception &e) {
     errlog("UDP client thread died because of exception: %s", e.what());
   }
-  catch(const PDNSException &e)
-  {
+  catch (const PDNSException &e) {
     errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
   }
-  catch(...)
-  {
+  catch (...) {
     errlog("UDP client thread died because of an exception: %s", "unknown");
   }
 }
@@ -2744,6 +2779,8 @@ int main(int argc, char** argv)
       handleQueuedHealthChecks(*mplexer, true);
     }
 
+    std::vector<ClientState*> tcpStates;
+    std::vector<ClientState*> udpStates;
     for(auto& cs : g_frontends) {
       if (cs->dohFrontend != nullptr) {
 #ifdef HAVE_DNS_OVER_HTTPS
@@ -2756,21 +2793,38 @@ int main(int argc, char** argv)
         continue;
       }
       if (cs->udpFD >= 0) {
-        thread t1(udpClientThread, cs.get());
+#ifdef USE_SINGLE_ACCEPTOR_THREAD
+        udpStates.push_back(cs.get());
+#else /* USE_SINGLE_ACCEPTOR_THREAD */
+        thread t1(udpClientThread, std::vector<ClientState*>{ cs.get() });
         if (!cs->cpus.empty()) {
           mapThreadToCPUList(t1.native_handle(), cs->cpus);
         }
         t1.detach();
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
       }
       else if (cs->tcpFD >= 0) {
-        thread t1(tcpAcceptorThread, cs.get());
+#ifdef USE_SINGLE_ACCEPTOR_THREAD
+        tcpStates.push_back(cs.get());
+#else /* USE_SINGLE_ACCEPTOR_THREAD */
+        thread t1(tcpAcceptorThread, std::vector<ClientState*>{cs.get() });
         if (!cs->cpus.empty()) {
           mapThreadToCPUList(t1.native_handle(), cs->cpus);
         }
         t1.detach();
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
       }
     }
-
+#ifdef USE_SINGLE_ACCEPTOR_THREAD
+    if (!udpStates.empty()) {
+      thread udp(udpClientThread, udpStates);
+      udp.detach();
+    }
+    if (!tcpStates.empty()) {
+      thread tcp(tcpAcceptorThread, tcpStates);
+      tcp.detach();
+    }
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
     dnsdist::ServiceDiscovery::run();
 
 #ifndef DISABLE_CARBON
index 95129a82a6061d834e28edd6f12931f44e936206..adc7e5c9b1ab90f7eaf2eb645b72f5e54c1f6271 100644 (file)
@@ -719,6 +719,35 @@ struct ClientState
     d_filter = bpf;
   }
 
+  void detachFilter()
+  {
+    if (d_filter) {
+      detachFilter(getSocket());
+      for (const auto& [addr, socket] : d_additionalAddresses) {
+        (void) addr;
+        if (socket != -1) {
+          detachFilter(socket);
+        }
+      }
+
+      d_filter = nullptr;
+    }
+  }
+
+  void attachFilter(shared_ptr<BPFFilter> bpf)
+  {
+    detachFilter();
+
+    bpf->addSocket(getSocket());
+    for (const auto& [addr, socket] : d_additionalAddresses) {
+      (void) addr;
+      if (socket != -1) {
+        bpf->addSocket(socket);
+      }
+    }
+    d_filter = bpf;
+  }
+
   void updateTCPMetrics(size_t nbQueries, uint64_t durationMs)
   {
     tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (nbQueries / 100.0);
@@ -1131,7 +1160,7 @@ struct LocalHolders
 
 vector<std::function<void(void)>> setupLua(bool client, const std::string& config);
 
-void tcpAcceptorThread(ClientState* p);
+void tcpAcceptorThread(std::vector<ClientState*> states);
 
 #ifdef HAVE_DNS_OVER_HTTPS
 void dohThread(ClientState* cs);
index 3a7b6cf8fbe9924b84219c64e5e96da75f270cef..258aabf81b00ab64e5786db6ae2fea0412d70262 100644 (file)
@@ -147,3 +147,6 @@ Additionally several Lua bindings can be removed when they are not needed, as th
 * ``DISABLE_QPS_LIMITER_BINDINGS``
 * ``DISABLE_SUFFIX_MATCH_BINDINGS``
 * ``DISABLE_TOP_N_BINDINGS``
+
+Finally a build flag can be used to make use a single thread to handle all incoming UDP queries from clients, and another single thread to accept new TCP connections from clients, no matter how many :func:`addLocal` directives are present in the configuration. This option is destined to resource-constrained environments where dnsdist needs to listen on several addresses, over several interfaces, and one thread is enough to handle the traffic and therefore the overhead of using multiples threads for that task does not make sense.
+This option can be enabled by setting ``USE_SINGLE_ACCEPTOR_THREAD``.