self.callers_graph = collections.defaultdict(set)
self.function_definitions = {}
+ # Map each sampled line to its function for proper caller lookup
+ # (filename, lineno) -> funcname
+ self.line_to_function = {}
+
# Edge counting for call path analysis
self.edge_samples = collections.Counter()
if funcname and (filename, funcname) not in self.function_definitions:
self.function_definitions[(filename, funcname)] = lineno
+ # Map this line to its function for caller/callee navigation
+ if funcname:
+ self.line_to_function[(filename, lineno)] = funcname
+
def _record_bytecode_sample(self, filename, lineno, opcode,
end_lineno=None, col_offset=None, end_col_offset=None,
weight=1):
return f"rgba({r}, {g}, {b}, {alpha})"
def _build_navigation_buttons(self, filename: str, line_num: int) -> str:
- """Build navigation buttons for callers/callees."""
+ """Build navigation buttons for callers/callees.
+
+ - Callers: All lines in a function show who calls this function
+ - Callees: Only actual call site lines show what they call
+ """
line_key = (filename, line_num)
- caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, set()))
+
+ funcname = self.line_to_function.get(line_key)
+
+ # Get callers: look up by function definition line, not current line
+ # This ensures all lines in a function show who calls this function
+ if funcname:
+ func_def_line = self.function_definitions.get((filename, funcname), line_num)
+ func_def_key = (filename, func_def_line)
+ caller_list = self._deduplicate_by_function(self.callers_graph.get(func_def_key, set()))
+ else:
+ caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, set()))
+
+ # Get callees: only show for actual call site lines (not every line in function)
callee_list = self._deduplicate_by_function(self.call_graph.get(line_key, set()))
# Get edge counts for each caller/callee
- callers_with_counts = self._get_edge_counts(line_key, caller_list, is_caller=True)
+ # For callers, use the function definition key for edge lookup
+ if funcname:
+ func_def_line = self.function_definitions.get((filename, funcname), line_num)
+ caller_edge_key = (filename, func_def_line)
+ else:
+ caller_edge_key = line_key
+ callers_with_counts = self._get_edge_counts(caller_edge_key, caller_list, is_caller=True)
+ # For callees, use the actual line key since that's where the call happens
callees_with_counts = self._get_edge_counts(line_key, callee_list, is_caller=False)
# Build navigation buttons with counts
self.assertEqual(collector.file_samples['test.py'][10], 1)
+def frame(filename, line, func):
+ """Create a frame tuple: (filename, location, funcname, opcode)."""
+ return (filename, (line, line, -1, -1), func, None)
+
+
+class TestHeatmapCollectorNavigationButtons(unittest.TestCase):
+ """Test navigation button behavior for caller/callee relationships.
+
+ For every call stack:
+ - Root frames (entry points): only DOWN arrow (callees)
+ - Middle frames: both UP and DOWN arrows
+ - Leaf frames: only UP arrow (callers)
+ """
+
+ def collect(self, *stacks):
+ """Create collector and process frame stacks."""
+ collector = HeatmapCollector(sample_interval_usec=100)
+ for stack in stacks:
+ collector.process_frames(stack, thread_id=1)
+ return collector
+
+ def test_deep_call_stack_relationships(self):
+ """Test root/middle/leaf navigation in a 5-level call stack."""
+ # Stack: root -> A -> B -> C -> leaf
+ stack = [
+ frame('leaf.py', 5, 'leaf'),
+ frame('c.py', 10, 'func_c'),
+ frame('b.py', 15, 'func_b'),
+ frame('a.py', 20, 'func_a'),
+ frame('root.py', 25, 'root'),
+ ]
+ c = self.collect(stack)
+
+ # Root: only callees (no one calls it)
+ self.assertIn(('root.py', 25), c.call_graph)
+ self.assertNotIn(('root.py', 25), c.callers_graph)
+
+ # Middle frames: both callers and callees
+ for key in [('a.py', 20), ('b.py', 15), ('c.py', 10)]:
+ self.assertIn(key, c.call_graph)
+ self.assertIn(key, c.callers_graph)
+
+ # Leaf: only callers (doesn't call anyone)
+ self.assertNotIn(('leaf.py', 5), c.call_graph)
+ self.assertIn(('leaf.py', 5), c.callers_graph)
+
+ def test_all_lines_in_function_see_callers(self):
+ """Test that interior lines map to their function for caller lookup."""
+ # Same function sampled at different lines (12, 15, 10)
+ c = self.collect(
+ [frame('mod.py', 12, 'my_func'), frame('caller.py', 100, 'caller')],
+ [frame('mod.py', 15, 'my_func'), frame('caller.py', 100, 'caller')],
+ [frame('mod.py', 10, 'my_func'), frame('caller.py', 100, 'caller')],
+ )
+
+ # All lines should map to same function
+ for line in [10, 12, 15]:
+ self.assertEqual(c.line_to_function[('mod.py', line)], 'my_func')
+
+ # Function definition line should have callers
+ func_def = c.function_definitions[('mod.py', 'my_func')]
+ self.assertIn(('mod.py', func_def), c.callers_graph)
+
+ def test_multiple_callers_and_callees(self):
+ """Test multiple callers/callees are recorded correctly."""
+ # Two callers -> target, and caller -> two callees
+ c = self.collect(
+ [frame('target.py', 10, 'target'), frame('caller1.py', 20, 'c1')],
+ [frame('target.py', 10, 'target'), frame('caller2.py', 30, 'c2')],
+ [frame('callee1.py', 5, 'f1'), frame('dispatcher.py', 40, 'dispatch')],
+ [frame('callee2.py', 6, 'f2'), frame('dispatcher.py', 40, 'dispatch')],
+ )
+
+ # Target has 2 callers
+ callers = c.callers_graph[('target.py', 10)]
+ self.assertEqual({x[0] for x in callers}, {'caller1.py', 'caller2.py'})
+
+ # Dispatcher has 2 callees
+ callees = c.call_graph[('dispatcher.py', 40)]
+ self.assertEqual({x[0] for x in callees}, {'callee1.py', 'callee2.py'})
+
+ def test_edge_samples_counted(self):
+ """Test that repeated calls accumulate edge counts."""
+ stack = [frame('callee.py', 10, 'callee'), frame('caller.py', 20, 'caller')]
+ c = self.collect(stack, stack, stack)
+
+ edge_key = (('caller.py', 20), ('callee.py', 10))
+ self.assertEqual(c.edge_samples[edge_key], 3)
+
+
class TestHeatmapCollectorExport(unittest.TestCase):
"""Test HeatmapCollector.export() method."""