git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add more TCP metrics 7559/head
author: Remi Gacogne <remi.gacogne@powerdns.com>
Fri, 5 Apr 2019 10:51:45 +0000 (12:51 +0200)
committer: Remi Gacogne <remi.gacogne@powerdns.com>
Fri, 5 Apr 2019 10:51:45 +0000 (12:51 +0200)
Keep, for each frontend and backend:
- the number of concurrent TCP connections
- the average number of queries per connection
- the average duration of a connection

pdns/dnsdist-carbon.cc
pdns/dnsdist-lua-inspection.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist-web.cc
pdns/dnsdist.hh

index 9ad220f4078bb776d9713c537897f7e1fbd3ac89..1fa49f58468010255209bff4ab789ac568a073d8 100644 (file)
@@ -100,6 +100,9 @@ try
           str<<base<<"tcpgaveup" << ' '<< state->tcpGaveUp.load() << " " << now << "\r\n";
           str<<base<<"tcpreadimeouts" << ' '<< state->tcpReadTimeouts.load() << " " << now << "\r\n";
           str<<base<<"tcpwritetimeouts" << ' '<< state->tcpWriteTimeouts.load() << " " << now << "\r\n";
+          str<<base<<"tcpcurrentconnections" << ' '<< state->tcpCurrentConnections.load() << " " << now << "\r\n";
+          str<<base<<"tcpavgqueriesperconnection" << ' '<< state->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n";
+          str<<base<<"tcpavgconnectionduration" << ' '<< state->tcpAvgConnectionDuration.load() << " " << now << "\r\n";
         }
         for(const auto& front : g_frontends) {
           if (front->udpFD == -1 && front->tcpFD == -1)
@@ -114,6 +117,9 @@ try
           str<<base<<"tcpgaveup" << ' '<< front->tcpGaveUp.load() << " " << now << "\r\n";
           str<<base<<"tcpclientimeouts" << ' '<< front->tcpClientTimeouts.load() << " " << now << "\r\n";
           str<<base<<"tcpdownstreamtimeouts" << ' '<< front->tcpDownstreamTimeouts.load() << " " << now << "\r\n";
+          str<<base<<"tcpcurrentconnections" << ' '<< front->tcpCurrentConnections.load() << " " << now << "\r\n";
+          str<<base<<"tcpavgqueriesperconnection" << ' '<< front->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n";
+          str<<base<<"tcpavgconnectionduration" << ' '<< front->tcpAvgConnectionDuration.load() << " " << now << "\r\n";
         }
         auto localPools = g_pools.getLocal();
         for (const auto& entry : *localPools) {
index 92fb413047bd689e91d7f480e8d39485b12ad3c1..25d9a187d3acd45da71d1dea8e67a797e44bfe26 100644 (file)
@@ -562,24 +562,24 @@ void setupLuaInspection()
       ret << endl;
 
       ret << "Frontends:" << endl;
-      fmt = boost::format("%-3d %-20.20s %-25d %-25d %-25d %-25d %-25d");
-      ret << (fmt % "#" % "Address" % "Died reading query" % "Died sending response" % "Gave up" % "Client timeouts" % "Downstream timeouts" ) << endl;
+      fmt = boost::format("%-3d %-20.20s %-20d %-20d %-25d %-20d %-20d %-20d %-20f %-20f");
+      ret << (fmt % "#" % "Address" % "Connections" % "Died reading query" % "Died sending response" % "Gave up" % "Client timeouts" % "Downstream timeouts" % "Avg queries/conn" % "Avg duration") << endl;
 
       size_t counter = 0;
       for(const auto& f : g_frontends) {
-        ret << (fmt % counter % f->local.toStringWithPort() % f->tcpDiedReadingQuery % f->tcpDiedSendingResponse % f->tcpGaveUp % f->tcpClientTimeouts % f->tcpDownstreamTimeouts) << endl;
+        ret << (fmt % counter % f->local.toStringWithPort() % f->tcpCurrentConnections % f->tcpDiedReadingQuery % f->tcpDiedSendingResponse % f->tcpGaveUp % f->tcpClientTimeouts % f->tcpDownstreamTimeouts % f->tcpAvgQueriesPerConnection % f->tcpAvgConnectionDuration) << endl;
         ++counter;
       }
       ret << endl;
 
       ret << "Backends:" << endl;
-      fmt = boost::format("%-3d %-20.20s %-20.20s %-25d %-25d %-25d %-25d %-25d");
-      ret << (fmt % "#" % "Name" % "Address" % "Died sending query" % "Died reading response" % "Gave up" % "Read timeouts" % "Write timeouts" ) << endl;
+      fmt = boost::format("%-3d %-20.20s %-20.20s %-20d %-20d %-25d %-20d %-20d %-20d %-20f %-20f");
+      ret << (fmt % "#" % "Name" % "Address" % "Connections" % "Died sending query" % "Died reading response" % "Gave up" % "Read timeouts" % "Write timeouts" % "Avg queries/conn" % "Avg duration") << endl;
 
       auto states = g_dstates.getLocal();
       counter = 0;
       for(const auto& s : *states) {
-        ret << (fmt % counter % s->name % s->remote.toStringWithPort() % s->tcpDiedSendingQuery % s->tcpDiedReadingResponse % s->tcpGaveUp % s->tcpReadTimeouts % s->tcpWriteTimeouts) << endl;
+        ret << (fmt % counter % s->name % s->remote.toStringWithPort() % s->tcpCurrentConnections % s->tcpDiedSendingQuery % s->tcpDiedReadingResponse % s->tcpGaveUp % s->tcpReadTimeouts % s->tcpWriteTimeouts % s->tcpAvgQueriesPerConnection % s->tcpAvgConnectionDuration) << endl;
         ++counter;
       }
 
index e9ee3e6e7585024d9ba12bf2dca0a38eda3e557d..878f056a02ec68844874abe230a466a4ab65eb43 100644 (file)
@@ -55,7 +55,6 @@ using std::atomic;
    Let's start naively.
 */
 
-static thread_local map<ComboAddress, std::deque<std::unique_ptr<Socket>>> t_downstreamSockets;
 static std::mutex tcpClientsCountMutex;
 static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
 static const size_t g_maxCachedConnectionsPerDownstream = 20;
@@ -63,10 +62,10 @@ uint64_t g_maxTCPQueuedConnections{1000};
 size_t g_maxTCPQueriesPerConn{0};
 size_t g_maxTCPConnectionDuration{0};
 size_t g_maxTCPConnectionsPerClient{0};
+uint16_t g_downstreamTCPCleanupInterval{60};
 bool g_useTCPSinglePipe{false};
-std::atomic<uint16_t> g_downstreamTCPCleanupInterval{60};
 
-static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, int timeout)
+static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures)
 {
   std::unique_ptr<Socket> result;
 
@@ -86,10 +85,10 @@ static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& d
       result->setNonBlocking();
 #ifdef MSG_FASTOPEN
       if (!ds->tcpFastOpen) {
-        SConnectWithTimeout(result->getHandle(), ds->remote, timeout);
+        SConnectWithTimeout(result->getHandle(), ds->remote, /* no timeout, we will handle it ourselves */ 0);
       }
 #else
-      SConnectWithTimeout(result->getHandle(), ds->remote, timeout);
+      SConnectWithTimeout(result->getHandle(), ds->remote, /* no timeout, we will handle it ourselves */ 0);
 #endif /* MSG_FASTOPEN */
       return result;
     }
@@ -105,49 +104,109 @@ static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& d
   return nullptr;
 }
 
-static std::unique_ptr<Socket> getConnectionToDownstream(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, bool& isFresh)
+class TCPConnectionToBackend
 {
-  std::unique_ptr<Socket> result;
+public:
+  TCPConnectionToBackend(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, const struct timeval& now): d_ds(ds), d_connectionStartTime(now)
+  {
+    d_socket = setupTCPDownstream(d_ds, downstreamFailures);
+    ++d_ds->tcpCurrentConnections;
+  }
 
-  const auto& it = t_downstreamSockets.find(ds->remote);
-  if (it != t_downstreamSockets.end()) {
+  ~TCPConnectionToBackend()
+  {
+    if (d_ds && d_socket) {
+      --d_ds->tcpCurrentConnections;
+      struct timeval now;
+      gettimeofday(&now, nullptr);
+
+      auto diff = now - d_connectionStartTime;
+      d_ds->updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000);
+    }
+  }
+
+  int getHandle() const
+  {
+    if (!d_socket) {
+      throw std::runtime_error("Attempt to get the socket handle from a non-established TCP connection");
+    }
+
+    return d_socket->getHandle();
+  }
+
+  const ComboAddress& getRemote() const
+  {
+    return d_ds->remote;
+  }
+
+  bool isFresh() const
+  {
+    return d_fresh;
+  }
+
+  void incQueries()
+  {
+    ++d_queries;
+  }
+
+  void setReused()
+  {
+    d_fresh = false;
+  }
+
+private:
+  std::unique_ptr<Socket> d_socket{nullptr};
+  std::shared_ptr<DownstreamState> d_ds{nullptr};
+  struct timeval d_connectionStartTime;
+  uint64_t d_queries{0};
+  bool d_fresh{true};
+};
+
+static thread_local map<ComboAddress, std::deque<std::unique_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
+
+static std::unique_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, const struct timeval& now)
+{
+  std::unique_ptr<TCPConnectionToBackend> result;
+
+  const auto& it = t_downstreamConnections.find(ds->remote);
+  if (it != t_downstreamConnections.end()) {
     auto& list = it->second;
     if (!list.empty()) {
       result = std::move(list.front());
       list.pop_front();
-      isFresh = false;
+      result->setReused();
       return result;
     }
   }
 
-  isFresh = true;
-  return setupTCPDownstream(ds, downstreamFailures, 0);
+  return std::unique_ptr<TCPConnectionToBackend>(new TCPConnectionToBackend(ds, downstreamFailures, now));
 }
 
-static void releaseDownstreamConnection(std::shared_ptr<DownstreamState>& ds, std::unique_ptr<Socket>&& socket)
+static void releaseDownstreamConnection(std::unique_ptr<TCPConnectionToBackend>&& conn)
 {
-  if (socket == nullptr) {
+  if (conn == nullptr) {
     return;
   }
 
-  const auto& it = t_downstreamSockets.find(ds->remote);
-  if (it != t_downstreamSockets.end()) {
+  const auto& remote = conn->getRemote();
+  const auto& it = t_downstreamConnections.find(remote);
+  if (it != t_downstreamConnections.end()) {
     auto& list = it->second;
     if (list.size() >= g_maxCachedConnectionsPerDownstream) {
       /* too many connections queued already */
-      socket.reset();
+      conn.reset();
       return;
     }
-    list.push_back(std::move(socket));
+    list.push_back(std::move(conn));
   }
   else {
-    t_downstreamSockets[ds->remote].push_back(std::move(socket));
+    t_downstreamConnections[remote].push_back(std::move(conn));
   }
 }
 
 struct ConnectionInfo
 {
-  ConnectionInfo(): cs(nullptr), fd(-1)
+  ConnectionInfo(ClientState* cs_): cs(cs_), fd(-1)
   {
   }
   ConnectionInfo(ConnectionInfo&& rhs)
@@ -178,6 +237,9 @@ struct ConnectionInfo
       close(fd);
       fd = -1;
     }
+    if (cs) {
+      --cs->tcpCurrentConnections;
+    }
   }
 
   ComboAddress remote;
@@ -263,13 +325,13 @@ void TCPClientCollection::addTCPClientThread()
 
 static void cleanupClosedTCPConnections()
 {
-  for(auto dsIt = t_downstreamSockets.begin(); dsIt != t_downstreamSockets.end(); ) {
-    for (auto socketIt = dsIt->second.begin(); socketIt != dsIt->second.end(); ) {
-      if (*socketIt && isTCPSocketUsable((*socketIt)->getHandle())) {
-        ++socketIt;
+  for(auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
+    for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
+      if (*connIt && isTCPSocketUsable((*connIt)->getHandle())) {
+        ++connIt;
       }
       else {
-        socketIt = dsIt->second.erase(socketIt);
+        connIt = dsIt->second.erase(connIt);
       }
     }
 
@@ -277,7 +339,7 @@ static void cleanupClosedTCPConnections()
       ++dsIt;
     }
     else {
-      dsIt = t_downstreamSockets.erase(dsIt);
+      dsIt = t_downstreamConnections.erase(dsIt);
     }
   }
 }
@@ -353,6 +415,13 @@ public:
   ~IncomingTCPConnectionState()
   {
     decrementTCPClientCount(d_ci.remote);
+    if (d_ci.cs != nullptr) {
+      struct timeval now;
+      gettimeofday(&now, nullptr);
+
+      auto diff = now - d_connectionStartTime;
+      d_ci.cs->updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000.0 + diff.tv_usec / 1000.0);
+    }
 
     if (d_ds != nullptr) {
       if (d_outstanding) {
@@ -360,15 +429,15 @@ public:
         d_outstanding = false;
       }
 
-      if (d_downstreamSocket) {
+      if (d_downstreamConnection) {
         try {
           if (d_lastIOState == IOState::NeedRead) {
-            cerr<<__func__<<": removing leftover backend read FD "<<d_downstreamSocket->getHandle()<<endl;
-            d_threadData.mplexer->removeReadFD(d_downstreamSocket->getHandle());
+            cerr<<__func__<<": removing leftover backend read FD "<<d_downstreamConnection->getHandle()<<endl;
+            d_threadData.mplexer->removeReadFD(d_downstreamConnection->getHandle());
           }
           else if (d_lastIOState == IOState::NeedWrite) {
-            cerr<<__func__<<": removing leftover backend write FD "<<d_downstreamSocket->getHandle()<<endl;
-            d_threadData.mplexer->removeWriteFD(d_downstreamSocket->getHandle());
+            cerr<<__func__<<": removing leftover backend write FD "<<d_downstreamConnection->getHandle()<<endl;
+            d_threadData.mplexer->removeWriteFD(d_downstreamConnection->getHandle());
           }
         }
         catch(const FDMultiplexerException& e) {
@@ -425,7 +494,7 @@ public:
     return now;
   }
 
-  boost::optional<struct timeval> getBackendReadTTD() const
+  boost::optional<struct timeval> getBackendReadTTD(const struct timeval& now) const
   {
     if (d_ds == nullptr) {
       throw std::runtime_error("getBackendReadTTD() without any backend selected");
@@ -434,27 +503,19 @@ public:
       return boost::none;
     }
 
-    struct timeval res;
-    gettimeofday(&res, 0);
-
+    struct timeval res = now;
     res.tv_sec += d_ds->tcpRecvTimeout;
 
     return res;
   }
 
-  boost::optional<struct timeval> getClientWriteTTD(boost::optional<struct timeval> now=boost::none) const
+  boost::optional<struct timeval> getClientWriteTTD(const struct timeval& now) const
   {
     if (g_maxTCPConnectionDuration == 0 && g_tcpSendTimeout == 0) {
       return boost::none;
     }
 
-    struct timeval res;
-    if (now) {
-      res = *now;
-    }
-    else {
-      gettimeofday(&res, 0);
-    }
+    struct timeval res = now;
 
     if (g_maxTCPConnectionDuration > 0) {
       auto elapsed = res.tv_sec - d_connectionStartTime.tv_sec;
@@ -472,7 +533,7 @@ public:
     return res;
   }
 
-  boost::optional<struct timeval> getBackendWriteTTD() const
+  boost::optional<struct timeval> getBackendWriteTTD(const struct timeval& now) const
   {
     if (d_ds == nullptr) {
       throw std::runtime_error("getBackendReadTTD() called without any backend selected");
@@ -481,9 +542,7 @@ public:
       return boost::none;
     }
 
-    struct timeval res;
-    gettimeofday(&res, 0);
-
+    struct timeval res = now;
     res.tv_sec += d_ds->tcpSendTimeout;
 
     return res;
@@ -506,6 +565,40 @@ public:
     return false;
   }
 
+  void dump() const
+  {
+    static std::mutex s_mutex;
+
+    struct timeval now;
+    gettimeofday(&now, 0);
+
+    {
+      std::lock_guard<std::mutex> lock(s_mutex);
+      fprintf(stderr, "State is %p\n", this);
+      cerr << "Current state is " << static_cast<int>(d_state) << ", got "<<d_queriesCount<<" queries so far" << endl;
+      cerr << "Current time is " << now.tv_sec << " - " << now.tv_usec << endl;
+      cerr << "Connection started at " << d_connectionStartTime.tv_sec << " - " << d_connectionStartTime.tv_usec << endl;
+      if (d_state > State::doingHandshake) {
+        cerr << "Handshake done at " << d_handshakeDoneTime.tv_sec << " - " << d_handshakeDoneTime.tv_usec << endl;
+      }
+      if (d_state > State::readingQuerySize) {
+        cerr << "Got first query size at " << d_firstQuerySizeReadTime.tv_sec << " - " << d_firstQuerySizeReadTime.tv_usec << endl;
+      }
+      if (d_state > State::readingQuerySize) {
+        cerr << "Got query size at " << d_querySizeReadTime.tv_sec << " - " << d_querySizeReadTime.tv_usec << endl;
+      }
+      if (d_state > State::readingQuery) {
+        cerr << "Got query at " << d_queryReadTime.tv_sec << " - " << d_queryReadTime.tv_usec << endl;
+      }
+      if (d_state > State::sendingQueryToBackend) {
+        cerr << "Sent query at " << d_querySentTime.tv_sec << " - " << d_querySentTime.tv_usec << endl;
+      }
+      if (d_state > State::readingResponseFromBackend) {
+        cerr << "Got response at " << d_responseReadTime.tv_sec << " - " << d_responseReadTime.tv_usec << endl;
+      }
+    }
+  }
+
   enum class State { doingHandshake, readingQuerySize, readingQuery, sendingQueryToBackend, readingResponseSizeFromBackend, readingResponseFromBackend, sendingResponse };
 
   std::vector<uint8_t> d_buffer;
@@ -514,9 +607,15 @@ public:
   IDState d_ids;
   ConnectionInfo d_ci;
   TCPIOHandler d_handler;
-  std::unique_ptr<Socket> d_downstreamSocket{nullptr};
+  std::unique_ptr<TCPConnectionToBackend> d_downstreamConnection{nullptr};
   std::shared_ptr<DownstreamState> d_ds{nullptr};
   struct timeval d_connectionStartTime;
+  struct timeval d_handshakeDoneTime;
+  struct timeval d_firstQuerySizeReadTime;
+  struct timeval d_querySizeReadTime;
+  struct timeval d_queryReadTime;
+  struct timeval d_querySentTime;
+  struct timeval d_responseReadTime;
   size_t d_currentPos{0};
   size_t d_queriesCount{0};
   unsigned int d_remainingTime{0};
@@ -525,7 +624,6 @@ public:
   uint16_t d_downstreamFailures{0};
   State d_state{State::doingHandshake};
   IOState d_lastIOState{IOState::Done};
-  bool d_freshDownstreamConnection{false};
   bool d_readingFirstQuery{true};
   bool d_outstanding{false};
   bool d_firstResponsePacket{true};
@@ -542,7 +640,7 @@ static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& stat
 {
   handleNewIOState(state, IOState::Done, state->d_ci.fd, handleIOCallback);
 
-  if (state->d_isXFR && state->d_downstreamSocket) {
+  if (state->d_isXFR && state->d_downstreamConnection) {
     /* we need to resume reading from the backend! */
     state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
     state->d_currentPos = 0;
@@ -643,7 +741,7 @@ static void sendQueryToBackend(std::shared_ptr<IncomingTCPConnectionState>& stat
   state->d_state = IncomingTCPConnectionState::State::sendingQueryToBackend;
   state->d_currentPos = 0;
   state->d_firstResponsePacket = true;
-  state->d_downstreamSocket.reset();
+  state->d_downstreamConnection.reset();
 
   if (state->d_xfrStarted) {
     /* sorry, but we are not going to resume a XFR if we have already sent some packets
@@ -653,9 +751,9 @@ static void sendQueryToBackend(std::shared_ptr<IncomingTCPConnectionState>& stat
 
   while (state->d_downstreamFailures < state->d_ds->retries)
   {
-    state->d_downstreamSocket = getConnectionToDownstream(ds, state->d_downstreamFailures, state->d_freshDownstreamConnection);
+    state->d_downstreamConnection = getConnectionToDownstream(ds, state->d_downstreamFailures, now);
 
-    if (!state->d_downstreamSocket) {
+    if (!state->d_downstreamConnection) {
       ++ds->tcpGaveUp;
       ++state->d_ci.cs->tcpGaveUp;
       vinfolog("Downstream connection to %s failed %d times in a row, giving up.", ds->getName(), state->d_downstreamFailures);
@@ -789,11 +887,11 @@ static void handleNewIOState(std::shared_ptr<IncomingTCPConnectionState>& state,
 
 static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
 {
-  if (state->d_downstreamSocket == nullptr) {
+  if (state->d_downstreamConnection == nullptr) {
     throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
   }
 
-  int fd = state->d_downstreamSocket->getHandle();
+  int fd = state->d_downstreamConnection->getHandle();
   IOState iostate = IOState::Done;
   bool connectionDied = false;
 
@@ -801,7 +899,7 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
     if (state->d_state == IncomingTCPConnectionState::State::sendingQueryToBackend) {
       int socketFlags = 0;
 #ifdef MSG_FASTOPEN
-      if (state->d_ds->tcpFastOpen && state->d_freshDownstreamConnection) {
+      if (state->d_ds->tcpFastOpen && state->d_downstreamConnection->isFresh()) {
         socketFlags |= MSG_FASTOPEN;
       }
 #endif /* MSG_FASTOPEN */
@@ -809,8 +907,10 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
       size_t sent = sendMsgWithTimeout(fd, reinterpret_cast<const char *>(&state->d_buffer.at(state->d_currentPos)), state->d_buffer.size() - state->d_currentPos, 0, &state->d_ds->remote, &state->d_ds->sourceAddr, state->d_ds->sourceItf, 0, socketFlags);
       if (sent == state->d_buffer.size()) {
         /* request sent ! */
+        state->d_downstreamConnection->incQueries();
         state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
         state->d_currentPos = 0;
+        state->d_querySentTime = now;
         iostate = IOState::NeedRead;
         if (!state->d_isXFR) {
           /* don't bother with the outstanding count for XFR queries */
@@ -822,7 +922,7 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
         state->d_currentPos += sent;
         iostate = IOState::NeedWrite;
         /* disable fast open on partial write */
-        state->d_freshDownstreamConnection = false;
+        state->d_downstreamConnection->setReused();
       }
     }
 
@@ -850,10 +950,11 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
           /* but don't reset it either, we will need to read more messages */
         }
         else {
-          releaseDownstreamConnection(state->d_ds, std::move(state->d_downstreamSocket));
+          releaseDownstreamConnection(std::move(state->d_downstreamConnection));
         }
         fd = -1;
 
+        state->d_responseReadTime = now;
         handleResponse(state, now);
         return;
       }
@@ -879,7 +980,7 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
     }
 
     /* don't increase this counter when reusing connections */
-    if (state->d_freshDownstreamConnection) {
+    if (state->d_downstreamConnection->isFresh()) {
       ++state->d_downstreamFailures;
     }
     if (state->d_outstanding && state->d_ds != nullptr) {
@@ -895,7 +996,7 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
     handleNewIOState(state, iostate, fd, handleDownstreamIOCallback);
   }
   else {
-    handleNewIOState(state, iostate, fd, handleDownstreamIOCallback, iostate == IOState::NeedRead ? state->getBackendReadTTD() : state->getBackendWriteTTD());
+    handleNewIOState(state, iostate, fd, handleDownstreamIOCallback, iostate == IOState::NeedRead ? state->getBackendReadTTD(now) : state->getBackendWriteTTD(now));
   }
 
   if (connectionDied) {
@@ -906,11 +1007,11 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
 static void handleDownstreamIOCallback(int fd, FDMultiplexer::funcparam_t& param)
 {
   auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
-  if (state->d_downstreamSocket == nullptr) {
+  if (state->d_downstreamConnection == nullptr) {
     throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
   }
-  if (fd != state->d_downstreamSocket->getHandle()) {
-    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamSocket->getHandle()));
+  if (fd != state->d_downstreamConnection->getHandle()) {
+    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamConnection->getHandle()));
   }
 
   struct timeval now;
@@ -933,6 +1034,7 @@ static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct
     if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) {
       iostate = state->d_handler.tryHandshake();
       if (iostate == IOState::Done) {
+        state->d_handshakeDoneTime = now;
         state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
       }
     }
@@ -941,6 +1043,10 @@ static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct
       iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t) - state->d_currentPos);
       if (iostate == IOState::Done) {
         state->d_state = IncomingTCPConnectionState::State::readingQuery;
+        state->d_querySizeReadTime = now;
+        if (state->d_queriesCount == 0) {
+          state->d_firstQuerySizeReadTime = now;
+        }
         state->d_querySize = state->d_buffer.at(0) * 256 + state->d_buffer.at(1);
         if (state->d_querySize < sizeof(dnsheader)) {
           /* go away */
@@ -1144,13 +1250,14 @@ void tcpAcceptorThread(void* p)
     tcpClientCountIncremented = false;
     try {
       socklen_t remlen = remote.getSocklen();
-      ci = std::unique_ptr<ConnectionInfo>(new ConnectionInfo);
-      ci->cs = cs;
+      ci = std::unique_ptr<ConnectionInfo>(new ConnectionInfo(cs));
 #ifdef HAVE_ACCEPT4
       ci->fd = accept4(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
 #else
       ci->fd = accept(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
 #endif
+      ++cs->tcpCurrentConnections;
+
       if(ci->fd < 0) {
         throw std::runtime_error((boost::format("accepting new connection on socket: %s") % strerror(errno)).str());
       }
index 745a798f022f11c247f66e70ea820a272693a09a..291e16661d7cb1b7af40c158ca8fe53dc7f42552 100644 (file)
@@ -470,6 +470,12 @@ static void connectionThread(int sock, ComboAddress remote)
         output << "# TYPE " << statesbase << "tcpreadtimeouts "        << "counter"                                                           << "\n";
         output << "# HELP " << statesbase << "tcpwritetimeouts "       << "The number of TCP write timeouts"                                  << "\n";
         output << "# TYPE " << statesbase << "tcpwritetimeouts "       << "counter"                                                           << "\n";
+        output << "# HELP " << statesbase << "tcpcurrentconnections "  << "The number of current TCP connections"                             << "\n";
+        output << "# TYPE " << statesbase << "tcpcurrentconnections "  << "gauge"                                                             << "\n";
+        output << "# HELP " << statesbase << "tcpavgqueriesperconn "   << "The average number of queries per TCP connection"                  << "\n";
+        output << "# TYPE " << statesbase << "tcpavgqueriesperconn "   << "gauge"                                                             << "\n";
+        output << "# HELP " << statesbase << "tcpavgconnduration "     << "The average duration of a TCP connection (ms)"                     << "\n";
+        output << "# TYPE " << statesbase << "tcpavgconnduration "     << "gauge"                                                             << "\n";
 
         for (const auto& state : *states) {
           string serverName;
@@ -484,18 +490,21 @@ static void connectionThread(int sock, ComboAddress remote)
           const std::string label = boost::str(boost::format("{server=\"%1%\",address=\"%2%\"}")
             % serverName % state->remote.toStringWithPort());
 
-          output << statesbase << "queries"                << label << " " << state->queries.load()         << "\n";
-          output << statesbase << "drops"                  << label << " " << state->reuseds.load()         << "\n";
-          output << statesbase << "latency"                << label << " " << state->latencyUsec/1000.0     << "\n";
-          output << statesbase << "senderrors"             << label << " " << state->sendErrors.load()      << "\n";
-          output << statesbase << "outstanding"            << label << " " << state->outstanding.load()     << "\n";
-          output << statesbase << "order"                  << label << " " << state->order                  << "\n";
-          output << statesbase << "weight"                 << label << " " << state->weight                 << "\n";
-          output << statesbase << "tcpdiedsendingquery"    << label << " " << state->tcpDiedSendingQuery    << "\n";
-          output << statesbase << "tcpdiedreadingresponse" << label << " " << state->tcpDiedReadingResponse << "\n";
-          output << statesbase << "tcpgaveup"              << label << " " << state->tcpGaveUp              << "\n";
-          output << statesbase << "tcpreadtimeouts"        << label << " " << state->tcpReadTimeouts        << "\n";
-          output << statesbase << "tcpwritetimeouts"       << label << " " << state->tcpWriteTimeouts       << "\n";
+          output << statesbase << "queries"                << label << " " << state->queries.load()             << "\n";
+          output << statesbase << "drops"                  << label << " " << state->reuseds.load()             << "\n";
+          output << statesbase << "latency"                << label << " " << state->latencyUsec/1000.0         << "\n";
+          output << statesbase << "senderrors"             << label << " " << state->sendErrors.load()          << "\n";
+          output << statesbase << "outstanding"            << label << " " << state->outstanding.load()         << "\n";
+          output << statesbase << "order"                  << label << " " << state->order                      << "\n";
+          output << statesbase << "weight"                 << label << " " << state->weight                     << "\n";
+          output << statesbase << "tcpdiedsendingquery"    << label << " " << state->tcpDiedSendingQuery        << "\n";
+          output << statesbase << "tcpdiedreadingresponse" << label << " " << state->tcpDiedReadingResponse     << "\n";
+          output << statesbase << "tcpgaveup"              << label << " " << state->tcpGaveUp                  << "\n";
+          output << statesbase << "tcpreadtimeouts"        << label << " " << state->tcpReadTimeouts            << "\n";
+          output << statesbase << "tcpwritetimeouts"       << label << " " << state->tcpWriteTimeouts           << "\n";
+          output << statesbase << "tcpcurrentconnections"  << label << " " << state->tcpCurrentConnections      << "\n";
+          output << statesbase << "tcpavgqueriesperconn"   << label << " " << state->tcpAvgQueriesPerConnection << "\n";
+          output << statesbase << "tcpavgconnduration"     << label << " " << state->tcpAvgConnectionDuration   << "\n";
         }
 
         for (const auto& front : g_frontends) {
@@ -582,6 +591,9 @@ static void connectionThread(int sock, ComboAddress remote)
           {"tcpGaveUp", (double)a->tcpGaveUp},
           {"tcpReadTimeouts", (double)a->tcpReadTimeouts},
           {"tcpWriteTimeouts", (double)a->tcpWriteTimeouts},
+          {"tcpCurrentConnections", (double)a->tcpCurrentConnections},
+          {"tcpAvgQueriesPerConnection", (double)a->tcpAvgQueriesPerConnection},
+          {"tcpAvgConnectionDuration", (double)a->tcpAvgConnectionDuration},
           {"dropRate", (double)a->dropRate}
         };
 
@@ -610,6 +622,9 @@ static void connectionThread(int sock, ComboAddress remote)
           { "tcpGaveUp", (double) front->tcpGaveUp.load() },
           { "tcpClientTimeouts", (double) front->tcpClientTimeouts },
           { "tcpDownstreamTimeouts", (double) front->tcpDownstreamTimeouts },
+          { "tcpCurrentConnections", (double) front->tcpCurrentConnections },
+          { "tcpAvgQueriesPerConnection", (double) front->tcpAvgQueriesPerConnection },
+          { "tcpAvgConnectionDuration", (double) front->tcpAvgConnectionDuration },
         };
         frontends.push_back(frontend);
       }
index 0a3eb01f0b6a3d17737bf2a7196f3e670e60b808..663e18061bdb115b3deaef847b91d12ef1fa5d5e 100644 (file)
@@ -591,6 +591,10 @@ struct ClientState
   std::atomic<uint64_t> tcpGaveUp{0};
   std::atomic<uint64_t> tcpClientTimeouts{0};
   std::atomic<uint64_t> tcpDownstreamTimeouts{0};
+  std::atomic<uint64_t> tcpCurrentConnections{0};
+  std::atomic<double> tcpAvgQueriesPerConnection{0.0};
+  /* in ms */
+  std::atomic<double> tcpAvgConnectionDuration{0.0};
   int udpFD{-1};
   int tcpFD{-1};
   bool muted{false};
@@ -633,6 +637,12 @@ struct ClientState
     d_filter = bpf;
   }
 #endif /* HAVE_EBPF */
+
+  void updateTCPMetrics(size_t queries, uint64_t durationMs)
+  {
+    tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (queries / 100.0);
+    tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0);
+  }
 };
 
 class TCPClientCollection {
@@ -744,6 +754,10 @@ struct DownstreamState
   std::atomic<uint64_t> tcpGaveUp{0};
   std::atomic<uint64_t> tcpReadTimeouts{0};
   std::atomic<uint64_t> tcpWriteTimeouts{0};
+  std::atomic<uint64_t> tcpCurrentConnections{0};
+  std::atomic<double> tcpAvgQueriesPerConnection{0.0};
+  /* in ms */
+  std::atomic<double> tcpAvgConnectionDuration{0.0};
   string name;
   size_t socketsOffset{0};
   double queryLoad{0.0};
@@ -815,6 +829,12 @@ struct DownstreamState
   void hash();
   void setId(const boost::uuids::uuid& newId);
   void setWeight(int newWeight);
+
+  void updateTCPMetrics(size_t queries, uint64_t durationMs)
+  {
+    tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (queries / 100.0);
+    tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0);
+  }
 };
 using servers_t =vector<std::shared_ptr<DownstreamState>>;
 
@@ -1010,7 +1030,7 @@ extern std::string g_apiConfigDirectory;
 extern bool g_servFailOnNoPolicy;
 extern uint32_t g_hashperturb;
 extern bool g_useTCPSinglePipe;
-extern std::atomic<uint16_t> g_downstreamTCPCleanupInterval;
+extern uint16_t g_downstreamTCPCleanupInterval;
 extern size_t g_udpVectorSize;
 extern bool g_preserveTrailingData;
 extern bool g_allowEmptyResponse;