import textwrap
import importlib
import sys
-from test.support import os_helper, SHORT_TIMEOUT
+from test.support import os_helper, SHORT_TIMEOUT, busy_retry
from test.support.script_helper import make_script
import subprocess
from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
from _testexternalinspection import get_stack_trace
from _testexternalinspection import get_async_stack_trace
+ from _testexternalinspection import get_all_awaited_by
except ImportError:
raise unittest.SkipTest(
"Test only runs when _testexternalinspection is available")
]
self.assertEqual(stack_trace, expected_stack_trace)
+ @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
+ "Test only runs on Linux and MacOS")
+ @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support")
+ def test_async_global_awaited_by(self):
+ script = textwrap.dedent("""\
+ import asyncio
+ import os
+ import random
+ import sys
+ from string import ascii_lowercase, digits
+ from test.support import socket_helper, SHORT_TIMEOUT
+
+ HOST = '127.0.0.1'
+ PORT = socket_helper.find_unused_port()
+ connections = 0
+
+ class EchoServerProtocol(asyncio.Protocol):
+ def connection_made(self, transport):
+ global connections
+ connections += 1
+ self.transport = transport
+
+ def data_received(self, data):
+ self.transport.write(data)
+ self.transport.close()
+
+ async def echo_client(message):
+ reader, writer = await asyncio.open_connection(HOST, PORT)
+ writer.write(message.encode())
+ await writer.drain()
+
+ data = await reader.read(100)
+ assert message == data.decode()
+ writer.close()
+ await writer.wait_closed()
+ await asyncio.sleep(SHORT_TIMEOUT)
+
+ async def echo_client_spam(server):
+ async with asyncio.TaskGroup() as tg:
+ while connections < 1000:
+ msg = list(ascii_lowercase + digits)
+ random.shuffle(msg)
+ tg.create_task(echo_client("".join(msg)))
+ await asyncio.sleep(0)
+ # at least a 1000 tasks created
+ fifo_path = sys.argv[1]
+ with open(fifo_path, "w") as fifo:
+ fifo.write("ready")
+ # at this point all client tasks completed without assertion errors
+ # let's wrap up the test
+ server.close()
+ await server.wait_closed()
+
+ async def main():
+ loop = asyncio.get_running_loop()
+ server = await loop.create_server(EchoServerProtocol, HOST, PORT)
+ async with server:
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(server.serve_forever(), name="server task")
+ tg.create_task(echo_client_spam(server), name="echo client spam")
+
+ asyncio.run(main())
+ """)
+ stack_trace = None
+ with os_helper.temp_dir() as work_dir:
+ script_dir = os.path.join(work_dir, "script_pkg")
+ os.mkdir(script_dir)
+ fifo = f"{work_dir}/the_fifo"
+ os.mkfifo(fifo)
+ script_name = _make_test_script(script_dir, 'script', script)
+ try:
+ p = subprocess.Popen([sys.executable, script_name, str(fifo)])
+ with open(fifo, "r") as fifo_file:
+ response = fifo_file.read()
+ self.assertEqual(response, "ready")
+ for _ in busy_retry(SHORT_TIMEOUT):
+ try:
+ all_awaited_by = get_all_awaited_by(p.pid)
+ except RuntimeError as re:
+ # This call reads a linked list in another process with
+ # no synchronization. That occasionally leads to invalid
+ # reads. Here we avoid making the test flaky.
+ msg = str(re)
+ if msg.startswith("Task list appears corrupted"):
+ continue
+ elif msg.startswith("Invalid linked list structure reading remote memory"):
+ continue
+ elif msg.startswith("Unknown error reading memory"):
+ continue
+ elif msg.startswith("Unhandled frame owner"):
+ continue
+ raise # Unrecognized exception, safest not to ignore it
+ else:
+ break
+ # expected: a list of two elements: 1 thread, 1 interp
+ self.assertEqual(len(all_awaited_by), 2)
+ # expected: a tuple with the thread ID and the awaited_by list
+ self.assertEqual(len(all_awaited_by[0]), 2)
+ # expected: no tasks in the fallback per-interp task list
+ self.assertEqual(all_awaited_by[1], (0, []))
+ entries = all_awaited_by[0][1]
+ # expected: at least 1000 pending tasks
+ self.assertGreaterEqual(len(entries), 1000)
+ # the first three tasks stem from the code structure
+ self.assertIn(('Task-1', []), entries)
+ self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries)
+ self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries)
+ # the final task will have some random number, but it should for
+ # sure be one of the echo client spam horde
+ self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1])
+ except PermissionError:
+ self.skipTest(
+ "Insufficient permissions to read the stack trace")
+ finally:
+ os.remove(fifo)
+ p.kill()
+ p.terminate()
+ p.wait(timeout=SHORT_TIMEOUT)
+
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
--- /dev/null
+Add ability to externally inspect all pending asyncio tasks, even if no task
+is currently entered on the event loop.
uint64_t task_is_task;
uint64_t task_awaited_by_is_set;
uint64_t task_coro;
+ uint64_t task_node;
} asyncio_task_object;
+ struct _asyncio_interpreter_state {
+ uint64_t size;
+ uint64_t asyncio_tasks_head;
+ } asyncio_interpreter_state;
struct _asyncio_thread_state {
uint64_t size;
uint64_t asyncio_running_loop;
uint64_t asyncio_running_task;
+ uint64_t asyncio_tasks_head;
} asyncio_thread_state;
} Py_AsyncioModuleDebugOffsets;
.task_is_task = offsetof(TaskObj, task_is_task),
.task_awaited_by_is_set = offsetof(TaskObj, task_awaited_by_is_set),
.task_coro = offsetof(TaskObj, task_coro),
+ .task_node = offsetof(TaskObj, task_node),
+ },
+ .asyncio_interpreter_state = {
+ .size = sizeof(PyInterpreterState),
+ .asyncio_tasks_head = offsetof(PyInterpreterState, asyncio_tasks_head),
},
.asyncio_thread_state = {
.size = sizeof(_PyThreadStateImpl),
.asyncio_running_loop = offsetof(_PyThreadStateImpl, asyncio_running_loop),
.asyncio_running_task = offsetof(_PyThreadStateImpl, asyncio_running_task),
+ .asyncio_tasks_head = offsetof(_PyThreadStateImpl, asyncio_tasks_head),
}};
/* State of the _asyncio module */
#include <internal/pycore_debug_offsets.h> // _Py_DebugOffsets
#include <internal/pycore_frame.h> // FRAME_SUSPENDED_YIELD_FROM
#include <internal/pycore_interpframe.h> // FRAME_OWNED_BY_CSTACK
+#include <internal/pycore_llist.h> // struct llist_node
#include <internal/pycore_stackref.h> // Py_TAG_BITS
#ifndef HAVE_PROCESS_VM_READV
uint64_t task_is_task;
uint64_t task_awaited_by_is_set;
uint64_t task_coro;
+ uint64_t task_node;
} asyncio_task_object;
+ struct _asyncio_interpreter_state {
+ uint64_t size;
+ uint64_t asyncio_tasks_head;
+ } asyncio_interpreter_state;
struct _asyncio_thread_state {
uint64_t size;
uint64_t asyncio_running_loop;
uint64_t asyncio_running_task;
+ uint64_t asyncio_tasks_head;
} asyncio_thread_state;
};
return 0;
}
+/*
+ * Walk the linked list of tasks anchored at `head_addr` in process `pid`
+ * and append one `(task name, awaited_by list)` tuple per task to `result`.
+ *
+ * The walk stops when a node's `next` pointer leads back to `head_addr`.
+ * It is abandoned with a RuntimeError when a NULL link is found or when
+ * more nodes than the iteration bound have been visited.
+ *
+ * Returns 0 on success and -1 on failure.
+ */
+static int
+append_awaited_by_for_thread(
+    int pid,
+    uintptr_t head_addr,
+    struct _Py_DebugOffsets *debug_offsets,
+    struct _Py_AsyncioModuleDebugOffsets *async_offsets,
+    PyObject *result
+) {
+    // A reasonable upper bound on the number of nodes to visit
+    const size_t node_limit = 2 << 15;
+    struct llist_node node;
+
+    // Start from the list head; its `next` member designates the first task.
+    if (read_memory(pid, head_addr, sizeof(node), &node) < 0) {
+        return -1;
+    }
+
+    for (size_t visited = 1; (uintptr_t)node.next != head_addr; visited++) {
+        if (visited > node_limit) {
+            PyErr_SetString(PyExc_RuntimeError, "Task list appears corrupted");
+            return -1;
+        }
+
+        if (node.next == NULL) {
+            PyErr_SetString(
+                PyExc_RuntimeError,
+                "Invalid linked list structure reading remote memory");
+            return -1;
+        }
+
+        // The node is embedded in the task object: step back by the node's
+        // offset to obtain the address of the task itself.
+        uintptr_t node_addr = (uintptr_t)node.next;
+        uintptr_t task_addr =
+            node_addr - async_offsets->asyncio_task_object.task_node;
+
+        PyObject *name = parse_task_name(
+            pid, debug_offsets, async_offsets, task_addr);
+        if (name == NULL) {
+            return -1;
+        }
+
+        PyObject *waiters = PyList_New(0);
+        if (waiters == NULL) {
+            Py_DECREF(name);
+            return -1;
+        }
+
+        PyObject *entry = PyTuple_New(2);
+        if (entry == NULL) {
+            Py_DECREF(name);
+            Py_DECREF(waiters);
+            return -1;
+        }
+
+        // The tuple takes over both references.
+        PyTuple_SET_ITEM(entry, 0, name);
+        PyTuple_SET_ITEM(entry, 1, waiters);
+
+        // Whether or not the append succeeded, our own reference to the
+        // tuple is no longer needed.
+        int append_failed = PyList_Append(result, entry);
+        Py_DECREF(entry);
+        if (append_failed) {
+            return -1;
+        }
+
+        // `waiters` is kept alive by the tuple now stored in `result`.
+        if (parse_task_awaited_by(pid, debug_offsets, async_offsets,
+                                  task_addr, waiters))
+        {
+            return -1;
+        }
+
+        // onto the next one...
+        if (read_memory(pid, node_addr, sizeof(node), &node) < 0) {
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Append a `(tid, tasks)` tuple to `result`, then fill `tasks` with the
+ * entries found in the task list anchored at `head_addr` in process `pid`.
+ *
+ * The tuple is appended before the task list is walked, so `result` may
+ * hold a partially filled entry when the walk fails.
+ *
+ * Returns 0 on success and -1 on failure.
+ */
+static int
+append_awaited_by(
+    int pid,
+    unsigned long tid,
+    uintptr_t head_addr,
+    struct _Py_DebugOffsets *debug_offsets,
+    struct _Py_AsyncioModuleDebugOffsets *async_offsets,
+    PyObject *result)
+{
+    PyObject *thread_id = PyLong_FromUnsignedLong(tid);
+    if (thread_id == NULL) {
+        return -1;
+    }
+
+    // Allocate the tuple first and the list only if the tuple succeeded;
+    // a NULL list then covers both failure cases.
+    PyObject *entry = PyTuple_New(2);
+    PyObject *tasks = (entry != NULL) ? PyList_New(0) : NULL;
+    if (tasks == NULL) {
+        Py_XDECREF(entry);
+        Py_DECREF(thread_id);
+        return -1;
+    }
+
+    // The tuple takes over both references.
+    PyTuple_SET_ITEM(entry, 0, thread_id);
+    PyTuple_SET_ITEM(entry, 1, tasks);
+
+    int append_failed = PyList_Append(result, entry);
+    Py_DECREF(entry);
+    if (append_failed) {
+        return -1;
+    }
+
+    // `tasks` is kept alive by the tuple now stored in `result`.
+    return append_awaited_by_for_thread(
+        pid, head_addr, debug_offsets, async_offsets, tasks) ? -1 : 0;
+}
+
+/*
+ * get_all_awaited_by(pid) -> list
+ *
+ * Collect the asyncio tasks of the process `pid` together with what awaits
+ * them.  The returned list holds one `(native thread id, tasks)` tuple for
+ * every thread state of the interpreter at the head of the runtime's
+ * interpreter list, followed by one `(0, tasks)` tuple for that
+ * interpreter's own task list.
+ *
+ * Returns NULL with an exception set when the platform is not supported or
+ * the arguments cannot be parsed, and NULL as well when any read from the
+ * remote process fails.
+ */
+static PyObject*
+get_all_awaited_by(PyObject* self, PyObject* args)
+{
+#if (!defined(__linux__) && !defined(__APPLE__)) || \
+    (defined(__linux__) && !HAVE_PROCESS_VM_READV)
+    PyErr_SetString(
+        PyExc_RuntimeError,
+        "get_all_awaited_by is not implemented on this platform");
+    return NULL;
+#endif
+
+    int pid;
+    if (!PyArg_ParseTuple(args, "i", &pid)) {
+        return NULL;
+    }
+
+    uintptr_t runtime_start_addr = get_py_runtime(pid);
+    if (runtime_start_addr == 0) {
+        // Only supply a generic message if the lookup did not set one.
+        if (!PyErr_Occurred()) {
+            PyErr_SetString(
+                PyExc_RuntimeError, "Failed to get .PyRuntime address");
+        }
+        return NULL;
+    }
+
+    struct _Py_DebugOffsets debug_offsets;
+    if (read_offsets(pid, &runtime_start_addr, &debug_offsets)) {
+        return NULL;
+    }
+
+    struct _Py_AsyncioModuleDebugOffsets async_offsets;
+    if (read_async_debug(pid, &async_offsets)) {
+        return NULL;
+    }
+
+    PyObject *result = PyList_New(0);
+    if (result == NULL) {
+        return NULL;
+    }
+
+    // Locate the interpreter state at the head of the runtime's list.
+    off_t interpreters_head_offset =
+        debug_offsets.runtime_state.interpreters_head;
+
+    uintptr_t interp_addr;
+    if (read_memory(pid,
+                    runtime_start_addr + interpreters_head_offset,
+                    sizeof(void*),
+                    &interp_addr) < 0)
+    {
+        goto error;
+    }
+
+    // Then the first thread state of that interpreter.
+    uintptr_t tstate_addr;
+    if (read_memory(pid,
+                    interp_addr
+                        + debug_offsets.interpreter_state.threads_head,
+                    sizeof(void*),
+                    &tstate_addr) < 0)
+    {
+        goto error;
+    }
+
+    // One result entry per thread state, following the `next` links until
+    // a NULL thread state address is read.
+    while (tstate_addr != 0) {
+        unsigned long tid = 0;
+        if (read_memory(pid,
+                        tstate_addr
+                            + debug_offsets.thread_state.native_thread_id,
+                        sizeof(tid),
+                        &tid) < 0)
+        {
+            goto error;
+        }
+
+        uintptr_t thread_tasks_head = tstate_addr
+            + async_offsets.asyncio_thread_state.asyncio_tasks_head;
+
+        if (append_awaited_by(pid, tid, thread_tasks_head, &debug_offsets,
+                              &async_offsets, result))
+        {
+            goto error;
+        }
+
+        if (read_memory(pid,
+                        tstate_addr + debug_offsets.thread_state.next,
+                        sizeof(void*),
+                        &tstate_addr) < 0)
+        {
+            goto error;
+        }
+    }
+
+    // On top of a per-thread task lists used by default by asyncio to avoid
+    // contention, there is also a fallback per-interpreter list of tasks;
+    // any tasks still pending when a thread is destroyed will be moved to the
+    // per-interpreter task list. It's unlikely we'll find anything here, but
+    // interesting for debugging.
+    uintptr_t interp_tasks_head = interp_addr
+        + async_offsets.asyncio_interpreter_state.asyncio_tasks_head;
+
+    // The per-interpreter entry is reported with a thread id of 0.
+    if (append_awaited_by(pid, 0, interp_tasks_head, &debug_offsets,
+                          &async_offsets, result))
+    {
+        goto error;
+    }
+
+    return result;
+
+error:
+    Py_DECREF(result);
+    return NULL;
+}
+
static PyObject*
get_stack_trace(PyObject* self, PyObject* args)
{
"Get the Python stack from a given PID"},
{"get_async_stack_trace", get_async_stack_trace, METH_VARARGS,
"Get the asyncio stack from a given PID"},
+ {"get_all_awaited_by", get_all_awaited_by, METH_VARARGS,
+ "Get all tasks and their awaited_by from a given PID"},
{NULL, NULL, 0, NULL},
};