From: Remi Gacogne Date: Mon, 4 Apr 2022 15:51:24 +0000 (+0200) Subject: dnsdist: Add USE_SINGLE_ACCEPTOR_THREAD option X-Git-Tag: dnsdist-1.8.0-rc1~287^2~3 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=ab64ca35e350d25ad6c19ac2840ea14b03d66c79;p=thirdparty%2Fpdns.git dnsdist: Add USE_SINGLE_ACCEPTOR_THREAD option --- diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index bf58871e6c..631fcabddb 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -1371,12 +1371,14 @@ struct TCPAcceptorParam ClientState& cs; ComboAddress local; LocalStateHolder& acl; + int socket{-1}; }; -static void acceptNewConnection(int socket, TCPAcceptorParam& param) +static void acceptNewConnection(const TCPAcceptorParam& param) { auto& cs = param.cs; auto& acl = param.acl; + int socket = param.socket; bool tcpClientCountIncremented = false; ComboAddress remote; remote.sin4.sin_family = param.local.sin4.sin_family; @@ -1456,34 +1458,36 @@ static void acceptNewConnection(int socket, TCPAcceptorParam& param) /* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and they will hand off to worker threads & spawn more of them if required */ -void tcpAcceptorThread(ClientState* cs) +void tcpAcceptorThread(std::vector states) { setThreadName("dnsdist/tcpAcce"); auto acl = g_ACL.getLocal(); - struct TCPAcceptorParam param{*cs, cs->local, acl}; + std::vector params; + params.reserve(states.size()); - if (cs->d_additionalAddresses.empty()) { + for (auto& state : states) { + params.emplace_back(TCPAcceptorParam{*state, state->local, acl, state->tcpFD}); + for (const auto& [addr, socket] : state->d_additionalAddresses) { + params.emplace_back(TCPAcceptorParam{*state, addr, acl, socket}); + } + } + + if (params.size() == 1) { while (true) { - acceptNewConnection(cs->tcpFD, param); + acceptNewConnection(params.at(0)); } } else { auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) { - auto acceptorParam = boost::any_cast(funcparam); - acceptNewConnection(socket, *acceptorParam); + auto acceptorParam = boost::any_cast(funcparam); + acceptNewConnection(*acceptorParam); }; - std::vector additionalParams; auto mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); - mplexer->addReadFD(cs->tcpFD, acceptCallback, ¶m); - for (const auto& [addr, socket] : cs->d_additionalAddresses) { - additionalParams.emplace_back(TCPAcceptorParam{*cs, addr, acl}); - } - size_t idx = 0; - for (const auto& [addr, socket] : cs->d_additionalAddresses) { - mplexer->addReadFD(socket, acceptCallback, &additionalParams.at(idx)); - idx++; + for (size_t idx = 0; idx < params.size(); idx++) { + const auto& param = params.at(idx); + mplexer->addReadFD(param.socket, acceptCallback, ¶m); } struct timeval tv; diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 0002181e32..958f571fe1 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -1736,7 +1736,7 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde #endif /* DISABLE_RECVMMSG */ // listens to incoming queries, sends out to downstream servers, noting the intended return path -static void udpClientThread(ClientState* cs) +static void udpClientThread(std::vector states) { try { setThreadName("dnsdist/udpClie"); @@ -1744,7 +1744,7 @@ static void udpClientThread(ClientState* cs) #ifndef DISABLE_RECVMMSG #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) if (g_udpVectorSize > 1) { - MultipleMessagesUDPClientThread(cs, holders); + MultipleMessagesUDPClientThread(states.at(0), holders); } else #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */ @@ -1755,49 +1755,84 @@ static void udpClientThread(ClientState* cs) - we use it for self-generated responses (from rule or cache) but we only accept incoming payloads up to that size */ + struct UDPStateParam + { + ClientState* cs{nullptr}; + size_t maxIncomingPacketSize{0}; + int socket{-1}; + }; const size_t initialBufferSize = getInitialUDPPacketBufferSize(); - const size_t maxIncomingPacketSize = getMaximumIncomingPacketSize(*cs); PacketBuffer packet(initialBufferSize); struct msghdr msgh; struct iovec iov; - /* used by HarvestDestinationAddress */ - cmsgbuf_aligned cbuf; - ComboAddress remote; ComboAddress dest; - remote.sin4.sin_family = cs->local.sin4.sin_family; - fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast(&packet.at(0)), maxIncomingPacketSize, &remote); - for(;;) { + auto handleOnePacket = [&packet, &iov, &holders, &msgh, &remote, &dest, initialBufferSize](const UDPStateParam& param) { packet.resize(initialBufferSize); iov.iov_base = &packet.at(0); iov.iov_len = packet.size(); - ssize_t got = recvmsg(cs->udpFD, &msgh, 0); + ssize_t got = recvmsg(param.socket, &msgh, 0); if (got < 0 || static_cast(got) < sizeof(struct dnsheader)) { ++g_stats.nonCompliantQueries; - ++cs->nonCompliantQueries; - continue; + ++param.cs->nonCompliantQueries; + return; } packet.resize(static_cast(got)); - processUDPQuery(*cs, holders, &msgh, remote, dest, packet, nullptr, nullptr, nullptr, nullptr); + processUDPQuery(*param.cs, holders, &msgh, remote, dest, packet, nullptr, nullptr, nullptr, nullptr); + }; + + std::vector params; + for (auto& state : states) { + const size_t maxIncomingPacketSize = getMaximumIncomingPacketSize(*state); + params.emplace_back(UDPStateParam{state, maxIncomingPacketSize, state->udpFD}); + } + + if (params.size() == 1) { + auto param = params.at(0); + remote.sin4.sin_family = param.cs->local.sin4.sin_family; + /* used by HarvestDestinationAddress */ + cmsgbuf_aligned cbuf; + fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast(&packet.at(0)), param.maxIncomingPacketSize, &remote); + while (true) { + handleOnePacket(param); + } + } + else { + auto callback = [&remote, &msgh, &iov, &packet, &handleOnePacket, initialBufferSize](int socket, FDMultiplexer::funcparam_t& funcparam) { + auto param = boost::any_cast(funcparam); + remote.sin4.sin_family = param->cs->local.sin4.sin_family; + packet.resize(initialBufferSize); + /* used by HarvestDestinationAddress */ + cmsgbuf_aligned cbuf; + fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast(&packet.at(0)), param->maxIncomingPacketSize, &remote); + handleOnePacket(*param); + }; + auto mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); + for (size_t idx = 0; idx < params.size(); idx++) { + const auto& param = params.at(idx); + mplexer->addReadFD(param.socket, callback, ¶m); + } + + struct timeval tv; + while (true) { + mplexer->run(&tv); + } } } } - catch(const std::exception &e) - { + catch (const std::exception &e) { errlog("UDP client thread died because of exception: %s", e.what()); } - catch(const PDNSException &e) - { + catch (const PDNSException &e) { errlog("UDP client thread died because of PowerDNS exception: %s", e.reason); } - catch(...) - { + catch (...) { errlog("UDP client thread died because of an exception: %s", "unknown"); } } @@ -2744,6 +2779,8 @@ int main(int argc, char** argv) handleQueuedHealthChecks(*mplexer, true); } + std::vector tcpStates; + std::vector udpStates; for(auto& cs : g_frontends) { if (cs->dohFrontend != nullptr) { #ifdef HAVE_DNS_OVER_HTTPS @@ -2756,21 +2793,38 @@ int main(int argc, char** argv) continue; } if (cs->udpFD >= 0) { - thread t1(udpClientThread, cs.get()); +#ifdef USE_SINGLE_ACCEPTOR_THREAD + udpStates.push_back(cs.get()); +#else /* USE_SINGLE_ACCEPTOR_THREAD */ + thread t1(udpClientThread, std::vector{ cs.get() }); if (!cs->cpus.empty()) { mapThreadToCPUList(t1.native_handle(), cs->cpus); } t1.detach(); +#endif /* USE_SINGLE_ACCEPTOR_THREAD */ } else if (cs->tcpFD >= 0) { - thread t1(tcpAcceptorThread, cs.get()); +#ifdef USE_SINGLE_ACCEPTOR_THREAD + tcpStates.push_back(cs.get()); +#else /* USE_SINGLE_ACCEPTOR_THREAD */ + thread t1(tcpAcceptorThread, std::vector{cs.get() }); if (!cs->cpus.empty()) { mapThreadToCPUList(t1.native_handle(), cs->cpus); } t1.detach(); +#endif /* USE_SINGLE_ACCEPTOR_THREAD */ } } - +#ifdef USE_SINGLE_ACCEPTOR_THREAD + if (!udpStates.empty()) { + thread udp(udpClientThread, udpStates); + udp.detach(); + } + if (!tcpStates.empty()) { + thread tcp(tcpAcceptorThread, tcpStates); + tcp.detach(); + } +#endif /* USE_SINGLE_ACCEPTOR_THREAD */ dnsdist::ServiceDiscovery::run(); #ifndef DISABLE_CARBON diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 95129a82a6..adc7e5c9b1 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -719,6 +719,35 @@ struct ClientState d_filter = bpf; } + void detachFilter() + { + if (d_filter) { + detachFilter(getSocket()); + for (const auto& [addr, socket] : d_additionalAddresses) { + (void) addr; + if (socket != -1) { + detachFilter(socket); + } + } + + d_filter = nullptr; + } + } + + void attachFilter(shared_ptr bpf) + { + detachFilter(); + + bpf->addSocket(getSocket()); + for (const auto& [addr, socket] : d_additionalAddresses) { + (void) addr; + if (socket != -1) { + bpf->addSocket(socket); + } + } + d_filter = bpf; + } + void updateTCPMetrics(size_t nbQueries, uint64_t durationMs) { tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (nbQueries / 100.0); @@ -1131,7 +1160,7 @@ struct LocalHolders vector> setupLua(bool client, const std::string& config); -void tcpAcceptorThread(ClientState* p); +void tcpAcceptorThread(std::vector states); #ifdef HAVE_DNS_OVER_HTTPS void dohThread(ClientState* cs); diff --git a/pdns/dnsdistdist/docs/install.rst b/pdns/dnsdistdist/docs/install.rst index 3a7b6cf8fb..258aabf81b 100644 --- a/pdns/dnsdistdist/docs/install.rst +++ b/pdns/dnsdistdist/docs/install.rst @@ -147,3 +147,6 @@ Additionally several Lua bindings can be removed when they are not needed, as th * ``DISABLE_QPS_LIMITER_BINDINGS`` * ``DISABLE_SUFFIX_MATCH_BINDINGS`` * ``DISABLE_TOP_N_BINDINGS`` + +Finally a build flag can be used to make use a single thread to handle all incoming UDP queries from clients, and another single thread to accept new TCP connections from clients, no matter how many :func:`addLocal` directives are present in the configuration. This option is destined to resource-constrained environments where dnsdist needs to listen on several addresses, over several interfaces, and one thread is enough to handle the traffic and therefore the overhead of using multiples threads for that task does not make sense. +This option can be enabled by setting ``USE_SINGLE_ACCEPTOR_THREAD``.