]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
doq: handle responses in the main thread
authorCharles-Henri Bruyand <charles-henri.bruyand@open-xchange.com>
Thu, 21 Sep 2023 14:54:21 +0000 (16:54 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 9 Oct 2023 11:36:49 +0000 (13:36 +0200)
pdns/dnsdist-lua.cc
pdns/dnsdistdist/docs/reference/config.rst
pdns/dnsdistdist/doq.cc
pdns/dnsdistdist/doq.hh

index 6d39d29e0d5a754eeaa17082eb10cc345c7c07e7..e04f3ca20d27fc0e3eec59c9debb8e4aa10f35e3 100644 (file)
@@ -2511,6 +2511,8 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
     if (vars) {
       parseLocalBindVars(vars, reusePort, tcpFastOpenQueueSize, interface, cpus, tcpListenQueueSize, maxInFlightQueriesPerConn, tcpMaxConcurrentConnections);
 
+      getOptionalValue<int>(vars, "internalPipeBufferSize", frontend->d_internalPipeBufferSize);
+
       parseTLSConfig(frontend->d_tlsConfig, "addDOQLocal", vars);
 
       bool ignoreTLSConfigurationErrors = false;
index 6684b3554a4059bbcd7ccf08cde05c2c697314b9..fbb8e73f77645ce30b9a5aa269760b2c5bffe0c4 100644 (file)
@@ -189,6 +189,7 @@ Listen Sockets
   * ``interface=""``: str - Set the network interface to use.
   * ``cpus={}``: table - Set the CPU affinity for this listener thread, asking the scheduler to run it on a single CPU id, or a set of CPU ids. This parameter is only available if the OS provides the pthread_setaffinity_np() function.
   * ``idleTimeout=30``: int - Set the idle timeout, in seconds.
+  * ``internalPipeBufferSize=0``: int - Set the size in bytes of the internal buffer of the pipes used internally to pass queries and responses between threads. Requires support for ``F_SETPIPE_SZ`` which is present in Linux since 2.6.35. The actual size might be rounded up to a multiple of a page size. 0 means that the OS default size is used. The default value is 0, except on Linux where it is 1048576 since 1.6.0.
 
 .. function:: addTLSLocal(address, certFile(s), keyFile(s) [, options])
 
index 80c1c39359e5c9a2bbc24b1a674d168f22a78ed9..6acc201279d0798382c14cffcbc2ef8a228a55d8 100644 (file)
 
 #include "dnsdist-tcp.hh"
 #include "dnsdist-random.hh"
+#include "dnsparser.hh"
 #include "dolog.hh"
 #include "iputils.hh"
 #include "misc.hh"
 #include "sstuff.hh"
-#include "dnsparser.hh"
 #include "threadname.hh"
 #include "dnsdist-ecs.hh"
 #include "dnsdist-proxy-protocol.hh"
 static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description);
 struct DOQServerConfig
 {
-  DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_) :
+  DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_, uint32_t internalPipeBufferSize) :
     config(std::move(config_))
   {
+    {
+      auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
+      d_responseSender = std::move(sender);
+      d_responseReceiver = std::move(receiver);
+    }
+
   }
   DOQServerConfig(const DOQServerConfig&) = delete;
   DOQServerConfig(DOQServerConfig&&) = default;
@@ -50,6 +56,8 @@ struct DOQServerConfig
   QuicheConfig config;
   ClientState* cs{nullptr};
   std::shared_ptr<DOQFrontend> df{nullptr};
+  pdns::channel::Sender<DOQUnit> d_responseSender;
+  pdns::channel::Receiver<DOQUnit> d_responseReceiver;
 };
 
 #if 0
@@ -84,6 +92,9 @@ public:
     }
 
     auto du = std::move(response.d_idstate.doqu);
+    if (du->responseSender == nullptr) {
+      return;
+    }
 
     du->response = std::move(response.d_buffer);
     du->ids = std::move(response.d_idstate);
@@ -259,7 +270,7 @@ void DOQFrontend::setup()
   quiche_config_set_cc_algorithm(config.get(), QUICHE_CC_RENO);
   // quiche_config_log_keys(config.get());
 
-  d_server_config = std::make_shared<DOQServerConfig>(std::move(config));
+  d_server_config = std::make_shared<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
 }
 
 static std::optional<PacketBuffer> getCID()
@@ -359,9 +370,16 @@ static std::optional<std::reference_wrapper<Connection>> getConnection(const Pac
 
 static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description)
 {
-  DEBUGLOG("Handling back a " << description);
-  auto conn = getConnection(du->serverConnID);
-  handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
+  if (du->responseSender == nullptr) {
+    return;
+  }
+  try {
+    if (!du->responseSender->send(std::move(du))) {
+      vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
+    }
+  } catch (const std::exception& e) {
+    vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
+  }
 }
 
 static std::optional<std::reference_wrapper<Connection>> createConnection(QuicheConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const PacketBuffer& token, const ComboAddress& local, const ComboAddress& peer)
@@ -585,6 +603,7 @@ static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const
     du->ids.protocol = dnsdist::Protocol::DoQ;
     du->serverConnID = serverConnID;
     du->streamID = streamID;
+    du->responseSender = &dsc.d_responseSender;
 
     processDOQQuery(std::move(du));
   }
@@ -593,6 +612,30 @@ static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const
   }
 }
 
+static void flushResponses(pdns::channel::Receiver<DOQUnit>& receiver)
+{
+  for(;;) {
+    try {
+      auto tmp = receiver.receive();
+      if (!tmp) {
+        return ;
+      }
+
+      auto du = std::move(*tmp);
+      auto conn = getConnection(du->serverConnID);
+
+      handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
+
+    }
+    catch (const std::exception& e) {
+      errlog("Error while processing response received over DoQ: %s", e.what());
+    }
+    catch (...) {
+      errlog("Unspecified error while processing response received over DoQ");
+    }
+  }
+}
+
 // this is the entrypoint from dnsdist.cc
 void doqThread(ClientState* cs)
 {
@@ -607,11 +650,18 @@ void doqThread(ClientState* cs)
     Socket sock(cs->udpFD);
 
     PacketBuffer buffer(std::numeric_limits<unsigned short>::max());
+    auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
 
+    auto responseReceiverFD = frontend->d_server_config->d_responseReceiver.getDescriptor();
+    mplexer->addReadFD(sock.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
+    mplexer->addReadFD(responseReceiverFD, [](int, FDMultiplexer::funcparam_t&) {});
     while (true) {
-      std::string bufferStr;
-      ComboAddress client;
-      if (waitForData(sock.getHandle(), 1, 0) > 0) {
+      std::vector<int> readyFDs;
+      mplexer->getAvailableFDs(readyFDs, 500);
+
+      if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) {
+        std::string bufferStr;
+        ComboAddress client;
         sock.recvFrom(bufferStr, client);
 
         uint32_t version{0};
@@ -706,6 +756,11 @@ void doqThread(ClientState* cs)
         else {
           DEBUGLOG("Connection not established");
         }
+        // }
+      }
+
+      if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) {
+        flushResponses(frontend->d_server_config->d_responseReceiver);
       }
 
       for (auto conn = s_connections.begin(); conn != s_connections.end();) {
index 8761e04ca85bfadb8f21d725c7fe53f5f0c95c54..a45110411a35e155e43c0038057fd7fb6585700e 100644 (file)
@@ -71,6 +71,13 @@ struct DOQFrontend
   ComboAddress d_local;
 
   void setup();
+#ifdef __linux__
+  // On Linux this gives us 128k pending queries (default is 8192 queries),
+  // which should be enough to deal with huge spikes
+  uint32_t d_internalPipeBufferSize{1024*1024};
+#else
+  uint32_t d_internalPipeBufferSize{0};
+#endif
 };
 
 struct DOQUnit