From: Remi Gacogne Date: Thu, 11 Feb 2021 18:03:07 +0000 (+0100) Subject: dnsdist: Better handling of TCP responses mixed with queries X-Git-Tag: dnsdist-1.6.0-alpha2~11^2~22 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4253054a0b8e9ecca4b97d91786e87e2f62dc4d3;p=thirdparty%2Fpdns.git dnsdist: Better handling of TCP responses mixed with queries --- diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index f63f8b8f82..d2fdb80119 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -300,70 +300,44 @@ static IOState sendQueuedResponses(std::shared_ptr& } } - if (state->d_isXFR) { - /* we should still be reading from the backend, and we don't want to read from the client */ - state->d_state = IncomingTCPConnectionState::State::idle; - state->d_currentPos = 0; - DEBUGLOG("idling for XFR completion"); - return IOState::Done; - } else { - if (state->canAcceptNewQueries()) { - DEBUGLOG("waiting for new queries"); - state->resetForNewQuery(); - return IOState::NeedRead; - } - else { - DEBUGLOG("idling"); - state->d_state = IncomingTCPConnectionState::State::idle; - return IOState::Done; - } - } + state->d_state = IncomingTCPConnectionState::State::idle; + return IOState::Done; } -static bool handleResponseSent(std::shared_ptr& state, const struct timeval& now) +static void handleResponseSent(std::shared_ptr& state) { - if (!state->d_isXFR) { - --state->d_currentQueriesCount; - - const auto& currentResponse = state->d_currentResponse; - if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) { - const auto& ds = currentResponse.d_connection->getDS(); - struct timespec answertime; - gettime(&answertime); - const auto& ids = currentResponse.d_idstate; - double udiff = ids.sentTime.udiff(); - g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast(udiff), static_cast(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->remote); - vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff); - } - - switch (currentResponse.d_cleartextDH.rcode) { - case RCode::NXDomain: - ++g_stats.frontendNXDomain; - break; - case RCode::ServFail: - ++g_stats.servfailResponses; - ++g_stats.frontendServFail; - break; - case RCode::NoError: - ++g_stats.frontendNoError; - break; - } + if (state->d_isXFR) { + return; + } - if (g_maxTCPQueriesPerConn && state->d_queriesCount > g_maxTCPQueriesPerConn) { - vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", state->d_ci.remote.toStringWithPort(), state->d_queriesCount, g_maxTCPQueriesPerConn); - return false; - } + --state->d_currentQueriesCount; - if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) { - vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort()); - return false; - } + const auto& currentResponse = state->d_currentResponse; + if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) { + const auto& ds = currentResponse.d_connection->getDS(); + struct timespec answertime; + gettime(&answertime); + const auto& ids = currentResponse.d_idstate; + double udiff = ids.sentTime.udiff(); + g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast(udiff), static_cast(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->remote); + vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff); } - return true; + switch (currentResponse.d_cleartextDH.rcode) { + case RCode::NXDomain: + ++g_stats.frontendNXDomain; + break; + case RCode::ServFail: + ++g_stats.servfailResponses; + ++g_stats.frontendServFail; + break; + case RCode::NoError: + ++g_stats.frontendNoError; + break; + } } -bool IncomingTCPConnectionState::canAcceptNewQueries() const +bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now) { if (d_isXFR) { DEBUGLOG("not accepting new queries because used for XFR"); @@ -375,6 +349,16 @@ bool IncomingTCPConnectionState::canAcceptNewQueries() const return false; } + if (g_maxTCPQueriesPerConn && d_queriesCount > g_maxTCPQueriesPerConn) { + vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, g_maxTCPQueriesPerConn); + return false; + } + + if (maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) { + vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort()); + return false; + } + return true; } @@ -383,7 +367,7 @@ void IncomingTCPConnectionState::resetForNewQuery() d_buffer.resize(sizeof(uint16_t)); d_currentPos = 0; d_querySize = 0; - d_state = State::readingQuerySize; + d_state = State::waitingForQuery; } std::shared_ptr IncomingTCPConnectionState::getActiveDownstreamConnection(const std::shared_ptr& ds, const std::unique_ptr>& tlvs) @@ -428,13 +412,12 @@ IOState IncomingTCPConnectionState::sendResponse(std::shared_ptrd_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size()); if (iostate == IOState::Done) { - DEBUGLOG("response sent"); - if (!handleResponseSent(state, now)) { - return IOState::Done; - } - return sendQueuedResponses(state, now); + DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__); + handleResponseSent(state); + return iostate; } else { - return IOState::NeedWrite; + state->d_lastIOBlocked = true; + return IOState::NeedWrite; DEBUGLOG("partial write"); } } @@ -451,38 +434,46 @@ IOState IncomingTCPConnectionState::sendResponse(std::shared_ptrreset(); + d_ioState.reset(); d_handler.close(); } -/* called when handling a response or error coming from a backend */ -void IncomingTCPConnectionState::sendOrQueueResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) +void IncomingTCPConnectionState::queueResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) { - // if we were already reading a query (not the query size, mind you), or sending a response we need to queue the response - // otherwise we can start sending it right away + // queue response + state->d_queuedResponses.push_back(std::move(response)); + DEBUGLOG("queueing response, state is "<<(int)state->d_state<<", queue size is now "<d_queuedResponses.size()); + + // when the response comes from a backend, there is a real possibility that we are currently + // idle, and thus not trying to send the response right away would make our ref count go to 0. + // Even if we are waiting for a query, we will not wake up before the new query arrives or a + // timeout occurs if (state->d_state == IncomingTCPConnectionState::State::idle || - state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader || - state->d_state == IncomingTCPConnectionState::State::readingQuerySize) { + state->d_state == IncomingTCPConnectionState::State::waitingForQuery) { + auto iostate = sendQueuedResponses(state, now); - auto iostate = sendResponse(state, now, std::move(response)); + if (iostate == IOState::Done && state->canAcceptNewQueries(now)) { + state->resetForNewQuery(); + state->d_state = IncomingTCPConnectionState::State::waitingForQuery; + iostate = IOState::NeedRead; + } + + // for the same reason we need to update the state right away, nobody will do that for us state->d_ioState->update(iostate, handleIOCallback, state, iostate == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now)); } - else { - // queue response - state->d_queuedResponses.push_back(std::move(response)); - DEBUGLOG("queueing response because state is "<<(int)state->d_state<<", queue size is now "<d_queuedResponses.size()); - } } /* called from the backend code when a new response has been received */ void IncomingTCPConnectionState::handleResponse(std::shared_ptr state, const struct timeval& now, TCPResponse&& response) { + cerr<<"in "<<__PRETTY_FUNCTION__<d_isXFR && response.d_connection && response.d_connection->isIdle()) { // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway if (response.d_connection->canBeReused()) { @@ -505,12 +496,14 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptrgetRemote(), qnameWireLength)) { + cerr<<"does not match!"<d_threadData.localRespRulactions, dr, false)) { + cerr<<"processResponse failed"<& state, const struct timeval& now) +static void handleQuery(std::shared_ptr& state, const struct timeval& now) { if (state->d_querySize < sizeof(dnsheader)) { ++g_stats.nonCompliantQueries; - return IOState::NeedRead; + state->terminateClientConnection(); + return; } state->d_readingFirstQuery = false; @@ -587,14 +583,16 @@ static IOState handleQuery(std::shared_ptr& state, c TCPResponse response; state->d_state = IncomingTCPConnectionState::State::idle; ++state->d_currentQueriesCount; - return state->sendResponse(state, now, std::move(response)); + state->queueResponse(state, now, std::move(response)); + return; } { /* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */ auto* dh = reinterpret_cast(state->d_buffer.data()); if (!checkQueryHeaders(dh)) { - return IOState::NeedRead; + state->terminateClientConnection(); + return; } if (dh->qdcount == 0) { @@ -605,7 +603,8 @@ static IOState handleQuery(std::shared_ptr& state, c response.d_buffer = std::move(state->d_buffer); state->d_state = IncomingTCPConnectionState::State::idle; ++state->d_currentQueriesCount; - return state->sendResponse(state, now, std::move(response)); + state->queueResponse(state, now, std::move(response)); + return; } } @@ -630,7 +629,8 @@ static IOState handleQuery(std::shared_ptr& state, c auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds); if (result == ProcessQueryResult::Drop) { - return IOState::Done; + state->terminateClientConnection(); + return; } // the buffer might have been invalidated by now @@ -641,11 +641,13 @@ static IOState handleQuery(std::shared_ptr& state, c response.d_buffer = std::move(state->d_buffer); state->d_state = IncomingTCPConnectionState::State::idle; ++state->d_currentQueriesCount; - return state->sendResponse(state, now, std::move(response)); + state->queueResponse(state, now, std::move(response)); + return; } if (result != ProcessQueryResult::PassToBackend || ds == nullptr) { - return IOState::Done; + state->terminateClientConnection(); + return; } IDState ids; @@ -693,8 +695,6 @@ static IOState handleQuery(std::shared_ptr& state, c ++state->d_currentQueriesCount; vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_proxiedRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName()); downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection); - - return IOState::NeedRead; } void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param) @@ -713,7 +713,6 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_lastIOBlocked = false; + try { if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) { DEBUGLOG("doing handshake"); @@ -758,11 +759,11 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_lastIOBlocked = true; } } - if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) { + if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) { DEBUGLOG("reading proxy protocol header"); do { iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_proxyProtocolNeed); @@ -800,15 +801,21 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_lastIOBlocked = true; } } - while (!wouldBlock); + while (state->active() && !state->d_lastIOBlocked); } - if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingQuerySize) { + if (!state->d_lastIOBlocked && (state->d_state == IncomingTCPConnectionState::State::waitingForQuery || + state->d_state == IncomingTCPConnectionState::State::readingQuerySize)) { DEBUGLOG("reading query size"); iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t)); + if (state->d_currentPos > 0) { + /* if we got at least one byte, we can't go around sending responses */ + state->d_state = IncomingTCPConnectionState::State::readingQuerySize; + } + if (iostate == IOState::Done) { DEBUGLOG("query size received"); state->d_state = IncomingTCPConnectionState::State::readingQuery; @@ -828,77 +835,76 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_currentPos = 0; } else { - wouldBlock = true; + state->d_lastIOBlocked = true; } } - if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingQuery) { + if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingQuery) { DEBUGLOG("reading query"); iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize); if (iostate == IOState::Done) { DEBUGLOG("query received"); state->d_buffer.resize(state->d_querySize); - iostate = handleQuery(state, now); - // if the query has been passed to a backend, or dropped, we can start - // reading again, or sending queued responses - if (iostate == IOState::NeedRead) { - if (state->d_queuedResponses.empty()) { - if (state->canAcceptNewQueries()) { - state->resetForNewQuery(); - } - else { - state->d_state = IncomingTCPConnectionState::State::idle; - iostate = IOState::Done; - } - } - else { - TCPResponse resp = std::move(state->d_queuedResponses.front()); - state->d_queuedResponses.pop_front(); - ioGuard.release(); - state->d_state = IncomingTCPConnectionState::State::idle; - iostate = sendResponse(state, now, std::move(resp)); - if (iostate != IOState::Done) { - wouldBlock = true; - } - } - } - else if (iostate != IOState::Done) { - wouldBlock = true; + state->d_state = IncomingTCPConnectionState::State::idle; + handleQuery(state, now); + cerr<<"out of handleQuery, state is "<<(int)state->d_state<<", iostate is "<<(int)iostate<active() && state->d_state != IncomingTCPConnectionState::State::idle) { + iostate = state->d_ioState->getState(); } } else { - wouldBlock = true; + state->d_lastIOBlocked = true; } } - if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::sendingResponse) { + if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::sendingResponse) { DEBUGLOG("sending response"); iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size()); if (iostate == IOState::Done) { - DEBUGLOG("response sent"); - if (!handleResponseSent(state, now)) { - iostate = IOState::Done; - } - else { - iostate = sendQueuedResponses(state, now); + DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__); + handleResponseSent(state); + state->d_state = IncomingTCPConnectionState::State::idle; + } + else { + state->d_lastIOBlocked = true; + } + } + + if (state->active() && + !state->d_lastIOBlocked && + iostate == IOState::Done && + (state->d_state == IncomingTCPConnectionState::State::idle || + state->d_state == IncomingTCPConnectionState::State::waitingForQuery)) + { + // try sending querued responses + cerr<<"send responses, if any"<d_lastIOBlocked && iostate == IOState::Done) { + // if the query has been passed to a backend, or dropped, and the responses have been sent, + // we can start reading again + if (!state->d_isXFR && state->canAcceptNewQueries(now)) { + cerr<<"reset for new query"<resetForNewQuery(); + iostate = IOState::NeedRead; } - } else { - wouldBlock = true; - DEBUGLOG("partial write"); } } if (state->d_state != IncomingTCPConnectionState::State::idle && state->d_state != IncomingTCPConnectionState::State::doingHandshake && state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader && + state->d_state != IncomingTCPConnectionState::State::waitingForQuery && state->d_state != IncomingTCPConnectionState::State::readingQuerySize && state->d_state != IncomingTCPConnectionState::State::readingQuery && state->d_state != IncomingTCPConnectionState::State::sendingResponse) { vinfolog("Unexpected state %d in handleIOCallback", static_cast(state->d_state)); } } - catch(const std::exception& e) { + catch (const std::exception& e) { /* most likely an EOF because the other end closed the connection, but it might also be a real IO error or something else. Let's just drop the connection @@ -906,6 +912,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_state == IncomingTCPConnectionState::State::idle || state->d_state == IncomingTCPConnectionState::State::doingHandshake || state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader || + state->d_state == IncomingTCPConnectionState::State::waitingForQuery || state->d_state == IncomingTCPConnectionState::State::readingQuerySize || state->d_state == IncomingTCPConnectionState::State::readingQuery) { ++state->d_ci.cs->tcpDiedReadingQuery; @@ -926,6 +933,11 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptractive()) { + cerr<<"state is no longer active"<d_ioState->update(iostate, handleIOCallback, state); } @@ -934,7 +946,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_lastIOBlocked); } void IncomingTCPConnectionState::notifyIOError(std::shared_ptr& state, IDState&& query, const struct timeval& now) @@ -950,7 +962,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptrd_queuedResponses.pop_front(); state->d_state = IncomingTCPConnectionState::State::idle; try { - sendOrQueueResponse(state, now, std::move(resp)); + queueResponse(state, now, std::move(resp)); } catch (const std::exception& e) { vinfolog("exception in notifyIOError: %s", e.what()); @@ -964,7 +976,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) { - sendOrQueueResponse(state, now, std::move(response)); + queueResponse(state, now, std::move(response)); } void IncomingTCPConnectionState::handleTimeout(std::shared_ptr& state, bool write) diff --git a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh index a9ba94c1ae..f3724b85c8 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh +++ b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh @@ -155,7 +155,7 @@ public: static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param); static void notifyIOError(std::shared_ptr& state, IDState&& query, const struct timeval& now); static IOState sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); - static void sendOrQueueResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); + static void queueResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); /* we take a copy of a shared pointer, not a reference, because the initial shared pointer might be released during the handling of the response */ static void handleResponse(std::shared_ptr state, const struct timeval& now, TCPResponse&& response); @@ -165,7 +165,7 @@ public: void terminateClientConnection(); void queueQuery(TCPQuery&& query); - bool canAcceptNewQueries() const; + bool canAcceptNewQueries(const struct timeval& now); bool active() const { @@ -179,7 +179,7 @@ public: return o.str(); } - enum class State { doingHandshake, readingProxyProtocolHeader, readingQuerySize, readingQuery, sendingResponse, idle /* in case of XFR, we stop processing queries */ }; + enum class State { doingHandshake, readingProxyProtocolHeader, waitingForQuery, readingQuerySize, readingQuery, sendingResponse, idle /* in case of XFR, we stop processing queries */ }; std::map, std::deque>> d_activeConnectionsToBackend; PacketBuffer d_buffer; @@ -209,5 +209,5 @@ public: bool d_isXFR{false}; bool d_xfrStarted{false}; bool d_proxyProtocolPayloadHasTLV{false}; + bool d_lastIOBlocked{false}; }; -