]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.15] gh-149718: Aggregate same stack frames in Tachyon in some collectors (GH-14971...
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Wed, 13 May 2026 00:30:22 +0000 (02:30 +0200)
committerGitHub <noreply@github.com>
Wed, 13 May 2026 00:30:22 +0000 (01:30 +0100)
gh-149718: Aggregate same stack frames in Tachyon in some collectors (GH-149719)
(cherry picked from commit 76f22853410d3ded872cbfe1430852cf8c048962)

Co-authored-by: Maurycy Pawłowski-Wieroński <maurycy@maurycy.com>
Lib/profiling/sampling/collector.py
Lib/profiling/sampling/gecko_collector.py
Lib/profiling/sampling/heatmap_collector.py
Lib/profiling/sampling/pstats_collector.py
Lib/profiling/sampling/sample.py
Lib/profiling/sampling/stack_collector.py
Lib/test/test_profiling/test_heatmap.py
Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst [new file with mode: 0644]

index 81ec6344ebdea4abba7a47781dcbc7a3bb18d029..8e0f0c44c4f8f367e7751bcba7990e5475dd3d87 100644 (file)
@@ -143,6 +143,8 @@ def iter_async_frames(awaited_info_list):
 
 
 class Collector(ABC):
+    aggregating = False
+
     @abstractmethod
     def collect(self, stack_frames, timestamps_us=None):
         """Collect profiling data from stack frames.
index 8986194268b3ce47f49518c009ceb483210032dd..54392af95000082932f0f8ec23b1d97474294bab 100644 (file)
@@ -63,6 +63,8 @@ STACKWALK_DISABLED = 0
 
 
 class GeckoCollector(Collector):
+    aggregating = True
+
     def __init__(self, sample_interval_usec, *, skip_idle=False, opcodes=False):
         self.sample_interval_usec = sample_interval_usec
         self.skip_idle = skip_idle
index 5c36d78f5535e7157fa948028b3996809fadd2b5..6e650ec08f410bc76ae844a11f5c6cc231b2593d 100644 (file)
@@ -452,7 +452,8 @@ class HeatmapCollector(StackTraceCollector):
                 next_lineno = extract_lineno(next_frame[1])
                 self._record_call_relationship(
                     (filename, lineno, funcname),
-                    (next_frame[0], next_lineno, next_frame[2])
+                    (next_frame[0], next_lineno, next_frame[2]),
+                    weight=weight,
                 )
 
     def _is_valid_frame(self, filename, lineno):
@@ -561,7 +562,7 @@ class HeatmapCollector(StackTraceCollector):
         result.sort(key=lambda x: (-x['samples'], x['opcode']))
         return result
 
-    def _record_call_relationship(self, callee_frame, caller_frame):
+    def _record_call_relationship(self, callee_frame, caller_frame, weight=1):
         """Record caller/callee relationship between adjacent frames."""
         callee_filename, callee_lineno, callee_funcname = callee_frame
         caller_filename, caller_lineno, caller_funcname = caller_frame
@@ -587,7 +588,7 @@ class HeatmapCollector(StackTraceCollector):
 
         # Count this call edge for path analysis
         edge_key = (caller_key, callee_key)
-        self.edge_samples[edge_key] += 1
+        self.edge_samples[edge_key] += weight
 
     def export(self, output_path):
         """Export heatmap data as HTML files in a directory.
index 50500296c15acc9ce3b82bee1b8723c622efb296..43b1daf2a119d4ea7777bc5b4266343b256fb867 100644 (file)
@@ -8,6 +8,8 @@ from .constants import MICROSECONDS_PER_SECOND, PROFILING_MODE_CPU
 
 
 class PstatsCollector(Collector):
+    aggregating = True
+
     def __init__(self, sample_interval_usec, *, skip_idle=False):
         self.result = collections.defaultdict(
             lambda: dict(total_rec_calls=0, direct_calls=0, cumulative_calls=0)
index 5bbe24835813332592d6d7200221ee701527bb90..b9e7e2625d09e478f02d25612e970d0a19884c34 100644 (file)
@@ -47,6 +47,9 @@ _FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
 # If fewer samples are collected, we skip the TUI and just print a message
 MIN_SAMPLES_FOR_TUI = 200
 
+# Maximum number of consecutive identical samples to keep before flushing.
+MAX_PENDING_SAMPLES = 8192
+
 class SampleProfiler:
     def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False, blocking=False):
         self.pid = pid
@@ -109,6 +112,20 @@ class SampleProfiler:
         last_sample_time = start_time
         realtime_update_interval = 1.0  # Update every second
         last_realtime_update = start_time
+        aggregating = getattr(collector, 'aggregating', False) is True
+        prev_stack = None
+        pending_count = 0
+        pending_timestamps = [] if aggregating else None
+
+        def flush_pending():
+            nonlocal pending_count, pending_timestamps
+            if pending_count == 0:
+                return
+            pending_count = 0
+            ts = pending_timestamps
+            pending_timestamps = []
+            collector.collect(prev_stack, timestamps_us=ts)
+
         try:
             while duration_sec is None or running_time_sec < duration_sec:
                 # Check if live collector wants to stop
@@ -116,6 +133,7 @@ class SampleProfiler:
                     break
 
                 current_time = time.perf_counter()
+                current_time_us = int(current_time * 1_000_000)
                 if next_time > current_time:
                     sleep_time = (next_time - current_time) * 0.9
                     if sleep_time > 0.0001:
@@ -125,13 +143,24 @@ class SampleProfiler:
                         stack_frames = self._get_stack_trace(
                             async_aware=async_aware
                         )
-                        collector.collect(stack_frames)
+                        if aggregating:
+                            if stack_frames != prev_stack:
+                                flush_pending()
+                                prev_stack = stack_frames
+                            pending_count += 1
+                            pending_timestamps.append(current_time_us)
+                            if pending_count >= MAX_PENDING_SAMPLES:
+                                flush_pending()
+                        else:
+                            collector.collect(stack_frames)
                     except ProcessLookupError as e:
                         running_time_sec = current_time - start_time
                         break
                     except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
+                        flush_pending()
                         collector.collect_failed_sample()
                         errors += 1
+                        prev_stack = None
                     except Exception as e:
                         if not _is_process_running(self.pid):
                             break
@@ -163,6 +192,8 @@ class SampleProfiler:
             interrupted = True
             running_time_sec = time.perf_counter() - start_time
             print("Interrupted by user.")
+        finally:
+            flush_pending()
 
         # Clear real-time stats line if it was being displayed
         if self.realtime_stats and len(self.sample_intervals) > 0:
index 60df026ed76a6cec73fc036dec9272f75882191c..42281dc6454c83c99dfc2a45c436c99d910da10d 100644 (file)
@@ -16,6 +16,8 @@ from .module_utils import extract_module_name, get_python_path_info
 
 
 class StackTraceCollector(Collector):
+    aggregating = True
+
     def __init__(self, sample_interval_usec, *, skip_idle=False):
         self.sample_interval_usec = sample_interval_usec
         self.skip_idle = skip_idle
index b2acb1cf577341d179c25a0ff18446b23f3259d6..ee27fdd3fa3053c1cb0d476eabde4e90018ab92c 100644 (file)
@@ -345,6 +345,21 @@ class TestHeatmapCollectorProcessFrames(unittest.TestCase):
         # Check that edge count is tracked
         self.assertGreater(len(collector.edge_samples), 0)
 
+    def test_process_frames_weight_applies_to_identical_samples(self):
+        collector = HeatmapCollector(sample_interval_usec=100)
+
+        frames = [
+            ('callee.py', (5, 5, -1, -1), 'callee', None),
+            ('caller.py', (10, 10, -1, -1), 'caller', None),
+        ]
+
+        collector.process_frames(frames, thread_id=1, weight=5)
+
+        edge_key = (('caller.py', 10), ('callee.py', 5))
+        self.assertEqual(collector.edge_samples[edge_key], 5)
+        self.assertEqual(collector.line_samples[('callee.py', 5)], 5)
+        self.assertEqual(collector.line_samples[('caller.py', 10)], 5)
+
     def test_process_frames_handles_empty_frames(self):
         """Test that process_frames handles empty frame list."""
         collector = HeatmapCollector(sample_interval_usec=100)
index 68bc59a5414a05c936e12ea4c3d7ae16edfa83a2..2f5a5e2732865901c97afddf269e125561c27e5b 100644 (file)
@@ -198,8 +198,83 @@ class TestSampleProfiler(unittest.TestCase):
             self.assertIn("samples", result)
 
             # Verify collector was called multiple times
-            self.assertGreaterEqual(mock_collector.collect.call_count, 5)
-            self.assertLessEqual(mock_collector.collect.call_count, 11)
+            total_weight = sum(
+                len(c.kwargs.get("timestamps_us") or [None])
+                for c in mock_collector.collect.call_args_list
+            )
+            self.assertGreaterEqual(total_weight, 5)
+            self.assertLessEqual(total_weight, 11)
+
+    def test_sample_profiler_does_not_buffer_non_aggregating_collectors(self):
+        """Test that non-aggregating collectors get each sample immediately."""
+
+        stack_frames = [mock.sentinel.stack_frames]
+        mock_collector = mock.MagicMock()
+        mock_collector.aggregating = False
+
+        with self._patched_unwinder() as u:
+            u.instance.get_stack_trace.return_value = stack_frames
+
+            manager = mock.Mock()
+            manager.attach_mock(u.instance.get_stack_trace, "unwind")
+            manager.attach_mock(mock_collector.collect, "collect")
+
+            profiler = SampleProfiler(
+                pid=12345, sample_interval_usec=10000, all_threads=False
+            )
+
+            times = [0.0, 0.01, 0.011, 0.02, 0.03]
+            with mock.patch("time.perf_counter", side_effect=times):
+                with io.StringIO() as output:
+                    with mock.patch("sys.stdout", output):
+                        profiler.sample(mock_collector, duration_sec=0.025)
+
+        self.assertEqual(
+            manager.mock_calls,
+            [
+                mock.call.unwind(),
+                mock.call.collect(stack_frames),
+                mock.call.unwind(),
+                mock.call.collect(stack_frames),
+            ],
+        )
+
+    def test_sample_profiler_flushes_aggregated_batches_at_limit(self):
+        """Test that aggregating collectors flush after MAX_PENDING_SAMPLES samples."""
+
+        stack_frames = [mock.sentinel.stack_frames]
+        mock_collector = mock.MagicMock()
+        mock_collector.aggregating = True
+
+        with self._patched_unwinder() as u:
+            u.instance.get_stack_trace.return_value = stack_frames
+
+            profiler = SampleProfiler(
+                pid=12345, sample_interval_usec=10000, all_threads=False
+            )
+
+            times = [
+                0.0,
+                0.01, 0.011,
+                0.02, 0.021,
+                0.03, 0.031,
+                0.04, 0.041,
+                0.05, 0.051,
+            ]
+            with mock.patch("profiling.sampling.sample.MAX_PENDING_SAMPLES", 2):
+                with mock.patch("time.perf_counter", side_effect=times):
+                    with io.StringIO() as output:
+                        with mock.patch("sys.stdout", output):
+                            profiler.sample(mock_collector, duration_sec=0.045)
+
+        batches = [
+            (c.args[0], len(c.kwargs["timestamps_us"]))
+            for c in mock_collector.collect.call_args_list
+        ]
+        self.assertEqual(
+            batches,
+            [(stack_frames, 2), (stack_frames, 2), (stack_frames, 1)],
+        )
 
     def test_sample_profiler_error_handling(self):
         """Test that the sample method handles errors gracefully."""
diff --git a/Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst b/Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst
new file mode 100644 (file)
index 0000000..25344e5
--- /dev/null
@@ -0,0 +1,4 @@
+Coalesce consecutive identical stack frames in Tachyon, so aggregating
+collectors (pstats, collapsed, flamegraph, gecko) receive one collect.
+Improves sample rate 3x, error rate and missed rate drop by 70%. Patch by
+Maurycy Pawłowski-Wieroński.