git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Better handling of backend TCP responses
author Remi Gacogne <remi.gacogne@powerdns.com>
Fri, 12 Feb 2021 16:56:18 +0000 (17:56 +0100)
committer Remi Gacogne <remi.gacogne@powerdns.com>
Tue, 2 Mar 2021 10:39:48 +0000 (11:39 +0100)
pdns/dnsdist-tcp.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-upstream.hh

index d2fdb801197a1726a1f2a32e0eeae04e4181b1b7..43716a11443c0b584ee9c59c60e1bdef25306bc3 100644 (file)
@@ -133,6 +133,11 @@ public:
     }
   }
 
+  static void clear()
+  {
+    t_downstreamConnections.clear();
+  }
+
 private:
   static thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
   static const size_t s_maxCachedConnectionsPerDownstream;
@@ -170,6 +175,11 @@ IncomingTCPConnectionState::~IncomingTCPConnectionState()
   d_handler.close();
 }
 
+void IncomingTCPConnectionState::clearAllDownstreamConnections()
+{
+  DownstreamConnectionsManager::clear();
+}
+
 std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
 {
   std::shared_ptr<TCPConnectionToBackend> downstream{nullptr};
@@ -304,7 +314,7 @@ static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>&
   return IOState::Done;
 }
 
-static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state)
+static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const TCPResponse& currentResponse)
 {
   if (state->d_isXFR) {
     return;
@@ -312,7 +322,6 @@ static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& stat
 
   --state->d_currentQueriesCount;
 
-  const auto& currentResponse = state->d_currentResponse;
   if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
     const auto& ds = currentResponse.d_connection->getDS();
     struct timespec answertime;
@@ -339,6 +348,11 @@ static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& stat
 
 bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
 {
+  if (d_hadErrors) {
+    DEBUGLOG("not accepting new queries because we encountered some error during the processing already");
+    return false;
+  }
+
   if (d_isXFR) {
     DEBUGLOG("not accepting new queries because used for XFR");
     return false;
@@ -413,7 +427,7 @@ IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConn
     auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
     if (iostate == IOState::Done) {
       DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
-      handleResponseSent(state);
+      handleResponseSent(state, state->d_currentResponse);
       return iostate;
     } else {
       state->d_lastIOBlocked = true;
@@ -434,7 +448,7 @@ IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConn
 
 void IncomingTCPConnectionState::terminateClientConnection()
 {
-  cerr<<"terminating client connection"<<endl;
+  DEBUGLOG("terminating client connection");
   d_queuedResponses.clear();
   /* we have already released idle connections that could be reused,
      we don't care about the ones still waiting for responses */
@@ -459,10 +473,15 @@ void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnec
       state->d_state == IncomingTCPConnectionState::State::waitingForQuery) {
     auto iostate = sendQueuedResponses(state, now);
 
-    if (iostate == IOState::Done && state->canAcceptNewQueries(now)) {
-      state->resetForNewQuery();
-      state->d_state = IncomingTCPConnectionState::State::waitingForQuery;
-      iostate = IOState::NeedRead;
+    if (iostate == IOState::Done) {
+      if (state->canAcceptNewQueries(now)) {
+        state->resetForNewQuery();
+        state->d_state = IncomingTCPConnectionState::State::waitingForQuery;
+        iostate = IOState::NeedRead;
+      }
+      else {
+        state->d_state = IncomingTCPConnectionState::State::idle;
+      }
     }
 
     // for the same reason we need to update the state right away, nobody will do that for us
@@ -473,7 +492,6 @@ void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnec
 /* called from the backend code when a new response has been received */
 void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState> state, const struct timeval& now, TCPResponse&& response)
 {
-  cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
   if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle()) {
     // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway
     if (response.d_connection->canBeReused()) {
@@ -496,23 +514,30 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
   }
 
   if (response.d_buffer.size() < sizeof(dnsheader)) {
-    cerr<<"too small!"<<endl;
+    state->terminateClientConnection();
     return;
   }
 
-  auto& ids = response.d_idstate;
-  unsigned int qnameWireLength;
-  if (!responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getRemote(), qnameWireLength)) {
-    cerr<<"does not match!"<<endl;
-    return;
-  }
+  try {
+    auto& ids = response.d_idstate;
+    unsigned int qnameWireLength;
+    if (!responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getRemote(), qnameWireLength)) {
+      state->terminateClientConnection();
+      return;
+    }
 
-  DNSResponse dr = makeDNSResponseFromIDState(ids, response.d_buffer, true);
+    DNSResponse dr = makeDNSResponseFromIDState(ids, response.d_buffer, true);
 
-  memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));
+    memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));
 
-  if (!processResponse(response.d_buffer, state->d_threadData.localRespRulactions, dr, false)) {
-    cerr<<"processResponse failed"<<endl;
+    if (!processResponse(response.d_buffer, state->d_threadData.localRespRulactions, dr, false)) {
+      state->terminateClientConnection();
+      return;
+    }
+  }
+  catch (const std::exception& e) {
+    vinfolog("Unxpected exception while handling response from backend: %s", e.what());
+    state->terminateClientConnection();
     return;
   }
 
@@ -534,7 +559,6 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
     }
   }
 
-  cerr<<"calling queueResponse"<<endl;
   queueResponse(state, now, std::move(response));
 }
 
@@ -764,8 +788,8 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
       }
 
       if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) {
-        DEBUGLOG("reading proxy protocol header");
         do {
+          DEBUGLOG("reading proxy protocol header");
           iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_proxyProtocolNeed);
           if (iostate == IOState::Done) {
             state->d_buffer.resize(state->d_currentPos);
@@ -848,7 +872,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
 
           state->d_state = IncomingTCPConnectionState::State::idle;
           handleQuery(state, now);
-          cerr<<"out of handleQuery, state is "<<(int)state->d_state<<", iostate is "<<(int)iostate<<endl;
           /* the state might have been updated in the meantime, we don't want to override it
              in that case */
           if (state->active() && state->d_state != IncomingTCPConnectionState::State::idle) {
@@ -865,7 +888,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
         iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
         if (iostate == IOState::Done) {
           DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
-          handleResponseSent(state);
+          handleResponseSent(state, state->d_currentResponse);
           state->d_state = IncomingTCPConnectionState::State::idle;
         }
         else {
@@ -880,16 +903,21 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
            state->d_state == IncomingTCPConnectionState::State::waitingForQuery))
       {
         // try sending querued responses
-        cerr<<"send responses, if any"<<endl;
+        DEBUGLOG("send responses, if any");
         iostate = sendQueuedResponses(state, now);
 
         if (!state->d_lastIOBlocked && iostate == IOState::Done) {
           // if the query has been passed to a backend, or dropped, and the responses have been sent,
           // we can start reading again
-          if (!state->d_isXFR && state->canAcceptNewQueries(now)) {
-            cerr<<"reset for new query"<<endl;
-            state->resetForNewQuery();
-            iostate = IOState::NeedRead;
+          if (!state->d_isXFR) {
+            if (state->canAcceptNewQueries(now)) {
+              state->resetForNewQuery();
+              iostate = IOState::NeedRead;
+            }
+            else {
+              state->d_state = IncomingTCPConnectionState::State::idle;
+              iostate = IOState::Done;
+            }
           }
         }
       }
@@ -930,11 +958,11 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
         DEBUGLOG("Closing TCP client connection: "<<e.what());
       }
       /* remove this FD from the IO multiplexer */
-      iostate = IOState::Done;
+      state->terminateClientConnection();
     }
 
     if (!state->active()) {
-      cerr<<"state is no longer active"<<endl;
+      DEBUGLOG("state is no longer active");
       return;
     }
 
@@ -952,6 +980,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
 void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
 {
   --state->d_currentQueriesCount;
+  state->d_hadErrors = true;
 
   if (state->d_state == State::sendingResponse) {
     /* if we have responses to send, let's do that first */
@@ -970,7 +999,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnec
   }
   else {
     // the backend code already tried to reconnect if it was possible
-    state->d_ioState->reset();
+    state->terminateClientConnection();
   }
 }
 
@@ -981,16 +1010,18 @@ void IncomingTCPConnectionState::handleXFRResponse(std::shared_ptr<IncomingTCPCo
 
 void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
 {
+  vinfolog("Timeout while %s TCP client %s", (write ? "writing to" : "reading from"), state->d_ci.remote.toStringWithPort());
   DEBUGLOG("client timeout");
   DEBUGLOG("Processed "<<state->d_queriesCount<<" queries, current count is "<<state->d_currentQueriesCount<<", "<<state->d_activeConnectionsToBackend.size()<<" active connections, "<<state->d_queuedResponses.size()<<" response queued");
 
   if (write || state->d_currentQueriesCount == 0) {
     ++state->d_ci.cs->tcpClientTimeouts;
-    state->d_ioState->reset();
+    state->d_ioState.reset();
   }
   else {
     DEBUGLOG("Going idle");
     /* we still have some queries in flight, let's just stop reading for now */
+    state->d_hadErrors = true;
     state->d_state = IncomingTCPConnectionState::State::idle;
     state->d_ioState->update(IOState::Done, handleIOCallback, state);
 
index e93d6f6a340dbdceb78886196f2d8a859883f99d..2f9ffbf86231e71f4ed931d80a51f2bd4acff6ea 100644 (file)
@@ -84,155 +84,161 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
   bool connectionDied = false;
   IOState iostate = IOState::Done;
   IOStateGuard ioGuard(conn->d_ioState);
+  bool reconnected = false;
 
-  try {
-    if (conn->d_state == State::sendingQueryToBackend) {
-      iostate = sendQuery(conn, now);
+  do {
+    reconnected = false;
 
-      while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
-        queueNextQuery(conn);
+    try {
+      if (conn->d_state == State::sendingQueryToBackend) {
         iostate = sendQuery(conn, now);
+
+        while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
+          queueNextQuery(conn);
+          iostate = sendQuery(conn, now);
+        }
+
+        if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
+          conn->d_state = State::readingResponseSizeFromBackend;
+          conn->d_currentPos = 0;
+          conn->d_responseBuffer.resize(sizeof(uint16_t));
+          iostate = IOState::NeedRead;
+        }
       }
 
-      if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
-        conn->d_state = State::readingResponseSizeFromBackend;
-        conn->d_currentPos = 0;
+      if (conn->d_state == State::readingResponseSizeFromBackend) {
+        DEBUGLOG("reading response size from backend");
+        // then we need to allocate a new buffer (new because we might need to re-send the query if the
+        // backend dies on us)
+        // We also might need to read and send to the client more than one response in case of XFR (yeah!)
         conn->d_responseBuffer.resize(sizeof(uint16_t));
-        iostate = IOState::NeedRead;
+        iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t));
+        if (iostate == IOState::Done) {
+          DEBUGLOG("got response size from backend");
+          conn->d_state = State::readingResponseFromBackend;
+          conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
+          conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
+          conn->d_responseBuffer.resize(conn->d_responseSize);
+          conn->d_currentPos = 0;
+        }
       }
-    }
 
-    if (conn->d_state == State::readingResponseSizeFromBackend) {
-      DEBUGLOG("reading response size from backend");
-      // then we need to allocate a new buffer (new because we might need to re-send the query if the
-      // backend dies on us)
-      // We also might need to read and send to the client more than one response in case of XFR (yeah!)
-      conn->d_responseBuffer.resize(sizeof(uint16_t));
-      iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos);
-      if (iostate == IOState::Done) {
-        DEBUGLOG("got response size from backend");
-        conn->d_state = State::readingResponseFromBackend;
-        conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
-        conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
-        conn->d_responseBuffer.resize(conn->d_responseSize);
-        conn->d_currentPos = 0;
+      if (conn->d_state == State::readingResponseFromBackend) {
+        DEBUGLOG("reading response from backend");
+        iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize);
+        if (iostate == IOState::Done) {
+          DEBUGLOG("got response from backend");
+          try {
+            iostate = conn->handleResponse(conn, now);
+          }
+          catch (const std::exception& e) {
+            vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getName() : "unknown", conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
+            ioGuard.release();
+            conn->release();
+            return;
+          }
+        }
       }
-    }
 
-    if (conn->d_state == State::readingResponseFromBackend) {
-      DEBUGLOG("reading response from backend");
-      iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos);
-      if (iostate == IOState::Done) {
-        DEBUGLOG("got response from backend");
-        try {
-          iostate = conn->handleResponse(conn, now);
-        }
-        catch (const std::exception& e) {
-          vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getName() : "unknown", conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
-          ioGuard.release();
-          conn->release();
-          return;
-        }
+      if (conn->d_state != State::idle &&
+          conn->d_state != State::sendingQueryToBackend &&
+          conn->d_state != State::readingResponseSizeFromBackend &&
+          conn->d_state != State::readingResponseFromBackend) {
+        vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn->d_state));
       }
     }
+    catch (const std::exception& e) {
+      /* most likely an EOF because the other end closed the connection,
+         but it might also be a real IO error or something else.
+         Let's just drop the connection
+      */
+      vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn->d_state == State::sendingQueryToBackend ? "writing to" : "reading from"), conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
+      if (conn->d_state == State::sendingQueryToBackend) {
+        ++conn->d_ds->tcpDiedSendingQuery;
+      }
+      else {
+        ++conn->d_ds->tcpDiedReadingResponse;
+      }
 
-    if (conn->d_state != State::idle &&
-        conn->d_state != State::sendingQueryToBackend &&
-        conn->d_state != State::readingResponseSizeFromBackend &&
-        conn->d_state != State::readingResponseFromBackend) {
-      vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn->d_state));
-    }
-  }
-  catch (const std::exception& e) {
-    /* most likely an EOF because the other end closed the connection,
-       but it might also be a real IO error or something else.
-       Let's just drop the connection
-    */
-    vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn->d_ioState->getState() == IOState::NeedRead ? "reading from" : "writing to"), conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
-    if (conn->d_state == State::sendingQueryToBackend) {
-      ++conn->d_ds->tcpDiedSendingQuery;
-    }
-    else {
-      ++conn->d_ds->tcpDiedReadingResponse;
-    }
+      /* don't increase this counter when reusing connections */
+      if (conn->d_fresh) {
+        ++conn->d_downstreamFailures;
+      }
 
-    /* don't increase this counter when reusing connections */
-    if (conn->d_fresh) {
-      ++conn->d_downstreamFailures;
+      /* remove this FD from the IO multiplexer */
+      iostate = IOState::Done;
+      connectionDied = true;
     }
 
-    /* remove this FD from the IO multiplexer */
-    iostate = IOState::Done;
-    connectionDied = true;
-  }
-
-  if (connectionDied) {
+    if (connectionDied) {
 
-    bool reconnected = false;
-    DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->retries);
+      DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->retries);
 
-    if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) {
+      if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) {
 
-      conn->d_ioState->reset();
-      ioGuard.release();
+        conn->d_ioState.reset();
+        ioGuard.release();
 
-      try {
-        if (conn->reconnect()) {
-          conn->d_ioState = make_unique<IOStateHandler>(conn->d_clientConn->getIOMPlexer(), conn->d_handler->getDescriptor());
+        try {
+          if (conn->reconnect()) {
+            conn->d_ioState = make_unique<IOStateHandler>(conn->d_clientConn->getIOMPlexer(), conn->d_handler->getDescriptor());
+
+            /* we need to resend the queries that were in flight, if any */
+            for (auto& pending : conn->d_pendingResponses) {
+              conn->d_pendingQueries.push_back(std::move(pending.second));
+              if (!conn->d_usedForXFR) {
+                --conn->d_ds->outstanding;
+              }
+            }
+            conn->d_pendingResponses.clear();
+            conn->d_currentPos = 0;
 
-          /* we need to resend the queries that were in flight, if any */
-          for (auto& pending : conn->d_pendingResponses) {
-            conn->d_pendingQueries.push_back(std::move(pending.second));
-            if (!conn->d_usedForXFR) {
-              --conn->d_ds->outstanding;
+            if (conn->d_state == State::doingHandshake ||
+                conn->d_state == State::sendingQueryToBackend) {
+              iostate = IOState::NeedWrite;
+              // resume sending query
             }
-          }
-          conn->d_pendingResponses.clear();
-          conn->d_currentPos = 0;
+            else {
+              if (conn->d_pendingQueries.empty()) {
+                throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
+              }
 
-          if (conn->d_state == State::doingHandshake ||
-              conn->d_state == State::sendingQueryToBackend) {
-            iostate = IOState::NeedWrite;
-            // resume sending query
-          }
-          else {
-            if (conn->d_pendingQueries.empty()) {
-              throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
+              iostate = queueNextQuery(conn);
             }
 
-            iostate = queueNextQuery(conn);
-          }
+            if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) {
+              conn->d_currentQuery.d_buffer.insert(conn->d_currentQuery.d_buffer.begin(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
+              conn->d_proxyProtocolPayloadAdded = true;
+            }
 
-          if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) {
-            conn->d_currentQuery.d_buffer.insert(conn->d_currentQuery.d_buffer.begin(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
-            conn->d_proxyProtocolPayloadAdded = true;
+            reconnected = true;
+            connectionDied = false;
           }
-
-          reconnected = true;
+        }
+        catch (const std::exception& e) {
+          // reconnect might throw on failure, let's ignore that, we just need to know
+          // it failed
         }
       }
-      catch (const std::exception& e) {
-        // reconnect might throw on failure, let's ignore that, we just need to know
-        // it failed
-      }
-    }
 
-    if (!reconnected) {
-      /* reconnect failed, we give up */
-      DEBUGLOG("reconnect failed, we give up");
-      ++conn->d_ds->tcpGaveUp;
-      conn->notifyAllQueriesFailed(now, FailureReason::gaveUp);
+      if (!reconnected) {
+        /* reconnect failed, we give up */
+        DEBUGLOG("reconnect failed, we give up");
+        ++conn->d_ds->tcpGaveUp;
+        conn->notifyAllQueriesFailed(now, FailureReason::gaveUp);
+      }
     }
-  }
 
-  if (conn->d_ioState) {
-    if (iostate == IOState::Done) {
-      conn->d_ioState->update(iostate, handleIOCallback, conn);
-    }
-    else {
-      conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
+    if (conn->d_ioState) {
+      if (iostate == IOState::Done) {
+        conn->d_ioState->update(iostate, handleIOCallback, conn);
+      }
+      else {
+        conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
+      }
     }
   }
+  while (reconnected);
 
   ioGuard.release();
 }
@@ -326,12 +332,12 @@ bool TCPConnectionToBackend::reconnect()
     catch(const std::runtime_error& e) {
       vinfolog("Connection to downstream server %s failed: %s", d_ds->getName(), e.what());
       d_downstreamFailures++;
-      if (d_downstreamFailures > d_ds->retries) {
+      if (d_downstreamFailures >= d_ds->retries) {
         throw;
       }
     }
   }
-  while (d_downstreamFailures <= d_ds->retries);
+  while (d_downstreamFailures < d_ds->retries);
 
   return false;
 }
@@ -339,6 +345,7 @@ bool TCPConnectionToBackend::reconnect()
 void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
 {
   /* in some cases we could retry, here, reconnecting and sending our pending responses again */
+  vinfolog("Timeout while %s TCP backend %s", (write ? "writing to" : "reading from"), d_ds->getName());
   if (write) {
     ++d_ds->tcpWriteTimeouts;
   }
@@ -426,58 +433,57 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
     // get ready to read the next packet, if any
     return IOState::NeedRead;
   }
-  else {
-    uint16_t queryId = 0;
-    try {
-      queryId = getQueryIdFromResponse();
-    }
-    catch (const std::exception& e) {
-      DEBUGLOG("Unable to get query ID");
-      notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
-      throw;
-    }
 
-    auto it = d_pendingResponses.find(queryId);
-    if (it == d_pendingResponses.end()) {
-      DEBUGLOG("could not find any corresponding query for ID "<<queryId<<". This is likely a duplicated ID over the same TCP connection, giving up!");
-      notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
-      return IOState::Done;
-    }
+  uint16_t queryId = 0;
+  try {
+    queryId = getQueryIdFromResponse();
+  }
+  catch (const std::exception& e) {
+    DEBUGLOG("Unable to get query ID");
+    notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
+    throw;
+  }
 
-    if (!conn->d_usedForXFR) {
-      --conn->d_ds->outstanding;
-    }
+  auto it = d_pendingResponses.find(queryId);
+  if (it == d_pendingResponses.end()) {
+    DEBUGLOG("could not find any corresponding query for ID "<<queryId<<". This is likely a duplicated ID over the same TCP connection, giving up!");
+    notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
+    return IOState::Done;
+  }
 
-    auto ids = std::move(it->second.d_idstate);
-    d_pendingResponses.erase(it);
-    DEBUGLOG("passing response to client connection for "<<ids.qname);
-    /* marking as idle for now, so we can accept new queries if our queues are empty */
-    if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
-      d_state = State::idle;
-    }
-    clientConn->handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
-
-    if (!d_pendingQueries.empty()) {
-      DEBUGLOG("still have some queries to send");
-      d_state = State::sendingQueryToBackend;
-      d_currentQuery = std::move(d_pendingQueries.front());
-      d_currentPos = 0;
-      d_pendingQueries.pop_front();
-      return IOState::NeedWrite;
-    }
-    else if (!d_pendingResponses.empty()) {
-      DEBUGLOG("still have some responses to read");
-      d_state = State::readingResponseSizeFromBackend;
-      d_currentPos = 0;
-      d_responseBuffer.resize(sizeof(uint16_t));
-      return IOState::NeedRead;
-    }
-    else {
-      DEBUGLOG("nothing to do, waiting for a new query");
-      d_state = State::idle;
-      d_clientConn.reset();
-      return IOState::Done;
-    }
+  if (!conn->d_usedForXFR) {
+    --conn->d_ds->outstanding;
+  }
+
+  auto ids = std::move(it->second.d_idstate);
+  d_pendingResponses.erase(it);
+  DEBUGLOG("passing response to client connection for "<<ids.qname);
+  /* marking as idle for now, so we can accept new queries if our queues are empty */
+  if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
+    d_state = State::idle;
+  }
+  clientConn->handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
+
+  if (!d_pendingQueries.empty()) {
+    DEBUGLOG("still have some queries to send");
+    d_state = State::sendingQueryToBackend;
+    d_currentQuery = std::move(d_pendingQueries.front());
+    d_currentPos = 0;
+    d_pendingQueries.pop_front();
+    return IOState::NeedWrite;
+  }
+  else if (!d_pendingResponses.empty()) {
+    DEBUGLOG("still have some responses to read");
+    d_state = State::readingResponseSizeFromBackend;
+    d_currentPos = 0;
+    d_responseBuffer.resize(sizeof(uint16_t));
+    return IOState::NeedRead;
+  }
+  else {
+    DEBUGLOG("nothing to do, waiting for a new query");
+    d_state = State::idle;
+    d_clientConn.reset();
+    return IOState::Done;
   }
 }
 
index f3724b85c8b6087e31169752966ffa0d61882c4e..c53db29f0ba1482454bcc3509c8acf869950e2d6 100644 (file)
@@ -151,6 +151,8 @@ public:
     return d_threadData.mplexer;
   }
 
+  static void clearAllDownstreamConnections();
+
   static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& conn, const struct timeval& now);
   static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
   static void notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now);
@@ -210,4 +212,5 @@ public:
   bool d_xfrStarted{false};
   bool d_proxyProtocolPayloadHasTLV{false};
   bool d_lastIOBlocked{false};
+  bool d_hadErrors{false};
 };