appears in the collected samples, then multiplies by the sampling interval to
estimate time.
-For example, with a 100 microsecond sampling interval over a 10-second profile,
+For example, with a 10 kHz sampling rate over a 10-second profile,
Tachyon collects approximately 100,000 samples. If a function appears in 5,000
samples (5% of total), Tachyon estimates it consumed 5% of the 10-second
duration, or about 500 milliseconds. This is a statistical estimate, not a
Profile for 60 seconds with a faster sampling rate::
- python -m profiling.sampling run -d 60 -i 50 script.py
+ python -m profiling.sampling run -d 60 -r 20khz script.py
Generate a line-by-line heatmap::
* - Option
- Default
- * - Default for ``--interval`` / ``-i``
- - 100 µs between samples (~10,000 samples/sec)
+ * - Default for ``--sampling-rate`` / ``-r``
+ - 1 kHz
* - Default for ``--duration`` / ``-d``
- 10 seconds
* - Default for ``--all-threads`` / ``-a``
- Disabled (non-blocking sampling)
-Sampling interval and duration
-------------------------------
+Sampling rate and duration
+--------------------------
-The two most fundamental parameters are the sampling interval and duration.
+The two most fundamental parameters are the sampling rate and duration.
Together, these determine how many samples will be collected during a profiling
session.
-The :option:`--interval` option (:option:`-i`) sets the time between samples in
-microseconds. The default is 100 microseconds, which produces approximately
-10,000 samples per second::
+The :option:`--sampling-rate` option (:option:`-r`) sets how frequently samples
+are collected. The default is 1 kHz (10,000 samples per second)::
- python -m profiling.sampling run -i 50 script.py
+ python -m profiling.sampling run -r 20khz script.py
-Lower intervals capture more samples and provide finer-grained data at the
-cost of slightly higher profiler CPU usage. Higher intervals reduce profiler
+Higher rates capture more samples and provide finer-grained data at the
+cost of slightly higher profiler CPU usage. Lower rates reduce profiler
overhead but may miss short-lived functions. For most applications, the
-default interval provides a good balance between accuracy and overhead.
+default rate provides a good balance between accuracy and overhead.
The :option:`--duration` option (:option:`-d`) sets how long to profile in seconds. The
default is 10 seconds::
- For pstats format (which defaults to stdout), subprocesses produce files like
``profile_12345.pstats``
-The subprocess profilers inherit most sampling options from the parent (interval,
-duration, thread selection, native frames, GC frames, async-aware mode, and
-output format). All Python descendant processes are profiled recursively,
+The subprocess profilers inherit most sampling options from the parent (sampling
+rate, duration, thread selection, native frames, GC frames, async-aware mode,
+and output format). All Python descendant processes are profiled recursively,
including grandchildren and further descendants.
Subprocess detection works by periodically scanning for new descendants of
Sampling options
----------------
-.. option:: -i <microseconds>, --interval <microseconds>
+.. option:: -r <rate>, --sampling-rate <rate>
- Sampling interval in microseconds. Default: 100.
+ Sampling rate (for example, ``10000``, ``10khz``, ``10k``). Default: ``1khz``.
.. option:: -d <seconds>, --duration <seconds>
_CHILD_POLL_INTERVAL_SEC = 0.1
# Default timeout for waiting on child profilers
-_DEFAULT_WAIT_TIMEOUT = 30.0
+_DEFAULT_WAIT_TIMEOUT_SEC = 30.0
# Maximum number of child profilers to spawn (prevents resource exhaustion)
_MAX_CHILD_PROFILERS = 100
with self._lock:
return list(self._spawned_profilers)
- def wait_for_profilers(self, timeout=_DEFAULT_WAIT_TIMEOUT):
+ def wait_for_profilers(self, timeout=_DEFAULT_WAIT_TIMEOUT_SEC):
"""
Wait for all spawned child profilers to complete.
# Constants for socket communication
_MAX_RETRIES = 3
-_INITIAL_RETRY_DELAY = 0.1
-_SOCKET_TIMEOUT = 2.0
+_INITIAL_RETRY_DELAY_SEC = 0.1
+_SOCKET_TIMEOUT_SEC = 2.0
_READY_MESSAGE = b"ready"
for attempt in range(_MAX_RETRIES):
try:
# Use context manager for automatic cleanup
- with socket.create_connection(("127.0.0.1", sync_port), timeout=_SOCKET_TIMEOUT) as sock:
+ with socket.create_connection(("127.0.0.1", sync_port), timeout=_SOCKET_TIMEOUT_SEC) as sock:
sock.send(_READY_MESSAGE)
return
except (socket.error, OSError) as e:
last_error = e
if attempt < _MAX_RETRIES - 1:
# Exponential backoff before retry
- time.sleep(_INITIAL_RETRY_DELAY * (2 ** attempt))
+ time.sleep(_INITIAL_RETRY_DELAY_SEC * (2 ** attempt))
# If we get here, all retries failed
raise SyncError(f"Failed to signal readiness after {_MAX_RETRIES} attempts: {last_error}") from last_error
import importlib.util
import locale
import os
+import re
import selectors
import socket
import subprocess
from .binary_collector import BinaryCollector
from .binary_reader import BinaryReader
from .constants import (
+ MICROSECONDS_PER_SECOND,
PROFILING_MODE_ALL,
PROFILING_MODE_WALL,
PROFILING_MODE_CPU,
# Constants for socket synchronization
-_SYNC_TIMEOUT = 5.0
-_PROCESS_KILL_TIMEOUT = 2.0
+_SYNC_TIMEOUT_SEC = 5.0
+_PROCESS_KILL_TIMEOUT_SEC = 2.0
_READY_MESSAGE = b"ready"
_RECV_BUFFER_SIZE = 1024
child_args = []
# Sampling options
- child_args.extend(["-i", str(args.interval)])
+ hz = MICROSECONDS_PER_SECOND // args.sample_interval_usec
+ child_args.extend(["-r", str(hz)])
child_args.extend(["-d", str(args.duration)])
if args.all_threads:
sync_sock.bind(("127.0.0.1", 0)) # Let OS choose a free port
sync_port = sync_sock.getsockname()[1]
sync_sock.listen(1)
- sync_sock.settimeout(_SYNC_TIMEOUT)
+ sync_sock.settimeout(_SYNC_TIMEOUT_SEC)
# Get current working directory to preserve it
cwd = os.getcwd()
process = subprocess.Popen(cmd, **popen_kwargs)
try:
- _wait_for_ready_signal(sync_sock, process, _SYNC_TIMEOUT)
+ _wait_for_ready_signal(sync_sock, process, _SYNC_TIMEOUT_SEC)
# Close stderr pipe if we were capturing it
if process.stderr:
if process.poll() is None:
process.terminate()
try:
- process.wait(timeout=_PROCESS_KILL_TIMEOUT)
+ process.wait(timeout=_PROCESS_KILL_TIMEOUT_SEC)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
return process
+_RATE_PATTERN = re.compile(r'''
+ ^ # Start of string
+ ( # Group 1: The numeric value
+ \d+ # One or more digits (integer part)
+ (?:\.\d+)? # Optional: decimal point followed by digits
+ ) # Examples: "10", "0.5", "100.25"
+ ( # Group 2: Optional unit suffix
+ hz # "hz" - hertz
+ | khz # "khz" - kilohertz
+ | k # "k" - shorthand for kilohertz
+ )? # Suffix is optional (bare number = Hz)
+ $ # End of string
+ ''', re.VERBOSE | re.IGNORECASE)
+
+
+def _parse_sampling_rate(rate_str: str) -> int:
+ """Parse sampling rate string to microseconds."""
+ rate_str = rate_str.strip().lower()
+
+ match = _RATE_PATTERN.match(rate_str)
+ if not match:
+ raise argparse.ArgumentTypeError(
+ f"Invalid sampling rate format: {rate_str}. "
+ "Expected: number followed by optional suffix (hz, khz, k) with no spaces (e.g., 10khz)"
+ )
+
+ number_part = match.group(1)
+ suffix = match.group(2) or ''
+
+ # Determine multiplier based on suffix
+ suffix_map = {
+ 'hz': 1,
+ 'khz': 1000,
+ 'k': 1000,
+ }
+ multiplier = suffix_map.get(suffix, 1)
+ hz = float(number_part) * multiplier
+ if hz <= 0:
+ raise argparse.ArgumentTypeError(f"Sampling rate must be positive: {rate_str}")
+
+ interval_usec = int(MICROSECONDS_PER_SECOND / hz)
+ if interval_usec < 1:
+ raise argparse.ArgumentTypeError(f"Sampling rate too high: {rate_str}")
+
+ return interval_usec
+
+
def _add_sampling_options(parser):
"""Add sampling configuration options to a parser."""
sampling_group = parser.add_argument_group("Sampling configuration")
sampling_group.add_argument(
- "-i",
- "--interval",
- type=int,
- default=100,
- metavar="MICROSECONDS",
- help="sampling interval",
+ "-r",
+ "--sampling-rate",
+ type=_parse_sampling_rate,
+ default="1khz",
+ metavar="RATE",
+ dest="sample_interval_usec",
+ help="sampling rate (e.g., 10000, 10khz, 10k)",
)
sampling_group.add_argument(
"-d",
}
return sort_map.get(sort_choice, SORT_MODE_NSAMPLES)
-
-def _create_collector(format_type, interval, skip_idle, opcodes=False,
+def _create_collector(format_type, sample_interval_usec, skip_idle, opcodes=False,
output_file=None, compression='auto'):
"""Create the appropriate collector based on format type.
Args:
format_type: The output format ('pstats', 'collapsed', 'flamegraph', 'gecko', 'heatmap', 'binary')
- interval: Sampling interval in microseconds
+ sample_interval_usec: Sampling interval in microseconds
skip_idle: Whether to skip idle samples
opcodes: Whether to collect opcode information (only used by gecko format
for creating interval markers in Firefox Profiler)
# and is the only format that uses opcodes for interval markers
if format_type == "gecko":
skip_idle = False
- return collector_class(interval, skip_idle=skip_idle, opcodes=opcodes)
+ return collector_class(sample_interval_usec, skip_idle=skip_idle, opcodes=opcodes)
- return collector_class(interval, skip_idle=skip_idle)
+ return collector_class(sample_interval_usec, skip_idle=skip_idle)
def _generate_output_filename(format_type, pid):
# Generate flamegraph from a script
`python -m profiling.sampling run --flamegraph -o output.html script.py`
- # Profile with custom interval and duration
- `python -m profiling.sampling run -i 50 -d 30 script.py`
+ # Profile with custom rate and duration
+ `python -m profiling.sampling run -r 5khz -d 30 script.py`
# Save collapsed stacks to file
`python -m profiling.sampling run --collapsed -o stacks.txt script.py`
# Create the appropriate collector
collector = _create_collector(
- args.format, args.interval, skip_idle, args.opcodes,
+ args.format, args.sample_interval_usec, skip_idle, args.opcodes,
output_file=output_file,
compression=getattr(args, 'compression', 'auto')
)
# Create the appropriate collector
collector = _create_collector(
- args.format, args.interval, skip_idle, args.opcodes,
+ args.format, args.sample_interval_usec, skip_idle, args.opcodes,
output_file=output_file,
compression=getattr(args, 'compression', 'auto')
)
if process.poll() is None:
process.terminate()
try:
- process.wait(timeout=_PROCESS_KILL_TIMEOUT)
+ process.wait(timeout=_PROCESS_KILL_TIMEOUT_SEC)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
# Create live collector with default settings
collector = LiveStatsCollector(
- args.interval,
+ args.sample_interval_usec,
skip_idle=skip_idle,
sort_by="tottime", # Default initial sort
limit=20, # Default limit
# Create live collector with default settings
collector = LiveStatsCollector(
- args.interval,
+ args.sample_interval_usec,
skip_idle=skip_idle,
sort_by="tottime", # Default initial sort
limit=20, # Default limit
"""Constants for the sampling profiler."""
+# Time unit conversion constants
+MICROSECONDS_PER_SECOND = 1_000_000
+MILLISECONDS_PER_SECOND = 1_000
+
# Profiling mode constants
PROFILING_MODE_WALL = 0
PROFILING_MODE_CPU = 1
from .constants import (
MICROSECONDS_PER_SECOND,
DISPLAY_UPDATE_HZ,
- DISPLAY_UPDATE_INTERVAL,
+ DISPLAY_UPDATE_INTERVAL_SEC,
MIN_TERMINAL_WIDTH,
MIN_TERMINAL_HEIGHT,
WIDTH_THRESHOLD_SAMPLE_PCT,
# Constants
"MICROSECONDS_PER_SECOND",
"DISPLAY_UPDATE_HZ",
- "DISPLAY_UPDATE_INTERVAL",
+ "DISPLAY_UPDATE_INTERVAL_SEC",
"MIN_TERMINAL_WIDTH",
"MIN_TERMINAL_HEIGHT",
"WIDTH_THRESHOLD_SAMPLE_PCT",
)
from .constants import (
MICROSECONDS_PER_SECOND,
- DISPLAY_UPDATE_INTERVAL,
+ DISPLAY_UPDATE_INTERVAL_SEC,
MIN_TERMINAL_WIDTH,
MIN_TERMINAL_HEIGHT,
HEADER_LINES,
self.max_sample_rate = 0 # Track maximum sample rate seen
self.successful_samples = 0 # Track samples that captured frames
self.failed_samples = 0 # Track samples that failed to capture frames
- self.display_update_interval = DISPLAY_UPDATE_INTERVAL # Instance variable for display refresh rate
+ self.display_update_interval_sec = DISPLAY_UPDATE_INTERVAL_SEC # Instance variable for display refresh rate
# Thread status statistics (bit flags)
self.thread_status_counts = {
if (
self._last_display_update is None
or (current_time - self._last_display_update)
- >= self.display_update_interval
+ >= self.display_update_interval_sec
):
self._update_display()
self._last_display_update = current_time
elif ch == ord("+") or ch == ord("="):
# Decrease update interval (faster refresh)
- self.display_update_interval = max(
- 0.05, self.display_update_interval - 0.05
+ self.display_update_interval_sec = max(
+ 0.05, self.display_update_interval_sec - 0.05
) # Min 20Hz
elif ch == ord("-") or ch == ord("_"):
# Increase update interval (slower refresh)
- self.display_update_interval = min(
- 1.0, self.display_update_interval + 0.05
+ self.display_update_interval_sec = min(
+ 1.0, self.display_update_interval_sec + 0.05
) # Max 1Hz
elif ch == ord("c") or ch == ord("C"):
# Display update constants
DISPLAY_UPDATE_HZ = 10
-DISPLAY_UPDATE_INTERVAL = 1.0 / DISPLAY_UPDATE_HZ # 0.1 seconds
+DISPLAY_UPDATE_INTERVAL_SEC = 1.0 / DISPLAY_UPDATE_HZ # 0.1 seconds
# Terminal size constraints
MIN_TERMINAL_WIDTH = 60
WIDTH_THRESHOLD_CUMUL_PCT,
WIDTH_THRESHOLD_CUMTIME,
MICROSECONDS_PER_SECOND,
- DISPLAY_UPDATE_INTERVAL,
+ DISPLAY_UPDATE_INTERVAL_SEC,
MIN_BAR_WIDTH,
MAX_SAMPLE_RATE_BAR_WIDTH,
MAX_EFFICIENCY_BAR_WIDTH,
# Calculate display refresh rate
refresh_hz = (
- 1.0 / self.collector.display_update_interval if self.collector.display_update_interval > 0 else 0
+ 1.0 / self.collector.display_update_interval_sec if self.collector.display_update_interval_sec > 0 else 0
)
# Get current view mode and thread display
def format_rate_with_units(self, rate_hz):
"""Format a rate in Hz with appropriate units (Hz, KHz, MHz)."""
- if rate_hz >= 1_000_000:
- return f"{rate_hz / 1_000_000:.1f}MHz"
+ if rate_hz >= MICROSECONDS_PER_SECOND:
+ return f"{rate_hz / MICROSECONDS_PER_SECOND:.1f}MHz"
elif rate_hz >= 1_000:
return f"{rate_hz / 1_000:.1f}KHz"
else:
from _colorize import ANSIColors
from .collector import Collector, extract_lineno
+from .constants import MICROSECONDS_PER_SECOND
class PstatsCollector(Collector):
# Needed for compatibility with pstats.Stats
def create_stats(self):
- sample_interval_sec = self.sample_interval_usec / 1_000_000
+ sample_interval_sec = self.sample_interval_usec / MICROSECONDS_PER_SECOND
callers = {}
for fname, call_counts in self.result.items():
total = call_counts["direct_calls"] * sample_interval_sec
elif max_value >= 0.001:
return "ms", 1000.0
else:
- return "μs", 1000000.0
+ return "μs", float(MICROSECONDS_PER_SECOND)
def _print_summary(self, stats_list, total_samples):
"""Print summary of interesting functions."""
"run",
"-d",
"5",
- "-i",
- "100000",
+ "-r",
+ "10",
script,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
monitor = ChildProcessMonitor(
pid=os.getpid(),
- cli_args=["-i", "100", "-d", "5"],
+ cli_args=["-r", "10khz", "-d", "5"],
output_pattern="test_{pid}.pstats",
)
self.assertEqual(monitor.parent_pid, os.getpid())
- self.assertEqual(monitor.cli_args, ["-i", "100", "-d", "5"])
+ self.assertEqual(monitor.cli_args, ["-r", "10khz", "-d", "5"])
self.assertEqual(monitor.output_pattern, "test_{pid}.pstats")
def test_monitor_lifecycle(self):
from profiling.sampling.cli import _build_child_profiler_args
args = argparse.Namespace(
- interval=200,
+ sample_interval_usec=200,
duration=15,
all_threads=True,
realtime_stats=False,
f"'{child_args[flag_index + 1]}' in args: {child_args}",
)
- assert_flag_value_pair("-i", 200)
+ assert_flag_value_pair("-r", 5000)
assert_flag_value_pair("-d", 15)
assert_flag_value_pair("--mode", "cpu")
from profiling.sampling.cli import _build_child_profiler_args
args = argparse.Namespace(
- interval=100,
+ sample_interval_usec=100,
duration=5,
all_threads=False,
realtime_stats=False,
from profiling.sampling.cli import _setup_child_monitor
args = argparse.Namespace(
- interval=100,
+ sample_interval_usec=100,
duration=5,
all_threads=False,
realtime_stats=False,
# Create a monitor
monitor = ChildProcessMonitor(
pid=os.getpid(),
- cli_args=["-i", "100", "-d", "5"],
+ cli_args=["-r", "10khz", "-d", "5"],
output_pattern="test_{pid}.pstats",
)
"--subprocesses",
"-d",
"3",
- "-i",
- "10000",
+ "-r",
+ "100",
"-o",
output_file,
script_file,
"--subprocesses",
"-d",
"2",
- "-i",
- "10000",
+ "-r",
+ "100",
"--flamegraph",
"-o",
output_file,
"--subprocesses",
"-d",
"2",
- "-i",
- "10000",
+ "-r",
+ "100",
"-o",
output_file,
script_file,
test_args = [
"profiling.sampling.cli",
"run",
- "-i",
+ "-r",
"1000",
"-d",
"30",
test_args = [
"profiling.sampling.cli",
"run",
- "-i",
- "2000",
+ "-r",
+ "500",
"-d",
"60",
"--collapsed",
)
self.collector.start_time = time.perf_counter()
# Set a consistent display update interval for tests
- self.collector.display_update_interval = 0.1
+ self.collector.display_update_interval_sec = 0.1
def tearDown(self):
"""Clean up after test."""
def test_increase_refresh_rate(self):
"""Test increasing refresh rate (faster updates)."""
- initial_interval = self.collector.display_update_interval
+ initial_interval = self.collector.display_update_interval_sec
# Simulate '+' key press (faster = smaller interval)
self.display.simulate_input(ord("+"))
self.collector._handle_input()
- self.assertLess(self.collector.display_update_interval, initial_interval)
+ self.assertLess(self.collector.display_update_interval_sec, initial_interval)
def test_decrease_refresh_rate(self):
"""Test decreasing refresh rate (slower updates)."""
- initial_interval = self.collector.display_update_interval
+ initial_interval = self.collector.display_update_interval_sec
# Simulate '-' key press (slower = larger interval)
self.display.simulate_input(ord("-"))
self.collector._handle_input()
- self.assertGreater(self.collector.display_update_interval, initial_interval)
+ self.assertGreater(self.collector.display_update_interval_sec, initial_interval)
def test_refresh_rate_minimum(self):
"""Test that refresh rate has a minimum (max speed)."""
- self.collector.display_update_interval = 0.05 # Set to minimum
+ self.collector.display_update_interval_sec = 0.05 # Set to minimum
# Try to go faster
self.display.simulate_input(ord("+"))
self.collector._handle_input()
# Should stay at minimum
- self.assertEqual(self.collector.display_update_interval, 0.05)
+ self.assertEqual(self.collector.display_update_interval_sec, 0.05)
def test_refresh_rate_maximum(self):
"""Test that refresh rate has a maximum (min speed)."""
- self.collector.display_update_interval = 1.0 # Set to maximum
+ self.collector.display_update_interval_sec = 1.0 # Set to maximum
# Try to go slower
self.display.simulate_input(ord("-"))
self.collector._handle_input()
# Should stay at maximum
- self.assertEqual(self.collector.display_update_interval, 1.0)
+ self.assertEqual(self.collector.display_update_interval_sec, 1.0)
def test_help_toggle(self):
"""Test help screen toggle."""
def test_increase_refresh_rate_with_equals(self):
"""Test increasing refresh rate with '=' key."""
- initial_interval = self.collector.display_update_interval
+ initial_interval = self.collector.display_update_interval_sec
# Simulate '=' key press (alternative to '+')
self.display.simulate_input(ord("="))
self.collector._handle_input()
- self.assertLess(self.collector.display_update_interval, initial_interval)
+ self.assertLess(self.collector.display_update_interval_sec, initial_interval)
def test_decrease_refresh_rate_with_underscore(self):
"""Test decreasing refresh rate with '_' key."""
- initial_interval = self.collector.display_update_interval
+ initial_interval = self.collector.display_update_interval_sec
# Simulate '_' key press (alternative to '-')
self.display.simulate_input(ord("_"))
self.collector._handle_input()
- self.assertGreater(self.collector.display_update_interval, initial_interval)
+ self.assertGreater(self.collector.display_update_interval_sec, initial_interval)
def test_finished_state_displays_banner(self):
"""Test that finished state shows prominent banner."""
"12345",
"--mode",
"gil",
- "-i",
- "500",
+ "-r",
+ "2000",
"-d",
"5",
]
"12345",
"--mode",
"exception",
- "-i",
- "500",
+ "-r",
+ "2000",
"-d",
"5",
]