- Disabled
* - Default for ``--subprocesses``
- Disabled
+ * - Default for ``--blocking``
+ - Disabled (non-blocking sampling)
Sampling interval and duration
when work is distributed across a thread pool.
+.. _blocking-mode:
+
+Blocking mode
+-------------
+
+By default, Tachyon reads the target process's memory without stopping it.
+This non-blocking approach is ideal for most profiling scenarios because it
+imposes virtually zero overhead on the target application: the profiled
+program runs at full speed and is unaware it is being observed.
+
+However, non-blocking sampling can occasionally produce incomplete or
+inconsistent stack traces in applications with many generators or coroutines
+that rapidly switch between yield points, or in programs with very fast-changing
+call stacks where functions enter and exit between the start and end of a single
+stack read, resulting in reconstructed stacks that mix frames from different
+execution states or that never actually existed.
+
+For these cases, the :option:`--blocking` option stops the target process during
+each sample::
+
+ python -m profiling.sampling run --blocking script.py
+ python -m profiling.sampling attach --blocking 12345
+
+When blocking mode is enabled, the profiler suspends the target process,
+reads its stack, then resumes it. This guarantees that each captured stack
+represents a real, consistent snapshot of what the process was doing at that
+instant. The trade-off is that the target process runs slower because it is
+repeatedly paused.
+
+.. warning::
+
+ Do not use very high sample rates (low ``--interval`` values) with blocking
+ mode. Suspending and resuming a process takes time, and if the sampling
+ interval is too short, the target will spend more time stopped than running.
+ For blocking mode, intervals of 1000 microseconds (1 millisecond) or higher
+ are recommended. The default 100 microsecond interval may cause noticeable
+ slowdown in the target application.
+
+Use blocking mode only when you observe inconsistent stacks in your profiles,
+particularly with generator-heavy or coroutine-heavy code. For most
+applications, the default non-blocking mode provides accurate results with
+zero impact on the target process.
+
+
Special frames
--------------
Also profile subprocesses. Each subprocess gets its own profiler
instance and output file. Incompatible with ``--live``.
+.. option:: --blocking
+
+ Pause the target process during each sample. This ensures consistent
+ stack traces at the cost of slowing down the target. Use with longer
+ intervals (1000 µs or higher) to minimize impact. See :ref:`blocking-mode`
+ for details.
+
Mode options
------------
action="store_true",
help="Also profile subprocesses. Each subprocess gets its own profiler and output file.",
)
+ sampling_group.add_argument(
+ "--blocking",
+ action="store_true",
+ help="Stop all threads in target process before sampling to get consistent snapshots. "
+ "Uses thread_suspend on macOS and ptrace on Linux. Adds overhead but ensures memory "
+ "reads are from a frozen state.",
+ )
def _add_mode_options(parser):
if getattr(args, 'command', None) == "replay":
return
+ # Warn about blocking mode with aggressive sampling intervals
+ if args.blocking and args.interval < 100:
+ print(
+ f"Warning: --blocking with a {args.interval} µs interval will stop all threads "
+ f"{1_000_000 // args.interval} times per second. "
+ "Consider using --interval 1000 or higher to reduce overhead.",
+ file=sys.stderr
+ )
+
# Check if live mode is available
if hasattr(args, 'live') and args.live and LiveStatsCollector is None:
parser.error(
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
+ blocking=args.blocking,
)
_handle_output(collector, args, args.pid, mode)
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
+ blocking=args.blocking,
)
_handle_output(collector, args, process.pid, mode)
finally:
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
+ blocking=args.blocking,
)
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
+ blocking=args.blocking,
)
finally:
# Clean up the subprocess
import _remote_debugging
+import contextlib
import os
import statistics
import sys
from collections import deque
from _colorize import ANSIColors
+from .pstats_collector import PstatsCollector
+from .stack_collector import CollapsedStackCollector, FlamegraphCollector
+from .heatmap_collector import HeatmapCollector
+from .gecko_collector import GeckoCollector
from .binary_collector import BinaryCollector
+
+
+@contextlib.contextmanager
+def _pause_threads(unwinder, blocking):
+ """Context manager to pause/resume threads around sampling if blocking is True."""
+ if blocking:
+ unwinder.pause_threads()
+ try:
+ yield
+ finally:
+ unwinder.resume_threads()
+ else:
+ yield
+
+
from .constants import (
PROFILING_MODE_WALL,
PROFILING_MODE_CPU,
class SampleProfiler:
- def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False):
+ def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False, blocking=False):
self.pid = pid
self.sample_interval_usec = sample_interval_usec
self.all_threads = all_threads
self.mode = mode # Store mode for later use
self.collect_stats = collect_stats
+ self.blocking = blocking
try:
self.unwinder = self._new_unwinder(native, gc, opcodes, skip_non_matching_threads)
except RuntimeError as err:
running_time = 0
num_samples = 0
errors = 0
+ interrupted = False
start_time = next_time = time.perf_counter()
last_sample_time = start_time
realtime_update_interval = 1.0 # Update every second
last_realtime_update = start_time
- interrupted = False
-
try:
while running_time < duration_sec:
# Check if live collector wants to stop
current_time = time.perf_counter()
if next_time < current_time:
try:
- if async_aware == "all":
- stack_frames = self.unwinder.get_all_awaited_by()
- elif async_aware == "running":
- stack_frames = self.unwinder.get_async_stack_trace()
- else:
- stack_frames = self.unwinder.get_stack_trace()
- collector.collect(stack_frames)
- except ProcessLookupError:
+ with _pause_threads(self.unwinder, self.blocking):
+ if async_aware == "all":
+ stack_frames = self.unwinder.get_all_awaited_by()
+ elif async_aware == "running":
+ stack_frames = self.unwinder.get_async_stack_trace()
+ else:
+ stack_frames = self.unwinder.get_stack_trace()
+ collector.collect(stack_frames)
+ except ProcessLookupError as e:
duration_sec = current_time - start_time
break
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
native=False,
gc=True,
opcodes=False,
+ blocking=False,
):
"""Sample a process using the provided collector.
native: Whether to include native frames
gc: Whether to include GC frames
opcodes: Whether to include opcode information
+ blocking: Whether to stop all threads before sampling for consistent snapshots
Returns:
The collector with collected samples
opcodes=opcodes,
skip_non_matching_threads=skip_non_matching_threads,
collect_stats=realtime_stats,
+ blocking=blocking,
)
profiler.realtime_stats = realtime_stats
native=False,
gc=True,
opcodes=False,
+ blocking=False,
):
"""Sample a process in live/interactive mode with curses TUI.
native: Whether to include native frames
gc: Whether to include GC frames
opcodes: Whether to include opcode information
+ blocking: Whether to stop all threads before sampling for consistent snapshots
Returns:
The collector with collected samples
opcodes=opcodes,
skip_non_matching_threads=skip_non_matching_threads,
collect_stats=realtime_stats,
+ blocking=blocking,
)
profiler.realtime_stats = realtime_stats
"Test only runs on Linux with process_vm_readv support",
)
def test_partial_stack_reuse(self):
- """Test that unchanged bottom frames are reused when top changes (A→B→C to A→B→D)."""
+ """Test that unchanged parent frames are reused from cache when top frame moves."""
script_body = """\
- def func_c():
- sock.sendall(b"at_c")
+ def level4():
+ sock.sendall(b"sync1")
sock.recv(16)
-
- def func_d():
- sock.sendall(b"at_d")
+ sock.sendall(b"sync2")
sock.recv(16)
- def func_b():
- func_c()
- func_d()
+ def level3():
+ level4()
- def func_a():
- func_b()
+ def level2():
+ level3()
+
+ def level1():
+ level2()
- func_a()
+ level1()
"""
with self._target_process(script_body) as (
):
unwinder = make_unwinder(cache_frames=True)
- # Sample at C: stack is A→B→C
- frames_c = self._sample_frames(
+ # Sample 1: level4 at first sendall
+ frames1 = self._sample_frames(
client_socket,
unwinder,
- b"at_c",
+ b"sync1",
b"ack",
- {"func_a", "func_b", "func_c"},
+ {"level1", "level2", "level3", "level4"},
)
- # Sample at D: stack is A→B→D (C returned, D called)
- frames_d = self._sample_frames(
+ # Sample 2: level4 at second sendall (same stack, different line)
+ frames2 = self._sample_frames(
client_socket,
unwinder,
- b"at_d",
+ b"sync2",
b"done",
- {"func_a", "func_b", "func_d"},
+ {"level1", "level2", "level3", "level4"},
)
- self.assertIsNotNone(frames_c)
- self.assertIsNotNone(frames_d)
+ self.assertIsNotNone(frames1)
+ self.assertIsNotNone(frames2)
- # Find func_a and func_b frames in both samples
def find_frame(frames, funcname):
for f in frames:
if f.funcname == funcname:
return f
return None
- frame_a_in_c = find_frame(frames_c, "func_a")
- frame_b_in_c = find_frame(frames_c, "func_b")
- frame_a_in_d = find_frame(frames_d, "func_a")
- frame_b_in_d = find_frame(frames_d, "func_b")
-
- self.assertIsNotNone(frame_a_in_c)
- self.assertIsNotNone(frame_b_in_c)
- self.assertIsNotNone(frame_a_in_d)
- self.assertIsNotNone(frame_b_in_d)
-
- # The bottom frames (A, B) should be the SAME objects (cache reuse)
- self.assertIs(
- frame_a_in_c,
- frame_a_in_d,
- "func_a frame should be reused from cache",
- )
- self.assertIs(
- frame_b_in_c,
- frame_b_in_d,
- "func_b frame should be reused from cache",
+ # level4 should have different line numbers (it moved)
+ l4_1 = find_frame(frames1, "level4")
+ l4_2 = find_frame(frames2, "level4")
+ self.assertIsNotNone(l4_1)
+ self.assertIsNotNone(l4_2)
+ self.assertNotEqual(
+ l4_1.location.lineno,
+ l4_2.location.lineno,
+ "level4 should be at different lines",
)
+ # Parent frames (level1, level2, level3) should be reused from cache
+ for name in ["level1", "level2", "level3"]:
+ f1 = find_frame(frames1, name)
+ f2 = find_frame(frames2, name)
+ self.assertIsNotNone(f1, f"{name} missing from sample 1")
+ self.assertIsNotNone(f2, f"{name} missing from sample 2")
+ self.assertIs(f1, f2, f"{name} should be reused from cache")
+
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
--- /dev/null
+"""Tests for blocking mode sampling profiler."""
+
+import io
+import textwrap
+import unittest
+from unittest import mock
+
+try:
+ import _remote_debugging # noqa: F401
+ import profiling.sampling
+ import profiling.sampling.sample
+ from profiling.sampling.stack_collector import CollapsedStackCollector
+except ImportError:
+ raise unittest.SkipTest(
+ "Test only runs when _remote_debugging is available"
+ )
+
+from test.support import requires_remote_subprocess_debugging
+
+from .helpers import test_subprocess
+
+# Duration for profiling in tests
+PROFILING_DURATION_SEC = 1
+
+
+@requires_remote_subprocess_debugging()
+class TestBlockingModeStackAccuracy(unittest.TestCase):
+ """Test that blocking mode produces accurate stack traces.
+
+ When using blocking mode, the target process is stopped during sampling.
+ This ensures that we see accurate stack traces where functions appear
+ in the correct caller/callee relationship.
+
+ These tests verify that generator functions are correctly shown at the
+ top of the stack when they are actively executing, and not incorrectly
+ shown under their caller's code.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ # Test script that uses a generator consumed in a loop.
+ # When consume_generator is on the arithmetic lines (temp1, temp2, etc.),
+ # fibonacci_generator should NOT be in the stack at all.
+ # Line numbers are important here - see ARITHMETIC_LINES below.
+ cls.generator_script = textwrap.dedent('''
+ def fibonacci_generator(n):
+ a, b = 0, 1
+ for _ in range(n):
+ yield a
+ a, b = b, a + b
+
+ def consume_generator():
+ gen = fibonacci_generator(10000)
+ for value in gen:
+ temp1 = value + 1
+ temp2 = value * 2
+ temp3 = value - 1
+ result = temp1 + temp2 + temp3
+
+ def main():
+ while True:
+ consume_generator()
+
+ _test_sock.sendall(b"working")
+ main()
+ ''')
+ # Line numbers of the arithmetic operations in consume_generator.
+ # These are the lines where fibonacci_generator should NOT be in the stack.
+ # The socket injection code adds 7 lines before our script.
+ # temp1 = value + 1 -> line 17
+ # temp2 = value * 2 -> line 18
+ # temp3 = value - 1 -> line 19
+ # result = ... -> line 20
+ cls.ARITHMETIC_LINES = {17, 18, 19, 20}
+
+ def test_generator_not_under_consumer_arithmetic(self):
+ """Test that fibonacci_generator doesn't appear when consume_generator does arithmetic.
+
+ When consume_generator is executing arithmetic lines (temp1, temp2, etc.),
+ fibonacci_generator should NOT be anywhere in the stack - it's not being
+ called at that point.
+
+ Valid stacks:
+ - consume_generator at 'for value in gen:' line WITH fibonacci_generator
+ at the top (generator is yielding)
+ - consume_generator at arithmetic lines WITHOUT fibonacci_generator
+ (we're just doing math, not calling the generator)
+
+ Invalid stacks (indicate torn/inconsistent reads):
+ - consume_generator at arithmetic lines WITH fibonacci_generator
+ anywhere in the stack
+
+ Note: call_tree is ordered from bottom (index 0) to top (index -1).
+ """
+ with test_subprocess(self.generator_script, wait_for_working=True) as subproc:
+ collector = CollapsedStackCollector(sample_interval_usec=100, skip_idle=False)
+
+ with (
+ io.StringIO() as captured_output,
+ mock.patch("sys.stdout", captured_output),
+ ):
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=PROFILING_DURATION_SEC,
+ blocking=True,
+ )
+
+ # Analyze collected stacks
+ total_samples = 0
+ invalid_stacks = 0
+ arithmetic_samples = 0
+
+ for (call_tree, _thread_id), count in collector.stack_counter.items():
+ total_samples += count
+
+ if not call_tree:
+ continue
+
+ # Find consume_generator in the stack and check its line number
+ for i, (filename, lineno, funcname) in enumerate(call_tree):
+ if funcname == "consume_generator" and lineno in self.ARITHMETIC_LINES:
+ arithmetic_samples += count
+ # Check if fibonacci_generator appears anywhere in this stack
+ func_names = [frame[2] for frame in call_tree]
+ if "fibonacci_generator" in func_names:
+ invalid_stacks += count
+ break
+
+ self.assertGreater(total_samples, 10,
+ f"Expected at least 10 samples, got {total_samples}")
+
+ # We should have some samples on the arithmetic lines
+ self.assertGreater(arithmetic_samples, 0,
+ f"Expected some samples on arithmetic lines, got {arithmetic_samples}")
+
+ self.assertEqual(invalid_stacks, 0,
+ f"Found {invalid_stacks}/{arithmetic_samples} invalid stacks where "
+ f"fibonacci_generator appears in the stack when consume_generator "
+ f"is on an arithmetic line. This indicates torn/inconsistent stack "
+ f"traces are being captured.")
limit=None,
no_summary=False,
opcodes=False,
+ blocking=False,
+ interval=1000,
)
parser = argparse.ArgumentParser()
--- /dev/null
+Add blocking mode to Tachyon for accurate stack traces in applications with
+many generators or fast-changing call stacks. Patch by Pablo Galindo.
# endif
#endif
+// Platforms that support pausing/resuming threads for accurate stack sampling
+#if defined(MS_WINDOWS) || defined(__linux__) || (defined(__APPLE__) && TARGET_OS_OSX)
+# define Py_REMOTE_DEBUG_SUPPORTS_BLOCKING 1
+#endif
+
#ifdef MS_WINDOWS
#include <windows.h>
#include <winternl.h>
+#endif
+
+#if defined(__APPLE__) && TARGET_OS_OSX
+
+typedef struct {
+ mach_port_t task;
+ int suspended;
+} _Py_RemoteDebug_ThreadsState;
+
+#elif defined(__linux__)
+
+typedef struct {
+ pid_t *tids; // Points to unwinder's reusable buffer
+ size_t count; // Number of threads currently seized
+} _Py_RemoteDebug_ThreadsState;
+
+#elif defined(MS_WINDOWS)
+
+typedef NTSTATUS (NTAPI *NtSuspendProcessFunc)(HANDLE ProcessHandle);
+typedef NTSTATUS (NTAPI *NtResumeProcessFunc)(HANDLE ProcessHandle);
+
+typedef struct {
+ HANDLE hProcess;
+ int suspended;
+} _Py_RemoteDebug_ThreadsState;
+
+#else
+
+typedef struct {
+ int dummy;
+} _Py_RemoteDebug_ThreadsState;
+
+#endif
+
+#ifdef MS_WINDOWS
#define STATUS_SUCCESS ((NTSTATUS)0x00000000L)
#define STATUS_INFO_LENGTH_MISMATCH ((NTSTATUS)0xC0000004L)
typedef enum _WIN32_THREADSTATE {
#ifdef MS_WINDOWS
PVOID win_process_buffer;
ULONG win_process_buffer_size;
+#endif
+ // Thread stopping state (only on platforms that support it)
+#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
+ _Py_RemoteDebug_ThreadsState threads_state;
+ int threads_stopped; // 1 if threads are currently stopped
+#endif
+#ifdef __linux__
+ pid_t *thread_tids; // Reusable buffer for thread IDs
+ size_t thread_tids_capacity; // Current capacity of thread_tids buffer
#endif
} RemoteUnwinderObject;
uintptr_t gc_frame; // GC frame address (0 if not tracking)
uintptr_t last_profiled_frame; // Last cached frame (0 if no cache)
StackChunkList *chunks; // Pre-copied stack chunks
+ int skip_first_frame; // Skip frame_addr itself (continue from its caller)
/* Outputs */
PyObject *frame_info; // List to append FrameInfo objects
uintptr_t gc_frame
);
+/* Thread stopping functions (for blocking mode) */
+extern void _Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st);
+extern int _Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st);
+extern void _Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st);
+
/* ============================================================================
* ASYNCIO FUNCTION DECLARATIONS
* ============================================================================ */
return return_value;
}
+PyDoc_STRVAR(_remote_debugging_RemoteUnwinder_pause_threads__doc__,
+"pause_threads($self, /)\n"
+"--\n"
+"\n"
+"Pause all threads in the target process.\n"
+"\n"
+"This stops all threads in the target process to allow for consistent\n"
+"memory reads during sampling. Must be paired with a call to resume_threads().\n"
+"\n"
+"Returns True if threads were successfully paused, False if they were already paused.\n"
+"\n"
+"Raises:\n"
+" RuntimeError: If there is an error stopping the threads");
+
+#define _REMOTE_DEBUGGING_REMOTEUNWINDER_PAUSE_THREADS_METHODDEF \
+ {"pause_threads", (PyCFunction)_remote_debugging_RemoteUnwinder_pause_threads, METH_NOARGS, _remote_debugging_RemoteUnwinder_pause_threads__doc__},
+
+static PyObject *
+_remote_debugging_RemoteUnwinder_pause_threads_impl(RemoteUnwinderObject *self);
+
+static PyObject *
+_remote_debugging_RemoteUnwinder_pause_threads(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+ PyObject *return_value = NULL;
+
+ Py_BEGIN_CRITICAL_SECTION(self);
+ return_value = _remote_debugging_RemoteUnwinder_pause_threads_impl((RemoteUnwinderObject *)self);
+ Py_END_CRITICAL_SECTION();
+
+ return return_value;
+}
+
+PyDoc_STRVAR(_remote_debugging_RemoteUnwinder_resume_threads__doc__,
+"resume_threads($self, /)\n"
+"--\n"
+"\n"
+"Resume all threads in the target process.\n"
+"\n"
+"This resumes threads that were previously paused with pause_threads().\n"
+"\n"
+"Returns True if threads were successfully resumed, False if they were not paused.");
+
+#define _REMOTE_DEBUGGING_REMOTEUNWINDER_RESUME_THREADS_METHODDEF \
+ {"resume_threads", (PyCFunction)_remote_debugging_RemoteUnwinder_resume_threads, METH_NOARGS, _remote_debugging_RemoteUnwinder_resume_threads__doc__},
+
+static PyObject *
+_remote_debugging_RemoteUnwinder_resume_threads_impl(RemoteUnwinderObject *self);
+
+static PyObject *
+_remote_debugging_RemoteUnwinder_resume_threads(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+ PyObject *return_value = NULL;
+
+ Py_BEGIN_CRITICAL_SECTION(self);
+ return_value = _remote_debugging_RemoteUnwinder_resume_threads_impl((RemoteUnwinderObject *)self);
+ Py_END_CRITICAL_SECTION();
+
+ return return_value;
+}
+
PyDoc_STRVAR(_remote_debugging_BinaryWriter___init____doc__,
"BinaryWriter(filename, sample_interval_us, start_time_us, *,\n"
" compression=0)\n"
exit:
return return_value;
}
-/*[clinic end generated code: output=036de0b06d0e34cc input=a9049054013a1b77]*/
+/*[clinic end generated code: output=34f50b18f317b9b6 input=a9049054013a1b77]*/
Py_ssize_t num_frames = PyList_GET_SIZE(entry->frame_list);
- // Extend frame_info with frames from start_idx onwards
- PyObject *slice = PyList_GetSlice(entry->frame_list, start_idx, num_frames);
+ // Extend frame_info with frames ABOVE start_idx (not including it).
+ // The frame at start_idx (last_profiled_frame) was the executing frame
+ // in the previous sample and its line number may have changed.
+ // Only frames above it (its callers) are frozen at their call sites.
+ Py_ssize_t cache_start = start_idx + 1;
+ if (cache_start >= num_frames) {
+ return 0; // Nothing above last_profiled_frame to extend with
+ }
+
+ PyObject *slice = PyList_GetSlice(entry->frame_list, cache_start, num_frames);
if (!slice) {
return -1;
}
return -1;
}
- // Also extend frame_addrs with cached addresses if provided
+ // Also extend frame_addrs with cached addresses (above last_profiled_frame)
if (frame_addrs) {
- for (Py_ssize_t i = start_idx; i < entry->num_addrs && *num_addrs < max_addrs; i++) {
+ for (Py_ssize_t i = cache_start; i < entry->num_addrs && *num_addrs < max_addrs; i++) {
frame_addrs[(*num_addrs)++] = entry->addrs[i];
}
}
ctx->stopped_at_cached_frame = 0;
ctx->last_frame_visited = 0;
- if (ctx->last_profiled_frame != 0 && ctx->frame_addr == ctx->last_profiled_frame) {
- ctx->stopped_at_cached_frame = 1;
- return 0;
- }
-
while ((void*)frame_addr != NULL) {
- if (ctx->last_profiled_frame != 0 && frame_addr == ctx->last_profiled_frame) {
- ctx->stopped_at_cached_frame = 1;
- break;
- }
PyObject *frame = NULL;
uintptr_t next_frame_addr = 0;
uintptr_t stackpointer = 0;
return -1;
}
}
+
+ // Skip first frame if requested (used for cache miss continuation)
+ if (ctx->skip_first_frame && frame_count == 1) {
+ Py_XDECREF(frame);
+ frame_addr = next_frame_addr;
+ continue;
+ }
+
if (frame == NULL && PyList_GET_SIZE(ctx->frame_info) == 0) {
const char *e = "Failed to parse initial frame in chain";
PyErr_SetString(PyExc_RuntimeError, e);
Py_DECREF(frame);
}
+ if (ctx->last_profiled_frame != 0 && frame_addr == ctx->last_profiled_frame) {
+ ctx->stopped_at_cached_frame = 1;
+ break;
+ }
+
prev_frame_addr = next_frame_addr;
frame_addr = next_frame_addr;
}
}
if (cache_result == 0) {
STATS_INC(unwinder, frame_cache_misses);
- Py_ssize_t frames_before_walk = PyList_GET_SIZE(ctx->frame_info);
+ // Continue walking from last_profiled_frame, skipping it (already processed)
+ Py_ssize_t frames_before_walk = PyList_GET_SIZE(ctx->frame_info);
FrameWalkContext continue_ctx = {
.frame_addr = ctx->last_profiled_frame,
.base_frame_addr = ctx->base_frame_addr,
.gc_frame = ctx->gc_frame,
- .last_profiled_frame = 0,
.chunks = ctx->chunks,
+ .skip_first_frame = 1,
.frame_info = ctx->frame_info,
.frame_addrs = ctx->frame_addrs,
.num_addrs = ctx->num_addrs,
}
ctx->num_addrs = continue_ctx.num_addrs;
ctx->last_frame_visited = continue_ctx.last_frame_visited;
-
STATS_ADD(unwinder, frames_read_from_memory, PyList_GET_SIZE(ctx->frame_info) - frames_before_walk);
} else {
// Partial cache hit - cached stack was validated as complete when stored,
self->skip_non_matching_threads = skip_non_matching_threads;
self->cached_state = NULL;
self->frame_cache = NULL;
+#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
+ self->threads_stopped = 0;
+#endif
// Initialize stats to zero
memset(&self->stats, 0, sizeof(self->stats));
if (_Py_RemoteDebug_InitProcHandle(&self->handle, pid) < 0) {
self->win_process_buffer = NULL;
self->win_process_buffer_size = 0;
#endif
+#ifdef __linux__
+ self->thread_tids = NULL;
+ self->thread_tids_capacity = 0;
+#endif
if (cache_frames && frame_cache_init(self) < 0) {
return -1;
return result;
}
+/*[clinic input]
+@critical_section
+_remote_debugging.RemoteUnwinder.pause_threads
+
+Pause all threads in the target process.
+
+This stops all threads in the target process to allow for consistent
+memory reads during sampling. Must be paired with a call to resume_threads().
+
+Returns True if threads were successfully paused, False if they were already paused.
+
+Raises:
+ RuntimeError: If there is an error stopping the threads
+[clinic start generated code]*/
+
+static PyObject *
+_remote_debugging_RemoteUnwinder_pause_threads_impl(RemoteUnwinderObject *self)
+/*[clinic end generated code: output=aaf2bdc0a725750c input=78601c60dbc245fe]*/
+{
+#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
+    // Idempotent: a second pause without an intervening resume returns False.
+    if (self->threads_stopped) {
+        Py_RETURN_FALSE;
+    }
+
+    _Py_RemoteDebug_InitThreadsState(self, &self->threads_state);
+    if (_Py_RemoteDebug_StopAllThreads(self, &self->threads_state) < 0) {
+        // Python exception already set by the platform-specific stop routine.
+        return NULL;
+    }
+
+    self->threads_stopped = 1;
+    Py_RETURN_TRUE;
+#else
+    PyErr_SetString(PyExc_NotImplementedError,
+        "pause_threads is not supported on this platform");
+    return NULL;
+#endif
+}
+
+/*[clinic input]
+@critical_section
+_remote_debugging.RemoteUnwinder.resume_threads
+
+Resume all threads in the target process.
+
+This resumes threads that were previously paused with pause_threads().
+
+Returns True if threads were successfully resumed, False if they were not paused.
+[clinic start generated code]*/
+
+static PyObject *
+_remote_debugging_RemoteUnwinder_resume_threads_impl(RemoteUnwinderObject *self)
+/*[clinic end generated code: output=8d6781ea37095536 input=67ca813bd804289e]*/
+{
+#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
+    // Resuming without a prior successful pause is a no-op returning False.
+    if (!self->threads_stopped) {
+        Py_RETURN_FALSE;
+    }
+
+    _Py_RemoteDebug_ResumeAllThreads(self, &self->threads_state);
+    self->threads_stopped = 0;
+    Py_RETURN_TRUE;
+#else
+    PyErr_SetString(PyExc_NotImplementedError,
+        "resume_threads is not supported on this platform");
+    return NULL;
+#endif
+}
+
static PyMethodDef RemoteUnwinder_methods[] = {
_REMOTE_DEBUGGING_REMOTEUNWINDER_GET_STACK_TRACE_METHODDEF
_REMOTE_DEBUGGING_REMOTEUNWINDER_GET_ALL_AWAITED_BY_METHODDEF
_REMOTE_DEBUGGING_REMOTEUNWINDER_GET_ASYNC_STACK_TRACE_METHODDEF
_REMOTE_DEBUGGING_REMOTEUNWINDER_GET_STATS_METHODDEF
+ _REMOTE_DEBUGGING_REMOTEUNWINDER_PAUSE_THREADS_METHODDEF
+ _REMOTE_DEBUGGING_REMOTEUNWINDER_RESUME_THREADS_METHODDEF
{NULL, NULL}
};
{
RemoteUnwinderObject *self = RemoteUnwinder_CAST(op);
PyTypeObject *tp = Py_TYPE(self);
+
+#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
+ if (self->threads_stopped) {
+ _Py_RemoteDebug_ResumeAllThreads(self, &self->threads_state);
+ self->threads_stopped = 0;
+ }
+#endif
+#ifdef __linux__
+ if (self->thread_tids != NULL) {
+ PyMem_RawFree(self->thread_tids);
+ self->thread_tids = NULL;
+ }
+#endif
+
if (self->code_object_cache) {
_Py_hashtable_destroy(self->code_object_cache);
}
#include <unistd.h>
#endif
+#ifdef __linux__
+#include <dirent.h>
+#include <sys/ptrace.h>
+#include <sys/wait.h>
+#endif
+
/* ============================================================================
* THREAD ITERATION FUNCTIONS
* ============================================================================ */
cleanup_stack_chunks(&chunks);
return NULL;
}
+
+/* ============================================================================
+ * PROCESS STOP FUNCTIONS
+ * ============================================================================ */
+
+#if defined(__APPLE__) && TARGET_OS_OSX
+
+void
+_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    // Start from a clean, not-suspended state; the task port is captured on stop.
+    st->task = MACH_PORT_NULL;
+    st->suspended = 0;
+}
+
+int
+_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    // One Mach call suspends every thread in the target task atomically.
+    kern_return_t kr = task_suspend(unwinder->handle.task);
+    if (kr != KERN_SUCCESS) {
+        // MACH_SEND_INVALID_DEST means the task port is dead, i.e. the
+        // process exited; surface that as ProcessLookupError like the
+        // other platforms do.
+        if (kr == MACH_SEND_INVALID_DEST) {
+            PyErr_Format(PyExc_ProcessLookupError,
+                "Process %d has terminated", unwinder->handle.pid);
+        } else {
+            PyErr_Format(PyExc_RuntimeError,
+                "task_suspend failed for PID %d: kern_return_t %d",
+                unwinder->handle.pid, kr);
+        }
+        return -1;
+    }
+
+    st->task = unwinder->handle.task;
+    st->suspended = 1;
+    // Drop any cached remote reads so sampling sees the frozen state.
+    _Py_RemoteDebug_ClearCache(&unwinder->handle);
+    return 0;
+}
+
+void
+_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    // Safe to call when not suspended; this never raises.
+    if (!st->suspended || st->task == MACH_PORT_NULL) {
+        return;
+    }
+    task_resume(st->task);
+    st->task = MACH_PORT_NULL;
+    st->suspended = 0;
+}
+
+#elif defined(__linux__)
+
+void
+_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    // tids aliases the unwinder's reusable buffer once threads are seized.
+    st->tids = NULL;
+    st->count = 0;
+}
+
+// Enumerate the target's thread IDs from /proc/<pid>/task into the
+// unwinder's reusable buffer, growing it geometrically as needed.
+// On success st->tids/st->count describe the snapshot and 0 is returned;
+// on failure a Python exception is set, st is cleared, and -1 is returned.
+// NOTE: the listing is a snapshot -- threads spawned afterwards are missed.
+static int
+read_thread_ids(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    char task_path[64];
+    snprintf(task_path, sizeof(task_path), "/proc/%d/task", unwinder->handle.pid);
+
+    DIR *dir = opendir(task_path);
+    if (dir == NULL) {
+        st->tids = NULL;
+        st->count = 0;
+        // A vanished task directory means the process already exited.
+        if (errno == ENOENT || errno == ESRCH) {
+            PyErr_Format(PyExc_ProcessLookupError,
+                "Process %d has terminated", unwinder->handle.pid);
+        } else {
+            PyErr_SetFromErrnoWithFilename(PyExc_OSError, task_path);
+        }
+        return -1;
+    }
+
+    st->count = 0;
+
+    struct dirent *entry;
+    while ((entry = readdir(dir)) != NULL) {
+        // Fast reject of "." / ".." and anything not starting with a
+        // non-zero digit (TIDs never have leading zeros).
+        if (entry->d_name[0] < '1' || entry->d_name[0] > '9') {
+            continue;
+        }
+        char *endptr;
+        long tid = strtol(entry->d_name, &endptr, 10);
+        if (*endptr != '\0' || tid <= 0) {
+            continue;
+        }
+        if (st->count >= unwinder->thread_tids_capacity) {
+            // Double the buffer (starting at 64) so repeated samples of a
+            // large process amortize to O(1) allocations.
+            size_t new_cap = unwinder->thread_tids_capacity == 0 ? 64 : unwinder->thread_tids_capacity * 2;
+            pid_t *new_tids = PyMem_RawRealloc(unwinder->thread_tids, new_cap * sizeof(pid_t));
+            if (new_tids == NULL) {
+                closedir(dir);
+                st->tids = NULL;
+                st->count = 0;
+                PyErr_NoMemory();
+                return -1;
+            }
+            unwinder->thread_tids = new_tids;
+            unwinder->thread_tids_capacity = new_cap;
+        }
+        unwinder->thread_tids[st->count++] = (pid_t)tid;
+    }
+
+    st->tids = unwinder->thread_tids;
+    closedir(dir);
+    return 0;
+}
+
+// Detach the first `up_to` seized threads; PTRACE_DETACH also resumes them.
+// Errors (e.g. a thread that already exited) are deliberately ignored.
+static inline void
+detach_threads(_Py_RemoteDebug_ThreadsState *st, size_t up_to)
+{
+    for (size_t j = 0; j < up_to; j++) {
+        ptrace(PTRACE_DETACH, st->tids[j], NULL, NULL);
+    }
+}
+
+// Attach to one thread with PTRACE_SEIZE (which does not stop it yet),
+// falling back to PTRACE_ATTACH on kernels without SEIZE support.
+// Returns 0 on success, 1 if the thread no longer exists (caller should
+// skip it), and -1 on a real error with errno describing the failure.
+static int
+seize_thread(pid_t tid)
+{
+    if (ptrace(PTRACE_SEIZE, tid, NULL, 0) == 0) {
+        return 0;
+    }
+    if (errno == ESRCH) {
+        return 1; // Thread gone, skip
+    }
+    if (errno == EINVAL || errno == EIO) {
+        // Fallback for older kernels: ATTACH stops the thread with a
+        // signal, so reap the stop notification before returning.
+        if (ptrace(PTRACE_ATTACH, tid, NULL, NULL) == 0) {
+            int status;
+            waitpid(tid, &status, __WALL);
+            return 0;
+        }
+        if (errno == ESRCH) {
+            return 1; // Thread gone
+        }
+    }
+    return -1; // Real error
+}
+
+// Freeze every thread of the target process: snapshot the TID list, then
+// seize + interrupt each thread and wait for it to stop.  On any hard
+// error every thread seized so far is detached, st is cleared, a Python
+// exception is set and -1 is returned.  Threads created after the
+// /proc/<pid>/task snapshot are not stopped (inherent TOCTOU window).
+int
+_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    if (read_thread_ids(unwinder, st) < 0) {
+        return -1;
+    }
+
+    for (size_t i = 0; i < st->count; i++) {
+        pid_t tid = st->tids[i];
+
+        int ret = seize_thread(tid);
+        if (ret == 1) {
+            continue; // Thread exited between listing and seizing; skip it
+        }
+        if (ret < 0) {
+            detach_threads(st, i);
+            PyErr_Format(PyExc_RuntimeError, "Failed to seize thread %d: %s", tid, strerror(errno));
+            st->tids = NULL;
+            st->count = 0;
+            return -1;
+        }
+
+        // SEIZE alone does not stop the thread; INTERRUPT does.  ESRCH
+        // just means the thread died after we seized it.
+        if (ptrace(PTRACE_INTERRUPT, tid, NULL, NULL) == -1 && errno != ESRCH) {
+            detach_threads(st, i + 1);
+            PyErr_Format(PyExc_RuntimeError, "Failed to interrupt thread %d: %s", tid, strerror(errno));
+            st->tids = NULL;
+            st->count = 0;
+            return -1;
+        }
+
+        // Reap the stop notification so the thread is known-frozen before
+        // we move on to the next one.
+        int status;
+        if (waitpid(tid, &status, __WALL) == -1 && errno != ECHILD && errno != ESRCH) {
+            detach_threads(st, i + 1);
+            PyErr_Format(PyExc_RuntimeError, "waitpid failed for thread %d: %s", tid, strerror(errno));
+            st->tids = NULL;
+            st->count = 0;
+            return -1;
+        }
+    }
+
+    // Match the macOS and Windows implementations: invalidate any cached
+    // remote reads so subsequent sampling reflects the now-frozen state.
+    // Without this, stale cached memory could defeat the whole point of
+    // blocking mode on Linux.
+    _Py_RemoteDebug_ClearCache(&unwinder->handle);
+    return 0;
+}
+
+void
+_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    // No-op when nothing was seized; detaching implicitly resumes each thread.
+    if (st->tids == NULL || st->count == 0) {
+        return;
+    }
+    detach_threads(st, st->count);
+    st->tids = NULL;
+    st->count = 0;
+}
+
+#elif defined(MS_WINDOWS)
+
+void
+_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    // The process handle is captured on a successful suspend.
+    st->hProcess = NULL;
+    st->suspended = 0;
+}
+
+int
+_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    // NtSuspendProcess is an undocumented ntdll export, so resolve it
+    // lazily and remember the (possibly failed) lookup.  Callers hold the
+    // unwinder's critical section, so the statics are not racy here.
+    static NtSuspendProcessFunc pNtSuspendProcess = NULL;
+    static int tried_load = 0;
+
+    if (!tried_load) {
+        HMODULE hNtdll = GetModuleHandleW(L"ntdll.dll");
+        if (hNtdll) {
+            pNtSuspendProcess = (NtSuspendProcessFunc)GetProcAddress(hNtdll, "NtSuspendProcess");
+        }
+        tried_load = 1;
+    }
+
+    if (pNtSuspendProcess == NULL) {
+        PyErr_SetString(PyExc_RuntimeError, "NtSuspendProcess not available");
+        return -1;
+    }
+
+    NTSTATUS status = pNtSuspendProcess(unwinder->handle.hProcess);
+    // Non-negative NTSTATUS means success.
+    if (status >= 0) {
+        st->hProcess = unwinder->handle.hProcess;
+        st->suspended = 1;
+        // Drop any cached remote reads so sampling sees the frozen state.
+        _Py_RemoteDebug_ClearCache(&unwinder->handle);
+        return 0;
+    }
+
+    PyErr_Format(PyExc_RuntimeError, "NtSuspendProcess failed: 0x%lx", status);
+    return -1;
+}
+
+void
+_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    // Safe no-op if the process was never suspended.
+    if (!st->suspended || st->hProcess == NULL) {
+        return;
+    }
+
+    // Lazily resolve the undocumented NtResumeProcess export (see the
+    // matching lookup in the suspend path).
+    static NtResumeProcessFunc pNtResumeProcess = NULL;
+    static int tried_load = 0;
+
+    if (!tried_load) {
+        HMODULE hNtdll = GetModuleHandleW(L"ntdll.dll");
+        if (hNtdll) {
+            pNtResumeProcess = (NtResumeProcessFunc)GetProcAddress(hNtdll, "NtResumeProcess");
+        }
+        tried_load = 1;
+    }
+
+    // Resume is best-effort: this function cannot report failure.
+    if (pNtResumeProcess != NULL) {
+        pNtResumeProcess(st->hProcess);
+    }
+    st->hProcess = NULL;
+    st->suspended = 0;
+}
+
+#else
+
+// Stub for platforms without blocking-mode support; intentionally empty.
+void
+_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    (void)unwinder;
+    (void)st;
+}
+
+// Stub for platforms without blocking-mode support; reports success so
+// callers guarded by Py_REMOTE_DEBUG_SUPPORTS_BLOCKING stay well-defined.
+int
+_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    (void)unwinder;
+    (void)st;
+    return 0;
+}
+
+// Stub for platforms without blocking-mode support; intentionally empty.
+void
+_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
+{
+    (void)unwinder;
+    (void)st;
+}
+
+#endif
#ifdef __linux__
# include <elf.h>
# include <sys/uio.h>
+# include <sys/ptrace.h>
+# include <sys/wait.h>
+# include <dirent.h>
# if INTPTR_MAX == INT64_MAX
# define Elf_Ehdr Elf64_Ehdr
# define Elf_Shdr Elf64_Shdr
-# define Elf_Phdr Elf32_Phdr
+// Bug fix: the 64-bit branch must use the 64-bit program-header type to
+// match Elf64_Ehdr/Elf64_Shdr above; Elf32_Phdr has a different layout and
+// would misparse program headers on 64-bit targets.
+# define Elf_Phdr Elf64_Phdr
# endif
# include <sys/mman.h>
+
+// PTRACE options - define if not available (PTRACE_SEIZE / PTRACE_INTERRUPT
+// were added in Linux 3.4; older libc headers may not declare them).
+# ifndef PTRACE_SEIZE
+# define PTRACE_SEIZE 0x4206
+# endif
+# ifndef PTRACE_INTERRUPT
+# define PTRACE_INTERRUPT 0x4207
+# endif
+# ifndef PTRACE_EVENT_STOP
+# define PTRACE_EVENT_STOP 128
+# endif
#endif
#if defined(__APPLE__) && defined(TARGET_OS_OSX) && TARGET_OS_OSX
# include <mach/mach_vm.h>
# include <mach/machine.h>
# include <mach/task_info.h>
+# include <mach/thread_act.h>
# include <sys/mman.h>
# include <sys/proc.h>
# include <sys/sysctl.h>
}
#elif defined(MS_WINDOWS)
handle->hProcess = OpenProcess(
- PROCESS_VM_READ | PROCESS_VM_WRITE | PROCESS_VM_OPERATION | PROCESS_QUERY_INFORMATION,
+ PROCESS_VM_READ | PROCESS_VM_WRITE | PROCESS_VM_OPERATION | PROCESS_QUERY_INFORMATION | PROCESS_SUSPEND_RESUME,
FALSE, pid);
if (handle->hProcess == NULL) {
PyErr_SetFromWindowsErr(0);
}
-def benchmark(unwinder, duration_seconds=10):
+def benchmark(unwinder, duration_seconds=10, blocking=False):
"""Benchmark mode - measure raw sampling speed for specified duration"""
sample_count = 0
fail_count = 0
total_attempts += 1
work_start = time.perf_counter()
try:
- stack_trace = unwinder.get_stack_trace()
- if stack_trace:
- sample_count += 1
+ if blocking:
+ unwinder.pause_threads()
+ try:
+ stack_trace = unwinder.get_stack_trace()
+ if stack_trace:
+ sample_count += 1
+ finally:
+ if blocking:
+ unwinder.resume_threads()
except (OSError, RuntimeError, UnicodeDecodeError) as e:
fail_count += 1
help="Which threads to include in the benchmark (default: all)",
)
+ parser.add_argument(
+ "--blocking",
+ action="store_true",
+ help="Stop all threads before sampling for consistent snapshots",
+ )
+
return parser.parse_args()
print(
f"{colors.CYAN}Benchmark Duration:{colors.RESET} {colors.YELLOW}{args.duration}{colors.RESET} seconds"
)
+ print(
+ f"{colors.CYAN}Blocking Mode:{colors.RESET} {colors.GREEN if args.blocking else colors.YELLOW}{'enabled' if args.blocking else 'disabled'}{colors.RESET}"
+ )
process = None
temp_file_path = None
unwinder = _remote_debugging.RemoteUnwinder(
process.pid, cache_frames=True, **kwargs
)
- results = benchmark(unwinder, duration_seconds=args.duration)
+ results = benchmark(unwinder, duration_seconds=args.duration, blocking=args.blocking)
finally:
cleanup_process(process, temp_file_path)