"""Thin Python wrapper around C binary reader for profiling data."""
+import _remote_debugging
+
+from .gecko_collector import GeckoCollector
+from .stack_collector import FlamegraphCollector, CollapsedStackCollector
+from .pstats_collector import PstatsCollector
+
class BinaryReader:
"""High-performance binary reader using C implementation.
self._reader = None
def __enter__(self):
- import _remote_debugging
self._reader = _remote_debugging.BinaryReader(self.filename)
return self
Returns:
int: Number of samples converted
"""
- from .gecko_collector import GeckoCollector
- from .stack_collector import FlamegraphCollector, CollapsedStackCollector
- from .pstats_collector import PStatsCollector
-
with BinaryReader(input_file) as reader:
info = reader.get_info()
interval = sample_interval_usec or info['sample_interval_us']
elif output_format == 'collapsed':
collector = CollapsedStackCollector(interval)
elif output_format == 'pstats':
- collector = PStatsCollector(interval)
+ collector = PstatsCollector(interval)
elif output_format == 'gecko':
collector = GeckoCollector(interval)
else:
SORT_MODE_NSAMPLES_CUMUL,
)
+try:
+ from ._child_monitor import ChildProcessMonitor
+except ImportError:
+ # _remote_debugging module not available on this platform (e.g., WASI)
+ ChildProcessMonitor = None
+
try:
from .live_collector import LiveStatsCollector
except ImportError:
}
def _setup_child_monitor(args, parent_pid):
- from ._child_monitor import ChildProcessMonitor
-
# Build CLI args for child profilers (excluding --subprocesses to avoid recursion)
child_cli_args = _build_child_profiler_args(args)
# --subprocesses is incompatible with --live
if hasattr(args, 'subprocesses') and args.subprocesses:
+ if ChildProcessMonitor is None:
+ parser.error(
+ "--subprocesses is not available on this platform "
+ "(requires _remote_debugging module)."
+ )
if hasattr(args, 'live') and args.live:
parser.error("--subprocesses is incompatible with --live mode.")
def _handle_replay(args):
"""Handle the 'replay' command - convert binary profile to another format."""
- import os
-
if not os.path.exists(args.input_file):
sys.exit(f"Error: Input file not found: {args.input_file}")
from ._css_utils import get_combined_css
from ._format_utils import fmt
from .collector import normalize_location, extract_lineno
+from .opcode_utils import get_opcode_info, format_opcode
from .stack_collector import StackTraceCollector
Returns:
List of dicts with instruction info, sorted by samples descending
"""
- from .opcode_utils import get_opcode_info, format_opcode
-
key = (filename, lineno)
opcode_data = self.line_opcodes.get(key, {})
Simple: collect ranges with sample counts, assign each byte position to
smallest covering range, then emit spans for contiguous runs with sample data.
"""
- import html as html_module
-
content = line_content.rstrip('\n')
if not content:
return ''
range_data[key]['opcodes'].append(opname)
if not range_data:
- return html_module.escape(content)
+ return html.escape(content)
# For each byte position, find the smallest covering range
byte_to_range = {}
def flush_span():
nonlocal span_chars, current_range
if span_chars:
- text = html_module.escape(''.join(span_chars))
+ text = html.escape(''.join(span_chars))
if current_range:
data = range_data.get(current_range, {'samples': 0, 'opcodes': []})
samples = data['samples']
f'data-samples="{samples}" '
f'data-max-samples="{max_range_samples}" '
f'data-pct="{pct}" '
- f'data-opcodes="{html_module.escape(opcodes)}">{text}</span>')
+ f'data-opcodes="{html.escape(opcodes)}">{text}</span>')
else:
result.append(text)
span_chars = []
def _handle_input(self):
"""Handle keyboard input (non-blocking)."""
- from . import constants
-
self.display.set_nodelay(True)
ch = self.display.get_input()
PROFILING_MODE_GIL,
PROFILING_MODE_WALL,
)
+from ..opcode_utils import get_opcode_info, format_opcode
class Widget(ABC):
Returns:
Next available line number
"""
- from ..opcode_utils import get_opcode_info, format_opcode
-
stats_list = kwargs.get("stats_list", [])
height = kwargs.get("height", 24)
selected_row = self.collector.selected_row
import collections
import marshal
+import pstats
from _colorize import ANSIColors
from .collector import Collector, extract_lineno
-from .constants import MICROSECONDS_PER_SECOND
+from .constants import MICROSECONDS_PER_SECOND, PROFILING_MODE_CPU
class PstatsCollector(Collector):
def print_stats(self, sort=-1, limit=None, show_summary=True, mode=None):
"""Print formatted statistics to stdout."""
- import pstats
- from .constants import PROFILING_MODE_CPU
-
# Create stats object
stats = pstats.SampledStats(self).strip_dirs()
if not stats.stats:
import linecache
import os
import sys
+import sysconfig
from ._css_utils import get_combined_css
from .collector import Collector, extract_lineno
}
# Calculate thread status percentages for display
- import sysconfig
is_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
total_threads = max(1, self.thread_status_counts["total"])
thread_stats = {
import _remote_debugging # noqa: F401
import profiling.sampling
import profiling.sampling.sample
+ from profiling.sampling.pstats_collector import PstatsCollector
+ from profiling.sampling.stack_collector import CollapsedStackCollector
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.stack_collector import CollapsedStackCollector
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
3. Stack traversal: _build_linear_stacks() with BFS
"""
+import inspect
import unittest
try:
import _remote_debugging # noqa: F401
from profiling.sampling.pstats_collector import PstatsCollector
+ from profiling.sampling.stack_collector import FlamegraphCollector
+ from profiling.sampling.sample import sample, sample_live, SampleProfiler
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
def test_flamegraph_with_async_frames(self):
"""Test FlamegraphCollector correctly processes async task frames."""
- from profiling.sampling.stack_collector import FlamegraphCollector
-
collector = FlamegraphCollector(sample_interval_usec=1000)
# Build async task tree: Root -> Child
def test_flamegraph_with_task_markers(self):
"""Test FlamegraphCollector includes <task> boundary markers."""
- from profiling.sampling.stack_collector import FlamegraphCollector
-
collector = FlamegraphCollector(sample_interval_usec=1000)
task = MockTaskInfo(
def test_flamegraph_multiple_async_samples(self):
"""Test FlamegraphCollector aggregates multiple async samples correctly."""
- from profiling.sampling.stack_collector import FlamegraphCollector
-
collector = FlamegraphCollector(sample_interval_usec=1000)
task = MockTaskInfo(
def test_sample_function_accepts_async_aware(self):
"""Test that sample() function accepts async_aware parameter."""
- from profiling.sampling.sample import sample
- import inspect
-
sig = inspect.signature(sample)
self.assertIn("async_aware", sig.parameters)
def test_sample_live_function_accepts_async_aware(self):
"""Test that sample_live() function accepts async_aware parameter."""
- from profiling.sampling.sample import sample_live
- import inspect
-
sig = inspect.signature(sample_live)
self.assertIn("async_aware", sig.parameters)
def test_sample_profiler_sample_accepts_async_aware(self):
"""Test that SampleProfiler.sample() accepts async_aware parameter."""
- from profiling.sampling.sample import SampleProfiler
- import inspect
-
sig = inspect.signature(SampleProfiler.sample)
self.assertIn("async_aware", sig.parameters)
import threading
import time
import unittest
+from unittest.mock import MagicMock, patch
from test.support import (
SHORT_TIMEOUT,
requires_remote_subprocess_debugging,
)
+# Guard imports that require _remote_debugging module.
+# This module is not available on all platforms (e.g., WASI).
+try:
+ from profiling.sampling._child_monitor import (
+ get_child_pids,
+ ChildProcessMonitor,
+ is_python_process,
+ _MAX_CHILD_PROFILERS,
+ _CLEANUP_INTERVAL_CYCLES,
+ )
+except ImportError:
+ # Module will be skipped via @requires_remote_subprocess_debugging decorators
+ get_child_pids = None
+ ChildProcessMonitor = None
+ is_python_process = None
+ _MAX_CHILD_PROFILERS = None
+ _CLEANUP_INTERVAL_CYCLES = None
+
+try:
+ from profiling.sampling.cli import (
+ _add_sampling_options,
+ _validate_args,
+ _build_child_profiler_args,
+ _build_output_pattern,
+ _setup_child_monitor,
+ )
+except ImportError:
+ # cli module imports sample module which requires _remote_debugging
+ _add_sampling_options = None
+ _validate_args = None
+ _build_child_profiler_args = None
+ _build_output_pattern = None
+ _setup_child_monitor = None
+
from .helpers import _cleanup_process
# String to check for in stderr when profiler lacks permissions (e.g., macOS)
def test_get_child_pids_fallback(self):
"""Test the fallback implementation for get_child_pids."""
- from profiling.sampling._child_monitor import get_child_pids
-
# Test with current process
result = get_child_pids(os.getpid())
self.assertIsInstance(result, list)
@unittest.skipUnless(sys.platform == "linux", "Linux only")
def test_discover_child_process_linux(self):
"""Test that we can discover child processes on Linux."""
- from profiling.sampling._child_monitor import get_child_pids
-
# Create a child process
proc = subprocess.Popen(
[sys.executable, "-c", "import time; time.sleep(10)"],
def test_recursive_child_discovery(self):
"""Test that recursive=True finds grandchildren."""
- from profiling.sampling._child_monitor import get_child_pids
-
# Create a child that spawns a grandchild and keeps a reference to it
# so we can clean it up via the child process
code = """
def test_nonexistent_pid_returns_empty(self):
"""Test that nonexistent PID returns empty list."""
- from profiling.sampling._child_monitor import get_child_pids
-
# Use a very high PID that's unlikely to exist
result = get_child_pids(999999999)
self.assertEqual(result, [])
def test_monitor_creation(self):
"""Test that ChildProcessMonitor can be created."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(),
cli_args=["-r", "10khz", "-d", "5"],
def test_monitor_lifecycle(self):
"""Test monitor lifecycle via context manager."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
def test_spawned_profilers_property(self):
"""Test that spawned_profilers returns a copy of the list."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
def test_context_manager(self):
"""Test that ChildProcessMonitor works as a context manager."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
with ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
) as monitor:
def test_subprocesses_flag_parsed(self):
"""Test that --subprocesses flag is recognized."""
- from profiling.sampling.cli import _add_sampling_options
-
parser = argparse.ArgumentParser()
_add_sampling_options(parser)
def test_subprocesses_incompatible_with_live(self):
"""Test that --subprocesses is incompatible with --live."""
- from profiling.sampling.cli import _validate_args
-
# Create mock args with both subprocesses and live
args = argparse.Namespace(
subprocesses=True,
def test_build_child_profiler_args(self):
"""Test building CLI args for child profilers."""
- from profiling.sampling.cli import _build_child_profiler_args
-
args = argparse.Namespace(
sample_interval_usec=200,
duration=15,
def test_build_child_profiler_args_no_gc(self):
"""Test building CLI args with --no-gc."""
- from profiling.sampling.cli import _build_child_profiler_args
-
args = argparse.Namespace(
sample_interval_usec=100,
duration=5,
def test_build_output_pattern_with_outfile(self):
"""Test output pattern generation with user-specified output."""
- from profiling.sampling.cli import _build_output_pattern
-
# With extension
args = argparse.Namespace(outfile="output.html", format="flamegraph")
pattern = _build_output_pattern(args)
def test_build_output_pattern_default(self):
"""Test output pattern generation with default output."""
- from profiling.sampling.cli import _build_output_pattern
-
# Flamegraph format
args = argparse.Namespace(outfile=None, format="flamegraph")
pattern = _build_output_pattern(args)
def test_setup_child_monitor(self):
"""Test setting up a child monitor from args."""
- from profiling.sampling.cli import _setup_child_monitor
-
args = argparse.Namespace(
sample_interval_usec=100,
duration=5,
def test_is_python_process_current_process(self):
"""Test that current process is detected as Python."""
- from profiling.sampling._child_monitor import is_python_process
-
# Current process should be Python
result = is_python_process(os.getpid())
self.assertTrue(
def test_is_python_process_python_subprocess(self):
"""Test that a Python subprocess is detected as Python."""
- from profiling.sampling._child_monitor import is_python_process
-
# Start a Python subprocess
proc = subprocess.Popen(
[sys.executable, "-c", "import time; time.sleep(10)"],
@unittest.skipUnless(sys.platform == "linux", "Linux only test")
def test_is_python_process_non_python_subprocess(self):
"""Test that a non-Python subprocess is not detected as Python."""
- from profiling.sampling._child_monitor import is_python_process
-
# Start a non-Python subprocess (sleep command)
proc = subprocess.Popen(
["sleep", "10"],
def test_is_python_process_nonexistent_pid(self):
"""Test that nonexistent PID returns False."""
- from profiling.sampling._child_monitor import is_python_process
-
# Use a very high PID that's unlikely to exist
result = is_python_process(999999999)
self.assertFalse(
def test_is_python_process_exited_process(self):
"""Test handling of a process that exits quickly."""
- from profiling.sampling._child_monitor import is_python_process
-
# Start a process that exits immediately
proc = subprocess.Popen(
[sys.executable, "-c", "pass"],
def test_max_profilers_constant_exists(self):
"""Test that _MAX_CHILD_PROFILERS constant is defined."""
- from profiling.sampling._child_monitor import _MAX_CHILD_PROFILERS
-
self.assertEqual(
_MAX_CHILD_PROFILERS,
100,
def test_cleanup_interval_constant_exists(self):
"""Test that _CLEANUP_INTERVAL_CYCLES constant is defined."""
- from profiling.sampling._child_monitor import _CLEANUP_INTERVAL_CYCLES
-
self.assertEqual(
_CLEANUP_INTERVAL_CYCLES,
10,
def test_monitor_respects_max_limit(self):
"""Test that monitor refuses to spawn more than _MAX_CHILD_PROFILERS."""
- from profiling.sampling._child_monitor import (
- ChildProcessMonitor,
- _MAX_CHILD_PROFILERS,
- )
- from unittest.mock import MagicMock, patch
-
# Create a monitor
monitor = ChildProcessMonitor(
pid=os.getpid(),
def test_wait_for_profilers_empty_list(self):
"""Test that wait_for_profilers returns immediately with no profilers."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
def test_wait_for_profilers_with_completed_process(self):
"""Test waiting for profilers that complete quickly."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
def test_wait_for_profilers_timeout(self):
"""Test that wait_for_profilers respects timeout."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
def test_wait_for_profilers_multiple(self):
"""Test waiting for multiple profilers."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
import json
import marshal
+import opcode
import os
import tempfile
import unittest
def test_get_opcode_info_standard_opcode(self):
"""Test get_opcode_info for a standard opcode."""
- import opcode
# LOAD_CONST is a standard opcode
load_const = opcode.opmap.get('LOAD_CONST')
if load_const is not None:
def test_format_opcode_standard(self):
"""Test format_opcode for a standard opcode."""
- import opcode
load_const = opcode.opmap.get('LOAD_CONST')
if load_const is not None:
formatted = format_opcode(load_const)
def test_format_opcode_specialized(self):
"""Test format_opcode for a specialized opcode shows base in parens."""
- import opcode
if not hasattr(opcode, '_specialized_opmap'):
self.skipTest("No specialized opcodes in this Python version")
if not hasattr(opcode, '_specializations'):
from profiling.sampling.pstats_collector import PstatsCollector
from profiling.sampling.stack_collector import CollapsedStackCollector
from profiling.sampling.sample import SampleProfiler, _is_process_running
+ from profiling.sampling.cli import main
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.cli import main
main()
output = captured_output.getvalue()
# Change to temp directory so subprocess can find the module
contextlib.chdir(tempdir.name),
):
- from profiling.sampling.cli import main
main()
output = captured_output.getvalue()
test_args = ["profiling.sampling.cli", "run", "--live"] + args + ["test.py"]
with mock.patch("sys.argv", test_args):
with self.assertRaises(SystemExit) as cm:
- from profiling.sampling.cli import main
- main()
+ main()
self.assertNotEqual(cm.exception.code, 0)
def test_live_incompatible_with_multiple_pstats_options(self):
with mock.patch("sys.argv", test_args):
with self.assertRaises(SystemExit) as cm:
- from profiling.sampling.cli import main
- main()
+ main()
self.assertNotEqual(cm.exception.code, 0)
def test_live_incompatible_with_pstats_default_values(self):
with mock.patch("sys.argv", test_args):
with self.assertRaises(SystemExit) as cm:
- from profiling.sampling.cli import main
- main()
+ main()
self.assertNotEqual(cm.exception.code, 0)
# Test with --limit=15 (the default value)
with mock.patch("sys.argv", test_args):
with self.assertRaises(SystemExit) as cm:
- from profiling.sampling.cli import main
- main()
+ main()
self.assertNotEqual(cm.exception.code, 0)
def test_finished_state_freezes_time(self):
"""Test that time displays are frozen when finished."""
- import time as time_module
# Set up collector with known start time
- self.collector.start_time = time_module.perf_counter() - 10.0 # 10 seconds ago
+ self.collector.start_time = time.perf_counter() - 10.0 # 10 seconds ago
# Mark as finished - this should freeze the time
self.collector.mark_finished()
frozen_time_display = self.collector.current_time_display
# Wait a bit to ensure time would advance
- time_module.sleep(0.1)
+ time.sleep(0.1)
# Time should remain frozen
self.assertEqual(self.collector.elapsed_time, frozen_elapsed)
def test_time_display_fix_when_finished(self):
"""Test that time display shows correct frozen time when finished."""
- import time as time_module
# Mark as finished to freeze time
self.collector.mark_finished()
frozen_time = self.collector.current_time_display
# Wait a bit
- time_module.sleep(0.1)
+ time.sleep(0.1)
# Should still show the same frozen time (not jump to wrong time)
self.assertEqual(self.collector.current_time_display, frozen_time)
import profiling.sampling
import profiling.sampling.sample
from profiling.sampling.pstats_collector import PstatsCollector
+ from profiling.sampling.cli import main, _parse_mode
+ from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
+ from _remote_debugging import (
+ THREAD_STATUS_HAS_GIL,
+ THREAD_STATUS_ON_CPU,
+ )
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
def test_frames_filtered_with_skip_idle(self):
"""Test that frames are actually filtered when skip_idle=True."""
- # Import thread status flags
- try:
- from _remote_debugging import (
- THREAD_STATUS_HAS_GIL,
- THREAD_STATUS_ON_CPU,
- )
- except ImportError:
- THREAD_STATUS_HAS_GIL = 1 << 0
- THREAD_STATUS_ON_CPU = 1 << 1
-
# Create mock frames with different thread statuses
class MockThreadInfoWithStatus:
def __init__(self, thread_id, frame_info, status):
def test_gil_mode_validation(self):
"""Test that CLI accepts gil mode choice correctly."""
- from profiling.sampling.cli import main
test_args = [
"profiling.sampling.cli",
def test_gil_mode_cli_argument_parsing(self):
"""Test CLI argument parsing for GIL mode with various options."""
- from profiling.sampling.cli import main
test_args = [
"profiling.sampling.cli",
def test_parse_mode_function(self):
"""Test the _parse_mode function with all valid modes."""
- from profiling.sampling.cli import _parse_mode
self.assertEqual(_parse_mode("wall"), 0)
self.assertEqual(_parse_mode("cpu"), 1)
self.assertEqual(_parse_mode("gil"), 2)
def test_exception_mode_validation(self):
"""Test that CLI accepts exception mode choice correctly."""
- from profiling.sampling.cli import main
test_args = [
"profiling.sampling.cli",
def test_exception_mode_cli_argument_parsing(self):
"""Test CLI argument parsing for exception mode with various options."""
- from profiling.sampling.cli import main
test_args = [
"profiling.sampling.cli",
def test_exception_mode_constants_are_defined(self):
"""Test that exception mode constant is properly defined."""
- from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
self.assertEqual(PROFILING_MODE_EXCEPTION, 4)
def test_exception_mode_integration_filtering(self):
"""Tests for sampling profiler core functionality."""
import io
+import re
from unittest import mock
import unittest
# Extract just the function names for comparison
func_names = []
- import re
for line in data_lines:
# Function name is between the last ( and ), accounting for ANSI color codes