from . import exceptions as exceptions_mod
from . import locks
from . import tasks
+from . import futures
async def staggered_race(coro_fns, delay, *, loop=None):
"""
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
loop = loop or events.get_running_loop()
+ parent_task = tasks.current_task(loop)
enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
def task_done(task):
running_tasks.discard(task)
+ futures.future_discard_from_awaited_by(task, parent_task)
if (
on_completed_fut is not None
and not on_completed_fut.done()
this_failed = locks.Event()
next_ok_to_start = locks.Event()
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
+ futures.future_add_to_awaited_by(next_task, parent_task)
running_tasks.add(next_task)
next_task.add_done_callback(task_done)
# next_task has been appended to running_tasks so next_task is ok to
try:
ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
+ futures.future_add_to_awaited_by(first_task, parent_task)
running_tasks.add(first_task)
first_task.add_done_callback(task_done)
# first_task has been appended to running_tasks so first_task is ok to start.
raise propagate_cancellation_error
return winner_result, winner_index, exceptions
finally:
- del exceptions, propagate_cancellation_error, unhandled_exceptions
+ del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task
]
self.assertEqual(stack_trace, expected_stack_trace)
+ @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
+ "Test only runs on Linux and MacOS")
+ @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support")
+ def test_async_staggered_race_remote_stack_trace(self):
+ # Spawn a process with some realistic Python code
+ script = textwrap.dedent("""\
+ import asyncio.staggered
+ import time
+ import sys
+
+ async def deep():
+ await asyncio.sleep(0)
+ fifo_path = sys.argv[1]
+ with open(fifo_path, "w") as fifo:
+ fifo.write("ready")
+ time.sleep(10000)
+
+ async def c1():
+ await asyncio.sleep(0)
+ await deep()
+
+ async def c2():
+ await asyncio.sleep(10000)
+
+ async def main():
+ await asyncio.staggered.staggered_race(
+ [c1, c2],
+ delay=None,
+ )
+
+ asyncio.run(main())
+ """)
+ stack_trace = None
+ with os_helper.temp_dir() as work_dir:
+ script_dir = os.path.join(work_dir, "script_pkg")
+ os.mkdir(script_dir)
+ fifo = f"{work_dir}/the_fifo"
+ os.mkfifo(fifo)
+ script_name = _make_test_script(script_dir, 'script', script)
+ try:
+ p = subprocess.Popen([sys.executable, script_name, str(fifo)])
+ with open(fifo, "r") as fifo_file:
+ response = fifo_file.read()
+ self.assertEqual(response, "ready")
+ stack_trace = get_async_stack_trace(p.pid)
+ except PermissionError:
+ self.skipTest(
+ "Insufficient permissions to read the stack trace")
+ finally:
+ os.remove(fifo)
+ p.kill()
+ p.terminate()
+ p.wait(timeout=SHORT_TIMEOUT)
+
+ # sets are unordered, so we want to sort "awaited_by"s
+ stack_trace[2].sort(key=lambda x: x[1])
+
+ expected_stack_trace = [
+ ['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]]
+ ]
+ self.assertEqual(stack_trace, expected_stack_trace)
+
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,