class _Done(Exception):
pass
-async def staggered_race(coro_fns, delay):
+async def staggered_race(coro_fns, delay, *, loop=None):
"""Run coroutines with staggered start times and take the first to finish.
This method takes an iterable of coroutine functions. The first one is
raise _Done
try:
- async with taskgroups.TaskGroup() as tg:
+ tg = taskgroups.TaskGroup()
+ # Intentionally override the loop in the TaskGroup to avoid
+ # using the running loop, preserving backwards compatibility
+ # TaskGroup only starts using `_loop` after `__aenter__`
+ # so overriding it here is safe.
+ tg._loop = loop
+ async with tg:
for this_index, coro_fn in enumerate(coro_fns):
this_failed = locks.Event()
exceptions.append(None)
self.assertIsInstance(excs[0], ValueError)
self.assertIsNone(excs[1])
+ def test_loop_argument(self):
+ loop = asyncio.new_event_loop()
+ async def coro():
+ self.assertEqual(loop, asyncio.get_running_loop())
+ return 'coro'
+
+ async def main():
+ winner, index, excs = await staggered_race(
+ [coro],
+ delay=0.1,
+ loop=loop
+ )
+
+ self.assertEqual(winner, 'coro')
+ self.assertEqual(index, 0)
+
+ loop.run_until_complete(main())
+ loop.close()
+
if __name__ == "__main__":
unittest.main()