]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-138122: Move local imports to module level in sampling profiler (#143257)
authorPablo Galindo Salgado <Pablogsal@gmail.com>
Fri, 2 Jan 2026 02:31:39 +0000 (02:31 +0000)
committerGitHub <noreply@github.com>
Fri, 2 Jan 2026 02:31:39 +0000 (02:31 +0000)
15 files changed:
Lib/profiling/sampling/binary_reader.py
Lib/profiling/sampling/cli.py
Lib/profiling/sampling/heatmap_collector.py
Lib/profiling/sampling/live_collector/collector.py
Lib/profiling/sampling/live_collector/widgets.py
Lib/profiling/sampling/pstats_collector.py
Lib/profiling/sampling/stack_collector.py
Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
Lib/test/test_profiling/test_sampling_profiler/test_async.py
Lib/test/test_profiling/test_sampling_profiler/test_children.py
Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
Lib/test/test_profiling/test_sampling_profiler/test_integration.py
Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
Lib/test/test_profiling/test_sampling_profiler/test_modes.py
Lib/test/test_profiling/test_sampling_profiler/test_profiler.py

index 50c96668cc585baf5f4ffa8952775781c7e1c370..a11be3652597a6b25143112cd6ff38ce77a6ef50 100644 (file)
@@ -1,5 +1,11 @@
 """Thin Python wrapper around C binary reader for profiling data."""
 
+import _remote_debugging
+
+from .gecko_collector import GeckoCollector
+from .stack_collector import FlamegraphCollector, CollapsedStackCollector
+from .pstats_collector import PstatsCollector
+
 
 class BinaryReader:
     """High-performance binary reader using C implementation.
@@ -23,7 +29,6 @@ class BinaryReader:
         self._reader = None
 
     def __enter__(self):
-        import _remote_debugging
         self._reader = _remote_debugging.BinaryReader(self.filename)
         return self
 
@@ -99,10 +104,6 @@ def convert_binary_to_format(input_file, output_file, output_format,
     Returns:
         int: Number of samples converted
     """
-    from .gecko_collector import GeckoCollector
-    from .stack_collector import FlamegraphCollector, CollapsedStackCollector
-    from .pstats_collector import PStatsCollector
-
     with BinaryReader(input_file) as reader:
         info = reader.get_info()
         interval = sample_interval_usec or info['sample_interval_us']
@@ -113,7 +114,7 @@ def convert_binary_to_format(input_file, output_file, output_format,
         elif output_format == 'collapsed':
             collector = CollapsedStackCollector(interval)
         elif output_format == 'pstats':
-            collector = PStatsCollector(interval)
+            collector = PstatsCollector(interval)
         elif output_format == 'gecko':
             collector = GeckoCollector(interval)
         else:
index ea3926c95658096f82536b04e8a3e4a9e4204e43..c0dcda46fc29d346af2e08b138d67afc9f1a41bb 100644 (file)
@@ -36,6 +36,12 @@ from .constants import (
     SORT_MODE_NSAMPLES_CUMUL,
 )
 
+try:
+    from ._child_monitor import ChildProcessMonitor
+except ImportError:
+    # _remote_debugging module not available on this platform (e.g., WASI)
+    ChildProcessMonitor = None
+
 try:
     from .live_collector import LiveStatsCollector
 except ImportError:
@@ -94,8 +100,6 @@ COLLECTOR_MAP = {
 }
 
 def _setup_child_monitor(args, parent_pid):
-    from ._child_monitor import ChildProcessMonitor
-
     # Build CLI args for child profilers (excluding --subprocesses to avoid recursion)
     child_cli_args = _build_child_profiler_args(args)
 
@@ -691,6 +695,11 @@ def _validate_args(args, parser):
 
     # --subprocesses is incompatible with --live
     if hasattr(args, 'subprocesses') and args.subprocesses:
+        if ChildProcessMonitor is None:
+            parser.error(
+                "--subprocesses is not available on this platform "
+                "(requires _remote_debugging module)."
+            )
         if hasattr(args, 'live') and args.live:
             parser.error("--subprocesses is incompatible with --live mode.")
 
@@ -1160,8 +1169,6 @@ def _handle_live_run(args):
 
 def _handle_replay(args):
     """Handle the 'replay' command - convert binary profile to another format."""
-    import os
-
     if not os.path.exists(args.input_file):
         sys.exit(f"Error: Input file not found: {args.input_file}")
 
index 022e94d014f9b759191f9044876128089b233951..b6d9ff79e8ceece3c568d21913c164f133907992 100644 (file)
@@ -18,6 +18,7 @@ from typing import Dict, List, Tuple
 from ._css_utils import get_combined_css
 from ._format_utils import fmt
 from .collector import normalize_location, extract_lineno
+from .opcode_utils import get_opcode_info, format_opcode
 from .stack_collector import StackTraceCollector
 
 
@@ -642,8 +643,6 @@ class HeatmapCollector(StackTraceCollector):
         Returns:
             List of dicts with instruction info, sorted by samples descending
         """
-        from .opcode_utils import get_opcode_info, format_opcode
-
         key = (filename, lineno)
         opcode_data = self.line_opcodes.get(key, {})
 
@@ -1046,8 +1045,6 @@ class HeatmapCollector(StackTraceCollector):
         Simple: collect ranges with sample counts, assign each byte position to
         smallest covering range, then emit spans for contiguous runs with sample data.
         """
-        import html as html_module
-
         content = line_content.rstrip('\n')
         if not content:
             return ''
@@ -1070,7 +1067,7 @@ class HeatmapCollector(StackTraceCollector):
                             range_data[key]['opcodes'].append(opname)
 
         if not range_data:
-            return html_module.escape(content)
+            return html.escape(content)
 
         # For each byte position, find the smallest covering range
         byte_to_range = {}
@@ -1098,7 +1095,7 @@ class HeatmapCollector(StackTraceCollector):
         def flush_span():
             nonlocal span_chars, current_range
             if span_chars:
-                text = html_module.escape(''.join(span_chars))
+                text = html.escape(''.join(span_chars))
                 if current_range:
                     data = range_data.get(current_range, {'samples': 0, 'opcodes': []})
                     samples = data['samples']
@@ -1112,7 +1109,7 @@ class HeatmapCollector(StackTraceCollector):
                                   f'data-samples="{samples}" '
                                   f'data-max-samples="{max_range_samples}" '
                                   f'data-pct="{pct}" '
-                                  f'data-opcodes="{html_module.escape(opcodes)}">{text}</span>')
+                                  f'data-opcodes="{html.escape(opcodes)}">{text}</span>')
                 else:
                     result.append(text)
                 span_chars = []
index c91ed9e0ea9367c822b9200ac57550eff3c6fe94..c03df4075277cdb8862ebea4bf784669bbe4d61a 100644 (file)
@@ -916,8 +916,6 @@ class LiveStatsCollector(Collector):
 
     def _handle_input(self):
         """Handle keyboard input (non-blocking)."""
-        from . import constants
-
         self.display.set_nodelay(True)
         ch = self.display.get_input()
 
index ac215dbfeb896e5dddbc5c836ef918a7fc70be78..86d2649f875e6227369d18ad467c3739520e6c86 100644 (file)
@@ -31,6 +31,7 @@ from ..constants import (
     PROFILING_MODE_GIL,
     PROFILING_MODE_WALL,
 )
+from ..opcode_utils import get_opcode_info, format_opcode
 
 
 class Widget(ABC):
@@ -1013,8 +1014,6 @@ class OpcodePanel(Widget):
         Returns:
             Next available line number
         """
-        from ..opcode_utils import get_opcode_info, format_opcode
-
         stats_list = kwargs.get("stats_list", [])
         height = kwargs.get("height", 24)
         selected_row = self.collector.selected_row
index e0dc9ab6bb7edb8e308828789f083ee551e4ac3f..6be1d698ffaa9a5c9c20b0105aca685438550ffc 100644 (file)
@@ -1,9 +1,10 @@
 import collections
 import marshal
+import pstats
 
 from _colorize import ANSIColors
 from .collector import Collector, extract_lineno
-from .constants import MICROSECONDS_PER_SECOND
+from .constants import MICROSECONDS_PER_SECOND, PROFILING_MODE_CPU
 
 
 class PstatsCollector(Collector):
@@ -86,9 +87,6 @@ class PstatsCollector(Collector):
 
     def print_stats(self, sort=-1, limit=None, show_summary=True, mode=None):
         """Print formatted statistics to stdout."""
-        import pstats
-        from .constants import PROFILING_MODE_CPU
-
         # Create stats object
         stats = pstats.SampledStats(self).strip_dirs()
         if not stats.stats:
index 55e643d0e9c8cb2edcd720413b5cef8c06193db1..4e213cfe41ca24ffb98ebf6fa41a6f253c3c55b9 100644 (file)
@@ -6,6 +6,7 @@ import json
 import linecache
 import os
 import sys
+import sysconfig
 
 from ._css_utils import get_combined_css
 from .collector import Collector, extract_lineno
@@ -244,7 +245,6 @@ class FlamegraphCollector(StackTraceCollector):
             }
 
         # Calculate thread status percentages for display
-        import sysconfig
         is_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
         total_threads = max(1, self.thread_status_counts["total"])
         thread_stats = {
index bcd4de7f5d7ebe6a02d247335eba80984ea81faa..11b1ad84242fd4388c303ec0e00713513634a3d9 100644 (file)
@@ -11,6 +11,8 @@ try:
     import _remote_debugging  # noqa: F401
     import profiling.sampling
     import profiling.sampling.sample
+    from profiling.sampling.pstats_collector import PstatsCollector
+    from profiling.sampling.stack_collector import CollapsedStackCollector
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _remote_debugging is available"
@@ -61,7 +63,6 @@ while True:
             io.StringIO() as captured_output,
             mock.patch("sys.stdout", captured_output),
         ):
-            from profiling.sampling.pstats_collector import PstatsCollector
             collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
             profiling.sampling.sample.sample(
                 subproc.process.pid,
@@ -88,7 +89,6 @@ while True:
             io.StringIO() as captured_output,
             mock.patch("sys.stdout", captured_output),
         ):
-            from profiling.sampling.pstats_collector import PstatsCollector
             collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
             profiling.sampling.sample.sample(
                 subproc.process.pid,
@@ -140,7 +140,6 @@ while True:
                 io.StringIO() as captured_output,
                 mock.patch("sys.stdout", captured_output),
             ):
-                from profiling.sampling.stack_collector import CollapsedStackCollector
                 collector = CollapsedStackCollector(1000, skip_idle=False)
                 profiling.sampling.sample.sample(
                     subproc.process.pid,
@@ -176,7 +175,6 @@ while True:
             io.StringIO() as captured_output,
             mock.patch("sys.stdout", captured_output),
         ):
-            from profiling.sampling.pstats_collector import PstatsCollector
             collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
             profiling.sampling.sample.sample(
                 subproc.process.pid,
index d8ca86c996bffa45e9c1397846befe00b3164d9c..1f5685717b6273c0466a5d23b1688936cb0edba2 100644 (file)
@@ -6,11 +6,14 @@ Each test covers a distinct algorithm path or edge case:
 3. Stack traversal: _build_linear_stacks() with BFS
 """
 
+import inspect
 import unittest
 
 try:
     import _remote_debugging  # noqa: F401
     from profiling.sampling.pstats_collector import PstatsCollector
+    from profiling.sampling.stack_collector import FlamegraphCollector
+    from profiling.sampling.sample import sample, sample_live, SampleProfiler
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _remote_debugging is available"
@@ -561,8 +564,6 @@ class TestFlamegraphCollectorAsync(unittest.TestCase):
 
     def test_flamegraph_with_async_frames(self):
         """Test FlamegraphCollector correctly processes async task frames."""
-        from profiling.sampling.stack_collector import FlamegraphCollector
-
         collector = FlamegraphCollector(sample_interval_usec=1000)
 
         # Build async task tree: Root -> Child
@@ -607,8 +608,6 @@ class TestFlamegraphCollectorAsync(unittest.TestCase):
 
     def test_flamegraph_with_task_markers(self):
         """Test FlamegraphCollector includes <task> boundary markers."""
-        from profiling.sampling.stack_collector import FlamegraphCollector
-
         collector = FlamegraphCollector(sample_interval_usec=1000)
 
         task = MockTaskInfo(
@@ -643,8 +642,6 @@ class TestFlamegraphCollectorAsync(unittest.TestCase):
 
     def test_flamegraph_multiple_async_samples(self):
         """Test FlamegraphCollector aggregates multiple async samples correctly."""
-        from profiling.sampling.stack_collector import FlamegraphCollector
-
         collector = FlamegraphCollector(sample_interval_usec=1000)
 
         task = MockTaskInfo(
@@ -675,25 +672,16 @@ class TestAsyncAwareParameterFlow(unittest.TestCase):
 
     def test_sample_function_accepts_async_aware(self):
         """Test that sample() function accepts async_aware parameter."""
-        from profiling.sampling.sample import sample
-        import inspect
-
         sig = inspect.signature(sample)
         self.assertIn("async_aware", sig.parameters)
 
     def test_sample_live_function_accepts_async_aware(self):
         """Test that sample_live() function accepts async_aware parameter."""
-        from profiling.sampling.sample import sample_live
-        import inspect
-
         sig = inspect.signature(sample_live)
         self.assertIn("async_aware", sig.parameters)
 
     def test_sample_profiler_sample_accepts_async_aware(self):
         """Test that SampleProfiler.sample() accepts async_aware parameter."""
-        from profiling.sampling.sample import SampleProfiler
-        import inspect
-
         sig = inspect.signature(SampleProfiler.sample)
         self.assertIn("async_aware", sig.parameters)
 
index 84d50cd2088a9eb7ba41741cbe50207494ede3e8..bb49faa890f3481dc929ce89b402dd09acc72b8a 100644 (file)
@@ -10,6 +10,7 @@ import tempfile
 import threading
 import time
 import unittest
+from unittest.mock import MagicMock, patch
 
 from test.support import (
     SHORT_TIMEOUT,
@@ -17,6 +18,40 @@ from test.support import (
     requires_remote_subprocess_debugging,
 )
 
+# Guard imports that require _remote_debugging module.
+# This module is not available on all platforms (e.g., WASI).
+try:
+    from profiling.sampling._child_monitor import (
+        get_child_pids,
+        ChildProcessMonitor,
+        is_python_process,
+        _MAX_CHILD_PROFILERS,
+        _CLEANUP_INTERVAL_CYCLES,
+    )
+except ImportError:
+    # Module will be skipped via @requires_remote_subprocess_debugging decorators
+    get_child_pids = None
+    ChildProcessMonitor = None
+    is_python_process = None
+    _MAX_CHILD_PROFILERS = None
+    _CLEANUP_INTERVAL_CYCLES = None
+
+try:
+    from profiling.sampling.cli import (
+        _add_sampling_options,
+        _validate_args,
+        _build_child_profiler_args,
+        _build_output_pattern,
+        _setup_child_monitor,
+    )
+except ImportError:
+    # cli module imports sample module which requires _remote_debugging
+    _add_sampling_options = None
+    _validate_args = None
+    _build_child_profiler_args = None
+    _build_output_pattern = None
+    _setup_child_monitor = None
+
 from .helpers import _cleanup_process
 
 # String to check for in stderr when profiler lacks permissions (e.g., macOS)
@@ -100,8 +135,6 @@ class TestGetChildPids(unittest.TestCase):
 
     def test_get_child_pids_fallback(self):
         """Test the fallback implementation for get_child_pids."""
-        from profiling.sampling._child_monitor import get_child_pids
-
         # Test with current process
         result = get_child_pids(os.getpid())
         self.assertIsInstance(result, list)
@@ -109,8 +142,6 @@ class TestGetChildPids(unittest.TestCase):
     @unittest.skipUnless(sys.platform == "linux", "Linux only")
     def test_discover_child_process_linux(self):
         """Test that we can discover child processes on Linux."""
-        from profiling.sampling._child_monitor import get_child_pids
-
         # Create a child process
         proc = subprocess.Popen(
             [sys.executable, "-c", "import time; time.sleep(10)"],
@@ -139,8 +170,6 @@ class TestGetChildPids(unittest.TestCase):
 
     def test_recursive_child_discovery(self):
         """Test that recursive=True finds grandchildren."""
-        from profiling.sampling._child_monitor import get_child_pids
-
         # Create a child that spawns a grandchild and keeps a reference to it
         # so we can clean it up via the child process
         code = """
@@ -256,8 +285,6 @@ if grandchild.poll() is None:
 
     def test_nonexistent_pid_returns_empty(self):
         """Test that nonexistent PID returns empty list."""
-        from profiling.sampling._child_monitor import get_child_pids
-
         # Use a very high PID that's unlikely to exist
         result = get_child_pids(999999999)
         self.assertEqual(result, [])
@@ -275,8 +302,6 @@ class TestChildProcessMonitor(unittest.TestCase):
 
     def test_monitor_creation(self):
         """Test that ChildProcessMonitor can be created."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(),
             cli_args=["-r", "10khz", "-d", "5"],
@@ -288,8 +313,6 @@ class TestChildProcessMonitor(unittest.TestCase):
 
     def test_monitor_lifecycle(self):
         """Test monitor lifecycle via context manager."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -307,8 +330,6 @@ class TestChildProcessMonitor(unittest.TestCase):
 
     def test_spawned_profilers_property(self):
         """Test that spawned_profilers returns a copy of the list."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -320,8 +341,6 @@ class TestChildProcessMonitor(unittest.TestCase):
 
     def test_context_manager(self):
         """Test that ChildProcessMonitor works as a context manager."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         with ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         ) as monitor:
@@ -344,8 +363,6 @@ class TestCLIChildrenFlag(unittest.TestCase):
 
     def test_subprocesses_flag_parsed(self):
         """Test that --subprocesses flag is recognized."""
-        from profiling.sampling.cli import _add_sampling_options
-
         parser = argparse.ArgumentParser()
         _add_sampling_options(parser)
 
@@ -359,8 +376,6 @@ class TestCLIChildrenFlag(unittest.TestCase):
 
     def test_subprocesses_incompatible_with_live(self):
         """Test that --subprocesses is incompatible with --live."""
-        from profiling.sampling.cli import _validate_args
-
         # Create mock args with both subprocesses and live
         args = argparse.Namespace(
             subprocesses=True,
@@ -383,8 +398,6 @@ class TestCLIChildrenFlag(unittest.TestCase):
 
     def test_build_child_profiler_args(self):
         """Test building CLI args for child profilers."""
-        from profiling.sampling.cli import _build_child_profiler_args
-
         args = argparse.Namespace(
             sample_interval_usec=200,
             duration=15,
@@ -446,8 +459,6 @@ class TestCLIChildrenFlag(unittest.TestCase):
 
     def test_build_child_profiler_args_no_gc(self):
         """Test building CLI args with --no-gc."""
-        from profiling.sampling.cli import _build_child_profiler_args
-
         args = argparse.Namespace(
             sample_interval_usec=100,
             duration=5,
@@ -471,8 +482,6 @@ class TestCLIChildrenFlag(unittest.TestCase):
 
     def test_build_output_pattern_with_outfile(self):
         """Test output pattern generation with user-specified output."""
-        from profiling.sampling.cli import _build_output_pattern
-
         # With extension
         args = argparse.Namespace(outfile="output.html", format="flamegraph")
         pattern = _build_output_pattern(args)
@@ -485,8 +494,6 @@ class TestCLIChildrenFlag(unittest.TestCase):
 
     def test_build_output_pattern_default(self):
         """Test output pattern generation with default output."""
-        from profiling.sampling.cli import _build_output_pattern
-
         # Flamegraph format
         args = argparse.Namespace(outfile=None, format="flamegraph")
         pattern = _build_output_pattern(args)
@@ -512,8 +519,6 @@ class TestChildrenIntegration(unittest.TestCase):
 
     def test_setup_child_monitor(self):
         """Test setting up a child monitor from args."""
-        from profiling.sampling.cli import _setup_child_monitor
-
         args = argparse.Namespace(
             sample_interval_usec=100,
             duration=5,
@@ -553,8 +558,6 @@ class TestIsPythonProcess(unittest.TestCase):
 
     def test_is_python_process_current_process(self):
         """Test that current process is detected as Python."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Current process should be Python
         result = is_python_process(os.getpid())
         self.assertTrue(
@@ -564,8 +567,6 @@ class TestIsPythonProcess(unittest.TestCase):
 
     def test_is_python_process_python_subprocess(self):
         """Test that a Python subprocess is detected as Python."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Start a Python subprocess
         proc = subprocess.Popen(
             [sys.executable, "-c", "import time; time.sleep(10)"],
@@ -598,8 +599,6 @@ class TestIsPythonProcess(unittest.TestCase):
     @unittest.skipUnless(sys.platform == "linux", "Linux only test")
     def test_is_python_process_non_python_subprocess(self):
         """Test that a non-Python subprocess is not detected as Python."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Start a non-Python subprocess (sleep command)
         proc = subprocess.Popen(
             ["sleep", "10"],
@@ -624,8 +623,6 @@ class TestIsPythonProcess(unittest.TestCase):
 
     def test_is_python_process_nonexistent_pid(self):
         """Test that nonexistent PID returns False."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Use a very high PID that's unlikely to exist
         result = is_python_process(999999999)
         self.assertFalse(
@@ -635,8 +632,6 @@ class TestIsPythonProcess(unittest.TestCase):
 
     def test_is_python_process_exited_process(self):
         """Test handling of a process that exits quickly."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Start a process that exits immediately
         proc = subprocess.Popen(
             [sys.executable, "-c", "pass"],
@@ -666,8 +661,6 @@ class TestMaxChildProfilersLimit(unittest.TestCase):
 
     def test_max_profilers_constant_exists(self):
         """Test that _MAX_CHILD_PROFILERS constant is defined."""
-        from profiling.sampling._child_monitor import _MAX_CHILD_PROFILERS
-
         self.assertEqual(
             _MAX_CHILD_PROFILERS,
             100,
@@ -676,8 +669,6 @@ class TestMaxChildProfilersLimit(unittest.TestCase):
 
     def test_cleanup_interval_constant_exists(self):
         """Test that _CLEANUP_INTERVAL_CYCLES constant is defined."""
-        from profiling.sampling._child_monitor import _CLEANUP_INTERVAL_CYCLES
-
         self.assertEqual(
             _CLEANUP_INTERVAL_CYCLES,
             10,
@@ -686,12 +677,6 @@ class TestMaxChildProfilersLimit(unittest.TestCase):
 
     def test_monitor_respects_max_limit(self):
         """Test that monitor refuses to spawn more than _MAX_CHILD_PROFILERS."""
-        from profiling.sampling._child_monitor import (
-            ChildProcessMonitor,
-            _MAX_CHILD_PROFILERS,
-        )
-        from unittest.mock import MagicMock, patch
-
         # Create a monitor
         monitor = ChildProcessMonitor(
             pid=os.getpid(),
@@ -744,8 +729,6 @@ class TestWaitForProfilers(unittest.TestCase):
 
     def test_wait_for_profilers_empty_list(self):
         """Test that wait_for_profilers returns immediately with no profilers."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -772,8 +755,6 @@ class TestWaitForProfilers(unittest.TestCase):
 
     def test_wait_for_profilers_with_completed_process(self):
         """Test waiting for profilers that complete quickly."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -812,8 +793,6 @@ class TestWaitForProfilers(unittest.TestCase):
 
     def test_wait_for_profilers_timeout(self):
         """Test that wait_for_profilers respects timeout."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -852,8 +831,6 @@ class TestWaitForProfilers(unittest.TestCase):
 
     def test_wait_for_profilers_multiple(self):
         """Test waiting for multiple profilers."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
index 30615a7d31d86c4c6a060ea856f7fef0d160ffd2..13bdb4e111364cc2a253c971c64d52187c1e4ed4 100644 (file)
@@ -2,6 +2,7 @@
 
 import json
 import marshal
+import opcode
 import os
 import tempfile
 import unittest
@@ -1437,7 +1438,6 @@ class TestOpcodeFormatting(unittest.TestCase):
 
     def test_get_opcode_info_standard_opcode(self):
         """Test get_opcode_info for a standard opcode."""
-        import opcode
         # LOAD_CONST is a standard opcode
         load_const = opcode.opmap.get('LOAD_CONST')
         if load_const is not None:
@@ -1455,7 +1455,6 @@ class TestOpcodeFormatting(unittest.TestCase):
 
     def test_format_opcode_standard(self):
         """Test format_opcode for a standard opcode."""
-        import opcode
         load_const = opcode.opmap.get('LOAD_CONST')
         if load_const is not None:
             formatted = format_opcode(load_const)
@@ -1463,7 +1462,6 @@ class TestOpcodeFormatting(unittest.TestCase):
 
     def test_format_opcode_specialized(self):
         """Test format_opcode for a specialized opcode shows base in parens."""
-        import opcode
         if not hasattr(opcode, '_specialized_opmap'):
             self.skipTest("No specialized opcodes in this Python version")
         if not hasattr(opcode, '_specializations'):
index b82474858ddd4a7f99e63377aca6a650927c73bb..c6731e956391a9136c63ed6f5ff6eba8e2ce507e 100644 (file)
@@ -18,6 +18,7 @@ try:
     from profiling.sampling.pstats_collector import PstatsCollector
     from profiling.sampling.stack_collector import CollapsedStackCollector
     from profiling.sampling.sample import SampleProfiler, _is_process_running
+    from profiling.sampling.cli import main
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _remote_debugging is available"
@@ -547,7 +548,6 @@ do_work()
             io.StringIO() as captured_output,
             mock.patch("sys.stdout", captured_output),
         ):
-            from profiling.sampling.cli import main
             main()
 
             output = captured_output.getvalue()
@@ -585,7 +585,6 @@ do_work()
             # Change to temp directory so subprocess can find the module
             contextlib.chdir(tempdir.name),
         ):
-            from profiling.sampling.cli import main
             main()
 
             output = captured_output.getvalue()
@@ -714,8 +713,7 @@ class TestSampleProfilerErrorHandling(unittest.TestCase):
                 test_args = ["profiling.sampling.cli", "run", "--live"] + args + ["test.py"]
                 with mock.patch("sys.argv", test_args):
                     with self.assertRaises(SystemExit) as cm:
-                        from profiling.sampling.cli import main
-                        main()
+                                    main()
                     self.assertNotEqual(cm.exception.code, 0)
 
     def test_live_incompatible_with_multiple_pstats_options(self):
@@ -727,8 +725,7 @@ class TestSampleProfilerErrorHandling(unittest.TestCase):
 
         with mock.patch("sys.argv", test_args):
             with self.assertRaises(SystemExit) as cm:
-                from profiling.sampling.cli import main
-                main()
+                    main()
             self.assertNotEqual(cm.exception.code, 0)
 
     def test_live_incompatible_with_pstats_default_values(self):
@@ -738,8 +735,7 @@ class TestSampleProfilerErrorHandling(unittest.TestCase):
 
         with mock.patch("sys.argv", test_args):
             with self.assertRaises(SystemExit) as cm:
-                from profiling.sampling.cli import main
-                main()
+                    main()
             self.assertNotEqual(cm.exception.code, 0)
 
         # Test with --limit=15 (the default value)
@@ -747,8 +743,7 @@ class TestSampleProfilerErrorHandling(unittest.TestCase):
 
         with mock.patch("sys.argv", test_args):
             with self.assertRaises(SystemExit) as cm:
-                from profiling.sampling.cli import main
-                main()
+                    main()
             self.assertNotEqual(cm.exception.code, 0)
 
 
index 38f1d03e4939f1e089859157417a886a119524be..8342faffb947629f4266a23fba049b388e3d4add 100644 (file)
@@ -383,10 +383,9 @@ class TestLiveCollectorInteractiveControls(unittest.TestCase):
 
     def test_finished_state_freezes_time(self):
         """Test that time displays are frozen when finished."""
-        import time as time_module
 
         # Set up collector with known start time
-        self.collector.start_time = time_module.perf_counter() - 10.0  # 10 seconds ago
+        self.collector.start_time = time.perf_counter() - 10.0  # 10 seconds ago
 
         # Mark as finished - this should freeze the time
         self.collector.mark_finished()
@@ -396,7 +395,7 @@ class TestLiveCollectorInteractiveControls(unittest.TestCase):
         frozen_time_display = self.collector.current_time_display
 
         # Wait a bit to ensure time would advance
-        time_module.sleep(0.1)
+        time.sleep(0.1)
 
         # Time should remain frozen
         self.assertEqual(self.collector.elapsed_time, frozen_elapsed)
@@ -1215,7 +1214,6 @@ class TestLiveCollectorNewFeatures(unittest.TestCase):
 
     def test_time_display_fix_when_finished(self):
         """Test that time display shows correct frozen time when finished."""
-        import time as time_module
 
         # Mark as finished to freeze time
         self.collector.mark_finished()
@@ -1228,7 +1226,7 @@ class TestLiveCollectorNewFeatures(unittest.TestCase):
         frozen_time = self.collector.current_time_display
 
         # Wait a bit
-        time_module.sleep(0.1)
+        time.sleep(0.1)
 
         # Should still show the same frozen time (not jump to wrong time)
         self.assertEqual(self.collector.current_time_display, frozen_time)
index 877237866b1e65fb5ac6cfbec5736703b3d8e0fd..0b38fb4ad4bcf68ec71244dcc251c750a6366639 100644 (file)
@@ -9,6 +9,12 @@ try:
     import profiling.sampling
     import profiling.sampling.sample
     from profiling.sampling.pstats_collector import PstatsCollector
+    from profiling.sampling.cli import main, _parse_mode
+    from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
+    from _remote_debugging import (
+        THREAD_STATUS_HAS_GIL,
+        THREAD_STATUS_ON_CPU,
+    )
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _remote_debugging is available"
@@ -40,7 +46,6 @@ class TestCpuModeFiltering(unittest.TestCase):
             mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
             self.assertRaises(SystemExit) as cm,
         ):
-            from profiling.sampling.cli import main
             main()
 
         self.assertEqual(cm.exception.code, 2)  # argparse error
@@ -49,16 +54,6 @@ class TestCpuModeFiltering(unittest.TestCase):
 
     def test_frames_filtered_with_skip_idle(self):
         """Test that frames are actually filtered when skip_idle=True."""
-        # Import thread status flags
-        try:
-            from _remote_debugging import (
-                THREAD_STATUS_HAS_GIL,
-                THREAD_STATUS_ON_CPU,
-            )
-        except ImportError:
-            THREAD_STATUS_HAS_GIL = 1 << 0
-            THREAD_STATUS_ON_CPU = 1 << 1
-
         # Create mock frames with different thread statuses
         class MockThreadInfoWithStatus:
             def __init__(self, thread_id, frame_info, status):
@@ -240,7 +235,6 @@ class TestGilModeFiltering(unittest.TestCase):
 
     def test_gil_mode_validation(self):
         """Test that CLI accepts gil mode choice correctly."""
-        from profiling.sampling.cli import main
 
         test_args = [
             "profiling.sampling.cli",
@@ -298,7 +292,6 @@ class TestGilModeFiltering(unittest.TestCase):
 
     def test_gil_mode_cli_argument_parsing(self):
         """Test CLI argument parsing for GIL mode with various options."""
-        from profiling.sampling.cli import main
 
         test_args = [
             "profiling.sampling.cli",
@@ -405,7 +398,6 @@ cpu_thread.join()
 
     def test_parse_mode_function(self):
         """Test the _parse_mode function with all valid modes."""
-        from profiling.sampling.cli import _parse_mode
         self.assertEqual(_parse_mode("wall"), 0)
         self.assertEqual(_parse_mode("cpu"), 1)
         self.assertEqual(_parse_mode("gil"), 2)
@@ -422,7 +414,6 @@ class TestExceptionModeFiltering(unittest.TestCase):
 
     def test_exception_mode_validation(self):
         """Test that CLI accepts exception mode choice correctly."""
-        from profiling.sampling.cli import main
 
         test_args = [
             "profiling.sampling.cli",
@@ -480,7 +471,6 @@ class TestExceptionModeFiltering(unittest.TestCase):
 
     def test_exception_mode_cli_argument_parsing(self):
         """Test CLI argument parsing for exception mode with various options."""
-        from profiling.sampling.cli import main
 
         test_args = [
             "profiling.sampling.cli",
@@ -512,7 +502,6 @@ class TestExceptionModeFiltering(unittest.TestCase):
 
     def test_exception_mode_constants_are_defined(self):
         """Test that exception mode constant is properly defined."""
-        from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
         self.assertEqual(PROFILING_MODE_EXCEPTION, 4)
 
     def test_exception_mode_integration_filtering(self):
index 822f559561eb0aaa0b51116a005bdc935da43f82..8d70a1d2ef8cfcedf9ad7447db6aed2b21b5b865 100644 (file)
@@ -1,6 +1,7 @@
 """Tests for sampling profiler core functionality."""
 
 import io
+import re
 from unittest import mock
 import unittest
 
@@ -591,7 +592,6 @@ class TestPrintSampledStats(unittest.TestCase):
 
         # Extract just the function names for comparison
         func_names = []
-        import re
 
         for line in data_lines:
             # Function name is between the last ( and ), accounting for ANSI color codes