]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add metrics for outgoing DoH and cross-protocol flows
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 9 Sep 2021 14:43:28 +0000 (16:43 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 13 Sep 2021 13:34:34 +0000 (15:34 +0200)
pdns/dnsdist-lua-inspection.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist.hh
pdns/dnsdistdist/dnsdist-nghttp2.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp.hh

index 1853e0fc0fc99842cd30f48e2e22f5916e620062..44aafbe555cc45a2a744e4a2d6847e94da2f2a2e 100644 (file)
@@ -22,6 +22,7 @@
 #include "dnsdist.hh"
 #include "dnsdist-lua.hh"
 #include "dnsdist-dynblocks.hh"
+#include "dnsdist-nghttp2.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-tcp.hh"
 
@@ -610,8 +611,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       ret << endl;
 
       ret << "Backends:" << endl;
-      fmt = boost::format("%-3d %-20.20s %-20.20s %-20d %-20d %-25d %-20d %-20d %-20d %-20d %-20d %-20d %-20d %-20d %-20f %-20f");
-      ret << (fmt % "#" % "Name" % "Address" % "Connections" % " Max concurrent conn" % "Died sending query" % "Died reading response" % "Gave up" % "Read timeouts" % "Write timeouts" % "Connect timeouts" % "Total connections" % "Reused connections" % "TLS resumptions" % "Avg queries/conn" % "Avg duration") << endl;
+      fmt = boost::format("%-3d %-20.20s %-20.20s %-20d %-20d %-25d %-25d %-20d %-20d %-20d %-20d %-20d %-20d %-20d %-20f %-20f");
+      ret << (fmt % "#" % "Name" % "Address" % "Connections" % "Max concurrent conn" % "Died sending query" % "Died reading response" % "Gave up" % "Read timeouts" % "Write timeouts" % "Connect timeouts" % "Total connections" % "Reused connections" % "TLS resumptions" % "Avg queries/conn" % "Avg duration") << endl;
 
       auto states = g_dstates.getLocal();
       counter = 0;
@@ -660,6 +661,11 @@ void setupLuaInspection(LuaContext& luaCtx)
     g_tcpStatesDumpRequested += g_tcpclientthreads->getThreadsCount();
   });
 
+  luaCtx.writeFunction("requestDoHStatesDump", [] {
+    setLuaNoSideEffect();
+    g_dohStatesDumpRequested += g_dohClientThreads->getThreadsCount();
+  });
+
   luaCtx.writeFunction("dumpStats", [] {
       setLuaNoSideEffect();
       vector<string> leftcolumn, rightcolumn;
index 20d1f00fcb4d025b9d896ea8c0bb0cfa155b94f9..a8d8971154914bc33d0ef34c60cadda51420c98d 100644 (file)
@@ -520,6 +520,7 @@ public:
     ssize_t sent = write(d_responseDesc, &ptr, sizeof(ptr));
     if (sent != sizeof(ptr)) {
       if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        ++g_stats.tcpCrossProtocolResponsePipeFull;
         vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because the pipe is full");
       }
       else {
@@ -1220,6 +1221,8 @@ static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int cros
   for (;;) {
     data.mplexer->run(&now);
 
+    DownstreamConnectionsManager::cleanupClosedTCPConnections(now);
+
     if (now.tv_sec > lastTimeoutScan) {
       lastTimeoutScan = now.tv_sec;
       auto expiredReadConns = data.mplexer->getTimeouts(now, false);
index 31c62c7236cdd5adbb8aec29d68e8dfaed47646c..17dd54fa26400e42632b790efa81044144ecb566 100644 (file)
@@ -347,7 +347,11 @@ struct DNSDistStats
   stat_t securityStatus{0};
   stat_t dohQueryPipeFull{0};
   stat_t dohResponsePipeFull{0};
+  stat_t outgoingDoHQueryPipeFull{0};
   stat_t proxyProtocolInvalid{0};
+  stat_t tcpQueryPipeFull{0};
+  stat_t tcpCrossProtocolQueryPipeFull{0};
+  stat_t tcpCrossProtocolResponsePipeFull{0};
 
   double latencyAvg100{0}, latencyAvg1000{0}, latencyAvg10000{0}, latencyAvg1000000{0};
   typedef std::function<uint64_t(const std::string&)> statfunction_t;
@@ -405,6 +409,10 @@ struct DNSDistStats
     {"security-status", &securityStatus},
     {"doh-query-pipe-full", &dohQueryPipeFull},
     {"doh-response-pipe-full", &dohResponsePipeFull},
+    {"outgoing-doh-query-pipe-full", &outgoingDoHQueryPipeFull},
+    {"tcp-query-pipe-full", &tcpQueryPipeFull},
+    {"tcp-cross-protocol-query-pipe-full", &tcpCrossProtocolQueryPipeFull},
+    {"tcp-cross-protocol-response-pipe-full", &tcpCrossProtocolResponsePipeFull},
     // Latency histogram
     {"latency-sum", &latencySum},
     {"latency-count", getLatencyCount},
index 57d8dafe2060c99205bf4f21e0d8535831ec76fd..abebdec1b4353da6e2097257b45455e56e49e35d 100644 (file)
@@ -58,7 +58,10 @@ public:
     return o.str();
   }
 
+  bool reachedMaxStreamID() const;
   bool canBeReused() const override;
+  /* full now but will become usable later */
+  bool willBeReusable() const;
 
   void setHealthCheck(bool h)
   {
@@ -92,6 +95,7 @@ private:
   void addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback);
   void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback);
   void stopIO();
+
   void handleResponse(PendingRequest&& request);
   void handleResponseError(PendingRequest&& request, const struct timeval& now);
   void handleIOError();
@@ -114,6 +118,7 @@ private:
   size_t d_inPos{0};
   uint32_t d_highestStreamID{0};
   bool d_healthCheckQuery{false};
+  bool d_firstWrite{true};
 };
 
 class DownstreamDoHConnectionsManager
@@ -186,9 +191,27 @@ void DoHConnectionToBackend::handleIOError()
 
 void DoHConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
 {
+  if (write) {
+    if (d_firstWrite) {
+      ++d_ds->tcpConnectTimeouts;
+    }
+    else {
+      ++d_ds->tcpWriteTimeouts;
+    }
+  }
+  else {
+    ++d_ds->tcpReadTimeouts;
+  }
+
   handleIOError();
 }
 
+bool DoHConnectionToBackend::reachedMaxStreamID() const
+{
+  const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1;
+  return d_highestStreamID == maximumStreamID;
+}
+
 bool DoHConnectionToBackend::canBeReused() const
 {
   if (d_connectionDied) {
@@ -199,8 +222,7 @@ bool DoHConnectionToBackend::canBeReused() const
     return false;
   }
 
-  const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1;
-  if (d_highestStreamID == maximumStreamID) {
+  if (reachedMaxStreamID()) {
     return false;
   }
 
@@ -212,6 +234,15 @@ bool DoHConnectionToBackend::canBeReused() const
   return true;
 }
 
+bool DoHConnectionToBackend::willBeReusable() const
+{
+  if (!d_connectionDied && d_proxyProtocolPayload.empty() && !reachedMaxStreamID()) {
+    return true;
+  }
+
+  return false;
+}
+
 const std::unordered_map<std::string, std::string> DoHConnectionToBackend::s_constants = {
   {"method-name", ":method"},
   {"method-value", "POST"},
@@ -253,7 +284,6 @@ void DoHConnectionToBackend::addDynamicHeader(std::vector<nghttp2_nv>& headers,
 
 void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
 {
-  // cerr<<"in "<<__PRETTY_FUNCTION__<<" with query ID "<<ntohs(dh->id)<<endl;
   auto payloadSize = std::to_string(query.d_buffer.size());
 
   bool addXForwarded = d_ds->d_addXForwardedHeaders;
@@ -341,14 +371,15 @@ void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender,
   auto newStreamId = nghttp2_submit_request(d_session.get(), nullptr, headers.data(), headers.size(), &data_provider, this);
   if (newStreamId < 0) {
     d_connectionDied = true;
+    ++d_ds->tcpDiedSendingQuery;
     d_currentStreams.erase(streamId);
     throw std::runtime_error("Error submitting HTTP request:" + std::string(nghttp2_strerror(newStreamId)));
   }
-  // cerr<<"stream ID is "<<newStreamId<<" for a query of size "<<payloadSize<<endl;
 
   auto rv = nghttp2_session_send(d_session.get());
   if (rv != 0) {
     d_connectionDied = true;
+    ++d_ds->tcpDiedSendingQuery;
     d_currentStreams.erase(streamId);
     throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv));
   }
@@ -397,6 +428,11 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun
         if (readlen > 0 && static_cast<size_t>(readlen) < conn->d_inPos) {
           throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen)));
         }
+
+        struct timeval now;
+        gettimeofday(&now, nullptr);
+        conn->d_lastDataReceivedTime = now;
+
         // cerr<<"after read send"<<endl;
         nghttp2_session_send(conn->d_session.get());
       }
@@ -419,6 +455,7 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun
     }
     catch (const std::exception& e) {
       vinfolog("Exception while trying to read from HTTP backend connection: %s", e.what());
+      ++conn->d_ds->tcpDiedReadingResponse;
       conn->handleIOError();
       break;
     }
@@ -442,7 +479,7 @@ void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::fun
     }
     else if (newState == IOState::Done) {
       // cerr<<"done, buffer size was "<<conn->d_out.size()<<", pos was "<<conn->d_outPos<<endl;
-      ++conn->d_queries;
+      conn->d_firstWrite = false;
       conn->d_out.clear();
       conn->d_outPos = 0;
       conn->stopIO();
@@ -454,6 +491,7 @@ void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::fun
   }
   catch (const std::exception& e) {
     vinfolog("Exception while trying to write (ready) to HTTP backend connection: %s", e.what());
+    ++conn->d_ds->tcpDiedSendingQuery;
     conn->handleIOError();
   }
 }
@@ -481,7 +519,7 @@ void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackf
   else if (newState == IOState::NeedRead) {
     ttd = getBackendReadTTD(now);
   }
-  else if (isFresh() && d_queries == 0) {
+  else if (isFresh() && d_firstWrite) {
     /* first write just after the non-blocking connect */
     ttd = getBackendConnectTTD(now);
   }
@@ -508,7 +546,7 @@ void DoHConnectionToBackend::addToIOState(IOState state, FDMultiplexer::callback
   if (state == IOState::NeedRead) {
     ttd = getBackendReadTTD(now);
   }
-  else if (isFresh() && d_queries == 0) {
+  else if (isFresh() && d_firstWrite == 0) {
     /* first write just after the non-blocking connect */
     ttd = getBackendConnectTTD(now);
   }
@@ -544,7 +582,7 @@ ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const ui
       auto state = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
       // cerr<<"got a "<<(int)state<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
       if (state == IOState::Done) {
-        ++conn->d_queries;
+        conn->d_firstWrite = false;
         conn->d_out.clear();
         conn->d_outPos = 0;
         conn->stopIO();
@@ -559,6 +597,7 @@ ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const ui
     catch (const std::exception& e) {
       vinfolog("Exception while trying to write (send) to HTTP backend connection: %s", e.what());
       conn->handleIOError();
+      ++conn->d_ds->tcpDiedSendingQuery;
     }
   }
 
@@ -599,6 +638,7 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con
     if (stream != conn->d_currentStreams.end()) {
       // cerr<<"Stream "<<frame->hd.stream_id<<" is now finished"<<endl;
       stream->second.d_finished = true;
+      ++conn->d_queries;
 
       auto request = std::move(stream->second);
       conn->d_currentStreams.erase(stream->first);
@@ -619,6 +659,7 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con
     else {
       vinfolog("Stream %d NOT FOUND", frame->hd.stream_id);
       conn->d_connectionDied = true;
+      ++conn->d_ds->tcpDiedReadingResponse;
       return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
   }
@@ -634,11 +675,13 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session
   if (stream == conn->d_currentStreams.end()) {
     vinfolog("Unable to match the stream ID %d to a known one!", stream_id);
     conn->d_connectionDied = true;
+    ++conn->d_ds->tcpDiedReadingResponse;
     return NGHTTP2_ERR_CALLBACK_FAILURE;
   }
   if (len > std::numeric_limits<uint16_t>::max() || (std::numeric_limits<uint16_t>::max() - stream->second.d_buffer.size()) < len) {
     vinfolog("Data frame of size %d is too large for a DNS response (we already have %d)", len, stream->second.d_buffer.size());
     conn->d_connectionDied = true;
+    ++conn->d_ds->tcpDiedReadingResponse;
     return NGHTTP2_ERR_CALLBACK_FAILURE;
   }
 
@@ -680,6 +723,7 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i
 
   // cerr << "Stream " << stream_id << " closed with error_code=" << error_code << endl;
   conn->d_connectionDied = true;
+  ++conn->d_ds->tcpDiedReadingResponse;
 
   auto stream = conn->d_currentStreams.find(stream_id);
   if (stream == conn->d_currentStreams.end()) {
@@ -735,6 +779,7 @@ int DoHConnectionToBackend::on_header_callback(nghttp2_session* session, const n
       catch (...) {
         vinfolog("Error parsing the status header for stream ID %d", frame->hd.stream_id);
         conn->d_connectionDied = true;
+        ++conn->d_ds->tcpDiedReadingResponse;
         return NGHTTP2_ERR_CALLBACK_FAILURE;
       }
     }
@@ -748,6 +793,7 @@ int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_
 
   DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
   conn->d_connectionDied = true;
+  ++conn->d_ds->tcpDiedReadingResponse;
 
   return 0;
 }
@@ -761,6 +807,7 @@ DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr<DownstreamState>
   nghttp2_session_callbacks* cbs = nullptr;
   if (nghttp2_session_callbacks_new(&cbs) != 0) {
     d_connectionDied = true;
+    ++d_ds->tcpDiedSendingQuery;
     vinfolog("Unable to create a callback object for a new HTTP/2 session");
     return;
   }
@@ -777,6 +824,7 @@ DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr<DownstreamState>
   nghttp2_session* sess = nullptr;
   if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) {
     d_connectionDied = true;
+    ++d_ds->tcpDiedSendingQuery;
     vinfolog("Coult not allocate a new HTTP/2 session");
     return;
   }
@@ -800,6 +848,7 @@ DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr<DownstreamState>
   int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv) / sizeof(*iv));
   if (rv != 0) {
     d_connectionDied = true;
+    ++d_ds->tcpDiedSendingQuery;
     vinfolog("Could not submit SETTINGS: %s", nghttp2_strerror(rv));
     return;
   }
@@ -841,13 +890,18 @@ bool DownstreamDoHConnectionsManager::removeDownstreamConnection(std::shared_ptr
 
 void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now)
 {
+  if (s_cleanupInterval <= 0 || (t_nextCleanup > 0 && t_nextCleanup > now.tv_sec)) {
+    return;
+  }
+  t_nextCleanup = now.tv_sec + s_cleanupInterval;
+
   struct timeval freshCutOff = now;
   freshCutOff.tv_sec -= 1;
 
   for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end();) {
     for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end();) {
       if (!(*connIt)) {
-        ++connIt;
+        connIt = dsIt->second.erase(connIt);
         continue;
       }
 
@@ -882,11 +936,7 @@ std::shared_ptr<DoHConnectionToBackend> DownstreamDoHConnectionsManager::getConn
 
   auto backendId = ds->getID();
 
-  if (s_cleanupInterval > 0 && (t_nextCleanup == 0 || t_nextCleanup <= now.tv_sec)) {
-    t_nextCleanup = now.tv_sec + s_cleanupInterval;
-    //cerr<<"cleaning up"<<endl;
-    cleanupClosedConnections(now);
-  }
+  cleanupClosedConnections(now);
 
   const bool haveProxyProtocol = !proxyProtocolPayload.empty();
   if (!haveProxyProtocol) {
@@ -897,7 +947,12 @@ std::shared_ptr<DoHConnectionToBackend> DownstreamDoHConnectionsManager::getConn
       for (auto listIt = list.begin(); listIt != list.end();) {
         auto& entry = *listIt;
         if (!entry->canBeReused()) {
-          listIt = list.erase(listIt);
+          if (!entry->willBeReusable()) {
+            listIt = list.erase(listIt);
+          }
+          else {
+            ++listIt;
+          }
           continue;
         }
         entry->setReused();
@@ -975,7 +1030,6 @@ static void dohClientThread(int crossProtocolPipeFD)
   setThreadName("dnsdist/dohClie");
 
   DoHClientThreadData data;
-
   data.mplexer->addReadFD(crossProtocolPipeFD, handleCrossProtocolQuery, &data);
 
   struct timeval now;
@@ -988,6 +1042,7 @@ static void dohClientThread(int crossProtocolPipeFD)
     if (now.tv_sec > lastTimeoutScan) {
       lastTimeoutScan = now.tv_sec;
 
+      DownstreamDoHConnectionsManager::cleanupClosedConnections(now);
       handleH2Timeouts(*data.mplexer, now);
 
       if (g_dohStatesDumpRequested > 0) {
@@ -1092,6 +1147,7 @@ bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossPr
 
   if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
     delete tmp;
+    ++g_stats.outgoingDoHQueryPipeFull;
     tmp = nullptr;
     return false;
   }
@@ -1175,7 +1231,9 @@ bool initDoHWorkers()
 #ifdef HAVE_NGHTTP2
   if (g_outgoingDoHWorkerThreads > 0) {
     g_dohClientThreads = std::make_unique<DoHClientCollection>(g_outgoingDoHWorkerThreads);
-    g_dohClientThreads->addThread();
+    for (size_t idx = 0; idx < g_outgoingDoHWorkerThreads; idx++) {
+      g_dohClientThreads->addThread();
+    }
   }
   return true;
 #else
index 59c065d3384ff987c3b9e9129351757c19d6a02a..f6288293a50b0e06ec1ead7ebd22a412d662bb41 100644 (file)
@@ -31,6 +31,7 @@ TCPConnectionToBackend::~TCPConnectionToBackend()
       }
     }
     auto diff = now - d_connectionStartTime;
+    // cerr<<"connection to backend terminated after "<<d_queries<<" queries, "<<diff.tv_sec<<" seconds"<<endl;
     d_ds->updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000);
   }
 }
@@ -721,10 +722,7 @@ std::shared_ptr<TCPConnectionToBackend> DownstreamConnectionsManager::getConnect
 
   auto backendId = ds->getID();
 
-  if (s_cleanupInterval > 0 && (t_nextCleanup == 0 || t_nextCleanup <= now.tv_sec)) {
-    t_nextCleanup = now.tv_sec + s_cleanupInterval;
-    cleanupClosedTCPConnections(now);
-  }
+  cleanupClosedTCPConnections(now);
 
   {
     const auto& it = t_downstreamConnections.find(backendId);
@@ -781,6 +779,12 @@ void DownstreamConnectionsManager::releaseDownstreamConnection(std::shared_ptr<T
 
 void DownstreamConnectionsManager::cleanupClosedTCPConnections(struct timeval now)
 {
+  if (s_cleanupInterval == 0 || (t_nextCleanup != 0 && t_nextCleanup > now.tv_sec)) {
+    return;
+  }
+
+  t_nextCleanup = now.tv_sec + s_cleanupInterval;
+
   struct timeval freshCutOff = now;
   freshCutOff.tv_sec -= 1;
 
index 2049cd3cdf7eb21228fe957cf4b52c424edf6d89..4588755a35d0a6a5315a2a8afd7813df34155d98 100644 (file)
@@ -208,6 +208,7 @@ public:
     auto tmp = conn.release();
 
     if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+      ++g_stats.tcpQueryPipeFull;
       delete tmp;
       tmp = nullptr;
       return false;
@@ -227,6 +228,7 @@ public:
     auto tmp = cpq.release();
 
     if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+      ++g_stats.tcpCrossProtocolQueryPipeFull;
       delete tmp;
       tmp = nullptr;
       return false;