From: Remi Gacogne <remi.gacogne@powerdns.com>
Date: Thu, 6 May 2021 12:41:57 +0000 (+0200)
Subject: dnsdist: Share the downstream TCP connections cache between threads
X-Git-Tag: dnsdist-1.7.0-alpha1~45^2~24
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9fdcf7ca43061f220d85fe72677c3741f983abc2;p=thirdparty%2Fpdns.git

dnsdist: Share the downstream TCP connections cache between threads
---

diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc
index 4e44913e19..fddb268427 100644
--- a/pdns/dnsdist-lua.cc
+++ b/pdns/dnsdist-lua.cc
@@ -43,6 +43,7 @@
 #include "dnsdist-proxy-protocol.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-secpoll.hh"
+#include "dnsdist-tcp-downstream.hh"
 #include "dnsdist-web.hh"
 
 #include "base64.hh"
@@ -1974,7 +1975,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
 
   luaCtx.writeFunction("setTCPDownstreamCleanupInterval", [](uint16_t interval) {
     setLuaSideEffect();
-    g_downstreamTCPCleanupInterval = interval;
+    DownstreamConnectionsManager::setCleanupInterval(interval);
   });
 
   luaCtx.writeFunction("setConsoleConnectionsLogging", [](bool enabled) {
diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc
index aa4a47d31d..cb40ce90da 100644
--- a/pdns/dnsdist-tcp.cc
+++ b/pdns/dnsdist-tcp.cc
@@ -70,135 +70,11 @@ uint64_t g_maxTCPQueuedConnections{10000};
 size_t g_tcpInternalPipeBufferSize{0};
 uint64_t g_maxTCPQueuedConnections{1000};
 #endif
-uint16_t g_downstreamTCPCleanupInterval{60};
+
 int g_tcpRecvTimeout{2};
 int g_tcpSendTimeout{2};
 std::atomic<uint64_t> g_tcpStatesDumpRequested{0};
 
-class DownstreamConnectionsManager
-{
-public:
-
-  static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
-  {
-    std::shared_ptr<TCPConnectionToBackend> result;
-    struct timeval freshCutOff = now;
-    freshCutOff.tv_sec -= 1;
-
-    const auto& it = t_downstreamConnections.find(ds);
-    if (it != t_downstreamConnections.end()) {
-      auto& list = it->second;
-      while (!list.empty()) {
-        result = std::move(list.back());
-        list.pop_back();
-
-        result->setReused();
-        /* for connections that have not been used very recently,
-           check whether they have been closed in the meantime */
-        if (freshCutOff < result->getLastDataReceivedTime()) {
-          /* used recently enough, skip the check */
-          ++ds->tcpReusedConnections;
-          return result;
-        }
-
-        if (isTCPSocketUsable(result->getHandle())) {
-          ++ds->tcpReusedConnections;
-          return result;
-        }
-
-        /* otherwise let's try the next one, if any */
-      }
-    }
-
-    return std::make_shared<TCPConnectionToBackend>(ds, mplexer, now);
-  }
-
-  static void releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn)
-  {
-    if (conn == nullptr) {
-      return;
-    }
-
-    if (!conn->canBeReused()) {
-      conn.reset();
-      return;
-    }
-
-    const auto& ds = conn->getDS();
-    auto& list = t_downstreamConnections[ds];
-    while (list.size() >= s_maxCachedConnectionsPerDownstream) {
-      /* too many connections queued already */
-      list.pop_front();
-    }
-
-    list.push_back(std::move(conn));
-  }
-
-  static void cleanupClosedTCPConnections(struct timeval now)
-  {
-    struct timeval freshCutOff = now;
-    freshCutOff.tv_sec -= 1;
-
-    for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
-      for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
-        if (!(*connIt)) {
-          ++connIt;
-          continue;
-        }
-
-        /* don't bother checking freshly used connections */
-        if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
-          ++connIt;
-          continue;
-        }
-
-        if (isTCPSocketUsable((*connIt)->getHandle())) {
-          ++connIt;
-        }
-        else {
-          connIt = dsIt->second.erase(connIt);
-        }
-      }
-
-      if (!dsIt->second.empty()) {
-        ++dsIt;
-      }
-      else {
-        dsIt = t_downstreamConnections.erase(dsIt);
-      }
-    }
-  }
-
-  static size_t clear()
-  {
-    size_t count = 0;
-    for (const auto& downstream : t_downstreamConnections) {
-      count += downstream.second.size();
-    }
-
-    t_downstreamConnections.clear();
-
-    return count;
-  }
-
-  static void setMaxCachedConnectionsPerDownstream(size_t max)
-  {
-    s_maxCachedConnectionsPerDownstream = max;
-  }
-
-private:
-  static thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
-  static size_t s_maxCachedConnectionsPerDownstream;
-};
-
-void setMaxCachedTCPConnectionsPerDownstream(size_t max)
-{
-  DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
-}
-
-thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
-size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
-
 static void decrementTCPClientCount(const ComboAddress& client)
 {
   if (g_maxTCPConnectionsPerClient) {
@@ -1166,15 +1042,44 @@ static void tcpClientThread(int pipefd, int crossProtocolPipeFD)
 
   struct timeval now;
   gettimeofday(&now, nullptr);
-  time_t lastTCPCleanup = now.tv_sec;
   time_t lastTimeoutScan = now.tv_sec;
 
   for (;;) {
     data.mplexer->run(&now);
 
-    if (g_downstreamTCPCleanupInterval > 0 && (now.tv_sec > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
-      DownstreamConnectionsManager::cleanupClosedTCPConnections(now);
-      lastTCPCleanup = now.tv_sec;
+    if (now.tv_sec > lastTimeoutScan) {
+      lastTimeoutScan = now.tv_sec;
+      auto expiredReadConns = data.mplexer->getTimeouts(now, false);
+      for (const auto& cbData : expiredReadConns) {
+        if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
+          auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
+          if (cbData.first == state->d_handler.getDescriptor()) {
+            vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
+            state->handleTimeout(state, false);
+          }
+        }
+        else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
+          auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
+          vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
+          conn->handleTimeout(now, false);
+        }
+      }
+
+      auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
+      for (const auto& cbData : expiredWriteConns) {
+        if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
+          auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
+          if (cbData.first == state->d_handler.getDescriptor()) {
+            vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
+            state->handleTimeout(state, true);
+          }
+        }
+        else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
+          auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
+          vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
+          conn->handleTimeout(now, true);
+        }
+      }
 
       if (g_tcpStatesDumpRequested > 0) {
         /* just to keep things clean in the output, debug only */
@@ -1210,41 +1115,6 @@ static void tcpClientThread(int pipefd, int crossProtocolPipeFD)
         }
       }
     }
-
-    if (now.tv_sec > lastTimeoutScan) {
-      lastTimeoutScan = now.tv_sec;
-      auto expiredReadConns = data.mplexer->getTimeouts(now, false);
-      for (const auto& cbData : expiredReadConns) {
-        if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
-          auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
-          if (cbData.first == state->d_handler.getDescriptor()) {
-            vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
-            state->handleTimeout(state, false);
-          }
-        }
-        else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
-          auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
-          vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
-          conn->handleTimeout(now, false);
-        }
-      }
-
-      auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
-      for (const auto& cbData : expiredWriteConns) {
-        if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
-          auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
-          if (cbData.first == state->d_handler.getDescriptor()) {
-            vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
-            state->handleTimeout(state, true);
-          }
-        }
-        else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
-          auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
-          vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
-          conn->handleTimeout(now, true);
-        }
-      }
-    }
   }
 }
diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh
index af26d04ff5..2d394ca5de 100644
--- a/pdns/dnsdist.hh
+++ b/pdns/dnsdist.hh
@@ -948,8 +948,6 @@ extern uint32_t g_staleCacheEntriesTTL;
 extern bool g_apiReadWrite;
 extern std::string g_apiConfigDirectory;
 extern bool g_servFailOnNoPolicy;
-extern bool g_useTCPSinglePipe;
-extern uint16_t g_downstreamTCPCleanupInterval;
 extern size_t g_udpVectorSize;
 extern bool g_allowEmptyResponse;
diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc
index f1e2baeb6d..afe744f8f3 100644
--- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc
+++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc
@@ -567,10 +567,8 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
-  if (conn->d_ds) {
-    --conn->d_ds->outstanding;
-  }
+  --conn->d_ds->outstanding;
   auto ids = std::move(it->second.d_idstate);
   d_pendingResponses.erase(it);
   /* marking as idle for now, so we can accept new queries if our queues are empty */
@@ -579,6 +577,9 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
     d_state = State::idle;
   }
 
+  /* remember it now, the sender might be gone after handleResponse() */
+  const bool releaseConnection = sender->releaseConnection();
+
   sender->handleResponse(now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
 
   if (!d_pendingQueries.empty()) {
@@ -600,6 +601,9 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
     return IOState::NeedRead;
   }
 
+  if (releaseConnection) {
+    DownstreamConnectionsManager::releaseDownstreamConnection(std::move(conn));
+  }
 
   return IOState::Done;
 }
@@ -618,0 +622,127 @@
+std::shared_ptr<TCPConnectionToBackend> DownstreamConnectionsManager::getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
+{
+  std::shared_ptr<TCPConnectionToBackend> result;
+  struct timeval freshCutOff = now;
+  freshCutOff.tv_sec -= 1;
+
+  auto backendId = ds->getID();
+
+  if (s_cleanupInterval > 0 && (s_nextCleanup == 0 || s_nextCleanup <= now.tv_sec)) {
+    s_nextCleanup = now.tv_sec + s_cleanupInterval;
+    cleanupClosedTCPConnections(now);
+  }
+
+  {
+    std::lock_guard lock(s_lock);
+    const auto& it = s_downstreamConnections.find(backendId);
+    if (it != s_downstreamConnections.end()) {
+      auto& list = it->second;
+      while (!list.empty()) {
+        result = std::move(list.back());
+        list.pop_back();
+
+        result->setReused();
+        /* for connections that have not been used very recently,
+           check whether they have been closed in the meantime */
+        if (freshCutOff < result->getLastDataReceivedTime()) {
+          /* used recently enough, skip the check */
+          ++ds->tcpReusedConnections;
+          return result;
+        }
+
+        if (isTCPSocketUsable(result->getHandle())) {
+          ++ds->tcpReusedConnections;
+          return result;
+        }
+
+        /* otherwise let's try the next one, if any */
+      }
+    }
+  }
+
+  return std::make_shared<TCPConnectionToBackend>(ds, mplexer, now);
+}
+
+void DownstreamConnectionsManager::releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn)
+{
+  if (conn == nullptr) {
+    return;
+  }
+
+  if (!conn->canBeReused()) {
+    conn.reset();
+    return;
+  }
+
+  const auto& ds = conn->getDS();
+  {
+    std::lock_guard lock(s_lock);
+    auto& list = s_downstreamConnections[ds->getID()];
+    while (list.size() >= s_maxCachedConnectionsPerDownstream) {
+      /* too many connections queued already */
+      list.pop_front();
+    }
+
+    list.push_back(std::move(conn));
+  }
+}
+
+void DownstreamConnectionsManager::cleanupClosedTCPConnections(struct timeval now)
+{
+  struct timeval freshCutOff = now;
+  freshCutOff.tv_sec -= 1;
+
+  std::lock_guard lock(s_lock);
+  for (auto dsIt = s_downstreamConnections.begin(); dsIt != s_downstreamConnections.end(); ) {
+    for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
+      if (!(*connIt)) {
+        ++connIt;
+        continue;
+      }
+
+      /* don't bother checking freshly used connections */
+      if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
+        ++connIt;
+        continue;
+      }
+
+      if (isTCPSocketUsable((*connIt)->getHandle())) {
+        ++connIt;
+      }
+      else {
+        connIt = dsIt->second.erase(connIt);
+      }
+    }
+
+    if (!dsIt->second.empty()) {
+      ++dsIt;
+    }
+    else {
+      dsIt = s_downstreamConnections.erase(dsIt);
+    }
+  }
+}
+
+size_t DownstreamConnectionsManager::clear()
+{
+  size_t count = 0;
+  std::lock_guard lock(s_lock);
+  for (const auto& downstream : s_downstreamConnections) {
+    count += downstream.second.size();
+  }
+
+  s_downstreamConnections.clear();
+
+  return count;
+}
+
+void setMaxCachedTCPConnectionsPerDownstream(size_t max)
+{
+  DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+}
+
+map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::s_downstreamConnections;
+std::mutex DownstreamConnectionsManager::s_lock;
+size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
+time_t DownstreamConnectionsManager::s_nextCleanup{0};
+uint16_t DownstreamConnectionsManager::s_cleanupInterval{60};
diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh
index 8f7e354d2a..59cc936b1b 100644
--- a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh
+++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh
@@ -218,3 +218,29 @@ private:
   bool d_connectionDied{false};
   bool d_proxyProtocolPayloadSent{false};
 };
+
+class DownstreamConnectionsManager
+{
+public:
+  static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now);
+  static void releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn);
+  static void cleanupClosedTCPConnections(struct timeval now);
+  static size_t clear();
+
+  static void setMaxCachedConnectionsPerDownstream(size_t max)
+  {
+    s_maxCachedConnectionsPerDownstream = max;
+  }
+
+  static void setCleanupInterval(uint16_t interval)
+  {
+    s_cleanupInterval = interval;
+  }
+
+private:
+  static map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> s_downstreamConnections;
+  static std::mutex s_lock;
+  static size_t s_maxCachedConnectionsPerDownstream;
+  static time_t s_nextCleanup;
+  static uint16_t s_cleanupInterval;
+};
diff --git a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh
index e628e7c908..9d392006c9 100644
--- a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh
+++ b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh
@@ -30,6 +30,9 @@ public:
     d_ci.fd = -1;
     d_proxiedDestination = d_origDest;
     d_proxiedRemote = d_ci.remote;
+
+    /* we manage the release of the downstream connection ourselves */
+    d_releaseConnection = false;
   }
 
   IncomingTCPConnectionState(const IncomingTCPConnectionState& rhs) = delete;
diff --git a/pdns/dnsdistdist/dnsdist-tcp.hh b/pdns/dnsdistdist/dnsdist-tcp.hh
index 6a0058ca53..02581e6874 100644
--- a/pdns/dnsdistdist/dnsdist-tcp.hh
+++ b/pdns/dnsdistdist/dnsdist-tcp.hh
@@ -140,6 +140,15 @@ public:
   virtual void handleResponse(const struct timeval& now, TCPResponse&& response) = 0;
   virtual void handleXFRResponse(const struct timeval& now, TCPResponse&& response) = 0;
   virtual void notifyIOError(IDState&& query, const struct timeval& now) = 0;
+
+  /* whether the connection should be automatically released to the pool after handleResponse()
+     has been called */
+  bool releaseConnection() const
+  {
+    return d_releaseConnection;
+  }
+protected:
+  bool d_releaseConnection{true};
 };
 
 struct CrossProtocolQuery
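
Note on configuration: this commit only moves the cache's tunables into
DownstreamConnectionsManager; the Lua directives that drive them are unchanged.
A minimal, hypothetical configuration sketch follows (the backend address is a
placeholder, and the values shown are simply the defaults from the static
initializers above):

    -- hypothetical dnsdist configuration sketch, not part of this commit
    newServer({address = "192.0.2.1:53"})        -- placeholder backend
    setMaxCachedTCPConnectionsPerDownstream(10)  -- default of s_maxCachedConnectionsPerDownstream
    setTCPDownstreamCleanupInterval(60)          -- default of s_cleanupInterval, in seconds

Keying the now-shared map by the backend's UUID (ds->getID()) rather than by
the shared_ptr to the DownstreamState presumably also keeps the cache from
holding a reference to a backend that has been removed from the configuration.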