else:
task = self._loop.create_task(coro, name=name, context=context)
- # optimization: Immediately call the done callback if the task is
+ # Always schedule the done callback even if the task is
# already done (e.g. if the coro was able to complete eagerly),
- # and skip scheduling a done callback
- if task.done():
- self._on_task_done(task)
- else:
- self._tasks.add(task)
- task.add_done_callback(self._on_task_done)
+ # otherwise if the task completes with an exception then it will cancel
+ # the current task too early. gh-128550, gh-128588
+ self._tasks.add(task)
+ task.add_done_callback(self._on_task_done)
try:
return task
finally:
self.assertListEqual(gc.get_referrers(exc), [])
+ async def test_cancels_task_if_created_during_creation(self):
+ # regression test for gh-128550
+ ran = False
+ class MyError(Exception):
+ pass
+
+ exc = None
+ try:
+ async with asyncio.TaskGroup() as tg:
+ async def third_task():
+ raise MyError("third task failed")
+
+ async def second_task():
+ nonlocal ran
+ tg.create_task(third_task())
+ with self.assertRaises(asyncio.CancelledError):
+ await asyncio.sleep(0) # eager tasks cancel here
+ await asyncio.sleep(0) # lazy tasks cancel here
+ ran = True
+
+ tg.create_task(second_task())
+ except* MyError as excs:
+ exc = excs.exceptions[0]
+
+ self.assertTrue(ran)
+ self.assertIsInstance(exc, MyError)
+
+
+ async def test_cancellation_does_not_leak_out_of_tg(self):
+ class MyError(Exception):
+ pass
+
+ async def throw_error():
+ raise MyError
+
+ try:
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(throw_error())
+ except* MyError:
+ pass
+ else:
+ self.fail("should have raised one MyError in group")
+
+ # if this test fails this current task will be cancelled
+ # outside the task group and inside unittest internals
+ # we yield to the event loop with sleep(0) so that
+ # cancellation happens here and error is more understandable
+ await asyncio.sleep(0)
+
+
class TestTaskGroup(BaseTestTaskGroup, unittest.IsolatedAsyncioTestCase):
loop_factory = asyncio.EventLoop