reuse_address=False)
with self.assertWarns(DeprecationWarning):
- self.loop.run_until_complete(coro)
+ transport, protocol = self.loop.run_until_complete(coro)
+ transport.close()
+ self.loop.run_until_complete(protocol.done)
+ self.assertEqual('CLOSED', protocol.state)
@patch_socket
def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
coro = self.loop.create_datagram_endpoint(
lambda: MyDatagramProto(loop=self.loop),
local_addr=('127.0.0.1', 0),
- reuse_address=False,
reuse_port=True)
self.assertRaises(ValueError, self.loop.run_until_complete, coro)
coro = self.loop.create_datagram_endpoint(
lambda: MyDatagramProto(loop=self.loop),
local_addr=('1.2.3.4', 0),
- reuse_address=False,
reuse_port=reuseport_supported)
t, p = self.loop.run_until_complete(coro)