From: Remi Gacogne Date: Tue, 8 Sep 2020 11:12:27 +0000 (+0200) Subject: dnsdist: Try to reuse active connections X-Git-Tag: auth-4.5.0-alpha0~14^2~22 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=20ad50125812ecbda394deb8af1e9dcc769d4317;p=thirdparty%2Fpdns.git dnsdist: Try to reuse active connections --- diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index 3533bccda5..dae2462a63 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -70,9 +70,9 @@ class DownstreamConnectionsManager { public: - static std::unique_ptr getConnectionToDownstream(std::unique_ptr& mplexer, std::shared_ptr& ds, const struct timeval& now) + static std::shared_ptr getConnectionToDownstream(std::unique_ptr& mplexer, std::shared_ptr& ds, const struct timeval& now) { - std::unique_ptr result; + std::shared_ptr result; const auto& it = t_downstreamConnections.find(ds->remote); if (it != t_downstreamConnections.end()) { @@ -85,10 +85,10 @@ public: } } - return make_unique(ds, now); + return std::make_shared(ds, now); } - static void releaseDownstreamConnection(std::unique_ptr&& conn) + static void releaseDownstreamConnection(std::shared_ptr&& conn) { if (conn == nullptr) { return; @@ -108,6 +108,7 @@ public: conn.reset(); return; } + list.push_back(std::move(conn)); } else { @@ -137,11 +138,11 @@ public: } private: - static thread_local map>> t_downstreamConnections; + static thread_local map>> t_downstreamConnections; static const size_t s_maxCachedConnectionsPerDownstream; }; -thread_local map>> DownstreamConnectionsManager::t_downstreamConnections; +thread_local map>> DownstreamConnectionsManager::t_downstreamConnections; const size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{20}; static void decrementTCPClientCount(const ComboAddress& client) @@ -157,53 +158,22 @@ static void decrementTCPClientCount(const ComboAddress& client) IncomingTCPConnectionState::~IncomingTCPConnectionState() { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000.0 + diff.tv_usec / 1000.0); - // DEBUG: cerr<<"updated tcp metrics"<getHandle()<removeReadFD(d_downstreamConnection->getHandle()); - } - else if (d_lastIOState == IOState::NeedWrite) { - // DEBUG: cerr<<__PRETTY_FUNCTION__<<": removing leftover backend write FD "<getHandle()<removeWriteFD(d_downstreamConnection->getHandle()); - } - } - catch(const FDMultiplexerException& e) { - vinfolog("Got an exception when trying to remove a pending IO operation on the socket to the %s backend: %s", d_ds->getName(), e.what()); - } - catch(const std::runtime_error& e) { - /* might be thrown by getHandle() */ - vinfolog("Got an exception when trying to remove a pending IO operation on the socket to the %s backend: %s", d_ds->getName(), e.what()); - } - } - } -#endif - - // DEBUG: cerr<<"about to remove left over FDs"<removeReadFD(d_ci.fd); } else if (d_lastIOState == IOState::NeedWrite) { - // DEBUG: cerr<<__PRETTY_FUNCTION__<<": removing leftover client write FD "<removeWriteFD(d_ci.fd); } } @@ -213,7 +183,6 @@ IncomingTCPConnectionState::~IncomingTCPConnectionState() catch (...) { vinfolog("Got an unknown exception when trying to remove a pending IO operation on an incoming TCP connection from %s", d_ci.remote.toStringWithPort()); } - // DEBUG: cerr<<"done"< IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr& ds, const struct timeval& now) @@ -227,6 +196,7 @@ std::shared_ptr IncomingTCPConnectionState::getDownstrea if (!downstream) { /* we don't have a connection to this backend active yet, let's ask one (it might not be a fresh one, though) */ downstream = DownstreamConnectionsManager::getConnectionToDownstream(d_threadData.mplexer, ds, now); + registerActiveDownstreamConnection(downstream); } return downstream; @@ -298,55 +268,22 @@ void TCPClientCollection::addTCPClientThread() } } -/* Tries to read exactly toRead bytes into the buffer, starting at position pos. - Updates pos everytime a successful read occurs, - throws an std::runtime_error in case of IO error, - return Done when toRead bytes have been read, needRead or needWrite if the IO operation - would block. -*/ -// XXX could probably be implemented as a TCPIOHandler -IOState tryRead(int fd, std::vector& buffer, size_t& pos, size_t toRead) -{ - if (buffer.size() < (pos + toRead)) { - throw std::out_of_range("Calling tryRead() with a too small buffer (" + std::to_string(buffer.size()) + ") for a read of " + std::to_string(toRead) + " bytes starting at " + std::to_string(pos)); - } - - size_t got = 0; - do { - ssize_t res = ::read(fd, reinterpret_cast(&buffer.at(pos)), toRead - got); - if (res == 0) { - throw runtime_error("EOF while reading message"); - } - if (res < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN) { - return IOState::NeedRead; - } - else { - throw std::runtime_error(std::string("Error while reading message: ") + stringerror()); - } - } - - pos += static_cast(res); - got += static_cast(res); - } - while (got < toRead); - - return IOState::Done; -} - std::unique_ptr g_tcpclientthreads; static IOState handleResponseSent(std::shared_ptr& state, const struct timeval& now) { + --state->d_currentQueriesCount; + if (!state->d_isXFR) { const auto& currentResponse = state->d_currentResponse; - if (state->d_selfGeneratedResponse == false && currentResponse.d_ds) { + if (state->d_selfGeneratedResponse == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) { + const auto& ds = currentResponse.d_connection->getDS(); struct timespec answertime; gettime(&answertime); const auto& ids = currentResponse.d_idstate; double udiff = ids.sentTime.udiff(); - g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast(udiff), static_cast(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, currentResponse.d_ds->remote); - vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", currentResponse.d_ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff); + g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast(udiff), static_cast(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->remote); + vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff); } switch (currentResponse.d_cleartextDH.rcode) { @@ -374,21 +311,27 @@ static IOState handleResponseSent(std::shared_ptr& s } if (state->d_queuedResponses.empty()) { - // DEBUG: cerr<<"no response remaining"<d_isXFR) { /* we should still be reading from the backend, and we don't want to read from the client */ state->d_state = IncomingTCPConnectionState::State::idle; state->d_currentPos = 0; - // DEBUG: cerr<<"idling for XFR completion"<resetForNewQuery(); - return IOState::NeedRead; + DEBUGLOG("reading new queries if any"); + if (state->canAcceptNewQueries()) { + state->resetForNewQuery(); + return IOState::NeedRead; + } + else { + state->d_state = IncomingTCPConnectionState::State::idle; + return IOState::Done; + } } } else { - // DEBUG: cerr<<"queue size is "<d_queuedResponses.size()<d_queuedResponses.size()); TCPResponse resp = std::move(state->d_queuedResponses.front()); state->d_queuedResponses.pop_front(); state->d_state = IncomingTCPConnectionState::State::idle; @@ -397,22 +340,63 @@ static IOState handleResponseSent(std::shared_ptr& s } } + +bool IncomingTCPConnectionState::canAcceptNewQueries() const +{ + if (d_isXFR) { + DEBUGLOG("not accepting new queries because used for XFR"); + return false; + } + + // d_state ? + if (d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) { + DEBUGLOG("not accepting new queries because we already have "<d_maxInFlightQueriesPerConn); + return false; + } + + DEBUGLOG("accepting new queries"); + return true; +} + void IncomingTCPConnectionState::resetForNewQuery() { d_buffer.resize(sizeof(uint16_t)); d_currentPos = 0; d_querySize = 0; - //d_responseSize = 0; d_downstreamFailures = 0; d_state = State::readingQuerySize; d_lastIOState = IOState::Done; d_selfGeneratedResponse = false; } +std::shared_ptr IncomingTCPConnectionState::getActiveDownstreamConnection(const std::shared_ptr& ds) +{ + auto it = d_activeConnectionsToBackend.find(ds->remote); + if (it == d_activeConnectionsToBackend.end()) { + DEBUGLOG("no active connection found for "<remote.toString()); + return nullptr; + } + + for (auto& conn : it->second) { + if (conn->canAcceptNewQueries()) { + DEBUGLOG("Got one active connection accepting more for "<remote.toString()); + return conn; + } + DEBUGLOG("not accepting more for "<remote.toString()); + } + + return nullptr; +} + +void IncomingTCPConnectionState::registerActiveDownstreamConnection(std::shared_ptr& conn) +{ + d_activeConnectionsToBackend[conn->getRemote()].push_front(conn); +} + /* this version is called when the buffer has been set and the rules have been processed */ void IncomingTCPConnectionState::sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<d_state == IncomingTCPConnectionState::State::idle || @@ -429,23 +413,31 @@ void IncomingTCPConnectionState::sendResponse(std::shared_ptrd_currentPos = 0; state->d_currentResponse = std::move(response); - //IncomingTCPConnectionState::handleIO(state, now); - state->d_ioState->update(IOState::NeedWrite, handleIOCallback, state, getClientWriteTTD(now)); - // DEBUG: cerr<<"updated IO state"<d_ioState->update(IOState::NeedWrite, handleIOCallback, state, getClientWriteTTD(now)); } else { // queue response state->d_queuedResponses.push_back(std::move(response)); - // DEBUG: cerr<<"queueing response because state is "<<(int)state->d_state<<", queue size is now "<d_queuedResponses.size()<d_state<<", queue size is now "<d_queuedResponses.size()); } } /* this version is called from the backend code when a new response has been received */ void IncomingTCPConnectionState::handleResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<isIdle()) { + auto& list = d_activeConnectionsToBackend.at(response.d_connection->getRemote()); + for (auto it = list.begin(); it != list.end(); ++it) { + if (*it == response.d_connection) { + DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it)); + list.erase(it); + break; + } + } + } + if (response.d_buffer.size() < sizeof(dnsheader)) { - // DEBUG: cerr<<"too small"<(&response.d_buffer.at(0)); auto& ids = response.d_idstate; - // DEBUG: cerr<<"IDS has "<<(ids.qTag?" TAGS ": "NO TAGS")<remote, consumed)) { - // DEBUG: cerr<<"content does not match"<getRemote(), consumed)) { return; } @@ -474,7 +463,6 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr rewrittenResponse; if (!processResponse(&responseAsCharArray, &responseSize, &responseCapacity, state->d_threadData.localRespRulactions, dr, addRoom, rewrittenResponse, false)) { - // DEBUG: cerr<<"process said to drop it"<d_xfrStarted = true; ++g_stats.responses; ++state->d_ci.cs->responses; - ++response.d_ds->responses; + if (response.d_connection->getDS()) { + ++response.d_connection->getDS()->responses; + } } if (!state->d_isXFR) { ++g_stats.responses; ++state->d_ci.cs->responses; - ++response.d_ds->responses; + if (response.d_connection->getDS()) { + ++response.d_connection->getDS()->responses; + } } sendResponse(state, now, std::move(response)); @@ -547,11 +539,10 @@ static bool handleQuery(std::shared_ptr& state, cons std::shared_ptr dnsCryptQuery{nullptr}; auto dnsCryptResponse = checkDNSCryptQuery(*state->d_ci.cs, query, state->d_querySize, dnsCryptQuery, queryRealTime.tv_sec, true); if (dnsCryptResponse) { - //state->d_responseBuffer = std::move(*dnsCryptResponse); - //state->d_responseSize = state->d_responseBuffer.size(); TCPResponse response; response.d_buffer = std::move(*dnsCryptResponse); state->d_state = IncomingTCPConnectionState::State::idle; + ++state->d_currentQueriesCount; state->sendResponse(state, now, std::move(response)); return false; } @@ -586,6 +577,7 @@ static bool handleQuery(std::shared_ptr& state, cons TCPResponse response; response.d_buffer = std::move(state->d_buffer); state->d_state = IncomingTCPConnectionState::State::idle; + ++state->d_currentQueriesCount; state->sendResponse(state, now, std::move(response)); return false; } @@ -594,17 +586,8 @@ static bool handleQuery(std::shared_ptr& state, cons return true; } -#warning move this, we should just never read another question again on this client connection - if (state->d_xfrStarted) { - /* sorry, but we are not going to resume a XFR if we have already sent some packets - to the client */ - return true; - } - IDState ids; - // DEBUG: cerr<<"DQ has "<<(dq.qTag?" TAGS ": "NO TAGS")<id); const uint8_t sizeBytes[] = { static_cast(dq.len / 256), static_cast(dq.len % 256) }; @@ -647,11 +630,9 @@ static bool handleQuery(std::shared_ptr& state, cons vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_ci.remote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName()); -// DEBUG: cerr<<"about to be queued query IDS has "<<(ids.qTag?" TAGS ": "NO TAGS")<d_currentQueriesCount; downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection); - //sendQueryToBackend(state, now); - // DEBUG: cerr<<"out of "<<__PRETTY_FUNCTION__<& state, const struct timeval& now) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<d_state == IncomingTCPConnectionState::State::doingHandshake) { - // DEBUG: cerr<<"doing handshake"<d_handler.tryHandshake(); if (iostate == IOState::Done) { - // DEBUG: cerr<<"handshake done"<d_handler.isTLS()) { if (!state->d_handler.hasTLSSessionBeenResumed()) { ++state->d_ci.cs->tlsNewSessions; @@ -715,10 +696,10 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_state == IncomingTCPConnectionState::State::readingQuerySize) { - // DEBUG: cerr<<"reading query size"<d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t)); if (iostate == IOState::Done) { - // DEBUG: cerr<<"query size received"<d_state = IncomingTCPConnectionState::State::readingQuery; state->d_querySizeReadTime = now; if (state->d_queriesCount == 0) { @@ -727,8 +708,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_querySize = state->d_buffer.at(0) * 256 + state->d_buffer.at(1); if (state->d_querySize < sizeof(dnsheader)) { /* go away */ - // will be handled by the guard - //handleNewIOState(state, IOState::Done, fd, handleIOCallback); return; } @@ -743,22 +722,24 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_state == IncomingTCPConnectionState::State::readingQuery) { - // DEBUG: cerr<<"reading query"<d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize); if (iostate == IOState::Done) { - // DEBUG: cerr<<"query received"<d_queuedResponses.empty()) { - state->resetForNewQuery(); - // DEBUG: cerr<<__LINE__<d_ioState->update(IOState::NeedRead, handleIOCallback, state, state->getClientReadTTD(now)); - // DEBUG: cerr<<__LINE__<canAcceptNewQueries()) { + state->resetForNewQuery(); + iostate = IOState::NeedRead; + } + else { + state->d_state = IncomingTCPConnectionState::State::idle; + iostate = IOState::Done; + } } else { TCPResponse resp = std::move(state->d_queuedResponses.front()); @@ -771,7 +752,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_state == IncomingTCPConnectionState::State::sendingResponse) { - // DEBUG: cerr<<"sending response"<d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size()); if (iostate == IOState::Done) { - // DEBUG: cerr<<"response sent"<d_ioState->update(IOState::NeedRead, handleIOCallback, state, state->getClientReadTTD(now)); - //// DEBUG: cerr<<__LINE__<d_state != IncomingTCPConnectionState::State::idle && @@ -820,26 +798,22 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_lastIOState == IOState::NeedWrite || state->d_readingFirstQuery) { - // DEBUG: cerr<<"Got an exception while handling TCP query: "<d_lastIOState == IOState::NeedRead ? "reading" : "writing"), state->d_ci.remote.toStringWithPort(), e.what()); } else { vinfolog("Closing TCP client connection with %s", state->d_ci.remote.toStringWithPort()); - // DEBUG: cerr<<"Closing TCP client connection: "<d_ioState->update(iostate, handleIOCallback, state); - // DEBUG: cerr<<__LINE__<d_ioState->update(iostate, handleIOCallback, state, iostate == IOState::NeedRead ? state->getClientReadTTD(now) : state->getClientWriteTTD(now)); - // DEBUG: cerr<<__LINE__<& state, IDState&& query, const struct timeval& now) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<reset(); } - - // DEBUG: cerr<<"out "<<__PRETTY_FUNCTION__<& state, const struct timeval& now, TCPResponse&& response) @@ -878,7 +846,7 @@ void IncomingTCPConnectionState::handleXFRResponse(std::shared_ptrtcpClientTimeouts; d_lastIOState = IOState::Done; d_ioState->reset(); @@ -980,14 +948,6 @@ static void tcpClientThread(int pipefd) vinfolog("Timeout (write) from remote backend %s", conn->getBackendName()); conn->handleTimeout(now, true); } -#if 0 - try { - data.mplexer->removeWriteFD(cbData.first); - } - catch (const FDMultiplexerException& fde) { - warnlog("Exception while removing a socket (%d) after a write timeout: %s", cbData.first, fde.what()); - } -#endif } } } diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 0dee6cc8df..4640254aec 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -669,6 +669,7 @@ struct ClientState std::atomic tcpAvgQueriesPerConnection{0.0}; /* in ms */ std::atomic tcpAvgConnectionDuration{0.0}; + size_t d_maxInFlightQueriesPerConn{1}; int udpFD{-1}; int tcpFD{-1}; int tcpListenQueueSize{SOMAXCONN}; @@ -852,6 +853,7 @@ struct DownstreamState /* in ms */ std::atomic tcpAvgConnectionDuration{0.0}; size_t socketsOffset{0}; + size_t d_maxInFlightQueriesPerConn{1}; double queryLoad{0.0}; double dropRate{0.0}; double latencyUsec{0.0}; diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index 86dee17204..03a4c41b24 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -6,7 +6,7 @@ const uint16_t TCPConnectionToBackend::s_xfrID = 0; void TCPConnectionToBackend::assignToClientConnection(std::shared_ptr& clientConn, bool isXFR) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<d_currentQuery = std::move(conn->d_pendingQueries.front()); conn->d_pendingQueries.pop_front(); conn->d_state = State::sendingQueryToBackend; + conn->d_currentPos = 0; + return IOState::NeedWrite; } +/* Tries to read exactly toRead bytes into the buffer, starting at position pos. + Updates pos everytime a successful read occurs, + throws an std::runtime_error in case of IO error, + return Done when toRead bytes have been read, needRead or needWrite if the IO operation + would block. +*/ +// XXX could probably be implemented as a TCPIOHandler +static IOState tryRead(int fd, std::vector& buffer, size_t& pos, size_t toRead) +{ + if (buffer.size() < (pos + toRead)) { + throw std::out_of_range("Calling tryRead() with a too small buffer (" + std::to_string(buffer.size()) + ") for a read of " + std::to_string(toRead) + " bytes starting at " + std::to_string(pos)); + } + + size_t got = 0; + do { + ssize_t res = ::read(fd, reinterpret_cast(&buffer.at(pos)), toRead - got); + if (res == 0) { + throw runtime_error("EOF while reading message"); + } + if (res < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN) { + return IOState::NeedRead; + } + else { + throw std::runtime_error(std::string("Error while reading message: ") + stringerror()); + } + } + + pos += static_cast(res); + got += static_cast(res); + } + while (got < toRead); + + return IOState::Done; +} + void TCPConnectionToBackend::handleIO(std::shared_ptr& conn, const struct timeval& now) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<d_socket == nullptr) { throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!"); } @@ -37,7 +75,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c try { if (conn->d_state == State::sendingQueryToBackend) { - // DEBUG: cerr<<"sending query to backend over FD "<isFastOpenEnabled()) { @@ -47,22 +85,14 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c size_t sent = sendMsgWithOptions(fd, reinterpret_cast(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags); if (sent == conn->d_currentQuery.d_buffer.size()) { - // DEBUG: cerr<<"query sent to backend"<incQueries(); conn->d_currentPos = 0; //conn->d_currentQuery.d_querySentTime = now; - // DEBUG: cerr<<"adding a pending response for ID "<d_currentQuery.d_idstate.origID<<" and QNAME "<d_currentQuery.d_idstate.qname<d_currentQuery.d_idstate.qTag?"tags":"no tags")<d_currentQuery.d_idstate.origID<<" and QNAME "<d_currentQuery.d_idstate.qname); conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery); conn->d_currentQuery.d_buffer.clear(); -#if 0 - if (!conn->d_usedForXFR) { - /* don't bother with the outstanding count for XFR queries */ - ++conn->d_ds->outstanding; - ++conn->d_outstanding; - } -#endif if (conn->d_pendingQueries.empty()) { conn->d_state = State::readingResponseSizeFromBackend; @@ -83,7 +113,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c } if (conn->d_state == State::readingResponseSizeFromBackend) { - // DEBUG: cerr<<"reading response size from backend"<& c conn->d_responseBuffer.resize(sizeof(uint16_t)); iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos); if (iostate == IOState::Done) { - // DEBUG: cerr<<"got response size from backend"<d_state = State::readingResponseFromBackend; conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1); conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2); @@ -101,18 +131,17 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c } if (conn->d_state == State::readingResponseFromBackend) { - // DEBUG: cerr<<"reading response from backend"<d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos); if (iostate == IOState::Done) { - // DEBUG: cerr<<"got response from backend"<d_responseReadTime = now; try { - iostate = conn->handleResponse(now); + iostate = conn->handleResponse(conn, now); } catch (const std::exception& e) { vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getName() : "unknown", conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what()); } - //return; } } @@ -157,18 +186,17 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c if (connectionDied) { bool reconnected = false; - // DEBUG: cerr<<"connection died, number of failures is "<d_downstreamFailures<<", retries is "<d_ds->retries<d_downstreamFailures<<", retries is "<d_ds->retries); if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) { - // DEBUG: cerr<<"reconnecting"<d_ioState->reset(); ioGuard.release(); if (conn->reconnect()) { - // DEBUG: cerr<<"reconnected"<d_ioState = make_unique(conn->d_clientConn->getIOMPlexer(), conn->d_socket->getHandle()); - // DEBUG: cerr<<"new state"<d_pendingResponses) { conn->d_pendingQueries.push_back(std::move(pending.second)); @@ -182,9 +210,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c // resume sending query } else { - // DEBUG: cerr<<"sending next query"<d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) { @@ -198,21 +224,18 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c if (!reconnected) { /* reconnect failed, we give up */ - conn->d_connectionDied = true; + DEBUGLOG("reconnect failed, we give up"); conn->notifyAllQueriesFailed(now); } } if (iostate == IOState::Done) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<", done"<d_ioState->update(iostate, handleIOCallback, conn); } else { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<<", updating to "<<(int)iostate<d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now)); } ioGuard.release(); - } void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param) @@ -229,8 +252,7 @@ void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr& sharedSelf) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__< result; if (d_socket) { - // DEBUG: cerr<<"closing socket "<getHandle()<getHandle()); shutdown(d_socket->getHandle(), SHUT_RDWR); d_socket.reset(); d_ioState.reset(); @@ -270,7 +299,7 @@ bool TCPConnectionToBackend::reconnect() vinfolog("TCP connecting to downstream %s (%d)", d_ds->getNameWithAddr(), d_downstreamFailures); try { result = std::unique_ptr(new Socket(d_ds->remote.sin4.sin_family, SOCK_STREAM, 0)); - // DEBUG: cerr<<"result of connect is "<getHandle()<getHandle()); if (!IsAnyAddress(d_ds->sourceAddr)) { SSetsockopt(result->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1); #ifdef IP_BIND_ADDRESS_NO_PORT @@ -298,7 +327,7 @@ bool TCPConnectionToBackend::reconnect() #endif /* MSG_FASTOPEN */ d_socket = std::move(result); - // DEBUG: cerr<<"connected new socket "<getHandle()<getHandle()); ++d_ds->tcpCurrentConnections; return true; } @@ -333,16 +362,12 @@ void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, bool timeout) { + DEBUGLOG("in "<<__PRETTY_FUNCTION__); d_connectionDied = true; - //auto clientConn = d_clientConn.lock(); - //if (!clientConn) { - // d_clientConn.reset(); - // return; - //} + auto& clientConn = d_clientConn; if (!clientConn->active()) { // a client timeout occured, or something like that */ - d_connectionDied = true; d_clientConn.reset(); return; } @@ -369,20 +394,15 @@ void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, b d_clientConn.reset(); } -IOState TCPConnectionToBackend::handleResponse(const struct timeval& now) +IOState TCPConnectionToBackend::handleResponse(std::shared_ptr& conn, const struct timeval& now) { - // DEBUG: cerr<<"in "<<__PRETTY_FUNCTION__<active()) { - // DEBUG: cerr<<"client is not active"<handleXFRResponse(clientConn, now, std::move(response)); d_state = State::readingResponseSizeFromBackend; d_currentPos = 0; @@ -402,44 +422,45 @@ IOState TCPConnectionToBackend::handleResponse(const struct timeval& now) // get ready to read the next packet, if any } else { - // DEBUG: cerr<<"not XFR, phew"<second.d_idstate); - // DEBUG: cerr<<"IDS has "<<(ids.qTag?" TAGS ": "NO TAGS")<handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), d_ds)); + DEBUGLOG("passing response to client connection for "<handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn)); d_pendingResponses.erase(it); if (!d_pendingQueries.empty()) { - // DEBUG: cerr<<"still have some queries to send"< d_buffer; }; +class TCPConnectionToBackend; + struct TCPResponse : public TCPQuery { TCPResponse() { } - TCPResponse(std::vector&& buffer, IDState&& state, std::shared_ptr ds): TCPQuery(std::move(buffer), std::move(state)), d_ds(ds) + TCPResponse(std::vector&& buffer, IDState&& state, std::shared_ptr conn): TCPQuery(std::move(buffer), std::move(state)), d_connection(conn) { } - std::shared_ptr d_ds{nullptr}; + std::shared_ptr d_connection{nullptr}; dnsheader d_cleartextDH; bool d_selfGenerated{false}; }; @@ -68,6 +70,11 @@ public: return d_socket->getHandle(); } + const std::shared_ptr& getDS() const + { + return d_ds; + } + const ComboAddress& getRemote() const { return d_ds->remote; @@ -103,6 +110,7 @@ public: return d_enableFastOpen; } + /* whether we can acept new queries FOR THE SAME CLIENT */ bool canAcceptNewQueries() const { if (d_usedForXFR || d_connectionDied) { @@ -110,10 +118,20 @@ public: /* Don't reuse the TCP connection after an {A,I}XFR */ /* but don't reset it either, we will need to read more messages */ } -#warning FIXME: maximum number of pending queries + + if ((d_pendingQueries.size() + d_pendingResponses.size()) >= d_ds->d_maxInFlightQueriesPerConn) { + return false; + } + return true; } + bool isIdle() const + { + return d_pendingQueries.size() == 0 && d_pendingResponses.size() == 0; + } + + /* whether a connection can be reused for a different client */ bool canBeReused() const { if (d_usedForXFR || d_connectionDied) { @@ -138,17 +156,17 @@ public: return ds == d_ds; } - static void handleIO(std::shared_ptr& conn, const struct timeval& now); - static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param); - static IOState sendNextQuery(std::shared_ptr& conn); - void queueQuery(TCPQuery&& query, std::shared_ptr& sharedSelf); void handleTimeout(const struct timeval& now, bool write); - IOState handleResponse(const struct timeval& now); void setProxyProtocolPayload(std::string&& payload); void setProxyProtocolPayloadAdded(bool added); private: + static void handleIO(std::shared_ptr& conn, const struct timeval& now); + static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param); + static IOState sendNextQuery(std::shared_ptr& conn); + + IOState handleResponse(std::shared_ptr& conn, const struct timeval& now); uint16_t getQueryIdFromResponse(); bool reconnect(); void notifyAllQueriesFailed(const struct timeval& now, bool timeout = false); @@ -194,7 +212,6 @@ private: std::unique_ptr d_socket{nullptr}; std::unique_ptr d_ioState{nullptr}; std::shared_ptr d_ds{nullptr}; - //std::weak_ptr d_clientConn; std::shared_ptr d_clientConn; std::string d_proxyProtocolPayload; TCPQuery d_currentQuery; @@ -206,7 +223,7 @@ private: State d_state{State::idle}; bool d_fresh{true}; bool d_enableFastOpen{false}; - bool d_connectionDied{true}; + bool d_connectionDied{false}; bool d_usedForXFR{false}; bool d_proxyProtocolPayloadAdded{false}; }; diff --git a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh index 9479f0a403..721559256e 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh +++ b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh @@ -57,7 +57,6 @@ struct ConnectionInfo class IncomingTCPConnectionState { public: - //IncomingTCPConnectionState(ConnectionInfo&& ci, TCPClientThreadData& threadData, const struct timeval& now): d_buffer(s_maxPacketCacheEntrySize), d_responseBuffer(s_maxPacketCacheEntrySize), d_threadData(threadData), d_ci(std::move(ci)), d_handler(d_ci.fd, g_tcpRecvTimeout, d_ci.cs->tlsFrontend ? d_ci.cs->tlsFrontend->getContext() : nullptr, now.tv_sec), d_ioState(threadData.mplexer, d_ci.fd), _connectionStartTime(now) IncomingTCPConnectionState(ConnectionInfo&& ci, TCPClientThreadData& threadData, const struct timeval& now): d_buffer(s_maxPacketCacheEntrySize), d_threadData(threadData), d_ci(std::move(ci)), d_handler(d_ci.fd, g_tcpRecvTimeout, d_ci.cs->tlsFrontend ? d_ci.cs->tlsFrontend->getContext() : nullptr, now.tv_sec), d_ioState(make_unique(threadData.mplexer, d_ci.fd)), d_connectionStartTime(now) { d_origDest.reset(); @@ -138,6 +137,7 @@ public: return false; } +#if 0 void dump() const { static std::mutex s_mutex; @@ -165,14 +165,11 @@ public: } } } +#endif - std::shared_ptr getActiveDownstreamConnection(const std::shared_ptr& ds) - { -#warning TODO: we need to find a connection to this DS, usable (no TLV values sent) and supporting OOR - return nullptr; - } - + std::shared_ptr getActiveDownstreamConnection(const std::shared_ptr& ds); std::shared_ptr getDownstreamConnection(std::shared_ptr& ds, const struct timeval& now); + void registerActiveDownstreamConnection(std::shared_ptr& conn); std::unique_ptr& getIOMPlexer() const { @@ -189,6 +186,8 @@ public: void handleXFRResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); void handleTimeout(bool write); + bool canAcceptNewQueries() const; + bool active() const { return d_ioState != nullptr; @@ -196,6 +195,7 @@ public: enum class State { doingHandshake, readingQuerySize, readingQuery, sendingResponse, idle /* in case of XFR, we stop processing queries */ }; + std::map>> d_activeConnectionsToBackend; std::vector d_buffer; std::deque d_queuedResponses; TCPClientThreadData& d_threadData; @@ -211,6 +211,7 @@ public: struct timeval d_queryReadTime; size_t d_currentPos{0}; size_t d_queriesCount{0}; + size_t d_currentQueriesCount{0}; unsigned int d_remainingTime{0}; uint16_t d_querySize{0}; uint16_t d_downstreamFailures{0}; @@ -219,10 +220,8 @@ public: bool d_readingFirstQuery{true}; bool d_isXFR{false}; bool d_xfrStarted{false}; - bool d_xfrDone{false}; bool d_selfGeneratedResponse{false}; bool d_proxyProtocolPayloadAdded{false}; bool d_proxyProtocolPayloadHasTLV{false}; }; -IOState tryRead(int fd, std::vector& buffer, size_t& pos, size_t toRead); diff --git a/pdns/dnsdistdist/tcpiohandler-mplexer.hh b/pdns/dnsdistdist/tcpiohandler-mplexer.hh index fdb1c5ed97..e0e366dd78 100644 --- a/pdns/dnsdistdist/tcpiohandler-mplexer.hh +++ b/pdns/dnsdistdist/tcpiohandler-mplexer.hh @@ -4,6 +4,12 @@ #include "mplexer.hh" #include "tcpiohandler.hh" +#if 0 +#define DEBUGLOG(x) cerr< ttd = boost::none) { - cerr<<"in "<<__PRETTY_FUNCTION__<<" for fd "<removeReadFD(d_fd); d_currentState = IOState::Done; } else if (d_currentState == IOState::NeedWrite && iostate != IOState::NeedWrite) { - cerr<<__PRETTY_FUNCTION__<<": remove write FD "<removeWriteFD(d_fd); d_currentState = IOState::Done; } @@ -65,7 +71,7 @@ public: } d_currentState = IOState::NeedRead; - cerr<<__PRETTY_FUNCTION__<<": add read FD "<addReadFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr); } else if (iostate == IOState::NeedWrite) { @@ -74,12 +80,12 @@ public: } d_currentState = IOState::NeedWrite; - cerr<<__PRETTY_FUNCTION__<<": add write FD "<addWriteFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr); } else if (iostate == IOState::Done) { d_currentState = IOState::Done; - cerr<<__PRETTY_FUNCTION__<<": done"<reset(); d_enabled = false; } diff --git a/regression-tests.dnsdist/test_ProxyProtocol.py b/regression-tests.dnsdist/test_ProxyProtocol.py index ab6c1a208c..9c3313d40f 100644 --- a/regression-tests.dnsdist/test_ProxyProtocol.py +++ b/regression-tests.dnsdist/test_ProxyProtocol.py @@ -61,6 +61,10 @@ def ProxyProtocolUDPResponder(port, fromQueue, toQueue): sock.close() def ProxyProtocolTCPResponder(port, fromQueue, toQueue): + # be aware that this responder will not accept a new connection + # until the last one has been closed. This is done on purpose to + # to check for connection reuse, making sure that a lot of connections + # are not open in parallel. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)