git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Implement parallel health checks 8491/head
author: Remi Gacogne <remi.gacogne@powerdns.com>
Wed, 30 Oct 2019 15:06:49 +0000 (16:06 +0100)
committer: Remi Gacogne <remi.gacogne@powerdns.com>
Tue, 12 Nov 2019 09:00:21 +0000 (10:00 +0100)
pdns/dnsdist-lua.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist-web.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/Makefile.am
pdns/dnsdistdist/dnsdist-healthchecks.cc [new file with mode: 0644]
pdns/dnsdistdist/dnsdist-healthchecks.hh [new file with mode: 0644]
pdns/mplexer.hh

index 39ef2da24d2fa76a17cb827abf458326042b4b68..18f9331d4a53bbea250c25f99d46b8d9749b5def 100644 (file)
@@ -34,6 +34,7 @@
 #include "dnsdist.hh"
 #include "dnsdist-console.hh"
 #include "dnsdist-ecs.hh"
+#include "dnsdist-healthchecks.hh"
 #include "dnsdist-lua.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-secpoll.hh"
index 3c1b7459f6438c8ec1fa532da955e20e31f5c01b..40f8ac2dc46b0bc7154a2f8ef7cb74c126560e77 100644 (file)
@@ -336,9 +336,8 @@ void TCPClientCollection::addTCPClientThread()
     }
 
     d_tcpclientthreads.push_back(pipefds[1]);
+    ++d_numthreads;
   }
-
-  ++d_numthreads;
 }
 
 static void cleanupClosedTCPConnections()
@@ -1339,7 +1338,9 @@ void tcpAcceptorThread(void* p)
   ComboAddress remote;
   remote.sin4.sin_family = cs->local.sin4.sin_family;
 
-  g_tcpclientthreads->addTCPClientThread();
+  if(!g_tcpclientthreads->hasReachedMaxThreads()) {
+    g_tcpclientthreads->addTCPClientThread();
+  }
 
   auto acl = g_ACL.getLocal();
   for(;;) {
index 07613f131c516b9ee81378c885d38bcf2ec2cdfe..421ce833a3a008489902ca2347f97715b4b6523d 100644 (file)
@@ -20,6 +20,8 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 #include "dnsdist.hh"
+#include "dnsdist-healthchecks.hh"
+
 #include "sstuff.hh"
 #include "ext/json11/json11.hpp"
 #include "ext/incbin/incbin.h"
index 1674e9fc853d730a1bed7537eae336ece0341942..5ab93dc5aa18693355ff75383453a5604ae41e3e 100644 (file)
@@ -45,6 +45,7 @@
 #include "dnsdist-cache.hh"
 #include "dnsdist-console.hh"
 #include "dnsdist-ecs.hh"
+#include "dnsdist-healthchecks.hh"
 #include "dnsdist-lua.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-secpoll.hh"
@@ -55,7 +56,6 @@
 #include "dolog.hh"
 #include "dnsname.hh"
 #include "dnsparser.hh"
-#include "dnswriter.hh"
 #include "ednsoptions.hh"
 #include "gettime.hh"
 #include "lock.hh"
@@ -84,7 +84,6 @@ struct DNSDistStats g_stats;
 MetricDefinitionStorage g_metricDefinitions;
 
 uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
-bool g_verboseHealthChecks{false};
 uint32_t g_staleCacheEntriesTTL{0};
 bool g_syslog{true};
 bool g_allowEmptyResponse{false};
@@ -1821,147 +1820,12 @@ catch(...)
 uint16_t getRandomDNSID()
 {
 #ifdef HAVE_LIBSODIUM
-  return (randombytes_random() % 65536);
+  return randombytes_uniform(65536);
 #else
   return (random() % 65536);
 #endif
 }
 
-static bool upCheck(const shared_ptr<DownstreamState>& ds)
-try
-{
-  DNSName checkName = ds->checkName;
-  uint16_t checkType = ds->checkType.getCode();
-  uint16_t checkClass = ds->checkClass;
-  dnsheader checkHeader;
-  memset(&checkHeader, 0, sizeof(checkHeader));
-
-  checkHeader.qdcount = htons(1);
-  checkHeader.id = getRandomDNSID();
-
-  checkHeader.rd = true;
-  if (ds->setCD) {
-    checkHeader.cd = true;
-  }
-
-  if (ds->checkFunction) {
-    std::lock_guard<std::mutex> lock(g_luamutex);
-    auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
-    checkName = std::get<0>(ret);
-    checkType = std::get<1>(ret);
-    checkClass = std::get<2>(ret);
-  }
-
-  vector<uint8_t> packet;
-  DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
-  dnsheader * requestHeader = dpw.getHeader();
-  *requestHeader = checkHeader;
-
-  Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
-  sock.setNonBlocking();
-  if (!IsAnyAddress(ds->sourceAddr)) {
-    sock.setReuseAddr();
-    if (!ds->sourceItfName.empty()) {
-#ifdef SO_BINDTODEVICE
-      int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
-      if (res != 0 && g_verboseHealthChecks) {
-        infolog("Error setting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
-      }
-#endif
-    }
-    sock.bind(ds->sourceAddr);
-  }
-  sock.connect(ds->remote);
-  ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
-  if (sent < 0) {
-    int ret = errno;
-    if (g_verboseHealthChecks)
-      infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
-    return false;
-  }
-
-  int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
-  if(ret < 0 || !ret) { // error, timeout, both are down!
-    if (ret < 0) {
-      ret = errno;
-      if (g_verboseHealthChecks)
-        infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
-    }
-    else {
-      if (g_verboseHealthChecks)
-        infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
-    }
-    return false;
-  }
-
-  string reply;
-  ComboAddress from;
-  sock.recvFrom(reply, from);
-
-  /* we are using a connected socket but hey.. */
-  if (from != ds->remote) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
-    return false;
-  }
-
-  const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
-
-  if (reply.size() < sizeof(*responseHeader)) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
-    return false;
-  }
-
-  if (responseHeader->id != requestHeader->id) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
-    return false;
-  }
-
-  if (!responseHeader->qr) {
-    if (g_verboseHealthChecks)
-      infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
-    return false;
-  }
-
-  if (responseHeader->rcode == RCode::ServFail) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
-    return false;
-  }
-
-  if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
-    return false;
-  }
-
-  uint16_t receivedType;
-  uint16_t receivedClass;
-  DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
-
-  if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
-    if (g_verboseHealthChecks)
-      infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
-    return false;
-  }
-
-  return true;
-}
-catch(const std::exception& e)
-{
-  if (g_verboseHealthChecks)
-    infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
-  return false;
-}
-catch(...)
-{
-  if (g_verboseHealthChecks)
-    infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
-  return false;
-}
-
 uint64_t g_maxTCPClientThreads{10};
 std::atomic<uint16_t> g_cacheCleaningDelay{60};
 std::atomic<uint16_t> g_cacheCleaningPercentage{100};
@@ -2058,68 +1922,27 @@ static void healthChecksThread()
 {
   setThreadName("dnsdist/healthC");
 
-  int interval = 1;
+  static const int interval = 1;
 
   for(;;) {
     sleep(interval);
 
-    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
+    if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads()) {
       g_tcpclientthreads->addTCPClientThread();
+    }
 
+    auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
     auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
     for(auto& dss : *states) {
-      if(++dss->lastCheck < dss->checkInterval)
+      if(++dss->lastCheck < dss->checkInterval) {
         continue;
-      dss->lastCheck = 0;
-      if(dss->availability==DownstreamState::Availability::Auto) {
-        bool newState=upCheck(dss);
-        if (newState) {
-          /* check succeeded */
-          dss->currentCheckFailures = 0;
-
-          if (!dss->upStatus) {
-            /* we were marked as down */
-            dss->consecutiveSuccessfulChecks++;
-            if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
-              /* if we need more than one successful check to rise
-                 and we didn't reach the threshold yet,
-                 let's stay down */
-              newState = false;
-            }
-          }
-        }
-        else {
-          /* check failed */
-          dss->consecutiveSuccessfulChecks = 0;
-
-          if (dss->upStatus) {
-            /* we are currently up */
-            dss->currentCheckFailures++;
-            if (dss->currentCheckFailures < dss->maxCheckFailures) {
-              /* we need more than one failure to be marked as down,
-                 and we did not reach the threshold yet, let's stay down */
-              newState = true;
-            }
-          }
-        }
-
-        if(newState != dss->upStatus) {
-          warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
-
-          if (newState && !dss->connected) {
-            newState = dss->reconnect();
+      }
 
-            if (dss->connected && !dss->threadStarted.test_and_set()) {
-              dss->tid = thread(responderThread, dss);
-            }
-          }
+      dss->lastCheck = 0;
 
-          dss->upStatus = newState;
-          dss->currentCheckFailures = 0;
-          dss->consecutiveSuccessfulChecks = 0;
-          if (g_snmpAgent && g_snmpTrapsEnabled) {
-            g_snmpAgent->sendBackendStatusChangeTrap(dss);
-          }
+      if (dss->availability == DownstreamState::Availability::Auto) {
+        if (!queueHealthCheck(mplexer, dss)) {
+          updateHealthCheckResult(dss, false);
         }
       }
 
@@ -2129,7 +1952,7 @@ static void healthChecksThread()
       dss->prev.queries.store(dss->queries.load());
       dss->prev.reuseds.store(dss->reuseds.load());
       
-      for(IDState& ids  : dss->idStates) { // timeouts
+      for (IDState& ids  : dss->idStates) { // timeouts
         int64_t usageIndicator = ids.usageIndicator;
         if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
           /* We mark the state as unused as soon as possible
@@ -2164,6 +1987,8 @@ static void healthChecksThread()
         }          
       }
     }
+
+    handleQueuedHealthChecks(mplexer);
   }
 }
 
@@ -2790,13 +2615,16 @@ try
 
   checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
 
+  auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
   for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
-    if(dss->availability==DownstreamState::Availability::Auto) {
-      bool newState=upCheck(dss);
-      warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
-      dss->upStatus = newState;
+    if (dss->availability == DownstreamState::Availability::Auto) {
+      if (!queueHealthCheck(mplexer, dss, true)) {
+        dss->upStatus = false;
+        warnlog("Marking downstream %s as 'down'", dss->getNameWithAddr());
+      }
     }
   }
+  handleQueuedHealthChecks(mplexer, true);
 
   for(auto& cs : g_frontends) {
     if (cs->dohFrontend != nullptr) {
index 69bc7833627e162db964debde7fd4d04d2ac0200..8acf7628ae4bb6b4e15a3d7888c6b712bb924c1d 100644 (file)
@@ -1149,7 +1149,6 @@ extern size_t g_maxTCPConnectionDuration;
 extern size_t g_maxTCPConnectionsPerClient;
 extern std::atomic<uint16_t> g_cacheCleaningDelay;
 extern std::atomic<uint16_t> g_cacheCleaningPercentage;
-extern bool g_verboseHealthChecks;
 extern uint32_t g_staleCacheEntriesTTL;
 extern bool g_apiReadWrite;
 extern std::string g_apiConfigDirectory;
index 163d4413802daa3d48c167834e3d5fccc3aa4a47..60ddb145b246c4ee286f1191816a18d7628042c4 100644 (file)
@@ -124,6 +124,7 @@ dnsdist_SOURCES = \
        dnsdist-dnscrypt.cc \
        dnsdist-dynblocks.hh \
        dnsdist-ecs.cc dnsdist-ecs.hh \
+       dnsdist-healthchecks.cc dnsdist-healthchecks.hh \
        dnsdist-idstate.cc \
        dnsdist-kvs.hh dnsdist-kvs.cc \
        dnsdist-lua.hh dnsdist-lua.cc \
diff --git a/pdns/dnsdistdist/dnsdist-healthchecks.cc b/pdns/dnsdistdist/dnsdist-healthchecks.cc
new file mode 100644 (file)
index 0000000..3dc7e6a
--- /dev/null
@@ -0,0 +1,289 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "dnsdist-healthchecks.hh"
+#include "dnswriter.hh"
+#include "dolog.hh"
+
+bool g_verboseHealthChecks{false};
+
+/* Apply the outcome of one health check (newState == true on success) to the
+   backend 'dss', honoring the minRiseSuccesses / maxCheckFailures thresholds,
+   and reconnect, log and send an SNMP trap on an actual state transition. */
+void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool newState)
+{
+  if (newState) {
+    /* check succeeded */
+    dss->currentCheckFailures = 0;
+
+    if (!dss->upStatus) {
+      /* we were marked as down */
+      dss->consecutiveSuccessfulChecks++;
+      if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
+        /* if we need more than one successful check to rise
+           and we didn't reach the threshold yet,
+           let's stay down */
+        newState = false;
+      }
+    }
+  }
+  else {
+    /* check failed */
+    dss->consecutiveSuccessfulChecks = 0;
+
+    if (dss->upStatus) {
+      /* we are currently up */
+      dss->currentCheckFailures++;
+      if (dss->currentCheckFailures < dss->maxCheckFailures) {
+        /* we need more than one failure to be marked as down,
+           and we did not reach the threshold yet, let's stay up */
+        newState = true;
+      }
+    }
+  }
+  if(newState != dss->upStatus) {
+    warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
+
+    if (newState && !dss->connected) {
+      /* rising: (re-)establish the UDP socket before declaring the backend up */
+      newState = dss->reconnect();
+
+      if (dss->connected && !dss->threadStarted.test_and_set()) {
+        /* first successful connection to this backend: start its responder thread */
+        dss->tid = std::thread(responderThread, dss);
+      }
+    }
+
+    dss->upStatus = newState;
+    dss->currentCheckFailures = 0;
+    dss->consecutiveSuccessfulChecks = 0;
+    if (g_snmpAgent && g_snmpTrapsEnabled) {
+      g_snmpAgent->sendBackendStatusChangeTrap(dss);
+    }
+  }
+}
+
+static bool handleResponse(std::shared_ptr<HealthCheckData>& data)
+{
+  auto& ds = data->d_ds;
+  try {
+    string reply;
+    ComboAddress from;
+    data->d_sock.recvFrom(reply, from);
+
+    /* we are using a connected socket but hey.. */
+    if (from != ds->remote) {
+      if (g_verboseHealthChecks) {
+        infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
+      }
+      return false;
+    }
+
+    const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
+
+    if (reply.size() < sizeof(*responseHeader)) {
+      if (g_verboseHealthChecks) {
+        infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
+      }
+      return false;
+    }
+
+    if (responseHeader->id != data->d_queryID) {
+      if (g_verboseHealthChecks) {
+        infolog("Invalid health check response id %d from backend %s, expecting %d", data->d_queryID, ds->getNameWithAddr(), data->d_queryID);
+      }
+      return false;
+    }
+
+    if (!responseHeader->qr) {
+      if (g_verboseHealthChecks) {
+        infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
+      }
+      return false;
+    }
+
+    if (responseHeader->rcode == RCode::ServFail) {
+      if (g_verboseHealthChecks) {
+        infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
+      }
+      return false;
+    }
+
+    if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
+      if (g_verboseHealthChecks) {
+        infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
+      }
+      return false;
+    }
+
+    uint16_t receivedType;
+    uint16_t receivedClass;
+    DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
+
+    if (receivedName != data->d_checkName || receivedType != data->d_checkType || receivedClass != data->d_checkClass) {
+      if (g_verboseHealthChecks) {
+        infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), data->d_checkName.toLogString(), QType(receivedType).getName(), QType(data->d_checkType).getName(), receivedClass, data->d_checkClass);
+      }
+      return false;
+    }
+  }
+  catch(const std::exception& e)
+  {
+    if (g_verboseHealthChecks) {
+      infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
+    }
+    return false;
+  }
+  catch(...)
+  {
+    if (g_verboseHealthChecks) {
+      infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
+    }
+    return false;
+  }
+
+  return true;
+}
+
+/* Multiplexer callback for a regular (runtime) health check: stop watching
+   the socket and feed the parsed result into the threshold-aware state
+   machine via updateHealthCheckResult(). */
+static void healthCheckCallback(int fd, FDMultiplexer::funcparam_t& param)
+{
+  auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param);
+  data->d_mplexer->removeReadFD(fd);
+  updateHealthCheckResult(data->d_ds, handleResponse(data));
+}
+
+/* Multiplexer callback for the initial (startup) health check: unlike the
+   runtime path, it sets upStatus directly, bypassing the rise/fall
+   thresholds, so the backend state is known immediately. */
+static void initialHealthCheckCallback(int fd, FDMultiplexer::funcparam_t& param)
+{
+  auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param);
+  data->d_mplexer->removeReadFD(fd);
+  bool up = handleResponse(data);
+  warnlog("Marking downstream %s as '%s'", data->d_ds->getNameWithAddr(), up ? "up" : "down");
+  data->d_ds->upStatus = up;
+}
+
+bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initialCheck)
+{
+  try
+  {
+    uint16_t queryID = getRandomDNSID();
+    DNSName checkName = ds->checkName;
+    uint16_t checkType = ds->checkType.getCode();
+    uint16_t checkClass = ds->checkClass;
+    dnsheader checkHeader;
+    memset(&checkHeader, 0, sizeof(checkHeader));
+
+    checkHeader.qdcount = htons(1);
+    checkHeader.id = queryID;
+
+    checkHeader.rd = true;
+    if (ds->setCD) {
+      checkHeader.cd = true;
+    }
+
+    if (ds->checkFunction) {
+      std::lock_guard<std::mutex> lock(g_luamutex);
+      auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
+      checkName = std::get<0>(ret);
+      checkType = std::get<1>(ret);
+      checkClass = std::get<2>(ret);
+    }
+
+    vector<uint8_t> packet;
+    DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
+    dnsheader * requestHeader = dpw.getHeader();
+    *requestHeader = checkHeader;
+
+    Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
+    sock.setNonBlocking();
+    if (!IsAnyAddress(ds->sourceAddr)) {
+      sock.setReuseAddr();
+      if (!ds->sourceItfName.empty()) {
+#ifdef SO_BINDTODEVICE
+        int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
+        if (res != 0 && g_verboseHealthChecks) {
+          infolog("Error setting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
+        }
+#endif
+      }
+      sock.bind(ds->sourceAddr);
+    }
+    sock.connect(ds->remote);
+    ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
+    if (sent < 0) {
+      int ret = errno;
+      if (g_verboseHealthChecks)
+        infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
+      return false;
+    }
+
+    auto data = std::make_shared<HealthCheckData>(mplexer, ds, std::move(sock), std::move(checkName), checkType, checkClass, queryID);
+    struct timeval ttd;
+    gettimeofday(&ttd, nullptr);
+    ttd.tv_sec += ds->checkTimeout / 1000; /* ms to seconds */
+    ttd.tv_usec += (ds->checkTimeout % 1000) * 1000; /* remaining ms to us */
+    if (ttd.tv_usec > 1000000) {
+      ++ttd.tv_sec;
+      ttd.tv_usec -= 1000000;
+    }
+    mplexer->addReadFD(data->d_sock.getHandle(), initialCheck ? &initialHealthCheckCallback : &healthCheckCallback, data, &ttd);
+
+    return true;
+  }
+  catch(const std::exception& e)
+  {
+    if (g_verboseHealthChecks) {
+      infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
+    }
+    return false;
+  }
+  catch(...)
+  {
+    if (g_verboseHealthChecks) {
+      infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
+    }
+    return false;
+  }
+}
+
+/* Drain all health checks previously queued on 'mplexer': run the event loop
+   until no watched FD remains, letting the per-socket callbacks process the
+   responses, and treat every expired FD as a failed check ('initial' selects
+   the startup behavior of setting upStatus directly). */
+void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool initial)
+{
+  while (mplexer->getWatchedFDCount(false) > 0) {
+    struct timeval now;
+    int ret = mplexer->run(&now, 100);
+    if (ret == -1) {
+      if (g_verboseHealthChecks) {
+        /* NOTE(review): this logs the -1 return value, not errno; per the
+           mplexer.hh comment the implementations actually throw instead of
+           returning -1, so this branch is believed unreachable */
+        infolog("Error while waiting for the health check response from backends: %d", ret);
+      }
+      break;
+    }
+    auto timeouts = mplexer->getTimeouts(now);
+    for (const auto& timeout : timeouts) {
+      mplexer->removeReadFD(timeout.first);
+      auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(timeout.second);
+      if (g_verboseHealthChecks) {
+        infolog("Timeout while waiting for the health check response from backend %s", data->d_ds->getNameWithAddr());
+      }
+      if (initial) {
+        /* startup path: no thresholds, mark the backend down right away */
+        warnlog("Marking downstream %s as 'down'", data->d_ds->getNameWithAddr());
+        data->d_ds->upStatus = false;
+      }
+      else {
+        updateHealthCheckResult(data->d_ds, false);
+      }
+    }
+  }
+}
diff --git a/pdns/dnsdistdist/dnsdist-healthchecks.hh b/pdns/dnsdistdist/dnsdist-healthchecks.hh
new file mode 100644 (file)
index 0000000..99d512b
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include "dnsdist.hh"
+#include "mplexer.hh"
+#include "sstuff.hh"
+
+/* Per-check state kept alive (via shared_ptr in the multiplexer callback
+   parameter) from the moment a health check query is sent until its
+   response arrives or it times out. */
+struct HealthCheckData
+{
+  HealthCheckData(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, Socket&& sock, DNSName&& checkName, uint16_t checkType, uint16_t checkClass, uint16_t queryID): d_mplexer(mplexer), d_ds(ds), d_sock(std::move(sock)), d_checkName(std::move(checkName)), d_checkType(checkType), d_checkClass(checkClass), d_queryID(queryID)
+  {
+  }
+
+  std::shared_ptr<FDMultiplexer> d_mplexer; // the multiplexer this check's socket is registered with
+  const std::shared_ptr<DownstreamState> d_ds; // the backend being checked
+  Socket d_sock; // connected UDP socket the query was sent on (owned, closed on destruction)
+  DNSName d_checkName; // qname actually sent (possibly rewritten by the Lua check function)
+  uint16_t d_checkType; // qtype actually sent
+  uint16_t d_checkClass; // qclass actually sent
+  uint16_t d_queryID; // DNS header ID the response must echo
+};
+
+extern bool g_verboseHealthChecks;
+
+void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool newState);
+bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initial=false);
+void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool initial=false);
+
index 927651c332c5324e43b470715e29438d63922052..b14a39dc931f6bb4f68a636032d3ba25f28c85ec 100644 (file)
@@ -78,9 +78,11 @@ public:
   
   /* tv will be updated to 'now' before run returns */
   /* timeout is in ms */
+  /* returns 0 on timeout, -1 in case of error (but all implementations
+     actually throw in that case) and the number of ready events otherwise */
   virtual int run(struct timeval* tv, int timeout=500) = 0;
 
-  /* timeout is in ms, 0 will return immediatly, -1 will block until at least one FD is ready */
+  /* timeout is in ms, 0 will return immediately, -1 will block until at least one FD is ready */
   virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
 
   //! Add an fd to the read watch list - currently an fd can only be on one list at a time!