# is established.
self._strong_reader = stream_reader
self._reject_connection = False
- self._stream_writer = None
self._task = None
self._transport = None
self._client_connected_cb = client_connected_cb
return None
return self._stream_reader_wr()
- def _replace_writer(self, writer):
+ def _replace_transport(self, transport):
loop = self._loop
- transport = writer.transport
- self._stream_writer = writer
self._transport = transport
self._over_ssl = transport.get_extra_info('sslcontext') is not None
reader.set_transport(transport)
self._over_ssl = transport.get_extra_info('sslcontext') is not None
if self._client_connected_cb is not None:
- self._stream_writer = StreamWriter(transport, self,
- reader,
- self._loop)
- res = self._client_connected_cb(reader,
- self._stream_writer)
+ writer = StreamWriter(transport, self, reader, self._loop)
+ res = self._client_connected_cb(reader, writer)
if coroutines.iscoroutine(res):
def callback(task):
if task.cancelled():
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout)
self._transport = new_transport
- protocol._replace_writer(self)
+ protocol._replace_transport(new_transport)
def __del__(self, warnings=warnings):
if not self._transport.is_closing():
self.assertEqual(messages, [])
+ def test_unclosed_server_resource_warnings(self):
+ async def inner(rd, wr):
+ fut.set_result(True)
+ with self.assertWarns(ResourceWarning) as cm:
+ del wr
+ gc.collect()
+ self.assertEqual(len(cm.warnings), 1)
+ self.assertTrue(str(cm.warnings[0].message).startswith("unclosed <StreamWriter"))
+
+ async def outer():
+ srv = await asyncio.start_server(inner, socket_helper.HOSTv4, 0)
+ async with srv:
+ addr = srv.sockets[0].getsockname()
+ with socket.create_connection(addr):
+ # Give the loop some time to notice the connection
+ await fut
+
+ messages = []
+ self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
+
+ fut = self.loop.create_future()
+ self.loop.run_until_complete(outer())
+
+ self.assertEqual(messages, [])
+
def _basetest_unhandled_exceptions(self, handle_echo):
port = socket_helper.find_unused_port()