From 8a2dd7db65242b50e8a09359ccd25a21c9b4422f Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Mon, 24 Jan 2022 09:34:35 +0100 Subject: [PATCH] dnsdist: Move DownstreamConnectionsManager to its own header --- pdns/dnsdist-lua.cc | 6 +- pdns/dnsdist-tcp.cc | 1 + pdns/dnsdistdist/Makefile.am | 2 + .../dnsdist-downstream-connection.hh | 304 ++++++++++++++++++ pdns/dnsdistdist/dnsdist-nghttp2.cc | 1 + pdns/dnsdistdist/dnsdist-tcp-downstream.cc | 16 + pdns/dnsdistdist/dnsdist-tcp-downstream.hh | 299 +---------------- .../test-dnsdist-connections-cache.cc | 1 + 8 files changed, 331 insertions(+), 299 deletions(-) create mode 100644 pdns/dnsdistdist/dnsdist-downstream-connection.hh diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 2f8d858c9b..4e0b475dc6 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -1356,7 +1356,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) }); luaCtx.writeFunction("setMaxCachedTCPConnectionsPerDownstream", [](uint64_t max) { - DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max); + setTCPDownstreamMaxIdleConnectionsPerBackend(max); }); luaCtx.writeFunction("setMaxIdleDoHConnectionsPerDownstream", [](uint64_t max) { @@ -2104,7 +2104,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) luaCtx.writeFunction("setTCPDownstreamCleanupInterval", [](uint64_t interval) { setLuaSideEffect(); checkParameterBound("setTCPDownstreamCleanupInterval", interval); - DownstreamTCPConnectionsManager::setCleanupInterval(interval); + setTCPDownstreamCleanupInterval(interval); }); luaCtx.writeFunction("setDoHDownstreamCleanupInterval", [](uint64_t interval) { @@ -2116,7 +2116,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) luaCtx.writeFunction("setTCPDownstreamMaxIdleTime", [](uint64_t max) { setLuaSideEffect(); checkParameterBound("setTCPDownstreamMaxIdleTime", max); - DownstreamTCPConnectionsManager::setMaxIdleTime(max); + setTCPDownstreamMaxIdleTime(max); }); luaCtx.writeFunction("setDoHDownstreamMaxIdleTime", [](uint64_t max) { diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index ada447c3de..9252f53c63 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -30,6 +30,7 @@ #include "dnsdist-rings.hh" #include "dnsdist-tcp.hh" #include "dnsdist-tcp-downstream.hh" +#include "dnsdist-downstream-connection.hh" #include "dnsdist-tcp-upstream.hh" #include "dnsdist-xpf.hh" #include "dnsparser.hh" diff --git a/pdns/dnsdistdist/Makefile.am b/pdns/dnsdistdist/Makefile.am index 2a15da9086..2a7e711ccf 100644 --- a/pdns/dnsdistdist/Makefile.am +++ b/pdns/dnsdistdist/Makefile.am @@ -140,6 +140,7 @@ dnsdist_SOURCES = \ dnsdist-console.cc dnsdist-console.hh \ dnsdist-discovery.cc dnsdist-discovery.hh \ dnsdist-dnscrypt.cc \ + dnsdist-downstream-connection.hh \ dnsdist-dynblocks.cc dnsdist-dynblocks.hh \ dnsdist-dynbpf.cc dnsdist-dynbpf.hh \ dnsdist-ecs.cc dnsdist-ecs.hh \ @@ -236,6 +237,7 @@ testrunner_SOURCES = \ dnscrypt.cc dnscrypt.hh \ dnsdist-backend.cc \ dnsdist-cache.cc dnsdist-cache.hh \ + dnsdist-downstream-connection.hh \ dnsdist-dynblocks.cc dnsdist-dynblocks.hh \ dnsdist-dynbpf.cc dnsdist-dynbpf.hh \ dnsdist-ecs.cc dnsdist-ecs.hh \ diff --git a/pdns/dnsdistdist/dnsdist-downstream-connection.hh b/pdns/dnsdistdist/dnsdist-downstream-connection.hh new file mode 100644 index 0000000000..2d71919e64 --- /dev/null +++ b/pdns/dnsdistdist/dnsdist-downstream-connection.hh @@ -0,0 +1,304 @@ +#pragma once + +#include +#include +#include +#include + +#include "tcpiohandler-mplexer.hh" +#include "dnsdist-tcp.hh" + +template +class DownstreamConnectionsManager +{ + struct SequencedTag + { + }; + struct OrderedTag + { + }; + + typedef multi_index_container< + std::shared_ptr, + indexed_by< + ordered_unique, + identity>>, + /* new elements are added to the front of the sequence */ + sequenced>>> + list_t; + struct ConnectionLists + { + list_t d_actives; + list_t d_idles; + }; + +public: + static void setMaxIdleConnectionsPerDownstream(size_t max) + { + s_maxIdleConnectionsPerDownstream = max; + } + + static void setCleanupInterval(uint16_t interval) + { + s_cleanupInterval = interval; + } + + static void setMaxIdleTime(uint16_t max) + { + s_maxIdleTime = max; + } + + std::shared_ptr getConnectionToDownstream(std::unique_ptr& mplexer, const std::shared_ptr& ds, const struct timeval& now, std::string&& proxyProtocolPayload) + { + struct timeval freshCutOff = now; + freshCutOff.tv_sec -= 1; + + auto backendId = ds->getID(); + + cleanupClosedConnections(now); + + const bool haveProxyProtocol = ds->d_config.useProxyProtocol || !proxyProtocolPayload.empty(); + if (!haveProxyProtocol) { + const auto& it = d_downstreamConnections.find(backendId); + if (it != d_downstreamConnections.end()) { + /* first scan idle connections, more recent first */ + auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true); + if (entry) { + ++ds->tcpReusedConnections; + it->second.d_actives.insert(entry); + return entry; + } + + /* then scan actives ones, more recent first as well */ + entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false); + if (entry) { + ++ds->tcpReusedConnections; + return entry; + } + } + } + + auto newConnection = std::make_shared(ds, mplexer, now, std::move(proxyProtocolPayload)); + if (!haveProxyProtocol) { + auto& list = d_downstreamConnections[backendId].d_actives; + list.template get().push_front(newConnection); + } + + return newConnection; + } + + void cleanupClosedConnections(const struct timeval& now) + { + if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) { + return; + } + + d_nextCleanup = now.tv_sec + s_cleanupInterval; + + struct timeval freshCutOff = now; + freshCutOff.tv_sec -= 1; + struct timeval idleCutOff = now; + idleCutOff.tv_sec -= s_maxIdleTime; + + for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end();) { + cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff); + cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff); + + if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) { + dsIt = d_downstreamConnections.erase(dsIt); + } + else { + ++dsIt; + } + } + } + + size_t clear() + { + size_t count = 0; + for (const auto& downstream : d_downstreamConnections) { + count += downstream.second.d_actives.size(); + for (auto& conn : downstream.second.d_actives) { + conn->stopIO(); + } + count += downstream.second.d_idles.size(); + for (auto& conn : downstream.second.d_idles) { + conn->stopIO(); + } + } + + d_downstreamConnections.clear(); + return count; + } + + size_t count() const + { + return getActiveCount() + getIdleCount(); + } + + size_t getActiveCount() const + { + size_t count = 0; + for (const auto& downstream : d_downstreamConnections) { + count += downstream.second.d_actives.size(); + } + return count; + } + + size_t getIdleCount() const + { + size_t count = 0; + for (const auto& downstream : d_downstreamConnections) { + count += downstream.second.d_idles.size(); + } + return count; + } + + bool removeDownstreamConnection(std::shared_ptr& conn) + { + auto backendIt = d_downstreamConnections.find(conn->getDS()->getID()); + if (backendIt == d_downstreamConnections.end()) { + return false; + } + + /* idle list first */ + { + auto it = backendIt->second.d_idles.find(conn); + if (it != backendIt->second.d_idles.end()) { + backendIt->second.d_idles.erase(it); + return true; + } + } + /* then active */ + { + auto it = backendIt->second.d_actives.find(conn); + if (it != backendIt->second.d_actives.end()) { + backendIt->second.d_actives.erase(it); + return true; + } + } + + return false; + } + + bool moveToIdle(std::shared_ptr& conn) + { + auto backendIt = d_downstreamConnections.find(conn->getDS()->getID()); + if (backendIt == d_downstreamConnections.end()) { + return false; + } + + auto it = backendIt->second.d_actives.find(conn); + if (it == backendIt->second.d_actives.end()) { + return false; + } + + backendIt->second.d_actives.erase(it); + + if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) { + backendIt->second.d_idles.template get().pop_back(); + } + + backendIt->second.d_idles.template get().push_front(conn); + return true; + } + +protected: + void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff) + { + auto& sidx = list.template get(); + for (auto connIt = sidx.begin(); connIt != sidx.end();) { + if (!(*connIt)) { + connIt = sidx.erase(connIt); + continue; + } + + auto& entry = *connIt; + + /* don't bother checking freshly used connections */ + if (freshCutOff < entry->getLastDataReceivedTime()) { + ++connIt; + continue; + } + + if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) { + /* idle for too long */ + connIt = sidx.erase(connIt); + continue; + } + + if (entry->isUsable()) { + ++connIt; + continue; + } + + connIt = sidx.erase(connIt); + } + } + + std::shared_ptr findUsableConnectionInList(const struct timeval& now, const struct timeval& freshCutOff, list_t& list, bool removeIfFound) + { + auto& sidx = list.template get(); + for (auto listIt = sidx.begin(); listIt != sidx.end();) { + if (!(*listIt)) { + listIt = sidx.erase(listIt); + continue; + } + + auto& entry = *listIt; + if (isConnectionUsable(entry, now, freshCutOff)) { + entry->setReused(); + // make a copy since the iterator will be invalidated after erasing + auto result = entry; + if (removeIfFound) { + sidx.erase(listIt); + } + return result; + } + + if (entry->willBeReusable(false)) { + ++listIt; + continue; + } + + /* that connection will not be usable later, no need to keep it in that list */ + listIt = sidx.erase(listIt); + } + + return nullptr; + } + + bool isConnectionUsable(const std::shared_ptr& conn, const struct timeval& now, const struct timeval& freshCutOff) + { + if (!conn->canBeReused()) { + return false; + } + + /* for connections that have not been used very recently, + check whether they have been closed in the meantime */ + if (freshCutOff < conn->getLastDataReceivedTime()) { + /* used recently enough, skip the check */ + return true; + } + + return conn->isUsable(); + } + + static size_t s_maxIdleConnectionsPerDownstream; + static uint16_t s_cleanupInterval; + static uint16_t s_maxIdleTime; + + std::map d_downstreamConnections; + + time_t d_nextCleanup{0}; +}; + +template +size_t DownstreamConnectionsManager::s_maxIdleConnectionsPerDownstream{10}; +template +uint16_t DownstreamConnectionsManager::s_cleanupInterval{60}; +template +uint16_t DownstreamConnectionsManager::s_maxIdleTime{300}; + +using DownstreamTCPConnectionsManager = DownstreamConnectionsManager; +extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager; diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.cc b/pdns/dnsdistdist/dnsdist-nghttp2.cc index 573319a46c..85abf0d3fd 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.cc +++ b/pdns/dnsdistdist/dnsdist-nghttp2.cc @@ -29,6 +29,7 @@ #include "dnsdist-nghttp2.hh" #include "dnsdist-tcp.hh" #include "dnsdist-tcp-downstream.hh" +#include "dnsdist-downstream-connection.hh" #include "dolog.hh" #include "iputils.hh" diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index a85d83cd37..34d6cc790c 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -2,6 +2,7 @@ #include "dnsdist-session-cache.hh" #include "dnsdist-tcp-downstream.hh" #include "dnsdist-tcp-upstream.hh" +#include "dnsdist-downstream-connection.hh" #include "dnsparser.hh" @@ -774,3 +775,18 @@ bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery } return done; } + +void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max) +{ + DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max); +} + +void setTCPDownstreamCleanupInterval(uint64_t interval) +{ + DownstreamTCPConnectionsManager::setCleanupInterval(interval); +} + +void setTCPDownstreamMaxIdleTime(uint64_t max) +{ + DownstreamTCPConnectionsManager::setMaxIdleTime(max); +} diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh index 7f3cd6a0d8..c726775938 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh @@ -1,12 +1,5 @@ #pragma once -#include -#include -#include -#include - -#include - #include "sstuff.hh" #include "tcpiohandler-mplexer.hh" #include "dnsdist.hh" @@ -304,292 +297,6 @@ private: State d_state{State::idle}; }; -template class DownstreamConnectionsManager -{ - struct SequencedTag {}; - struct OrderedTag {}; - - typedef multi_index_container< - std::shared_ptr, - indexed_by < - ordered_unique, - identity> - >, - /* new elements are added to the front of the sequence */ - sequenced > - > - > list_t; - struct ConnectionLists - { - list_t d_actives; - list_t d_idles; - }; - -public: - static void setMaxIdleConnectionsPerDownstream(size_t max) - { - s_maxIdleConnectionsPerDownstream = max; - } - - static void setCleanupInterval(uint16_t interval) - { - s_cleanupInterval = interval; - } - - static void setMaxIdleTime(uint16_t max) - { - s_maxIdleTime = max; - } - - std::shared_ptr getConnectionToDownstream(std::unique_ptr& mplexer, const std::shared_ptr& ds, const struct timeval& now, std::string&& proxyProtocolPayload) - { - struct timeval freshCutOff = now; - freshCutOff.tv_sec -= 1; - - auto backendId = ds->getID(); - - cleanupClosedConnections(now); - - const bool haveProxyProtocol = ds->d_config.useProxyProtocol || !proxyProtocolPayload.empty(); - if (!haveProxyProtocol) { - const auto& it = d_downstreamConnections.find(backendId); - if (it != d_downstreamConnections.end()) { - /* first scan idle connections, more recent first */ - auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true); - if (entry) { - ++ds->tcpReusedConnections; - it->second.d_actives.insert(entry); - return entry; - } - - /* then scan actives ones, more recent first as well */ - entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false); - if (entry) { - ++ds->tcpReusedConnections; - return entry; - } - } - } - - auto newConnection = std::make_shared(ds, mplexer, now, std::move(proxyProtocolPayload)); - if (!haveProxyProtocol) { - auto& list = d_downstreamConnections[backendId].d_actives; - list.template get().push_front(newConnection); - } - - return newConnection; - } - - void cleanupClosedConnections(const struct timeval& now) - { - if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) { - return; - } - - d_nextCleanup = now.tv_sec + s_cleanupInterval; - - struct timeval freshCutOff = now; - freshCutOff.tv_sec -= 1; - struct timeval idleCutOff = now; - idleCutOff.tv_sec -= s_maxIdleTime; - - for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) { - cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff); - cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff); - - if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) { - dsIt = d_downstreamConnections.erase(dsIt); - } - else { - ++dsIt; - } - } - } - - size_t clear() - { - size_t count = 0; - for (const auto& downstream : d_downstreamConnections) { - count += downstream.second.d_actives.size(); - for (auto& conn : downstream.second.d_actives) { - conn->stopIO(); - } - count += downstream.second.d_idles.size(); - for (auto& conn : downstream.second.d_idles) { - conn->stopIO(); - } - } - - d_downstreamConnections.clear(); - return count; - } - - size_t count() const - { - return getActiveCount() + getIdleCount(); - } - - size_t getActiveCount() const - { - size_t count = 0; - for (const auto& downstream : d_downstreamConnections) { - count += downstream.second.d_actives.size(); - } - return count; - } - - size_t getIdleCount() const - { - size_t count = 0; - for (const auto& downstream : d_downstreamConnections) { - count += downstream.second.d_idles.size(); - } - return count; - } - - bool removeDownstreamConnection(std::shared_ptr& conn) - { - auto backendIt = d_downstreamConnections.find(conn->getDS()->getID()); - if (backendIt == d_downstreamConnections.end()) { - return false; - } - - /* idle list first */ - { - auto it = backendIt->second.d_idles.find(conn); - if (it != backendIt->second.d_idles.end()) { - backendIt->second.d_idles.erase(it); - return true; - } - } - /* then active */ - { - auto it = backendIt->second.d_actives.find(conn); - if (it != backendIt->second.d_actives.end()) { - backendIt->second.d_actives.erase(it); - return true; - } - } - - return false; - } - - bool moveToIdle(std::shared_ptr& conn) - { - auto backendIt = d_downstreamConnections.find(conn->getDS()->getID()); - if (backendIt == d_downstreamConnections.end()) { - return false; - } - - auto it = backendIt->second.d_actives.find(conn); - if (it == backendIt->second.d_actives.end()) { - return false; - } - - backendIt->second.d_actives.erase(it); - - if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) { - backendIt->second.d_idles.template get().pop_back(); - } - - backendIt->second.d_idles.template get().push_front(conn); - return true; - } - -protected: - - void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff) - { - auto& sidx = list.template get(); - for (auto connIt = sidx.begin(); connIt != sidx.end(); ) { - if (!(*connIt)) { - connIt = sidx.erase(connIt); - continue; - } - - auto& entry = *connIt; - - /* don't bother checking freshly used connections */ - if (freshCutOff < entry->getLastDataReceivedTime()) { - ++connIt; - continue; - } - - if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) { - /* idle for too long */ - connIt = sidx.erase(connIt); - continue; - } - - if (entry->isUsable()) { - ++connIt; - continue; - } - - connIt = sidx.erase(connIt); - } - } - - std::shared_ptr findUsableConnectionInList(const struct timeval& now, const struct timeval& freshCutOff, list_t& list, bool removeIfFound) - { - auto& sidx = list.template get(); - for (auto listIt = sidx.begin(); listIt != sidx.end(); ) { - if (!(*listIt)) { - listIt = sidx.erase(listIt); - continue; - } - - auto& entry = *listIt; - if (isConnectionUsable(entry, now, freshCutOff)) { - entry->setReused(); - // make a copy since the iterator will be invalidated after erasing - auto result = entry; - if (removeIfFound) { - sidx.erase(listIt); - } - return result; - } - - if (entry->willBeReusable(false)) { - ++listIt; - continue; - } - - /* that connection will not be usable later, no need to keep it in that list */ - listIt = sidx.erase(listIt); - } - - return nullptr; - } - - bool isConnectionUsable(const std::shared_ptr& conn, const struct timeval& now, const struct timeval& freshCutOff) - { - if (!conn->canBeReused()) { - return false; - } - - /* for connections that have not been used very recently, - check whether they have been closed in the meantime */ - if (freshCutOff < conn->getLastDataReceivedTime()) { - /* used recently enough, skip the check */ - return true; - } - - return conn->isUsable(); - } - - static size_t s_maxIdleConnectionsPerDownstream; - static uint16_t s_cleanupInterval; - static uint16_t s_maxIdleTime; - - std::map d_downstreamConnections; - - time_t d_nextCleanup{0}; -}; - -template size_t DownstreamConnectionsManager::s_maxIdleConnectionsPerDownstream{10}; -template uint16_t DownstreamConnectionsManager::s_cleanupInterval{60}; -template uint16_t DownstreamConnectionsManager::s_maxIdleTime{300}; - -using DownstreamTCPConnectionsManager = DownstreamConnectionsManager; -extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager; +void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max); +void setTCPDownstreamCleanupInterval(uint64_t interval); +void setTCPDownstreamMaxIdleTime(uint64_t max); diff --git a/pdns/dnsdistdist/test-dnsdist-connections-cache.cc b/pdns/dnsdistdist/test-dnsdist-connections-cache.cc index a59163b039..78c667c714 100644 --- a/pdns/dnsdistdist/test-dnsdist-connections-cache.cc +++ b/pdns/dnsdistdist/test-dnsdist-connections-cache.cc @@ -25,6 +25,7 @@ #include #include "dnsdist-tcp-downstream.hh" +#include "dnsdist-downstream-connection.hh" class MockupConnection { -- 2.47.2