task.cancel()
loop.run_until_complete(
- asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))
+ asyncio.gather(*to_cancel, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
output = test.run()
self.assertFalse(output.wasSuccessful())
+ def test_cancellation_hanging_tasks(self):
+ cancelled = False
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def test_leaking_task(self):
+ async def coro():
+ nonlocal cancelled
+ try:
+ await asyncio.sleep(1)
+ except asyncio.CancelledError:
+ cancelled = True
+ raise
+
+ # Leave this running in the background
+ asyncio.create_task(coro())
+
+ test = Test("test_leaking_task")
+ output = test.run()
+ self.assertTrue(cancelled)
+
+
if __name__ == "__main__":