]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Implement health-checks for DoH backends
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 24 Aug 2021 15:39:49 +0000 (17:39 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 13 Sep 2021 13:28:27 +0000 (15:28 +0200)
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/dnsdist-healthchecks.cc
pdns/dnsdistdist/dnsdist-healthchecks.hh
pdns/dnsdistdist/dnsdist-nghttp2.cc
pdns/dnsdistdist/dnsdist-nghttp2.hh
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-upstream.hh
pdns/dnsdistdist/dnsdist-tcp.hh
pdns/dnsdistdist/doh.cc

index 490a3d52c8db6d3462940ae4aaf6dd1ff660da9e..00fa3f22865fd64351f42dd48f3cca58dcaf81bb 100644 (file)
@@ -1312,9 +1312,9 @@ public:
     return true;
   }
 
-  const ClientState& getClientState() override
+  const ClientState* getClientState() override
   {
-    return d_cs;
+    return &d_cs;
   }
 
   void handleResponse(const struct timeval& now, TCPResponse&& response) override
@@ -1839,7 +1839,7 @@ static void healthChecksThread()
   for(;;) {
     sleep(interval);
 
-    auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+    auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
     auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
     for(auto& dss : *states) {
       if (++dss->lastCheck < dss->checkInterval) {
@@ -1896,7 +1896,7 @@ static void healthChecksThread()
       }
     }
 
-    handleQueuedHealthChecks(mplexer);
+    handleQueuedHealthChecks(*mplexer);
   }
 }
 
@@ -2576,7 +2576,7 @@ int main(int argc, char** argv)
 
     checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
 
-    auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+    auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
     for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
       if (dss->availability == DownstreamState::Availability::Auto) {
         if (!queueHealthCheck(mplexer, dss, true)) {
@@ -2585,7 +2585,7 @@ int main(int argc, char** argv)
         }
       }
     }
-    handleQueuedHealthChecks(mplexer, true);
+    handleQueuedHealthChecks(*mplexer, true);
 
     /* we need to create the TCP worker threads before the
        acceptor ones, otherwise we might crash when processing
index 4ba0b80876fad176d607376b4bff3205c6dba64f..067a6e77dfc65c93311a925ef608cb1bfcd6ce29 100644 (file)
@@ -826,6 +826,11 @@ struct DownstreamState
     return d_tcpOnly || d_tlsCtx != nullptr;
   }
 
+  bool isDoH() const
+  {
+    return !d_dohPath.empty();
+  }
+
   bool passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq);
 
 private:
index acae3cd63ef54a65e05665860241807bc3a3a3b4..b8c280ac63b21225e03586ca0204550db4f6da29 100644 (file)
@@ -24,6 +24,8 @@
 #include "tcpiohandler-mplexer.hh"
 #include "dnswriter.hh"
 #include "dolog.hh"
+#include "dnsdist-tcp.hh"
+#include "dnsdist-nghttp2.hh"
 
 bool g_verboseHealthChecks{false};
 
@@ -31,12 +33,12 @@ struct HealthCheckData
 {
   enum class TCPState : uint8_t { WritingQuery, ReadingResponseSize, ReadingResponse };
 
-  HealthCheckData(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, DNSName&& checkName, uint16_t checkType, uint16_t checkClass, uint16_t queryID): d_ds(ds), d_mplexer(mplexer), d_udpSocket(-1), d_checkName(std::move(checkName)), d_checkType(checkType), d_checkClass(checkClass), d_queryID(queryID)
+  HealthCheckData(FDMultiplexer& mplexer, const std::shared_ptr<DownstreamState>& ds, DNSName&& checkName, uint16_t checkType, uint16_t checkClass, uint16_t queryID): d_ds(ds), d_mplexer(mplexer), d_udpSocket(-1), d_checkName(std::move(checkName)), d_checkType(checkType), d_checkClass(checkClass), d_queryID(queryID)
   {
   }
 
   const std::shared_ptr<DownstreamState> d_ds;
-  std::shared_ptr<FDMultiplexer> d_mplexer;
+  FDMultiplexer& d_mplexer;
   std::unique_ptr<TCPIOHandler> d_tcpHandler{nullptr};
   std::unique_ptr<IOStateHandler> d_ioState{nullptr};
   PacketBuffer d_buffer;
@@ -178,10 +180,51 @@ static bool handleResponse(std::shared_ptr<HealthCheckData>& data)
   return true;
 }
 
+class HealthCheckQuerySender : public TCPQuerySender
+{
+public:
+  HealthCheckQuerySender(std::shared_ptr<HealthCheckData>& data): d_data(data)
+  {
+  }
+
+  ~HealthCheckQuerySender()
+  {
+  }
+
+  bool active() const override
+  {
+    return true;
+  }
+
+  const ClientState* getClientState() override
+  {
+    return nullptr;
+  }
+
+  void handleResponse(const struct timeval& now, TCPResponse&& response) override
+  {
+    d_data->d_buffer = std::move(response.d_buffer);
+    updateHealthCheckResult(d_data->d_ds, d_data->d_initial, ::handleResponse(d_data));
+  }
+
+  void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
+  {
+    throw std::runtime_error("Unexpected XFR reponse to a health check query");
+  }
+
+  void notifyIOError(IDState&& query, const struct timeval& now) override
+  {
+    updateHealthCheckResult(d_data->d_ds, d_data->d_initial, false);
+  }
+
+private:
+  std::shared_ptr<HealthCheckData> d_data;
+};
+
 static void healthCheckUDPCallback(int fd, FDMultiplexer::funcparam_t& param)
 {
   auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param);
-  data->d_mplexer->removeReadFD(fd);
+  data->d_mplexer.removeReadFD(fd);
 
   ComboAddress from;
   from.sin4.sin_family = data->d_ds->remote.sin4.sin_family;
@@ -264,7 +307,7 @@ static void healthCheckTCPCallback(int fd, FDMultiplexer::funcparam_t& param)
   }
 }
 
-bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initialCheck)
+bool queueHealthCheck(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initialCheck)
 {
   try
   {
@@ -327,7 +370,7 @@ bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared
       sock.bind(ds->sourceAddr);
     }
 
-    auto data = std::make_shared<HealthCheckData>(mplexer, ds, std::move(checkName), checkType, checkClass, queryID);
+    auto data = std::make_shared<HealthCheckData>(*mplexer, ds, std::move(checkName), checkType, checkClass, queryID);
     data->d_initial = initialCheck;
 
     gettimeofday(&data->d_ttd, nullptr);
@@ -352,6 +395,13 @@ bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared
 
       mplexer->addReadFD(data->d_udpSocket.getHandle(), &healthCheckUDPCallback, data, &data->d_ttd);
     }
+    else if (ds->isDoH()) {
+      InternalQuery query(std::move(packet), IDState());
+      auto sender = std::shared_ptr<TCPQuerySender>(new HealthCheckQuerySender(data));
+      if (!sendH2Query(ds, mplexer, sender, std::move(query))) {
+        updateHealthCheckResult(data->d_ds, data->d_initial, false);
+      }
+    }
     else {
       data->d_tcpHandler = std::make_unique<TCPIOHandler>(ds->d_tlsSubjectName, sock.releaseHandle(), timeval{ds->checkTimeout,0}, ds->d_tlsCtx, time(nullptr));
       data->d_ioState = std::make_unique<IOStateHandler>(*mplexer, data->d_tcpHandler->getDescriptor());
@@ -390,18 +440,18 @@ bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared
   }
 }
 
-void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool initial)
+void handleQueuedHealthChecks(FDMultiplexer& mplexer, bool initial)
 {
-  while (mplexer->getWatchedFDCount(false) > 0 || mplexer->getWatchedFDCount(true) > 0) {
+  while (mplexer.getWatchedFDCount(false) > 0 || mplexer.getWatchedFDCount(true) > 0) {
     struct timeval now;
-    int ret = mplexer->run(&now, 100);
+    int ret = mplexer.run(&now, 100);
     if (ret == -1) {
       if (g_verboseHealthChecks) {
         infolog("Error while waiting for the health check response from backends: %d", ret);
       }
       break;
     }
-    auto timeouts = mplexer->getTimeouts(now);
+    auto timeouts = mplexer.getTimeouts(now);
     for (const auto& timeout : timeouts) {
       auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(timeout.second);
       try {
@@ -409,7 +459,7 @@ void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool init
           data->d_ioState.reset();
         }
         else {
-          mplexer->removeReadFD(timeout.first);
+          mplexer.removeReadFD(timeout.first);
         }
         if (g_verboseHealthChecks) {
           infolog("Timeout while waiting for the health check response from backend %s", data->d_ds->getNameWithAddr());
@@ -429,7 +479,7 @@ void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool init
       }
     }
 
-    timeouts = mplexer->getTimeouts(now, true);
+    timeouts = mplexer.getTimeouts(now, true);
     for (const auto& timeout : timeouts) {
       auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(timeout.second);
       try {
index 08ac59948d9b5ea0be99224c0941934256178ba9..04af75b0dbdfb0eed2418430bfcf3892b16b9aa8 100644 (file)
@@ -28,6 +28,6 @@
 extern bool g_verboseHealthChecks;
 
 void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool initial, bool newState);
-bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initial=false);
-void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool initial=false);
+bool queueHealthCheck(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initial=false);
+void handleQueuedHealthChecks(FDMultiplexer& mplexer, bool initial=false);
 
index b248a98a1e7c3976028002acb6abeecd528dd973..0aea4d7170d892b0d012a47f4e26978b5b61f7d6 100644 (file)
@@ -1017,3 +1017,13 @@ bool setupDoHClientProtocolNegotiation(std::shared_ptr<TLSCtx>& ctx)
   ctx->setNextProtocolSelectCallback(select_next_proto_callback);
   return true;
 }
+
+bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<TCPQuerySender>& sender, InternalQuery&& query)
+{
+  struct timeval now;
+  gettimeofday(&now, nullptr);
+
+  auto newConnection = std::make_shared<DoHConnectionToBackend>(ds, mplexer, now);
+  newConnection->queueQuery(sender, std::move(query));
+  return true;
+}
index 0775898e81818fed4dcfd8b9547a0d858047ad47..1f3aa23bdd8c11f93c48321549e9d020a65da104 100644 (file)
@@ -25,6 +25,7 @@
 #include <mutex>
 #include <vector>
 
+#include "dnsdist-tcp.hh"
 #include "stat_t.hh"
 
 struct CrossProtocolQuery;
@@ -64,3 +65,7 @@ class TLSCtx;
 
 bool initDoHWorkers();
 bool setupDoHClientProtocolNegotiation(std::shared_ptr<TLSCtx>& ctx);
+
+/* opens a new HTTP/2 connection to the supplied backend (attached to the supplied multiplexer), sends the query,
+   waits for the response to come back or an error to occur then notifies the sender, closing the connection. */
+bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<TCPQuerySender>& sender, InternalQuery&& query);
index 9370a8a0073262a62654af4f1bde73413dc01d1b..c097fe826331d6e46c68d79a3bcefb4c644ebf5c 100644 (file)
@@ -455,10 +455,16 @@ void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, F
   }
 
   if (reason == FailureReason::timeout) {
-    ++sender->getClientState().tcpDownstreamTimeouts;
+    const ClientState* cs = sender->getClientState();
+    if (cs) {
+      ++cs->tcpDownstreamTimeouts;
+    }
   }
   else if (reason == FailureReason::gaveUp) {
-    ++sender->getClientState().tcpGaveUp;
+    const ClientState* cs = sender->getClientState();
+    if (cs) {
+      ++cs->tcpGaveUp;
+    }
   }
 
   try {
index f698d9d4353538956939b88883f7019764e069af..498fea31e72ce660257784fa0a7c5c3d9831c932 100644 (file)
@@ -132,9 +132,9 @@ static void handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bo
     return d_ioState != nullptr;
   }
 
-  const ClientState& getClientState() override
+  const ClientState* getClientState() override
   {
-    return *d_ci.cs;
+    return d_ci.cs;
   }
 
   std::string toString() const
index e48c599f644c8fa2923c0fa6962c1679b7993ad8..d891f31cddc33dbdad054f746d264381a176d386 100644 (file)
@@ -144,7 +144,7 @@ public:
   }
 
   virtual bool active() const = 0;
-  virtual const ClientState& getClientState() = 0;
+  virtual const ClientState* getClientState() = 0;
   virtual void handleResponse(const struct timeval& now, TCPResponse&& response) = 0;
   virtual void handleXFRResponse(const struct timeval& now, TCPResponse&& response) = 0;
   virtual void notifyIOError(IDState&& query, const struct timeval& now) = 0;
index 6662e7d3dedfadb6dd8467be08948938055fa14d..a7a43e26d81e005f57ece6169595e96a591ce71d 100644 (file)
@@ -424,13 +424,13 @@ public:
     return true;
   }
 
-  const ClientState& getClientState() override
+  const ClientState* getClientState() override
   {
     if (!du || !du->dsc || !du->dsc->cs) {
       throw std::runtime_error("No query associated to this DoHTCPCrossQuerySender");
     }
 
-    return *du->dsc->cs;
+    return du->dsc->cs;
   }
 
   void handleResponse(const struct timeval& now, TCPResponse&& response) override