self._handshake_complete = trio.Event()
self._run_done = trio.Event()
self._worker_scope = None
+ self._send_pending = False
async def _worker(self):
try:
await self._socket.connect(self._peer)
while not self._done:
(expiration, interval) = self._get_timer_values(False)
+ if self._send_pending:
+ # Do not block forever if sends are pending. Even though we
+ # have a wake-up mechanism if we've already started the blocking
+ # read, the possibility of context switching in send means that
+ # more writes can happen while we have no wake up context, so
+ # we need self._send_pending to avoid (effectively) a "lost wakeup"
+ # race.
+ interval = 0.0
with trio.CancelScope(
deadline=trio.current_time() + interval
) as self._worker_scope:
self._connection.receive_datagram(datagram, self._peer, time.time())
self._worker_scope = None
self._handle_timer(expiration)
+ await self._handle_events()
+ # We clear this now, before sending anything, as sending can cause
+ # context switches that do more sends. We want to know if that
+ # happens so we don't block a long time on the recv() above.
+ self._send_pending = False
datagrams = self._connection.datagrams_to_send(time.time())
for datagram, _ in datagrams:
await self._socket.send(datagram)
- await self._handle_events()
finally:
self._done = True
self._handshake_complete.set()
async def write(self, stream, data, is_end=False):
self._connection.send_stream_data(stream, data, is_end)
+ self._send_pending = True
if self._worker_scope is not None:
self._worker_scope.cancel()
self._manager.closed(self._peer[0], self._peer[1])
self._closed = True
self._connection.close()
+ self._send_pending = True
if self._worker_scope is not None:
self._worker_scope.cancel()
await self._run_done.wait()