finally:
_cleanup_sockets(client_socket, server_socket)
+ @skip_if_not_supported
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
+ def test_async_global_awaited_by_from_non_main_thread(self):
+ port = find_unused_port()
+ script = textwrap.dedent(
+ f"""\
+ import asyncio
+ import socket
+ import threading
+ import time
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(('localhost', {port}))
+
+ async def worker_main():
+ task = asyncio.create_task(
+ asyncio.sleep(10_000),
+ name="worker task",
+ )
+ await asyncio.sleep(0)
+ sock.sendall(f"ready:{{threading.get_native_id()}}\\n".encode())
+ await task
+
+ def run_worker_loop():
+ asyncio.run(worker_main())
+
+ threading.Thread(
+ target=run_worker_loop,
+ name="async-worker",
+ daemon=True,
+ ).start()
+ time.sleep(10_000)
+ """
+ )
+
+ with os_helper.temp_dir() as work_dir:
+ script_dir = os.path.join(work_dir, "script_pkg")
+ os.mkdir(script_dir)
+
+ server_socket = _create_server_socket(port)
+ script_name = _make_test_script(script_dir, "script", script)
+ client_socket = None
+
+ try:
+ with _managed_subprocess([sys.executable, script_name]) as p:
+ client_socket, _ = server_socket.accept()
+ server_socket.close()
+ server_socket = None
+
+ response = _wait_for_signal(client_socket, b"ready:")
+ worker_thread_id = int(
+ response.split(b"ready:", 1)[1].splitlines()[0]
+ )
+
+ for _ in busy_retry(SHORT_TIMEOUT):
+ all_awaited_by = get_all_awaited_by(p.pid)
+ if any(
+ task.task_name == "worker task"
+ for info in all_awaited_by
+ if info.thread_id == worker_thread_id
+ for task in info.awaited_by
+ ):
+ break
+ else:
+ self.fail(
+ "get_all_awaited_by() did not report "
+ "the asyncio task from the non-main thread"
+ )
+ finally:
+ _cleanup_sockets(client_socket, server_socket)
+
+ @skip_if_not_supported
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
+ def test_async_remote_stack_trace_from_non_main_thread(self):
+ port = find_unused_port()
+ script = textwrap.dedent(
+ f"""\
+ import asyncio
+ import socket
+ import threading
+ import time
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(('localhost', {port}))
+
+ def blocking_call():
+ sock.sendall(f"ready:{{threading.get_native_id()}}\\n".encode())
+ time.sleep(10_000)
+
+ async def worker_task():
+ await asyncio.sleep(0)
+ blocking_call()
+
+ async def worker_main():
+ task = asyncio.create_task(
+ worker_task(),
+ name="worker task",
+ )
+ await task
+
+ def run_worker_loop():
+ asyncio.run(worker_main())
+
+ threading.Thread(
+ target=run_worker_loop,
+ name="async-worker",
+ daemon=True,
+ ).start()
+ time.sleep(10_000)
+ """
+ )
+
+ with os_helper.temp_dir() as work_dir:
+ script_dir = os.path.join(work_dir, "script_pkg")
+ os.mkdir(script_dir)
+
+ server_socket = _create_server_socket(port)
+ script_name = _make_test_script(script_dir, "script", script)
+ client_socket = None
+
+ try:
+ with _managed_subprocess([sys.executable, script_name]) as p:
+ client_socket, _ = server_socket.accept()
+ server_socket.close()
+ server_socket = None
+
+ response = _wait_for_signal(client_socket, b"ready:")
+ worker_thread_id = int(
+ response.split(b"ready:", 1)[1].splitlines()[0]
+ )
+
+ for _ in busy_retry(SHORT_TIMEOUT):
+ stack_trace = get_async_stack_trace(p.pid)
+ if any(
+ task.task_name == "worker task"
+ for info in stack_trace
+ if info.thread_id == worker_thread_id
+ for task in info.awaited_by
+ ):
+ break
+ else:
+ self.fail(
+ "get_async_stack_trace() did not report "
+ "the running asyncio task from the non-main thread"
+ )
+ finally:
+ _cleanup_sockets(client_socket, server_socket)
+
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,