"""Command-line interface for the sampling profiler."""
import argparse
+import importlib.util
import os
+import selectors
import socket
import subprocess
import sys
+import time
from .sample import sample, sample_live
from .pstats_collector import PstatsCollector
return mode_map[mode_string]
+def _check_process_died(process):
+ """Check if process died and raise an error with stderr if available."""
+ if process.poll() is None:
+ return # Process still running
+
+ # Process died - try to get stderr for error message
+ stderr_msg = ""
+ if process.stderr:
+ try:
+ stderr_msg = process.stderr.read().decode().strip()
+ except (OSError, UnicodeDecodeError):
+ pass
+
+ if stderr_msg:
+ raise RuntimeError(stderr_msg)
+ raise RuntimeError(f"Process exited with code {process.returncode}")
+
+
+def _wait_for_ready_signal(sync_sock, process, timeout):
+ """Wait for the ready signal from the subprocess, checking for early death."""
+ deadline = time.monotonic() + timeout
+ sel = selectors.DefaultSelector()
+ sel.register(sync_sock, selectors.EVENT_READ)
+
+ try:
+ while True:
+ _check_process_died(process)
+
+ remaining = deadline - time.monotonic()
+ if remaining <= 0:
+ raise socket.timeout("timed out")
+
+ if not sel.select(timeout=min(0.1, remaining)):
+ continue
+
+ conn, _ = sync_sock.accept()
+ try:
+ ready_signal = conn.recv(_RECV_BUFFER_SIZE)
+ finally:
+ conn.close()
+
+ if ready_signal != _READY_MESSAGE:
+ raise RuntimeError(f"Invalid ready signal received: {ready_signal!r}")
+ return
+ finally:
+ sel.close()
+
+
def _run_with_sync(original_cmd, suppress_output=False):
"""Run a command with socket-based synchronization and return the process."""
# Create a TCP socket for synchronization with better socket options
) + tuple(target_args)
# Start the process with coordinator
- # Suppress stdout/stderr if requested (for live mode)
+ # When suppress_output=True (live mode), capture stderr so we can
+ # report errors if the process dies before signaling ready.
+ # When suppress_output=False (normal mode), let stderr inherit so
+ # script errors print to the terminal.
popen_kwargs = {}
if suppress_output:
popen_kwargs["stdin"] = subprocess.DEVNULL
popen_kwargs["stdout"] = subprocess.DEVNULL
- popen_kwargs["stderr"] = subprocess.DEVNULL
+ popen_kwargs["stderr"] = subprocess.PIPE
process = subprocess.Popen(cmd, **popen_kwargs)
try:
- # Wait for ready signal with timeout
- with sync_sock.accept()[0] as conn:
- ready_signal = conn.recv(_RECV_BUFFER_SIZE)
+ _wait_for_ready_signal(sync_sock, process, _SYNC_TIMEOUT)
- if ready_signal != _READY_MESSAGE:
- raise RuntimeError(
- f"Invalid ready signal received: {ready_signal!r}"
- )
+ # Close stderr pipe if we were capturing it
+ if process.stderr:
+ process.stderr.close()
except socket.timeout:
# If we timeout, kill the process and raise an error
def _handle_run(args):
"""Handle the 'run' command."""
+ # Validate target exists before launching subprocess
+ if args.module:
+ # Temporarily add cwd to sys.path so we can find modules in the
+ # current directory, matching the coordinator's behavior
+ cwd = os.getcwd()
+ added_cwd = False
+ if cwd not in sys.path:
+ sys.path.insert(0, cwd)
+ added_cwd = True
+ try:
+ if importlib.util.find_spec(args.target) is None:
+ sys.exit(f"Error: Module not found: {args.target}")
+ finally:
+ if added_cwd:
+ sys.path.remove(cwd)
+ else:
+ if not os.path.exists(args.target):
+ sys.exit(f"Error: Script not found: {args.target}")
+
# Check if live mode is requested
if args.live:
_handle_live_run(args)
cmd = (sys.executable, args.target, *args.args)
# Run with synchronization
- process = _run_with_sync(cmd, suppress_output=False)
+ try:
+ process = _run_with_sync(cmd, suppress_output=False)
+ except RuntimeError as e:
+ sys.exit(f"Error: {e}")
# Use PROFILING_MODE_ALL for gecko format
mode = (
cmd = (sys.executable, args.target, *args.args)
# Run with synchronization, suppressing output for live mode
- process = _run_with_sync(cmd, suppress_output=True)
+ try:
+ process = _run_with_sync(cmd, suppress_output=True)
+ except RuntimeError as e:
+ sys.exit(f"Error: {e}")
mode = _parse_mode(args.mode)
"Test only runs when _remote_debugging is available"
)
-from test.support import is_emscripten
+from test.support import is_emscripten, requires_subprocess
+
+from profiling.sampling.cli import main
class TestSampleProfilerCLI(unittest.TestCase):
self.assertEqual(coordinator_cmd[5:], expected_target_args)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
+ @requires_subprocess()
def test_cli_module_argument_parsing(self):
test_args = ["profiling.sampling.cli", "run", "-m", "mymodule"]
mock.patch("profiling.sampling.cli.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
+ mock.patch("profiling.sampling.cli._wait_for_ready_signal"),
+ mock.patch("importlib.util.find_spec", return_value=True),
):
- from profiling.sampling.cli import main
self._setup_sync_mocks(mock_socket, mock_popen)
main()
mock_sample.assert_called_once()
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
+ @requires_subprocess()
def test_cli_module_with_arguments(self):
test_args = [
"profiling.sampling.cli",
mock.patch("profiling.sampling.cli.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
+ mock.patch("profiling.sampling.cli._wait_for_ready_signal"),
+ mock.patch("importlib.util.find_spec", return_value=True),
):
self._setup_sync_mocks(mock_socket, mock_popen)
- from profiling.sampling.cli import main
main()
self._verify_coordinator_command(
mock.patch("profiling.sampling.cli.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
+ mock.patch("profiling.sampling.cli._wait_for_ready_signal"),
+ mock.patch("os.path.exists", return_value=True),
):
self._setup_sync_mocks(mock_socket, mock_popen)
- from profiling.sampling.cli import main
main()
self._verify_coordinator_command(mock_popen, ("myscript.py",))
mock.patch("profiling.sampling.cli.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
+ mock.patch("profiling.sampling.cli._wait_for_ready_signal"),
+ mock.patch("os.path.exists", return_value=True),
):
# Use the helper to set up mocks consistently
mock_process = self._setup_sync_mocks(mock_socket, mock_popen)
None,
]
- from profiling.sampling.cli import main
main()
# Verify the coordinator command was called
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
- mock.patch("subprocess.Popen") as mock_popen,
- mock.patch("socket.socket") as mock_socket,
- self.assertRaises(FileNotFoundError) as cm, # Expect FileNotFoundError, not SystemExit
+ self.assertRaises(SystemExit) as cm,
):
- self._setup_sync_mocks(mock_socket, mock_popen)
- # Override to raise FileNotFoundError for non-existent script
- mock_popen.side_effect = FileNotFoundError("12345")
- from profiling.sampling.cli import main
main()
# Verify the error is about the non-existent script
- self.assertIn("12345", str(cm.exception))
+ self.assertIn("12345", str(cm.exception.code))
def test_cli_no_target_specified(self):
# In new CLI, must specify a subcommand
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
self.assertIn("invalid choice", error_msg)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
+ @requires_subprocess()
def test_cli_module_with_profiler_options(self):
test_args = [
"profiling.sampling.cli",
mock.patch("profiling.sampling.cli.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
+ mock.patch("profiling.sampling.cli._wait_for_ready_signal"),
+ mock.patch("importlib.util.find_spec", return_value=True),
):
self._setup_sync_mocks(mock_socket, mock_popen)
- from profiling.sampling.cli import main
main()
self._verify_coordinator_command(mock_popen, ("-m", "mymodule"))
mock.patch("profiling.sampling.cli.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
+ mock.patch("profiling.sampling.cli._wait_for_ready_signal"),
+ mock.patch("os.path.exists", return_value=True),
):
self._setup_sync_mocks(mock_socket, mock_popen)
- from profiling.sampling.cli import main
main()
self._verify_coordinator_command(
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
self.assertIn("required: target", error_msg) # argparse error for missing positional arg
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
+ @requires_subprocess()
def test_cli_long_module_option(self):
test_args = [
"profiling.sampling.cli",
mock.patch("profiling.sampling.cli.sample") as mock_sample,
mock.patch("subprocess.Popen") as mock_popen,
mock.patch("socket.socket") as mock_socket,
+ mock.patch("profiling.sampling.cli._wait_for_ready_signal"),
+ mock.patch("importlib.util.find_spec", return_value=True),
):
self._setup_sync_mocks(mock_socket, mock_popen)
- from profiling.sampling.cli import main
main()
self._verify_coordinator_command(
mock.patch(
"profiling.sampling.cli._run_with_sync"
) as mock_run_with_sync,
+ mock.patch("os.path.exists", return_value=True),
):
mock_process = mock.MagicMock()
mock_process.pid = 12345
mock_process.poll.return_value = None
mock_run_with_sync.return_value = mock_process
- from profiling.sampling.cli import main
main()
mock_run_with_sync.assert_called_once_with(
),
]
- from profiling.sampling.cli import main
-
for test_args, expected_error_keyword in test_cases:
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.cli.sample") as mock_sample,
):
- from profiling.sampling.cli import main
main()
# Check that sample was called (exact filename depends on implementation)
),
]
- from profiling.sampling.cli import main
-
for test_args, expected_filename, expected_format in test_cases:
with (
mock.patch("sys.argv", test_args),
mock.patch("sys.stderr", io.StringIO()),
):
with self.assertRaises(SystemExit):
- from profiling.sampling.cli import main
main()
def test_cli_mutually_exclusive_format_options(self):
mock.patch("sys.stderr", io.StringIO()),
):
with self.assertRaises(SystemExit):
- from profiling.sampling.cli import main
main()
def test_argument_parsing_basic(self):
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.cli.sample") as mock_sample,
):
- from profiling.sampling.cli import main
main()
mock_sample.assert_called_once()
def test_sort_options(self):
- from profiling.sampling.cli import main
-
sort_options = [
("nsamples", 0),
("tottime", 1),
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.cli.sample") as mock_sample,
):
- from profiling.sampling.cli import main
main()
mock_sample.assert_called_once()
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.cli.sample") as mock_sample,
):
- from profiling.sampling.cli import main
main()
mock_sample.assert_called_once()
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.cli.sample") as mock_sample,
):
- from profiling.sampling.cli import main
main()
mock_sample.assert_called_once()
mock.patch("sys.argv", test_args),
mock.patch("profiling.sampling.cli.sample") as mock_sample,
):
- from profiling.sampling.cli import main
main()
mock_sample.assert_called_once()
mock.patch("sys.stderr", io.StringIO()),
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
error_msg = mock_stderr.getvalue()
self.assertIn("--all-threads", error_msg)
self.assertIn("incompatible with --async-aware", error_msg)
+
+ @unittest.skipIf(is_emscripten, "subprocess not available")
+ def test_run_nonexistent_script_exits_cleanly(self):
+ """Test that running a non-existent script exits with a clean error."""
+ with mock.patch("sys.argv", ["profiling.sampling.cli", "run", "/nonexistent/script.py"]):
+ with self.assertRaises(SystemExit) as cm:
+ main()
+ self.assertIn("Script not found", str(cm.exception.code))
+
+ @unittest.skipIf(is_emscripten, "subprocess not available")
+ def test_run_nonexistent_module_exits_cleanly(self):
+ """Test that running a non-existent module exits with a clean error."""
+ with mock.patch("sys.argv", ["profiling.sampling.cli", "run", "-m", "nonexistent_module_xyz"]):
+ with self.assertRaises(SystemExit) as cm:
+ main()
+ self.assertIn("Module not found", str(cm.exception.code))