SubprocessInfo = namedtuple("SubprocessInfo", ["process", "socket"])
+def _wait_for_signal(sock, expected_signals, timeout=SHORT_TIMEOUT):
+    """
+    Wait for expected signal(s) from a socket with proper timeout and EOF handling.
+
+    Args:
+        sock: Connected socket to read from
+        expected_signals: Single bytes object or list of bytes objects to wait for
+        timeout: Socket timeout in seconds
+
+    Returns:
+        bytes: Complete accumulated response buffer
+
+    Raises:
+        RuntimeError: If connection closed before signal received or timeout
+    """
+    # Normalize a single signal to a one-element list so the loop below
+    # can uniformly check "all signals seen".
+    if isinstance(expected_signals, bytes):
+        expected_signals = [expected_signals]
+
+    sock.settimeout(timeout)
+    buffer = b""
+
+    while True:
+        # Check if all expected signals are in buffer
+        if all(sig in buffer for sig in expected_signals):
+            return buffer
+
+        try:
+            chunk = sock.recv(4096)
+            if not chunk:
+                # Peer closed the connection (EOF) before every signal arrived.
+                raise RuntimeError(
+                    f"Connection closed before receiving expected signals. "
+                    f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
+                )
+            buffer += chunk
+        except socket.timeout:
+            # Timeout context adds nothing useful; suppress the chain.
+            raise RuntimeError(
+                f"Timeout waiting for signals. "
+                f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
+            ) from None
+        except OSError as e:
+            # Chain the original OSError (`from e`, not `from None`) so the
+            # underlying socket failure's traceback is preserved for debugging.
+            raise RuntimeError(
+                f"Socket error while waiting for signals: {e}. "
+                f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
+            ) from e
+
+
+def _cleanup_sockets(*sockets):
+    """Close every non-None socket, swallowing any OSError from close()."""
+    for sock in sockets:
+        if sock is None:
+            continue
+        # Best-effort close: a failure here must not mask a test error.
+        with contextlib.suppress(OSError):
+            sock.close()
+
+
+def _cleanup_process(proc, timeout=SHORT_TIMEOUT):
+    """Terminate a process gracefully, escalating to kill if needed."""
+    if proc.poll() is not None:
+        # Already exited; nothing to clean up.
+        return
+    proc.terminate()
+    try:
+        proc.wait(timeout=timeout)
+    except subprocess.TimeoutExpired:
+        # Graceful shutdown failed within the deadline; escalate to SIGKILL.
+        proc.kill()
+        try:
+            proc.wait(timeout=timeout)
+        except subprocess.TimeoutExpired:
+            pass  # Process refuses to die, nothing more we can do
+
+
@contextlib.contextmanager
-def test_subprocess(script):
+def test_subprocess(script, wait_for_working=False):
"""Context manager to create a test subprocess with socket synchronization.
Args:
- script: Python code to execute in the subprocess
+ script: Python code to execute in the subprocess. If wait_for_working
+ is True, script should send b"working" after starting work.
+ wait_for_working: If True, wait for both "ready" and "working" signals.
+ Default False for backward compatibility.
Yields:
SubprocessInfo: Named tuple with process and socket objects
# Wait for process to connect and send ready signal
client_socket, _ = server_socket.accept()
server_socket.close()
- response = client_socket.recv(1024)
- if response != b"ready":
- raise RuntimeError(
- f"Unexpected response from subprocess: {response!r}"
- )
+ server_socket = None
+
+ # Wait for ready signal, and optionally working signal
+ if wait_for_working:
+ _wait_for_signal(client_socket, [b"ready", b"working"])
+ else:
+ _wait_for_signal(client_socket, b"ready")
yield SubprocessInfo(proc, client_socket)
finally:
- if client_socket is not None:
- client_socket.close()
- if proc.poll() is None:
- proc.kill()
- proc.wait()
+ _cleanup_sockets(client_socket, server_socket)
+ _cleanup_process(proc)
def close_and_unlink(file):
import gc
class ExpensiveGarbage:
- """Class that triggers GC with expensive finalizer (callback)."""
def __init__(self):
self.cycle = self
def __del__(self):
- # CPU-intensive work in the finalizer callback
result = 0
for i in range(100000):
result += i * i
if i % 1000 == 0:
result = result % 1000000
-def main_loop():
- """Main loop that triggers GC with expensive callback."""
- while True:
- ExpensiveGarbage()
- gc.collect()
-
-if __name__ == "__main__":
- main_loop()
+_test_sock.sendall(b"working")
+while True:
+ ExpensiveGarbage()
+ gc.collect()
'''
def test_gc_frames_enabled(self):
"""Test that GC frames appear when gc tracking is enabled."""
with (
- test_subprocess(self.gc_test_script) as subproc,
+ test_subprocess(self.gc_test_script, wait_for_working=True) as subproc,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
def test_gc_frames_disabled(self):
"""Test that GC frames do not appear when gc tracking is disabled."""
with (
- test_subprocess(self.gc_test_script) as subproc,
+ test_subprocess(self.gc_test_script, wait_for_working=True) as subproc,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
cls.native_test_script = """
import operator
-def main_loop():
- while True:
- # Native code in the middle of the stack:
- operator.call(inner)
-
def inner():
- # Python code at the top of the stack:
for _ in range(1_000_0000):
pass
-if __name__ == "__main__":
- main_loop()
+_test_sock.sendall(b"working")
+while True:
+ operator.call(inner)
"""
def test_native_frames_enabled(self):
)
self.addCleanup(close_and_unlink, collapsed_file)
- with (
- test_subprocess(self.native_test_script) as subproc,
- ):
- # Suppress profiler output when testing file export
+ with test_subprocess(self.native_test_script, wait_for_working=True) as subproc:
with (
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
def test_native_frames_disabled(self):
"""Test that native frames do not appear when native tracking is disabled."""
with (
- test_subprocess(self.native_test_script) as subproc,
+ test_subprocess(self.native_test_script, wait_for_working=True) as subproc,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
# Duration for profiling tests - long enough for process to complete naturally
PROFILING_TIMEOUT = str(int(SHORT_TIMEOUT))
+# Duration for profiling in tests - short enough to complete quickly
+PROFILING_DURATION_SEC = 2
+
@skip_if_not_supported
@unittest.skipIf(
self.assertEqual(total_occurrences(main_key), 2)
-@requires_subprocess()
-@skip_if_not_supported
-class TestSampleProfilerIntegration(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- cls.test_script = '''
-import time
-import os
-
+# Shared workload functions for test scripts
+_WORKLOAD_FUNCTIONS = '''
def slow_fibonacci(n):
- """Recursive fibonacci - should show up prominently in profiler."""
if n <= 1:
return n
return slow_fibonacci(n-1) + slow_fibonacci(n-2)
def cpu_intensive_work():
- """CPU intensive work that should show in profiler."""
result = 0
for i in range(10000):
result += i * i
result = result % 1000000
return result
-def main_loop():
- """Main test loop."""
- max_iterations = 200
-
- for iteration in range(max_iterations):
+def do_work():
+ iteration = 0
+ while True:
if iteration % 2 == 0:
- result = slow_fibonacci(15)
+ slow_fibonacci(15)
else:
- result = cpu_intensive_work()
+ cpu_intensive_work()
+ iteration += 1
+'''
+
-if __name__ == "__main__":
- main_loop()
+@requires_subprocess()
+@skip_if_not_supported
+class TestSampleProfilerIntegration(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ # Test script for use with test_subprocess() - signals when work starts
+ cls.test_script = _WORKLOAD_FUNCTIONS + '''
+_test_sock.sendall(b"working")
+do_work()
+'''
+ # CLI test script - runs for fixed duration (no socket sync)
+ cls.cli_test_script = '''
+import time
+''' + _WORKLOAD_FUNCTIONS.replace(
+ 'while True:', 'end_time = time.time() + 30\n while time.time() < end_time:'
+) + '''
+do_work()
'''
def test_sampling_basic_functionality(self):
with (
- test_subprocess(self.test_script) as subproc,
+ test_subprocess(self.test_script, wait_for_working=True) as subproc,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
- # Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations
collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
- duration_sec=SHORT_TIMEOUT,
+ duration_sec=PROFILING_DURATION_SEC,
)
collector.print_stats(show_summary=False)
except PermissionError:
)
self.addCleanup(close_and_unlink, pstats_out)
- with test_subprocess(self.test_script) as subproc:
+ with test_subprocess(self.test_script, wait_for_working=True) as subproc:
# Suppress profiler output when testing file export
with (
io.StringIO() as captured_output,
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
- duration_sec=1,
+ duration_sec=PROFILING_DURATION_SEC,
)
collector.export(pstats_out.name)
except PermissionError:
self.addCleanup(close_and_unlink, collapsed_file)
with (
- test_subprocess(self.test_script) as subproc,
+ test_subprocess(self.test_script, wait_for_working=True) as subproc,
):
# Suppress profiler output when testing file export
with (
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
- duration_sec=1,
+ duration_sec=PROFILING_DURATION_SEC,
)
collector.export(collapsed_file.name)
except PermissionError:
def test_sampling_all_threads(self):
with (
- test_subprocess(self.test_script) as subproc,
+ test_subprocess(self.test_script, wait_for_working=True) as subproc,
# Suppress profiler output
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
- duration_sec=1,
+ duration_sec=PROFILING_DURATION_SEC,
all_threads=True,
)
collector.print_stats(show_summary=False)
def test_sample_target_script(self):
script_file = tempfile.NamedTemporaryFile(delete=False)
- script_file.write(self.test_script.encode("utf-8"))
+ script_file.write(self.cli_test_script.encode("utf-8"))
script_file.flush()
self.addCleanup(close_and_unlink, script_file)
- # Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations
- test_args = ["profiling.sampling.sample", "run", "-d", PROFILING_TIMEOUT, script_file.name]
+ # Sample for PROFILING_DURATION_SEC seconds
+ test_args = [
+ "profiling.sampling.sample", "run",
+ "-d", str(PROFILING_DURATION_SEC),
+ script_file.name
+ ]
with (
mock.patch("sys.argv", test_args),
module_path = os.path.join(tempdir.name, "test_module.py")
with open(module_path, "w") as f:
- f.write(self.test_script)
+ f.write(self.cli_test_script)
test_args = [
"profiling.sampling.cli",
"run",
"-d",
- PROFILING_TIMEOUT,
+ str(PROFILING_DURATION_SEC),
"-m",
"test_module",
]
profiling.sampling.sample.sample(-1, collector, duration_sec=1)
def test_process_dies_during_sampling(self):
+ # Use wait_for_working=False since this simple script doesn't send "working"
with test_subprocess(
- "import time; time.sleep(0.5); exit()"
+ "import time; time.sleep(0.5); exit()",
+ wait_for_working=False
) as subproc:
with (
io.StringIO() as captured_output,
self.assertIn("Error rate", output)
def test_is_process_running(self):
- with test_subprocess("import time; time.sleep(1000)") as subproc:
+ # Use wait_for_working=False since this simple script doesn't send "working"
+ with test_subprocess(
+ "import time; time.sleep(1000)",
+ wait_for_working=False
+ ) as subproc:
try:
profiler = SampleProfiler(
pid=subproc.process.pid,
@unittest.skipUnless(sys.platform == "linux", "Only valid on Linux")
def test_esrch_signal_handling(self):
- with test_subprocess("import time; time.sleep(1000)") as subproc:
+ # Use wait_for_working=False since this simple script doesn't send "working"
+ with test_subprocess(
+ "import time; time.sleep(1000)",
+ wait_for_working=False
+ ) as subproc:
try:
unwinder = _remote_debugging.RemoteUnwinder(
subproc.process.pid
@classmethod
def setUpClass(cls):
+ # Async test script that runs indefinitely until killed.
+ # Sends "working" signal AFTER tasks are created and scheduled.
cls.async_script = '''
import asyncio
async def sleeping_leaf():
- """Leaf task that just sleeps - visible in 'all' mode."""
- for _ in range(50):
+ while True:
await asyncio.sleep(0.02)
async def cpu_leaf():
- """Leaf task that does CPU work - visible in both modes."""
total = 0
- for _ in range(200):
+ while True:
for i in range(10000):
total += i * i
await asyncio.sleep(0)
- return total
async def supervisor():
- """Middle layer that spawns leaf tasks."""
tasks = [
asyncio.create_task(sleeping_leaf(), name="Sleeper-0"),
asyncio.create_task(sleeping_leaf(), name="Sleeper-1"),
asyncio.create_task(sleeping_leaf(), name="Sleeper-2"),
asyncio.create_task(cpu_leaf(), name="Worker"),
]
+ await asyncio.sleep(0) # Let tasks get scheduled
+ _test_sock.sendall(b"working")
await asyncio.gather(*tasks)
-async def main():
- await supervisor()
-
-if __name__ == "__main__":
- asyncio.run(main())
+asyncio.run(supervisor())
'''
def _collect_async_samples(self, async_aware_mode):
Returns a dict mapping function names to their sample counts.
"""
- with test_subprocess(self.async_script) as subproc:
+ with test_subprocess(self.async_script, wait_for_working=True) as subproc:
try:
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
- duration_sec=SHORT_TIMEOUT,
+ duration_sec=PROFILING_DURATION_SEC,
async_aware=async_aware_mode,
)
except PermissionError:
while True:
x += 1
-def main():
- # Start both threads
- idle_thread = threading.Thread(target=idle_worker)
- cpu_thread = threading.Thread(target=cpu_active_worker)
- idle_thread.start()
- cpu_thread.start()
-
- # Wait for CPU thread to be running, then signal test
- cpu_ready.wait()
- _test_sock.sendall(b"threads_ready")
-
- idle_thread.join()
- cpu_thread.join()
-
-main()
-
+idle_thread = threading.Thread(target=idle_worker)
+cpu_thread = threading.Thread(target=cpu_active_worker)
+idle_thread.start()
+cpu_thread.start()
+cpu_ready.wait()
+_test_sock.sendall(b"working")
+idle_thread.join()
+cpu_thread.join()
"""
- with test_subprocess(cpu_vs_idle_script) as subproc:
- # Wait for signal that threads are running
- response = subproc.socket.recv(1024)
- self.assertEqual(response, b"threads_ready")
+ with test_subprocess(cpu_vs_idle_script, wait_for_working=True) as subproc:
with (
io.StringIO() as captured_output,
while True:
x += 1
-def main():
- # Start both threads
- idle_thread = threading.Thread(target=gil_releasing_work)
- cpu_thread = threading.Thread(target=gil_holding_work)
- idle_thread.start()
- cpu_thread.start()
-
- # Wait for GIL-holding thread to be running, then signal test
- gil_ready.wait()
- _test_sock.sendall(b"threads_ready")
-
- idle_thread.join()
- cpu_thread.join()
-
-main()
+idle_thread = threading.Thread(target=gil_releasing_work)
+cpu_thread = threading.Thread(target=gil_holding_work)
+idle_thread.start()
+cpu_thread.start()
+gil_ready.wait()
+_test_sock.sendall(b"working")
+idle_thread.join()
+cpu_thread.join()
"""
- with test_subprocess(gil_test_script) as subproc:
- # Wait for signal that threads are running
- response = subproc.socket.recv(1024)
- self.assertEqual(response, b"threads_ready")
+ with test_subprocess(gil_test_script, wait_for_working=True) as subproc:
with (
io.StringIO() as captured_output,