From: Remi Gacogne Date: Thu, 10 Sep 2020 15:05:04 +0000 (+0200) Subject: dnsdist: Match backend connections on the backend, not its address X-Git-Tag: auth-4.5.0-alpha0~14^2~19 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4f0a424e6a236c57493ec7aa8ebd20c6c52bda9a;p=thirdparty%2Fpdns.git dnsdist: Match backend connections on the backend, not its address --- diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index dae2462a63..44ad539ef5 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -33,7 +33,6 @@ #include "dnsdist-xpf.hh" #include "dnsparser.hh" #include "dolog.hh" -#include "ednsoptions.hh" #include "gettime.hh" #include "lock.hh" #include "sstuff.hh" @@ -74,7 +73,7 @@ public: { std::shared_ptr result; - const auto& it = t_downstreamConnections.find(ds->remote); + const auto& it = t_downstreamConnections.find(ds); if (it != t_downstreamConnections.end()) { auto& list = it->second; if (!list.empty()) { @@ -99,8 +98,8 @@ public: return; } - const auto& remote = conn->getRemote(); - const auto& it = t_downstreamConnections.find(remote); + const auto& ds = conn->getDS(); + const auto& it = t_downstreamConnections.find(ds); if (it != t_downstreamConnections.end()) { auto& list = it->second; if (list.size() >= s_maxCachedConnectionsPerDownstream) { @@ -112,7 +111,7 @@ public: list.push_back(std::move(conn)); } else { - t_downstreamConnections[remote].push_back(std::move(conn)); + t_downstreamConnections[ds].push_back(std::move(conn)); } } @@ -138,11 +137,11 @@ public: } private: - static thread_local map>> t_downstreamConnections; + static thread_local map, std::deque>> t_downstreamConnections; static const size_t s_maxCachedConnectionsPerDownstream; }; -thread_local map>> DownstreamConnectionsManager::t_downstreamConnections; +thread_local map, std::deque>> DownstreamConnectionsManager::t_downstreamConnections; const size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{20}; static void decrementTCPClientCount(const ComboAddress& client) @@ -158,7 +157,6 @@ static void decrementTCPClientCount(const ComboAddress& client) IncomingTCPConnectionState::~IncomingTCPConnectionState() { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); decrementTCPClientCount(d_ci.remote); if (d_ci.cs != nullptr) { @@ -311,7 +309,6 @@ static IOState handleResponseSent(std::shared_ptr& s } if (state->d_queuedResponses.empty()) { - DEBUGLOG("no response remaining"); if (state->d_isXFR) { /* we should still be reading from the backend, and we don't want to read from the client */ state->d_state = IncomingTCPConnectionState::State::idle; @@ -319,12 +316,13 @@ static IOState handleResponseSent(std::shared_ptr& s DEBUGLOG("idling for XFR completion"); return IOState::Done; } else { - DEBUGLOG("reading new queries if any"); if (state->canAcceptNewQueries()) { + DEBUGLOG("waiting for new queries"); state->resetForNewQuery(); return IOState::NeedRead; } else { + DEBUGLOG("idling"); state->d_state = IncomingTCPConnectionState::State::idle; return IOState::Done; } @@ -348,13 +346,11 @@ bool IncomingTCPConnectionState::canAcceptNewQueries() const return false; } - // d_state ? if (d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) { DEBUGLOG("not accepting new queries because we already have "<d_maxInFlightQueriesPerConn); return false; } - DEBUGLOG("accepting new queries"); return true; } @@ -371,18 +367,18 @@ void IncomingTCPConnectionState::resetForNewQuery() std::shared_ptr IncomingTCPConnectionState::getActiveDownstreamConnection(const std::shared_ptr& ds) { - auto it = d_activeConnectionsToBackend.find(ds->remote); + auto it = d_activeConnectionsToBackend.find(ds); if (it == d_activeConnectionsToBackend.end()) { - DEBUGLOG("no active connection found for "<remote.toString()); + DEBUGLOG("no active connection found for "<getName()); return nullptr; } for (auto& conn : it->second) { if (conn->canAcceptNewQueries()) { - DEBUGLOG("Got one active connection accepting more for "<remote.toString()); + DEBUGLOG("Got one active connection accepting more for "<getName()); return conn; } - DEBUGLOG("not accepting more for "<remote.toString()); + DEBUGLOG("not accepting more for "<getName()); } return nullptr; @@ -390,13 +386,12 @@ std::shared_ptr IncomingTCPConnectionState::getActiveDow void IncomingTCPConnectionState::registerActiveDownstreamConnection(std::shared_ptr& conn) { - d_activeConnectionsToBackend[conn->getRemote()].push_front(conn); + d_activeConnectionsToBackend[conn->getDS()].push_front(conn); } /* this version is called when the buffer has been set and the rules have been processed */ void IncomingTCPConnectionState::sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); // if we already reading a query (not the query size, mind you), or sending a response we need to either queue the response // otherwise we can start sending it right away if (state->d_state == IncomingTCPConnectionState::State::idle || @@ -425,9 +420,9 @@ void IncomingTCPConnectionState::sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); - if (response.d_connection && response.d_connection->isIdle()) { - auto& list = d_activeConnectionsToBackend.at(response.d_connection->getRemote()); + // if we have added a TCP Proxy Protocol payload to a connection, don't release it yet, no one else will be able to use it anyway + if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle() && response.d_connection->canBeReused()) { + auto& list = d_activeConnectionsToBackend.at(response.d_connection->getDS()); for (auto it = list.begin(); it != list.end(); ++it) { if (*it == response.d_connection) { DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it)); @@ -650,7 +645,6 @@ void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcpar void IncomingTCPConnectionState::handleIO(std::shared_ptr& state, const struct timeval& now) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read // even though the underlying socket is not ready, so we need to actually ask for the data first bool wouldBlock = false; @@ -728,7 +722,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_queuedResponses.empty()) { @@ -752,7 +745,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptr& state, IDState&& query, const struct timeval& now) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); if (d_state == State::sendingResponse) { /* if we have responses to send, let's do that first */ } diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index 0a5a11b1df..8b5e56e0bb 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -6,13 +6,21 @@ const uint16_t TCPConnectionToBackend::s_xfrID = 0; void TCPConnectionToBackend::assignToClientConnection(std::shared_ptr& clientConn, bool isXFR) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); + if (d_usedForXFR == true) { + throw std::runtime_error("Trying to send a query over a backend connection used for XFR"); + } + if (isXFR) { d_usedForXFR = true; } - d_clientConn = clientConn; - d_ioState = make_unique(clientConn->getIOMPlexer(), d_socket->getHandle()); + if (!d_clientConn) { + d_clientConn = clientConn; + d_ioState = make_unique(clientConn->getIOMPlexer(), d_socket->getHandle()); + } + else if (d_clientConn != clientConn) { + throw std::runtime_error("Assigning a query from a different client to an existing backend connection with pending queries"); + } } IOState TCPConnectionToBackend::sendNextQuery(std::shared_ptr& conn) @@ -63,7 +71,6 @@ static IOState tryRead(int fd, std::vector& buffer, size_t& pos, size_t void TCPConnectionToBackend::handleIO(std::shared_ptr& conn, const struct timeval& now) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); if (conn->d_socket == nullptr) { throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!"); } @@ -75,7 +82,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c try { if (conn->d_state == State::sendingQueryToBackend) { - DEBUGLOG("sending query to backend over FD "<getDS()->getName()<<" over FD "<isFastOpenEnabled()) { @@ -89,7 +96,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c /* request sent ! */ conn->incQueries(); conn->d_currentPos = 0; - //conn->d_currentQuery.d_querySentTime = now; + DEBUGLOG("adding a pending response for ID "<d_currentQuery.d_idstate.origID<<" and QNAME "<d_currentQuery.d_idstate.qname); conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery); conn->d_currentQuery.d_buffer.clear(); @@ -117,7 +124,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c // then we need to allocate a new buffer (new because we might need to re-send the query if the // backend dies on us) // We also might need to read and send to the client more than one response in case of XFR (yeah!) - // should very likely be a TCPIOHandler d_downstreamHandler + // should very likely be a TCPIOHandler conn->d_responseBuffer.resize(sizeof(uint16_t)); iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos); if (iostate == IOState::Done) { @@ -135,7 +142,6 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos); if (iostate == IOState::Done) { DEBUGLOG("got response from backend"); - //conn->d_responseReadTime = now; try { iostate = conn->handleResponse(conn, now); } @@ -170,15 +176,6 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c ++conn->d_downstreamFailures; } -#if 0 - if (conn->d_outstanding) { - conn->d_outstanding = false; - - if (conn->d_ds != nullptr) { - --conn->d_ds->outstanding; - } - } -#endif /* remove this FD from the IO multiplexer */ iostate = IOState::Done; connectionDied = true; @@ -189,15 +186,14 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c DEBUGLOG("connection died, number of failures is "<d_downstreamFailures<<", retries is "<d_ds->retries); if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) { - DEBUGLOG("reconnecting"); + conn->d_ioState->reset(); ioGuard.release(); if (conn->reconnect()) { - DEBUGLOG("reconnected"); - conn->d_ioState = make_unique(conn->d_clientConn->getIOMPlexer(), conn->d_socket->getHandle()); + /* we need to resend the queries that were in flight, if any */ for (auto& pending : conn->d_pendingResponses) { conn->d_pendingQueries.push_back(std::move(pending.second)); } @@ -252,7 +248,6 @@ void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr& sharedSelf) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); if (d_ioState == nullptr) { throw std::runtime_error("Trying to queue a query to a TCP connection that has no incoming client connection assigned"); } @@ -269,8 +264,6 @@ void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptrgetNameWithAddr(), d_downstreamFailures); + DEBUGLOG("Opening TCP connection to backend "<getNameWithAddr()); try { result = std::unique_ptr(new Socket(d_ds->remote.sin4.sin_family, SOCK_STREAM, 0)); DEBUGLOG("result of connect is "<getHandle()); + if (!IsAnyAddress(d_ds->sourceAddr)) { SSetsockopt(result->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1); #ifdef IP_BIND_ADDRESS_NO_PORT @@ -327,7 +321,6 @@ bool TCPConnectionToBackend::reconnect() #endif /* MSG_FASTOPEN */ d_socket = std::move(result); - DEBUGLOG("connected new socket "<getHandle()); ++d_ds->tcpCurrentConnections; return true; } @@ -362,7 +355,6 @@ void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, bool timeout) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); d_connectionDied = true; auto& clientConn = d_clientConn; @@ -376,7 +368,7 @@ void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, b ++clientConn->d_ci.cs->tcpDownstreamTimeouts; } - if (d_state == State::doingHandshake || d_state == State::sendingQueryToBackend) { + if (d_state == State::sendingQueryToBackend) { clientConn->notifyIOError(clientConn, std::move(d_currentQuery.d_idstate), now); } @@ -396,13 +388,10 @@ void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, b IOState TCPConnectionToBackend::handleResponse(std::shared_ptr& conn, const struct timeval& now) { - DEBUGLOG("in "<<__PRETTY_FUNCTION__); - d_downstreamFailures = 0; auto& clientConn = d_clientConn; if (!clientConn->active()) { - DEBUGLOG("client is not active"); // a client timeout occured, or something like that */ d_connectionDied = true; d_clientConn.reset(); @@ -418,11 +407,10 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptrsecond.d_idstate); d_pendingResponses.erase(it); DEBUGLOG("passing response to client connection for "<>> d_activeConnectionsToBackend; + std::map, std::deque>> d_activeConnectionsToBackend; std::vector d_buffer; std::deque d_queuedResponses; TCPClientThreadData& d_threadData;