@unittest.skipIf(support.check_bolt_optimized(), "fails on BOLT instrumented binaries")
def test_sys_api(self):
- code = """if 1:
- import sys
- def foo():
- pass
-
- def spam():
- pass
+ for define_eval_hook in (False, True):
+ code = """if 1:
+ import sys
+ def foo():
+ pass
- def bar():
- sys.deactivate_stack_trampoline()
- foo()
- sys.activate_stack_trampoline("perf")
- spam()
+ def spam():
+ pass
- def baz():
- bar()
+ def bar():
+ sys.deactivate_stack_trampoline()
+ foo()
+ sys.activate_stack_trampoline("perf")
+ spam()
- sys.activate_stack_trampoline("perf")
- baz()
- """
- with temp_dir() as script_dir:
- script = make_script(script_dir, "perftest", code)
- env = {**os.environ, "PYTHON_JIT": "0"}
- with subprocess.Popen(
- [sys.executable, script],
- text=True,
- stderr=subprocess.PIPE,
- stdout=subprocess.PIPE,
- env=env,
- ) as process:
- stdout, stderr = process.communicate()
+ def baz():
+ bar()
- self.assertEqual(stderr, "")
- self.assertEqual(stdout, "")
+ sys.activate_stack_trampoline("perf")
+ baz()
+ """
+ if define_eval_hook:
+ set_eval_hook = """if 1:
+ import _testinternalcapi
+ _testinternalcapi.set_eval_frame_record([])
+"""
+ code = set_eval_hook + code
+ with temp_dir() as script_dir:
+ script = make_script(script_dir, "perftest", code)
+ env = {**os.environ, "PYTHON_JIT": "0"}
+ with subprocess.Popen(
+ [sys.executable, script],
+ text=True,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ env=env,
+ ) as process:
+ stdout, stderr = process.communicate()
- perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
- self.assertTrue(perf_file.exists())
- perf_file_contents = perf_file.read_text()
- self.assertNotIn(f"py::foo:{script}", perf_file_contents)
- self.assertIn(f"py::spam:{script}", perf_file_contents)
- self.assertIn(f"py::bar:{script}", perf_file_contents)
- self.assertIn(f"py::baz:{script}", perf_file_contents)
+ self.assertEqual(stderr, "")
+ self.assertEqual(stdout, "")
+
+ perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
+ self.assertTrue(perf_file.exists())
+ perf_file_contents = perf_file.read_text()
+ self.assertNotIn(f"py::foo:{script}", perf_file_contents)
+ self.assertIn(f"py::spam:{script}", perf_file_contents)
+ self.assertIn(f"py::bar:{script}", perf_file_contents)
+ self.assertIn(f"py::baz:{script}", perf_file_contents)
def test_sys_api_with_existing_trampoline(self):
code = """if 1:
#define perf_map_file _PyRuntime.ceval.perf.map_file
#define persist_after_fork _PyRuntime.ceval.perf.persist_after_fork
#define perf_trampoline_type _PyRuntime.ceval.perf.perf_trampoline_type
+#define prev_eval_frame _PyRuntime.ceval.perf.prev_eval_frame
static void
perf_map_write_entry(void *state, const void *code_addr,
f = new_trampoline;
}
assert(f != NULL);
- return f(ts, frame, throw, _PyEval_EvalFrameDefault);
+ return f(ts, frame, throw, prev_eval_frame != NULL ? prev_eval_frame : _PyEval_EvalFrameDefault);
default_eval:
// Something failed, fall back to the default evaluator.
+ if (prev_eval_frame) {
+ return prev_eval_frame(ts, frame, throw);
+ }
return _PyEval_EvalFrameDefault(ts, frame, throw);
}
#endif // PY_HAVE_PERF_TRAMPOLINE
{
#ifdef PY_HAVE_PERF_TRAMPOLINE
PyThreadState *tstate = _PyThreadState_GET();
- if (tstate->interp->eval_frame &&
- tstate->interp->eval_frame != py_trampoline_evaluator) {
- PyErr_SetString(PyExc_RuntimeError,
- "Trampoline cannot be initialized as a custom eval "
- "frame is already present");
- return -1;
- }
if (!activate) {
- _PyInterpreterState_SetEvalFrameFunc(tstate->interp, NULL);
+ _PyInterpreterState_SetEvalFrameFunc(tstate->interp, prev_eval_frame);
perf_status = PERF_STATUS_NO_INIT;
}
- else {
+ else if (tstate->interp->eval_frame != py_trampoline_evaluator) {
+ prev_eval_frame = _PyInterpreterState_GetEvalFrameFunc(tstate->interp);
_PyInterpreterState_SetEvalFrameFunc(tstate->interp, py_trampoline_evaluator);
extra_code_index = _PyEval_RequestCodeExtraIndex(NULL);
if (extra_code_index == -1) {