From: Remi Gacogne Date: Thu, 17 Sep 2020 15:04:04 +0000 (+0200) Subject: dnsdist: Try to send before calling epoll_wait X-Git-Tag: auth-4.5.0-alpha0~14^2~8 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=44e34991ec27326930fbca60e773667d652e5a36;p=thirdparty%2Fpdns.git dnsdist: Try to send before calling epoll_wait It usually works and saves adding the descriptor to the set, calling epoll_wait() then removing the descriptor. --- diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index 1da8097268..1f5387e274 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -253,7 +253,42 @@ void TCPClientCollection::addTCPClientThread() std::unique_ptr g_tcpclientthreads; -static IOState handleResponseSent(std::shared_ptr& state, const struct timeval& now) +static IOState sendQueuedResponses(std::shared_ptr& state, const struct timeval& now) +{ + IOState result = IOState::Done; + + while (!state->d_queuedResponses.empty()) { + DEBUGLOG("queue size is "<d_queuedResponses.size()<<", sending the next one"); + TCPResponse resp = std::move(state->d_queuedResponses.front()); + state->d_queuedResponses.pop_front(); + state->d_state = IncomingTCPConnectionState::State::idle; + result = state->sendResponse(state, now, std::move(resp)); + if (result != IOState::Done) { + return result; + } + } + + if (state->d_isXFR) { + /* we should still be reading from the backend, and we don't want to read from the client */ + state->d_state = IncomingTCPConnectionState::State::idle; + state->d_currentPos = 0; + DEBUGLOG("idling for XFR completion"); + return IOState::Done; + } else { + if (state->canAcceptNewQueries()) { + DEBUGLOG("waiting for new queries"); + state->resetForNewQuery(); + return IOState::NeedRead; + } + else { + DEBUGLOG("idling"); + state->d_state = IncomingTCPConnectionState::State::idle; + return IOState::Done; + } + } +} + +static bool handleResponseSent(std::shared_ptr& state, const struct timeval& now) { --state->d_currentQueriesCount; @@ -284,43 +319,16 @@ static IOState handleResponseSent(std::shared_ptr& s if (g_maxTCPQueriesPerConn && state->d_queriesCount > g_maxTCPQueriesPerConn) { vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", state->d_ci.remote.toStringWithPort(), state->d_queriesCount, g_maxTCPQueriesPerConn); - return IOState::Done; + return false; } if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) { vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort()); - return IOState::Done; + return false; } } - if (state->d_queuedResponses.empty()) { - if (state->d_isXFR) { - /* we should still be reading from the backend, and we don't want to read from the client */ - state->d_state = IncomingTCPConnectionState::State::idle; - state->d_currentPos = 0; - DEBUGLOG("idling for XFR completion"); - return IOState::Done; - } else { - if (state->canAcceptNewQueries()) { - DEBUGLOG("waiting for new queries"); - state->resetForNewQuery(); - return IOState::NeedRead; - } - else { - DEBUGLOG("idling"); - state->d_state = IncomingTCPConnectionState::State::idle; - return IOState::Done; - } - } - } - else { - DEBUGLOG("queue size is "<d_queuedResponses.size()<<", sending the next one"); - TCPResponse resp = std::move(state->d_queuedResponses.front()); - state->d_queuedResponses.pop_front(); - state->d_state = IncomingTCPConnectionState::State::idle; - state->sendResponse(state, now, std::move(resp)); - return IOState::NeedWrite; - } + return true; } bool IncomingTCPConnectionState::canAcceptNewQueries() const @@ -372,26 +380,43 @@ void IncomingTCPConnectionState::registerActiveDownstreamConnection(std::shared_ d_activeConnectionsToBackend[conn->getDS()].push_front(conn); } -/* this version is called when the buffer has been set and the rules have been processed */ -void IncomingTCPConnectionState::sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) +/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */ +IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) +{ + state->d_state = IncomingTCPConnectionState::State::sendingResponse; + + uint16_t responseSize = static_cast(response.d_buffer.size()); + const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; + /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes + that could occur if we had to deal with the size during the processing, + especially alignment issues */ + response.d_buffer.insert(response.d_buffer.begin(), sizeBytes, sizeBytes + 2); + state->d_currentPos = 0; + state->d_currentResponse = std::move(response); + + auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size()); + if (iostate == IOState::Done) { + DEBUGLOG("response sent"); + if (!handleResponseSent(state, now)) { + return IOState::Done; + } + return sendQueuedResponses(state, now); + } else { + return IOState::NeedWrite; + DEBUGLOG("partial write"); + } +} + +/* called when handling a response or error coming from a backend */ +void IncomingTCPConnectionState::sendOrQueueResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) { // if we already reading a query (not the query size, mind you), or sending a response we need to either queue the response // otherwise we can start sending it right away if (state->d_state == IncomingTCPConnectionState::State::idle || state->d_state == IncomingTCPConnectionState::State::readingQuerySize) { - state->d_state = IncomingTCPConnectionState::State::sendingResponse; - - uint16_t responseSize = static_cast(response.d_buffer.size()); - const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; - /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes - that could occur if we had to deal with the size during the processing, - especially alignment issues */ - response.d_buffer.insert(response.d_buffer.begin(), sizeBytes, sizeBytes + 2); - state->d_currentPos = 0; - state->d_currentResponse = std::move(response); - - state->d_ioState->update(IOState::NeedWrite, handleIOCallback, state, state->getClientWriteTTD(now)); + auto iostate = sendResponse(state, now, std::move(response)); + state->d_ioState->update(iostate, handleIOCallback, state, iostate == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now)); } else { // queue response @@ -400,17 +425,21 @@ void IncomingTCPConnectionState::sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) +/* called from the backend code when a new response has been received */ +void IncomingTCPConnectionState::handleResponse(std::shared_ptr state, const struct timeval& now, TCPResponse&& response) { - // if we have added a TCP Proxy Protocol payload to a connection, don't release it yet, no one else will be able to use it anyway - if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle() && response.d_connection->canBeReused()) { - auto& list = state->d_activeConnectionsToBackend.at(response.d_connection->getDS()); - for (auto it = list.begin(); it != list.end(); ++it) { - if (*it == response.d_connection) { - DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it)); - list.erase(it); - break; + if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle()) { + // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway + if (response.d_connection->canBeReused()) { + auto& list = state->d_activeConnectionsToBackend.at(response.d_connection->getDS()); + + for (auto it = list.begin(); it != list.end(); ++it) { + if (*it == response.d_connection) { + response.d_connection->release(); + DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it)); + list.erase(it); + break; + } } } } @@ -471,14 +500,14 @@ void IncomingTCPConnectionState::handleResponse(std::shared_ptr& state, const struct timeval& now) +static IOState handleQuery(std::shared_ptr& state, const struct timeval& now) { if (state->d_querySize < sizeof(dnsheader)) { ++g_stats.nonCompliantQueries; - return true; + return IOState::NeedRead; } state->d_readingFirstQuery = false; @@ -521,13 +550,12 @@ static bool handleQuery(std::shared_ptr& state, cons response.d_buffer = std::move(*dnsCryptResponse); state->d_state = IncomingTCPConnectionState::State::idle; ++state->d_currentQueriesCount; - state->sendResponse(state, now, std::move(response)); - return false; + return state->sendResponse(state, now, std::move(response)); } const auto& dh = reinterpret_cast(query); if (!checkQueryHeaders(dh)) { - return true; + return IOState::NeedRead; } uint16_t qtype, qclass; @@ -546,7 +574,7 @@ static bool handleQuery(std::shared_ptr& state, cons auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds); if (result == ProcessQueryResult::Drop) { - return true; + return IOState::Done; } if (result == ProcessQueryResult::SendAnswer) { @@ -556,12 +584,11 @@ static bool handleQuery(std::shared_ptr& state, cons response.d_buffer = std::move(state->d_buffer); state->d_state = IncomingTCPConnectionState::State::idle; ++state->d_currentQueriesCount; - state->sendResponse(state, now, std::move(response)); - return false; + return state->sendResponse(state, now, std::move(response)); } if (result != ProcessQueryResult::PassToBackend || ds == nullptr) { - return true; + return IOState::Done; } IDState ids; @@ -613,7 +640,7 @@ static bool handleQuery(std::shared_ptr& state, cons vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_ci.remote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName()); downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection); - return true; + return IOState::NeedRead; } void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param) @@ -706,13 +733,13 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_queuedResponses.empty()) { if (state->canAcceptNewQueries()) { state->resetForNewQuery(); - iostate = IOState::NeedRead; } else { state->d_state = IncomingTCPConnectionState::State::idle; @@ -724,16 +751,9 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_queuedResponses.pop_front(); ioGuard.release(); state->d_state = IncomingTCPConnectionState::State::idle; - state->sendResponse(state, now, std::move(resp)); - return; + iostate = sendResponse(state, now, std::move(resp)); } } - else { - /* otherwise the state should already be waiting for - the socket to be writable */ - ioGuard.release(); - return; - } } else { wouldBlock = true; @@ -745,7 +765,12 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size()); if (iostate == IOState::Done) { DEBUGLOG("response sent"); - iostate = handleResponseSent(state, now); + if (!handleResponseSent(state, now)) { + iostate = IOState::Done; + } + else { + iostate = sendQueuedResponses(state, now); + } } else { wouldBlock = true; DEBUGLOG("partial write"); @@ -795,7 +820,7 @@ void IncomingTCPConnectionState::handleIO(std::shared_ptrd_state == IncomingTCPConnectionState::State::readingQuerySize && iostate == IOState::NeedRead && !wouldBlock); + while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !wouldBlock); } void IncomingTCPConnectionState::notifyIOError(std::shared_ptr& state, IDState&& query, const struct timeval& now) @@ -810,7 +835,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptrd_queuedResponses.front()); state->d_queuedResponses.pop_front(); state->d_state = IncomingTCPConnectionState::State::idle; - sendResponse(state, now, std::move(resp)); + sendOrQueueResponse(state, now, std::move(resp)); } else { // the backend code already tried to reconnect if it was possible @@ -820,7 +845,7 @@ void IncomingTCPConnectionState::notifyIOError(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response) { - sendResponse(state, now, std::move(response)); + sendOrQueueResponse(state, now, std::move(response)); } void IncomingTCPConnectionState::handleTimeout(std::shared_ptr& state, bool write) diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index 4fa155c55c..8b19b8d7de 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -23,7 +23,15 @@ void TCPConnectionToBackend::assignToClientConnection(std::shared_ptr& conn) +void TCPConnectionToBackend::release() +{ + d_clientConn.reset(); + if (d_ioState) { + d_ioState.reset(); + } +} + +IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr& conn) { conn->d_currentQuery = std::move(conn->d_pendingQueries.front()); conn->d_pendingQueries.pop_front(); @@ -69,6 +77,38 @@ static IOState tryRead(int fd, std::vector& buffer, size_t& pos, size_t return IOState::Done; } +IOState TCPConnectionToBackend::sendQuery(std::shared_ptr& conn, const struct timeval& now) +{ + int fd = conn->d_socket->getHandle(); + DEBUGLOG("sending query to backend "<getDS()->getName()<<" over FD "<isFastOpenEnabled()) { + socketFlags |= MSG_FASTOPEN; + } +#endif /* MSG_FASTOPEN */ + + size_t sent = sendMsgWithOptions(fd, reinterpret_cast(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags); + if (sent == conn->d_currentQuery.d_buffer.size()) { + DEBUGLOG("query sent to backend"); + /* request sent ! */ + conn->incQueries(); + conn->d_currentPos = 0; + + DEBUGLOG("adding a pending response for ID "<d_currentQuery.d_idstate.origID<<" and QNAME "<d_currentQuery.d_idstate.qname); + conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery); + conn->d_currentQuery.d_buffer.clear(); + + return IOState::Done; + } + else { + conn->d_currentPos += sent; + /* disable fast open on partial write */ + conn->disableFastOpen(); + return IOState::NeedWrite; + } +} + void TCPConnectionToBackend::handleIO(std::shared_ptr& conn, const struct timeval& now) { if (conn->d_socket == nullptr) { @@ -82,40 +122,18 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c try { if (conn->d_state == State::sendingQueryToBackend) { - DEBUGLOG("sending query to backend "<getDS()->getName()<<" over FD "<isFastOpenEnabled()) { - socketFlags |= MSG_FASTOPEN; + iostate = sendQuery(conn, now); + + while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) { + queueNextQuery(conn); + iostate = sendQuery(conn, now); } -#endif /* MSG_FASTOPEN */ - size_t sent = sendMsgWithOptions(fd, reinterpret_cast(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags); - if (sent == conn->d_currentQuery.d_buffer.size()) { - DEBUGLOG("query sent to backend"); - /* request sent ! */ - conn->incQueries(); + if (iostate == IOState::Done && conn->d_pendingQueries.empty()) { + conn->d_state = State::readingResponseSizeFromBackend; conn->d_currentPos = 0; - - DEBUGLOG("adding a pending response for ID "<d_currentQuery.d_idstate.origID<<" and QNAME "<d_currentQuery.d_idstate.qname); - conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery); - conn->d_currentQuery.d_buffer.clear(); - - if (conn->d_pendingQueries.empty()) { - conn->d_state = State::readingResponseSizeFromBackend; - conn->d_currentPos = 0; - conn->d_responseBuffer.resize(sizeof(uint16_t)); - iostate = IOState::NeedRead; - } - else { - iostate = sendNextQuery(conn); - } - } - else { - conn->d_currentPos += sent; - iostate = IOState::NeedWrite; - /* disable fast open on partial write */ - conn->disableFastOpen(); + conn->d_responseBuffer.resize(sizeof(uint16_t)); + iostate = IOState::NeedRead; } } @@ -206,7 +224,11 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c // resume sending query } else { - iostate = sendNextQuery(conn); + if (conn->d_pendingQueries.empty()) { + throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries"); + } + + iostate = queueNextQuery(conn); } if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) { @@ -225,12 +247,15 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c } } - if (iostate == IOState::Done) { - conn->d_ioState->update(iostate, handleIOCallback, conn); - } - else { - conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now)); + if (conn->d_ioState) { + if (iostate == IOState::Done) { + conn->d_ioState->update(iostate, handleIOCallback, conn); + } + else { + conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now)); + } } + ioGuard.release(); } @@ -267,7 +292,8 @@ void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptrupdate(IOState::NeedWrite, handleIOCallback, sharedSelf, getBackendWriteTTD(now)); + handleIO(sharedSelf, now); + // d_ioState->update(IOState::NeedWrite, handleIOCallback, sharedSelf, getBackendWriteTTD(now)); } else { // store query in the list of queries to send @@ -391,7 +417,7 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptractive()) { + if (!clientConn || !clientConn->active()) { // a client timeout occured, or something like that */ d_connectionDied = true; d_clientConn.reset(); @@ -431,6 +457,10 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptrsecond.d_idstate); d_pendingResponses.erase(it); DEBUGLOG("passing response to client connection for "<handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn)); if (!d_pendingQueries.empty()) { diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh index c815364831..bc8d6d442d 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh @@ -128,7 +128,7 @@ public: bool isIdle() const { - return d_pendingQueries.size() == 0 && d_pendingResponses.size() == 0; + return d_state == State::idle && d_pendingQueries.size() == 0 && d_pendingResponses.size() == 0; } /* whether a connection can be reused for a different client */ @@ -158,13 +158,16 @@ public: void queueQuery(TCPQuery&& query, std::shared_ptr& sharedSelf); void handleTimeout(const struct timeval& now, bool write); + void release(); + void setProxyProtocolPayload(std::string&& payload); void setProxyProtocolPayloadAdded(bool added); private: static void handleIO(std::shared_ptr& conn, const struct timeval& now); static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param); - static IOState sendNextQuery(std::shared_ptr& conn); + static IOState queueNextQuery(std::shared_ptr& conn); + static IOState sendQuery(std::shared_ptr& conn, const struct timeval& now); IOState handleResponse(std::shared_ptr& conn, const struct timeval& now); uint16_t getQueryIdFromResponse(); diff --git a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh index 704e197476..91b090ff9c 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-upstream.hh +++ b/pdns/dnsdistdist/dnsdist-tcp-upstream.hh @@ -179,8 +179,11 @@ public: static void handleIO(std::shared_ptr& conn, const struct timeval& now); static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param); static void notifyIOError(std::shared_ptr& state, IDState&& query, const struct timeval& now); - static void sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); - static void handleResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); + static IOState sendResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); + static void sendOrQueueResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); + + /* we take a copy of a shared pointer, not a reference, because the initial shared pointer might be released during the handling of the response */ + static void handleResponse(std::shared_ptr state, const struct timeval& now, TCPResponse&& response); static void handleXFRResponse(std::shared_ptr& state, const struct timeval& now, TCPResponse&& response); static void handleTimeout(std::shared_ptr& state, bool write); diff --git a/regression-tests.dnsdist/test_OOOR.py b/regression-tests.dnsdist/test_OOOR.py index ae1e457461..545bd80808 100644 --- a/regression-tests.dnsdist/test_OOOR.py +++ b/regression-tests.dnsdist/test_OOOR.py @@ -234,7 +234,7 @@ class TestOOORWithClientNotBackend(DNSDistTest): for idx in range(5): self.assertIn('%d.more-queries.ooor.tests.powerdns.com.' % (idx), receivedResponses) - self.assertLessEqual(OOORTCPResponder.numberOfConnections, 10) + self.assertLessEqual(OOORTCPResponder.numberOfConnections, self._concurrentQueriesFromClient) class TestOOORWithClientAndBackend(DNSDistTest): # this test suite uses a different responder port