__all__ = 'staggered_race',
import contextlib
-import typing
from . import events
from . import exceptions as exceptions_mod
from . import tasks
-async def staggered_race(
- coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]],
- delay: typing.Optional[float],
- *,
- loop: events.AbstractEventLoop = None,
-) -> typing.Tuple[
- typing.Any,
- typing.Optional[int],
- typing.List[typing.Optional[Exception]]
-]:
+async def staggered_race(coro_fns, delay, *, loop=None):
"""Run coroutines with staggered start times and take the first to finish.
This method takes an iterable of coroutine functions. The first one is