]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
TCP/DoT connection pooling
authorOtto <otto.moerbeek@open-xchange.com>
Mon, 26 Jul 2021 11:13:13 +0000 (13:13 +0200)
committerOtto <otto.moerbeek@open-xchange.com>
Fri, 24 Sep 2021 07:59:43 +0000 (09:59 +0200)
pdns/lwres.cc
pdns/pdns_recursor.cc
pdns/rec_channel_rec.cc
pdns/recursordist/Makefile.am
pdns/recursordist/rec-tcpout.cc [new file with mode: 0644]
pdns/recursordist/rec-tcpout.hh [new file with mode: 0644]

index 1441a39edbaf28ca86b6b6cc3d0e44c1c8daed91..601586471ffc93c235cdd1c151e64bf1ee460562 100644 (file)
@@ -57,6 +57,9 @@
 #include "dnstap.hh"
 #include "fstrm_logger.hh"
 
+#include "rec-tcpout.hh"
+
+thread_local TCPOutConnectionManager t_tcp_manager;
 
 bool g_syslog;
 
@@ -232,7 +235,7 @@ static void logIncomingResponse(const std::shared_ptr<std::vector<std::unique_pt
 /** lwr is only filled out in case 1 was returned, and even when returning 1 for 'success', lwr might contain DNS errors
     Never throws! 
  */
-LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+static LWResult::Result asyncresolve1(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained, TCPOutConnectionManager::Connection& connection)
 {
   size_t len;
   size_t bufsize=g_outgoingEDNSBufsize;
@@ -343,70 +346,88 @@ LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int
   }
   else {
     try {
-      const struct timeval timeout{ g_networkTimeoutMsec / 1000, static_cast<suseconds_t>(g_networkTimeoutMsec) % 1000 * 1000};
-
-      Socket s(ip.sin4.sin_family, SOCK_STREAM);
-      s.setNonBlocking();
-      localip = pdns::getQueryLocalAddress(ip.sin4.sin_family, 0);
-      s.bind(localip);
-
-      std::shared_ptr<TLSCtx> tlsCtx{nullptr};
-      if (SyncRes::s_dot_to_port_853 && ip.getPort() == 853) {
-        TLSContextParameters tlsParams;
-        tlsParams.d_provider = "openssl";
-        tlsParams.d_validateCertificates = false;
-        //tlsParams.d_caStore = caaStore;
-        tlsCtx = getTLSContext(tlsParams);
-        if (tlsCtx == nullptr) {
-          g_log << Logger::Error << "DoT to " << ip << " requested but not available" << endl;
+      while (true) {
+        bool isNew = false;
+        connection = t_tcp_manager.get(ip);
+        if (!connection.d_handler) {
+          isNew = true;
+          const struct timeval timeout{ g_networkTimeoutMsec / 1000, static_cast<suseconds_t>(g_networkTimeoutMsec) % 1000 * 1000};
+          Socket s(ip.sin4.sin_family, SOCK_STREAM);
+          s.setNonBlocking();
+          localip = pdns::getQueryLocalAddress(ip.sin4.sin_family, 0);
+          s.bind(localip);
+
+          std::shared_ptr<TLSCtx> tlsCtx{nullptr};
+          if (SyncRes::s_dot_to_port_853 && ip.getPort() == 853) {
+            TLSContextParameters tlsParams;
+            tlsParams.d_provider = "openssl";
+            tlsParams.d_validateCertificates = false;
+            //tlsParams.d_caStore = caaStore;
+            tlsCtx = getTLSContext(tlsParams);
+            if (tlsCtx == nullptr) {
+              g_log << Logger::Error << "DoT to " << ip << " requested but not available" << endl;
+          }
+            else {
+              dnsOverTLS = true;
+            }
+          }
+          connection.d_handler = std::make_shared<TCPIOHandler>("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec);
+          // Returned state ignored
+          connection.d_handler->tryConnect(SyncRes::s_tcp_fast_open_connect, ip);
         }
-        else {
-          dnsOverTLS = true;
+        localip.sin4.sin_family = ip.sin4.sin_family;
+        socklen_t slen = ip.getSocklen();
+        getsockname(connection.d_handler->getDescriptor(), reinterpret_cast<sockaddr*>(&localip), &slen);
+        uint16_t tlen = htons(vpacket.size());
+        char *lenP = (char*)&tlen;
+        const char *msgP=(const char*)&*vpacket.begin();
+        PacketBuffer packet;
+        packet.reserve(2 + vpacket.size());
+        packet.insert(packet.end(), lenP, lenP+2);
+        packet.insert(packet.end(), msgP, msgP+vpacket.size());
+        ret = asendtcp(packet, connection.d_handler);
+        if (ret != LWResult::Result::Success) {
+          if (isNew) {
+            connection.d_handler->close();
+            return ret;
+          } else {
+            continue;
+          }
         }
-      }
-      auto handler = std::make_shared<TCPIOHandler>("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec);
-      // Returned state ignored
-      handler->tryConnect(SyncRes::s_tcp_fast_open_connect, ip);
-
-      uint16_t tlen=htons(vpacket.size());
-      char *lenP=(char*)&tlen;
-      const char *msgP=(const char*)&*vpacket.begin();
-      PacketBuffer packet;
-      packet.reserve(2 + vpacket.size());
-      packet.insert(packet.end(), lenP, lenP+2);
-      packet.insert(packet.end(), msgP, msgP+vpacket.size());
-      ret = asendtcp(packet, handler);
-      if (ret != LWResult::Result::Success) {
-        handler->close();
-        return ret;
-      }
-
 #ifdef HAVE_FSTRM
-      if (fstrmQEnabled) {
-        logFstreamQuery(fstrmLoggers, queryTime, localip, ip, !dnsOverTLS ? DnstapMessage::ProtocolType::DoTCP : DnstapMessage::ProtocolType::DoT, context ? context->d_auth : boost::none, vpacket);
-      }
+        if (fstrmQEnabled) {
+          logFstreamQuery(fstrmLoggers, queryTime, localip, ip, !dnsOverTLS ? DnstapMessage::ProtocolType::DoTCP : DnstapMessage::ProtocolType::DoT, context ? context->d_auth : boost::none, vpacket);
+        }
 #endif /* HAVE_FSTRM */
 
-      ret = arecvtcp(packet, 2, handler, false);
-      if (ret != LWResult::Result::Success) {
-        return ret;
-      }
-
-      memcpy(&tlen, packet.data(), sizeof(tlen));
-      len=ntohs(tlen); // switch to the 'len' shared with the rest of the function
+        ret = arecvtcp(packet, 2, connection.d_handler, false);
+        if (ret != LWResult::Result::Success) {
+          if (isNew) {
+            return ret;
+          } else {
+            continue;
+          }
+        }
 
-      // XXX receive into buf directly?
-      packet.resize(len);
-      ret = arecvtcp(packet, len, handler, false);
-      if (ret != LWResult::Result::Success) {
-        return ret;
+        memcpy(&tlen, packet.data(), sizeof(tlen));
+        len = ntohs(tlen); // switch to the 'len' shared with the rest of the function
+
+        // XXX receive into buf directly?
+        packet.resize(len);
+        ret = arecvtcp(packet, len, connection.d_handler, false);
+        if (ret != LWResult::Result::Success) {
+          if (isNew) {
+            return ret;
+          } else {
+            continue;
+          }
+        }
+        buf.resize(len);
+        memcpy(buf.data(), packet.data(), len);
+        //handler->close();
+        ret = LWResult::Result::Success;
+        break;
       }
-
-      buf.resize(len);
-      memcpy(buf.data(), packet.data(), len);
-
-      handler->close();
-      ret = LWResult::Result::Success;
     }
     catch (const NetworkError& ne) {
       ret = LWResult::Result::OSLimitError; // OS limits error
@@ -491,7 +512,7 @@ LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int
     if(outgoingLoggers) {
       logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, srcmask, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
     }
-
+    
     lwr->d_validpacket = true;
     return LWResult::Result::Success;
   }
@@ -524,3 +545,19 @@ LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int
   return LWResult::Result::PermanentError;
 }
 
+LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+{
+  TCPOutConnectionManager::Connection connection;
+  auto ret = asyncresolve1(ip, domain, type,doTCP, sendRDQuery, EDNS0Level, now, srcmask, context, outgoingLoggers, fstrmLoggers, exportTypes, lwr, chained, connection);
+
+  if (doTCP) {
+    if (!lwr->d_validpacket) {
+      ret = asyncresolve1(ip, domain, type,doTCP, sendRDQuery, EDNS0Level, now, srcmask, context, outgoingLoggers, fstrmLoggers, exportTypes, lwr, chained, connection);
+    } 
+    if (connection.d_handler && lwr->d_validpacket) {
+      t_tcp_manager.store(ip, connection);
+    }
+  }
+  return ret;
+}
+
index 186e01de1af386c27cdcf87b5af6e749ab760cbb..b16c9dde1a3c259fd7dcc602f2e10fd9fe847166 100644 (file)
 #include "nod.hh"
 #endif /* NOD_ENABLED */
 #include "query-local-address.hh"
+#include "rec-tcpout.hh"
 
 #include "rec-snmp.hh"
 #include "rec-taskqueue.hh"
@@ -3662,8 +3663,8 @@ static void doStats(void)
       <<broadcastAccFunction<uint64_t>(pleaseGetEDNSStatusesSize)<<endl;
     g_log<<Logger::Notice<<"stats: outpacket/query ratio "<<ratePercentage(SyncRes::s_outqueries, SyncRes::s_queries)<<"%";
     g_log<<Logger::Notice<<", "<<ratePercentage(SyncRes::s_throttledqueries, SyncRes::s_outqueries+SyncRes::s_throttledqueries)<<"% throttled"<<endl;
-    g_log<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<"/"<<SyncRes::s_dotoutqueries << " outgoing tcp/dot connections, "<<
-      broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
+    g_log<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<"/"<<SyncRes::s_dotoutqueries << "/" << getCurrentIdleTCPConnections() << " outgoing tcp/dot/idle connections, "<<
+      broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts "<<endl;
 
     uint64_t pcSize = broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize);
     uint64_t pcHits = broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits);
@@ -3734,6 +3735,7 @@ static void houseKeeping(void *)
       SyncRes::pruneThrottledServers();
       SyncRes::pruneNonResolving(now.tv_sec - SyncRes::s_nonresolvingnsthrottletime);
       Utility::gettimeofday(&last_prune, nullptr);
+      t_tcp_manager.cleanup();
     }
 
     if(isHandlerThread()) {
@@ -4504,6 +4506,7 @@ static void checkOrFixFDS()
 {
   unsigned int availFDs=getFilenumLimit(); 
   unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
+  wantFDs += g_numWorkerThreads * TCPOutConnectionManager::maxIdlePerThread;
 
   if(wantFDs > availFDs) {
     unsigned int hardlimit= getFilenumLimit(true);
@@ -4512,7 +4515,7 @@ static void checkOrFixFDS()
       g_log<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
     }
     else {
-      int newval = (hardlimit - 25) / g_numWorkerThreads;
+      int newval = (hardlimit - 25 - TCPOutConnectionManager::maxIdlePerThread) / g_numWorkerThreads;
       g_log<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
       g_maxMThreads = newval;
       setFilenumLimit(hardlimit);
@@ -5062,6 +5065,12 @@ static int serviceMain(int argc, char*argv[])
     TCPConnection::s_maxInFlight = maxInFlight;
   }
 
+  int64_t millis = ::arg().asNum("tcpout-maxidle-ms");
+  TCPOutConnectionManager::maxIdleTime = timeval{millis / 1000, (millis % 1000) * 1000 };
+  TCPOutConnectionManager::maxIdlePerAuth = ::arg().asNum("tcpout-maxidle-per-auth");
+  TCPOutConnectionManager::maxQueries = ::arg().asNum("tcpout-max-queries");
+  TCPOutConnectionManager::maxIdlePerThread = ::arg().asNum("tcpout-maxidle-per-thread");
+
   g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
 
   g_statisticsInterval = ::arg().asNum("statistics-interval");
@@ -5933,6 +5942,11 @@ int main(int argc, char **argv)
     ::arg().setSwitch("dot-to-port-853", "Force DoT connection to target port 853 if DoT compiled in")="yes";
     ::arg().set("dot-to-auth-names", "Use DoT to authoritative servers with these names or suffixes")="";
 
+    ::arg().set("tcpout-maxidle-ms", "Maximum time TCP connections are left idle in milliseconds or 0 if no limit") = "10000";
+    ::arg().set("tcpout-maxidle-per-auth", "Maximum number of idle TCP connections to a specific IP per thread, 0 means do not keep idle connections open") = "10";
+    ::arg().set("tcpout-max-queries", "Maximum total number of queries per connection, 0 means no limit") = "0";
+    ::arg().set("tcpout-maxidle-per-thread", "Maximum number of idle TCP connections per thread") = "100";
+
     ::arg().setCmd("help","Provide a helpful message");
     ::arg().setCmd("version","Print version string");
     ::arg().setCmd("config","Output blank configuration");
index 88349e0155e2fd0f2f3a2849af819fada75638e2..d19d719b0c67d0f96abe04c5d0b12c13a3aee526 100644 (file)
@@ -39,6 +39,7 @@
 #include "pubsuffix.hh"
 #include "namespaces.hh"
 #include "rec-taskqueue.hh"
+#include "rec-tcpout.hh"
 
 std::pair<std::string, std::string> PrefixDashNumberCompare::prefixAndTrailingNum(const std::string& a)
 {
@@ -1423,6 +1424,8 @@ static void registerAllStats1()
   addGetStat("almost-expired-run",  []() { return getAlmostExpiredTasksRun(); });
   addGetStat("almost-expired-exceptions",  []() { return getAlmostExpiredTaskExceptions(); });
 
+  addGetStat("idle-tcpout-connections",  getCurrentIdleTCPConnections);
+
   /* make sure that the ECS stats are properly initialized */
   SyncRes::clearECSStats();
   for (size_t idx = 0; idx < SyncRes::s_ecsResponsesBySubnetSize4.size(); idx++) {
index 7bc9ec6fb9ec888d4f7bb6348ef105d9a0a9e178..d8d39cfa1fdec209df116b91a911a8a283671307 100644 (file)
@@ -163,6 +163,7 @@ pdns_recursor_SOURCES = \
        rec-protozero.cc rec-protozero.hh \
        rec-snmp.hh rec-snmp.cc \
        rec-taskqueue.cc rec-taskqueue.hh \
+        rec-tcpout.cc rec_tcpout.hh \
        rec_channel.cc rec_channel.hh rec_metrics.hh \
        rec_channel_rec.cc \
        recpacketcache.cc recpacketcache.hh \
diff --git a/pdns/recursordist/rec-tcpout.cc b/pdns/recursordist/rec-tcpout.cc
new file mode 100644 (file)
index 0000000..91a0ef8
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "rec-tcpout.hh"
+
+timeval TCPOutConnectionManager::maxIdleTime;
+size_t TCPOutConnectionManager::maxQueries;
+size_t TCPOutConnectionManager::maxIdlePerAuth;
+size_t TCPOutConnectionManager::maxIdlePerThread;
+
+void TCPOutConnectionManager::cleanup()
+{
+  if (maxIdleTime.tv_sec == 0 && maxIdleTime.tv_usec == 0) {
+    // no maximum idle time
+    return;
+  }
+  struct timeval now;
+  gettimeofday(&now, nullptr);
+
+  for (auto it = d_idle_connections.begin(); it != d_idle_connections.end();) {
+    timeval idle = now - it->second.d_last_used;
+    if (maxIdleTime < idle) {
+      it = d_idle_connections.erase(it);
+    }
+    else {
+      ++it;
+    }
+  }
+}
+
+void TCPOutConnectionManager::store(const ComboAddress& ip, Connection& connection)
+{
+  cleanup();
+  if (d_idle_connections.size() >= maxIdlePerThread) {
+    return;
+  }
+  if (d_idle_connections.count(ip) >= maxIdlePerAuth) {
+    return;
+  }
+
+  ++connection.d_numqueries;
+  if (maxQueries > 0 && connection.d_numqueries > maxQueries) {
+    return;
+  }
+  gettimeofday(&connection.d_last_used, nullptr);
+  d_idle_connections.emplace(ip, connection);
+}
+
+TCPOutConnectionManager::Connection TCPOutConnectionManager::get(const ComboAddress& ip)
+{
+  if (d_idle_connections.count(ip) > 0) {
+    auto h = d_idle_connections.extract(ip);
+    return h.mapped();
+  }
+  return Connection{};
+}
+
+uint64_t getCurrentIdleTCPConnections()
+{
+  return broadcastAccFunction<uint64_t>([] { return t_tcp_manager.getSize(); });
+}
diff --git a/pdns/recursordist/rec-tcpout.hh b/pdns/recursordist/rec-tcpout.hh
new file mode 100644 (file)
index 0000000..3b17da1
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#pragma once
+
+#include "iputils.hh"
+#include "tcpiohandler.hh"
+#include "syncres.hh"
+
+class TCPOutConnectionManager
+{
+public:
+  // Max idle time for a connection, 0 is no timeout
+  static struct timeval maxIdleTime;
+  // Per thread maximum of idle connections for a specific destination, 0 means no idle connections will be kept open
+  static size_t maxIdlePerAuth;
+  // Max total number of queries to handle per connection, 0 is no max
+  static size_t maxQueries;
+  // Per thread max # of idle connections, here 0 means a real limit
+  static size_t maxIdlePerThread;
+
+  struct Connection
+  {
+    std::string toString() const
+    {
+      if (d_handler) {
+        return std::to_string(d_handler->getDescriptor()) + ' ' + std::to_string(d_handler.use_count());
+      }
+      return "";
+    }
+
+    std::shared_ptr<TCPIOHandler> d_handler;
+    timeval d_last_used{0, 0};
+    size_t d_numqueries{0};
+  };
+
+  void store(const ComboAddress& ip, Connection& connection);
+  Connection get(const ComboAddress& ip);
+  void cleanup();
+
+  size_t size() const
+  {
+    return d_idle_connections.size();
+  }
+  uint64_t* getSize() const
+  {
+    return new uint64_t(size());
+  }
+
+private:
+
+  std::multimap<ComboAddress, Connection> d_idle_connections;
+};
+
+extern thread_local TCPOutConnectionManager t_tcp_manager;
+uint64_t getCurrentIdleTCPConnections();
+