git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Try to reuse active connections
author Remi Gacogne <remi.gacogne@powerdns.com>
Tue, 8 Sep 2020 11:12:27 +0000 (13:12 +0200)
committer Remi Gacogne <remi.gacogne@powerdns.com>
Tue, 10 Nov 2020 08:46:53 +0000 (09:46 +0100)
pdns/dnsdist-tcp.cc
pdns/dnsdist.hh
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.hh
pdns/dnsdistdist/dnsdist-tcp-upstream.hh
pdns/dnsdistdist/tcpiohandler-mplexer.hh
regression-tests.dnsdist/test_ProxyProtocol.py

index 3533bccda5e78145249de55c325324acdbba065b..dae2462a6338bdf6a6e33c78a6c13ea03c38c051 100644 (file)
@@ -70,9 +70,9 @@ class DownstreamConnectionsManager
 {
 public:
 
-  static std::unique_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
+  static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
   {
-    std::unique_ptr<TCPConnectionToBackend> result;
+    std::shared_ptr<TCPConnectionToBackend> result;
 
     const auto& it = t_downstreamConnections.find(ds->remote);
     if (it != t_downstreamConnections.end()) {
@@ -85,10 +85,10 @@ public:
       }
     }
 
-    return make_unique<TCPConnectionToBackend>(ds, now);
+    return std::make_shared<TCPConnectionToBackend>(ds, now);
   }
 
-  static void releaseDownstreamConnection(std::unique_ptr<TCPConnectionToBackend>&& conn)
+  static void releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn)
   {
     if (conn == nullptr) {
       return;
@@ -108,6 +108,7 @@ public:
         conn.reset();
         return;
       }
+
       list.push_back(std::move(conn));
     }
     else {
@@ -137,11 +138,11 @@ public:
   }
 
 private:
-  static thread_local map<ComboAddress, std::deque<std::unique_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
+  static thread_local map<ComboAddress, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
   static const size_t s_maxCachedConnectionsPerDownstream;
 };
 
-thread_local map<ComboAddress, std::deque<std::unique_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
+thread_local map<ComboAddress, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
 const size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{20};
 
 static void decrementTCPClientCount(const ComboAddress& client)
@@ -157,53 +158,22 @@ static void decrementTCPClientCount(const ComboAddress& client)
 
 IncomingTCPConnectionState::~IncomingTCPConnectionState()
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   decrementTCPClientCount(d_ci.remote);
-  // DEBUG: cerr<<"decremented"<<endl;
 
   if (d_ci.cs != nullptr) {
     struct timeval now;
     gettimeofday(&now, nullptr);
 
     auto diff = now - d_connectionStartTime;
-    // DEBUG: cerr<<"updating tcp metrics"<<endl;
     d_ci.cs->updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000.0 + diff.tv_usec / 1000.0);
-    // DEBUG: cerr<<"updated tcp metrics"<<endl;
   }
 
-#if 0
-  if (d_ds != nullptr) {
-
-    if (d_downstreamConnection) {
-      try {
-        if (d_lastIOState == IOState::NeedRead) {
-          // DEBUG: cerr<<__PRETTY_FUNCTION__<<": removing leftover backend read FD "<<d_downstreamConnection->getHandle()<<endl;
-          d_threadData.mplexer->removeReadFD(d_downstreamConnection->getHandle());
-        }
-        else if (d_lastIOState == IOState::NeedWrite) {
-          // DEBUG: cerr<<__PRETTY_FUNCTION__<<": removing leftover backend write FD "<<d_downstreamConnection->getHandle()<<endl;
-          d_threadData.mplexer->removeWriteFD(d_downstreamConnection->getHandle());
-        }
-      }
-      catch(const FDMultiplexerException& e) {
-        vinfolog("Got an exception when trying to remove a pending IO operation on the socket to the %s backend: %s", d_ds->getName(), e.what());
-      }
-      catch(const std::runtime_error& e) {
-        /* might be thrown by getHandle() */
-        vinfolog("Got an exception when trying to remove a pending IO operation on the socket to the %s backend: %s", d_ds->getName(), e.what());
-      }
-    }
-  }
-#endif
-
-  // DEBUG: cerr<<"about to remove left over FDs"<<endl;
   try {
     if (d_lastIOState == IOState::NeedRead) {
-      // DEBUG: cerr<<__PRETTY_FUNCTION__<<": removing leftover client read FD "<<d_ci.fd<<endl;
       d_threadData.mplexer->removeReadFD(d_ci.fd);
     }
     else if (d_lastIOState == IOState::NeedWrite) {
-      // DEBUG: cerr<<__PRETTY_FUNCTION__<<": removing leftover client write FD "<<d_ci.fd<<endl;
       d_threadData.mplexer->removeWriteFD(d_ci.fd);
     }
   }
@@ -213,7 +183,6 @@ IncomingTCPConnectionState::~IncomingTCPConnectionState()
   catch (...) {
     vinfolog("Got an unknown exception when trying to remove a pending IO operation on an incoming TCP connection from %s", d_ci.remote.toStringWithPort());
   }
-  // DEBUG: cerr<<"done"<<endl;
 }
 
 std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
@@ -227,6 +196,7 @@ std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstrea
   if (!downstream) {
     /* we don't have a connection to this backend active yet, let's ask one (it might not be a fresh one, though) */
     downstream = DownstreamConnectionsManager::getConnectionToDownstream(d_threadData.mplexer, ds, now);
+    registerActiveDownstreamConnection(downstream);
   }
 
   return downstream;
@@ -298,55 +268,22 @@ void TCPClientCollection::addTCPClientThread()
   }
 }
 
-/* Tries to read exactly toRead bytes into the buffer, starting at position pos.
-   Updates pos everytime a successful read occurs,
-   throws an std::runtime_error in case of IO error,
-   return Done when toRead bytes have been read, needRead or needWrite if the IO operation
-   would block.
-*/
-// XXX could probably be implemented as a TCPIOHandler
-IOState tryRead(int fd, std::vector<uint8_t>& buffer, size_t& pos, size_t toRead)
-{
-  if (buffer.size() < (pos + toRead)) {
-    throw std::out_of_range("Calling tryRead() with a too small buffer (" + std::to_string(buffer.size()) + ") for a read of " + std::to_string(toRead) + " bytes starting at " + std::to_string(pos));
-  }
-
-  size_t got = 0;
-  do {
-    ssize_t res = ::read(fd, reinterpret_cast<char*>(&buffer.at(pos)), toRead - got);
-    if (res == 0) {
-      throw runtime_error("EOF while reading message");
-    }
-    if (res < 0) {
-      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN) {
-        return IOState::NeedRead;
-      }
-      else {
-        throw std::runtime_error(std::string("Error while reading message: ") + stringerror());
-      }
-    }
-
-    pos += static_cast<size_t>(res);
-    got += static_cast<size_t>(res);
-  }
-  while (got < toRead);
-
-  return IOState::Done;
-}
-
 std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
 
 static IOState handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
 {
+  --state->d_currentQueriesCount;
+
   if (!state->d_isXFR) {
     const auto& currentResponse = state->d_currentResponse;
-    if (state->d_selfGeneratedResponse == false && currentResponse.d_ds) {
+    if (state->d_selfGeneratedResponse == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
+      const auto& ds = currentResponse.d_connection->getDS();
       struct timespec answertime;
       gettime(&answertime);
       const auto& ids = currentResponse.d_idstate;
       double udiff = ids.sentTime.udiff();
-      g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, currentResponse.d_ds->remote);
-      vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", currentResponse.d_ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff);
+      g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->remote);
+      vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff);
     }
 
     switch (currentResponse.d_cleartextDH.rcode) {
@@ -374,21 +311,27 @@ static IOState handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& s
   }
 
   if (state->d_queuedResponses.empty()) {
-    // DEBUG: cerr<<"no response remaining"<<endl;
+    DEBUGLOG("no response remaining");
     if (state->d_isXFR) {
       /* we should still be reading from the backend, and we don't want to read from the client */
       state->d_state = IncomingTCPConnectionState::State::idle;
       state->d_currentPos = 0;
-      // DEBUG: cerr<<"idling for XFR completion"<<endl;
+      DEBUGLOG("idling for XFR completion");
       return IOState::Done;
     } else {
-      // DEBUG: cerr<<"reading new queries if any"<<endl;
-      state->resetForNewQuery();
-      return IOState::NeedRead;
+      DEBUGLOG("reading new queries if any");
+      if (state->canAcceptNewQueries()) {
+        state->resetForNewQuery();
+        return IOState::NeedRead;
+      }
+      else {
+        state->d_state = IncomingTCPConnectionState::State::idle;
+        return IOState::Done;
+      }
     }
   }
   else {
-    // DEBUG: cerr<<"queue size is "<<state->d_queuedResponses.size()<<endl;
+    DEBUGLOG("queue size is "<<state->d_queuedResponses.size());
     TCPResponse resp = std::move(state->d_queuedResponses.front());
     state->d_queuedResponses.pop_front();
     state->d_state = IncomingTCPConnectionState::State::idle;
@@ -397,22 +340,63 @@ static IOState handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& s
   }
 }
 
+
+bool IncomingTCPConnectionState::canAcceptNewQueries() const
+{
+  if (d_isXFR) {
+    DEBUGLOG("not accepting new queries because used for XFR");
+    return false;
+  }
+
+  // d_state ?
+  if (d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
+    DEBUGLOG("not accepting new queries because we already have "<<d_currentQueriesCount<<" out of "<<d_ci.cs->d_maxInFlightQueriesPerConn);
+    return false;
+  }
+
+  DEBUGLOG("accepting new queries");
+  return true;
+}
+
 void IncomingTCPConnectionState::resetForNewQuery()
 {
   d_buffer.resize(sizeof(uint16_t));
   d_currentPos = 0;
   d_querySize = 0;
-  //d_responseSize = 0;
   d_downstreamFailures = 0;
   d_state = State::readingQuerySize;
   d_lastIOState = IOState::Done;
   d_selfGeneratedResponse = false;
 }
 
+std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getActiveDownstreamConnection(const std::shared_ptr<DownstreamState>& ds)
+{
+  auto it = d_activeConnectionsToBackend.find(ds->remote);
+  if (it == d_activeConnectionsToBackend.end()) {
+    DEBUGLOG("no active connection found for "<<ds->remote.toString());
+    return nullptr;
+  }
+
+  for (auto& conn : it->second) {
+    if (conn->canAcceptNewQueries()) {
+      DEBUGLOG("Got one active connection accepting more for "<<ds->remote.toString());
+      return conn;
+    }
+    DEBUGLOG("not accepting more for "<<ds->remote.toString());
+  }
+
+  return nullptr;
+}
+
+void IncomingTCPConnectionState::registerActiveDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
+{
+  d_activeConnectionsToBackend[conn->getRemote()].push_front(conn);
+}
+
 /* this version is called when the buffer has been set and the rules have been processed */
 void IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   // if we already reading a query (not the query size, mind you), or sending a response we need to either queue the response
   // otherwise we can start sending it right away
   if (state->d_state == IncomingTCPConnectionState::State::idle ||
@@ -429,23 +413,31 @@ void IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnect
     state->d_currentPos = 0;
     state->d_currentResponse = std::move(response);
 
-    //IncomingTCPConnectionState::handleIO(state, now);
-     state->d_ioState->update(IOState::NeedWrite, handleIOCallback, state, getClientWriteTTD(now));
-    // DEBUG: cerr<<"updated IO state"<<endl;
+    state->d_ioState->update(IOState::NeedWrite, handleIOCallback, state, getClientWriteTTD(now));
   }
   else {
     // queue response
     state->d_queuedResponses.push_back(std::move(response));
-    // DEBUG: cerr<<"queueing response because state is "<<(int)state->d_state<<", queue size is now "<<state->d_queuedResponses.size()<<endl;
+    DEBUGLOG("queueing response because state is "<<(int)state->d_state<<", queue size is now "<<state->d_queuedResponses.size());
   }
 }
 
 /* this version is called from the backend code when a new response has been received */
 void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
+  if (response.d_connection && response.d_connection->isIdle()) {
+    auto& list = d_activeConnectionsToBackend.at(response.d_connection->getRemote());
+    for (auto it = list.begin(); it != list.end(); ++it) {
+      if (*it == response.d_connection) {
+        DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
+        list.erase(it);
+        break;
+      }
+    }
+  }
+
   if (response.d_buffer.size() < sizeof(dnsheader)) {
-    // DEBUG: cerr<<"too small"<<endl;
     return;
   }
 
@@ -455,11 +447,8 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
   auto responseAsCharArray = reinterpret_cast<char*>(&response.d_buffer.at(0));
 
   auto& ids = response.d_idstate;
-  // DEBUG: cerr<<"IDS has "<<(ids.qTag?" TAGS ": "NO TAGS")<<endl;
   unsigned int consumed;
-  // DEBUG: cerr<<"about to match response for "<<ids.qname<<endl;
-  if (!responseContentMatches(responseAsCharArray, responseSize, ids.qname, ids.qtype, ids.qclass, response.d_ds->remote, consumed)) {
-    // DEBUG: cerr<<"content does not match"<<endl;
+  if (!responseContentMatches(responseAsCharArray, responseSize, ids.qname, ids.qtype, ids.qclass, response.d_connection->getRemote(), consumed)) {
     return;
   }
 
@@ -474,7 +463,6 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
 
   std::vector<uint8_t> rewrittenResponse;
   if (!processResponse(&responseAsCharArray, &responseSize, &responseCapacity, state->d_threadData.localRespRulactions, dr, addRoom, rewrittenResponse, false)) {
-    // DEBUG: cerr<<"process said to drop it"<<endl;
     return;
   }
 
@@ -492,13 +480,17 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConne
     state->d_xfrStarted = true;
     ++g_stats.responses;
     ++state->d_ci.cs->responses;
-    ++response.d_ds->responses;
+    if (response.d_connection->getDS()) {
+      ++response.d_connection->getDS()->responses;
+    }
   }
 
   if (!state->d_isXFR) {
     ++g_stats.responses;
     ++state->d_ci.cs->responses;
-    ++response.d_ds->responses;
+    if (response.d_connection->getDS()) {
+      ++response.d_connection->getDS()->responses;
+    }
   }
 
   sendResponse(state, now, std::move(response));
@@ -547,11 +539,10 @@ static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, cons
   std::shared_ptr<DNSCryptQuery> dnsCryptQuery{nullptr};
   auto dnsCryptResponse = checkDNSCryptQuery(*state->d_ci.cs, query, state->d_querySize, dnsCryptQuery, queryRealTime.tv_sec, true);
   if (dnsCryptResponse) {
-    //state->d_responseBuffer = std::move(*dnsCryptResponse);
-    //state->d_responseSize = state->d_responseBuffer.size();
     TCPResponse response;
     response.d_buffer = std::move(*dnsCryptResponse);
     state->d_state = IncomingTCPConnectionState::State::idle;
+    ++state->d_currentQueriesCount;
     state->sendResponse(state, now, std::move(response));
     return false;
   }
@@ -586,6 +577,7 @@ static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, cons
     TCPResponse response;
     response.d_buffer = std::move(state->d_buffer);
     state->d_state = IncomingTCPConnectionState::State::idle;
+    ++state->d_currentQueriesCount;
     state->sendResponse(state, now, std::move(response));
     return false;
   }
@@ -594,17 +586,8 @@ static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, cons
     return true;
   }
 
-#warning move this, we should just never read another question again on this client connection
-  if (state->d_xfrStarted) {
-    /* sorry, but we are not going to resume a XFR if we have already sent some packets
-       to the client */
-    return true;
-  }
-
   IDState ids;
-  // DEBUG: cerr<<"DQ has "<<(dq.qTag?" TAGS ": "NO TAGS")<<endl;
   setIDStateFromDNSQuestion(ids, dq, std::move(qname));
-  // DEBUG: cerr<<"query IDS has "<<(ids.qTag?" TAGS ": "NO TAGS")<<endl;
   ids.origID = ntohs(dh->id);
 
   const uint8_t sizeBytes[] = { static_cast<uint8_t>(dq.len / 256), static_cast<uint8_t>(dq.len % 256) };
@@ -647,11 +630,9 @@ static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, cons
 
   vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_ci.remote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName());
 
-// DEBUG: cerr<<"about to be queued query IDS has "<<(ids.qTag?" TAGS ": "NO TAGS")<<endl;
+  ++state->d_currentQueriesCount;
   downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection);
 
-  //sendQueryToBackend(state, now);
-  // DEBUG: cerr<<"out of "<<__PRETTY_FUNCTION__<<endl;
   return true;
 }
 
@@ -669,7 +650,7 @@ void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcpar
 
 void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
   // even though the underlying socket is not ready, so we need to actually ask for the data first
   bool wouldBlock = false;
@@ -687,10 +668,10 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
 
     try {
       if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) {
-        // DEBUG: cerr<<"doing handshake"<<endl;
+        DEBUGLOG("doing handshake");
         iostate = state->d_handler.tryHandshake();
         if (iostate == IOState::Done) {
-          // DEBUG: cerr<<"handshake done"<<endl;
+          DEBUGLOG("handshake done");
           if (state->d_handler.isTLS()) {
             if (!state->d_handler.hasTLSSessionBeenResumed()) {
               ++state->d_ci.cs->tlsNewSessions;
@@ -715,10 +696,10 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
       }
 
       if (state->d_state == IncomingTCPConnectionState::State::readingQuerySize) {
-        // DEBUG: cerr<<"reading query size"<<endl;
+        DEBUGLOG("reading query size");
         iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t));
         if (iostate == IOState::Done) {
-          // DEBUG: cerr<<"query size received"<<endl;
+          DEBUGLOG("query size received");
           state->d_state = IncomingTCPConnectionState::State::readingQuery;
           state->d_querySizeReadTime = now;
           if (state->d_queriesCount == 0) {
@@ -727,8 +708,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
           state->d_querySize = state->d_buffer.at(0) * 256 + state->d_buffer.at(1);
           if (state->d_querySize < sizeof(dnsheader)) {
             /* go away */
-            // will be handled by the guard
-            //handleNewIOState(state, IOState::Done, fd, handleIOCallback);
             return;
           }
 
@@ -743,22 +722,24 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
       }
 
       if (state->d_state == IncomingTCPConnectionState::State::readingQuery) {
-        // DEBUG: cerr<<"reading query"<<endl;
+        DEBUGLOG("reading query");
         iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize);
         if (iostate == IOState::Done) {
-          // DEBUG: cerr<<"query received"<<endl;
-          //handleNewIOState(state, IOState::Done, fd, handleIOCallback);
+          DEBUGLOG("query received");
+
           if (handleQuery(state, now)) {
-            // DEBUG: cerr<<"handle query returned true"<<endl;
+            DEBUGLOG("handle query returned true");
             // if the query has been passed to a backend, or dropped, we can start
             // reading again, or sending queued responses
             if (state->d_queuedResponses.empty()) {
-              state->resetForNewQuery();
-              // DEBUG: cerr<<__LINE__<<endl;
-              iostate = IOState::NeedRead;
-              //state->d_ioState->update(IOState::NeedRead, handleIOCallback, state, state->getClientReadTTD(now));
-              // DEBUG: cerr<<__LINE__<<endl;
-              //ioGuard.release();
+              if (state->canAcceptNewQueries()) {
+                state->resetForNewQuery();
+                iostate = IOState::NeedRead;
+              }
+              else {
+                state->d_state = IncomingTCPConnectionState::State::idle;
+                iostate = IOState::Done;
+              }
             }
             else {
               TCPResponse resp = std::move(state->d_queuedResponses.front());
@@ -771,7 +752,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
           else {
             /* otherwise the state should already be waiting for
                the socket to be writable */
-            // DEBUG: cerr<<"should be waiting for writable socket"<<endl;
+            DEBUGLOG("should be waiting for writable socket");
             ioGuard.release();
             return;
           }
@@ -782,18 +763,15 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
       }
 
       if (state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
-        // DEBUG: cerr<<"sending response"<<endl;
+        DEBUGLOG("sending response");
         iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
         if (iostate == IOState::Done) {
-          // DEBUG: cerr<<"response sent"<<endl;
+          DEBUGLOG("response sent");
           iostate = handleResponseSent(state, now);
         } else {
           wouldBlock = true;
-          // DEBUG: cerr<<"partial write"<<endl;
+          DEBUGLOG("partial write");
         }
-        // DEBUG: cerr<<__LINE__<<endl;
-        //state->d_ioState->update(IOState::NeedRead, handleIOCallback, state, state->getClientReadTTD(now));
-        //// DEBUG: cerr<<__LINE__<<endl;
       }
 
       if (state->d_state != IncomingTCPConnectionState::State::idle &&
@@ -820,26 +798,22 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
       }
 
       if (state->d_lastIOState == IOState::NeedWrite || state->d_readingFirstQuery) {
-        // DEBUG: cerr<<"Got an exception while handling TCP query: "<<e.what()<<endl;
+        DEBUGLOG("Got an exception while handling TCP query: "<<e.what());
         vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (state->d_lastIOState == IOState::NeedRead ? "reading" : "writing"), state->d_ci.remote.toStringWithPort(), e.what());
       }
       else {
         vinfolog("Closing TCP client connection with %s", state->d_ci.remote.toStringWithPort());
-        // DEBUG: cerr<<"Closing TCP client connection: "<<e.what()<<endl;
+        DEBUGLOG("Closing TCP client connection: "<<e.what());
       }
       /* remove this FD from the IO multiplexer */
       iostate = IOState::Done;
     }
 
     if (iostate == IOState::Done) {
-      // DEBUG: cerr<<__LINE__<<endl;
       state->d_ioState->update(iostate, handleIOCallback, state);
-      // DEBUG: cerr<<__LINE__<<endl;
     }
     else {
-      // DEBUG: cerr<<__LINE__<<endl;
       state->d_ioState->update(iostate, handleIOCallback, state, iostate == IOState::NeedRead ? state->getClientReadTTD(now) : state->getClientWriteTTD(now));
-      // DEBUG: cerr<<__LINE__<<endl;
     }
     ioGuard.release();
   }
@@ -848,11 +822,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionS
 
 void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
-  if (d_isXFR) {
-    d_xfrDone = true;
-  }
-
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   if (d_state == State::sendingResponse) {
     /* if we have responses to send, let's do that first */
   }
@@ -867,8 +837,6 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnec
     d_lastIOState = IOState::Done;
     d_ioState->reset();
   }
-
-  // DEBUG: cerr<<"out "<<__PRETTY_FUNCTION__<<endl;
 }
 
 void IncomingTCPConnectionState::handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
@@ -878,7 +846,7 @@ void IncomingTCPConnectionState::handleXFRResponse(std::shared_ptr<IncomingTCPCo
 
 void IncomingTCPConnectionState::handleTimeout(bool write)
 {
-  // DEBUG: cerr<<"client timeout"<<endl;
+  DEBUGLOG("client timeout");
   ++d_ci.cs->tcpClientTimeouts;
   d_lastIOState = IOState::Done;
   d_ioState->reset();
@@ -980,14 +948,6 @@ static void tcpClientThread(int pipefd)
           vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
           conn->handleTimeout(now, true);
         }
-#if 0
-        try {
-          data.mplexer->removeWriteFD(cbData.first);
-        }
-        catch (const FDMultiplexerException& fde) {
-          warnlog("Exception while removing a socket (%d) after a write timeout: %s", cbData.first, fde.what());
-        }
-#endif
       }
     }
   }
index 0dee6cc8df28633fd562ea92cf2ade619d249df1..4640254aec25f457c1029173b671127891f7259b 100644 (file)
@@ -669,6 +669,7 @@ struct ClientState
   std::atomic<double> tcpAvgQueriesPerConnection{0.0};
   /* in ms */
   std::atomic<double> tcpAvgConnectionDuration{0.0};
+  size_t d_maxInFlightQueriesPerConn{1};
   int udpFD{-1};
   int tcpFD{-1};
   int tcpListenQueueSize{SOMAXCONN};
@@ -852,6 +853,7 @@ struct DownstreamState
   /* in ms */
   std::atomic<double> tcpAvgConnectionDuration{0.0};
   size_t socketsOffset{0};
+  size_t d_maxInFlightQueriesPerConn{1};
   double queryLoad{0.0};
   double dropRate{0.0};
   double latencyUsec{0.0};
index 86dee1720474b17b9a1c75f181d668dba2f87c0b..03a4c41b244acb4812ad7b686782a1f85ee838b9 100644 (file)
@@ -6,7 +6,7 @@ const uint16_t TCPConnectionToBackend::s_xfrID = 0;
 
 void TCPConnectionToBackend::assignToClientConnection(std::shared_ptr<IncomingTCPConnectionState>& clientConn, bool isXFR)
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   if (isXFR) {
     d_usedForXFR = true;
   }
@@ -20,12 +20,50 @@ IOState TCPConnectionToBackend::sendNextQuery(std::shared_ptr<TCPConnectionToBac
   conn->d_currentQuery = std::move(conn->d_pendingQueries.front());
   conn->d_pendingQueries.pop_front();
   conn->d_state = State::sendingQueryToBackend;
+  conn->d_currentPos = 0;
+
   return IOState::NeedWrite;
 }
 
+/* Tries to read exactly toRead bytes into the buffer, starting at position pos.
+   Updates pos every time a successful read occurs,
+   throws a std::runtime_error in case of EOF or IO error (std::out_of_range if the buffer is too small),
+   returns Done when toRead bytes have been read, or NeedRead if the IO operation
+   would block.
+*/
+// XXX could probably be implemented as a TCPIOHandler
+static IOState tryRead(int fd, std::vector<uint8_t>& buffer, size_t& pos, size_t toRead)
+{
+  if (buffer.size() < (pos + toRead)) {
+    throw std::out_of_range("Calling tryRead() with a too small buffer (" + std::to_string(buffer.size()) + ") for a read of " + std::to_string(toRead) + " bytes starting at " + std::to_string(pos));
+  }
+
+  size_t got = 0;
+  do {
+    ssize_t res = ::read(fd, reinterpret_cast<char*>(&buffer.at(pos)), toRead - got);
+    if (res == 0) {
+      throw runtime_error("EOF while reading message");
+    }
+    if (res < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN) {
+        return IOState::NeedRead;
+      }
+      else {
+        throw std::runtime_error(std::string("Error while reading message: ") + stringerror());
+      }
+    }
+
+    pos += static_cast<size_t>(res);
+    got += static_cast<size_t>(res);
+  }
+  while (got < toRead);
+
+  return IOState::Done;
+}
+
 void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   if (conn->d_socket == nullptr) {
     throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!");
   }
@@ -37,7 +75,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
 
   try {
     if (conn->d_state == State::sendingQueryToBackend) {
-      // DEBUG: cerr<<"sending query to backend over FD "<<fd<<endl;
+      DEBUGLOG("sending query to backend over FD "<<fd);
       int socketFlags = 0;
 #ifdef MSG_FASTOPEN
       if (conn->isFastOpenEnabled()) {
@@ -47,22 +85,14 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
 
       size_t sent = sendMsgWithOptions(fd, reinterpret_cast<const char *>(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags);
       if (sent == conn->d_currentQuery.d_buffer.size()) {
-        // DEBUG: cerr<<"query sent to backend"<<endl;
+        DEBUGLOG("query sent to backend");
         /* request sent ! */
         conn->incQueries();
         conn->d_currentPos = 0;
         //conn->d_currentQuery.d_querySentTime = now;
-        // DEBUG: cerr<<"adding a pending response for ID "<<conn->d_currentQuery.d_idstate.origID<<" and QNAME "<<conn->d_currentQuery.d_idstate.qname<<endl;
-        // DEBUG: cerr<<"IDS has "<<(conn->d_currentQuery.d_idstate.qTag?"tags":"no tags")<<endl;
+        DEBUGLOG("adding a pending response for ID "<<conn->d_currentQuery.d_idstate.origID<<" and QNAME "<<conn->d_currentQuery.d_idstate.qname);
         conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery);
         conn->d_currentQuery.d_buffer.clear();
-#if 0
-        if (!conn->d_usedForXFR) {
-          /* don't bother with the outstanding count for XFR queries */
-          ++conn->d_ds->outstanding;
-          ++conn->d_outstanding;
-        }
-#endif
 
         if (conn->d_pendingQueries.empty()) {
           conn->d_state = State::readingResponseSizeFromBackend;
@@ -83,7 +113,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
     }
 
     if (conn->d_state == State::readingResponseSizeFromBackend) {
-      // DEBUG: cerr<<"reading response size from backend"<<endl;
+      DEBUGLOG("reading response size from backend");
       // then we need to allocate a new buffer (new because we might need to re-send the query if the
       // backend dies on us)
       // We also might need to read and send to the client more than one response in case of XFR (yeah!)
@@ -91,7 +121,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
       conn->d_responseBuffer.resize(sizeof(uint16_t));
       iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos);
       if (iostate == IOState::Done) {
-        // DEBUG: cerr<<"got response size from backend"<<endl;
+        DEBUGLOG("got response size from backend");
         conn->d_state = State::readingResponseFromBackend;
         conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
         conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
@@ -101,18 +131,17 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
     }
 
     if (conn->d_state == State::readingResponseFromBackend) {
-      // DEBUG: cerr<<"reading response from backend"<<endl;
+      DEBUGLOG("reading response from backend");
       iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos);
       if (iostate == IOState::Done) {
-        // DEBUG: cerr<<"got response from backend"<<endl;
+        DEBUGLOG("got response from backend");
         //conn->d_responseReadTime = now;
         try {
-          iostate = conn->handleResponse(now);
+          iostate = conn->handleResponse(conn, now);
         }
         catch (const std::exception& e) {
           vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getName() : "unknown", conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
         }
-        //return;
       }
     }
 
@@ -157,18 +186,17 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
 
   if (connectionDied) {
     bool reconnected = false;
-    // DEBUG: cerr<<"connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->retries<<endl;
+    DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->retries);
 
     if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) {
-      // DEBUG: cerr<<"reconnecting"<<endl;
+      DEBUGLOG("reconnecting");
       conn->d_ioState->reset();
       ioGuard.release();
 
       if (conn->reconnect()) {
-        // DEBUG: cerr<<"reconnected"<<endl;
+        DEBUGLOG("reconnected");
 
         conn->d_ioState = make_unique<IOStateHandler>(conn->d_clientConn->getIOMPlexer(), conn->d_socket->getHandle());
-        // DEBUG: cerr<<"new state"<<endl;
 
         for (auto& pending : conn->d_pendingResponses) {
           conn->d_pendingQueries.push_back(std::move(pending.second));
@@ -182,9 +210,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
           // resume sending query
         }
         else {
-          // DEBUG: cerr<<"sending next query"<<endl;
           iostate = sendNextQuery(conn);
-          // DEBUG: cerr<<"after call to sendNextQuery"<<endl;
         }
 
         if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) {
@@ -198,21 +224,18 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
 
     if (!reconnected) {
       /* reconnect failed, we give up */
-      conn->d_connectionDied = true;
+      DEBUGLOG("reconnect failed, we give up");
       conn->notifyAllQueriesFailed(now);
     }
   }
 
   if (iostate == IOState::Done) {
-    // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<", done"<<endl;
     conn->d_ioState->update(iostate, handleIOCallback, conn);
   }
   else {
-    // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<", updating to "<<(int)iostate<<endl;
     conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
   }
   ioGuard.release();
-
 }
 
 void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
@@ -229,8 +252,7 @@ void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t
 
 void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr<TCPConnectionToBackend>& sharedSelf)
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
-  // DEBUG: cerr<<"IDS has "<<(query.d_idstate.qTag?"tags":"no tags")<<endl;
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   if (d_ioState == nullptr) {
     throw std::runtime_error("Trying to queue a query to a TCP connection that has no incoming client connection assigned");
   }
@@ -238,9 +260,16 @@ void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr<TCPCon
   // if we are not already sending a query or in the middle of reading a response (so idle or doingHandshake),
   // start sending the query
   if (d_state == State::idle || d_state == State::waitingForResponseFromBackend) {
+
     d_state = State::sendingQueryToBackend;
+    d_currentPos = 0;
     d_currentQuery = std::move(query);
-    // DEBUG: cerr<<"need write"<<endl;
+    if (!d_proxyProtocolPayloadAdded && !d_proxyProtocolPayload.empty()) {
+      d_currentQuery.d_buffer.insert(d_currentQuery.d_buffer.begin(), d_proxyProtocolPayload.begin(), d_proxyProtocolPayload.end());
+      d_proxyProtocolPayloadAdded = true;
+    }
+
+    DEBUGLOG("need write");
 
     struct timeval now;
     gettimeofday(&now, 0);
@@ -251,7 +280,7 @@ void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr<TCPCon
     // store query in the list of queries to send
     d_pendingQueries.push_back(std::move(query));
   }
-  // DEBUG: cerr<<"out of "<<__PRETTY_FUNCTION__<<endl;
+  DEBUGLOG("out of "<<__PRETTY_FUNCTION__);
 }
 
 bool TCPConnectionToBackend::reconnect()
@@ -259,7 +288,7 @@ bool TCPConnectionToBackend::reconnect()
   std::unique_ptr<Socket> result;
 
   if (d_socket) {
-    // DEBUG: cerr<<"closing socket "<<d_socket->getHandle()<<endl;
+    DEBUGLOG("closing socket "<<d_socket->getHandle());
     shutdown(d_socket->getHandle(), SHUT_RDWR);
     d_socket.reset();
     d_ioState.reset();
@@ -270,7 +299,7 @@ bool TCPConnectionToBackend::reconnect()
     vinfolog("TCP connecting to downstream %s (%d)", d_ds->getNameWithAddr(), d_downstreamFailures);
     try {
       result = std::unique_ptr<Socket>(new Socket(d_ds->remote.sin4.sin_family, SOCK_STREAM, 0));
-      // DEBUG: cerr<<"result of connect is "<<result->getHandle()<<endl;
+      DEBUGLOG("result of connect is "<<result->getHandle());
       if (!IsAnyAddress(d_ds->sourceAddr)) {
         SSetsockopt(result->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
 #ifdef IP_BIND_ADDRESS_NO_PORT
@@ -298,7 +327,7 @@ bool TCPConnectionToBackend::reconnect()
 #endif /* MSG_FASTOPEN */
 
       d_socket = std::move(result);
-      // DEBUG: cerr<<"connected new socket "<<d_socket->getHandle()<<endl;
+      DEBUGLOG("connected new socket "<<d_socket->getHandle());
       ++d_ds->tcpCurrentConnections;
       return true;
     }
@@ -333,16 +362,12 @@ void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write
 
 void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, bool timeout)
 {
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
   d_connectionDied = true;
-  //auto clientConn = d_clientConn.lock();
-  //if (!clientConn) {
-  //  d_clientConn.reset();
-  //  return;
-  //}
+
   auto& clientConn = d_clientConn;
   if (!clientConn->active()) {
     // a client timeout occured, or something like that */
-    d_connectionDied = true;
     d_clientConn.reset();
     return;
   }
@@ -369,20 +394,15 @@ void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, b
   d_clientConn.reset();
 }
 
-IOState TCPConnectionToBackend::handleResponse(const struct timeval& now)
+IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
 {
-  // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
-  //auto clientConn = d_clientConn.lock();
-  //if (!clientConn) {
-  //d_clientConn.reset();
-  //  d_connectionDied = true;
-  //  // DEBUG: cerr<<"connection to client died, bye bye"<<endl;
-  //  return IOState::Done;
-  //}
+  DEBUGLOG("in "<<__PRETTY_FUNCTION__);
+
+  d_downstreamFailures = 0;
 
   auto& clientConn = d_clientConn;
   if (!clientConn->active()) {
-    // DEBUG: cerr<<"client is not active"<<endl;
+    DEBUGLOG("client is not active");
     // a client timeout occured, or something like that */
     d_connectionDied = true;
     d_clientConn.reset();
@@ -390,10 +410,10 @@ IOState TCPConnectionToBackend::handleResponse(const struct timeval& now)
   }
 
   if (d_usedForXFR) {
-    // DEBUG: cerr<<"XFR!"<<endl;
+    DEBUGLOG("XFR!");
     TCPResponse response;
     response.d_buffer = std::move(d_responseBuffer);
-    response.d_ds = d_ds;
+    response.d_connection = conn;
     clientConn->handleXFRResponse(clientConn, now, std::move(response));
     d_state = State::readingResponseSizeFromBackend;
     d_currentPos = 0;
@@ -402,44 +422,45 @@ IOState TCPConnectionToBackend::handleResponse(const struct timeval& now)
     // get ready to read the next packet, if any
   }
   else {
-    // DEBUG: cerr<<"not XFR, phew"<<endl;
+    DEBUGLOG("not XFR, phew");
     uint16_t queryId = 0;
     try {
       queryId = getQueryIdFromResponse();
     }
     catch (const std::exception& e) {
+      DEBUGLOG("Unable to get query ID");
       notifyAllQueriesFailed(now);
       throw;
     }
 
     auto it = d_pendingResponses.find(queryId);
     if (it == d_pendingResponses.end()) {
-      // DEBUG: cerr<<"could not found any corresponding query for ID "<<queryId<<endl;
+      DEBUGLOG("could not found any corresponding query for ID "<<queryId);
       notifyAllQueriesFailed(now);
       return IOState::Done;
     }
     auto ids = std::move(it->second.d_idstate);
-    // DEBUG: cerr<<"IDS has "<<(ids.qTag?" TAGS ": "NO TAGS")<<endl;
-    // DEBUG: cerr<<"passing response to client connection for "<<ids.qname<<endl;
-    clientConn->handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), d_ds));
+    DEBUGLOG("passing response to client connection for "<<ids.qname);
+    clientConn->handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
     d_pendingResponses.erase(it);
 
     if (!d_pendingQueries.empty()) {
-      // DEBUG: cerr<<"still have some queries to send"<<endl;
+      DEBUGLOG("still have some queries to send");
       d_state = State::sendingQueryToBackend;
       d_currentQuery = std::move(d_pendingQueries.front());
+      d_currentPos = 0;
       d_pendingQueries.pop_front();
       return IOState::NeedWrite;
     }
     else if (!d_pendingResponses.empty()) {
-      // DEBUG: cerr<<"still have some responses to read"<<endl;
+      DEBUGLOG("still have some responses to read");
       d_state = State::readingResponseSizeFromBackend;
       d_currentPos = 0;
       d_responseBuffer.resize(sizeof(uint16_t));
       return IOState::NeedRead;
     }
     else {
-      // DEBUG: cerr<<"nothing to do, phewwwww"<<endl;
+      DEBUGLOG("nothing to do, phewwwww");
       d_state = State::idle;
       d_clientConn.reset();
       return IOState::Done;
index 9d24de5f17f3d5b549f70a95c86e591b292d3347..c815364831ca60175140c60cb3a908924c4b14aa 100644 (file)
@@ -20,17 +20,19 @@ struct TCPQuery
   std::vector<uint8_t> d_buffer;
 };
 
+class TCPConnectionToBackend;
+
 struct TCPResponse : public TCPQuery
 {
   TCPResponse()
   {
   }
 
-  TCPResponse(std::vector<uint8_t>&& buffer, IDState&& state, std::shared_ptr<DownstreamState> ds): TCPQuery(std::move(buffer), std::move(state)), d_ds(ds)
+  TCPResponse(std::vector<uint8_t>&& buffer, IDState&& state, std::shared_ptr<TCPConnectionToBackend> conn): TCPQuery(std::move(buffer), std::move(state)), d_connection(conn)
   {
   }
 
-  std::shared_ptr<DownstreamState> d_ds{nullptr};
+  std::shared_ptr<TCPConnectionToBackend> d_connection{nullptr};
   dnsheader d_cleartextDH;
   bool d_selfGenerated{false};
 };
@@ -68,6 +70,11 @@ public:
     return d_socket->getHandle();
   }
 
+  const std::shared_ptr<DownstreamState>& getDS() const
+  {
+    return d_ds;
+  }
+
   const ComboAddress& getRemote() const
   {
     return d_ds->remote;
@@ -103,6 +110,7 @@ public:
     return d_enableFastOpen;
   }
 
+  /* whether we can accept new queries FOR THE SAME CLIENT */
   bool canAcceptNewQueries() const
   {
     if (d_usedForXFR || d_connectionDied) {
@@ -110,10 +118,20 @@ public:
       /* Don't reuse the TCP connection after an {A,I}XFR */
       /* but don't reset it either, we will need to read more messages */
     }
-#warning FIXME: maximum number of pending queries
+
+    if ((d_pendingQueries.size() + d_pendingResponses.size()) >= d_ds->d_maxInFlightQueriesPerConn) {
+      return false;
+    }
+
     return true;
   }
 
+  bool isIdle() const
+  {
+    return d_pendingQueries.size() == 0 && d_pendingResponses.size() == 0;
+  }
+
+  /* whether a connection can be reused for a different client */
   bool canBeReused() const
   {
     if (d_usedForXFR || d_connectionDied) {
@@ -138,17 +156,17 @@ public:
     return ds == d_ds;
   }
 
-  static void handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
-  static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
-  static IOState sendNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn);
-
   void queueQuery(TCPQuery&& query, std::shared_ptr<TCPConnectionToBackend>& sharedSelf);
   void handleTimeout(const struct timeval& now, bool write);
-  IOState handleResponse(const struct timeval& now);
   void setProxyProtocolPayload(std::string&& payload);
   void setProxyProtocolPayloadAdded(bool added);
 
 private:
+  static void handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
+  static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
+  static IOState sendNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn);
+
+  IOState handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
   uint16_t getQueryIdFromResponse();
   bool reconnect();
   void notifyAllQueriesFailed(const struct timeval& now, bool timeout = false);
@@ -194,7 +212,6 @@ private:
   std::unique_ptr<Socket> d_socket{nullptr};
   std::unique_ptr<IOStateHandler> d_ioState{nullptr};
   std::shared_ptr<DownstreamState> d_ds{nullptr};
-  //std::weak_ptr<IncomingTCPConnectionState> d_clientConn;
   std::shared_ptr<IncomingTCPConnectionState> d_clientConn;
   std::string d_proxyProtocolPayload;
   TCPQuery d_currentQuery;
@@ -206,7 +223,7 @@ private:
   State d_state{State::idle};
   bool d_fresh{true};
   bool d_enableFastOpen{false};
-  bool d_connectionDied{true};
+  bool d_connectionDied{false};
   bool d_usedForXFR{false};
   bool d_proxyProtocolPayloadAdded{false};
 };
index 9479f0a4037ee0e5de69c74a4c03ac50e8b25e1a..721559256ea4caae3ebd963a69b77219a9a3967e 100644 (file)
@@ -57,7 +57,6 @@ struct ConnectionInfo
 class IncomingTCPConnectionState
 {
 public:
-  //IncomingTCPConnectionState(ConnectionInfo&& ci, TCPClientThreadData& threadData, const struct timeval& now): d_buffer(s_maxPacketCacheEntrySize), d_responseBuffer(s_maxPacketCacheEntrySize), d_threadData(threadData), d_ci(std::move(ci)), d_handler(d_ci.fd, g_tcpRecvTimeout, d_ci.cs->tlsFrontend ? d_ci.cs->tlsFrontend->getContext() : nullptr, now.tv_sec), d_ioState(threadData.mplexer, d_ci.fd), _connectionStartTime(now)
   IncomingTCPConnectionState(ConnectionInfo&& ci, TCPClientThreadData& threadData, const struct timeval& now): d_buffer(s_maxPacketCacheEntrySize), d_threadData(threadData), d_ci(std::move(ci)), d_handler(d_ci.fd, g_tcpRecvTimeout, d_ci.cs->tlsFrontend ? d_ci.cs->tlsFrontend->getContext() : nullptr, now.tv_sec), d_ioState(make_unique<IOStateHandler>(threadData.mplexer, d_ci.fd)), d_connectionStartTime(now)
   {
     d_origDest.reset();
@@ -138,6 +137,7 @@ public:
     return false;
   }
 
+#if 0
   void dump() const
   {
     static std::mutex s_mutex;
@@ -165,14 +165,11 @@ public:
       }
     }
   }
+#endif
 
-  std::shared_ptr<TCPConnectionToBackend> getActiveDownstreamConnection(const std::shared_ptr<DownstreamState>& ds)
-  {
-#warning TODO: we need to find a connection to this DS, usable (no TLV values sent) and supporting OOR
-    return nullptr;
-  }
-
+  std::shared_ptr<TCPConnectionToBackend> getActiveDownstreamConnection(const std::shared_ptr<DownstreamState>& ds);
   std::shared_ptr<TCPConnectionToBackend> getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const struct timeval& now);
+  void registerActiveDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn);
 
   std::unique_ptr<FDMultiplexer>& getIOMPlexer() const
   {
@@ -189,6 +186,8 @@ public:
   void handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
   void handleTimeout(bool write);
 
+  bool canAcceptNewQueries() const;
+
   bool active() const
   {
     return d_ioState != nullptr;
@@ -196,6 +195,7 @@ public:
 
   enum class State { doingHandshake, readingQuerySize, readingQuery, sendingResponse, idle /* in case of XFR, we stop processing queries */ };
 
+  std::map<ComboAddress, std::deque<std::shared_ptr<TCPConnectionToBackend>>> d_activeConnectionsToBackend;
   std::vector<uint8_t> d_buffer;
   std::deque<TCPResponse> d_queuedResponses;
   TCPClientThreadData& d_threadData;
@@ -211,6 +211,7 @@ public:
   struct timeval d_queryReadTime;
   size_t d_currentPos{0};
   size_t d_queriesCount{0};
+  size_t d_currentQueriesCount{0};
   unsigned int d_remainingTime{0};
   uint16_t d_querySize{0};
   uint16_t d_downstreamFailures{0};
@@ -219,10 +220,8 @@ public:
   bool d_readingFirstQuery{true};
   bool d_isXFR{false};
   bool d_xfrStarted{false};
-  bool d_xfrDone{false};
   bool d_selfGeneratedResponse{false};
   bool d_proxyProtocolPayloadAdded{false};
   bool d_proxyProtocolPayloadHasTLV{false};
 };
 
-IOState tryRead(int fd, std::vector<uint8_t>& buffer, size_t& pos, size_t toRead);
index fdb1c5ed97e3f5d49128b36e38d8ca704c9b8a68..e0e366dd78bc40c9fa699696b6d0cb2dd4f18f42 100644 (file)
@@ -4,6 +4,12 @@
 #include "mplexer.hh"
 #include "tcpiohandler.hh"
 
+#if 0
+#define DEBUGLOG(x) cerr<<x<<endl;
+#else
+#define DEBUGLOG(x)
+#endif
+
 class IOStateHandler
 {
 public:
@@ -43,14 +49,14 @@ public:
 
   void update(IOState iostate, FDMultiplexer::callbackfunc_t callback = FDMultiplexer::callbackfunc_t(), FDMultiplexer::funcparam_t callbackData = boost::any(), boost::optional<struct timeval> ttd = boost::none)
   {
-    cerr<<"in "<<__PRETTY_FUNCTION__<<" for fd "<<d_fd<<", last state was "<<(int)d_currentState<<", new state is "<<(int)iostate<<endl;
+    DEBUGLOG("in "<<__PRETTY_FUNCTION__<<" for fd "<<d_fd<<", last state was "<<(int)d_currentState<<", new state is "<<(int)iostate);
     if (d_currentState == IOState::NeedRead && iostate != IOState::NeedRead) {
-      cerr<<__PRETTY_FUNCTION__<<": remove read FD "<<d_fd<<endl;
+      DEBUGLOG(__PRETTY_FUNCTION__<<": remove read FD "<<d_fd);
       d_mplexer->removeReadFD(d_fd);
       d_currentState = IOState::Done;
     }
     else if (d_currentState == IOState::NeedWrite && iostate != IOState::NeedWrite) {
-      cerr<<__PRETTY_FUNCTION__<<": remove write FD "<<d_fd<<endl;
+      DEBUGLOG(__PRETTY_FUNCTION__<<": remove write FD "<<d_fd);
       d_mplexer->removeWriteFD(d_fd);
       d_currentState = IOState::Done;
     }
@@ -65,7 +71,7 @@ public:
       }
 
       d_currentState = IOState::NeedRead;
-      cerr<<__PRETTY_FUNCTION__<<": add read FD "<<d_fd<<endl;
+      DEBUGLOG(__PRETTY_FUNCTION__<<": add read FD "<<d_fd);
       d_mplexer->addReadFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
     }
     else if (iostate == IOState::NeedWrite) {
@@ -74,12 +80,12 @@ public:
       }
 
       d_currentState = IOState::NeedWrite;
-      cerr<<__PRETTY_FUNCTION__<<": add write FD "<<d_fd<<endl;
+      DEBUGLOG(__PRETTY_FUNCTION__<<": add write FD "<<d_fd);
       d_mplexer->addWriteFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
     }
     else if (iostate == IOState::Done) {
       d_currentState = IOState::Done;
-      cerr<<__PRETTY_FUNCTION__<<": done"<<endl;
+      DEBUGLOG(__PRETTY_FUNCTION__<<": done");
     }
   }
 
@@ -104,7 +110,7 @@ public:
        let's reset the state so it's not registered to the IO multiplexer anymore
        and its reference count goes to zero */
     if (d_enabled && d_handler) {
-      cerr<<"IOStateGuard destroyed while holding a state, let's reset it"<<endl;
+      DEBUGLOG("IOStateGuard destroyed while holding a state, let's reset it");
       d_handler->reset();
       d_enabled = false;
     }
index ab6c1a208c8b9632430ba527d0e1ace9c38d3445..9c3313d40f0a69bf0a289c9e8cc8d7d6e0b9e529 100644 (file)
@@ -61,6 +61,10 @@ def ProxyProtocolUDPResponder(port, fromQueue, toQueue):
     sock.close()
 
 def ProxyProtocolTCPResponder(port, fromQueue, toQueue):
+    # be aware that this responder will not accept a new connection
+    # until the last one has been closed. This is done on purpose to
+    # check for connection reuse, making sure that a lot of connections
+    # are not open in parallel.
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
     sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)