From: Remi Gacogne Date: Tue, 2 Nov 2021 16:56:04 +0000 (+0100) Subject: dnsdist: Refactoring of the TCP connection caches X-Git-Tag: dnsdist-1.7.0-beta1~5^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9fd8dc1f2d7693e7b0660b4c7a849a7e48859d37;p=thirdparty%2Fpdns.git dnsdist: Refactoring of the TCP connection caches --- diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 5d795a6d84..9e58507796 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -1306,7 +1306,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) }); luaCtx.writeFunction("setMaxCachedTCPConnectionsPerDownstream", [](size_t max) { - DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max); + DownstreamTCPConnectionsManager::setMaxCachedConnectionsPerDownstream(max); }); luaCtx.writeFunction("setMaxCachedDoHConnectionsPerDownstream", [](size_t max) { @@ -2121,7 +2121,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) luaCtx.writeFunction("setTCPDownstreamCleanupInterval", [](uint64_t interval) { setLuaSideEffect(); checkParameterBound("setTCPDownstreamCleanupInterval", interval); - DownstreamConnectionsManager::setCleanupInterval(interval); + DownstreamTCPConnectionsManager::setCleanupInterval(interval); }); luaCtx.writeFunction("setDoHDownstreamCleanupInterval", [](uint64_t interval) { @@ -2133,7 +2133,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) luaCtx.writeFunction("setTCPDownstreamMaxIdleTime", [](uint64_t max) { setLuaSideEffect(); checkParameterBound("setTCPDownstreamMaxIdleTime", max); - DownstreamConnectionsManager::setMaxIdleTime(max); + DownstreamTCPConnectionsManager::setMaxIdleTime(max); }); luaCtx.writeFunction("setDoHDownstreamMaxIdleTime", [](uint64_t max) { diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index 86d5b9c7b0..5d3a422a8e 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -106,7 +106,7 @@ IncomingTCPConnectionState::~IncomingTCPConnectionState() size_t IncomingTCPConnectionState::clearAllDownstreamConnections() { - return DownstreamConnectionsManager::clear(); + return t_downstreamTCPConnectionsManager.clear(); } std::shared_ptr IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr& ds, const std::unique_ptr>& tlvs, const struct timeval& now) @@ -117,7 +117,7 @@ std::shared_ptr IncomingTCPConnectionState::getDownstrea if (!downstream) { /* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */ - downstream = DownstreamConnectionsManager::getConnectionToDownstream(d_threadData.mplexer, ds, now); + downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, ds, now, std::string()); if (ds->useProxyProtocol) { registerOwnedDownstreamConnection(downstream); } @@ -1134,7 +1134,7 @@ static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& par tmp = nullptr; try { - auto downstream = DownstreamConnectionsManager::getConnectionToDownstream(threadData->mplexer, downstreamServer, now); + auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string()); prependSizeToTCPQuery(query.d_buffer, proxyProtocolPayloadSize); downstream->queueQuery(tqs, std::move(query)); @@ -1210,7 +1210,7 @@ static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int cros data.mplexer->run(&now); try { - DownstreamConnectionsManager::cleanupClosedTCPConnections(now); + t_downstreamTCPConnectionsManager.cleanupClosedConnections(now); if (now.tv_sec > lastTimeoutScan) { lastTimeoutScan = now.tv_sec; diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.cc b/pdns/dnsdistdist/dnsdist-nghttp2.cc index b739ac3c05..4eb31cec98 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.cc +++ b/pdns/dnsdistdist/dnsdist-nghttp2.cc @@ -46,7 +46,7 @@ std::optional g_outgoingDoHWorkerThreads{std::nullopt}; class DoHConnectionToBackend : public ConnectionToBackend { public: - DoHConnectionToBackend(std::shared_ptr ds, std::unique_ptr& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload); + DoHConnectionToBackend(const std::shared_ptr& ds, std::unique_ptr& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload); void handleTimeout(const struct timeval& now, bool write) override; void queueQuery(std::shared_ptr& sender, TCPQuery&& query) override; @@ -63,7 +63,7 @@ public: d_healthCheckQuery = h; } - void stopIO(); + void stopIO() override; bool reachedMaxConcurrentQueries() const override; bool reachedMaxStreamID() const override; bool isIdle() const override; @@ -121,37 +121,8 @@ private: bool d_firstWrite{true}; }; -class DownstreamDoHConnectionsManager -{ -public: - static std::shared_ptr getConnectionToDownstream(std::unique_ptr& mplexer, const std::shared_ptr& ds, const struct timeval& now, std::string&& proxyProtocolPayload); - static void releaseDownstreamConnection(std::shared_ptr&& conn); - static bool removeDownstreamConnection(std::shared_ptr& conn); - static void cleanupClosedConnections(struct timeval now); - static size_t clear(); - - static void setMaxCachedConnectionsPerDownstream(size_t max) - { - s_maxCachedConnectionsPerDownstream = max; - } - - static void setCleanupInterval(uint16_t interval) - { - s_cleanupInterval = interval; - } - - static void setMaxIdleTime(uint16_t max) - { - s_maxIdleTime = max; - } - -private: - static thread_local map>> t_downstreamConnections; - static thread_local time_t t_nextCleanup; - static size_t s_maxCachedConnectionsPerDownstream; - static uint16_t s_cleanupInterval; - static uint16_t s_maxIdleTime; -}; +using DownstreamDoHConnectionsManager = DownstreamConnectionsManager; +thread_local DownstreamDoHConnectionsManager t_downstreamDoHConnectionsManager; uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const { @@ -493,7 +464,7 @@ void DoHConnectionToBackend::stopIO() /* remove ourselves from the connection cache, this might mean that our reference count drops to zero after that, so we need to be careful */ auto shared = std::dynamic_pointer_cast(shared_from_this()); - DownstreamDoHConnectionsManager::removeDownstreamConnection(shared); + t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared); } } @@ -745,7 +716,7 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i if (request.d_query.d_downstreamFailures < conn->d_ds->d_retries) { // cerr<<"in "<<__PRETTY_FUNCTION__<<", looking for a connection to send a query of size "<d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload)); + auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload)); downstream->queueQuery(request.d_sender, std::move(request.d_query)); } else { @@ -803,7 +774,7 @@ int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_ return 0; } -DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr ds, std::unique_ptr& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) : +DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr& ds, std::unique_ptr& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) : ConnectionToBackend(ds, mplexer, now), d_proxyProtocolPayload(std::move(proxyProtocolPayload)) { // inherit most of the stuff from the ConnectionToBackend() @@ -859,147 +830,6 @@ DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr } } -thread_local map>> DownstreamDoHConnectionsManager::t_downstreamConnections; -thread_local time_t DownstreamDoHConnectionsManager::t_nextCleanup{0}; -size_t DownstreamDoHConnectionsManager::s_maxCachedConnectionsPerDownstream{10}; -uint16_t DownstreamDoHConnectionsManager::s_cleanupInterval{60}; -uint16_t DownstreamDoHConnectionsManager::s_maxIdleTime{300}; - -size_t DownstreamDoHConnectionsManager::clear() -{ - size_t result = 0; - for (const auto& backend : t_downstreamConnections) { - result += backend.second.size(); - for (auto& conn : backend.second) { - conn->stopIO(); - } - } - t_downstreamConnections.clear(); - return result; -} - -bool DownstreamDoHConnectionsManager::removeDownstreamConnection(std::shared_ptr& conn) -{ - bool found = false; - auto backendIt = t_downstreamConnections.find(conn->getDS()->getID()); - if (backendIt == t_downstreamConnections.end()) { - return found; - } - - for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) { - if (*connIt == conn) { - backendIt->second.erase(connIt); - found = true; - break; - } - } - - return found; -} - -void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now) -{ - //cerr<<"cleanup interval is "< 0 && t_nextCleanup > now.tv_sec)) { - return; - } - - t_nextCleanup = now.tv_sec + s_cleanupInterval; - - struct timeval freshCutOff = now; - freshCutOff.tv_sec -= 1; - struct timeval idleCutOff = now; - idleCutOff.tv_sec -= s_maxIdleTime; - - for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end();) { - for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end();) { - if (!(*connIt)) { - connIt = dsIt->second.erase(connIt); - continue; - } - - /* don't bother checking freshly used connections */ - if (freshCutOff < (*connIt)->getLastDataReceivedTime()) { - ++connIt; - continue; - } - - if ((*connIt)->isIdle() && (*connIt)->getLastDataReceivedTime() < idleCutOff) { - /* idle for too long */ - connIt = dsIt->second.erase(connIt); - continue; - } - - if ((*connIt)->isUsable()) { - ++connIt; - continue; - } - - connIt = dsIt->second.erase(connIt); - } - - if (!dsIt->second.empty()) { - ++dsIt; - } - else { - dsIt = t_downstreamConnections.erase(dsIt); - } - } -} - -std::shared_ptr DownstreamDoHConnectionsManager::getConnectionToDownstream(std::unique_ptr& mplexer, const std::shared_ptr& ds, const struct timeval& now, std::string&& proxyProtocolPayload) -{ - std::shared_ptr result; - struct timeval freshCutOff = now; - freshCutOff.tv_sec -= 1; - - auto backendId = ds->getID(); - - cleanupClosedConnections(now); - - const bool haveProxyProtocol = !proxyProtocolPayload.empty(); - if (!haveProxyProtocol) { - //cerr<<"looking for existing connection"<second; - for (auto listIt = list.begin(); listIt != list.end();) { - auto& entry = *listIt; - if (!entry->canBeReused()) { - if (!entry->willBeReusable(false)) { - listIt = list.erase(listIt); - } - else { - ++listIt; - } - continue; - } - entry->setReused(); - /* for connections that have not been used very recently, - check whether they have been closed in the meantime */ - if (freshCutOff < entry->getLastDataReceivedTime()) { - /* used recently enough, skip the check */ - ++ds->tcpReusedConnections; - return entry; - } - - if (isTCPSocketUsable(entry->getHandle())) { - ++ds->tcpReusedConnections; - return entry; - } - - listIt = list.erase(listIt); - } - } - } - - auto newConnection = std::make_shared(ds, mplexer, now, std::move(proxyProtocolPayload)); - if (!haveProxyProtocol) { - t_downstreamConnections[backendId].push_front(newConnection); - } - return newConnection; -} - static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param) { auto threadData = boost::any_cast(param); @@ -1030,7 +860,7 @@ static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& par tmp = nullptr; try { - auto downstream = DownstreamDoHConnectionsManager::getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload)); + auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload)); downstream->queueQuery(tqs, std::move(query)); } catch (...) { @@ -1062,7 +892,7 @@ static void dohClientThread(int crossProtocolPipeFD) lastTimeoutScan = now.tv_sec; try { - DownstreamDoHConnectionsManager::cleanupClosedConnections(now); + t_downstreamDoHConnectionsManager.cleanupClosedConnections(now); handleH2Timeouts(*data.mplexer, now); if (g_dohStatesDumpRequested > 0) { @@ -1304,7 +1134,7 @@ bool sendH2Query(const std::shared_ptr& ds, std::unique_ptrqueueQuery(sender, std::move(query)); } else { - auto connection = DownstreamDoHConnectionsManager::getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload)); + auto connection = t_downstreamDoHConnectionsManager.getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload)); connection->queueQuery(sender, std::move(query)); } @@ -1318,7 +1148,7 @@ size_t clearH2Connections() { size_t cleared = 0; #ifdef HAVE_NGHTTP2 - cleared = DownstreamDoHConnectionsManager::clear(); + cleared = t_downstreamDoHConnectionsManager.clear(); #endif /* HAVE_NGHTTP2 */ return cleared; } diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index 1a76842dc6..1677d65e7e 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -5,6 +5,8 @@ #include "dnsparser.hh" +thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager; + ConnectionToBackend::~ConnectionToBackend() { if (d_ds && d_handler) { @@ -738,7 +740,6 @@ bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery } auto raw = unknownContent->getRawContent(); auto serial = getSerialFromRawSOAContent(raw); - ++query.d_xfrSerialCount; if (query.d_xfrMasterSerial == 0) { // store the first SOA in our client's connection metadata @@ -766,121 +767,3 @@ bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery } return done; } - -std::shared_ptr DownstreamConnectionsManager::getConnectionToDownstream(std::unique_ptr& mplexer, std::shared_ptr& ds, const struct timeval& now) -{ - struct timeval freshCutOff = now; - freshCutOff.tv_sec -= 1; - - auto backendId = ds->getID(); - - cleanupClosedTCPConnections(now); - - { - const auto& it = t_downstreamConnections.find(backendId); - if (it != t_downstreamConnections.end()) { - auto& list = it->second; - for (auto listIt = list.begin(); listIt != list.end(); ) { - auto& entry = *listIt; - if (!entry->canBeReused()) { - if (!entry->willBeReusable(false)) { - listIt = list.erase(listIt); - } - else { - ++listIt; - } - continue; - } - - entry->setReused(); - /* for connections that have not been used very recently, - check whether they have been closed in the meantime */ - if (freshCutOff < entry->getLastDataReceivedTime()) { - /* used recently enough, skip the check */ - ++ds->tcpReusedConnections; - return entry; - } - - if (entry->isUsable()) { - ++ds->tcpReusedConnections; - return entry; - } - - listIt = list.erase(listIt); - } - } - } - - auto newConnection = std::make_shared(ds, mplexer, now); - if (!ds->useProxyProtocol) { - t_downstreamConnections[backendId].push_front(newConnection); - } - return newConnection; -} - -void DownstreamConnectionsManager::cleanupClosedTCPConnections(struct timeval now) -{ - if (s_cleanupInterval == 0 || (t_nextCleanup != 0 && t_nextCleanup > now.tv_sec)) { - return; - } - - t_nextCleanup = now.tv_sec + s_cleanupInterval; - - struct timeval freshCutOff = now; - freshCutOff.tv_sec -= 1; - struct timeval idleCutOff = now; - idleCutOff.tv_sec -= s_maxIdleTime; - - for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) { - for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) { - if (!(*connIt)) { - ++connIt; - continue; - } - - /* don't bother checking freshly used connections */ - if (freshCutOff < (*connIt)->getLastDataReceivedTime()) { - ++connIt; - continue; - } - - if ((*connIt)->isIdle() && (*connIt)->getLastDataReceivedTime() < idleCutOff) { - /* idle for too long */ - connIt = dsIt->second.erase(connIt); - continue; - } - - if ((*connIt)->isUsable()) { - ++connIt; - continue; - } - - connIt = dsIt->second.erase(connIt); - } - - if (!dsIt->second.empty()) { - ++dsIt; - } - else { - dsIt = t_downstreamConnections.erase(dsIt); - } - } -} - -size_t DownstreamConnectionsManager::clear() -{ - size_t count = 0; - for (const auto& downstream : t_downstreamConnections) { - count += downstream.second.size(); - } - - t_downstreamConnections.clear(); - - return count; -} - -thread_local map>> DownstreamConnectionsManager::t_downstreamConnections; -thread_local time_t DownstreamConnectionsManager::t_nextCleanup{0}; -size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10}; -uint16_t DownstreamConnectionsManager::s_cleanupInterval{60}; -uint16_t DownstreamConnectionsManager::s_maxIdleTime{300}; diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh index 7832786139..0eb6c1a225 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh @@ -10,7 +10,7 @@ class ConnectionToBackend : public std::enable_shared_from_this { public: - ConnectionToBackend(std::shared_ptr& ds, std::unique_ptr& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->tcpFastOpen) + ConnectionToBackend(const std::shared_ptr& ds, std::unique_ptr& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->tcpFastOpen) { reconnect(); } @@ -116,6 +116,9 @@ public: virtual bool reachedMaxConcurrentQueries() const = 0; virtual bool isIdle() const = 0; virtual void release() = 0; + virtual void stopIO() + { + } bool matches(const std::shared_ptr& ds) const { @@ -200,7 +203,7 @@ protected: struct timeval d_connectionStartTime; struct timeval d_lastDataReceivedTime; - std::shared_ptr d_ds{nullptr}; + const std::shared_ptr d_ds{nullptr}; std::shared_ptr d_sender{nullptr}; std::unique_ptr& d_mplexer; std::unique_ptr d_handler{nullptr}; @@ -217,7 +220,7 @@ protected: class TCPConnectionToBackend : public ConnectionToBackend { public: - TCPConnectionToBackend(std::shared_ptr& ds, std::unique_ptr& mplexer, const struct timeval& now): ConnectionToBackend(ds, mplexer, now), d_responseBuffer(s_maxPacketCacheEntrySize) + TCPConnectionToBackend(const std::shared_ptr& ds, std::unique_ptr& mplexer, const struct timeval& now, std::string&& /* proxyProtocolPayload*, unused but there to match the HTTP2 connections, so we can use the same templated connections manager class */): ConnectionToBackend(ds, mplexer, now), d_responseBuffer(s_maxPacketCacheEntrySize) { } @@ -295,13 +298,9 @@ private: State d_state{State::idle}; }; -class DownstreamConnectionsManager +template class DownstreamConnectionsManager { public: - static std::shared_ptr getConnectionToDownstream(std::unique_ptr& mplexer, std::shared_ptr& ds, const struct timeval& now); - static void cleanupClosedTCPConnections(struct timeval now); - static size_t clear(); - static void setMaxCachedConnectionsPerDownstream(size_t max) { s_maxCachedConnectionsPerDownstream = max; @@ -317,10 +316,168 @@ public: s_maxIdleTime = max; } -private: - static thread_local map>> t_downstreamConnections; - static thread_local time_t t_nextCleanup; + bool isConnectionUsable(const std::shared_ptr& conn, const struct timeval& now, const struct timeval& freshCutOff) + { + if (!conn->canBeReused()) { + return false; + } + + /* for connections that have not been used very recently, + check whether they have been closed in the meantime */ + if (freshCutOff < conn->getLastDataReceivedTime()) { + /* used recently enough, skip the check */ + return true; + } + + if (conn->isUsable()) { + return true; + } + + return false; + } + + std::shared_ptr getConnectionToDownstream(std::unique_ptr& mplexer, const std::shared_ptr& ds, const struct timeval& now, std::string&& proxyProtocolPayload) + { + struct timeval freshCutOff = now; + freshCutOff.tv_sec -= 1; + + auto backendId = ds->getID(); + + cleanupClosedConnections(now); + + const bool haveProxyProtocol = ds->useProxyProtocol || !proxyProtocolPayload.empty(); + if (!haveProxyProtocol) { + const auto& it = d_downstreamConnections.find(backendId); + if (it != d_downstreamConnections.end()) { + auto& list = it->second; + for (auto listIt = list.begin(); listIt != list.end(); ) { + if (!(*listIt)) { + listIt = list.erase(listIt); + continue; + } + + auto& entry = *listIt; + if (isConnectionUsable(entry, now, freshCutOff)) { + entry->setReused(); + ++ds->tcpReusedConnections; + return entry; + } + + if (entry->willBeReusable(false)) { + ++listIt; + continue; + } + + listIt = list.erase(listIt); + } + } + } + + auto newConnection = std::make_shared(ds, mplexer, now, std::move(proxyProtocolPayload)); + if (!haveProxyProtocol) { + d_downstreamConnections[backendId].push_front(newConnection); + } + + return newConnection; + } + + void cleanupClosedConnections(struct timeval now) + { + if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) { + return; + } + + d_nextCleanup = now.tv_sec + s_cleanupInterval; + + struct timeval freshCutOff = now; + freshCutOff.tv_sec -= 1; + struct timeval idleCutOff = now; + idleCutOff.tv_sec -= s_maxIdleTime; + + for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) { + for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) { + if (!(*connIt)) { + connIt = dsIt->second.erase(connIt); + continue; + } + + auto& entry = *connIt; + + /* don't bother checking freshly used connections */ + if (freshCutOff < entry->getLastDataReceivedTime()) { + ++connIt; + continue; + } + + if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) { + /* idle for too long */ + connIt = dsIt->second.erase(connIt); + continue; + } + + if (entry->isUsable()) { + ++connIt; + continue; + } + + connIt = dsIt->second.erase(connIt); + } + + if (!dsIt->second.empty()) { + ++dsIt; + } + else { + dsIt = d_downstreamConnections.erase(dsIt); + } + } + } + + size_t clear() + { + size_t count = 0; + for (const auto& downstream : d_downstreamConnections) { + count += downstream.second.size(); + for (auto& conn : downstream.second) { + conn->stopIO(); + } + } + + d_downstreamConnections.clear(); + return count; + } + + bool removeDownstreamConnection(std::shared_ptr& conn) + { + bool found = false; + auto backendIt = d_downstreamConnections.find(conn->getDS()->getID()); + if (backendIt == d_downstreamConnections.end()) { + return found; + } + + for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) { + if (*connIt == conn) { + backendIt->second.erase(connIt); + found = true; + break; + } + } + + return found; + } + +protected: + static size_t s_maxCachedConnectionsPerDownstream; static uint16_t s_cleanupInterval; static uint16_t s_maxIdleTime; + + std::map>> d_downstreamConnections; + time_t d_nextCleanup{0}; }; + +template size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10}; +template uint16_t DownstreamConnectionsManager::s_cleanupInterval{60}; +template uint16_t DownstreamConnectionsManager::s_maxIdleTime{300}; + +using DownstreamTCPConnectionsManager = DownstreamConnectionsManager; +extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;