From 8daba80b44a79c58c7ddcc1149b72dc6eaa53a8c Mon Sep 17 00:00:00 2001 From: Otto Date: Mon, 26 Jul 2021 13:13:13 +0200 Subject: [PATCH] TCP/DoT connection pooling --- pdns/lwres.cc | 155 ++++++++++++++++++++------------ pdns/pdns_recursor.cc | 20 ++++- pdns/rec_channel_rec.cc | 3 + pdns/recursordist/Makefile.am | 1 + pdns/recursordist/rec-tcpout.cc | 80 +++++++++++++++++ pdns/recursordist/rec-tcpout.hh | 76 ++++++++++++++++ 6 files changed, 273 insertions(+), 62 deletions(-) create mode 100644 pdns/recursordist/rec-tcpout.cc create mode 100644 pdns/recursordist/rec-tcpout.hh diff --git a/pdns/lwres.cc b/pdns/lwres.cc index 1441a39edb..601586471f 100644 --- a/pdns/lwres.cc +++ b/pdns/lwres.cc @@ -57,6 +57,9 @@ #include "dnstap.hh" #include "fstrm_logger.hh" +#include "rec-tcpout.hh" + +thread_local TCPOutConnectionManager t_tcp_manager; bool g_syslog; @@ -232,7 +235,7 @@ static void logIncomingResponse(const std::shared_ptr& srcmask, boost::optional context, const std::shared_ptr>>& outgoingLoggers, const std::shared_ptr>>& fstrmLoggers, const std::set& exportTypes, LWResult *lwr, bool* chained) +static LWResult::Result asyncresolve1(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional& srcmask, boost::optional context, const std::shared_ptr>>& outgoingLoggers, const std::shared_ptr>>& fstrmLoggers, const std::set& exportTypes, LWResult *lwr, bool* chained, TCPOutConnectionManager::Connection& connection) { size_t len; size_t bufsize=g_outgoingEDNSBufsize; @@ -343,70 +346,88 @@ LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int } else { try { - const struct timeval timeout{ g_networkTimeoutMsec / 1000, static_cast(g_networkTimeoutMsec) % 1000 * 1000}; - - Socket s(ip.sin4.sin_family, SOCK_STREAM); - s.setNonBlocking(); - localip = pdns::getQueryLocalAddress(ip.sin4.sin_family, 0); - s.bind(localip); - - std::shared_ptr tlsCtx{nullptr}; - if 
(SyncRes::s_dot_to_port_853 && ip.getPort() == 853) { - TLSContextParameters tlsParams; - tlsParams.d_provider = "openssl"; - tlsParams.d_validateCertificates = false; - //tlsParams.d_caStore = caaStore; - tlsCtx = getTLSContext(tlsParams); - if (tlsCtx == nullptr) { - g_log << Logger::Error << "DoT to " << ip << " requested but not available" << endl; + while (true) { + bool isNew = false; + connection = t_tcp_manager.get(ip); + if (!connection.d_handler) { + isNew = true; + const struct timeval timeout{ g_networkTimeoutMsec / 1000, static_cast(g_networkTimeoutMsec) % 1000 * 1000}; + Socket s(ip.sin4.sin_family, SOCK_STREAM); + s.setNonBlocking(); + localip = pdns::getQueryLocalAddress(ip.sin4.sin_family, 0); + s.bind(localip); + + std::shared_ptr tlsCtx{nullptr}; + if (SyncRes::s_dot_to_port_853 && ip.getPort() == 853) { + TLSContextParameters tlsParams; + tlsParams.d_provider = "openssl"; + tlsParams.d_validateCertificates = false; + //tlsParams.d_caStore = caaStore; + tlsCtx = getTLSContext(tlsParams); + if (tlsCtx == nullptr) { + g_log << Logger::Error << "DoT to " << ip << " requested but not available" << endl; + } + else { + dnsOverTLS = true; + } + } + connection.d_handler = std::make_shared("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec); + // Returned state ignored + connection.d_handler->tryConnect(SyncRes::s_tcp_fast_open_connect, ip); } - else { - dnsOverTLS = true; + localip.sin4.sin_family = ip.sin4.sin_family; + socklen_t slen = ip.getSocklen(); + getsockname(connection.d_handler->getDescriptor(), reinterpret_cast(&localip), &slen); + uint16_t tlen = htons(vpacket.size()); + char *lenP = (char*)&tlen; + const char *msgP=(const char*)&*vpacket.begin(); + PacketBuffer packet; + packet.reserve(2 + vpacket.size()); + packet.insert(packet.end(), lenP, lenP+2); + packet.insert(packet.end(), msgP, msgP+vpacket.size()); + ret = asendtcp(packet, connection.d_handler); + if (ret != LWResult::Result::Success) { + if (isNew) { + 
connection.d_handler->close(); + return ret; + } else { + continue; + } } - } - auto handler = std::make_shared("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec); - // Returned state ignored - handler->tryConnect(SyncRes::s_tcp_fast_open_connect, ip); - - uint16_t tlen=htons(vpacket.size()); - char *lenP=(char*)&tlen; - const char *msgP=(const char*)&*vpacket.begin(); - PacketBuffer packet; - packet.reserve(2 + vpacket.size()); - packet.insert(packet.end(), lenP, lenP+2); - packet.insert(packet.end(), msgP, msgP+vpacket.size()); - ret = asendtcp(packet, handler); - if (ret != LWResult::Result::Success) { - handler->close(); - return ret; - } - #ifdef HAVE_FSTRM - if (fstrmQEnabled) { - logFstreamQuery(fstrmLoggers, queryTime, localip, ip, !dnsOverTLS ? DnstapMessage::ProtocolType::DoTCP : DnstapMessage::ProtocolType::DoT, context ? context->d_auth : boost::none, vpacket); - } + if (fstrmQEnabled) { + logFstreamQuery(fstrmLoggers, queryTime, localip, ip, !dnsOverTLS ? DnstapMessage::ProtocolType::DoTCP : DnstapMessage::ProtocolType::DoT, context ? context->d_auth : boost::none, vpacket); + } #endif /* HAVE_FSTRM */ - ret = arecvtcp(packet, 2, handler, false); - if (ret != LWResult::Result::Success) { - return ret; - } - - memcpy(&tlen, packet.data(), sizeof(tlen)); - len=ntohs(tlen); // switch to the 'len' shared with the rest of the function + ret = arecvtcp(packet, 2, connection.d_handler, false); + if (ret != LWResult::Result::Success) { + if (isNew) { + return ret; + } else { + continue; + } + } - // XXX receive into buf directly? - packet.resize(len); - ret = arecvtcp(packet, len, handler, false); - if (ret != LWResult::Result::Success) { - return ret; + memcpy(&tlen, packet.data(), sizeof(tlen)); + len = ntohs(tlen); // switch to the 'len' shared with the rest of the function + + // XXX receive into buf directly? 
+ packet.resize(len); + ret = arecvtcp(packet, len, connection.d_handler, false); + if (ret != LWResult::Result::Success) { + if (isNew) { + return ret; + } else { + continue; + } + } + buf.resize(len); + memcpy(buf.data(), packet.data(), len); + //handler->close(); + ret = LWResult::Result::Success; + break; } - - buf.resize(len); - memcpy(buf.data(), packet.data(), len); - - handler->close(); - ret = LWResult::Result::Success; } catch (const NetworkError& ne) { ret = LWResult::Result::OSLimitError; // OS limits error @@ -491,7 +512,7 @@ LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int if(outgoingLoggers) { logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, srcmask, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); } - + lwr->d_validpacket = true; return LWResult::Result::Success; } @@ -524,3 +545,19 @@ LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int return LWResult::Result::PermanentError; } +LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional& srcmask, boost::optional context, const std::shared_ptr>>& outgoingLoggers, const std::shared_ptr>>& fstrmLoggers, const std::set& exportTypes, LWResult *lwr, bool* chained) +{ + TCPOutConnectionManager::Connection connection; + auto ret = asyncresolve1(ip, domain, type,doTCP, sendRDQuery, EDNS0Level, now, srcmask, context, outgoingLoggers, fstrmLoggers, exportTypes, lwr, chained, connection); + + if (doTCP) { + if (!lwr->d_validpacket) { + ret = asyncresolve1(ip, domain, type,doTCP, sendRDQuery, EDNS0Level, now, srcmask, context, outgoingLoggers, fstrmLoggers, exportTypes, lwr, chained, connection); + } + if (connection.d_handler && lwr->d_validpacket) { + t_tcp_manager.store(ip, connection); + } + } + return ret; +} + diff --git a/pdns/pdns_recursor.cc 
b/pdns/pdns_recursor.cc index 186e01de1a..b16c9dde1a 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -100,6 +100,7 @@ #include "nod.hh" #endif /* NOD_ENABLED */ #include "query-local-address.hh" +#include "rec-tcpout.hh" #include "rec-snmp.hh" #include "rec-taskqueue.hh" @@ -3662,8 +3663,8 @@ static void doStats(void) <(pleaseGetEDNSStatusesSize)<(pleaseGetConcurrentQueries)<<" queries running, "<(pleaseGetConcurrentQueries)<<" queries running, "<(pleaseGetPacketCacheSize); uint64_t pcHits = broadcastAccFunction(pleaseGetPacketCacheHits); @@ -3734,6 +3735,7 @@ static void houseKeeping(void *) SyncRes::pruneThrottledServers(); SyncRes::pruneNonResolving(now.tv_sec - SyncRes::s_nonresolvingnsthrottletime); Utility::gettimeofday(&last_prune, nullptr); + t_tcp_manager.cleanup(); } if(isHandlerThread()) { @@ -4504,6 +4506,7 @@ static void checkOrFixFDS() { unsigned int availFDs=getFilenumLimit(); unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before + wantFDs += g_numWorkerThreads * TCPOutConnectionManager::maxIdlePerThread; if(wantFDs > availFDs) { unsigned int hardlimit= getFilenumLimit(true); @@ -4512,7 +4515,7 @@ static void checkOrFixFDS() g_log< PrefixDashNumberCompare::prefixAndTrailingNum(const std::string& a) { @@ -1423,6 +1424,8 @@ static void registerAllStats1() addGetStat("almost-expired-run", []() { return getAlmostExpiredTasksRun(); }); addGetStat("almost-expired-exceptions", []() { return getAlmostExpiredTaskExceptions(); }); + addGetStat("idle-tcpout-connections", getCurrentIdleTCPConnections); + /* make sure that the ECS stats are properly initialized */ SyncRes::clearECSStats(); for (size_t idx = 0; idx < SyncRes::s_ecsResponsesBySubnetSize4.size(); idx++) { diff --git a/pdns/recursordist/Makefile.am b/pdns/recursordist/Makefile.am index 7bc9ec6fb9..d8d39cfa1f 100644 --- a/pdns/recursordist/Makefile.am +++ b/pdns/recursordist/Makefile.am @@ -163,6 +163,7 @@ pdns_recursor_SOURCES = \ 
rec-protozero.cc rec-protozero.hh \ rec-snmp.hh rec-snmp.cc \ rec-taskqueue.cc rec-taskqueue.hh \ + rec-tcpout.cc rec-tcpout.hh \ rec_channel.cc rec_channel.hh rec_metrics.hh \ rec_channel_rec.cc \ recpacketcache.cc recpacketcache.hh \ diff --git a/pdns/recursordist/rec-tcpout.cc b/pdns/recursordist/rec-tcpout.cc new file mode 100644 index 0000000000..91a0ef822c --- /dev/null +++ b/pdns/recursordist/rec-tcpout.cc @@ -0,0 +1,80 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */ + +#include "rec-tcpout.hh" + +timeval TCPOutConnectionManager::maxIdleTime; +size_t TCPOutConnectionManager::maxQueries; +size_t TCPOutConnectionManager::maxIdlePerAuth; +size_t TCPOutConnectionManager::maxIdlePerThread; + +void TCPOutConnectionManager::cleanup() +{ + if (maxIdleTime.tv_sec == 0 && maxIdleTime.tv_usec == 0) { + // no maximum idle time + return; + } + struct timeval now; + gettimeofday(&now, nullptr); + + for (auto it = d_idle_connections.begin(); it != d_idle_connections.end();) { + timeval idle = now - it->second.d_last_used; + if (maxIdleTime < idle) { + it = d_idle_connections.erase(it); + } + else { + ++it; + } + } +} + +void TCPOutConnectionManager::store(const ComboAddress& ip, Connection& connection) +{ + cleanup(); + if (d_idle_connections.size() >= maxIdlePerThread) { + return; + } + if (d_idle_connections.count(ip) >= maxIdlePerAuth) { + return; + } + + ++connection.d_numqueries; + if (maxQueries > 0 && connection.d_numqueries > maxQueries) { + return; + } + gettimeofday(&connection.d_last_used, nullptr); + d_idle_connections.emplace(ip, connection); +} + +TCPOutConnectionManager::Connection TCPOutConnectionManager::get(const ComboAddress& ip) +{ + if (d_idle_connections.count(ip) > 0) { + auto h = d_idle_connections.extract(ip); + return h.mapped(); + } + return Connection{}; +} + +uint64_t getCurrentIdleTCPConnections() +{ + return broadcastAccFunction([] { return t_tcp_manager.getSize(); }); +} diff --git a/pdns/recursordist/rec-tcpout.hh b/pdns/recursordist/rec-tcpout.hh new file mode 100644 index 0000000000..3b17da1b6c --- /dev/null +++ b/pdns/recursordist/rec-tcpout.hh @@ -0,0 +1,76 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. 
+ * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "iputils.hh" +#include "tcpiohandler.hh" +#include "syncres.hh" + +class TCPOutConnectionManager /* pool of idle outgoing TCP/DoT connections keyed by remote address; one thread_local instance (t_tcp_manager) is declared below */ +{ +public: + // Max idle time for a connection, 0 is no timeout + static struct timeval maxIdleTime; + // Per thread maximum of idle connections for a specific destination, 0 means no idle connections will be kept open + static size_t maxIdlePerAuth; + // Max total number of queries to handle per connection, 0 is no max + static size_t maxQueries; + // Per thread max # of idle connections, here 0 means a real limit + static size_t maxIdlePerThread; + + struct Connection /* a pooled connection: its I/O handler plus last-use time and number of queries counted by store() */ + { + std::string toString() const /* "<descriptor> <use_count>" of the handler, or an empty string when there is no handler */ + { + if (d_handler) { + return std::to_string(d_handler->getDescriptor()) + ' ' + std::to_string(d_handler.use_count()); + } + return ""; + } + + std::shared_ptr<TCPIOHandler> d_handler; + timeval d_last_used{0, 0}; + size_t d_numqueries{0}; + }; + + void store(const ComboAddress& ip, Connection& connection); + Connection get(const ComboAddress& ip); + void cleanup(); + + size_t size() const /* number of idle connections currently held */ + { + return d_idle_connections.size(); + } + uint64_t* getSize() const /* size() in a newly allocated uint64_t; NOTE(review): presumably released by broadcastAccFunction, confirm against its implementation */ + { + return new uint64_t(size()); + } + +private: + + std::multimap<ComboAddress, Connection> d_idle_connections; +}; + +extern thread_local TCPOutConnectionManager t_tcp_manager; +uint64_t getCurrentIdleTCPConnections(); + -- 2.47.2