From f64992dd90e4544c584c89036fecd50768840bc5 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Thu, 1 Sep 2022 14:16:50 +0200 Subject: [PATCH] dnsdist: In single acceptor mode, merge the TCP acceptor and worker threads As we usually have one TCP worker thread only in single acceptor mode, there is no need to pass the query via a pipe, improving latency and CPU usage, and also saving a thread. --- pdns/dnsdist-tcp.cc | 74 ++++++++++++++++++++++++--------- pdns/dnsdist.cc | 7 ++-- pdns/dnsdistdist/dnsdist-tcp.hh | 4 +- 3 files changed, 60 insertions(+), 25 deletions(-) diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index 631fcabddb..2ac8bf88c8 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -127,16 +127,16 @@ std::shared_ptr IncomingTCPConnectionState::getDownstrea return downstream; } -static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD); +static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD, std::vector tcpAcceptStates); -TCPClientCollection::TCPClientCollection(size_t maxThreads): d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads) +TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector tcpAcceptStates): d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads) { for (size_t idx = 0; idx < maxThreads; idx++) { - addTCPClientThread(); + addTCPClientThread(tcpAcceptStates); } } -void TCPClientCollection::addTCPClientThread() +void TCPClientCollection::addTCPClientThread(std::vector& tcpAcceptStates) { auto preparePipe = [](int fds[2], const std::string& type) -> bool { if (pipe(fds) < 0) { @@ -200,7 +200,7 @@ void TCPClientCollection::addTCPClientThread() no need to worry about it */ TCPWorkerThread worker(pipefds[1], crossProtocolQueriesFDs[1], crossProtocolResponsesFDs[1]); try { - std::thread t1(tcpClientThread, pipefds[0], crossProtocolQueriesFDs[0], crossProtocolResponsesFDs[0], crossProtocolResponsesFDs[1]); + std::thread t1(tcpClientThread, pipefds[0], crossProtocolQueriesFDs[0], crossProtocolResponsesFDs[0], crossProtocolResponsesFDs[1], tcpAcceptStates); t1.detach(); } catch (const std::runtime_error& e) { @@ -1261,7 +1261,17 @@ static void handleCrossProtocolResponse(int pipefd, FDMultiplexer::funcparam_t& } } -static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD) +struct TCPAcceptorParam +{ + ClientState& cs; + ComboAddress local; + LocalStateHolder& acl; + int socket{-1}; +}; + +static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData); + +static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD, std::vector tcpAcceptStates) { /* we get launched with a pipe on which we receive file descriptors from clients that we own from that point on */ @@ -1276,6 +1286,28 @@ static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int cros data.mplexer->addReadFD(crossProtocolQueriesPipeFD, handleCrossProtocolQuery, &data); data.mplexer->addReadFD(crossProtocolResponsesListenPipeFD, handleCrossProtocolResponse, &data); + /* only used in single acceptor mode for now */ + auto acl = g_ACL.getLocal(); + std::vector acceptParams; + acceptParams.reserve(tcpAcceptStates.size()); + + for (auto& state : tcpAcceptStates) { + acceptParams.emplace_back(TCPAcceptorParam{*state, state->local, acl, state->tcpFD}); + for (const auto& [addr, socket] : state->d_additionalAddresses) { + acceptParams.emplace_back(TCPAcceptorParam{*state, addr, acl, socket}); + } + } + + auto acceptCallback = [&data](int socket, FDMultiplexer::funcparam_t& funcparam) { + auto acceptorParam = boost::any_cast(funcparam); + acceptNewConnection(*acceptorParam, &data); + }; + + for (size_t idx = 0; idx < acceptParams.size(); idx++) { + const auto& param = acceptParams.at(idx); + data.mplexer->addReadFD(param.socket, acceptCallback, ¶m); + } + struct timeval now; gettimeofday(&now, nullptr); time_t lastTimeoutScan = now.tv_sec; @@ -1366,15 +1398,7 @@ static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int cros } } -struct TCPAcceptorParam -{ - ClientState& cs; - ComboAddress local; - LocalStateHolder& acl; - int socket{-1}; -}; - -static void acceptNewConnection(const TCPAcceptorParam& param) +static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData) { auto& cs = param.cs; auto& acl = param.acl; @@ -1440,11 +1464,19 @@ static void acceptNewConnection(const TCPAcceptorParam& param) vinfolog("Got TCP connection from %s", remote.toStringWithPort()); ci->remote = remote; - if (!g_tcpclientthreads->passConnectionToThread(std::move(ci))) { - if (tcpClientCountIncremented) { - decrementTCPClientCount(remote); + if (threadData == nullptr) { + if (!g_tcpclientthreads->passConnectionToThread(std::move(ci))) { + if (tcpClientCountIncremented) { + decrementTCPClientCount(remote); + } } } + else { + struct timeval now; + gettimeofday(&now, nullptr); + auto state = std::make_shared(std::move(*ci), *threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + } } catch (const std::exception& e) { errlog("While reading a TCP question: %s", e.what()); @@ -1458,6 +1490,7 @@ static void acceptNewConnection(const TCPAcceptorParam& param) /* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and they will hand off to worker threads & spawn more of them if required */ +#ifndef USE_SINGLE_ACCEPTOR_THREAD void tcpAcceptorThread(std::vector states) { setThreadName("dnsdist/tcpAcce"); @@ -1475,13 +1508,13 @@ void tcpAcceptorThread(std::vector states) if (params.size() == 1) { while (true) { - acceptNewConnection(params.at(0)); + acceptNewConnection(params.at(0), nullptr); } } else { auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) { auto acceptorParam = boost::any_cast(funcparam); - acceptNewConnection(*acceptorParam); + acceptNewConnection(*acceptorParam, nullptr); }; auto mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); @@ -1496,3 +1529,4 @@ void tcpAcceptorThread(std::vector states) } } } +#endif diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 958f571fe1..d1e87bdf06 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -2734,7 +2734,9 @@ int main(int argc, char** argv) /* we need to create the TCP worker threads before the acceptor ones, otherwise we might crash when processing the first TCP query */ - g_tcpclientthreads = std::make_unique(*g_maxTCPClientThreads); +#ifndef USE_SINGLE_ACCEPTOR_THREAD + g_tcpclientthreads = std::make_unique(*g_maxTCPClientThreads, std::vector()); +#endif initDoHWorkers(); @@ -2821,8 +2823,7 @@ int main(int argc, char** argv) udp.detach(); } if (!tcpStates.empty()) { - thread tcp(tcpAcceptorThread, tcpStates); - tcp.detach(); + g_tcpclientthreads = std::make_unique(1, tcpStates); } #endif /* USE_SINGLE_ACCEPTOR_THREAD */ dnsdist::ServiceDiscovery::run(); diff --git a/pdns/dnsdistdist/dnsdist-tcp.hh b/pdns/dnsdistdist/dnsdist-tcp.hh index 1e896b473e..b7b78582d1 100644 --- a/pdns/dnsdistdist/dnsdist-tcp.hh +++ b/pdns/dnsdistdist/dnsdist-tcp.hh @@ -175,7 +175,7 @@ struct CrossProtocolQuery class TCPClientCollection { public: - TCPClientCollection(size_t maxThreads); + TCPClientCollection(size_t maxThreads, std::vector tcpStates); int getThread() { @@ -249,7 +249,7 @@ public: } private: - void addTCPClientThread(); + void addTCPClientThread(std::vector& tcpAcceptStates); struct TCPWorkerThread { -- 2.47.2