]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/pdns_recursor.cc
Merge pull request #7631 from omoerbeek/rec-ecs-cache-limit-with-ttl
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
index d89e9eff34b6b0b131d8febd8a15085232becf69..ae37f9da8d92a4d7daae3a05c00aad63e92faf07 100644 (file)
 
 #include "namespaces.hh"
 
+#ifdef HAVE_PROTOBUF
+#include "uuid-utils.hh"
+#endif
+
 #include "xpf.hh"
 
 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
@@ -124,9 +128,6 @@ thread_local FDMultiplexer* t_fdm{nullptr};
 thread_local std::unique_ptr<addrringbuf_t> t_remotes, t_servfailremotes, t_largeanswerremotes, t_bogusremotes;
 thread_local std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > > t_queryring, t_servfailqueryring, t_bogusqueryring;
 thread_local std::shared_ptr<NetmaskGroup> t_allowFrom;
-#ifdef HAVE_PROTOBUF
-thread_local std::unique_ptr<boost::uuids::random_generator> t_uuidGenerator;
-#endif
 #ifdef NOD_ENABLED
 thread_local std::shared_ptr<nod::NODDB> t_nodDBp;
 thread_local std::shared_ptr<nod::UniqueResponseDB> t_udrDBp;
@@ -160,6 +161,8 @@ struct RecThreadInfo
   deferredAdd_t deferredAdds;
   struct ThreadPipeSet pipes;
   std::thread thread;
+  MT_t* mt{nullptr};
+  uint64_t numberOfDistributedQueries{0};
   /* handle the web server, carbon, statistics and the control channel */
   bool isHandler{false};
   /* accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set) */
@@ -225,6 +228,7 @@ static std::set<uint16_t> s_avoidUdpSourcePorts;
 #endif
 static uint16_t s_minUdpSourcePort;
 static uint16_t s_maxUdpSourcePort;
+static double s_balancingFactor;
 
 RecursorControlChannel s_rcc; // only active in the handler thread
 RecursorStats g_stats;
@@ -622,6 +626,7 @@ public:
     if(!tries)
       throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
 
+    setReceiveSocketErrors(ret, family);
     setNonBlocking(ret);
     return ret;
   }
@@ -877,7 +882,7 @@ static std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> startProtobuf
 
   for (const auto& server : config.servers) {
     try {
-      result->emplace_back(new RemoteLogger(server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect));
+      result->emplace_back(new RemoteLogger(server, config.timeout, 100*config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect));
     }
     catch(const std::exception& e) {
       g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.what()<<endl;
@@ -1043,7 +1048,6 @@ static void startDoResolve(void *p)
         maxanswersize = min(static_cast<uint16_t>(edo.d_packetsize >= 512 ? edo.d_packetsize : 512), g_udpTruncationThreshold);
       }
       ednsOpts = edo.d_options;
-      haveEDNS=true;
       maxanswersize -= 11; // EDNS header size
 
       for (const auto& o : edo.d_options) {
@@ -1942,7 +1946,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       if(t_protobufServers || t_outgoingProtobufServers) {
         dc->d_requestorId = requestorId;
         dc->d_deviceId = deviceId;
-        dc->d_uuid = (*t_uuidGenerator)();
+        dc->d_uuid = getUniqueID();
       }
 
       if(t_protobufServers) {
@@ -1958,6 +1962,15 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         }
       }
 #endif
+      if(t_pdl) {
+        if(t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh)) {
+          if(!g_quiet)
+            g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<<endl;
+          g_stats.policyDrops++;
+          return;
+        }
+      }
+
       if(dc->d_mdp.d_header.qr) {
         g_stats.ignoredCount++;
         if(g_logCommonErrors) {
@@ -2077,10 +2090,10 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   boost::uuids::uuid uniqueId;
   auto luaconfsLocal = g_luaconfs.getLocal();
   if (checkProtobufExport(luaconfsLocal)) {
-    uniqueId = (*t_uuidGenerator)();
+    uniqueId = getUniqueID();
     needECS = true;
   } else if (checkOutgoingProtobufExport(luaconfsLocal)) {
-    uniqueId = (*t_uuidGenerator)();
+    uniqueId = getUniqueID();
   }
   logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
   bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
@@ -2375,6 +2388,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
             distributeAsyncFunction(data, boost::bind(doProcessUDPQuestion, data, fromaddr, dest, tv, fd));
           }
           else {
+            ++s_threadInfos[t_id].numberOfDistributedQueries;
             doProcessUDPQuestion(data, fromaddr, dest, tv, fd);
           }
         }
@@ -2623,6 +2637,14 @@ static void doStats(void)
     g_log<<Logger::Notice<<"stats: " <<  broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
     " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
 
+    size_t idx = 0;
+    for (const auto& threadInfo : s_threadInfos) {
+      if(threadInfo.isWorker) {
+        g_log<<Logger::Notice<<"Thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
+        ++idx;
+      }
+    }
+
     time_t now = time(0);
     if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
       g_log<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
@@ -2808,7 +2830,7 @@ void broadcastFunction(const pipefunc_t& func)
 
 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 {
-  const auto& targetInfo = s_threadInfos[target];
+  auto& targetInfo = s_threadInfos[target];
   if(!targetInfo.isWorker) {
     g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
     exit(1);
@@ -2833,9 +2855,52 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
     }
   }
 
+  ++targetInfo.numberOfDistributedQueries;
+
   return true;
 }
 
+static unsigned int getWorkerLoad(size_t workerIdx)
+{
+  const auto mt = s_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
+  if (mt != nullptr) {
+    return mt->numProcesses();
+  }
+  return 0;
+}
+
+static unsigned int selectWorker(unsigned int hash)
+{
+  if (s_balancingFactor == 0) {
+    return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+  }
+
+  /* we start with one, representing the query we are currently handling */
+  double currentLoad = 1;
+  std::vector<unsigned int> load(g_numWorkerThreads);
+  for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+    load[idx] = getWorkerLoad(idx);
+    currentLoad += load[idx];
+    // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
+  }
+
+  double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
+  // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+
+  unsigned int worker = hash % g_numWorkerThreads;
+  /* at least one server has to be at or below the average load */
+  if (load[worker] > targetLoad) {
+    ++g_stats.rebalancedQueries;
+    do {
+      // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
+      worker = (worker + 1) % g_numWorkerThreads;
+    }
+    while(load[worker] > targetLoad);
+  }
+
+  return /* skip handler */ 1 + g_numDistributorThreads + worker;
+}
+
 // This function is only called by the distributor threads, when pdns-distributes-queries is set
 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
 {
@@ -2845,7 +2910,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
   }
 
   unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
-  unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+  unsigned int target = selectWorker(hash);
 
   ThreadMSG* tmsg = new ThreadMSG();
   tmsg->func = func;
@@ -2969,27 +3034,27 @@ template vector<pair<DNSName,uint16_t> > broadcastAccFunction(const boost::funct
 
 static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
 {
-  string remote;
-  string msg=s_rcc.recv(&remote);
-  RecursorControlParser rcp;
-  RecursorControlParser::func_t* command;
+  try {
+    string remote;
+    string msg=s_rcc.recv(&remote);
+    RecursorControlParser rcp;
+    RecursorControlParser::func_t* command;
 
-  string answer=rcp.getAnswer(msg, &command);
+    string answer=rcp.getAnswer(msg, &command);
 
-  // If we are inside a chroot, we need to strip
-  if (!arg()["chroot"].empty()) {
-    size_t len = arg()["chroot"].length();
-    remote = remote.substr(len);
-  }
+    // If we are inside a chroot, we need to strip
+    if (!arg()["chroot"].empty()) {
+      size_t len = arg()["chroot"].length();
+      remote = remote.substr(len);
+    }
 
-  try {
     s_rcc.send(answer, &remote);
     command();
   }
-  catch(std::exception& e) {
+  catch(const std::exception& e) {
     g_log<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
   }
-  catch(PDNSException& ae) {
+  catch(const PDNSException& ae) {
     g_log<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
   }
 }
@@ -3620,10 +3685,12 @@ static int serviceMain(int argc, char*argv[])
   }
 
   SyncRes::s_minimumTTL = ::arg().asNum("minimum-ttl-override");
+  SyncRes::s_minimumECSTTL = ::arg().asNum("ecs-minimum-ttl-override");
 
   SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
 
   SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
+  SyncRes::s_maxbogusttl=::arg().asNum("max-cache-bogus-ttl");
   SyncRes::s_maxcachettl=max(::arg().asNum("max-cache-ttl"), 15);
   SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
   // Cap the packetcache-servfail-ttl to the packetcache-ttl
@@ -3642,6 +3709,10 @@ static int serviceMain(int argc, char*argv[])
 
   SyncRes::s_ecsipv4limit = ::arg().asNum("ecs-ipv4-bits");
   SyncRes::s_ecsipv6limit = ::arg().asNum("ecs-ipv6-bits");
+  SyncRes::clearECSStats();
+  SyncRes::s_ecsipv4cachelimit = ::arg().asNum("ecs-ipv4-cache-bits");
+  SyncRes::s_ecsipv6cachelimit = ::arg().asNum("ecs-ipv6-cache-bits");
+  SyncRes::s_ecscachelimitttl = ::arg().asNum("ecs-cache-limit-ttl");
 
   if (!::arg().isEmpty("ecs-scope-zero-address")) {
     ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
@@ -3705,6 +3776,12 @@ static int serviceMain(int argc, char*argv[])
 
   g_statisticsInterval = ::arg().asNum("statistics-interval");
 
+  s_balancingFactor = ::arg().asDouble("distribution-load-factor");
+  if (s_balancingFactor != 0.0 && s_balancingFactor < 1.0) {
+    s_balancingFactor = 0.0;
+    g_log<<Logger::Warning<<"Asked to run with a distribution-load-factor below 1.0, disabling it instead"<<endl;
+  }
+
 #ifdef SO_REUSEPORT
   g_reusePort = ::arg().mustDo("reuseport");
 #endif
@@ -3846,6 +3923,11 @@ static int serviceMain(int argc, char*argv[])
   g_tcpMaxQueriesPerConn=::arg().asNum("max-tcp-queries-per-connection");
   s_maxUDPQueriesPerRound=::arg().asNum("max-udp-queries-per-round");
 
+  blacklistStats(StatComponent::API, ::arg()["stats-api-blacklist"]);
+  blacklistStats(StatComponent::Carbon, ::arg()["stats-carbon-blacklist"]);
+  blacklistStats(StatComponent::RecControl, ::arg()["stats-rec-control-blacklist"]);
+  blacklistStats(StatComponent::SNMP, ::arg()["stats-snmp-blacklist"]);
+
   if (::arg().mustDo("snmp-agent")) {
     g_snmpAgent = std::make_shared<RecursorSNMPAgent>("recursor", ::arg()["snmp-master-socket"]);
     g_snmpAgent->run();
@@ -3952,17 +4034,15 @@ try
 
   t_packetCache = std::unique_ptr<RecursorPacketCache>(new RecursorPacketCache());
 
-#ifdef HAVE_PROTOBUF
-  t_uuidGenerator = std::unique_ptr<boost::uuids::random_generator>(new boost::uuids::random_generator());
-#endif
   g_log<<Logger::Warning<<"Done priming cache with root hints"<<endl;
 
 #ifdef NOD_ENABLED
   if (threadInfo.isWorker)
     setupNODThread();
 #endif /* NOD_ENABLED */
-  
-  if(threadInfo.isWorker) {
+
+  /* the listener threads handle TCP queries */
+  if(threadInfo.isWorker || threadInfo.isListener) {
     try {
       if(!::arg()["lua-dns-script"].empty()) {
         t_pdl = std::make_shared<RecursorLua4>();
@@ -4001,6 +4081,7 @@ try
   }
 
   MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
+  threadInfo.mt = MT.get();
 
 #ifdef HAVE_PROTOBUF
   /* start protobuf export threads if needed */
@@ -4098,7 +4179,8 @@ try
     }
     if (t_pdl != nullptr) {
       // lua-dns-script directive is present, call the maintenance callback if needed
-      if (threadInfo.isWorker) {
+      /* remember that the listener threads handle TCP queries */
+      if (threadInfo.isWorker || threadInfo.isListener) {
         // Only on threads processing queries
         if(g_now.tv_sec - last_lua_maintenance >= luaMaintenanceInterval) {
           t_pdl->maintenance();
@@ -4149,6 +4231,7 @@ int main(int argc, char **argv)
   g_argc = argc;
   g_argv = argv;
   g_stats.startupTime=time(0);
+  Utility::srandom();
   versionSetProduct(ProductRecursor);
   reportBasicTypes();
   reportOtherTypes();
@@ -4187,7 +4270,7 @@ int main(int argc, char **argv)
     ::arg().set("webserver-port", "Port of webserver to listen on") = "8082";
     ::arg().set("webserver-password", "Password required for accessing the webserver") = "";
     ::arg().set("webserver-allow-from","Webserver access is only allowed from these subnets")="127.0.0.1,::1";
-    ::arg().set("webserver-log-level","Amount of logging in the webserver (none, common, detailed)")="common";
+    ::arg().set("webserver-loglevel", "Amount of logging in the webserver (none, normal, detailed)") = "normal";
     ::arg().set("carbon-ourname", "If set, overrides our reported hostname for carbon stats")="";
     ::arg().set("carbon-server", "If set, send metrics in carbon (graphite) format to this server IP address")="";
     ::arg().set("carbon-interval", "Number of seconds between carbon (graphite) updates")="30";
@@ -4214,6 +4297,7 @@ int main(int argc, char **argv)
     ::arg().set("hint-file", "If set, load root hints from this file")="";
     ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
     ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
+    ::arg().set("max-cache-bogus-ttl", "maximum number of seconds to keep a Bogus (positive or negative) cached entry in memory")="3600";
     ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
     ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
     ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
@@ -4244,7 +4328,11 @@ int main(int argc, char **argv)
     ::arg().set("latency-statistic-size","Number of latency values to calculate the qa-latency average")="10000";
     ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
     ::arg().set("ecs-ipv4-bits", "Number of bits of IPv4 address to pass for EDNS Client Subnet")="24";
+    ::arg().set("ecs-ipv4-cache-bits", "Maximum number of bits of IPv4 mask to cache ECS response")="24";
     ::arg().set("ecs-ipv6-bits", "Number of bits of IPv6 address to pass for EDNS Client Subnet")="56";
+    ::arg().set("ecs-ipv6-cache-bits", "Maximum number of bits of IPv6 mask to cache ECS response")="56";
+    ::arg().set("ecs-minimum-ttl-override", "Set under adverse conditions, a minimum TTL for records in ECS-specific answers")="0";
+    ::arg().set("ecs-cache-limit-ttl", "Minimum TTL to cache ECS response")="0";
     ::arg().set("edns-subnet-whitelist", "List of netmasks and domains that we should enable EDNS subnet for")="";
     ::arg().set("ecs-add-for", "List of client netmasks for which EDNS Client Subnet will be added")="0.0.0.0/0, ::/0, " LOCAL_NETS_INVERSE;
     ::arg().set("ecs-scope-zero-address", "Address to send to whitelisted authoritative servers for incoming queries with ECS prefix-length source of 0")="";
@@ -4270,6 +4358,18 @@ int main(int argc, char **argv)
     ::arg().setSwitch("snmp-agent", "If set, register as an SNMP agent")="no";
     ::arg().set("snmp-master-socket", "If set and snmp-agent is set, the socket to use to register to the SNMP master")="";
 
+    std::string defaultBlacklistedStats = "cache-bytes, packetcache-bytes, special-memory-usage";
+    for (size_t idx = 0; idx < 32; idx++) {
+      defaultBlacklistedStats += ", ecs-v4-response-bits-" + std::to_string(idx + 1);
+    }
+    for (size_t idx = 0; idx < 128; idx++) {
+      defaultBlacklistedStats += ", ecs-v6-response-bits-" + std::to_string(idx + 1);
+    }
+    ::arg().set("stats-api-blacklist", "List of statistics that are disabled when retrieving the complete list of statistics via the API")=defaultBlacklistedStats;
+    ::arg().set("stats-carbon-blacklist", "List of statistics that are prevented from being exported via Carbon")=defaultBlacklistedStats;
+    ::arg().set("stats-rec-control-blacklist", "List of statistics that are prevented from being exported via rec_control get-all")=defaultBlacklistedStats;
+    ::arg().set("stats-snmp-blacklist", "List of statistics that are prevented from being exported via SNMP")=defaultBlacklistedStats;
+
     ::arg().set("tcp-fast-open", "Enable TCP Fast Open support on the listening sockets, using the supplied numerical value as the queue size")="0";
     ::arg().set("nsec3-max-iterations", "Maximum number of iterations allowed for an NSEC3 record")="2500";
 
@@ -4285,6 +4385,7 @@ int main(int argc, char **argv)
     ::arg().set("udp-source-port-avoid", "List of comma separated UDP port number to avoid")="11211";
     ::arg().set("rng", "Specify random number generator to use. Valid values are auto,sodium,openssl,getrandom,arc4random,urandom.")="auto";
     ::arg().set("public-suffix-list-file", "Path to the Public Suffix List file, if any")="";
+    ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
 #ifdef NOD_ENABLED
     ::arg().set("new-domain-tracking", "Track newly observed domains (i.e. never seen before).")="no";
     ::arg().set("new-domain-log", "Log newly observed domains.")="yes";