/* Pointer to currently executing frame. */
struct _PyInterpreterFrame *current_frame;
+ /* Pointer to the base frame (bottommost sentinel frame).
+ Used by profilers to validate complete stack unwinding.
+ Points to the embedded base_frame in _PyThreadStateImpl.
+ The frame is embedded there rather than here because _PyInterpreterFrame
+ is defined in internal headers that cannot be exposed in the public API. */
+ struct _PyInterpreterFrame *base_frame;
+
struct _PyInterpreterFrame *last_profiled_frame;
Py_tracefunc c_profilefunc;
uint64_t next;
uint64_t interp;
uint64_t current_frame;
+ uint64_t base_frame;
uint64_t last_profiled_frame;
uint64_t thread_id;
uint64_t native_thread_id;
.next = offsetof(PyThreadState, next), \
.interp = offsetof(PyThreadState, interp), \
.current_frame = offsetof(PyThreadState, current_frame), \
+ .base_frame = offsetof(PyThreadState, base_frame), \
.last_profiled_frame = offsetof(PyThreadState, last_profiled_frame), \
.thread_id = offsetof(PyThreadState, thread_id), \
.native_thread_id = offsetof(PyThreadState, native_thread_id), \
#include "pycore_brc.h" // struct _brc_thread_state
#include "pycore_freelist_state.h" // struct _Py_freelists
+#include "pycore_interpframe_structs.h" // _PyInterpreterFrame
#include "pycore_mimalloc.h" // struct _mimalloc_thread_state
#include "pycore_qsbr.h" // struct qsbr
#include "pycore_uop.h" // struct _PyUOpInstruction
// semi-public fields are in PyThreadState.
PyThreadState base;
+ // Embedded base frame - sentinel at the bottom of the frame stack.
+ // Used by profiling/sampling to detect incomplete stack traces.
+ _PyInterpreterFrame base_frame;
+
// The reference count field is used to synchronize deallocation of the
// thread state during runtime finalization.
Py_ssize_t refcount;
instruction which cleans up the shim frame and returns.
+### Base frame
+
+Each thread state contains an embedded `_PyInterpreterFrame` called the "base frame"
+that serves as a sentinel at the bottom of the frame stack. This frame is allocated
+in `_PyThreadStateImpl` (the internal extension of `PyThreadState`) and initialized
+when the thread state is created. The `owner` field is set to `FRAME_OWNED_BY_INTERPRETER`.
+
+External profilers and sampling tools can validate that they have successfully unwound
+the complete call stack by checking that the frame chain terminates at the base frame.
+The `PyThreadState.base_frame` pointer provides the expected address to compare against.
+If a stack walk doesn't reach this frame, the sample is incomplete (possibly due to a
+race condition) and should be discarded.
+
+The base frame is embedded in `_PyThreadStateImpl` rather than `PyThreadState` because
+`_PyInterpreterFrame` is defined in internal headers that cannot be exposed in the
+public API. A pointer (`PyThreadState.base_frame`) is provided for profilers to access
+the address without needing internal headers.
+
+See the initialization in `new_threadstate()` in [Python/pystate.c](../Python/pystate.c).
+
+#### How profilers should use the base frame
+
+External profilers should read `tstate->base_frame` before walking the stack, then
+walk from `tstate->current_frame` following `frame->previous` pointers until the
+chain terminates (`previous == NULL`). After the walk, verify that the address of
+the last frame visited matches `base_frame`. Note that frames with
+`owner == FRAME_OWNED_BY_INTERPRETER` (e.g. shim frames) can also appear in the
+middle of the chain, so the address comparison against `base_frame` — not the
+owner check alone — is what confirms the walk is complete. If the addresses do
+not match, discard the sample as incomplete since the frame chain may have been
+in an inconsistent state due to concurrent updates.
+
+
### Remote Profiling Frame Cache
The `last_profiled_frame` field in `PyThreadState` supports an optimization for
-import contextlib
import unittest
import os
import textwrap
+import contextlib
import importlib
import sys
import socket
# Simple wrapper functions for RemoteUnwinder
# ============================================================================
-# Errors that can occur transiently when reading process memory without synchronization
-RETRIABLE_ERRORS = (
- "Task list appears corrupted",
- "Invalid linked list structure reading remote memory",
- "Unknown error reading memory",
- "Unhandled frame owner",
- "Failed to parse initial frame",
- "Failed to process frame chain",
- "Failed to unwind stack",
-)
-
-
-def _is_retriable_error(exc):
- """Check if an exception is a transient error that should be retried."""
- msg = str(exc)
- return any(msg.startswith(err) or err in msg for err in RETRIABLE_ERRORS)
-
-
def get_stack_trace(pid):
for _ in busy_retry(SHORT_TIMEOUT):
try:
unwinder = RemoteUnwinder(pid, all_threads=True, debug=True)
return unwinder.get_stack_trace()
except RuntimeError as e:
- if _is_retriable_error(e):
- continue
- raise
+ continue
raise RuntimeError("Failed to get stack trace after retries")
unwinder = RemoteUnwinder(pid, debug=True)
return unwinder.get_async_stack_trace()
except RuntimeError as e:
- if _is_retriable_error(e):
- continue
- raise
+ continue
raise RuntimeError("Failed to get async stack trace after retries")
unwinder = RemoteUnwinder(pid, debug=True)
return unwinder.get_all_awaited_by()
except RuntimeError as e:
- if _is_retriable_error(e):
- continue
- raise
+ continue
raise RuntimeError("Failed to get all awaited_by after retries")
def _get_frames_with_retry(self, unwinder, required_funcs):
"""Get frames containing required_funcs, with retry for transient errors."""
for _ in range(MAX_TRIES):
- try:
+ with contextlib.suppress(OSError, RuntimeError):
traces = unwinder.get_stack_trace()
for interp in traces:
for thread in interp.threads:
funcs = {f.funcname for f in thread.frame_info}
if required_funcs.issubset(funcs):
return thread.frame_info
- except RuntimeError as e:
- if _is_retriable_error(e):
- pass
- else:
- raise
time.sleep(0.1)
return None
make_unwinder,
):
unwinder = make_unwinder(cache_frames=True)
- buffer = b""
-
- def recv_msg():
- """Receive a single message from socket."""
- nonlocal buffer
- while b"\n" not in buffer:
- chunk = client_socket.recv(256)
- if not chunk:
- return None
- buffer += chunk
- msg, buffer = buffer.split(b"\n", 1)
- return msg
-
- def get_thread_frames(target_funcs):
- """Get frames for thread matching target functions."""
- retries = 0
- for _ in busy_retry(SHORT_TIMEOUT):
- if retries >= 5:
- break
- retries += 1
- # On Windows, ReadProcessMemory can fail with OSError
- # (WinError 299) when frame pointers are in flux
- with contextlib.suppress(RuntimeError, OSError):
- traces = unwinder.get_stack_trace()
- for interp in traces:
- for thread in interp.threads:
- funcs = [f.funcname for f in thread.frame_info]
- if any(f in funcs for f in target_funcs):
- return funcs
- return None
+
+ # Message dispatch table: signal -> required functions for that thread
+ dispatch = {
+ b"t1:baz1": {"baz1", "bar1", "foo1"},
+ b"t2:baz2": {"baz2", "bar2", "foo2"},
+ b"t1:blech1": {"blech1", "foo1"},
+ b"t2:blech2": {"blech2", "foo2"},
+ }
# Track results for each sync point
results = {}
- # Process 4 sync points: baz1, baz2, blech1, blech2
- # With the lock, threads are serialized - handle one at a time
- for _ in range(4):
- msg = recv_msg()
- self.assertIsNotNone(msg, "Expected message from subprocess")
-
- # Determine which thread/function and take snapshot
- if msg == b"t1:baz1":
- funcs = get_thread_frames(["baz1", "bar1", "foo1"])
- self.assertIsNotNone(funcs, "Thread 1 not found at baz1")
- results["t1:baz1"] = funcs
- elif msg == b"t2:baz2":
- funcs = get_thread_frames(["baz2", "bar2", "foo2"])
- self.assertIsNotNone(funcs, "Thread 2 not found at baz2")
- results["t2:baz2"] = funcs
- elif msg == b"t1:blech1":
- funcs = get_thread_frames(["blech1", "foo1"])
- self.assertIsNotNone(funcs, "Thread 1 not found at blech1")
- results["t1:blech1"] = funcs
- elif msg == b"t2:blech2":
- funcs = get_thread_frames(["blech2", "foo2"])
- self.assertIsNotNone(funcs, "Thread 2 not found at blech2")
- results["t2:blech2"] = funcs
-
- # Release thread to continue
+ # Process 4 sync points (order depends on thread scheduling)
+ buffer = _wait_for_signal(client_socket, b"\n")
+ for i in range(4):
+ # Extract first message from buffer
+        msg, _, buffer = buffer.partition(b"\n")
+ self.assertIn(msg, dispatch, f"Unexpected message: {msg!r}")
+
+ # Sample frames for the thread at this sync point
+ required_funcs = dispatch[msg]
+ frames = self._get_frames_with_retry(unwinder, required_funcs)
+ self.assertIsNotNone(frames, f"Thread not found for {msg!r}")
+ results[msg] = [f.funcname for f in frames]
+
+ # Release thread and wait for next message (if not last)
client_socket.sendall(b"k")
+ if i < 3:
+ buffer += _wait_for_signal(client_socket, b"\n")
# Validate Phase 1: baz snapshots
- t1_baz = results.get("t1:baz1")
- t2_baz = results.get("t2:baz2")
+ t1_baz = results.get(b"t1:baz1")
+ t2_baz = results.get(b"t2:baz2")
self.assertIsNotNone(t1_baz, "Missing t1:baz1 snapshot")
self.assertIsNotNone(t2_baz, "Missing t2:baz2 snapshot")
self.assertNotIn("foo1", t2_baz)
# Validate Phase 2: blech snapshots (cache invalidation test)
- t1_blech = results.get("t1:blech1")
- t2_blech = results.get("t2:blech2")
+ t1_blech = results.get(b"t1:blech1")
+ t2_blech = results.get(b"t2:blech2")
self.assertIsNotNone(t1_blech, "Missing t1:blech1 snapshot")
self.assertIsNotNone(t2_blech, "Missing t2:blech2 snapshot")
--- /dev/null
+Add incomplete sample detection to prevent corrupted profiling data. Each
+thread state now contains an embedded base frame (sentinel at the bottom of
+the frame stack) with owner type ``FRAME_OWNED_BY_INTERPRETER``. The profiler
+validates that stack unwinding terminates at this sentinel frame. Samples that
+fail to reach the base frame (due to race conditions, memory corruption, or
+other errors) are now rejected rather than being included as spurious data.
uintptr_t initial_frame_addr,
StackChunkList *chunks,
PyObject *frame_info,
+ uintptr_t base_frame_addr,
uintptr_t gc_frame,
uintptr_t last_profiled_frame,
int *stopped_at_cached_frame,
void* frame = (void*)frame_addr;
- if (GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner) == FRAME_OWNED_BY_INTERPRETER) {
- return 0; // C frame
+ char owner = GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner);
+ if (owner == FRAME_OWNED_BY_INTERPRETER) {
+ return 0; // C frame or sentinel base frame
}
- if (GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner) != FRAME_OWNED_BY_GENERATOR
- && GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner) != FRAME_OWNED_BY_THREAD) {
- PyErr_Format(PyExc_RuntimeError, "Unhandled frame owner %d.\n",
- GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner));
+ if (owner != FRAME_OWNED_BY_GENERATOR && owner != FRAME_OWNED_BY_THREAD) {
+ PyErr_Format(PyExc_RuntimeError, "Unhandled frame owner %d.\n", owner);
set_exception_cause(unwinder, PyExc_RuntimeError, "Unhandled frame owner type in async frame");
return -1;
}
uintptr_t initial_frame_addr,
StackChunkList *chunks,
PyObject *frame_info,
+ uintptr_t base_frame_addr,
uintptr_t gc_frame,
uintptr_t last_profiled_frame,
int *stopped_at_cached_frame,
{
uintptr_t frame_addr = initial_frame_addr;
uintptr_t prev_frame_addr = 0;
+ uintptr_t last_frame_addr = 0; // Track last frame visited for validation
const size_t MAX_FRAMES = 1024 + 512;
size_t frame_count = 0;
PyObject *frame = NULL;
uintptr_t next_frame_addr = 0;
uintptr_t stackpointer = 0;
+ last_frame_addr = frame_addr; // Remember this frame address
if (++frame_count > MAX_FRAMES) {
PyErr_SetString(PyExc_RuntimeError, "Too many stack frames (possible infinite loop)");
return -1;
}
- // Try chunks first, fallback to direct memory read
if (parse_frame_from_chunks(unwinder, &frame, frame_addr, &next_frame_addr, &stackpointer, chunks) < 0) {
PyErr_Clear();
uintptr_t address_of_code_object = 0;
frame_addr = next_frame_addr;
}
+ // Validate we reached the base frame (sentinel at bottom of stack)
+ // Only validate if we walked the full chain (didn't stop at cached frame)
+ // and base_frame_addr is provided (non-zero)
+ int stopped_early = stopped_at_cached_frame && *stopped_at_cached_frame;
+ if (!stopped_early && base_frame_addr != 0 && last_frame_addr != base_frame_addr) {
+        PyErr_Format(PyExc_RuntimeError,
+                     "Incomplete sample: did not reach base frame "
+                     "(expected %p, got %p)",
+                     (void *)base_frame_addr, (void *)last_frame_addr);
+ return -1;
+ }
+
return 0;
}
Py_ssize_t frames_before = PyList_GET_SIZE(frame_info);
int stopped_at_cached = 0;
- if (process_frame_chain(unwinder, frame_addr, chunks, frame_info, gc_frame,
+ if (process_frame_chain(unwinder, frame_addr, chunks, frame_info, 0, gc_frame,
last_profiled_frame, &stopped_at_cached,
addrs, &num_addrs, FRAME_CACHE_MAX_FRAMES) < 0) {
return -1;
// Cache miss - continue walking from last_profiled_frame to get the rest
STATS_INC(unwinder, frame_cache_misses);
Py_ssize_t frames_before_walk = PyList_GET_SIZE(frame_info);
- if (process_frame_chain(unwinder, last_profiled_frame, chunks, frame_info, gc_frame,
+ if (process_frame_chain(unwinder, last_profiled_frame, chunks, frame_info, 0, gc_frame,
0, NULL, addrs, &num_addrs, FRAME_CACHE_MAX_FRAMES) < 0) {
return -1;
}
}
uintptr_t frame_addr = GET_MEMBER(uintptr_t, ts, unwinder->debug_offsets.thread_state.current_frame);
+ uintptr_t base_frame_addr = GET_MEMBER(uintptr_t, ts, unwinder->debug_offsets.thread_state.base_frame);
frame_info = PyList_New(0);
if (!frame_info) {
PyErr_Clear(); // Non-fatal
}
} else {
- // No caching - process entire frame chain
+ // No caching - process entire frame chain with base_frame validation
if (process_frame_chain(unwinder, frame_addr, &chunks, frame_info,
- gc_frame, 0, NULL, NULL, NULL, 0) < 0) {
+ base_frame_addr, gc_frame, 0, NULL, NULL, NULL, 0) < 0) {
set_exception_cause(unwinder, PyExc_RuntimeError, "Failed to process frame chain");
goto error;
}
{
PyThreadState *tstate = _PyThreadState_GET();
_PyInterpreterFrame *current_frame = tstate->current_frame;
+ if (current_frame == tstate->base_frame) {
+ current_frame = NULL;
+ }
int result = cf->cf_flags != 0;
if (current_frame != NULL) {
if (tstate != _PyThreadState_GET()) {
Py_FatalError("thread is not current");
}
- if (tstate->current_frame != NULL) {
+ if (tstate->current_frame != tstate->base_frame) {
Py_FatalError("thread still has a frame");
}
interp->finalizing = 1;
// This is cleared when PyGILState_Ensure() creates the thread state.
tstate->gilstate_counter = 1;
- tstate->current_frame = NULL;
+ // Initialize the embedded base frame - sentinel at the bottom of the frame stack
+ _tstate->base_frame.previous = NULL;
+ _tstate->base_frame.f_executable = PyStackRef_None;
+ _tstate->base_frame.f_funcobj = PyStackRef_NULL;
+ _tstate->base_frame.f_globals = NULL;
+ _tstate->base_frame.f_builtins = NULL;
+ _tstate->base_frame.f_locals = NULL;
+ _tstate->base_frame.frame_obj = NULL;
+ _tstate->base_frame.instr_ptr = NULL;
+ _tstate->base_frame.stackpointer = _tstate->base_frame.localsplus;
+ _tstate->base_frame.return_offset = 0;
+ _tstate->base_frame.owner = FRAME_OWNED_BY_INTERPRETER;
+ _tstate->base_frame.visited = 0;
+#ifdef Py_DEBUG
+ _tstate->base_frame.lltrace = 0;
+#endif
+#ifdef Py_GIL_DISABLED
+ _tstate->base_frame.tlbc_index = 0;
+#endif
+ _tstate->base_frame.localsplus[0] = PyStackRef_NULL;
+
+ // current_frame starts pointing to the base frame
+ tstate->current_frame = &_tstate->base_frame;
+ // base_frame pointer for profilers to validate stack unwinding
+ tstate->base_frame = &_tstate->base_frame;
tstate->datastack_chunk = NULL;
tstate->datastack_top = NULL;
tstate->datastack_limit = NULL;
int verbose = _PyInterpreterState_GetConfig(tstate->interp)->verbose;
- if (verbose && tstate->current_frame != NULL) {
+ if (verbose && tstate->current_frame != tstate->base_frame) {
/* bpo-20526: After the main thread calls
_PyInterpreterState_SetFinalizing() in Py_FinalizeEx()
(or in Py_EndInterpreter() for subinterpreters),
dump_frame(int fd, _PyInterpreterFrame *frame)
{
if (frame->owner == FRAME_OWNED_BY_INTERPRETER) {
- /* Ignore trampoline frame */
+ /* Ignore trampoline frames and base frame sentinel */
return 0;
}