]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/pdns_recursor.cc
rec: Remove useless accounting of FDs in the UDPClientSocks class
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
index ae37f9da8d92a4d7daae3a05c00aad63e92faf07..af5cde11ad8c7c47e2cfffa31e0f9d8336623a36 100644 (file)
@@ -210,6 +210,7 @@ static bool g_reusePort{false};
 static bool g_gettagNeedsEDNSOptions{false};
 static time_t g_statisticsInterval;
 static bool g_useIncomingECS;
+static bool g_useKernelTimestamp;
 std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
 #ifdef NOD_ENABLED
 static bool g_nodEnabled;
@@ -240,6 +241,10 @@ unsigned int g_numThreads;
 uint16_t g_outgoingEDNSBufsize;
 bool g_logRPZChanges{false};
 
+// Used in the Syncres to not throttle certain servers
+GlobalStateHolder<SuffixMatchNode> g_dontThrottleNames;
+GlobalStateHolder<NetmaskGroup> g_dontThrottleNetmasks;
+
 #define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fc00::/7, fe80::/10"
 #define LOCAL_NETS_INVERSE "!127.0.0.0/8, !10.0.0.0/8, !100.64.0.0/10, !169.254.0.0/16, !192.168.0.0/16, !172.16.0.0/12, !::1/128, !fc00::/7, !fe80::/10"
 // Bad Nets taken from both:
@@ -311,6 +316,7 @@ struct DNSComboWriter {
   boost::uuids::uuid d_uuid;
   string d_requestorId;
   string d_deviceId;
+  struct timeval d_kernelTimestamp{0,0};
 #endif
   std::string d_query;
   std::vector<std::string> d_policyTags;
@@ -529,9 +535,6 @@ public:
   {
   }
 
-  typedef set<int> socks_t;
-  socks_t d_socks;
-
   // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
   int getSocket(const ComboAddress& toaddr, int* fd)
   {
@@ -554,43 +557,32 @@ public:
       return -1;
     }
 
-    d_socks.insert(*fd);
     d_numsocks++;
     return 0;
   }
 
-  void returnSocket(int fd)
-  {
-    socks_t::iterator i=d_socks.find(fd);
-    if(i==d_socks.end()) {
-      throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
-    }
-    returnSocketLocked(i);
-  }
-
   // return a socket to the pool, or simply erase it
-  void returnSocketLocked(socks_t::iterator& i)
+  void returnSocket(int fd)
   {
-    if(i==d_socks.end()) {
-      throw PDNSException("Trying to return a socket not in the pool");
-    }
     try {
-      t_fdm->removeReadFD(*i);
+      t_fdm->removeReadFD(fd);
     }
-    catch(FDMultiplexerException& e) {
+    catch(const FDMultiplexerException& e) {
       // we sometimes return a socket that has not yet been assigned to t_fdm
     }
+
     try {
-      closesocket(*i);
+      closesocket(fd);
     }
     catch(const PDNSException& e) {
       g_log<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
     }
 
-    d_socks.erase(i++);
     --d_numsocks;
   }
 
+private:
+
   // returns -1 for errors which might go away, throws for ones that won't
   static int makeClientSocket(int family)
   {
@@ -1537,7 +1529,12 @@ static void startDoResolve(void *p)
         pbMessage->setAppliedPolicyType(appliedPolicy.d_type);
       }
       pbMessage->setPolicyTags(dc->d_policyTags);
-      pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
+      if (g_useKernelTimestamp && dc->d_kernelTimestamp.tv_sec) {
+        pbMessage->setQueryTime(dc->d_kernelTimestamp.tv_sec, dc->d_kernelTimestamp.tv_usec);
+      }
+      else {
+        pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
+      }
       pbMessage->setRequestorId(dq.requestorId);
       pbMessage->setDeviceId(dq.deviceId);
 #ifdef NOD_ENABLED
@@ -1629,8 +1626,10 @@ static void startDoResolve(void *p)
         else {
           dc->d_tcpConnection->state=TCPConnection::BYTE0;
           Utility::gettimeofday(&g_now, 0); // needs to be updated
-          t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
-          t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
+          struct timeval ttd = g_now;
+          ttd.tv_sec += g_tcpTimeout;
+
+          t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
         }
       }
     }
@@ -2050,23 +2049,25 @@ static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
     std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
     tc->state=TCPConnection::BYTE0;
 
-    t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
+    struct timeval ttd;
+    Utility::gettimeofday(&ttd, 0);
+    ttd.tv_sec += g_tcpTimeout;
 
-    struct timeval now;
-    Utility::gettimeofday(&now, 0);
-    t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
+    t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
   }
 }
 
 static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, struct timeval tv, int fd)
 {
   gettimeofday(&g_now, 0);
-  struct timeval diff = g_now - tv;
-  double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
+  if (tv.tv_sec) {
+    struct timeval diff = g_now - tv;
+    double delta=(diff.tv_sec*1000 + diff.tv_usec/1000.0);
 
-  if(tv.tv_sec && delta > 1000.0) {
-    g_stats.tooOldDrops++;
-    return 0;
+    if(delta > 1000.0) {
+      g_stats.tooOldDrops++;
+      return nullptr;
+    }
   }
 
   ++g_stats.qcounter;
@@ -2196,7 +2197,12 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
         const ComboAddress& requestor = requestorNM.getMaskedNetwork();
         pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
         pbMessage->setEDNSSubnet(ednssubnet.source, ednssubnet.source.isIpv4() ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
-        pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
+        if (g_useKernelTimestamp && tv.tv_sec) {
+          pbMessage->setQueryTime(tv.tv_sec, tv.tv_usec);
+        }
+        else {
+          pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
+        }
         pbMessage->setRequestorId(requestorId);
         pbMessage->setDeviceId(deviceId);
         protobufLogResponse(*pbMessage);
@@ -2274,6 +2280,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   }
   dc->d_requestorId = requestorId;
   dc->d_deviceId = deviceId;
+  dc->d_kernelTimestamp = tv;
 #endif
 
   MT->makeThread(startDoResolve, (void*) dc.release()); // deletes dc
@@ -2563,7 +2570,17 @@ static void makeUDPServerSockets(deferredAdd_t& deferredAdds)
         throw PDNSException("SO_REUSEPORT: "+stringerror());
     }
 #endif
-  socklen_t socklen=sin.getSocklen();
+
+    if (sin.isIPv4()) {
+      try {
+        setSocketIgnorePMTU(fd);
+      }
+      catch(const std::exception& e) {
+        g_log<<Logger::Warning<<"Failed to set IP_MTU_DISCOVER on UDP server socket: "<<e.what()<<endl;
+      }
+    }
+
+    socklen_t socklen=sin.getSocklen();
     if (::bind(fd, (struct sockaddr *)&sin, socklen)<0)
       throw PDNSException("Resolver binding to server socket on port "+ std::to_string(st.port) +" for "+ st.host+": "+stringerror());
 
@@ -2640,7 +2657,7 @@ static void doStats(void)
     size_t idx = 0;
     for (const auto& threadInfo : s_threadInfos) {
       if(threadInfo.isWorker) {
-        g_log<<Logger::Notice<<"Thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
+        g_log<<Logger::Notice<<"stats: thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
         ++idx;
       }
     }
@@ -2671,16 +2688,15 @@ static void houseKeeping(void *)
   }
 
   try {
-    if(s_running)
+    if(s_running) {
       return;
+    }
     s_running=true;
 
     struct timeval now;
     Utility::gettimeofday(&now, 0);
 
     if(now.tv_sec - last_prune > (time_t)(5 + t_id)) {
-      DTime dt;
-      dt.setTimeval(now);
       t_RC->doPrune(g_maxCacheEntries / g_numThreads); // this function is local to a thread, so fine anyhow
       t_packetCache->doPruneTo(g_maxPacketCacheEntries / g_numWorkerThreads);
 
@@ -2738,8 +2754,8 @@ static void houseKeeping(void *)
           g_log<<Logger::Error<<"Unable to update Trust Anchors: "<<pe.reason<<endl;
         }
       }
-      s_running=false;
     }
+    s_running=false;
   }
   catch(PDNSException& ae)
     {
@@ -2751,6 +2767,11 @@ static void houseKeeping(void *)
 
 static void makeThreadPipes()
 {
+  auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
+  if (pipeBufferSize > 0) {
+    g_log<<Logger::Info<<"Resizing the buffer of the distribution pipe to "<<pipeBufferSize<<endl;
+  }
+
   /* thread 0 is the handler / SNMP, we start at 1 */
   for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
     auto& threadInfos = s_threadInfos.at(n);
@@ -2774,6 +2795,16 @@ static void makeThreadPipes()
     threadInfos.pipes.readQueriesToThread = fd[0];
     threadInfos.pipes.writeQueriesToThread = fd[1];
 
+    if (pipeBufferSize > 0) {
+      if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
+        g_log<<Logger::Warning<<"Error resizing the buffer of the distribution pipe for thread "<<n<<" to "<<pipeBufferSize<<": "<<strerror(errno)<<endl;
+        auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
+        if (existingSize > 0) {
+          g_log<<Logger::Warning<<"The current size of the distribution pipe's buffer for thread "<<n<<" is "<<existingSize<<endl;
+        }
+      }
+    }
+
     if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
       unixDie("Making pipe for inter-thread communications non-blocking");
     }
@@ -3031,6 +3062,7 @@ template string broadcastAccFunction(const boost::function<string*()>& fun); //
 template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun); // explicit instantiation
 template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun); // explicit instantiation
 template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::function<vector<pair<DNSName, uint16_t> > *()>& fun); // explicit instantiation
+template ThreadTimes broadcastAccFunction(const boost::function<ThreadTimes*()>& fun);
 
 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
 {
@@ -3776,6 +3808,23 @@ static int serviceMain(int argc, char*argv[])
 
   g_statisticsInterval = ::arg().asNum("statistics-interval");
 
+  {
+    SuffixMatchNode dontThrottleNames;
+    vector<string> parts;
+    stringtok(parts, ::arg()["dont-throttle-names"]);
+    for (const auto &p : parts) {
+      dontThrottleNames.add(DNSName(p));
+    }
+    g_dontThrottleNames.setState(dontThrottleNames);
+
+    NetmaskGroup dontThrottleNetmasks;
+    stringtok(parts, ::arg()["dont-throttle-netmasks"]);
+    for (const auto &p : parts) {
+      dontThrottleNetmasks.addMask(Netmask(p));
+    }
+    g_dontThrottleNetmasks.setState(dontThrottleNetmasks);
+  }
+
   s_balancingFactor = ::arg().asDouble("distribution-load-factor");
   if (s_balancingFactor != 0.0 && s_balancingFactor < 1.0) {
     s_balancingFactor = 0.0;
@@ -3923,6 +3972,8 @@ static int serviceMain(int argc, char*argv[])
   g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
   s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
 
+  g_useKernelTimestamp = ::arg().mustDo("protobuf-use-kernel-timestamp");
+
   blacklistStats(StatComponent::API, ::arg()["stats-api-blacklist"]);
   blacklistStats(StatComponent::Carbon, ::arg()["stats-carbon-blacklist"]);
   blacklistStats(StatComponent::RecControl, ::arg()["stats-rec-control-blacklist"]);
@@ -4294,6 +4345,8 @@ int main(int argc, char **argv)
     ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
     ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
     ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
+    ::arg().set("dont-throttle-names", "Do not throttle nameservers with this name or suffix")="";
+    ::arg().set("dont-throttle-netmasks", "Do not throttle nameservers with this IP netmask")="";
     ::arg().set("hint-file", "If set, load root hints from this file")="";
     ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
     ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
@@ -4349,6 +4402,8 @@ int main(int argc, char **argv)
     ::arg().set("max-total-msec", "Maximum total wall-clock time per query in milliseconds, 0 for unlimited")="7000";
     ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
     ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
+    ::arg().set("protobuf-use-kernel-timestamp", "Compute the latency of queries in protobuf messages by using the timestamp set by the kernel when the query was received (when available)")="";
+    ::arg().set("distribution-pipe-buffer-size", "Size in bytes of the internal buffer of the pipe used by the distributor to pass incoming queries to a worker thread")="0";
 
     ::arg().set("include-dir","Include *.conf files from this directory")="";
     ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";