]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Introduce rec-main.hh and split out TCP code to rec-tcp.cc
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Tue, 11 Jan 2022 09:01:45 +0000 (10:01 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Tue, 11 Jan 2022 10:43:16 +0000 (11:43 +0100)
.not-formatted
pdns/lua-recursor4.cc
pdns/lua-recursor4.hh
pdns/pdns_recursor.cc
pdns/rec_channel_rec.cc
pdns/recursordist/Makefile.am
pdns/recursordist/rec-main.cc [new file with mode: 0644]
pdns/recursordist/rec-main.hh [new file with mode: 0644]
pdns/recursordist/rec-tcp.cc [new file with mode: 0644]
pdns/syncres.hh
pdns/ws-recursor.cc

index fb2b51ceabb08b12754d7dd900eb3c28ded64779..0dfb64d2544a28b2f6af2e3d0090bb5636cd6e6a 100644 (file)
 ./pdns/packethandler.cc
 ./pdns/packethandler.hh
 ./pdns/pdns_hw.cc
-./pdns/pdns_recursor.cc
 ./pdns/pdnsexception.hh
 ./pdns/pdnsutil.cc
 ./pdns/pkcs11signers.cc
index b0731d4822e530f94e5cb365ad584ee2c23c1a9a..b346c3f4130ec22edf1c9237e97a8efd3bbfb9eb 100644 (file)
@@ -31,6 +31,7 @@
 #include "filterpo.hh"
 #include "rec-snmp.hh"
 #include <unordered_set>
+#include "rec-main.hh"
 
 RecursorLua4::RecursorLua4() { prepareContext(); }
 
index c877725982b55fa343d1042a20f8d42d38294418..2fabcc06c3889146afe00a95fad1feffd420770f 100644 (file)
@@ -41,9 +41,6 @@
 
 #include "lua-recursor4-ffi.hh"
 
-PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query);
-unsigned int getRecursorThreadId();
-
 // pdns_ffi_param_t is a lightuserdata
 template <>
 struct LuaContext::Pusher<pdns_ffi_param*>
index 090bb4f16c5c86a5fd6bcef5f37c6a523ccafd71..5853c1339ba71e13bf115266a1ff8b984d48c5c3 100644 (file)
@@ -19,9 +19,8 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
+
+#include "rec-main.hh"
 
 #include <netdb.h>
 #include <sys/stat.h>
 #include "xpf.hh"
 #include "rec-eventtrace.hh"
 
-typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
-
-static thread_local std::shared_ptr<RecursorLua4> t_pdl;
-static thread_local unsigned int t_id = 0;
+thread_local std::shared_ptr<RecursorLua4> t_pdl;
+thread_local unsigned int t_id = 0;
 static thread_local std::shared_ptr<Regex> t_traceRegex;
-static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
-static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
+thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
 static thread_local uint64_t t_protobufServersGeneration;
-static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
+thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
 static thread_local uint64_t t_outgoingProtobufServersGeneration;
 
 #ifdef HAVE_FSTRM
@@ -150,8 +146,6 @@ thread_local std::shared_ptr<nod::UniqueResponseDB> t_udrDBp;
 #endif /* NOD_ENABLED */
 __thread struct timeval g_now; // timestamp, updated (too) frequently
 
-typedef vector<pair<int, boost::function< void(int, boost::any&) > > > deferredAdd_t;
-
 // for communicating with our threads
 // effectively readonly after startup
 struct RecThreadInfo
@@ -196,7 +190,6 @@ static std::vector<RecThreadInfo> s_threadInfos;
 /* without reuseport, all listeners share the same sockets */
 static deferredAdd_t g_deferredAdds;
 
-typedef vector<int> tcpListenSockets_t;
 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
 enum class PaddingMode { Always, PaddedQueries };
 
@@ -207,32 +200,28 @@ static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads
 static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setup with this
 static std::shared_ptr<NetmaskGroup> g_initialAllowNotifyFrom; // new threads need this to be setup
 static std::shared_ptr<notifyset_t> g_initialAllowNotifyFor; // new threads need this to be setup
-static NetmaskGroup g_XPFAcl;
+NetmaskGroup g_XPFAcl;
 static NetmaskGroup g_proxyProtocolACL;
-static NetmaskGroup g_paddingFrom;
+NetmaskGroup g_paddingFrom;
 static boost::optional<ComboAddress> g_dns64Prefix{boost::none};
 static DNSName g_dns64PrefixReverse;
-static size_t g_proxyProtocolMaximumSize;
-static size_t g_tcpMaxQueriesPerConn;
+size_t g_proxyProtocolMaximumSize;
 static size_t s_maxUDPQueriesPerRound;
 static uint64_t g_latencyStatSize;
 static uint32_t g_disthashseed;
-static unsigned int g_maxTCPPerClient;
-static unsigned int g_maxMThreads;
+unsigned int g_maxMThreads;
 static unsigned int g_numDistributorThreads;
 static unsigned int g_numWorkerThreads;
-static unsigned int g_paddingTag;
-static int g_tcpTimeout;
+unsigned int g_paddingTag;
 static uint16_t g_udpTruncationThreshold;
 static uint16_t g_xpfRRCode{0};
 static PaddingMode g_paddingMode;
 static std::atomic<bool> statsWanted;
-static std::atomic<bool> g_quiet;
-static bool g_logCommonErrors;
-static bool g_anyToTcp;
+std::atomic<bool> g_quiet;
+bool g_logCommonErrors;
 static bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
-static bool g_reusePort{false};
-static bool g_gettagNeedsEDNSOptions{false};
+bool g_reusePort{false};
+bool g_gettagNeedsEDNSOptions{false};
 static time_t g_statisticsInterval;
 static bool g_useIncomingECS;
 static bool g_useKernelTimestamp;
@@ -290,98 +279,6 @@ struct ThreadMSG
   bool wantAnswer;
 };
 
-//! used to send information to a newborn mthread
-struct DNSComboWriter {
-  DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query)
-  {
-  }
-
-  DNSComboWriter(const std::string& query, const struct timeval& now, std::unordered_set<std::string>&& policyTags, LuaContext::LuaObject&& data, std::vector<DNSRecord>&& records): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_records(std::move(records)), d_data(std::move(data))
-  {
-  }
-
-  void setRemote(const ComboAddress& sa)
-  {
-    d_remote=sa;
-  }
-
-  void setSource(const ComboAddress& sa)
-  {
-    d_source=sa;
-  }
-
-  void setLocal(const ComboAddress& sa)
-  {
-    d_local=sa;
-  }
-
-  void setDestination(const ComboAddress& sa)
-  {
-    d_destination=sa;
-  }
-
-  void setSocket(int sock)
-  {
-    d_socket=sock;
-  }
-
-  string getRemote() const
-  {
-    if (d_source == d_remote) {
-      return d_source.toStringWithPort();
-    }
-    return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")";
-  }
-
-  std::vector<ProxyProtocolValue> d_proxyProtocolValues;
-  MOADNSParser d_mdp;
-  struct timeval d_now;
-  /* Remote client, might differ from d_source
-     in case of XPF, in which case d_source holds
-     the IP of the client and d_remote of the proxy
-  */
-  ComboAddress d_remote;
-  ComboAddress d_source;
-  /* Destination address, might differ from
-     d_destination in case of XPF, in which case
-     d_destination holds the IP of the proxy and
-     d_local holds our own. */
-  ComboAddress d_local;
-  ComboAddress d_destination;
-  RecEventTrace d_eventTrace;
-  boost::uuids::uuid d_uuid;
-  string d_requestorId;
-  string d_deviceId;
-  string d_deviceName;
-  struct timeval d_kernelTimestamp{0,0};
-  std::string d_query;
-  std::unordered_set<std::string> d_policyTags;
-  std::string d_routingTag;
-  std::vector<DNSRecord> d_records;
-  LuaContext::LuaObject d_data;
-  EDNSSubnetOpts d_ednssubnet;
-  shared_ptr<TCPConnection> d_tcpConnection;
-  boost::optional<uint16_t> d_extendedErrorCode{boost::none};
-  string d_extendedErrorExtra;
-  boost::optional<int> d_rcode{boost::none};
-  int d_socket{-1};
-  unsigned int d_tag{0};
-  uint32_t d_qhash{0};
-  uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
-  bool d_variable{false};
-  bool d_ecsFound{false};
-  bool d_ecsParsed{false};
-  bool d_followCNAMERecords{false};
-  bool d_logResponse{false};
-  bool d_tcp{false};
-  bool d_responsePaddingDisabled{false};
-  std::map<std::string, RecursorLua4::MetaValue> d_meta;
-};
-
-MT_t* getMT()
-{
-  return MT ? MT.get() : nullptr;
-}
 
 ArgvMap &arg()
 {
@@ -389,10 +286,6 @@ ArgvMap &arg()
   return theArg;
 }
 
-unsigned int getRecursorThreadId()
-{
-  return t_id;
-}
 
 static bool isDistributorThread()
 {
@@ -412,136 +305,6 @@ static bool isHandlerThread()
   return s_threadInfos.at(t_id).isHandler;
 }
 
-#if 0
-#define TCPLOG(tcpsock, x) do { cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10  + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; } while (0)
-#else
-#define TCPLOG(pid, x)
-#endif
-
-static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var);
-static void TCPIOHandlerStateChange(IOState, IOState, std::shared_ptr<PacketID>&);
-
-LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
-{
-  TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);
-
-  auto pident = std::make_shared<PacketID>();
-  pident->tcphandler = handler;
-  pident->tcpsock = handler->getDescriptor();
-  pident->outMSG = data;
-  pident->highState = TCPAction::DoingWrite;
-
-  IOState state;
-  try {
-    TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
-    state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
-    TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);
-
-    if (state == IOState::Done) {
-      TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
-      return LWResult::Result::Success;
-    }
-  }
-  catch (const std::exception& e) {
-    TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
-    return LWResult::Result::PermanentError;
-  }
-
-  // Will set pident->lowState
-  TCPIOHandlerStateChange(IOState::Done, state, pident);
-
-  PacketBuffer packet;
-  int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
-  TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
-  if (ret == 0) {
-    TCPLOG(pident->tcpsock, "timeout" << endl);
-    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
-    return LWResult::Result::Timeout;
-  }
-  else if (ret == -1) { // error
-    TCPLOG(pident->tcpsock, "PermanentError" << endl);
-    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
-    return LWResult::Result::PermanentError;
-  }
-  else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
-    // fd housekeeping done by TCPIOHandlerIO
-    TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
-    return LWResult::Result::PermanentError;
-  }
-
-  TCPLOG(pident->tcpsock, "asendtcp success" << endl);
-  return LWResult::Result::Success;
-}
-
-LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
-{
-  TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
-  data.resize(len);
-
-  // We might have data already available from the TLS layer, try to get that into the buffer
-  size_t pos = 0;
-  IOState state;
-  try {
-    TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
-    state = handler->tryRead(data, pos, len);
-    TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
-    switch (state) {
-    case IOState::Done:
-    case IOState::NeedRead:
-      if (pos == len || (incompleteOkay && pos > 0)) {
-        data.resize(pos);
-        TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
-        return LWResult::Result::Success;
-      }
-      break;
-    case IOState::NeedWrite:
-      break;
-    case IOState::Async:
-      throw std::runtime_error("TLS async mode not supported");
-      break;
-    }
-  }
-  catch (const std::exception& e) {
-    TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
-    return LWResult::Result::PermanentError;
-  }
-
-  auto pident = std::make_shared<PacketID>();
-  pident->tcphandler = handler;
-  pident->tcpsock = handler->getDescriptor();
-  // We might have a partial result
-  pident->inMSG = std::move(data);
-  pident->inPos = pos;
-  pident->inWanted = len;
-  pident->inIncompleteOkay = incompleteOkay;
-  pident->highState = TCPAction::DoingRead;
-
-  data.clear();
-
-  // Will set pident->lowState
-  TCPIOHandlerStateChange(IOState::Done, state, pident);
-
-  int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
-  TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ' );
-  if (ret == 0) {
-    TCPLOG(pident->tcpsock, "timeout" << endl);
-    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
-    return LWResult::Result::Timeout;
-  }
-  else if (ret == -1) {
-    TCPLOG(pident->tcpsock, "PermanentError" << endl);
-    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
-    return LWResult::Result::PermanentError;
-  }
-  else if (data.empty()) {// error, EOF or other
-    // fd housekeeping done by TCPIOHandlerIO
-    TCPLOG(pident->tcpsock, "EOF" << endl);
-    return LWResult::Result::PermanentError;
-  }
-
-  TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
-  return LWResult::Result::Success;
-}
 
 static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
 {
@@ -818,157 +581,8 @@ static void writePid(void)
   }
 }
 
-uint16_t TCPConnection::s_maxInFlight;
-
-TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
-{
-  ++s_currentConnections;
-  (*t_tcpClientCounts)[d_remote]++;
-}
-
-TCPConnection::~TCPConnection()
-{
-  try {
-    if(closesocket(d_fd) < 0)
-      g_log<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
-  }
-  catch(const PDNSException& e) {
-    g_log<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
-  }
-
-  if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
-    t_tcpClientCounts->erase(d_remote);
-  --s_currentConnections;
-}
-
-std::atomic<uint32_t> TCPConnection::s_currentConnections;
-
-static void terminateTCPConnection(int fd)
-{
-  try {
-    t_fdm->removeReadFD(fd);
-  }
-  catch (const FDMultiplexerException& fde)
-  {
-  }
-}
-
-/* this function is called with both a string and a vector<uint8_t> representing a packet */
-template <class T>
-static bool sendResponseOverTCP(const std::unique_ptr<DNSComboWriter>& dc, const T& packet)
-{
-  uint8_t buf[2];
-  buf[0] = packet.size() / 256;
-  buf[1] = packet.size() % 256;
-
-  Utility::iovec iov[2];
-  iov[0].iov_base = (void*)buf;              iov[0].iov_len = 2;
-  iov[1].iov_base = (void*)&*packet.begin(); iov[1].iov_len = packet.size();
-
-  int wret = Utility::writev(dc->d_socket, iov, 2);
-  bool hadError = true;
-
-  if (wret == 0) {
-    g_log<<Logger::Warning<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
-  } else if (wret < 0 ) {
-    int err = errno;
-    g_log << Logger::Warning << "Error writing TCP answer to " << dc->getRemote() << ": " << strerror(err) << endl;
-  } else if ((unsigned int)wret != 2 + packet.size()) {
-    g_log<<Logger::Warning<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
-  } else {
-    hadError = false;
-  }
-
-  return hadError;
-}
-
-static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& dc, int rcode)
-{
-  std::vector<uint8_t> packet;
-  if (dc->d_mdp.d_header.qdcount == 0) {
-    /* header-only */
-    packet.resize(sizeof(dnsheader));
-  }
-  else {
-    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
-    if (dc->d_mdp.hasEDNS()) {
-      /* we try to add the EDNS OPT RR even for truncated answers,
-         as rfc6891 states:
-         "The minimal response MUST be the DNS header, question section, and an
-         OPT record.  This MUST also occur when a truncated response (using
-         the DNS header's TC bit) is returned."
-      */
-      pw.addOpt(512, 0, 0);
-      pw.commit();
-    }
-  }
-
-  dnsheader& header = reinterpret_cast<dnsheader&>(packet.at(0));
-  header.aa = 0;
-  header.ra = 1;
-  header.qr = 1;
-  header.tc = 0;
-  header.id = dc->d_mdp.d_header.id;
-  header.rd = dc->d_mdp.d_header.rd;
-  header.cd = dc->d_mdp.d_header.cd;
-  header.rcode = rcode;
-
-  sendResponseOverTCP(dc, packet);
-}
-
-static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
 
-static void finishTCPReply(std::unique_ptr<DNSComboWriter>& dc, bool hadError, bool updateInFlight)
-{
-  // update tcp connection status, closing if needed and doing the fd multiplexer accounting
-  if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight > 0) {
-    dc->d_tcpConnection->d_requestsInFlight--;
-  }
-
-  // In the code below, we try to remove the fd from the set, but
-  // we don't know if another mthread already did the remove, so we can get a
-  // "Tried to remove unlisted fd" exception.  Not that an inflight < limit test
-  // will not work since we do not know if the other mthread got an error or not.
-  if (hadError) {
-    terminateTCPConnection(dc->d_socket);
-    dc->d_socket = -1;
-    return;
-  }
-  dc->d_tcpConnection->queriesCount++;
-  if ((g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) ||
-      (dc->d_tcpConnection->isDropOnIdle() && dc->d_tcpConnection->d_requestsInFlight == 0)) {
-    try {
-      t_fdm->removeReadFD(dc->d_socket);
-    }
-    catch (FDMultiplexerException &) {
-    }
-    dc->d_socket = -1;
-    return;
-  }
 
-  Utility::gettimeofday(&g_now, nullptr); // needs to be updated
-  struct timeval ttd = g_now;
-
-  // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
-  if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
-    // A read error might have happened. If we add the fd back, it will most likely error again.
-    // This is not a big issue, the next handleTCPClientReadable() will see another read error
-    // and take action.
-    ttd.tv_sec += g_tcpTimeout;
-    t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
-    return;
-  }
-  // fd might have been removed by read error code, or a read timeout, so expect an exception
-  try {
-    t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout);
-  }
-  catch (const FDMultiplexerException &) {
-    // but if the FD was removed because of a timeout while we were sending a response,
-    // we need to re-arm it. If it was an error it will error again.
-    ttd.tv_sec += g_tcpTimeout;
-    t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
-  }
-}
 
 // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
 static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
@@ -1003,7 +617,7 @@ catch(...)
   return "Exception making error message for exception";
 }
 
-static void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta)
+void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta)
 {
   if (!t_protobufServers) {
     return;
@@ -1035,7 +649,7 @@ static void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, co
   }
 }
 
-static void protobufLogResponse(pdns::ProtoZero::RecMessage& message)
+void protobufLogResponse(pdns::ProtoZero::RecMessage& message)
 {
   if (!t_protobufServers) {
     return;
@@ -1047,7 +661,7 @@ static void protobufLogResponse(pdns::ProtoZero::RecMessage& message)
   }
 }
 
-static void protobufLogResponse(const struct dnsheader* dh, LocalStateHolder<LuaConfigItems>& luaconfsLocal,
+void protobufLogResponse(const struct dnsheader* dh, LocalStateHolder<LuaConfigItems>& luaconfsLocal,
                                 const RecursorPacketCache::OptPBData& pbData, const struct timeval& tv,
                                 bool tcp, const ComboAddress& source, const ComboAddress& destination,
                                 const EDNSSubnetOpts& ednssubnet,
@@ -1284,7 +898,7 @@ static std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> startProtobuf
   return result;
 }
 
-static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 {
   if (!luaconfsLocal->protobufExportConfig.enabled) {
     if (t_protobufServers) {
@@ -1386,7 +1000,7 @@ static std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> startFra
   return result;
 }
 
-static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 {
   if (!luaconfsLocal->frameStreamExportConfig.enabled) {
     if (t_frameStreamServers) {
@@ -1617,7 +1231,7 @@ static bool answerIsNOData(uint16_t requestedType, int rcode, const std::vector<
   return true;
 }
 
-static bool isAllowNotifyForZone(DNSName qname)
+bool isAllowNotifyForZone(DNSName qname)
 {
   if (t_allowNotifyFor->empty()) {
     return false;
@@ -1632,7 +1246,7 @@ static bool isAllowNotifyForZone(DNSName qname)
   return false;
 }
 
-static void startDoResolve(void *p)
+void startDoResolve(void *p)
 {
   auto dc=std::unique_ptr<DNSComboWriter>(reinterpret_cast<DNSComboWriter*>(p));
   try {
@@ -2533,7 +2147,7 @@ static void makeControlChannelSocket(int processNum=-1)
   }
 }
 
-static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
+void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
                               bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options,
                               bool& foundXPF, ComboAddress* xpfSource, ComboAddress* xpfDest)
 {
@@ -2604,7 +2218,7 @@ static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uin
   }
 }
 
-static bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data,
+bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data,
                              DNSName& qname, uint16_t& qtype, uint16_t& qclass,
                              const struct timeval& now,
                              string& response, uint32_t& qhash,
@@ -2656,7 +2270,7 @@ static void* pleaseWipeCaches(const DNSName& canon, bool subtree, uint16_t qtype
   return nullptr;
 }
 
-static void requestWipeCaches(const DNSName& canon)
+void requestWipeCaches(const DNSName& canon)
 {
   // send a message to the handler thread asking it
   // to wipe all of the caches
@@ -2670,497 +2284,11 @@ static void requestWipeCaches(const DNSName& canon)
   }
 }
 
-/*
- * A helper class that by default closes the incoming TCP connection on destruct
- * If you want to keep the connection alive, call keep() on the guard object
- */
-class RunningTCPQuestionGuard {
-public:
-  RunningTCPQuestionGuard(int fd)
-  {
-    d_fd = fd;
-  }
-  ~RunningTCPQuestionGuard()
-  {
-    if (d_fd != -1) {
-      terminateTCPConnection(d_fd);
-      d_fd = -1;
-    }
-  }
-  void keep()
-  {
-    d_fd = -1;
-  }
-  bool handleTCPReadResult(int fd, ssize_t bytes)
-  {
-    if (bytes == 0) {
-      /* EOF */
-      return false;
-    }
-    else if (bytes < 0) {
-      if (errno != EAGAIN && errno != EWOULDBLOCK) {
-        return false;
-      }
-    }
-    keep();
-    return true;
-  }
-
-private:
-  int d_fd{-1};
-};
-
-static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
-{
-  shared_ptr<TCPConnection> conn=boost::any_cast<shared_ptr<TCPConnection> >(var);
-
-  RunningTCPQuestionGuard tcpGuard{fd};
-
-  if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
-    ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
-    if (bytes <= 0) {
-      tcpGuard.handleTCPReadResult(fd, bytes);
-      return;
-    }
-
-    conn->proxyProtocolGot += bytes;
-    conn->data.resize(conn->proxyProtocolGot);
-    ssize_t remaining = isProxyHeaderComplete(conn->data);
-    if (remaining == 0) {
-      if (g_logCommonErrors) {
-        g_log<<Logger::Error<<"Unable to consume proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
-      }
-      ++g_stats.proxyProtocolInvalidCount;
-      return;
-    }
-    else if (remaining < 0) {
-      conn->proxyProtocolNeed = -remaining;
-      conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
-      tcpGuard.keep();
-      return;
-    }
-    else {
-      /* proxy header received */
-      /* we ignore the TCP field for now, but we could properly set whether
-         the connection was received over UDP or TCP if needed */
-      bool tcp;
-      bool proxy = false;
-      size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
-      if (used <= 0) {
-        if (g_logCommonErrors) {
-          g_log<<Logger::Error<<"Unable to parse proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
-        }
-        ++g_stats.proxyProtocolInvalidCount;
-        return;
-      }
-      else if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
-        if (g_logCommonErrors) {
-          g_log<<Logger::Error<<"Proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping"<< endl;
-        }
-        ++g_stats.proxyProtocolInvalidCount;
-        return;
-      }
-
-      /* Now that we have retrieved the address of the client, as advertised by the proxy
-         via the proxy protocol header, check that it is allowed by our ACL */
-      /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
-      if (t_allowFrom && !t_allowFrom->match(&conn->d_source)) {
-        if (!g_quiet) {
-          g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<conn->d_source.toString()<<", address not matched by allow-from"<<endl;
-        }
-
-        ++g_stats.unauthorizedTCP;
-        return;
-      }
-
-      conn->data.resize(2);
-      conn->state = TCPConnection::BYTE0;
-    }
-  }
-
-  if (conn->state==TCPConnection::BYTE0) {
-    ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0);
-    if(bytes==1)
-      conn->state=TCPConnection::BYTE1;
-    if(bytes==2) {
-      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
-      conn->data.resize(conn->qlen);
-      conn->bytesread=0;
-      conn->state=TCPConnection::GETQUESTION;
-    }
-    if (bytes <= 0) {
-      tcpGuard.handleTCPReadResult(fd, bytes);
-      return;
-    }
-  }
-
-  if (conn->state==TCPConnection::BYTE1) {
-    ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0);
-    if(bytes==1) {
-      conn->state=TCPConnection::GETQUESTION;
-      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
-      conn->data.resize(conn->qlen);
-      conn->bytesread=0;
-    }
-    if (bytes <= 0) {
-      if (!tcpGuard.handleTCPReadResult(fd, bytes)) {
-        if(g_logCommonErrors) {
-          g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected after first byte"<<endl;
-        }
-      }
-      return;
-    }
-  }
-
-  if(conn->state==TCPConnection::GETQUESTION) {
-    ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
-    if (bytes <= 0) {
-      if (!tcpGuard.handleTCPReadResult(fd, bytes)) {
-        if(g_logCommonErrors) {
-          g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected while reading question body"<<endl;
-        }
-      }
-      return;
-    }
-    else if (bytes > std::numeric_limits<std::uint16_t>::max()) {
-      if(g_logCommonErrors) {
-        g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" sent an invalid question size while reading question body"<<endl;
-      }
-      return;
-    }
-    conn->bytesread+=(uint16_t)bytes;
-    if(conn->bytesread==conn->qlen) {
-      conn->state = TCPConnection::BYTE0;
-      std::unique_ptr<DNSComboWriter> dc;
-      try {
-        dc = std::make_unique<DNSComboWriter>(conn->data, g_now);
-      }
-      catch(const MOADNSException &mde) {
-        g_stats.clientParseError++;
-        if (g_logCommonErrors) {
-          g_log<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
-        }
-        return;
-      }
-      dc->d_tcpConnection = conn; // carry the torch
-      dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
-      dc->d_tcp=true;
-      dc->setRemote(conn->d_remote);
-      dc->setSource(conn->d_source);
-      ComboAddress dest;
-      dest.reset();
-      dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
-      socklen_t len = dest.getSocklen();
-      getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
-      dc->setLocal(dest);
-      dc->setDestination(conn->d_destination);
-      /* we can't move this if we want to be able to access the values in
-         all queries sent over this connection */
-      dc->d_proxyProtocolValues = conn->proxyProtocolValues;
-
-      struct timeval start;
-      Utility::gettimeofday(&start, nullptr);
-      DNSName qname;
-      uint16_t qtype=0;
-      uint16_t qclass=0;
-      bool needECS = false;
-      bool needXPF = g_XPFAcl.match(conn->d_remote);
-      string requestorId;
-      string deviceId;
-      string deviceName;
-      bool logQuery = false;
-      bool qnameParsed = false;
-
-      dc->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled);
-      dc->d_eventTrace.add(RecEventTrace::ReqRecv);
-      auto luaconfsLocal = g_luaconfs.getLocal();
-      if (checkProtobufExport(luaconfsLocal)) {
-        needECS = true;
-      }
-      logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
-      dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
-
-#ifdef HAVE_FSTRM
-      checkFrameStreamExport(luaconfsLocal);
-#endif
-
-      if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag)) || dc->d_mdp.d_header.opcode == Opcode::Notify) {
-
-        try {
-          EDNSOptionViewMap ednsOptions;
-          bool xpfFound = false;
-          dc->d_ecsParsed = true;
-          dc->d_ecsFound = false;
-          getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
-                            dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
-                            xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? &dc->d_destination : nullptr);
-          qnameParsed = true;
-
-          if(t_pdl) {
-            try {
-              if (t_pdl->d_gettag_ffi) {
-                RecursorLua4::FFIParams params(qname, qtype, dc->d_destination, dc->d_source, dc->d_ednssubnet.source, dc->d_data, dc->d_policyTags, dc->d_records, ednsOptions, dc->d_proxyProtocolValues, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_rcode, dc->d_ttlCap, dc->d_variable, true, logQuery, dc->d_logResponse, dc->d_followCNAMERecords, dc->d_extendedErrorCode, dc->d_extendedErrorExtra, dc->d_responsePaddingDisabled, dc->d_meta);
-                dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
-                dc->d_tag = t_pdl->gettag_ffi(params);
-                dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, dc->d_tag, false);
-              }
-              else if (t_pdl->d_gettag) {
-                dc->d_eventTrace.add(RecEventTrace::LuaGetTag);
-                dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_proxyProtocolValues);
-                dc->d_eventTrace.add(RecEventTrace::LuaGetTag, dc->d_tag, false);
-              }
-            }
-            catch(const std::exception& e)  {
-              if(g_logCommonErrors) {
-                g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
-              }
-            }
-          }
-        }
-        catch(const std::exception& e)
-        {
-          if (g_logCommonErrors) {
-            g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
-          }
-        }
-      }
-
-      if (dc->d_tag == 0 && !dc->d_responsePaddingDisabled && g_paddingFrom.match(dc->d_remote)) {
-        dc->d_tag = g_paddingTag;
-      }
-
-      const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);
-
-      if (t_protobufServers || t_outgoingProtobufServers) {
-        dc->d_requestorId = requestorId;
-        dc->d_deviceId = deviceId;
-        dc->d_deviceName = deviceName;
-        dc->d_uuid = getUniqueID();
-      }
-
-      if(t_protobufServers) {
-        try {
-
-          if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
-            protobufLogQuery(luaconfsLocal, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta);
-          }
-        }
-        catch (const std::exception& e) {
-          if (g_logCommonErrors) {
-            g_log<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
-          }
-        }
-      }
-
-      if (t_pdl) {
-        bool ipf = t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh, dc->d_eventTrace);
-        if (ipf) {
-          if (!g_quiet) {
-            g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<<endl;
-          }
-          g_stats.policyDrops++;
-          return;
-        }
-      }
-
-      if (dc->d_mdp.d_header.qr) {
-        g_stats.ignoredCount++;
-        if (g_logCommonErrors) {
-          g_log<<Logger::Error<<"Ignoring answer from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
-        }
-        return;
-      }
-      if (dc->d_mdp.d_header.opcode != Opcode::Query && dc->d_mdp.d_header.opcode != Opcode::Notify) {
-        g_stats.ignoredCount++;
-        if (g_logCommonErrors) {
-          g_log<<Logger::Error<<"Ignoring unsupported opcode "<<Opcode::to_s(dc->d_mdp.d_header.opcode)<<" from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
-        }
-        sendErrorOverTCP(dc, RCode::NotImp);
-        tcpGuard.keep();
-        return;
-      }
-      else if (dh->qdcount == 0) {
-        g_stats.emptyQueriesCount++;
-        if (g_logCommonErrors) {
-          g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<< dc->getRemote() <<" on server socket!"<<endl;
-        }
-        sendErrorOverTCP(dc, RCode::NotImp);
-        tcpGuard.keep();
-        return;
-      }
-      else {
-        // We have read a proper query
-        ++g_stats.qcounter;
-        ++g_stats.tcpqcounter;
-
-        if(dc->d_mdp.d_header.opcode == Opcode::Notify) {
-          if(!t_allowNotifyFrom || !t_allowNotifyFrom->match(dc->d_source)) {
-            if(!g_quiet) {
-              g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP NOTIFY from "<<dc->d_source.toString()<<", address not matched by allow-notify-from"<<endl;
-            }
-
-            g_stats.sourceDisallowedNotify++;
-            return;
-          }
-
-          if(!isAllowNotifyForZone(qname)) {
-            if(!g_quiet) {
-              g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP NOTIFY from "<<dc->d_source.toString()<<", for "<<qname.toLogString()<<", zone not matched by allow-notify-for"<<endl;
-            }
-
-            g_stats.zoneDisallowedNotify++;
-            return;
-          }
-        }
-
-        string response;
-        RecursorPacketCache::OptPBData pbData{boost::none};
-
-        if(dc->d_mdp.d_header.opcode == Opcode::Query) {
-          /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
-             but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
-             as cacheable we would cache it with a wrong tag, so better safe than sorry. */
-          dc->d_eventTrace.add(RecEventTrace::PCacheCheck);
-          bool cacheHit = checkForCacheHit(qnameParsed, dc->d_tag, conn->data, qname, qtype, qclass, g_now, response, dc->d_qhash, pbData, true, dc->d_source);
-          dc->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
-
-          if (cacheHit) {
-            if (!g_quiet) {
-              g_log<<Logger::Notice<<t_id<< " TCP question answered from packet cache tag="<<dc->d_tag<<" from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<endl;
-            }
-
-            bool hadError = sendResponseOverTCP(dc, response);
-            finishTCPReply(dc, hadError, false);
-            struct timeval now;
-            Utility::gettimeofday(&now, nullptr);
-            uint64_t spentUsec = uSec(now - start);
-            g_stats.cumulativeAnswers(spentUsec);
-            dc->d_eventTrace.add(RecEventTrace::AnswerSent);
-
-            if (t_protobufServers && dc->d_logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) {
-              struct timeval tv{0, 0};
-              protobufLogResponse(dh, luaconfsLocal, pbData, tv, true, dc->d_source, dc->d_destination, dc->d_ednssubnet, dc->d_uuid, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta, dc->d_eventTrace);
-            }
-
-            if (dc->d_eventTrace.enabled() && SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) {
-              g_log << Logger::Info << dc->d_eventTrace.toString() << endl;
-            }
-            tcpGuard.keep();
-            return;
-          } // cache hit
-        } // query opcode
-
-        if(dc->d_mdp.d_header.opcode == Opcode::Notify) {
-          if (!g_quiet) {
-            g_log<<Logger::Notice<<t_id<< " got NOTIFY for "<<qname.toLogString()<<" from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<endl;
-          }
-
-          requestWipeCaches(qname);
-
-          // the operation will now be treated as a Query, generating
-          // a normal response, as the rest of the code does not
-          // check dh->opcode, but we need to ensure that the response
-          // to this request does not get put into the packet cache
-          dc->d_variable = true;
-        }
-
-        // setup for startDoResolve() in an mthread
-        ++conn->d_requestsInFlight;
-        if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
-          t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
-        } else {
-          Utility::gettimeofday(&g_now, nullptr); // needed?
-          struct timeval ttd = g_now;
-          t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
-        }
-        tcpGuard.keep();
-        MT->makeThread(startDoResolve, dc.release()); // deletes dc
-      } // good query
-    } // read full query
-  } // reading query
-
-  // more to come
-  tcpGuard.keep();
-}
-
-static bool expectProxyProtocol(const ComboAddress& from)
+bool expectProxyProtocol(const ComboAddress& from)
 {
   return g_proxyProtocolACL.match(from);
 }
 
-//! Handle new incoming TCP connection
-static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
-{
-  ComboAddress addr;
-  socklen_t addrlen=sizeof(addr);
-  int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
-  if(newsock>=0) {
-    if(MT->numProcesses() > g_maxMThreads) {
-      g_stats.overCapacityDrops++;
-      try {
-        closesocket(newsock);
-      }
-      catch(const PDNSException& e) {
-        g_log<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
-      }
-      return;
-    }
-
-    if(t_remotes) {
-      t_remotes->push_back(addr);
-    }
-
-    bool fromProxyProtocolSource = expectProxyProtocol(addr);
-    if(t_allowFrom && !t_allowFrom->match(&addr) && !fromProxyProtocolSource) {
-      if(!g_quiet)
-        g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address neither matched by allow-from nor proxy-protocol-from"<<endl;
-
-      g_stats.unauthorizedTCP++;
-      try {
-        closesocket(newsock);
-      }
-      catch(const PDNSException& e) {
-        g_log<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
-      }
-      return;
-    }
-
-    if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
-      g_stats.tcpClientOverflow++;
-      try {
-        closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
-      }
-      catch(const PDNSException& e) {
-        g_log<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
-      }
-      return;
-    }
-
-    setNonBlocking(newsock);
-    std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
-    tc->d_source = addr;
-    tc->d_destination.reset();
-    tc->d_destination.sin4.sin_family = addr.sin4.sin_family;
-    socklen_t len = tc->d_destination.getSocklen();
-    getsockname(tc->getFD(), reinterpret_cast<sockaddr*>(&tc->d_destination), &len); // if this fails, we're ok with it
-
-    if (fromProxyProtocolSource) {
-      tc->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
-      tc->data.resize(tc->proxyProtocolNeed);
-      tc->state = TCPConnection::PROXYPROTOCOLHEADER;
-    }
-    else {
-      tc->state = TCPConnection::BYTE0;
-    }
-
-    struct timeval ttd;
-    Utility::gettimeofday(&ttd, nullptr);
-    ttd.tv_sec += g_tcpTimeout;
-
-    t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
-  }
-}
 
 static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, struct timeval tv, int fd, std::vector<ProxyProtocolValue>& proxyProtocolValues, RecEventTrace& eventTrace)
 {
@@ -3616,145 +2744,6 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
   }
 }
 
-static void checkFastOpenSysctl(bool active)
-{
-#ifdef __linux__
-  string line;
-  if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
-    int flag = std::stoi(line);
-    if (active && !(flag & 1)) {
-      g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl;
-    }
-    if (!active && !(flag & 2)) {
-      g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl;
-    }
-  }
-  else {
-    g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
- }
-#else
-  g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
-#endif
-}
-
-static void checkTFOconnect()
-{
-  try {
-    Socket s(AF_INET, SOCK_STREAM);
-    s.setNonBlocking();
-    s.setFastOpenConnect();
-  }
-  catch (const NetworkError& e) {
-    g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << e.what() << endl;
-  }
-}
-
-static void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets)
-{
-  int fd;
-  vector<string>locals;
-  stringtok(locals,::arg()["local-address"]," ,");
-
-  if(locals.empty())
-    throw PDNSException("No local address specified");
-
-  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
-    ServiceTuple st;
-    st.port=::arg().asNum("local-port");
-    parseService(*i, st);
-
-    ComboAddress sin;
-
-    sin.reset();
-    sin.sin4.sin_family = AF_INET;
-    if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
-      sin.sin6.sin6_family = AF_INET6;
-      if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
-        throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
-    }
-
-    fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
-    if(fd<0)
-      throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
-
-    setCloseOnExec(fd);
-
-    int tmp=1;
-    if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
-      g_log<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
-      exit(1);
-    }
-    if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
-      int err = errno;
-      g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(err)<<endl;
-    }
-
-#ifdef TCP_DEFER_ACCEPT
-    if(setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
-      if(i==locals.begin())
-        g_log<<Logger::Info<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
-    }
-#endif
-
-    if( ::arg().mustDo("non-local-bind") )
-       Utility::setBindAny(AF_INET, fd);
-
-    if (g_reusePort) {
-#if defined(SO_REUSEPORT_LB)
-      try {
-        SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
-      }
-      catch (const std::exception& e) {
-        throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
-      }
-#elif defined(SO_REUSEPORT)
-      try {
-        SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
-      }
-      catch (const std::exception& e) {
-        throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
-      }
-#endif
-    }
-
-    if (SyncRes::s_tcp_fast_open > 0) {
-      checkFastOpenSysctl(false);
-#ifdef TCP_FASTOPEN
-      if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
-        int err = errno;
-        g_log<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(err)<<endl;
-      }
-#else
-      g_log<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
-#endif
-    }
-
-    sin.sin4.sin_port = htons(st.port);
-    socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
-    if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
-      throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
-
-    setNonBlocking(fd);
-    try {
-      setSocketSendBuffer(fd, 65000);
-    }
-    catch (const std::exception& e) {
-      g_log<<Logger::Error<<e.what()<<endl;
-    }
-
-    listen(fd, 128);
-    deferredAdds.emplace_back(fd, handleNewTCPQuestion);
-    tcpSockets.insert(fd);
-
-    // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
-    //  - fd is not that which we know here, but returned from accept()
-    if(sin.sin4.sin_family == AF_INET)
-      g_log<<Logger::Info<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
-    else
-      g_log<<Logger::Info<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
-  }
-}
-
 static void makeUDPServerSockets(deferredAdd_t& deferredAdds)
 {
   int one=1;
@@ -4410,164 +3399,6 @@ static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
   }
 }
 
-static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
-{
-  TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
-
-  pid->lowState = newstate;
-
-  // handle state transitions
-  switch (oldstate) {
-  case IOState::NeedRead:
-
-    switch (newstate) {
-    case IOState::NeedWrite:
-      TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
-      t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
-      break;
-    case IOState::NeedRead:
-      break;
-    case IOState::Done:
-      TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
-      t_fdm->removeReadFD(pid->tcpsock);
-      break;
-    case IOState::Async:
-      throw std::runtime_error("TLS async mode not supported");
-      break;
-    }
-    break;
-
-  case IOState::NeedWrite:
-
-    switch (newstate) {
-    case IOState::NeedRead:
-      TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
-      t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
-      break;
-    case IOState::NeedWrite:
-      break;
-    case IOState::Done:
-      TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
-      t_fdm->removeWriteFD(pid->tcpsock);
-      break;
-    case IOState::Async:
-      throw std::runtime_error("TLS async mode not supported");
-      break;
-    }
-    break;
-
-  case IOState::Done:
-    switch (newstate) {
-    case IOState::NeedRead:
-      TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
-      t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
-      break;
-    case IOState::NeedWrite:
-      TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
-      t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
-      break;
-    case IOState::Done:
-      break;
-    case IOState::Async:
-      throw std::runtime_error("TLS async mode not supported");
-      break;
-    }
-    break;
-
-  case IOState::Async:
-    throw std::runtime_error("TLS async mode not supported");
-    break;
-  }
-
-}
-
-static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var)
-{
-  std::shared_ptr<PacketID> pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
-  assert(pid->tcphandler);
-  assert(fd == pid->tcphandler->getDescriptor());
-  IOState newstate = IOState::Done;
-
-  TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
-
-  // In the code below, we want to update the state of the fd before calling sendEvent
-  // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
-
-  switch (pid->highState) {
-  case TCPAction::DoingRead:
-    TCPLOG(pid->tcpsock, "highState: Reading" << endl);
-    // In arecvtcp, the buffer was resized already so inWanted bytes will fit
-    // try reading
-    try {
-      newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
-      switch (newstate) {
-      case IOState::Done:
-      case IOState::NeedRead:
-        TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
-        TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
-        if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
-          pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
-          newstate = IOState::Done;
-          TCPIOHandlerStateChange(pid->lowState, newstate, pid);
-          MT->sendEvent(pid, &pid->inMSG);
-          return;
-        }
-        break;
-      case IOState::NeedWrite:
-        break;
-      case IOState::Async:
-        throw std::runtime_error("TLS async mode not supported");
-        break;
-      }
-    }
-    catch (const std::exception& e) {
-      newstate = IOState::Done;
-      TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
-      PacketBuffer empty;
-      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
-      MT->sendEvent(pid, &empty); // this conveys error status
-      return;
-    }
-    break;
-
-  case TCPAction::DoingWrite:
-    TCPLOG(pid->tcpsock, "highState: Writing" << endl);
-    try {
-      TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
-      newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
-      TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
-      switch (newstate) {
-      case IOState::Done: {
-        TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
-        TCPIOHandlerStateChange(pid->lowState, newstate, pid);
-        MT->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
-        return;
-      }
-      case IOState::NeedRead:
-        TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
-        break;
-      case IOState::NeedWrite:
-        TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
-        break;
-      case IOState::Async:
-        throw std::runtime_error("TLS async mode not supported");
-        break;
-      }
-    }
-    catch (const std::exception& e) {
-      newstate = IOState::Done;
-      TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
-      PacketBuffer sent;
-      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
-      MT->sendEvent(pid, &sent); // we convey error status by sending empty string
-      return;
-    }
-    break;
-  }
-
-  // Cases that did not end up doing a sendEvent
-  TCPIOHandlerStateChange(pid->lowState, newstate, pid);
-}
 
 // resend event to everybody chained onto it
 static void doResends(MT_t::waiters_t::iterator& iter, const std::shared_ptr<PacketID>& resend, const PacketBuffer& content)
index c87c4cc56b5027d6358c0c869416c3d7e623962b..8f325ae428fc023f1f3b503d50a76fb3942e3b27 100644 (file)
@@ -40,6 +40,7 @@
 #include "namespaces.hh"
 #include "rec-taskqueue.hh"
 #include "rec-tcpout.hh"
+#include "rec-main.hh"
 
 std::pair<std::string, std::string> PrefixDashNumberCompare::prefixAndTrailingNum(const std::string& a)
 {
index d1829d4b179f5c30af907cd48c79e290b5562977..420c5de448e32eb985c88b803ea51198d74d8723 100644 (file)
@@ -160,10 +160,12 @@ pdns_recursor_SOURCES = \
        rcpgenerator.cc rcpgenerator.hh \
        rec-carbon.cc \
        rec-eventtrace.cc rec-eventtrace.hh \
+       ec-main.hh rec-main.cc \
        rec-lua-conf.hh rec-lua-conf.cc \
        rec-protozero.cc rec-protozero.hh \
        rec-snmp.hh rec-snmp.cc \
        rec-taskqueue.cc rec-taskqueue.hh \
+       rec-tcp.cc \
         rec-tcpout.cc rec-tcpout.hh \
         rec-zonetocache.cc rec-zonetocache.hh \
        rec_channel.cc rec_channel.hh rec_metrics.hh \
diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc
new file mode 100644 (file)
index 0000000..eff5f89
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "rec-main.hh"
+
diff --git a/pdns/recursordist/rec-main.hh b/pdns/recursordist/rec-main.hh
new file mode 100644 (file)
index 0000000..8260427
--- /dev/null
@@ -0,0 +1,222 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "logger.hh"
+#include "lua-recursor4.hh"
+#include "mplexer.hh"
+#include "namespaces.hh"
+#include "rec-lua-conf.hh"
+#include "rec-protozero.hh"
+#include "syncres.hh"
+
+
+//! used to send information to a newborn mthread
+struct DNSComboWriter {
+  DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query)
+  {
+  }
+
+  DNSComboWriter(const std::string& query, const struct timeval& now, std::unordered_set<std::string>&& policyTags, LuaContext::LuaObject&& data, std::vector<DNSRecord>&& records): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_records(std::move(records)), d_data(std::move(data))
+  {
+  }
+
+  void setRemote(const ComboAddress& sa)
+  {
+    d_remote=sa;
+  }
+
+  void setSource(const ComboAddress& sa)
+  {
+    d_source=sa;
+  }
+
+  void setLocal(const ComboAddress& sa)
+  {
+    d_local=sa;
+  }
+
+  void setDestination(const ComboAddress& sa)
+  {
+    d_destination=sa;
+  }
+
+  void setSocket(int sock)
+  {
+    d_socket=sock;
+  }
+
+  string getRemote() const
+  {
+    if (d_source == d_remote) {
+      return d_source.toStringWithPort();
+    }
+    return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")";
+  }
+
+  std::vector<ProxyProtocolValue> d_proxyProtocolValues;
+  MOADNSParser d_mdp;
+  struct timeval d_now;
+  /* Remote client, might differ from d_source
+     in case of XPF, in which case d_source holds
+     the IP of the client and d_remote of the proxy
+  */
+  ComboAddress d_remote;
+  ComboAddress d_source;
+  /* Destination address, might differ from
+     d_destination in case of XPF, in which case
+     d_destination holds the IP of the proxy and
+     d_local holds our own. */
+  ComboAddress d_local;
+  ComboAddress d_destination;
+  RecEventTrace d_eventTrace;
+  boost::uuids::uuid d_uuid;
+  string d_requestorId;
+  string d_deviceId;
+  string d_deviceName;
+  struct timeval d_kernelTimestamp{0,0};
+  std::string d_query;
+  std::unordered_set<std::string> d_policyTags;
+  std::string d_routingTag;
+  std::vector<DNSRecord> d_records;
+  LuaContext::LuaObject d_data;
+  EDNSSubnetOpts d_ednssubnet;
+  shared_ptr<TCPConnection> d_tcpConnection;
+  boost::optional<uint16_t> d_extendedErrorCode{boost::none};
+  string d_extendedErrorExtra;
+  boost::optional<int> d_rcode{boost::none};
+  int d_socket{-1};
+  unsigned int d_tag{0};
+  uint32_t d_qhash{0};
+  uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
+  bool d_variable{false};
+  bool d_ecsFound{false};
+  bool d_ecsParsed{false};
+  bool d_followCNAMERecords{false};
+  bool d_logResponse{false};
+  bool d_tcp{false};
+  bool d_responsePaddingDisabled{false};
+  std::map<std::string, RecursorLua4::MetaValue> d_meta;
+};
+
+
+typedef MTasker<std::shared_ptr<PacketID>, PacketBuffer, PacketIDCompare> MT_t;
+extern thread_local std::unique_ptr<MT_t> MT; // the big MTasker
+
+extern thread_local FDMultiplexer* t_fdm;
+extern bool g_logCommonErrors;
+extern size_t g_proxyProtocolMaximumSize;
+extern std::atomic<bool> g_quiet;
+extern NetmaskGroup g_XPFAcl;
+extern thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers;
+extern thread_local std::shared_ptr<RecursorLua4> t_pdl;
+extern bool g_gettagNeedsEDNSOptions;
+extern NetmaskGroup g_paddingFrom;
+extern unsigned int g_paddingTag;
+extern thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers;
+extern unsigned int g_maxMThreads;
+extern bool g_reusePort;
+extern bool g_anyToTcp;
+extern size_t g_tcpMaxQueriesPerConn;
+extern unsigned int g_maxTCPPerClient;
+extern int g_tcpTimeout;
+
+typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
+extern thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
+
+typedef vector<pair<int, boost::function< void(int, boost::any&) > > > deferredAdd_t;
+
+inline MT_t* getMT()
+{
+  return MT ? MT.get() : nullptr;
+}
+
+extern thread_local unsigned int t_id;
+
+inline unsigned int getRecursorThreadId()
+{
+  return t_id;
+}
+
+/* this function is called with both a string and a vector<uint8_t> representing a packet */
+template <class T>
+static bool sendResponseOverTCP(const std::unique_ptr<DNSComboWriter>& dc, const T& packet)
+{
+  uint8_t buf[2];
+  buf[0] = packet.size() / 256;
+  buf[1] = packet.size() % 256;
+
+  Utility::iovec iov[2];
+  iov[0].iov_base = (void*)buf;              iov[0].iov_len = 2;
+  iov[1].iov_base = (void*)&*packet.begin(); iov[1].iov_len = packet.size();
+
+  int wret = Utility::writev(dc->d_socket, iov, 2);
+  bool hadError = true;
+
+  if (wret == 0) {
+    g_log<<Logger::Warning<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
+  } else if (wret < 0 ) {
+    int err = errno;
+    g_log << Logger::Warning << "Error writing TCP answer to " << dc->getRemote() << ": " << strerror(err) << endl;
+  } else if ((unsigned int)wret != 2 + packet.size()) {
+    g_log<<Logger::Warning<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
+  } else {
+    hadError = false;
+  }
+
+  return hadError;
+}
+
+PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query);
+bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal);
+bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal);
+void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
+                       bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options,
+                       bool& foundXPF, ComboAddress* xpfSource, ComboAddress* xpfDest);
+void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta);
+bool isAllowNotifyForZone(DNSName qname);
+bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data,
+                             DNSName& qname, uint16_t& qtype, uint16_t& qclass,
+                             const struct timeval& now,
+                             string& response, uint32_t& qhash,
+                      RecursorPacketCache::OptPBData& pbData, bool tcp, const ComboAddress& source);
+void protobufLogResponse(pdns::ProtoZero::RecMessage& message);
+void protobufLogResponse(const struct dnsheader* dh, LocalStateHolder<LuaConfigItems>& luaconfsLocal,
+                                const RecursorPacketCache::OptPBData& pbData, const struct timeval& tv,
+                                bool tcp, const ComboAddress& source, const ComboAddress& destination,
+                                const EDNSSubnetOpts& ednssubnet,
+                                const boost::uuids::uuid& uniqueId, const string& requestorId, const string& deviceId,
+                                const string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta,
+                         const RecEventTrace& eventTrace);
+void requestWipeCaches(const DNSName& canon);
+void startDoResolve(void *p);
+bool expectProxyProtocol(const ComboAddress& from);
+void finishTCPReply(std::unique_ptr<DNSComboWriter>& dc, bool hadError, bool updateInFlight);
+void checkFastOpenSysctl(bool active);
+void checkTFOconnect();
+void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets);
+void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& );
diff --git a/pdns/recursordist/rec-tcp.cc b/pdns/recursordist/rec-tcp.cc
new file mode 100644 (file)
index 0000000..86740f3
--- /dev/null
@@ -0,0 +1,1077 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "rec-main.hh"
+
+#include "arguments.hh"
+#include "logger.hh"
+#include "mplexer.hh"
+#include "uuid-utils.hh"
+
+size_t g_tcpMaxQueriesPerConn;
+unsigned int g_maxTCPPerClient;
+int g_tcpTimeout;
+bool g_anyToTcp;
+
+uint16_t TCPConnection::s_maxInFlight;
+std::atomic<uint32_t> TCPConnection::s_currentConnections;
+
+typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
+thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
+
+static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
+
+#if 0
+#define TCPLOG(tcpsock, x) do { cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10  + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; } while (0)
+#else
+#define TCPLOG(pid, x)
+#endif
+
+TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
+{
+  ++s_currentConnections;
+  (*t_tcpClientCounts)[d_remote]++;
+}
+
+TCPConnection::~TCPConnection()
+{
+  try {
+    if(closesocket(d_fd) < 0)
+      g_log<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
+  }
+  catch(const PDNSException& e) {
+    g_log<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
+  }
+
+  if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
+    t_tcpClientCounts->erase(d_remote);
+  --s_currentConnections;
+}
+
+
+static void terminateTCPConnection(int fd)
+{
+  try {
+    t_fdm->removeReadFD(fd);
+  }
+  catch (const FDMultiplexerException& fde)
+  {
+  }
+}
+
+static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& dc, int rcode)
+{
+  std::vector<uint8_t> packet;
+  if (dc->d_mdp.d_header.qdcount == 0) {
+    /* header-only */
+    packet.resize(sizeof(dnsheader));
+  }
+  else {
+    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
+    if (dc->d_mdp.hasEDNS()) {
+      /* we try to add the EDNS OPT RR even for truncated answers,
+         as rfc6891 states:
+         "The minimal response MUST be the DNS header, question section, and an
+         OPT record.  This MUST also occur when a truncated response (using
+         the DNS header's TC bit) is returned."
+      */
+      pw.addOpt(512, 0, 0);
+      pw.commit();
+    }
+  }
+
+  dnsheader& header = reinterpret_cast<dnsheader&>(packet.at(0));
+  header.aa = 0;
+  header.ra = 1;
+  header.qr = 1;
+  header.tc = 0;
+  header.id = dc->d_mdp.d_header.id;
+  header.rd = dc->d_mdp.d_header.rd;
+  header.cd = dc->d_mdp.d_header.cd;
+  header.rcode = rcode;
+
+  sendResponseOverTCP(dc, packet);
+}
+
+void finishTCPReply(std::unique_ptr<DNSComboWriter>& dc, bool hadError, bool updateInFlight)
+{
+  // update tcp connection status, closing if needed and doing the fd multiplexer accounting
+  if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight > 0) {
+    dc->d_tcpConnection->d_requestsInFlight--;
+  }
+
+  // In the code below, we try to remove the fd from the set, but
+  // we don't know if another mthread already did the remove, so we can get a
+  // "Tried to remove unlisted fd" exception.  Not that an inflight < limit test
+  // will not work since we do not know if the other mthread got an error or not.
+  if (hadError) {
+    terminateTCPConnection(dc->d_socket);
+    dc->d_socket = -1;
+    return;
+  }
+  dc->d_tcpConnection->queriesCount++;
+  if ((g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) ||
+      (dc->d_tcpConnection->isDropOnIdle() && dc->d_tcpConnection->d_requestsInFlight == 0)) {
+    try {
+      t_fdm->removeReadFD(dc->d_socket);
+    }
+    catch (FDMultiplexerException &) {
+    }
+    dc->d_socket = -1;
+    return;
+  }
+
+  Utility::gettimeofday(&g_now, nullptr); // needs to be updated
+  struct timeval ttd = g_now;
+
+  // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
+  if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
+    // A read error might have happened. If we add the fd back, it will most likely error again.
+    // This is not a big issue, the next handleTCPClientReadable() will see another read error
+    // and take action.
+    ttd.tv_sec += g_tcpTimeout;
+    t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
+    return;
+  }
+  // fd might have been removed by read error code, or a read timeout, so expect an exception
+  try {
+    t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout);
+  }
+  catch (const FDMultiplexerException &) {
+    // but if the FD was removed because of a timeout while we were sending a response,
+    // we need to re-arm it. If it was an error it will error again.
+    ttd.tv_sec += g_tcpTimeout;
+    t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
+  }
+}
+
+/*
+ * A helper class that by default closes the incoming TCP connection on destruct
+ * If you want to keep the connection alive, call keep() on the guard object
+ */
+class RunningTCPQuestionGuard {
+public:
+  RunningTCPQuestionGuard(int fd)
+  {
+    d_fd = fd;
+  }
+  ~RunningTCPQuestionGuard()
+  {
+    if (d_fd != -1) {
+      terminateTCPConnection(d_fd);
+      d_fd = -1;
+    }
+  }
+  void keep()
+  {
+    d_fd = -1;
+  }
+  bool handleTCPReadResult(int fd, ssize_t bytes)
+  {
+    if (bytes == 0) {
+      /* EOF */
+      return false;
+    }
+    else if (bytes < 0) {
+      if (errno != EAGAIN && errno != EWOULDBLOCK) {
+        return false;
+      }
+    }
+    keep();
+    return true;
+  }
+
+private:
+  int d_fd{-1};
+};
+
+static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
+{
+  shared_ptr<TCPConnection> conn=boost::any_cast<shared_ptr<TCPConnection> >(var);
+
+  RunningTCPQuestionGuard tcpGuard{fd};
+
+  if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
+    ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
+    if (bytes <= 0) {
+      tcpGuard.handleTCPReadResult(fd, bytes);
+      return;
+    }
+
+    conn->proxyProtocolGot += bytes;
+    conn->data.resize(conn->proxyProtocolGot);
+    ssize_t remaining = isProxyHeaderComplete(conn->data);
+    if (remaining == 0) {
+      if (g_logCommonErrors) {
+        g_log<<Logger::Error<<"Unable to consume proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
+      }
+      ++g_stats.proxyProtocolInvalidCount;
+      return;
+    }
+    else if (remaining < 0) {
+      conn->proxyProtocolNeed = -remaining;
+      conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
+      tcpGuard.keep();
+      return;
+    }
+    else {
+      /* proxy header received */
+      /* we ignore the TCP field for now, but we could properly set whether
+         the connection was received over UDP or TCP if needed */
+      bool tcp;
+      bool proxy = false;
+      size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
+      if (used <= 0) {
+        if (g_logCommonErrors) {
+          g_log<<Logger::Error<<"Unable to parse proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
+        }
+        ++g_stats.proxyProtocolInvalidCount;
+        return;
+      }
+      else if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
+        if (g_logCommonErrors) {
+          g_log<<Logger::Error<<"Proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping"<< endl;
+        }
+        ++g_stats.proxyProtocolInvalidCount;
+        return;
+      }
+
+      /* Now that we have retrieved the address of the client, as advertised by the proxy
+         via the proxy protocol header, check that it is allowed by our ACL */
+      /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
+      if (t_allowFrom && !t_allowFrom->match(&conn->d_source)) {
+        if (!g_quiet) {
+          g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<conn->d_source.toString()<<", address not matched by allow-from"<<endl;
+        }
+
+        ++g_stats.unauthorizedTCP;
+        return;
+      }
+
+      conn->data.resize(2);
+      conn->state = TCPConnection::BYTE0;
+    }
+  }
+
+  if (conn->state==TCPConnection::BYTE0) {
+    ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0);
+    if(bytes==1)
+      conn->state=TCPConnection::BYTE1;
+    if(bytes==2) {
+      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
+      conn->data.resize(conn->qlen);
+      conn->bytesread=0;
+      conn->state=TCPConnection::GETQUESTION;
+    }
+    if (bytes <= 0) {
+      tcpGuard.handleTCPReadResult(fd, bytes);
+      return;
+    }
+  }
+
+  if (conn->state==TCPConnection::BYTE1) {
+    ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0);
+    if(bytes==1) {
+      conn->state=TCPConnection::GETQUESTION;
+      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
+      conn->data.resize(conn->qlen);
+      conn->bytesread=0;
+    }
+    if (bytes <= 0) {
+      if (!tcpGuard.handleTCPReadResult(fd, bytes)) {
+        if(g_logCommonErrors) {
+          g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected after first byte"<<endl;
+        }
+      }
+      return;
+    }
+  }
+
+  if(conn->state==TCPConnection::GETQUESTION) {
+    ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
+    if (bytes <= 0) {
+      if (!tcpGuard.handleTCPReadResult(fd, bytes)) {
+        if(g_logCommonErrors) {
+          g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected while reading question body"<<endl;
+        }
+      }
+      return;
+    }
+    else if (bytes > std::numeric_limits<std::uint16_t>::max()) {
+      if(g_logCommonErrors) {
+        g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" sent an invalid question size while reading question body"<<endl;
+      }
+      return;
+    }
+    conn->bytesread+=(uint16_t)bytes;
+    if(conn->bytesread==conn->qlen) {
+      conn->state = TCPConnection::BYTE0;
+      std::unique_ptr<DNSComboWriter> dc;
+      try {
+        dc = std::make_unique<DNSComboWriter>(conn->data, g_now);
+      }
+      catch(const MOADNSException &mde) {
+        g_stats.clientParseError++;
+        if (g_logCommonErrors) {
+          g_log<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
+        }
+        return;
+      }
+      dc->d_tcpConnection = conn; // carry the torch
+      dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
+      dc->d_tcp=true;
+      dc->setRemote(conn->d_remote);
+      dc->setSource(conn->d_source);
+      ComboAddress dest;
+      dest.reset();
+      dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
+      socklen_t len = dest.getSocklen();
+      getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
+      dc->setLocal(dest);
+      dc->setDestination(conn->d_destination);
+      /* we can't move this if we want to be able to access the values in
+         all queries sent over this connection */
+      dc->d_proxyProtocolValues = conn->proxyProtocolValues;
+
+      struct timeval start;
+      Utility::gettimeofday(&start, nullptr);
+      DNSName qname;
+      uint16_t qtype=0;
+      uint16_t qclass=0;
+      bool needECS = false;
+      bool needXPF = g_XPFAcl.match(conn->d_remote);
+      string requestorId;
+      string deviceId;
+      string deviceName;
+      bool logQuery = false;
+      bool qnameParsed = false;
+
+      dc->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled);
+      dc->d_eventTrace.add(RecEventTrace::ReqRecv);
+      auto luaconfsLocal = g_luaconfs.getLocal();
+      if (checkProtobufExport(luaconfsLocal)) {
+        needECS = true;
+      }
+      logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+      dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+
+#ifdef HAVE_FSTRM
+      checkFrameStreamExport(luaconfsLocal);
+#endif
+
+      if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag)) || dc->d_mdp.d_header.opcode == Opcode::Notify) {
+
+        try {
+          EDNSOptionViewMap ednsOptions;
+          bool xpfFound = false;
+          dc->d_ecsParsed = true;
+          dc->d_ecsFound = false;
+          getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
+                            dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
+                            xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? &dc->d_destination : nullptr);
+          qnameParsed = true;
+
+          if(t_pdl) {
+            try {
+              if (t_pdl->d_gettag_ffi) {
+                RecursorLua4::FFIParams params(qname, qtype, dc->d_destination, dc->d_source, dc->d_ednssubnet.source, dc->d_data, dc->d_policyTags, dc->d_records, ednsOptions, dc->d_proxyProtocolValues, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_rcode, dc->d_ttlCap, dc->d_variable, true, logQuery, dc->d_logResponse, dc->d_followCNAMERecords, dc->d_extendedErrorCode, dc->d_extendedErrorExtra, dc->d_responsePaddingDisabled, dc->d_meta);
+                dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
+                dc->d_tag = t_pdl->gettag_ffi(params);
+                dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, dc->d_tag, false);
+              }
+              else if (t_pdl->d_gettag) {
+                dc->d_eventTrace.add(RecEventTrace::LuaGetTag);
+                dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_proxyProtocolValues);
+                dc->d_eventTrace.add(RecEventTrace::LuaGetTag, dc->d_tag, false);
+              }
+            }
+            catch(const std::exception& e)  {
+              if(g_logCommonErrors) {
+                g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
+              }
+            }
+          }
+        }
+        catch(const std::exception& e)
+        {
+          if (g_logCommonErrors) {
+            g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
+          }
+        }
+      }
+
+      if (dc->d_tag == 0 && !dc->d_responsePaddingDisabled && g_paddingFrom.match(dc->d_remote)) {
+        dc->d_tag = g_paddingTag;
+      }
+
+      const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);
+
+      if (t_protobufServers || t_outgoingProtobufServers) {
+        dc->d_requestorId = requestorId;
+        dc->d_deviceId = deviceId;
+        dc->d_deviceName = deviceName;
+        dc->d_uuid = getUniqueID();
+      }
+
+      if(t_protobufServers) {
+        try {
+
+          if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
+            protobufLogQuery(luaconfsLocal, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta);
+          }
+        }
+        catch (const std::exception& e) {
+          if (g_logCommonErrors) {
+            g_log<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
+          }
+        }
+      }
+
+      if (t_pdl) {
+        bool ipf = t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh, dc->d_eventTrace);
+        if (ipf) {
+          if (!g_quiet) {
+            g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<<endl;
+          }
+          g_stats.policyDrops++;
+          return;
+        }
+      }
+
+      if (dc->d_mdp.d_header.qr) {
+        g_stats.ignoredCount++;
+        if (g_logCommonErrors) {
+          g_log<<Logger::Error<<"Ignoring answer from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
+        }
+        return;
+      }
+      if (dc->d_mdp.d_header.opcode != Opcode::Query && dc->d_mdp.d_header.opcode != Opcode::Notify) {
+        g_stats.ignoredCount++;
+        if (g_logCommonErrors) {
+          g_log<<Logger::Error<<"Ignoring unsupported opcode "<<Opcode::to_s(dc->d_mdp.d_header.opcode)<<" from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
+        }
+        sendErrorOverTCP(dc, RCode::NotImp);
+        tcpGuard.keep();
+        return;
+      }
+      else if (dh->qdcount == 0) {
+        g_stats.emptyQueriesCount++;
+        if (g_logCommonErrors) {
+          g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<< dc->getRemote() <<" on server socket!"<<endl;
+        }
+        sendErrorOverTCP(dc, RCode::NotImp);
+        tcpGuard.keep();
+        return;
+      }
+      else {
+        // We have read a proper query
+        ++g_stats.qcounter;
+        ++g_stats.tcpqcounter;
+
+        if(dc->d_mdp.d_header.opcode == Opcode::Notify) {
+          if(!t_allowNotifyFrom || !t_allowNotifyFrom->match(dc->d_source)) {
+            if(!g_quiet) {
+              g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP NOTIFY from "<<dc->d_source.toString()<<", address not matched by allow-notify-from"<<endl;
+            }
+
+            g_stats.sourceDisallowedNotify++;
+            return;
+          }
+
+          if(!isAllowNotifyForZone(qname)) {
+            if(!g_quiet) {
+              g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP NOTIFY from "<<dc->d_source.toString()<<", for "<<qname.toLogString()<<", zone not matched by allow-notify-for"<<endl;
+            }
+
+            g_stats.zoneDisallowedNotify++;
+            return;
+          }
+        }
+
+        string response;
+        RecursorPacketCache::OptPBData pbData{boost::none};
+
+        if(dc->d_mdp.d_header.opcode == Opcode::Query) {
+          /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
+             but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
+             as cacheable we would cache it with a wrong tag, so better safe than sorry. */
+          dc->d_eventTrace.add(RecEventTrace::PCacheCheck);
+          bool cacheHit = checkForCacheHit(qnameParsed, dc->d_tag, conn->data, qname, qtype, qclass, g_now, response, dc->d_qhash, pbData, true, dc->d_source);
+          dc->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
+
+          if (cacheHit) {
+            if (!g_quiet) {
+              g_log<<Logger::Notice<<t_id<< " TCP question answered from packet cache tag="<<dc->d_tag<<" from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<endl;
+            }
+
+            bool hadError = sendResponseOverTCP(dc, response);
+            finishTCPReply(dc, hadError, false);
+            struct timeval now;
+            Utility::gettimeofday(&now, nullptr);
+            uint64_t spentUsec = uSec(now - start);
+            g_stats.cumulativeAnswers(spentUsec);
+            dc->d_eventTrace.add(RecEventTrace::AnswerSent);
+
+            if (t_protobufServers && dc->d_logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) {
+              struct timeval tv{0, 0};
+              protobufLogResponse(dh, luaconfsLocal, pbData, tv, true, dc->d_source, dc->d_destination, dc->d_ednssubnet, dc->d_uuid, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta, dc->d_eventTrace);
+            }
+
+            if (dc->d_eventTrace.enabled() && SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) {
+              g_log << Logger::Info << dc->d_eventTrace.toString() << endl;
+            }
+            tcpGuard.keep();
+            return;
+          } // cache hit
+        } // query opcode
+
+        if(dc->d_mdp.d_header.opcode == Opcode::Notify) {
+          if (!g_quiet) {
+            g_log<<Logger::Notice<<t_id<< " got NOTIFY for "<<qname.toLogString()<<" from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<endl;
+          }
+
+          requestWipeCaches(qname);
+
+          // the operation will now be treated as a Query, generating
+          // a normal response, as the rest of the code does not
+          // check dh->opcode, but we need to ensure that the response
+          // to this request does not get put into the packet cache
+          dc->d_variable = true;
+        }
+
+        // setup for startDoResolve() in an mthread
+        ++conn->d_requestsInFlight;
+        if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
+          t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
+        } else {
+          Utility::gettimeofday(&g_now, nullptr); // needed?
+          struct timeval ttd = g_now;
+          t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
+        }
+        tcpGuard.keep();
+        MT->makeThread(startDoResolve, dc.release()); // deletes dc
+      } // good query
+    } // read full query
+  } // reading query
+
+  // more to come
+  tcpGuard.keep();
+}
+
+//! Handle new incoming TCP connection
+void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
+{
+  ComboAddress addr;
+  socklen_t addrlen=sizeof(addr);
+  int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
+  if(newsock>=0) {
+    if(MT->numProcesses() > g_maxMThreads) {
+      g_stats.overCapacityDrops++;
+      try {
+        closesocket(newsock);
+      }
+      catch(const PDNSException& e) {
+        g_log<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
+      }
+      return;
+    }
+
+    if(t_remotes) {
+      t_remotes->push_back(addr);
+    }
+
+    bool fromProxyProtocolSource = expectProxyProtocol(addr);
+    if(t_allowFrom && !t_allowFrom->match(&addr) && !fromProxyProtocolSource) {
+      if(!g_quiet)
+        g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address neither matched by allow-from nor proxy-protocol-from"<<endl;
+
+      g_stats.unauthorizedTCP++;
+      try {
+        closesocket(newsock);
+      }
+      catch(const PDNSException& e) {
+        g_log<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
+      }
+      return;
+    }
+
+    if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
+      g_stats.tcpClientOverflow++;
+      try {
+        closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
+      }
+      catch(const PDNSException& e) {
+        g_log<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
+      }
+      return;
+    }
+
+    setNonBlocking(newsock);
+    std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
+    tc->d_source = addr;
+    tc->d_destination.reset();
+    tc->d_destination.sin4.sin_family = addr.sin4.sin_family;
+    socklen_t len = tc->d_destination.getSocklen();
+    getsockname(tc->getFD(), reinterpret_cast<sockaddr*>(&tc->d_destination), &len); // if this fails, we're ok with it
+
+    if (fromProxyProtocolSource) {
+      tc->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
+      tc->data.resize(tc->proxyProtocolNeed);
+      tc->state = TCPConnection::PROXYPROTOCOLHEADER;
+    }
+    else {
+      tc->state = TCPConnection::BYTE0;
+    }
+
+    struct timeval ttd;
+    Utility::gettimeofday(&ttd, nullptr);
+    ttd.tv_sec += g_tcpTimeout;
+
+    t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
+  }
+}
+
+static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var);
+
+static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
+{
+  TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
+
+  pid->lowState = newstate;
+
+  // handle state transitions
+  switch (oldstate) {
+  case IOState::NeedRead:
+
+    switch (newstate) {
+    case IOState::NeedWrite:
+      TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
+      t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
+      break;
+    case IOState::NeedRead:
+      break;
+    case IOState::Done:
+      TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
+      t_fdm->removeReadFD(pid->tcpsock);
+      break;
+    case IOState::Async:
+      throw std::runtime_error("TLS async mode not supported");
+      break;
+    }
+    break;
+
+  case IOState::NeedWrite:
+
+    switch (newstate) {
+    case IOState::NeedRead:
+      TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
+      t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
+      break;
+    case IOState::NeedWrite:
+      break;
+    case IOState::Done:
+      TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
+      t_fdm->removeWriteFD(pid->tcpsock);
+      break;
+    case IOState::Async:
+      throw std::runtime_error("TLS async mode not supported");
+      break;
+    }
+    break;
+
+  case IOState::Done:
+    switch (newstate) {
+    case IOState::NeedRead:
+      TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
+      t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
+      break;
+    case IOState::NeedWrite:
+      TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
+      t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
+      break;
+    case IOState::Done:
+      break;
+    case IOState::Async:
+      throw std::runtime_error("TLS async mode not supported");
+      break;
+    }
+    break;
+
+  case IOState::Async:
+    throw std::runtime_error("TLS async mode not supported");
+    break;
+  }
+
+}
+
+static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var)
+{
+  std::shared_ptr<PacketID> pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
+  assert(pid->tcphandler);
+  assert(fd == pid->tcphandler->getDescriptor());
+  IOState newstate = IOState::Done;
+
+  TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
+
+  // In the code below, we want to update the state of the fd before calling sendEvent
+  // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
+
+  switch (pid->highState) {
+  case TCPAction::DoingRead:
+    TCPLOG(pid->tcpsock, "highState: Reading" << endl);
+    // In arecvtcp, the buffer was resized already so inWanted bytes will fit
+    // try reading
+    try {
+      newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
+      switch (newstate) {
+      case IOState::Done:
+      case IOState::NeedRead:
+        TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
+        TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
+        if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
+          pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
+          newstate = IOState::Done;
+          TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+          MT->sendEvent(pid, &pid->inMSG);
+          return;
+        }
+        break;
+      case IOState::NeedWrite:
+        break;
+      case IOState::Async:
+        throw std::runtime_error("TLS async mode not supported");
+        break;
+      }
+    }
+    catch (const std::exception& e) {
+      newstate = IOState::Done;
+      TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
+      PacketBuffer empty;
+      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+      MT->sendEvent(pid, &empty); // this conveys error status
+      return;
+    }
+    break;
+
+  case TCPAction::DoingWrite:
+    TCPLOG(pid->tcpsock, "highState: Writing" << endl);
+    try {
+      TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
+      newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
+      TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
+      switch (newstate) {
+      case IOState::Done: {
+        TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
+        TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+        MT->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
+        return;
+      }
+      case IOState::NeedRead:
+        TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
+        break;
+      case IOState::NeedWrite:
+        TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
+        break;
+      case IOState::Async:
+        throw std::runtime_error("TLS async mode not supported");
+        break;
+      }
+    }
+    catch (const std::exception& e) {
+      newstate = IOState::Done;
+      TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
+      PacketBuffer sent;
+      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+      MT->sendEvent(pid, &sent); // we convey error status by sending empty string
+      return;
+    }
+    break;
+  }
+
+  // Cases that did not end up doing a sendEvent
+  TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+}
+
+
+void checkFastOpenSysctl(bool active)
+{
+#ifdef __linux__
+  string line;
+  if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
+    int flag = std::stoi(line);
+    if (active && !(flag & 1)) {
+      g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl;
+    }
+    if (!active && !(flag & 2)) {
+      g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl;
+    }
+  }
+  else {
+    g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
+ }
+#else
+  g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
+#endif
+}
+
+void checkTFOconnect()
+{
+  try {
+    Socket s(AF_INET, SOCK_STREAM);
+    s.setNonBlocking();
+    s.setFastOpenConnect();
+  }
+  catch (const NetworkError& e) {
+    g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << e.what() << endl;
+  }
+}
+
+
+
+LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
+{
+  TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);
+
+  auto pident = std::make_shared<PacketID>();
+  pident->tcphandler = handler;
+  pident->tcpsock = handler->getDescriptor();
+  pident->outMSG = data;
+  pident->highState = TCPAction::DoingWrite;
+
+  IOState state;
+  try {
+    TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
+    state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
+    TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);
+
+    if (state == IOState::Done) {
+      TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
+      return LWResult::Result::Success;
+    }
+  }
+  catch (const std::exception& e) {
+    TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
+    return LWResult::Result::PermanentError;
+  }
+
+  // Will set pident->lowState
+  TCPIOHandlerStateChange(IOState::Done, state, pident);
+
+  PacketBuffer packet;
+  int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
+  TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
+  if (ret == 0) {
+    TCPLOG(pident->tcpsock, "timeout" << endl);
+    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
+    return LWResult::Result::Timeout;
+  }
+  else if (ret == -1) { // error
+    TCPLOG(pident->tcpsock, "PermanentError" << endl);
+    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
+    return LWResult::Result::PermanentError;
+  }
+  else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
+    // fd housekeeping done by TCPIOHandlerIO
+    TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
+    return LWResult::Result::PermanentError;
+  }
+
+  TCPLOG(pident->tcpsock, "asendtcp success" << endl);
+  return LWResult::Result::Success;
+}
+
+LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
+{
+  TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
+  data.resize(len);
+
+  // We might have data already available from the TLS layer, try to get that into the buffer
+  size_t pos = 0;
+  IOState state;
+  try {
+    TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
+    state = handler->tryRead(data, pos, len);
+    TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
+    switch (state) {
+    case IOState::Done:
+    case IOState::NeedRead:
+      if (pos == len || (incompleteOkay && pos > 0)) {
+        data.resize(pos);
+        TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
+        return LWResult::Result::Success;
+      }
+      break;
+    case IOState::NeedWrite:
+      break;
+    case IOState::Async:
+      throw std::runtime_error("TLS async mode not supported");
+      break;
+    }
+  }
+  catch (const std::exception& e) {
+    TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
+    return LWResult::Result::PermanentError;
+  }
+
+  auto pident = std::make_shared<PacketID>();
+  pident->tcphandler = handler;
+  pident->tcpsock = handler->getDescriptor();
+  // We might have a partial result
+  pident->inMSG = std::move(data);
+  pident->inPos = pos;
+  pident->inWanted = len;
+  pident->inIncompleteOkay = incompleteOkay;
+  pident->highState = TCPAction::DoingRead;
+
+  data.clear();
+
+  // Will set pident->lowState
+  TCPIOHandlerStateChange(IOState::Done, state, pident);
+
+  int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
+  TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ' );
+  if (ret == 0) {
+    TCPLOG(pident->tcpsock, "timeout" << endl);
+    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
+    return LWResult::Result::Timeout;
+  }
+  else if (ret == -1) {
+    TCPLOG(pident->tcpsock, "PermanentError" << endl);
+    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
+    return LWResult::Result::PermanentError;
+  }
+  else if (data.empty()) {// error, EOF or other
+    // fd housekeeping done by TCPIOHandlerIO
+    TCPLOG(pident->tcpsock, "EOF" << endl);
+    return LWResult::Result::PermanentError;
+  }
+
+  TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
+  return LWResult::Result::Success;
+}
+
+void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets)
+{
+  int fd;
+  vector<string>locals;
+  stringtok(locals,::arg()["local-address"]," ,");
+
+  if(locals.empty())
+    throw PDNSException("No local address specified");
+
+  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
+    ServiceTuple st;
+    st.port=::arg().asNum("local-port");
+    parseService(*i, st);
+
+    ComboAddress sin;
+
+    sin.reset();
+    sin.sin4.sin_family = AF_INET;
+    if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
+      sin.sin6.sin6_family = AF_INET6;
+      if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
+        throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
+    }
+
+    fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
+    if(fd<0)
+      throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
+
+    setCloseOnExec(fd);
+
+    int tmp=1;
+    if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
+      g_log<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
+      exit(1);
+    }
+    if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
+      int err = errno;
+      g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(err)<<endl;
+    }
+
+#ifdef TCP_DEFER_ACCEPT
+    if(setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
+      if(i==locals.begin())
+        g_log<<Logger::Info<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
+    }
+#endif
+
+    if( ::arg().mustDo("non-local-bind") )
+       Utility::setBindAny(AF_INET, fd);
+
+    if (g_reusePort) {
+#if defined(SO_REUSEPORT_LB)
+      try {
+        SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
+      }
+      catch (const std::exception& e) {
+        throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
+      }
+#elif defined(SO_REUSEPORT)
+      try {
+        SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
+      }
+      catch (const std::exception& e) {
+        throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
+      }
+#endif
+    }
+
+    if (SyncRes::s_tcp_fast_open > 0) {
+      checkFastOpenSysctl(false);
+#ifdef TCP_FASTOPEN
+      if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
+        int err = errno;
+        g_log<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(err)<<endl;
+      }
+#else
+      g_log<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
+#endif
+    }
+
+    sin.sin4.sin_port = htons(st.port);
+    socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
+    if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
+      throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
+
+    setNonBlocking(fd);
+    try {
+      setSocketSendBuffer(fd, 65000);
+    }
+    catch (const std::exception& e) {
+      g_log<<Logger::Error<<e.what()<<endl;
+    }
+
+    listen(fd, 128);
+    deferredAdds.emplace_back(fd, handleNewTCPQuestion);
+    tcpSockets.insert(fd);
+
+    // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
+    //  - fd is not that which we know here, but returned from accept()
+    if(sin.sin4.sin_family == AF_INET)
+      g_log<<Logger::Info<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
+    else
+      g_log<<Logger::Info<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
+  }
+}
index 856b275d935a2a3cf080efd650e47a3472ba8abf..90b0259b43f8d3147ed738673b253942e1cec6b0 100644 (file)
@@ -1026,8 +1026,6 @@ struct PacketIDBirthdayCompare
 };
 extern std::unique_ptr<MemRecursorCache> g_recCache;
 extern thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
-typedef MTasker<std::shared_ptr<PacketID>, PacketBuffer, PacketIDCompare> MT_t;
-MT_t* getMT();
 
 struct RecursorStats
 {
index 84feae92864017eb44fb396dfd4e6de5d6f253ba..de4b6e5fec961686374860b3beb939592f97167e 100644 (file)
@@ -43,6 +43,7 @@
 #include "rpzloader.hh"
 #include "uuid-utils.hh"
 #include "tcpiohandler.hh"
+#include "rec-main.hh"
 
 extern thread_local FDMultiplexer* t_fdm;