dnsdist: Share the downstream TCP connections cache between threads
author Remi Gacogne <remi.gacogne@powerdns.com>
Thu, 6 May 2021 12:41:57 +0000 (14:41 +0200)
committer Remi Gacogne <remi.gacogne@powerdns.com>
Thu, 26 Aug 2021 14:30:27 +0000 (16:30 +0200)
pdns/dnsdist-lua.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist.hh
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.hh
pdns/dnsdistdist/dnsdist-tcp-upstream.hh
pdns/dnsdistdist/dnsdist-tcp.hh

diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc
index 4e44913e19d64d0d3147329c8ce7431ca8869a64..fddb268427c51337c48113e81f29fbb4570b3b76 100644
@@ -43,6 +43,7 @@
 #include "dnsdist-proxy-protocol.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-secpoll.hh"
+#include "dnsdist-tcp-downstream.hh"
 #include "dnsdist-web.hh"
 
 #include "base64.hh"
@@ -1974,7 +1975,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
 
   luaCtx.writeFunction("setTCPDownstreamCleanupInterval", [](uint16_t interval) {
       setLuaSideEffect();
-      g_downstreamTCPCleanupInterval = interval;
+      DownstreamConnectionsManager::setCleanupInterval(interval);
     });
 
   luaCtx.writeFunction("setConsoleConnectionsLogging", [](bool enabled) {
diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc
index aa4a47d31d5fec75e4153e73b0c3203d1414a42a..cb40ce90daa8c92fb9df4a1b672b7f5a0a513ac6 100644
@@ -70,135 +70,11 @@ uint64_t g_maxTCPQueuedConnections{10000};
 size_t g_tcpInternalPipeBufferSize{0};
 uint64_t g_maxTCPQueuedConnections{1000};
 #endif
-uint16_t g_downstreamTCPCleanupInterval{60};
+
 int g_tcpRecvTimeout{2};
 int g_tcpSendTimeout{2};
 std::atomic<uint64_t> g_tcpStatesDumpRequested{0};
 
-class DownstreamConnectionsManager
-{
-public:
-
-  static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
-  {
-    std::shared_ptr<TCPConnectionToBackend> result;
-    struct timeval freshCutOff = now;
-    freshCutOff.tv_sec -= 1;
-
-    const auto& it = t_downstreamConnections.find(ds);
-    if (it != t_downstreamConnections.end()) {
-      auto& list = it->second;
-      while (!list.empty()) {
-        result = std::move(list.back());
-        list.pop_back();
-
-        result->setReused();
-        /* for connections that have not been used very recently,
-           check whether they have been closed in the meantime */
-        if (freshCutOff < result->getLastDataReceivedTime()) {
-          /* used recently enough, skip the check */
-          ++ds->tcpReusedConnections;
-          return result;
-        }
-
-        if (isTCPSocketUsable(result->getHandle())) {
-          ++ds->tcpReusedConnections;
-          return result;
-        }
-
-        /* otherwise let's try the next one, if any */
-      }
-    }
-
-    return std::make_shared<TCPConnectionToBackend>(ds, mplexer, now);
-  }
-
-  static void releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn)
-  {
-    if (conn == nullptr) {
-      return;
-    }
-
-    if (!conn->canBeReused()) {
-      conn.reset();
-      return;
-    }
-
-    const auto& ds = conn->getDS();
-    auto& list = t_downstreamConnections[ds];
-    while (list.size() >= s_maxCachedConnectionsPerDownstream) {
-      /* too many connections queued already */
-      list.pop_front();
-    }
-
-    list.push_back(std::move(conn));
-  }
-
-  static void cleanupClosedTCPConnections(struct timeval now)
-  {
-    struct timeval freshCutOff = now;
-    freshCutOff.tv_sec -= 1;
-
-    for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
-      for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
-        if (!(*connIt)) {
-          ++connIt;
-          continue;
-        }
-
-        /* don't bother checking freshly used connections */
-        if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
-          ++connIt;
-          continue;
-        }
-
-        if (isTCPSocketUsable((*connIt)->getHandle())) {
-          ++connIt;
-        }
-        else {
-          connIt = dsIt->second.erase(connIt);
-        }
-      }
-
-      if (!dsIt->second.empty()) {
-        ++dsIt;
-      }
-      else {
-        dsIt = t_downstreamConnections.erase(dsIt);
-      }
-    }
-  }
-
-  static size_t clear()
-  {
-    size_t count = 0;
-    for (const auto& downstream : t_downstreamConnections) {
-      count += downstream.second.size();
-    }
-
-    t_downstreamConnections.clear();
-
-    return count;
-  }
-
-  static void setMaxCachedConnectionsPerDownstream(size_t max)
-  {
-    s_maxCachedConnectionsPerDownstream = max;
-  }
-
-private:
-  static thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
-  static size_t s_maxCachedConnectionsPerDownstream;
-};
-
-void setMaxCachedTCPConnectionsPerDownstream(size_t max)
-{
-  DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
-}
-
-thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
-size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
-
 static void decrementTCPClientCount(const ComboAddress& client)
 {
   if (g_maxTCPConnectionsPerClient) {
@@ -1166,15 +1042,44 @@ static void tcpClientThread(int pipefd, int crossProtocolPipeFD)
 
   struct timeval now;
   gettimeofday(&now, nullptr);
-  time_t lastTCPCleanup = now.tv_sec;
   time_t lastTimeoutScan = now.tv_sec;
 
   for (;;) {
     data.mplexer->run(&now);
 
-    if (g_downstreamTCPCleanupInterval > 0 && (now.tv_sec > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
-      DownstreamConnectionsManager::cleanupClosedTCPConnections(now);
-      lastTCPCleanup = now.tv_sec;
+    if (now.tv_sec > lastTimeoutScan) {
+      lastTimeoutScan = now.tv_sec;
+      auto expiredReadConns = data.mplexer->getTimeouts(now, false);
+      for (const auto& cbData : expiredReadConns) {
+        if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
+          auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
+          if (cbData.first == state->d_handler.getDescriptor()) {
+            vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
+            state->handleTimeout(state, false);
+          }
+        }
+        else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
+          auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
+          vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
+          conn->handleTimeout(now, false);
+        }
+      }
+
+      auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
+      for (const auto& cbData : expiredWriteConns) {
+        if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
+          auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
+          if (cbData.first == state->d_handler.getDescriptor()) {
+            vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
+            state->handleTimeout(state, true);
+          }
+        }
+        else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
+          auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
+          vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
+          conn->handleTimeout(now, true);
+        }
+      }
 
       if (g_tcpStatesDumpRequested > 0) {
         /* just to keep things clean in the output, debug only */
@@ -1210,41 +1115,6 @@ static void tcpClientThread(int pipefd, int crossProtocolPipeFD)
         }
       }
     }
-
-    if (now.tv_sec > lastTimeoutScan) {
-      lastTimeoutScan = now.tv_sec;
-      auto expiredReadConns = data.mplexer->getTimeouts(now, false);
-      for (const auto& cbData : expiredReadConns) {
-        if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
-          auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
-          if (cbData.first == state->d_handler.getDescriptor()) {
-            vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
-            state->handleTimeout(state, false);
-          }
-        }
-        else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
-          auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
-          vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
-          conn->handleTimeout(now, false);
-        }
-      }
-
-      auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
-      for (const auto& cbData : expiredWriteConns) {
-        if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
-          auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
-          if (cbData.first == state->d_handler.getDescriptor()) {
-            vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
-            state->handleTimeout(state, true);
-          }
-        }
-        else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
-          auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
-          vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
-          conn->handleTimeout(now, true);
-        }
-      }
-    }
   }
 }
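
The timeout scan moved above now runs on every loop iteration that crosses a second boundary, and it dispatches on the concrete type stored in the multiplexer's callback payload. A minimal sketch of that dispatch (std::any standing in for the boost::any that dnsdist's FDMultiplexer uses; the struct names are hypothetical):

#include <any>
#include <iostream>
#include <memory>
#include <string>
#include <typeinfo>
#include <utility>
#include <vector>

struct IncomingConn { std::string client{"192.0.2.1:4242"}; };
struct BackendConn { std::string backend{"backend1"}; };

int main()
{
  /* each expired descriptor carries an opaque payload, as with
     FDMultiplexer::getTimeouts() */
  std::vector<std::pair<int, std::any>> expired;
  expired.emplace_back(7, std::make_shared<IncomingConn>());
  expired.emplace_back(9, std::make_shared<BackendConn>());

  for (const auto& cbData : expired) {
    /* typeid() on the stored value tells us which side timed out */
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingConn>)) {
      auto state = std::any_cast<std::shared_ptr<IncomingConn>>(cbData.second);
      std::cout << "Timeout (read) from remote TCP client " << state->client << '\n';
    }
    else if (cbData.second.type() == typeid(std::shared_ptr<BackendConn>)) {
      auto conn = std::any_cast<std::shared_ptr<BackendConn>>(cbData.second);
      std::cout << "Timeout (read) from remote backend " << conn->backend << '\n';
    }
  }
  return 0;
}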
 
diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh
index af26d04ff5ef84401f7ca24d565e580a15f03c14..2d394ca5dedade1496544c7aaf5d38433584344c 100644
@@ -948,8 +948,6 @@ extern uint32_t g_staleCacheEntriesTTL;
 extern bool g_apiReadWrite;
 extern std::string g_apiConfigDirectory;
 extern bool g_servFailOnNoPolicy;
-extern bool g_useTCPSinglePipe;
-extern uint16_t g_downstreamTCPCleanupInterval;
 extern size_t g_udpVectorSize;
 extern bool g_allowEmptyResponse;
 
diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc
index f1e2baeb6d4b167055a32d35861eaa95dcfde867..afe744f8f38adc5ae2c3799f1ecd06b16048d913 100644
@@ -567,10 +567,8 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
     // get ready to read the next packet, if any
     return IOState::NeedRead;
   }
-  else {
-    --conn->d_ds->outstanding;
-  }
 
+  --conn->d_ds->outstanding;
   auto ids = std::move(it->second.d_idstate);
   d_pendingResponses.erase(it);
   /* marking as idle for now, so we can accept new queries if our queues are empty */
@@ -579,6 +577,9 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
   }
 
   DEBUGLOG("passing response to client connection for "<<ids.qname);
+  // make sure that we still exist after calling handleResponse()
+  auto shared = shared_from_this();
+  bool release = canBeReused() && sender->releaseConnection();
   sender->handleResponse(now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
 
   if (!d_pendingQueries.empty()) {
@@ -600,6 +601,9 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
     DEBUGLOG("nothing to do, waiting for a new query");
     d_state = State::idle;
     d_sender.reset();
+    if (release) {
+      DownstreamConnectionsManager::releaseDownstreamConnection(std::move(shared));
+    }
     return IOState::Done;
   }
 }
@@ -689,3 +693,131 @@ bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery
   }
   return done;
 }
+
+std::shared_ptr<TCPConnectionToBackend> DownstreamConnectionsManager::getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
+{
+  std::shared_ptr<TCPConnectionToBackend> result;
+  struct timeval freshCutOff = now;
+  freshCutOff.tv_sec -= 1;
+
+  auto backendId = ds->getID();
+
+  if (s_cleanupInterval > 0 && (s_nextCleanup == 0 || s_nextCleanup <= now.tv_sec)) {
+    s_nextCleanup = now.tv_sec + s_cleanupInterval;
+    cleanupClosedTCPConnections(now);
+  }
+
+  {
+    std::lock_guard<decltype(s_lock)> lock(s_lock);
+    const auto& it = s_downstreamConnections.find(backendId);
+    if (it != s_downstreamConnections.end()) {
+      auto& list = it->second;
+      while (!list.empty()) {
+        result = std::move(list.back());
+        list.pop_back();
+
+        result->setReused();
+        /* for connections that have not been used very recently,
+           check whether they have been closed in the meantime */
+        if (freshCutOff < result->getLastDataReceivedTime()) {
+          /* used recently enough, skip the check */
+          ++ds->tcpReusedConnections;
+          return result;
+        }
+
+        if (isTCPSocketUsable(result->getHandle())) {
+          ++ds->tcpReusedConnections;
+          return result;
+        }
+
+        /* otherwise let's try the next one, if any */
+      }
+    }
+  }
+
+  return std::make_shared<TCPConnectionToBackend>(ds, mplexer, now);
+}
+
+void DownstreamConnectionsManager::releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn)
+{
+  if (conn == nullptr) {
+    return;
+  }
+
+  if (!conn->canBeReused()) {
+    conn.reset();
+    return;
+  }
+
+  const auto& ds = conn->getDS();
+  {
+    std::lock_guard<decltype(s_lock)> lock(s_lock);
+    auto& list = s_downstreamConnections[ds->getID()];
+    while (list.size() >= s_maxCachedConnectionsPerDownstream) {
+      /* too many connections queued already */
+      list.pop_front();
+    }
+
+    list.push_back(std::move(conn));
+  }
+}
+
+void DownstreamConnectionsManager::cleanupClosedTCPConnections(struct timeval now)
+{
+  struct timeval freshCutOff = now;
+  freshCutOff.tv_sec -= 1;
+
+  std::lock_guard<decltype(s_lock)> lock(s_lock);
+  for (auto dsIt = s_downstreamConnections.begin(); dsIt != s_downstreamConnections.end(); ) {
+    for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
+      if (!(*connIt)) {
+        ++connIt;
+        continue;
+      }
+
+      /* don't bother checking freshly used connections */
+      if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
+        ++connIt;
+        continue;
+      }
+
+      if (isTCPSocketUsable((*connIt)->getHandle())) {
+        ++connIt;
+      }
+      else {
+        connIt = dsIt->second.erase(connIt);
+      }
+    }
+
+    if (!dsIt->second.empty()) {
+      ++dsIt;
+    }
+    else {
+      dsIt = s_downstreamConnections.erase(dsIt);
+    }
+  }
+}
+
+size_t DownstreamConnectionsManager::clear()
+{
+  size_t count = 0;
+  std::lock_guard<decltype(s_lock)> lock(s_lock);
+  for (const auto& downstream : s_downstreamConnections) {
+    count += downstream.second.size();
+  }
+
+  s_downstreamConnections.clear();
+
+  return count;
+}
+
+void setMaxCachedTCPConnectionsPerDownstream(size_t max)
+{
+  DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+}
+
+map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::s_downstreamConnections;
+std::mutex DownstreamConnectionsManager::s_lock;
+size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
+time_t DownstreamConnectionsManager::s_nextCleanup{0};
+uint16_t DownstreamConnectionsManager::s_cleanupInterval{60};
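
The definitions above are the former thread_local cache rebuilt as a process-wide one: a single map keyed by the backend's UUID, guarded by a mutex, and bounded per backend. A minimal sketch of the same shape (hypothetical names; an int key standing in for boost::uuids::uuid):

#include <cstddef>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <utility>

struct Conn { int fd{-1}; };

class SharedConnectionPool
{
public:
  static void release(int backendId, std::shared_ptr<Conn>&& conn)
  {
    std::lock_guard<std::mutex> lock(s_lock);
    auto& list = s_pool[backendId];
    while (list.size() >= s_maxPerBackend) {
      /* too many cached connections already, evict the oldest */
      list.pop_front();
    }
    list.push_back(std::move(conn));
  }

  static std::shared_ptr<Conn> acquire(int backendId)
  {
    std::lock_guard<std::mutex> lock(s_lock);
    auto it = s_pool.find(backendId);
    if (it == s_pool.end() || it->second.empty()) {
      return nullptr; /* caller opens a fresh connection */
    }
    auto conn = std::move(it->second.back());
    it->second.pop_back();
    return conn;
  }

private:
  static std::map<int, std::deque<std::shared_ptr<Conn>>> s_pool;
  static std::mutex s_lock;
  static constexpr std::size_t s_maxPerBackend{10};
};

std::map<int, std::deque<std::shared_ptr<Conn>>> SharedConnectionPool::s_pool;
std::mutex SharedConnectionPool::s_lock;

int main()
{
  SharedConnectionPool::release(1, std::make_shared<Conn>());
  auto conn = SharedConnectionPool::acquire(1);
  return conn != nullptr ? 0 : 1;
}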
diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh
index 8f7e354d2a6f7040f76bf4edd708a29661006eb8..59cc936b1b7582d0218a9932ca384f24e8e4e61e 100644
@@ -218,3 +218,29 @@ private:
   bool d_connectionDied{false};
   bool d_proxyProtocolPayloadSent{false};
 };
+
+class DownstreamConnectionsManager
+{
+public:
+  static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now);
+  static void releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn);
+  static void cleanupClosedTCPConnections(struct timeval now);
+  static size_t clear();
+
+  static void setMaxCachedConnectionsPerDownstream(size_t max)
+  {
+    s_maxCachedConnectionsPerDownstream = max;
+  }
+
+  static void setCleanupInterval(uint16_t interval)
+  {
+    s_cleanupInterval = interval;
+  }
+
+private:
+  static map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> s_downstreamConnections;
+  static std::mutex s_lock;
+  static size_t s_maxCachedConnectionsPerDownstream;
+  static time_t s_nextCleanup;
+  static uint16_t s_cleanupInterval;
+};
diff --git a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh
index e628e7c908b27c415e4f078647dbbe4e99f8fdd0..9d392006c958bddceb18d88eebb394a01b540214 100644
@@ -30,6 +30,9 @@ public:
     d_ci.fd = -1;
     d_proxiedDestination = d_origDest;
     d_proxiedRemote = d_ci.remote;
+
+    /* we manage the release of the downstream connection ourselves */
+    d_releaseConnection = false;
   }
 
   IncomingTCPConnectionState(const IncomingTCPConnectionState& rhs) = delete;
diff --git a/pdns/dnsdistdist/dnsdist-tcp.hh b/pdns/dnsdistdist/dnsdist-tcp.hh
index 6a0058ca53acd2481ff6b31ba0e8c1606faad1da..02581e68745934bb14e33fff619158136938042c 100644
@@ -140,6 +140,15 @@ public:
   virtual void handleResponse(const struct timeval& now, TCPResponse&& response) = 0;
   virtual void handleXFRResponse(const struct timeval& now, TCPResponse&& response) = 0;
   virtual void notifyIOError(IDState&& query, const struct timeval& now) = 0;
+
+  /* whether the connection should be automatically released to the pool after handleResponse()
+     has been called */
+  bool releaseConnection() const
+  {
+    return d_releaseConnection;
+  }
+protected:
+  bool d_releaseConnection{true};
 };
 
 struct CrossProtocolQuery
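
d_releaseConnection defaults to true, so existing senders keep the automatic return-to-pool behaviour after handleResponse(); IncomingTCPConnectionState opts out in its constructor (see the dnsdist-tcp-upstream.hh hunk above) because it manages the release itself. A minimal sketch of that opt-out flag (hypothetical names, not from the patch):

#include <iostream>

class TCPQuerySenderSketch
{
public:
  virtual ~TCPQuerySenderSketch() = default;

  bool releaseConnection() const
  {
    return d_releaseConnection;
  }

protected:
  /* default: the backend connection goes back to the pool automatically */
  bool d_releaseConnection{true};
};

class SelfManagedSender : public TCPQuerySenderSketch
{
public:
  SelfManagedSender()
  {
    /* we manage the release of the downstream connection ourselves */
    d_releaseConnection = false;
  }
};

int main()
{
  SelfManagedSender sender;
  std::cout << std::boolalpha << sender.releaseConnection() << '\n'; /* false */
  return 0;
}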