From: Remi Gacogne Date: Tue, 24 Aug 2021 15:39:49 +0000 (+0200) Subject: dnsdist: Implement health-checks for DoH backends X-Git-Tag: dnsdist-1.7.0-alpha1~23^2~32 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=8229d4f07168004a5ce2584f9f0143ba880eb7da;p=thirdparty%2Fpdns.git dnsdist: Implement health-checks for DoH backends --- diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 490a3d52c8..00fa3f2286 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -1312,9 +1312,9 @@ public: return true; } - const ClientState& getClientState() override + const ClientState* getClientState() override { - return d_cs; + return &d_cs; } void handleResponse(const struct timeval& now, TCPResponse&& response) override @@ -1839,7 +1839,7 @@ static void healthChecksThread() for(;;) { sleep(interval); - auto mplexer = std::shared_ptr(FDMultiplexer::getMultiplexerSilent()); + auto mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs! for(auto& dss : *states) { if (++dss->lastCheck < dss->checkInterval) { @@ -1896,7 +1896,7 @@ static void healthChecksThread() } } - handleQueuedHealthChecks(mplexer); + handleQueuedHealthChecks(*mplexer); } } @@ -2576,7 +2576,7 @@ int main(int argc, char** argv) checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount); - auto mplexer = std::shared_ptr(FDMultiplexer::getMultiplexerSilent()); + auto mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal if (dss->availability == DownstreamState::Availability::Auto) { if (!queueHealthCheck(mplexer, dss, true)) { @@ -2585,7 +2585,7 @@ int main(int argc, char** argv) } } } - handleQueuedHealthChecks(mplexer, true); + handleQueuedHealthChecks(*mplexer, true); /* we need to create the TCP worker threads before the acceptor ones, otherwise we might crash when processing diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 4ba0b80876..067a6e77df 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -826,6 +826,11 @@ struct DownstreamState return d_tcpOnly || d_tlsCtx != nullptr; } + bool isDoH() const + { + return !d_dohPath.empty(); + } + bool passCrossProtocolQuery(std::unique_ptr&& cpq); private: diff --git a/pdns/dnsdistdist/dnsdist-healthchecks.cc b/pdns/dnsdistdist/dnsdist-healthchecks.cc index acae3cd63e..b8c280ac63 100644 --- a/pdns/dnsdistdist/dnsdist-healthchecks.cc +++ b/pdns/dnsdistdist/dnsdist-healthchecks.cc @@ -24,6 +24,8 @@ #include "tcpiohandler-mplexer.hh" #include "dnswriter.hh" #include "dolog.hh" +#include "dnsdist-tcp.hh" +#include "dnsdist-nghttp2.hh" bool g_verboseHealthChecks{false}; @@ -31,12 +33,12 @@ struct HealthCheckData { enum class TCPState : uint8_t { WritingQuery, ReadingResponseSize, ReadingResponse }; - HealthCheckData(std::shared_ptr& mplexer, const std::shared_ptr& ds, DNSName&& checkName, uint16_t checkType, uint16_t checkClass, uint16_t queryID): d_ds(ds), d_mplexer(mplexer), d_udpSocket(-1), d_checkName(std::move(checkName)), d_checkType(checkType), d_checkClass(checkClass), d_queryID(queryID) + HealthCheckData(FDMultiplexer& mplexer, const std::shared_ptr& ds, DNSName&& checkName, uint16_t checkType, uint16_t checkClass, uint16_t queryID): d_ds(ds), d_mplexer(mplexer), d_udpSocket(-1), d_checkName(std::move(checkName)), d_checkType(checkType), d_checkClass(checkClass), d_queryID(queryID) { } const std::shared_ptr d_ds; - std::shared_ptr d_mplexer; + FDMultiplexer& d_mplexer; std::unique_ptr d_tcpHandler{nullptr}; std::unique_ptr d_ioState{nullptr}; PacketBuffer d_buffer; @@ -178,10 +180,51 @@ static bool handleResponse(std::shared_ptr& data) return true; } +class HealthCheckQuerySender : public TCPQuerySender +{ +public: + HealthCheckQuerySender(std::shared_ptr& data): d_data(data) + { + } + + ~HealthCheckQuerySender() + { + } + + bool active() const override + { + return true; + } + + const ClientState* getClientState() override + { + return nullptr; + } + + void handleResponse(const struct timeval& now, TCPResponse&& response) override + { + d_data->d_buffer = std::move(response.d_buffer); + updateHealthCheckResult(d_data->d_ds, d_data->d_initial, ::handleResponse(d_data)); + } + + void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override + { + throw std::runtime_error("Unexpected XFR reponse to a health check query"); + } + + void notifyIOError(IDState&& query, const struct timeval& now) override + { + updateHealthCheckResult(d_data->d_ds, d_data->d_initial, false); + } + +private: + std::shared_ptr d_data; +}; + static void healthCheckUDPCallback(int fd, FDMultiplexer::funcparam_t& param) { auto data = boost::any_cast>(param); - data->d_mplexer->removeReadFD(fd); + data->d_mplexer.removeReadFD(fd); ComboAddress from; from.sin4.sin_family = data->d_ds->remote.sin4.sin_family; @@ -264,7 +307,7 @@ static void healthCheckTCPCallback(int fd, FDMultiplexer::funcparam_t& param) } } -bool queueHealthCheck(std::shared_ptr& mplexer, const std::shared_ptr& ds, bool initialCheck) +bool queueHealthCheck(std::unique_ptr& mplexer, const std::shared_ptr& ds, bool initialCheck) { try { @@ -327,7 +370,7 @@ bool queueHealthCheck(std::shared_ptr& mplexer, const std::shared sock.bind(ds->sourceAddr); } - auto data = std::make_shared(mplexer, ds, std::move(checkName), checkType, checkClass, queryID); + auto data = std::make_shared(*mplexer, ds, std::move(checkName), checkType, checkClass, queryID); data->d_initial = initialCheck; gettimeofday(&data->d_ttd, nullptr); @@ -352,6 +395,13 @@ bool queueHealthCheck(std::shared_ptr& mplexer, const std::shared mplexer->addReadFD(data->d_udpSocket.getHandle(), &healthCheckUDPCallback, data, &data->d_ttd); } + else if (ds->isDoH()) { + InternalQuery query(std::move(packet), IDState()); + auto sender = std::shared_ptr(new HealthCheckQuerySender(data)); + if (!sendH2Query(ds, mplexer, sender, std::move(query))) { + updateHealthCheckResult(data->d_ds, data->d_initial, false); + } + } else { data->d_tcpHandler = std::make_unique(ds->d_tlsSubjectName, sock.releaseHandle(), timeval{ds->checkTimeout,0}, ds->d_tlsCtx, time(nullptr)); data->d_ioState = std::make_unique(*mplexer, data->d_tcpHandler->getDescriptor()); @@ -390,18 +440,18 @@ bool queueHealthCheck(std::shared_ptr& mplexer, const std::shared } } -void handleQueuedHealthChecks(std::shared_ptr& mplexer, bool initial) +void handleQueuedHealthChecks(FDMultiplexer& mplexer, bool initial) { - while (mplexer->getWatchedFDCount(false) > 0 || mplexer->getWatchedFDCount(true) > 0) { + while (mplexer.getWatchedFDCount(false) > 0 || mplexer.getWatchedFDCount(true) > 0) { struct timeval now; - int ret = mplexer->run(&now, 100); + int ret = mplexer.run(&now, 100); if (ret == -1) { if (g_verboseHealthChecks) { infolog("Error while waiting for the health check response from backends: %d", ret); } break; } - auto timeouts = mplexer->getTimeouts(now); + auto timeouts = mplexer.getTimeouts(now); for (const auto& timeout : timeouts) { auto data = boost::any_cast>(timeout.second); try { @@ -409,7 +459,7 @@ void handleQueuedHealthChecks(std::shared_ptr& mplexer, bool init data->d_ioState.reset(); } else { - mplexer->removeReadFD(timeout.first); + mplexer.removeReadFD(timeout.first); } if (g_verboseHealthChecks) { infolog("Timeout while waiting for the health check response from backend %s", data->d_ds->getNameWithAddr()); @@ -429,7 +479,7 @@ void handleQueuedHealthChecks(std::shared_ptr& mplexer, bool init } } - timeouts = mplexer->getTimeouts(now, true); + timeouts = mplexer.getTimeouts(now, true); for (const auto& timeout : timeouts) { auto data = boost::any_cast>(timeout.second); try { diff --git a/pdns/dnsdistdist/dnsdist-healthchecks.hh b/pdns/dnsdistdist/dnsdist-healthchecks.hh index 08ac59948d..04af75b0db 100644 --- a/pdns/dnsdistdist/dnsdist-healthchecks.hh +++ b/pdns/dnsdistdist/dnsdist-healthchecks.hh @@ -28,6 +28,6 @@ extern bool g_verboseHealthChecks; void updateHealthCheckResult(const std::shared_ptr& dss, bool initial, bool newState); -bool queueHealthCheck(std::shared_ptr& mplexer, const std::shared_ptr& ds, bool initial=false); -void handleQueuedHealthChecks(std::shared_ptr& mplexer, bool initial=false); +bool queueHealthCheck(std::unique_ptr& mplexer, const std::shared_ptr& ds, bool initial=false); +void handleQueuedHealthChecks(FDMultiplexer& mplexer, bool initial=false); diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.cc b/pdns/dnsdistdist/dnsdist-nghttp2.cc index b248a98a1e..0aea4d7170 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.cc +++ b/pdns/dnsdistdist/dnsdist-nghttp2.cc @@ -1017,3 +1017,13 @@ bool setupDoHClientProtocolNegotiation(std::shared_ptr& ctx) ctx->setNextProtocolSelectCallback(select_next_proto_callback); return true; } + +bool sendH2Query(const std::shared_ptr& ds, std::unique_ptr& mplexer, std::shared_ptr& sender, InternalQuery&& query) +{ + struct timeval now; + gettimeofday(&now, nullptr); + + auto newConnection = std::make_shared(ds, mplexer, now); + newConnection->queueQuery(sender, std::move(query)); + return true; +} diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.hh b/pdns/dnsdistdist/dnsdist-nghttp2.hh index 0775898e81..1f3aa23bdd 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.hh +++ b/pdns/dnsdistdist/dnsdist-nghttp2.hh @@ -25,6 +25,7 @@ #include #include +#include "dnsdist-tcp.hh" #include "stat_t.hh" struct CrossProtocolQuery; @@ -64,3 +65,7 @@ class TLSCtx; bool initDoHWorkers(); bool setupDoHClientProtocolNegotiation(std::shared_ptr& ctx); + +/* opens a new HTTP/2 connection to the supplied backend (attached to the supplied multiplexer), sends the query, + waits for the response to come back or an error to occur then notifies the sender, closing the connection. */ +bool sendH2Query(const std::shared_ptr& ds, std::unique_ptr& mplexer, std::shared_ptr& sender, InternalQuery&& query); diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index 9370a8a007..c097fe8263 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -455,10 +455,16 @@ void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, F } if (reason == FailureReason::timeout) { - ++sender->getClientState().tcpDownstreamTimeouts; + const ClientState* cs = sender->getClientState(); + if (cs) { + ++cs->tcpDownstreamTimeouts; + } } else if (reason == FailureReason::gaveUp) { - ++sender->getClientState().tcpGaveUp; + const ClientState* cs = sender->getClientState(); + if (cs) { + ++cs->tcpGaveUp; + } } try { diff --git a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh index f698d9d435..498fea31e7 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh +++ b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh @@ -132,9 +132,9 @@ static void handleTimeout(std::shared_ptr& state, bo return d_ioState != nullptr; } - const ClientState& getClientState() override + const ClientState* getClientState() override { - return *d_ci.cs; + return d_ci.cs; } std::string toString() const diff --git a/pdns/dnsdistdist/dnsdist-tcp.hh b/pdns/dnsdistdist/dnsdist-tcp.hh index e48c599f64..d891f31cdd 100644 --- a/pdns/dnsdistdist/dnsdist-tcp.hh +++ b/pdns/dnsdistdist/dnsdist-tcp.hh @@ -144,7 +144,7 @@ public: } virtual bool active() const = 0; - virtual const ClientState& getClientState() = 0; + virtual const ClientState* getClientState() = 0; virtual void handleResponse(const struct timeval& now, TCPResponse&& response) = 0; virtual void handleXFRResponse(const struct timeval& now, TCPResponse&& response) = 0; virtual void notifyIOError(IDState&& query, const struct timeval& now) = 0; diff --git a/pdns/dnsdistdist/doh.cc b/pdns/dnsdistdist/doh.cc index 6662e7d3de..a7a43e26d8 100644 --- a/pdns/dnsdistdist/doh.cc +++ b/pdns/dnsdistdist/doh.cc @@ -424,13 +424,13 @@ public: return true; } - const ClientState& getClientState() override + const ClientState* getClientState() override { if (!du || !du->dsc || !du->dsc->cs) { throw std::runtime_error("No query associated to this DoHTCPCrossQuerySender"); } - return *du->dsc->cs; + return du->dsc->cs; } void handleResponse(const struct timeval& now, TCPResponse&& response) override