]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Better handling of TCP responses mixed with queries
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 11 Feb 2021 18:03:07 +0000 (19:03 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 2 Mar 2021 09:53:47 +0000 (10:53 +0100)
pdns/dnsdist-tcp.cc
pdns/dnsdistdist/dnsdist-tcp-upstream.hh

index f63f8b8f82d4b343dfeadde8e17d416951794cd7..d2fdb801197a1726a1f2a32e0eeae04e4181b1b7 100644 (file)
@@ -300,70 +300,44 @@ static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>&
     }
   }
 
-  if (state->d_isXFR) {
-    /* we should still be reading from the backend, and we don't want to read from the client */
-    state->d_state = IncomingTCPConnectionState::State::idle;
-    state->d_currentPos = 0;
-    DEBUGLOG("idling for XFR completion");
-    return IOState::Done;
-  } else {
-    if (state->canAcceptNewQueries()) {
-      DEBUGLOG("waiting for new queries");
-      state->resetForNewQuery();
-      return IOState::NeedRead;
-    }
-    else {
-      DEBUGLOG("idling");
-      state->d_state = IncomingTCPConnectionState::State::idle;
-      return IOState::Done;
-    }
-  }
+  state->d_state = IncomingTCPConnectionState::State::idle;
+  return IOState::Done;
 }
 
-static bool handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state)
 {
-  if (!state->d_isXFR) {
-    --state->d_currentQueriesCount;
-
-    const auto& currentResponse = state->d_currentResponse;
-    if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
-      const auto& ds = currentResponse.d_connection->getDS();
-      struct timespec answertime;
-      gettime(&answertime);
-      const auto& ids = currentResponse.d_idstate;
-      double udiff = ids.sentTime.udiff();
-      g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->remote);
-      vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff);
-    }
-
-    switch (currentResponse.d_cleartextDH.rcode) {
-    case RCode::NXDomain:
-      ++g_stats.frontendNXDomain;
-      break;
-    case RCode::ServFail:
-      ++g_stats.servfailResponses;
-      ++g_stats.frontendServFail;
-      break;
-    case RCode::NoError:
-      ++g_stats.frontendNoError;
-      break;
-    }
+  if (state->d_isXFR) {
+    return;
+  }
 
-    if (g_maxTCPQueriesPerConn && state->d_queriesCount > g_maxTCPQueriesPerConn) {
-      vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", state->d_ci.remote.toStringWithPort(), state->d_queriesCount, g_maxTCPQueriesPerConn);
-      return false;
-    }
+  --state->d_currentQueriesCount;
 
-    if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
-      vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort());
-      return false;
-    }
+  const auto& currentResponse = state->d_currentResponse;
+  if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
+    const auto& ds = currentResponse.d_connection->getDS();
+    struct timespec answertime;
+    gettime(&answertime);
+    const auto& ids = currentResponse.d_idstate;
+    double udiff = ids.sentTime.udiff();
+    g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->remote);
+    vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff);
   }
 
-  return true;
+  switch (currentResponse.d_cleartextDH.rcode) {
+  case RCode::NXDomain:
+    ++g_stats.frontendNXDomain;
+    break;
+  case RCode::ServFail:
+    ++g_stats.servfailResponses;
+    ++g_stats.frontendServFail;
+    break;
+  case RCode::NoError:
+    ++g_stats.frontendNoError;
+    break;
+  }
 }
 
-bool IncomingTCPConnectionState::canAcceptNewQueries() const
+bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
 {
   if (d_isXFR) {
     DEBUGLOG("not accepting new queries because used for XFR");
@@ -375,6 +349,16 @@ bool IncomingTCPConnectionState::canAcceptNewQueries() const
     return false;
   }
 
+  if (g_maxTCPQueriesPerConn && d_queriesCount > g_maxTCPQueriesPerConn) {
+    vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, g_maxTCPQueriesPerConn);
+    return false;
+  }
+
+  if (maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
+    vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
+    return false;
+  }
+
   return true;
 }
 
@@ -383,7 +367,7 @@ void IncomingTCPConnectionState::resetForNewQuery()
   d_buffer.resize(sizeof(uint16_t));
   d_currentPos = 0;
   d_querySize = 0;
-  d_state = State::readingQuerySize;
+  d_state = State::waitingForQuery;
 }
 
 std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getActiveDownstreamConnection(const std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
@@ -428,13 +412,12 @@ IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConn
   try {
     auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
     if (iostate == IOState::Done) {
-      DEBUGLOG("response sent");
-      if (!handleResponseSent(state, now)) {
-        return IOState::Done;
-      }
-      return sendQueuedResponses(state, now);
+      DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
+      handleResponseSent(state);
+      return iostate;
     } else {
-      return IOState::NeedWrite;
+      state->d_lastIOBlocked = true;
+     return IOState::NeedWrite;
       DEBUGLOG("partial write");
     }
   }
@@ -451,38 +434,46 @@ IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConn
 
 void IncomingTCPConnectionState::terminateClientConnection()
 {
+  cerr<<"terminating client connection"<<endl;
   d_queuedResponses.clear();
   /* we have already released idle connections that could be reused,
      we don't care about the ones still waiting for responses */
   d_activeConnectionsToBackend.clear();
   /* meaning we will no longer be 'active' when the backend
      response or timeout comes in */
-  d_ioState->reset();
+  d_ioState.reset();
   d_handler.close();
 }
 
-/* called when handling a response or error coming from a backend */
-void IncomingTCPConnectionState::sendOrQueueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
 {
-  // if we were already reading a query (not the query size, mind you), or sending a response we need to queue the response
-  // otherwise we can start sending it right away
+  // queue response
+  state->d_queuedResponses.push_back(std::move(response));
+  DEBUGLOG("queueing response, state is "<<(int)state->d_state<<", queue size is now "<<state->d_queuedResponses.size());
+
+  // when the response comes from a backend, there is a real possibility that we are currently
+  // idle, and thus not trying to send the response right away would make our ref count go to 0.
+  // Even if we are waiting for a query, we will not wake up before the new query arrives or a
+  // timeout occurs
   if (state->d_state == IncomingTCPConnectionState::State::idle ||
-      state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader ||
-      state->d_state == IncomingTCPConnectionState::State::readingQuerySize) {
+      state->d_state == IncomingTCPConnectionState::State::waitingForQuery) {
+    auto iostate = sendQueuedResponses(state, now);
 
-    auto iostate = sendResponse(state, now, std::move(response));
+    if (iostate == IOState::Done && state->canAcceptNewQueries(now)) {
+      state->resetForNewQuery();
+      state->d_state = IncomingTCPConnectionState::State::waitingForQuery;
+      iostate = IOState::NeedRead;
+    }
+
+    // for the same reason we need to update the state right away, nobody will do that for us
     state->d_ioState->update(iostate, handleIOCallback, state, iostate == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
   }
-  else {
-    // queue response
-    state->d_queuedResponses.push_back(std::move(response));
-    DEBUGLOG("queueing response because state is "<<(int)state->d_state<<", queue size is now "<<state->d_queuedResponses.size());
-  }
 }
 
 /* called from the backend code when a new response has been received */
 void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState> state, const struct timeval& now, TCPResponse&& response)
 {
+  cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
   if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle()) {
     // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway
     if (response.d_connection->canBeReused()) {
@@ -505,12 +496,14 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
   }
 
   if (response.d_buffer.size() < sizeof(dnsheader)) {
+    cerr<<"too small!"<<endl;
     return;
   }
 
   auto& ids = response.d_idstate;
   unsigned int qnameWireLength;
   if (!responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getRemote(), qnameWireLength)) {
+    cerr<<"does not match!"<<endl;
     return;
   }
 
@@ -519,6 +512,7 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
   memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));
 
   if (!processResponse(response.d_buffer, state->d_threadData.localRespRulactions, dr, false)) {
+    cerr<<"processResponse failed"<<endl;
     return;
   }
 
@@ -540,14 +534,16 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
     }
   }
 
-  sendOrQueueResponse(state, now, std::move(response));
+  cerr<<"calling queueResponse"<<endl;
+  queueResponse(state, now, std::move(response));
 }
 
-static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+static void handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
 {
   if (state->d_querySize < sizeof(dnsheader)) {
     ++g_stats.nonCompliantQueries;
-    return IOState::NeedRead;
+    state->terminateClientConnection();
+    return;
   }
 
   state->d_readingFirstQuery = false;
@@ -587,14 +583,16 @@ static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, c
     TCPResponse response;
     state->d_state = IncomingTCPConnectionState::State::idle;
     ++state->d_currentQueriesCount;
-    return state->sendResponse(state, now, std::move(response));
+    state->queueResponse(state, now, std::move(response));
+    return;
   }
 
   {
     /* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */
     auto* dh = reinterpret_cast<dnsheader*>(state->d_buffer.data());
     if (!checkQueryHeaders(dh)) {
-      return IOState::NeedRead;
+      state->terminateClientConnection();
+      return;
     }
 
     if (dh->qdcount == 0) {
@@ -605,7 +603,8 @@ static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, c
       response.d_buffer = std::move(state->d_buffer);
       state->d_state = IncomingTCPConnectionState::State::idle;
       ++state->d_currentQueriesCount;
-      return state->sendResponse(state, now, std::move(response));
+      state->queueResponse(state, now, std::move(response));
+      return;
     }
   }
 
@@ -630,7 +629,8 @@ static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, c
   auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds);
 
   if (result == ProcessQueryResult::Drop) {
-    return IOState::Done;
+    state->terminateClientConnection();
+    return;
   }
 
   // the buffer might have been invalidated by now
@@ -641,11 +641,13 @@ static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, c
     response.d_buffer = std::move(state->d_buffer);
     state->d_state = IncomingTCPConnectionState::State::idle;
     ++state->d_currentQueriesCount;
-    return state->sendResponse(state, now, std::move(response));
+    state->queueResponse(state, now, std::move(response));
+    return;
   }
 
   if (result != ProcessQueryResult::PassToBackend || ds == nullptr) {
-    return IOState::Done;
+    state->terminateClientConnection();
+    return;
   }
 
   IDState ids;
@@ -693,8 +695,6 @@ static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, c
   ++state->d_currentQueriesCount;
   vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_proxiedRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName());
   downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection);
-
-  return IOState::NeedRead;
 }
 
 void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
@@ -713,7 +713,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
 {
   // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
   // even though the underlying socket is not ready, so we need to actually ask for the data first
-  bool wouldBlock = false;
   IOState iostate = IOState::Done;
   do {
     iostate = IOState::Done;
@@ -726,6 +725,8 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
       return;
     }
 
+    state->d_lastIOBlocked = false;
+
     try {
       if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) {
         DEBUGLOG("doing handshake");
@@ -758,11 +759,11 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
           }
         }
         else {
-          wouldBlock = true;
+          state->d_lastIOBlocked = true;
         }
       }
 
-      if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) {
+      if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) {
         DEBUGLOG("reading proxy protocol header");
         do {
           iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_proxyProtocolNeed);
@@ -800,15 +801,21 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
             }
           }
           else {
-            wouldBlock = true;
+            state->d_lastIOBlocked = true;
           }
         }
-        while (!wouldBlock);
+        while (state->active() && !state->d_lastIOBlocked);
       }
 
-      if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingQuerySize) {
+      if (!state->d_lastIOBlocked && (state->d_state == IncomingTCPConnectionState::State::waitingForQuery ||
+                                      state->d_state == IncomingTCPConnectionState::State::readingQuerySize)) {
         DEBUGLOG("reading query size");
         iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t));
+        if (state->d_currentPos > 0) {
+          /* if we got at least one byte, we can't go around sending responses */
+          state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
+        }
+
         if (iostate == IOState::Done) {
           DEBUGLOG("query size received");
           state->d_state = IncomingTCPConnectionState::State::readingQuery;
@@ -828,77 +835,76 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
           state->d_currentPos = 0;
         }
         else {
-          wouldBlock = true;
+          state->d_lastIOBlocked = true;
         }
       }
 
-      if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingQuery) {
+      if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingQuery) {
         DEBUGLOG("reading query");
         iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize);
         if (iostate == IOState::Done) {
           DEBUGLOG("query received");
           state->d_buffer.resize(state->d_querySize);
 
-          iostate = handleQuery(state, now);
-          // if the query has been passed to a backend, or dropped, we can start
-          // reading again, or sending queued responses
-          if (iostate == IOState::NeedRead) {
-            if (state->d_queuedResponses.empty()) {
-              if (state->canAcceptNewQueries()) {
-                state->resetForNewQuery();
-              }
-              else {
-                state->d_state = IncomingTCPConnectionState::State::idle;
-                iostate = IOState::Done;
-              }
-            }
-            else {
-              TCPResponse resp = std::move(state->d_queuedResponses.front());
-              state->d_queuedResponses.pop_front();
-              ioGuard.release();
-              state->d_state = IncomingTCPConnectionState::State::idle;
-              iostate = sendResponse(state, now, std::move(resp));
-              if (iostate != IOState::Done) {
-                wouldBlock = true;
-              }
-            }
-          }
-          else if (iostate != IOState::Done) {
-            wouldBlock = true;
+          state->d_state = IncomingTCPConnectionState::State::idle;
+          handleQuery(state, now);
+          cerr<<"out of handleQuery, state is "<<(int)state->d_state<<", iostate is "<<(int)iostate<<endl;
+          /* the state might have been updated in the meantime, we don't want to override it
+             in that case */
+          if (state->active() && state->d_state != IncomingTCPConnectionState::State::idle) {
+            iostate = state->d_ioState->getState();
           }
         }
         else {
-          wouldBlock = true;
+          state->d_lastIOBlocked = true;
         }
       }
 
-      if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
+      if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
         DEBUGLOG("sending response");
         iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
         if (iostate == IOState::Done) {
-          DEBUGLOG("response sent");
-          if (!handleResponseSent(state, now)) {
-            iostate = IOState::Done;
-          }
-          else {
-            iostate = sendQueuedResponses(state, now);
+          DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
+          handleResponseSent(state);
+          state->d_state = IncomingTCPConnectionState::State::idle;
+        }
+        else {
+          state->d_lastIOBlocked = true;
+        }
+      }
+
+      if (state->active() &&
+          !state->d_lastIOBlocked &&
+          iostate == IOState::Done &&
+          (state->d_state == IncomingTCPConnectionState::State::idle ||
+           state->d_state == IncomingTCPConnectionState::State::waitingForQuery))
+      {
+        // try sending querued responses
+        cerr<<"send responses, if any"<<endl;
+        iostate = sendQueuedResponses(state, now);
+
+        if (!state->d_lastIOBlocked && iostate == IOState::Done) {
+          // if the query has been passed to a backend, or dropped, and the responses have been sent,
+          // we can start reading again
+          if (!state->d_isXFR && state->canAcceptNewQueries(now)) {
+            cerr<<"reset for new query"<<endl;
+            state->resetForNewQuery();
+            iostate = IOState::NeedRead;
           }
-        } else {
-          wouldBlock = true;
-          DEBUGLOG("partial write");
         }
       }
 
       if (state->d_state != IncomingTCPConnectionState::State::idle &&
           state->d_state != IncomingTCPConnectionState::State::doingHandshake &&
           state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader &&
+          state->d_state != IncomingTCPConnectionState::State::waitingForQuery &&
           state->d_state != IncomingTCPConnectionState::State::readingQuerySize &&
           state->d_state != IncomingTCPConnectionState::State::readingQuery &&
           state->d_state != IncomingTCPConnectionState::State::sendingResponse) {
         vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(state->d_state));
       }
     }
-    catch(const std::exception& e) {
+    catch (const std::exception& e) {
       /* most likely an EOF because the other end closed the connection,
          but it might also be a real IO error or something else.
          Let's just drop the connection
@@ -906,6 +912,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
       if (state->d_state == IncomingTCPConnectionState::State::idle ||
           state->d_state == IncomingTCPConnectionState::State::doingHandshake ||
           state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader ||
+          state->d_state == IncomingTCPConnectionState::State::waitingForQuery ||
           state->d_state == IncomingTCPConnectionState::State::readingQuerySize ||
           state->d_state == IncomingTCPConnectionState::State::readingQuery) {
         ++state->d_ci.cs->tcpDiedReadingQuery;
@@ -926,6 +933,11 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
       iostate = IOState::Done;
     }
 
+    if (!state->active()) {
+      cerr<<"state is no longer active"<<endl;
+      return;
+    }
+
     if (iostate == IOState::Done) {
       state->d_ioState->update(iostate, handleIOCallback, state);
     }
@@ -934,7 +946,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
     }
     ioGuard.release();
   }
-  while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !wouldBlock);
+  while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !state->d_lastIOBlocked);
 }
 
 void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
@@ -950,7 +962,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnec
     state->d_queuedResponses.pop_front();
     state->d_state = IncomingTCPConnectionState::State::idle;
     try {
-      sendOrQueueResponse(state, now, std::move(resp));
+      queueResponse(state, now, std::move(resp));
     }
     catch (const std::exception& e) {
       vinfolog("exception in notifyIOError: %s", e.what());
@@ -964,7 +976,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnec
 
 void IncomingTCPConnectionState::handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
 {
-  sendOrQueueResponse(state, now, std::move(response));
+  queueResponse(state, now, std::move(response));
 }
 
 void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
index a9ba94c1ae0a4d9d0ea390eed08cce25ab068d0a..f3724b85c8b6087e31169752966ffa0d61882c4e 100644 (file)
@@ -155,7 +155,7 @@ public:
   static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
   static void notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now);
   static IOState sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
-  static void sendOrQueueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
+  static void queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
 
   /* we take a copy of a shared pointer, not a reference, because the initial shared pointer might be released during the handling of the response */
   static void handleResponse(std::shared_ptr<IncomingTCPConnectionState> state, const struct timeval& now, TCPResponse&& response);
@@ -165,7 +165,7 @@ public:
   void terminateClientConnection();
   void queueQuery(TCPQuery&& query);
 
-  bool canAcceptNewQueries() const;
+  bool canAcceptNewQueries(const struct timeval& now);
 
   bool active() const
   {
@@ -179,7 +179,7 @@ public:
     return o.str();
   }
 
-  enum class State { doingHandshake, readingProxyProtocolHeader, readingQuerySize, readingQuery, sendingResponse, idle /* in case of XFR, we stop processing queries */ };
+  enum class State { doingHandshake, readingProxyProtocolHeader, waitingForQuery, readingQuerySize, readingQuery, sendingResponse, idle /* in case of XFR, we stop processing queries */ };
 
   std::map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> d_activeConnectionsToBackend;
   PacketBuffer d_buffer;
@@ -209,5 +209,5 @@ public:
   bool d_isXFR{false};
   bool d_xfrStarted{false};
   bool d_proxyProtocolPayloadHasTLV{false};
+  bool d_lastIOBlocked{false};
 };
-