From: Remi Gacogne Date: Fri, 5 Nov 2021 16:44:59 +0000 (+0100) Subject: dnsdist: Split the list of downstream connections in two, active and idle X-Git-Tag: dnsdist-1.7.0-beta1~5^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=54a9a2266a74311fc62835d01df64f0701296a4f;p=thirdparty%2Fpdns.git dnsdist: Split the list of downstream connections in two, active and idle This way we can easily keep track of how many idle connections we have, and try to reuse these first. --- diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 9e58507796..fde21a9fcc 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -1306,11 +1306,11 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) }); luaCtx.writeFunction("setMaxCachedTCPConnectionsPerDownstream", [](size_t max) { - DownstreamTCPConnectionsManager::setMaxCachedConnectionsPerDownstream(max); + DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max); }); - luaCtx.writeFunction("setMaxCachedDoHConnectionsPerDownstream", [](size_t max) { - setDoHDownstreamMaxConnectionsPerBackend(max); + luaCtx.writeFunction("setMaxIdleDoHConnectionsPerDownstream", [](size_t max) { + setDoHDownstreamMaxIdleConnectionsPerBackend(max); }); luaCtx.writeFunction("setOutgoingDoHWorkerThreads", [](uint64_t workers) { diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.cc b/pdns/dnsdistdist/dnsdist-nghttp2.cc index 4eb31cec98..5df7f337c8 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.cc +++ b/pdns/dnsdistdist/dnsdist-nghttp2.cc @@ -394,7 +394,7 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun } if (newState == IOState::Done) { - if (conn->getConcurrentStreamsCount() == 0) { + if (conn->isIdle()) { conn->stopIO(); conn->watchForRemoteHostClosingConnection(); ioGuard.release(); @@ -440,7 +440,7 @@ void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::fun conn->d_out.clear(); conn->d_outPos = 0; conn->stopIO(); - if (conn->getConcurrentStreamsCount() > 0) { + if (!conn->isIdle()) { conn->updateIO(IOState::NeedRead, handleReadableIOCallback); } else { @@ -460,12 +460,15 @@ void DoHConnectionToBackend::stopIO() { d_ioState->reset(); + auto shared = std::dynamic_pointer_cast(shared_from_this()); if (!willBeReusable(false)) { /* remove ourselves from the connection cache, this might mean that our reference count drops to zero after that, so we need to be careful */ - auto shared = std::dynamic_pointer_cast(shared_from_this()); t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared); } + else { + t_downstreamDoHConnectionsManager.moveToIdle(shared); + } } void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD) @@ -555,7 +558,7 @@ ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const ui conn->d_out.clear(); conn->d_outPos = 0; conn->stopIO(); - if (conn->getConcurrentStreamsCount() > 0) { + if (!conn->isIdle()) { conn->updateIO(IOState::NeedRead, handleReadableIOCallback); } else { @@ -629,7 +632,7 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con conn->handleResponseError(std::move(request), now); } - if (conn->getConcurrentStreamsCount() == 0) { + if (conn->isIdle()) { conn->stopIO(); conn->watchForRemoteHostClosingConnection(); } @@ -680,7 +683,7 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session conn->handleResponseError(std::move(request), now); } - if (conn->getConcurrentStreamsCount() == 0) { + if (conn->isIdle()) { conn->stopIO(); conn->watchForRemoteHostClosingConnection(); } @@ -724,7 +727,7 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i } //cerr<<"we now have "<getConcurrentStreamsCount()<<" concurrent connections"<getConcurrentStreamsCount() == 0) { + if (conn->isIdle()) { //cerr<<"stopping IO"<stopIO(); conn->watchForRemoteHostClosingConnection(); @@ -1194,9 +1197,9 @@ void setDoHDownstreamMaxIdleTime(uint16_t max) #endif /* HAVE_NGHTTP2 */ } -void setDoHDownstreamMaxConnectionsPerBackend(size_t max) +void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max) { #ifdef HAVE_NGHTTP2 - DownstreamDoHConnectionsManager::setMaxCachedConnectionsPerDownstream(max); + DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max); #endif /* HAVE_NGHTTP2 */ } diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.hh b/pdns/dnsdistdist/dnsdist-nghttp2.hh index 7b7ed6333d..6e38f28cc3 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.hh +++ b/pdns/dnsdistdist/dnsdist-nghttp2.hh @@ -72,4 +72,4 @@ size_t clearH2Connections(); void setDoHDownstreamCleanupInterval(uint16_t max); void setDoHDownstreamMaxIdleTime(uint16_t max); -void setDoHDownstreamMaxConnectionsPerBackend(size_t max); +void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max); diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index 1677d65e7e..7d3158cc21 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -633,12 +633,14 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptrd_ds->outstanding; /* marking as idle for now, so we can accept new queries if our queues are empty */ if (d_pendingQueries.empty() && d_pendingResponses.empty()) { + t_downstreamTCPConnectionsManager.moveToIdle(conn); d_state = State::idle; } } sender->handleXFRResponse(now, std::move(response)); if (done) { + t_downstreamTCPConnectionsManager.moveToIdle(conn); d_state = State::idle; return IOState::Done; } @@ -655,6 +657,7 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr class DownstreamConnectionsManager { + struct SequencedTag {}; + struct OrderedTag {}; + + typedef multi_index_container< + std::shared_ptr, + indexed_by < + ordered_unique, + identity> + >, + /* new elements are added to the front of the sequence */ + sequenced > + > + > list_t; + struct ConnectionLists + { + list_t d_actives; + list_t d_idles; + }; + public: - static void setMaxCachedConnectionsPerDownstream(size_t max) + static void setMaxIdleConnectionsPerDownstream(size_t max) { - s_maxCachedConnectionsPerDownstream = max; + s_maxIdleConnectionsPerDownstream = max; } static void setCleanupInterval(uint16_t interval) @@ -329,37 +348,27 @@ public: if (!haveProxyProtocol) { const auto& it = d_downstreamConnections.find(backendId); if (it != d_downstreamConnections.end()) { - auto& list = it->second; - for (auto listIt = list.begin(); listIt != list.end(); ) { - if (!(*listIt)) { - listIt = list.erase(listIt); - continue; - } - - auto& entry = *listIt; - if (isConnectionUsable(entry, now, freshCutOff)) { - entry->setReused(); - ++ds->tcpReusedConnections; - return entry; - } - - if (entry->willBeReusable(false)) { - ++listIt; - continue; - } - - listIt = list.erase(listIt); + /* first scan idle connections, more recent first */ + auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true); + if (entry) { + ++ds->tcpReusedConnections; + it->second.d_actives.insert(entry); + return entry; + } + + /* then scan actives ones, more recent first as well */ + entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false); + if (entry) { + ++ds->tcpReusedConnections; + return entry; } } } auto newConnection = std::make_shared(ds, mplexer, now, std::move(proxyProtocolPayload)); if (!haveProxyProtocol) { - auto& list = d_downstreamConnections[backendId]; - if (list.size() == s_maxCachedConnectionsPerDownstream) { - list.pop_back(); - } - list.push_front(newConnection); + auto& list = d_downstreamConnections[backendId].d_actives; + list.template get().push_front(newConnection); } return newConnection; @@ -379,39 +388,14 @@ public: idleCutOff.tv_sec -= s_maxIdleTime; for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) { - for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) { - if (!(*connIt)) { - connIt = dsIt->second.erase(connIt); - continue; - } - - auto& entry = *connIt; + cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff); + cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff); - /* don't bother checking freshly used connections */ - if (freshCutOff < entry->getLastDataReceivedTime()) { - ++connIt; - continue; - } - - if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) { - /* idle for too long */ - connIt = dsIt->second.erase(connIt); - continue; - } - - if (entry->isUsable()) { - ++connIt; - continue; - } - - connIt = dsIt->second.erase(connIt); - } - - if (!dsIt->second.empty()) { - ++dsIt; + if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) { + dsIt = d_downstreamConnections.erase(dsIt); } else { - dsIt = d_downstreamConnections.erase(dsIt); + ++dsIt; } } } @@ -420,8 +404,12 @@ public: { size_t count = 0; for (const auto& downstream : d_downstreamConnections) { - count += downstream.second.size(); - for (auto& conn : downstream.second) { + count += downstream.second.d_actives.size(); + for (auto& conn : downstream.second.d_actives) { + conn->stopIO(); + } + count += downstream.second.d_idles.size(); + for (auto& conn : downstream.second.d_idles) { conn->stopIO(); } } @@ -431,35 +419,142 @@ public: } size_t count() const + { + return getActiveCount() + getIdleCount(); + } + + size_t getActiveCount() const { size_t count = 0; for (const auto& downstream : d_downstreamConnections) { - count += downstream.second.size(); + count += downstream.second.d_actives.size(); + } + return count; + } + + size_t getIdleCount() const + { + size_t count = 0; + for (const auto& downstream : d_downstreamConnections) { + count += downstream.second.d_idles.size(); } return count; } bool removeDownstreamConnection(std::shared_ptr& conn) { - bool found = false; auto backendIt = d_downstreamConnections.find(conn->getDS()->getID()); if (backendIt == d_downstreamConnections.end()) { - return found; + return false; } - for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) { - if (*connIt == conn) { - backendIt->second.erase(connIt); - found = true; - break; + /* idle list first */ + { + auto it = backendIt->second.d_idles.find(conn); + if (it != backendIt->second.d_idles.end()) { + backendIt->second.d_idles.erase(it); + return true; + } + } + /* then active */ + { + auto it = backendIt->second.d_actives.find(conn); + if (it != backendIt->second.d_actives.end()) { + backendIt->second.d_actives.erase(it); + return true; } } - return found; + return false; + } + + bool moveToIdle(std::shared_ptr& conn) + { + auto backendIt = d_downstreamConnections.find(conn->getDS()->getID()); + if (backendIt == d_downstreamConnections.end()) { + return false; + } + + auto it = backendIt->second.d_actives.find(conn); + if (it == backendIt->second.d_actives.end()) { + return false; + } + + backendIt->second.d_actives.erase(it); + + if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) { + backendIt->second.d_idles.template get().pop_back(); + } + + backendIt->second.d_idles.template get().push_front(conn); + return true; } protected: + void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff) + { + auto& sidx = list.template get(); + for (auto connIt = sidx.begin(); connIt != sidx.end(); ) { + if (!(*connIt)) { + connIt = sidx.erase(connIt); + continue; + } + + auto& entry = *connIt; + + /* don't bother checking freshly used connections */ + if (freshCutOff < entry->getLastDataReceivedTime()) { + ++connIt; + continue; + } + + if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) { + /* idle for too long */ + connIt = sidx.erase(connIt); + continue; + } + + if (entry->isUsable()) { + ++connIt; + continue; + } + + connIt = sidx.erase(connIt); + } + } + + std::shared_ptr findUsableConnectionInList(const struct timeval& now, struct timeval& freshCutOff, list_t& list, bool removeIfFound) + { + auto& sidx = list.template get(); + for (auto listIt = sidx.begin(); listIt != sidx.end(); ) { + if (!(*listIt)) { + listIt = sidx.erase(listIt); + continue; + } + + auto& entry = *listIt; + if (isConnectionUsable(entry, now, freshCutOff)) { + entry->setReused(); + auto result = entry; + if (removeIfFound) { + sidx.erase(listIt); + } + return result; + } + + if (entry->willBeReusable(false)) { + ++listIt; + continue; + } + + /* that connection will not be usable later, no need to keep it in that list */ + listIt = sidx.erase(listIt); + } + + return nullptr; + } + bool isConnectionUsable(const std::shared_ptr& conn, const struct timeval& now, const struct timeval& freshCutOff) { if (!conn->canBeReused()) { @@ -480,15 +575,16 @@ protected: return false; } - static size_t s_maxCachedConnectionsPerDownstream; + static size_t s_maxIdleConnectionsPerDownstream; static uint16_t s_cleanupInterval; static uint16_t s_maxIdleTime; - std::map>> d_downstreamConnections; + std::map d_downstreamConnections; + time_t d_nextCleanup{0}; }; -template size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10}; +template size_t DownstreamConnectionsManager::s_maxIdleConnectionsPerDownstream{10}; template uint16_t DownstreamConnectionsManager::s_cleanupInterval{60}; template uint16_t DownstreamConnectionsManager::s_maxIdleTime{300}; diff --git a/pdns/dnsdistdist/docs/reference/tuning.rst b/pdns/dnsdistdist/docs/reference/tuning.rst index 5eeb400d4b..90442f5b80 100644 --- a/pdns/dnsdistdist/docs/reference/tuning.rst +++ b/pdns/dnsdistdist/docs/reference/tuning.rst @@ -16,7 +16,7 @@ Tuning related functions :param int max: The maximum time in seconds. -.. function:: setMaxCachedDoHConnectionsPerDownstream(max) +.. function:: setMaxIdleDoHConnectionsPerDownstream(max) .. versionadded:: 1.7.0 diff --git a/pdns/dnsdistdist/test-dnsdist-connections-cache.cc b/pdns/dnsdistdist/test-dnsdist-connections-cache.cc index da9e733a46..6cde1186da 100644 --- a/pdns/dnsdistdist/test-dnsdist-connections-cache.cc +++ b/pdns/dnsdistdist/test-dnsdist-connections-cache.cc @@ -29,7 +29,7 @@ class MockupConnection { public: - MockupConnection(const std::shared_ptr&, std::unique_ptr&, const struct timeval&, std::string&&) + MockupConnection(const std::shared_ptr& ds, std::unique_ptr&, const struct timeval&, std::string&&): d_ds(ds) { } @@ -66,6 +66,12 @@ public: { } + std::shared_ptr getDS() const + { + return d_ds; + } + + std::shared_ptr d_ds; struct timeval d_lastDataReceivedTime { 0, 0 @@ -80,10 +86,10 @@ BOOST_AUTO_TEST_SUITE(test_dnsdist_connections_cache) BOOST_AUTO_TEST_CASE(test_ConnectionsCache) { DownstreamConnectionsManager manager; - const size_t maxConnPerDownstream = 5; + const size_t maxIdleConnPerDownstream = 5; const uint16_t cleanupInterval = 1; const uint16_t maxIdleTime = 5; - manager.setMaxCachedConnectionsPerDownstream(maxConnPerDownstream); + manager.setMaxIdleConnectionsPerDownstream(maxIdleConnPerDownstream); manager.setCleanupInterval(cleanupInterval); manager.setMaxIdleTime(maxIdleTime); @@ -96,12 +102,15 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache) auto conn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string()); BOOST_REQUIRE(conn != nullptr); BOOST_CHECK_EQUAL(manager.count(), 1U); + BOOST_CHECK_EQUAL(manager.getActiveCount(), 1U); + BOOST_CHECK_EQUAL(manager.getIdleCount(), 0U); /* since the connection can be reused, we should get the same one */ { auto conn1 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string()); BOOST_CHECK(conn.get() == conn1.get()); BOOST_CHECK_EQUAL(manager.count(), 1U); + BOOST_CHECK_EQUAL(manager.getActiveCount(), 1U); } /* if we mark it non-usable, we should get a new one */ @@ -109,12 +118,14 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache) auto conn2 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string()); BOOST_CHECK(conn.get() != conn2.get()); BOOST_CHECK_EQUAL(manager.count(), 2U); + BOOST_CHECK_EQUAL(manager.getActiveCount(), 2U); /* since the second connection can be reused, we should get it */ { auto conn3 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string()); BOOST_CHECK(conn3.get() == conn2.get()); BOOST_CHECK_EQUAL(manager.count(), 2U); + BOOST_CHECK_EQUAL(manager.getActiveCount(), 2U); } /* different downstream so different connection */ @@ -123,11 +134,13 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache) BOOST_CHECK(differentConn.get() != conn.get()); BOOST_CHECK(differentConn.get() != conn2.get()); BOOST_CHECK_EQUAL(manager.count(), 3U); + BOOST_CHECK_EQUAL(manager.getActiveCount(), 3U); { /* but we should be able to reuse it */ auto sameConn = manager.getConnectionToDownstream(mplexer, downstream2, now, std::string()); BOOST_CHECK(sameConn.get() == differentConn.get()); BOOST_CHECK_EQUAL(manager.count(), 3U); + BOOST_CHECK_EQUAL(manager.getActiveCount(), 3U); } struct timeval later = now; @@ -155,16 +168,16 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache) conn->d_lastDataReceivedTime.tv_sec = 0; std::vector> conns = {conn}; - while (conns.size() < maxConnPerDownstream) { + while (conns.size() < maxIdleConnPerDownstream) { auto newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string()); newConn->d_usable = false; conns.push_back(newConn); BOOST_CHECK_EQUAL(manager.count(), conns.size()); } - /* if we add a new one, the oldest should get expunged */ + /* if we add a new one, the oldest should NOT get expunged because they are all active ones! */ auto newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string()); - BOOST_CHECK_EQUAL(manager.count(), maxConnPerDownstream); + BOOST_CHECK_GT(manager.count(), maxIdleConnPerDownstream); { /* mark all connections as not usable anymore */ @@ -175,7 +188,7 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache) /* except the last one */ newConn->d_usable = true; - BOOST_CHECK_EQUAL(manager.count(), maxConnPerDownstream); + BOOST_CHECK_EQUAL(manager.count(), conns.size() + 1); later.tv_sec += cleanupInterval + 1; manager.cleanupClosedConnections(later); BOOST_CHECK_EQUAL(manager.count(), 1U); @@ -184,6 +197,25 @@ BOOST_AUTO_TEST_CASE(test_ConnectionsCache) conns.clear(); auto cleared = manager.clear(); BOOST_CHECK_EQUAL(cleared, 1U); + + /* add 10 actives connections */ + while (conns.size() < 10) { + newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string()); + newConn->d_usable = false; + conns.push_back(newConn); + BOOST_CHECK_EQUAL(manager.count(), conns.size()); + BOOST_CHECK_EQUAL(manager.getActiveCount(), conns.size()); + } + /* now we mark them as idle */ + for (auto& c : conns) { + /* use a different shared_ptr to make sure that the comparison is done on the actual raw pointer */ + auto shared = c; + shared->d_idle = true; + BOOST_CHECK(manager.moveToIdle(shared)); + } + BOOST_CHECK_EQUAL(manager.count(), maxIdleConnPerDownstream); + BOOST_CHECK_EQUAL(manager.getActiveCount(), 0U); + BOOST_CHECK_EQUAL(manager.getIdleCount(), maxIdleConnPerDownstream); } BOOST_AUTO_TEST_SUITE_END();