From 277ee25d7c8851d2c2cd1e683cfd86e786ba9317 Mon Sep 17 00:00:00 2001 From: Bob Halley Date: Fri, 27 Oct 2023 08:55:10 -0700 Subject: [PATCH] Fix a race condition in trio quic shutdown. It was possible to have a "lost wakeup" situation where we had stuff to send but the trio worker was blocked indefinitely in the receive. There is no test for this as the race is very race-y and I can't reproduce it reliably in the test suite, though I was able to do reliable replication a different way when debugging. I also reordered event processing to happen after timer handling but before sending in the trio and sync quic code. The async code already worked this way due to its different struture and needed no changes. --- dns/quic/_sync.py | 3 ++- dns/quic/_trio.py | 17 ++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/dns/quic/_sync.py b/dns/quic/_sync.py index 25dd6d6e..a71ac67c 100644 --- a/dns/quic/_sync.py +++ b/dns/quic/_sync.py @@ -128,6 +128,8 @@ class SyncQuicConnection(BaseQuicConnection): key.data() with self._lock: self._handle_timer(expiration) + self._handle_events() + with self._lock: datagrams = self._connection.datagrams_to_send(time.time()) for datagram, _ in datagrams: try: @@ -135,7 +137,6 @@ class SyncQuicConnection(BaseQuicConnection): except BlockingIOError: # we let QUIC handle any lossage pass - self._handle_events() finally: with self._lock: self._done = True diff --git a/dns/quic/_trio.py b/dns/quic/_trio.py index 1ee702b5..0ff0497e 100644 --- a/dns/quic/_trio.py +++ b/dns/quic/_trio.py @@ -81,12 +81,21 @@ class TrioQuicConnection(AsyncQuicConnection): self._handshake_complete = trio.Event() self._run_done = trio.Event() self._worker_scope = None + self._send_pending = False async def _worker(self): try: await self._socket.connect(self._peer) while not self._done: (expiration, interval) = self._get_timer_values(False) + if self._send_pending: + # Do not block forever if sends are pending. Even though we + # have a wake-up mechanism if we've already started the blocking + # read, the possibility of context switching in send means that + # more writes can happen while we have no wake up context, so + # we need self._send_pending to avoid (effectively) a "lost wakeup" + # race. + interval = 0.0 with trio.CancelScope( deadline=trio.current_time() + interval ) as self._worker_scope: @@ -94,10 +103,14 @@ class TrioQuicConnection(AsyncQuicConnection): self._connection.receive_datagram(datagram, self._peer, time.time()) self._worker_scope = None self._handle_timer(expiration) + await self._handle_events() + # We clear this now, before sending anything, as sending can cause + # context switches that do more sends. We want to know if that + # happens so we don't block a long time on the recv() above. + self._send_pending = False datagrams = self._connection.datagrams_to_send(time.time()) for datagram, _ in datagrams: await self._socket.send(datagram) - await self._handle_events() finally: self._done = True self._handshake_complete.set() @@ -129,6 +142,7 @@ class TrioQuicConnection(AsyncQuicConnection): async def write(self, stream, data, is_end=False): self._connection.send_stream_data(stream, data, is_end) + self._send_pending = True if self._worker_scope is not None: self._worker_scope.cancel() @@ -159,6 +173,7 @@ class TrioQuicConnection(AsyncQuicConnection): self._manager.closed(self._peer[0], self._peer[1]) self._closed = True self._connection.close() + self._send_pending = True if self._worker_scope is not None: self._worker_scope.cancel() await self._run_done.wait() -- 2.47.3