]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Refactoring of the TCP connection caches
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 2 Nov 2021 16:56:04 +0000 (17:56 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 8 Nov 2021 09:26:12 +0000 (10:26 +0100)
pdns/dnsdist-lua.cc
pdns/dnsdist-tcp.cc
pdns/dnsdistdist/dnsdist-nghttp2.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.hh

index 5d795a6d84f6645afd1a5c16d87d579021262194..9e58507796a6083bda00efc23c3de2c457a91899 100644 (file)
@@ -1306,7 +1306,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
   });
 
   luaCtx.writeFunction("setMaxCachedTCPConnectionsPerDownstream", [](size_t max) {
-    DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+    DownstreamTCPConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
   });
 
   luaCtx.writeFunction("setMaxCachedDoHConnectionsPerDownstream", [](size_t max) {
@@ -2121,7 +2121,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
   luaCtx.writeFunction("setTCPDownstreamCleanupInterval", [](uint64_t interval) {
     setLuaSideEffect();
     checkParameterBound("setTCPDownstreamCleanupInterval", interval);
-    DownstreamConnectionsManager::setCleanupInterval(interval);
+    DownstreamTCPConnectionsManager::setCleanupInterval(interval);
   });
 
   luaCtx.writeFunction("setDoHDownstreamCleanupInterval", [](uint64_t interval) {
@@ -2133,7 +2133,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
   luaCtx.writeFunction("setTCPDownstreamMaxIdleTime", [](uint64_t max) {
     setLuaSideEffect();
     checkParameterBound("setTCPDownstreamMaxIdleTime", max);
-    DownstreamConnectionsManager::setMaxIdleTime(max);
+    DownstreamTCPConnectionsManager::setMaxIdleTime(max);
   });
 
   luaCtx.writeFunction("setDoHDownstreamMaxIdleTime", [](uint64_t max) {
index 86d5b9c7b029db77509584eb0acb8c7cf7e1c18e..5d3a422a8e93e0eecad0f3efd818b5911bcd96a7 100644 (file)
@@ -106,7 +106,7 @@ IncomingTCPConnectionState::~IncomingTCPConnectionState()
 
 size_t IncomingTCPConnectionState::clearAllDownstreamConnections()
 {
-  return DownstreamConnectionsManager::clear();
+  return t_downstreamTCPConnectionsManager.clear();
 }
 
 std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
@@ -117,7 +117,7 @@ std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstrea
 
   if (!downstream) {
     /* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */
-    downstream = DownstreamConnectionsManager::getConnectionToDownstream(d_threadData.mplexer, ds, now);
+    downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, ds, now, std::string());
     if (ds->useProxyProtocol) {
       registerOwnedDownstreamConnection(downstream);
     }
@@ -1134,7 +1134,7 @@ static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& par
     tmp = nullptr;
 
     try {
-      auto downstream = DownstreamConnectionsManager::getConnectionToDownstream(threadData->mplexer, downstreamServer, now);
+      auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string());
 
       prependSizeToTCPQuery(query.d_buffer, proxyProtocolPayloadSize);
       downstream->queueQuery(tqs, std::move(query));
@@ -1210,7 +1210,7 @@ static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int cros
       data.mplexer->run(&now);
 
       try {
-        DownstreamConnectionsManager::cleanupClosedTCPConnections(now);
+        t_downstreamTCPConnectionsManager.cleanupClosedConnections(now);
 
         if (now.tv_sec > lastTimeoutScan) {
           lastTimeoutScan = now.tv_sec;
index b739ac3c05eaf714520a656601eecff101e4935e..4eb31cec98998dc15509131eac81e6308c38b326 100644 (file)
@@ -46,7 +46,7 @@ std::optional<uint16_t> g_outgoingDoHWorkerThreads{std::nullopt};
 class DoHConnectionToBackend : public ConnectionToBackend
 {
 public:
-  DoHConnectionToBackend(std::shared_ptr<DownstreamState> ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
+  DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
 
   void handleTimeout(const struct timeval& now, bool write) override;
   void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
@@ -63,7 +63,7 @@ public:
     d_healthCheckQuery = h;
   }
 
-  void stopIO();
+  void stopIO() override;
   bool reachedMaxConcurrentQueries() const override;
   bool reachedMaxStreamID() const override;
   bool isIdle() const override;
@@ -121,37 +121,8 @@ private:
   bool d_firstWrite{true};
 };
 
-class DownstreamDoHConnectionsManager
-{
-public:
-  static std::shared_ptr<DoHConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload);
-  static void releaseDownstreamConnection(std::shared_ptr<DoHConnectionToBackend>&& conn);
-  static bool removeDownstreamConnection(std::shared_ptr<DoHConnectionToBackend>& conn);
-  static void cleanupClosedConnections(struct timeval now);
-  static size_t clear();
-
-  static void setMaxCachedConnectionsPerDownstream(size_t max)
-  {
-    s_maxCachedConnectionsPerDownstream = max;
-  }
-
-  static void setCleanupInterval(uint16_t interval)
-  {
-    s_cleanupInterval = interval;
-  }
-
-  static void setMaxIdleTime(uint16_t max)
-  {
-    s_maxIdleTime = max;
-  }
-
-private:
-  static thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<DoHConnectionToBackend>>> t_downstreamConnections;
-  static thread_local time_t t_nextCleanup;
-  static size_t s_maxCachedConnectionsPerDownstream;
-  static uint16_t s_cleanupInterval;
-  static uint16_t s_maxIdleTime;
-};
+using DownstreamDoHConnectionsManager = DownstreamConnectionsManager<DoHConnectionToBackend>;
+thread_local DownstreamDoHConnectionsManager t_downstreamDoHConnectionsManager;
 
 uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const
 {
@@ -493,7 +464,7 @@ void DoHConnectionToBackend::stopIO()
     /* remove ourselves from the connection cache, this might mean that our
        reference count drops to zero after that, so we need to be careful */
     auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
-    DownstreamDoHConnectionsManager::removeDownstreamConnection(shared);
+    t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared);
   }
 }
 
@@ -745,7 +716,7 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i
   if (request.d_query.d_downstreamFailures < conn->d_ds->d_retries) {
     // cerr<<"in "<<__PRETTY_FUNCTION__<<", looking for a connection to send a query of size "<<request.d_query.d_buffer.size()<<endl;
     ++request.d_query.d_downstreamFailures;
-    auto downstream = DownstreamDoHConnectionsManager::getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
+    auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
     downstream->queueQuery(request.d_sender, std::move(request.d_query));
   }
   else {
@@ -803,7 +774,7 @@ int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_
   return 0;
 }
 
-DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr<DownstreamState> ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) :
+DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) :
   ConnectionToBackend(ds, mplexer, now), d_proxyProtocolPayload(std::move(proxyProtocolPayload))
 {
   // inherit most of the stuff from the ConnectionToBackend()
@@ -859,147 +830,6 @@ DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr<DownstreamState>
   }
 }
 
-thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<DoHConnectionToBackend>>> DownstreamDoHConnectionsManager::t_downstreamConnections;
-thread_local time_t DownstreamDoHConnectionsManager::t_nextCleanup{0};
-size_t DownstreamDoHConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
-uint16_t DownstreamDoHConnectionsManager::s_cleanupInterval{60};
-uint16_t DownstreamDoHConnectionsManager::s_maxIdleTime{300};
-
-size_t DownstreamDoHConnectionsManager::clear()
-{
-  size_t result = 0;
-  for (const auto& backend : t_downstreamConnections) {
-    result += backend.second.size();
-    for (auto& conn : backend.second) {
-      conn->stopIO();
-    }
-  }
-  t_downstreamConnections.clear();
-  return result;
-}
-
-bool DownstreamDoHConnectionsManager::removeDownstreamConnection(std::shared_ptr<DoHConnectionToBackend>& conn)
-{
-  bool found = false;
-  auto backendIt = t_downstreamConnections.find(conn->getDS()->getID());
-  if (backendIt == t_downstreamConnections.end()) {
-    return found;
-  }
-
-  for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) {
-    if (*connIt == conn) {
-      backendIt->second.erase(connIt);
-      found = true;
-      break;
-    }
-  }
-
-  return found;
-}
-
-void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now)
-{
-  //cerr<<"cleanup interval is "<<s_cleanupInterval<<", next cleanup is "<<t_nextCleanup<<", now is "<<now.tv_sec<<endl;
-  if (s_cleanupInterval <= 0 || (t_nextCleanup > 0 && t_nextCleanup > now.tv_sec)) {
-    return;
-  }
-
-  t_nextCleanup = now.tv_sec + s_cleanupInterval;
-
-  struct timeval freshCutOff = now;
-  freshCutOff.tv_sec -= 1;
-  struct timeval idleCutOff = now;
-  idleCutOff.tv_sec -= s_maxIdleTime;
-
-  for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end();) {
-    for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end();) {
-      if (!(*connIt)) {
-        connIt = dsIt->second.erase(connIt);
-        continue;
-      }
-
-      /* don't bother checking freshly used connections */
-      if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
-        ++connIt;
-        continue;
-      }
-
-      if ((*connIt)->isIdle() && (*connIt)->getLastDataReceivedTime() < idleCutOff) {
-        /* idle for too long */
-        connIt = dsIt->second.erase(connIt);
-        continue;
-      }
-
-      if ((*connIt)->isUsable()) {
-        ++connIt;
-        continue;
-      }
-
-      connIt = dsIt->second.erase(connIt);
-    }
-
-    if (!dsIt->second.empty()) {
-      ++dsIt;
-    }
-    else {
-      dsIt = t_downstreamConnections.erase(dsIt);
-    }
-  }
-}
-
-std::shared_ptr<DoHConnectionToBackend> DownstreamDoHConnectionsManager::getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload)
-{
-  std::shared_ptr<DoHConnectionToBackend> result;
-  struct timeval freshCutOff = now;
-  freshCutOff.tv_sec -= 1;
-
-  auto backendId = ds->getID();
-
-  cleanupClosedConnections(now);
-
-  const bool haveProxyProtocol = !proxyProtocolPayload.empty();
-  if (!haveProxyProtocol) {
-    //cerr<<"looking for existing connection"<<endl;
-    const auto& it = t_downstreamConnections.find(backendId);
-    if (it != t_downstreamConnections.end()) {
-      auto& list = it->second;
-      for (auto listIt = list.begin(); listIt != list.end();) {
-        auto& entry = *listIt;
-        if (!entry->canBeReused()) {
-          if (!entry->willBeReusable(false)) {
-            listIt = list.erase(listIt);
-          }
-          else {
-            ++listIt;
-          }
-          continue;
-        }
-        entry->setReused();
-        /* for connections that have not been used very recently,
-           check whether they have been closed in the meantime */
-        if (freshCutOff < entry->getLastDataReceivedTime()) {
-          /* used recently enough, skip the check */
-          ++ds->tcpReusedConnections;
-          return entry;
-        }
-
-        if (isTCPSocketUsable(entry->getHandle())) {
-          ++ds->tcpReusedConnections;
-          return entry;
-        }
-
-        listIt = list.erase(listIt);
-      }
-    }
-  }
-
-  auto newConnection = std::make_shared<DoHConnectionToBackend>(ds, mplexer, now, std::move(proxyProtocolPayload));
-  if (!haveProxyProtocol) {
-    t_downstreamConnections[backendId].push_front(newConnection);
-  }
-  return newConnection;
-}
-
 static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
 {
   auto threadData = boost::any_cast<DoHClientThreadData*>(param);
@@ -1030,7 +860,7 @@ static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& par
     tmp = nullptr;
 
     try {
-      auto downstream = DownstreamDoHConnectionsManager::getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
+      auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
       downstream->queueQuery(tqs, std::move(query));
     }
     catch (...) {
@@ -1062,7 +892,7 @@ static void dohClientThread(int crossProtocolPipeFD)
         lastTimeoutScan = now.tv_sec;
 
         try {
-          DownstreamDoHConnectionsManager::cleanupClosedConnections(now);
+          t_downstreamDoHConnectionsManager.cleanupClosedConnections(now);
           handleH2Timeouts(*data.mplexer, now);
 
           if (g_dohStatesDumpRequested > 0) {
@@ -1304,7 +1134,7 @@ bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDM
     newConnection->queueQuery(sender, std::move(query));
   }
   else {
-    auto connection = DownstreamDoHConnectionsManager::getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload));
+    auto connection = t_downstreamDoHConnectionsManager.getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload));
     connection->queueQuery(sender, std::move(query));
   }
 
@@ -1318,7 +1148,7 @@ size_t clearH2Connections()
 {
   size_t cleared = 0;
 #ifdef HAVE_NGHTTP2
-  cleared = DownstreamDoHConnectionsManager::clear();
+  cleared = t_downstreamDoHConnectionsManager.clear();
 #endif /* HAVE_NGHTTP2 */
   return cleared;
 }
index 1a76842dc6fc3e69d4a3622c2a1f562818e9b679..1677d65e7efc7d0464cfc56e35436b7603455a05 100644 (file)
@@ -5,6 +5,8 @@
 
 #include "dnsparser.hh"
 
+thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;
+
 ConnectionToBackend::~ConnectionToBackend()
 {
   if (d_ds && d_handler) {
@@ -738,7 +740,6 @@ bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery
         }
         auto raw = unknownContent->getRawContent();
         auto serial = getSerialFromRawSOAContent(raw);
-
         ++query.d_xfrSerialCount;
         if (query.d_xfrMasterSerial == 0) {
           // store the first SOA in our client's connection metadata
@@ -766,121 +767,3 @@ bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery
   }
   return done;
 }
-
-std::shared_ptr<TCPConnectionToBackend> DownstreamConnectionsManager::getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
-{
-  struct timeval freshCutOff = now;
-  freshCutOff.tv_sec -= 1;
-
-  auto backendId = ds->getID();
-
-  cleanupClosedTCPConnections(now);
-
-  {
-    const auto& it = t_downstreamConnections.find(backendId);
-    if (it != t_downstreamConnections.end()) {
-      auto& list = it->second;
-      for (auto listIt = list.begin(); listIt != list.end(); ) {
-        auto& entry = *listIt;
-        if (!entry->canBeReused()) {
-          if (!entry->willBeReusable(false)) {
-            listIt = list.erase(listIt);
-          }
-          else {
-            ++listIt;
-          }
-          continue;
-        }
-
-        entry->setReused();
-        /* for connections that have not been used very recently,
-           check whether they have been closed in the meantime */
-        if (freshCutOff < entry->getLastDataReceivedTime()) {
-          /* used recently enough, skip the check */
-          ++ds->tcpReusedConnections;
-          return entry;
-        }
-
-        if (entry->isUsable()) {
-          ++ds->tcpReusedConnections;
-          return entry;
-        }
-
-        listIt = list.erase(listIt);
-      }
-    }
-  }
-
-  auto newConnection = std::make_shared<TCPConnectionToBackend>(ds, mplexer, now);
-  if (!ds->useProxyProtocol) {
-    t_downstreamConnections[backendId].push_front(newConnection);
-  }
-  return newConnection;
-}
-
-void DownstreamConnectionsManager::cleanupClosedTCPConnections(struct timeval now)
-{
-  if (s_cleanupInterval == 0 || (t_nextCleanup != 0 && t_nextCleanup > now.tv_sec)) {
-    return;
-  }
-
-  t_nextCleanup = now.tv_sec + s_cleanupInterval;
-
-  struct timeval freshCutOff = now;
-  freshCutOff.tv_sec -= 1;
-  struct timeval idleCutOff = now;
-  idleCutOff.tv_sec -= s_maxIdleTime;
-
-  for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
-    for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
-      if (!(*connIt)) {
-        ++connIt;
-        continue;
-      }
-
-      /* don't bother checking freshly used connections */
-      if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
-        ++connIt;
-        continue;
-      }
-
-      if ((*connIt)->isIdle() && (*connIt)->getLastDataReceivedTime() < idleCutOff) {
-        /* idle for too long */
-        connIt = dsIt->second.erase(connIt);
-        continue;
-      }
-
-      if ((*connIt)->isUsable()) {
-        ++connIt;
-        continue;
-      }
-
-      connIt = dsIt->second.erase(connIt);
-    }
-
-    if (!dsIt->second.empty()) {
-      ++dsIt;
-    }
-    else {
-      dsIt = t_downstreamConnections.erase(dsIt);
-    }
-  }
-}
-
-size_t DownstreamConnectionsManager::clear()
-{
-  size_t count = 0;
-  for (const auto& downstream : t_downstreamConnections) {
-    count += downstream.second.size();
-  }
-
-  t_downstreamConnections.clear();
-
-  return count;
-}
-
-thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
-thread_local time_t DownstreamConnectionsManager::t_nextCleanup{0};
-size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
-uint16_t DownstreamConnectionsManager::s_cleanupInterval{60};
-uint16_t DownstreamConnectionsManager::s_maxIdleTime{300};
index 78327861393b399a45833335c36ed044cab69144..0eb6c1a2258d1ced8b0ecb5beb4b5e718b35d95f 100644 (file)
@@ -10,7 +10,7 @@
 class ConnectionToBackend : public std::enable_shared_from_this<ConnectionToBackend>
 {
 public:
-  ConnectionToBackend(std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->tcpFastOpen)
+  ConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->tcpFastOpen)
   {
     reconnect();
   }
@@ -116,6 +116,9 @@ public:
   virtual bool reachedMaxConcurrentQueries() const = 0;
   virtual bool isIdle() const = 0;
   virtual void release() = 0;
+  virtual void stopIO()
+  {
+  }
 
   bool matches(const std::shared_ptr<DownstreamState>& ds) const
   {
@@ -200,7 +203,7 @@ protected:
 
   struct timeval d_connectionStartTime;
   struct timeval d_lastDataReceivedTime;
-  std::shared_ptr<DownstreamState> d_ds{nullptr};
+  const std::shared_ptr<DownstreamState> d_ds{nullptr};
   std::shared_ptr<TCPQuerySender> d_sender{nullptr};
   std::unique_ptr<FDMultiplexer>& d_mplexer;
   std::unique_ptr<TCPIOHandler> d_handler{nullptr};
@@ -217,7 +220,7 @@ protected:
 class TCPConnectionToBackend : public ConnectionToBackend
 {
 public:
-  TCPConnectionToBackend(std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): ConnectionToBackend(ds, mplexer, now), d_responseBuffer(s_maxPacketCacheEntrySize)
+  TCPConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& /* proxyProtocolPayload*, unused but there to match the HTTP2 connections, so we can use the same templated connections manager class */): ConnectionToBackend(ds, mplexer, now), d_responseBuffer(s_maxPacketCacheEntrySize)
   {
   }
 
@@ -295,13 +298,9 @@ private:
   State d_state{State::idle};
 };
 
-class DownstreamConnectionsManager
+template <class T> class DownstreamConnectionsManager
 {
 public:
-  static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now);
-  static void cleanupClosedTCPConnections(struct timeval now);
-  static size_t clear();
-
   static void setMaxCachedConnectionsPerDownstream(size_t max)
   {
     s_maxCachedConnectionsPerDownstream = max;
@@ -317,10 +316,168 @@ public:
     s_maxIdleTime = max;
   }
 
-private:
-  static thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
-  static thread_local time_t t_nextCleanup;
+  bool isConnectionUsable(const std::shared_ptr<T>& conn, const struct timeval& now, const struct timeval& freshCutOff)
+  {
+    if (!conn->canBeReused()) {
+      return false;
+    }
+
+    /* for connections that have not been used very recently,
+       check whether they have been closed in the meantime */
+    if (freshCutOff < conn->getLastDataReceivedTime()) {
+      /* used recently enough, skip the check */
+      return true;
+    }
+
+    if (conn->isUsable()) {
+      return true;
+    }
+
+    return false;
+  }
+
+  std::shared_ptr<T> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload)
+  {
+    struct timeval freshCutOff = now;
+    freshCutOff.tv_sec -= 1;
+
+    auto backendId = ds->getID();
+
+    cleanupClosedConnections(now);
+
+    const bool haveProxyProtocol = ds->useProxyProtocol || !proxyProtocolPayload.empty();
+    if (!haveProxyProtocol) {
+      const auto& it = d_downstreamConnections.find(backendId);
+      if (it != d_downstreamConnections.end()) {
+        auto& list = it->second;
+        for (auto listIt = list.begin(); listIt != list.end(); ) {
+          if (!(*listIt)) {
+            listIt = list.erase(listIt);
+            continue;
+          }
+
+          auto& entry = *listIt;
+          if (isConnectionUsable(entry, now, freshCutOff)) {
+            entry->setReused();
+            ++ds->tcpReusedConnections;
+            return entry;
+          }
+
+          if (entry->willBeReusable(false)) {
+            ++listIt;
+            continue;
+          }
+
+          listIt = list.erase(listIt);
+        }
+      }
+    }
+
+    auto newConnection = std::make_shared<T>(ds, mplexer, now, std::move(proxyProtocolPayload));
+    if (!haveProxyProtocol) {
+      d_downstreamConnections[backendId].push_front(newConnection);
+    }
+
+    return newConnection;
+  }
+
+  void cleanupClosedConnections(struct timeval now)
+  {
+    if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) {
+      return;
+    }
+
+    d_nextCleanup = now.tv_sec + s_cleanupInterval;
+
+    struct timeval freshCutOff = now;
+    freshCutOff.tv_sec -= 1;
+    struct timeval idleCutOff = now;
+    idleCutOff.tv_sec -= s_maxIdleTime;
+
+    for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) {
+      for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
+        if (!(*connIt)) {
+          connIt = dsIt->second.erase(connIt);
+          continue;
+        }
+
+        auto& entry = *connIt;
+
+        /* don't bother checking freshly used connections */
+        if (freshCutOff < entry->getLastDataReceivedTime()) {
+          ++connIt;
+          continue;
+        }
+
+        if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
+          /* idle for too long */
+          connIt = dsIt->second.erase(connIt);
+          continue;
+        }
+
+        if (entry->isUsable()) {
+          ++connIt;
+          continue;
+        }
+
+        connIt = dsIt->second.erase(connIt);
+      }
+
+      if (!dsIt->second.empty()) {
+        ++dsIt;
+      }
+      else {
+        dsIt = d_downstreamConnections.erase(dsIt);
+      }
+    }
+  }
+
+  size_t clear()
+  {
+    size_t count = 0;
+    for (const auto& downstream : d_downstreamConnections) {
+      count += downstream.second.size();
+      for (auto& conn : downstream.second) {
+        conn->stopIO();
+      }
+    }
+
+    d_downstreamConnections.clear();
+    return count;
+  }
+
+  bool removeDownstreamConnection(std::shared_ptr<T>& conn)
+  {
+    bool found = false;
+    auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
+    if (backendIt == d_downstreamConnections.end()) {
+      return found;
+    }
+
+    for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) {
+      if (*connIt == conn) {
+        backendIt->second.erase(connIt);
+        found = true;
+        break;
+      }
+    }
+
+    return found;
+  }
+
+protected:
+
   static size_t s_maxCachedConnectionsPerDownstream;
   static uint16_t s_cleanupInterval;
   static uint16_t s_maxIdleTime;
+
+  std::map<boost::uuids::uuid, std::deque<std::shared_ptr<T>>> d_downstreamConnections;
+  time_t d_nextCleanup{0};
 };
+
+template <class T> size_t DownstreamConnectionsManager<T>::s_maxCachedConnectionsPerDownstream{10};
+template <class T> uint16_t DownstreamConnectionsManager<T>::s_cleanupInterval{60};
+template <class T> uint16_t DownstreamConnectionsManager<T>::s_maxIdleTime{300};
+
+using DownstreamTCPConnectionsManager = DownstreamConnectionsManager<TCPConnectionToBackend>;
+extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;