]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Split the list of downstream connections in two, active and idle
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 5 Nov 2021 16:44:59 +0000 (17:44 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 8 Nov 2021 09:26:36 +0000 (10:26 +0100)
This way we can easily keep track of how many idle connections we have,
and try to reuse these first.

pdns/dnsdist-lua.cc
pdns/dnsdistdist/dnsdist-nghttp2.cc
pdns/dnsdistdist/dnsdist-nghttp2.hh
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.hh
pdns/dnsdistdist/docs/reference/tuning.rst
pdns/dnsdistdist/test-dnsdist-connections-cache.cc

index 9e58507796a6083bda00efc23c3de2c457a91899..fde21a9fccb47b5f95b6d308d060f91e487e40bb 100644 (file)
@@ -1306,11 +1306,11 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
   });
 
   luaCtx.writeFunction("setMaxCachedTCPConnectionsPerDownstream", [](size_t max) {
-    DownstreamTCPConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+    DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
   });
 
-  luaCtx.writeFunction("setMaxCachedDoHConnectionsPerDownstream", [](size_t max) {
-    setDoHDownstreamMaxConnectionsPerBackend(max);
+  luaCtx.writeFunction("setMaxIdleDoHConnectionsPerDownstream", [](size_t max) {
+    setDoHDownstreamMaxIdleConnectionsPerBackend(max);
   });
 
   luaCtx.writeFunction("setOutgoingDoHWorkerThreads", [](uint64_t workers) {
index 4eb31cec98998dc15509131eac81e6308c38b326..5df7f337c804bf44c43d47fe19d21b403c6bf98b 100644 (file)
@@ -394,7 +394,7 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun
       }
 
       if (newState == IOState::Done) {
-        if (conn->getConcurrentStreamsCount() == 0) {
+        if (conn->isIdle()) {
           conn->stopIO();
           conn->watchForRemoteHostClosingConnection();
           ioGuard.release();
@@ -440,7 +440,7 @@ void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::fun
       conn->d_out.clear();
       conn->d_outPos = 0;
       conn->stopIO();
-      if (conn->getConcurrentStreamsCount() > 0) {
+      if (!conn->isIdle()) {
         conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
       }
       else {
@@ -460,12 +460,15 @@ void DoHConnectionToBackend::stopIO()
 {
   d_ioState->reset();
 
+  auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
   if (!willBeReusable(false)) {
     /* remove ourselves from the connection cache, this might mean that our
        reference count drops to zero after that, so we need to be careful */
-    auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
     t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared);
   }
+  else {
+    t_downstreamDoHConnectionsManager.moveToIdle(shared);
+  }
 }
 
 void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD)
@@ -555,7 +558,7 @@ ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const ui
         conn->d_out.clear();
         conn->d_outPos = 0;
         conn->stopIO();
-        if (conn->getConcurrentStreamsCount() > 0) {
+        if (!conn->isIdle()) {
           conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
         }
         else {
@@ -629,7 +632,7 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con
         conn->handleResponseError(std::move(request), now);
       }
 
-      if (conn->getConcurrentStreamsCount() == 0) {
+      if (conn->isIdle()) {
         conn->stopIO();
         conn->watchForRemoteHostClosingConnection();
       }
@@ -680,7 +683,7 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session
 
       conn->handleResponseError(std::move(request), now);
     }
-    if (conn->getConcurrentStreamsCount() == 0) {
+    if (conn->isIdle()) {
       conn->stopIO();
       conn->watchForRemoteHostClosingConnection();
     }
@@ -724,7 +727,7 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i
   }
 
   //cerr<<"we now have "<<conn->getConcurrentStreamsCount()<<" concurrent connections"<<endl;
-  if (conn->getConcurrentStreamsCount() == 0) {
+  if (conn->isIdle()) {
     //cerr<<"stopping IO"<<endl;
     conn->stopIO();
     conn->watchForRemoteHostClosingConnection();
@@ -1194,9 +1197,9 @@ void setDoHDownstreamMaxIdleTime(uint16_t max)
 #endif /* HAVE_NGHTTP2 */
 }
 
-void setDoHDownstreamMaxConnectionsPerBackend(size_t max)
+void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max)
 {
 #ifdef HAVE_NGHTTP2
-  DownstreamDoHConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+  DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
 #endif /* HAVE_NGHTTP2 */
 }
index 7b7ed6333df0a2b8e22a03e42d9c4fd1528015cb..6e38f28cc383f1c12d76b5821b59414a7c118fe2 100644 (file)
@@ -72,4 +72,4 @@ size_t clearH2Connections();
 
 void setDoHDownstreamCleanupInterval(uint16_t max);
 void setDoHDownstreamMaxIdleTime(uint16_t max);
-void setDoHDownstreamMaxConnectionsPerBackend(size_t max);
+void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max);
index 1677d65e7efc7d0464cfc56e35436b7603455a05..7d3158cc215d7db267f1c187e992abc6893c593f 100644 (file)
@@ -633,12 +633,14 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
       --conn->d_ds->outstanding;
       /* marking as idle for now, so we can accept new queries if our queues are empty */
       if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
+        t_downstreamTCPConnectionsManager.moveToIdle(conn);
         d_state = State::idle;
       }
     }
 
     sender->handleXFRResponse(now, std::move(response));
     if (done) {
+      t_downstreamTCPConnectionsManager.moveToIdle(conn);
       d_state = State::idle;
       return IOState::Done;
     }
@@ -655,6 +657,7 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
   d_pendingResponses.erase(it);
   /* marking as idle for now, so we can accept new queries if our queues are empty */
   if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
+    t_downstreamTCPConnectionsManager.moveToIdle(conn);
     d_state = State::idle;
   }
 
@@ -678,6 +681,7 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
   }
   else {
     DEBUGLOG("nothing to do, waiting for a new query");
+    t_downstreamTCPConnectionsManager.moveToIdle(conn);
     d_state = State::idle;
     return IOState::Done;
   }
index ace31e4e4817e789b211cc957f717864e7c2bcd1..489405ef1ee141b330a48bcddab99deaae7fe66f 100644 (file)
@@ -300,10 +300,29 @@ private:
 
 template <class T> class DownstreamConnectionsManager
 {
+  struct SequencedTag {};
+  struct OrderedTag {};
+
+  typedef multi_index_container<
+    std::shared_ptr<T>,
+    indexed_by <
+      ordered_unique<tag<OrderedTag>,
+                     identity<std::shared_ptr<T>>
+                     >,
+      /* new elements are added to the front of the sequence */
+      sequenced<tag<SequencedTag> >
+      >
+  > list_t;
+  struct ConnectionLists
+  {
+    list_t d_actives;
+    list_t d_idles;
+  };
+
 public:
-  static void setMaxCachedConnectionsPerDownstream(size_t max)
+  static void setMaxIdleConnectionsPerDownstream(size_t max)
   {
-    s_maxCachedConnectionsPerDownstream = max;
+    s_maxIdleConnectionsPerDownstream = max;
   }
 
   static void setCleanupInterval(uint16_t interval)
@@ -329,37 +348,27 @@ public:
     if (!haveProxyProtocol) {
       const auto& it = d_downstreamConnections.find(backendId);
       if (it != d_downstreamConnections.end()) {
-        auto& list = it->second;
-        for (auto listIt = list.begin(); listIt != list.end(); ) {
-          if (!(*listIt)) {
-            listIt = list.erase(listIt);
-            continue;
-          }
-
-          auto& entry = *listIt;
-          if (isConnectionUsable(entry, now, freshCutOff)) {
-            entry->setReused();
-            ++ds->tcpReusedConnections;
-            return entry;
-          }
-
-          if (entry->willBeReusable(false)) {
-            ++listIt;
-            continue;
-          }
-
-          listIt = list.erase(listIt);
+        /* first scan idle connections, more recent first */
+        auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true);
+        if (entry) {
+          ++ds->tcpReusedConnections;
+          it->second.d_actives.insert(entry);
+          return entry;
+        }
+
+        /* then scan actives ones, more recent first as well */
+        entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false);
+        if (entry) {
+          ++ds->tcpReusedConnections;
+          return entry;
         }
       }
     }
 
     auto newConnection = std::make_shared<T>(ds, mplexer, now, std::move(proxyProtocolPayload));
     if (!haveProxyProtocol) {
-      auto& list = d_downstreamConnections[backendId];
-      if (list.size() == s_maxCachedConnectionsPerDownstream) {
-        list.pop_back();
-      }
-      list.push_front(newConnection);
+      auto& list = d_downstreamConnections[backendId].d_actives;
+      list.template get<SequencedTag>().push_front(newConnection);
     }
 
     return newConnection;
@@ -379,39 +388,14 @@ public:
     idleCutOff.tv_sec -= s_maxIdleTime;
 
     for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) {
-      for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
-        if (!(*connIt)) {
-          connIt = dsIt->second.erase(connIt);
-          continue;
-        }
-
-        auto& entry = *connIt;
+      cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff);
+      cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff);
 
-        /* don't bother checking freshly used connections */
-        if (freshCutOff < entry->getLastDataReceivedTime()) {
-          ++connIt;
-          continue;
-        }
-
-        if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
-          /* idle for too long */
-          connIt = dsIt->second.erase(connIt);
-          continue;
-        }
-
-        if (entry->isUsable()) {
-          ++connIt;
-          continue;
-        }
-
-        connIt = dsIt->second.erase(connIt);
-      }
-
-      if (!dsIt->second.empty()) {
-        ++dsIt;
+      if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) {
+        dsIt = d_downstreamConnections.erase(dsIt);
       }
       else {
-        dsIt = d_downstreamConnections.erase(dsIt);
+        ++dsIt;
       }
     }
   }
@@ -420,8 +404,12 @@ public:
   {
     size_t count = 0;
     for (const auto& downstream : d_downstreamConnections) {
-      count += downstream.second.size();
-      for (auto& conn : downstream.second) {
+      count += downstream.second.d_actives.size();
+      for (auto& conn : downstream.second.d_actives) {
+        conn->stopIO();
+      }
+      count += downstream.second.d_idles.size();
+      for (auto& conn : downstream.second.d_idles) {
         conn->stopIO();
       }
     }
@@ -431,35 +419,142 @@ public:
   }
 
   size_t count() const
+  {
+    return getActiveCount() + getIdleCount();
+  }
+
+  size_t getActiveCount() const
   {
     size_t count = 0;
     for (const auto& downstream : d_downstreamConnections) {
-      count += downstream.second.size();
+      count += downstream.second.d_actives.size();
+    }
+    return count;
+  }
+
+  size_t getIdleCount() const
+  {
+    size_t count = 0;
+    for (const auto& downstream : d_downstreamConnections) {
+      count += downstream.second.d_idles.size();
     }
     return count;
   }
 
   bool removeDownstreamConnection(std::shared_ptr<T>& conn)
   {
-    bool found = false;
     auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
     if (backendIt == d_downstreamConnections.end()) {
-      return found;
+      return false;
     }
 
-    for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) {
-      if (*connIt == conn) {
-        backendIt->second.erase(connIt);
-        found = true;
-        break;
+    /* idle list first */
+    {
+      auto it = backendIt->second.d_idles.find(conn);
+      if (it != backendIt->second.d_idles.end()) {
+        backendIt->second.d_idles.erase(it);
+        return true;
+      }
+    }
+    /* then active */
+    {
+      auto it = backendIt->second.d_actives.find(conn);
+      if (it != backendIt->second.d_actives.end()) {
+        backendIt->second.d_actives.erase(it);
+        return true;
       }
     }
 
-    return found;
+    return false;
+  }
+
+  bool moveToIdle(std::shared_ptr<T>& conn)
+  {
+    auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
+    if (backendIt == d_downstreamConnections.end()) {
+      return false;
+    }
+
+    auto it = backendIt->second.d_actives.find(conn);
+    if (it == backendIt->second.d_actives.end()) {
+      return false;
+    }
+
+    backendIt->second.d_actives.erase(it);
+
+    if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) {
+      backendIt->second.d_idles.template get<SequencedTag>().pop_back();
+    }
+
+    backendIt->second.d_idles.template get<SequencedTag>().push_front(conn);
+    return true;
   }
 
 protected:
 
+  void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff)
+  {
+    auto& sidx = list.template get<SequencedTag>();
+    for (auto connIt = sidx.begin(); connIt != sidx.end(); ) {
+      if (!(*connIt)) {
+        connIt = sidx.erase(connIt);
+        continue;
+      }
+
+      auto& entry = *connIt;
+
+      /* don't bother checking freshly used connections */
+      if (freshCutOff < entry->getLastDataReceivedTime()) {
+        ++connIt;
+        continue;
+      }
+
+      if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
+        /* idle for too long */
+        connIt = sidx.erase(connIt);
+        continue;
+      }
+
+      if (entry->isUsable()) {
+        ++connIt;
+        continue;
+      }
+
+      connIt = sidx.erase(connIt);
+    }
+  }
+
+  std::shared_ptr<T> findUsableConnectionInList(const struct timeval& now, struct timeval& freshCutOff, list_t& list, bool removeIfFound)
+  {
+    auto& sidx = list.template get<SequencedTag>();
+    for (auto listIt = sidx.begin(); listIt != sidx.end(); ) {
+      if (!(*listIt)) {
+        listIt = sidx.erase(listIt);
+        continue;
+      }
+
+      auto& entry = *listIt;
+      if (isConnectionUsable(entry, now, freshCutOff)) {
+        entry->setReused();
+        auto result = entry;
+        if (removeIfFound) {
+          sidx.erase(listIt);
+        }
+        return result;
+      }
+
+      if (entry->willBeReusable(false)) {
+        ++listIt;
+        continue;
+      }
+
+      /* that connection will not be usable later, no need to keep it in that list */
+      listIt = sidx.erase(listIt);
+    }
+
+    return nullptr;
+  }
+
   bool isConnectionUsable(const std::shared_ptr<T>& conn, const struct timeval& now, const struct timeval& freshCutOff)
   {
     if (!conn->canBeReused()) {
@@ -480,15 +575,16 @@ protected:
     return false;
   }
 
-  static size_t s_maxCachedConnectionsPerDownstream;
+  static size_t s_maxIdleConnectionsPerDownstream;
   static uint16_t s_cleanupInterval;
   static uint16_t s_maxIdleTime;
 
-  std::map<boost::uuids::uuid, std::deque<std::shared_ptr<T>>> d_downstreamConnections;
+  std::map<boost::uuids::uuid, ConnectionLists> d_downstreamConnections;
+
   time_t d_nextCleanup{0};
 };
 
-template <class T> size_t DownstreamConnectionsManager<T>::s_maxCachedConnectionsPerDownstream{10};
+template <class T> size_t DownstreamConnectionsManager<T>::s_maxIdleConnectionsPerDownstream{10};
 template <class T> uint16_t DownstreamConnectionsManager<T>::s_cleanupInterval{60};
 template <class T> uint16_t DownstreamConnectionsManager<T>::s_maxIdleTime{300};
 
index 5eeb400d4bff197e23b76b57a0629ef149894362..90442f5b80c1bd3d6456cb2bbacf7d51c011332c 100644 (file)
@@ -16,7 +16,7 @@ Tuning related functions
 
   :param int max: The maximum time in seconds.
 
-.. function:: setMaxCachedDoHConnectionsPerDownstream(max)
+.. function:: setMaxIdleDoHConnectionsPerDownstream(max)
 
   .. versionadded:: 1.7.0
 
index da9e733a46cb77346007596abb676d7a74d96cc2..6cde1186dae8ceec192a4caf5ce70a9de2f8fc4a 100644 (file)
@@ -29,7 +29,7 @@
 class MockupConnection
 {
 public:
-  MockupConnection(const std::shared_ptr<DownstreamState>&, std::unique_ptr<FDMultiplexer>&, const struct timeval&, std::string&&)
+  MockupConnection(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>&, const struct timeval&, std::string&&): d_ds(ds)
   {
   }
 
@@ -66,6 +66,12 @@ public:
   {
   }
 
+  std::shared_ptr<DownstreamState> getDS() const
+  {
+    return d_ds;
+  }
+
+  std::shared_ptr<DownstreamState> d_ds;
   struct timeval d_lastDataReceivedTime
   {
     0, 0
@@ -80,10 +86,10 @@ BOOST_AUTO_TEST_SUITE(test_dnsdist_connections_cache)
 BOOST_AUTO_TEST_CASE(test_ConnectionsCache)
 {
   DownstreamConnectionsManager<MockupConnection> manager;
-  const size_t maxConnPerDownstream = 5;
+  const size_t maxIdleConnPerDownstream = 5;
   const uint16_t cleanupInterval = 1;
   const uint16_t maxIdleTime = 5;
-  manager.setMaxCachedConnectionsPerDownstream(maxConnPerDownstream);
+  manager.setMaxIdleConnectionsPerDownstream(maxIdleConnPerDownstream);
   manager.setCleanupInterval(cleanupInterval);
   manager.setMaxIdleTime(maxIdleTime);
 
@@ -96,12 +102,15 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache)
   auto conn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
   BOOST_REQUIRE(conn != nullptr);
   BOOST_CHECK_EQUAL(manager.count(), 1U);
+  BOOST_CHECK_EQUAL(manager.getActiveCount(), 1U);
+  BOOST_CHECK_EQUAL(manager.getIdleCount(), 0U);
 
   /* since the connection can be reused, we should get the same one */
   {
     auto conn1 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
     BOOST_CHECK(conn.get() == conn1.get());
     BOOST_CHECK_EQUAL(manager.count(), 1U);
+    BOOST_CHECK_EQUAL(manager.getActiveCount(), 1U);
   }
 
   /* if we mark it non-usable, we should get a new one */
@@ -109,12 +118,14 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache)
   auto conn2 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
   BOOST_CHECK(conn.get() != conn2.get());
   BOOST_CHECK_EQUAL(manager.count(), 2U);
+  BOOST_CHECK_EQUAL(manager.getActiveCount(), 2U);
 
   /* since the second connection can be reused, we should get it */
   {
     auto conn3 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
     BOOST_CHECK(conn3.get() == conn2.get());
     BOOST_CHECK_EQUAL(manager.count(), 2U);
+    BOOST_CHECK_EQUAL(manager.getActiveCount(), 2U);
   }
 
   /* different downstream so different connection */
@@ -123,11 +134,13 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache)
   BOOST_CHECK(differentConn.get() != conn.get());
   BOOST_CHECK(differentConn.get() != conn2.get());
   BOOST_CHECK_EQUAL(manager.count(), 3U);
+  BOOST_CHECK_EQUAL(manager.getActiveCount(), 3U);
   {
     /* but we should be able to reuse it */
     auto sameConn = manager.getConnectionToDownstream(mplexer, downstream2, now, std::string());
     BOOST_CHECK(sameConn.get() == differentConn.get());
     BOOST_CHECK_EQUAL(manager.count(), 3U);
+    BOOST_CHECK_EQUAL(manager.getActiveCount(), 3U);
   }
 
   struct timeval later = now;
@@ -155,16 +168,16 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache)
   conn->d_lastDataReceivedTime.tv_sec = 0;
 
   std::vector<std::shared_ptr<MockupConnection>> conns = {conn};
-  while (conns.size() < maxConnPerDownstream) {
+  while (conns.size() < maxIdleConnPerDownstream) {
     auto newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
     newConn->d_usable = false;
     conns.push_back(newConn);
     BOOST_CHECK_EQUAL(manager.count(), conns.size());
   }
 
-  /* if we add a new one, the oldest should get expunged */
+  /* if we add a new one, the oldest should NOT get expunged because they are all active ones! */
   auto newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
-  BOOST_CHECK_EQUAL(manager.count(), maxConnPerDownstream);
+  BOOST_CHECK_GT(manager.count(), maxIdleConnPerDownstream);
 
   {
     /* mark all connections as not usable anymore */
@@ -175,7 +188,7 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache)
     /* except the last one */
     newConn->d_usable = true;
 
-    BOOST_CHECK_EQUAL(manager.count(), maxConnPerDownstream);
+    BOOST_CHECK_EQUAL(manager.count(), conns.size() + 1);
     later.tv_sec += cleanupInterval + 1;
     manager.cleanupClosedConnections(later);
     BOOST_CHECK_EQUAL(manager.count(), 1U);
@@ -184,6 +197,25 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache)
   conns.clear();
   auto cleared = manager.clear();
   BOOST_CHECK_EQUAL(cleared, 1U);
+
+  /* add 10 actives connections */
+  while (conns.size() < 10) {
+    newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
+    newConn->d_usable = false;
+    conns.push_back(newConn);
+    BOOST_CHECK_EQUAL(manager.count(), conns.size());
+    BOOST_CHECK_EQUAL(manager.getActiveCount(), conns.size());
+  }
+  /* now we mark them as idle */
+  for (auto& c : conns) {
+    /* use a different shared_ptr to make sure that the comparison is done on the actual raw pointer */
+    auto shared = c;
+    shared->d_idle = true;
+    BOOST_CHECK(manager.moveToIdle(shared));
+  }
+  BOOST_CHECK_EQUAL(manager.count(), maxIdleConnPerDownstream);
+  BOOST_CHECK_EQUAL(manager.getActiveCount(), 0U);
+  BOOST_CHECK_EQUAL(manager.getIdleCount(), maxIdleConnPerDownstream);
 }
 
 BOOST_AUTO_TEST_SUITE_END();