});
luaCtx.writeFunction("setMaxCachedTCPConnectionsPerDownstream", [](size_t max) {
- DownstreamTCPConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+ DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
});
- luaCtx.writeFunction("setMaxCachedDoHConnectionsPerDownstream", [](size_t max) {
- setDoHDownstreamMaxConnectionsPerBackend(max);
+ luaCtx.writeFunction("setMaxIdleDoHConnectionsPerDownstream", [](size_t max) {
+ setDoHDownstreamMaxIdleConnectionsPerBackend(max);
});
luaCtx.writeFunction("setOutgoingDoHWorkerThreads", [](uint64_t workers) {
}
if (newState == IOState::Done) {
- if (conn->getConcurrentStreamsCount() == 0) {
+ if (conn->isIdle()) {
conn->stopIO();
conn->watchForRemoteHostClosingConnection();
ioGuard.release();
conn->d_out.clear();
conn->d_outPos = 0;
conn->stopIO();
- if (conn->getConcurrentStreamsCount() > 0) {
+ if (!conn->isIdle()) {
conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
}
else {
{
d_ioState->reset();
+ auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
if (!willBeReusable(false)) {
/* remove ourselves from the connection cache, this might mean that our
reference count drops to zero after that, so we need to be careful */
- auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared);
}
+ else {
+ t_downstreamDoHConnectionsManager.moveToIdle(shared);
+ }
}
void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD)
conn->d_out.clear();
conn->d_outPos = 0;
conn->stopIO();
- if (conn->getConcurrentStreamsCount() > 0) {
+ if (!conn->isIdle()) {
conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
}
else {
conn->handleResponseError(std::move(request), now);
}
- if (conn->getConcurrentStreamsCount() == 0) {
+ if (conn->isIdle()) {
conn->stopIO();
conn->watchForRemoteHostClosingConnection();
}
conn->handleResponseError(std::move(request), now);
}
- if (conn->getConcurrentStreamsCount() == 0) {
+ if (conn->isIdle()) {
conn->stopIO();
conn->watchForRemoteHostClosingConnection();
}
}
//cerr<<"we now have "<<conn->getConcurrentStreamsCount()<<" concurrent connections"<<endl;
- if (conn->getConcurrentStreamsCount() == 0) {
+ if (conn->isIdle()) {
//cerr<<"stopping IO"<<endl;
conn->stopIO();
conn->watchForRemoteHostClosingConnection();
#endif /* HAVE_NGHTTP2 */
}
-void setDoHDownstreamMaxConnectionsPerBackend(size_t max)
+void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max)
{
#ifdef HAVE_NGHTTP2
- DownstreamDoHConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+ DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
#endif /* HAVE_NGHTTP2 */
}
void setDoHDownstreamCleanupInterval(uint16_t max);
void setDoHDownstreamMaxIdleTime(uint16_t max);
-void setDoHDownstreamMaxConnectionsPerBackend(size_t max);
+void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max);
--conn->d_ds->outstanding;
/* marking as idle for now, so we can accept new queries if our queues are empty */
if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
+ t_downstreamTCPConnectionsManager.moveToIdle(conn);
d_state = State::idle;
}
}
sender->handleXFRResponse(now, std::move(response));
if (done) {
+ t_downstreamTCPConnectionsManager.moveToIdle(conn);
d_state = State::idle;
return IOState::Done;
}
d_pendingResponses.erase(it);
/* marking as idle for now, so we can accept new queries if our queues are empty */
if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
+ t_downstreamTCPConnectionsManager.moveToIdle(conn);
d_state = State::idle;
}
}
else {
DEBUGLOG("nothing to do, waiting for a new query");
+ t_downstreamTCPConnectionsManager.moveToIdle(conn);
d_state = State::idle;
return IOState::Done;
}
template <class T> class DownstreamConnectionsManager
{
+ struct SequencedTag {};
+ struct OrderedTag {};
+
+ typedef multi_index_container<
+ std::shared_ptr<T>,
+ indexed_by <
+ ordered_unique<tag<OrderedTag>,
+ identity<std::shared_ptr<T>>
+ >,
+ /* new elements are added to the front of the sequence */
+ sequenced<tag<SequencedTag> >
+ >
+ > list_t;
+ struct ConnectionLists
+ {
+ list_t d_actives;
+ list_t d_idles;
+ };
+
public:
- static void setMaxCachedConnectionsPerDownstream(size_t max)
+ static void setMaxIdleConnectionsPerDownstream(size_t max)
{
- s_maxCachedConnectionsPerDownstream = max;
+ s_maxIdleConnectionsPerDownstream = max;
}
static void setCleanupInterval(uint16_t interval)
if (!haveProxyProtocol) {
const auto& it = d_downstreamConnections.find(backendId);
if (it != d_downstreamConnections.end()) {
- auto& list = it->second;
- for (auto listIt = list.begin(); listIt != list.end(); ) {
- if (!(*listIt)) {
- listIt = list.erase(listIt);
- continue;
- }
-
- auto& entry = *listIt;
- if (isConnectionUsable(entry, now, freshCutOff)) {
- entry->setReused();
- ++ds->tcpReusedConnections;
- return entry;
- }
-
- if (entry->willBeReusable(false)) {
- ++listIt;
- continue;
- }
-
- listIt = list.erase(listIt);
+ /* first scan idle connections, more recent first */
+ auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true);
+ if (entry) {
+ ++ds->tcpReusedConnections;
+ it->second.d_actives.insert(entry);
+ return entry;
+ }
+
+ /* then scan actives ones, more recent first as well */
+ entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false);
+ if (entry) {
+ ++ds->tcpReusedConnections;
+ return entry;
}
}
}
auto newConnection = std::make_shared<T>(ds, mplexer, now, std::move(proxyProtocolPayload));
if (!haveProxyProtocol) {
- auto& list = d_downstreamConnections[backendId];
- if (list.size() == s_maxCachedConnectionsPerDownstream) {
- list.pop_back();
- }
- list.push_front(newConnection);
+ auto& list = d_downstreamConnections[backendId].d_actives;
+ list.template get<SequencedTag>().push_front(newConnection);
}
return newConnection;
idleCutOff.tv_sec -= s_maxIdleTime;
for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) {
- for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
- if (!(*connIt)) {
- connIt = dsIt->second.erase(connIt);
- continue;
- }
-
- auto& entry = *connIt;
+ cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff);
+ cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff);
- /* don't bother checking freshly used connections */
- if (freshCutOff < entry->getLastDataReceivedTime()) {
- ++connIt;
- continue;
- }
-
- if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
- /* idle for too long */
- connIt = dsIt->second.erase(connIt);
- continue;
- }
-
- if (entry->isUsable()) {
- ++connIt;
- continue;
- }
-
- connIt = dsIt->second.erase(connIt);
- }
-
- if (!dsIt->second.empty()) {
- ++dsIt;
+ if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) {
+ dsIt = d_downstreamConnections.erase(dsIt);
}
else {
- dsIt = d_downstreamConnections.erase(dsIt);
+ ++dsIt;
}
}
}
{
size_t count = 0;
for (const auto& downstream : d_downstreamConnections) {
- count += downstream.second.size();
- for (auto& conn : downstream.second) {
+ count += downstream.second.d_actives.size();
+ for (auto& conn : downstream.second.d_actives) {
+ conn->stopIO();
+ }
+ count += downstream.second.d_idles.size();
+ for (auto& conn : downstream.second.d_idles) {
conn->stopIO();
}
}
}
size_t count() const
+ {
+ return getActiveCount() + getIdleCount();
+ }
+
+ size_t getActiveCount() const
{
size_t count = 0;
for (const auto& downstream : d_downstreamConnections) {
- count += downstream.second.size();
+ count += downstream.second.d_actives.size();
+ }
+ return count;
+ }
+
+ size_t getIdleCount() const
+ {
+ size_t count = 0;
+ for (const auto& downstream : d_downstreamConnections) {
+ count += downstream.second.d_idles.size();
}
return count;
}
bool removeDownstreamConnection(std::shared_ptr<T>& conn)
{
- bool found = false;
auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
if (backendIt == d_downstreamConnections.end()) {
- return found;
+ return false;
}
- for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) {
- if (*connIt == conn) {
- backendIt->second.erase(connIt);
- found = true;
- break;
+ /* idle list first */
+ {
+ auto it = backendIt->second.d_idles.find(conn);
+ if (it != backendIt->second.d_idles.end()) {
+ backendIt->second.d_idles.erase(it);
+ return true;
+ }
+ }
+ /* then active */
+ {
+ auto it = backendIt->second.d_actives.find(conn);
+ if (it != backendIt->second.d_actives.end()) {
+ backendIt->second.d_actives.erase(it);
+ return true;
}
}
- return found;
+ return false;
+ }
+
+ bool moveToIdle(std::shared_ptr<T>& conn)
+ {
+ auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
+ if (backendIt == d_downstreamConnections.end()) {
+ return false;
+ }
+
+ auto it = backendIt->second.d_actives.find(conn);
+ if (it == backendIt->second.d_actives.end()) {
+ return false;
+ }
+
+ backendIt->second.d_actives.erase(it);
+
+ if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) {
+ backendIt->second.d_idles.template get<SequencedTag>().pop_back();
+ }
+
+ backendIt->second.d_idles.template get<SequencedTag>().push_front(conn);
+ return true;
}
protected:
+ void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff)
+ {
+ auto& sidx = list.template get<SequencedTag>();
+ for (auto connIt = sidx.begin(); connIt != sidx.end(); ) {
+ if (!(*connIt)) {
+ connIt = sidx.erase(connIt);
+ continue;
+ }
+
+ auto& entry = *connIt;
+
+ /* don't bother checking freshly used connections */
+ if (freshCutOff < entry->getLastDataReceivedTime()) {
+ ++connIt;
+ continue;
+ }
+
+ if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
+ /* idle for too long */
+ connIt = sidx.erase(connIt);
+ continue;
+ }
+
+ if (entry->isUsable()) {
+ ++connIt;
+ continue;
+ }
+
+ connIt = sidx.erase(connIt);
+ }
+ }
+
+ std::shared_ptr<T> findUsableConnectionInList(const struct timeval& now, struct timeval& freshCutOff, list_t& list, bool removeIfFound)
+ {
+ auto& sidx = list.template get<SequencedTag>();
+ for (auto listIt = sidx.begin(); listIt != sidx.end(); ) {
+ if (!(*listIt)) {
+ listIt = sidx.erase(listIt);
+ continue;
+ }
+
+ auto& entry = *listIt;
+ if (isConnectionUsable(entry, now, freshCutOff)) {
+ entry->setReused();
+ auto result = entry;
+ if (removeIfFound) {
+ sidx.erase(listIt);
+ }
+ return result;
+ }
+
+ if (entry->willBeReusable(false)) {
+ ++listIt;
+ continue;
+ }
+
+ /* that connection will not be usable later, no need to keep it in that list */
+ listIt = sidx.erase(listIt);
+ }
+
+ return nullptr;
+ }
+
bool isConnectionUsable(const std::shared_ptr<T>& conn, const struct timeval& now, const struct timeval& freshCutOff)
{
if (!conn->canBeReused()) {
return false;
}
- static size_t s_maxCachedConnectionsPerDownstream;
+ static size_t s_maxIdleConnectionsPerDownstream;
static uint16_t s_cleanupInterval;
static uint16_t s_maxIdleTime;
- std::map<boost::uuids::uuid, std::deque<std::shared_ptr<T>>> d_downstreamConnections;
+ std::map<boost::uuids::uuid, ConnectionLists> d_downstreamConnections;
+
time_t d_nextCleanup{0};
};
-template <class T> size_t DownstreamConnectionsManager<T>::s_maxCachedConnectionsPerDownstream{10};
+template <class T> size_t DownstreamConnectionsManager<T>::s_maxIdleConnectionsPerDownstream{10};
template <class T> uint16_t DownstreamConnectionsManager<T>::s_cleanupInterval{60};
template <class T> uint16_t DownstreamConnectionsManager<T>::s_maxIdleTime{300};
:param int max: The maximum time in seconds.
-.. function:: setMaxCachedDoHConnectionsPerDownstream(max)
+.. function:: setMaxIdleDoHConnectionsPerDownstream(max)
.. versionadded:: 1.7.0
class MockupConnection
{
public:
- MockupConnection(const std::shared_ptr<DownstreamState>&, std::unique_ptr<FDMultiplexer>&, const struct timeval&, std::string&&)
+ MockupConnection(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>&, const struct timeval&, std::string&&): d_ds(ds)
{
}
{
}
+ std::shared_ptr<DownstreamState> getDS() const
+ {
+ return d_ds;
+ }
+
+ std::shared_ptr<DownstreamState> d_ds;
struct timeval d_lastDataReceivedTime
{
0, 0
BOOST_AUTO_TEST_CASE(test_ConnectionsCache)
{
DownstreamConnectionsManager<MockupConnection> manager;
- const size_t maxConnPerDownstream = 5;
+ const size_t maxIdleConnPerDownstream = 5;
const uint16_t cleanupInterval = 1;
const uint16_t maxIdleTime = 5;
- manager.setMaxCachedConnectionsPerDownstream(maxConnPerDownstream);
+ manager.setMaxIdleConnectionsPerDownstream(maxIdleConnPerDownstream);
manager.setCleanupInterval(cleanupInterval);
manager.setMaxIdleTime(maxIdleTime);
auto conn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
BOOST_REQUIRE(conn != nullptr);
BOOST_CHECK_EQUAL(manager.count(), 1U);
+ BOOST_CHECK_EQUAL(manager.getActiveCount(), 1U);
+ BOOST_CHECK_EQUAL(manager.getIdleCount(), 0U);
/* since the connection can be reused, we should get the same one */
{
auto conn1 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
BOOST_CHECK(conn.get() == conn1.get());
BOOST_CHECK_EQUAL(manager.count(), 1U);
+ BOOST_CHECK_EQUAL(manager.getActiveCount(), 1U);
}
/* if we mark it non-usable, we should get a new one */
auto conn2 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
BOOST_CHECK(conn.get() != conn2.get());
BOOST_CHECK_EQUAL(manager.count(), 2U);
+ BOOST_CHECK_EQUAL(manager.getActiveCount(), 2U);
/* since the second connection can be reused, we should get it */
{
auto conn3 = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
BOOST_CHECK(conn3.get() == conn2.get());
BOOST_CHECK_EQUAL(manager.count(), 2U);
+ BOOST_CHECK_EQUAL(manager.getActiveCount(), 2U);
}
/* different downstream so different connection */
BOOST_CHECK(differentConn.get() != conn.get());
BOOST_CHECK(differentConn.get() != conn2.get());
BOOST_CHECK_EQUAL(manager.count(), 3U);
+ BOOST_CHECK_EQUAL(manager.getActiveCount(), 3U);
{
/* but we should be able to reuse it */
auto sameConn = manager.getConnectionToDownstream(mplexer, downstream2, now, std::string());
BOOST_CHECK(sameConn.get() == differentConn.get());
BOOST_CHECK_EQUAL(manager.count(), 3U);
+ BOOST_CHECK_EQUAL(manager.getActiveCount(), 3U);
}
struct timeval later = now;
conn->d_lastDataReceivedTime.tv_sec = 0;
std::vector<std::shared_ptr<MockupConnection>> conns = {conn};
- while (conns.size() < maxConnPerDownstream) {
+ while (conns.size() < maxIdleConnPerDownstream) {
auto newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
newConn->d_usable = false;
conns.push_back(newConn);
BOOST_CHECK_EQUAL(manager.count(), conns.size());
}
- /* if we add a new one, the oldest should get expunged */
+ /* if we add a new one, the oldest should NOT get expunged because they are all active ones! */
auto newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
- BOOST_CHECK_EQUAL(manager.count(), maxConnPerDownstream);
+ BOOST_CHECK_GT(manager.count(), maxIdleConnPerDownstream);
{
/* mark all connections as not usable anymore */
/* except the last one */
newConn->d_usable = true;
- BOOST_CHECK_EQUAL(manager.count(), maxConnPerDownstream);
+ BOOST_CHECK_EQUAL(manager.count(), conns.size() + 1);
later.tv_sec += cleanupInterval + 1;
manager.cleanupClosedConnections(later);
BOOST_CHECK_EQUAL(manager.count(), 1U);
conns.clear();
auto cleared = manager.clear();
BOOST_CHECK_EQUAL(cleared, 1U);
+
+ /* add 10 actives connections */
+ while (conns.size() < 10) {
+ newConn = manager.getConnectionToDownstream(mplexer, downstream1, now, std::string());
+ newConn->d_usable = false;
+ conns.push_back(newConn);
+ BOOST_CHECK_EQUAL(manager.count(), conns.size());
+ BOOST_CHECK_EQUAL(manager.getActiveCount(), conns.size());
+ }
+ /* now we mark them as idle */
+ for (auto& c : conns) {
+ /* use a different shared_ptr to make sure that the comparison is done on the actual raw pointer */
+ auto shared = c;
+ shared->d_idle = true;
+ BOOST_CHECK(manager.moveToIdle(shared));
+ }
+ BOOST_CHECK_EQUAL(manager.count(), maxIdleConnPerDownstream);
+ BOOST_CHECK_EQUAL(manager.getActiveCount(), 0U);
+ BOOST_CHECK_EQUAL(manager.getIdleCount(), maxIdleConnPerDownstream);
}
BOOST_AUTO_TEST_SUITE_END();