]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/pdns_recursor.cc
Merge pull request #7397 from rgacogne/rec41-sec-20190121
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
index fed8b01a33841ea56951af4c6db2836947db8025..d92b1ffaeb7b15e69859c440bd30bce5538918c0 100644 (file)
@@ -27,9 +27,9 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "recpacketcache.hh"
 #include "ws-recursor.hh"
 #include <pthread.h>
-#include "recpacketcache.hh"
 #include "utility.hh"
 #include "dns_random.hh"
 #ifdef HAVE_LIBSODIUM
@@ -97,7 +97,7 @@
 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
 
 static thread_local std::shared_ptr<RecursorLua4> t_pdl;
-static thread_local unsigned int t_id;
+static thread_local int t_id = -1;
 static thread_local std::shared_ptr<Regex> t_traceRegex;
 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
 
@@ -120,10 +120,19 @@ struct ThreadPipeSet
   int readToThread;
   int writeFromThread;
   int readFromThread;
+  int writeQueriesToThread; // this one is non-blocking
+  int readQueriesToThread;
 };
 
-typedef vector<int> tcpListenSockets_t;
+/* the TID of the thread handling the web server, carbon, statistics and the control channel */
+static const int s_handlerThreadID = -1;
+/* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries
+   to the other threads */
+static const int s_distributorThreadID = 0;
+
+typedef std::map<int, std::set<int>> tcpListenSockets_t;
 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
+
 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
 
 static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
@@ -137,6 +146,7 @@ static AtomicCounter counter;
 static std::shared_ptr<SyncRes::domainmap_t> g_initialDomainMap; // new threads needs this to be setup
 static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setup with this
 static size_t g_tcpMaxQueriesPerConn;
+static size_t s_maxUDPQueriesPerRound;
 static uint64_t g_latencyStatSize;
 static uint32_t g_disthashseed;
 static unsigned int g_maxTCPPerClient;
@@ -209,6 +219,7 @@ struct DNSComboWriter {
   string d_requestorId;
   string d_deviceId;
 #endif
+  std::string d_query;
   EDNSSubnetOpts d_ednssubnet;
   bool d_ecsFound{false};
   bool d_ecsParsed{false};
@@ -216,11 +227,14 @@ struct DNSComboWriter {
   int d_socket;
   unsigned int d_tag{0};
   uint32_t d_qhash{0};
-  string d_query;
   shared_ptr<TCPConnection> d_tcpConnection;
   vector<pair<uint16_t, string> > d_ednsOpts;
   std::vector<std::string> d_policyTags;
   LuaContext::LuaObject d_data;
+  uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
+  uint16_t d_ecsBegin{0};
+  uint16_t d_ecsEnd{0};
+  bool d_variable{false};
 };
 
 MT_t* getMT()
@@ -234,7 +248,7 @@ ArgvMap &arg()
   return theArg;
 }
 
-unsigned int getRecursorThreadId()
+int getRecursorThreadId()
 {
   return t_id;
 }
@@ -654,6 +668,7 @@ static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, uint8_
   Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
   const ComboAddress& requestor = requestorNM.getMaskedNetwork();
   RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
+  message.setServerIdentity(SyncRes::s_serverID);
   message.setEDNSSubnet(ednssubnet, ednssubnet.isIpv4() ? maskV4 : maskV6);
   message.setRequestorId(requestorId);
   message.setDeviceId(deviceId);
@@ -703,9 +718,9 @@ static void handleRPZCustom(const DNSRecord& spoofed, const QType& qtype, SyncRe
   }
 }
 
-static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, const uint16_t maxAnswerSize)
+static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_t& minTTL, uint32_t ttlCap, const uint16_t maxAnswerSize)
 {
-  pw.startRecord(rec.d_name, rec.d_type, rec.d_ttl, rec.d_class, rec.d_place);
+  pw.startRecord(rec.d_name, rec.d_type, (rec.d_ttl > ttlCap ? ttlCap : rec.d_ttl), rec.d_class, rec.d_place);
 
   if(rec.d_type != QType::OPT) // their TTL ain't real
     minTTL = min(minTTL, rec.d_ttl);
@@ -765,6 +780,7 @@ static void startDoResolve(void *p)
     if (luaconfsLocal->protobufServer) {
       Netmask requestorNM(dc->d_remote, dc->d_remote.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
       const ComboAddress& requestor = requestorNM.getMaskedNetwork();
+      pbMessage.setServerIdentity(SyncRes::s_serverID);
       pbMessage.update(dc->d_uuid, &requestor, &dc->d_local, dc->d_tcp, dc->d_mdp.d_header.id);
       pbMessage.setEDNSSubnet(dc->d_ednssubnet.source, dc->d_ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
       pbMessage.setQuestion(dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
@@ -781,7 +797,11 @@ static void startDoResolve(void *p)
     pw.getHeader()->rd=dc->d_mdp.d_header.rd;
     pw.getHeader()->cd=dc->d_mdp.d_header.cd;
 
-    uint32_t minTTL=std::numeric_limits<uint32_t>::max();
+    /* This is the lowest TTL seen in the records of the response,
+       so we can't cache it for longer than this value.
+       If we have a TTL cap, this value can't be larger than the
+       cap no matter what. */
+    uint32_t minTTL = dc->d_ttlCap;
 
     SyncRes sr(dc->d_now);
 
@@ -816,7 +836,7 @@ static void startDoResolve(void *p)
     }
 
     bool tracedQuery=false; // we could consider letting Lua know about this too
-    bool variableAnswer = false;
+    bool variableAnswer = dc->d_variable;
     bool shouldNotValidate = false;
 
     /* preresolve expects res (dq.rcode) to be set to RCode::NoError by default */
@@ -1132,7 +1152,7 @@ static void startDoResolve(void *p)
           continue;
         }
 
-        if (!addRecordToPacket(pw, *i, minTTL, maxanswersize)) {
+        if (!addRecordToPacket(pw, *i, minTTL, dc->d_ttlCap, maxanswersize)) {
           needCommit = false;
           break;
         }
@@ -1156,7 +1176,7 @@ static void startDoResolve(void *p)
          OPT record.  This MUST also occur when a truncated response (using
          the DNS header's TC bit) is returned."
       */
-      if (addRecordToPacket(pw, makeOpt(edo.d_packetsize, 0, edo.d_Z), minTTL, maxanswersize)) {
+      if (addRecordToPacket(pw, makeOpt(edo.d_packetsize, 0, edo.d_Z), minTTL, dc->d_ttlCap, maxanswersize)) {
         pw.commit();
       }
     }
@@ -1190,12 +1210,15 @@ static void startDoResolve(void *p)
       }
       if(sendmsg(dc->d_socket, &msgh, 0) < 0 && g_logCommonErrors) 
         L<<Logger::Warning<<"Sending UDP reply to client "<<dc->d_remote.toStringWithPort()<<" failed with: "<<strerror(errno)<<endl;
+
       if(!SyncRes::s_nopacketcache && !variableAnswer && !sr.wasVariable() ) {
-        t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
+        t_packetCache->insertResponsePacket(dc->d_tag, dc->d_qhash, dc->d_query, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass,
                                             string((const char*)&*packet.begin(), packet.size()),
                                             g_now.tv_sec,
                                             pw.getHeader()->rcode == RCode::ServFail ? SyncRes::s_packetcacheservfailttl :
                                             min(minTTL,SyncRes::s_packetcachettl),
+                                            dc->d_ecsBegin,
+                                            dc->d_ecsEnd,
                                             &pbMessage);
       }
       //      else cerr<<"Not putting in packet cache: "<<sr.wasVariable()<<endl;
@@ -1256,7 +1279,12 @@ static void startDoResolve(void *p)
 
     }
 
-    sr.d_outqueries ? t_RC->cacheMisses++ : t_RC->cacheHits++;
+    if (sr.d_outqueries || sr.d_authzonequeries) {
+      t_RC->cacheMisses++;
+    }
+    else {
+      t_RC->cacheHits++;
+    }
 
     if(spent < 0.001)
       g_stats.answers0_1++;
@@ -1322,6 +1350,7 @@ static void startDoResolve(void *p)
   }
   catch(...) {
     L<<Logger::Error<<"Any other exception in a resolver context "<< makeLoginfo(dc) <<endl;
+    delete dc;
   }
 
   g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
@@ -1460,7 +1489,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       dc->d_tcp=true;
       dc->setRemote(&conn->d_remote);
       ComboAddress dest;
-      memset(&dest, 0, sizeof(dest));
+      dest.reset();
       dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
       socklen_t len = dest.getSocklen();
       getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
@@ -1478,24 +1507,29 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       }
 #endif
 
-      if(needECS || (t_pdl && t_pdl->d_gettag)) {
+      if(needECS || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
 
         try {
           std::map<uint16_t, EDNSOptionView> ednsOptions;
           dc->d_ecsParsed = true;
           dc->d_ecsFound = getQNameAndSubnet(std::string(conn->data, conn->qlen), &qname, &qtype, &qclass, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
 
-          if(t_pdl && t_pdl->d_gettag) {
+          if(t_pdl) {
             try {
-              dc->d_tag = t_pdl->gettag(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId);
+              if (t_pdl->d_gettag_ffi) {
+                dc->d_tag = t_pdl->gettag_ffi(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable);
+              }
+              else if (t_pdl->d_gettag) {
+                dc->d_tag = t_pdl->gettag(conn->d_remote, dc->d_ednssubnet.source, dest, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId);
+              }
             }
-            catch(std::exception& e)  {
+            catch(const std::exception& e)  {
               if(g_logCommonErrors)
                 L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
             }
           }
         }
-        catch(std::exception& e)
+        catch(const std::exception& e)
         {
           if(g_logCommonErrors)
             L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
@@ -1508,9 +1542,9 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         dc->d_uuid = (*t_uuidGenerator)();
       }
 
+      const struct dnsheader* dh = (const struct dnsheader*) conn->data;
       if(luaconfsLocal->protobufServer) {
         try {
-          const struct dnsheader* dh = (const struct dnsheader*) conn->data;
 
           if (!luaconfsLocal->protobufTaggedOnly) {
             protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, conn->d_remote, dest, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
@@ -1522,6 +1556,16 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         }
       }
 #endif
+      if(t_pdl) {
+        if(t_pdl->ipfilter(dc->d_remote, dc->d_local, *dh)) {
+          delete dc;
+          if(!g_quiet)
+            L<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<conn->d_remote.toStringWithPort()<<" based on policy"<<endl;
+          g_stats.policyDrops++;
+          return;
+        }
+      }
+
       if(dc->d_mdp.d_header.qr) {
         delete dc;
         g_stats.ignoredCount++;
@@ -1637,6 +1681,10 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   EDNSSubnetOpts ednssubnet;
   bool ecsFound = false;
   bool ecsParsed = false;
+  uint16_t ecsBegin = 0;
+  uint16_t ecsEnd = 0;
+  uint32_t ttlCap = std::numeric_limits<uint32_t>::max();
+  bool variable = false;
   try {
     DNSName qname;
     uint16_t qtype=0;
@@ -1655,24 +1703,29 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
     */
 #endif
 
-    if(needECS || (t_pdl && t_pdl->d_gettag)) {
+    if(needECS || (t_pdl && (t_pdl->d_gettag || t_pdl->d_gettag_ffi))) {
       try {
         std::map<uint16_t, EDNSOptionView> ednsOptions;
         ecsFound = getQNameAndSubnet(question, &qname, &qtype, &qclass, &ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
         qnameParsed = true;
         ecsParsed = true;
 
-        if(t_pdl && t_pdl->d_gettag) {
+        if(t_pdl) {
           try {
-            ctag=t_pdl->gettag(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId);
+            if (t_pdl->d_gettag_ffi) {
+              ctag = t_pdl->gettag_ffi(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable);
+            }
+            else if (t_pdl->d_gettag) {
+              ctag=t_pdl->gettag(fromaddr, ednssubnet.source, destaddr, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId);
+            }
           }
-          catch(std::exception& e)  {
+          catch(const std::exception& e)  {
             if(g_logCommonErrors)
               L<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
           }
         }
       }
-      catch(std::exception& e)
+      catch(const std::exception& e)
       {
         if(g_logCommonErrors)
           L<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
@@ -1682,6 +1735,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
     bool cacheHit = false;
     RecProtoBufMessage pbMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
 #ifdef HAVE_PROTOBUF
+    pbMessage.setServerIdentity(SyncRes::s_serverID);
     if(luaconfsLocal->protobufServer) {
       if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
         protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, fromaddr, destaddr, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
@@ -1689,11 +1743,14 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
     }
 #endif /* HAVE_PROTOBUF */
 
+    /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
+       but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
+       as cacheable we would cache it with a wrong tag, so better safe than sorry. */
     if (qnameParsed) {
-      cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
+      cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, qtype, qclass, g_now.tv_sec, &response, &age, &qhash, &ecsBegin, &ecsEnd, &pbMessage));
     }
     else {
-      cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, g_now.tv_sec, &response, &age, &qhash, &pbMessage));
+      cacheHit = (!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(ctag, question, qname, &qtype, &qclass, g_now.tv_sec, &response, &age, &qhash, &ecsBegin, &ecsEnd, &pbMessage));
     }
 
     if (cacheHit) {
@@ -1771,7 +1828,11 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   dc->d_data = data;
   dc->d_ecsFound = ecsFound;
   dc->d_ecsParsed = ecsParsed;
+  dc->d_ecsBegin = ecsBegin;
+  dc->d_ecsEnd = ecsEnd;
   dc->d_ednssubnet = ednssubnet;
+  dc->d_ttlCap = ttlCap;
+  dc->d_variable = variable;
 #ifdef HAVE_PROTOBUF
   if (luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
     dc->d_uuid = uniqueId;
@@ -1798,7 +1859,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
   fromaddr.sin6.sin6_family=AF_INET6; // this makes sure fromaddr is big enough
   fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), data, sizeof(data), &fromaddr);
 
-  for(;;)
+  for(size_t counter = 0; counter < s_maxUDPQueriesPerRound; counter++)
   if((len=recvmsg(fd, &msgh, 0)) >= 0) {
 
     firstQuery = false;
@@ -1839,7 +1900,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
        struct timeval tv={0,0};
        HarvestTimestamp(&msgh, &tv);
        ComboAddress dest;
-       memset(&dest, 0, sizeof(dest)); // this makes sure we ignore this address if not returned by recvmsg above
+       dest.reset(); // this makes sure we ignore this address if not returned by recvmsg above
         auto loc = rplookup(g_listenSocketsAddresses, fd);
        if(HarvestDestinationAddress(&msgh, &dest)) {
           // but.. need to get port too
@@ -1898,7 +1959,7 @@ static void makeTCPServerSockets(unsigned int threadId)
 
     ComboAddress sin;
 
-    memset((char *)&sin,0, sizeof(sin));
+    sin.reset();
     sin.sin4.sin_family = AF_INET;
     if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
       sin.sin6.sin6_family = AF_INET6;
@@ -1958,7 +2019,7 @@ static void makeTCPServerSockets(unsigned int threadId)
     setSocketSendBuffer(fd, 65000);
     listen(fd, 128);
     deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
-    g_tcpListenSockets.push_back(fd);
+    g_tcpListenSockets[threadId].insert(fd);
     // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
     //  - fd is not that which we know here, but returned from accept()
     if(sin.sin4.sin_family == AF_INET)
@@ -1984,7 +2045,7 @@ static void makeUDPServerSockets(unsigned int threadId)
 
     ComboAddress sin;
 
-    memset(&sin, 0, sizeof(sin));
+    sin.reset();
     sin.sin4.sin_family = AF_INET;
     if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
       sin.sin6.sin6_family = AF_INET6;
@@ -2116,7 +2177,7 @@ static void doStats(void)
 
 static void houseKeeping(void *)
 {
-  static thread_local time_t last_stat, last_rootupdate, last_prune, last_secpoll;
+  static thread_local time_t last_rootupdate, last_prune, last_secpoll;
   static thread_local int cleanCounter=0;
   static thread_local bool s_running;  // houseKeeping can get suspended in secpoll, and be restarted, which makes us do duplicate work
   try {
@@ -2148,11 +2209,7 @@ static void houseKeeping(void *)
         last_rootupdate=now.tv_sec;
     }
 
-    if(!t_id) {
-      if(g_statisticsInterval > 0 && now.tv_sec - last_stat >= g_statisticsInterval) {
-       doStats();
-       last_stat=time(0);
-      }
+    if(t_id == s_distributorThreadID) {
 
       if(now.tv_sec - last_secpoll >= 3600) {
        try {
@@ -2166,6 +2223,10 @@ static void houseKeeping(void *)
         {
           L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
         }
+        catch(ImmediateServFailException &e)
+        {
+          L<<Logger::Error<<"Exception while performing security poll: "<<e.reason<<endl;
+        }
         catch(...)
         {
           L<<Logger::Error<<"Exception while performing security poll"<<endl;
@@ -2199,6 +2260,15 @@ static void makeThreadPipes()
     tps.readFromThread = fd[0];
     tps.writeFromThread = fd[1];
 
+    if(pipe(fd) < 0)
+      unixDie("Creating pipe for inter-thread communications");
+    tps.readQueriesToThread = fd[0];
+    tps.writeQueriesToThread = fd[1];
+
+    if (!setNonBlocking(tps.writeQueriesToThread)) {
+      unixDie("Making pipe for inter-thread communications non-blocking");
+    }
+
     g_pipes.push_back(tps);
   }
 }
@@ -2209,14 +2279,25 @@ struct ThreadMSG
   bool wantAnswer;
 };
 
-void broadcastFunction(const pipefunc_t& func, bool skipSelf)
+void broadcastFunction(const pipefunc_t& func)
 {
-  unsigned int n = 0;
+  /* This function might be called before t_id are set during startup
+     for the initialization of ACLs and domain maps, but the default is the same
+     than the handler thread */
+  if (t_id != s_handlerThreadID) {
+    L<<Logger::Error<<"broadcastFunction() has been called by a worker ("<<t_id<<")"<<endl;
+    exit(1);
+  }
+
+  /* the distributor will call itself below, but if we are the handler thread,
+     call the function ourselves to update the ACL or domain maps for example */
+  func();
+
+  int n = 0;
   for(ThreadPipeSet& tps : g_pipes)
   {
     if(n++ == t_id) {
-      if(!skipSelf)
-        func(); // don't write to ourselves!
+      func(); // don't write to ourselves!
       continue;
     }
 
@@ -2228,34 +2309,73 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf)
       unixDie("write to thread pipe returned wrong size or error");
     }
 
-    string* resp;
+    string* resp = nullptr;
     if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
       unixDie("read from thread pipe returned wrong size or error");
 
     if(resp) {
-//      cerr <<"got response: " << *resp << endl;
       delete resp;
+      resp = nullptr;
     }
   }
 }
 
+static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
+{
+  if(target == static_cast<unsigned int>(s_distributorThreadID)) {
+    L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
+    exit(1);
+  }
+
+  ThreadPipeSet& tps = g_pipes[target];
+  ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
+  if (written > 0) {
+    if (static_cast<size_t>(written) != sizeof(tmsg)) {
+      delete tmsg;
+      unixDie("write to thread pipe returned wrong size or error");
+    }
+  }
+  else {
+    int error = errno;
+    if (error == EAGAIN || error == EWOULDBLOCK) {
+      /* the pipe is full, sorry */
+      return false;
+    } else {
+      delete tmsg;
+      unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
+    }
+  }
+
+  return true;
+}
+
+// This function is only called by the distributor thread, when pdns-distributes-queries is set
 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
 {
+  if (t_id != s_distributorThreadID) {
+    L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+    exit(1);
+  }
+
   unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
   unsigned int target = 1 + (hash % (g_pipes.size()-1));
 
-  if(target == t_id) {
-    func();
-    return;
-  }
-  ThreadPipeSet& tps = g_pipes[target];
   ThreadMSG* tmsg = new ThreadMSG();
   tmsg->func = func;
   tmsg->wantAnswer = false;
 
-  if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
-    delete tmsg;
-    unixDie("write to thread pipe returned wrong size or error");
+  if (!trySendingQueryToWorker(target, tmsg)) {
+    /* if this function failed but did not raise an exception, it means that the pipe
+       was full, let's try another one */
+    unsigned int newTarget = 0;
+    do {
+      newTarget = 1 + dns_random(g_pipes.size()-1);
+    } while (newTarget == target);
+
+    if (!trySendingQueryToWorker(newTarget, tmsg)) {
+      g_stats.queryPipeFullDrops++;
+      delete tmsg;
+    }
   }
 }
 
@@ -2263,7 +2383,7 @@ static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
 {
   ThreadMSG* tmsg = nullptr;
 
-  if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
+  if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
     unixDie("read from thread pipe returned wrong size or error");
   }
 
@@ -2313,24 +2433,21 @@ vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a,
 }
 
 
-template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
-{
-  unsigned int n = 0;
+/*
+  This function should only be called by the handler to gather metrics, wipe the cache,
+  reload the Lua script (not the Lua config) or change the current trace regex,
+  and by the SNMP thread to gather metrics. */
+template<class T> T broadcastAccFunction(const boost::function<T*()>& func)
+{
+  /* the SNMP thread uses id -1 too */
+  if (t_id != s_handlerThreadID) {
+    L<<Logger::Error<<"broadcastAccFunction has been called by a worker ("<<t_id<<")"<<endl;
+    exit(1);
+  }
+
   T ret=T();
   for(ThreadPipeSet& tps : g_pipes)
   {
-    if(n++ == t_id) {
-      if(!skipSelf) {
-        T* resp = (T*)func(); // don't write to ourselves!
-        if(resp) {
-          //~ cerr <<"got direct: " << *resp << endl;
-          ret += *resp;
-          delete resp;
-        }
-      }
-      continue;
-    }
-
     ThreadMSG* tmsg = new ThreadMSG();
     tmsg->func = boost::bind(voider<T>, func);
     tmsg->wantAnswer = true;
@@ -2340,23 +2457,23 @@ template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool
       unixDie("write to thread pipe returned wrong size or error");
     }
 
-    T* resp;
+    T* resp = nullptr;
     if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
       unixDie("read from thread pipe returned wrong size or error");
 
     if(resp) {
-      //~ cerr <<"got response: " << *resp << endl;
       ret += *resp;
       delete resp;
+      resp = nullptr;
     }
   }
   return ret;
 }
 
-template string broadcastAccFunction(const boost::function<string*()>& fun, bool skipSelf); // explicit instantiation
-template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun, bool skipSelf); // explicit instantiation
-template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun, bool skipSelf); // explicit instantiation
-template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun, bool skipSelf); // explicit instantiation
+template string broadcastAccFunction(const boost::function<string*()>& fun); // explicit instantiation
+template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
+template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
+template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
 
 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
 {
@@ -2652,7 +2769,7 @@ static void checkOrFixFDS()
   }
 }
 
-static void* recursorThread(void*);
+static void* recursorThread(int tid, bool worker);
 
 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
 {
@@ -2880,14 +2997,21 @@ static int serviceMain(int argc, char*argv[])
     exit(1);
   }
 
+  g_signatureInceptionSkew = ::arg().asNum("signature-inception-skew");
+  if (g_signatureInceptionSkew < 0) {
+    L<<Logger::Error<<"A negative value for 'signature-inception-skew' is not allowed"<<endl;
+    exit(1);
+  }
+
   g_dnssecLogBogus = ::arg().mustDo("dnssec-log-bogus");
   g_maxNSEC3Iterations = ::arg().asNum("nsec3-max-iterations");
 
   g_maxCacheEntries = ::arg().asNum("max-cache-entries");
   g_maxPacketCacheEntries = ::arg().asNum("max-packetcache-entries");
-  
+
+  luaConfigDelayedThreads delayedLuaThreads;
   try {
-    loadRecursorLuaConfig(::arg()["lua-config-file"], ::arg().mustDo("daemon"));
+    loadRecursorLuaConfig(::arg()["lua-config-file"], delayedLuaThreads);
   }
   catch (PDNSException &e) {
     L<<Logger::Error<<"Cannot load Lua configuration: "<<e.reason<<endl;
@@ -3028,6 +3152,14 @@ static int serviceMain(int argc, char*argv[])
   else {
     makeUDPServerSockets(0);
     makeTCPServerSockets(0);
+
+    if (!g_weDistributeQueries) {
+      /* we are not distributing queries and we don't have reuseport,
+         so every thread will be listening on all the TCP sockets */
+      for (unsigned int threadId = 1; threadId < g_numWorkerThreads; threadId++) {
+        g_tcpListenSockets[threadId] = g_tcpListenSockets[0];
+      }
+    }
   }
 
   SyncRes::parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
@@ -3043,7 +3175,6 @@ static int serviceMain(int argc, char*argv[])
     L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
     L.toConsole(Logger::Critical);
     daemonize();
-    loadRecursorLuaConfig(::arg()["lua-config-file"], false);
   }
   signal(SIGUSR1,usr1Handler);
   signal(SIGUSR2,usr2Handler);
@@ -3096,47 +3227,53 @@ static int serviceMain(int argc, char*argv[])
 
   Utility::dropUserPrivs(newuid);
 
+  startLuaConfigDelayedThreads(delayedLuaThreads);
+
   makeThreadPipes();
 
   g_tcpTimeout=::arg().asNum("client-tcp-timeout");
   g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
   g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
+  s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
 
   if (::arg().mustDo("snmp-agent")) {
     g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
     g_snmpAgent->run();
   }
 
+  /* This thread handles the web server, carbon, statistics and the control channel */
+  std::thread handlerThread(recursorThread, s_handlerThreadID, false);
+
   const auto cpusMap = parseCPUMap();
+
+  std::vector<std::thread> workers(g_numThreads);
   if(g_numThreads == 1) {
     L<<Logger::Warning<<"Operating unthreaded"<<endl;
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
 #endif
     setCPUMap(cpusMap, 0, pthread_self());
-    recursorThread(0);
+    recursorThread(0, true);
   }
   else {
-    pthread_t tid;
     L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
     for(unsigned int n=0; n < g_numThreads; ++n) {
-      pthread_create(&tid, 0, recursorThread, (void*)(long)n);
+      workers[n] = std::thread(recursorThread, n, true);
 
-      setCPUMap(cpusMap, n, tid);
+      setCPUMap(cpusMap, n, workers[n].native_handle());
     }
-    void* res;
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
 #endif
-    pthread_join(tid, &res);
+    workers.back().join();
   }
   return 0;
 }
 
-static void* recursorThread(void* ptr)
+static void* recursorThread(int n, bool worker)
 try
 {
-  t_id=(int) (long) ptr;
+  t_id=n;
   SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
   SyncRes::setDomainMap(g_initialDomainMap);
   t_allowFrom = g_initialAllowFrom;
@@ -3151,15 +3288,17 @@ try
 #endif
   L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
 
-  try {
-    if(!::arg()["lua-dns-script"].empty()) {
-      t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
-      L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
+  if(worker) {
+    try {
+      if(!::arg()["lua-dns-script"].empty()) {
+        t_pdl = std::make_shared<RecursorLua4>(::arg()["lua-dns-script"]);
+        L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
+      }
+    }
+    catch(std::exception &e) {
+      L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
+      _exit(99);
     }
-  }
-  catch(std::exception &e) {
-    L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
-    _exit(99);
   }
 
   unsigned int ringsize=::arg().asNum("stats-ringbuffer-entries") / g_numWorkerThreads;
@@ -3185,7 +3324,8 @@ try
   PacketID pident;
 
   t_fdm=getMultiplexer();
-  if(!t_id) {
+
+  if(!worker) {
     if(::arg().mustDo("webserver")) {
       L<<Logger::Warning << "Enabling web server" << endl;
       try {
@@ -3198,24 +3338,27 @@ try
     }
     L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
   }
-
-  t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
-
-  if(g_useOneSocketPerThread) {
-    for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
-      t_fdm->addReadFD(i->first, i->second);
-    }
-  }
   else {
-    if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens
-      for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
+    t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
+    t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest);
+
+    if(g_useOneSocketPerThread) {
+      for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
         t_fdm->addReadFD(i->first, i->second);
       }
     }
+    else {
+      if(!g_weDistributeQueries || t_id == s_distributorThreadID) { // if we distribute queries, only t_id = 0 listens
+        for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
+          t_fdm->addReadFD(i->first, i->second);
+        }
+      }
+    }
   }
 
   registerAllStats();
-  if(!t_id) {
+
+  if(!worker) {
     t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
   }
 
@@ -3223,6 +3366,7 @@ try
 
   bool listenOnTCP(true);
 
+  time_t last_stat = 0;
   time_t last_carbon=0;
   time_t carbonInterval=::arg().asNum("carbon-interval");
   counter.store(0); // used to periodically execute certain tasks
@@ -3247,32 +3391,37 @@ try
 
     counter++;
 
-    if(!t_id && statsWanted) {
-      doStats();
-    }
+    if(!worker) {
+      if(statsWanted || (g_statisticsInterval > 0 && (g_now.tv_sec - last_stat) >= g_statisticsInterval)) {
+        doStats();
+        last_stat = g_now.tv_sec;
+      }
 
-    Utility::gettimeofday(&g_now, 0);
+      Utility::gettimeofday(&g_now, 0);
 
-    if(!t_id && (g_now.tv_sec - last_carbon >= carbonInterval)) {
-      MT->makeThread(doCarbonDump, 0);
-      last_carbon = g_now.tv_sec;
+      if((g_now.tv_sec - last_carbon) >= carbonInterval) {
+        MT->makeThread(doCarbonDump, 0);
+        last_carbon = g_now.tv_sec;
+      }
     }
 
     t_fdm->run(&g_now);
     // 'run' updates g_now for us
 
-    if(!g_weDistributeQueries || !t_id) { // if pdns distributes queries, only tid 0 should do this
+    if(worker && (!g_weDistributeQueries || t_id == s_distributorThreadID)) { // if pdns distributes queries, only tid 0 should do this
       if(listenOnTCP) {
        if(TCPConnection::getCurrentConnections() > maxTcpClients) {  // shutdown, too many connections
-         for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
-           t_fdm->removeReadFD(*i);
+         for(const auto fd : g_tcpListenSockets[t_id]) {
+           t_fdm->removeReadFD(fd);
+          }
          listenOnTCP=false;
        }
       }
       else {
        if(TCPConnection::getCurrentConnections() <= maxTcpClients) {  // reenable
-         for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
-           t_fdm->addReadFD(*i, handleNewTCPQuestion);
+          for(const auto fd : g_tcpListenSockets[t_id]) {
+           t_fdm->addReadFD(fd, handleNewTCPQuestion);
+          }
          listenOnTCP=true;
        }
       }
@@ -3314,6 +3463,7 @@ int main(int argc, char **argv)
     ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
     ::arg().set("dnssec", "DNSSEC mode: off/process-no-validate (default)/process/log-fail/validate")="process-no-validate";
     ::arg().set("dnssec-log-bogus", "Log DNSSEC bogus validations")="no";
+    ::arg().set("signature-inception-skew", "Allow the signture inception to be off by this number of seconds")="0";
     ::arg().set("daemon","Operate as a daemon")="no";
     ::arg().setSwitch("write-pid","Write a PID file")="yes";
     ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="6";
@@ -3363,7 +3513,7 @@ int main(int argc, char **argv)
     ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
     ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
     ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
-    ::arg().set("server-id", "Returned when queried for 'server.id' TXT or NSID, defaults to hostname")="";
+    ::arg().set("server-id", "Returned when queried for 'id.server' TXT or NSID, defaults to hostname")="";
     ::arg().set("stats-ringbuffer-entries", "maximum number of packets to store statistics for")="10000";
     ::arg().set("version-string", "string reported on version.pdns or version.bind")=fullVersionString();
     ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
@@ -3403,6 +3553,7 @@ int main(int argc, char **argv)
     ::arg().set("max-qperq", "Maximum outgoing queries per query")="50";
     ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
     ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
+    ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
 
     ::arg().set("include-dir","Include *.conf files from this directory")="";
     ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";