:widths: 25 75
* - Option
- - Default behavior
- * - ``--interval`` / ``-i``
+ - Default
+ * - Default for ``--interval`` / ``-i``
- 100 µs between samples (~10,000 samples/sec)
- * - ``--duration`` / ``-d``
- - Profile for 10 seconds
- * - ``--all-threads`` / ``-a``
- - Sample main thread only
- * - ``--native``
+ * - Default for ``--duration`` / ``-d``
+ - 10 seconds
+ * - Default for ``--all-threads`` / ``-a``
+ - Main thread only
+ * - Default for ``--native``
- No ``<native>`` frames (C code time attributed to caller)
- * - ``--no-gc``
- - Include ``<GC>`` frames when garbage collection is active
- * - ``--mode``
+ * - Default for ``--no-gc``
+ - ``<GC>`` frames included when garbage collection is active
+ * - Default for ``--mode``
- Wall-clock mode (all samples recorded)
- * - ``--realtime-stats``
- - No live statistics display during profiling
+ * - Default for ``--realtime-stats``
+ - Disabled
+ * - Default for ``--subprocesses``
+ - Disabled
Sampling interval and duration
:ref:`sampling-efficiency` for details on interpreting these metrics.
+Subprocess profiling
+--------------------
+
+The :option:`--subprocesses` option enables automatic profiling of subprocesses
+spawned by the target::
+
+ python -m profiling.sampling run --subprocesses script.py
+ python -m profiling.sampling attach --subprocesses 12345
+
+When enabled, the profiler monitors the target process for child process
+creation. When a new Python child process is detected, a separate profiler
+instance is automatically spawned to profile it. This is useful for
+applications that use :mod:`multiprocessing`, :mod:`subprocess`,
+:mod:`concurrent.futures` with :class:`~concurrent.futures.ProcessPoolExecutor`,
+or other process spawning mechanisms.
+
+.. code-block:: python
+ :caption: worker_pool.py
+
+ from concurrent.futures import ProcessPoolExecutor
+ import math
+
+ def compute_factorial(n):
+ total = 0
+ for i in range(50):
+ total += math.factorial(n)
+ return total
+
+ if __name__ == "__main__":
+ numbers = [5000 + i * 100 for i in range(50)]
+ with ProcessPoolExecutor(max_workers=4) as executor:
+ results = list(executor.map(compute_factorial, numbers))
+ print(f"Computed {len(results)} factorials")
+
+::
+
+ python -m profiling.sampling run --subprocesses --flamegraph worker_pool.py
+
+This produces separate flame graphs for the main process and each worker
+process: ``flamegraph_<main_pid>.html``, ``flamegraph_<worker1_pid>.html``,
+and so on.
+
+Each subprocess receives its own output file. The filename is derived from
+the specified output path (or the default) with the subprocess's process ID
+appended:
+
+- If you specify ``-o profile.html``, subprocesses produce ``profile_12345.html``,
+ ``profile_12346.html``, and so on
+- With default output, subprocesses produce files like ``flamegraph_12345.html``
+ or directories like ``heatmap_12345``
+- For pstats format (which defaults to stdout), subprocesses produce files like
+ ``profile_12345.pstats``
+
+The subprocess profilers inherit most sampling options from the parent (interval,
+duration, thread selection, native frames, GC frames, async-aware mode, and
+output format). All Python descendant processes are profiled recursively,
+including grandchildren and further descendants.
+
+Subprocess detection works by periodically scanning for new descendants of
+the target process and checking whether each new process is a Python process
+by probing the process memory for Python runtime structures. Non-Python
+subprocesses (such as shell commands or external tools) are ignored.
+
+There is a limit of 100 concurrent subprocess profilers to prevent resource
+exhaustion in programs that spawn many processes. If this limit is reached,
+additional subprocesses are not profiled and a warning is printed.
+
+The :option:`--subprocesses` option is incompatible with :option:`--live` mode
+because live mode uses an interactive terminal interface that cannot
+accommodate multiple concurrent profiler displays.
+
+
.. _sampling-efficiency:
Sampling efficiency
Compatible with ``--live``, ``--flamegraph``, ``--heatmap``, and ``--gecko``
formats only.
+.. option:: --subprocesses
+
+ Also profile subprocesses. Each subprocess gets its own profiler
+ instance and output file. Incompatible with ``--live``.
+
Mode options
------------
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(readline));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(readonly));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(real));
+ _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(recursive));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(reducer_override));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(registry));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(rel_tol));
STRUCT_FOR_ID(readline)
STRUCT_FOR_ID(readonly)
STRUCT_FOR_ID(real)
+ STRUCT_FOR_ID(recursive)
STRUCT_FOR_ID(reducer_override)
STRUCT_FOR_ID(registry)
STRUCT_FOR_ID(rel_tol)
INIT_ID(readline), \
INIT_ID(readonly), \
INIT_ID(real), \
+ INIT_ID(recursive), \
INIT_ID(reducer_override), \
INIT_ID(registry), \
INIT_ID(rel_tol), \
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
+ string = &_Py_ID(recursive);
+ _PyUnicode_InternStatic(interp, &string);
+ assert(_PyUnicode_CheckConsistency(string, 1));
+ assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(reducer_override);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
--- /dev/null
+"""
+Child process monitoring for the sampling profiler.
+
+This module monitors a target process for child process creation and spawns
+separate profiler instances for each discovered child.
+"""
+
+import subprocess
+import sys
+import threading
+import time
+
+import _remote_debugging
+
+# Polling interval for child process discovery
+_CHILD_POLL_INTERVAL_SEC = 0.1
+
+# Default timeout for waiting on child profilers
+_DEFAULT_WAIT_TIMEOUT = 30.0
+
+# Maximum number of child profilers to spawn (prevents resource exhaustion)
+_MAX_CHILD_PROFILERS = 100
+
+# Interval for cleaning up completed profilers (in polling cycles)
+_CLEANUP_INTERVAL_CYCLES = 10
+
+
+def get_child_pids(pid, recursive=True):
+ """
+ Get all child process IDs of the given process.
+
+ Args:
+ pid: Process ID of the parent process
+ recursive: If True, return all descendants (children, grandchildren, etc.)
+
+ Returns:
+ List of child PIDs
+ """
+ return _remote_debugging.get_child_pids(pid, recursive=recursive)
+
+
+def is_python_process(pid):
+ """
+ Check if a process is a Python process.
+
+ Args:
+ pid: Process ID to check
+
+ Returns:
+ bool: True if the process appears to be a Python process, False otherwise
+ """
+ return _remote_debugging.is_python_process(pid)
+
+
+class ChildProcessMonitor:
+ """
+ Monitors a target process for child processes and spawns profilers for them.
+
+ Use as a context manager:
+ with ChildProcessMonitor(pid, cli_args, output_pattern) as monitor:
+ # monitoring runs here
+ monitor.wait_for_profilers() # optional: wait before cleanup
+ # cleanup happens automatically
+ """
+
+ def __init__(self, pid, cli_args, output_pattern):
+ """
+ Initialize the child process monitor.
+
+ Args:
+ pid: Parent process ID to monitor
+ cli_args: CLI arguments to pass to child profilers
+ output_pattern: Pattern for output files (format string with {pid})
+ """
+ self.parent_pid = pid
+ self.cli_args = cli_args
+ self.output_pattern = output_pattern
+
+ self._known_children = set()
+ self._spawned_profilers = []
+ self._lock = threading.Lock()
+ self._stop_event = threading.Event()
+ self._monitor_thread = None
+ self._poll_count = 0
+
+ def __enter__(self):
+ self._monitor_thread = threading.Thread(
+ target=self._monitor_loop,
+ daemon=True,
+ name=f"child-monitor-{self.parent_pid}",
+ )
+ self._monitor_thread.start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._stop_event.set()
+ if self._monitor_thread is not None:
+ self._monitor_thread.join(timeout=2.0)
+ if self._monitor_thread.is_alive():
+ print(
+ "Warning: Monitor thread did not stop cleanly",
+ file=sys.stderr,
+ )
+
+ # Wait for child profilers to complete naturally
+ self.wait_for_profilers()
+
+ # Terminate any remaining profilers
+ with self._lock:
+ profilers_to_cleanup = list(self._spawned_profilers)
+ self._spawned_profilers.clear()
+
+ for proc in profilers_to_cleanup:
+ self._cleanup_process(proc)
+ return False
+
+ def _cleanup_process(self, proc, terminate_timeout=2.0, kill_timeout=1.0):
+ if proc.poll() is not None:
+ return # Already terminated
+
+ proc.terminate()
+ try:
+ proc.wait(timeout=terminate_timeout)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ try:
+ proc.wait(timeout=kill_timeout)
+ except subprocess.TimeoutExpired:
+ # Last resort: wait indefinitely to avoid zombie
+ # SIGKILL should always work, but we must reap the process
+ try:
+ proc.wait()
+ except Exception:
+ pass
+
+ @property
+ def spawned_profilers(self):
+ with self._lock:
+ return list(self._spawned_profilers)
+
+ def wait_for_profilers(self, timeout=_DEFAULT_WAIT_TIMEOUT):
+ """
+ Wait for all spawned child profilers to complete.
+
+ Call this before exiting the context if you want profilers to finish
+ their work naturally rather than being terminated.
+
+ Args:
+ timeout: Maximum time to wait in seconds
+ """
+ profilers = self.spawned_profilers
+ if not profilers:
+ return
+
+ print(
+ f"Waiting for {len(profilers)} child profiler(s) to complete...",
+ file=sys.stderr,
+ )
+
+ deadline = time.monotonic() + timeout
+ for proc in profilers:
+ remaining = deadline - time.monotonic()
+ if remaining <= 0:
+ break
+ try:
+ proc.wait(timeout=max(0.1, remaining))
+ except subprocess.TimeoutExpired:
+ pass
+
+ def _monitor_loop(self):
+ # Note: There is an inherent TOCTOU race between discovering a child
+ # process and checking if it's Python. This is expected for process monitoring.
+ while not self._stop_event.is_set():
+ try:
+ self._poll_count += 1
+
+ # Periodically clean up completed profilers to avoid memory buildup
+ if self._poll_count % _CLEANUP_INTERVAL_CYCLES == 0:
+ self._cleanup_completed_profilers()
+
+ children = set(get_child_pids(self.parent_pid, recursive=True))
+
+ with self._lock:
+ new_children = children - self._known_children
+ self._known_children.update(new_children)
+
+ for child_pid in new_children:
+ # Only spawn profiler if this is actually a Python process
+ if is_python_process(child_pid):
+ self._spawn_profiler_for_child(child_pid)
+
+ except ProcessLookupError:
+ # Parent process exited, stop monitoring
+ break
+ except Exception as e:
+ # Log error but continue monitoring
+ print(
+ f"Warning: Error in child monitor loop: {e}",
+ file=sys.stderr,
+ )
+
+ self._stop_event.wait(timeout=_CHILD_POLL_INTERVAL_SEC)
+
+ def _cleanup_completed_profilers(self):
+ with self._lock:
+ # Keep only profilers that are still running
+ self._spawned_profilers = [
+ p for p in self._spawned_profilers if p.poll() is None
+ ]
+
+ def _spawn_profiler_for_child(self, child_pid):
+ if self._stop_event.is_set():
+ return
+
+ # Check if we've reached the maximum number of child profilers
+ with self._lock:
+ if len(self._spawned_profilers) >= _MAX_CHILD_PROFILERS:
+ print(
+ f"Warning: Max child profilers ({_MAX_CHILD_PROFILERS}) reached, "
+ f"skipping PID {child_pid}",
+ file=sys.stderr,
+ )
+ return
+
+ cmd = [
+ sys.executable,
+ "-m",
+ "profiling.sampling",
+ "attach",
+ str(child_pid),
+ ]
+ cmd.extend(self._build_child_cli_args(child_pid))
+
+ proc = None
+ try:
+ proc = subprocess.Popen(
+ cmd,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ with self._lock:
+ if self._stop_event.is_set():
+ self._cleanup_process(
+ proc, terminate_timeout=1.0, kill_timeout=1.0
+ )
+ return
+ self._spawned_profilers.append(proc)
+
+ print(
+ f"Started profiler for child process {child_pid}",
+ file=sys.stderr,
+ )
+ except Exception as e:
+ if proc is not None:
+ self._cleanup_process(
+ proc, terminate_timeout=1.0, kill_timeout=1.0
+ )
+ print(
+ f"Warning: Failed to start profiler for child {child_pid}: {e}",
+ file=sys.stderr,
+ )
+
+ def _build_child_cli_args(self, child_pid):
+ args = list(self.cli_args)
+
+ if self.output_pattern:
+ # Use replace() instead of format() to handle user filenames with braces
+ output_file = self.output_pattern.replace("{pid}", str(child_pid))
+ found_output = False
+ for i, arg in enumerate(args):
+ if arg in ("-o", "--output") and i + 1 < len(args):
+ args[i + 1] = output_file
+ found_output = True
+ break
+ if not found_output:
+ args.extend(["-o", output_file])
+
+ return args
import subprocess
import sys
import time
+from contextlib import nullcontext
from .sample import sample, sample_live
from .pstats_collector import PstatsCollector
"heatmap": HeatmapCollector,
}
+def _setup_child_monitor(args, parent_pid):
+ from ._child_monitor import ChildProcessMonitor
+
+ # Build CLI args for child profilers (excluding --subprocesses to avoid recursion)
+ child_cli_args = _build_child_profiler_args(args)
+
+ # Build output pattern
+ output_pattern = _build_output_pattern(args)
+
+ return ChildProcessMonitor(
+ pid=parent_pid,
+ cli_args=child_cli_args,
+ output_pattern=output_pattern,
+ )
+
+
+def _get_child_monitor_context(args, pid):
+ if getattr(args, 'subprocesses', False):
+ return _setup_child_monitor(args, pid)
+ return nullcontext()
+
+
+def _build_child_profiler_args(args):
+ child_args = []
+
+ # Sampling options
+ child_args.extend(["-i", str(args.interval)])
+ child_args.extend(["-d", str(args.duration)])
+
+ if args.all_threads:
+ child_args.append("-a")
+ if args.realtime_stats:
+ child_args.append("--realtime-stats")
+ if args.native:
+ child_args.append("--native")
+ if not args.gc:
+ child_args.append("--no-gc")
+ if args.opcodes:
+ child_args.append("--opcodes")
+ if args.async_aware:
+ child_args.append("--async-aware")
+ async_mode = getattr(args, 'async_mode', 'running')
+ if async_mode != "running":
+ child_args.extend(["--async-mode", async_mode])
+
+ # Mode options
+ mode = getattr(args, 'mode', 'wall')
+ if mode != "wall":
+ child_args.extend(["--mode", mode])
+
+ # Format options (skip pstats as it's the default)
+ if args.format != "pstats":
+ child_args.append(f"--{args.format}")
+
+ return child_args
+
+
+def _build_output_pattern(args):
+ """Build output filename pattern for child profilers.
+
+ The pattern uses {pid} as a placeholder which will be replaced with the
+ actual child PID using str.replace(), so user filenames with braces are safe.
+ """
+ if args.outfile:
+ # User specified output - add PID to filename
+ base, ext = os.path.splitext(args.outfile)
+ if ext:
+ return f"{base}_{{pid}}{ext}"
+ else:
+ return f"{args.outfile}_{{pid}}"
+ else:
+ # Use default pattern based on format (consistent _ separator)
+ extension = FORMAT_EXTENSIONS.get(args.format, "txt")
+ if args.format == "heatmap":
+ return "heatmap_{pid}"
+ if args.format == "pstats":
+ # pstats defaults to stdout, but for subprocesses we need files
+ return "profile_{pid}.pstats"
+ return f"{args.format}_{{pid}}.{extension}"
+
def _parse_mode(mode_string):
"""Convert mode string to mode constant."""
action="store_true",
help="Enable async-aware profiling (uses task-based stack reconstruction)",
)
+ sampling_group.add_argument(
+ "--subprocesses",
+ action="store_true",
+ help="Also profile subprocesses. Each subprocess gets its own profiler and output file.",
+ )
def _add_mode_options(parser):
# For heatmap, use cleaner directory name without extension
if format_type == "heatmap":
return f"heatmap_{pid}"
- return f"{format_type}.{pid}.{extension}"
+ return f"{format_type}_{pid}.{extension}"
def _handle_output(collector, args, pid, mode):
"""
if args.format == "pstats":
if args.outfile:
- collector.export(args.outfile)
+ # If outfile is a directory, generate filename inside it
+ if os.path.isdir(args.outfile):
+ filename = os.path.join(args.outfile, _generate_output_filename(args.format, pid))
+ collector.export(filename)
+ else:
+ collector.export(args.outfile)
else:
# Print to stdout with defaults applied
sort_choice = args.sort if args.sort is not None else "nsamples"
)
else:
# Export to file
- filename = args.outfile or _generate_output_filename(args.format, pid)
+ if args.outfile and os.path.isdir(args.outfile):
+ # If outfile is a directory, generate filename inside it
+ filename = os.path.join(args.outfile, _generate_output_filename(args.format, pid))
+ else:
+ filename = args.outfile or _generate_output_filename(args.format, pid)
collector.export(filename)
"Live mode requires the curses module, which is not available."
)
+ # --subprocesses is incompatible with --live
+ if hasattr(args, 'subprocesses') and args.subprocesses:
+ if hasattr(args, 'live') and args.live:
+ parser.error("--subprocesses is incompatible with --live mode.")
+
# Async-aware mode is incompatible with --native, --no-gc, --mode, and --all-threads
if args.async_aware:
issues = []
# Create the appropriate collector
collector = _create_collector(args.format, args.interval, skip_idle, args.opcodes)
- # Sample the process
- collector = sample(
- args.pid,
- collector,
- duration_sec=args.duration,
- all_threads=args.all_threads,
- realtime_stats=args.realtime_stats,
- mode=mode,
- async_aware=args.async_mode if args.async_aware else None,
- native=args.native,
- gc=args.gc,
- opcodes=args.opcodes,
- )
-
- # Handle output
- _handle_output(collector, args, args.pid, mode)
+ with _get_child_monitor_context(args, args.pid):
+ collector = sample(
+ args.pid,
+ collector,
+ duration_sec=args.duration,
+ all_threads=args.all_threads,
+ realtime_stats=args.realtime_stats,
+ mode=mode,
+ async_aware=args.async_mode if args.async_aware else None,
+ native=args.native,
+ gc=args.gc,
+ opcodes=args.opcodes,
+ )
+ _handle_output(collector, args, args.pid, mode)
def _handle_run(args):
# Create the appropriate collector
collector = _create_collector(args.format, args.interval, skip_idle, args.opcodes)
- # Profile the subprocess
- try:
- collector = sample(
- process.pid,
- collector,
- duration_sec=args.duration,
- all_threads=args.all_threads,
- realtime_stats=args.realtime_stats,
- mode=mode,
- async_aware=args.async_mode if args.async_aware else None,
- native=args.native,
- gc=args.gc,
- opcodes=args.opcodes,
- )
-
- # Handle output
- _handle_output(collector, args, process.pid, mode)
- finally:
- # Clean up the subprocess
- if process.poll() is None:
- process.terminate()
- try:
- process.wait(timeout=_PROCESS_KILL_TIMEOUT)
- except subprocess.TimeoutExpired:
- process.kill()
- process.wait()
+ with _get_child_monitor_context(args, process.pid):
+ try:
+ collector = sample(
+ process.pid,
+ collector,
+ duration_sec=args.duration,
+ all_threads=args.all_threads,
+ realtime_stats=args.realtime_stats,
+ mode=mode,
+ async_aware=args.async_mode if args.async_aware else None,
+ native=args.native,
+ gc=args.gc,
+ opcodes=args.opcodes,
+ )
+ _handle_output(collector, args, process.pid, mode)
+ finally:
+ # Terminate the main subprocess - child profilers finish when their
+ # target processes exit
+ if process.poll() is None:
+ process.terminate()
+ try:
+ process.wait(timeout=_PROCESS_KILL_TIMEOUT)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
def _handle_live_attach(args, pid):
"has_fork_support", "requires_fork",
"has_subprocess_support", "requires_subprocess",
"has_socket_support", "requires_working_socket",
+ "has_remote_subprocess_debugging", "requires_remote_subprocess_debugging",
"anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_if_buggy_ucrt_strfptime",
"check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
else:
return unittest.skipUnless(has_socket_support, msg)
+
+@functools.cache
+def has_remote_subprocess_debugging():
+ """Check if we have permissions to debug subprocesses remotely.
+
+ Returns True if we have permissions, False if we don't.
+ Checks for:
+ - Platform support (Linux, macOS, Windows only)
+ - On Linux: process_vm_readv support
+ - _remote_debugging module availability
+ - Actual subprocess debugging permissions (e.g., macOS entitlements)
+ Result is cached.
+ """
+ # Check platform support
+ if sys.platform not in ("linux", "darwin", "win32"):
+ return False
+
+ try:
+ import _remote_debugging
+ except ImportError:
+ return False
+
+ # On Linux, check for process_vm_readv support
+ if sys.platform == "linux":
+ if not getattr(_remote_debugging, "PROCESS_VM_READV_SUPPORTED", False):
+ return False
+
+ # First check if we can read our own process
+ if not _remote_debugging.is_python_process(os.getpid()):
+ return False
+
+ # Check subprocess access - debugging child processes may require
+ # additional permissions depending on platform security settings
+ import socket
+ import subprocess
+
+ # Create a socket for child to signal readiness
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server.bind(("127.0.0.1", 0))
+ server.listen(1)
+ port = server.getsockname()[1]
+
+ # Child connects to signal it's ready, then waits for parent to close
+ child_code = f"""
+import socket
+s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+s.connect(("127.0.0.1", {port}))
+s.recv(1) # Wait for parent to signal done
+"""
+ proc = subprocess.Popen(
+ [sys.executable, "-c", child_code],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ try:
+ server.settimeout(5.0)
+ conn, _ = server.accept()
+ # Child is ready, test if we can probe it
+ result = _remote_debugging.is_python_process(proc.pid)
+ # Check if subprocess is still alive after probing
+ if proc.poll() is not None:
+ return False
+ conn.close() # Signal child to exit
+ return result
+ except (socket.timeout, OSError):
+ return False
+ finally:
+ server.close()
+ proc.kill()
+ proc.wait()
+
+
+def requires_remote_subprocess_debugging():
+ """Skip tests that require remote subprocess debugging permissions.
+
+ This also implies subprocess support, so no need to use both
+ @requires_subprocess() and @requires_remote_subprocess_debugging().
+ """
+ if not has_subprocess_support:
+ return unittest.skip("requires subprocess support")
+ return unittest.skipUnless(
+ has_remote_subprocess_debugging(),
+ "requires remote subprocess debugging permissions"
+ )
+
+
# Does strftime() support glibc extension like '%4Y'?
has_strftime_extensions = False
if sys.platform != "win32":
SHORT_TIMEOUT,
busy_retry,
requires_gil_enabled,
+ requires_remote_subprocess_debugging,
)
from test.support.script_helper import make_script
from test.support.socket_helper import find_unused_port
if wait_for_signals:
_wait_for_signal(client_socket, wait_for_signals)
- try:
- trace = trace_func(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ trace = trace_func(p.pid)
return trace, script_name
finally:
_cleanup_sockets(client_socket, server_socket)
# ============================================================================
+@requires_remote_subprocess_debugging()
class TestGetStackTrace(RemoteInspectionTestBase):
@skip_if_not_supported
@unittest.skipIf(
client_socket, [b"ready:main", b"ready:thread"]
)
- try:
- stack_trace = get_stack_trace(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ stack_trace = get_stack_trace(p.pid)
# Find expected thread stack by funcname
found_thread = self._find_thread_with_frame(
response = _wait_for_signal(client_socket, b"ready")
self.assertIn(b"ready", response)
- try:
- stack_trace = get_async_stack_trace(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ stack_trace = get_async_stack_trace(p.pid)
# Check all tasks are present
tasks_names = [
response = _wait_for_signal(client_socket, b"ready")
self.assertIn(b"ready", response)
- try:
- stack_trace = get_async_stack_trace(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ stack_trace = get_async_stack_trace(p.pid)
# For this simple asyncgen test, we only expect one task
self.assertEqual(len(stack_trace[0].awaited_by), 1)
response = _wait_for_signal(client_socket, b"ready")
self.assertIn(b"ready", response)
- try:
- stack_trace = get_async_stack_trace(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ stack_trace = get_async_stack_trace(p.pid)
# Check all tasks are present
tasks_names = [
response = _wait_for_signal(client_socket, b"ready")
self.assertIn(b"ready", response)
- try:
- stack_trace = get_async_stack_trace(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ stack_trace = get_async_stack_trace(p.pid)
# Check all tasks are present
tasks_names = [
except RuntimeError as e:
self.fail(str(e))
- try:
- all_awaited_by = get_all_awaited_by(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ all_awaited_by = get_all_awaited_by(p.pid)
# Expected: a list of two elements: 1 thread, 1 interp
self.assertEqual(len(all_awaited_by), 2)
server_socket.close()
server_socket = None
- try:
- stack_trace = get_stack_trace(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ stack_trace = get_stack_trace(p.pid)
# Verify we have at least one interpreter
self.assertGreaterEqual(len(stack_trace), 1)
server_socket.close()
server_socket = None
- try:
- stack_trace = get_stack_trace(p.pid)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ stack_trace = get_stack_trace(p.pid)
# Verify we have multiple interpreters
self.assertGreaterEqual(len(stack_trace), 2)
# Wait for ready and working signals
_wait_for_signal(client_socket, [b"ready", b"working"])
- try:
- # Get stack trace with all threads
- unwinder_all = RemoteUnwinder(p.pid, all_threads=True)
- for _ in range(MAX_TRIES):
- all_traces = unwinder_all.get_stack_trace()
- found = self._find_frame_in_trace(
- all_traces,
- lambda f: f.funcname == "main_work"
- and f.location.lineno > 12,
- )
- if found:
- break
- time.sleep(0.1)
- else:
- self.fail(
- "Main thread did not start its busy work on time"
- )
-
- # Get stack trace with only GIL holder
- unwinder_gil = RemoteUnwinder(
- p.pid, only_active_thread=True
+ # Get stack trace with all threads
+ unwinder_all = RemoteUnwinder(p.pid, all_threads=True)
+ for _ in range(MAX_TRIES):
+ all_traces = unwinder_all.get_stack_trace()
+ found = self._find_frame_in_trace(
+ all_traces,
+ lambda f: f.funcname == "main_work"
+ and f.location.lineno > 12,
)
- gil_traces = unwinder_gil.get_stack_trace()
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
+ if found:
+ break
+ time.sleep(0.1)
+ else:
+ self.fail(
+ "Main thread did not start its busy work on time"
)
+ # Get stack trace with only GIL holder
+ unwinder_gil = RemoteUnwinder(
+ p.pid, only_active_thread=True
+ )
+ gil_traces = unwinder_gil.get_stack_trace()
+
# Count threads
total_threads = sum(
len(interp.threads) for interp in all_traces
)
+@requires_remote_subprocess_debugging()
class TestDetectionOfThreadStatus(RemoteInspectionTestBase):
def _run_thread_status_test(self, mode, check_condition):
"""
# Sample until we see expected thread states
statuses = {}
- try:
- unwinder = RemoteUnwinder(
- p.pid,
- all_threads=True,
- mode=mode,
- skip_non_matching_threads=False,
- )
- for _ in range(MAX_TRIES):
- traces = unwinder.get_stack_trace()
- statuses = self._get_thread_statuses(traces)
-
- if check_condition(
- statuses, sleeper_tid, busy_tid
- ):
- break
- time.sleep(0.5)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ unwinder = RemoteUnwinder(
+ p.pid,
+ all_threads=True,
+ mode=mode,
+ skip_non_matching_threads=False,
+ )
+ for _ in range(MAX_TRIES):
+ traces = unwinder.get_stack_trace()
+ statuses = self._get_thread_statuses(traces)
+
+ if check_condition(
+ statuses, sleeper_tid, busy_tid
+ ):
+ break
+ time.sleep(0.5)
return statuses, sleeper_tid, busy_tid
finally:
server_socket = None
statuses = {}
- try:
- unwinder = RemoteUnwinder(
- p.pid,
- all_threads=True,
- mode=PROFILING_MODE_ALL,
- skip_non_matching_threads=False,
- )
- for _ in range(MAX_TRIES):
- traces = unwinder.get_stack_trace()
- statuses = self._get_thread_statuses(traces)
-
- # Check ALL mode provides both GIL and CPU info
- if (
- sleeper_tid in statuses
- and busy_tid in statuses
- and not (
- statuses[sleeper_tid]
- & THREAD_STATUS_ON_CPU
- )
- and not (
- statuses[sleeper_tid]
- & THREAD_STATUS_HAS_GIL
- )
- and (statuses[busy_tid] & THREAD_STATUS_ON_CPU)
- and (
- statuses[busy_tid] & THREAD_STATUS_HAS_GIL
- )
- ):
- break
- time.sleep(0.5)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ unwinder = RemoteUnwinder(
+ p.pid,
+ all_threads=True,
+ mode=PROFILING_MODE_ALL,
+ skip_non_matching_threads=False,
+ )
+ for _ in range(MAX_TRIES):
+ traces = unwinder.get_stack_trace()
+ statuses = self._get_thread_statuses(traces)
+
+ # Check ALL mode provides both GIL and CPU info
+ if (
+ sleeper_tid in statuses
+ and busy_tid in statuses
+ and not (
+ statuses[sleeper_tid]
+ & THREAD_STATUS_ON_CPU
+ )
+ and not (
+ statuses[sleeper_tid]
+ & THREAD_STATUS_HAS_GIL
+ )
+ and (statuses[busy_tid] & THREAD_STATUS_ON_CPU)
+ and (
+ statuses[busy_tid] & THREAD_STATUS_HAS_GIL
+ )
+ ):
+ break
+ time.sleep(0.5)
self.assertIsNotNone(
sleeper_tid, "Sleeper thread id not received"
self.assertIsNotNone(normal_tid, "Normal thread id not received")
statuses = {}
- try:
- unwinder = RemoteUnwinder(
- p.pid,
- all_threads=True,
- mode=PROFILING_MODE_ALL,
- skip_non_matching_threads=False,
- )
- for _ in range(MAX_TRIES):
- traces = unwinder.get_stack_trace()
- statuses = self._get_thread_statuses(traces)
-
- if (
- exception_tid in statuses
- and normal_tid in statuses
- and (statuses[exception_tid] & THREAD_STATUS_HAS_EXCEPTION)
- and not (statuses[normal_tid] & THREAD_STATUS_HAS_EXCEPTION)
- ):
- break
- time.sleep(0.5)
- except PermissionError:
- self.skipTest("Insufficient permissions to read the stack trace")
+ unwinder = RemoteUnwinder(
+ p.pid,
+ all_threads=True,
+ mode=PROFILING_MODE_ALL,
+ skip_non_matching_threads=False,
+ )
+ for _ in range(MAX_TRIES):
+ traces = unwinder.get_stack_trace()
+ statuses = self._get_thread_statuses(traces)
+
+ if (
+ exception_tid in statuses
+ and normal_tid in statuses
+ and (statuses[exception_tid] & THREAD_STATUS_HAS_EXCEPTION)
+ and not (statuses[normal_tid] & THREAD_STATUS_HAS_EXCEPTION)
+ ):
+ break
+ time.sleep(0.5)
self.assertIn(exception_tid, statuses)
self.assertIn(normal_tid, statuses)
self.assertIsNotNone(exception_tid, "Exception thread id not received")
self.assertIsNotNone(normal_tid, "Normal thread id not received")
- try:
- unwinder = RemoteUnwinder(
- p.pid,
- all_threads=True,
- mode=PROFILING_MODE_EXCEPTION,
- skip_non_matching_threads=True,
- )
- for _ in range(MAX_TRIES):
- traces = unwinder.get_stack_trace()
- statuses = self._get_thread_statuses(traces)
-
- if exception_tid in statuses:
- self.assertNotIn(
- normal_tid,
- statuses,
- "Normal thread should be filtered out in exception mode",
- )
- return
- time.sleep(0.5)
- except PermissionError:
- self.skipTest("Insufficient permissions to read the stack trace")
+ unwinder = RemoteUnwinder(
+ p.pid,
+ all_threads=True,
+ mode=PROFILING_MODE_EXCEPTION,
+ skip_non_matching_threads=True,
+ )
+ for _ in range(MAX_TRIES):
+ traces = unwinder.get_stack_trace()
+ statuses = self._get_thread_statuses(traces)
+
+ if exception_tid in statuses:
+ self.assertNotIn(
+ normal_tid,
+ statuses,
+ "Normal thread should be filtered out in exception mode",
+ )
+ return
+ time.sleep(0.5)
self.fail("Never found exception thread in exception mode")
+@requires_remote_subprocess_debugging()
class TestExceptionDetectionScenarios(RemoteInspectionTestBase):
"""Test exception detection across all scenarios.
def _check_exception_status(self, p, thread_tid, expect_exception):
"""Helper to check if thread has expected exception status."""
- try:
- unwinder = RemoteUnwinder(
- p.pid,
- all_threads=True,
- mode=PROFILING_MODE_ALL,
- skip_non_matching_threads=False,
- )
-
- # Collect multiple samples for reliability
- results = []
- for _ in range(MAX_TRIES):
- traces = unwinder.get_stack_trace()
- statuses = self._get_thread_statuses(traces)
+ unwinder = RemoteUnwinder(
+ p.pid,
+ all_threads=True,
+ mode=PROFILING_MODE_ALL,
+ skip_non_matching_threads=False,
+ )
- if thread_tid in statuses:
- has_exc = bool(statuses[thread_tid] & THREAD_STATUS_HAS_EXCEPTION)
- results.append(has_exc)
+ # Collect multiple samples for reliability
+ results = []
+ for _ in range(MAX_TRIES):
+ traces = unwinder.get_stack_trace()
+ statuses = self._get_thread_statuses(traces)
- if len(results) >= 3:
- break
+ if thread_tid in statuses:
+ has_exc = bool(statuses[thread_tid] & THREAD_STATUS_HAS_EXCEPTION)
+ results.append(has_exc)
- time.sleep(0.2)
+ if len(results) >= 3:
+ break
- # Check majority of samples match expected
- if not results:
- self.fail("Never found target thread in stack traces")
+ time.sleep(0.2)
- majority = sum(results) > len(results) // 2
- if expect_exception:
- self.assertTrue(
- majority,
- f"Thread should have HAS_EXCEPTION flag, got {results}"
- )
- else:
- self.assertFalse(
- majority,
- f"Thread should NOT have HAS_EXCEPTION flag, got {results}"
- )
+ # Check majority of samples match expected
+ if not results:
+ self.fail("Never found target thread in stack traces")
- except PermissionError:
- self.skipTest("Insufficient permissions to read the stack trace")
+ majority = sum(results) > len(results) // 2
+ if expect_exception:
+ self.assertTrue(
+ majority,
+ f"Thread should have HAS_EXCEPTION flag, got {results}"
+ )
+ else:
+ self.assertFalse(
+ majority,
+ f"Thread should NOT have HAS_EXCEPTION flag, got {results}"
+ )
@unittest.skipIf(
sys.platform not in ("linux", "darwin", "win32"),
self._check_exception_status(p, thread_tid, expect_exception=False)
+@requires_remote_subprocess_debugging()
class TestFrameCaching(RemoteInspectionTestBase):
"""Test that frame caching produces correct results.
)
yield p, client_socket, make_unwinder
-
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
finally:
_cleanup_sockets(client_socket, server_socket)
SHORT_TIMEOUT,
SuppressCrashReport,
os_helper,
- requires_subprocess,
+ requires_remote_subprocess_debugging,
script_helper,
)
-from .helpers import close_and_unlink, skip_if_not_supported, test_subprocess
+from .helpers import close_and_unlink, test_subprocess
-@requires_subprocess()
-@skip_if_not_supported
+@requires_remote_subprocess_debugging()
class TestGCFrameTracking(unittest.TestCase):
"""Tests for GC frame tracking in the sampling profiler."""
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- from profiling.sampling.pstats_collector import PstatsCollector
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=1,
- native=False,
- gc=True,
- )
- collector.print_stats(show_summary=False)
- except PermissionError:
- self.skipTest("Insufficient permissions for remote profiling")
+ from profiling.sampling.pstats_collector import PstatsCollector
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=1,
+ native=False,
+ gc=True,
+ )
+ collector.print_stats(show_summary=False)
output = captured_output.getvalue()
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- from profiling.sampling.pstats_collector import PstatsCollector
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=1,
- native=False,
- gc=False,
- )
- collector.print_stats(show_summary=False)
- except PermissionError:
- self.skipTest("Insufficient permissions for remote profiling")
+ from profiling.sampling.pstats_collector import PstatsCollector
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=1,
+ native=False,
+ gc=False,
+ )
+ collector.print_stats(show_summary=False)
output = captured_output.getvalue()
self.assertNotIn("<GC>", output)
-@requires_subprocess()
-@skip_if_not_supported
+@requires_remote_subprocess_debugging()
class TestNativeFrameTracking(unittest.TestCase):
"""Tests for native frame tracking in the sampling profiler."""
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- from profiling.sampling.stack_collector import CollapsedStackCollector
- collector = CollapsedStackCollector(1000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=1,
- native=True,
- )
- collector.export(collapsed_file.name)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ from profiling.sampling.stack_collector import CollapsedStackCollector
+ collector = CollapsedStackCollector(1000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=1,
+ native=True,
+ )
+ collector.export(collapsed_file.name)
# Verify file was created and contains valid data
self.assertTrue(os.path.exists(collapsed_file.name))
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- from profiling.sampling.pstats_collector import PstatsCollector
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=1,
- )
- collector.print_stats(show_summary=False)
- except PermissionError:
- self.skipTest("Insufficient permissions for remote profiling")
+ from profiling.sampling.pstats_collector import PstatsCollector
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=1,
+ )
+ collector.print_stats(show_summary=False)
output = captured_output.getvalue()
# Native frames should NOT be present:
self.assertNotIn("<native>", output)
-@requires_subprocess()
-@skip_if_not_supported
+@requires_remote_subprocess_debugging()
class TestProcessPoolExecutorSupport(unittest.TestCase):
"""
Test that ProcessPoolExecutor works correctly with profiling.sampling.
proc.kill()
stdout, stderr = proc.communicate()
- if "Permission Error" in stderr:
- self.skipTest("Insufficient permissions for remote profiling")
-
self.assertIn("Results: [2, 4, 6]", stdout)
self.assertNotIn("Can't pickle", stderr)
--- /dev/null
+"""Tests for --subprocesses subprocess profiling support."""
+
+import argparse
+import io
+import os
+import signal
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import unittest
+
+from test.support import (
+ SHORT_TIMEOUT,
+ reap_children,
+ requires_remote_subprocess_debugging,
+)
+
+from .helpers import _cleanup_process
+
+# String to check for in stderr when profiler lacks permissions (e.g., macOS)
+_PERMISSION_ERROR_MSG = "Permission Error"
+
+
+def _readline_with_timeout(file_obj, timeout):
+ # Thread-based readline with timeout - works across all platforms
+ # including Windows where select() doesn't work with pipes.
+ # Returns the line read, or None if timeout occurred.
+ result = [None]
+ exception = [None]
+
+ def reader():
+ try:
+ result[0] = file_obj.readline()
+ except Exception as e:
+ exception[0] = e
+
+ thread = threading.Thread(target=reader, daemon=True)
+ thread.start()
+ thread.join(timeout=timeout)
+
+ if thread.is_alive():
+ return None
+
+ if exception[0] is not None:
+ raise exception[0]
+
+ return result[0]
+
+
+def _wait_for_process_ready(proc, timeout):
+ # Wait for a subprocess to be ready using polling instead of fixed sleep.
+ # Returns True if process is ready, False if it exited or timeout.
+ deadline = time.time() + timeout
+ poll_interval = 0.01
+
+ while time.time() < deadline:
+ if proc.poll() is not None:
+ return False
+
+ try:
+ if sys.platform == "linux":
+ if os.path.exists(f"/proc/{proc.pid}/exe"):
+ return True
+ else:
+ return True
+ except OSError:
+ pass
+
+ time.sleep(poll_interval)
+ poll_interval = min(poll_interval * 2, 0.1)
+
+ return proc.poll() is None
+
+
+@requires_remote_subprocess_debugging()
+class TestGetChildPids(unittest.TestCase):
+ """Tests for the get_child_pids function."""
+
+ def setUp(self):
+ reap_children()
+
+ def tearDown(self):
+ reap_children()
+
+ def test_get_child_pids_from_remote_debugging(self):
+ """Test get_child_pids from _remote_debugging module."""
+ try:
+ import _remote_debugging
+
+ # Test that the function exists
+ self.assertTrue(hasattr(_remote_debugging, "get_child_pids"))
+
+ # Test with current process (should return empty or have children if any)
+ result = _remote_debugging.get_child_pids(os.getpid())
+ self.assertIsInstance(result, list)
+ except (ImportError, AttributeError):
+ self.skipTest("_remote_debugging.get_child_pids not available")
+
+ def test_get_child_pids_fallback(self):
+ """Test the fallback implementation for get_child_pids."""
+ from profiling.sampling._child_monitor import get_child_pids
+
+ # Test with current process
+ result = get_child_pids(os.getpid())
+ self.assertIsInstance(result, list)
+
+ @unittest.skipUnless(sys.platform == "linux", "Linux only")
+ def test_discover_child_process_linux(self):
+ """Test that we can discover child processes on Linux."""
+ from profiling.sampling._child_monitor import get_child_pids
+
+ # Create a child process
+ proc = subprocess.Popen(
+ [sys.executable, "-c", "import time; time.sleep(10)"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ try:
+ # Poll until child appears
+ deadline = time.time() + SHORT_TIMEOUT
+ children = []
+ while time.time() < deadline:
+ children = get_child_pids(os.getpid())
+ if proc.pid in children:
+ break
+ time.sleep(0.05)
+
+ self.assertIn(
+ proc.pid,
+ children,
+ f"Child PID {proc.pid} not discovered within {SHORT_TIMEOUT}s. "
+ f"Found PIDs: {children}",
+ )
+ finally:
+ _cleanup_process(proc)
+
+ def test_recursive_child_discovery(self):
+ """Test that recursive=True finds grandchildren."""
+ from profiling.sampling._child_monitor import get_child_pids
+
+ # Create a child that spawns a grandchild and keeps a reference to it
+ # so we can clean it up via the child process
+ code = """
+import subprocess
+import sys
+import threading
+grandchild = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)'])
+print(grandchild.pid, flush=True)
+# Wait for parent to send signal byte (cross-platform)
+# Using threading with timeout so test doesn't hang if something goes wrong
+# Timeout is 60s (2x test timeout) to ensure child outlives test in worst case
+def wait_for_signal():
+ try:
+ sys.stdin.buffer.read(1)
+ except:
+ pass
+t = threading.Thread(target=wait_for_signal, daemon=True)
+t.start()
+t.join(timeout=60)
+# Clean up grandchild before exiting
+if grandchild.poll() is None:
+ grandchild.terminate()
+ try:
+ grandchild.wait(timeout=2)
+ except subprocess.TimeoutExpired:
+ grandchild.kill()
+ try:
+ grandchild.wait(timeout=2)
+ except subprocess.TimeoutExpired:
+ grandchild.wait()
+"""
+ proc = subprocess.Popen(
+ [sys.executable, "-c", code],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ )
+
+ grandchild_pid = None
+ try:
+ # Read grandchild PID with thread-based timeout
+ # This prevents indefinite blocking on all platforms
+ grandchild_pid_line = _readline_with_timeout(
+ proc.stdout, SHORT_TIMEOUT
+ )
+ if grandchild_pid_line is None:
+ self.fail(
+ f"Timeout waiting for grandchild PID from child process "
+ f"(child PID: {proc.pid})"
+ )
+ if not grandchild_pid_line:
+ self.fail(
+ f"Child process {proc.pid} closed stdout without printing "
+ f"grandchild PID"
+ )
+ grandchild_pid = int(grandchild_pid_line.strip())
+
+ # Poll until grandchild is visible
+ deadline = time.time() + SHORT_TIMEOUT
+ pids_recursive = []
+ while time.time() < deadline:
+ pids_recursive = get_child_pids(os.getpid(), recursive=True)
+ if grandchild_pid in pids_recursive:
+ break
+ time.sleep(0.05)
+
+ self.assertIn(
+ proc.pid,
+ pids_recursive,
+ f"Child PID {proc.pid} not found in recursive discovery. "
+ f"Found: {pids_recursive}",
+ )
+ self.assertIn(
+ grandchild_pid,
+ pids_recursive,
+ f"Grandchild PID {grandchild_pid} not found in recursive discovery. "
+ f"Found: {pids_recursive}",
+ )
+
+ # Non-recursive should find only direct child
+ pids_direct = get_child_pids(os.getpid(), recursive=False)
+ self.assertIn(
+ proc.pid,
+ pids_direct,
+ f"Child PID {proc.pid} not found in non-recursive discovery. "
+ f"Found: {pids_direct}",
+ )
+ self.assertNotIn(
+ grandchild_pid,
+ pids_direct,
+ f"Grandchild PID {grandchild_pid} should NOT be in non-recursive "
+ f"discovery. Found: {pids_direct}",
+ )
+ finally:
+ # Send signal byte to child to trigger cleanup, then close stdin
+ try:
+ proc.stdin.write(b"x")
+ proc.stdin.flush()
+ proc.stdin.close()
+ except OSError:
+ pass
+ proc.stdout.close()
+ _cleanup_process(proc)
+ # The grandchild may not have been cleaned up by the child process
+ # (e.g., if the child was killed). Explicitly terminate the
+ # grandchild to prevent PermissionError on Windows when removing
+ # temp directories.
+ if grandchild_pid is not None:
+ try:
+ os.kill(grandchild_pid, signal.SIGTERM)
+ except (OSError, ProcessLookupError):
+ pass # Process already exited
+
+ def test_nonexistent_pid_returns_empty(self):
+ """Test that nonexistent PID returns empty list."""
+ from profiling.sampling._child_monitor import get_child_pids
+
+ # Use a very high PID that's unlikely to exist
+ result = get_child_pids(999999999)
+ self.assertEqual(result, [])
+
+
+@requires_remote_subprocess_debugging()
+class TestChildProcessMonitor(unittest.TestCase):
+ """Tests for the ChildProcessMonitor class."""
+
+ def setUp(self):
+ reap_children()
+
+ def tearDown(self):
+ reap_children()
+
+ def test_monitor_creation(self):
+ """Test that ChildProcessMonitor can be created."""
+ from profiling.sampling._child_monitor import ChildProcessMonitor
+
+ monitor = ChildProcessMonitor(
+ pid=os.getpid(),
+ cli_args=["-i", "100", "-d", "5"],
+ output_pattern="test_{pid}.pstats",
+ )
+ self.assertEqual(monitor.parent_pid, os.getpid())
+ self.assertEqual(monitor.cli_args, ["-i", "100", "-d", "5"])
+ self.assertEqual(monitor.output_pattern, "test_{pid}.pstats")
+
+ def test_monitor_lifecycle(self):
+ """Test monitor lifecycle via context manager."""
+ from profiling.sampling._child_monitor import ChildProcessMonitor
+
+ monitor = ChildProcessMonitor(
+ pid=os.getpid(), cli_args=[], output_pattern=None
+ )
+
+ # Before entering context, thread should not exist
+ self.assertIsNone(monitor._monitor_thread)
+
+ with monitor:
+ # Inside context, thread should be running
+ self.assertIsNotNone(monitor._monitor_thread)
+ self.assertTrue(monitor._monitor_thread.is_alive())
+
+ # After exiting context, thread should be stopped
+ self.assertFalse(monitor._monitor_thread.is_alive())
+
+ def test_spawned_profilers_property(self):
+ """Test that spawned_profilers returns a copy of the list."""
+ from profiling.sampling._child_monitor import ChildProcessMonitor
+
+ monitor = ChildProcessMonitor(
+ pid=os.getpid(), cli_args=[], output_pattern=None
+ )
+
+ # Should return empty list initially
+ profilers = monitor.spawned_profilers
+ self.assertEqual(profilers, [])
+ self.assertIsNot(profilers, monitor._spawned_profilers)
+
+ def test_context_manager(self):
+ """Test that ChildProcessMonitor works as a context manager."""
+ from profiling.sampling._child_monitor import ChildProcessMonitor
+
+ with ChildProcessMonitor(
+ pid=os.getpid(), cli_args=[], output_pattern=None
+ ) as monitor:
+ self.assertIsNotNone(monitor._monitor_thread)
+ self.assertTrue(monitor._monitor_thread.is_alive())
+
+ # After exiting context, thread should be stopped
+ self.assertFalse(monitor._monitor_thread.is_alive())
+
+
+@requires_remote_subprocess_debugging()
+class TestCLIChildrenFlag(unittest.TestCase):
+ """Tests for the --subprocesses CLI flag."""
+
+ def setUp(self):
+ reap_children()
+
+ def tearDown(self):
+ reap_children()
+
+ def test_subprocesses_flag_parsed(self):
+ """Test that --subprocesses flag is recognized."""
+ from profiling.sampling.cli import _add_sampling_options
+
+ parser = argparse.ArgumentParser()
+ _add_sampling_options(parser)
+
+ # Parse with --subprocesses
+ args = parser.parse_args(["--subprocesses"])
+ self.assertTrue(args.subprocesses)
+
+ # Parse without --subprocesses
+ args = parser.parse_args([])
+ self.assertFalse(args.subprocesses)
+
+ def test_subprocesses_incompatible_with_live(self):
+ """Test that --subprocesses is incompatible with --live."""
+ from profiling.sampling.cli import _validate_args
+
+ # Create mock args with both subprocesses and live
+ args = argparse.Namespace(
+ subprocesses=True,
+ live=True,
+ async_aware=False,
+ format="pstats",
+ mode="wall",
+ sort=None,
+ limit=None,
+ no_summary=False,
+ opcodes=False,
+ )
+
+ parser = argparse.ArgumentParser()
+
+ with self.assertRaises(SystemExit):
+ _validate_args(args, parser)
+
+ def test_build_child_profiler_args(self):
+ """Test building CLI args for child profilers."""
+ from profiling.sampling.cli import _build_child_profiler_args
+
+ args = argparse.Namespace(
+ interval=200,
+ duration=15,
+ all_threads=True,
+ realtime_stats=False,
+ native=True,
+ gc=True,
+ opcodes=False,
+ async_aware=False,
+ mode="cpu",
+ format="flamegraph",
+ )
+
+ child_args = _build_child_profiler_args(args)
+
+ # Verify flag-value pairs are correctly paired (flag followed by value)
+ def assert_flag_value_pair(flag, value):
+ self.assertIn(
+ flag,
+ child_args,
+ f"Flag '{flag}' not found in args: {child_args}",
+ )
+ flag_index = child_args.index(flag)
+ self.assertGreater(
+ len(child_args),
+ flag_index + 1,
+ f"No value after flag '{flag}' in args: {child_args}",
+ )
+ self.assertEqual(
+ child_args[flag_index + 1],
+ str(value),
+ f"Flag '{flag}' should be followed by '{value}', got "
+ f"'{child_args[flag_index + 1]}' in args: {child_args}",
+ )
+
+ assert_flag_value_pair("-i", 200)
+ assert_flag_value_pair("-d", 15)
+ assert_flag_value_pair("--mode", "cpu")
+
+ # Verify standalone flags are present
+ self.assertIn(
+ "-a", child_args, f"Flag '-a' not found in args: {child_args}"
+ )
+ self.assertIn(
+ "--native",
+ child_args,
+ f"Flag '--native' not found in args: {child_args}",
+ )
+ self.assertIn(
+ "--flamegraph",
+ child_args,
+ f"Flag '--flamegraph' not found in args: {child_args}",
+ )
+
+ def test_build_child_profiler_args_no_gc(self):
+ """Test building CLI args with --no-gc."""
+ from profiling.sampling.cli import _build_child_profiler_args
+
+ args = argparse.Namespace(
+ interval=100,
+ duration=5,
+ all_threads=False,
+ realtime_stats=False,
+ native=False,
+ gc=False, # Explicitly disabled
+ opcodes=False,
+ async_aware=False,
+ mode="wall",
+ format="pstats",
+ )
+
+ child_args = _build_child_profiler_args(args)
+
+ self.assertIn(
+ "--no-gc",
+ child_args,
+ f"Flag '--no-gc' not found when gc=False. Args: {child_args}",
+ )
+
+ def test_build_output_pattern_with_outfile(self):
+ """Test output pattern generation with user-specified output."""
+ from profiling.sampling.cli import _build_output_pattern
+
+ # With extension
+ args = argparse.Namespace(outfile="output.html", format="flamegraph")
+ pattern = _build_output_pattern(args)
+ self.assertEqual(pattern, "output_{pid}.html")
+
+ # Without extension
+ args = argparse.Namespace(outfile="output", format="pstats")
+ pattern = _build_output_pattern(args)
+ self.assertEqual(pattern, "output_{pid}")
+
+ def test_build_output_pattern_default(self):
+ """Test output pattern generation with default output."""
+ from profiling.sampling.cli import _build_output_pattern
+
+ # Flamegraph format
+ args = argparse.Namespace(outfile=None, format="flamegraph")
+ pattern = _build_output_pattern(args)
+ self.assertIn("{pid}", pattern)
+ self.assertIn("flamegraph", pattern)
+ self.assertTrue(pattern.endswith(".html"))
+
+ # Heatmap format
+ args = argparse.Namespace(outfile=None, format="heatmap")
+ pattern = _build_output_pattern(args)
+ self.assertEqual(pattern, "heatmap_{pid}")
+
+
+@requires_remote_subprocess_debugging()
+class TestChildrenIntegration(unittest.TestCase):
+ """Integration tests for --subprocesses functionality."""
+
+ def setUp(self):
+ reap_children()
+
+ def tearDown(self):
+ reap_children()
+
+ def test_setup_child_monitor(self):
+ """Test setting up a child monitor from args."""
+ from profiling.sampling.cli import _setup_child_monitor
+
+ args = argparse.Namespace(
+ interval=100,
+ duration=5,
+ all_threads=False,
+ realtime_stats=False,
+ native=False,
+ gc=True,
+ opcodes=False,
+ async_aware=False,
+ mode="wall",
+ format="pstats",
+ outfile=None,
+ )
+
+ monitor = _setup_child_monitor(args, os.getpid())
+ # Use addCleanup to ensure monitor is properly cleaned up even if
+ # assertions fail
+ self.addCleanup(monitor.__exit__, None, None, None)
+
+ self.assertIsNotNone(monitor)
+ self.assertEqual(
+ monitor.parent_pid,
+ os.getpid(),
+ f"Monitor parent_pid should be {os.getpid()}, got {monitor.parent_pid}",
+ )
+
+
+@requires_remote_subprocess_debugging()
+class TestIsPythonProcess(unittest.TestCase):
+ """Tests for the is_python_process function."""
+
+ def setUp(self):
+ reap_children()
+
+ def tearDown(self):
+ reap_children()
+
+ def test_is_python_process_current_process(self):
+ """Test that current process is detected as Python."""
+ from profiling.sampling._child_monitor import is_python_process
+
+ # Current process should be Python
+ result = is_python_process(os.getpid())
+ self.assertTrue(
+ result,
+ f"Current process (PID {os.getpid()}) should be detected as Python",
+ )
+
+ def test_is_python_process_python_subprocess(self):
+ """Test that a Python subprocess is detected as Python."""
+ from profiling.sampling._child_monitor import is_python_process
+
+ # Start a Python subprocess
+ proc = subprocess.Popen(
+ [sys.executable, "-c", "import time; time.sleep(10)"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ try:
+ # Poll until Python runtime structures are initialized
+ # (is_python_process probes for runtime structures which take
+ # time to initialize after process start)
+ deadline = time.time() + SHORT_TIMEOUT
+ detected = False
+ while time.time() < deadline:
+ if proc.poll() is not None:
+ self.fail(f"Process {proc.pid} exited unexpectedly")
+ if is_python_process(proc.pid):
+ detected = True
+ break
+ time.sleep(0.05)
+
+ self.assertTrue(
+ detected,
+ f"Python subprocess (PID {proc.pid}) should be detected as Python "
+ f"within {SHORT_TIMEOUT}s",
+ )
+ finally:
+ _cleanup_process(proc)
+
+ @unittest.skipUnless(sys.platform == "linux", "Linux only test")
+ def test_is_python_process_non_python_subprocess(self):
+ """Test that a non-Python subprocess is not detected as Python."""
+ from profiling.sampling._child_monitor import is_python_process
+
+ # Start a non-Python subprocess (sleep command)
+ proc = subprocess.Popen(
+ ["sleep", "10"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ try:
+ # Wait for process to be ready using polling
+ self.assertTrue(
+ _wait_for_process_ready(proc, SHORT_TIMEOUT),
+ f"Process {proc.pid} should be ready within {SHORT_TIMEOUT}s",
+ )
+
+ self.assertFalse(
+ is_python_process(proc.pid),
+ f"Non-Python subprocess 'sleep' (PID {proc.pid}) should NOT be "
+ f"detected as Python",
+ )
+ finally:
+ _cleanup_process(proc)
+
+ def test_is_python_process_nonexistent_pid(self):
+ """Test that nonexistent PID returns False."""
+ from profiling.sampling._child_monitor import is_python_process
+
+ # Use a very high PID that's unlikely to exist
+ result = is_python_process(999999999)
+ self.assertFalse(
+ result,
+ "Nonexistent PID 999999999 should return False",
+ )
+
+ def test_is_python_process_exited_process(self):
+ """Test handling of a process that exits quickly."""
+ from profiling.sampling._child_monitor import is_python_process
+
+ # Start a process that exits immediately
+ proc = subprocess.Popen(
+ [sys.executable, "-c", "pass"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ # Wait for it to exit
+ proc.wait(timeout=SHORT_TIMEOUT)
+
+ # Should return False for exited process (not raise)
+ result = is_python_process(proc.pid)
+ self.assertFalse(
+ result, f"Exited process (PID {proc.pid}) should return False"
+ )
+
+
+@requires_remote_subprocess_debugging()
+class TestMaxChildProfilersLimit(unittest.TestCase):
+ """Tests for the _MAX_CHILD_PROFILERS limit."""
+
+ def setUp(self):
+ reap_children()
+
+ def tearDown(self):
+ reap_children()
+
+ def test_max_profilers_constant_exists(self):
+ """Test that _MAX_CHILD_PROFILERS constant is defined."""
+ from profiling.sampling._child_monitor import _MAX_CHILD_PROFILERS
+
+ self.assertEqual(
+ _MAX_CHILD_PROFILERS,
+ 100,
+ f"_MAX_CHILD_PROFILERS should be 100, got {_MAX_CHILD_PROFILERS}",
+ )
+
+ def test_cleanup_interval_constant_exists(self):
+ """Test that _CLEANUP_INTERVAL_CYCLES constant is defined."""
+ from profiling.sampling._child_monitor import _CLEANUP_INTERVAL_CYCLES
+
+ self.assertEqual(
+ _CLEANUP_INTERVAL_CYCLES,
+ 10,
+ f"_CLEANUP_INTERVAL_CYCLES should be 10, got {_CLEANUP_INTERVAL_CYCLES}",
+ )
+
+ def test_monitor_respects_max_limit(self):
+ """Test that monitor refuses to spawn more than _MAX_CHILD_PROFILERS."""
+ from profiling.sampling._child_monitor import (
+ ChildProcessMonitor,
+ _MAX_CHILD_PROFILERS,
+ )
+ from unittest.mock import MagicMock, patch
+
+ # Create a monitor
+ monitor = ChildProcessMonitor(
+ pid=os.getpid(),
+ cli_args=["-i", "100", "-d", "5"],
+ output_pattern="test_{pid}.pstats",
+ )
+
+ # Manually fill up the profilers list to the limit
+ mock_profilers = [MagicMock() for _ in range(_MAX_CHILD_PROFILERS)]
+ for mock_proc in mock_profilers:
+ mock_proc.poll.return_value = None # Simulate running process
+ monitor._spawned_profilers = mock_profilers
+
+ # Try to spawn another profiler - should be rejected
+ stderr_capture = io.StringIO()
+ with patch("sys.stderr", stderr_capture):
+ monitor._spawn_profiler_for_child(99999)
+
+ # Verify warning was printed
+ stderr_output = stderr_capture.getvalue()
+ self.assertIn(
+ "Max child profilers",
+ stderr_output,
+ f"Expected warning about max profilers, got: {stderr_output}",
+ )
+ self.assertIn(
+ str(_MAX_CHILD_PROFILERS),
+ stderr_output,
+ f"Warning should mention limit ({_MAX_CHILD_PROFILERS}): {stderr_output}",
+ )
+
+ # Verify no new profiler was added
+ self.assertEqual(
+ len(monitor._spawned_profilers),
+ _MAX_CHILD_PROFILERS,
+ f"Should still have {_MAX_CHILD_PROFILERS} profilers, got "
+ f"{len(monitor._spawned_profilers)}",
+ )
+
+
+@requires_remote_subprocess_debugging()
+class TestWaitForProfilers(unittest.TestCase):
+ """Tests for the wait_for_profilers method."""
+
+ def setUp(self):
+ reap_children()
+
+ def tearDown(self):
+ reap_children()
+
+ def test_wait_for_profilers_empty_list(self):
+ """Test that wait_for_profilers returns immediately with no profilers."""
+ from profiling.sampling._child_monitor import ChildProcessMonitor
+
+ monitor = ChildProcessMonitor(
+ pid=os.getpid(), cli_args=[], output_pattern=None
+ )
+
+ # Should return immediately without printing anything
+ stderr_capture = io.StringIO()
+ with unittest.mock.patch("sys.stderr", stderr_capture):
+ start = time.time()
+ monitor.wait_for_profilers(timeout=10.0)
+ elapsed = time.time() - start
+
+ # Should complete very quickly (less than 1 second)
+ self.assertLess(
+ elapsed,
+ 1.0,
+ f"wait_for_profilers with empty list took {elapsed:.2f}s, expected < 1s",
+ )
+ # No "Waiting for..." message should be printed
+ self.assertNotIn(
+ "Waiting for",
+ stderr_capture.getvalue(),
+ "Should not print waiting message when no profilers",
+ )
+
+ def test_wait_for_profilers_with_completed_process(self):
+ """Test waiting for profilers that complete quickly."""
+ from profiling.sampling._child_monitor import ChildProcessMonitor
+
+ monitor = ChildProcessMonitor(
+ pid=os.getpid(), cli_args=[], output_pattern=None
+ )
+
+ # Start a process that exits quickly
+ proc = subprocess.Popen(
+ [sys.executable, "-c", "pass"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ # Add to spawned profilers
+ monitor._spawned_profilers.append(proc)
+
+ try:
+ stderr_capture = io.StringIO()
+ with unittest.mock.patch("sys.stderr", stderr_capture):
+ start = time.time()
+ monitor.wait_for_profilers(timeout=SHORT_TIMEOUT)
+ elapsed = time.time() - start
+
+ # Should complete quickly since process exits fast
+ self.assertLess(
+ elapsed,
+ 5.0,
+ f"wait_for_profilers took {elapsed:.2f}s for quick process",
+ )
+ # Should print waiting message
+ self.assertIn(
+ "Waiting for 1 child profiler",
+ stderr_capture.getvalue(),
+ "Should print waiting message",
+ )
+ finally:
+ _cleanup_process(proc)
+
+ def test_wait_for_profilers_timeout(self):
+ """Test that wait_for_profilers respects timeout."""
+ from profiling.sampling._child_monitor import ChildProcessMonitor
+
+ monitor = ChildProcessMonitor(
+ pid=os.getpid(), cli_args=[], output_pattern=None
+ )
+
+ # Start a process that runs for a long time
+ proc = subprocess.Popen(
+ [sys.executable, "-c", "import time; time.sleep(60)"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ # Add to spawned profilers
+ monitor._spawned_profilers.append(proc)
+
+ try:
+ stderr_capture = io.StringIO()
+ with unittest.mock.patch("sys.stderr", stderr_capture):
+ start = time.time()
+ # Use short timeout
+ monitor.wait_for_profilers(timeout=0.5)
+ elapsed = time.time() - start
+
+ # Should timeout after approximately 0.5 seconds
+ self.assertGreater(
+ elapsed,
+ 0.4,
+ f"wait_for_profilers returned too quickly ({elapsed:.2f}s)",
+ )
+ self.assertLess(
+ elapsed,
+ 2.0,
+ f"wait_for_profilers took too long ({elapsed:.2f}s), timeout not respected",
+ )
+ finally:
+ _cleanup_process(proc)
+
+ def test_wait_for_profilers_multiple(self):
+ """Test waiting for multiple profilers."""
+ from profiling.sampling._child_monitor import ChildProcessMonitor
+
+ monitor = ChildProcessMonitor(
+ pid=os.getpid(), cli_args=[], output_pattern=None
+ )
+
+ # Start multiple processes
+ procs = []
+ for _ in range(3):
+ proc = subprocess.Popen(
+ [sys.executable, "-c", "import time; time.sleep(0.1)"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ procs.append(proc)
+ monitor._spawned_profilers.append(proc)
+
+ try:
+ stderr_capture = io.StringIO()
+ with unittest.mock.patch("sys.stderr", stderr_capture):
+ monitor.wait_for_profilers(timeout=SHORT_TIMEOUT)
+
+ # Should report correct count
+ self.assertIn(
+ "Waiting for 3 child profiler",
+ stderr_capture.getvalue(),
+ "Should report correct profiler count",
+ )
+ finally:
+ for proc in procs:
+ _cleanup_process(proc)
+
+
+@requires_remote_subprocess_debugging()
+class TestEndToEndChildrenCLI(unittest.TestCase):
+ """End-to-end tests for --subprocesses CLI flag."""
+
+ def setUp(self):
+ reap_children()
+
+ def tearDown(self):
+ reap_children()
+
+ def test_subprocesses_flag_spawns_child_and_creates_output(self):
+ """Test that --subprocesses flag works end-to-end with actual subprocesses."""
+ # Create a temporary directory for output files
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Create a script that spawns a child Python process
+ parent_script = f"""
+import subprocess
+import sys
+import time
+
+# Spawn a child that does some work
+child = subprocess.Popen([
+ sys.executable, '-c',
+ 'import time; [i**2 for i in range(1000)]; time.sleep(2)'
+])
+# Do some work in parent
+for i in range(1000):
+ _ = i ** 2
+time.sleep(2)
+child.wait()
+"""
+ script_file = os.path.join(tmpdir, "parent_script.py")
+ with open(script_file, "w") as f:
+ f.write(parent_script)
+
+ output_file = os.path.join(tmpdir, "profile.pstats")
+
+ # Run the profiler with --subprocesses flag
+ result = subprocess.run(
+ [
+ sys.executable,
+ "-m",
+ "profiling.sampling",
+ "run",
+ "--subprocesses",
+ "-d",
+ "3",
+ "-i",
+ "10000",
+ "-o",
+ output_file,
+ script_file,
+ ],
+ capture_output=True,
+ text=True,
+ timeout=SHORT_TIMEOUT,
+ )
+
+ # Check that parent output file was created
+ self.assertTrue(
+ os.path.exists(output_file),
+ f"Parent profile output not created. "
+ f"stdout: {result.stdout}, stderr: {result.stderr}",
+ )
+
+ # Check for child profiler output files (pattern: profile_{pid}.pstats)
+ output_files = os.listdir(tmpdir)
+ child_profiles = [
+ f
+ for f in output_files
+ if f.startswith("profile_") and f.endswith(".pstats")
+ ]
+
+ # Note: Child profiling is best-effort; the child may exit before
+ # profiler attaches, or the process may not be detected as Python.
+ # We just verify the mechanism doesn't crash.
+ if result.returncode != 0:
+ self.fail(
+ f"Profiler exited with code {result.returncode}. "
+ f"stdout: {result.stdout}, stderr: {result.stderr}"
+ )
+
+ def test_subprocesses_flag_with_flamegraph_output(self):
+ """Test --subprocesses with flamegraph output format."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Simple parent that spawns a child
+ parent_script = f"""
+import subprocess
+import sys
+import time
+child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(1)'])
+time.sleep(1)
+child.wait()
+"""
+ script_file = os.path.join(tmpdir, "parent.py")
+ with open(script_file, "w") as f:
+ f.write(parent_script)
+
+ output_file = os.path.join(tmpdir, "flame.html")
+
+ result = subprocess.run(
+ [
+ sys.executable,
+ "-m",
+ "profiling.sampling",
+ "run",
+ "--subprocesses",
+ "-d",
+ "2",
+ "-i",
+ "10000",
+ "--flamegraph",
+ "-o",
+ output_file,
+ script_file,
+ ],
+ capture_output=True,
+ text=True,
+ timeout=SHORT_TIMEOUT,
+ )
+
+ self.assertTrue(
+ os.path.exists(output_file),
+ f"Flamegraph output not created. stderr: {result.stderr}",
+ )
+
+ # Verify it's valid HTML
+ with open(output_file, "r") as f:
+ content = f.read()
+ self.assertIn(
+ "<html",
+ content.lower(),
+ "Flamegraph output should be HTML",
+ )
+
+ def test_subprocesses_flag_no_crash_on_quick_child(self):
+ """Test that --subprocesses doesn't crash when child exits quickly."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Parent spawns a child that exits immediately
+ parent_script = f"""
+import subprocess
+import sys
+import time
+# Child exits immediately
+child = subprocess.Popen([sys.executable, '-c', 'pass'])
+child.wait()
+time.sleep(1)
+"""
+ script_file = os.path.join(tmpdir, "parent.py")
+ with open(script_file, "w") as f:
+ f.write(parent_script)
+
+ output_file = os.path.join(tmpdir, "profile.pstats")
+
+ result = subprocess.run(
+ [
+ sys.executable,
+ "-m",
+ "profiling.sampling",
+ "run",
+ "--subprocesses",
+ "-d",
+ "2",
+ "-i",
+ "10000",
+ "-o",
+ output_file,
+ script_file,
+ ],
+ capture_output=True,
+ text=True,
+ timeout=SHORT_TIMEOUT,
+ )
+
+ # Should not crash - exit code 0
+ self.assertEqual(
+ result.returncode,
+ 0,
+ f"Profiler crashed with quick-exit child. "
+ f"stderr: {result.stderr}",
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()
"Test only runs when _remote_debugging is available"
)
-from test.support import is_emscripten, requires_subprocess
+from test.support import is_emscripten, requires_remote_subprocess_debugging
from profiling.sampling.cli import main
self.assertEqual(coordinator_cmd[5:], expected_target_args)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
- @requires_subprocess()
+ @requires_remote_subprocess_debugging()
def test_cli_module_argument_parsing(self):
test_args = ["profiling.sampling.cli", "run", "-m", "mymodule"]
mock_sample.assert_called_once()
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
- @requires_subprocess()
+ @requires_remote_subprocess_debugging()
def test_cli_module_with_arguments(self):
test_args = [
"profiling.sampling.cli",
self.assertIn("invalid choice", error_msg)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
- @requires_subprocess()
+ @requires_remote_subprocess_debugging()
def test_cli_module_with_profiler_options(self):
test_args = [
"profiling.sampling.cli",
self.assertIn("required: target", error_msg) # argparse error for missing positional arg
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
- @requires_subprocess()
+ @requires_remote_subprocess_debugging()
def test_cli_long_module_option(self):
test_args = [
"profiling.sampling.cli",
)
from test.support import (
- requires_subprocess,
+ requires_remote_subprocess_debugging,
SHORT_TIMEOUT,
)
from .helpers import (
test_subprocess,
close_and_unlink,
- skip_if_not_supported,
- PROCESS_VM_READV_SUPPORTED,
)
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo
PROFILING_DURATION_SEC = 2
-@skip_if_not_supported
-@unittest.skipIf(
- sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support",
-)
+@requires_remote_subprocess_debugging()
class TestRecursiveFunctionProfiling(unittest.TestCase):
"""Test profiling of recursive functions and complex call patterns."""
'''
-@requires_subprocess()
-@skip_if_not_supported
+@requires_remote_subprocess_debugging()
class TestSampleProfilerIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=PROFILING_DURATION_SEC,
- )
- collector.print_stats(show_summary=False)
- except PermissionError:
- self.skipTest("Insufficient permissions for remote profiling")
+ collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=PROFILING_DURATION_SEC,
+ )
+ collector.print_stats(show_summary=False)
output = captured_output.getvalue()
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=PROFILING_DURATION_SEC,
- )
- collector.export(pstats_out.name)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=PROFILING_DURATION_SEC,
+ )
+ collector.export(pstats_out.name)
# Verify file was created and contains valid data
self.assertTrue(os.path.exists(pstats_out.name))
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = CollapsedStackCollector(1000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=PROFILING_DURATION_SEC,
- )
- collector.export(collapsed_file.name)
- except PermissionError:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = CollapsedStackCollector(1000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=PROFILING_DURATION_SEC,
+ )
+ collector.export(collapsed_file.name)
# Verify file was created and contains valid data
self.assertTrue(os.path.exists(collapsed_file.name))
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=PROFILING_DURATION_SEC,
- all_threads=True,
- )
- collector.print_stats(show_summary=False)
- except PermissionError:
- self.skipTest("Insufficient permissions for remote profiling")
+ collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=PROFILING_DURATION_SEC,
+ all_threads=True,
+ )
+ collector.print_stats(show_summary=False)
# Just verify that sampling completed without error
# We're not testing output format here
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- from profiling.sampling.cli import main
- main()
- except PermissionError:
- self.skipTest("Insufficient permissions for remote profiling")
+ from profiling.sampling.cli import main
+ main()
output = captured_output.getvalue()
# Change to temp directory so subprocess can find the module
contextlib.chdir(tempdir.name),
):
- try:
- from profiling.sampling.cli import main
- main()
- except PermissionError:
- self.skipTest("Insufficient permissions for remote profiling")
+ from profiling.sampling.cli import main
+ main()
output = captured_output.getvalue()
self.assertIn("slow_fibonacci", output)
-@skip_if_not_supported
-@unittest.skipIf(
- sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support",
-)
+@requires_remote_subprocess_debugging()
class TestSampleProfilerErrorHandling(unittest.TestCase):
def test_invalid_pid(self):
with self.assertRaises((OSError, RuntimeError)):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=50000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=2, # Longer than process lifetime
- )
- except PermissionError:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = PstatsCollector(sample_interval_usec=50000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=2, # Longer than process lifetime
+ )
output = captured_output.getvalue()
"import time; time.sleep(1000)",
wait_for_working=False
) as subproc:
- try:
- profiler = SampleProfiler(
- pid=subproc.process.pid,
- sample_interval_usec=1000,
- all_threads=False,
- )
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ profiler = SampleProfiler(
+ pid=subproc.process.pid,
+ sample_interval_usec=1000,
+ all_threads=False,
+ )
self.assertTrue(profiler._is_process_running())
self.assertIsNotNone(profiler.unwinder.get_stack_trace())
subproc.process.kill()
"import time; time.sleep(1000)",
wait_for_working=False
) as subproc:
- try:
- unwinder = _remote_debugging.RemoteUnwinder(
- subproc.process.pid
- )
- except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace"
- )
+ unwinder = _remote_debugging.RemoteUnwinder(
+ subproc.process.pid
+ )
initial_trace = unwinder.get_stack_trace()
self.assertIsNotNone(initial_trace)
)
output = result.stdout + result.stderr
- if "PermissionError" in output:
- self.skipTest("Insufficient permissions for remote profiling")
self.assertNotIn("Script file not found", output)
self.assertIn(
"No such file or directory: 'nonexistent_file.txt'", output
self.assertNotEqual(cm.exception.code, 0)
-@requires_subprocess()
-@skip_if_not_supported
-@unittest.skipIf(
- sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support",
-)
+@requires_remote_subprocess_debugging()
class TestAsyncAwareProfilingIntegration(unittest.TestCase):
"""Integration tests for async-aware profiling mode."""
Returns a dict mapping function names to their sample counts.
"""
with test_subprocess(self.async_script, wait_for_working=True) as subproc:
- try:
- collector = CollapsedStackCollector(1000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=PROFILING_DURATION_SEC,
- async_aware=async_aware_mode,
- )
- except PermissionError:
- self.skipTest("Insufficient permissions for remote profiling")
+ collector = CollapsedStackCollector(1000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=PROFILING_DURATION_SEC,
+ async_aware=async_aware_mode,
+ )
# Count samples per function from collapsed stacks
# stack_counter keys are (call_tree, thread_id) where call_tree
"Test only runs when _remote_debugging is available"
)
-from test.support import requires_subprocess
+from test.support import requires_remote_subprocess_debugging
from .helpers import test_subprocess
from .mocks import MockFrameInfo, MockInterpreterInfo
+@requires_remote_subprocess_debugging()
class TestCpuModeFiltering(unittest.TestCase):
"""Test CPU mode filtering functionality (--mode=cpu)."""
idle_key, collector_no_skip.result
) # Idle thread should be included
- @requires_subprocess()
def test_cpu_mode_integration_filtering(self):
"""Integration test: CPU mode should only capture active threads, not idle ones."""
# Script with one mostly-idle thread and one CPU-active thread
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=2.0,
- mode=1, # CPU mode
- all_threads=True,
- )
- collector.print_stats(show_summary=False, mode=1)
- except (PermissionError, RuntimeError) as e:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=2.0,
+ mode=1, # CPU mode
+ all_threads=True,
+ )
+ collector.print_stats(show_summary=False, mode=1)
cpu_mode_output = captured_output.getvalue()
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=2.0,
- mode=0, # Wall-clock mode
- all_threads=True,
- )
- collector.print_stats(show_summary=False)
- except (PermissionError, RuntimeError) as e:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=2.0,
+ mode=0, # Wall-clock mode
+ all_threads=True,
+ )
+ collector.print_stats(show_summary=False)
wall_mode_output = captured_output.getvalue()
self.assertIn("CPU mode", output)
+@requires_remote_subprocess_debugging()
class TestGilModeFiltering(unittest.TestCase):
"""Test GIL mode filtering functionality (--mode=gil)."""
self.assertEqual(call_args.kwargs.get("mode"), 2) # GIL mode
self.assertEqual(call_args.kwargs.get("duration_sec"), 5)
- @requires_subprocess()
def test_gil_mode_integration_behavior(self):
"""Integration test: GIL mode should capture GIL-holding threads."""
# Create a test script with GIL-releasing operations
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=2.0,
- mode=2, # GIL mode
- all_threads=True,
- )
- collector.print_stats(show_summary=False)
- except (PermissionError, RuntimeError) as e:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=2.0,
+ mode=2, # GIL mode
+ all_threads=True,
+ )
+ collector.print_stats(show_summary=False)
gil_mode_output = captured_output.getvalue()
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=0.5,
- mode=0, # Wall-clock mode
- all_threads=True,
- )
- collector.print_stats(show_summary=False)
- except (PermissionError, RuntimeError) as e:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=0.5,
+ mode=0, # Wall-clock mode
+ all_threads=True,
+ )
+ collector.print_stats(show_summary=False)
wall_mode_output = captured_output.getvalue()
_parse_mode("invalid")
+@requires_remote_subprocess_debugging()
class TestExceptionModeFiltering(unittest.TestCase):
"""Test exception mode filtering functionality (--mode=exception)."""
from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
self.assertEqual(PROFILING_MODE_EXCEPTION, 4)
- @requires_subprocess()
def test_exception_mode_integration_filtering(self):
"""Integration test: Exception mode should only capture threads with active exceptions."""
# Script with one thread handling an exception and one normal thread
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=2.0,
- mode=4, # Exception mode
- all_threads=True,
- )
- collector.print_stats(show_summary=False, mode=4)
- except (PermissionError, RuntimeError) as e:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=2.0,
+ mode=4, # Exception mode
+ all_threads=True,
+ )
+ collector.print_stats(show_summary=False, mode=4)
exception_mode_output = captured_output.getvalue()
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- try:
- collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
- profiling.sampling.sample.sample(
- subproc.process.pid,
- collector,
- duration_sec=2.0,
- mode=0, # Wall-clock mode
- all_threads=True,
- )
- collector.print_stats(show_summary=False)
- except (PermissionError, RuntimeError) as e:
- self.skipTest(
- "Insufficient permissions for remote profiling"
- )
+ collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
+ profiling.sampling.sample.sample(
+ subproc.process.pid,
+ collector,
+ duration_sec=2.0,
+ mode=0, # Wall-clock mode
+ all_threads=True,
+ )
+ collector.print_stats(show_summary=False)
wall_mode_output = captured_output.getvalue()
--- /dev/null
+Add ``--subprocesses`` flag to :mod:`profiling.sampling` CLI to automatically
+profile subprocesses spawned by the target. When enabled, the profiler
+monitors for new Python subprocesses and profiles each one separately,
+writing results to individual output files. This is useful for profiling
+applications that use :mod:`multiprocessing`, :class:`~concurrent.futures.ProcessPoolExecutor`,
+or other subprocess-based parallelism. Patch by Pablo Galindo.
@MODULE__PICKLE_TRUE@_pickle _pickle.c
@MODULE__QUEUE_TRUE@_queue _queuemodule.c
@MODULE__RANDOM_TRUE@_random _randommodule.c
-@MODULE__REMOTE_DEBUGGING_TRUE@_remote_debugging _remote_debugging/module.c _remote_debugging/object_reading.c _remote_debugging/code_objects.c _remote_debugging/frames.c _remote_debugging/frame_cache.c _remote_debugging/threads.c _remote_debugging/asyncio.c
+@MODULE__REMOTE_DEBUGGING_TRUE@_remote_debugging _remote_debugging/module.c _remote_debugging/object_reading.c _remote_debugging/code_objects.c _remote_debugging/frames.c _remote_debugging/frame_cache.c _remote_debugging/threads.c _remote_debugging/asyncio.c _remote_debugging/subprocess.c
@MODULE__STRUCT_TRUE@_struct _struct.c
# build supports subinterpreters
#ifndef Py_REMOTE_DEBUGGING_H
#define Py_REMOTE_DEBUGGING_H
+/* _GNU_SOURCE must be defined before any system headers */
+#define _GNU_SOURCE
+
#ifdef __cplusplus
extern "C" {
#endif
-#define _GNU_SOURCE
-
#ifndef Py_BUILD_CORE_BUILTIN
# define Py_BUILD_CORE_MODULE 1
#endif
#include "Python.h"
-#include <internal/pycore_debug_offsets.h> // _Py_DebugOffsets
-#include <internal/pycore_frame.h> // FRAME_SUSPENDED_YIELD_FROM
-#include <internal/pycore_interpframe.h> // FRAME_OWNED_BY_INTERPRETER
-#include <internal/pycore_llist.h> // struct llist_node
-#include <internal/pycore_long.h> // _PyLong_GetZero
-#include <internal/pycore_stackref.h> // Py_TAG_BITS
+#include "internal/pycore_debug_offsets.h" // _Py_DebugOffsets
+#include "internal/pycore_frame.h" // FRAME_SUSPENDED_YIELD_FROM
+#include "internal/pycore_interpframe.h" // FRAME_OWNED_BY_INTERPRETER
+#include "internal/pycore_llist.h" // struct llist_node
+#include "internal/pycore_long.h" // _PyLong_GetZero
+#include "internal/pycore_stackref.h" // Py_TAG_BITS
#include "../../Python/remote_debug.h"
#include <assert.h>
# define HAVE_PROCESS_VM_READV 0
#endif
-#if defined(__APPLE__) && TARGET_OS_OSX
-#include <libproc.h>
-#include <sys/types.h>
-#define MAX_NATIVE_THREADS 4096
+#if defined(__APPLE__)
+#include <TargetConditionals.h>
+# if !defined(TARGET_OS_OSX)
+ /* Older macOS SDKs do not define TARGET_OS_OSX */
+# define TARGET_OS_OSX 1
+# endif
+# if TARGET_OS_OSX
+# include <libproc.h>
+# include <sys/types.h>
+# define MAX_NATIVE_THREADS 4096
+# endif
#endif
#ifdef MS_WINDOWS
void *context
);
+/* ============================================================================
+ * SUBPROCESS ENUMERATION FUNCTION DECLARATIONS
+ * ============================================================================ */
+
+/* Get all child PIDs of a process.
+ * Returns a new Python list of PIDs, or NULL on error with exception set.
+ * If recursive is true, includes all descendants (children, grandchildren, etc.)
+ */
+extern PyObject *enumerate_child_pids(pid_t target_pid, int recursive);
+
#ifdef __cplusplus
}
#endif
return return_value;
}
-/*[clinic end generated code: output=1943fb7a56197e39 input=a9049054013a1b77]*/
+
+PyDoc_STRVAR(_remote_debugging_get_child_pids__doc__,
+"get_child_pids($module, /, pid, *, recursive=True)\n"
+"--\n"
+"\n"
+"Get all child process IDs of the given process.\n"
+"\n"
+" pid\n"
+" Process ID of the parent process\n"
+" recursive\n"
+" If True, return all descendants (children, grandchildren, etc.).\n"
+" If False, return only direct children.\n"
+"\n"
+"Returns a list of child process IDs. Returns an empty list if no children\n"
+"are found.\n"
+"\n"
+"This function provides a snapshot of child processes at a moment in time.\n"
+"Child processes may exit or new ones may be created after the list is returned.\n"
+"\n"
+"Raises:\n"
+" OSError: If unable to enumerate processes\n"
+" NotImplementedError: If not supported on this platform");
+
+#define _REMOTE_DEBUGGING_GET_CHILD_PIDS_METHODDEF \
+ {"get_child_pids", _PyCFunction_CAST(_remote_debugging_get_child_pids), METH_FASTCALL|METH_KEYWORDS, _remote_debugging_get_child_pids__doc__},
+
+static PyObject *
+_remote_debugging_get_child_pids_impl(PyObject *module, int pid,
+ int recursive);
+
+static PyObject *
+_remote_debugging_get_child_pids(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+ PyObject *return_value = NULL;
+ #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
+
+ #define NUM_KEYWORDS 2
+ static struct {
+ PyGC_Head _this_is_not_used;
+ PyObject_VAR_HEAD
+ Py_hash_t ob_hash;
+ PyObject *ob_item[NUM_KEYWORDS];
+ } _kwtuple = {
+ .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
+ .ob_hash = -1,
+ .ob_item = { &_Py_ID(pid), &_Py_ID(recursive), },
+ };
+ #undef NUM_KEYWORDS
+ #define KWTUPLE (&_kwtuple.ob_base.ob_base)
+
+ #else // !Py_BUILD_CORE
+ # define KWTUPLE NULL
+ #endif // !Py_BUILD_CORE
+
+ static const char * const _keywords[] = {"pid", "recursive", NULL};
+ static _PyArg_Parser _parser = {
+ .keywords = _keywords,
+ .fname = "get_child_pids",
+ .kwtuple = KWTUPLE,
+ };
+ #undef KWTUPLE
+ PyObject *argsbuf[2];
+ Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1;
+ int pid;
+ int recursive = 1;
+
+ args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser,
+ /*minpos*/ 1, /*maxpos*/ 1, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
+ if (!args) {
+ goto exit;
+ }
+ pid = PyLong_AsInt(args[0]);
+ if (pid == -1 && PyErr_Occurred()) {
+ goto exit;
+ }
+ if (!noptargs) {
+ goto skip_optional_kwonly;
+ }
+ recursive = PyObject_IsTrue(args[1]);
+ if (recursive < 0) {
+ goto exit;
+ }
+skip_optional_kwonly:
+ return_value = _remote_debugging_get_child_pids_impl(module, pid, recursive);
+
+exit:
+ return return_value;
+}
+
+PyDoc_STRVAR(_remote_debugging_is_python_process__doc__,
+"is_python_process($module, /, pid)\n"
+"--\n"
+"\n"
+"Check if a process is a Python process.");
+
+#define _REMOTE_DEBUGGING_IS_PYTHON_PROCESS_METHODDEF \
+ {"is_python_process", _PyCFunction_CAST(_remote_debugging_is_python_process), METH_FASTCALL|METH_KEYWORDS, _remote_debugging_is_python_process__doc__},
+
+static PyObject *
+_remote_debugging_is_python_process_impl(PyObject *module, int pid);
+
+static PyObject *
+_remote_debugging_is_python_process(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+ PyObject *return_value = NULL;
+ #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
+
+ #define NUM_KEYWORDS 1
+ static struct {
+ PyGC_Head _this_is_not_used;
+ PyObject_VAR_HEAD
+ Py_hash_t ob_hash;
+ PyObject *ob_item[NUM_KEYWORDS];
+ } _kwtuple = {
+ .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
+ .ob_hash = -1,
+ .ob_item = { &_Py_ID(pid), },
+ };
+ #undef NUM_KEYWORDS
+ #define KWTUPLE (&_kwtuple.ob_base.ob_base)
+
+ #else // !Py_BUILD_CORE
+ # define KWTUPLE NULL
+ #endif // !Py_BUILD_CORE
+
+ static const char * const _keywords[] = {"pid", NULL};
+ static _PyArg_Parser _parser = {
+ .keywords = _keywords,
+ .fname = "is_python_process",
+ .kwtuple = KWTUPLE,
+ };
+ #undef KWTUPLE
+ PyObject *argsbuf[1];
+ int pid;
+
+ args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser,
+ /*minpos*/ 1, /*maxpos*/ 1, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
+ if (!args) {
+ goto exit;
+ }
+ pid = PyLong_AsInt(args[0]);
+ if (pid == -1 && PyErr_Occurred()) {
+ goto exit;
+ }
+ return_value = _remote_debugging_is_python_process_impl(module, pid);
+
+exit:
+ return return_value;
+}
+/*[clinic end generated code: output=dc0550ad3d6a409c input=a9049054013a1b77]*/
}
// Validate that the debug offsets are valid
- if(validate_debug_offsets(&self->debug_offsets) == -1) {
+ if (validate_debug_offsets(&self->debug_offsets) == -1) {
set_exception_cause(self, PyExc_RuntimeError, "Invalid debug offsets found");
return -1;
}
_Py_hashtable_destroy(self->code_object_cache);
}
#ifdef MS_WINDOWS
- if(self->win_process_buffer != NULL) {
+ if (self->win_process_buffer != NULL) {
PyMem_Free(self->win_process_buffer);
}
#endif
{0, NULL},
};
+/* ============================================================================
+ * MODULE-LEVEL FUNCTIONS
+ * ============================================================================ */
+
+/*[clinic input]
+_remote_debugging.get_child_pids
+
+ pid: int
+ Process ID of the parent process
+ *
+ recursive: bool = True
+ If True, return all descendants (children, grandchildren, etc.).
+ If False, return only direct children.
+
+Get all child process IDs of the given process.
+
+Returns a list of child process IDs. Returns an empty list if no children
+are found.
+
+This function provides a snapshot of child processes at a moment in time.
+Child processes may exit or new ones may be created after the list is returned.
+
+Raises:
+ OSError: If unable to enumerate processes
+ NotImplementedError: If not supported on this platform
+[clinic start generated code]*/
+
+static PyObject *
+_remote_debugging_get_child_pids_impl(PyObject *module, int pid,
+ int recursive)
+/*[clinic end generated code: output=1ae2289c6b953e4b input=3395cbe7f17066c9]*/
+{
+ return enumerate_child_pids((pid_t)pid, recursive);
+}
+
+/*[clinic input]
+_remote_debugging.is_python_process
+
+ pid: int
+
+Check if a process is a Python process.
+[clinic start generated code]*/
+
+static PyObject *
+_remote_debugging_is_python_process_impl(PyObject *module, int pid)
+/*[clinic end generated code: output=22947dc8afcac362 input=13488e28c7295d84]*/
+{
+ proc_handle_t handle;
+
+ if (_Py_RemoteDebug_InitProcHandle(&handle, pid) < 0) {
+ PyErr_Clear();
+ Py_RETURN_FALSE;
+ }
+
+ uintptr_t runtime_start_address = _Py_RemoteDebug_GetPyRuntimeAddress(&handle);
+ _Py_RemoteDebug_CleanupProcHandle(&handle);
+
+ if (runtime_start_address == 0) {
+ PyErr_Clear();
+ Py_RETURN_FALSE;
+ }
+
+ Py_RETURN_TRUE;
+}
+
static PyMethodDef remote_debugging_methods[] = {
+ _REMOTE_DEBUGGING_GET_CHILD_PIDS_METHODDEF
+ _REMOTE_DEBUGGING_IS_PYTHON_PROCESS_METHODDEF
{NULL, NULL, 0, NULL},
};
--- /dev/null
+/******************************************************************************
+ * Remote Debugging Module - Subprocess Enumeration
+ *
+ * This file contains platform-specific functions for enumerating child
+ * processes of a given PID.
+ ******************************************************************************/
+
+#include "_remote_debugging.h"
+
+#ifndef MS_WINDOWS
+#include <unistd.h>
+#include <dirent.h>
+#endif
+
+#ifdef MS_WINDOWS
+#include <tlhelp32.h>
+#endif
+
+/* ============================================================================
+ * INTERNAL DATA STRUCTURES
+ * ============================================================================ */
+
+/* Simple dynamic array for collecting PIDs */
+typedef struct {
+ pid_t *pids;
+ size_t count;
+ size_t capacity;
+} pid_array_t;
+
+static int
+pid_array_init(pid_array_t *arr)
+{
+ arr->capacity = 64;
+ arr->count = 0;
+ arr->pids = (pid_t *)PyMem_Malloc(arr->capacity * sizeof(pid_t));
+ if (arr->pids == NULL) {
+ PyErr_NoMemory();
+ return -1;
+ }
+ return 0;
+}
+
+static void
+pid_array_cleanup(pid_array_t *arr)
+{
+ if (arr->pids != NULL) {
+ PyMem_Free(arr->pids);
+ arr->pids = NULL;
+ }
+ arr->count = 0;
+ arr->capacity = 0;
+}
+
+static int
+pid_array_append(pid_array_t *arr, pid_t pid)
+{
+ if (arr->count >= arr->capacity) {
+ /* Check for overflow before multiplication */
+ if (arr->capacity > SIZE_MAX / 2) {
+ PyErr_SetString(PyExc_OverflowError, "PID array capacity overflow");
+ return -1;
+ }
+ size_t new_capacity = arr->capacity * 2;
+ /* Check allocation size won't overflow */
+ if (new_capacity > SIZE_MAX / sizeof(pid_t)) {
+ PyErr_SetString(PyExc_OverflowError, "PID array size overflow");
+ return -1;
+ }
+ pid_t *new_pids = (pid_t *)PyMem_Realloc(arr->pids, new_capacity * sizeof(pid_t));
+ if (new_pids == NULL) {
+ PyErr_NoMemory();
+ return -1;
+ }
+ arr->pids = new_pids;
+ arr->capacity = new_capacity;
+ }
+ arr->pids[arr->count++] = pid;
+ return 0;
+}
+
+static int
+pid_array_contains(pid_array_t *arr, pid_t pid)
+{
+ for (size_t i = 0; i < arr->count; i++) {
+ if (arr->pids[i] == pid) {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+/* ============================================================================
+ * SHARED BFS HELPER
+ * ============================================================================ */
+
+/* Find child PIDs using BFS traversal of the pid->ppid mapping.
+ * all_pids and ppids must have the same count (parallel arrays).
+ * Returns 0 on success, -1 on error. */
+static int
+find_children_bfs(pid_t target_pid, int recursive,
+ pid_t *all_pids, pid_t *ppids, size_t pid_count,
+ pid_array_t *result)
+{
+ int retval = -1;
+ pid_array_t to_process = {0};
+
+ if (pid_array_init(&to_process) < 0) {
+ goto done;
+ }
+ if (pid_array_append(&to_process, target_pid) < 0) {
+ goto done;
+ }
+
+ size_t process_idx = 0;
+ while (process_idx < to_process.count) {
+ pid_t current_pid = to_process.pids[process_idx++];
+
+ for (size_t i = 0; i < pid_count; i++) {
+ if (ppids[i] != current_pid) {
+ continue;
+ }
+ pid_t child_pid = all_pids[i];
+ if (pid_array_contains(result, child_pid)) {
+ continue;
+ }
+ if (pid_array_append(result, child_pid) < 0) {
+ goto done;
+ }
+ if (recursive && pid_array_append(&to_process, child_pid) < 0) {
+ goto done;
+ }
+ }
+
+ if (!recursive) {
+ break;
+ }
+ }
+
+ retval = 0;
+
+done:
+ pid_array_cleanup(&to_process);
+ return retval;
+}
+
+/* ============================================================================
+ * LINUX IMPLEMENTATION
+ * ============================================================================ */
+
+#if defined(__linux__)
+
+/* Parse /proc/{pid}/stat to get parent PID */
+static pid_t
+get_ppid_linux(pid_t pid)
+{
+ char stat_path[64];
+ char buffer[2048];
+
+ snprintf(stat_path, sizeof(stat_path), "/proc/%d/stat", (int)pid);
+
+ int fd = open(stat_path, O_RDONLY);
+ if (fd == -1) {
+ return -1;
+ }
+
+ ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
+ close(fd);
+
+ if (n <= 0) {
+ return -1;
+ }
+ buffer[n] = '\0';
+
+ /* Find closing paren of comm field - stat format: pid (comm) state ppid ... */
+ char *p = strrchr(buffer, ')');
+ if (!p) {
+ return -1;
+ }
+
+ /* Skip ") " with bounds checking */
+ char *end = buffer + n;
+ p += 2;
+ if (p >= end) {
+ return -1;
+ }
+ if (*p == ' ') {
+ p++;
+ if (p >= end) {
+ return -1;
+ }
+ }
+
+ /* Parse: state ppid */
+ char state;
+ int ppid;
+ if (sscanf(p, "%c %d", &state, &ppid) != 2) {
+ return -1;
+ }
+
+ return (pid_t)ppid;
+}
+
+static int
+get_child_pids_platform(pid_t target_pid, int recursive, pid_array_t *result)
+{
+ int retval = -1;
+ pid_array_t all_pids = {0};
+ pid_array_t ppids = {0};
+ DIR *proc_dir = NULL;
+
+ if (pid_array_init(&all_pids) < 0) {
+ goto done;
+ }
+
+ if (pid_array_init(&ppids) < 0) {
+ goto done;
+ }
+
+ proc_dir = opendir("/proc");
+ if (!proc_dir) {
+ PyErr_SetFromErrnoWithFilename(PyExc_OSError, "/proc");
+ goto done;
+ }
+
+ /* Single pass: collect PIDs and their PPIDs together */
+ struct dirent *entry;
+ while ((entry = readdir(proc_dir)) != NULL) {
+ /* Skip non-numeric entries (also skips . and ..) */
+ if (entry->d_name[0] < '1' || entry->d_name[0] > '9') {
+ continue;
+ }
+ char *endptr;
+ long pid_long = strtol(entry->d_name, &endptr, 10);
+ if (*endptr != '\0' || pid_long <= 0) {
+ continue;
+ }
+ pid_t pid = (pid_t)pid_long;
+ pid_t ppid = get_ppid_linux(pid);
+ if (ppid < 0) {
+ continue;
+ }
+ if (pid_array_append(&all_pids, pid) < 0 ||
+ pid_array_append(&ppids, ppid) < 0) {
+ goto done;
+ }
+ }
+
+ closedir(proc_dir);
+ proc_dir = NULL;
+
+ if (find_children_bfs(target_pid, recursive,
+ all_pids.pids, ppids.pids, all_pids.count,
+ result) < 0) {
+ goto done;
+ }
+
+ retval = 0;
+
+done:
+ if (proc_dir) {
+ closedir(proc_dir);
+ }
+ pid_array_cleanup(&all_pids);
+ pid_array_cleanup(&ppids);
+ return retval;
+}
+
+#endif /* __linux__ */
+
+/* ============================================================================
+ * MACOS IMPLEMENTATION
+ * ============================================================================ */
+
+#if defined(__APPLE__) && TARGET_OS_OSX
+
+#include <sys/proc_info.h>
+
+static int
+get_child_pids_platform(pid_t target_pid, int recursive, pid_array_t *result)
+{
+ int retval = -1;
+ pid_t *pid_list = NULL;
+ pid_t *ppids = NULL;
+
+ /* Get count of all PIDs */
+ int n_pids = proc_listallpids(NULL, 0);
+ if (n_pids <= 0) {
+ PyErr_SetString(PyExc_OSError, "Failed to get process count");
+ goto done;
+ }
+
+ /* Allocate buffer for PIDs (add some slack for new processes) */
+ int buffer_size = n_pids + 64;
+ pid_list = (pid_t *)PyMem_Malloc(buffer_size * sizeof(pid_t));
+ if (!pid_list) {
+ PyErr_NoMemory();
+ goto done;
+ }
+
+ /* Get actual PIDs */
+ int actual = proc_listallpids(pid_list, buffer_size * sizeof(pid_t));
+ if (actual <= 0) {
+ PyErr_SetString(PyExc_OSError, "Failed to list PIDs");
+ goto done;
+ }
+
+ /* Build pid -> ppid mapping */
+ ppids = (pid_t *)PyMem_Malloc(actual * sizeof(pid_t));
+ if (!ppids) {
+ PyErr_NoMemory();
+ goto done;
+ }
+
+ /* Get parent PIDs for each process */
+ int valid_count = 0;
+ for (int i = 0; i < actual; i++) {
+ struct proc_bsdinfo proc_info;
+ int ret = proc_pidinfo(pid_list[i], PROC_PIDTBSDINFO, 0,
+ &proc_info, sizeof(proc_info));
+ if (ret != sizeof(proc_info)) {
+ continue;
+ }
+ pid_list[valid_count] = pid_list[i];
+ ppids[valid_count] = proc_info.pbi_ppid;
+ valid_count++;
+ }
+
+ if (find_children_bfs(target_pid, recursive,
+ pid_list, ppids, valid_count,
+ result) < 0) {
+ goto done;
+ }
+
+ retval = 0;
+
+done:
+ PyMem_Free(pid_list);
+ PyMem_Free(ppids);
+ return retval;
+}
+
+#endif /* __APPLE__ && TARGET_OS_OSX */
+
+/* ============================================================================
+ * WINDOWS IMPLEMENTATION
+ * ============================================================================ */
+
+#ifdef MS_WINDOWS
+
+static int
+get_child_pids_platform(pid_t target_pid, int recursive, pid_array_t *result)
+{
+ int retval = -1;
+ pid_array_t all_pids = {0};
+ pid_array_t ppids = {0};
+ HANDLE snapshot = INVALID_HANDLE_VALUE;
+
+ snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
+ if (snapshot == INVALID_HANDLE_VALUE) {
+ PyErr_SetFromWindowsErr(0);
+ goto done;
+ }
+
+ if (pid_array_init(&all_pids) < 0) {
+ goto done;
+ }
+
+ if (pid_array_init(&ppids) < 0) {
+ goto done;
+ }
+
+ /* Single pass: collect PIDs and PPIDs together */
+ PROCESSENTRY32 pe;
+ pe.dwSize = sizeof(PROCESSENTRY32);
+ if (Process32First(snapshot, &pe)) {
+ do {
+ if (pid_array_append(&all_pids, (pid_t)pe.th32ProcessID) < 0 ||
+ pid_array_append(&ppids, (pid_t)pe.th32ParentProcessID) < 0) {
+ goto done;
+ }
+ } while (Process32Next(snapshot, &pe));
+ }
+
+ CloseHandle(snapshot);
+ snapshot = INVALID_HANDLE_VALUE;
+
+ if (find_children_bfs(target_pid, recursive,
+ all_pids.pids, ppids.pids, all_pids.count,
+ result) < 0) {
+ goto done;
+ }
+
+ retval = 0;
+
+done:
+ if (snapshot != INVALID_HANDLE_VALUE) {
+ CloseHandle(snapshot);
+ }
+ pid_array_cleanup(&all_pids);
+ pid_array_cleanup(&ppids);
+ return retval;
+}
+
+#endif /* MS_WINDOWS */
+
+/* ============================================================================
+ * UNSUPPORTED PLATFORM STUB
+ * ============================================================================ */
+
+#if !defined(__linux__) && !(defined(__APPLE__) && TARGET_OS_OSX) && !defined(MS_WINDOWS)
+
+static int
+get_child_pids_platform(pid_t target_pid, int recursive, pid_array_t *result)
+{
+ PyErr_SetString(PyExc_NotImplementedError,
+ "Subprocess enumeration not supported on this platform");
+ return -1;
+}
+
+#endif
+
+/* ============================================================================
+ * PUBLIC API
+ * ============================================================================ */
+
+PyObject *
+enumerate_child_pids(pid_t target_pid, int recursive)
+{
+ pid_array_t result;
+
+ if (pid_array_init(&result) < 0) {
+ return NULL;
+ }
+
+ if (get_child_pids_platform(target_pid, recursive, &result) < 0) {
+ pid_array_cleanup(&result);
+ return NULL;
+ }
+
+ /* Convert to Python list */
+ PyObject *list = PyList_New(result.count);
+ if (list == NULL) {
+ pid_array_cleanup(&result);
+ return NULL;
+ }
+
+ for (size_t i = 0; i < result.count; i++) {
+ PyObject *pid_obj = PyLong_FromLong((long)result.pids[i]);
+ if (pid_obj == NULL) {
+ Py_DECREF(list);
+ pid_array_cleanup(&result);
+ return NULL;
+ }
+ PyList_SET_ITEM(list, i, pid_obj);
+ }
+
+ pid_array_cleanup(&result);
+ return list;
+}
<ClCompile Include="..\Modules\_remote_debugging\frame_cache.c" />
<ClCompile Include="..\Modules\_remote_debugging\threads.c" />
<ClCompile Include="..\Modules\_remote_debugging\asyncio.c" />
+ <ClCompile Include="..\Modules\_remote_debugging\subprocess.c" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\Modules\_remote_debugging\_remote_debugging.h" />
<ClCompile Include="..\Modules\_remote_debugging\asyncio.c">
<Filter>Source Files</Filter>
</ClCompile>
+ <ClCompile Include="..\Modules\_remote_debugging\subprocess.c">
+ <Filter>Source Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\Modules\_remote_debugging\_remote_debugging.h">