--- /dev/null
+import json
+import os
+import platform
+import subprocess
+import sys
+import sysconfig
+import unittest
+
+from test import support
+from test.support import import_helper
+
+
+_testinternalcapi = import_helper.import_module("_testinternalcapi")
+
+
+if not support.has_subprocess_support:
+ raise unittest.SkipTest("test requires subprocess support")
+
+
+def _frame_pointers_expected(machine):
+ cflags = " ".join(
+ value for value in (
+ sysconfig.get_config_var("PY_CORE_CFLAGS"),
+ sysconfig.get_config_var("CFLAGS"),
+ )
+ if value
+ )
+ if "no-omit-frame-pointer" in cflags:
+ return True
+ if "omit-frame-pointer" in cflags:
+ return False
+ if sys.platform == "darwin":
+ # macOS x86_64/ARM64 always have frame pointer by default.
+ return True
+ if sys.platform == "linux":
+ if machine in {"aarch64", "arm64"}:
+ # 32-bit Linux is not supported
+ if sys.maxsize < 2**32:
+ return None
+ return True
+ if machine == "x86_64":
+ return False
+ if sys.platform == "win32":
+ # MSVC ignores /Oy and /Oy- on x64/ARM64.
+ if machine == "arm64":
+ # Windows ARM64 guidelines recommend frame pointers (x29) for stack walking.
+ return True
+ elif machine == "x86_64":
+ # Windows x64 uses unwind metadata; frame pointers are not required.
+ return None
+ return None
+
+
+def _build_stack_and_unwind():
+ import operator
+
+ def build_stack(n, unwinder, warming_up_caller=False):
+ if warming_up_caller:
+ return
+ if n == 0:
+ return unwinder()
+ warming_up = True
+ while warming_up:
+ # Can't branch on JIT state inside JITted code, so compute here.
+ warming_up = (
+ hasattr(sys, "_jit")
+ and sys._jit.is_enabled()
+ and not sys._jit.is_active()
+ )
+ result = operator.call(build_stack, n - 1, unwinder, warming_up)
+ return result
+
+ stack = build_stack(10, _testinternalcapi.manual_frame_pointer_unwind)
+ return stack
+
+
+def _classify_stack(stack, jit_enabled):
+ labels = _testinternalcapi.classify_stack_addresses(stack, jit_enabled)
+
+ annotated = []
+ jit_frames = 0
+ python_frames = 0
+ other_frames = 0
+ for idx, (frame, tag) in enumerate(zip(stack, labels)):
+ addr = int(frame)
+ if tag == "jit":
+ jit_frames += 1
+ elif tag == "python":
+ python_frames += 1
+ else:
+ other_frames += 1
+ annotated.append((idx, addr, tag))
+ return annotated, python_frames, jit_frames, other_frames
+
+
+def _annotate_unwind():
+ stack = _build_stack_and_unwind()
+ jit_enabled = hasattr(sys, "_jit") and sys._jit.is_enabled()
+ jit_backend = _testinternalcapi.get_jit_backend()
+ ranges = _testinternalcapi.get_jit_code_ranges() if jit_enabled else []
+ if jit_enabled and ranges:
+ print("JIT ranges:")
+ for start, end in ranges:
+ print(f" {int(start):#x}-{int(end):#x}")
+ annotated, python_frames, jit_frames, other_frames = _classify_stack(
+ stack, jit_enabled
+ )
+ for idx, addr, tag in annotated:
+ print(f"#{idx:02d} {addr:#x} -> {tag}")
+ return json.dumps({
+ "length": len(stack),
+ "python_frames": python_frames,
+ "jit_frames": jit_frames,
+ "other_frames": other_frames,
+ "jit_backend": jit_backend,
+ })
+
+
+def _manual_unwind_length(**env):
+ code = (
+ "from test.test_frame_pointer_unwind import _annotate_unwind; "
+ "print(_annotate_unwind());"
+ )
+ run_env = os.environ.copy()
+ run_env.update(env)
+ proc = subprocess.run(
+ [sys.executable, "-c", code],
+ env=run_env,
+ capture_output=True,
+ text=True,
+ )
+ # Surface the output for debugging/visibility when running this test
+ if proc.stdout:
+ print(proc.stdout, end="")
+ if proc.returncode:
+ raise RuntimeError(
+ f"unwind helper failed (rc={proc.returncode}): {proc.stderr or proc.stdout}"
+ )
+ stdout_lines = proc.stdout.strip().splitlines()
+ if not stdout_lines:
+ raise RuntimeError("unwind helper produced no output")
+ try:
+ return json.loads(stdout_lines[-1])
+ except ValueError as exc:
+ raise RuntimeError(
+ f"unexpected output from unwind helper: {proc.stdout!r}"
+ ) from exc
+
+
+@support.requires_gil_enabled("test requires the GIL enabled")
+@unittest.skipIf(support.is_wasi, "test not supported on WASI")
+class FramePointerUnwindTests(unittest.TestCase):
+
+ def setUp(self):
+ super().setUp()
+ machine = platform.machine().lower()
+ expected = _frame_pointers_expected(machine)
+ if expected is None:
+ self.skipTest(f"unsupported architecture for frame pointer check: {machine}")
+ try:
+ _testinternalcapi.manual_frame_pointer_unwind()
+ except RuntimeError as exc:
+ if "not supported" in str(exc):
+ self.skipTest("manual frame pointer unwinding not supported on this platform")
+ raise
+ self.machine = machine
+ self.frame_pointers_expected = expected
+
+ def test_manual_unwind_respects_frame_pointers(self):
+ jit_available = hasattr(sys, "_jit") and sys._jit.is_available()
+ envs = [({"PYTHON_JIT": "0"}, False)]
+ if jit_available:
+ envs.append(({"PYTHON_JIT": "1"}, True))
+
+ for env, using_jit in envs:
+ with self.subTest(env=env):
+ result = _manual_unwind_length(**env)
+ jit_frames = result["jit_frames"]
+ python_frames = result.get("python_frames", 0)
+ jit_backend = result.get("jit_backend")
+ if self.frame_pointers_expected:
+ self.assertGreater(
+ python_frames,
+ 0,
+ f"expected to find Python frames on {self.machine} with env {env}",
+ )
+ if using_jit:
+ if jit_backend == "jit":
+ self.assertGreater(
+ jit_frames,
+ 0,
+ f"expected to find JIT frames on {self.machine} with env {env}",
+ )
+ else:
+ # jit_backend is "interpreter" or not present
+ self.assertEqual(
+ jit_frames,
+ 0,
+ f"unexpected JIT frames counted on {self.machine} with env {env}",
+ )
+ else:
+ self.assertEqual(
+ jit_frames,
+ 0,
+ f"unexpected JIT frames counted on {self.machine} with env {env}",
+ )
+ else:
+ self.assertLessEqual(
+ python_frames,
+ 1,
+ f"unexpected Python frames counted on {self.machine} with env {env}",
+ )
+ self.assertEqual(
+ jit_frames,
+ 0,
+ f"unexpected JIT frames counted on {self.machine} with env {env}",
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()
#undef NDEBUG
#include "Python.h"
+#include <string.h>
#include "pycore_backoff.h" // JUMP_BACKWARD_INITIAL_VALUE
#include "pycore_bitutils.h" // _Py_bswap32()
#include "pycore_bytesobject.h" // _PyBytes_Find()
#include "pycore_initconfig.h" // _Py_GetConfigsAsDict()
#include "pycore_instruction_sequence.h" // _PyInstructionSequence_New()
#include "pycore_interpframe.h" // _PyFrame_GetFunction()
+#include "pycore_jit.h" // _PyJIT_AddressInJitCode()
#include "pycore_object.h" // _PyObject_IsFreed()
#include "pycore_optimizer.h" // _Py_Executor_DependsOn
#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
// Include test definitions from _testinternalcapi/
#include "_testinternalcapi/parts.h"
+#if defined(HAVE_DLADDR) && !defined(__wasi__)
+# include <dlfcn.h>
+#endif
+#ifdef MS_WINDOWS
+# include <windows.h>
+# include <intrin.h>
+# include <winnt.h>
+# include <wchar.h>
+#endif
#define MODULE_NAME "_testinternalcapi"
+static const uintptr_t min_frame_pointer_addr = 0x1000;
+
+
static PyObject *
_get_current_module(void)
{
return PyLong_FromSize_t(_PyOS_STACK_MARGIN_BYTES);
}
+#ifdef MS_WINDOWS
+static const char *
+classify_address(uintptr_t addr, int jit_enabled, PyInterpreterState *interp)
+{
+ HMODULE module = NULL;
+ if (GetModuleHandleExW(
+ GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS
+ | GET_MODULE_HANDLE_EX_FLAG_UNCHANGED_REFCOUNT,
+ (LPCWSTR)addr,
+ &module)) {
+ wchar_t path[MAX_PATH];
+ DWORD len = GetModuleFileNameW(module, path, Py_ARRAY_LENGTH(path));
+ if (len > 0 && len < Py_ARRAY_LENGTH(path)) {
+ const wchar_t *base = wcsrchr(path, L'\\');
+ base = base ? base + 1 : path;
+ if (_wcsnicmp(base, L"python", 6) == 0) {
+ return "python";
+ }
+ return "other";
+ }
+ /* Module resolved but path unavailable: treat as non-JIT. */
+ return "other";
+ }
+#ifdef _Py_JIT
+ if (jit_enabled && _PyJIT_AddressInJitCode(interp, addr)) {
+ return "jit";
+ }
+#endif
+ return "other";
+}
+#elif defined(HAVE_DLADDR) && !defined(__wasi__)
+static const char *
+classify_address(uintptr_t addr, int jit_enabled, PyInterpreterState *interp)
+{
+ Dl_info info;
+ if (dladdr((void *)addr, &info) != 0
+ && info.dli_fname != NULL
+ && info.dli_fname[0] != '\0') {
+ const char *base = strrchr(info.dli_fname, '/');
+ base = base ? base + 1 : info.dli_fname;
+ if (strncmp(base, "python", 6) == 0) {
+ return "python";
+ }
+ return "other";
+ }
+#ifdef _Py_JIT
+ if (jit_enabled && _PyJIT_AddressInJitCode(interp, addr)) {
+ return "jit";
+ }
+#endif
+ return "other";
+}
+#else
+static const char *
+classify_address(uintptr_t addr, int jit_enabled, PyInterpreterState *interp)
+{
+#ifdef _Py_JIT
+ if (jit_enabled && _PyJIT_AddressInJitCode(interp, addr)) {
+ return "jit";
+ }
+#endif
+ return "other";
+}
+#endif
+
+static PyObject *
+classify_stack_addresses(PyObject *self, PyObject *args)
+{
+ PyObject *seq = NULL;
+ int jit_enabled = 0;
+
+ if (!PyArg_ParseTuple(args, "O|p:classify_stack_addresses",
+ &seq, &jit_enabled)) {
+ return NULL;
+ }
+ PyObject *fast = PySequence_Fast(seq, "addresses must be iterable");
+ if (fast == NULL) {
+ return NULL;
+ }
+ Py_ssize_t n = PySequence_Fast_GET_SIZE(fast);
+ PyObject *labels = PyList_New(n);
+ if (labels == NULL) {
+ Py_DECREF(fast);
+ return NULL;
+ }
+ PyThreadState *tstate = _PyThreadState_GET();
+ PyInterpreterState *interp = tstate ? tstate->interp : NULL;
+ PyObject **items = PySequence_Fast_ITEMS(fast);
+ for (Py_ssize_t i = 0; i < n; i++) {
+ unsigned long long value = PyLong_AsUnsignedLongLong(items[i]);
+ if (PyErr_Occurred()) {
+ Py_DECREF(labels);
+ Py_DECREF(fast);
+ return NULL;
+ }
+ const char *label = classify_address((uintptr_t)value, jit_enabled, interp);
+ PyObject *label_obj = PyUnicode_FromString(label);
+ if (label_obj == NULL) {
+ Py_DECREF(labels);
+ Py_DECREF(fast);
+ return NULL;
+ }
+ PyList_SET_ITEM(labels, i, label_obj);
+ }
+ Py_DECREF(fast);
+ return labels;
+}
+
+static PyObject *
+get_jit_code_ranges(PyObject *self, PyObject *Py_UNUSED(args))
+{
+ PyObject *ranges = PyList_New(0);
+ if (ranges == NULL) {
+ return NULL;
+ }
+#ifdef _Py_JIT
+ PyThreadState *tstate = _PyThreadState_GET();
+ PyInterpreterState *interp = tstate ? tstate->interp : NULL;
+ if (interp == NULL) {
+ return ranges;
+ }
+ for (_PyExecutorObject *exec = interp->executor_list_head;
+ exec != NULL;
+ exec = exec->vm_data.links.next)
+ {
+ if (exec->jit_code == NULL || exec->jit_size == 0) {
+ continue;
+ }
+ uintptr_t start = (uintptr_t)exec->jit_code;
+ uintptr_t end = start + exec->jit_size;
+ PyObject *start_obj = PyLong_FromUnsignedLongLong(start);
+ PyObject *end_obj = PyLong_FromUnsignedLongLong(end);
+ if (start_obj == NULL || end_obj == NULL) {
+ Py_XDECREF(start_obj);
+ Py_XDECREF(end_obj);
+ Py_DECREF(ranges);
+ return NULL;
+ }
+ PyObject *pair = PyTuple_New(2);
+ if (pair == NULL) {
+ Py_DECREF(start_obj);
+ Py_DECREF(end_obj);
+ Py_DECREF(ranges);
+ return NULL;
+ }
+ PyTuple_SET_ITEM(pair, 0, start_obj);
+ PyTuple_SET_ITEM(pair, 1, end_obj);
+ if (PyList_Append(ranges, pair) < 0) {
+ Py_DECREF(pair);
+ Py_DECREF(ranges);
+ return NULL;
+ }
+ Py_DECREF(pair);
+ }
+#endif
+ return ranges;
+}
+
+static PyObject *
+get_jit_backend(PyObject *self, PyObject *Py_UNUSED(args))
+{
+#ifdef _Py_JIT
+ return PyUnicode_FromString("jit");
+#elif defined(_Py_TIER2)
+ return PyUnicode_FromString("interpreter");
+#else
+ Py_RETURN_NONE;
+#endif
+}
+
+static PyObject *
+manual_unwind_from_fp(uintptr_t *frame_pointer)
+{
+ Py_ssize_t max_depth = 200;
+ int stack_grows_down = _Py_STACK_GROWS_DOWN;
+
+ if (frame_pointer == NULL) {
+ return PyList_New(0);
+ }
+
+ PyObject *result = PyList_New(0);
+ if (result == NULL) {
+ return NULL;
+ }
+
+ for (Py_ssize_t depth = 0;
+ depth < max_depth && frame_pointer != NULL;
+ depth++)
+ {
+ uintptr_t fp_addr = (uintptr_t)frame_pointer;
+ if ((fp_addr % sizeof(uintptr_t)) != 0) {
+ break;
+ }
+ uintptr_t return_addr = frame_pointer[1];
+
+ PyObject *addr_obj = PyLong_FromUnsignedLongLong(return_addr);
+ if (addr_obj == NULL) {
+ Py_DECREF(result);
+ return NULL;
+ }
+ if (PyList_Append(result, addr_obj) < 0) {
+ Py_DECREF(addr_obj);
+ Py_DECREF(result);
+ return NULL;
+ }
+ Py_DECREF(addr_obj);
+
+ uintptr_t *next_fp = (uintptr_t *)frame_pointer[0];
+ // Stop if the frame pointer is extremely low.
+ if ((uintptr_t)next_fp < min_frame_pointer_addr) {
+ break;
+ }
+ uintptr_t next_addr = (uintptr_t)next_fp;
+ if (stack_grows_down) {
+ if (next_addr <= fp_addr) {
+ break;
+ }
+ }
+ else {
+ if (next_addr >= fp_addr) {
+ break;
+ }
+ }
+ frame_pointer = next_fp;
+ }
+
+ return result;
+}
+#if defined(__GNUC__) || defined(__clang__)
+static PyObject *
+manual_frame_pointer_unwind(PyObject *self, PyObject *args)
+{
+ uintptr_t *frame_pointer = (uintptr_t *)__builtin_frame_address(0);
+ return manual_unwind_from_fp(frame_pointer);
+}
+#elif defined(MS_WINDOWS) && defined(_M_ARM64)
+static PyObject *
+manual_frame_pointer_unwind(PyObject *self, PyObject *args)
+{
+ CONTEXT ctx;
+ uintptr_t *frame_pointer = NULL;
+
+ RtlCaptureContext(&ctx);
+ frame_pointer = (uintptr_t *)ctx.Fp;
+ return manual_unwind_from_fp(frame_pointer);
+}
+#else
+static PyObject *
+manual_frame_pointer_unwind(PyObject *self, PyObject *Py_UNUSED(args))
+{
+ PyErr_SetString(PyExc_RuntimeError,
+ "manual_frame_pointer_unwind is not supported on this platform");
+ return NULL;
+}
+#endif
+
static PyObject*
test_bswap(PyObject *self, PyObject *Py_UNUSED(args))
{
{"get_c_recursion_remaining", get_c_recursion_remaining, METH_NOARGS},
{"get_stack_pointer", get_stack_pointer, METH_NOARGS},
{"get_stack_margin", get_stack_margin, METH_NOARGS},
+ {"classify_stack_addresses", classify_stack_addresses, METH_VARARGS},
+ {"get_jit_code_ranges", get_jit_code_ranges, METH_NOARGS},
+ {"get_jit_backend", get_jit_backend, METH_NOARGS},
+ {"manual_frame_pointer_unwind", manual_frame_pointer_unwind, METH_NOARGS},
{"test_bswap", test_bswap, METH_NOARGS},
{"test_popcount", test_popcount, METH_NOARGS},
{"test_bit_length", test_bit_length, METH_NOARGS},