]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/dnsdist.cc
dnsdist: Add steal, iowait and UDP errors metrics
[thirdparty/pdns.git] / pdns / dnsdist.cc
index d9a149df5370f82d0d350be560a6f23229b32466..a8c74d90b45f23a9d9aea1e376f8e48ee035c943 100644 (file)
@@ -45,6 +45,7 @@
 #include "dnsdist-cache.hh"
 #include "dnsdist-console.hh"
 #include "dnsdist-ecs.hh"
+#include "dnsdist-healthchecks.hh"
 #include "dnsdist-lua.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-secpoll.hh"
@@ -55,7 +56,6 @@
 #include "dolog.hh"
 #include "dnsname.hh"
 #include "dnsparser.hh"
-#include "dnswriter.hh"
 #include "ednsoptions.hh"
 #include "gettime.hh"
 #include "lock.hh"
@@ -81,10 +81,8 @@ using std::thread;
 bool g_verbose;
 
 struct DNSDistStats g_stats;
-MetricDefinitionStorage g_metricDefinitions;
 
 uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
-bool g_verboseHealthChecks{false};
 uint32_t g_staleCacheEntriesTTL{0};
 bool g_syslog{true};
 bool g_allowEmptyResponse{false};
@@ -788,7 +786,7 @@ void DownstreamState::setWeight(int newWeight)
   }
 }
 
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
 {
   pthread_rwlock_init(&d_lock, nullptr);
   id = getUniqueID();
@@ -801,7 +799,7 @@ DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress
     fd = -1;
   }
 
-  if (!IsAnyAddress(remote)) {
+  if (connect && !IsAnyAddress(remote)) {
     reconnect();
     idStates.resize(g_maxOutstanding);
     sw.start();
@@ -882,6 +880,7 @@ shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const D
 }
 
 uint32_t g_hashperturb;
+double g_consistentHashBalancingFactor = 0;
 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
 {
   return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
@@ -894,8 +893,18 @@ shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const D
   unsigned int min = std::numeric_limits<unsigned int>::max();
   shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
 
+  double targetLoad = std::numeric_limits<double>::max();
+  if (g_consistentHashBalancingFactor > 0) {
+    /* we start with one, representing the query we are currently handling */
+    double currentLoad = 1;
+    for (const auto& pair : servers) {
+      currentLoad += pair.second->outstanding;
+    }
+    targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
+  }
+
   for (const auto& d: servers) {
-    if (d.second->isUp()) {
+    if (d.second->isUp() && d.second->outstanding <= targetLoad) {
       // make sure hashes have been computed
       if (d.second->hashes.empty()) {
         d.second->hash();
@@ -1088,6 +1097,9 @@ bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::s
   case DNSAction::Action::Truncate:
     dq.dh->tc = true;
     dq.dh->qr = true;
+    dq.dh->ra = dq.dh->rd;
+    dq.dh->aa = false;
+    dq.dh->ad = false;
     return true;
     break;
   case DNSAction::Action::HeaderModify:
@@ -1175,6 +1187,9 @@ static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const stru
           vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
           dq.dh->tc = true;
           dq.dh->qr = true;
+          dq.dh->ra = dq.dh->rd;
+          dq.dh->aa = false;
+          dq.dh->ad = false;
           return true;
         }
         else {
@@ -1230,6 +1245,9 @@ static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const stru
           vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
           dq.dh->tc = true;
           dq.dh->qr = true;
+          dq.dh->ra = dq.dh->rd;
+          dq.dh->aa = false;
+          dq.dh->ad = false;
           return true;
         }
         else {
@@ -1487,6 +1505,8 @@ ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders&
 
     if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
       // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
+      // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
+      // ECS option, which would make it unsuitable for the zero-scope feature.
       if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
         if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
           dq.len = cachedResponseSize;
@@ -1821,147 +1841,12 @@ catch(...)
 uint16_t getRandomDNSID()
 {
 #ifdef HAVE_LIBSODIUM
-  return (randombytes_random() % 65536);
+  return randombytes_uniform(65536);
 #else
   return (random() % 65536);
 #endif
 }
 
-static bool upCheck(const shared_ptr<DownstreamState>& ds)
-try
-{
-  DNSName checkName = ds->checkName;
-  uint16_t checkType = ds->checkType.getCode();
-  uint16_t checkClass = ds->checkClass;
-  dnsheader checkHeader;
-  memset(&checkHeader, 0, sizeof(checkHeader));
-
-  checkHeader.qdcount = htons(1);
-  checkHeader.id = getRandomDNSID();
-
-  checkHeader.rd = true;
-  if (ds->setCD) {
-    checkHeader.cd = true;
-  }
-
-  if (ds->checkFunction) {
-    std::lock_guard<std::mutex> lock(g_luamutex);
-    auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
-    checkName = std::get<0>(ret);
-    checkType = std::get<1>(ret);
-    checkClass = std::get<2>(ret);
-  }
-
-  vector<uint8_t> packet;
-  DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
-  dnsheader * requestHeader = dpw.getHeader();
-  *requestHeader = checkHeader;
-
-  Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
-  sock.setNonBlocking();
-  if (!IsAnyAddress(ds->sourceAddr)) {
-    sock.setReuseAddr();
-    if (!ds->sourceItfName.empty()) {
-#ifdef SO_BINDTODEVICE
-      int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
-      if (res != 0 && g_verboseHealthChecks) {
-        infolog("Error settting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
-      }
-#endif
-    }
-    sock.bind(ds->sourceAddr);
-  }
-  sock.connect(ds->remote);
-  ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
-  if (sent < 0) {
-    int ret = errno;
-    if (g_verboseHealthChecks)
-      infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
-    return false;
-  }
-
-  int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
-  if(ret < 0 || !ret) { // error, timeout, both are down!
-    if (ret < 0) {
-      ret = errno;
-      if (g_verboseHealthChecks)
-        infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
-    }
-    else {
-      if (g_verboseHealthChecks)
-        infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
-    }
-    return false;
-  }
-
-  string reply;
-  ComboAddress from;
-  sock.recvFrom(reply, from);
-
-  /* we are using a connected socket but hey.. */
-  if (from != ds->remote) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
-    return false;
-  }
-
-  const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
-
-  if (reply.size() < sizeof(*responseHeader)) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
-    return false;
-  }
-
-  if (responseHeader->id != requestHeader->id) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
-    return false;
-  }
-
-  if (!responseHeader->qr) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
-    return false;
-  }
-
-  if (responseHeader->rcode == RCode::ServFail) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
-    return false;
-  }
-
-  if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
-    return false;
-  }
-
-  uint16_t receivedType;
-  uint16_t receivedClass;
-  DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
-
-  if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
-    return false;
-  }
-
-  return true;
-}
-catch(const std::exception& e)
-{
-  if (g_verboseHealthChecks)
-    infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
-  return false;
-}
-catch(...)
-{
-  if (g_verboseHealthChecks)
-    infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
-  return false;
-}
-
 uint64_t g_maxTCPClientThreads{10};
 std::atomic<uint16_t> g_cacheCleaningDelay{60};
 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
@@ -2058,68 +1943,27 @@ static void healthChecksThread()
 {
   setThreadName("dnsdist/healthC");
 
-  int interval = 1;
+  static const int interval = 1;
 
   for(;;) {
     sleep(interval);
 
-    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
+    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads()) {
       g_tcpclientthreads->addTCPClientThread();
+    }
 
+    auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
     auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
     for(auto& dss : *states) {
-      if(++dss->lastCheck < dss->checkInterval)
+      if(++dss->lastCheck < dss->checkInterval) {
         continue;
-      dss->lastCheck = 0;
-      if(dss->availability==DownstreamState::Availability::Auto) {
-        bool newState=upCheck(dss);
-        if (newState) {
-          /* check succeeded */
-          dss->currentCheckFailures = 0;
-
-          if (!dss->upStatus) {
-            /* we were marked as down */
-            dss->consecutiveSuccessfulChecks++;
-            if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
-              /* if we need more than one successful check to rise
-                 and we didn't reach the threshold yet,
-                 let's stay down */
-              newState = false;
-            }
-          }
-        }
-        else {
-          /* check failed */
-          dss->consecutiveSuccessfulChecks = 0;
-
-          if (dss->upStatus) {
-            /* we are currently up */
-            dss->currentCheckFailures++;
-            if (dss->currentCheckFailures < dss->maxCheckFailures) {
-              /* we need more than one failure to be marked as down,
-                 and we did not reach the threshold yet, let's stay down */
-              newState = true;
-            }
-          }
-        }
-
-        if(newState != dss->upStatus) {
-          warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
-
-          if (newState && !dss->connected) {
-            newState = dss->reconnect();
+      }
 
-            if (dss->connected && !dss->threadStarted.test_and_set()) {
-              dss->tid = thread(responderThread, dss);
-            }
-          }
+      dss->lastCheck = 0;
 
-          dss->upStatus = newState;
-          dss->currentCheckFailures = 0;
-          dss->consecutiveSuccessfulChecks = 0;
-          if (g_snmpAgent && g_snmpTrapsEnabled) {
-            g_snmpAgent->sendBackendStatusChangeTrap(dss);
-          }
+      if (dss->availability == DownstreamState::Availability::Auto) {
+        if (!queueHealthCheck(mplexer, dss)) {
+          updateHealthCheckResult(dss, false);
         }
       }
 
@@ -2129,7 +1973,7 @@ static void healthChecksThread()
       dss->prev.queries.store(dss->queries.load());
       dss->prev.reuseds.store(dss->reuseds.load());
       
-      for(IDState& ids  : dss->idStates) { // timeouts
+      for (IDState& ids  : dss->idStates) { // timeouts
         int64_t usageIndicator = ids.usageIndicator;
         if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
           /* We mark the state as unused as soon as possible
@@ -2164,6 +2008,8 @@ static void healthChecksThread()
         }          
       }
     }
+
+    handleQueuedHealthChecks(mplexer);
   }
 }
 
@@ -2613,7 +2459,7 @@ try
 
   g_policy.setState(leastOutstandingPol);
   if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
-    setupLua(true, g_cmdLine.config);
+    setupLua(true, false, g_cmdLine.config);
     if (clientAddress != ComboAddress())
       g_serverControl = clientAddress;
     doClient(g_serverControl, g_cmdLine.command);
@@ -2634,13 +2480,13 @@ try
   g_consoleACL.setState(consoleACL);
 
   if (g_cmdLine.checkConfig) {
-    setupLua(true, g_cmdLine.config);
+    setupLua(false, true, g_cmdLine.config);
     // No exception was thrown
     infolog("Configuration '%s' OK!", g_cmdLine.config);
     _exit(EXIT_SUCCESS);
   }
 
-  auto todo=setupLua(false, g_cmdLine.config);
+  auto todo=setupLua(false, false, g_cmdLine.config);
 
   auto localPools = g_pools.getCopy();
   {
@@ -2790,13 +2636,16 @@ try
 
   checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
 
+  auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
   for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
-    if(dss->availability==DownstreamState::Availability::Auto) {
-      bool newState=upCheck(dss);
-      warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
-      dss->upStatus = newState;
+    if (dss->availability == DownstreamState::Availability::Auto) {
+      if (!queueHealthCheck(mplexer, dss, true)) {
+        dss->upStatus = false;
+        warnlog("Marking downstream %s as 'down'", dss->getNameWithAddr());
+      }
     }
   }
+  handleQueuedHealthChecks(mplexer, true);
 
   for(auto& cs : g_frontends) {
     if (cs->dohFrontend != nullptr) {