});
luaCtx.writeFunction("setMaxCachedTCPConnectionsPerDownstream", [](size_t max) {
- DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+ DownstreamTCPConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
});
luaCtx.writeFunction("setMaxCachedDoHConnectionsPerDownstream", [](size_t max) {
luaCtx.writeFunction("setTCPDownstreamCleanupInterval", [](uint64_t interval) {
setLuaSideEffect();
checkParameterBound("setTCPDownstreamCleanupInterval", interval);
- DownstreamConnectionsManager::setCleanupInterval(interval);
+ DownstreamTCPConnectionsManager::setCleanupInterval(interval);
});
luaCtx.writeFunction("setDoHDownstreamCleanupInterval", [](uint64_t interval) {
luaCtx.writeFunction("setTCPDownstreamMaxIdleTime", [](uint64_t max) {
setLuaSideEffect();
checkParameterBound("setTCPDownstreamMaxIdleTime", max);
- DownstreamConnectionsManager::setMaxIdleTime(max);
+ DownstreamTCPConnectionsManager::setMaxIdleTime(max);
});
luaCtx.writeFunction("setDoHDownstreamMaxIdleTime", [](uint64_t max) {
size_t IncomingTCPConnectionState::clearAllDownstreamConnections()
{
- return DownstreamConnectionsManager::clear();
+ return t_downstreamTCPConnectionsManager.clear();
}
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
if (!downstream) {
/* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */
- downstream = DownstreamConnectionsManager::getConnectionToDownstream(d_threadData.mplexer, ds, now);
+ downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, ds, now, std::string());
if (ds->useProxyProtocol) {
registerOwnedDownstreamConnection(downstream);
}
tmp = nullptr;
try {
- auto downstream = DownstreamConnectionsManager::getConnectionToDownstream(threadData->mplexer, downstreamServer, now);
+ auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string());
prependSizeToTCPQuery(query.d_buffer, proxyProtocolPayloadSize);
downstream->queueQuery(tqs, std::move(query));
data.mplexer->run(&now);
try {
- DownstreamConnectionsManager::cleanupClosedTCPConnections(now);
+ t_downstreamTCPConnectionsManager.cleanupClosedConnections(now);
if (now.tv_sec > lastTimeoutScan) {
lastTimeoutScan = now.tv_sec;
class DoHConnectionToBackend : public ConnectionToBackend
{
public:
- DoHConnectionToBackend(std::shared_ptr<DownstreamState> ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
+ DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
void handleTimeout(const struct timeval& now, bool write) override;
void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
d_healthCheckQuery = h;
}
- void stopIO();
+ void stopIO() override;
bool reachedMaxConcurrentQueries() const override;
bool reachedMaxStreamID() const override;
bool isIdle() const override;
bool d_firstWrite{true};
};
-class DownstreamDoHConnectionsManager
-{
-public:
- static std::shared_ptr<DoHConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload);
- static void releaseDownstreamConnection(std::shared_ptr<DoHConnectionToBackend>&& conn);
- static bool removeDownstreamConnection(std::shared_ptr<DoHConnectionToBackend>& conn);
- static void cleanupClosedConnections(struct timeval now);
- static size_t clear();
-
- static void setMaxCachedConnectionsPerDownstream(size_t max)
- {
- s_maxCachedConnectionsPerDownstream = max;
- }
-
- static void setCleanupInterval(uint16_t interval)
- {
- s_cleanupInterval = interval;
- }
-
- static void setMaxIdleTime(uint16_t max)
- {
- s_maxIdleTime = max;
- }
-
-private:
- static thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<DoHConnectionToBackend>>> t_downstreamConnections;
- static thread_local time_t t_nextCleanup;
- static size_t s_maxCachedConnectionsPerDownstream;
- static uint16_t s_cleanupInterval;
- static uint16_t s_maxIdleTime;
-};
+using DownstreamDoHConnectionsManager = DownstreamConnectionsManager<DoHConnectionToBackend>;
+thread_local DownstreamDoHConnectionsManager t_downstreamDoHConnectionsManager;
uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const
{
/* remove ourselves from the connection cache, this might mean that our
reference count drops to zero after that, so we need to be careful */
auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
- DownstreamDoHConnectionsManager::removeDownstreamConnection(shared);
+ t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared);
}
}
if (request.d_query.d_downstreamFailures < conn->d_ds->d_retries) {
// cerr<<"in "<<__PRETTY_FUNCTION__<<", looking for a connection to send a query of size "<<request.d_query.d_buffer.size()<<endl;
++request.d_query.d_downstreamFailures;
- auto downstream = DownstreamDoHConnectionsManager::getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
+ auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
downstream->queueQuery(request.d_sender, std::move(request.d_query));
}
else {
return 0;
}
-DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr<DownstreamState> ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) :
+DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) :
ConnectionToBackend(ds, mplexer, now), d_proxyProtocolPayload(std::move(proxyProtocolPayload))
{
// inherit most of the stuff from the ConnectionToBackend()
}
}
-thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<DoHConnectionToBackend>>> DownstreamDoHConnectionsManager::t_downstreamConnections;
-thread_local time_t DownstreamDoHConnectionsManager::t_nextCleanup{0};
-size_t DownstreamDoHConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
-uint16_t DownstreamDoHConnectionsManager::s_cleanupInterval{60};
-uint16_t DownstreamDoHConnectionsManager::s_maxIdleTime{300};
-
-size_t DownstreamDoHConnectionsManager::clear()
-{
- size_t result = 0;
- for (const auto& backend : t_downstreamConnections) {
- result += backend.second.size();
- for (auto& conn : backend.second) {
- conn->stopIO();
- }
- }
- t_downstreamConnections.clear();
- return result;
-}
-
-bool DownstreamDoHConnectionsManager::removeDownstreamConnection(std::shared_ptr<DoHConnectionToBackend>& conn)
-{
- bool found = false;
- auto backendIt = t_downstreamConnections.find(conn->getDS()->getID());
- if (backendIt == t_downstreamConnections.end()) {
- return found;
- }
-
- for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) {
- if (*connIt == conn) {
- backendIt->second.erase(connIt);
- found = true;
- break;
- }
- }
-
- return found;
-}
-
-void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now)
-{
- //cerr<<"cleanup interval is "<<s_cleanupInterval<<", next cleanup is "<<t_nextCleanup<<", now is "<<now.tv_sec<<endl;
- if (s_cleanupInterval <= 0 || (t_nextCleanup > 0 && t_nextCleanup > now.tv_sec)) {
- return;
- }
-
- t_nextCleanup = now.tv_sec + s_cleanupInterval;
-
- struct timeval freshCutOff = now;
- freshCutOff.tv_sec -= 1;
- struct timeval idleCutOff = now;
- idleCutOff.tv_sec -= s_maxIdleTime;
-
- for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end();) {
- for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end();) {
- if (!(*connIt)) {
- connIt = dsIt->second.erase(connIt);
- continue;
- }
-
- /* don't bother checking freshly used connections */
- if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
- ++connIt;
- continue;
- }
-
- if ((*connIt)->isIdle() && (*connIt)->getLastDataReceivedTime() < idleCutOff) {
- /* idle for too long */
- connIt = dsIt->second.erase(connIt);
- continue;
- }
-
- if ((*connIt)->isUsable()) {
- ++connIt;
- continue;
- }
-
- connIt = dsIt->second.erase(connIt);
- }
-
- if (!dsIt->second.empty()) {
- ++dsIt;
- }
- else {
- dsIt = t_downstreamConnections.erase(dsIt);
- }
- }
-}
-
-std::shared_ptr<DoHConnectionToBackend> DownstreamDoHConnectionsManager::getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload)
-{
- std::shared_ptr<DoHConnectionToBackend> result;
- struct timeval freshCutOff = now;
- freshCutOff.tv_sec -= 1;
-
- auto backendId = ds->getID();
-
- cleanupClosedConnections(now);
-
- const bool haveProxyProtocol = !proxyProtocolPayload.empty();
- if (!haveProxyProtocol) {
- //cerr<<"looking for existing connection"<<endl;
- const auto& it = t_downstreamConnections.find(backendId);
- if (it != t_downstreamConnections.end()) {
- auto& list = it->second;
- for (auto listIt = list.begin(); listIt != list.end();) {
- auto& entry = *listIt;
- if (!entry->canBeReused()) {
- if (!entry->willBeReusable(false)) {
- listIt = list.erase(listIt);
- }
- else {
- ++listIt;
- }
- continue;
- }
- entry->setReused();
- /* for connections that have not been used very recently,
- check whether they have been closed in the meantime */
- if (freshCutOff < entry->getLastDataReceivedTime()) {
- /* used recently enough, skip the check */
- ++ds->tcpReusedConnections;
- return entry;
- }
-
- if (isTCPSocketUsable(entry->getHandle())) {
- ++ds->tcpReusedConnections;
- return entry;
- }
-
- listIt = list.erase(listIt);
- }
- }
- }
-
- auto newConnection = std::make_shared<DoHConnectionToBackend>(ds, mplexer, now, std::move(proxyProtocolPayload));
- if (!haveProxyProtocol) {
- t_downstreamConnections[backendId].push_front(newConnection);
- }
- return newConnection;
-}
-
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
{
auto threadData = boost::any_cast<DoHClientThreadData*>(param);
tmp = nullptr;
try {
- auto downstream = DownstreamDoHConnectionsManager::getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
+ auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
downstream->queueQuery(tqs, std::move(query));
}
catch (...) {
lastTimeoutScan = now.tv_sec;
try {
- DownstreamDoHConnectionsManager::cleanupClosedConnections(now);
+ t_downstreamDoHConnectionsManager.cleanupClosedConnections(now);
handleH2Timeouts(*data.mplexer, now);
if (g_dohStatesDumpRequested > 0) {
newConnection->queueQuery(sender, std::move(query));
}
else {
- auto connection = DownstreamDoHConnectionsManager::getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload));
+ auto connection = t_downstreamDoHConnectionsManager.getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload));
connection->queueQuery(sender, std::move(query));
}
{
size_t cleared = 0;
#ifdef HAVE_NGHTTP2
- cleared = DownstreamDoHConnectionsManager::clear();
+ cleared = t_downstreamDoHConnectionsManager.clear();
#endif /* HAVE_NGHTTP2 */
return cleared;
}
#include "dnsparser.hh"
+thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;
+
ConnectionToBackend::~ConnectionToBackend()
{
if (d_ds && d_handler) {
}
auto raw = unknownContent->getRawContent();
auto serial = getSerialFromRawSOAContent(raw);
-
++query.d_xfrSerialCount;
if (query.d_xfrMasterSerial == 0) {
// store the first SOA in our client's connection metadata
}
return done;
}
-
-std::shared_ptr<TCPConnectionToBackend> DownstreamConnectionsManager::getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
-{
- struct timeval freshCutOff = now;
- freshCutOff.tv_sec -= 1;
-
- auto backendId = ds->getID();
-
- cleanupClosedTCPConnections(now);
-
- {
- const auto& it = t_downstreamConnections.find(backendId);
- if (it != t_downstreamConnections.end()) {
- auto& list = it->second;
- for (auto listIt = list.begin(); listIt != list.end(); ) {
- auto& entry = *listIt;
- if (!entry->canBeReused()) {
- if (!entry->willBeReusable(false)) {
- listIt = list.erase(listIt);
- }
- else {
- ++listIt;
- }
- continue;
- }
-
- entry->setReused();
- /* for connections that have not been used very recently,
- check whether they have been closed in the meantime */
- if (freshCutOff < entry->getLastDataReceivedTime()) {
- /* used recently enough, skip the check */
- ++ds->tcpReusedConnections;
- return entry;
- }
-
- if (entry->isUsable()) {
- ++ds->tcpReusedConnections;
- return entry;
- }
-
- listIt = list.erase(listIt);
- }
- }
- }
-
- auto newConnection = std::make_shared<TCPConnectionToBackend>(ds, mplexer, now);
- if (!ds->useProxyProtocol) {
- t_downstreamConnections[backendId].push_front(newConnection);
- }
- return newConnection;
-}
-
-void DownstreamConnectionsManager::cleanupClosedTCPConnections(struct timeval now)
-{
- if (s_cleanupInterval == 0 || (t_nextCleanup != 0 && t_nextCleanup > now.tv_sec)) {
- return;
- }
-
- t_nextCleanup = now.tv_sec + s_cleanupInterval;
-
- struct timeval freshCutOff = now;
- freshCutOff.tv_sec -= 1;
- struct timeval idleCutOff = now;
- idleCutOff.tv_sec -= s_maxIdleTime;
-
- for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
- for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
- if (!(*connIt)) {
- ++connIt;
- continue;
- }
-
- /* don't bother checking freshly used connections */
- if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
- ++connIt;
- continue;
- }
-
- if ((*connIt)->isIdle() && (*connIt)->getLastDataReceivedTime() < idleCutOff) {
- /* idle for too long */
- connIt = dsIt->second.erase(connIt);
- continue;
- }
-
- if ((*connIt)->isUsable()) {
- ++connIt;
- continue;
- }
-
- connIt = dsIt->second.erase(connIt);
- }
-
- if (!dsIt->second.empty()) {
- ++dsIt;
- }
- else {
- dsIt = t_downstreamConnections.erase(dsIt);
- }
- }
-}
-
-size_t DownstreamConnectionsManager::clear()
-{
- size_t count = 0;
- for (const auto& downstream : t_downstreamConnections) {
- count += downstream.second.size();
- }
-
- t_downstreamConnections.clear();
-
- return count;
-}
-
-thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
-thread_local time_t DownstreamConnectionsManager::t_nextCleanup{0};
-size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
-uint16_t DownstreamConnectionsManager::s_cleanupInterval{60};
-uint16_t DownstreamConnectionsManager::s_maxIdleTime{300};
class ConnectionToBackend : public std::enable_shared_from_this<ConnectionToBackend>
{
public:
- ConnectionToBackend(std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->tcpFastOpen)
+ ConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->tcpFastOpen)
{
reconnect();
}
virtual bool reachedMaxConcurrentQueries() const = 0;
virtual bool isIdle() const = 0;
virtual void release() = 0;
+ virtual void stopIO()
+ {
+ }
bool matches(const std::shared_ptr<DownstreamState>& ds) const
{
struct timeval d_connectionStartTime;
struct timeval d_lastDataReceivedTime;
- std::shared_ptr<DownstreamState> d_ds{nullptr};
+ const std::shared_ptr<DownstreamState> d_ds{nullptr};
std::shared_ptr<TCPQuerySender> d_sender{nullptr};
std::unique_ptr<FDMultiplexer>& d_mplexer;
std::unique_ptr<TCPIOHandler> d_handler{nullptr};
class TCPConnectionToBackend : public ConnectionToBackend
{
public:
- TCPConnectionToBackend(std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): ConnectionToBackend(ds, mplexer, now), d_responseBuffer(s_maxPacketCacheEntrySize)
+ TCPConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& /* proxyProtocolPayload*, unused but there to match the HTTP2 connections, so we can use the same templated connections manager class */): ConnectionToBackend(ds, mplexer, now), d_responseBuffer(s_maxPacketCacheEntrySize)
{
}
State d_state{State::idle};
};
-class DownstreamConnectionsManager
+template <class T> class DownstreamConnectionsManager
{
public:
- static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now);
- static void cleanupClosedTCPConnections(struct timeval now);
- static size_t clear();
-
static void setMaxCachedConnectionsPerDownstream(size_t max)
{
s_maxCachedConnectionsPerDownstream = max;
s_maxIdleTime = max;
}
-private:
- static thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
- static thread_local time_t t_nextCleanup;
+ bool isConnectionUsable(const std::shared_ptr<T>& conn, const struct timeval& now, const struct timeval& freshCutOff)
+ {
+ if (!conn->canBeReused()) {
+ return false;
+ }
+
+ /* for connections that have not been used very recently,
+ check whether they have been closed in the meantime */
+ if (freshCutOff < conn->getLastDataReceivedTime()) {
+ /* used recently enough, skip the check */
+ return true;
+ }
+
+ if (conn->isUsable()) {
+ return true;
+ }
+
+ return false;
+ }
+
+ std::shared_ptr<T> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload)
+ {
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+
+ auto backendId = ds->getID();
+
+ cleanupClosedConnections(now);
+
+ const bool haveProxyProtocol = ds->useProxyProtocol || !proxyProtocolPayload.empty();
+ if (!haveProxyProtocol) {
+ const auto& it = d_downstreamConnections.find(backendId);
+ if (it != d_downstreamConnections.end()) {
+ auto& list = it->second;
+ for (auto listIt = list.begin(); listIt != list.end(); ) {
+ if (!(*listIt)) {
+ listIt = list.erase(listIt);
+ continue;
+ }
+
+ auto& entry = *listIt;
+ if (isConnectionUsable(entry, now, freshCutOff)) {
+ entry->setReused();
+ ++ds->tcpReusedConnections;
+ return entry;
+ }
+
+ if (entry->willBeReusable(false)) {
+ ++listIt;
+ continue;
+ }
+
+ listIt = list.erase(listIt);
+ }
+ }
+ }
+
+ auto newConnection = std::make_shared<T>(ds, mplexer, now, std::move(proxyProtocolPayload));
+ if (!haveProxyProtocol) {
+ d_downstreamConnections[backendId].push_front(newConnection);
+ }
+
+ return newConnection;
+ }
+
+ void cleanupClosedConnections(struct timeval now)
+ {
+ if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) {
+ return;
+ }
+
+ d_nextCleanup = now.tv_sec + s_cleanupInterval;
+
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+ struct timeval idleCutOff = now;
+ idleCutOff.tv_sec -= s_maxIdleTime;
+
+ for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) {
+ for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
+ if (!(*connIt)) {
+ connIt = dsIt->second.erase(connIt);
+ continue;
+ }
+
+ auto& entry = *connIt;
+
+ /* don't bother checking freshly used connections */
+ if (freshCutOff < entry->getLastDataReceivedTime()) {
+ ++connIt;
+ continue;
+ }
+
+ if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
+ /* idle for too long */
+ connIt = dsIt->second.erase(connIt);
+ continue;
+ }
+
+ if (entry->isUsable()) {
+ ++connIt;
+ continue;
+ }
+
+ connIt = dsIt->second.erase(connIt);
+ }
+
+ if (!dsIt->second.empty()) {
+ ++dsIt;
+ }
+ else {
+ dsIt = d_downstreamConnections.erase(dsIt);
+ }
+ }
+ }
+
+ size_t clear()
+ {
+ size_t count = 0;
+ for (const auto& downstream : d_downstreamConnections) {
+ count += downstream.second.size();
+ for (auto& conn : downstream.second) {
+ conn->stopIO();
+ }
+ }
+
+ d_downstreamConnections.clear();
+ return count;
+ }
+
+ bool removeDownstreamConnection(std::shared_ptr<T>& conn)
+ {
+ bool found = false;
+ auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
+ if (backendIt == d_downstreamConnections.end()) {
+ return found;
+ }
+
+ for (auto connIt = backendIt->second.begin(); connIt != backendIt->second.end(); ++connIt) {
+ if (*connIt == conn) {
+ backendIt->second.erase(connIt);
+ found = true;
+ break;
+ }
+ }
+
+ return found;
+ }
+
+protected:
+
static size_t s_maxCachedConnectionsPerDownstream;
static uint16_t s_cleanupInterval;
static uint16_t s_maxIdleTime;
+
+ std::map<boost::uuids::uuid, std::deque<std::shared_ptr<T>>> d_downstreamConnections;
+ time_t d_nextCleanup{0};
};
+
+template <class T> size_t DownstreamConnectionsManager<T>::s_maxCachedConnectionsPerDownstream{10};
+template <class T> uint16_t DownstreamConnectionsManager<T>::s_cleanupInterval{60};
+template <class T> uint16_t DownstreamConnectionsManager<T>::s_maxIdleTime{300};
+
+using DownstreamTCPConnectionsManager = DownstreamConnectionsManager<TCPConnectionToBackend>;
+extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;