From: Charles-Henri Bruyand Date: Thu, 21 Sep 2023 14:54:21 +0000 (+0200) Subject: doq: handle responses in the main thread X-Git-Tag: rec-5.0.0-alpha2~6^2~44 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=f7d508b5f3cc7848da9d7478cac734c99689902c;p=thirdparty%2Fpdns.git doq: handle responses in the main thread --- diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 6d39d29e0d..e04f3ca20d 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -2511,6 +2511,8 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) if (vars) { parseLocalBindVars(vars, reusePort, tcpFastOpenQueueSize, interface, cpus, tcpListenQueueSize, maxInFlightQueriesPerConn, tcpMaxConcurrentConnections); + getOptionalValue(vars, "internalPipeBufferSize", frontend->d_internalPipeBufferSize); + parseTLSConfig(frontend->d_tlsConfig, "addDOQLocal", vars); bool ignoreTLSConfigurationErrors = false; diff --git a/pdns/dnsdistdist/docs/reference/config.rst b/pdns/dnsdistdist/docs/reference/config.rst index 6684b3554a..fbb8e73f77 100644 --- a/pdns/dnsdistdist/docs/reference/config.rst +++ b/pdns/dnsdistdist/docs/reference/config.rst @@ -189,6 +189,7 @@ Listen Sockets * ``interface=""``: str - Set the network interface to use. * ``cpus={}``: table - Set the CPU affinity for this listener thread, asking the scheduler to run it on a single CPU id, or a set of CPU ids. This parameter is only available if the OS provides the pthread_setaffinity_np() function. * ``idleTimeout=30``: int - Set the idle timeout, in seconds. + * ``internalPipeBufferSize=0``: int - Set the size in bytes of the internal buffer of the pipes used internally to pass queries and responses between threads. Requires support for ``F_SETPIPE_SZ`` which is present in Linux since 2.6.35. The actual size might be rounded up to a multiple of a page size. 0 means that the OS default size is used. The default value is 0, except on Linux where it is 1048576 since 1.6.0. .. function:: addTLSLocal(address, certFile(s), keyFile(s) [, options]) diff --git a/pdns/dnsdistdist/doq.cc b/pdns/dnsdistdist/doq.cc index 80c1c39359..6acc201279 100644 --- a/pdns/dnsdistdist/doq.cc +++ b/pdns/dnsdistdist/doq.cc @@ -24,11 +24,11 @@ #include "dnsdist-tcp.hh" #include "dnsdist-random.hh" +#include "dnsparser.hh" #include "dolog.hh" #include "iputils.hh" #include "misc.hh" #include "sstuff.hh" -#include "dnsparser.hh" #include "threadname.hh" #include "dnsdist-ecs.hh" #include "dnsdist-proxy-protocol.hh" @@ -36,9 +36,15 @@ static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description); struct DOQServerConfig { - DOQServerConfig(std::unique_ptr&& config_) : + DOQServerConfig(std::unique_ptr&& config_, uint32_t internalPipeBufferSize) : config(std::move(config_)) { + { + auto [sender, receiver] = pdns::channel::createObjectQueue(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize); + d_responseSender = std::move(sender); + d_responseReceiver = std::move(receiver); + } + } DOQServerConfig(const DOQServerConfig&) = delete; DOQServerConfig(DOQServerConfig&&) = default; @@ -50,6 +56,8 @@ struct DOQServerConfig QuicheConfig config; ClientState* cs{nullptr}; std::shared_ptr df{nullptr}; + pdns::channel::Sender d_responseSender; + pdns::channel::Receiver d_responseReceiver; }; #if 0 @@ -84,6 +92,9 @@ public: } auto du = std::move(response.d_idstate.doqu); + if (du->responseSender == nullptr) { + return; + } du->response = std::move(response.d_buffer); du->ids = std::move(response.d_idstate); @@ -259,7 +270,7 @@ void DOQFrontend::setup() quiche_config_set_cc_algorithm(config.get(), QUICHE_CC_RENO); // quiche_config_log_keys(config.get()); - d_server_config = std::make_shared(std::move(config)); + d_server_config = std::make_shared(std::move(config), d_internalPipeBufferSize); } static std::optional getCID() @@ -359,9 +370,16 @@ static std::optional> getConnection(const Pac static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description) { - DEBUGLOG("Handling back a " << description); - auto conn = getConnection(du->serverConnID); - handleResponse(*du->dsc->df, *conn, du->streamID, du->response); + if (du->responseSender == nullptr) { + return; + } + try { + if (!du->responseSender->send(std::move(du))) { + vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description); + } + } catch (const std::exception& e) { + vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what()); + } } static std::optional> createConnection(QuicheConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const PacketBuffer& token, const ComboAddress& local, const ComboAddress& peer) @@ -585,6 +603,7 @@ static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const du->ids.protocol = dnsdist::Protocol::DoQ; du->serverConnID = serverConnID; du->streamID = streamID; + du->responseSender = &dsc.d_responseSender; processDOQQuery(std::move(du)); } @@ -593,6 +612,30 @@ static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const } } +static void flushResponses(pdns::channel::Receiver& receiver) +{ + for(;;) { + try { + auto tmp = receiver.receive(); + if (!tmp) { + return ; + } + + auto du = std::move(*tmp); + auto conn = getConnection(du->serverConnID); + + handleResponse(*du->dsc->df, *conn, du->streamID, du->response); + + } + catch (const std::exception& e) { + errlog("Error while processing response received over DoQ: %s", e.what()); + } + catch (...) { + errlog("Unspecified error while processing response received over DoQ"); + } + } +} + // this is the entrypoint from dnsdist.cc void doqThread(ClientState* cs) { @@ -607,11 +650,18 @@ void doqThread(ClientState* cs) Socket sock(cs->udpFD); PacketBuffer buffer(std::numeric_limits::max()); + auto mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); + auto responseReceiverFD = frontend->d_server_config->d_responseReceiver.getDescriptor(); + mplexer->addReadFD(sock.getHandle(), [](int, FDMultiplexer::funcparam_t&) {}); + mplexer->addReadFD(responseReceiverFD, [](int, FDMultiplexer::funcparam_t&) {}); while (true) { - std::string bufferStr; - ComboAddress client; - if (waitForData(sock.getHandle(), 1, 0) > 0) { + std::vector readyFDs; + mplexer->getAvailableFDs(readyFDs, 500); + + if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) { + std::string bufferStr; + ComboAddress client; sock.recvFrom(bufferStr, client); uint32_t version{0}; @@ -706,6 +756,11 @@ void doqThread(ClientState* cs) else { DEBUGLOG("Connection not established"); } + // } + } + + if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) { + flushResponses(frontend->d_server_config->d_responseReceiver); } for (auto conn = s_connections.begin(); conn != s_connections.end();) { diff --git a/pdns/dnsdistdist/doq.hh b/pdns/dnsdistdist/doq.hh index 8761e04ca8..a45110411a 100644 --- a/pdns/dnsdistdist/doq.hh +++ b/pdns/dnsdistdist/doq.hh @@ -71,6 +71,13 @@ struct DOQFrontend ComboAddress d_local; void setup(); +#ifdef __linux__ + // On Linux this gives us 128k pending queries (default is 8192 queries), + // which should be enough to deal with huge spikes + uint32_t d_internalPipeBufferSize{1024*1024}; +#else + uint32_t d_internalPipeBufferSize{0}; +#endif }; struct DOQUnit