]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Keep watching idle DoH backend connections
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 13 Oct 2021 16:02:33 +0000 (18:02 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 13 Oct 2021 16:02:33 +0000 (18:02 +0200)
So we can quickly detect a connection closed by the remote host. The
`isTCPSocketUsable()` function is unfortunately not enough if the
remote end properly sent a GO AWAY frame before closing the connection,
as the socket then becomes readable and usable, instead of closed. We
need to actually read that message to know that the remote end closed
the connection.

pdns/dnsdistdist/dnsdist-nghttp2.cc
pdns/dnsdistdist/test-dnsdistnghttp2_cc.cc

index 0f1e78d8fac144efc048ba91bb375e4811887963..e9a659129b018db62d09bd7477eb8a39384f22b0 100644 (file)
@@ -68,6 +68,8 @@ public:
     d_healthCheckQuery = h;
   }
 
+  void stopIO();
+
 private:
   static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data);
   static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
@@ -93,9 +95,8 @@ private:
     bool d_finished{false};
   };
   void addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback);
-  void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback);
-  void stopIO();
-
+  void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD=false);
+  void watchForRemoteHostClosingConnection();
   void handleResponse(PendingRequest&& request);
   void handleResponseError(PendingRequest&& request, const struct timeval& now);
   void handleIOError();
@@ -440,6 +441,7 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun
       if (newState == IOState::Done) {
         if (conn->getConcurrentStreamsCount() == 0) {
           conn->stopIO();
+          conn->watchForRemoteHostClosingConnection();
           ioGuard.release();
           break;
         }
@@ -486,6 +488,9 @@ void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::fun
       if (conn->getConcurrentStreamsCount() > 0) {
         conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
       }
+      else {
+        conn->watchForRemoteHostClosingConnection();
+      }
     }
     ioGuard.release();
   }
@@ -508,23 +513,25 @@ void DoHConnectionToBackend::stopIO()
   }
 }
 
-void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback)
+void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD)
 {
   struct timeval now;
   gettimeofday(&now, nullptr);
   boost::optional<struct timeval> ttd{boost::none};
-  if (d_healthCheckQuery) {
-    ttd = getBackendHealthCheckTTD(now);
-  }
-  else if (newState == IOState::NeedRead) {
-    ttd = getBackendReadTTD(now);
-  }
-  else if (isFresh() && d_firstWrite) {
-    /* first write just after the non-blocking connect */
-    ttd = getBackendConnectTTD(now);
-  }
-  else {
-    ttd = getBackendWriteTTD(now);
+  if (!noTTD) {
+    if (d_healthCheckQuery) {
+      ttd = getBackendHealthCheckTTD(now);
+    }
+    else if (newState == IOState::NeedRead) {
+      ttd = getBackendReadTTD(now);
+    }
+    else if (isFresh() && d_firstWrite) {
+      /* first write just after the non-blocking connect */
+      ttd = getBackendConnectTTD(now);
+    }
+    else {
+      ttd = getBackendWriteTTD(now);
+    }
   }
 
   auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
@@ -538,6 +545,13 @@ void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackf
   }
 }
 
+void DoHConnectionToBackend::watchForRemoteHostClosingConnection()
+{
+  if (willBeReusable() && !d_healthCheckQuery) {
+    updateIO(IOState::NeedRead, handleReadableIOCallback, false);
+  }
+}
+
 void DoHConnectionToBackend::addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback)
 {
   struct timeval now;
@@ -589,6 +603,9 @@ ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const ui
         if (conn->getConcurrentStreamsCount() > 0) {
           conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
         }
+        else {
+          conn->watchForRemoteHostClosingConnection();
+        }
       }
       else {
         conn->updateIO(state, handleWritableIOCallback);
@@ -632,6 +649,10 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con
   }
 #endif
 
+  if (frame->hd.type == NGHTTP2_GOAWAY) {
+    conn->d_connectionDied = true;
+  }
+
   /* is this the last frame for this stream? */
   if ((frame->hd.type == NGHTTP2_HEADERS || frame->hd.type == NGHTTP2_DATA) && frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
     auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
@@ -652,8 +673,10 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con
 
         conn->handleResponseError(std::move(request), now);
       }
+
       if (conn->getConcurrentStreamsCount() == 0) {
         conn->stopIO();
+        conn->watchForRemoteHostClosingConnection();
       }
     }
     else {
@@ -687,8 +710,8 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session
 
   stream->second.d_buffer.insert(stream->second.d_buffer.end(), data, data + len);
   if (stream->second.d_finished) {
-    //cerr<<"we now have the full response!"<<endl;
-    //cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;
+    // cerr<<"we now have the full response!"<<endl;
+    // cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;
 
     auto request = std::move(stream->second);
     conn->d_currentStreams.erase(stream->first);
@@ -704,11 +727,9 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session
     }
     if (conn->getConcurrentStreamsCount() == 0) {
       conn->stopIO();
+      conn->watchForRemoteHostClosingConnection();
     }
   }
-  else {
-    //cerr<<"but the stream is not finished yet"<<endl;
-  }
 
   return 0;
 }
@@ -751,7 +772,7 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i
   if (conn->getConcurrentStreamsCount() == 0) {
     //cerr<<"stopping IO"<<endl;
     conn->stopIO();
-    //cerr<<"our current refcnt is now "<<conn->getUsageCount()<<endl;
+    conn->watchForRemoteHostClosingConnection();
   }
 
   return 0;
@@ -864,6 +885,9 @@ size_t DownstreamDoHConnectionsManager::clear()
   size_t result = 0;
   for (const auto& backend : t_downstreamConnections) {
     result += backend.second.size();
+    for (auto& conn : backend.second) {
+      conn->stopIO();
+    }
   }
   t_downstreamConnections.clear();
   return result;
@@ -890,9 +914,11 @@ bool DownstreamDoHConnectionsManager::removeDownstreamConnection(std::shared_ptr
 
 void DownstreamDoHConnectionsManager::cleanupClosedConnections(struct timeval now)
 {
+  //cerr<<"cleanup interval is "<<s_cleanupInterval<<", next cleanup is "<<t_nextCleanup<<", now is "<<now.tv_sec<<endl;
   if (s_cleanupInterval <= 0 || (t_nextCleanup > 0 && t_nextCleanup > now.tv_sec)) {
     return;
   }
+
   t_nextCleanup = now.tv_sec + s_cleanupInterval;
 
   struct timeval freshCutOff = now;
index e26e0a9075898cac80bf0ceb3b573ff80501321f..5886d1a229fea46eeb2cc011f8c0262c1964854b 100644 (file)
@@ -80,8 +80,9 @@ std::ostream& operator<<(std::ostream& os, const ExpectedStep::ExpectedRequest d
   return os;
 }
 
-struct DOHConnection
+class DOHConnection
 {
+public:
   DOHConnection(bool needProxyProtocol) :
     d_session(std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)>(nullptr, nghttp2_session_del)), d_needProxyProtocol(needProxyProtocol)
   {
@@ -181,6 +182,9 @@ struct DOHConnection
     int rv = nghttp2_submit_response(d_session.get(), streamId, hdrs, sizeof(hdrs) / sizeof(*hdrs), &dataProvider);
     // cerr<<"Submitting response for stream ID "<<streamId<<": "<<rv<<endl;
     BOOST_CHECK_EQUAL(rv, 0);
+    /* just in case, see if we have anything to send */
+    rv = nghttp2_session_send(d_session.get());
+    BOOST_CHECK_EQUAL(rv, 0);
   }
 
   void submitError(uint32_t streamId, uint16_t status, const std::string& msg)
@@ -190,12 +194,18 @@ struct DOHConnection
 
     int rv = nghttp2_submit_response(d_session.get(), streamId, hdrs, sizeof(hdrs) / sizeof(*hdrs), nullptr);
     BOOST_CHECK_EQUAL(rv, 0);
+    /* just in case, see if we have anything to send */
+    rv = nghttp2_session_send(d_session.get());
+    BOOST_CHECK_EQUAL(rv, 0);
   }
 
   void submitGoAway()
   {
     int rv = nghttp2_submit_goaway(d_session.get(), NGHTTP2_FLAG_NONE, 0, NGHTTP2_INTERNAL_ERROR, nullptr, 0);
     BOOST_CHECK_EQUAL(rv, 0);
+    /* just in case, see if we have anything to send */
+    rv = nghttp2_session_send(d_session.get());
+    BOOST_CHECK_EQUAL(rv, 0);
   }
 
 private:
@@ -289,8 +299,6 @@ private:
 
   static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data)
   {
-    //DOHConnection* conn = reinterpret_cast<DOHConnection*>(user_data);
-
     if (error_code == 0) {
       return 0;
     }
@@ -728,9 +736,16 @@ BOOST_FIXTURE_TEST_CASE(test_SingleQuery, TestFixture)
        dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
      }},
     /* read settings, headers and response from the server */
-    {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+       /* set the outgoing descriptor (backend connection) as NOT ready anymore */
+       dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+     }},
     /* acknowledge settings */
-    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+       s_connectionBuffers.at(desc)->submitGoAway();
+       dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
+     }},
+    {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
   };
 
@@ -810,7 +825,11 @@ BOOST_FIXTURE_TEST_CASE(test_ConcurrentQueries, TestFixture)
     /* read settings, headers and responses from the server */
     {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     /* acknowledge settings */
-    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+       s_connectionBuffers.at(desc)->submitGoAway();
+       dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
+     }},
+    {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
   };
 
@@ -872,6 +891,7 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionReuse, TestFixture)
     queries.push_back({std::move(sender), std::move(internalQuery)});
   }
 
+  bool firstQueryDone = false;
   s_steps = {
     {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
     /* opening */
@@ -888,16 +908,23 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionReuse, TestFixture)
     /* read settings, headers and responses from the server */
     {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     /* acknowledge settings */
-    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&firstQueryDone](int desc, const ExpectedStep& step) {
+      firstQueryDone = true;
+    }},
     /* headers */
-    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+    }},
     /* data */
     {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
        /* set the outgoing descriptor (backend connection) as ready */
-       dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
+      dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setReady(desc);
      }},
     /* read settings, headers and responses from the server */
     {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    /* later the backend sends a go away frame */
+    {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+      s_connectionBuffers.at(desc)->submitGoAway();
+     }},
     {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
   };
 
@@ -907,11 +934,12 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionReuse, TestFixture)
     bool result = sendH2Query(backend, s_mplexer, sliced, std::move(query.second), false);
     BOOST_CHECK_EQUAL(result, true);
 
-    while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+    while (!firstQueryDone && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
       s_mplexer->run(&now);
     }
 
     BOOST_CHECK_EQUAL(query.first->d_valid, true);
+    BOOST_CHECK_EQUAL(firstQueryDone, true);
   }
 
   {
@@ -992,6 +1020,10 @@ BOOST_FIXTURE_TEST_CASE(test_InvalidDNSAnswer, TestFixture)
     {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     /* acknowledge settings */
     {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    /* try to read, the backend says to go away */
+    {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max(), [](int desc, const ExpectedStep& step) {
+      s_connectionBuffers.at(desc)->submitGoAway();
+     }},
     {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
   };
 
@@ -1222,6 +1254,7 @@ BOOST_FIXTURE_TEST_CASE(test_ShortWrite, TestFixture)
     queries.push_back({std::move(sender), std::move(internalQuery)});
   }
 
+  bool done = false;
   s_steps = {
     {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
     /* opening */
@@ -1240,7 +1273,11 @@ BOOST_FIXTURE_TEST_CASE(test_ShortWrite, TestFixture)
     /* read settings, headers and responses from the server */
     {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     /* acknowledge settings */
-    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&done](int desc, const ExpectedStep& step) {
+      /* mark backend as not ready */
+      dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+      done = true;
+    }},
     {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
   };
 
@@ -1250,7 +1287,7 @@ BOOST_FIXTURE_TEST_CASE(test_ShortWrite, TestFixture)
     BOOST_CHECK_EQUAL(result, true);
   }
 
-  while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+  while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
     s_mplexer->run(&now);
   }
 
@@ -1304,6 +1341,7 @@ BOOST_FIXTURE_TEST_CASE(test_ShortRead, TestFixture)
     queries.push_back({std::move(sender), std::move(internalQuery)});
   }
 
+  bool done = false;
   s_steps = {
     {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
     /* opening */
@@ -1329,7 +1367,11 @@ BOOST_FIXTURE_TEST_CASE(test_ShortRead, TestFixture)
     /* read settings, headers and responses (second attempt) */
     {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     /* acknowledge settings */
-    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&done](int desc, const ExpectedStep& step) {
+      /* mark backend as not ready */
+      dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+      done = true;
+    }},
     {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
   };
 
@@ -1339,7 +1381,7 @@ BOOST_FIXTURE_TEST_CASE(test_ShortRead, TestFixture)
     BOOST_CHECK_EQUAL(result, true);
   }
 
-  while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+  while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
     s_mplexer->run(&now);
   }
 
@@ -1479,6 +1521,7 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionClosedWhileWriting, TestFixture)
     queries.push_back({std::move(sender), std::move(internalQuery)});
   }
 
+  bool done = false;
   s_steps = {
     {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
     /* opening */
@@ -1503,7 +1546,11 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionClosedWhileWriting, TestFixture)
     /* read settings, headers and response from the server */
     {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     /* acknowledge settings */
-    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&done](int desc, const ExpectedStep& step) {
+      /* mark backend as not ready */
+      dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+      done = true;
+    }},
     {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
   };
 
@@ -1513,7 +1560,7 @@ BOOST_FIXTURE_TEST_CASE(test_ConnectionClosedWhileWriting, TestFixture)
     BOOST_CHECK_EQUAL(result, true);
   }
 
-  while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+  while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
     s_mplexer->run(&now);
   }
 
@@ -1679,6 +1726,7 @@ BOOST_FIXTURE_TEST_CASE(test_HTTP500FromServer, TestFixture)
     queries.push_back({std::move(sender), std::move(internalQuery)});
   }
 
+  bool done = false;
   s_steps = {
     {ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done},
     /* opening */
@@ -1702,7 +1750,11 @@ BOOST_FIXTURE_TEST_CASE(test_HTTP500FromServer, TestFixture)
     /* read settings, headers and responses from the server */
     {ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, std::numeric_limits<size_t>::max()},
     /* acknowledge settings */
-    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max()},
+    {ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, std::numeric_limits<size_t>::max(), [&done](int desc, const ExpectedStep& step) {
+      /* mark backend as not ready */
+      dynamic_cast<MockupFDMultiplexer*>(s_mplexer.get())->setNotReady(desc);
+      done = true;
+    }},
     {ExpectedStep::ExpectedRequest::closeBackend, IOState::Done},
   };
 
@@ -1712,7 +1764,7 @@ BOOST_FIXTURE_TEST_CASE(test_HTTP500FromServer, TestFixture)
     BOOST_CHECK_EQUAL(result, true);
   }
 
-  while (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0) {
+  while (!done && (s_mplexer->getWatchedFDCount(false) != 0 || s_mplexer->getWatchedFDCount(true) != 0)) {
     s_mplexer->run(&now);
   }
 
@@ -1921,6 +1973,8 @@ BOOST_FIXTURE_TEST_CASE(test_ProxyProtocol, TestFixture)
   for (auto& query : queries) {
     BOOST_CHECK_EQUAL(query.first->d_valid, true);
   }
+
+  BOOST_CHECK_EQUAL(clearH2Connections(), 0U);
 }
 
 BOOST_AUTO_TEST_SUITE_END();