]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Try to send before calling epoll_wait
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 17 Sep 2020 15:04:04 +0000 (17:04 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 10 Nov 2020 08:52:23 +0000 (09:52 +0100)
It usually works and saves adding the descriptor to the set, calling
epoll_wait() then removing the descriptor.

pdns/dnsdist-tcp.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.hh
pdns/dnsdistdist/dnsdist-tcp-upstream.hh
regression-tests.dnsdist/test_OOOR.py

index 1da80972684f11dafc4e0466f9e4f4d86645a6d5..1f5387e274ed19c928a1a4b1c6a1f21deac05608 100644 (file)
@@ -253,7 +253,42 @@ void TCPClientCollection::addTCPClientThread()
 
 std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
 
-static IOState handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+{
+  IOState result = IOState::Done;
+
+  while (!state->d_queuedResponses.empty()) {
+    DEBUGLOG("queue size is "<<state->d_queuedResponses.size()<<", sending the next one");
+    TCPResponse resp = std::move(state->d_queuedResponses.front());
+    state->d_queuedResponses.pop_front();
+    state->d_state = IncomingTCPConnectionState::State::idle;
+    result = state->sendResponse(state, now, std::move(resp));
+    if (result != IOState::Done) {
+      return result;
+    }
+  }
+
+  if (state->d_isXFR) {
+    /* we should still be reading from the backend, and we don't want to read from the client */
+    state->d_state = IncomingTCPConnectionState::State::idle;
+    state->d_currentPos = 0;
+    DEBUGLOG("idling for XFR completion");
+    return IOState::Done;
+  } else {
+    if (state->canAcceptNewQueries()) {
+      DEBUGLOG("waiting for new queries");
+      state->resetForNewQuery();
+      return IOState::NeedRead;
+    }
+    else {
+      DEBUGLOG("idling");
+      state->d_state = IncomingTCPConnectionState::State::idle;
+      return IOState::Done;
+    }
+  }
+}
+
+static bool handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
 {
   --state->d_currentQueriesCount;
 
@@ -284,43 +319,16 @@ static IOState handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& s
 
     if (g_maxTCPQueriesPerConn && state->d_queriesCount > g_maxTCPQueriesPerConn) {
       vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", state->d_ci.remote.toStringWithPort(), state->d_queriesCount, g_maxTCPQueriesPerConn);
-      return IOState::Done;
+      return false;
     }
 
     if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
       vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort());
-      return IOState::Done;
+      return false;
     }
   }
 
-  if (state->d_queuedResponses.empty()) {
-    if (state->d_isXFR) {
-      /* we should still be reading from the backend, and we don't want to read from the client */
-      state->d_state = IncomingTCPConnectionState::State::idle;
-      state->d_currentPos = 0;
-      DEBUGLOG("idling for XFR completion");
-      return IOState::Done;
-    } else {
-      if (state->canAcceptNewQueries()) {
-        DEBUGLOG("waiting for new queries");
-        state->resetForNewQuery();
-        return IOState::NeedRead;
-      }
-      else {
-        DEBUGLOG("idling");
-        state->d_state = IncomingTCPConnectionState::State::idle;
-        return IOState::Done;
-      }
-    }
-  }
-  else {
-    DEBUGLOG("queue size is "<<state->d_queuedResponses.size()<<", sending the next one");
-    TCPResponse resp = std::move(state->d_queuedResponses.front());
-    state->d_queuedResponses.pop_front();
-    state->d_state = IncomingTCPConnectionState::State::idle;
-    state->sendResponse(state, now, std::move(resp));
-    return IOState::NeedWrite;
-  }
+  return true;
 }
 
 bool IncomingTCPConnectionState::canAcceptNewQueries() const
@@ -372,26 +380,43 @@ void IncomingTCPConnectionState::registerActiveDownstreamConnection(std::shared_
   d_activeConnectionsToBackend[conn->getDS()].push_front(conn);
 }
 
-/* this version is called when the buffer has been set and the rules have been processed */
-void IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */
+IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+{
+  state->d_state = IncomingTCPConnectionState::State::sendingResponse;
+
+  uint16_t responseSize = static_cast<uint16_t>(response.d_buffer.size());
+  const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
+  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
+     that could occur if we had to deal with the size during the processing,
+     especially alignment issues */
+  response.d_buffer.insert(response.d_buffer.begin(), sizeBytes, sizeBytes + 2);
+  state->d_currentPos = 0;
+  state->d_currentResponse = std::move(response);
+
+  auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
+  if (iostate == IOState::Done) {
+    DEBUGLOG("response sent");
+    if (!handleResponseSent(state, now)) {
+      return IOState::Done;
+    }
+    return sendQueuedResponses(state, now);
+  } else {
+    return IOState::NeedWrite;
+    DEBUGLOG("partial write");
+  }
+}
+
+/* called when handling a response or error coming from a backend */
+void IncomingTCPConnectionState::sendOrQueueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
 {
   // if we already reading a query (not the query size, mind you), or sending a response we need to either queue the response
   // otherwise we can start sending it right away
   if (state->d_state == IncomingTCPConnectionState::State::idle ||
       state->d_state == IncomingTCPConnectionState::State::readingQuerySize) {
 
-    state->d_state = IncomingTCPConnectionState::State::sendingResponse;
-
-    uint16_t responseSize = static_cast<uint16_t>(response.d_buffer.size());
-    const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
-    /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
-       that could occur if we had to deal with the size during the processing,
-       especially alignment issues */
-    response.d_buffer.insert(response.d_buffer.begin(), sizeBytes, sizeBytes + 2);
-    state->d_currentPos = 0;
-    state->d_currentResponse = std::move(response);
-
-    state->d_ioState->update(IOState::NeedWrite, handleIOCallback, state, state->getClientWriteTTD(now));
+    auto iostate = sendResponse(state, now, std::move(response));
+    state->d_ioState->update(iostate, handleIOCallback, state, iostate == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
   }
   else {
     // queue response
@@ -400,17 +425,21 @@ void IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnect
   }
 }
 
-/* this version is called from the backend code when a new response has been received */
-void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+/* called from the backend code when a new response has been received */
+void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState> state, const struct timeval& now, TCPResponse&& response)
 {
-  // if we have added a TCP Proxy Protocol payload to a connection, don't release it yet, no one else will be able to use it anyway
-  if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle() && response.d_connection->canBeReused()) {
-    auto& list = state->d_activeConnectionsToBackend.at(response.d_connection->getDS());
-    for (auto it = list.begin(); it != list.end(); ++it) {
-      if (*it == response.d_connection) {
-        DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
-        list.erase(it);
-        break;
+  if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle()) {
+    // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway
+    if (response.d_connection->canBeReused()) {
+      auto& list = state->d_activeConnectionsToBackend.at(response.d_connection->getDS());
+
+      for (auto it = list.begin(); it != list.end(); ++it) {
+        if (*it == response.d_connection) {
+          response.d_connection->release();
+          DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
+          list.erase(it);
+          break;
+        }
       }
     }
   }
@@ -471,14 +500,14 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
     }
   }
 
-  sendResponse(state, now, std::move(response));
+  sendOrQueueResponse(state, now, std::move(response));
 }
 
-static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
 {
   if (state->d_querySize < sizeof(dnsheader)) {
     ++g_stats.nonCompliantQueries;
-    return true;
+    return IOState::NeedRead;
   }
 
   state->d_readingFirstQuery = false;
@@ -521,13 +550,12 @@ static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, cons
     response.d_buffer = std::move(*dnsCryptResponse);
     state->d_state = IncomingTCPConnectionState::State::idle;
     ++state->d_currentQueriesCount;
-    state->sendResponse(state, now, std::move(response));
-    return false;
+    return state->sendResponse(state, now, std::move(response));
   }
 
   const auto& dh = reinterpret_cast<dnsheader*>(query);
   if (!checkQueryHeaders(dh)) {
-    return true;
+    return IOState::NeedRead;
   }
 
   uint16_t qtype, qclass;
@@ -546,7 +574,7 @@ static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, cons
   auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds);
 
   if (result == ProcessQueryResult::Drop) {
-    return true;
+    return IOState::Done;
   }
 
   if (result == ProcessQueryResult::SendAnswer) {
@@ -556,12 +584,11 @@ static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, cons
     response.d_buffer = std::move(state->d_buffer);
     state->d_state = IncomingTCPConnectionState::State::idle;
     ++state->d_currentQueriesCount;
-    state->sendResponse(state, now, std::move(response));
-    return false;
+    return state->sendResponse(state, now, std::move(response));
   }
 
   if (result != ProcessQueryResult::PassToBackend || ds == nullptr) {
-    return true;
+    return IOState::Done;
   }
 
   IDState ids;
@@ -613,7 +640,7 @@ static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, cons
   vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_ci.remote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName());
   downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection);
 
-  return true;
+  return IOState::NeedRead;
 }
 
 void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
@@ -706,13 +733,13 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
         if (iostate == IOState::Done) {
           DEBUGLOG("query received");
 
-          if (handleQuery(state, now)) {
-            // if the query has been passed to a backend, or dropped, we can start
-            // reading again, or sending queued responses
+          iostate = handleQuery(state, now);
+          // if the query has been passed to a backend, or dropped, we can start
+          // reading again, or sending queued responses
+          if (iostate == IOState::NeedRead) {
             if (state->d_queuedResponses.empty()) {
               if (state->canAcceptNewQueries()) {
                 state->resetForNewQuery();
-                iostate = IOState::NeedRead;
               }
               else {
                 state->d_state = IncomingTCPConnectionState::State::idle;
@@ -724,16 +751,9 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
               state->d_queuedResponses.pop_front();
               ioGuard.release();
               state->d_state = IncomingTCPConnectionState::State::idle;
-              state->sendResponse(state, now, std::move(resp));
-              return;
+              iostate = sendResponse(state, now, std::move(resp));
             }
           }
-          else {
-            /* otherwise the state should already be waiting for
-               the socket to be writable */
-            ioGuard.release();
-            return;
-          }
         }
         else {
           wouldBlock = true;
@@ -745,7 +765,12 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
         iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
         if (iostate == IOState::Done) {
           DEBUGLOG("response sent");
-          iostate = handleResponseSent(state, now);
+          if (!handleResponseSent(state, now)) {
+            iostate = IOState::Done;
+          }
+          else {
+            iostate = sendQueuedResponses(state, now);
+          }
         } else {
           wouldBlock = true;
           DEBUGLOG("partial write");
@@ -795,7 +820,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
     }
     ioGuard.release();
   }
-  while (state->d_state == IncomingTCPConnectionState::State::readingQuerySize && iostate == IOState::NeedRead && !wouldBlock);
+  while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !wouldBlock);
 }
 
 void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
@@ -810,7 +835,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnec
     TCPResponse resp = std::move(state->d_queuedResponses.front());
     state->d_queuedResponses.pop_front();
     state->d_state = IncomingTCPConnectionState::State::idle;
-    sendResponse(state, now, std::move(resp));
+    sendOrQueueResponse(state, now, std::move(resp));
   }
   else {
     // the backend code already tried to reconnect if it was possible
@@ -820,7 +845,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnec
 
 void IncomingTCPConnectionState::handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
 {
-  sendResponse(state, now, std::move(response));
+  sendOrQueueResponse(state, now, std::move(response));
 }
 
 void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
index 4fa155c55cad7eaeb95953cf3ba099c245dc487e..8b19b8d7de3fec9bf2fa218c2b0a3408f0846d62 100644 (file)
@@ -23,7 +23,15 @@ void TCPConnectionToBackend::assignToClientConnection(std::shared_ptr<IncomingTC
   }
 }
 
-IOState TCPConnectionToBackend::sendNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
+void TCPConnectionToBackend::release()
+{
+  d_clientConn.reset();
+  if (d_ioState) {
+    d_ioState.reset();
+  }
+}
+
+IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
 {
   conn->d_currentQuery = std::move(conn->d_pendingQueries.front());
   conn->d_pendingQueries.pop_front();
@@ -69,6 +77,38 @@ static IOState tryRead(int fd, std::vector<uint8_t>& buffer, size_t& pos, size_t
   return IOState::Done;
 }
 
+IOState TCPConnectionToBackend::sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
+{
+  int fd = conn->d_socket->getHandle();
+  DEBUGLOG("sending query to backend "<<conn->getDS()->getName()<<" over FD "<<fd);
+  int socketFlags = 0;
+#ifdef MSG_FASTOPEN
+  if (conn->isFastOpenEnabled()) {
+    socketFlags |= MSG_FASTOPEN;
+  }
+#endif /* MSG_FASTOPEN */
+
+  size_t sent = sendMsgWithOptions(fd, reinterpret_cast<const char *>(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags);
+  if (sent == conn->d_currentQuery.d_buffer.size()) {
+    DEBUGLOG("query sent to backend");
+    /* request sent ! */
+    conn->incQueries();
+    conn->d_currentPos = 0;
+
+    DEBUGLOG("adding a pending response for ID "<<conn->d_currentQuery.d_idstate.origID<<" and QNAME "<<conn->d_currentQuery.d_idstate.qname);
+    conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery);
+    conn->d_currentQuery.d_buffer.clear();
+
+    return IOState::Done;
+  }
+  else {
+    conn->d_currentPos += sent;
+    /* disable fast open on partial write */
+    conn->disableFastOpen();
+    return IOState::NeedWrite;
+  }
+}
+
 void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
 {
   if (conn->d_socket == nullptr) {
@@ -82,40 +122,18 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
 
   try {
     if (conn->d_state == State::sendingQueryToBackend) {
-      DEBUGLOG("sending query to backend "<<conn->getDS()->getName()<<" over FD "<<fd);
-      int socketFlags = 0;
-#ifdef MSG_FASTOPEN
-      if (conn->isFastOpenEnabled()) {
-        socketFlags |= MSG_FASTOPEN;
+      iostate = sendQuery(conn, now);
+
+      while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
+        queueNextQuery(conn);
+        iostate = sendQuery(conn, now);
       }
-#endif /* MSG_FASTOPEN */
 
-      size_t sent = sendMsgWithOptions(fd, reinterpret_cast<const char *>(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags);
-      if (sent == conn->d_currentQuery.d_buffer.size()) {
-        DEBUGLOG("query sent to backend");
-        /* request sent ! */
-        conn->incQueries();
+      if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
+        conn->d_state = State::readingResponseSizeFromBackend;
         conn->d_currentPos = 0;
-
-        DEBUGLOG("adding a pending response for ID "<<conn->d_currentQuery.d_idstate.origID<<" and QNAME "<<conn->d_currentQuery.d_idstate.qname);
-        conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery);
-        conn->d_currentQuery.d_buffer.clear();
-
-        if (conn->d_pendingQueries.empty()) {
-          conn->d_state = State::readingResponseSizeFromBackend;
-          conn->d_currentPos = 0;
-          conn->d_responseBuffer.resize(sizeof(uint16_t));
-          iostate = IOState::NeedRead;
-        }
-        else {
-          iostate = sendNextQuery(conn);
-        }
-      }
-      else {
-        conn->d_currentPos += sent;
-        iostate = IOState::NeedWrite;
-        /* disable fast open on partial write */
-        conn->disableFastOpen();
+        conn->d_responseBuffer.resize(sizeof(uint16_t));
+        iostate = IOState::NeedRead;
       }
     }
 
@@ -206,7 +224,11 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
           // resume sending query
         }
         else {
-          iostate = sendNextQuery(conn);
+          if (conn->d_pendingQueries.empty()) {
+            throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
+          }
+
+          iostate = queueNextQuery(conn);
         }
 
         if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) {
@@ -225,12 +247,15 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
     }
   }
 
-  if (iostate == IOState::Done) {
-    conn->d_ioState->update(iostate, handleIOCallback, conn);
-  }
-  else {
-    conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
+  if (conn->d_ioState) {
+    if (iostate == IOState::Done) {
+      conn->d_ioState->update(iostate, handleIOCallback, conn);
+    }
+    else {
+      conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
+    }
   }
+
   ioGuard.release();
 }
 
@@ -267,7 +292,8 @@ void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr<TCPCon
     struct timeval now;
     gettimeofday(&now, 0);
 
-    d_ioState->update(IOState::NeedWrite, handleIOCallback, sharedSelf, getBackendWriteTTD(now));
+    handleIO(sharedSelf, now);
+    // d_ioState->update(IOState::NeedWrite, handleIOCallback, sharedSelf, getBackendWriteTTD(now));
   }
   else {
     // store query in the list of queries to send
@@ -391,7 +417,7 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
   d_downstreamFailures = 0;
 
   auto& clientConn = d_clientConn;
-  if (!clientConn->active()) {
+  if (!clientConn || !clientConn->active()) {
     // a client timeout occured, or something like that */
     d_connectionDied = true;
     d_clientConn.reset();
@@ -431,6 +457,10 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
     auto ids = std::move(it->second.d_idstate);
     d_pendingResponses.erase(it);
     DEBUGLOG("passing response to client connection for "<<ids.qname);
+    /* marking as idle for now, so we can accept new queries if our queues are empty */
+    if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
+      d_state = State::idle;
+    }
     clientConn->handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
 
     if (!d_pendingQueries.empty()) {
index c815364831ca60175140c60cb3a908924c4b14aa..bc8d6d442d0804610de5fe347383a9f69777fe32 100644 (file)
@@ -128,7 +128,7 @@ public:
 
   bool isIdle() const
   {
-    return d_pendingQueries.size() == 0 && d_pendingResponses.size() == 0;
+    return d_state == State::idle && d_pendingQueries.size() == 0 && d_pendingResponses.size() == 0;
   }
 
   /* whether a connection can be reused for a different client */
@@ -158,13 +158,16 @@ public:
 
   void queueQuery(TCPQuery&& query, std::shared_ptr<TCPConnectionToBackend>& sharedSelf);
   void handleTimeout(const struct timeval& now, bool write);
+  void release();
+
   void setProxyProtocolPayload(std::string&& payload);
   void setProxyProtocolPayloadAdded(bool added);
 
 private:
   static void handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
   static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
-  static IOState sendNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn);
+  static IOState queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn);
+  static IOState sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
 
   IOState handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
   uint16_t getQueryIdFromResponse();
index 704e19747664091009ecc5be45f5ee5c39b85f2f..91b090ff9ca0dc2989d3cbd399b9ee9e9516b415 100644 (file)
@@ -179,8 +179,11 @@ public:
   static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& conn, const struct timeval& now);
   static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
   static void notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now);
-  static void sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
-  static void handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
+  static IOState sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
+  static void sendOrQueueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
+
+  /* we take a copy of a shared pointer, not a reference, because the initial shared pointer might be released during the handling of the response */
+  static void handleResponse(std::shared_ptr<IncomingTCPConnectionState> state, const struct timeval& now, TCPResponse&& response);
   static void handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
   static void handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write);
 
index ae1e457461587e15dd17eb90ba339248a13cfbe4..545bd808082d7e7375250d0295def59d213b37a0 100644 (file)
@@ -234,7 +234,7 @@ class TestOOORWithClientNotBackend(DNSDistTest):
         for idx in range(5):
             self.assertIn('%d.more-queries.ooor.tests.powerdns.com.' % (idx), receivedResponses)
 
-        self.assertLessEqual(OOORTCPResponder.numberOfConnections, 10)
+        self.assertLessEqual(OOORTCPResponder.numberOfConnections, self._concurrentQueriesFromClient)
 
 class TestOOORWithClientAndBackend(DNSDistTest):
     # this test suite uses a different responder port