]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/dnsdist.cc
dnsdist: Add steal, iowait and UDP errors metrics
[thirdparty/pdns.git] / pdns / dnsdist.cc
index 901903e9843b4c2f6cdd1d6babc0220fb433f7d2..a8c74d90b45f23a9d9aea1e376f8e48ee035c943 100644 (file)
@@ -45,6 +45,7 @@
 #include "dnsdist-cache.hh"
 #include "dnsdist-console.hh"
 #include "dnsdist-ecs.hh"
+#include "dnsdist-healthchecks.hh"
 #include "dnsdist-lua.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-secpoll.hh"
@@ -55,7 +56,6 @@
 #include "dolog.hh"
 #include "dnsname.hh"
 #include "dnsparser.hh"
-#include "dnswriter.hh"
 #include "ednsoptions.hh"
 #include "gettime.hh"
 #include "lock.hh"
@@ -81,10 +81,8 @@ using std::thread;
 bool g_verbose;
 
 struct DNSDistStats g_stats;
-MetricDefinitionStorage g_metricDefinitions;
 
 uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
-bool g_verboseHealthChecks{false};
 uint32_t g_staleCacheEntriesTTL{0};
 bool g_syslog{true};
 bool g_allowEmptyResponse{false};
@@ -144,6 +142,8 @@ bool g_fixupCase{false};
 bool g_preserveTrailingData{false};
 bool g_roundrobinFailOnNoServer{false};
 
+std::set<std::string> g_capabilitiesToRetain;
+
 static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
 try
 {
@@ -437,7 +437,7 @@ bool processResponse(char** response, uint16_t* responseLen, size_t* responseSiz
     return false;
   }
 
-  if (dr.packetCache && !dr.skipCache) {
+  if (dr.packetCache && !dr.skipCache && *responseLen <= s_maxPacketCacheEntrySize) {
     if (!dr.useZeroScope) {
       /* if the query was not suitable for zero-scope, for
          example because it had an existing ECS entry so the hash is
@@ -513,7 +513,7 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
 try {
   setThreadName("dnsdist/respond");
   auto localRespRulactions = g_resprulactions.getLocal();
-  char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
+  char packet[s_maxPacketCacheEntrySize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
   static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
   /* when the answer is encrypted in place, we need to get a copy
      of the original header before encryption to fill the ring buffer */
@@ -614,8 +614,10 @@ try {
             du->response = std::string(response, responseLen);
             if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
               /* at this point we have the only remaining pointer on this
-                 DOHUnit object since we did set ids->du to nullptr earlier */
-              delete du;
+                 DOHUnit object since we did set ids->du to nullptr earlier,
+                 except if we got the response before the pointer could be
+                 released by the frontend */
+              du->release();
             }
 #endif /* HAVE_DNS_OVER_HTTPS */
             du = nullptr;
@@ -630,6 +632,10 @@ try {
         }
 
         ++g_stats.responses;
+        if (ids->cs) {
+          ++ids->cs->responses;
+        }
+        ++dss->responses;
 
         double udiff = ids->sentTime.udiff();
         vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
@@ -700,6 +706,15 @@ bool DownstreamState::reconnect()
       fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
       if (!IsAnyAddress(sourceAddr)) {
         SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
+        if (!sourceItfName.empty()) {
+#ifdef SO_BINDTODEVICE
+          int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
+          if (res != 0) {
+            infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
+          }
+#endif
+        }
+
         SBind(fd, sourceAddr);
       }
       try {
@@ -771,7 +786,7 @@ void DownstreamState::setWeight(int newWeight)
   }
 }
 
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
 {
   pthread_rwlock_init(&d_lock, nullptr);
   id = getUniqueID();
@@ -784,7 +799,7 @@ DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress
     fd = -1;
   }
 
-  if (!IsAnyAddress(remote)) {
+  if (connect && !IsAnyAddress(remote)) {
     reconnect();
     idStates.resize(g_maxOutstanding);
     sw.start();
@@ -865,6 +880,7 @@ shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const D
 }
 
 uint32_t g_hashperturb;
+double g_consistentHashBalancingFactor = 0;
 shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
 {
   return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
@@ -877,8 +893,18 @@ shared_ptr<DownstreamState> chashed(const NumberedServerVector& servers, const D
   unsigned int min = std::numeric_limits<unsigned int>::max();
   shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
 
+  double targetLoad = std::numeric_limits<double>::max();
+  if (g_consistentHashBalancingFactor > 0) {
+    /* we start with one, representing the query we are currently handling */
+    double currentLoad = 1;
+    for (const auto& pair : servers) {
+      currentLoad += pair.second->outstanding;
+    }
+    targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
+  }
+
   for (const auto& d: servers) {
-    if (d.second->isUp()) {
+    if (d.second->isUp() && d.second->outstanding <= targetLoad) {
       // make sure hashes have been computed
       if (d.second->hashes.empty()) {
         d.second->hash();
@@ -1071,6 +1097,9 @@ bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::s
   case DNSAction::Action::Truncate:
     dq.dh->tc = true;
     dq.dh->qr = true;
+    dq.dh->ra = dq.dh->rd;
+    dq.dh->aa = false;
+    dq.dh->ad = false;
     return true;
     break;
   case DNSAction::Action::HeaderModify:
@@ -1158,6 +1187,9 @@ static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const stru
           vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
           dq.dh->tc = true;
           dq.dh->qr = true;
+          dq.dh->ra = dq.dh->rd;
+          dq.dh->aa = false;
+          dq.dh->ad = false;
           return true;
         }
         else {
@@ -1213,6 +1245,9 @@ static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const stru
           vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
           dq.dh->tc = true;
           dq.dh->qr = true;
+          dq.dh->ra = dq.dh->rd;
+          dq.dh->aa = false;
+          dq.dh->ad = false;
           return true;
         }
         else {
@@ -1442,6 +1477,7 @@ ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders&
       }
 
       ++g_stats.selfAnswered;
+      ++cs.responses;
       return ProcessQueryResult::SendAnswer;
     }
 
@@ -1469,6 +1505,8 @@ ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders&
 
     if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
       // we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
+      // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
+      // ECS option, which would make it unsuitable for the zero-scope feature.
       if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
         if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
           dq.len = cachedResponseSize;
@@ -1667,7 +1705,7 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
 {
   struct MMReceiver
   {
-    char packet[4096];
+    char packet[s_maxPacketCacheEntrySize];
     ComboAddress remote;
     ComboAddress dest;
     struct iovec iov;
@@ -1758,7 +1796,7 @@ try
   else
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
   {
-    char packet[4096];
+    char packet[s_maxPacketCacheEntrySize];
     /* the actual buffer is larger because:
        - we may have to add EDNS and/or ECS
        - we use it for self-generated responses (from rule or cache)
@@ -1773,7 +1811,7 @@ try
     ComboAddress remote;
     ComboAddress dest;
     remote.sin4.sin_family = cs->local.sin4.sin_family;
-    fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
+    fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, s_udpIncomingBufferSize, &remote);
 
     for(;;) {
       ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
@@ -1783,7 +1821,7 @@ try
         continue;
       }
 
-      processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
+      processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), sizeof(packet), nullptr, nullptr, nullptr, nullptr);
     }
   }
 }
@@ -1803,139 +1841,12 @@ catch(...)
 uint16_t getRandomDNSID()
 {
 #ifdef HAVE_LIBSODIUM
-  return (randombytes_random() % 65536);
+  return randombytes_uniform(65536);
 #else
   return (random() % 65536);
 #endif
 }
 
-static bool upCheck(const shared_ptr<DownstreamState>& ds)
-try
-{
-  DNSName checkName = ds->checkName;
-  uint16_t checkType = ds->checkType.getCode();
-  uint16_t checkClass = ds->checkClass;
-  dnsheader checkHeader;
-  memset(&checkHeader, 0, sizeof(checkHeader));
-
-  checkHeader.qdcount = htons(1);
-  checkHeader.id = getRandomDNSID();
-
-  checkHeader.rd = true;
-  if (ds->setCD) {
-    checkHeader.cd = true;
-  }
-
-  if (ds->checkFunction) {
-    std::lock_guard<std::mutex> lock(g_luamutex);
-    auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
-    checkName = std::get<0>(ret);
-    checkType = std::get<1>(ret);
-    checkClass = std::get<2>(ret);
-  }
-
-  vector<uint8_t> packet;
-  DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
-  dnsheader * requestHeader = dpw.getHeader();
-  *requestHeader = checkHeader;
-
-  Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
-  sock.setNonBlocking();
-  if (!IsAnyAddress(ds->sourceAddr)) {
-    sock.setReuseAddr();
-    sock.bind(ds->sourceAddr);
-  }
-  sock.connect(ds->remote);
-  ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
-  if (sent < 0) {
-    int ret = errno;
-    if (g_verboseHealthChecks)
-      infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
-    return false;
-  }
-
-  int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
-  if(ret < 0 || !ret) { // error, timeout, both are down!
-    if (ret < 0) {
-      ret = errno;
-      if (g_verboseHealthChecks)
-        infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
-    }
-    else {
-      if (g_verboseHealthChecks)
-        infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
-    }
-    return false;
-  }
-
-  string reply;
-  ComboAddress from;
-  sock.recvFrom(reply, from);
-
-  /* we are using a connected socket but hey.. */
-  if (from != ds->remote) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
-    return false;
-  }
-
-  const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
-
-  if (reply.size() < sizeof(*responseHeader)) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
-    return false;
-  }
-
-  if (responseHeader->id != requestHeader->id) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
-    return false;
-  }
-
-  if (!responseHeader->qr) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
-    return false;
-  }
-
-  if (responseHeader->rcode == RCode::ServFail) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
-    return false;
-  }
-
-  if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
-    return false;
-  }
-
-  uint16_t receivedType;
-  uint16_t receivedClass;
-  DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
-
-  if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
-    return false;
-  }
-
-  return true;
-}
-catch(const std::exception& e)
-{
-  if (g_verboseHealthChecks)
-    infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
-  return false;
-}
-catch(...)
-{
-  if (g_verboseHealthChecks)
-    infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
-  return false;
-}
-
 uint64_t g_maxTCPClientThreads{10};
 std::atomic<uint16_t> g_cacheCleaningDelay{60};
 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
@@ -2032,68 +1943,27 @@ static void healthChecksThread()
 {
   setThreadName("dnsdist/healthC");
 
-  int interval = 1;
+  static const int interval = 1;
 
   for(;;) {
     sleep(interval);
 
-    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
+    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads()) {
       g_tcpclientthreads->addTCPClientThread();
+    }
 
+    auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
     auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
     for(auto& dss : *states) {
-      if(++dss->lastCheck < dss->checkInterval)
+      if(++dss->lastCheck < dss->checkInterval) {
         continue;
-      dss->lastCheck = 0;
-      if(dss->availability==DownstreamState::Availability::Auto) {
-        bool newState=upCheck(dss);
-        if (newState) {
-          /* check succeeded */
-          dss->currentCheckFailures = 0;
-
-          if (!dss->upStatus) {
-            /* we were marked as down */
-            dss->consecutiveSuccessfulChecks++;
-            if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
-              /* if we need more than one successful check to rise
-                 and we didn't reach the threshold yet,
-                 let's stay down */
-              newState = false;
-            }
-          }
-        }
-        else {
-          /* check failed */
-          dss->consecutiveSuccessfulChecks = 0;
-
-          if (dss->upStatus) {
-            /* we are currently up */
-            dss->currentCheckFailures++;
-            if (dss->currentCheckFailures < dss->maxCheckFailures) {
-              /* we need more than one failure to be marked as down,
-                 and we did not reach the threshold yet, let's stay down */
-              newState = true;
-            }
-          }
-        }
-
-        if(newState != dss->upStatus) {
-          warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
-
-          if (newState && !dss->connected) {
-            newState = dss->reconnect();
+      }
 
-            if (dss->connected && !dss->threadStarted.test_and_set()) {
-              dss->tid = thread(responderThread, dss);
-            }
-          }
+      dss->lastCheck = 0;
 
-          dss->upStatus = newState;
-          dss->currentCheckFailures = 0;
-          dss->consecutiveSuccessfulChecks = 0;
-          if (g_snmpAgent && g_snmpTrapsEnabled) {
-            g_snmpAgent->sendBackendStatusChangeTrap(dss);
-          }
+      if (dss->availability == DownstreamState::Availability::Auto) {
+        if (!queueHealthCheck(mplexer, dss)) {
+          updateHealthCheckResult(dss, false);
         }
       }
 
@@ -2103,7 +1973,7 @@ static void healthChecksThread()
       dss->prev.queries.store(dss->queries.load());
       dss->prev.reuseds.store(dss->reuseds.load());
       
-      for(IDState& ids  : dss->idStates) { // timeouts
+      for (IDState& ids  : dss->idStates) { // timeouts
         int64_t usageIndicator = ids.usageIndicator;
         if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
           /* We mark the state as unused as soon as possible
@@ -2138,6 +2008,8 @@ static void healthChecksThread()
         }          
       }
     }
+
+    handleQueuedHealthChecks(mplexer);
   }
 }
 
@@ -2587,7 +2459,7 @@ try
 
   g_policy.setState(leastOutstandingPol);
   if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
-    setupLua(true, g_cmdLine.config);
+    setupLua(true, false, g_cmdLine.config);
     if (clientAddress != ComboAddress())
       g_serverControl = clientAddress;
     doClient(g_serverControl, g_cmdLine.command);
@@ -2608,13 +2480,13 @@ try
   g_consoleACL.setState(consoleACL);
 
   if (g_cmdLine.checkConfig) {
-    setupLua(true, g_cmdLine.config);
+    setupLua(false, true, g_cmdLine.config);
     // No exception was thrown
     infolog("Configuration '%s' OK!", g_cmdLine.config);
     _exit(EXIT_SUCCESS);
   }
 
-  auto todo=setupLua(false, g_cmdLine.config);
+  auto todo=setupLua(false, false, g_cmdLine.config);
 
   auto localPools = g_pools.getCopy();
   {
@@ -2724,7 +2596,7 @@ try
        or as an unprivileged user with ambient
        capabilities like CAP_NET_BIND_SERVICE.
     */
-    dropCapabilities();
+    dropCapabilities(g_capabilitiesToRetain);
   }
   catch(const std::exception& e) {
     warnlog("%s", e.what());
@@ -2764,13 +2636,16 @@ try
 
   checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
 
+  auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
   for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
-    if(dss->availability==DownstreamState::Availability::Auto) {
-      bool newState=upCheck(dss);
-      warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
-      dss->upStatus = newState;
+    if (dss->availability == DownstreamState::Availability::Auto) {
+      if (!queueHealthCheck(mplexer, dss, true)) {
+        dss->upStatus = false;
+        warnlog("Marking downstream %s as 'down'", dss->getNameWithAddr());
+      }
     }
   }
+  handleQueuedHealthChecks(mplexer, true);
 
   for(auto& cs : g_frontends) {
     if (cs->dohFrontend != nullptr) {