#include "dnstap.hh"
#include "fstrm_logger.hh"
+#include "rec-tcpout.hh"
+
+thread_local TCPOutConnectionManager t_tcp_manager;
bool g_syslog;
/** lwr is only filled out in case 1 was returned, and even when returning 1 for 'success', lwr might contain DNS errors
Never throws!
*/
-LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+static LWResult::Result asyncresolve1(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained, TCPOutConnectionManager::Connection& connection)
{
size_t len;
size_t bufsize=g_outgoingEDNSBufsize;
}
else {
try {
- const struct timeval timeout{ g_networkTimeoutMsec / 1000, static_cast<suseconds_t>(g_networkTimeoutMsec) % 1000 * 1000};
-
- Socket s(ip.sin4.sin_family, SOCK_STREAM);
- s.setNonBlocking();
- localip = pdns::getQueryLocalAddress(ip.sin4.sin_family, 0);
- s.bind(localip);
-
- std::shared_ptr<TLSCtx> tlsCtx{nullptr};
- if (SyncRes::s_dot_to_port_853 && ip.getPort() == 853) {
- TLSContextParameters tlsParams;
- tlsParams.d_provider = "openssl";
- tlsParams.d_validateCertificates = false;
- //tlsParams.d_caStore = caaStore;
- tlsCtx = getTLSContext(tlsParams);
- if (tlsCtx == nullptr) {
- g_log << Logger::Error << "DoT to " << ip << " requested but not available" << endl;
+ while (true) {
+ bool isNew = false;
+ connection = t_tcp_manager.get(ip);
+ if (!connection.d_handler) {
+ isNew = true;
+ const struct timeval timeout{ g_networkTimeoutMsec / 1000, static_cast<suseconds_t>(g_networkTimeoutMsec) % 1000 * 1000};
+ Socket s(ip.sin4.sin_family, SOCK_STREAM);
+ s.setNonBlocking();
+ localip = pdns::getQueryLocalAddress(ip.sin4.sin_family, 0);
+ s.bind(localip);
+
+ std::shared_ptr<TLSCtx> tlsCtx{nullptr};
+ if (SyncRes::s_dot_to_port_853 && ip.getPort() == 853) {
+ TLSContextParameters tlsParams;
+ tlsParams.d_provider = "openssl";
+ tlsParams.d_validateCertificates = false;
+ //tlsParams.d_caStore = caaStore;
+ tlsCtx = getTLSContext(tlsParams);
+ if (tlsCtx == nullptr) {
+ g_log << Logger::Error << "DoT to " << ip << " requested but not available" << endl;
+ }
+ else {
+ dnsOverTLS = true;
+ }
+ }
+ connection.d_handler = std::make_shared<TCPIOHandler>("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec);
+ // Returned state ignored
+ connection.d_handler->tryConnect(SyncRes::s_tcp_fast_open_connect, ip);
}
- else {
- dnsOverTLS = true;
+ localip.sin4.sin_family = ip.sin4.sin_family;
+ socklen_t slen = ip.getSocklen();
+ getsockname(connection.d_handler->getDescriptor(), reinterpret_cast<sockaddr*>(&localip), &slen);
+ uint16_t tlen = htons(vpacket.size());
+ char *lenP = (char*)&tlen;
+ const char *msgP=(const char*)&*vpacket.begin();
+ PacketBuffer packet;
+ packet.reserve(2 + vpacket.size());
+ packet.insert(packet.end(), lenP, lenP+2);
+ packet.insert(packet.end(), msgP, msgP+vpacket.size());
+ ret = asendtcp(packet, connection.d_handler);
+ if (ret != LWResult::Result::Success) {
+ if (isNew) {
+ connection.d_handler->close();
+ return ret;
+ } else {
+ continue;
+ }
}
- }
- auto handler = std::make_shared<TCPIOHandler>("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec);
- // Returned state ignored
- handler->tryConnect(SyncRes::s_tcp_fast_open_connect, ip);
-
- uint16_t tlen=htons(vpacket.size());
- char *lenP=(char*)&tlen;
- const char *msgP=(const char*)&*vpacket.begin();
- PacketBuffer packet;
- packet.reserve(2 + vpacket.size());
- packet.insert(packet.end(), lenP, lenP+2);
- packet.insert(packet.end(), msgP, msgP+vpacket.size());
- ret = asendtcp(packet, handler);
- if (ret != LWResult::Result::Success) {
- handler->close();
- return ret;
- }
-
#ifdef HAVE_FSTRM
- if (fstrmQEnabled) {
- logFstreamQuery(fstrmLoggers, queryTime, localip, ip, !dnsOverTLS ? DnstapMessage::ProtocolType::DoTCP : DnstapMessage::ProtocolType::DoT, context ? context->d_auth : boost::none, vpacket);
- }
+ if (fstrmQEnabled) {
+ logFstreamQuery(fstrmLoggers, queryTime, localip, ip, !dnsOverTLS ? DnstapMessage::ProtocolType::DoTCP : DnstapMessage::ProtocolType::DoT, context ? context->d_auth : boost::none, vpacket);
+ }
#endif /* HAVE_FSTRM */
- ret = arecvtcp(packet, 2, handler, false);
- if (ret != LWResult::Result::Success) {
- return ret;
- }
-
- memcpy(&tlen, packet.data(), sizeof(tlen));
- len=ntohs(tlen); // switch to the 'len' shared with the rest of the function
+ ret = arecvtcp(packet, 2, connection.d_handler, false);
+ if (ret != LWResult::Result::Success) {
+ if (isNew) {
+ return ret;
+ } else {
+ continue;
+ }
+ }
- // XXX receive into buf directly?
- packet.resize(len);
- ret = arecvtcp(packet, len, handler, false);
- if (ret != LWResult::Result::Success) {
- return ret;
+ memcpy(&tlen, packet.data(), sizeof(tlen));
+ len = ntohs(tlen); // switch to the 'len' shared with the rest of the function
+
+ // XXX receive into buf directly?
+ packet.resize(len);
+ ret = arecvtcp(packet, len, connection.d_handler, false);
+ if (ret != LWResult::Result::Success) {
+ if (isNew) {
+ return ret;
+ } else {
+ continue;
+ }
+ }
+ buf.resize(len);
+ memcpy(buf.data(), packet.data(), len);
+ //handler->close();
+ ret = LWResult::Result::Success;
+ break;
}
-
- buf.resize(len);
- memcpy(buf.data(), packet.data(), len);
-
- handler->close();
- ret = LWResult::Result::Success;
}
catch (const NetworkError& ne) {
ret = LWResult::Result::OSLimitError; // OS limits error
if(outgoingLoggers) {
logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, srcmask, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
}
-
+
lwr->d_validpacket = true;
return LWResult::Result::Success;
}
return LWResult::Result::PermanentError;
}
+/** Public entry point for an outgoing query; delegates the actual work to asyncresolve1().
+ *  For TCP queries it additionally:
+ *   - calls asyncresolve1() a second time when the first call left lwr->d_validpacket unset, and
+ *   - hands the connection to the per-thread t_tcp_manager when it has a handler and the
+ *     response was marked valid.
+ *  Returns the result of the last asyncresolve1() call.
+ */
+LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+{
+  // Filled in by asyncresolve1() on the TCP path
+  TCPOutConnectionManager::Connection connection;
+  auto ret = asyncresolve1(ip, domain, type, doTCP, sendRDQuery, EDNS0Level, now, srcmask, context, outgoingLoggers, fstrmLoggers, exportTypes, lwr, chained, connection);
+
+  // No retry and no connection bookkeeping unless TCP was requested
+  if (!doTCP) {
+    return ret;
+  }
+
+  // No valid packet yet: give it one more attempt
+  if (!lwr->d_validpacket) {
+    ret = asyncresolve1(ip, domain, type, doTCP, sendRDQuery, EDNS0Level, now, srcmask, context, outgoingLoggers, fstrmLoggers, exportTypes, lwr, chained, connection);
+  }
+
+  // Only a connection that produced a valid packet is offered for reuse
+  if (lwr->d_validpacket && connection.d_handler) {
+    t_tcp_manager.store(ip, connection);
+  }
+  return ret;
+}
+
#include "nod.hh"
#endif /* NOD_ENABLED */
#include "query-local-address.hh"
+#include "rec-tcpout.hh"
#include "rec-snmp.hh"
#include "rec-taskqueue.hh"
<<broadcastAccFunction<uint64_t>(pleaseGetEDNSStatusesSize)<<endl;
g_log<<Logger::Notice<<"stats: outpacket/query ratio "<<ratePercentage(SyncRes::s_outqueries, SyncRes::s_queries)<<"%";
g_log<<Logger::Notice<<", "<<ratePercentage(SyncRes::s_throttledqueries, SyncRes::s_outqueries+SyncRes::s_throttledqueries)<<"% throttled"<<endl;
- g_log<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<"/"<<SyncRes::s_dotoutqueries << " outgoing tcp/dot connections, "<<
- broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
+ g_log<<Logger::Notice<<"stats: "<<SyncRes::s_tcpoutqueries<<"/"<<SyncRes::s_dotoutqueries << "/" << getCurrentIdleTCPConnections() << " outgoing tcp/dot/idle connections, "<<
+ broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts "<<endl;
uint64_t pcSize = broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize);
uint64_t pcHits = broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits);
SyncRes::pruneThrottledServers();
SyncRes::pruneNonResolving(now.tv_sec - SyncRes::s_nonresolvingnsthrottletime);
Utility::gettimeofday(&last_prune, nullptr);
+ t_tcp_manager.cleanup();
}
if(isHandlerThread()) {
{
unsigned int availFDs=getFilenumLimit();
unsigned int wantFDs = g_maxMThreads * g_numWorkerThreads +25; // even healthier margin then before
+ wantFDs += g_numWorkerThreads * TCPOutConnectionManager::maxIdlePerThread;
if(wantFDs > availFDs) {
unsigned int hardlimit= getFilenumLimit(true);
g_log<<Logger::Warning<<"Raised soft limit on number of filedescriptors to "<<wantFDs<<" to match max-mthreads and threads settings"<<endl;
}
else {
- int newval = (hardlimit - 25) / g_numWorkerThreads;
+ int newval = (hardlimit - 25 - TCPOutConnectionManager::maxIdlePerThread) / g_numWorkerThreads;
g_log<<Logger::Warning<<"Insufficient number of filedescriptors available for max-mthreads*threads setting! ("<<hardlimit<<" < "<<wantFDs<<"), reducing max-mthreads to "<<newval<<endl;
g_maxMThreads = newval;
setFilenumLimit(hardlimit);
TCPConnection::s_maxInFlight = maxInFlight;
}
+ int64_t millis = ::arg().asNum("tcpout-maxidle-ms");
+ TCPOutConnectionManager::maxIdleTime = timeval{millis / 1000, (millis % 1000) * 1000 };
+ TCPOutConnectionManager::maxIdlePerAuth = ::arg().asNum("tcpout-maxidle-per-auth");
+ TCPOutConnectionManager::maxQueries = ::arg().asNum("tcpout-max-queries");
+ TCPOutConnectionManager::maxIdlePerThread = ::arg().asNum("tcpout-maxidle-per-thread");
+
g_gettagNeedsEDNSOptions = ::arg().mustDo("gettag-needs-edns-options");
g_statisticsInterval = ::arg().asNum("statistics-interval");
::arg().setSwitch("dot-to-port-853", "Force DoT connection to target port 853 if DoT compiled in")="yes";
::arg().set("dot-to-auth-names", "Use DoT to authoritative servers with these names or suffixes")="";
+ ::arg().set("tcpout-maxidle-ms", "Maximum time TCP connections are left idle in milliseconds or 0 if no limit") = "10000";
+ ::arg().set("tcpout-maxidle-per-auth", "Maximum number of idle TCP connections to a specific IP per thread, 0 means do not keep idle connections open") = "10";
+ ::arg().set("tcpout-max-queries", "Maximum total number of queries per connection, 0 means no limit") = "0";
+ ::arg().set("tcpout-maxidle-per-thread", "Maximum number of idle TCP connections per thread") = "100";
+
::arg().setCmd("help","Provide a helpful message");
::arg().setCmd("version","Print version string");
::arg().setCmd("config","Output blank configuration");
#include "pubsuffix.hh"
#include "namespaces.hh"
#include "rec-taskqueue.hh"
+#include "rec-tcpout.hh"
std::pair<std::string, std::string> PrefixDashNumberCompare::prefixAndTrailingNum(const std::string& a)
{
addGetStat("almost-expired-run", []() { return getAlmostExpiredTasksRun(); });
addGetStat("almost-expired-exceptions", []() { return getAlmostExpiredTaskExceptions(); });
+ addGetStat("idle-tcpout-connections", getCurrentIdleTCPConnections);
+
/* make sure that the ECS stats are properly initialized */
SyncRes::clearECSStats();
for (size_t idx = 0; idx < SyncRes::s_ecsResponsesBySubnetSize4.size(); idx++) {
rec-protozero.cc rec-protozero.hh \
rec-snmp.hh rec-snmp.cc \
rec-taskqueue.cc rec-taskqueue.hh \
+ rec-tcpout.cc rec-tcpout.hh \
rec_channel.cc rec_channel.hh rec_metrics.hh \
rec_channel_rec.cc \
recpacketcache.cc recpacketcache.hh \
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "rec-tcpout.hh"
+
+// Definitions of the TCPOutConnectionManager tunables. They have static storage duration
+// and no initializer here, so they start out zero-initialized.
+// Maximum idle time of a stored connection; {0, 0} makes cleanup() a no-op
+timeval TCPOutConnectionManager::maxIdleTime;
+// Maximum number of queries per connection; 0 disables the check in store()
+size_t TCPOutConnectionManager::maxQueries;
+// Maximum number of idle connections kept for one destination address
+size_t TCPOutConnectionManager::maxIdlePerAuth;
+// Maximum number of idle connections kept by one manager instance
+size_t TCPOutConnectionManager::maxIdlePerThread;
+
+/** Drop every idle connection whose d_last_used lies more than maxIdleTime in the past.
+ *  Does nothing when maxIdleTime is {0, 0} (expiry disabled).
+ */
+void TCPOutConnectionManager::cleanup()
+{
+  const bool expiryEnabled = maxIdleTime.tv_sec != 0 || maxIdleTime.tv_usec != 0;
+  if (!expiryEnabled) {
+    return;
+  }
+
+  struct timeval current;
+  gettimeofday(&current, nullptr);
+
+  auto entry = d_idle_connections.begin();
+  while (entry != d_idle_connections.end()) {
+    timeval idleFor = current - entry->second.d_last_used;
+    if (maxIdleTime < idleFor) {
+      // erase() returns the iterator following the removed entry
+      entry = d_idle_connections.erase(entry);
+      continue;
+    }
+    ++entry;
+  }
+}
+
+/** Offer a connection to ip for later reuse.
+ *
+ *  Expired idle entries are removed first (cleanup()). The connection is then NOT kept when
+ *   - the manager already holds maxIdlePerThread idle connections, or
+ *   - it already holds maxIdlePerAuth idle connections for ip, or
+ *   - maxQueries is non-zero and the connection has carried maxQueries queries.
+ *  In those cases nothing is stored and the caller's Connection object keeps the only
+ *  reference held here.
+ *
+ *  Side effects on the passed connection: d_numqueries is incremented once the idle limits
+ *  have been passed, and d_last_used is set to the current time when the connection is stored.
+ */
+void TCPOutConnectionManager::store(const ComboAddress& ip, Connection& connection)
+{
+  cleanup();
+  if (d_idle_connections.size() >= maxIdlePerThread) {
+    return;
+  }
+  if (d_idle_connections.count(ip) >= maxIdlePerAuth) {
+    return;
+  }
+
+  // Each store() counts as one query carried by this connection
+  ++connection.d_numqueries;
+  // maxQueries is the total a connection may carry. Once the counter reaches it, the
+  // connection must not be handed out again; comparing with '>' would allow
+  // maxQueries + 1 queries over the same connection.
+  if (maxQueries > 0 && connection.d_numqueries >= maxQueries) {
+    return;
+  }
+  gettimeofday(&connection.d_last_used, nullptr);
+  d_idle_connections.emplace(ip, connection);
+}
+
+/** Take one idle connection for ip out of the manager.
+ *  Returns a default-constructed Connection (null d_handler) when none is stored for ip.
+ */
+TCPOutConnectionManager::Connection TCPOutConnectionManager::get(const ComboAddress& ip)
+{
+  if (d_idle_connections.count(ip) == 0) {
+    return Connection{};
+  }
+  // extract() unlinks the entry from the multimap; the node handle owns it until return
+  auto node = d_idle_connections.extract(ip);
+  return node.mapped();
+}
+
+/** Idle outgoing TCP connection count, obtained by passing a callback that returns
+ *  t_tcp_manager.getSize() to broadcastAccFunction().
+ *  NOTE(review): presumably broadcastAccFunction runs the callback on each worker thread and
+ *  accumulates the results -- confirm against its definition.
+ */
+uint64_t getCurrentIdleTCPConnections()
+{
+  const auto idleCountOfThisThread = [] { return t_tcp_manager.getSize(); };
+  return broadcastAccFunction<uint64_t>(idleCountOfThisThread);
+}
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#pragma once
+
+#include "iputils.hh"
+#include "tcpiohandler.hh"
+#include "syncres.hh"
+
+// Keeps outgoing TCP connections that are currently idle, keyed by destination address, so a
+// later query to the same address can reuse one instead of opening a new connection.
+// The instance declared after this class is thread_local.
+class TCPOutConnectionManager
+{
+public:
+  // Idle-time limit for a stored connection; 0 means connections never time out
+  static struct timeval maxIdleTime;
+  // Limit on idle connections kept for one destination (per thread); 0 keeps none open
+  static size_t maxIdlePerAuth;
+  // Limit on the total number of queries handled per connection; 0 means unlimited
+  static size_t maxQueries;
+  // Limit on idle connections kept per thread; unlike the others, 0 is a real limit
+  static size_t maxIdlePerThread;
+
+  // One outgoing connection plus the bookkeeping used to decide on reuse and expiry
+  struct Connection
+  {
+    // Textual form "<descriptor> <use count of d_handler>"; empty when there is no handler
+    std::string toString() const
+    {
+      if (!d_handler) {
+        return "";
+      }
+      const std::string descriptor = std::to_string(d_handler->getDescriptor());
+      const std::string owners = std::to_string(d_handler.use_count());
+      return descriptor + ' ' + owners;
+    }
+
+    // I/O handler of the connection; null for a default-constructed Connection
+    std::shared_ptr<TCPIOHandler> d_handler;
+    // Time the connection was last stored as idle
+    timeval d_last_used{0, 0};
+    // Number of times the connection has been counted by store()
+    size_t d_numqueries{0};
+  };
+
+  // Offer a connection to ip for reuse; may decline based on the limits above
+  void store(const ComboAddress& ip, Connection& connection);
+  // Remove and return an idle connection to ip, or an empty Connection if there is none
+  Connection get(const ComboAddress& ip);
+  // Remove idle connections that exceeded maxIdleTime
+  void cleanup();
+
+  // Number of idle connections currently held
+  size_t size() const
+  {
+    return d_idle_connections.size();
+  }
+  // Same value as size(), but in a freshly allocated uint64_t that the caller must release.
+  // NOTE(review): presumably shaped this way for broadcastAccFunction -- confirm it deletes the result
+  uint64_t* getSize() const
+  {
+    return new uint64_t(size());
+  }
+
+private:
+  // Idle connections by destination; a multimap because one destination may have several
+  std::multimap<ComboAddress, Connection> d_idle_connections;
+};
+
+extern thread_local TCPOutConnectionManager t_tcp_manager;
+uint64_t getCurrentIdleTCPConnections();
+