return self._ssl_protocol._app_protocol
def is_closing(self):
- return self._closed
+ return self._closed or self._ssl_protocol._is_transport_closing()
def close(self):
"""Close the transport.
self._app_transport_created = True
return self._app_transport
+ def _is_transport_closing(self):
+ return self._transport is not None and self._transport.is_closing()
+
def connection_made(self, transport):
"""Called when the low-level connection is made.
test_utils.run_briefly(self.loop)
self.assertIsInstance(waiter.exception(), ConnectionAbortedError)
+ def test_connection_lost_when_busy(self):
+ # gh-118950: SSLProtocol.connection_lost not being called when OSError
+ # is thrown on asyncio.write.
+ sock = mock.Mock()
+ sock.fileno = mock.Mock(return_value=12345)
+ sock.send = mock.Mock(side_effect=BrokenPipeError)
+
+ # construct StreamWriter chain that contains loop dependant logic this emulates
+ # what _make_ssl_transport() does in BaseSelectorEventLoop
+ reader = asyncio.StreamReader(limit=2 ** 16, loop=self.loop)
+ protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
+ ssl_proto = self.ssl_protocol(proto=protocol)
+
+ # emulate reading decompressed data
+ sslobj = mock.Mock()
+ sslobj.read.side_effect = ssl.SSLWantReadError
+ sslobj.write.side_effect = ssl.SSLWantReadError
+ ssl_proto._sslobj = sslobj
+
+ # emulate outgoing data
+ data = b'An interesting message'
+
+ outgoing = mock.Mock()
+ outgoing.read = mock.Mock(return_value=data)
+ outgoing.pending = len(data)
+ ssl_proto._outgoing = outgoing
+
+ # use correct socket transport to initialize the SSLProtocol
+ self.loop._make_socket_transport(sock, ssl_proto)
+
+ transport = ssl_proto._app_transport
+ writer = asyncio.StreamWriter(transport, protocol, reader, self.loop)
+
+ async def main():
+ # writes data to transport
+ async def write():
+ writer.write(data)
+ await writer.drain()
+
+ # try to write for the first time
+ await write()
+ # try to write for the second time, this raises as the connection_lost
+ # callback should be done with error
+ with self.assertRaises(ConnectionResetError):
+ await write()
+
+ self.loop.run_until_complete(main())
+
def test_close_during_handshake(self):
# bpo-29743 Closing transport during handshake process leaks socket
waiter = self.loop.create_future()