--- /dev/null
+"""
+Internal synchronization coordinator for the sample profiler.
+
+This module is used internally by the sample profiler to coordinate
+the startup of target processes. It should not be called directly by users.
+"""
+
+import os
+import sys
+import socket
+import runpy
+import time
+from typing import List, NoReturn
+
+
+class CoordinatorError(Exception):
+ """Base exception for coordinator errors."""
+ pass
+
+
+class ArgumentError(CoordinatorError):
+ """Raised when invalid arguments are provided."""
+ pass
+
+
+class SyncError(CoordinatorError):
+ """Raised when synchronization with profiler fails."""
+ pass
+
+
+class TargetError(CoordinatorError):
+ """Raised when target execution fails."""
+ pass
+
+
+def _validate_arguments(args: List[str]) -> tuple[int, str, List[str]]:
+ """
+ Validate and parse command line arguments.
+
+ Args:
+ args: Command line arguments including script name
+
+ Returns:
+ Tuple of (sync_port, working_directory, target_args)
+
+ Raises:
+ ArgumentError: If arguments are invalid
+ """
+ if len(args) < 4:
+ raise ArgumentError(
+ "Insufficient arguments. Expected: <sync_port> <cwd> <target> [args...]"
+ )
+
+ try:
+ sync_port = int(args[1])
+ if not (1 <= sync_port <= 65535):
+ raise ValueError("Port out of range")
+ except ValueError as e:
+ raise ArgumentError(f"Invalid sync port '{args[1]}': {e}") from e
+
+ cwd = args[2]
+ if not os.path.isdir(cwd):
+ raise ArgumentError(f"Working directory does not exist: {cwd}")
+
+ target_args = args[3:]
+ if not target_args:
+ raise ArgumentError("No target specified")
+
+ return sync_port, cwd, target_args
+
+
+# Constants for socket communication
+_MAX_RETRIES = 3
+_INITIAL_RETRY_DELAY = 0.1
+_SOCKET_TIMEOUT = 2.0
+_READY_MESSAGE = b"ready"
+
+
+def _signal_readiness(sync_port: int) -> None:
+ """
+ Signal readiness to the profiler via TCP socket.
+
+ Args:
+ sync_port: Port number where profiler is listening
+
+ Raises:
+ SyncError: If unable to signal readiness
+ """
+ last_error = None
+
+ for attempt in range(_MAX_RETRIES):
+ try:
+ # Use context manager for automatic cleanup
+ with socket.create_connection(("127.0.0.1", sync_port), timeout=_SOCKET_TIMEOUT) as sock:
+ sock.send(_READY_MESSAGE)
+ return
+ except (socket.error, OSError) as e:
+ last_error = e
+ if attempt < _MAX_RETRIES - 1:
+ # Exponential backoff before retry
+ time.sleep(_INITIAL_RETRY_DELAY * (2 ** attempt))
+
+ # If we get here, all retries failed
+ raise SyncError(f"Failed to signal readiness after {_MAX_RETRIES} attempts: {last_error}") from last_error
+
+
+def _setup_environment(cwd: str) -> None:
+ """
+ Set up the execution environment.
+
+ Args:
+ cwd: Working directory to change to
+
+ Raises:
+ TargetError: If unable to set up environment
+ """
+ try:
+ os.chdir(cwd)
+ except OSError as e:
+ raise TargetError(f"Failed to change to directory {cwd}: {e}") from e
+
+ # Add current directory to sys.path if not present (for module imports)
+ if cwd not in sys.path:
+ sys.path.insert(0, cwd)
+
+
+def _execute_module(module_name: str, module_args: List[str]) -> None:
+ """
+ Execute a Python module.
+
+ Args:
+ module_name: Name of the module to execute
+ module_args: Arguments to pass to the module
+
+ Raises:
+ TargetError: If module execution fails
+ """
+ # Replace sys.argv to match how Python normally runs modules
+ # When running 'python -m module args', sys.argv is ["__main__.py", "args"]
+ sys.argv = [f"__main__.py"] + module_args
+
+ try:
+ runpy.run_module(module_name, run_name="__main__", alter_sys=True)
+ except ImportError as e:
+ raise TargetError(f"Module '{module_name}' not found: {e}") from e
+ except SystemExit:
+ # SystemExit is normal for modules
+ pass
+ except Exception as e:
+ raise TargetError(f"Error executing module '{module_name}': {e}") from e
+
+
+def _execute_script(script_path: str, script_args: List[str], cwd: str) -> None:
+ """
+ Execute a Python script.
+
+ Args:
+ script_path: Path to the script to execute
+ script_args: Arguments to pass to the script
+ cwd: Current working directory for path resolution
+
+ Raises:
+ TargetError: If script execution fails
+ """
+ # Make script path absolute if it isn't already
+ if not os.path.isabs(script_path):
+ script_path = os.path.join(cwd, script_path)
+
+ if not os.path.isfile(script_path):
+ raise TargetError(f"Script not found: {script_path}")
+
+ # Replace sys.argv to match original script call
+ sys.argv = [script_path] + script_args
+
+ try:
+ with open(script_path, 'rb') as f:
+ source_code = f.read()
+
+ # Compile and execute the script
+ code = compile(source_code, script_path, 'exec')
+ exec(code, {'__name__': '__main__', '__file__': script_path})
+ except FileNotFoundError as e:
+ raise TargetError(f"Script file not found: {script_path}") from e
+ except PermissionError as e:
+ raise TargetError(f"Permission denied reading script: {script_path}") from e
+ except SyntaxError as e:
+ raise TargetError(f"Syntax error in script {script_path}: {e}") from e
+ except SystemExit:
+ # SystemExit is normal for scripts
+ pass
+ except Exception as e:
+ raise TargetError(f"Error executing script '{script_path}': {e}") from e
+
+
+def main() -> NoReturn:
+ """
+ Main coordinator function.
+
+ This function coordinates the startup of a target Python process
+ with the sample profiler by signaling when the process is ready
+ to be profiled.
+ """
+ try:
+ # Parse and validate arguments
+ sync_port, cwd, target_args = _validate_arguments(sys.argv)
+
+ # Set up execution environment
+ _setup_environment(cwd)
+
+ # Signal readiness to profiler
+ _signal_readiness(sync_port)
+
+ # Execute the target
+ if target_args[0] == "-m":
+ # Module execution
+ if len(target_args) < 2:
+ raise ArgumentError("Module name required after -m")
+
+ module_name = target_args[1]
+ module_args = target_args[2:]
+ _execute_module(module_name, module_args)
+ else:
+ # Script execution
+ script_path = target_args[0]
+ script_args = target_args[1:]
+ _execute_script(script_path, script_args, cwd)
+
+ except CoordinatorError as e:
+ print(f"Profiler coordinator error: {e}", file=sys.stderr)
+ sys.exit(1)
+ except KeyboardInterrupt:
+ print("Interrupted", file=sys.stderr)
+ sys.exit(1)
+ except Exception as e:
+ print(f"Unexpected error in profiler coordinator: {e}", file=sys.stderr)
+ sys.exit(1)
+
+ # Normal exit
+ sys.exit(0)
+
+
+if __name__ == "__main__":
+ main()
import _remote_debugging
import os
import pstats
+import socket
+import subprocess
import statistics
import sys
import sysconfig
from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector
-FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
+_FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
+_MAX_STARTUP_ATTEMPTS = 5
+_STARTUP_RETRY_DELAY_SECONDS = 0.1
+_HELP_DESCRIPTION = """Sample a process's stack frames and generate profiling data.
+Supports the following target modes:
+ - -p PID: Profile an existing process by PID
+ - -m MODULE [ARGS...]: Profile a module as python -m module ...
+ - filename [ARGS...]: Profile the specified script by running it in a subprocess
+
+Examples:
+ # Profile process 1234 for 10 seconds with default settings
+ python -m profile.sample -p 1234
+
+ # Profile a script by running it in a subprocess
+ python -m profile.sample myscript.py arg1 arg2
+
+ # Profile a module by running it as python -m module in a subprocess
+ python -m profile.sample -m mymodule arg1 arg2
+
+ # Profile with custom interval and duration, save to file
+ python -m profile.sample -i 50 -d 30 -o profile.stats -p 1234
+
+ # Generate collapsed stacks for flamegraph
+ python -m profile.sample --collapsed -p 1234
+
+ # Profile all threads, sort by total time
+ python -m profile.sample -a --sort-tottime -p 1234
+
+ # Profile for 1 minute with 1ms sampling interval
+ python -m profile.sample -i 1000 -d 60 -p 1234
+
+ # Show only top 20 functions sorted by direct samples
+ python -m profile.sample --sort-nsamples -l 20 -p 1234
+
+ # Profile all threads and save collapsed stacks
+ python -m profile.sample -a --collapsed -o stacks.txt -p 1234
+
+ # Profile with real-time sampling statistics
+ python -m profile.sample --realtime-stats -p 1234
+
+ # Sort by sample percentage to find most sampled functions
+ python -m profile.sample --sort-sample-pct -p 1234
+
+ # Sort by cumulative samples to find functions most on call stack
+ python -m profile.sample --sort-nsamples-cumul -p 1234"""
+
+
+# Constants for socket synchronization
+_SYNC_TIMEOUT = 5.0
+_PROCESS_KILL_TIMEOUT = 2.0
+_READY_MESSAGE = b"ready"
+_RECV_BUFFER_SIZE = 1024
+
+
+def _run_with_sync(original_cmd):
+ """Run a command with socket-based synchronization and return the process."""
+ # Create a TCP socket for synchronization with better socket options
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync_sock:
+ # Set SO_REUSEADDR to avoid "Address already in use" errors
+ sync_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sync_sock.bind(("127.0.0.1", 0)) # Let OS choose a free port
+ sync_port = sync_sock.getsockname()[1]
+ sync_sock.listen(1)
+ sync_sock.settimeout(_SYNC_TIMEOUT)
+
+ # Get current working directory to preserve it
+ cwd = os.getcwd()
+
+ # Build command using the sync coordinator
+ target_args = original_cmd[1:] # Remove python executable
+ cmd = (sys.executable, "-m", "profile._sync_coordinator", str(sync_port), cwd) + tuple(target_args)
+
+ # Start the process with coordinator
+ process = subprocess.Popen(cmd)
+
+ try:
+ # Wait for ready signal with timeout
+ with sync_sock.accept()[0] as conn:
+ ready_signal = conn.recv(_RECV_BUFFER_SIZE)
+
+ if ready_signal != _READY_MESSAGE:
+ raise RuntimeError(f"Invalid ready signal received: {ready_signal!r}")
+
+ except socket.timeout:
+ # If we timeout, kill the process and raise an error
+ if process.poll() is None:
+ process.terminate()
+ try:
+ process.wait(timeout=_PROCESS_KILL_TIMEOUT)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
+ raise RuntimeError("Process failed to signal readiness within timeout")
+
+ return process
+
+
+
class SampleProfiler:
def __init__(self, pid, sample_interval_usec, all_threads):
self.pid = pid
self.sample_interval_usec = sample_interval_usec
self.all_threads = all_threads
- if FREE_THREADED_BUILD:
+ if _FREE_THREADED_BUILD:
self.unwinder = _remote_debugging.RemoteUnwinder(
self.pid, all_threads=self.all_threads
)
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError:
+ duration_sec = current_time - start_time
break
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
errors += 1
f"The following options are only valid with --pstats format: {', '.join(invalid_opts)}"
)
- # Set default output filename for collapsed format
- if not args.outfile:
+ # Set default output filename for collapsed format only if we have a PID
+ # For module/script execution, this will be set later with the subprocess PID
+ if not args.outfile and args.pid is not None:
args.outfile = f"collapsed.{args.pid}.txt"
+def wait_for_process_and_sample(pid, sort_value, args):
+ """Sample the process immediately since it has already signaled readiness."""
+ # Set default collapsed filename with subprocess PID if not already set
+ filename = args.outfile
+ if not filename and args.format == "collapsed":
+ filename = f"collapsed.{pid}.txt"
+
+ sample(
+ pid,
+ sort=sort_value,
+ sample_interval_usec=args.interval,
+ duration_sec=args.duration,
+ filename=filename,
+ all_threads=args.all_threads,
+ limit=args.limit,
+ show_summary=not args.no_summary,
+ output_format=args.format,
+ realtime_stats=args.realtime_stats,
+ )
+
+
def main():
# Create the main parser
parser = argparse.ArgumentParser(
- description=(
- "Sample a process's stack frames and generate profiling data.\n"
- "Supports two output formats:\n"
- " - pstats: Detailed profiling statistics with sorting options\n"
- " - collapsed: Stack traces for generating flamegraphs\n"
- "\n"
- "Examples:\n"
- " # Profile process 1234 for 10 seconds with default settings\n"
- " python -m profile.sample 1234\n"
- "\n"
- " # Profile with custom interval and duration, save to file\n"
- " python -m profile.sample -i 50 -d 30 -o profile.stats 1234\n"
- "\n"
- " # Generate collapsed stacks for flamegraph\n"
- " python -m profile.sample --collapsed 1234\n"
- "\n"
- " # Profile all threads, sort by total time\n"
- " python -m profile.sample -a --sort-tottime 1234\n"
- "\n"
- " # Profile for 1 minute with 1ms sampling interval\n"
- " python -m profile.sample -i 1000 -d 60 1234\n"
- "\n"
- " # Show only top 20 functions sorted by direct samples\n"
- " python -m profile.sample --sort-nsamples -l 20 1234\n"
- "\n"
- " # Profile all threads and save collapsed stacks\n"
- " python -m profile.sample -a --collapsed -o stacks.txt 1234\n"
- "\n"
- " # Profile with real-time sampling statistics\n"
- " python -m profile.sample --realtime-stats 1234\n"
- "\n"
- " # Sort by sample percentage to find most sampled functions\n"
- " python -m profile.sample --sort-sample-pct 1234\n"
- "\n"
- " # Sort by cumulative samples to find functions most on call stack\n"
- " python -m profile.sample --sort-nsamples-cumul 1234"
- ),
+ description=_HELP_DESCRIPTION,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
- # Required arguments
- parser.add_argument("pid", type=int, help="Process ID to sample")
+ # Target selection
+ target_group = parser.add_mutually_exclusive_group(required=False)
+ target_group.add_argument(
+ "-p", "--pid", type=int, help="Process ID to sample"
+ )
+ target_group.add_argument(
+ "-m", "--module",
+ help="Run and profile a module as python -m module [ARGS...]"
+ )
+ parser.add_argument(
+ "args",
+ nargs=argparse.REMAINDER,
+ help="Script to run and profile, with optional arguments"
+ )
# Sampling options
sampling_group = parser.add_argument_group("Sampling configuration")
sort_value = args.sort if args.sort is not None else 2
- sample(
- args.pid,
- sample_interval_usec=args.interval,
- duration_sec=args.duration,
- filename=args.outfile,
- all_threads=args.all_threads,
- limit=args.limit,
- sort=sort_value,
- show_summary=not args.no_summary,
- output_format=args.format,
- realtime_stats=args.realtime_stats,
- )
+ if args.module is not None and not args.module:
+ parser.error("argument -m/--module: expected one argument")
+
+ # Validate that we have exactly one target type
+ # Note: args can be present with -m (module arguments) but not as standalone script
+ has_pid = args.pid is not None
+ has_module = args.module is not None
+ has_script = bool(args.args) and args.module is None
+
+ target_count = sum([has_pid, has_module, has_script])
+
+ if target_count == 0:
+ parser.error("one of the arguments -p/--pid -m/--module or script name is required")
+ elif target_count > 1:
+ parser.error("only one target type can be specified: -p/--pid, -m/--module, or script")
+
+ if args.pid:
+ sample(
+ args.pid,
+ sample_interval_usec=args.interval,
+ duration_sec=args.duration,
+ filename=args.outfile,
+ all_threads=args.all_threads,
+ limit=args.limit,
+ sort=sort_value,
+ show_summary=not args.no_summary,
+ output_format=args.format,
+ realtime_stats=args.realtime_stats,
+ )
+ elif args.module or args.args:
+ if args.module:
+ cmd = (sys.executable, "-m", args.module, *args.args)
+ else:
+ cmd = (sys.executable, *args.args)
+
+ # Use synchronized process startup
+ process = _run_with_sync(cmd)
+ # Process has already signaled readiness, start sampling immediately
+ try:
+ wait_for_process_and_sample(process.pid, sort_value, args)
+ finally:
+ if process.poll() is None:
+ process.terminate()
+ try:
+ process.wait(timeout=2)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
if __name__ == "__main__":
main()
import io
import marshal
import os
+import shutil
import socket
import subprocess
import sys
# Just verify that sampling completed without error
# We're not testing output format here
+ def test_sample_target_script(self):
+ script_file = tempfile.NamedTemporaryFile(delete=False)
+ script_file.write(self.test_script.encode("utf-8"))
+ script_file.flush()
+ self.addCleanup(close_and_unlink, script_file)
+
+ test_args = ["profile.sample", "-d", "1", script_file.name]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ io.StringIO() as captured_output,
+ mock.patch("sys.stdout", captured_output),
+ ):
+ try:
+ profile.sample.main()
+ except PermissionError:
+ self.skipTest("Insufficient permissions for remote profiling")
+
+ output = captured_output.getvalue()
+
+ # Basic checks on output
+ self.assertIn("Captured", output)
+ self.assertIn("samples", output)
+ self.assertIn("Profile Stats", output)
+
+ # Should see some of our test functions
+ self.assertIn("slow_fibonacci", output)
+
+
+ def test_sample_target_module(self):
+ tempdir = tempfile.TemporaryDirectory(delete=False)
+ self.addCleanup(lambda x: shutil.rmtree(x), tempdir.name)
+
+ module_path = os.path.join(tempdir.name, "test_module.py")
+
+ with open(module_path, "w") as f:
+ f.write(self.test_script)
+
+ test_args = ["profile.sample", "-d", "1", "-m", "test_module"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ io.StringIO() as captured_output,
+ mock.patch("sys.stdout", captured_output),
+ # Change to temp directory so subprocess can find the module
+ contextlib.chdir(tempdir.name),
+ ):
+ try:
+ profile.sample.main()
+ except PermissionError:
+ self.skipTest("Insufficient permissions for remote profiling")
+
+ output = captured_output.getvalue()
+
+ # Basic checks on output
+ self.assertIn("Captured", output)
+ self.assertIn("samples", output)
+ self.assertIn("Profile Stats", output)
+
+ # Should see some of our test functions
+ self.assertIn("slow_fibonacci", output)
+
@skip_if_not_supported
@unittest.skipIf(
self.assertIsNotNone(profiler.unwinder.get_stack_trace())
proc.kill()
proc.wait()
- # ValueError on MacOS (yeah I know), ProcessLookupError on Linux and Windows
- self.assertRaises((ValueError, ProcessLookupError), profiler.unwinder.get_stack_trace)
+ self.assertRaises(ProcessLookupError, profiler.unwinder.get_stack_trace)
# Exit the context manager to ensure the process is terminated
self.assertFalse(profiler._is_process_running())
- self.assertRaises((ValueError, ProcessLookupError), profiler.unwinder.get_stack_trace)
+ self.assertRaises(ProcessLookupError, profiler.unwinder.get_stack_trace)
@unittest.skipUnless(sys.platform == "linux", "Only valid on Linux")
def test_esrch_signal_handling(self):
class TestSampleProfilerCLI(unittest.TestCase):
+ def _setup_sync_mocks(self, mock_socket, mock_popen):
+ """Helper to set up socket and process mocks for coordinator tests."""
+ # Mock the sync socket with context manager support
+ mock_sock_instance = mock.MagicMock()
+ mock_sock_instance.getsockname.return_value = ("127.0.0.1", 12345)
+
+ # Mock the connection with context manager support
+ mock_conn = mock.MagicMock()
+ mock_conn.recv.return_value = b"ready"
+ mock_conn.__enter__.return_value = mock_conn
+ mock_conn.__exit__.return_value = None
+
+ # Mock accept() to return (connection, address) and support indexing
+ mock_accept_result = mock.MagicMock()
+ mock_accept_result.__getitem__.return_value = mock_conn # [0] returns the connection
+ mock_sock_instance.accept.return_value = mock_accept_result
+
+ # Mock socket with context manager support
+ mock_sock_instance.__enter__.return_value = mock_sock_instance
+ mock_sock_instance.__exit__.return_value = None
+ mock_socket.return_value = mock_sock_instance
+
+ # Mock the subprocess
+ mock_process = mock.MagicMock()
+ mock_process.pid = 12345
+ mock_process.poll.return_value = None
+ mock_popen.return_value = mock_process
+ return mock_process
+
+ def _verify_coordinator_command(self, mock_popen, expected_target_args):
+ """Helper to verify the coordinator command was called correctly."""
+ args, kwargs = mock_popen.call_args
+ coordinator_cmd = args[0]
+ self.assertEqual(coordinator_cmd[0], sys.executable)
+ self.assertEqual(coordinator_cmd[1], "-m")
+ self.assertEqual(coordinator_cmd[2], "profile._sync_coordinator")
+ self.assertEqual(coordinator_cmd[3], "12345") # port
+ # cwd is coordinator_cmd[4]
+ self.assertEqual(coordinator_cmd[5:], expected_target_args)
+
+ def test_cli_module_argument_parsing(self):
+ test_args = ["profile.sample", "-m", "mymodule"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("profile.sample.sample") as mock_sample,
+ mock.patch("subprocess.Popen") as mock_popen,
+ mock.patch("socket.socket") as mock_socket,
+ ):
+ self._setup_sync_mocks(mock_socket, mock_popen)
+ profile.sample.main()
+
+ self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
+ mock_sample.assert_called_once_with(
+ 12345,
+ sort=2, # default sort (sort_value from args.sort)
+ sample_interval_usec=100,
+ duration_sec=10,
+ filename=None,
+ all_threads=False,
+ limit=15,
+ show_summary=True,
+ output_format="pstats",
+ realtime_stats=False,
+ )
+
+ def test_cli_module_with_arguments(self):
+ test_args = ["profile.sample", "-m", "mymodule", "arg1", "arg2", "--flag"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("profile.sample.sample") as mock_sample,
+ mock.patch("subprocess.Popen") as mock_popen,
+ mock.patch("socket.socket") as mock_socket,
+ ):
+ self._setup_sync_mocks(mock_socket, mock_popen)
+ profile.sample.main()
+
+ self._verify_coordinator_command(mock_popen, ("-m", "mymodule", "arg1", "arg2", "--flag"))
+ mock_sample.assert_called_once_with(
+ 12345,
+ sort=2,
+ sample_interval_usec=100,
+ duration_sec=10,
+ filename=None,
+ all_threads=False,
+ limit=15,
+ show_summary=True,
+ output_format="pstats",
+ realtime_stats=False,
+ )
+
+ def test_cli_script_argument_parsing(self):
+ test_args = ["profile.sample", "myscript.py"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("profile.sample.sample") as mock_sample,
+ mock.patch("subprocess.Popen") as mock_popen,
+ mock.patch("socket.socket") as mock_socket,
+ ):
+ self._setup_sync_mocks(mock_socket, mock_popen)
+ profile.sample.main()
+
+ self._verify_coordinator_command(mock_popen, ("myscript.py",))
+ mock_sample.assert_called_once_with(
+ 12345,
+ sort=2,
+ sample_interval_usec=100,
+ duration_sec=10,
+ filename=None,
+ all_threads=False,
+ limit=15,
+ show_summary=True,
+ output_format="pstats",
+ realtime_stats=False,
+ )
+
+ def test_cli_script_with_arguments(self):
+ test_args = ["profile.sample", "myscript.py", "arg1", "arg2", "--flag"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("profile.sample.sample") as mock_sample,
+ mock.patch("subprocess.Popen") as mock_popen,
+ mock.patch("socket.socket") as mock_socket,
+ ):
+ # Use the helper to set up mocks consistently
+ mock_process = self._setup_sync_mocks(mock_socket, mock_popen)
+ # Override specific behavior for this test
+ mock_process.wait.side_effect = [subprocess.TimeoutExpired(test_args, 0.1), None]
+
+ profile.sample.main()
+
+ # Verify the coordinator command was called
+ args, kwargs = mock_popen.call_args
+ coordinator_cmd = args[0]
+ self.assertEqual(coordinator_cmd[0], sys.executable)
+ self.assertEqual(coordinator_cmd[1], "-m")
+ self.assertEqual(coordinator_cmd[2], "profile._sync_coordinator")
+ self.assertEqual(coordinator_cmd[3], "12345") # port
+ # cwd is coordinator_cmd[4]
+ self.assertEqual(coordinator_cmd[5:], ("myscript.py", "arg1", "arg2", "--flag"))
+
+ def test_cli_mutually_exclusive_pid_module(self):
+ test_args = ["profile.sample", "-p", "12345", "-m", "mymodule"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
+ self.assertRaises(SystemExit) as cm,
+ ):
+ profile.sample.main()
+
+ self.assertEqual(cm.exception.code, 2) # argparse error
+ error_msg = mock_stderr.getvalue()
+ self.assertIn("not allowed with argument", error_msg)
+
+ def test_cli_mutually_exclusive_pid_script(self):
+ test_args = ["profile.sample", "-p", "12345", "myscript.py"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
+ self.assertRaises(SystemExit) as cm,
+ ):
+ profile.sample.main()
+
+ self.assertEqual(cm.exception.code, 2) # argparse error
+ error_msg = mock_stderr.getvalue()
+ self.assertIn("only one target type can be specified", error_msg)
+
+ def test_cli_no_target_specified(self):
+ test_args = ["profile.sample", "-d", "5"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
+ self.assertRaises(SystemExit) as cm,
+ ):
+ profile.sample.main()
+
+ self.assertEqual(cm.exception.code, 2) # argparse error
+ error_msg = mock_stderr.getvalue()
+ self.assertIn("one of the arguments", error_msg)
+
+ def test_cli_module_with_profiler_options(self):
+ test_args = [
+ "profile.sample", "-i", "1000", "-d", "30", "-a",
+ "--sort-tottime", "-l", "20", "-m", "mymodule",
+ ]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("profile.sample.sample") as mock_sample,
+ mock.patch("subprocess.Popen") as mock_popen,
+ mock.patch("socket.socket") as mock_socket,
+ ):
+ self._setup_sync_mocks(mock_socket, mock_popen)
+ profile.sample.main()
+
+ self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
+ mock_sample.assert_called_once_with(
+ 12345,
+ sort=1, # sort-tottime
+ sample_interval_usec=1000,
+ duration_sec=30,
+ filename=None,
+ all_threads=True,
+ limit=20,
+ show_summary=True,
+ output_format="pstats",
+ realtime_stats=False,
+ )
+
+ def test_cli_script_with_profiler_options(self):
+ """Test script with various profiler options."""
+ test_args = [
+ "profile.sample", "-i", "2000", "-d", "60",
+ "--collapsed", "-o", "output.txt",
+ "myscript.py", "scriptarg",
+ ]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("profile.sample.sample") as mock_sample,
+ mock.patch("subprocess.Popen") as mock_popen,
+ mock.patch("socket.socket") as mock_socket,
+ ):
+ self._setup_sync_mocks(mock_socket, mock_popen)
+ profile.sample.main()
+
+ self._verify_coordinator_command(mock_popen, ("myscript.py", "scriptarg"))
+ # Verify profiler options were passed correctly
+ mock_sample.assert_called_once_with(
+ 12345,
+ sort=2, # default sort
+ sample_interval_usec=2000,
+ duration_sec=60,
+ filename="output.txt",
+ all_threads=False,
+ limit=15,
+ show_summary=True,
+ output_format="collapsed",
+ realtime_stats=False,
+ )
+
+ def test_cli_empty_module_name(self):
+ test_args = ["profile.sample", "-m"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
+ self.assertRaises(SystemExit) as cm,
+ ):
+ profile.sample.main()
+
+ self.assertEqual(cm.exception.code, 2) # argparse error
+ error_msg = mock_stderr.getvalue()
+ self.assertIn("argument -m/--module: expected one argument", error_msg)
+
+ def test_cli_long_module_option(self):
+ test_args = ["profile.sample", "--module", "mymodule", "arg1"]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("profile.sample.sample") as mock_sample,
+ mock.patch("subprocess.Popen") as mock_popen,
+ mock.patch("socket.socket") as mock_socket,
+ ):
+ self._setup_sync_mocks(mock_socket, mock_popen)
+ profile.sample.main()
+
+ self._verify_coordinator_command(mock_popen, ("-m", "mymodule", "arg1"))
+
+ def test_cli_complex_script_arguments(self):
+ test_args = [
+ "profile.sample", "script.py",
+ "--input", "file.txt", "-v", "--output=/tmp/out", "positional"
+ ]
+
+ with (
+ mock.patch("sys.argv", test_args),
+ mock.patch("profile.sample.sample") as mock_sample,
+ mock.patch("profile.sample._run_with_sync") as mock_run_with_sync,
+ ):
+ mock_process = mock.MagicMock()
+ mock_process.pid = 12345
+ mock_process.wait.side_effect = [subprocess.TimeoutExpired(test_args, 0.1), None]
+ mock_process.poll.return_value = None
+ mock_run_with_sync.return_value = mock_process
+
+ profile.sample.main()
+
+ mock_run_with_sync.assert_called_once_with((
+ sys.executable, "script.py",
+ "--input", "file.txt", "-v", "--output=/tmp/out", "positional",
+ ))
+
def test_cli_collapsed_format_validation(self):
"""Test that CLI properly validates incompatible options with collapsed format."""
test_cases = [
# Test sort options are invalid with collapsed
(
- ["profile.sample", "--collapsed", "--sort-nsamples", "12345"],
+ ["profile.sample", "--collapsed", "--sort-nsamples", "-p", "12345"],
"sort",
),
(
- ["profile.sample", "--collapsed", "--sort-tottime", "12345"],
+ ["profile.sample", "--collapsed", "--sort-tottime", "-p", "12345"],
"sort",
),
(
"profile.sample",
"--collapsed",
"--sort-cumtime",
+ "-p",
"12345",
],
"sort",
"profile.sample",
"--collapsed",
"--sort-sample-pct",
+ "-p",
"12345",
],
"sort",
"profile.sample",
"--collapsed",
"--sort-cumul-pct",
+ "-p",
"12345",
],
"sort",
),
(
- ["profile.sample", "--collapsed", "--sort-name", "12345"],
+ ["profile.sample", "--collapsed", "--sort-name", "-p", "12345"],
"sort",
),
# Test limit option is invalid with collapsed
- (["profile.sample", "--collapsed", "-l", "20", "12345"], "limit"),
+ (["profile.sample", "--collapsed", "-l", "20", "-p", "12345"], "limit"),
(
- ["profile.sample", "--collapsed", "--limit", "20", "12345"],
+ ["profile.sample", "--collapsed", "--limit", "20", "-p", "12345"],
"limit",
),
# Test no-summary option is invalid with collapsed
(
- ["profile.sample", "--collapsed", "--no-summary", "12345"],
+ ["profile.sample", "--collapsed", "--no-summary", "-p", "12345"],
"summary",
),
]
def test_cli_default_collapsed_filename(self):
"""Test that collapsed format gets a default filename when not specified."""
- test_args = ["profile.sample", "--collapsed", "12345"]
+ test_args = ["profile.sample", "--collapsed", "-p", "12345"]
with (
mock.patch("sys.argv", test_args),
"""Test custom output filenames for both formats."""
test_cases = [
(
- ["profile.sample", "--pstats", "-o", "custom.pstats", "12345"],
+ ["profile.sample", "--pstats", "-o", "custom.pstats", "-p", "12345"],
"custom.pstats",
"pstats",
),
(
- ["profile.sample", "--collapsed", "-o", "custom.txt", "12345"],
+ ["profile.sample", "--collapsed", "-o", "custom.txt", "-p", "12345"],
"custom.txt",
"collapsed",
),
with (
mock.patch(
"sys.argv",
- ["profile.sample", "--pstats", "--collapsed", "12345"],
+ ["profile.sample", "--pstats", "--collapsed", "-p", "12345"],
),
mock.patch("sys.stderr", io.StringIO()),
):
profile.sample.main()
def test_argument_parsing_basic(self):
- test_args = ["profile.sample", "12345"]
+ test_args = ["profile.sample", "-p", "12345"]
with (
mock.patch("sys.argv", test_args),
]
for option, expected_sort_value in sort_options:
- test_args = ["profile.sample", option, "12345"]
+ test_args = ["profile.sample", option, "-p", "12345"]
with (
mock.patch("sys.argv", test_args),
#endif
#define INTERP_STATE_BUFFER_SIZE MAX(INTERP_STATE_MIN_SIZE, 256)
-
+#define MAX_TLBC_SIZE 2048
// Copied from Modules/_asynciomodule.c because it's not exported
TLBCCacheEntry *entry = NULL;
// Read the TLBC array pointer
- if (read_ptr(unwinder, tlbc_array_addr, &tlbc_array_ptr) != 0 || tlbc_array_ptr == 0) {
+ if (read_ptr(unwinder, tlbc_array_addr, &tlbc_array_ptr) != 0) {
+ PyErr_SetString(PyExc_RuntimeError, "Failed to read TLBC array pointer");
set_exception_cause(unwinder, PyExc_RuntimeError, "Failed to read TLBC array pointer");
+ return 0; // Read error
+ }
+
+ // Validate TLBC array pointer
+ if (tlbc_array_ptr == 0) {
+ PyErr_SetString(PyExc_RuntimeError, "TLBC array pointer is NULL");
return 0; // No TLBC array
}
// Read the TLBC array size
Py_ssize_t tlbc_size;
- if (_Py_RemoteDebug_PagedReadRemoteMemory(&unwinder->handle, tlbc_array_ptr, sizeof(tlbc_size), &tlbc_size) != 0 || tlbc_size <= 0) {
+ if (_Py_RemoteDebug_PagedReadRemoteMemory(&unwinder->handle, tlbc_array_ptr, sizeof(tlbc_size), &tlbc_size) != 0) {
+ PyErr_SetString(PyExc_RuntimeError, "Failed to read TLBC array size");
set_exception_cause(unwinder, PyExc_RuntimeError, "Failed to read TLBC array size");
+ return 0; // Read error
+ }
+
+ // Validate TLBC array size
+ if (tlbc_size <= 0) {
+ PyErr_SetString(PyExc_RuntimeError, "Invalid TLBC array size");
+ return 0; // Invalid size
+ }
+
+ if (tlbc_size > MAX_TLBC_SIZE) {
+ PyErr_SetString(PyExc_RuntimeError, "TLBC array size exceeds maximum limit");
return 0; // Invalid size
}
size_t array_data_size = tlbc_size * sizeof(void*);
tlbc_array = PyMem_RawMalloc(sizeof(Py_ssize_t) + array_data_size);
if (!tlbc_array) {
+ PyErr_NoMemory();
set_exception_cause(unwinder, PyExc_MemoryError, "Failed to allocate TLBC array");
return 0; // Memory error
}
// Create cache entry
entry = PyMem_RawMalloc(sizeof(TLBCCacheEntry));
if (!entry) {
+ PyErr_NoMemory();
PyMem_RawFree(tlbc_array);
set_exception_cause(unwinder, PyExc_MemoryError, "Failed to allocate TLBC cache entry");
return 0; // Memory error
meta = PyMem_RawMalloc(sizeof(CachedCodeMetadata));
if (!meta) {
+ PyErr_NoMemory();
set_exception_cause(unwinder, PyExc_MemoryError, "Failed to allocate cached code metadata");
goto error;
}
# include <mach-o/fat.h>
# include <mach-o/loader.h>
# include <mach-o/nlist.h>
+# include <mach/error.h>
# include <mach/mach.h>
# include <mach/mach_vm.h>
# include <mach/machine.h>
+# include <mach/task_info.h>
# include <sys/mman.h>
# include <sys/proc.h>
# include <sys/sysctl.h>
(mach_vm_size_t*)&result);
if (kr != KERN_SUCCESS) {
- switch (kr) {
+ switch (err_get_code(kr)) {
case KERN_PROTECTION_FAILURE:
PyErr_Format(PyExc_PermissionError,
"Memory protection failure reading from PID %d at address "
"0x%lx (size %zu): insufficient permissions",
handle->pid, remote_address, len);
break;
- case KERN_INVALID_ARGUMENT:
- PyErr_Format(PyExc_ValueError,
- "Invalid argument to mach_vm_read_overwrite for PID %d at "
- "address 0x%lx (size %zu)",
- handle->pid, remote_address, len);
+ case KERN_INVALID_ARGUMENT: {
+ // Perform a task_info check to see if the invalid argument is due
+ // to the process being terminated
+ task_basic_info_data_t task_basic_info;
+ mach_msg_type_number_t task_info_count = TASK_BASIC_INFO_COUNT;
+ kern_return_t task_valid_check = task_info(handle->task, TASK_BASIC_INFO,
+ (task_info_t)&task_basic_info,
+ &task_info_count);
+ if (task_valid_check == KERN_INVALID_ARGUMENT) {
+ PyErr_Format(PyExc_ProcessLookupError,
+ "Process %d is no longer accessible (process terminated)",
+ handle->pid);
+ } else {
+ PyErr_Format(PyExc_ValueError,
+ "Invalid argument to mach_vm_read_overwrite for PID %d at "
+ "address 0x%lx (size %zu) - check memory permissions",
+ handle->pid, remote_address, len);
+ }
+ break;
+ }
+ case KERN_NO_SPACE:
+ case KERN_MEMORY_ERROR:
+ PyErr_Format(PyExc_ProcessLookupError,
+ "Process %d memory space no longer available (process terminated)",
+ handle->pid);
break;
default:
PyErr_Format(PyExc_RuntimeError,