From: Remi Gacogne Date: Thu, 9 Sep 2021 14:43:28 +0000 (+0200) Subject: dnsdist: Add metrics for outgoing DoH and cross-protocol flows X-Git-Tag: dnsdist-1.7.0-alpha1~23^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=eec638962fbb0e951a8e07548c0ac34e9f571d3a;p=thirdparty%2Fpdns.git dnsdist: Add metrics for outgoing DoH and cross-protocol flows --- diff --git a/pdns/dnsdist-lua-inspection.cc b/pdns/dnsdist-lua-inspection.cc index 1853e0fc0f..44aafbe555 100644 --- a/pdns/dnsdist-lua-inspection.cc +++ b/pdns/dnsdist-lua-inspection.cc @@ -22,6 +22,7 @@ #include "dnsdist.hh" #include "dnsdist-lua.hh" #include "dnsdist-dynblocks.hh" +#include "dnsdist-nghttp2.hh" #include "dnsdist-rings.hh" #include "dnsdist-tcp.hh" @@ -610,8 +611,8 @@ void setupLuaInspection(LuaContext& luaCtx) ret << endl; ret << "Backends:" << endl; - fmt = boost::format("%-3d %-20.20s %-20.20s %-20d %-20d %-25d %-20d %-20d %-20d %-20d %-20d %-20d %-20d %-20d %-20f %-20f"); - ret << (fmt % "#" % "Name" % "Address" % "Connections" % " Max concurrent conn" % "Died sending query" % "Died reading response" % "Gave up" % "Read timeouts" % "Write timeouts" % "Connect timeouts" % "Total connections" % "Reused connections" % "TLS resumptions" % "Avg queries/conn" % "Avg duration") << endl; + fmt = boost::format("%-3d %-20.20s %-20.20s %-20d %-20d %-25d %-25d %-20d %-20d %-20d %-20d %-20d %-20d %-20d %-20f %-20f"); + ret << (fmt % "#" % "Name" % "Address" % "Connections" % "Max concurrent conn" % "Died sending query" % "Died reading response" % "Gave up" % "Read timeouts" % "Write timeouts" % "Connect timeouts" % "Total connections" % "Reused connections" % "TLS resumptions" % "Avg queries/conn" % "Avg duration") << endl; auto states = g_dstates.getLocal(); counter = 0; @@ -660,6 +661,11 @@ void setupLuaInspection(LuaContext& luaCtx) g_tcpStatesDumpRequested += g_tcpclientthreads->getThreadsCount(); }); + luaCtx.writeFunction("requestDoHStatesDump", [] { + setLuaNoSideEffect(); + g_dohStatesDumpRequested += g_dohClientThreads->getThreadsCount(); + }); + luaCtx.writeFunction("dumpStats", [] { setLuaNoSideEffect(); vector leftcolumn, rightcolumn; diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index 20d1f00fcb..a8d8971154 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -520,6 +520,7 @@ public: ssize_t sent = write(d_responseDesc, &ptr, sizeof(ptr)); if (sent != sizeof(ptr)) { if (errno == EAGAIN || errno == EWOULDBLOCK) { + ++g_stats.tcpCrossProtocolResponsePipeFull; vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because the pipe is full"); } else { @@ -1220,6 +1221,8 @@ static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int cros for (;;) { data.mplexer->run(&now); + DownstreamConnectionsManager::cleanupClosedTCPConnections(now); + if (now.tv_sec > lastTimeoutScan) { lastTimeoutScan = now.tv_sec; auto expiredReadConns = data.mplexer->getTimeouts(now, false); diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 31c62c7236..17dd54fa26 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -347,7 +347,11 @@ struct DNSDistStats stat_t securityStatus{0}; stat_t dohQueryPipeFull{0}; stat_t dohResponsePipeFull{0}; + stat_t outgoingDoHQueryPipeFull{0}; stat_t proxyProtocolInvalid{0}; + stat_t tcpQueryPipeFull{0}; + stat_t tcpCrossProtocolQueryPipeFull{0}; + stat_t tcpCrossProtocolResponsePipeFull{0}; double latencyAvg100{0}, latencyAvg1000{0}, latencyAvg10000{0}, latencyAvg1000000{0}; typedef std::function statfunction_t; @@ -405,6 +409,10 @@ struct DNSDistStats {"security-status", &securityStatus}, {"doh-query-pipe-full", &dohQueryPipeFull}, {"doh-response-pipe-full", &dohResponsePipeFull}, + {"outgoing-doh-query-pipe-full", &outgoingDoHQueryPipeFull}, + {"tcp-query-pipe-full", &tcpQueryPipeFull}, + {"tcp-cross-protocol-query-pipe-full", &tcpCrossProtocolQueryPipeFull}, + {"tcp-cross-protocol-response-pipe-full", &tcpCrossProtocolResponsePipeFull}, // Latency histogram {"latency-sum", &latencySum}, {"latency-count", getLatencyCount}, diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.cc b/pdns/dnsdistdist/dnsdist-nghttp2.cc index 57d8dafe20..abebdec1b4 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.cc +++ b/pdns/dnsdistdist/dnsdist-nghttp2.cc @@ -58,7 +58,10 @@ public: return o.str(); } + bool reachedMaxStreamID() const; bool canBeReused() const override; + /* full now but will become usable later */ + bool willBeReusable() const; void setHealthCheck(bool h) { @@ -92,6 +95,7 @@ private: void addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback); void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback); void stopIO(); + void handleResponse(PendingRequest&& request); void handleResponseError(PendingRequest&& request, const struct timeval& now); void handleIOError(); @@ -114,6 +118,7 @@ private: size_t d_inPos{0}; uint32_t d_highestStreamID{0}; bool d_healthCheckQuery{false}; + bool d_firstWrite{true}; }; class DownstreamDoHConnectionsManager @@ -186,9 +191,27 @@ void DoHConnectionToBackend::handleIOError() void DoHConnectionToBackend::handleTimeout(const struct timeval& now, bool write) { + if (write) { + if (d_firstWrite) { + ++d_ds->tcpConnectTimeouts; + } + else { + ++d_ds->tcpWriteTimeouts; + } + } + else { + ++d_ds->tcpReadTimeouts; + } + handleIOError(); } +bool DoHConnectionToBackend::reachedMaxStreamID() const +{ + const uint32_t maximumStreamID = (static_cast(1) << 31) - 1; + return d_highestStreamID == maximumStreamID; +} + bool DoHConnectionToBackend::canBeReused() const { if (d_connectionDied) { @@ -199,8 +222,7 @@ bool DoHConnectionToBackend::canBeReused() const return false; } - const uint32_t maximumStreamID = (static_cast(1) << 31) - 1; - if (d_highestStreamID == maximumStreamID) { + if (reachedMaxStreamID()) { return false; } @@ -212,6 +234,15 @@ bool DoHConnectionToBackend::canBeReused() const return true; } +bool DoHConnectionToBackend::willBeReusable() const +{ + if (!d_connectionDied && d_proxyProtocolPayload.empty() && !reachedMaxStreamID()) { + return true; + } + + return false; +} + const std::unordered_map DoHConnectionToBackend::s_constants = { {"method-name", ":method"}, {"method-value", "POST"}, @@ -253,7 +284,6 @@ void DoHConnectionToBackend::addDynamicHeader(std::vector& headers, void DoHConnectionToBackend::queueQuery(std::shared_ptr& sender, TCPQuery&& query) { - // cerr<<"in "<<__PRETTY_FUNCTION__<<" with query ID "<id)<d_addXForwardedHeaders; @@ -341,14 +371,15 @@ void DoHConnectionToBackend::queueQuery(std::shared_ptr& sender, auto newStreamId = nghttp2_submit_request(d_session.get(), nullptr, headers.data(), headers.size(), &data_provider, this); if (newStreamId < 0) { d_connectionDied = true; + ++d_ds->tcpDiedSendingQuery; d_currentStreams.erase(streamId); throw std::runtime_error("Error submitting HTTP request:" + std::string(nghttp2_strerror(newStreamId))); } - // cerr<<"stream ID is "<tcpDiedSendingQuery; d_currentStreams.erase(streamId); throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv)); } @@ -397,6 +428,11 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun if (readlen > 0 && static_cast(readlen) < conn->d_inPos) { throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen))); } + + struct timeval now; + gettimeofday(&now, nullptr); + conn->d_lastDataReceivedTime = now; + // cerr<<"after read send"<d_session.get()); } @@ -419,6 +455,7 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun } catch (const std::exception& e) { vinfolog("Exception while trying to read from HTTP backend connection: %s", e.what()); + ++conn->d_ds->tcpDiedReadingResponse; conn->handleIOError(); break; } @@ -442,7 +479,7 @@ void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::fun } else if (newState == IOState::Done) { // cerr<<"done, buffer size was "<d_out.size()<<", pos was "<d_outPos<d_queries; + conn->d_firstWrite = false; conn->d_out.clear(); conn->d_outPos = 0; conn->stopIO(); @@ -454,6 +491,7 @@ void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::fun } catch (const std::exception& e) { vinfolog("Exception while trying to write (ready) to HTTP backend connection: %s", e.what()); + ++conn->d_ds->tcpDiedSendingQuery; conn->handleIOError(); } } @@ -481,7 +519,7 @@ void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackf else if (newState == IOState::NeedRead) { ttd = getBackendReadTTD(now); } - else if (isFresh() && d_queries == 0) { + else if (isFresh() && d_firstWrite) { /* first write just after the non-blocking connect */ ttd = getBackendConnectTTD(now); } @@ -508,7 +546,7 @@ void DoHConnectionToBackend::addToIOState(IOState state, FDMultiplexer::callback if (state == IOState::NeedRead) { ttd = getBackendReadTTD(now); } - else if (isFresh() && d_queries == 0) { + else if (isFresh() && d_firstWrite == 0) { /* first write just after the non-blocking connect */ ttd = getBackendConnectTTD(now); } @@ -544,7 +582,7 @@ ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const ui auto state = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size()); // cerr<<"got a "<<(int)state<<" state, "<d_out.size()-conn->d_outPos<<" bytes remaining"<d_queries; + conn->d_firstWrite = false; conn->d_out.clear(); conn->d_outPos = 0; conn->stopIO(); @@ -559,6 +597,7 @@ ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const ui catch (const std::exception& e) { vinfolog("Exception while trying to write (send) to HTTP backend connection: %s", e.what()); conn->handleIOError(); + ++conn->d_ds->tcpDiedSendingQuery; } } @@ -599,6 +638,7 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con if (stream != conn->d_currentStreams.end()) { // cerr<<"Stream "<hd.stream_id<<" is now finished"<second.d_finished = true; + ++conn->d_queries; auto request = std::move(stream->second); conn->d_currentStreams.erase(stream->first); @@ -619,6 +659,7 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con else { vinfolog("Stream %d NOT FOUND", frame->hd.stream_id); conn->d_connectionDied = true; + ++conn->d_ds->tcpDiedReadingResponse; return NGHTTP2_ERR_CALLBACK_FAILURE; } } @@ -634,11 +675,13 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session if (stream == conn->d_currentStreams.end()) { vinfolog("Unable to match the stream ID %d to a known one!", stream_id); conn->d_connectionDied = true; + ++conn->d_ds->tcpDiedReadingResponse; return NGHTTP2_ERR_CALLBACK_FAILURE; } if (len > std::numeric_limits::max() || (std::numeric_limits::max() - stream->second.d_buffer.size()) < len) { vinfolog("Data frame of size %d is too large for a DNS response (we already have %d)", len, stream->second.d_buffer.size()); conn->d_connectionDied = true; + ++conn->d_ds->tcpDiedReadingResponse; return NGHTTP2_ERR_CALLBACK_FAILURE; } @@ -680,6 +723,7 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i // cerr << "Stream " << stream_id << " closed with error_code=" << error_code << endl; conn->d_connectionDied = true; + ++conn->d_ds->tcpDiedReadingResponse; auto stream = conn->d_currentStreams.find(stream_id); if (stream == conn->d_currentStreams.end()) { @@ -735,6 +779,7 @@ int DoHConnectionToBackend::on_header_callback(nghttp2_session* session, const n catch (...) { vinfolog("Error parsing the status header for stream ID %d", frame->hd.stream_id); conn->d_connectionDied = true; + ++conn->d_ds->tcpDiedReadingResponse; return NGHTTP2_ERR_CALLBACK_FAILURE; } } @@ -748,6 +793,7 @@ int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_ DoHConnectionToBackend* conn = reinterpret_cast(user_data); conn->d_connectionDied = true; + ++conn->d_ds->tcpDiedReadingResponse; return 0; } @@ -761,6 +807,7 @@ DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr nghttp2_session_callbacks* cbs = nullptr; if (nghttp2_session_callbacks_new(&cbs) != 0) { d_connectionDied = true; + ++d_ds->tcpDiedSendingQuery; vinfolog("Unable to create a callback object for a new HTTP/2 session"); return; } @@ -777,6 +824,7 @@ DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr nghttp2_session* sess = nullptr; if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) { d_connectionDied = true; + ++d_ds->tcpDiedSendingQuery; vinfolog("Coult not allocate a new HTTP/2 session"); return; } @@ -800,6 +848,7 @@ DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv) / sizeof(*iv)); if (rv != 0) { d_connectionDied = true; + ++d_ds->tcpDiedSendingQuery; vinfolog("Could not submit SETTINGS: %s", nghttp2_strerror(rv)); return; } @@ -841,13 +890,18 @@ bool DownstreamDoHConnectionsManager::removeDownstreamConnection(std::shared_ptr void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now) { + if (s_cleanupInterval <= 0 || (t_nextCleanup > 0 && t_nextCleanup > now.tv_sec)) { + return; + } + t_nextCleanup = now.tv_sec + s_cleanupInterval; + struct timeval freshCutOff = now; freshCutOff.tv_sec -= 1; for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end();) { for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end();) { if (!(*connIt)) { - ++connIt; + connIt = dsIt->second.erase(connIt); continue; } @@ -882,11 +936,7 @@ std::shared_ptr DownstreamDoHConnectionsManager::getConn auto backendId = ds->getID(); - if (s_cleanupInterval > 0 && (t_nextCleanup == 0 || t_nextCleanup <= now.tv_sec)) { - t_nextCleanup = now.tv_sec + s_cleanupInterval; - //cerr<<"cleaning up"< DownstreamDoHConnectionsManager::getConn for (auto listIt = list.begin(); listIt != list.end();) { auto& entry = *listIt; if (!entry->canBeReused()) { - listIt = list.erase(listIt); + if (!entry->willBeReusable()) { + listIt = list.erase(listIt); + } + else { + ++listIt; + } continue; } entry->setReused(); @@ -975,7 +1030,6 @@ static void dohClientThread(int crossProtocolPipeFD) setThreadName("dnsdist/dohClie"); DoHClientThreadData data; - data.mplexer->addReadFD(crossProtocolPipeFD, handleCrossProtocolQuery, &data); struct timeval now; @@ -988,6 +1042,7 @@ static void dohClientThread(int crossProtocolPipeFD) if (now.tv_sec > lastTimeoutScan) { lastTimeoutScan = now.tv_sec; + DownstreamDoHConnectionsManager::cleanupClosedConnections(now); handleH2Timeouts(*data.mplexer, now); if (g_dohStatesDumpRequested > 0) { @@ -1092,6 +1147,7 @@ bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr 0) { g_dohClientThreads = std::make_unique(g_outgoingDoHWorkerThreads); - g_dohClientThreads->addThread(); + for (size_t idx = 0; idx < g_outgoingDoHWorkerThreads; idx++) { + g_dohClientThreads->addThread(); + } } return true; #else diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index 59c065d338..f6288293a5 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -31,6 +31,7 @@ TCPConnectionToBackend::~TCPConnectionToBackend() } } auto diff = now - d_connectionStartTime; + // cerr<<"connection to backend terminated after "<updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000); } } @@ -721,10 +722,7 @@ std::shared_ptr DownstreamConnectionsManager::getConnect auto backendId = ds->getID(); - if (s_cleanupInterval > 0 && (t_nextCleanup == 0 || t_nextCleanup <= now.tv_sec)) { - t_nextCleanup = now.tv_sec + s_cleanupInterval; - cleanupClosedTCPConnections(now); - } + cleanupClosedTCPConnections(now); { const auto& it = t_downstreamConnections.find(backendId); @@ -781,6 +779,12 @@ void DownstreamConnectionsManager::releaseDownstreamConnection(std::shared_ptr now.tv_sec)) { + return; + } + + t_nextCleanup = now.tv_sec + s_cleanupInterval; + struct timeval freshCutOff = now; freshCutOff.tv_sec -= 1; diff --git a/pdns/dnsdistdist/dnsdist-tcp.hh b/pdns/dnsdistdist/dnsdist-tcp.hh index 2049cd3cdf..4588755a35 100644 --- a/pdns/dnsdistdist/dnsdist-tcp.hh +++ b/pdns/dnsdistdist/dnsdist-tcp.hh @@ -208,6 +208,7 @@ public: auto tmp = conn.release(); if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) { + ++g_stats.tcpQueryPipeFull; delete tmp; tmp = nullptr; return false; @@ -227,6 +228,7 @@ public: auto tmp = cpq.release(); if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) { + ++g_stats.tcpCrossProtocolQueryPipeFull; delete tmp; tmp = nullptr; return false;