From: Remi Gacogne Date: Wed, 13 Oct 2021 16:02:33 +0000 (+0200) Subject: dnsdist: Keep watching idle DoH backend connections X-Git-Tag: rec-4.6.0-beta1~34^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=becad6135c0a21e5b1b7dcf6c5681b315e010e05;p=thirdparty%2Fpdns.git dnsdist: Keep watching idle DoH backend connections So we can quickly detect a connection closed by the remote host. The `isTCPSocketUsable()` function is unfortunately not enough if the remote end properly sent a GO AWAY frame before closing the connection, as the socket then becomes readable and usable, instead of closed. We need to actually read that message to know that the remote end closed the connection. --- diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.cc b/pdns/dnsdistdist/dnsdist-nghttp2.cc index 0f1e78d8fa..e9a659129b 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.cc +++ b/pdns/dnsdistdist/dnsdist-nghttp2.cc @@ -68,6 +68,8 @@ public: d_healthCheckQuery = h; } + void stopIO(); + private: static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data); static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data); @@ -93,9 +95,8 @@ private: bool d_finished{false}; }; void addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback); - void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback); - void stopIO(); - + void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD=false); + void watchForRemoteHostClosingConnection(); void handleResponse(PendingRequest&& request); void handleResponseError(PendingRequest&& request, const struct timeval& now); void handleIOError(); @@ -440,6 +441,7 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun if (newState == IOState::Done) { if (conn->getConcurrentStreamsCount() == 0) { conn->stopIO(); + conn->watchForRemoteHostClosingConnection(); ioGuard.release(); break; } @@ -486,6 +488,9 @@ void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::fun if (conn->getConcurrentStreamsCount() > 0) { conn->updateIO(IOState::NeedRead, handleReadableIOCallback); } + else { + conn->watchForRemoteHostClosingConnection(); + } } ioGuard.release(); } @@ -508,23 +513,25 @@ void DoHConnectionToBackend::stopIO() } } -void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback) +void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD) { struct timeval now; gettimeofday(&now, nullptr); boost::optional ttd{boost::none}; - if (d_healthCheckQuery) { - ttd = getBackendHealthCheckTTD(now); - } - else if (newState == IOState::NeedRead) { - ttd = getBackendReadTTD(now); - } - else if (isFresh() && d_firstWrite) { - /* first write just after the non-blocking connect */ - ttd = getBackendConnectTTD(now); - } - else { - ttd = getBackendWriteTTD(now); + if (!noTTD) { + if (d_healthCheckQuery) { + ttd = getBackendHealthCheckTTD(now); + } + else if (newState == IOState::NeedRead) { + ttd = getBackendReadTTD(now); + } + else if (isFresh() && d_firstWrite) { + /* first write just after the non-blocking connect */ + ttd = getBackendConnectTTD(now); + } + else { + ttd = getBackendWriteTTD(now); + } } auto shared = std::dynamic_pointer_cast(shared_from_this()); @@ -538,6 +545,13 @@ void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackf } } +void DoHConnectionToBackend::watchForRemoteHostClosingConnection() +{ + if (willBeReusable() && !d_healthCheckQuery) { + updateIO(IOState::NeedRead, handleReadableIOCallback, false); + } +} + void DoHConnectionToBackend::addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback) { struct timeval now; @@ -589,6 +603,9 @@ ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const ui if (conn->getConcurrentStreamsCount() > 0) { conn->updateIO(IOState::NeedRead, handleReadableIOCallback); } + else { + conn->watchForRemoteHostClosingConnection(); + } } else { conn->updateIO(state, handleWritableIOCallback); @@ -632,6 +649,10 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con } #endif + if (frame->hd.type == NGHTTP2_GOAWAY) { + conn->d_connectionDied = true; + } + /* is this the last frame for this stream? */ if ((frame->hd.type == NGHTTP2_HEADERS || frame->hd.type == NGHTTP2_DATA) && frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { auto stream = conn->d_currentStreams.find(frame->hd.stream_id); @@ -652,8 +673,10 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con conn->handleResponseError(std::move(request), now); } + if (conn->getConcurrentStreamsCount() == 0) { conn->stopIO(); + conn->watchForRemoteHostClosingConnection(); } } else { @@ -687,8 +710,8 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session stream->second.d_buffer.insert(stream->second.d_buffer.end(), data, data + len); if (stream->second.d_finished) { - //cerr<<"we now have the full response!"<(data), len)<(data), len)<second); conn->d_currentStreams.erase(stream->first); @@ -704,11 +727,9 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session } if (conn->getConcurrentStreamsCount() == 0) { conn->stopIO(); + conn->watchForRemoteHostClosingConnection(); } } - else { - //cerr<<"but the stream is not finished yet"<getConcurrentStreamsCount() == 0) { //cerr<<"stopping IO"<stopIO(); - //cerr<<"our current refcnt is now "<getUsageCount()<watchForRemoteHostClosingConnection(); } return 0; @@ -864,6 +885,9 @@ size_t DownstreamDoHConnectionsManager::clear() size_t result = 0; for (const auto& backend : t_downstreamConnections) { result += backend.second.size(); + for (auto& conn : backend.second) { + conn->stopIO(); + } } t_downstreamConnections.clear(); return result; @@ -890,9 +914,11 @@ bool DownstreamDoHConnectionsManager::removeDownstreamConnection(std::shared_ptr void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now) { + //cerr<<"cleanup interval is "< 0 && t_nextCleanup > now.tv_sec)) { return; } + t_nextCleanup = now.tv_sec + s_cleanupInterval; struct timeval freshCutOff = now; diff --git a/pdns/dnsdistdist/test-dnsdistnghttp2_cc.cc b/pdns/dnsdistdist/test-dnsdistnghttp2_cc.cc index e26e0a9075..5886d1a229 100644 --- a/pdns/dnsdistdist/test-dnsdistnghttp2_cc.cc +++ b/pdns/dnsdistdist/test-dnsdistnghttp2_cc.cc @@ -80,8 +80,9 @@ std::ostream& operator<<(std::ostream& os, const ExpectedStep::ExpectedRequest d return os; } -struct DOHConnection +class DOHConnection { +public: DOHConnection(bool needProxyProtocol) : d_session(std::unique_ptr(nullptr, nghttp2_session_del)), d_needProxyProtocol(needProxyProtocol) { @@ -181,6 +182,9 @@ struct DOHConnection int rv = nghttp2_submit_response(d_session.get(), streamId, hdrs, sizeof(hdrs) / sizeof(*hdrs), &dataProvider); // cerr<<"Submitting response for stream ID "<(user_data); - if (error_code == 0) { return 0; } @@ -728,9 +736,16 @@ BOOST_FIXTURE_TEST_CASE(test_SingleQuery, TestFixture) dynamic_cast(s_mplexer.get())->setReady(desc); }}, /* read settings, headers and response from the server */ - {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max(), [](int desc, const ExpectedStep& step) { + /* set the outgoing descriptor (backend connection) as NOT ready anymore */ + dynamic_cast(s_mplexer.get())->setNotReady(desc); + }}, /* acknowledge settings */ - {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [](int desc, const ExpectedStep& step) { + s_connectionBuffers.at(desc)->submitGoAway(); + dynamic_cast(s_mplexer.get())->setReady(desc); + }}, + {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done}, }; @@ -810,7 +825,11 @@ BOOST_FIXTURE_TEST_CASE(test_ConcurrentQueries, TestFixture) /* read settings, headers and responses from the server */ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, /* acknowledge settings */ - {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [](int desc, const ExpectedStep& step) { + s_connectionBuffers.at(desc)->submitGoAway(); + dynamic_cast(s_mplexer.get())->setReady(desc); + }}, + {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done}, }; @@ -872,6 +891,7 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionReuse, TestFixture) queries.push_back({std::move(sender), std::move(internalQuery)}); } + bool firstQueryDone = false; s_steps = { {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done}, /* opening */ @@ -888,16 +908,23 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionReuse, TestFixture) /* read settings, headers and responses from the server */ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, /* acknowledge settings */ - {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [&firstQueryDone](int desc, const ExpectedStep& step) { + firstQueryDone = true; + }}, /* headers */ - {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [](int desc, const ExpectedStep& step) { + }}, /* data */ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [](int desc, const ExpectedStep& step) { /* set the outgoing descriptor (backend connection) as ready */ - dynamic_cast(s_mplexer.get())->setReady(desc); + dynamic_cast(s_mplexer.get())->setReady(desc); }}, /* read settings, headers and responses from the server */ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, + /* later the backend sends a go away frame */ + {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max(), [](int desc, const ExpectedStep& step) { + s_connectionBuffers.at(desc)->submitGoAway(); + }}, {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done}, }; @@ -907,11 +934,12 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionReuse, TestFixture) bool result = sendH2Query(backend, s_mplexer, sliced, std::move(query.second), false); BOOST_CHECK_EQUAL(result, true); - while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) { + while (!firstQueryDone && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) { s_mplexer->run(&now); } BOOST_CHECK_EQUAL(query.first->d_valid, true); + BOOST_CHECK_EQUAL(firstQueryDone, true); } { @@ -992,6 +1020,10 @@ BOOST_FIXTURE_TEST_CASE(test_InvalidDNSAnswer, TestFixture) {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, /* acknowledge settings */ {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + /* try to read, the backend says to go away */ + {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max(), [](int desc, const ExpectedStep& step) { + s_connectionBuffers.at(desc)->submitGoAway(); + }}, {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done}, }; @@ -1222,6 +1254,7 @@ BOOST_FIXTURE_TEST_CASE(test_ShortWrite, TestFixture) queries.push_back({std::move(sender), std::move(internalQuery)}); } + bool done = false; s_steps = { {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done}, /* opening */ @@ -1240,7 +1273,11 @@ BOOST_FIXTURE_TEST_CASE(test_ShortWrite, TestFixture) /* read settings, headers and responses from the server */ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, /* acknowledge settings */ - {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [&done](int desc, const ExpectedStep& step) { + /* mark backend as not ready */ + dynamic_cast(s_mplexer.get())->setNotReady(desc); + done = true; + }}, {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done}, }; @@ -1250,7 +1287,7 @@ BOOST_FIXTURE_TEST_CASE(test_ShortWrite, TestFixture) BOOST_CHECK_EQUAL(result, true); } - while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) { + while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) { s_mplexer->run(&now); } @@ -1304,6 +1341,7 @@ BOOST_FIXTURE_TEST_CASE(test_ShortRead, TestFixture) queries.push_back({std::move(sender), std::move(internalQuery)}); } + bool done = false; s_steps = { {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done}, /* opening */ @@ -1329,7 +1367,11 @@ BOOST_FIXTURE_TEST_CASE(test_ShortRead, TestFixture) /* read settings, headers and responses (second attempt) */ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, /* acknowledge settings */ - {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [&done](int desc, const ExpectedStep& step) { + /* mark backend as not ready */ + dynamic_cast(s_mplexer.get())->setNotReady(desc); + done = true; + }}, {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done}, }; @@ -1339,7 +1381,7 @@ BOOST_FIXTURE_TEST_CASE(test_ShortRead, TestFixture) BOOST_CHECK_EQUAL(result, true); } - while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) { + while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) { s_mplexer->run(&now); } @@ -1479,6 +1521,7 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionClosedWhileWriting, TestFixture) queries.push_back({std::move(sender), std::move(internalQuery)}); } + bool done = false; s_steps = { {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done}, /* opening */ @@ -1503,7 +1546,11 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionClosedWhileWriting, TestFixture) /* read settings, headers and response from the server */ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, /* acknowledge settings */ - {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [&done](int desc, const ExpectedStep& step) { + /* mark backend as not ready */ + dynamic_cast(s_mplexer.get())->setNotReady(desc); + done = true; + }}, {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done}, }; @@ -1513,7 +1560,7 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionClosedWhileWriting, TestFixture) BOOST_CHECK_EQUAL(result, true); } - while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) { + while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) { s_mplexer->run(&now); } @@ -1679,6 +1726,7 @@ BOOST_FIXTURE_TEST_CASE(test_HTTP500FromServer, TestFixture) queries.push_back({std::move(sender), std::move(internalQuery)}); } + bool done = false; s_steps = { {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done}, /* opening */ @@ -1702,7 +1750,11 @@ BOOST_FIXTURE_TEST_CASE(test_HTTP500FromServer, TestFixture) /* read settings, headers and responses from the server */ {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits::max()}, /* acknowledge settings */ - {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max()}, + {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits::max(), [&done](int desc, const ExpectedStep& step) { + /* mark backend as not ready */ + dynamic_cast(s_mplexer.get())->setNotReady(desc); + done = true; + }}, {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done}, }; @@ -1712,7 +1764,7 @@ BOOST_FIXTURE_TEST_CASE(test_HTTP500FromServer, TestFixture) BOOST_CHECK_EQUAL(result, true); } - while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) { + while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) { s_mplexer->run(&now); } @@ -1921,6 +1973,8 @@ BOOST_FIXTURE_TEST_CASE(test_ProxyProtocol, TestFixture) for (auto& query : queries) { BOOST_CHECK_EQUAL(query.first->d_valid, true); } + + BOOST_CHECK_EQUAL(clearH2Connections(), 0U); } BOOST_AUTO_TEST_SUITE_END();