]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Match backend connections on the backend, not its address
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 10 Sep 2020 15:05:04 +0000 (17:05 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 10 Nov 2020 08:46:54 +0000 (09:46 +0100)
pdns/dnsdist-tcp.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-upstream.hh

index dae2462a6338bdf6a6e33c78a6c13ea03c38c051..44ad539ef597700631424f4c2fc0fcd1cda75036 100644 (file)
@@ -33,7 +33,6 @@
 #include "dnsdist-xpf.hh"
 #include "dnsparser.hh"
 #include "dolog.hh"
-#include "ednsoptions.hh"
 #include "gettime.hh"
 #include "lock.hh"
 #include "sstuff.hh"
@@ -74,7 +73,7 @@ public:
   {
     std::shared_ptr<TCPConnectionToBackend> result;
 
-    const auto& it = t_downstreamConnections.find(ds->remote);
+    const auto& it = t_downstreamConnections.find(ds);
     if (it != t_downstreamConnections.end()) {
       auto& list = it->second;
       if (!list.empty()) {
@@ -99,8 +98,8 @@ public:
       return;
     }
 
-    const auto& remote = conn->getRemote();
-    const auto& it = t_downstreamConnections.find(remote);
+    const auto& ds = conn->getDS();
+    const auto& it = t_downstreamConnections.find(ds);
     if (it != t_downstreamConnections.end()) {
       auto& list = it->second;
       if (list.size() >= s_maxCachedConnectionsPerDownstream) {
@@ -112,7 +111,7 @@ public:
       list.push_back(std::move(conn));
     }
     else {
-      t_downstreamConnections[remote].push_back(std::move(conn));
+      t_downstreamConnections[ds].push_back(std::move(conn));
     }
   }
 
@@ -138,11 +137,11 @@ public:
   }
 
 private:
-  static thread_local map<ComboAddress, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
+  static thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
   static const size_t s_maxCachedConnectionsPerDownstream;
 };
 
-thread_local map<ComboAddress, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
+thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
 const size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{20};
 
 static void decrementTCPClientCount(const ComboAddress& client)
@@ -158,7 +157,6 @@ static void decrementTCPClientCount(const ComboAddress& client)
 
 IncomingTCPConnectionState::~IncomingTCPConnectionState()
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   decrementTCPClientCount(d_ci.remote);
 
   if (d_ci.cs != nullptr) {
@@ -311,7 +309,6 @@ static IOState handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& s
   }
 
   if (state->d_queuedResponses.empty()) {
-    DEBUGLOG("no response remaining");
     if (state->d_isXFR) {
       /* we should still be reading from the backend, and we don't want to read from the client */
       state->d_state = IncomingTCPConnectionState::State::idle;
@@ -319,12 +316,13 @@ static IOState handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& s
       DEBUGLOG("idling for XFR completion");
       return IOState::Done;
     } else {
-      DEBUGLOG("reading new queries if any");
       if (state->canAcceptNewQueries()) {
+        DEBUGLOG("waiting for new queries");
         state->resetForNewQuery();
         return IOState::NeedRead;
       }
       else {
+        DEBUGLOG("idling");
         state->d_state = IncomingTCPConnectionState::State::idle;
         return IOState::Done;
       }
@@ -348,13 +346,11 @@ bool IncomingTCPConnectionState::canAcceptNewQueries() const
     return false;
   }
 
-  // d_state ?
   if (d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
     DEBUGLOG("not accepting new queries because we already have "<<d_currentQueriesCount<<" out of "<<d_ci.cs->d_maxInFlightQueriesPerConn);
     return false;
   }
 
-  DEBUGLOG("accepting new queries");
   return true;
 }
 
@@ -371,18 +367,18 @@ void IncomingTCPConnectionState::resetForNewQuery()
 
 std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getActiveDownstreamConnection(const std::shared_ptr<DownstreamState>& ds)
 {
-  auto it = d_activeConnectionsToBackend.find(ds->remote);
+  auto it = d_activeConnectionsToBackend.find(ds);
   if (it == d_activeConnectionsToBackend.end()) {
-    DEBUGLOG("no active connection found for "<<ds->remote.toString());
+    DEBUGLOG("no active connection found for "<<ds->getName());
     return nullptr;
   }
 
   for (auto& conn : it->second) {
     if (conn->canAcceptNewQueries()) {
-      DEBUGLOG("Got one active connection accepting more for "<<ds->remote.toString());
+      DEBUGLOG("Got one active connection accepting more for "<<ds->getName());
       return conn;
     }
-    DEBUGLOG("not accepting more for "<<ds->remote.toString());
+    DEBUGLOG("not accepting more for "<<ds->getName());
   }
 
   return nullptr;
@@ -390,13 +386,12 @@ std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getActiveDow
 
 void IncomingTCPConnectionState::registerActiveDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
 {
-  d_activeConnectionsToBackend[conn->getRemote()].push_front(conn);
+  d_activeConnectionsToBackend[conn->getDS()].push_front(conn);
 }
 
 /* this version is called when the buffer has been set and the rules have been processed */
 void IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   // if we already reading a query (not the query size, mind you), or sending a response we need to either queue the response
   // otherwise we can start sending it right away
   if (state->d_state == IncomingTCPConnectionState::State::idle ||
@@ -425,9 +420,9 @@ void IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnect
 /* this version is called from the backend code when a new response has been received */
 void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
-  if (response.d_connection && response.d_connection->isIdle()) {
-    auto& list = d_activeConnectionsToBackend.at(response.d_connection->getRemote());
+  // if we have added a TCP Proxy Protocol payload to a connection, don't release it yet, no one else will be able to use it anyway
+  if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle() && response.d_connection->canBeReused()) {
+    auto& list = d_activeConnectionsToBackend.at(response.d_connection->getDS());
     for (auto it = list.begin(); it != list.end(); ++it) {
       if (*it == response.d_connection) {
         DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
@@ -650,7 +645,6 @@ void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcpar
 
 void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
   // even though the underlying socket is not ready, so we need to actually ask for the data first
   bool wouldBlock = false;
@@ -728,7 +722,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
           DEBUGLOG("query received");
 
           if (handleQuery(state, now)) {
-            DEBUGLOG("handle query returned true");
             // if the query has been passed to a backend, or dropped, we can start
             // reading again, or sending queued responses
             if (state->d_queuedResponses.empty()) {
@@ -752,7 +745,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
           else {
             /* otherwise the state should already be waiting for
                the socket to be writable */
-            DEBUGLOG("should be waiting for writable socket");
             ioGuard.release();
             return;
           }
@@ -822,7 +814,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
 
 void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   if (d_state == State::sendingResponse) {
     /* if we have responses to send, let's do that first */
   }
index 0a5a11b1df7a1c2c53110f6c7bffeb47ed008814..8b5e56e0bb4798ed4c8bf6f40b5843e106de851b 100644 (file)
@@ -6,13 +6,21 @@ const uint16_t TCPConnectionToBackend::s_xfrID = 0;
 
 void TCPConnectionToBackend::assignToClientConnection(std::shared_ptr<IncomingTCPConnectionState>& clientConn, bool isXFR)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
+  if (d_usedForXFR == true) {
+    throw std::runtime_error("Trying to send a query over a backend connection used for XFR");
+  }
+
   if (isXFR) {
     d_usedForXFR = true;
   }
 
-  d_clientConn = clientConn;
-  d_ioState = make_unique<IOStateHandler>(clientConn->getIOMPlexer(), d_socket->getHandle());
+  if (!d_clientConn) {
+    d_clientConn = clientConn;
+    d_ioState = make_unique<IOStateHandler>(clientConn->getIOMPlexer(), d_socket->getHandle());
+  }
+  else if (d_clientConn != clientConn) {
+    throw std::runtime_error("Assigning a query from a different client to an existing backend connection with pending queries");
+  }
 }
 
 IOState TCPConnectionToBackend::sendNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
@@ -63,7 +71,6 @@ static IOState tryRead(int fd, std::vector<uint8_t>& buffer, size_t& pos, size_t
 
 void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   if (conn->d_socket == nullptr) {
     throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!");
   }
@@ -75,7 +82,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
 
   try {
     if (conn->d_state == State::sendingQueryToBackend) {
-      DEBUGLOG("sending query to backend over FD "<<fd);
+      DEBUGLOG("sending query to backend "<<conn->getDS()->getName()<<" over FD "<<fd);
       int socketFlags = 0;
 #ifdef MSG_FASTOPEN
       if (conn->isFastOpenEnabled()) {
@@ -89,7 +96,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
         /* request sent ! */
         conn->incQueries();
         conn->d_currentPos = 0;
-        //conn->d_currentQuery.d_querySentTime = now;
+
         DEBUGLOG("adding a pending response for ID "<<conn->d_currentQuery.d_idstate.origID<<" and QNAME "<<conn->d_currentQuery.d_idstate.qname);
         conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery);
         conn->d_currentQuery.d_buffer.clear();
@@ -117,7 +124,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
       // then we need to allocate a new buffer (new because we might need to re-send the query if the
       // backend dies on us)
       // We also might need to read and send to the client more than one response in case of XFR (yeah!)
-      // should very likely be a TCPIOHandler d_downstreamHandler
+      // should very likely be a TCPIOHandler
       conn->d_responseBuffer.resize(sizeof(uint16_t));
       iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos);
       if (iostate == IOState::Done) {
@@ -135,7 +142,6 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
       iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos);
       if (iostate == IOState::Done) {
         DEBUGLOG("got response from backend");
-        //conn->d_responseReadTime = now;
         try {
           iostate = conn->handleResponse(conn, now);
         }
@@ -170,15 +176,6 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
       ++conn->d_downstreamFailures;
     }
 
-#if 0
-    if (conn->d_outstanding) {
-      conn->d_outstanding = false;
-
-      if (conn->d_ds != nullptr) {
-        --conn->d_ds->outstanding;
-      }
-    }
-#endif
     /* remove this FD from the IO multiplexer */
     iostate = IOState::Done;
     connectionDied = true;
@@ -189,15 +186,14 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
     DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->retries);
 
     if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) {
-      DEBUGLOG("reconnecting");
+
       conn->d_ioState->reset();
       ioGuard.release();
 
       if (conn->reconnect()) {
-        DEBUGLOG("reconnected");
-
         conn->d_ioState = make_unique<IOStateHandler>(conn->d_clientConn->getIOMPlexer(), conn->d_socket->getHandle());
 
+        /* we need to resend the queries that were in flight, if any */
         for (auto& pending : conn->d_pendingResponses) {
           conn->d_pendingQueries.push_back(std::move(pending.second));
         }
@@ -252,7 +248,6 @@ void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t
 
 void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr<TCPConnectionToBackend>& sharedSelf)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   if (d_ioState == nullptr) {
     throw std::runtime_error("Trying to queue a query to a TCP connection that has no incoming client connection assigned");
   }
@@ -269,8 +264,6 @@ void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr<TCPCon
       d_proxyProtocolPayloadAdded = true;
     }
 
-    DEBUGLOG("need write");
-
     struct timeval now;
     gettimeofday(&now, 0);
 
@@ -280,7 +273,6 @@ void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr<TCPCon
     // store query in the list of queries to send
     d_pendingQueries.push_back(std::move(query));
   }
-  DEBUGLOG("out of "<<__PRETTY_FUNCTION__);
 }
 
 bool TCPConnectionToBackend::reconnect()
@@ -297,9 +289,11 @@ bool TCPConnectionToBackend::reconnect()
 
   do {
     vinfolog("TCP connecting to downstream %s (%d)", d_ds->getNameWithAddr(), d_downstreamFailures);
+    DEBUGLOG("Opening TCP connection to backend "<<d_ds->getNameWithAddr());
     try {
       result = std::unique_ptr<Socket>(new Socket(d_ds->remote.sin4.sin_family, SOCK_STREAM, 0));
       DEBUGLOG("result of connect is "<<result->getHandle());
+
       if (!IsAnyAddress(d_ds->sourceAddr)) {
         SSetsockopt(result->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
 #ifdef IP_BIND_ADDRESS_NO_PORT
@@ -327,7 +321,6 @@ bool TCPConnectionToBackend::reconnect()
 #endif /* MSG_FASTOPEN */
 
       d_socket = std::move(result);
-      DEBUGLOG("connected new socket "<<d_socket->getHandle());
       ++d_ds->tcpCurrentConnections;
       return true;
     }
@@ -362,7 +355,6 @@ void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write
 
 void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, bool timeout)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   d_connectionDied = true;
 
   auto& clientConn = d_clientConn;
@@ -376,7 +368,7 @@ void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, b
     ++clientConn->d_ci.cs->tcpDownstreamTimeouts;
   }
 
-  if (d_state == State::doingHandshake || d_state == State::sendingQueryToBackend) {
+  if (d_state == State::sendingQueryToBackend) {
     clientConn->notifyIOError(clientConn, std::move(d_currentQuery.d_idstate), now);
   }
 
@@ -396,13 +388,10 @@ void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, b
 
 IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
 {
-  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
-
   d_downstreamFailures = 0;
 
   auto& clientConn = d_clientConn;
   if (!clientConn->active()) {
-    DEBUGLOG("client is not active");
     // a client timeout occured, or something like that */
     d_connectionDied = true;
     d_clientConn.reset();
@@ -418,11 +407,10 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
     d_state = State::readingResponseSizeFromBackend;
     d_currentPos = 0;
     d_responseBuffer.resize(sizeof(uint16_t));
-    return IOState::NeedRead;
     // get ready to read the next packet, if any
+    return IOState::NeedRead;
   }
   else {
-    DEBUGLOG("not XFR, phew");
     uint16_t queryId = 0;
     try {
       queryId = getQueryIdFromResponse();
@@ -439,6 +427,7 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
       notifyAllQueriesFailed(now);
       return IOState::Done;
     }
+
     auto ids = std::move(it->second.d_idstate);
     d_pendingResponses.erase(it);
     DEBUGLOG("passing response to client connection for "<<ids.qname);
@@ -460,7 +449,7 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
       return IOState::NeedRead;
     }
     else {
-      DEBUGLOG("nothing to do, phewwwww");
+      DEBUGLOG("nothing to do, waiting for a new query");
       d_state = State::idle;
       d_clientConn.reset();
       return IOState::Done;
index 721559256ea4caae3ebd963a69b77219a9a3967e..a8f69ef4ffd595e1a7264eb2a07b14579336169c 100644 (file)
@@ -195,7 +195,7 @@ public:
 
   enum class State { doingHandshake, readingQuerySize, readingQuery, sendingResponse, idle /* in case of XFR, we stop processing queries */ };
 
-  std::map<ComboAddress, std::deque<std::shared_ptr<TCPConnectionToBackend>>> d_activeConnectionsToBackend;
+  std::map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> d_activeConnectionsToBackend;
   std::vector<uint8_t> d_buffer;
   std::deque<TCPResponse> d_queuedResponses;
   TCPClientThreadData& d_threadData;