});
luaCtx.writeFunction("setMaxCachedTCPConnectionsPerDownstream", [](uint64_t max) {
- DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
+ setTCPDownstreamMaxIdleConnectionsPerBackend(max);
});
luaCtx.writeFunction("setMaxIdleDoHConnectionsPerDownstream", [](uint64_t max) {
luaCtx.writeFunction("setTCPDownstreamCleanupInterval", [](uint64_t interval) {
setLuaSideEffect();
checkParameterBound("setTCPDownstreamCleanupInterval", interval);
- DownstreamTCPConnectionsManager::setCleanupInterval(interval);
+ setTCPDownstreamCleanupInterval(interval);
});
luaCtx.writeFunction("setDoHDownstreamCleanupInterval", [](uint64_t interval) {
luaCtx.writeFunction("setTCPDownstreamMaxIdleTime", [](uint64_t max) {
setLuaSideEffect();
checkParameterBound("setTCPDownstreamMaxIdleTime", max);
- DownstreamTCPConnectionsManager::setMaxIdleTime(max);
+ setTCPDownstreamMaxIdleTime(max);
});
luaCtx.writeFunction("setDoHDownstreamMaxIdleTime", [](uint64_t max) {
#include "dnsdist-rings.hh"
#include "dnsdist-tcp.hh"
#include "dnsdist-tcp-downstream.hh"
+#include "dnsdist-downstream-connection.hh"
#include "dnsdist-tcp-upstream.hh"
#include "dnsdist-xpf.hh"
#include "dnsparser.hh"
dnsdist-console.cc dnsdist-console.hh \
dnsdist-discovery.cc dnsdist-discovery.hh \
dnsdist-dnscrypt.cc \
+ dnsdist-downstream-connection.hh \
dnsdist-dynblocks.cc dnsdist-dynblocks.hh \
dnsdist-dynbpf.cc dnsdist-dynbpf.hh \
dnsdist-ecs.cc dnsdist-ecs.hh \
dnscrypt.cc dnscrypt.hh \
dnsdist-backend.cc \
dnsdist-cache.cc dnsdist-cache.hh \
+ dnsdist-downstream-connection.hh \
dnsdist-dynblocks.cc dnsdist-dynblocks.hh \
dnsdist-dynbpf.cc dnsdist-dynbpf.hh \
dnsdist-ecs.cc dnsdist-ecs.hh \
--- /dev/null
+#pragma once
+
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/sequenced_index.hpp>
+#include <boost/multi_index/key_extractors.hpp>
+
+#include "tcpiohandler-mplexer.hh"
+#include "dnsdist-tcp.hh"
+
+template <class T>
+class DownstreamConnectionsManager
+{
+ struct SequencedTag
+ {
+ };
+ struct OrderedTag
+ {
+ };
+
+ typedef multi_index_container<
+ std::shared_ptr<T>,
+ indexed_by<
+ ordered_unique<tag<OrderedTag>,
+ identity<std::shared_ptr<T>>>,
+ /* new elements are added to the front of the sequence */
+ sequenced<tag<SequencedTag>>>>
+ list_t;
+ struct ConnectionLists
+ {
+ list_t d_actives;
+ list_t d_idles;
+ };
+
+public:
+ static void setMaxIdleConnectionsPerDownstream(size_t max)
+ {
+ s_maxIdleConnectionsPerDownstream = max;
+ }
+
+ static void setCleanupInterval(uint16_t interval)
+ {
+ s_cleanupInterval = interval;
+ }
+
+ static void setMaxIdleTime(uint16_t max)
+ {
+ s_maxIdleTime = max;
+ }
+
+ std::shared_ptr<T> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload)
+ {
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+
+ auto backendId = ds->getID();
+
+ cleanupClosedConnections(now);
+
+ const bool haveProxyProtocol = ds->d_config.useProxyProtocol || !proxyProtocolPayload.empty();
+ if (!haveProxyProtocol) {
+ const auto& it = d_downstreamConnections.find(backendId);
+ if (it != d_downstreamConnections.end()) {
+ /* first scan idle connections, more recent first */
+ auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true);
+ if (entry) {
+ ++ds->tcpReusedConnections;
+ it->second.d_actives.insert(entry);
+ return entry;
+ }
+
+ /* then scan actives ones, more recent first as well */
+ entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false);
+ if (entry) {
+ ++ds->tcpReusedConnections;
+ return entry;
+ }
+ }
+ }
+
+ auto newConnection = std::make_shared<T>(ds, mplexer, now, std::move(proxyProtocolPayload));
+ if (!haveProxyProtocol) {
+ auto& list = d_downstreamConnections[backendId].d_actives;
+ list.template get<SequencedTag>().push_front(newConnection);
+ }
+
+ return newConnection;
+ }
+
+ void cleanupClosedConnections(const struct timeval& now)
+ {
+ if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) {
+ return;
+ }
+
+ d_nextCleanup = now.tv_sec + s_cleanupInterval;
+
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+ struct timeval idleCutOff = now;
+ idleCutOff.tv_sec -= s_maxIdleTime;
+
+ for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end();) {
+ cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff);
+ cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff);
+
+ if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) {
+ dsIt = d_downstreamConnections.erase(dsIt);
+ }
+ else {
+ ++dsIt;
+ }
+ }
+ }
+
+ size_t clear()
+ {
+ size_t count = 0;
+ for (const auto& downstream : d_downstreamConnections) {
+ count += downstream.second.d_actives.size();
+ for (auto& conn : downstream.second.d_actives) {
+ conn->stopIO();
+ }
+ count += downstream.second.d_idles.size();
+ for (auto& conn : downstream.second.d_idles) {
+ conn->stopIO();
+ }
+ }
+
+ d_downstreamConnections.clear();
+ return count;
+ }
+
+ size_t count() const
+ {
+ return getActiveCount() + getIdleCount();
+ }
+
+ size_t getActiveCount() const
+ {
+ size_t count = 0;
+ for (const auto& downstream : d_downstreamConnections) {
+ count += downstream.second.d_actives.size();
+ }
+ return count;
+ }
+
+ size_t getIdleCount() const
+ {
+ size_t count = 0;
+ for (const auto& downstream : d_downstreamConnections) {
+ count += downstream.second.d_idles.size();
+ }
+ return count;
+ }
+
+ bool removeDownstreamConnection(std::shared_ptr<T>& conn)
+ {
+ auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
+ if (backendIt == d_downstreamConnections.end()) {
+ return false;
+ }
+
+ /* idle list first */
+ {
+ auto it = backendIt->second.d_idles.find(conn);
+ if (it != backendIt->second.d_idles.end()) {
+ backendIt->second.d_idles.erase(it);
+ return true;
+ }
+ }
+ /* then active */
+ {
+ auto it = backendIt->second.d_actives.find(conn);
+ if (it != backendIt->second.d_actives.end()) {
+ backendIt->second.d_actives.erase(it);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ bool moveToIdle(std::shared_ptr<T>& conn)
+ {
+ auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
+ if (backendIt == d_downstreamConnections.end()) {
+ return false;
+ }
+
+ auto it = backendIt->second.d_actives.find(conn);
+ if (it == backendIt->second.d_actives.end()) {
+ return false;
+ }
+
+ backendIt->second.d_actives.erase(it);
+
+ if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) {
+ backendIt->second.d_idles.template get<SequencedTag>().pop_back();
+ }
+
+ backendIt->second.d_idles.template get<SequencedTag>().push_front(conn);
+ return true;
+ }
+
+protected:
+ void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff)
+ {
+ auto& sidx = list.template get<SequencedTag>();
+ for (auto connIt = sidx.begin(); connIt != sidx.end();) {
+ if (!(*connIt)) {
+ connIt = sidx.erase(connIt);
+ continue;
+ }
+
+ auto& entry = *connIt;
+
+ /* don't bother checking freshly used connections */
+ if (freshCutOff < entry->getLastDataReceivedTime()) {
+ ++connIt;
+ continue;
+ }
+
+ if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
+ /* idle for too long */
+ connIt = sidx.erase(connIt);
+ continue;
+ }
+
+ if (entry->isUsable()) {
+ ++connIt;
+ continue;
+ }
+
+ connIt = sidx.erase(connIt);
+ }
+ }
+
+ std::shared_ptr<T> findUsableConnectionInList(const struct timeval& now, const struct timeval& freshCutOff, list_t& list, bool removeIfFound)
+ {
+ auto& sidx = list.template get<SequencedTag>();
+ for (auto listIt = sidx.begin(); listIt != sidx.end();) {
+ if (!(*listIt)) {
+ listIt = sidx.erase(listIt);
+ continue;
+ }
+
+ auto& entry = *listIt;
+ if (isConnectionUsable(entry, now, freshCutOff)) {
+ entry->setReused();
+ // make a copy since the iterator will be invalidated after erasing
+ auto result = entry;
+ if (removeIfFound) {
+ sidx.erase(listIt);
+ }
+ return result;
+ }
+
+ if (entry->willBeReusable(false)) {
+ ++listIt;
+ continue;
+ }
+
+ /* that connection will not be usable later, no need to keep it in that list */
+ listIt = sidx.erase(listIt);
+ }
+
+ return nullptr;
+ }
+
+ bool isConnectionUsable(const std::shared_ptr<T>& conn, const struct timeval& now, const struct timeval& freshCutOff)
+ {
+ if (!conn->canBeReused()) {
+ return false;
+ }
+
+ /* for connections that have not been used very recently,
+ check whether they have been closed in the meantime */
+ if (freshCutOff < conn->getLastDataReceivedTime()) {
+ /* used recently enough, skip the check */
+ return true;
+ }
+
+ return conn->isUsable();
+ }
+
+ static size_t s_maxIdleConnectionsPerDownstream;
+ static uint16_t s_cleanupInterval;
+ static uint16_t s_maxIdleTime;
+
+ std::map<boost::uuids::uuid, ConnectionLists> d_downstreamConnections;
+
+ time_t d_nextCleanup{0};
+};
+
+template <class T>
+size_t DownstreamConnectionsManager<T>::s_maxIdleConnectionsPerDownstream{10};
+template <class T>
+uint16_t DownstreamConnectionsManager<T>::s_cleanupInterval{60};
+template <class T>
+uint16_t DownstreamConnectionsManager<T>::s_maxIdleTime{300};
+
+using DownstreamTCPConnectionsManager = DownstreamConnectionsManager<TCPConnectionToBackend>;
+extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;
#include "dnsdist-nghttp2.hh"
#include "dnsdist-tcp.hh"
#include "dnsdist-tcp-downstream.hh"
+#include "dnsdist-downstream-connection.hh"
#include "dolog.hh"
#include "iputils.hh"
#include "dnsdist-session-cache.hh"
#include "dnsdist-tcp-downstream.hh"
#include "dnsdist-tcp-upstream.hh"
+#include "dnsdist-downstream-connection.hh"
#include "dnsparser.hh"
}
return done;
}
+
+void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max)
+{
+ DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
+}
+
+void setTCPDownstreamCleanupInterval(uint64_t interval)
+{
+ DownstreamTCPConnectionsManager::setCleanupInterval(interval);
+}
+
+void setTCPDownstreamMaxIdleTime(uint64_t max)
+{
+ DownstreamTCPConnectionsManager::setMaxIdleTime(max);
+}
#pragma once
-#include <boost/multi_index_container.hpp>
-#include <boost/multi_index/ordered_index.hpp>
-#include <boost/multi_index/sequenced_index.hpp>
-#include <boost/multi_index/key_extractors.hpp>
-
-#include <queue>
-
#include "sstuff.hh"
#include "tcpiohandler-mplexer.hh"
#include "dnsdist.hh"
State d_state{State::idle};
};
-template <class T> class DownstreamConnectionsManager
-{
- struct SequencedTag {};
- struct OrderedTag {};
-
- typedef multi_index_container<
- std::shared_ptr<T>,
- indexed_by <
- ordered_unique<tag<OrderedTag>,
- identity<std::shared_ptr<T>>
- >,
- /* new elements are added to the front of the sequence */
- sequenced<tag<SequencedTag> >
- >
- > list_t;
- struct ConnectionLists
- {
- list_t d_actives;
- list_t d_idles;
- };
-
-public:
- static void setMaxIdleConnectionsPerDownstream(size_t max)
- {
- s_maxIdleConnectionsPerDownstream = max;
- }
-
- static void setCleanupInterval(uint16_t interval)
- {
- s_cleanupInterval = interval;
- }
-
- static void setMaxIdleTime(uint16_t max)
- {
- s_maxIdleTime = max;
- }
-
- std::shared_ptr<T> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload)
- {
- struct timeval freshCutOff = now;
- freshCutOff.tv_sec -= 1;
-
- auto backendId = ds->getID();
-
- cleanupClosedConnections(now);
-
- const bool haveProxyProtocol = ds->d_config.useProxyProtocol || !proxyProtocolPayload.empty();
- if (!haveProxyProtocol) {
- const auto& it = d_downstreamConnections.find(backendId);
- if (it != d_downstreamConnections.end()) {
- /* first scan idle connections, more recent first */
- auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true);
- if (entry) {
- ++ds->tcpReusedConnections;
- it->second.d_actives.insert(entry);
- return entry;
- }
-
- /* then scan actives ones, more recent first as well */
- entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false);
- if (entry) {
- ++ds->tcpReusedConnections;
- return entry;
- }
- }
- }
-
- auto newConnection = std::make_shared<T>(ds, mplexer, now, std::move(proxyProtocolPayload));
- if (!haveProxyProtocol) {
- auto& list = d_downstreamConnections[backendId].d_actives;
- list.template get<SequencedTag>().push_front(newConnection);
- }
-
- return newConnection;
- }
-
- void cleanupClosedConnections(const struct timeval& now)
- {
- if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) {
- return;
- }
-
- d_nextCleanup = now.tv_sec + s_cleanupInterval;
-
- struct timeval freshCutOff = now;
- freshCutOff.tv_sec -= 1;
- struct timeval idleCutOff = now;
- idleCutOff.tv_sec -= s_maxIdleTime;
-
- for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) {
- cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff);
- cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff);
-
- if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) {
- dsIt = d_downstreamConnections.erase(dsIt);
- }
- else {
- ++dsIt;
- }
- }
- }
-
- size_t clear()
- {
- size_t count = 0;
- for (const auto& downstream : d_downstreamConnections) {
- count += downstream.second.d_actives.size();
- for (auto& conn : downstream.second.d_actives) {
- conn->stopIO();
- }
- count += downstream.second.d_idles.size();
- for (auto& conn : downstream.second.d_idles) {
- conn->stopIO();
- }
- }
-
- d_downstreamConnections.clear();
- return count;
- }
-
- size_t count() const
- {
- return getActiveCount() + getIdleCount();
- }
-
- size_t getActiveCount() const
- {
- size_t count = 0;
- for (const auto& downstream : d_downstreamConnections) {
- count += downstream.second.d_actives.size();
- }
- return count;
- }
-
- size_t getIdleCount() const
- {
- size_t count = 0;
- for (const auto& downstream : d_downstreamConnections) {
- count += downstream.second.d_idles.size();
- }
- return count;
- }
-
- bool removeDownstreamConnection(std::shared_ptr<T>& conn)
- {
- auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
- if (backendIt == d_downstreamConnections.end()) {
- return false;
- }
-
- /* idle list first */
- {
- auto it = backendIt->second.d_idles.find(conn);
- if (it != backendIt->second.d_idles.end()) {
- backendIt->second.d_idles.erase(it);
- return true;
- }
- }
- /* then active */
- {
- auto it = backendIt->second.d_actives.find(conn);
- if (it != backendIt->second.d_actives.end()) {
- backendIt->second.d_actives.erase(it);
- return true;
- }
- }
-
- return false;
- }
-
- bool moveToIdle(std::shared_ptr<T>& conn)
- {
- auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
- if (backendIt == d_downstreamConnections.end()) {
- return false;
- }
-
- auto it = backendIt->second.d_actives.find(conn);
- if (it == backendIt->second.d_actives.end()) {
- return false;
- }
-
- backendIt->second.d_actives.erase(it);
-
- if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) {
- backendIt->second.d_idles.template get<SequencedTag>().pop_back();
- }
-
- backendIt->second.d_idles.template get<SequencedTag>().push_front(conn);
- return true;
- }
-
-protected:
-
- void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff)
- {
- auto& sidx = list.template get<SequencedTag>();
- for (auto connIt = sidx.begin(); connIt != sidx.end(); ) {
- if (!(*connIt)) {
- connIt = sidx.erase(connIt);
- continue;
- }
-
- auto& entry = *connIt;
-
- /* don't bother checking freshly used connections */
- if (freshCutOff < entry->getLastDataReceivedTime()) {
- ++connIt;
- continue;
- }
-
- if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
- /* idle for too long */
- connIt = sidx.erase(connIt);
- continue;
- }
-
- if (entry->isUsable()) {
- ++connIt;
- continue;
- }
-
- connIt = sidx.erase(connIt);
- }
- }
-
- std::shared_ptr<T> findUsableConnectionInList(const struct timeval& now, const struct timeval& freshCutOff, list_t& list, bool removeIfFound)
- {
- auto& sidx = list.template get<SequencedTag>();
- for (auto listIt = sidx.begin(); listIt != sidx.end(); ) {
- if (!(*listIt)) {
- listIt = sidx.erase(listIt);
- continue;
- }
-
- auto& entry = *listIt;
- if (isConnectionUsable(entry, now, freshCutOff)) {
- entry->setReused();
- // make a copy since the iterator will be invalidated after erasing
- auto result = entry;
- if (removeIfFound) {
- sidx.erase(listIt);
- }
- return result;
- }
-
- if (entry->willBeReusable(false)) {
- ++listIt;
- continue;
- }
-
- /* that connection will not be usable later, no need to keep it in that list */
- listIt = sidx.erase(listIt);
- }
-
- return nullptr;
- }
-
- bool isConnectionUsable(const std::shared_ptr<T>& conn, const struct timeval& now, const struct timeval& freshCutOff)
- {
- if (!conn->canBeReused()) {
- return false;
- }
-
- /* for connections that have not been used very recently,
- check whether they have been closed in the meantime */
- if (freshCutOff < conn->getLastDataReceivedTime()) {
- /* used recently enough, skip the check */
- return true;
- }
-
- return conn->isUsable();
- }
-
- static size_t s_maxIdleConnectionsPerDownstream;
- static uint16_t s_cleanupInterval;
- static uint16_t s_maxIdleTime;
-
- std::map<boost::uuids::uuid, ConnectionLists> d_downstreamConnections;
-
- time_t d_nextCleanup{0};
-};
-
-template <class T> size_t DownstreamConnectionsManager<T>::s_maxIdleConnectionsPerDownstream{10};
-template <class T> uint16_t DownstreamConnectionsManager<T>::s_cleanupInterval{60};
-template <class T> uint16_t DownstreamConnectionsManager<T>::s_maxIdleTime{300};
-
-using DownstreamTCPConnectionsManager = DownstreamConnectionsManager<TCPConnectionToBackend>;
-extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;
+void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max);
+void setTCPDownstreamCleanupInterval(uint64_t interval);
+void setTCPDownstreamMaxIdleTime(uint64_t max);
#include <boost/test/unit_test.hpp>
#include "dnsdist-tcp-downstream.hh"
+#include "dnsdist-downstream-connection.hh"
class MockupConnection
{